From 30e94a6d82279f221846fb66d1641a30a9c6f542 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Mon, 19 Jun 2023 13:41:15 +0200 Subject: [PATCH] refactor: unicast router --- internal/testutil/proxy.go | 3 + p2p/config/config.go | 44 ++++++++++- p2p/raintree/logging.go | 31 -------- p2p/raintree/router.go | 147 ++++++++++++------------------------- p2p/raintree/testutil.go | 22 +++++- p2p/types/router.go | 2 +- p2p/unicast/logging.go | 44 +++++++++++ p2p/unicast/router.go | 139 +++++++++++++++++++++++++++++++++++ p2p/unicast/testutil.go | 10 +++ 9 files changed, 306 insertions(+), 136 deletions(-) create mode 100644 internal/testutil/proxy.go delete mode 100644 p2p/raintree/logging.go create mode 100644 p2p/unicast/logging.go create mode 100644 p2p/unicast/router.go create mode 100644 p2p/unicast/testutil.go diff --git a/internal/testutil/proxy.go b/internal/testutil/proxy.go new file mode 100644 index 0000000000..a586a6c054 --- /dev/null +++ b/internal/testutil/proxy.go @@ -0,0 +1,3 @@ +package testutil + +type ProxyFactory[T any] func(target T) (proxy T) diff --git a/p2p/config/config.go b/p2p/config/config.go index e64fe8519b..d98f4b9697 100644 --- a/p2p/config/config.go +++ b/p2p/config/config.go @@ -4,9 +4,19 @@ import ( "fmt" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/protocol" + "go.uber.org/multierr" + "github.com/pokt-network/pocket/p2p/providers" + typesP2P "github.com/pokt-network/pocket/p2p/types" "github.com/pokt-network/pocket/shared/crypto" - "go.uber.org/multierr" + "github.com/pokt-network/pocket/shared/modules" +) + +var ( + _ typesP2P.RouterConfig = &baseConfig{} + _ typesP2P.RouterConfig = &UnicastRouterConfig{} + _ typesP2P.RouterConfig = &RainTreeConfig{} ) // baseConfig implements `RouterConfig` using the given libp2p host and current @@ -22,6 +32,14 @@ type baseConfig struct { PeerstoreProvider providers.PeerstoreProvider } +type UnicastRouterConfig struct { + Logger *modules.Logger + Host host.Host + ProtocolID protocol.ID + MessageHandler typesP2P.MessageHandler + PeerHandler func(peer typesP2P.Peer) error +} + // BackgroundConfig implements `RouterConfig` for use with `BackgroundRouter`. type BackgroundConfig struct { Host host.Host @@ -57,6 +75,30 @@ func (cfg *baseConfig) IsValid() (err error) { if cfg.PeerstoreProvider == nil { err = multierr.Append(err, fmt.Errorf("peerstore provider not configured")) } + return nil +} + +// IsValid implements the respective member of the `RouterConfig` interface. +func (cfg *UnicastRouterConfig) IsValid() (err error) { + if cfg.Logger == nil { + err = multierr.Append(err, fmt.Errorf("logger not configured")) + } + + if cfg.Host == nil { + err = multierr.Append(err, fmt.Errorf("host not configured")) + } + + if cfg.ProtocolID == "" { + err = multierr.Append(err, fmt.Errorf("protocol id not configured")) + } + + if cfg.MessageHandler == nil { + err = multierr.Append(err, fmt.Errorf("message handler not configured")) + } + + if cfg.PeerHandler == nil { + err = multierr.Append(err, fmt.Errorf("peer handler not configured")) + } return err } diff --git a/p2p/raintree/logging.go b/p2p/raintree/logging.go deleted file mode 100644 index bbe3e6c3b0..0000000000 --- a/p2p/raintree/logging.go +++ /dev/null @@ -1,31 +0,0 @@ -package raintree - -import ( - libp2pNetwork "github.com/libp2p/go-libp2p/core/network" - - "github.com/pokt-network/pocket/logger" - "github.com/pokt-network/pocket/p2p/utils" -) - -// logStream logs the incoming stream and its scope stats -func (rtr *rainTreeRouter) logStream(stream libp2pNetwork.Stream) { - rtr.logStreamScopeStats(stream) - - remotePeer, err := utils.PeerFromLibp2pStream(stream) - if err != nil { - rtr.logger.Debug().Err(err).Msg("getting remote remotePeer") - } else { - utils.LogIncomingMsg(rtr.logger, rtr.getHostname(), remotePeer) - } -} - -// logStreamScopeStats logs the incoming stream's scope stats -// (see: https://pkg.go.dev/github.com/libp2p/go-libp2p@v0.27.0/core/network#StreamScope) -func (rtr *rainTreeRouter) logStreamScopeStats(stream libp2pNetwork.Stream) { - if err := utils.LogScopeStatFactory( - &logger.Global.Logger, - "stream scope (read-side)", - )(stream.Scope()); err != nil { - rtr.logger.Debug().Err(err).Msg("logging stream scope stats") - } -} diff --git a/p2p/raintree/router.go b/p2p/raintree/router.go index 707f0a0a99..a68fd293df 100644 --- a/p2p/raintree/router.go +++ b/p2p/raintree/router.go @@ -2,11 +2,8 @@ package raintree import ( "fmt" - "io" - "time" - libp2pHost "github.com/libp2p/go-libp2p/core/host" - libp2pNetwork "github.com/libp2p/go-libp2p/core/network" + "github.com/pokt-network/pocket/p2p/unicast" "google.golang.org/protobuf/proto" "github.com/pokt-network/pocket/logger" @@ -22,15 +19,9 @@ import ( "github.com/pokt-network/pocket/shared/messaging" "github.com/pokt-network/pocket/shared/modules" "github.com/pokt-network/pocket/shared/modules/base_modules" - telemetry "github.com/pokt-network/pocket/telemetry" + "github.com/pokt-network/pocket/telemetry" ) -// TECHDEBT(#629): configure timeouts. Consider security exposure vs. real-world conditions. -// TECHDEBT(#629): parameterize and expose via config. -// readStreamTimeout is the duration to wait for a read operation on a -// stream to complete, after which the stream is closed ("timed out"). -const readStreamTimeout = time.Second * 10 - var ( _ typesP2P.Router = &rainTreeRouter{} _ modules.IntegratableModule = &rainTreeRouter{} @@ -41,10 +32,11 @@ type rainTreeFactory = modules.FactoryWithConfig[typesP2P.Router, *config.RainTr type rainTreeRouter struct { base_modules.IntegratableModule + unicast.UnicastRouter logger *modules.Logger // handler is the function to call when a message is received. - handler typesP2P.RouterHandler + handler typesP2P.MessageHandler // host represents a libp2p libp2pNetwork node, it encapsulates a libp2p peerstore // & connection manager. `libp2p.New` configures and starts listening // according to options. @@ -84,7 +76,6 @@ func (*rainTreeRouter) Create(bus modules.Bus, cfg *config.RainTreeConfig) (type return nil, err } - rtr.host.SetStreamHandler(protocol.PoktProtocolID, rtr.handleStream) return typesP2P.Router(rtr), nil } @@ -191,9 +182,11 @@ func (rtr *rainTreeRouter) sendInternal(data []byte, address cryptoPocket.Addres return nil } -// handleRainTreeMsg handles a RainTree message, continuing broadcast propagation -// if applicable. Returns the serialized `PocketEnvelope` data contained within. -func (rtr *rainTreeRouter) handleRainTreeMsg(data []byte) ([]byte, error) { +// handleRainTreeMsg deserializes a RainTree message to extract the `PocketEnvelope` +// bytes contained within, continues broadcast propagation, if applicable, and +// passes them off to the application by calling the configured `rtr.handler`. +// Intended to be called in a go routine. +func (rtr *rainTreeRouter) handleRainTreeMsg(rainTreeMsgBz []byte) error { blockHeightInt := rtr.GetBus().GetConsensusModule().CurrentHeight() blockHeight := fmt.Sprintf("%d", blockHeightInt) @@ -207,25 +200,36 @@ func (rtr *rainTreeRouter) handleRainTreeMsg(data []byte) ([]byte, error) { ) var rainTreeMsg typesP2P.RainTreeMessage - if err := proto.Unmarshal(data, &rainTreeMsg); err != nil { - return nil, err + if err := proto.Unmarshal(rainTreeMsgBz, &rainTreeMsg); err != nil { + return err } + // TECHDEBT(#763): refactor as "pre-propagation validation" networkMessage := messaging.PocketEnvelope{} if err := proto.Unmarshal(rainTreeMsg.Data, &networkMessage); err != nil { rtr.logger.Error().Err(err).Msg("Error decoding network message") - return nil, err + return err } + // -- // Continue RainTree propagation if rainTreeMsg.Level > 0 { if err := rtr.broadcastAtLevel(rainTreeMsg.Data, rainTreeMsg.Level-1); err != nil { - return nil, err + return err } } - // Return the data back to the caller so it can be handled by the app specific bus - return rainTreeMsg.Data, nil + // There was no error, but we don't need to forward this to the app-specific bus. + // For example, the message has already been handled by the application. + if rainTreeMsg.Data == nil { + return nil + } + + // call configured handler to forward to app-specific bus + if err := rtr.handler(rainTreeMsg.Data); err != nil { + rtr.logger.Error().Err(err).Msg("handling raintree message") + } + return nil } // GetPeerstore implements the respective member of `typesP2P.Router`. @@ -270,94 +274,41 @@ func (rtr *rainTreeRouter) Size() int { return rtr.peersManager.GetPeerstore().Size() } -// handleStream ensures the peerstore contains the remote peer and then reads -// the incoming stream in a new go routine. -func (rtr *rainTreeRouter) handleStream(stream libp2pNetwork.Stream) { - rtr.logger.Debug().Msg("handling incoming stream") - peer, err := utils.PeerFromLibp2pStream(stream) - if err != nil { - rtr.logger.Error().Err(err). - Str("address", peer.GetAddress().String()). - Msg("parsing remote peer identity") - - if err = stream.Reset(); err != nil { - rtr.logger.Error().Err(err).Msg("resetting stream") - } - return - } - - if err := rtr.AddPeer(peer); err != nil { - rtr.logger.Error().Err(err). - Str("address", peer.GetAddress().String()). - Msg("adding remote peer to router") - } - - go rtr.readStream(stream) +// shouldSendToTarget returns false if target is self. +func shouldSendToTarget(target target) bool { + return !target.isSelf } -// readStream reads the incoming stream, extracts the serialized `PocketEnvelope` -// data from the incoming `RainTreeMessage`, and passes it to the application by -// calling the configured `rtr.handler`. Intended to be called in a go routine. -func (rtr *rainTreeRouter) readStream(stream libp2pNetwork.Stream) { - // Time out if no data is sent to free resources. - // NB: tests using libp2p's `mocknet` rely on this not returning an error. - if err := stream.SetReadDeadline(newReadStreamDeadline()); err != nil { - // `SetReadDeadline` not supported by `mocknet` streams. - rtr.logger.Error().Err(err).Msg("setting stream read deadline") +func (rtr *rainTreeRouter) setupUnicastRouter() error { + unicastRouterCfg := config.UnicastRouterConfig{ + Logger: rtr.logger, + Host: rtr.host, + ProtocolID: protocol.PoktProtocolID, + MessageHandler: rtr.handleRainTreeMsg, + PeerHandler: rtr.AddPeer, } - // log incoming stream - rtr.logStream(stream) - - // read stream - rainTreeMsgBz, err := io.ReadAll(stream) + unicastRouter, err := unicast.Create(rtr.GetBus(), &unicastRouterCfg) if err != nil { - rtr.logger.Error().Err(err).Msg("reading from stream") - if err := stream.Reset(); err != nil { - rtr.logger.Error().Err(err).Msg("resetting stream (read-side)") - } - return - } - - // done reading; reset to signal this to remote peer - // NB: failing to reset the stream can easily max out the number of available - // network connections on the receiver's side. - if err := stream.Reset(); err != nil { - rtr.logger.Error().Err(err).Msg("resetting stream (read-side)") - } - - // extract `PocketEnvelope` from `RainTreeMessage` (& continue propagation) - poktEnvelopeBz, err := rtr.handleRainTreeMsg(rainTreeMsgBz) - if err != nil { - rtr.logger.Error().Err(err).Msg("handling raintree message") - return - } - - // There was no error, but we don't need to forward this to the app-specific bus. - // For example, the message has already been handled by the application. - if poktEnvelopeBz == nil { - return - } - - // call configured handler to forward to app-specific bus - if err := rtr.handler(poktEnvelopeBz); err != nil { - rtr.logger.Error().Err(err).Msg("handling pocket envelope") + return fmt.Errorf("setting up unicast router: %w", err) } -} -// shouldSendToTarget returns false if target is self. -func shouldSendToTarget(target target) bool { - return !target.isSelf + rtr.UnicastRouter = *unicastRouter + return nil } func (rtr *rainTreeRouter) setupDependencies() error { + if err := rtr.setupUnicastRouter(); err != nil { + return err + } + pstore, err := rtr.pstoreProvider.GetStakedPeerstoreAtHeight(rtr.currentHeightProvider.CurrentHeight()) if err != nil { - return err + return fmt.Errorf("getting staked peerstore: %w", err) } if err := rtr.setupPeerManager(pstore); err != nil { - return err + return fmt.Errorf("setting up peer manager: %w", err) } if err := utils.PopulateLibp2pHost(rtr.host, pstore); err != nil { @@ -374,9 +325,3 @@ func (rtr *rainTreeRouter) setupPeerManager(pstore typesP2P.Peerstore) (err erro func (rtr *rainTreeRouter) getHostname() string { return rtr.GetBus().GetRuntimeMgr().GetConfig().P2P.Hostname } - -// newReadStreamDeadline returns a future deadline -// based on the read stream timeout duration. -func newReadStreamDeadline() time.Time { - return time.Now().Add(readStreamTimeout) -} diff --git a/p2p/raintree/testutil.go b/p2p/raintree/testutil.go index ebcb464b5f..f12ef0a588 100644 --- a/p2p/raintree/testutil.go +++ b/p2p/raintree/testutil.go @@ -2,12 +2,30 @@ package raintree -import libp2pNetwork "github.com/libp2p/go-libp2p/core/network" +import ( + libp2pNetwork "github.com/libp2p/go-libp2p/core/network" + "github.com/regen-network/gocuke" + + "github.com/pokt-network/pocket/internal/testutil" + typesP2P "github.com/pokt-network/pocket/p2p/types" +) // RainTreeRouter exports `rainTreeRouter` for testing purposes. type RainTreeRouter = rainTreeRouter +type routerHandlerProxyFactory = testutil.ProxyFactory[typesP2P.MessageHandler] + // HandleStream exports `rainTreeRouter#handleStream` for testing purposes. func (rtr *rainTreeRouter) HandleStream(stream libp2pNetwork.Stream) { - rtr.handleStream(stream) + rtr.UnicastRouter.HandleStream(stream) +} +func (rtr *rainTreeRouter) HandlerProxy( + t gocuke.TestingT, + handlerProxyFactory routerHandlerProxyFactory, +) { + t.Helper() + + // pass original handler to proxy factory & replace it with the proxy + origHandler := rtr.handler + rtr.handler = handlerProxyFactory(origHandler) } diff --git a/p2p/types/router.go b/p2p/types/router.go index f9c7f2d745..21a2b80100 100644 --- a/p2p/types/router.go +++ b/p2p/types/router.go @@ -22,7 +22,7 @@ type Router interface { RemovePeer(peer Peer) error } -type RouterHandler func(data []byte) error +type MessageHandler func(data []byte) error // RouterConfig is used to configure `Router` implementations and to test a // given configuration's validity. diff --git a/p2p/unicast/logging.go b/p2p/unicast/logging.go new file mode 100644 index 0000000000..7e2539c632 --- /dev/null +++ b/p2p/unicast/logging.go @@ -0,0 +1,44 @@ +package unicast + +import ( + libp2pNetwork "github.com/libp2p/go-libp2p/core/network" + + "github.com/pokt-network/pocket/logger" + "github.com/pokt-network/pocket/p2p/utils" +) + +// TECHDEBT(#830): it would be nice to have at least one more degree of freedom with which +// to limit logging in areas where it is known to be excessive / high frequency. +// Especially applicable to debug log lines which only contribute in edge cases, +// unusual circumstances, or regressions (e.g. hitting OS resource limits because +// of too many concurrent streams). +// +// This could ultimately be actuated from the CLI via flags, configs, and/or env +// vars. Initially, weo could consider coupling to a `--verbose` persistent flag. +// + +// logStream logs the incoming stream and its scope stats +func (rtr *UnicastRouter) logStream(stream libp2pNetwork.Stream) { + rtr.logStreamScopeStats(stream) + + remotePeer, err := utils.PeerFromLibp2pStream(stream) + if err != nil { + rtr.logger.Debug().Err(err).Msg("getting remote remotePeer") + } else { + utils.LogIncomingMsg(rtr.logger, rtr.getHostname(), remotePeer) + } +} + +// logStreamScopeStats logs the incoming stream's scope stats +// (see: https://pkg.go.dev/github.com/libp2p/go-libp2p@v0.27.0/core/network#StreamScope) +func (rtr *UnicastRouter) logStreamScopeStats(stream libp2pNetwork.Stream) { + if err := utils.LogScopeStatFactory( + &logger.Global.Logger, + "stream scope (read-side)", + )(stream.Scope()); err != nil { + rtr.logger.Debug().Err(err).Msg("logging stream scope stats") + } +} +func (rtr *UnicastRouter) getHostname() string { + return rtr.GetBus().GetRuntimeMgr().GetConfig().P2P.Hostname +} diff --git a/p2p/unicast/router.go b/p2p/unicast/router.go new file mode 100644 index 0000000000..fd9b53c126 --- /dev/null +++ b/p2p/unicast/router.go @@ -0,0 +1,139 @@ +package unicast + +import ( + "io" + "time" + + libp2pHost "github.com/libp2p/go-libp2p/core/host" + libp2pNetwork "github.com/libp2p/go-libp2p/core/network" + + "github.com/pokt-network/pocket/p2p/config" + typesP2P "github.com/pokt-network/pocket/p2p/types" + "github.com/pokt-network/pocket/p2p/utils" + "github.com/pokt-network/pocket/shared/modules" + "github.com/pokt-network/pocket/shared/modules/base_modules" +) + +// TECHDEBT(#629): configure timeouts. Consider security exposure vs. real-world conditions. +// TECHDEBT(#629): parameterize and expose via config. +// readStreamTimeout is the duration to wait for a read operation on a +// stream to complete, after which the stream is closed ("timed out"). +const readStreamTimeout = time.Second * 10 + +var _ unicastRouterFactory = &UnicastRouter{} + +// TODO_THIS_COMMIT: consider defining/(re)using `RouterFactory` type +type unicastRouterFactory = modules.FactoryWithConfig[*UnicastRouter, *config.UnicastRouterConfig] + +type UnicastRouter struct { + base_modules.IntegratableModule + + logger *modules.Logger + // messageHandler is the function to call when a message is received. + // host represents a libp2p network node, it encapsulates a libp2p peerstore + // & connection manager. `libp2p.New` configures and starts listening + // according to options. + // (see: https://pkg.go.dev/github.com/libp2p/go-libp2p#section-readme) + host libp2pHost.Host + messageHandler typesP2P.MessageHandler + // TODO_THIS_COMMIT: consider defining alongside `MessageHandler` type + // OR removing `MessageHandler` type + peerHandler func(peer typesP2P.Peer) error +} + +func Create(bus modules.Bus, cfg *config.UnicastRouterConfig) (*UnicastRouter, error) { + return new(UnicastRouter).Create(bus, cfg) +} + +func (*UnicastRouter) Create(bus modules.Bus, cfg *config.UnicastRouterConfig) (*UnicastRouter, error) { + if err := cfg.IsValid(); err != nil { + return nil, err + } + + rtr := &UnicastRouter{ + logger: cfg.Logger, + host: cfg.Host, + messageHandler: cfg.MessageHandler, + peerHandler: cfg.PeerHandler, + } + rtr.SetBus(bus) + + // Don't handle incoming streams in client debug mode. + if !rtr.isClientDebugMode() { + rtr.host.SetStreamHandler(cfg.ProtocolID, rtr.handleStream) + } + + return rtr, nil +} + +// handleStream ensures the peerstore contains the remote peer and then reads +// the incoming stream in a new go routine. +func (rtr *UnicastRouter) handleStream(stream libp2pNetwork.Stream) { + rtr.logger.Debug().Msg("handling incoming stream") + peer, err := utils.PeerFromLibp2pStream(stream) + if err != nil { + rtr.logger.Error().Err(err). + Str("address", peer.GetAddress().String()). + Msg("parsing remote peer identity") + + if err = stream.Reset(); err != nil { + rtr.logger.Error().Err(err).Msg("resetting stream") + } + return + } + + if err := rtr.peerHandler(peer); err != nil { + rtr.logger.Error().Err(err). + Str("address", peer.GetAddress().String()). + Msg("adding remote peer to router") + } + + go rtr.readStream(stream) +} + +// readStream reads the message bytes out of the incoming stream and passes it to +// the configured `rtr.messageHandler`. Intended to be called in a go routine. +func (rtr *UnicastRouter) readStream(stream libp2pNetwork.Stream) { + // Time out if no data is sent to free resources. + // NB: tests using libp2p's `mocknet` rely on this not returning an error. + if err := stream.SetReadDeadline(newReadStreamDeadline()); err != nil { + // `SetReadDeadline` not supported by `mocknet` streams. + rtr.logger.Error().Err(err).Msg("setting stream read deadline") + } + + //log incoming stream + rtr.logStream(stream) + + // read stream + messageBz, err := io.ReadAll(stream) + if err != nil { + rtr.logger.Error().Err(err).Msg("reading from stream") + if err := stream.Reset(); err != nil { + rtr.logger.Error().Err(err).Msg("resetting stream (read-side)") + } + return + } + + // done reading; reset to signal this to remote peer + // NB: failing to reset the stream can easily max out the number of available + // network connections on the receiver's side. + if err := stream.Reset(); err != nil { + rtr.logger.Error().Err(err).Msg("resetting stream (read-side)") + } + + if err := rtr.messageHandler(messageBz); err != nil { + rtr.logger.Error().Err(err).Msg("handling message") + return + } +} + +// isClientDebugMode returns the value of `ClientDebugMode` in the base config +func (rtr *UnicastRouter) isClientDebugMode() bool { + return rtr.GetBus().GetRuntimeMgr().GetConfig().ClientDebugMode +} + +// newReadStreamDeadline returns a future deadline +// based on the read stream timeout duration. +func newReadStreamDeadline() time.Time { + return time.Now().Add(readStreamTimeout) +} diff --git a/p2p/unicast/testutil.go b/p2p/unicast/testutil.go new file mode 100644 index 0000000000..8a3f9caf35 --- /dev/null +++ b/p2p/unicast/testutil.go @@ -0,0 +1,10 @@ +//go:build test + +package unicast + +import libp2pNetwork "github.com/libp2p/go-libp2p/core/network" + +// HandleStream exports `unicastRouter#handleStream` for testing purposes. +func (rtr *UnicastRouter) HandleStream(stream libp2pNetwork.Stream) { + rtr.handleStream(stream) +}