diff --git a/p2p/config/config.go b/p2p/config/config.go index e64fe8519..d98f4b969 100644 --- a/p2p/config/config.go +++ b/p2p/config/config.go @@ -4,9 +4,19 @@ import ( "fmt" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/protocol" + "go.uber.org/multierr" + "github.com/pokt-network/pocket/p2p/providers" + typesP2P "github.com/pokt-network/pocket/p2p/types" "github.com/pokt-network/pocket/shared/crypto" - "go.uber.org/multierr" + "github.com/pokt-network/pocket/shared/modules" +) + +var ( + _ typesP2P.RouterConfig = &baseConfig{} + _ typesP2P.RouterConfig = &UnicastRouterConfig{} + _ typesP2P.RouterConfig = &RainTreeConfig{} ) // baseConfig implements `RouterConfig` using the given libp2p host and current @@ -22,6 +32,14 @@ type baseConfig struct { PeerstoreProvider providers.PeerstoreProvider } +type UnicastRouterConfig struct { + Logger *modules.Logger + Host host.Host + ProtocolID protocol.ID + MessageHandler typesP2P.MessageHandler + PeerHandler func(peer typesP2P.Peer) error +} + // BackgroundConfig implements `RouterConfig` for use with `BackgroundRouter`. type BackgroundConfig struct { Host host.Host @@ -57,6 +75,30 @@ func (cfg *baseConfig) IsValid() (err error) { if cfg.PeerstoreProvider == nil { err = multierr.Append(err, fmt.Errorf("peerstore provider not configured")) } + return nil +} + +// IsValid implements the respective member of the `RouterConfig` interface. +func (cfg *UnicastRouterConfig) IsValid() (err error) { + if cfg.Logger == nil { + err = multierr.Append(err, fmt.Errorf("logger not configured")) + } + + if cfg.Host == nil { + err = multierr.Append(err, fmt.Errorf("host not configured")) + } + + if cfg.ProtocolID == "" { + err = multierr.Append(err, fmt.Errorf("protocol id not configured")) + } + + if cfg.MessageHandler == nil { + err = multierr.Append(err, fmt.Errorf("message handler not configured")) + } + + if cfg.PeerHandler == nil { + err = multierr.Append(err, fmt.Errorf("peer handler not configured")) + } return err } diff --git a/p2p/module.go b/p2p/module.go index d9e164fc7..5f87d5b6e 100644 --- a/p2p/module.go +++ b/p2p/module.go @@ -43,11 +43,17 @@ type p2pModule struct { identity libp2p.Option listenAddrs libp2p.Option + // TECHDEBT(#810): register the providers to the module registry instead of + // holding a reference in the module struct and passing via router config. + // // Assigned during creation via `#setupDependencies()`. currentHeightProvider providers.CurrentHeightProvider pstoreProvider providers.PeerstoreProvider nonceDeduper *mempool.GenericFIFOSet[uint64, uint64] + // TECHDEBT(#810): register the routers to the module registry instead of + // holding a reference in the module struct. This will improve testability. + // // Assigned during `#Start()`. TLDR; `host` listens on instantiation. // and `router` depends on `host`. router typesP2P.Router @@ -252,6 +258,9 @@ func (m *p2pModule) setupPeerstoreProvider() error { if !ok { return fmt.Errorf("unknown peerstore provider type: %T", pstoreProviderModule) } + + // TECHDEBT(#810): register the provider to the module registry instead of + // holding a reference in the module struct and passing via router config. m.pstoreProvider = pstoreProvider return nil @@ -260,6 +269,7 @@ func (m *p2pModule) setupPeerstoreProvider() error { // setupCurrentHeightProvider attempts to retrieve the current height provider // from the bus registry, falls back to the consensus module if none is registered. func (m *p2pModule) setupCurrentHeightProvider() error { + // TECHDEBT(#810): simplify once submodules are more convenient to retrieve. m.logger.Debug().Msg("setupCurrentHeightProvider") currentHeightProviderModule, err := m.GetBus().GetModulesRegistry().GetModule(current_height_provider.ModuleName) if err != nil { @@ -276,6 +286,9 @@ func (m *p2pModule) setupCurrentHeightProvider() error { if !ok { return fmt.Errorf("unexpected current height provider type: %T", currentHeightProviderModule) } + + // TECHDEBT(#810): register the provider to the module registry instead of + // holding a reference in the module struct and passing via router config. m.currentHeightProvider = currentHeightProvider return nil @@ -294,6 +307,8 @@ func (m *p2pModule) setupNonceDeduper() error { // setupRouter instantiates the configured router implementation. func (m *p2pModule) setupRouter() (err error) { + // TECHDEBT(#810): register the router to the module registry instead of + // holding a reference in the module struct. This will improve testability. m.router, err = raintree.NewRainTreeRouter( m.GetBus(), &config.RainTreeConfig{ diff --git a/p2p/raintree/router.go b/p2p/raintree/router.go index 707f0a0a9..bd0656ef7 100644 --- a/p2p/raintree/router.go +++ b/p2p/raintree/router.go @@ -2,11 +2,9 @@ package raintree import ( "fmt" - "io" - "time" libp2pHost "github.com/libp2p/go-libp2p/core/host" - libp2pNetwork "github.com/libp2p/go-libp2p/core/network" + "github.com/pokt-network/pocket/p2p/unicast" "google.golang.org/protobuf/proto" "github.com/pokt-network/pocket/logger" @@ -22,15 +20,9 @@ import ( "github.com/pokt-network/pocket/shared/messaging" "github.com/pokt-network/pocket/shared/modules" "github.com/pokt-network/pocket/shared/modules/base_modules" - telemetry "github.com/pokt-network/pocket/telemetry" + "github.com/pokt-network/pocket/telemetry" ) -// TECHDEBT(#629): configure timeouts. Consider security exposure vs. real-world conditions. -// TECHDEBT(#629): parameterize and expose via config. -// readStreamTimeout is the duration to wait for a read operation on a -// stream to complete, after which the stream is closed ("timed out"). -const readStreamTimeout = time.Second * 10 - var ( _ typesP2P.Router = &rainTreeRouter{} _ modules.IntegratableModule = &rainTreeRouter{} @@ -41,10 +33,11 @@ type rainTreeFactory = modules.FactoryWithConfig[typesP2P.Router, *config.RainTr type rainTreeRouter struct { base_modules.IntegratableModule + unicast.UnicastRouter logger *modules.Logger // handler is the function to call when a message is received. - handler typesP2P.RouterHandler + handler typesP2P.MessageHandler // host represents a libp2p libp2pNetwork node, it encapsulates a libp2p peerstore // & connection manager. `libp2p.New` configures and starts listening // according to options. @@ -84,7 +77,6 @@ func (*rainTreeRouter) Create(bus modules.Bus, cfg *config.RainTreeConfig) (type return nil, err } - rtr.host.SetStreamHandler(protocol.PoktProtocolID, rtr.handleStream) return typesP2P.Router(rtr), nil } @@ -191,9 +183,11 @@ func (rtr *rainTreeRouter) sendInternal(data []byte, address cryptoPocket.Addres return nil } -// handleRainTreeMsg handles a RainTree message, continuing broadcast propagation -// if applicable. Returns the serialized `PocketEnvelope` data contained within. -func (rtr *rainTreeRouter) handleRainTreeMsg(data []byte) ([]byte, error) { +// handleRainTreeMsg deserializes a RainTree message to extract the `PocketEnvelope` +// bytes contained within, continues broadcast propagation, if applicable, and +// passes them off to the application by calling the configured `rtr.handler`. +// Intended to be called in a go routine. +func (rtr *rainTreeRouter) handleRainTreeMsg(rainTreeMsgBz []byte) error { blockHeightInt := rtr.GetBus().GetConsensusModule().CurrentHeight() blockHeight := fmt.Sprintf("%d", blockHeightInt) @@ -207,25 +201,36 @@ func (rtr *rainTreeRouter) handleRainTreeMsg(data []byte) ([]byte, error) { ) var rainTreeMsg typesP2P.RainTreeMessage - if err := proto.Unmarshal(data, &rainTreeMsg); err != nil { - return nil, err + if err := proto.Unmarshal(rainTreeMsgBz, &rainTreeMsg); err != nil { + return err } + // TECHDEBT(#763): refactor as "pre-propagation validation" networkMessage := messaging.PocketEnvelope{} if err := proto.Unmarshal(rainTreeMsg.Data, &networkMessage); err != nil { rtr.logger.Error().Err(err).Msg("Error decoding network message") - return nil, err + return err } // Continue RainTree propagation if rainTreeMsg.Level > 0 { if err := rtr.broadcastAtLevel(rainTreeMsg.Data, rainTreeMsg.Level-1); err != nil { - return nil, err + return err } } - // Return the data back to the caller so it can be handled by the app specific bus - return rainTreeMsg.Data, nil + // There was no error, but we don't need to forward this to the app-specific bus. + // For example, the message has already been handled by the application. + if rainTreeMsg.Data == nil { + rtr.logger.Debug().Msg("no data in RainTree message") + return nil + } + + // Call configured message handler with the serialized `PocketEnvelope`. + if err := rtr.handler(rainTreeMsg.Data); err != nil { + return fmt.Errorf("handling raintree message: %w", err) + } + return nil } // GetPeerstore implements the respective member of `typesP2P.Router`. @@ -254,6 +259,7 @@ func (rtr *rainTreeRouter) AddPeer(peer typesP2P.Peer) error { return nil } +// RemovePeer implements the respective member of `typesP2P.Router`. func (rtr *rainTreeRouter) RemovePeer(peer typesP2P.Peer) error { rtr.peersManager.HandleEvent( typesP2P.PeerManagerEvent{ @@ -270,94 +276,42 @@ func (rtr *rainTreeRouter) Size() int { return rtr.peersManager.GetPeerstore().Size() } -// handleStream ensures the peerstore contains the remote peer and then reads -// the incoming stream in a new go routine. -func (rtr *rainTreeRouter) handleStream(stream libp2pNetwork.Stream) { - rtr.logger.Debug().Msg("handling incoming stream") - peer, err := utils.PeerFromLibp2pStream(stream) - if err != nil { - rtr.logger.Error().Err(err). - Str("address", peer.GetAddress().String()). - Msg("parsing remote peer identity") - - if err = stream.Reset(); err != nil { - rtr.logger.Error().Err(err).Msg("resetting stream") - } - return - } - - if err := rtr.AddPeer(peer); err != nil { - rtr.logger.Error().Err(err). - Str("address", peer.GetAddress().String()). - Msg("adding remote peer to router") - } - - go rtr.readStream(stream) +// shouldSendToTarget returns false if target is self. +func shouldSendToTarget(target target) bool { + return !target.isSelf } -// readStream reads the incoming stream, extracts the serialized `PocketEnvelope` -// data from the incoming `RainTreeMessage`, and passes it to the application by -// calling the configured `rtr.handler`. Intended to be called in a go routine. -func (rtr *rainTreeRouter) readStream(stream libp2pNetwork.Stream) { - // Time out if no data is sent to free resources. - // NB: tests using libp2p's `mocknet` rely on this not returning an error. - if err := stream.SetReadDeadline(newReadStreamDeadline()); err != nil { - // `SetReadDeadline` not supported by `mocknet` streams. - rtr.logger.Error().Err(err).Msg("setting stream read deadline") - } - - // log incoming stream - rtr.logStream(stream) - - // read stream - rainTreeMsgBz, err := io.ReadAll(stream) - if err != nil { - rtr.logger.Error().Err(err).Msg("reading from stream") - if err := stream.Reset(); err != nil { - rtr.logger.Error().Err(err).Msg("resetting stream (read-side)") - } - return - } - - // done reading; reset to signal this to remote peer - // NB: failing to reset the stream can easily max out the number of available - // network connections on the receiver's side. - if err := stream.Reset(); err != nil { - rtr.logger.Error().Err(err).Msg("resetting stream (read-side)") +// setupUnicastRouter configures and assigns `rtr.UnicastRouter`. +func (rtr *rainTreeRouter) setupUnicastRouter() error { + unicastRouterCfg := config.UnicastRouterConfig{ + Logger: rtr.logger, + Host: rtr.host, + ProtocolID: protocol.PoktProtocolID, + MessageHandler: rtr.handleRainTreeMsg, + PeerHandler: rtr.AddPeer, } - // extract `PocketEnvelope` from `RainTreeMessage` (& continue propagation) - poktEnvelopeBz, err := rtr.handleRainTreeMsg(rainTreeMsgBz) + unicastRouter, err := unicast.Create(rtr.GetBus(), &unicastRouterCfg) if err != nil { - rtr.logger.Error().Err(err).Msg("handling raintree message") - return - } - - // There was no error, but we don't need to forward this to the app-specific bus. - // For example, the message has already been handled by the application. - if poktEnvelopeBz == nil { - return - } - - // call configured handler to forward to app-specific bus - if err := rtr.handler(poktEnvelopeBz); err != nil { - rtr.logger.Error().Err(err).Msg("handling pocket envelope") + return fmt.Errorf("setting up unicast router: %w", err) } -} -// shouldSendToTarget returns false if target is self. -func shouldSendToTarget(target target) bool { - return !target.isSelf + rtr.UnicastRouter = *unicastRouter + return nil } func (rtr *rainTreeRouter) setupDependencies() error { + if err := rtr.setupUnicastRouter(); err != nil { + return err + } + pstore, err := rtr.pstoreProvider.GetStakedPeerstoreAtHeight(rtr.currentHeightProvider.CurrentHeight()) if err != nil { - return err + return fmt.Errorf("getting staked peerstore: %w", err) } if err := rtr.setupPeerManager(pstore); err != nil { - return err + return fmt.Errorf("setting up peer manager: %w", err) } if err := utils.PopulateLibp2pHost(rtr.host, pstore); err != nil { @@ -374,9 +328,3 @@ func (rtr *rainTreeRouter) setupPeerManager(pstore typesP2P.Peerstore) (err erro func (rtr *rainTreeRouter) getHostname() string { return rtr.GetBus().GetRuntimeMgr().GetConfig().P2P.Hostname } - -// newReadStreamDeadline returns a future deadline -// based on the read stream timeout duration. -func newReadStreamDeadline() time.Time { - return time.Now().Add(readStreamTimeout) -} diff --git a/p2p/raintree/testutil.go b/p2p/raintree/testutil.go index ebcb464b5..e2e81b636 100644 --- a/p2p/raintree/testutil.go +++ b/p2p/raintree/testutil.go @@ -2,12 +2,14 @@ package raintree -import libp2pNetwork "github.com/libp2p/go-libp2p/core/network" +import ( + libp2pNetwork "github.com/libp2p/go-libp2p/core/network" +) // RainTreeRouter exports `rainTreeRouter` for testing purposes. type RainTreeRouter = rainTreeRouter // HandleStream exports `rainTreeRouter#handleStream` for testing purposes. func (rtr *rainTreeRouter) HandleStream(stream libp2pNetwork.Stream) { - rtr.handleStream(stream) + rtr.UnicastRouter.HandleStream(stream) } diff --git a/p2p/types/router.go b/p2p/types/router.go index f9c7f2d74..37080acbd 100644 --- a/p2p/types/router.go +++ b/p2p/types/router.go @@ -15,14 +15,25 @@ type Router interface { Broadcast(data []byte) error Send(data []byte, address cryptoPocket.Address) error - // Address book helpers - // TECHDEBT: simplify - remove `GetPeerstore` + // GetPeerstore is used by the P2P module to update the staked actor router's + // (`RainTreeRouter`) peerstore. + // + // TECHDEBT(#859+): remove the need for this group of interface methods. + // All peer discovery logic should be encapsulated by the router. + // Adopt `HandleEvent(*anypb.Any) error` here instead and forward events + // from P2P module to routers. + // CONSIDERATION: Utility, Conseneus and P2P modules could share an interface + // containing this method (e.g. `BusEventHandler`). GetPeerstore() Peerstore + // AddPeer is used to add a peer to the routers peerstore. It is intended to + // support peer discovery. AddPeer(peer Peer) error + // RemovePeer is used to remove a peer to the routers peerstore. It is used + // during churn to purge offline peers from the routers peerstore. RemovePeer(peer Peer) error } -type RouterHandler func(data []byte) error +type MessageHandler func(data []byte) error // RouterConfig is used to configure `Router` implementations and to test a // given configuration's validity. diff --git a/p2p/raintree/logging.go b/p2p/unicast/logging.go similarity index 50% rename from p2p/raintree/logging.go rename to p2p/unicast/logging.go index bbe3e6c3b..4a7f4faaf 100644 --- a/p2p/raintree/logging.go +++ b/p2p/unicast/logging.go @@ -1,4 +1,4 @@ -package raintree +package unicast import ( libp2pNetwork "github.com/libp2p/go-libp2p/core/network" @@ -7,8 +7,17 @@ import ( "github.com/pokt-network/pocket/p2p/utils" ) +// TECHDEBT(#830): it would be nice to have at least one more degree of freedom with which +// to limit logging in areas where it is known to be excessive / high frequency. +// Especially applicable to debug log lines which only contribute in edge cases, +// unusual circumstances, or regressions (e.g. hitting OS resource limits because +// of too many concurrent streams). +// +// This could ultimately be actuated from the CLI via flags, configs, and/or env +// vars. Initially, we could consider coupling to a `--verbose` persistent flag. + // logStream logs the incoming stream and its scope stats -func (rtr *rainTreeRouter) logStream(stream libp2pNetwork.Stream) { +func (rtr *UnicastRouter) logStream(stream libp2pNetwork.Stream) { rtr.logStreamScopeStats(stream) remotePeer, err := utils.PeerFromLibp2pStream(stream) @@ -21,7 +30,7 @@ func (rtr *rainTreeRouter) logStream(stream libp2pNetwork.Stream) { // logStreamScopeStats logs the incoming stream's scope stats // (see: https://pkg.go.dev/github.com/libp2p/go-libp2p@v0.27.0/core/network#StreamScope) -func (rtr *rainTreeRouter) logStreamScopeStats(stream libp2pNetwork.Stream) { +func (rtr *UnicastRouter) logStreamScopeStats(stream libp2pNetwork.Stream) { if err := utils.LogScopeStatFactory( &logger.Global.Logger, "stream scope (read-side)", @@ -29,3 +38,6 @@ func (rtr *rainTreeRouter) logStreamScopeStats(stream libp2pNetwork.Stream) { rtr.logger.Debug().Err(err).Msg("logging stream scope stats") } } +func (rtr *UnicastRouter) getHostname() string { + return rtr.GetBus().GetRuntimeMgr().GetConfig().P2P.Hostname +} diff --git a/p2p/unicast/router.go b/p2p/unicast/router.go new file mode 100644 index 000000000..18a41fc32 --- /dev/null +++ b/p2p/unicast/router.go @@ -0,0 +1,149 @@ +package unicast + +import ( + "io" + "time" + + libp2pHost "github.com/libp2p/go-libp2p/core/host" + libp2pNetwork "github.com/libp2p/go-libp2p/core/network" + + "github.com/pokt-network/pocket/p2p/config" + typesP2P "github.com/pokt-network/pocket/p2p/types" + "github.com/pokt-network/pocket/p2p/utils" + "github.com/pokt-network/pocket/shared/modules" + "github.com/pokt-network/pocket/shared/modules/base_modules" +) + +// TECHDEBT(#629): configure timeouts. Consider security exposure vs. real-world conditions. +// TECHDEBT(#629): parameterize and expose via config. +// readStreamTimeout is the duration to wait for a read operation on a +// stream to complete, after which the stream is closed ("timed out"). +const readStreamTimeout = time.Second * 10 + +var _ unicastRouterFactory = &UnicastRouter{} + +type unicastRouterFactory = modules.FactoryWithConfig[*UnicastRouter, *config.UnicastRouterConfig] + +type UnicastRouter struct { + base_modules.IntegratableModule + + logger *modules.Logger + host libp2pHost.Host + // messageHandler is the function to call when a message is received. + // host represents a libp2p network node, it encapsulates a libp2p peerstore + // & connection manager. `libp2p.New` configures and starts listening + // according to options. + // (see: https://pkg.go.dev/github.com/libp2p/go-libp2p#section-readme) + messageHandler typesP2P.MessageHandler + // peerHandler is called whenever a new incoming stream is established. + // TECHDEBT(#749,#747): this may not be needed once we've adopted libp2p + // peer IDs and multiaddr natively. + peerHandler func(peer typesP2P.Peer) error +} + +func Create(bus modules.Bus, cfg *config.UnicastRouterConfig) (*UnicastRouter, error) { + return new(UnicastRouter).Create(bus, cfg) +} + +func (*UnicastRouter) Create(bus modules.Bus, cfg *config.UnicastRouterConfig) (*UnicastRouter, error) { + if err := cfg.IsValid(); err != nil { + return nil, err + } + + rtr := &UnicastRouter{ + logger: cfg.Logger, + host: cfg.Host, + messageHandler: cfg.MessageHandler, + peerHandler: cfg.PeerHandler, + } + + // `UnicastRouter` is not a submodule and therefore does not register with the + // module registry. However, as it does depend on the bus and therefore MUST + // embed the base `IntegrableModule` and call `#SetBus()`. + rtr.SetBus(bus) + + // Don't handle incoming streams in client debug mode. + if !rtr.isClientDebugMode() { + rtr.host.SetStreamHandler(cfg.ProtocolID, rtr.handleStream) + } + + return rtr, nil +} + +// handleStream ensures the peerstore contains the remote peer and then reads +// the incoming stream in a new go routine. +func (rtr *UnicastRouter) handleStream(stream libp2pNetwork.Stream) { + rtr.logger.Debug().Msg("handling incoming stream") + peer, err := utils.PeerFromLibp2pStream(stream) + if err != nil { + rtr.logger.Error().Err(err). + Str("address", peer.GetAddress().String()). + Msg("parsing remote peer identity") + + // Reset stream to signal the sender to give up and move on. + // NB: failing to reset the stream can easily max out the number of available + // network connections on the receiver's side. + if err = stream.Reset(); err != nil { + rtr.logger.Error().Err(err).Msg("resetting stream") + } + return + } + + if err := rtr.peerHandler(peer); err != nil { + rtr.logger.Error().Err(err). + Str("address", peer.GetAddress().String()). + Msg("adding remote peer to router") + } + + // concurrently read messages out of incoming streams for handling. + go rtr.readStream(stream) +} + +// readStream reads the message bytes out of the incoming stream and passes it to +// the configured `rtr.messageHandler`. Intended to be called in a go routine. +func (rtr *UnicastRouter) readStream(stream libp2pNetwork.Stream) { + // Time out if no data is sent to free resources. + if err := stream.SetReadDeadline(newReadStreamDeadline()); err != nil { + // Not returning an error for testing purposes; i.e. `SetReadDeadline` is + // not supported by libp2p `mocknet` streams. This should only produce an + // error if a node advertises and listens via an unsupported transport + // protocol, which should never happen in prod. + rtr.logger.Error().Err(err).Msg("setting stream read deadline") + } + + // log incoming stream + rtr.logStream(stream) + + // read stream + messageBz, err := io.ReadAll(stream) + if err != nil { + rtr.logger.Error().Err(err).Msg("reading from stream") + if err := stream.Reset(); err != nil { + rtr.logger.Error().Err(err).Msg("resetting stream (read-side)") + } + return + } + + // done reading; reset to signal this to remote peer + // NB: failing to reset the stream can easily max out the number of available + // network connections on the receiver's side. + if err := stream.Reset(); err != nil { + rtr.logger.Error().Err(err).Msg("resetting stream (read-side)") + } + + if err := rtr.messageHandler(messageBz); err != nil { + rtr.logger.Error().Err(err).Msg("handling message") + return + } +} + +// isClientDebugMode returns the value of `ClientDebugMode` in the base config +func (rtr *UnicastRouter) isClientDebugMode() bool { + return rtr.GetBus().GetRuntimeMgr().GetConfig().ClientDebugMode +} + +// newReadStreamDeadline returns a future deadline +// based on the read stream timeout duration. +func newReadStreamDeadline() time.Time { + return time.Now().Add(readStreamTimeout) +} diff --git a/p2p/unicast/testutil.go b/p2p/unicast/testutil.go new file mode 100644 index 000000000..8a3f9caf3 --- /dev/null +++ b/p2p/unicast/testutil.go @@ -0,0 +1,10 @@ +//go:build test + +package unicast + +import libp2pNetwork "github.com/libp2p/go-libp2p/core/network" + +// HandleStream exports `unicastRouter#handleStream` for testing purposes. +func (rtr *UnicastRouter) HandleStream(stream libp2pNetwork.Stream) { + rtr.handleStream(stream) +}