From 3c1135df62e4856692ab524143278d4ac931de76 Mon Sep 17 00:00:00 2001 From: Tobias Dammers Date: Tue, 17 Dec 2024 18:00:08 +0100 Subject: [PATCH 1/3] POC: chainsync client as pipe producer --- .../ouroboros-network-protocols.cabal | 2 + .../Network/Protocol/ChainSync/PipeClient.hs | 70 +++++++++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ChainSync/PipeClient.hs diff --git a/ouroboros-network-protocols/ouroboros-network-protocols.cabal b/ouroboros-network-protocols/ouroboros-network-protocols.cabal index d36d2743ed..d8f7cf5b8f 100644 --- a/ouroboros-network-protocols/ouroboros-network-protocols.cabal +++ b/ouroboros-network-protocols/ouroboros-network-protocols.cabal @@ -41,6 +41,7 @@ library Ouroboros.Network.Protocol.BlockFetch.Type Ouroboros.Network.Protocol.ChainSync.Client Ouroboros.Network.Protocol.ChainSync.ClientPipelined + Ouroboros.Network.Protocol.ChainSync.PipeClient Ouroboros.Network.Protocol.ChainSync.Codec Ouroboros.Network.Protocol.ChainSync.PipelineDecision Ouroboros.Network.Protocol.ChainSync.Server @@ -104,6 +105,7 @@ library io-classes ^>=1.5.0, nothunks, ouroboros-network-api ^>=0.11, + pipes, quiet, serialise, si-timers, diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ChainSync/PipeClient.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ChainSync/PipeClient.hs new file mode 100644 index 0000000000..65b10643b0 --- /dev/null +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ChainSync/PipeClient.hs @@ -0,0 +1,70 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE RankNTypes #-} + +module Ouroboros.Network.Protocol.ChainSync.PipeClient +where + +import Network.TypedProtocol +import Ouroboros.Network.Protocol.ChainSync.Client +import Ouroboros.Network.Protocol.ChainSync.Type +import Pipes + +data ChainSyncResponse header point tip + = RollForward header tip + | RollBackward point tip + | IntersectFound point tip + | IntersectNotFound tip + deriving (Show) + +-- | Run a chain sync client as a 'Producer'. +-- Pass a suitable 'Driver', and a list of known 'point's. The 'Producer' will +-- initialize itself by requesting an intersection, and yielding either +-- 'IntersectFound' or 'IntersectNotFound' to indicate the server's response. +-- After that, it will indefinitely request updates and 'yield' them as +-- 'RollForward' and 'RollBackward', respectively. +chainSyncClientProducer :: forall header point tip m dstate + . ( Monad m + ) + => Driver + (ChainSync header point tip) + AsClient + dstate + (Producer (ChainSyncResponse header point tip) m) + -> [point] + -> Producer (ChainSyncResponse header point tip) m dstate +chainSyncClientProducer driver known = + snd <$> runPeerWithDriver driver peer + where + peer = chainSyncClientPeer (ChainSyncClient runPeer) + + runPeer :: Producer + (ChainSyncResponse header point tip) + m + (ClientStIdle header point tip (Producer (ChainSyncResponse header point tip) m) ()) + runPeer = + pure . SendMsgFindIntersect known $ + ClientStIntersect + { recvMsgIntersectFound = \p t -> ChainSyncClient $ do + yield (IntersectFound p t) + runProducer + , recvMsgIntersectNotFound = \t -> ChainSyncClient $ do + yield (IntersectNotFound t) + runProducer + } + runProducer :: Producer + (ChainSyncResponse header point tip) + m + (ClientStIdle header point tip (Producer (ChainSyncResponse header point tip) m) ()) + runProducer = + pure . SendMsgRequestNext (pure ()) $ + ClientStNext + { recvMsgRollForward = \h t -> ChainSyncClient $ do + yield (RollForward h t) + runProducer + , recvMsgRollBackward = \p t -> ChainSyncClient $ do + yield (RollBackward p t) + runProducer + } From 762bf460d94f0c0241e64cb0d03cff0789882984 Mon Sep 17 00:00:00 2001 From: Tobias Dammers Date: Thu, 19 Dec 2024 14:42:07 +0100 Subject: [PATCH 2/3] Pass initial points via `await` --- .../Network/Protocol/ChainSync/PipeClient.hs | 55 +++++++------------ 1 file changed, 19 insertions(+), 36 deletions(-) diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ChainSync/PipeClient.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ChainSync/PipeClient.hs index 65b10643b0..bf5ca86145 100644 --- a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ChainSync/PipeClient.hs +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ChainSync/PipeClient.hs @@ -1,63 +1,46 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE RankNTypes #-} +{-# LANGUAGE TypeOperators #-} module Ouroboros.Network.Protocol.ChainSync.PipeClient where -import Network.TypedProtocol import Ouroboros.Network.Protocol.ChainSync.Client -import Ouroboros.Network.Protocol.ChainSync.Type import Pipes -data ChainSyncResponse header point tip - = RollForward header tip +data ChainSyncResponse block point tip + = RollForward block tip | RollBackward point tip - | IntersectFound point tip | IntersectNotFound tip deriving (Show) --- | Run a chain sync client as a 'Producer'. --- Pass a suitable 'Driver', and a list of known 'point's. The 'Producer' will --- initialize itself by requesting an intersection, and yielding either --- 'IntersectFound' or 'IntersectNotFound' to indicate the server's response. --- After that, it will indefinitely request updates and 'yield' them as --- 'RollForward' and 'RollBackward', respectively. -chainSyncClientProducer :: forall header point tip m dstate +-- | Run a chain sync client as a 'Pipe'. +chainSyncClientProducer :: forall block point tip m p . ( Monad m + , p ~ Pipe [point] (ChainSyncResponse block point tip) m ) - => Driver - (ChainSync header point tip) - AsClient - dstate - (Producer (ChainSyncResponse header point tip) m) - -> [point] - -> Producer (ChainSyncResponse header point tip) m dstate -chainSyncClientProducer driver known = - snd <$> runPeerWithDriver driver peer + => (ChainSyncClient block point tip p () -> p ()) + -> p () +chainSyncClientProducer runClient = + runClient $ ChainSyncClient runProtocol where - peer = chainSyncClientPeer (ChainSyncClient runPeer) - - runPeer :: Producer - (ChainSyncResponse header point tip) - m - (ClientStIdle header point tip (Producer (ChainSyncResponse header point tip) m) ()) - runPeer = + runProtocol :: p (ClientStIdle block point tip p ()) + runProtocol = do + known <- await pure . SendMsgFindIntersect known $ ClientStIntersect { recvMsgIntersectFound = \p t -> ChainSyncClient $ do - yield (IntersectFound p t) + yield (RollBackward p t) runProducer , recvMsgIntersectNotFound = \t -> ChainSyncClient $ do yield (IntersectNotFound t) runProducer } - runProducer :: Producer - (ChainSyncResponse header point tip) - m - (ClientStIdle header point tip (Producer (ChainSyncResponse header point tip) m) ()) + + runProducer :: p (ClientStIdle block point tip p ()) runProducer = pure . SendMsgRequestNext (pure ()) $ ClientStNext From 4ef89303d24967ec7fc37a619a0e2543600dee1b Mon Sep 17 00:00:00 2001 From: Tobias Dammers Date: Thu, 19 Dec 2024 16:42:37 +0100 Subject: [PATCH 3/3] Use client / server setup --- .../Network/Protocol/ChainSync/PipeClient.hs | 90 ++++++++++++------- 1 file changed, 60 insertions(+), 30 deletions(-) diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ChainSync/PipeClient.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ChainSync/PipeClient.hs index bf5ca86145..51780d67df 100644 --- a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ChainSync/PipeClient.hs +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ChainSync/PipeClient.hs @@ -10,6 +10,8 @@ where import Ouroboros.Network.Protocol.ChainSync.Client import Pipes +import Pipes.Core +import Control.Monad (forever) data ChainSyncResponse block point tip = RollForward block tip @@ -17,37 +19,65 @@ data ChainSyncResponse block point tip | IntersectNotFound tip deriving (Show) --- | Run a chain sync client as a 'Pipe'. -chainSyncClientProducer :: forall block point tip m p +data ChainSyncRequest point + = RequestIntersect [point] + | RequestNext + deriving (Show) + +example :: ( Monad m + , p ~ Proxy X () (ChainSyncRequest point) (ChainSyncResponse block point tip) m + ) + => [point] + -> (ChainSyncResponse block point tip -> m ()) + -> (ChainSyncClient block point tip p () -> p ()) + -> m () +example initialPoints handleMutation runClient = + runEffect + $ chainSyncClient initialPoints handleMutation + <<+ chainSyncServer runClient + +chainSyncClient :: forall block point tip m p . ( Monad m - , p ~ Pipe [point] (ChainSyncResponse block point tip) m + , p ~ Client + (ChainSyncRequest point) + (ChainSyncResponse block point tip) + m ) - => (ChainSyncClient block point tip p () -> p ()) + => [point] + -> (ChainSyncResponse block point tip -> m ()) -> p () -chainSyncClientProducer runClient = - runClient $ ChainSyncClient runProtocol - where - runProtocol :: p (ClientStIdle block point tip p ()) - runProtocol = do - known <- await - pure . SendMsgFindIntersect known $ - ClientStIntersect - { recvMsgIntersectFound = \p t -> ChainSyncClient $ do - yield (RollBackward p t) - runProducer - , recvMsgIntersectNotFound = \t -> ChainSyncClient $ do - yield (IntersectNotFound t) - runProducer - } +chainSyncClient known handleResponse = do + request (RequestIntersect known) >>= lift . handleResponse + forever $ do + request RequestNext >>= lift . handleResponse + - runProducer :: p (ClientStIdle block point tip p ()) - runProducer = - pure . SendMsgRequestNext (pure ()) $ - ClientStNext - { recvMsgRollForward = \h t -> ChainSyncClient $ do - yield (RollForward h t) - runProducer - , recvMsgRollBackward = \p t -> ChainSyncClient $ do - yield (RollBackward p t) - runProducer - } +chainSyncServer :: forall block point tip m p + . ( Monad m + , p ~ Server + (ChainSyncRequest point) + (ChainSyncResponse block point tip) + m + ) + => (ChainSyncClient block point tip p () -> p ()) + -> ChainSyncRequest point -> p () +chainSyncServer runClient rq0 = + runClient $ ChainSyncClient (handleRequest rq0) + where + handleRequest rq = case rq of + RequestIntersect known -> + pure . SendMsgFindIntersect known $ + ClientStIntersect + { recvMsgIntersectFound = \p t -> ChainSyncClient $ do + respond (RollBackward p t) >>= handleRequest + , recvMsgIntersectNotFound = \t -> ChainSyncClient $ do + respond (IntersectNotFound t) >>= handleRequest + } + RequestNext -> + pure . SendMsgRequestNext (pure ()) $ + ClientStNext + { recvMsgRollForward = \h t -> ChainSyncClient $ do + respond (RollForward h t) >>= handleRequest + , recvMsgRollBackward = \p t -> ChainSyncClient $ do + respond (RollBackward p t) >>= handleRequest + }