Skip to content

Commit

Permalink
refactor: unicast router
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite committed Jun 19, 2023
1 parent 9d5dffe commit 30e94a6
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 136 deletions.
3 changes: 3 additions & 0 deletions internal/testutil/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package testutil

type ProxyFactory[T any] func(target T) (proxy T)
44 changes: 43 additions & 1 deletion p2p/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,19 @@ import (
"fmt"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/multierr"

"github.com/pokt-network/pocket/p2p/providers"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/shared/crypto"
"go.uber.org/multierr"
"github.com/pokt-network/pocket/shared/modules"
)

var (
_ typesP2P.RouterConfig = &baseConfig{}
_ typesP2P.RouterConfig = &UnicastRouterConfig{}
_ typesP2P.RouterConfig = &RainTreeConfig{}
)

// baseConfig implements `RouterConfig` using the given libp2p host and current
Expand All @@ -22,6 +32,14 @@ type baseConfig struct {
PeerstoreProvider providers.PeerstoreProvider
}

type UnicastRouterConfig struct {
Logger *modules.Logger
Host host.Host
ProtocolID protocol.ID
MessageHandler typesP2P.MessageHandler
PeerHandler func(peer typesP2P.Peer) error
}

// BackgroundConfig implements `RouterConfig` for use with `BackgroundRouter`.
type BackgroundConfig struct {
Host host.Host
Expand Down Expand Up @@ -57,6 +75,30 @@ func (cfg *baseConfig) IsValid() (err error) {
if cfg.PeerstoreProvider == nil {
err = multierr.Append(err, fmt.Errorf("peerstore provider not configured"))
}
return nil
}

// IsValid implements the respective member of the `RouterConfig` interface.
func (cfg *UnicastRouterConfig) IsValid() (err error) {
if cfg.Logger == nil {
err = multierr.Append(err, fmt.Errorf("logger not configured"))
}

if cfg.Host == nil {
err = multierr.Append(err, fmt.Errorf("host not configured"))
}

if cfg.ProtocolID == "" {
err = multierr.Append(err, fmt.Errorf("protocol id not configured"))
}

if cfg.MessageHandler == nil {
err = multierr.Append(err, fmt.Errorf("message handler not configured"))
}

if cfg.PeerHandler == nil {
err = multierr.Append(err, fmt.Errorf("peer handler not configured"))
}
return err
}

Expand Down
31 changes: 0 additions & 31 deletions p2p/raintree/logging.go

This file was deleted.

147 changes: 46 additions & 101 deletions p2p/raintree/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ package raintree

import (
"fmt"
"io"
"time"

libp2pHost "github.com/libp2p/go-libp2p/core/host"
libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/pokt-network/pocket/p2p/unicast"
"google.golang.org/protobuf/proto"

"github.com/pokt-network/pocket/logger"
Expand All @@ -22,15 +19,9 @@ import (
"github.com/pokt-network/pocket/shared/messaging"
"github.com/pokt-network/pocket/shared/modules"
"github.com/pokt-network/pocket/shared/modules/base_modules"
telemetry "github.com/pokt-network/pocket/telemetry"
"github.com/pokt-network/pocket/telemetry"
)

// TECHDEBT(#629): configure timeouts. Consider security exposure vs. real-world conditions.
// TECHDEBT(#629): parameterize and expose via config.
// readStreamTimeout is the duration to wait for a read operation on a
// stream to complete, after which the stream is closed ("timed out").
const readStreamTimeout = time.Second * 10

var (
_ typesP2P.Router = &rainTreeRouter{}
_ modules.IntegratableModule = &rainTreeRouter{}
Expand All @@ -41,10 +32,11 @@ type rainTreeFactory = modules.FactoryWithConfig[typesP2P.Router, *config.RainTr

type rainTreeRouter struct {
base_modules.IntegratableModule
unicast.UnicastRouter

logger *modules.Logger
// handler is the function to call when a message is received.
handler typesP2P.RouterHandler
handler typesP2P.MessageHandler
// host represents a libp2p libp2pNetwork node, it encapsulates a libp2p peerstore
// & connection manager. `libp2p.New` configures and starts listening
// according to options.
Expand Down Expand Up @@ -84,7 +76,6 @@ func (*rainTreeRouter) Create(bus modules.Bus, cfg *config.RainTreeConfig) (type
return nil, err
}

rtr.host.SetStreamHandler(protocol.PoktProtocolID, rtr.handleStream)
return typesP2P.Router(rtr), nil
}

Expand Down Expand Up @@ -191,9 +182,11 @@ func (rtr *rainTreeRouter) sendInternal(data []byte, address cryptoPocket.Addres
return nil
}

// handleRainTreeMsg handles a RainTree message, continuing broadcast propagation
// if applicable. Returns the serialized `PocketEnvelope` data contained within.
func (rtr *rainTreeRouter) handleRainTreeMsg(data []byte) ([]byte, error) {
// handleRainTreeMsg deserializes a RainTree message to extract the `PocketEnvelope`
// bytes contained within, continues broadcast propagation, if applicable, and
// passes them off to the application by calling the configured `rtr.handler`.
// Intended to be called in a go routine.
func (rtr *rainTreeRouter) handleRainTreeMsg(rainTreeMsgBz []byte) error {
blockHeightInt := rtr.GetBus().GetConsensusModule().CurrentHeight()
blockHeight := fmt.Sprintf("%d", blockHeightInt)

Expand All @@ -207,25 +200,36 @@ func (rtr *rainTreeRouter) handleRainTreeMsg(data []byte) ([]byte, error) {
)

var rainTreeMsg typesP2P.RainTreeMessage
if err := proto.Unmarshal(data, &rainTreeMsg); err != nil {
return nil, err
if err := proto.Unmarshal(rainTreeMsgBz, &rainTreeMsg); err != nil {
return err
}

// TECHDEBT(#763): refactor as "pre-propagation validation"
networkMessage := messaging.PocketEnvelope{}
if err := proto.Unmarshal(rainTreeMsg.Data, &networkMessage); err != nil {
rtr.logger.Error().Err(err).Msg("Error decoding network message")
return nil, err
return err
}
// --

// Continue RainTree propagation
if rainTreeMsg.Level > 0 {
if err := rtr.broadcastAtLevel(rainTreeMsg.Data, rainTreeMsg.Level-1); err != nil {
return nil, err
return err
}
}

// Return the data back to the caller so it can be handled by the app specific bus
return rainTreeMsg.Data, nil
// There was no error, but we don't need to forward this to the app-specific bus.
// For example, the message has already been handled by the application.
if rainTreeMsg.Data == nil {
return nil
}

// call configured handler to forward to app-specific bus
if err := rtr.handler(rainTreeMsg.Data); err != nil {
rtr.logger.Error().Err(err).Msg("handling raintree message")
}
return nil
}

// GetPeerstore implements the respective member of `typesP2P.Router`.
Expand Down Expand Up @@ -270,94 +274,41 @@ func (rtr *rainTreeRouter) Size() int {
return rtr.peersManager.GetPeerstore().Size()
}

// handleStream ensures the peerstore contains the remote peer and then reads
// the incoming stream in a new go routine.
func (rtr *rainTreeRouter) handleStream(stream libp2pNetwork.Stream) {
rtr.logger.Debug().Msg("handling incoming stream")
peer, err := utils.PeerFromLibp2pStream(stream)
if err != nil {
rtr.logger.Error().Err(err).
Str("address", peer.GetAddress().String()).
Msg("parsing remote peer identity")

if err = stream.Reset(); err != nil {
rtr.logger.Error().Err(err).Msg("resetting stream")
}
return
}

if err := rtr.AddPeer(peer); err != nil {
rtr.logger.Error().Err(err).
Str("address", peer.GetAddress().String()).
Msg("adding remote peer to router")
}

go rtr.readStream(stream)
// shouldSendToTarget returns false if target is self.
func shouldSendToTarget(target target) bool {
return !target.isSelf
}

// readStream reads the incoming stream, extracts the serialized `PocketEnvelope`
// data from the incoming `RainTreeMessage`, and passes it to the application by
// calling the configured `rtr.handler`. Intended to be called in a go routine.
func (rtr *rainTreeRouter) readStream(stream libp2pNetwork.Stream) {
// Time out if no data is sent to free resources.
// NB: tests using libp2p's `mocknet` rely on this not returning an error.
if err := stream.SetReadDeadline(newReadStreamDeadline()); err != nil {
// `SetReadDeadline` not supported by `mocknet` streams.
rtr.logger.Error().Err(err).Msg("setting stream read deadline")
func (rtr *rainTreeRouter) setupUnicastRouter() error {
unicastRouterCfg := config.UnicastRouterConfig{
Logger: rtr.logger,
Host: rtr.host,
ProtocolID: protocol.PoktProtocolID,
MessageHandler: rtr.handleRainTreeMsg,
PeerHandler: rtr.AddPeer,
}

// log incoming stream
rtr.logStream(stream)

// read stream
rainTreeMsgBz, err := io.ReadAll(stream)
unicastRouter, err := unicast.Create(rtr.GetBus(), &unicastRouterCfg)
if err != nil {
rtr.logger.Error().Err(err).Msg("reading from stream")
if err := stream.Reset(); err != nil {
rtr.logger.Error().Err(err).Msg("resetting stream (read-side)")
}
return
}

// done reading; reset to signal this to remote peer
// NB: failing to reset the stream can easily max out the number of available
// network connections on the receiver's side.
if err := stream.Reset(); err != nil {
rtr.logger.Error().Err(err).Msg("resetting stream (read-side)")
}

// extract `PocketEnvelope` from `RainTreeMessage` (& continue propagation)
poktEnvelopeBz, err := rtr.handleRainTreeMsg(rainTreeMsgBz)
if err != nil {
rtr.logger.Error().Err(err).Msg("handling raintree message")
return
}

// There was no error, but we don't need to forward this to the app-specific bus.
// For example, the message has already been handled by the application.
if poktEnvelopeBz == nil {
return
}

// call configured handler to forward to app-specific bus
if err := rtr.handler(poktEnvelopeBz); err != nil {
rtr.logger.Error().Err(err).Msg("handling pocket envelope")
return fmt.Errorf("setting up unicast router: %w", err)
}
}

// shouldSendToTarget returns false if target is self.
func shouldSendToTarget(target target) bool {
return !target.isSelf
rtr.UnicastRouter = *unicastRouter
return nil
}

func (rtr *rainTreeRouter) setupDependencies() error {
if err := rtr.setupUnicastRouter(); err != nil {
return err
}

pstore, err := rtr.pstoreProvider.GetStakedPeerstoreAtHeight(rtr.currentHeightProvider.CurrentHeight())
if err != nil {
return err
return fmt.Errorf("getting staked peerstore: %w", err)
}

if err := rtr.setupPeerManager(pstore); err != nil {
return err
return fmt.Errorf("setting up peer manager: %w", err)
}

if err := utils.PopulateLibp2pHost(rtr.host, pstore); err != nil {
Expand All @@ -374,9 +325,3 @@ func (rtr *rainTreeRouter) setupPeerManager(pstore typesP2P.Peerstore) (err erro
func (rtr *rainTreeRouter) getHostname() string {
return rtr.GetBus().GetRuntimeMgr().GetConfig().P2P.Hostname
}

// newReadStreamDeadline returns a future deadline
// based on the read stream timeout duration.
func newReadStreamDeadline() time.Time {
return time.Now().Add(readStreamTimeout)
}
22 changes: 20 additions & 2 deletions p2p/raintree/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,30 @@

package raintree

import libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
import (
libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/regen-network/gocuke"

"github.com/pokt-network/pocket/internal/testutil"
typesP2P "github.com/pokt-network/pocket/p2p/types"
)

// RainTreeRouter exports `rainTreeRouter` for testing purposes.
type RainTreeRouter = rainTreeRouter

type routerHandlerProxyFactory = testutil.ProxyFactory[typesP2P.MessageHandler]

// HandleStream exports `rainTreeRouter#handleStream` for testing purposes.
func (rtr *rainTreeRouter) HandleStream(stream libp2pNetwork.Stream) {
rtr.handleStream(stream)
rtr.UnicastRouter.HandleStream(stream)
}
func (rtr *rainTreeRouter) HandlerProxy(
t gocuke.TestingT,
handlerProxyFactory routerHandlerProxyFactory,
) {
t.Helper()

// pass original handler to proxy factory & replace it with the proxy
origHandler := rtr.handler
rtr.handler = handlerProxyFactory(origHandler)
}
2 changes: 1 addition & 1 deletion p2p/types/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Router interface {
RemovePeer(peer Peer) error
}

type RouterHandler func(data []byte) error
type MessageHandler func(data []byte) error

// RouterConfig is used to configure `Router` implementations and to test a
// given configuration's validity.
Expand Down
Loading

0 comments on commit 30e94a6

Please sign in to comment.