From d34aefe9de4a62f8cb0cc0d785e7d23a4f8ca859 Mon Sep 17 00:00:00 2001 From: Marcin Szamotulski Date: Wed, 20 May 2026 10:37:25 +0200 Subject: [PATCH 1/4] Updated to recent ouroboros-network Based on `coot/tracing-instances-1` branch from `ouroboros-network`. To use `main` branch we need to wait for `cardano-crypto-class`, `ouroboros-consensus`, `kes-agent-crypto` to support `contra-tracer-0.2.1.0`. --- cabal.project | 32 +++++++++++++++---- dmq-node/dmq-node.cabal | 2 +- dmq-node/src/DMQ/Diffusion/NodeKernel.hs | 6 ++-- .../src/DMQ/Diffusion/NodeKernel/Types.hs | 6 ++-- dmq-node/src/DMQ/NodeToClient/Version.hs | 6 ++-- dmq-node/src/DMQ/NodeToNode.hs | 4 +-- dmq-node/src/DMQ/NodeToNode/Version.hs | 5 ++- dmq-node/src/DMQ/Tracer.hs | 4 --- .../DMQ/Protocol/LocalMsgNotification/Test.hs | 4 --- dmq-node/test/Test/DMQ/SigSubmission/Types.hs | 3 ++ flake.lock | 12 +++---- 11 files changed, 52 insertions(+), 32 deletions(-) diff --git a/cabal.project b/cabal.project index ca730fe9..67c77eea 100644 --- a/cabal.project +++ b/cabal.project @@ -15,10 +15,10 @@ repository cardano-haskell-packages -- repeat the index-state for hackage to work around haskell.nix parsing limitation index-state: -- Bump this if you need newer packages from Hackage - , hackage.haskell.org 2026-02-17T10:15:41Z + , hackage.haskell.org 2026-05-13T07:31:22Z -- Bump this if you need newer packages from CHaP - , cardano-haskell-packages 2026-04-07T08:02:00Z + , cardano-haskell-packages 2026-05-11T20:15:43Z active-repositories: , :rest @@ -44,11 +44,31 @@ if impl(ghc >= 9.12.0) -- rejecting: cardano-crypto-class-2.3.1.0 (conflict: cardano-crypto-tests => cardano-crypto-class>=2.2.2 && <2.2.4) -- allow-newer: cardano-crypto-tests:cardano-crypto-class +-- ouroboros-network with PR#5376, but without contra-tracer-0.2.1.0 (PR#5368) source-repository-package type: git location: https://github.com/IntersectMBO/ouroboros-network - tag: f2d88695147d9f236669caa214e11dba6b75eadc - --sha256: sha256-mRP9WxZ4xPOus/nFzSOMXCYopBGZcERDk+QqOD1LmQc= - subdir: ouroboros-network + tag: 5a80699bca834a4de47aca4fe4d322832309e374 + --sha256: sha256-FXp0gIfsUDVEE4ZuEM5y55qx0I7d5+/wQ7AjUz6UDcE= + subdir: network-mux + cardano-diffusion + ouroboros-network + +source-repository-package + type: git + location: https://github.com/IntersectMBO/ouroboros-consensus + tag: 9a44d4bb1e8316f2ceb60c68f4bef712754e470a + --sha256: sha256-hnbQJ91B6WDWA6/u76Q16YWr3T4hWgvc3n3XfUX5ZMc= + +-- https://github.com/bgamari/monoidal-containers/pull/112 +source-repository-package + type: git + location: https://github.com/coot/monoidal-containers + tag: 8319c34e260c7b44adaf0ac04074b3e37e510cea + --sha256: sha256-HkQC2Hb1OAyN8mI2BPcIylcUYF0Mts4DrffJ7Fkkh6Q= + +constraints: semialign >= 1.4, + validation < 1.2 +allow-newer: hedgehog-quickcheck:QuickCheck, + monoidal-containers:semialign -allow-newer: ouroboros-network:trace-dispatcher diff --git a/dmq-node/dmq-node.cabal b/dmq-node/dmq-node.cabal index d05ff97e..67a19fa8 100644 --- a/dmq-node/dmq-node.cabal +++ b/dmq-node/dmq-node.cabal @@ -131,7 +131,7 @@ library nothunks, optparse-applicative >=0.18 && <0.20, ouroboros-consensus:{ouroboros-consensus, cardano, diffusion}, - ouroboros-network:{ouroboros-network, api, framework, framework-tracing, orphan-instances, protocols, tracing} ^>=1.1.0.0, + ouroboros-network:{ouroboros-network, api, framework, orphan-instances, protocols, tracing} ^>=1.1.0.0, psqueues, quiet, random ^>=1.3, diff --git a/dmq-node/src/DMQ/Diffusion/NodeKernel.hs b/dmq-node/src/DMQ/Diffusion/NodeKernel.hs index 6abb1672..a716ad02 100644 --- a/dmq-node/src/DMQ/Diffusion/NodeKernel.hs +++ b/dmq-node/src/DMQ/Diffusion/NodeKernel.hs @@ -28,7 +28,7 @@ import Data.Void (Void) import System.Random (StdGen) import System.Random qualified as Random -import Ouroboros.Network.BlockFetch (newFetchClientRegistry) +import Ouroboros.Network.KeepAlive (newKeepAliveRegistry) import Ouroboros.Network.Magic (NetworkMagic (..)) import Ouroboros.Network.PeerSelection.Governor.Types (makePublicPeerSelectionStateVar) @@ -56,7 +56,7 @@ newNodeKernel :: forall crypto ntnAddr m. newNodeKernel rng = do publicPeerSelectionStateVar <- makePublicPeerSelectionStateVar - fetchClientRegistry <- newFetchClientRegistry + keepAliveRegistry <- newKeepAliveRegistry peerSharingRegistry <- newPeerSharingRegistry mempool <- Mempool.empty @@ -97,7 +97,7 @@ newNodeKernel rng = do peerMetric <- mkPeerMetric - pure NodeKernel { fetchClientRegistry + pure NodeKernel { keepAliveRegistry , peerSharingRegistry , peerSharingAPI , mempool diff --git a/dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs b/dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs index ba5cfca7..f73740a6 100644 --- a/dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs +++ b/dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs @@ -18,8 +18,8 @@ import System.Random (StdGen) import Cardano.Ledger.Api.State.Query qualified as LedgerQuery import Cardano.Ledger.Shelley.API qualified as Ledger -import Ouroboros.Network.BlockFetch (FetchClientRegistry) import Ouroboros.Network.ConnectionId (ConnectionId (..)) +import Ouroboros.Network.KeepAlive (KeepAliveRegistry) import Ouroboros.Network.PeerSelection.LedgerPeers.Type (LedgerPeerSnapshot, LedgerPeersKind (..)) import Ouroboros.Network.PeerSharing (PeerSharingAPI, PeerSharingRegistry) @@ -32,8 +32,8 @@ import DMQ.Protocol.SigSubmission.Type (Sig, SigId) data NodeKernel crypto ntnAddr m = NodeKernel { - -- | The fetch client registry, used for the keep alive clients. - fetchClientRegistry :: !(FetchClientRegistry (ConnectionId ntnAddr) () () m) + -- | The keep alive registry, used for the keep alive clients. + keepAliveRegistry :: !(KeepAliveRegistry (ConnectionId ntnAddr) m) -- | Read the current peer sharing registry, used for interacting with -- the PeerSharing protocol diff --git a/dmq-node/src/DMQ/NodeToClient/Version.hs b/dmq-node/src/DMQ/NodeToClient/Version.hs index a702b5e0..cd8d74ae 100644 --- a/dmq-node/src/DMQ/NodeToClient/Version.hs +++ b/dmq-node/src/DMQ/NodeToClient/Version.hs @@ -33,11 +33,13 @@ import Ouroboros.Network.Handshake.Acceptable (Acceptable (..)) import Ouroboros.Network.Handshake.Queryable (Queryable (..)) import Ouroboros.Network.Magic (NetworkMagic (..)) import Ouroboros.Network.Protocol.Handshake (Accept (..), Handshake) +import Ouroboros.Network.Util (PrettyShow (..)) data NodeToClientVersion = NodeToClientV_1 - deriving (Eq, Ord, Enum, Bounded, Show, Generic, NFData) + deriving stock (Eq, Ord, Enum, Bounded, Show, Generic) + deriving anyclass (NFData, PrettyShow) instance Aeson.ToJSON NodeToClientVersion where toJSON NodeToClientV_1 = Aeson.toJSON (1 :: Int) @@ -84,7 +86,7 @@ data NodeToClientVersionData = NodeToClientVersionData , query :: !Bool } deriving stock (Eq, Show, Generic) - deriving anyclass NFData + deriving anyclass (NFData, PrettyShow) instance Aeson.ToJSON NodeToClientVersionData where toJSON NodeToClientVersionData { diff --git a/dmq-node/src/DMQ/NodeToNode.hs b/dmq-node/src/DMQ/NodeToNode.hs index 9d2111c4..09be7004 100644 --- a/dmq-node/src/DMQ/NodeToNode.hs +++ b/dmq-node/src/DMQ/NodeToNode.hs @@ -203,7 +203,7 @@ ntnApps mempoolWriter sigSize NodeKernel { - fetchClientRegistry + keepAliveRegistry , peerSharingRegistry , peerSharingAPI , sigChannelVar @@ -387,7 +387,7 @@ ntnApps dqCtx (KeepAliveInterval 10) - ((), trailing) <- bracketKeepAliveClient fetchClientRegistry connId kacApp + ((), trailing) <- bracketKeepAliveClient keepAliveRegistry connId kacApp return ((), trailing) aKeepAliveServer diff --git a/dmq-node/src/DMQ/NodeToNode/Version.hs b/dmq-node/src/DMQ/NodeToNode/Version.hs index 3d24b105..78151f09 100644 --- a/dmq-node/src/DMQ/NodeToNode/Version.hs +++ b/dmq-node/src/DMQ/NodeToNode/Version.hs @@ -33,6 +33,7 @@ import Ouroboros.Network.Handshake.Queryable (Queryable (..)) import Ouroboros.Network.Magic (NetworkMagic (..)) import Ouroboros.Network.PeerSelection (PeerSharing (..)) import Ouroboros.Network.Protocol.Handshake (Accept (..), Handshake (..)) +import Ouroboros.Network.Util (PrettyShow (..)) import Ouroboros.Network.OrphanInstances () @@ -42,6 +43,8 @@ data NodeToNodeVersion | NodeToNodeV_2 deriving (Eq, Ord, Enum, Bounded, Show, Generic, NFData) +instance PrettyShow NodeToNodeVersion where + instance Aeson.ToJSON NodeToNodeVersion where toJSON NodeToNodeV_1 = Aeson.toJSON (1 :: Int) toJSON NodeToNodeV_2 = Aeson.toJSON (2 :: Int) @@ -75,7 +78,7 @@ data NodeToNodeVersionData = NodeToNodeVersionData , query :: !Bool } deriving stock (Show, Eq, Generic) - deriving anyclass NFData + deriving anyclass (NFData, PrettyShow) instance Aeson.ToJSON NodeToNodeVersionData where toJSON NodeToNodeVersionData { diff --git a/dmq-node/src/DMQ/Tracer.hs b/dmq-node/src/DMQ/Tracer.hs index 1f458f91..585e75b1 100644 --- a/dmq-node/src/DMQ/Tracer.hs +++ b/dmq-node/src/DMQ/Tracer.hs @@ -41,8 +41,6 @@ import Network.Socket (HostName, PortNumber) import Network.TypedProtocol.Codec (AnyMessage (..)) import Ouroboros.Network.Tracing () -import Ouroboros.Network.Tracing.PeerSelection () -import Ouroboros.Network.Tracing.TxSubmission () import Ouroboros.Network.ConnectionId import Ouroboros.Network.Diffusion qualified as Diffusion @@ -58,8 +56,6 @@ import Ouroboros.Network.Protocol.PeerSharing.Type qualified as PS import Ouroboros.Network.Protocol.TxSubmission2.Type (TxSubmission2) import Ouroboros.Network.Protocol.TxSubmission2.Type qualified as STX import Ouroboros.Network.Snocket (RemoteAddress) -import Ouroboros.Network.Tracing.TxSubmission.Inbound () -import Ouroboros.Network.Tracing.TxSubmission.Outbound () import Ouroboros.Network.TxSubmission.Inbound.V2 (TraceTxLogic) import Ouroboros.Network.TxSubmission.Inbound.V2.Types (TraceTxSubmissionInbound) diff --git a/dmq-node/test/DMQ/Protocol/LocalMsgNotification/Test.hs b/dmq-node/test/DMQ/Protocol/LocalMsgNotification/Test.hs index 2829fe09..df894aeb 100644 --- a/dmq-node/test/DMQ/Protocol/LocalMsgNotification/Test.hs +++ b/dmq-node/test/DMQ/Protocol/LocalMsgNotification/Test.hs @@ -41,7 +41,6 @@ import Ouroboros.Network.Channel import Ouroboros.Network.Driver.Simple import Ouroboros.Network.Protocol.Codec.Utils (WithBytes (..)) import Ouroboros.Network.Protocol.Codec.Utils qualified as Utils -import Ouroboros.Network.Util.ShowProxy (ShowProxy (..)) import Test.Ouroboros.Network.Protocol.Utils import Test.Ouroboros.Network.Utils @@ -226,9 +225,6 @@ instance NFData (WithBytes Msg) where `seq` rnf cborPayload -instance ShowProxy (WithBytes Msg) where - showProxy _ = "WithBytes Msg" - codec :: MonadST m => LocalMsgNotificationCodec m MsgWithBytes codec = codecLocalMsgNotification' Utils.runWithByteSpan encodeMsg decodeMsg diff --git a/dmq-node/test/Test/DMQ/SigSubmission/Types.hs b/dmq-node/test/Test/DMQ/SigSubmission/Types.hs index ee8fe518..71856cd2 100644 --- a/dmq-node/test/Test/DMQ/SigSubmission/Types.hs +++ b/dmq-node/test/Test/DMQ/SigSubmission/Types.hs @@ -71,6 +71,7 @@ sigSubmissionCodec2 = codecSigSubmissionV2 CBOR.encodeInt CBOR.decodeInt encodeSig decodeSig where + encodeSig :: Tx TxId -> CBOR.Encoding encodeSig Tx {getTxId, getTxSize, getTxAdvSize, getTxValid} = CBOR.encodeListLen 4 <> CBOR.encodeInt getTxId @@ -78,12 +79,14 @@ sigSubmissionCodec2 = <> CBOR.encodeWord32 (getSizeInBytes getTxAdvSize) <> CBOR.encodeBool getTxValid + decodeSig :: CBOR.Decoder s (Tx TxId) decodeSig = do _ <- CBOR.decodeListLen Tx <$> CBOR.decodeInt <*> (SizeInBytes <$> CBOR.decodeWord32) <*> (SizeInBytes <$> CBOR.decodeWord32) <*> CBOR.decodeBool + <*> pure Nothing data SigSubmissionState = diff --git a/flake.lock b/flake.lock index 1f26cfa9..6476700e 100644 --- a/flake.lock +++ b/flake.lock @@ -3,11 +3,11 @@ "CHaP": { "flake": false, "locked": { - "lastModified": 1775608993, - "narHash": "sha256-AHi1GtyIWNox83Hzwdt9lyaTCI2OK7DuztJtBVIzRsQ=", + "lastModified": 1779260980, + "narHash": "sha256-5SvJkobPmCtSYKwQWu1M/iFXj3g1PDy2bMJ9g1vm/No=", "owner": "IntersectMBO", "repo": "cardano-haskell-packages", - "rev": "32b5f2ab66b36dd3acf57641ef14a192d1bb6f3b", + "rev": "4b6a661aa6463529c0ab0207567d62f85a6b899c", "type": "github" }, "original": { @@ -155,11 +155,11 @@ "hackage": { "flake": false, "locked": { - "lastModified": 1772001687, - "narHash": "sha256-ptZzrJLnqA7P4AYYFv4mA2f+xTmXsHclTFhe6aRmwLs=", + "lastModified": 1779358087, + "narHash": "sha256-+tHfNr4EZabb5HaEzHXIOmG6WLkImGfG5DJg/JwKBE4=", "owner": "input-output-hk", "repo": "hackage.nix", - "rev": "e12ead50fba1c5b6e9e7e8b31680550ef38f3242", + "rev": "5ee161948ef9f17ff778d86e03eae8f10e3fd1f1", "type": "github" }, "original": { From e8f2a1e5b675c7b41d5e7160f03b4a70d496111b Mon Sep 17 00:00:00 2001 From: Marcin Szamotulski Date: Wed, 20 May 2026 14:45:53 +0200 Subject: [PATCH 2/4] signature validation: modification Removed the case where we validate signatures based on mark set. Now signatures validation fails with `SigExipred` only in the `ZeroSetSnapshot` case. Because of that also the other case where `ssMarkSet` was used becomes obsolete, and thus we don't use the mark set at all. If a pool reaches zero stake in epoch `i`, then it will still get rewards in the epochs `i+1` and `i+2`, so there's nothing wrong allowing it to mint signatures in the epoch `i+1`, e.g. there's a natural inertia in the system based on trust in the stake snapshots. --- .../src/DMQ/Diffusion/NodeKernel/Types.hs | 4 +++ .../DMQ/NodeToClient/LocalStateQueryClient.hs | 14 +++++---- .../DMQ/Protocol/SigSubmission/Validate.hs | 30 ++++--------------- 3 files changed, 18 insertions(+), 30 deletions(-) diff --git a/dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs b/dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs index f73740a6..58815a2c 100644 --- a/dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs +++ b/dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs @@ -56,6 +56,10 @@ type PoolId = Ledger.KeyHash Ledger.StakePool data StakePools m = StakePools { -- | contains map of cardano pool stake snapshot obtained -- via local state query client + -- + -- NOTE: StakeSnapshot is taken from `VolatileTip`, this means that + -- `ssMarkSet` is not safe to be used as we could be on an adversarial fork + -- which crossed boundary. stakePoolsVar :: !(StrictTVar m (Map PoolId LedgerQuery.StakeSnapshot)) -- | Acquire and update validation context for signature validation diff --git a/dmq-node/src/DMQ/NodeToClient/LocalStateQueryClient.hs b/dmq-node/src/DMQ/NodeToClient/LocalStateQueryClient.hs index e19e8b13..0d92d5e3 100644 --- a/dmq-node/src/DMQ/NodeToClient/LocalStateQueryClient.hs +++ b/dmq-node/src/DMQ/NodeToClient/LocalStateQueryClient.hs @@ -58,10 +58,13 @@ data QueryError = UnsupportedEra instance Exception QueryError where --- TODO generalize to handle ledger eras other than Conway --- | connects the dmq node to cardano node via local state query --- and updates the node kernel with stake pool data necessary to perform message --- validation +-- | Connect the dmq node to cardano node via local state query protocol and +-- update the node kernel with stake pool data necessary to perform mithril +-- signature validation +-- +-- NOTE: we are querying using VolatileTip, e.g. for stake snapshot this +-- means that the mark set should not be trusted as it might be different +-- on different forks. -- cardanoClient :: forall block query point crypto m. (MonadDelay m, MonadSTM m, MonadThrow m, MonadTime m) @@ -82,8 +85,7 @@ cardanoClient tracer ledgerPeers where idle mSystemStart = do traceWith tracer $ Acquiring mSystemStart - -- FIXME: switched to volatiletip for prerelease testing purposes - pure $ SendMsgAcquire VolatileTip {-ImmutableTip-} acquire + pure $ SendMsgAcquire VolatileTip acquire where acquire :: ClientStAcquiring block point query m Void acquire = ClientStAcquiring { diff --git a/dmq-node/src/DMQ/Protocol/SigSubmission/Validate.hs b/dmq-node/src/DMQ/Protocol/SigSubmission/Validate.hs index 3e179544..a2e0d3f7 100644 --- a/dmq-node/src/DMQ/Protocol/SigSubmission/Validate.hs +++ b/dmq-node/src/DMQ/Protocol/SigSubmission/Validate.hs @@ -36,7 +36,10 @@ import Cardano.Crypto.Hash.Class (castHash, hashWith) import Cardano.Crypto.KES.Class (KESAlgorithm (..)) import Cardano.KESAgent.KES.Crypto as KES import Cardano.KESAgent.KES.OCert (OCert (..), OCertSignable, validateOCert) -import Cardano.Ledger.Api.State.Query (StakeSnapshot (..)) +-- NOTE: one should be careful with `ssMarkPool` in this module, so it is not +-- imported, see a note in +-- `DMQ.NodeToClient.LocalStateQueryClient.cardanoClient`. +import Cardano.Ledger.Api.State.Query (StakeSnapshot (ssSetPool)) import Cardano.Ledger.BaseTypes.NonZero qualified as Ledger import Cardano.Ledger.Keys qualified as Ledger @@ -50,13 +53,10 @@ c_MAX_CLOCK_SKEW_SEC = 5 pattern NotZeroSetSnapshot :: StakeSnapshot pattern NotZeroSetSnapshot <- (Ledger.isZero . ssSetPool -> False) -pattern NotZeroMarkSnapshot :: StakeSnapshot -pattern NotZeroMarkSnapshot <- (Ledger.isZero . ssMarkPool -> False) - pattern ZeroSetSnapshot :: StakeSnapshot pattern ZeroSetSnapshot <- (Ledger.isZero . ssSetPool -> True) -{-# COMPLETE NotZeroSetSnapshot, NotZeroMarkSnapshot, ZeroSetSnapshot #-} +{-# COMPLETE NotZeroSetSnapshot, ZeroSetSnapshot #-} validateSigId :: Sig crypto -> Bool @@ -129,28 +129,10 @@ validateSig now sigs ctx0 = | otherwise -> left UnrecognizedPool - Just ss@NotZeroSetSnapshot -> + Just NotZeroSetSnapshot -> if | now <= addUTCTime c_MAX_CLOCK_SKEW_SEC nextEpoch -> return () - -- local-state-query is late, but the pool is about to expire - | Ledger.isZero (ssMarkPool ss) - -> left SigExpired - - | otherwise - -> left ClockSkew - - Just NotZeroMarkSnapshot -> - -- we take abs time in case we're late with our own local-state-query - -- update, and/or the other side's clock is ahead, and we're just - -- about or have just crossed the epoch and the pool is expected to - -- move into the set mark - if | abs (diffUTCTime nextEpoch now) <= c_MAX_CLOCK_SKEW_SEC - -> return () - - | diffUTCTime nextEpoch now > c_MAX_CLOCK_SKEW_SEC - -> left PoolNotEligible - | otherwise -> left ClockSkew From b4df9165444c8d7fc1db6890ae50eb5f728bd675 Mon Sep 17 00:00:00 2001 From: Marcin Szamotulski Date: Wed, 20 May 2026 16:27:04 +0200 Subject: [PATCH 3/4] Added changelog entry --- cabal.project | 4 +-- ...20_161705_coot_ouroboros_network_update.md | 31 +++++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) create mode 100644 dmq-node/changelog.d/20260520_161705_coot_ouroboros_network_update.md diff --git a/cabal.project b/cabal.project index 67c77eea..b3f83016 100644 --- a/cabal.project +++ b/cabal.project @@ -57,8 +57,8 @@ source-repository-package source-repository-package type: git location: https://github.com/IntersectMBO/ouroboros-consensus - tag: 9a44d4bb1e8316f2ceb60c68f4bef712754e470a - --sha256: sha256-hnbQJ91B6WDWA6/u76Q16YWr3T4hWgvc3n3XfUX5ZMc= + tag: ec9df6ee1d7ab918e75219107fc17e1b2cf1a0e9 + --sha256: sha256-c+yGPftvlas2CcnFLUPPqjnitz5yTu/Qud5LaSMyY34= -- https://github.com/bgamari/monoidal-containers/pull/112 source-repository-package diff --git a/dmq-node/changelog.d/20260520_161705_coot_ouroboros_network_update.md b/dmq-node/changelog.d/20260520_161705_coot_ouroboros_network_update.md new file mode 100644 index 00000000..eb8017b7 --- /dev/null +++ b/dmq-node/changelog.d/20260520_161705_coot_ouroboros_network_update.md @@ -0,0 +1,31 @@ + + +### Breaking + +- Using `KeepAliveRegistry` instead of `FetchClientRegistry` introduced in + a recent `ouroboros-network` PR. The `fetchClientRegistry` field of + `NodeKernel` was replaced with `keepAliveRegistry` field. + +### Non-Breaking + +- Added `PrettyShow` instances for + - `NodeToNodeVersion` + - `NodeToNodeVersionData` + - `NodeToClientVersion` + - `NodeToClientVersionData` +- Signature validation changed, we're no longer using the `mark set`, pools + with no stake will be able to mint signatures as long as they have non zero + stake in the `set set`. + + + From dc9d778f12d9739ae6afb4e79a52964ac778c5f6 Mon Sep 17 00:00:00 2001 From: Marcin Szamotulski Date: Wed, 4 Mar 2026 09:09:10 +0100 Subject: [PATCH 4/4] Connect to cardano-node in dmq kernel. --- cabal.project | 1 + dmq-node/app/Main.hs | 17 +- dmq-node/dmq-node.cabal | 1 + dmq-node/src/DMQ/Diffusion/NodeKernel.hs | 141 ++++++++++++-- .../DMQ/NodeToClient/LocalStateQueryClient.hs | 174 ++++++------------ 5 files changed, 189 insertions(+), 145 deletions(-) diff --git a/cabal.project b/cabal.project index b3f83016..20e18500 100644 --- a/cabal.project +++ b/cabal.project @@ -53,6 +53,7 @@ source-repository-package subdir: network-mux cardano-diffusion ouroboros-network + acts-generic source-repository-package type: git diff --git a/dmq-node/app/Main.hs b/dmq-node/app/Main.hs index e4fbcfe2..9d3a9188 100644 --- a/dmq-node/app/Main.hs +++ b/dmq-node/app/Main.hs @@ -50,7 +50,6 @@ import DMQ.Diffusion.NodeKernel as NodeKernel import DMQ.Diffusion.PeerSelectionPolicy (policy) import DMQ.Handlers.TopLevel (toplevelExceptionHandler) import DMQ.NodeToClient qualified as NtC -import DMQ.NodeToClient.LocalStateQueryClient import DMQ.NodeToNode (NodeToNodeVersion, dmqCodecs, dmqLimitsAndTimeouts, ntnApps) import DMQ.Policy qualified as Policy @@ -93,8 +92,7 @@ runDMQ commandLineConfig = do dmqConfig@Configuration { dmqcTopologyFile = I topologyFile, dmqcCardanoNodeSocket = I socketPath, - dmqcVersion = I version, - dmqcLedgerPeers = I ledgerPeers + dmqcVersion = I version } = fromRight mempty config' <> commandLineConfig `act` @@ -122,7 +120,6 @@ runDMQ commandLineConfig = do ( dmqTracers@DMQTracers { dmqStartupTracer, - localStateQueryClientTracer, sigValidationTracer, localSigValidationTracer, cardanoNodeHandshakeTracer @@ -170,18 +167,12 @@ runDMQ commandLineConfig = do -- TODO: this might not work, since `ouroboros-network` creates its own IO Completion Port. withIOManager \iocp -> do - let localSnocket' = localSnocket iocp - mkStakePoolMonitor = connectToCardanoNode - localStateQueryClientTracer - ledgerPeers - localSnocket' - socketPath - withNodeKernel @StandardCrypto dmqTracers + (localSnocket iocp) + makeLocalBearer dmqConfig - psRng - mkStakePoolMonitor $ \nodeKernel -> do + psRng $ \nodeKernel -> do dmqDiffusionConfiguration <- mkDiffusionConfiguration dmqConfig nt nodeKernel.stakePools.ledgerBigPeersVar diff --git a/dmq-node/dmq-node.cabal b/dmq-node/dmq-node.cabal index 67a19fa8..579329e7 100644 --- a/dmq-node/dmq-node.cabal +++ b/dmq-node/dmq-node.cabal @@ -111,6 +111,7 @@ library cardano-ledger-byron, cardano-ledger-core, cardano-ledger-shelley, + cardano-protocol-tpraos, cardano-slotting, cardano-strict-containers, cborg >=0.2.1 && <0.3, diff --git a/dmq-node/src/DMQ/Diffusion/NodeKernel.hs b/dmq-node/src/DMQ/Diffusion/NodeKernel.hs index a716ad02..e4268231 100644 --- a/dmq-node/src/DMQ/Diffusion/NodeKernel.hs +++ b/dmq-node/src/DMQ/Diffusion/NodeKernel.hs @@ -1,15 +1,19 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE PackageImports #-} -{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE PackageImports #-} +{-# LANGUAGE RankNTypes #-} module DMQ.Diffusion.NodeKernel ( module DMQ.Diffusion.NodeKernel.Types , withNodeKernel ) where +import Control.Applicative (Alternative) import Control.Concurrent.Class.MonadMVar import Control.Concurrent.Class.MonadSTM.Strict import Control.Monad.Class.MonadAsync +import Control.Monad.Class.MonadST import Control.Monad.Class.MonadThrow import Control.Monad.Class.MonadTime.SI import Control.Monad.Class.MonadTimer.SI @@ -18,22 +22,44 @@ import "contra-tracer" Control.Tracer (nullTracer) import Data.Function (on) import Data.Hashable import Data.Map.Strict qualified as Map +import Data.Proxy import Data.Sequence (Seq) import Data.Sequence qualified as Seq import Data.Set (Set) import Data.Set qualified as Set import Data.Time.Clock.POSIX (POSIXTime) import Data.Time.Clock.POSIX qualified as Time -import Data.Void (Void) +import Data.Void (Void, absurd) import System.Random (StdGen) import System.Random qualified as Random +import Network.Mux qualified as Mx + +import Cardano.Chain.Slotting (EpochSlots (..)) +import Cardano.Network.NodeToClient qualified as Cardano.NtoC +import Cardano.Protocol.Crypto qualified as Cardano (StandardCrypto) + +import Ouroboros.Consensus.Cardano.Block (CardanoBlock) +import Ouroboros.Consensus.Cardano.Node +import Ouroboros.Consensus.Network.NodeToClient +import Ouroboros.Consensus.Node.NetworkProtocolVersion +import Ouroboros.Consensus.Node.ProtocolInfo + +import Ouroboros.Network.Handshake.Queryable (Queryable (..)) import Ouroboros.Network.KeepAlive (newKeepAliveRegistry) -import Ouroboros.Network.Magic (NetworkMagic (..)) +import Ouroboros.Network.Mux qualified as Mx import Ouroboros.Network.PeerSelection.Governor.Types (makePublicPeerSelectionStateVar) import Ouroboros.Network.PeerSharing (newPeerSharingAPI, newPeerSharingRegistry, ps_POLICY_PEER_SHARE_MAX_PEERS, ps_POLICY_PEER_SHARE_STICKY_TIME) +import Ouroboros.Network.Protocol.Handshake (Acceptable (..)) +import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec, + noTimeLimitsHandshake) +import Ouroboros.Network.Protocol.LocalStateQuery.Client +import Ouroboros.Network.Protocol.LocalStateQuery.Type +import Ouroboros.Network.Snocket (Snocket, localAddressFromPath) +import Ouroboros.Network.Socket (ConnectToArgs (..), + HandshakeCallbacks (HandshakeCallbacks), connectToNode) import Ouroboros.Network.TxSubmission.Inbound.V2 import Ouroboros.Network.TxSubmission.Mempool.Simple (Mempool (..), MempoolSeq (..), WithIndex (..)) @@ -42,10 +68,12 @@ import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool import DMQ.Configuration import DMQ.Diffusion.NodeKernel.Types import DMQ.Diffusion.PeerSelection.PeerMetric (mkPeerMetric) +import DMQ.NodeToClient.LocalStateQueryClient import DMQ.Policy qualified as Policy import DMQ.Protocol.SigSubmission.Type (Sig (sigExpiresAt, sigId), SigId) import DMQ.Tracer + newNodeKernel :: forall crypto ntnAddr m. ( MonadLabelledSTM m , MonadMVar m @@ -111,30 +139,43 @@ newNodeKernel rng = do withNodeKernel :: forall crypto ntnAddr ntcAddr m a. - ( MonadAsync m - , MonadFork m - , MonadDelay m - , MonadLabelledSTM m - , MonadMask m - , MonadMVar m - , MonadTime m + ( Alternative (STM m) + , MonadAsync m + , MonadEvaluate m + , MonadFork m + , MonadDelay m + , MonadLabelledSTM m + , MonadMask m + , MonadMVar m + , Mx.MonadReadBuffer m + , MonadST m + , MonadThrow (STM m) + , MonadTime m + , MonadTimer m , Ord ntnAddr , Hashable ntnAddr ) => DMQTracers crypto ntnAddr ntcAddr m + -> Snocket m Cardano.NtoC.LocalSocket LocalAddress + -> Mx.MakeBearer m Cardano.NtoC.LocalSocket -> Configuration -> StdGen - -> (NetworkMagic -> NodeKernel crypto ntnAddr m -> m (Either SomeException Void)) -> (NodeKernel crypto ntnAddr m -> m a) -- ^ as soon as the callback exits the `mempoolWorker` and all -- decision logic threads will be killed -> m a -withNodeKernel DMQTracers { sigSubmissionLogicTracer } +withNodeKernel DMQTracers { sigSubmissionLogicTracer, + localStateQueryClientTracer + } + localSnocket + mkLocalBearer Configuration { - dmqcCardanoNetworkMagic = I networkMagic + dmqcCardanoNetworkMagic = I networkMagic, + dmqcCardanoNodeSocket = I cardanoNodeSocketPath, + dmqcLedgerPeers = I ledgerPeers } rng - mkStakePoolMonitor k = do + k = do nodeKernel@NodeKernel { mempool, sigChannelVar, sigSharedTxStateVar @@ -149,11 +190,77 @@ withNodeKernel DMQTracers { sigSubmissionLogicTracer } sigChannelVar sigSharedTxStateVar) $ \sigLogicThread -> - withAsync (mkStakePoolMonitor networkMagic nodeKernel) \spmAid -> do + withAsync (connectToCardanoNode nodeKernel) \spmAid -> do link mempoolThread link sigLogicThread link spmAid k nodeKernel + where + connectToCardanoNode :: NodeKernel crypto ntnAddr m + -> m (Either SomeException Void) + connectToCardanoNode nodeKernel = + fmap fn <$> + connectToNode + localSnocket + mkLocalBearer + ConnectToArgs { + ctaHandshakeCodec = Cardano.NtoC.nodeToClientHandshakeCodec, + ctaHandshakeTimeLimits = noTimeLimitsHandshake, + ctaVersionDataCodec = cborTermVersionDataCodec Cardano.NtoC.nodeToClientCodecCBORTerm, + ctaConnectTracers = Cardano.NtoC.nullNetworkConnectTracers, --debuggingNetworkConnectTracers, + ctaHandshakeCallbacks = HandshakeCallbacks acceptableVersion queryVersion + } + (\_ -> return ()) + (Cardano.NtoC.combineVersions + [ Cardano.NtoC.simpleSingletonVersions + version + Cardano.NtoC.NodeToClientVersionData { + Cardano.NtoC.networkMagic + , Cardano.NtoC.query = False + } + \_version -> + Mx.OuroborosApplication + [ Mx.MiniProtocol + { Mx.miniProtocolNum = Mx.MiniProtocolNum 7 + , Mx.miniProtocolStart = Mx.StartEagerly + , Mx.miniProtocolLimits = + Mx.MiniProtocolLimits + { Mx.maximumIngressQueue = 0xffffffff + } + , Mx.miniProtocolRun = + Mx.InitiatorProtocolOnly + . Mx.mkMiniProtocolCbFromPeerSt + . const + $ ( nullTracer -- TODO: add tracer + , cStateQueryCodec + , StateIdle + , localStateQueryClientPeer $ + cardanoLocalStateQueryClient + localStateQueryClientTracer + ledgerPeers + (stakePools nodeKernel) + (nextEpochVar nodeKernel) + ) + } + ] + | version <- [minBound..maxBound] + , let -- NOTE: the query protocol is running using + -- `Cardano.StandardCrypto`, while `dmq-node` is using + -- `StandardCrypto` defined in `kes-agent-krypto`. A priori + -- cryptography could differ but it shouldn't be a problem. We + -- are querying + supportedVersionMap = + supportedNodeToClientVersions (Proxy :: Proxy (CardanoBlock Cardano.StandardCrypto)) + blk = supportedVersionMap Map.! version + Codecs {cStateQueryCodec} = + clientCodecs (pClientInfoCodecConfig . protocolClientInfoCardano $ EpochSlots 21600) + blk version + ]) + Nothing + (localAddressFromPath cardanoNodeSocketPath) + where + fn :: forall x. Either x Void -> x + fn = either id absurd mempoolWorker :: forall crypto m. diff --git a/dmq-node/src/DMQ/NodeToClient/LocalStateQueryClient.hs b/dmq-node/src/DMQ/NodeToClient/LocalStateQueryClient.hs index 0d92d5e3..7c875f2d 100644 --- a/dmq-node/src/DMQ/NodeToClient/LocalStateQueryClient.hs +++ b/dmq-node/src/DMQ/NodeToClient/LocalStateQueryClient.hs @@ -5,8 +5,8 @@ module DMQ.NodeToClient.LocalStateQueryClient ( TraceLocalStateQueryClient (..) - , cardanoClient - , connectToCardanoNode + , CardanoLocalStateQueryClient + , cardanoLocalStateQueryClient ) where import Control.Concurrent.Class.MonadSTM.Strict @@ -15,38 +15,28 @@ import Control.Monad.Class.MonadThrow import Control.Monad.Class.MonadTime.SI import Control.Monad.Class.MonadTimer.SI import Control.Monad.Trans.Except -import "contra-tracer" Control.Tracer (Tracer, nullTracer, traceWith) +import "contra-tracer" Control.Tracer (Tracer, traceWith) import Data.Functor ((<&>)) import Data.List.NonEmpty qualified as NonEmpty -import Data.Map.Strict qualified as Map -import Data.Proxy import Data.Void -import Cardano.Chain.Slotting (EpochSlots (..)) import Cardano.Ledger.Api.State.Query (StakeSnapshots (..)) -import Cardano.Network.NodeToClient import Cardano.Network.PeerSelection (LedgerPeerSnapshot (..), LedgerRelayAccessPoint (..), SingLedgerPeersKind (..)) import Cardano.Slotting.EpochInfo.API import Cardano.Slotting.Time -import DMQ.Diffusion.NodeKernel +import DMQ.Diffusion.NodeKernel.Types (StakePools (..)) + import Ouroboros.Consensus.Cardano.Block -import Ouroboros.Consensus.Cardano.Node import Ouroboros.Consensus.HardFork.Combinator.Ledger.Query import Ouroboros.Consensus.HardFork.History.EpochInfo (interpreterToEpochInfo) import Ouroboros.Consensus.Ledger.Query (Query (..)) -import Ouroboros.Consensus.Network.NodeToClient -import Ouroboros.Consensus.Node.NetworkProtocolVersion -import Ouroboros.Consensus.Node.ProtocolInfo import Ouroboros.Consensus.Shelley.Ledger.Query import Ouroboros.Consensus.Shelley.Ledger.SupportsProtocol () import Ouroboros.Network.Block -import Ouroboros.Network.Magic -import Ouroboros.Network.Mux qualified as Mx import Ouroboros.Network.PeerSelection.LedgerPeers (LedgerPeersKind (..), - accumulateBigLedgerStake) -import Ouroboros.Network.PeerSelection.LedgerPeers.Type (RawBlockHash) + RawBlockHash, accumulateBigLedgerStake) import Ouroboros.Network.Point (Block (..)) import Ouroboros.Network.Protocol.LocalStateQuery.Client import Ouroboros.Network.Protocol.LocalStateQuery.Type @@ -58,36 +48,66 @@ data QueryError = UnsupportedEra instance Exception QueryError where --- | Connect the dmq node to cardano node via local state query protocol and --- update the node kernel with stake pool data necessary to perform mithril --- signature validation +-- +-- Type aliases +-- + +-- | `LocalStateQuery` using `CardanoBlock` +type CardanoLocalStateQueryClient crypto m a = + LocalStateQueryClient (CardanoBlock crypto) + (Point (CardanoBlock crypto)) + (Query (CardanoBlock crypto)) m Void + +-- | `ClientStAcuiring` using `CardanoBlock` +type CardanoClientStAcquiring crypto m a = + ClientStAcquiring (CardanoBlock crypto) (Point (CardanoBlock crypto)) (Query (CardanoBlock crypto)) m a + +-- | `ClientStAcuired` using `CardanoBlock` +type CardanoClientStAcquired crypto m a = + ClientStAcquired (CardanoBlock crypto) (Point (CardanoBlock crypto)) (Query (CardanoBlock crypto)) m a + +-- | `ClientStQuerying` using `CardanoBlock` +type CardanoClientStQuerying crypto m a b = + ClientStQuerying (CardanoBlock crypto) (Point (CardanoBlock crypto)) (Query (CardanoBlock crypto)) m a b + + +-- | Local state query client which queries cardano node for +-- +-- * stake pool data (for signature validation) +-- * ledger peers (for peer selection) -- -- NOTE: we are querying using VolatileTip, e.g. for stake snapshot this -- means that the mark set should not be trusted as it might be different -- on different forks. -- -cardanoClient - :: forall block query point crypto m. (MonadDelay m, MonadSTM m, MonadThrow m, MonadTime m) - => (block ~ CardanoBlock crypto, query ~ Query block, point ~ Point block) +cardanoLocalStateQueryClient + :: forall crypto m. + ( MonadDelay m + , MonadSTM m + , MonadThrow m + , MonadTime m + ) => Tracer m TraceLocalStateQueryClient -> Bool -- ^ use ledger peers -> StakePools m -> StrictTVar m (Maybe UTCTime) -- ^ from node kernel - -> LocalStateQueryClient (CardanoBlock crypto) (Point block) (Query block) m Void -cardanoClient tracer ledgerPeers - StakePools { - stakePoolsVar, - ledgerPeersVar, - ledgerBigPeersVar - } - nextEpochVar = - LocalStateQueryClient (idle Nothing) + -> CardanoLocalStateQueryClient crypto m Void +cardanoLocalStateQueryClient + tracer ledgerPeers + StakePools { + stakePoolsVar, + ledgerPeersVar, + ledgerBigPeersVar + } + nextEpochVar + = + LocalStateQueryClient (idle Nothing) where idle mSystemStart = do traceWith tracer $ Acquiring mSystemStart pure $ SendMsgAcquire VolatileTip acquire where - acquire :: ClientStAcquiring block point query m Void + acquire :: CardanoClientStAcquiring crypto m Void acquire = ClientStAcquiring { recvMsgAcquired = let epochQry systemStart = pure $ @@ -103,8 +123,8 @@ cardanoClient tracer ledgerPeers } wrappingMismatch :: forall err r. - (r -> m (ClientStAcquired block point query m Void)) - -> ClientStQuerying block point query m Void (Either err r) + (r -> m (CardanoClientStAcquired crypto m Void)) + -> CardanoClientStQuerying crypto m Void (Either err r) wrappingMismatch k = ClientStQuerying $ either (const . throwIO . userError $ "mismatch era info") k @@ -137,12 +157,7 @@ cardanoClient tracer ledgerPeers queryCurrentEra :: SystemStart -> UTCTime - -> ClientStAcquired - (CardanoBlock crypto) - (Point (CardanoBlock crypto)) - (Query (CardanoBlock crypto)) - m - Void + -> CardanoClientStAcquired crypto m Void queryCurrentEra systemStart nextEpoch = SendMsgQuery (BlockQuery (QueryHardFork GetCurrentEra)) $ ClientStQuerying $ \era -> queryStakeSnapshots systemStart nextEpoch era @@ -152,12 +167,7 @@ cardanoClient tracer ledgerPeers :: SystemStart -> UTCTime -> EraIndex (CardanoEras crypto) - -> m (ClientStAcquired - (CardanoBlock crypto) - (Point (CardanoBlock crypto)) - (Query (CardanoBlock crypto)) - m - Void) + -> m (CardanoClientStAcquired crypto m Void) queryStakeSnapshots systemStart nextEpoch era = case era of EraByron{} -> throwIO UnsupportedEra @@ -178,12 +188,7 @@ cardanoClient tracer ledgerPeers where handleStakeSnapshots :: StakeSnapshots - -> m (ClientStAcquired - (CardanoBlock crypto) - (Point (CardanoBlock crypto)) - (Query (CardanoBlock crypto)) - m - Void) + -> m (CardanoClientStAcquired crypto m Void) handleStakeSnapshots StakeSnapshots { ssStakeSnapshots } = do atomically do writeTVar stakePoolsVar ssStakeSnapshots @@ -203,12 +208,7 @@ cardanoClient tracer ledgerPeers queryLedgerPeers :: SystemStart -> NominalDiffTime - -> ClientStAcquired - (CardanoBlock crypto) - (Point (CardanoBlock crypto)) - (Query (CardanoBlock crypto)) - m - Void + -> CardanoClientStAcquired crypto m Void queryLedgerPeers systemStart toNextEpoch = SendMsgQuery (BlockQuery . QueryIfCurrentConway $ GetLedgerPeerSnapshot SingAllLedgerPeers) $ wrappingMismatch handleLedgerPeers @@ -258,63 +258,7 @@ cardanoClient tracer ledgerPeers -- release, continue the loop in `idle` release :: SystemStart -> NominalDiffTime - -> ClientStAcquired - (CardanoBlock crypto) - (Point (CardanoBlock crypto)) - (Query (CardanoBlock crypto)) - m - Void + -> CardanoClientStAcquired crypto m Void release systemStart toNextEpoch = SendMsgRelease do threadDelay $ min (realToFrac toNextEpoch) 86400 -- TODO fuzz this? idle $ Just systemStart - - -connectToCardanoNode :: Tracer IO TraceLocalStateQueryClient - -> Bool -- ^ use ledger peers - -> LocalSnocket - -> FilePath - -> NetworkMagic - -> NodeKernel crypto ntnAddr IO - -> IO (Either SomeException Void) -connectToCardanoNode tracer ledgerPeers localSnocket' snocketPath networkMagic nodeKernel = - connectTo - localSnocket' - nullNetworkConnectTracers --debuggingNetworkConnectTracers - (combineVersions - [ simpleSingletonVersions - version - NodeToClientVersionData { - networkMagic - , query = False - } - \_version -> - Mx.OuroborosApplication - [ Mx.MiniProtocol - { miniProtocolNum = Mx.MiniProtocolNum 7 - , miniProtocolStart = Mx.StartEagerly - , miniProtocolLimits = - Mx.MiniProtocolLimits - { maximumIngressQueue = 0xffffffff - } - , miniProtocolRun = - Mx.InitiatorProtocolOnly - . Mx.mkMiniProtocolCbFromPeerSt - . const - $ ( nullTracer -- TODO: add tracer - , cStateQueryCodec - , StateIdle - , localStateQueryClientPeer - $ cardanoClient tracer - ledgerPeers - (stakePools nodeKernel) - (nextEpochVar nodeKernel) - ) - } - ] - | version <- [minBound..maxBound] - , let supportedVersionMap = supportedNodeToClientVersions (Proxy :: Proxy (CardanoBlock StandardCrypto)) - blk = supportedVersionMap Map.! version - Codecs {cStateQueryCodec} = - clientCodecs (pClientInfoCodecConfig . protocolClientInfoCardano $ EpochSlots 21600) blk version - ]) - snocketPath