From 69a286017450df56c33ac302b5f2dc9e219add25 Mon Sep 17 00:00:00 2001 From: Dima Kniazev Date: Wed, 28 Jun 2023 11:08:38 -0700 Subject: [PATCH 1/3] Update vault_test.go (#862) The latest tag was removed for some reason: https://github.com/docker-library/official-images/commit/32c2455241a13e31b9c46cb109294d2bc6c46ba0 Let's pin `1.13.3`. --- app/client/keybase/hashicorp/vault_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/client/keybase/hashicorp/vault_test.go b/app/client/keybase/hashicorp/vault_test.go index b8b04cbc1..e19376a65 100644 --- a/app/client/keybase/hashicorp/vault_test.go +++ b/app/client/keybase/hashicorp/vault_test.go @@ -42,7 +42,7 @@ func TestMain(m *testing.M) { // pulls an image, creates a container based on it and runs it resource, err := pool.RunWithOptions(&dockertest.RunOptions{ Repository: "vault", - Tag: "latest", + Tag: "1.13.3", Env: []string{ "VAULT_DEV_ROOT_TOKEN_ID=dev-only-token", "VAULT_DEV_LISTEN_ADDRESS=0.0.0.0:8200", From 26238f9e1c9ca8018138b02f3b51b247c093341e Mon Sep 17 00:00:00 2001 From: Daniel Olshansky Date: Wed, 28 Jun 2023 11:36:45 -0700 Subject: [PATCH 2/3] Update E2E_FEATURE_LIST.md Minor updates related to feature flags --- utility/doc/E2E_FEATURE_LIST.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/utility/doc/E2E_FEATURE_LIST.md b/utility/doc/E2E_FEATURE_LIST.md index 8aef6fe94..3710e5488 100644 --- a/utility/doc/E2E_FEATURE_LIST.md +++ b/utility/doc/E2E_FEATURE_LIST.md @@ -35,6 +35,7 @@ - [2. Enabling Feature 🔴](#2-enabling-feature-) - [3. Disabling Feature 🔴](#3-disabling-feature-) - [4. Modifying Feature Value 🔴](#4-modifying-feature-value-) + - [5. Protocol Upgrades 🔴](#5-protocol-upgrades-) - [G. E2E Governance 🔴](#g-e2e-governance-) ## Legend @@ -120,6 +121,8 @@ _NOTE: Actor may refer to any relevant actor and custom paths may need to be cre ## F. E2E Feature Flags 🔴 +_Note: If it is easier, features 1-4 can be captured in a single ticket_ + ### 1. Adding New Feature 🔴 ### 2. Enabling Feature 🔴 @@ -128,4 +131,6 @@ _NOTE: Actor may refer to any relevant actor and custom paths may need to be cre ### 4. Modifying Feature Value 🔴 +### 5. Protocol Upgrades 🔴 + ## G. E2E Governance 🔴 From fd305265457d39346059e8305ac621a371ae6410 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Thu, 29 Jun 2023 09:29:34 +0200 Subject: [PATCH 3/3] [P2P] refactor: unicast router (#844) ## Description Factor out "unicast routing" concerns which will be common to both RainTree and background routers. To implement the `Router` interface (which I believe is applied appropriately), **each** must be able to both send and receive messages directly to/from individual peers. In libp2p this is done via streams. ### Before ```mermaid classDiagram class RainTreeMessage { <> +Level uint32 +Data []byte } class PocketEnvelope { <> +Content *anypb.Any +Nonce uint64 } RainTreeMessage --* PocketEnvelope : serialized as `Data` %% class p2pModule { %% -handlePocketEnvelope([]byte) error %% } %% class P2PModule { %% <> %% GetAddress() (Address, error) %% HandleEvent(*anypb.Any) error %% Send([]byte, address Address) error %% Broadcast([]byte) error %% BroadcastStaked([]byte) error %% } %% p2pModule --|> P2PModule class RainTreeRouter { -handler RouterHandler +Broadcast([]byte) error +Send([]byte, address Address) error -handleStream(stream libp2pNetwork.Stream) -readStream(stream libp2pNetwork.Stream) -handleRainTreeMsg([]byte) ([]byte, error) } %% p2pModule --* RainTreeRouter %% RainTreeRouter --> p2pModule : `handler` == `handlePocketEnvelope` RainTreeRouter --o RainTreeMessage %% p2pModule --o PocketEnvelope %% p2pModule --* NonceDeduper class Router { <> +Send([]byte, address Address) error +Broadcast([]byte) error } RainTreeRouter --|> Router ``` ### After ```mermaid classDiagram class RainTreeMessage { <> +Level uint32 +Data []byte } class PocketEnvelope { <> +Content *anypb.Any +Nonce uint64 } RainTreeMessage --* PocketEnvelope : serialized as `Data` %% class p2pModule { %% -handlePocketEnvelope([]byte) error %% } %% class P2PModule { %% <> %% GetAddress() (Address, error) %% HandleEvent(*anypb.Any) error %% Send([]byte, address Address) error %% Broadcast([]byte) error %% BroadcastStaked([]byte) error %% } %% p2pModule --|> P2PModule class RainTreeRouter { UnicastRouter -handler MessageHandler +Broadcast([]byte) error -handleRainTreeMsg([]byte) error } class UnicastRouter { -messageHandler MessageHandler -peerHandler PeerHandler +Send([]byte, address Address) error -handleStream(stream libp2pNetwork.Stream) -readStream(stream libp2pNetwork.Stream) } RainTreeRouter --* UnicastRouter : (embedded) %% UnicastRouter --> RainTreeRouter : via `messageHandler` %% p2pModule --* RainTreeRouter %% RainTreeRouter --> p2pModule : `handler` == `handlePocketEnvelope` RainTreeRouter --o RainTreeMessage %% p2pModule --o PocketEnvelope %% p2pModule --* NonceDeduper class Router { <> +Send([]byte, address Address) error +Broadcast([]byte) error } RainTreeRouter --|> Router ``` See #505 "Integration / Architecture" class diagrams for more context. ### Summary generated by Reviewpad on 29 Jun 23 06:52 UTC This pull request includes several changes related to improving testability, reducing technical debt, and implementing unicast functionality. The overall changes aim to improve the code structure, make it more maintainable and testable, and add support for handling unicast messages in a peer-to-peer network. Here is a summary of the changes per file: 1. In the file `setup.go`, changes were made to register providers and routers to the module registry and improve testability. 2. In the file `router.go`, changes were made to improve peer discovery, introduce encapsulated approaches, and share interfaces among modules. Additionally, type and function names were updated. 3. The file `logging.go` underwent a rename and changes were made to the package name, logging functionality, and function names. 4. In the file `network.go`, changes were made to imports, type names, function names, and the implementation of the `handleRainTreeMsg` function to support the unicast functionality. 5. The file `testutil.go` was added as a new file to provide testing utilities for the unicast functionality. 6. In the file `p2p/raintree/testutil.go`, changes were made to imports and a function to call the `HandleStream` method from the `UnicastRouter` struct. These changes collectively improve the testability, code structure, and unicast functionality in the project. ## Issue - #505 ## Dependants - #732 ## Type of change Please mark the relevant option(s): - [ ] New feature, functionality or library - [ ] Bug fix - [x] Code health or cleanup - [ ] Major breaking change - [ ] Documentation - [ ] Other ## List of changes - Refactored stream handling logic (i.e. unicast routing) out of `rainTreeRouter` and into new `UnicastRouter` type ## Testing - [x] `make develop_test`; if any code changes were made - [x] `make test_e2e` on [k8s LocalNet](https://github.com/pokt-network/pocket/blob/main/build/localnet/README.md); if any code changes were made - [x] `e2e-devnet-test` passes tests on [DevNet](https://pocketnetwork.notion.site/How-to-DevNet-ff1598f27efe44c09f34e2aa0051f0dd); if any code was changed - [ ] [Docker Compose LocalNet](https://github.com/pokt-network/pocket/blob/main/docs/development/README.md); if any major functionality was changed or introduced - [x] [k8s LocalNet](https://github.com/pokt-network/pocket/blob/main/build/localnet/README.md); if any infrastructure or configuration changes were made ## Required Checklist - [x] I have performed a self-review of my own code - [x] I have commented my code, particularly in hard-to-understand areas - [x] I have added, or updated, [`godoc` format comments](https://go.dev/blog/godoc) on touched members (see: [tip.golang.org/doc/comment](https://tip.golang.org/doc/comment)) - [x] I have tested my changes using the available tooling - [ ] I have updated the corresponding CHANGELOG ### If Applicable Checklist - [ ] I have updated the corresponding README(s); local and/or global - [x] I have added tests that prove my fix is effective or that my feature works - [ ] I have added, or updated, [mermaid.js](https://mermaid-js.github.io) diagrams in the corresponding README(s) - [ ] I have added, or updated, documentation and [mermaid.js](https://mermaid-js.github.io) diagrams in `shared/docs/*` if I updated `shared/*`README(s) --------- Co-authored-by: @Olshansk Co-authored-by: Daniel Olshansky --- p2p/config/config.go | 44 +++++++- p2p/module.go | 15 +++ p2p/raintree/router.go | 148 +++++++++----------------- p2p/raintree/testutil.go | 6 +- p2p/types/router.go | 17 ++- p2p/{raintree => unicast}/logging.go | 18 +++- p2p/unicast/router.go | 149 +++++++++++++++++++++++++++ p2p/unicast/testutil.go | 10 ++ 8 files changed, 298 insertions(+), 109 deletions(-) rename p2p/{raintree => unicast}/logging.go (50%) create mode 100644 p2p/unicast/router.go create mode 100644 p2p/unicast/testutil.go diff --git a/p2p/config/config.go b/p2p/config/config.go index e64fe8519..d98f4b969 100644 --- a/p2p/config/config.go +++ b/p2p/config/config.go @@ -4,9 +4,19 @@ import ( "fmt" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/protocol" + "go.uber.org/multierr" + "github.com/pokt-network/pocket/p2p/providers" + typesP2P "github.com/pokt-network/pocket/p2p/types" "github.com/pokt-network/pocket/shared/crypto" - "go.uber.org/multierr" + "github.com/pokt-network/pocket/shared/modules" +) + +var ( + _ typesP2P.RouterConfig = &baseConfig{} + _ typesP2P.RouterConfig = &UnicastRouterConfig{} + _ typesP2P.RouterConfig = &RainTreeConfig{} ) // baseConfig implements `RouterConfig` using the given libp2p host and current @@ -22,6 +32,14 @@ type baseConfig struct { PeerstoreProvider providers.PeerstoreProvider } +type UnicastRouterConfig struct { + Logger *modules.Logger + Host host.Host + ProtocolID protocol.ID + MessageHandler typesP2P.MessageHandler + PeerHandler func(peer typesP2P.Peer) error +} + // BackgroundConfig implements `RouterConfig` for use with `BackgroundRouter`. type BackgroundConfig struct { Host host.Host @@ -57,6 +75,30 @@ func (cfg *baseConfig) IsValid() (err error) { if cfg.PeerstoreProvider == nil { err = multierr.Append(err, fmt.Errorf("peerstore provider not configured")) } + return nil +} + +// IsValid implements the respective member of the `RouterConfig` interface. +func (cfg *UnicastRouterConfig) IsValid() (err error) { + if cfg.Logger == nil { + err = multierr.Append(err, fmt.Errorf("logger not configured")) + } + + if cfg.Host == nil { + err = multierr.Append(err, fmt.Errorf("host not configured")) + } + + if cfg.ProtocolID == "" { + err = multierr.Append(err, fmt.Errorf("protocol id not configured")) + } + + if cfg.MessageHandler == nil { + err = multierr.Append(err, fmt.Errorf("message handler not configured")) + } + + if cfg.PeerHandler == nil { + err = multierr.Append(err, fmt.Errorf("peer handler not configured")) + } return err } diff --git a/p2p/module.go b/p2p/module.go index d9e164fc7..5f87d5b6e 100644 --- a/p2p/module.go +++ b/p2p/module.go @@ -43,11 +43,17 @@ type p2pModule struct { identity libp2p.Option listenAddrs libp2p.Option + // TECHDEBT(#810): register the providers to the module registry instead of + // holding a reference in the module struct and passing via router config. + // // Assigned during creation via `#setupDependencies()`. currentHeightProvider providers.CurrentHeightProvider pstoreProvider providers.PeerstoreProvider nonceDeduper *mempool.GenericFIFOSet[uint64, uint64] + // TECHDEBT(#810): register the routers to the module registry instead of + // holding a reference in the module struct. This will improve testability. + // // Assigned during `#Start()`. TLDR; `host` listens on instantiation. // and `router` depends on `host`. router typesP2P.Router @@ -252,6 +258,9 @@ func (m *p2pModule) setupPeerstoreProvider() error { if !ok { return fmt.Errorf("unknown peerstore provider type: %T", pstoreProviderModule) } + + // TECHDEBT(#810): register the provider to the module registry instead of + // holding a reference in the module struct and passing via router config. m.pstoreProvider = pstoreProvider return nil @@ -260,6 +269,7 @@ func (m *p2pModule) setupPeerstoreProvider() error { // setupCurrentHeightProvider attempts to retrieve the current height provider // from the bus registry, falls back to the consensus module if none is registered. func (m *p2pModule) setupCurrentHeightProvider() error { + // TECHDEBT(#810): simplify once submodules are more convenient to retrieve. m.logger.Debug().Msg("setupCurrentHeightProvider") currentHeightProviderModule, err := m.GetBus().GetModulesRegistry().GetModule(current_height_provider.ModuleName) if err != nil { @@ -276,6 +286,9 @@ func (m *p2pModule) setupCurrentHeightProvider() error { if !ok { return fmt.Errorf("unexpected current height provider type: %T", currentHeightProviderModule) } + + // TECHDEBT(#810): register the provider to the module registry instead of + // holding a reference in the module struct and passing via router config. m.currentHeightProvider = currentHeightProvider return nil @@ -294,6 +307,8 @@ func (m *p2pModule) setupNonceDeduper() error { // setupRouter instantiates the configured router implementation. func (m *p2pModule) setupRouter() (err error) { + // TECHDEBT(#810): register the router to the module registry instead of + // holding a reference in the module struct. This will improve testability. m.router, err = raintree.NewRainTreeRouter( m.GetBus(), &config.RainTreeConfig{ diff --git a/p2p/raintree/router.go b/p2p/raintree/router.go index 707f0a0a9..bd0656ef7 100644 --- a/p2p/raintree/router.go +++ b/p2p/raintree/router.go @@ -2,11 +2,9 @@ package raintree import ( "fmt" - "io" - "time" libp2pHost "github.com/libp2p/go-libp2p/core/host" - libp2pNetwork "github.com/libp2p/go-libp2p/core/network" + "github.com/pokt-network/pocket/p2p/unicast" "google.golang.org/protobuf/proto" "github.com/pokt-network/pocket/logger" @@ -22,15 +20,9 @@ import ( "github.com/pokt-network/pocket/shared/messaging" "github.com/pokt-network/pocket/shared/modules" "github.com/pokt-network/pocket/shared/modules/base_modules" - telemetry "github.com/pokt-network/pocket/telemetry" + "github.com/pokt-network/pocket/telemetry" ) -// TECHDEBT(#629): configure timeouts. Consider security exposure vs. real-world conditions. -// TECHDEBT(#629): parameterize and expose via config. -// readStreamTimeout is the duration to wait for a read operation on a -// stream to complete, after which the stream is closed ("timed out"). -const readStreamTimeout = time.Second * 10 - var ( _ typesP2P.Router = &rainTreeRouter{} _ modules.IntegratableModule = &rainTreeRouter{} @@ -41,10 +33,11 @@ type rainTreeFactory = modules.FactoryWithConfig[typesP2P.Router, *config.RainTr type rainTreeRouter struct { base_modules.IntegratableModule + unicast.UnicastRouter logger *modules.Logger // handler is the function to call when a message is received. - handler typesP2P.RouterHandler + handler typesP2P.MessageHandler // host represents a libp2p libp2pNetwork node, it encapsulates a libp2p peerstore // & connection manager. `libp2p.New` configures and starts listening // according to options. @@ -84,7 +77,6 @@ func (*rainTreeRouter) Create(bus modules.Bus, cfg *config.RainTreeConfig) (type return nil, err } - rtr.host.SetStreamHandler(protocol.PoktProtocolID, rtr.handleStream) return typesP2P.Router(rtr), nil } @@ -191,9 +183,11 @@ func (rtr *rainTreeRouter) sendInternal(data []byte, address cryptoPocket.Addres return nil } -// handleRainTreeMsg handles a RainTree message, continuing broadcast propagation -// if applicable. Returns the serialized `PocketEnvelope` data contained within. -func (rtr *rainTreeRouter) handleRainTreeMsg(data []byte) ([]byte, error) { +// handleRainTreeMsg deserializes a RainTree message to extract the `PocketEnvelope` +// bytes contained within, continues broadcast propagation, if applicable, and +// passes them off to the application by calling the configured `rtr.handler`. +// Intended to be called in a go routine. +func (rtr *rainTreeRouter) handleRainTreeMsg(rainTreeMsgBz []byte) error { blockHeightInt := rtr.GetBus().GetConsensusModule().CurrentHeight() blockHeight := fmt.Sprintf("%d", blockHeightInt) @@ -207,25 +201,36 @@ func (rtr *rainTreeRouter) handleRainTreeMsg(data []byte) ([]byte, error) { ) var rainTreeMsg typesP2P.RainTreeMessage - if err := proto.Unmarshal(data, &rainTreeMsg); err != nil { - return nil, err + if err := proto.Unmarshal(rainTreeMsgBz, &rainTreeMsg); err != nil { + return err } + // TECHDEBT(#763): refactor as "pre-propagation validation" networkMessage := messaging.PocketEnvelope{} if err := proto.Unmarshal(rainTreeMsg.Data, &networkMessage); err != nil { rtr.logger.Error().Err(err).Msg("Error decoding network message") - return nil, err + return err } // Continue RainTree propagation if rainTreeMsg.Level > 0 { if err := rtr.broadcastAtLevel(rainTreeMsg.Data, rainTreeMsg.Level-1); err != nil { - return nil, err + return err } } - // Return the data back to the caller so it can be handled by the app specific bus - return rainTreeMsg.Data, nil + // There was no error, but we don't need to forward this to the app-specific bus. + // For example, the message has already been handled by the application. + if rainTreeMsg.Data == nil { + rtr.logger.Debug().Msg("no data in RainTree message") + return nil + } + + // Call configured message handler with the serialized `PocketEnvelope`. + if err := rtr.handler(rainTreeMsg.Data); err != nil { + return fmt.Errorf("handling raintree message: %w", err) + } + return nil } // GetPeerstore implements the respective member of `typesP2P.Router`. @@ -254,6 +259,7 @@ func (rtr *rainTreeRouter) AddPeer(peer typesP2P.Peer) error { return nil } +// RemovePeer implements the respective member of `typesP2P.Router`. func (rtr *rainTreeRouter) RemovePeer(peer typesP2P.Peer) error { rtr.peersManager.HandleEvent( typesP2P.PeerManagerEvent{ @@ -270,94 +276,42 @@ func (rtr *rainTreeRouter) Size() int { return rtr.peersManager.GetPeerstore().Size() } -// handleStream ensures the peerstore contains the remote peer and then reads -// the incoming stream in a new go routine. -func (rtr *rainTreeRouter) handleStream(stream libp2pNetwork.Stream) { - rtr.logger.Debug().Msg("handling incoming stream") - peer, err := utils.PeerFromLibp2pStream(stream) - if err != nil { - rtr.logger.Error().Err(err). - Str("address", peer.GetAddress().String()). - Msg("parsing remote peer identity") - - if err = stream.Reset(); err != nil { - rtr.logger.Error().Err(err).Msg("resetting stream") - } - return - } - - if err := rtr.AddPeer(peer); err != nil { - rtr.logger.Error().Err(err). - Str("address", peer.GetAddress().String()). - Msg("adding remote peer to router") - } - - go rtr.readStream(stream) +// shouldSendToTarget returns false if target is self. +func shouldSendToTarget(target target) bool { + return !target.isSelf } -// readStream reads the incoming stream, extracts the serialized `PocketEnvelope` -// data from the incoming `RainTreeMessage`, and passes it to the application by -// calling the configured `rtr.handler`. Intended to be called in a go routine. -func (rtr *rainTreeRouter) readStream(stream libp2pNetwork.Stream) { - // Time out if no data is sent to free resources. - // NB: tests using libp2p's `mocknet` rely on this not returning an error. - if err := stream.SetReadDeadline(newReadStreamDeadline()); err != nil { - // `SetReadDeadline` not supported by `mocknet` streams. - rtr.logger.Error().Err(err).Msg("setting stream read deadline") - } - - // log incoming stream - rtr.logStream(stream) - - // read stream - rainTreeMsgBz, err := io.ReadAll(stream) - if err != nil { - rtr.logger.Error().Err(err).Msg("reading from stream") - if err := stream.Reset(); err != nil { - rtr.logger.Error().Err(err).Msg("resetting stream (read-side)") - } - return - } - - // done reading; reset to signal this to remote peer - // NB: failing to reset the stream can easily max out the number of available - // network connections on the receiver's side. - if err := stream.Reset(); err != nil { - rtr.logger.Error().Err(err).Msg("resetting stream (read-side)") +// setupUnicastRouter configures and assigns `rtr.UnicastRouter`. +func (rtr *rainTreeRouter) setupUnicastRouter() error { + unicastRouterCfg := config.UnicastRouterConfig{ + Logger: rtr.logger, + Host: rtr.host, + ProtocolID: protocol.PoktProtocolID, + MessageHandler: rtr.handleRainTreeMsg, + PeerHandler: rtr.AddPeer, } - // extract `PocketEnvelope` from `RainTreeMessage` (& continue propagation) - poktEnvelopeBz, err := rtr.handleRainTreeMsg(rainTreeMsgBz) + unicastRouter, err := unicast.Create(rtr.GetBus(), &unicastRouterCfg) if err != nil { - rtr.logger.Error().Err(err).Msg("handling raintree message") - return - } - - // There was no error, but we don't need to forward this to the app-specific bus. - // For example, the message has already been handled by the application. - if poktEnvelopeBz == nil { - return - } - - // call configured handler to forward to app-specific bus - if err := rtr.handler(poktEnvelopeBz); err != nil { - rtr.logger.Error().Err(err).Msg("handling pocket envelope") + return fmt.Errorf("setting up unicast router: %w", err) } -} -// shouldSendToTarget returns false if target is self. -func shouldSendToTarget(target target) bool { - return !target.isSelf + rtr.UnicastRouter = *unicastRouter + return nil } func (rtr *rainTreeRouter) setupDependencies() error { + if err := rtr.setupUnicastRouter(); err != nil { + return err + } + pstore, err := rtr.pstoreProvider.GetStakedPeerstoreAtHeight(rtr.currentHeightProvider.CurrentHeight()) if err != nil { - return err + return fmt.Errorf("getting staked peerstore: %w", err) } if err := rtr.setupPeerManager(pstore); err != nil { - return err + return fmt.Errorf("setting up peer manager: %w", err) } if err := utils.PopulateLibp2pHost(rtr.host, pstore); err != nil { @@ -374,9 +328,3 @@ func (rtr *rainTreeRouter) setupPeerManager(pstore typesP2P.Peerstore) (err erro func (rtr *rainTreeRouter) getHostname() string { return rtr.GetBus().GetRuntimeMgr().GetConfig().P2P.Hostname } - -// newReadStreamDeadline returns a future deadline -// based on the read stream timeout duration. -func newReadStreamDeadline() time.Time { - return time.Now().Add(readStreamTimeout) -} diff --git a/p2p/raintree/testutil.go b/p2p/raintree/testutil.go index ebcb464b5..e2e81b636 100644 --- a/p2p/raintree/testutil.go +++ b/p2p/raintree/testutil.go @@ -2,12 +2,14 @@ package raintree -import libp2pNetwork "github.com/libp2p/go-libp2p/core/network" +import ( + libp2pNetwork "github.com/libp2p/go-libp2p/core/network" +) // RainTreeRouter exports `rainTreeRouter` for testing purposes. type RainTreeRouter = rainTreeRouter // HandleStream exports `rainTreeRouter#handleStream` for testing purposes. func (rtr *rainTreeRouter) HandleStream(stream libp2pNetwork.Stream) { - rtr.handleStream(stream) + rtr.UnicastRouter.HandleStream(stream) } diff --git a/p2p/types/router.go b/p2p/types/router.go index f9c7f2d74..37080acbd 100644 --- a/p2p/types/router.go +++ b/p2p/types/router.go @@ -15,14 +15,25 @@ type Router interface { Broadcast(data []byte) error Send(data []byte, address cryptoPocket.Address) error - // Address book helpers - // TECHDEBT: simplify - remove `GetPeerstore` + // GetPeerstore is used by the P2P module to update the staked actor router's + // (`RainTreeRouter`) peerstore. + // + // TECHDEBT(#859+): remove the need for this group of interface methods. + // All peer discovery logic should be encapsulated by the router. + // Adopt `HandleEvent(*anypb.Any) error` here instead and forward events + // from P2P module to routers. + // CONSIDERATION: Utility, Conseneus and P2P modules could share an interface + // containing this method (e.g. `BusEventHandler`). GetPeerstore() Peerstore + // AddPeer is used to add a peer to the routers peerstore. It is intended to + // support peer discovery. AddPeer(peer Peer) error + // RemovePeer is used to remove a peer to the routers peerstore. It is used + // during churn to purge offline peers from the routers peerstore. RemovePeer(peer Peer) error } -type RouterHandler func(data []byte) error +type MessageHandler func(data []byte) error // RouterConfig is used to configure `Router` implementations and to test a // given configuration's validity. diff --git a/p2p/raintree/logging.go b/p2p/unicast/logging.go similarity index 50% rename from p2p/raintree/logging.go rename to p2p/unicast/logging.go index bbe3e6c3b..4a7f4faaf 100644 --- a/p2p/raintree/logging.go +++ b/p2p/unicast/logging.go @@ -1,4 +1,4 @@ -package raintree +package unicast import ( libp2pNetwork "github.com/libp2p/go-libp2p/core/network" @@ -7,8 +7,17 @@ import ( "github.com/pokt-network/pocket/p2p/utils" ) +// TECHDEBT(#830): it would be nice to have at least one more degree of freedom with which +// to limit logging in areas where it is known to be excessive / high frequency. +// Especially applicable to debug log lines which only contribute in edge cases, +// unusual circumstances, or regressions (e.g. hitting OS resource limits because +// of too many concurrent streams). +// +// This could ultimately be actuated from the CLI via flags, configs, and/or env +// vars. Initially, we could consider coupling to a `--verbose` persistent flag. + // logStream logs the incoming stream and its scope stats -func (rtr *rainTreeRouter) logStream(stream libp2pNetwork.Stream) { +func (rtr *UnicastRouter) logStream(stream libp2pNetwork.Stream) { rtr.logStreamScopeStats(stream) remotePeer, err := utils.PeerFromLibp2pStream(stream) @@ -21,7 +30,7 @@ func (rtr *rainTreeRouter) logStream(stream libp2pNetwork.Stream) { // logStreamScopeStats logs the incoming stream's scope stats // (see: https://pkg.go.dev/github.com/libp2p/go-libp2p@v0.27.0/core/network#StreamScope) -func (rtr *rainTreeRouter) logStreamScopeStats(stream libp2pNetwork.Stream) { +func (rtr *UnicastRouter) logStreamScopeStats(stream libp2pNetwork.Stream) { if err := utils.LogScopeStatFactory( &logger.Global.Logger, "stream scope (read-side)", @@ -29,3 +38,6 @@ func (rtr *rainTreeRouter) logStreamScopeStats(stream libp2pNetwork.Stream) { rtr.logger.Debug().Err(err).Msg("logging stream scope stats") } } +func (rtr *UnicastRouter) getHostname() string { + return rtr.GetBus().GetRuntimeMgr().GetConfig().P2P.Hostname +} diff --git a/p2p/unicast/router.go b/p2p/unicast/router.go new file mode 100644 index 000000000..18a41fc32 --- /dev/null +++ b/p2p/unicast/router.go @@ -0,0 +1,149 @@ +package unicast + +import ( + "io" + "time" + + libp2pHost "github.com/libp2p/go-libp2p/core/host" + libp2pNetwork "github.com/libp2p/go-libp2p/core/network" + + "github.com/pokt-network/pocket/p2p/config" + typesP2P "github.com/pokt-network/pocket/p2p/types" + "github.com/pokt-network/pocket/p2p/utils" + "github.com/pokt-network/pocket/shared/modules" + "github.com/pokt-network/pocket/shared/modules/base_modules" +) + +// TECHDEBT(#629): configure timeouts. Consider security exposure vs. real-world conditions. +// TECHDEBT(#629): parameterize and expose via config. +// readStreamTimeout is the duration to wait for a read operation on a +// stream to complete, after which the stream is closed ("timed out"). +const readStreamTimeout = time.Second * 10 + +var _ unicastRouterFactory = &UnicastRouter{} + +type unicastRouterFactory = modules.FactoryWithConfig[*UnicastRouter, *config.UnicastRouterConfig] + +type UnicastRouter struct { + base_modules.IntegratableModule + + logger *modules.Logger + host libp2pHost.Host + // messageHandler is the function to call when a message is received. + // host represents a libp2p network node, it encapsulates a libp2p peerstore + // & connection manager. `libp2p.New` configures and starts listening + // according to options. + // (see: https://pkg.go.dev/github.com/libp2p/go-libp2p#section-readme) + messageHandler typesP2P.MessageHandler + // peerHandler is called whenever a new incoming stream is established. + // TECHDEBT(#749,#747): this may not be needed once we've adopted libp2p + // peer IDs and multiaddr natively. + peerHandler func(peer typesP2P.Peer) error +} + +func Create(bus modules.Bus, cfg *config.UnicastRouterConfig) (*UnicastRouter, error) { + return new(UnicastRouter).Create(bus, cfg) +} + +func (*UnicastRouter) Create(bus modules.Bus, cfg *config.UnicastRouterConfig) (*UnicastRouter, error) { + if err := cfg.IsValid(); err != nil { + return nil, err + } + + rtr := &UnicastRouter{ + logger: cfg.Logger, + host: cfg.Host, + messageHandler: cfg.MessageHandler, + peerHandler: cfg.PeerHandler, + } + + // `UnicastRouter` is not a submodule and therefore does not register with the + // module registry. However, as it does depend on the bus and therefore MUST + // embed the base `IntegrableModule` and call `#SetBus()`. + rtr.SetBus(bus) + + // Don't handle incoming streams in client debug mode. + if !rtr.isClientDebugMode() { + rtr.host.SetStreamHandler(cfg.ProtocolID, rtr.handleStream) + } + + return rtr, nil +} + +// handleStream ensures the peerstore contains the remote peer and then reads +// the incoming stream in a new go routine. +func (rtr *UnicastRouter) handleStream(stream libp2pNetwork.Stream) { + rtr.logger.Debug().Msg("handling incoming stream") + peer, err := utils.PeerFromLibp2pStream(stream) + if err != nil { + rtr.logger.Error().Err(err). + Str("address", peer.GetAddress().String()). + Msg("parsing remote peer identity") + + // Reset stream to signal the sender to give up and move on. + // NB: failing to reset the stream can easily max out the number of available + // network connections on the receiver's side. + if err = stream.Reset(); err != nil { + rtr.logger.Error().Err(err).Msg("resetting stream") + } + return + } + + if err := rtr.peerHandler(peer); err != nil { + rtr.logger.Error().Err(err). + Str("address", peer.GetAddress().String()). + Msg("adding remote peer to router") + } + + // concurrently read messages out of incoming streams for handling. + go rtr.readStream(stream) +} + +// readStream reads the message bytes out of the incoming stream and passes it to +// the configured `rtr.messageHandler`. Intended to be called in a go routine. +func (rtr *UnicastRouter) readStream(stream libp2pNetwork.Stream) { + // Time out if no data is sent to free resources. + if err := stream.SetReadDeadline(newReadStreamDeadline()); err != nil { + // Not returning an error for testing purposes; i.e. `SetReadDeadline` is + // not supported by libp2p `mocknet` streams. This should only produce an + // error if a node advertises and listens via an unsupported transport + // protocol, which should never happen in prod. + rtr.logger.Error().Err(err).Msg("setting stream read deadline") + } + + // log incoming stream + rtr.logStream(stream) + + // read stream + messageBz, err := io.ReadAll(stream) + if err != nil { + rtr.logger.Error().Err(err).Msg("reading from stream") + if err := stream.Reset(); err != nil { + rtr.logger.Error().Err(err).Msg("resetting stream (read-side)") + } + return + } + + // done reading; reset to signal this to remote peer + // NB: failing to reset the stream can easily max out the number of available + // network connections on the receiver's side. + if err := stream.Reset(); err != nil { + rtr.logger.Error().Err(err).Msg("resetting stream (read-side)") + } + + if err := rtr.messageHandler(messageBz); err != nil { + rtr.logger.Error().Err(err).Msg("handling message") + return + } +} + +// isClientDebugMode returns the value of `ClientDebugMode` in the base config +func (rtr *UnicastRouter) isClientDebugMode() bool { + return rtr.GetBus().GetRuntimeMgr().GetConfig().ClientDebugMode +} + +// newReadStreamDeadline returns a future deadline +// based on the read stream timeout duration. +func newReadStreamDeadline() time.Time { + return time.Now().Add(readStreamTimeout) +} diff --git a/p2p/unicast/testutil.go b/p2p/unicast/testutil.go new file mode 100644 index 000000000..8a3f9caf3 --- /dev/null +++ b/p2p/unicast/testutil.go @@ -0,0 +1,10 @@ +//go:build test + +package unicast + +import libp2pNetwork "github.com/libp2p/go-libp2p/core/network" + +// HandleStream exports `unicastRouter#handleStream` for testing purposes. +func (rtr *UnicastRouter) HandleStream(stream libp2pNetwork.Stream) { + rtr.handleStream(stream) +}