Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[P2P] refactor: unicast router #844

Merged
merged 18 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions internal/testutil/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package testutil

type ProxyFactory[T any] func(target T) (proxy T)
44 changes: 43 additions & 1 deletion p2p/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,19 @@ import (
"fmt"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/multierr"

"github.com/pokt-network/pocket/p2p/providers"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/shared/crypto"
"go.uber.org/multierr"
"github.com/pokt-network/pocket/shared/modules"
)

var (
_ typesP2P.RouterConfig = &baseConfig{}
_ typesP2P.RouterConfig = &UnicastRouterConfig{}
_ typesP2P.RouterConfig = &RainTreeConfig{}
)

// baseConfig implements `RouterConfig` using the given libp2p host and current
Expand All @@ -22,6 +32,14 @@ type baseConfig struct {
PeerstoreProvider providers.PeerstoreProvider
}

type UnicastRouterConfig struct {
Logger *modules.Logger
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the first:

  1. Package exposed Logger in a struct
  2. Config without an associated proto

No problems/concerns, but wanted to understand if this is the approach we're taking with submodules?

Copy link
Contributor Author

@bryanchriswhite bryanchriswhite Jun 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Package exposed Logger in a struct

The structs in this package are all config structs which are used to pass required parameters to constructor functions (most of which are submodule constructors). They must export their fields in order for consumers to be able to configure the unexported fields of the object under construction, via the respective constructor.

I could understand the argument that it may make more sense for these configs to live in same package as the constructor which they accompany; however, even then, these fields would still need to be exported for package-external consumers to use them (e.g. p2p package). The rationale behind the current code organization was to deduplicate baseConfig#IsValid() between RainTreeConfig and BackgroundConfig. To do this without creating an import cycle requires an additional package. At that point, this package seemed most appropriate for UnicastConfig to live here as well. I don't have a strong opinion about it, I just felt like this was the most readable and least complex option at the time.

  1. Config without an associated proto

RainTreeRouter and BackgroundRouter configs (also defined in this package) do not have an associated protobuf config type, nor do I think it would make sense for them to. In the specific cases of the routers, only one of the required parameters are serializable and it is (appropriately) derived from a P2PConfig field (Address). This makes it a poor candidate, in my view, to be converted into a protobuf type such that it can be embedded in the P2P config type; however, you've made me aware of the possibility of such a case and potential consistent integration approach. 🙌

mindmap
    root((Submodules))
        (options)
            ASSUMPTIONS
                (MUST be optional, effecting the submodules state and/or behavior)
        (configs)
            (Consideration:
            embedding submodule configs in respective module's protobuf config]
                ASSUMPTIONS
                    (Must be required for a submodule to function properly)
                    ("Multiple submodules may share a common dependency / parameter argument which MAY NOT be dependency injectable")
                        %% ("Initializaiton of such shared depencencies SHOULD be initialized and/or registered (in the case of submodules) in a parent module  in each submodule, ideally via some common and/or embedded implementation.")
                PROS
                    ("Provides clear and consistent convention with respect to how to structure configs in both modules and submodules")
                    ("Supports shared submodules; i.e. such configs could be aggregated with cardinality greater than 1 in one or more modules simultaneously without conflict")
                    ("Encourages usage of dependecy injection for submodules.")
                            ("For example, the P2P module currently sets up a peerstore provider and then passes it to router submdoules via their configs.
                            Since the providers (pstore & height) are submodules, the P2P SHOULD instead register them to the bus and let the routers dendency-inject them.")
                CONS
                    ("Protobuf config can only contain primitive/serializable types")
                        ("Functions and non-submodule, non-serializable dependencies become problematic")
                            ("For example, the P2P module currently sets up a libp2p host and passes it along with a reference to one of its methods to router submdoules via their configs.
                            Forcing submodules to rely only on seralizable configs seems insufficient.")
                ALTERNATIVES
                    (Support either and/or both; embed submodule-specific protobuf configs, where applicable, AND allow for an additional layer of abstraction to support passing high-level objects.]
                        (A protobuf config type MAY be embedded in the submodule's "config type")
                OPEN Qs
                    (Should submodules be constrained in whether they can consider a protobuf config type directly?)
        

Loading

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me. My tl;dr takeaway is:

  1. Separate package: share configs across RainTreeConfig and BackgroundConfig
  2. No protobuf: not exposed to the user / node runner.

Host host.Host
ProtocolID protocol.ID
MessageHandler typesP2P.MessageHandler
PeerHandler func(peer typesP2P.Peer) error
}

// BackgroundConfig implements `RouterConfig` for use with `BackgroundRouter`.
type BackgroundConfig struct {
Host host.Host
Expand Down Expand Up @@ -57,6 +75,30 @@ func (cfg *baseConfig) IsValid() (err error) {
if cfg.PeerstoreProvider == nil {
err = multierr.Append(err, fmt.Errorf("peerstore provider not configured"))
}
return nil
}

// IsValid implements the respective member of the `RouterConfig` interface.
func (cfg *UnicastRouterConfig) IsValid() (err error) {
if cfg.Logger == nil {
err = multierr.Append(err, fmt.Errorf("logger not configured"))
}

if cfg.Host == nil {
err = multierr.Append(err, fmt.Errorf("host not configured"))
}

if cfg.ProtocolID == "" {
err = multierr.Append(err, fmt.Errorf("protocol id not configured"))
}

if cfg.MessageHandler == nil {
err = multierr.Append(err, fmt.Errorf("message handler not configured"))
}

if cfg.PeerHandler == nil {
err = multierr.Append(err, fmt.Errorf("peer handler not configured"))
}
return err
}

Expand Down
147 changes: 47 additions & 100 deletions p2p/raintree/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package raintree

import (
"fmt"
"io"
"time"

libp2pHost "github.com/libp2p/go-libp2p/core/host"
libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/pokt-network/pocket/p2p/unicast"
"google.golang.org/protobuf/proto"

"github.com/pokt-network/pocket/logger"
Expand All @@ -22,15 +20,9 @@ import (
"github.com/pokt-network/pocket/shared/messaging"
"github.com/pokt-network/pocket/shared/modules"
"github.com/pokt-network/pocket/shared/modules/base_modules"
telemetry "github.com/pokt-network/pocket/telemetry"
"github.com/pokt-network/pocket/telemetry"
)

// TECHDEBT(#629): configure timeouts. Consider security exposure vs. real-world conditions.
// TECHDEBT(#629): parameterize and expose via config.
// readStreamTimeout is the duration to wait for a read operation on a
// stream to complete, after which the stream is closed ("timed out").
const readStreamTimeout = time.Second * 10

var (
_ typesP2P.Router = &rainTreeRouter{}
_ modules.IntegratableModule = &rainTreeRouter{}
Expand All @@ -41,10 +33,11 @@ type rainTreeFactory = modules.FactoryWithConfig[typesP2P.Router, *config.RainTr

type rainTreeRouter struct {
base_modules.IntegratableModule
unicast.UnicastRouter

logger *modules.Logger
// handler is the function to call when a message is received.
handler typesP2P.RouterHandler
handler typesP2P.MessageHandler
// host represents a libp2p libp2pNetwork node, it encapsulates a libp2p peerstore
// & connection manager. `libp2p.New` configures and starts listening
// according to options.
Expand Down Expand Up @@ -84,7 +77,6 @@ func (*rainTreeRouter) Create(bus modules.Bus, cfg *config.RainTreeConfig) (type
return nil, err
}

rtr.host.SetStreamHandler(protocol.PoktProtocolID, rtr.handleStream)
return typesP2P.Router(rtr), nil
}

Expand Down Expand Up @@ -191,9 +183,11 @@ func (rtr *rainTreeRouter) sendInternal(data []byte, address cryptoPocket.Addres
return nil
}

// handleRainTreeMsg handles a RainTree message, continuing broadcast propagation
// if applicable. Returns the serialized `PocketEnvelope` data contained within.
func (rtr *rainTreeRouter) handleRainTreeMsg(data []byte) ([]byte, error) {
// handleRainTreeMsg deserializes a RainTree message to extract the `PocketEnvelope`
// bytes contained within, continues broadcast propagation, if applicable, and
// passes them off to the application by calling the configured `rtr.handler`.
// Intended to be called in a go routine.
func (rtr *rainTreeRouter) handleRainTreeMsg(rainTreeMsgBz []byte) error {
blockHeightInt := rtr.GetBus().GetConsensusModule().CurrentHeight()
blockHeight := fmt.Sprintf("%d", blockHeightInt)

Expand All @@ -207,25 +201,37 @@ func (rtr *rainTreeRouter) handleRainTreeMsg(data []byte) ([]byte, error) {
)

var rainTreeMsg typesP2P.RainTreeMessage
if err := proto.Unmarshal(data, &rainTreeMsg); err != nil {
return nil, err
if err := proto.Unmarshal(rainTreeMsgBz, &rainTreeMsg); err != nil {
return err
}

// TECHDEBT(#763): refactor as "pre-propagation validation"
networkMessage := messaging.PocketEnvelope{}
if err := proto.Unmarshal(rainTreeMsg.Data, &networkMessage); err != nil {
rtr.logger.Error().Err(err).Msg("Error decoding network message")
return nil, err
return err
}
// --
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved

// Continue RainTree propagation
if rainTreeMsg.Level > 0 {
if err := rtr.broadcastAtLevel(rainTreeMsg.Data, rainTreeMsg.Level-1); err != nil {
return nil, err
return err
}
}

// Return the data back to the caller so it can be handled by the app specific bus
return rainTreeMsg.Data, nil
// There was no error, but we don't need to forward this to the app-specific bus.
// For example, the message has already been handled by the application.
if rainTreeMsg.Data == nil {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
rtr.logger.Debug().Msg("no data in RainTree message")
return nil
}

// call configured handler to forward to app-specific bus
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
if err := rtr.handler(rainTreeMsg.Data); err != nil {
rtr.logger.Error().Err(err).Msg("handling raintree message")
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

// GetPeerstore implements the respective member of `typesP2P.Router`.
Expand Down Expand Up @@ -270,94 +276,41 @@ func (rtr *rainTreeRouter) Size() int {
return rtr.peersManager.GetPeerstore().Size()
}

// handleStream ensures the peerstore contains the remote peer and then reads
// the incoming stream in a new go routine.
func (rtr *rainTreeRouter) handleStream(stream libp2pNetwork.Stream) {
rtr.logger.Debug().Msg("handling incoming stream")
peer, err := utils.PeerFromLibp2pStream(stream)
if err != nil {
rtr.logger.Error().Err(err).
Str("address", peer.GetAddress().String()).
Msg("parsing remote peer identity")

if err = stream.Reset(); err != nil {
rtr.logger.Error().Err(err).Msg("resetting stream")
}
return
}

if err := rtr.AddPeer(peer); err != nil {
rtr.logger.Error().Err(err).
Str("address", peer.GetAddress().String()).
Msg("adding remote peer to router")
}

go rtr.readStream(stream)
// shouldSendToTarget returns false if target is self.
func shouldSendToTarget(target target) bool {
return !target.isSelf
}

// readStream reads the incoming stream, extracts the serialized `PocketEnvelope`
// data from the incoming `RainTreeMessage`, and passes it to the application by
// calling the configured `rtr.handler`. Intended to be called in a go routine.
func (rtr *rainTreeRouter) readStream(stream libp2pNetwork.Stream) {
// Time out if no data is sent to free resources.
// NB: tests using libp2p's `mocknet` rely on this not returning an error.
if err := stream.SetReadDeadline(newReadStreamDeadline()); err != nil {
// `SetReadDeadline` not supported by `mocknet` streams.
rtr.logger.Error().Err(err).Msg("setting stream read deadline")
}

// log incoming stream
rtr.logStream(stream)

// read stream
rainTreeMsgBz, err := io.ReadAll(stream)
if err != nil {
rtr.logger.Error().Err(err).Msg("reading from stream")
if err := stream.Reset(); err != nil {
rtr.logger.Error().Err(err).Msg("resetting stream (read-side)")
}
return
func (rtr *rainTreeRouter) setupUnicastRouter() error {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
unicastRouterCfg := config.UnicastRouterConfig{
Logger: rtr.logger,
Host: rtr.host,
ProtocolID: protocol.PoktProtocolID,
MessageHandler: rtr.handleRainTreeMsg,
PeerHandler: rtr.AddPeer,
}

// done reading; reset to signal this to remote peer
// NB: failing to reset the stream can easily max out the number of available
// network connections on the receiver's side.
if err := stream.Reset(); err != nil {
rtr.logger.Error().Err(err).Msg("resetting stream (read-side)")
}

// extract `PocketEnvelope` from `RainTreeMessage` (& continue propagation)
poktEnvelopeBz, err := rtr.handleRainTreeMsg(rainTreeMsgBz)
unicastRouter, err := unicast.Create(rtr.GetBus(), &unicastRouterCfg)
if err != nil {
rtr.logger.Error().Err(err).Msg("handling raintree message")
return
}

// There was no error, but we don't need to forward this to the app-specific bus.
// For example, the message has already been handled by the application.
if poktEnvelopeBz == nil {
return
}

// call configured handler to forward to app-specific bus
if err := rtr.handler(poktEnvelopeBz); err != nil {
rtr.logger.Error().Err(err).Msg("handling pocket envelope")
return fmt.Errorf("setting up unicast router: %w", err)
}
}

// shouldSendToTarget returns false if target is self.
func shouldSendToTarget(target target) bool {
return !target.isSelf
rtr.UnicastRouter = *unicastRouter
return nil
}

func (rtr *rainTreeRouter) setupDependencies() error {
if err := rtr.setupUnicastRouter(); err != nil {
return err
}

pstore, err := rtr.pstoreProvider.GetStakedPeerstoreAtHeight(rtr.currentHeightProvider.CurrentHeight())
if err != nil {
return err
return fmt.Errorf("getting staked peerstore: %w", err)
}

if err := rtr.setupPeerManager(pstore); err != nil {
return err
return fmt.Errorf("setting up peer manager: %w", err)
}

if err := utils.PopulateLibp2pHost(rtr.host, pstore); err != nil {
Expand All @@ -374,9 +327,3 @@ func (rtr *rainTreeRouter) setupPeerManager(pstore typesP2P.Peerstore) (err erro
func (rtr *rainTreeRouter) getHostname() string {
return rtr.GetBus().GetRuntimeMgr().GetConfig().P2P.Hostname
}

// newReadStreamDeadline returns a future deadline
// based on the read stream timeout duration.
func newReadStreamDeadline() time.Time {
return time.Now().Add(readStreamTimeout)
}
22 changes: 20 additions & 2 deletions p2p/raintree/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,30 @@

package raintree

import libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
import (
libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/regen-network/gocuke"

"github.com/pokt-network/pocket/internal/testutil"
typesP2P "github.com/pokt-network/pocket/p2p/types"
)

// RainTreeRouter exports `rainTreeRouter` for testing purposes.
type RainTreeRouter = rainTreeRouter

type routerHandlerProxyFactory = testutil.ProxyFactory[typesP2P.MessageHandler]

// HandleStream exports `rainTreeRouter#handleStream` for testing purposes.
func (rtr *rainTreeRouter) HandleStream(stream libp2pNetwork.Stream) {
rtr.handleStream(stream)
rtr.UnicastRouter.HandleStream(stream)
}
func (rtr *rainTreeRouter) HandlerProxy(
t gocuke.TestingT,
handlerProxyFactory routerHandlerProxyFactory,
) {
t.Helper()

// pass original handler to proxy factory & replace it with the proxy
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
origHandler := rtr.handler
rtr.handler = handlerProxyFactory(origHandler)
}
2 changes: 1 addition & 1 deletion p2p/types/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Router interface {
RemovePeer(peer Peer) error
}

type RouterHandler func(data []byte) error
type MessageHandler func(data []byte) error

// RouterConfig is used to configure `Router` implementations and to test a
// given configuration's validity.
Expand Down
19 changes: 16 additions & 3 deletions p2p/raintree/logging.go → p2p/unicast/logging.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package raintree
package unicast

import (
libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
Expand All @@ -7,8 +7,18 @@ import (
"github.com/pokt-network/pocket/p2p/utils"
)

// TECHDEBT(#830): it would be nice to have at least one more degree of freedom with which
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
// to limit logging in areas where it is known to be excessive / high frequency.
// Especially applicable to debug log lines which only contribute in edge cases,
// unusual circumstances, or regressions (e.g. hitting OS resource limits because
// of too many concurrent streams).
//
// This could ultimately be actuated from the CLI via flags, configs, and/or env
// vars. Initially, we could consider coupling to a `--verbose` persistent flag.
//
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved

// logStream logs the incoming stream and its scope stats
func (rtr *rainTreeRouter) logStream(stream libp2pNetwork.Stream) {
func (rtr *UnicastRouter) logStream(stream libp2pNetwork.Stream) {
rtr.logStreamScopeStats(stream)

remotePeer, err := utils.PeerFromLibp2pStream(stream)
Expand All @@ -21,11 +31,14 @@ func (rtr *rainTreeRouter) logStream(stream libp2pNetwork.Stream) {

// logStreamScopeStats logs the incoming stream's scope stats
// (see: https://pkg.go.dev/github.com/libp2p/[email protected]/core/network#StreamScope)
func (rtr *rainTreeRouter) logStreamScopeStats(stream libp2pNetwork.Stream) {
func (rtr *UnicastRouter) logStreamScopeStats(stream libp2pNetwork.Stream) {
if err := utils.LogScopeStatFactory(
&logger.Global.Logger,
"stream scope (read-side)",
)(stream.Scope()); err != nil {
rtr.logger.Debug().Err(err).Msg("logging stream scope stats")
}
}
func (rtr *UnicastRouter) getHostname() string {
return rtr.GetBus().GetRuntimeMgr().GetConfig().P2P.Hostname
}
Loading