Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[P2P] refactor: unicast router #844

Merged
merged 18 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion p2p/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,19 @@ import (
"fmt"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/multierr"

"github.com/pokt-network/pocket/p2p/providers"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/shared/crypto"
"go.uber.org/multierr"
"github.com/pokt-network/pocket/shared/modules"
)

var (
_ typesP2P.RouterConfig = &baseConfig{}
_ typesP2P.RouterConfig = &UnicastRouterConfig{}
_ typesP2P.RouterConfig = &RainTreeConfig{}
)

// baseConfig implements `RouterConfig` using the given libp2p host and current
Expand All @@ -22,6 +32,14 @@ type baseConfig struct {
PeerstoreProvider providers.PeerstoreProvider
}

type UnicastRouterConfig struct {
Logger *modules.Logger
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the first:

  1. Package exposed Logger in a struct
  2. Config without an associated proto

No problems/concerns, but wanted to understand if this is the approach we're taking with submodules?

Copy link
Contributor Author

@bryanchriswhite bryanchriswhite Jun 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Package exposed Logger in a struct

The structs in this package are all config structs which are used to pass required parameters to constructor functions (most of which are submodule constructors). They must export their fields in order for consumers to be able to configure the unexported fields of the object under construction, via the respective constructor.

I could understand the argument that it may make more sense for these configs to live in same package as the constructor which they accompany; however, even then, these fields would still need to be exported for package-external consumers to use them (e.g. p2p package). The rationale behind the current code organization was to deduplicate baseConfig#IsValid() between RainTreeConfig and BackgroundConfig. To do this without creating an import cycle requires an additional package. At that point, this package seemed most appropriate for UnicastConfig to live here as well. I don't have a strong opinion about it, I just felt like this was the most readable and least complex option at the time.

  1. Config without an associated proto

RainTreeRouter and BackgroundRouter configs (also defined in this package) do not have an associated protobuf config type, nor do I think it would make sense for them to. In the specific cases of the routers, only one of the required parameters are serializable and it is (appropriately) derived from a P2PConfig field (Address). This makes it a poor candidate, in my view, to be converted into a protobuf type such that it can be embedded in the P2P config type; however, you've made me aware of the possibility of such a case and potential consistent integration approach. 🙌

mindmap
    root((Submodules))
        (options)
            ASSUMPTIONS
                (MUST be optional, effecting the submodules state and/or behavior)
        (configs)
            (Consideration:
            embedding submodule configs in respective module's protobuf config]
                ASSUMPTIONS
                    (Must be required for a submodule to function properly)
                    ("Multiple submodules may share a common dependency / parameter argument which MAY NOT be dependency injectable")
                        %% ("Initializaiton of such shared depencencies SHOULD be initialized and/or registered (in the case of submodules) in a parent module  in each submodule, ideally via some common and/or embedded implementation.")
                PROS
                    ("Provides clear and consistent convention with respect to how to structure configs in both modules and submodules")
                    ("Supports shared submodules; i.e. such configs could be aggregated with cardinality greater than 1 in one or more modules simultaneously without conflict")
                    ("Encourages usage of dependecy injection for submodules.")
                            ("For example, the P2P module currently sets up a peerstore provider and then passes it to router submdoules via their configs.
                            Since the providers (pstore & height) are submodules, the P2P SHOULD instead register them to the bus and let the routers dendency-inject them.")
                CONS
                    ("Protobuf config can only contain primitive/serializable types")
                        ("Functions and non-submodule, non-serializable dependencies become problematic")
                            ("For example, the P2P module currently sets up a libp2p host and passes it along with a reference to one of its methods to router submdoules via their configs.
                            Forcing submodules to rely only on seralizable configs seems insufficient.")
                ALTERNATIVES
                    (Support either and/or both; embed submodule-specific protobuf configs, where applicable, AND allow for an additional layer of abstraction to support passing high-level objects.]
                        (A protobuf config type MAY be embedded in the submodule's "config type")
                OPEN Qs
                    (Should submodules be constrained in whether they can consider a protobuf config type directly?)
        

Loading

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me. My tl;dr takeaway is:

  1. Separate package: share configs across RainTreeConfig and BackgroundConfig
  2. No protobuf: not exposed to the user / node runner.

Host host.Host
ProtocolID protocol.ID
MessageHandler typesP2P.MessageHandler
PeerHandler func(peer typesP2P.Peer) error
}

// BackgroundConfig implements `RouterConfig` for use with `BackgroundRouter`.
type BackgroundConfig struct {
Host host.Host
Expand Down Expand Up @@ -57,6 +75,30 @@ func (cfg *baseConfig) IsValid() (err error) {
if cfg.PeerstoreProvider == nil {
err = multierr.Append(err, fmt.Errorf("peerstore provider not configured"))
}
return nil
}

// IsValid implements the respective member of the `RouterConfig` interface.
func (cfg *UnicastRouterConfig) IsValid() (err error) {
if cfg.Logger == nil {
err = multierr.Append(err, fmt.Errorf("logger not configured"))
}

if cfg.Host == nil {
err = multierr.Append(err, fmt.Errorf("host not configured"))
}

if cfg.ProtocolID == "" {
err = multierr.Append(err, fmt.Errorf("protocol id not configured"))
}

if cfg.MessageHandler == nil {
err = multierr.Append(err, fmt.Errorf("message handler not configured"))
}

if cfg.PeerHandler == nil {
err = multierr.Append(err, fmt.Errorf("peer handler not configured"))
}
return err
}

Expand Down
15 changes: 15 additions & 0 deletions p2p/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,17 @@ type p2pModule struct {
identity libp2p.Option
listenAddrs libp2p.Option

// TECHDEBT(#810): register the providers to the module registry instead of
// holding a reference in the module struct and passing via router config.
//
// Assigned during creation via `#setupDependencies()`.
currentHeightProvider providers.CurrentHeightProvider
pstoreProvider providers.PeerstoreProvider
nonceDeduper *mempool.GenericFIFOSet[uint64, uint64]

// TECHDEBT(#810): register the routers to the module registry instead of
// holding a reference in the module struct. This will improve testability.
//
// Assigned during `#Start()`. TLDR; `host` listens on instantiation.
// and `router` depends on `host`.
router typesP2P.Router
Expand Down Expand Up @@ -252,6 +258,9 @@ func (m *p2pModule) setupPeerstoreProvider() error {
if !ok {
return fmt.Errorf("unknown peerstore provider type: %T", pstoreProviderModule)
}

// TECHDEBT(#810): register the provider to the module registry instead of
// holding a reference in the module struct and passing via router config.
m.pstoreProvider = pstoreProvider

return nil
Expand All @@ -260,6 +269,7 @@ func (m *p2pModule) setupPeerstoreProvider() error {
// setupCurrentHeightProvider attempts to retrieve the current height provider
// from the bus registry, falls back to the consensus module if none is registered.
func (m *p2pModule) setupCurrentHeightProvider() error {
// TECHDEBT(#810): simplify once submodules are more convenient to retrieve.
m.logger.Debug().Msg("setupCurrentHeightProvider")
currentHeightProviderModule, err := m.GetBus().GetModulesRegistry().GetModule(current_height_provider.ModuleName)
if err != nil {
Expand All @@ -276,6 +286,9 @@ func (m *p2pModule) setupCurrentHeightProvider() error {
if !ok {
return fmt.Errorf("unexpected current height provider type: %T", currentHeightProviderModule)
}

// TECHDEBT(#810): register the provider to the module registry instead of
// holding a reference in the module struct and passing via router config.
m.currentHeightProvider = currentHeightProvider

return nil
Expand All @@ -294,6 +307,8 @@ func (m *p2pModule) setupNonceDeduper() error {

// setupRouter instantiates the configured router implementation.
func (m *p2pModule) setupRouter() (err error) {
// TECHDEBT(#810): register the router to the module registry instead of
// holding a reference in the module struct. This will improve testability.
m.router, err = raintree.NewRainTreeRouter(
m.GetBus(),
&config.RainTreeConfig{
Expand Down
148 changes: 48 additions & 100 deletions p2p/raintree/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package raintree

import (
"fmt"
"io"
"time"

libp2pHost "github.com/libp2p/go-libp2p/core/host"
libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/pokt-network/pocket/p2p/unicast"
"google.golang.org/protobuf/proto"

"github.com/pokt-network/pocket/logger"
Expand All @@ -22,15 +20,9 @@ import (
"github.com/pokt-network/pocket/shared/messaging"
"github.com/pokt-network/pocket/shared/modules"
"github.com/pokt-network/pocket/shared/modules/base_modules"
telemetry "github.com/pokt-network/pocket/telemetry"
"github.com/pokt-network/pocket/telemetry"
)

// TECHDEBT(#629): configure timeouts. Consider security exposure vs. real-world conditions.
// TECHDEBT(#629): parameterize and expose via config.
// readStreamTimeout is the duration to wait for a read operation on a
// stream to complete, after which the stream is closed ("timed out").
const readStreamTimeout = time.Second * 10

var (
_ typesP2P.Router = &rainTreeRouter{}
_ modules.IntegratableModule = &rainTreeRouter{}
Expand All @@ -41,10 +33,11 @@ type rainTreeFactory = modules.FactoryWithConfig[typesP2P.Router, *config.RainTr

type rainTreeRouter struct {
base_modules.IntegratableModule
unicast.UnicastRouter

logger *modules.Logger
// handler is the function to call when a message is received.
handler typesP2P.RouterHandler
handler typesP2P.MessageHandler
// host represents a libp2p libp2pNetwork node, it encapsulates a libp2p peerstore
// & connection manager. `libp2p.New` configures and starts listening
// according to options.
Expand Down Expand Up @@ -84,7 +77,6 @@ func (*rainTreeRouter) Create(bus modules.Bus, cfg *config.RainTreeConfig) (type
return nil, err
}

rtr.host.SetStreamHandler(protocol.PoktProtocolID, rtr.handleStream)
return typesP2P.Router(rtr), nil
}

Expand Down Expand Up @@ -191,9 +183,11 @@ func (rtr *rainTreeRouter) sendInternal(data []byte, address cryptoPocket.Addres
return nil
}

// handleRainTreeMsg handles a RainTree message, continuing broadcast propagation
// if applicable. Returns the serialized `PocketEnvelope` data contained within.
func (rtr *rainTreeRouter) handleRainTreeMsg(data []byte) ([]byte, error) {
// handleRainTreeMsg deserializes a RainTree message to extract the `PocketEnvelope`
// bytes contained within, continues broadcast propagation, if applicable, and
// passes them off to the application by calling the configured `rtr.handler`.
// Intended to be called in a go routine.
func (rtr *rainTreeRouter) handleRainTreeMsg(rainTreeMsgBz []byte) error {
blockHeightInt := rtr.GetBus().GetConsensusModule().CurrentHeight()
blockHeight := fmt.Sprintf("%d", blockHeightInt)

Expand All @@ -207,25 +201,36 @@ func (rtr *rainTreeRouter) handleRainTreeMsg(data []byte) ([]byte, error) {
)

var rainTreeMsg typesP2P.RainTreeMessage
if err := proto.Unmarshal(data, &rainTreeMsg); err != nil {
return nil, err
if err := proto.Unmarshal(rainTreeMsgBz, &rainTreeMsg); err != nil {
return err
}

// TECHDEBT(#763): refactor as "pre-propagation validation"
networkMessage := messaging.PocketEnvelope{}
if err := proto.Unmarshal(rainTreeMsg.Data, &networkMessage); err != nil {
rtr.logger.Error().Err(err).Msg("Error decoding network message")
return nil, err
return err
}

// Continue RainTree propagation
if rainTreeMsg.Level > 0 {
if err := rtr.broadcastAtLevel(rainTreeMsg.Data, rainTreeMsg.Level-1); err != nil {
return nil, err
return err
}
}

// Return the data back to the caller so it can be handled by the app specific bus
return rainTreeMsg.Data, nil
// There was no error, but we don't need to forward this to the app-specific bus.
// For example, the message has already been handled by the application.
if rainTreeMsg.Data == nil {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
rtr.logger.Debug().Msg("no data in RainTree message")
return nil
}

// Call configured message handler with the serialized `PocketEnvelope`.
if err := rtr.handler(rainTreeMsg.Data); err != nil {
return fmt.Errorf("handling raintree message: %w", err)
}
return nil
}

// GetPeerstore implements the respective member of `typesP2P.Router`.
Expand Down Expand Up @@ -254,6 +259,7 @@ func (rtr *rainTreeRouter) AddPeer(peer typesP2P.Peer) error {
return nil
}

// RemovePeer implements the respective member of `typesP2P.Router`.
func (rtr *rainTreeRouter) RemovePeer(peer typesP2P.Peer) error {
rtr.peersManager.HandleEvent(
typesP2P.PeerManagerEvent{
Expand All @@ -270,94 +276,42 @@ func (rtr *rainTreeRouter) Size() int {
return rtr.peersManager.GetPeerstore().Size()
}

// handleStream ensures the peerstore contains the remote peer and then reads
// the incoming stream in a new go routine.
func (rtr *rainTreeRouter) handleStream(stream libp2pNetwork.Stream) {
rtr.logger.Debug().Msg("handling incoming stream")
peer, err := utils.PeerFromLibp2pStream(stream)
if err != nil {
rtr.logger.Error().Err(err).
Str("address", peer.GetAddress().String()).
Msg("parsing remote peer identity")

if err = stream.Reset(); err != nil {
rtr.logger.Error().Err(err).Msg("resetting stream")
}
return
}

if err := rtr.AddPeer(peer); err != nil {
rtr.logger.Error().Err(err).
Str("address", peer.GetAddress().String()).
Msg("adding remote peer to router")
}

go rtr.readStream(stream)
// shouldSendToTarget returns false if target is self.
func shouldSendToTarget(target target) bool {
return !target.isSelf
}

// readStream reads the incoming stream, extracts the serialized `PocketEnvelope`
// data from the incoming `RainTreeMessage`, and passes it to the application by
// calling the configured `rtr.handler`. Intended to be called in a go routine.
func (rtr *rainTreeRouter) readStream(stream libp2pNetwork.Stream) {
// Time out if no data is sent to free resources.
// NB: tests using libp2p's `mocknet` rely on this not returning an error.
if err := stream.SetReadDeadline(newReadStreamDeadline()); err != nil {
// `SetReadDeadline` not supported by `mocknet` streams.
rtr.logger.Error().Err(err).Msg("setting stream read deadline")
}

// log incoming stream
rtr.logStream(stream)

// read stream
rainTreeMsgBz, err := io.ReadAll(stream)
if err != nil {
rtr.logger.Error().Err(err).Msg("reading from stream")
if err := stream.Reset(); err != nil {
rtr.logger.Error().Err(err).Msg("resetting stream (read-side)")
}
return
}

// done reading; reset to signal this to remote peer
// NB: failing to reset the stream can easily max out the number of available
// network connections on the receiver's side.
if err := stream.Reset(); err != nil {
rtr.logger.Error().Err(err).Msg("resetting stream (read-side)")
// setupUnicastRouter configures and assigns `rtr.UnicastRouter`.
func (rtr *rainTreeRouter) setupUnicastRouter() error {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
unicastRouterCfg := config.UnicastRouterConfig{
Logger: rtr.logger,
Host: rtr.host,
ProtocolID: protocol.PoktProtocolID,
MessageHandler: rtr.handleRainTreeMsg,
PeerHandler: rtr.AddPeer,
}

// extract `PocketEnvelope` from `RainTreeMessage` (& continue propagation)
poktEnvelopeBz, err := rtr.handleRainTreeMsg(rainTreeMsgBz)
unicastRouter, err := unicast.Create(rtr.GetBus(), &unicastRouterCfg)
if err != nil {
rtr.logger.Error().Err(err).Msg("handling raintree message")
return
}

// There was no error, but we don't need to forward this to the app-specific bus.
// For example, the message has already been handled by the application.
if poktEnvelopeBz == nil {
return
}

// call configured handler to forward to app-specific bus
if err := rtr.handler(poktEnvelopeBz); err != nil {
rtr.logger.Error().Err(err).Msg("handling pocket envelope")
return fmt.Errorf("setting up unicast router: %w", err)
}
}

// shouldSendToTarget returns false if target is self.
func shouldSendToTarget(target target) bool {
return !target.isSelf
rtr.UnicastRouter = *unicastRouter
return nil
}

func (rtr *rainTreeRouter) setupDependencies() error {
if err := rtr.setupUnicastRouter(); err != nil {
return err
}

pstore, err := rtr.pstoreProvider.GetStakedPeerstoreAtHeight(rtr.currentHeightProvider.CurrentHeight())
if err != nil {
return err
return fmt.Errorf("getting staked peerstore: %w", err)
}

if err := rtr.setupPeerManager(pstore); err != nil {
return err
return fmt.Errorf("setting up peer manager: %w", err)
}

if err := utils.PopulateLibp2pHost(rtr.host, pstore); err != nil {
Expand All @@ -374,9 +328,3 @@ func (rtr *rainTreeRouter) setupPeerManager(pstore typesP2P.Peerstore) (err erro
func (rtr *rainTreeRouter) getHostname() string {
return rtr.GetBus().GetRuntimeMgr().GetConfig().P2P.Hostname
}

// newReadStreamDeadline returns a future deadline
// based on the read stream timeout duration.
func newReadStreamDeadline() time.Time {
return time.Now().Add(readStreamTimeout)
}
6 changes: 4 additions & 2 deletions p2p/raintree/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

package raintree

import libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
import (
libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
)

// RainTreeRouter exports `rainTreeRouter` for testing purposes.
type RainTreeRouter = rainTreeRouter

// HandleStream exports `rainTreeRouter#handleStream` for testing purposes.
func (rtr *rainTreeRouter) HandleStream(stream libp2pNetwork.Stream) {
rtr.handleStream(stream)
rtr.UnicastRouter.HandleStream(stream)
}
Loading