diff --git a/Makefile b/Makefile
index b60870525..888559ad4 100644
--- a/Makefile
+++ b/Makefile
@@ -301,7 +301,7 @@ protogen_local: go_protoc-go-inject-tag ## Generate go structures for all of the
$(PROTOC_SHARED) -I=./consensus/types/proto --go_out=./consensus/types ./consensus/types/proto/*.proto
# P2P
- $(PROTOC_SHARED) -I=./p2p/raintree/types/proto --go_out=./p2p/types ./p2p/raintree/types/proto/*.proto
+ $(PROTOC_SHARED) -I=./p2p/types/proto --go_out=./p2p/types ./p2p/types/proto/*.proto
# echo "View generated proto files by running: make protogen_show"
diff --git a/p2p/README.md b/p2p/README.md
index 8263cd563..cde1a155c 100644
--- a/p2p/README.md
+++ b/p2p/README.md
@@ -5,11 +5,15 @@ This document is meant to be a supplement to the living specification of [1.0 Po
## Table of Contents
- [Definitions](#definitions)
-- [Interface](#interface)
-- [Implementation](#implementation)
- - [Code Architecture - P2P Module](#code-architecture---p2p-module)
- - [Code Architecture - Network Module](#code-architecture---network-module)
- - [Code Organization](#code-organization)
+- [Interface & Integration](#interface--integration)
+- [Module Architecture](#module-architecture)
+ - [Architecture Design Language](#architecture-design-language)
+ - [Legends](#legends)
+ - [P2P Module / Router Decoupling](#p2p-module--router-decoupling)
+ - [Message Propagation & Handling](#message-propagation--handling)
+ - [Message Deduplication](#message-deduplication)
+ - [Peer Discovery](#peer-discovery)
+ - [Code Organization](#code-organization)
- [Testing](#testing)
- [Running Unit Tests](#running-unit-tests)
- [RainTree testing framework](#raintree-testing-framework)
@@ -34,14 +38,11 @@ A structured "gossip" protocol (and implementation) which uses the raintree algo
A "gossip" protocol (implementation TBD) which facilitates "gossip" to all P2P participants, including non-staked actors (e.g. full-nodes).
-## Interface
+## Interface & Integration
-This module aims to implement the interface specified in `pocket/shared/modules/p2p_module.go` using the specification above.
-
-## Implementation
-
-### P2P Module Architecture
+This module aims to implement the interface specified in [`pocket/shared/modules/p2p_module.go`](../shared/modules/p2p_module.go).
+_(TODO: diagram legend)_
```mermaid
flowchart TD
subgraph P2P["P2P Module"]
@@ -67,105 +68,508 @@ flowchart TD
class PN pocket_network
```
-`Routers` is where [RainTree](https://github.com/pokt-network/pocket/files/9853354/raintree.pdf) (or the simpler basic approach) is implemented. See `raintree/router.go` for the specific implementation of RainTree, but please refer to the [specifications](https://github.com/pokt-network/pocket-network-protocol/tree/main/p2p) for more details.
+`Routers` is where [RainTree](https://github.com/pokt-network/pocket/files/9853354/raintree.pdf) is implemented.
+See [`raintree/router.go`](./raintree/router.go) for the specific implementation of RainTree, but please refer to the [specifications](https://github.com/pokt-network/pocket-network-protocol/tree/main/p2p) for more details.
-### Raintree Router Architecture
+## Module Architecture
+
+_(TODO: move "arch. design lang." & "legends" sections into `shared` to support common usage)_
-_DISCUSS(team): If you feel this needs a diagram, please reach out to the team for additional details._
-_TODO(olshansky, BenVan): Link to RainTree visualizations once it is complete._
+### Architecture Design Language
-### Message Propagation
+The architecture design language expressed in this documentation is based on [UML](https://www.uml-diagrams.org/).
+Due to limitations in the current version of mermaid, class diagrams are much more adherant to the UML component specification.
+Component diagrams however are much more loosely inspired by their UML counterparts.
-Given `Local P2P Module` has a message that it needs to propagate:
+Regardless, each architecture diagram should be accompanied by a legend which covers all the design language features used to provide disambiguation.
-
- - 1a.
Raintree Router
selects targets from the Pokt Peerstore
, which only includes staked actors
- - 1b.
Background Router
selects targets from the libp2p Peerstore
, which includes all P2P participants
- - 2. Libp2p
Host
manages opening and closing streams to targeted peers
- - 3.
Remote P2P module
's (i.e. receiver's) handleStream
is called (having been registered via setStreamHandler()
)
- - 4a.
handleStream
propagates message via Raintree Router
- - 4b.
handleStream
propagates message via Background Router
- - 5a. Repeat step 1a from
Remote P2P Module
's perspective targeting its next peers
- - 5b. Repeat step 1b from
Remote P2P Module
's perspective targeting its next peers
-
+References:
+- [Class Diagrams](https://www.uml-diagrams.org/class-diagrams-overview.html)
+- [Component Diagrams](https://www.uml-diagrams.org/component-diagrams.html)
+
+ _NOTE: mermaid does not support ports, interfaces, ... in component diagrams ("flowcharts)._
+
+### Legends
```mermaid
-flowchart TD
- subgraph lMod[Local P2P Module]
- subgraph lHost[Libp2p `Host`]
+flowchart
+subgraph Legend
+ m[[`Method`]]
+ c[Component]
+
+ m -- "unconditional usage" --> c
+ m -. "conditional usage" .-> c
+ m -. "ignored" .-x c
+end
+```
+
+```mermaid
+classDiagram
+class ConcreteType {
+ +ExportedField
+ -unexportedField
+ +ExportedMethod(...argTypes) (...returnTypes)
+ -unexportedMethod(...argTypes) (...returnTypes)
+}
+
+class InterfaceType {
+ <>
+ +Method(...argTypes) (...returnTypes)
+}
+
+ConcreteType --|> InterfaceType : Interface realization
+
+ConcreteType --> OtherType : Direct usage
+ConcreteType --o OtherType : Composition
+ConcreteType --* OtherType : Aggregatation
+ConcreteType ..* "(cardinality)" OtherType : Indirect (via interface)
+```
+
+#### Interface Realization
+
+_TL;DR An instance (i.e. client) implements the associated interface (i.e. supplierl)._
+
+> Realization is a specialized abstraction relationship between two sets of model elements, one representing a specification (the supplier) and the other represents an implementation of the latter (the client).
+
+> Realization can be used to model stepwise refinement, optimizations, transformations, templates, model synthesis, framework composition, etc.
+
+_(see: [UML Realization](https://www.uml-diagrams.org/realization.html))_
+
+#### Direct Usage
+
+_TL;DR one instance (i.e. client) is dependent the associated instance(s) (i.e. supplier) to function properly._
+
+> Dependency is a directed relationship which is used to show that some UML element or a set of elements requires, needs or depends on other model elements for specification or implementation. Because of this, dependency is called a supplier - client relationship, where supplier provides something to the client, and thus the client is in some sense incomplete while semantically or structurally dependent on the supplier element(s). Modification of the supplier may impact the client elements.
+
+> Usage is a dependency in which one named element (client) requires another named element (supplier) for its full definition or implementation.
+
+_(see: [UML Dependency](https://www.uml-diagrams.org/dependency.html))_
+
+#### Composition
+
+_TL;DR deleting an instance also deletes the associated instance(s)._
+
+> A "strong" form of aggregation
+
+> If a composite (whole) is deleted, all of its composite parts are "normally" deleted with it.
+
+_(see: [UML Shared composition](https://www.uml-diagrams.org/composition.html))_
+
+#### Aggregation
+
+
+_TL;DR deleting an instance does not necessarily delete the associated instance(s)._
+
+> A "weak" form of aggregation
+
+> Shared part could be included in several composites, and if some or all of the composites are deleted, shared part may still exist.
+
+_(see: [UML Shared aggregation](https://www.uml-diagrams.org/aggregation.html))_
+
+#### Cardinality
+
+_TL;DR indicates a number, or range of instances associated (i.e. supplier(s))_
+
+Cardinality indicates the number or range of simultaneous instances of supplier that are associated with the client.
+Applicable to multiple association types.
+Can be expressed arbitrarily (e.g. wildcards, variable, equation, etc.)
+
+_(see: [UML Association](https://www.uml-diagrams.org/association.html#association-end))_
+
+
+### P2P Module / Router Decoupling
+
+The P2P module encapsulates the `RainTreeRouter` and `BackgroundRouter` submodules.
+The P2P module internally refers to these as the `stakedActorRouter` and `unstakedActorRouter`, respectively.
+
+Depending on the necessary routing scheme (unicast / broadcast) and whether the peers involved are staked actors, a node will use one or both of these routers.
+
+**Unicast**
+
+| Sender | Receiver | Router | Example Usage |
+|----------------|----------------|-----------------|----------------------------------------------------------------------|
+| Staked Actor | Staked Actor | Raintree only | Consensus hotstuff messages (validators only) & state sync responses |
+| Staked Actor | Untaked Actor | Background only | Consensus state sync responses |
+| Unstaked Actor | Staked Actor | Background only | Consensus state sync responses, debug messages |
+| Unstaked Actor | Unstaked Actor | Background only | Consensus state sync responses, debug messages |
+
+**Broadcast**
+
+| Broadcaster | Receiver | Router | Example Usage |
+|----------------|----------------|-----------------------|-----------------------------------------------------------------|
+| Staked Actor | Staked Actor | Raintree + Background | Utility tx messages, consensus state sync requests |
+| Staked Actor | Untaked Actor | Background only | Utility tx messages (redundancy), consensus state sync requests |
+| Unstaked Actor | Staked Actor | Background only | Utility tx messages (redundancy), consensus state sync requests |
+| Unstaked Actor | Unstaked Actor | Background only | Utility tx messages, consensus state sync requests |
+
+Both router submodule implementations embed a `UnicastRouter` which enables them to send and receive messages directly to/from a single peer.
+
+**Class Diagram**
+
+```mermaid
+classDiagram
+ class p2pModule {
+ -stakedActorRouter Router
+ -unstakedActorRouter Router
+ -handlePocketEnvelope([]byte) error
+ }
+
+ class P2PModule {
+ <>
+ GetAddress() (Address, error)
+ HandleEvent(*anypb.Any) error
+ Send([]byte, Address) error
+ Broadcast([]byte) error
+ }
+ p2pModule --|> P2PModule
+
+ class RainTreeRouter {
+ UnicastRouter
+ -handler MessageHandler
+ +Broadcast([]byte) error
+ -handleRainTreeMsg([]byte) error
+ }
+
+ class BackgroundRouter {
+ UnicastRouter
+ -handler MessageHandler
+ +Broadcast([]byte) error
+ -handleBackgroundMsg([]byte) error
+ -readSubscription(subscription *pubsub.Subscription)
+ }
+
+ class UnicastRouter {
+ -messageHandler MessageHandler
+ -peerHandler PeerHandler
+ +Send([]byte, Address) error
+ -handleStream(libp2pNetwork.Stream)
+ -readStream(libp2pNetwork.Stream)
+ }
+ RainTreeRouter --* UnicastRouter : (embedded)
+ BackgroundRouter --* UnicastRouter : (embedded)
+
+ p2pModule --o "2" Router
+ p2pModule ..* RainTreeRouter : (`stakedActorRouter`)
+ p2pModule ..* BackgroundRouter : (`unstakedActorRouter`)
+
+ class Router {
+ <>
+ +Send([]byte, Address) error
+ +Broadcast([]byte) error
+ }
+ BackgroundRouter --|> Router
+ RainTreeRouter --|> Router
+```
+
+### Message Propagation & Handling
+
+**Unicast**
+
+```mermaid
+flowchart
+ subgraph lp2p["Local P2P Module (outgoing)"]
+ lps[[`Send`]]
+ lps -. "(iff local & remote peer are staked)" ..-> lrtu
+ lps -. "(if local or remote peer are not staked)" .-> lbgu
+
+ lbgu -- "opens stream\nto target peer" ---> lhost
+
+ lhost[Libp2p Host]
+
+ subgraph lrt[RainTree Router]
+ subgraph lRTPS[Raintree Peerstore]
+ lStakedPS([staked actors only])
+ end
+
+ lrtu[UnicastRouter]
+
+ lrtu -- "network address lookup" --> lRTPS
+ end
+
+ lrtu -- "opens a stream\nto target peer" ---> lhost
+
+ subgraph lbg[Background Router]
+ lbgu[UnicastRouter]
+ subgraph lBGPS[Background Peerstore]
+ lNetPS([all P2P participants])
+ end
+
+ lbgu -- "network address lookup" --> lBGPS
+ end
+ end
+
+ subgraph rp2p["Remote P2P Module (incoming)"]
+ rhost[Libp2p Host]
+
+ subgraph rrt[RainTree Router]
+ rrth[[`RainTreeMessage` Handler]]
+ rrtu[UnicastRouter]
+ end
+
+ subgraph rbg[Background Router]
+ rbgh[[`BackgroundMessage` Handler]]
+ rbgu[UnicastRouter]
+ rbgu --> rbgh
+ end
+
+ rp2ph[[`PocketEnvelope` Handler]]
+ rbus[bus]
+ rhost -. "new stream" .-> rrtu
+ rhost -- "new subscription message" --> rbgu
+ rrtu --> rrth
+
+ rnd[Nonce Deduper]
+ rp2ph -- "deduplicate msg mempool" --> rnd
end
- subgraph lRT[Raintree Router]
+
+
+ rp2ph -. "(iff not duplicate msg)\npublish event" .-> rbus
+
+ rrth --> rp2ph
+ rbgh --> rp2ph
+
+ lhost --> rhost
+```
+
+**Broadcast**
+
+```mermaid
+flowchart
+ subgraph lp2p["Local P2P Module (outgoing)"]
+ lpb[[`Broadcast`]]
+ lpb -. "(iff local & remote peer are staked)" ..-> lrtu
+ lpb -- "(always)" --> lbggt
+
+ lbggt -- "msg published\n(gossipsub protocol)" ---> lhost
+
+ lhost[Libp2p Host]
+
+ subgraph lrt[RainTree Router]
subgraph lRTPS[Raintree Peerstore]
lStakedPS([staked actors only])
end
-
- subgraph lPM[PeerManager]
- end
- lPM --> lRTPS
+
+ lrtu[UnicastRouter]
+
+ lrtu -- "network address lookup" --> lRTPS
end
+
+ lrtu -- "opens a stream\nto target peer" ---> lhost
- subgraph lBG[Background Router]
+ subgraph lbg[Background Router]
+ lbggt[Gossipsub Topic]
subgraph lBGPS[Background Peerstore]
lNetPS([all P2P participants])
end
+
+ lbggt -- "network address lookup" --> lBGPS
+ end
+ end
- subgraph lGossipSub[GossipSub]
- end
+ subgraph rp2p["Remote P2P Module (incoming)"]
+ rhost[Libp2p Host]
- subgraph lDHT[Kademlia DHT]
- end
+ subgraph rrt[RainTree Router]
+ rrth[[`RainTreeMessage` Handler]]
+ rrtu[UnicastRouter]
+ end
- lGossipSub --> lBGPS
- lDHT --> lBGPS
+ subgraph rbg[Background Router]
+ rbgh[[`BackgroundMessage` Handler]]
+ rbgg[Gossipsub Subscription]
+ rbggt[Gossipsub Topic]
+ rbgg --> rbgh
+ rbgh -- "(background msg\npropagation cont.)" ---> rbggt
end
- lRT --1a--> lHost
- lBG --1b--> lHost
+ rp2ph[[`PocketEnvelope` Handler]]
+ rbus[bus]
+ rhost -. "new stream" ..-> rrtu
+ rhost -- "new subscription message" --> rbgg
+ rbggt -- "(background msg\npropagation cont.)" --> rhost
+ rrtu --> rrth
+ rrth -. "(iff level > 0)\n(raintree msg\npropagation cont.)" .-> rrtu
+ rrtu -- "(raintree msg\npropagation cont.)" --> rhost
+
+ rnd[Nonce Deduper]
+ rp2ph -- "deduplicate msg mempool" --> rnd
end
-subgraph rMod[Remote P2P Module]
-subgraph rHost[Libp2p `Host`]
-end
-subgraph rRT[Raintree Router]
-subgraph rPS[Raintree Peerstore]
-rStakedPS([staked actors only])
-end
-subgraph rPM[PeerManager]
-end
+ rp2ph -. "(iff not duplicate msg)\npublish event" .-> rbus
-rPM --> rStakedPS
-end
+ rrth --> rp2ph
+ rbgh --> rp2ph
-subgraph rBG[Background Router]
-subgraph rBGPS[Background Peerstore]
-rNetPS([all P2P participants])
-end
+ lhost --> rhost
+```
-subgraph rGossipSub[GossipSub]
-end
+### Message Deduplication
-subgraph rDHT[Kademlia DHT]
-end
+Messages MUST be deduplicated before broadcasting their respective event over the bus since it is expected that nodes will receive duplicate messages (for multiple reasons).
-rGossipSub --> rBGPS
-rDHT --> rBGPS
-end
+The responsibility of deduplication is encapsulated by the P2P module, As such duplicate messages may come from multiple routers in some of these scenarios.
-rHost -. "3 (setStreamHandler())" .-> hs[[handleStream]]
+The `NondeDeduper` state is not persisted outside of memory and therefore is cleared during node restarts.
-hs --4a--> rRT
-hs --4b--> rBG
-rBG --"5a (cont. propagation)"--> rHost
-linkStyle 11 stroke:#ff3
-rRT --"5b (cont. propagation)"--> rHost
-linkStyle 12 stroke:#ff3
-end
+```mermaid
+classDiagram
+ class RainTreeMessage {
+ <>
+ +Level uint32
+ +Data []byte
+ }
+
+ class BackgroundMessage {
+ <>
+ +Data []byte
+ }
+
+ class PocketEnvelope {
+ <>
+ +Content *anypb.Any
+ +Nonce uint64
+ }
+
+ RainTreeMessage --* PocketEnvelope : serialized as `Data`
+ BackgroundMessage --* PocketEnvelope : serialized as `Data`
+
+
+ class p2pModule {
+ -handlePocketEnvelope([]byte) error
+ }
+
+ class P2PModule {
+ <>
+ GetAddress() (Address, error)
+ HandleEvent(*anypb.Any) error
+ Send([]byte, address Address) error
+ Broadcast([]byte) error
+ }
+ p2pModule --|> P2PModule
+
+ class RainTreeRouter {
+ UnicastRouter
+ -handler MessageHandler
+ +Broadcast([]byte) error
+ -handleRainTreeMsg([]byte) error
+ }
+
+ class NonceDeduper {
+ Push(Nonce) error
+ Contains(Nonce) bool
+ }
+
+ class Bus {
+ <>
+ PublishEventToBus(PocketEnvelope)
+ GetBusEvent() PocketEnvelope
+ }
+ p2pModule --> Bus
+
+ class BackgroundRouter {
+ UnicastRouter
+ -handler MessageHandler
+ +Broadcast([]byte) error
+ -handleBackgroundMsg([]byte) error
+ -readSubscription(subscription *pubsub.Subscription)
+ }
+
+ class UnicastRouter {
+ -messageHandler MessageHandler
+ -peerHandler PeerHandler
+ +Send([]byte, address Address) error
+ -handleStream(stream libp2pNetwork.Stream)
+ -readStream(stream libp2pNetwork.Stream)
+ }
+ RainTreeRouter --* UnicastRouter : (embedded)
+ BackgroundRouter --* UnicastRouter : (embedded)
+
+ p2pModule ..* RainTreeRouter
+ RainTreeRouter --o RainTreeMessage
+
+ p2pModule ..* BackgroundRouter
+ BackgroundRouter --o BackgroundMessage
+
+ p2pModule --o PocketEnvelope
+ p2pModule --* NonceDeduper
+```
+
+#### Configuration
+
+The size of the `NonceDeduper` queue is configurable via the `P2PConfig.MaxNonces` field.
-lHost --2--> rHost
+### Peer Discovery
+
+Peer discovery involves pairing peer IDs to their network addresses (multiaddr).
+This pairing always has an associated TTL (time-to-live), near the end of which it must
+be refreshed.
+
+In the background gossip overlay network (`backgroundRouter`), peers will re-advertise themselves every 3 hours through their TTL (see: [`RoutingDiscovery#Advertise()`](https://github.com/libp2p/go-libp2p/blob/87c2561238cb0340ddb182c61be8dbbc7a12a780/p2p/discovery/routing/routing.go#L34) and [`ProviderManager#AddProvider()`](https://github.com/libp2p/go-libp2p-kad-dht/blob/v0.24.2/providers/providers_manager.go#L255)).
+This refreshes the libp2p peerstore automatically.
+
+In the raintree gossip overlay network (`raintreeRouter`), the libp2p peerstore is **NOT** currently refreshed _(TODO: [#859](https://github.com/pokt-network/network/isues/859))_.
+
+```mermaid
+flowchart TD
+ subgraph bus
+ end
+
+ subgraph pers[Persistence Module]
+ end
+
+ subgraph cons[Consensus Module]
+ end
+
+ cons -- "(staked actor set changed)\npublish event" --> bus
+ bus --> rPM
+ rPM -- "get staked actors\nat current height" --> pers
+
+ subgraph p2p["P2P Module"]
+ host[Libp2p Host]
+ host -- "incoming\nraintree message" --> rtu
+ host -- "incoming\nbackground message" --> bgu
+ host -- "incoming\ntopic message" --> bgr
+ host -- "DHT peer discovery" --> rDHT
+
+ subgraph rt[RainTree Router]
+ subgraph rPS[Raintree Peerstore]
+ rStakedPS([staked actors only])
+ end
+
+ subgraph rPM[PeerManager]
+ end
+
+ rtu[UnicastRouter]
+
+ rPM -- "synchronize\n(add/remove)" --> rPS
+ rtu -. "(no discovery)" .-x rPS
+ end
+
+ subgraph bg[Background Router]
+ subgraph rBGPS[Background Peerstore]
+ rNetPS([all P2P participants])
+ end
+
+ subgraph bgr[GossipSub Topic\nSubscription]
+ end
+
+ subgraph rDHT[Kademlia DHT]
+ end
+
+ bgu -- "add if new" --> rBGPS
+ bgr -- "add if new" --> rBGPS
+ rDHT -- "continuous import" --> rBGPS
+
+ bgu[UnicastRouter]
+ end
+
+ end
```
-The `Network Module` is where [RainTree](https://github.com/pokt-network/pocket/files/9853354/raintree.pdf) (or the simpler basic approach) is implemented. See `raintree/network.go` for the specific implementation of RainTree, but please refer to the [specifications](https://github.com/pokt-network/pocket-network-protocol/tree/main/p2p) for more details.
+### Raintree Router Architecture
+
+_NOTE: If you (the reader) feel this needs a diagram, please reach out to the team for additional details._
### Code Organization
@@ -177,6 +581,8 @@ p2p
│ └── router_test.go # `BackgroundRouter` functional tests
├── bootstrap.go # `p2pModule` bootstrap related method(s)
├── CHANGELOG.md
+├── config
+│ └── config.go
├── event_handler.go
├── module.go # `p2pModule` definition
├── module_raintree_test.go # `p2pModule` & `RainTreeRouter` functional tests (routing)
@@ -189,21 +595,18 @@ p2p
│ ├── peerstore_provider
│ └── providers.go
├── raintree
-│ ├── nonce_deduper.go
-│ ├── nonce_deduper_test.go
│ ├── peers_manager.go # `rainTreePeersManager` implementation of `PeersManager` interface
│ ├── peers_manager_test.go
│ ├── peerstore_utils.go # Raintree routing helpers
│ ├── router.go # `RainTreeRouter` implementation of `Router` interface
│ ├── router_test.go # `RainTreeRouter` functional tests
│ ├── target.go # `target` definition
-│ ├── types
-│ │ └── proto
-│ │ └── raintree.proto
+│ ├── testutil.go
│ └── utils_test.go
-├── README.md
+├── testutil.go
├── transport_encryption_test.go # Libp2p transport security integration test
├── types
+│ ├── background.pb.go
│ ├── errors.go
│ ├── libp2p_mocks.go
│ ├── mocks
@@ -213,12 +616,18 @@ p2p
│ ├── peerstore.go # `Peerstore` interface & `PeerAddrMap` implementation definitions
│ ├── peers_view.go # `PeersView` interface & `sortedPeersView` implementation definitions
│ ├── peers_view_test.go
+│ ├── proto
│ ├── raintree.pb.go
│ └── router.go # `Router` interface definition
+├── unicast
+│ ├── logging.go
+│ ├── router.go
+│ └── testutil.go
├── utils
-│ ├── config.go # `RouterConfig` definition
│ ├── host.go # Helpers for working with libp2p hosts
│ ├── logging.go # Helpers for logging
+│ ├── nonce_deduper.go
+│ ├── nonce_deduper_test.go
│ ├── peer_conversion.go # Helpers for converting between "native" and libp2p peer representations
│ ├── url_conversion.go # Helpers for converting between "native" and libp2p network address representations
│ └── url_conversion_test.go
diff --git a/p2p/background/router.go b/p2p/background/router.go
index 199190b21..c3770368f 100644
--- a/p2p/background/router.go
+++ b/p2p/background/router.go
@@ -9,12 +9,19 @@ import (
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2pHost "github.com/libp2p/go-libp2p/core/host"
+ libp2pPeer "github.com/libp2p/go-libp2p/core/peer"
+ "go.uber.org/multierr"
+ "google.golang.org/protobuf/proto"
+
"github.com/pokt-network/pocket/logger"
"github.com/pokt-network/pocket/p2p/config"
"github.com/pokt-network/pocket/p2p/protocol"
+ "github.com/pokt-network/pocket/p2p/providers"
typesP2P "github.com/pokt-network/pocket/p2p/types"
+ "github.com/pokt-network/pocket/p2p/unicast"
"github.com/pokt-network/pocket/p2p/utils"
cryptoPocket "github.com/pokt-network/pocket/shared/crypto"
+ "github.com/pokt-network/pocket/shared/messaging"
"github.com/pokt-network/pocket/shared/modules"
"github.com/pokt-network/pocket/shared/modules/base_modules"
)
@@ -22,18 +29,31 @@ import (
var (
_ typesP2P.Router = &backgroundRouter{}
_ modules.IntegratableModule = &backgroundRouter{}
+ _ backgroundRouterFactory = &backgroundRouter{}
)
+type backgroundRouterFactory = modules.FactoryWithConfig[typesP2P.Router, *config.BackgroundConfig]
+
// backgroundRouter implements `typesP2P.Router` for use with all P2P participants.
type backgroundRouter struct {
base_modules.IntegratableModule
+ unicast.UnicastRouter
logger *modules.Logger
+ // handler is the function to call when a message is received.
+ handler typesP2P.MessageHandler
// host represents a libp2p network node, it encapsulates a libp2p peerstore
// & connection manager. `libp2p.New` configures and starts listening
// according to options.
// (see: https://pkg.go.dev/github.com/libp2p/go-libp2p#section-readme)
host libp2pHost.Host
+ // cancelReadSubscription is the cancel function for the context which is
+ // monitored in the `#readSubscription()` go routine. Call to terminate it.
+ // only one read subscription exists per router at any point in time
+ cancelReadSubscription context.CancelFunc
+
+ // Fields below are assigned during creation via `#setupDependencies()`.
+
// gossipSub is used for broadcast communication
// (i.e. multiple, unidentified receivers)
// TECHDEBT: investigate diff between randomSub and gossipSub
@@ -47,96 +67,106 @@ type backgroundRouter struct {
kadDHT *dht.IpfsDHT
// TECHDEBT: `pstore` will likely be removed in future refactoring / simplification
// of the `Router` interface.
- // pstore is the background router's peerstore.
+ // pstore is the background router's peerstore. Assigned in `backgroundRouter#setupPeerstore()`.
pstore typesP2P.Peerstore
}
-// NewBackgroundRouter returns a `backgroundRouter` as a `typesP2P.Router`
+// Create returns a `backgroundRouter` as a `typesP2P.Router`
// interface using the given configuration.
-func NewBackgroundRouter(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2P.Router, error) {
- // TECHDEBT(#595): add ctx to interface methods and propagate down.
- ctx := context.TODO()
+func Create(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2P.Router, error) {
+ return new(backgroundRouter).Create(bus, cfg)
+}
- networkLogger := logger.Global.CreateLoggerForModule("backgroundRouter")
- networkLogger.Info().Msg("Initializing background router")
+func (*backgroundRouter) Create(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2P.Router, error) {
+ bgRouterLogger := logger.Global.CreateLoggerForModule("backgroundRouter")
- // seed initial peerstore with current on-chain peer info (i.e. staked actors)
- pstore, err := cfg.PeerstoreProvider.GetStakedPeerstoreAtHeight(
- cfg.CurrentHeightProvider.CurrentHeight(),
- )
- if err != nil {
+ if err := cfg.IsValid(); err != nil {
return nil, err
}
- // CONSIDERATION: If switching to `NewRandomSub`, there will be a max size
- gossipSub, err := pubsub.NewGossipSub(ctx, cfg.Host)
- if err != nil {
- return nil, fmt.Errorf("creating gossip pubsub: %w", err)
- }
+ // TECHDEBT(#595): add ctx to interface methods and propagate down.
+ ctx, cancel := context.WithCancel(context.TODO())
- dhtMode := dht.ModeAutoServer
- // NB: don't act as a bootstrap node in peer discovery in client debug mode
- if isClientDebugMode(bus) {
- dhtMode = dht.ModeClient
+ rtr := &backgroundRouter{
+ logger: bgRouterLogger,
+ handler: cfg.Handler,
+ host: cfg.Host,
+ cancelReadSubscription: cancel,
}
+ rtr.SetBus(bus)
- kadDHT, err := dht.New(ctx, cfg.Host, dht.Mode(dhtMode))
- if err != nil {
- return nil, fmt.Errorf("creating DHT: %w", err)
- }
+ bgRouterLogger.Info().Fields(map[string]any{
+ "host_id": cfg.Host.ID(),
+ "unicast_protocol_id": protocol.BackgroundProtocolID,
+ "broadcast_pubsub_topic": protocol.BackgroundTopicStr,
+ }).Msg("initializing background router")
- topic, err := gossipSub.Join(protocol.BackgroundTopicStr)
- if err != nil {
- return nil, fmt.Errorf("joining background topic: %w", err)
+ if err := rtr.setupDependencies(ctx, cfg); err != nil {
+ return nil, err
}
- // INVESTIGATE: `WithBufferSize` `SubOpt`:
- // > WithBufferSize is a Subscribe option to customize the size of the subscribe
- // > output buffer. The default length is 32 but it can be configured to avoid
- // > dropping messages if the consumer is not reading fast enough.
- // (see: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub#WithBufferSize)
- subscription, err := topic.Subscribe()
- if err != nil {
- return nil, fmt.Errorf("subscribing to background topic: %w", err)
- }
+ go rtr.readSubscription(ctx)
- rtr := &backgroundRouter{
- host: cfg.Host,
- gossipSub: gossipSub,
- kadDHT: kadDHT,
- topic: topic,
- subscription: subscription,
- logger: networkLogger,
- pstore: pstore,
+ return rtr, nil
+}
+
+func (rtr *backgroundRouter) Close() error {
+ rtr.logger.Debug().Msg("closing background router")
+
+ rtr.cancelReadSubscription()
+ rtr.subscription.Cancel()
+
+ var topicCloseErr error
+ if err := rtr.topic.Close(); err != context.Canceled {
+ topicCloseErr = err
}
- return rtr, nil
+ return multierr.Append(
+ topicCloseErr,
+ rtr.kadDHT.Close(),
+ )
}
// Broadcast implements the respective `typesP2P.Router` interface method.
-func (rtr *backgroundRouter) Broadcast(data []byte) error {
+func (rtr *backgroundRouter) Broadcast(pocketEnvelopeBz []byte) error {
+ backgroundMsg := &typesP2P.BackgroundMessage{
+ Data: pocketEnvelopeBz,
+ }
+ backgroundMsgBz, err := proto.Marshal(backgroundMsg)
+ if err != nil {
+ return err
+ }
+
// TECHDEBT(#595): add ctx to interface methods and propagate down.
- return rtr.topic.Publish(context.TODO(), data)
+ return rtr.topic.Publish(context.TODO(), backgroundMsgBz)
}
// Send implements the respective `typesP2P.Router` interface method.
-func (rtr *backgroundRouter) Send(data []byte, address cryptoPocket.Address) error {
+func (rtr *backgroundRouter) Send(pocketEnvelopeBz []byte, address cryptoPocket.Address) error {
+ backgroundMessage := &typesP2P.BackgroundMessage{
+ Data: pocketEnvelopeBz,
+ }
+ backgroundMessageBz, err := proto.Marshal(backgroundMessage)
+ if err != nil {
+ return fmt.Errorf("marshalling background message: %w", err)
+ }
+
peer := rtr.pstore.GetPeer(address)
if peer == nil {
return fmt.Errorf("peer with address %s not in peerstore", address)
}
- if err := utils.Libp2pSendToPeer(rtr.host, data, peer); err != nil {
+ if err := utils.Libp2pSendToPeer(
+ rtr.host,
+ protocol.BackgroundProtocolID,
+ backgroundMessageBz,
+ peer,
+ ); err != nil {
return err
}
return nil
}
-// HandleNetworkData implements the respective `typesP2P.Router` interface method.
-func (rtr *backgroundRouter) HandleNetworkData(data []byte) ([]byte, error) {
- return data, nil // intentional passthrough
-}
-
// GetPeerstore implements the respective `typesP2P.Router` interface method.
func (rtr *backgroundRouter) GetPeerstore() typesP2P.Peerstore {
return rtr.pstore
@@ -166,6 +196,228 @@ func (rtr *backgroundRouter) RemovePeer(peer typesP2P.Peer) error {
return rtr.pstore.RemovePeer(peer.GetAddress())
}
+// setupUnicastRouter configures and assigns `rtr.UnicastRouter`.
+func (rtr *backgroundRouter) setupUnicastRouter() error {
+ unicastRouterCfg := config.UnicastRouterConfig{
+ Logger: rtr.logger,
+ Host: rtr.host,
+ ProtocolID: protocol.BackgroundProtocolID,
+ MessageHandler: rtr.handleBackgroundMsg,
+ PeerHandler: rtr.AddPeer,
+ }
+
+ unicastRouter, err := unicast.Create(rtr.GetBus(), &unicastRouterCfg)
+ if err != nil {
+ return fmt.Errorf("setting up unicast router: %w", err)
+ }
+
+ rtr.UnicastRouter = *unicastRouter
+ return nil
+}
+
+func (rtr *backgroundRouter) setupDependencies(ctx context.Context, cfg *config.BackgroundConfig) error {
+ // NB: The order in which the internal components are setup below is important
+ if err := rtr.setupUnicastRouter(); err != nil {
+ return err
+ }
+
+ if err := rtr.setupPeerDiscovery(ctx); err != nil {
+ return fmt.Errorf("setting up peer discovery: %w", err)
+ }
+
+ if err := rtr.setupPubsub(ctx); err != nil {
+ return fmt.Errorf("setting up pubsub: %w", err)
+ }
+
+ if err := rtr.setupTopic(); err != nil {
+ return fmt.Errorf("setting up topic: %w", err)
+ }
+
+ if err := rtr.setupSubscription(); err != nil {
+ return fmt.Errorf("setting up subscription: %w", err)
+ }
+
+ if err := rtr.setupPeerstore(
+ ctx,
+ cfg.PeerstoreProvider,
+ cfg.CurrentHeightProvider,
+ ); err != nil {
+ return fmt.Errorf("setting up peerstore: %w", err)
+ }
+ return nil
+}
+
+func (rtr *backgroundRouter) setupPeerstore(
+ ctx context.Context,
+ pstoreProvider providers.PeerstoreProvider,
+ currentHeightProvider providers.CurrentHeightProvider,
+) (err error) {
+ // seed initial peerstore with current on-chain peer info (i.e. staked actors)
+ rtr.pstore, err = pstoreProvider.GetStakedPeerstoreAtHeight(
+ currentHeightProvider.CurrentHeight(),
+ )
+ if err != nil {
+ return err
+ }
+
+ if err := rtr.bootstrap(ctx); err != nil {
+ return fmt.Errorf("bootstrapping peerstore: %w", err)
+ }
+
+ return nil
+}
+
+// setupPeerDiscovery sets up the Kademlia Distributed Hash Table (DHT)
+func (rtr *backgroundRouter) setupPeerDiscovery(ctx context.Context) (err error) {
+ dhtMode := dht.ModeAutoServer
+ // NB: don't act as a bootstrap node in peer discovery in client debug mode
+ if isClientDebugMode(rtr.GetBus()) {
+ dhtMode = dht.ModeClient
+ }
+
+ rtr.kadDHT, err = dht.New(ctx, rtr.host, dht.Mode(dhtMode))
+ return err
+}
+
+// setupPubsub sets up a new gossip sub topic using libp2p
+func (rtr *backgroundRouter) setupPubsub(ctx context.Context) (err error) {
+ // TECHDEBT(#730): integrate libp2p tracing via `pubsub.WithEventTracer()`.
+
+ // CONSIDERATION: If switching to `NewRandomSub`, there will be a max size
+ rtr.gossipSub, err = pubsub.NewGossipSub(ctx, rtr.host)
+ return err
+}
+
+func (rtr *backgroundRouter) setupTopic() (err error) {
+ if err := rtr.gossipSub.RegisterTopicValidator(
+ protocol.BackgroundTopicStr,
+ rtr.topicValidator,
+ ); err != nil {
+ return fmt.Errorf(
+ "registering topic validator for topic: %q: %w",
+ protocol.BackgroundTopicStr, err,
+ )
+ }
+
+ if rtr.topic, err = rtr.gossipSub.Join(protocol.BackgroundTopicStr); err != nil {
+ return fmt.Errorf(
+ "joining background topic: %q: %w",
+ protocol.BackgroundTopicStr, err,
+ )
+ }
+ return nil
+}
+
+func (rtr *backgroundRouter) setupSubscription() (err error) {
+ // INVESTIGATE: `WithBufferSize` `SubOpt`:
+ // > WithBufferSize is a Subscribe option to customize the size of the subscribe
+ // > output buffer. The default length is 32 but it can be configured to avoid
+ // > dropping messages if the consumer is not reading fast enough.
+ // (see: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub#WithBufferSize)
+ rtr.subscription, err = rtr.topic.Subscribe()
+ return err
+}
+
+func (rtr *backgroundRouter) bootstrap(ctx context.Context) error {
+ // CONSIDERATION: add `GetPeers` method, which returns a map,
+ // to the `PeerstoreProvider` interface to simplify this loop.
+ for _, peer := range rtr.pstore.GetPeerList() {
+ if err := utils.AddPeerToLibp2pHost(rtr.host, peer); err != nil {
+ return err
+ }
+
+ libp2pAddrInfo, err := utils.Libp2pAddrInfoFromPeer(peer)
+ if err != nil {
+ return fmt.Errorf(
+ "converting peer info, pokt address: %s: %w",
+ peer.GetAddress(),
+ err,
+ )
+ }
+
+ // don't attempt to connect to self
+ if rtr.host.ID() == libp2pAddrInfo.ID {
+ return nil
+ }
+
+ if err := rtr.host.Connect(ctx, libp2pAddrInfo); err != nil {
+ return fmt.Errorf("connecting to peer: %w", err)
+ }
+ }
+ return nil
+}
+
+// topicValidator is used in conjunction with libp2p-pubsub's notion of "topic
+// validaton". It is used for arbitrary and concurrent pre-propagation validation
+// of messages.
+//
+// (see: https://github.com/libp2p/specs/tree/master/pubsub#topic-validation
+// and https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub#PubSub.RegisterTopicValidator)
+//
+// Also note: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub#BasicSeqnoValidator
+func (rtr *backgroundRouter) topicValidator(_ context.Context, _ libp2pPeer.ID, msg *pubsub.Message) bool {
+ var backgroundMsg typesP2P.BackgroundMessage
+ if err := proto.Unmarshal(msg.Data, &backgroundMsg); err != nil {
+ rtr.logger.Error().Err(err).Msg("unmarshalling Background message")
+ return false
+ }
+
+ if backgroundMsg.Data == nil {
+ rtr.logger.Debug().Msg("no data in Background message")
+ return false
+ }
+
+ poktEnvelope := messaging.PocketEnvelope{}
+ if err := proto.Unmarshal(backgroundMsg.Data, &poktEnvelope); err != nil {
+ rtr.logger.Error().Err(err).Msg("Error decoding Background message")
+ return false
+ }
+
+ return true
+}
+
+// readSubscription is a while loop for receiving and handling messages from the
+// subscription. It is intended to be called as a goroutine.
+func (rtr *backgroundRouter) readSubscription(ctx context.Context) {
+ for {
+ if err := ctx.Err(); err != nil {
+ if err != context.Canceled {
+ rtr.logger.Error().Err(err).
+ Msg("context error while reading subscription")
+ }
+ return
+ }
+ msg, err := rtr.subscription.Next(ctx)
+
+ if err != nil {
+ rtr.logger.Error().Err(err).
+ Msg("error reading from background topic subscription")
+ continue
+ }
+
+ // TECHDEBT/DISCUSS: telemetry
+ if err := rtr.handleBackgroundMsg(msg.Data); err != nil {
+ rtr.logger.Error().Err(err).Msg("error handling background message")
+ continue
+ }
+ }
+}
+
+func (rtr *backgroundRouter) handleBackgroundMsg(backgroundMsgBz []byte) error {
+ var backgroundMsg typesP2P.BackgroundMessage
+ if err := proto.Unmarshal(backgroundMsgBz, &backgroundMsg); err != nil {
+ return err
+ }
+
+ // There was no error, but we don't need to forward this to the app-specific bus.
+ // For example, the message has already been handled by the application.
+ if backgroundMsg.Data == nil {
+ return nil
+ }
+
+ return rtr.handler(backgroundMsg.Data)
+}
+
// isClientDebugMode returns the value of `ClientDebugMode` in the base config
func isClientDebugMode(bus modules.Bus) bool {
return bus.GetRuntimeMgr().GetConfig().ClientDebugMode
diff --git a/p2p/background/router_test.go b/p2p/background/router_test.go
index a1a0fe40b..61cb5f153 100644
--- a/p2p/background/router_test.go
+++ b/p2p/background/router_test.go
@@ -8,32 +8,44 @@ import (
"time"
"github.com/golang/mock/gomock"
+ pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2pCrypto "github.com/libp2p/go-libp2p/core/crypto"
libp2pHost "github.com/libp2p/go-libp2p/core/host"
libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
libp2pPeer "github.com/libp2p/go-libp2p/core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/multiformats/go-multiaddr"
+ "github.com/stretchr/testify/require"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/anypb"
+
"github.com/pokt-network/pocket/internal/testutil"
"github.com/pokt-network/pocket/p2p/config"
+ "github.com/pokt-network/pocket/p2p/protocol"
typesP2P "github.com/pokt-network/pocket/p2p/types"
mock_types "github.com/pokt-network/pocket/p2p/types/mocks"
"github.com/pokt-network/pocket/p2p/utils"
"github.com/pokt-network/pocket/runtime/configs"
"github.com/pokt-network/pocket/runtime/defaults"
cryptoPocket "github.com/pokt-network/pocket/shared/crypto"
+ "github.com/pokt-network/pocket/shared/messaging"
mockModules "github.com/pokt-network/pocket/shared/modules/mocks"
- "github.com/stretchr/testify/require"
)
// https://www.rfc-editor.org/rfc/rfc3986#section-3.2.2
-const testIP6ServiceURL = "[2a00:1450:4005:802::2004]:8080"
+const (
+ testIP6ServiceURL = "[2a00:1450:4005:802::2004]:8080"
+ invalidReceiveTimeout = time.Millisecond * 500
+)
// TECHDEBT(#609): move & de-dup.
-var testLocalServiceURL = fmt.Sprintf("127.0.0.1:%d", defaults.DefaultP2PPort)
+var (
+ testLocalServiceURL = fmt.Sprintf("127.0.0.1:%d", defaults.DefaultP2PPort)
+ noopHandler = func(data []byte) error { return nil }
+)
func TestBackgroundRouter_AddPeer(t *testing.T) {
- testRouter := newTestRouter(t, nil)
+ testRouter := newTestRouter(t, nil, nil)
libp2pPStore := testRouter.host.Peerstore()
// NB: assert initial state
@@ -81,7 +93,7 @@ func TestBackgroundRouter_AddPeer(t *testing.T) {
}
func TestBackgroundRouter_RemovePeer(t *testing.T) {
- testRouter := newTestRouter(t, nil)
+ testRouter := newTestRouter(t, nil, nil)
peerstore := testRouter.host.Peerstore()
// NB: assert initial state
@@ -114,6 +126,116 @@ func TestBackgroundRouter_RemovePeer(t *testing.T) {
require.Len(t, existingPeerstoreAddrs, 1)
}
+func TestBackgroundRouter_Validation(t *testing.T) {
+ invalidProtoMessage := anypb.Any{
+ TypeUrl: "/notADefinedProtobufType",
+ Value: []byte("not a serialized protobuf"),
+ }
+
+ testCases := []struct {
+ name string
+ msgBz []byte
+ }{
+ {
+ name: "invalid BackgroundMessage",
+ // NB: `msgBz` would normally be a serialized `BackgroundMessage`.
+ msgBz: mustMarshal(t, &invalidProtoMessage),
+ },
+ {
+ name: "empty PocketEnvelope",
+ msgBz: mustMarshal(t, &typesP2P.BackgroundMessage{
+ // NB: `Data` is normally a serialized `PocketEnvelope`.
+ Data: nil,
+ }),
+ },
+ {
+ name: "invalid PoketEnvelope",
+ msgBz: mustMarshal(t, &typesP2P.BackgroundMessage{
+ // NB: `Data` is normally a serialized `PocketEnvelope`.
+ Data: mustMarshal(t, &invalidProtoMessage),
+ }),
+ },
+ }
+
+ // Set up test router as the receiver.
+ ctx := context.Background()
+ libp2pMockNet := mocknet.New()
+
+ receivedChan := make(chan []byte, 1)
+ receiverPrivKey, receiverPeer := newTestPeer(t)
+ receiverHost := newTestHost(t, libp2pMockNet, receiverPrivKey)
+ receiverRouter := newRouterWithSelfPeerAndHost(
+ t, receiverPeer,
+ receiverHost,
+ func(data []byte) error {
+ receivedChan <- data
+ return nil
+ },
+ )
+
+ t.Cleanup(func() {
+ err := receiverRouter.Close()
+ require.NoError(t, err)
+ })
+
+ // Wrap `receiverRouter#topicValidator` to make assertions by.
+ // Existing topic validator must be unregistered first.
+ err := receiverRouter.gossipSub.UnregisterTopicValidator(protocol.BackgroundTopicStr)
+ require.NoError(t, err)
+
+ // Register topic validator wrapper.
+ err = receiverRouter.gossipSub.RegisterTopicValidator(
+ protocol.BackgroundTopicStr,
+ func(ctx context.Context, peerID libp2pPeer.ID, msg *pubsub.Message) bool {
+ msgIsValid := receiverRouter.topicValidator(ctx, peerID, msg)
+ require.Falsef(t, msgIsValid, "expected message to be invalid")
+
+ return msgIsValid
+ },
+ )
+ require.NoError(t, err)
+
+ for _, testCase := range testCases {
+ t.Run(testCase.name, func(t *testing.T) {
+ senderPrivKey, _ := newTestPeer(t)
+ senderHost := newTestHost(t, libp2pMockNet, senderPrivKey)
+ gossipPubsub, err := pubsub.NewGossipSub(ctx, senderHost)
+ require.NoError(t, err)
+
+ err = libp2pMockNet.LinkAll()
+ require.NoError(t, err)
+
+ receiverAddrInfo, err := utils.Libp2pAddrInfoFromPeer(receiverPeer)
+ require.NoError(t, err)
+
+ err = senderHost.Connect(ctx, receiverAddrInfo)
+ require.NoError(t, err)
+
+ topic, err := gossipPubsub.Join(protocol.BackgroundTopicStr)
+ require.NoError(t, err)
+
+ err = topic.Publish(ctx, testCase.msgBz)
+ require.NoError(t, err)
+
+ // Destroy previous topic and sender instances to start with new ones
+ // for each test case.
+ t.Cleanup(func() {
+ _ = topic.Close()
+ _ = senderHost.Close()
+ })
+
+ // Ensure no messages were handled at the end of each test case for
+ // async errors.
+ select {
+ case <-receivedChan:
+ t.Fatal("no messages should have been handled by receiver router")
+ case <-time.After(invalidReceiveTimeout):
+ // no error, continue
+ }
+ })
+ }
+}
+
func TestBackgroundRouter_Broadcast(t *testing.T) {
const (
numPeers = 4
@@ -138,17 +260,31 @@ func TestBackgroundRouter_Broadcast(t *testing.T) {
libp2pMockNet = mocknet.New()
)
- // setup 4 libp2p hosts to listen for incoming streams from the test backgroundRouter
+ testPocketEnvelope, err := messaging.PackMessage(&anypb.Any{
+ TypeUrl: "/test",
+ Value: []byte(testMsg),
+ })
+ require.NoError(t, err)
+
+ testPocketEnvelopeBz, err := proto.Marshal(testPocketEnvelope)
+ require.NoError(t, err)
+
+ // setup 4 receiver routers to listen for incoming messages from the sender router
for i := 0; i < numPeers; i++ {
broadcastWaitgroup.Add(1)
bootstrapWaitgroup.Add(1)
- privKey, selfPeer := newTestPeer(t)
+ privKey, peer := newTestPeer(t)
host := newTestHost(t, libp2pMockNet, privKey)
testHosts = append(testHosts, host)
expectedPeerIDs[i] = host.ID().String()
- rtr := newRouterWithSelfPeerAndHost(t, selfPeer, host)
- go readSubscription(t, ctx, &broadcastWaitgroup, rtr, &seenMessagesMutext, seenMessages)
+ newRouterWithSelfPeerAndHost(t, peer, host, func(data []byte) error {
+ seenMessagesMutext.Lock()
+ defer seenMessagesMutext.Unlock()
+ seenMessages[host.ID().String()] = struct{}{}
+ broadcastWaitgroup.Done()
+ return nil
+ })
}
// bootstrap off of arbitrary testHost
@@ -156,12 +292,12 @@ func TestBackgroundRouter_Broadcast(t *testing.T) {
// set up a test backgroundRouter
testRouterHost := newTestHost(t, libp2pMockNet, privKey)
- testRouter := newRouterWithSelfPeerAndHost(t, selfPeer, testRouterHost)
+ testRouter := newRouterWithSelfPeerAndHost(t, selfPeer, testRouterHost, nil)
testHosts = append(testHosts, testRouterHost)
// simulate network links between each to every other
// (i.e. fully-connected network)
- err := libp2pMockNet.LinkAll()
+ err = libp2pMockNet.LinkAll()
require.NoError(t, err)
// setup notifee/notify BEFORE bootstrapping
@@ -189,7 +325,7 @@ func TestBackgroundRouter_Broadcast(t *testing.T) {
// broadcast message
t.Log("broadcasting...")
- err := testRouter.Broadcast([]byte(testMsg))
+ err := testRouter.Broadcast(testPocketEnvelopeBz)
require.NoError(t, err)
// wait for broadcast to be received by all peers
@@ -241,7 +377,11 @@ func bootstrap(t *testing.T, ctx context.Context, testHosts []libp2pHost.Host) {
}
// TECHDEBT(#609): move & de-duplicate
-func newTestRouter(t *testing.T, libp2pMockNet mocknet.Mocknet) *backgroundRouter {
+func newTestRouter(
+ t *testing.T,
+ libp2pMockNet mocknet.Mocknet,
+ handler typesP2P.MessageHandler,
+) *backgroundRouter {
t.Helper()
privKey, selfPeer := newTestPeer(t)
@@ -256,10 +396,15 @@ func newTestRouter(t *testing.T, libp2pMockNet mocknet.Mocknet) *backgroundRoute
require.NoError(t, err)
})
- return newRouterWithSelfPeerAndHost(t, selfPeer, host)
+ return newRouterWithSelfPeerAndHost(t, selfPeer, host, handler)
}
-func newRouterWithSelfPeerAndHost(t *testing.T, selfPeer typesP2P.Peer, host libp2pHost.Host) *backgroundRouter {
+func newRouterWithSelfPeerAndHost(
+ t *testing.T,
+ selfPeer typesP2P.Peer,
+ host libp2pHost.Host,
+ handler typesP2P.MessageHandler,
+) *backgroundRouter {
t.Helper()
ctrl := gomock.NewController(t)
@@ -268,7 +413,7 @@ func newRouterWithSelfPeerAndHost(t *testing.T, selfPeer typesP2P.Peer, host lib
P2P: &configs.P2PConfig{
IsClientOnly: false,
},
- })
+ }).AnyTimes()
consensusMock := mockModules.NewMockConsensusModule(ctrl)
consensusMock.EXPECT().CurrentHeight().Return(uint64(1)).AnyTimes()
@@ -284,11 +429,16 @@ func newRouterWithSelfPeerAndHost(t *testing.T, selfPeer typesP2P.Peer, host lib
err := pstore.AddPeer(selfPeer)
require.NoError(t, err)
- router, err := NewBackgroundRouter(busMock, &config.BackgroundConfig{
+ if handler == nil {
+ handler = noopHandler
+ }
+
+ router, err := Create(busMock, &config.BackgroundConfig{
Addr: selfPeer.GetAddress(),
PeerstoreProvider: pstoreProviderMock,
CurrentHeightProvider: consensusMock,
Host: host,
+ Handler: handler,
})
require.NoError(t, err)
@@ -332,7 +482,11 @@ func newMockNetHostFromPeer(
return host
}
-func newTestHost(t *testing.T, mockNet mocknet.Mocknet, privKey cryptoPocket.PrivateKey) libp2pHost.Host {
+func newTestHost(
+ t *testing.T,
+ mockNet mocknet.Mocknet,
+ privKey cryptoPocket.PrivateKey,
+) libp2pHost.Host {
t.Helper()
// listen on random port on loopback interface
@@ -346,30 +500,11 @@ func newTestHost(t *testing.T, mockNet mocknet.Mocknet, privKey cryptoPocket.Pri
return newMockNetHostFromPeer(t, mockNet, privKey, peer)
}
-func readSubscription(
- t *testing.T,
- ctx context.Context,
- broadcastWaitGroup *sync.WaitGroup,
- rtr *backgroundRouter,
- mu *sync.Mutex,
- seenMsgs map[string]struct{},
-) {
+func mustMarshal(t *testing.T, msg proto.Message) []byte {
t.Helper()
- for {
- if err := ctx.Err(); err != nil {
- if err != context.Canceled || err != context.DeadlineExceeded {
- require.NoError(t, err)
- }
- return
- }
-
- _, err := rtr.subscription.Next(ctx)
- require.NoError(t, err)
+ msgBz, err := proto.Marshal(msg)
+ require.NoError(t, err)
- mu.Lock()
- broadcastWaitGroup.Done()
- seenMsgs[rtr.host.ID().String()] = struct{}{}
- mu.Unlock()
- }
+ return msgBz
}
diff --git a/p2p/bootstrap.go b/p2p/bootstrap.go
index 75198ee0e..1def39373 100644
--- a/p2p/bootstrap.go
+++ b/p2p/bootstrap.go
@@ -42,6 +42,7 @@ func (m *p2pModule) configureBootstrapNodes() error {
}
// bootstrap attempts to bootstrap from a bootstrap node
+// TECHDEBT(#859): refactor bootstrapping.
func (m *p2pModule) bootstrap() error {
var pstore typesP2P.Peerstore
@@ -76,14 +77,14 @@ func (m *p2pModule) bootstrap() error {
for _, peer := range pstore.GetPeerList() {
m.logger.Debug().Str("address", peer.GetAddress().String()).Msg("Adding peer to router")
- if err := m.router.AddPeer(peer); err != nil {
+ if err := m.stakedActorRouter.AddPeer(peer); err != nil {
m.logger.Error().Err(err).
Str("pokt_address", peer.GetAddress().String()).
Msg("adding peer")
}
}
- if m.router.GetPeerstore().Size() == 0 {
+ if m.stakedActorRouter.GetPeerstore().Size() == 0 {
return fmt.Errorf("bootstrap failed")
}
return nil
diff --git a/p2p/config/config.go b/p2p/config/config.go
index 3cbdf4c40..99ddfb715 100644
--- a/p2p/config/config.go
+++ b/p2p/config/config.go
@@ -16,6 +16,7 @@ import (
var (
_ typesP2P.RouterConfig = &baseConfig{}
_ typesP2P.RouterConfig = &UnicastRouterConfig{}
+ _ typesP2P.RouterConfig = &BackgroundConfig{}
_ typesP2P.RouterConfig = &RainTreeConfig{}
)
@@ -30,6 +31,7 @@ type baseConfig struct {
Addr crypto.Address
CurrentHeightProvider providers.CurrentHeightProvider
PeerstoreProvider providers.PeerstoreProvider
+ Handler func(data []byte) error
}
type UnicastRouterConfig struct {
@@ -76,6 +78,10 @@ func (cfg *baseConfig) IsValid() (err error) {
err = multierr.Append(err, fmt.Errorf("peerstore provider not configured"))
}
+ if cfg.Handler == nil {
+ err = multierr.Append(err, fmt.Errorf("handler not configured"))
+ }
+
return err
}
@@ -104,23 +110,25 @@ func (cfg *UnicastRouterConfig) IsValid() (err error) {
}
// IsValid implements the respective member of the `RouterConfig` interface.
-func (cfg *BackgroundConfig) IsValid() (err error) {
+func (cfg *BackgroundConfig) IsValid() error {
baseCfg := baseConfig{
Host: cfg.Host,
Addr: cfg.Addr,
CurrentHeightProvider: cfg.CurrentHeightProvider,
PeerstoreProvider: cfg.PeerstoreProvider,
+ Handler: cfg.Handler,
}
- return multierr.Append(err, baseCfg.IsValid())
+ return baseCfg.IsValid()
}
// IsValid implements the respective member of the `RouterConfig` interface.
-func (cfg *RainTreeConfig) IsValid() (err error) {
+func (cfg *RainTreeConfig) IsValid() error {
baseCfg := baseConfig{
Host: cfg.Host,
Addr: cfg.Addr,
CurrentHeightProvider: cfg.CurrentHeightProvider,
PeerstoreProvider: cfg.PeerstoreProvider,
+ Handler: cfg.Handler,
}
- return multierr.Append(err, baseCfg.IsValid())
+ return baseCfg.IsValid()
}
diff --git a/p2p/event_handler.go b/p2p/event_handler.go
index 48e1a7d73..ba0885839 100644
--- a/p2p/event_handler.go
+++ b/p2p/event_handler.go
@@ -3,10 +3,11 @@ package p2p
import (
"fmt"
+ "google.golang.org/protobuf/types/known/anypb"
+
"github.com/pokt-network/pocket/shared/codec"
coreTypes "github.com/pokt-network/pocket/shared/core/types"
"github.com/pokt-network/pocket/shared/messaging"
- "google.golang.org/protobuf/types/known/anypb"
)
// CONSIDERATION(#576): making this part of some new `ConnManager`.
@@ -23,7 +24,13 @@ func (m *p2pModule) HandleEvent(event *anypb.Any) error {
return fmt.Errorf("failed to cast event to ConsensusNewHeightEvent")
}
- oldPeerList := m.router.GetPeerstore().GetPeerList()
+ if isStaked, err := m.isStakedActor(); err != nil {
+ return err
+ } else if !isStaked {
+ return nil // unstaked actors do not use RainTree and therefore do not need to update this router
+ }
+
+ oldPeerList := m.stakedActorRouter.GetPeerstore().GetPeerList()
updatedPeerstore, err := m.pstoreProvider.GetStakedPeerstoreAtHeight(consensusNewHeightEvent.Height)
if err != nil {
return err
@@ -31,12 +38,12 @@ func (m *p2pModule) HandleEvent(event *anypb.Any) error {
added, removed := oldPeerList.Delta(updatedPeerstore.GetPeerList())
for _, add := range added {
- if err := m.router.AddPeer(add); err != nil {
+ if err := m.stakedActorRouter.AddPeer(add); err != nil {
return err
}
}
for _, rm := range removed {
- if err := m.router.RemovePeer(rm); err != nil {
+ if err := m.stakedActorRouter.RemovePeer(rm); err != nil {
return err
}
}
@@ -50,13 +57,25 @@ func (m *p2pModule) HandleEvent(event *anypb.Any) error {
m.logger.Debug().Fields(messaging.TransitionEventToMap(stateMachineTransitionEvent)).Msg("Received state machine transition event")
if stateMachineTransitionEvent.NewState == string(coreTypes.StateMachineState_P2P_Bootstrapping) {
- if m.router.GetPeerstore().Size() == 0 {
- m.logger.Warn().Msg("No peers in addrbook, bootstrapping")
+ staked, err := m.isStakedActor()
+ if err != nil {
+ return err
+ }
+ if staked {
+ // TECHDEBT(#859): this will never happen as the peerstore is
+ // seeded from consensus during P2P module construction.
+ if m.stakedActorRouter.GetPeerstore().Size() == 0 {
+ m.logger.Warn().Msg("No peers in peerstore, bootstrapping")
- if err := m.bootstrap(); err != nil {
- return err
+ if err := m.bootstrap(); err != nil {
+ return err
+ }
}
}
+
+ // TECHDEBT(#859): for unstaked actors, unstaked actor (background)
+ // router bootstrapping SHOULD complete before the event below is sent.
+
m.logger.Info().Bool("TODO", true).Msg("Advertise self to network")
if err := m.GetBus().GetStateMachineModule().SendEvent(coreTypes.StateMachineEvent_P2P_IsBootstrapped); err != nil {
return err
diff --git a/p2p/module.go b/p2p/module.go
index 5f87d5b6e..e4479c080 100644
--- a/p2p/module.go
+++ b/p2p/module.go
@@ -3,14 +3,17 @@ package p2p
import (
"errors"
"fmt"
+ "sync/atomic"
"github.com/libp2p/go-libp2p"
libp2pHost "github.com/libp2p/go-libp2p/core/host"
"github.com/multiformats/go-multiaddr"
+ "go.uber.org/multierr"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/pokt-network/pocket/logger"
+ "github.com/pokt-network/pocket/p2p/background"
"github.com/pokt-network/pocket/p2p/config"
"github.com/pokt-network/pocket/p2p/providers"
"github.com/pokt-network/pocket/p2p/providers/current_height_provider"
@@ -35,6 +38,7 @@ var _ modules.P2PModule = &p2pModule{}
type p2pModule struct {
base_modules.IntegratableModule
+ started atomic.Bool
address cryptoPocket.Address
logger *modules.Logger
options []modules.ModuleOption
@@ -55,8 +59,9 @@ type p2pModule struct {
// holding a reference in the module struct. This will improve testability.
//
// Assigned during `#Start()`. TLDR; `host` listens on instantiation.
- // and `router` depends on `host`.
- router typesP2P.Router
+ // `stakedActorRouter` and `unstakedActorRouter` depends on `host`.
+ stakedActorRouter typesP2P.Router
+ unstakedActorRouter typesP2P.Router
// host represents a libp2p network node, it encapsulates a libp2p peerstore
// & connection manager. `libp2p.New` configures and starts listening
// according to options. Assigned via `#Start()` (starts on instantiation).
@@ -68,19 +73,6 @@ func Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, e
return new(p2pModule).Create(bus, options...)
}
-// WithHostOption associates an existing (i.e. "started") libp2p `host.Host`
-// with this module, instead of creating a new one on `#Start()`.
-// Primarily intended for testing.
-func WithHostOption(host libp2pHost.Host) modules.ModuleOption {
- return func(m modules.InitializableModule) {
- mod, ok := m.(*p2pModule)
- if ok {
- mod.host = host
- mod.logger.Debug().Msg("using host provided via `WithHostOption`")
- }
- }
-}
-
func (m *p2pModule) Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
logger.Global.Debug().Msg("Creating P2P module")
*m = p2pModule{
@@ -143,8 +135,12 @@ func (m *p2pModule) GetModuleName() string {
}
// Start instantiates and assigns `m.host`, unless one already exists, and
-// `m.router` (which depends on `m.host` as a required config field).
+// `m.stakedActorRouter` (which depends on `m.host` as a required config field).
func (m *p2pModule) Start() (err error) {
+ if !m.started.CompareAndSwap(false, true) {
+ return fmt.Errorf("p2p module already started")
+ }
+
m.GetBus().
GetTelemetryModule().
GetTimeSeriesAgent().
@@ -153,7 +149,7 @@ func (m *p2pModule) Start() (err error) {
telemetry.P2P_NODE_STARTED_TIMESERIES_METRIC_DESCRIPTION,
)
- // Return early if host has already been started (e.g. via `WithHostOption`)
+ // Return early if host has already been started (e.g. via `WithHost`)
if m.host == nil {
// Libp2p hosts provided via `WithHost()` option are destroyed when
// `#Stop()`ing the module. Therefore, a new one must be created.
@@ -168,8 +164,8 @@ func (m *p2pModule) Start() (err error) {
}
}
- if err := m.setupRouter(); err != nil {
- return fmt.Errorf("setting up router: %w", err)
+ if err := m.setupRouters(); err != nil {
+ return fmt.Errorf("setting up routers: %w", err)
}
m.GetBus().
@@ -180,38 +176,102 @@ func (m *p2pModule) Start() (err error) {
}
func (m *p2pModule) Stop() error {
- err := m.host.Close()
+ m.logger.Debug().Msg("stopping P2P module")
+
+ if !m.started.CompareAndSwap(true, false) {
+ return fmt.Errorf("p2p module already stopped")
+ }
+
+ var stakedActorRouterCloseErr error
+ if m.stakedActorRouter != nil {
+ stakedActorRouterCloseErr = m.stakedActorRouter.Close()
+ }
+
+ routerCloseErrs := multierr.Append(
+ m.unstakedActorRouter.Close(),
+ stakedActorRouterCloseErr,
+ )
+
+ err := multierr.Append(
+ routerCloseErrs,
+ m.host.Close(),
+ )
// Don't reuse closed host, `#Start()` will re-create.
m.host = nil
+ m.stakedActorRouter = nil
+ m.unstakedActorRouter = nil
return err
}
func (m *p2pModule) Broadcast(msg *anypb.Any) error {
- c := &messaging.PocketEnvelope{
+ isStaked, err := m.isStakedActor()
+ if err != nil {
+ return err
+ }
+
+ if isStaked {
+ if m.stakedActorRouter == nil {
+ return fmt.Errorf("broadcasting: staked actor router not started")
+ }
+ }
+
+ if m.unstakedActorRouter == nil {
+ return fmt.Errorf("broadcasting: unstaked actor router not started")
+ }
+
+ poktEnvelope := &messaging.PocketEnvelope{
Content: msg,
Nonce: cryptoPocket.GetNonce(),
}
- data, err := codec.GetCodec().Marshal(c)
+ poktEnvelopeBz, err := codec.GetCodec().Marshal(poktEnvelope)
if err != nil {
return err
}
- return m.router.Broadcast(data)
+ var stakedBroadcastErr error
+ if isStaked {
+ stakedBroadcastErr = m.stakedActorRouter.Broadcast(poktEnvelopeBz)
+ }
+
+ unstakedBroadcastErr := m.unstakedActorRouter.Broadcast(poktEnvelopeBz)
+
+ return multierr.Append(stakedBroadcastErr, unstakedBroadcastErr)
}
func (m *p2pModule) Send(addr cryptoPocket.Address, msg *anypb.Any) error {
- c := &messaging.PocketEnvelope{
+ poktEnvelope := &messaging.PocketEnvelope{
Content: msg,
Nonce: cryptoPocket.GetNonce(),
}
- data, err := codec.GetCodec().Marshal(c)
+ poktEnvelopeBz, err := codec.GetCodec().Marshal(poktEnvelope)
if err != nil {
return err
}
- return m.router.Send(data, addr)
+ isStaked, err := m.isStakedActor()
+ if err != nil {
+ return err
+ }
+
+ // Send via the staked actor router both if this node and the peer are staked
+ // actors; otherwise, send via the unstaked actor router.
+ if !isStaked {
+ return m.unstakedActorRouter.Send(poktEnvelopeBz, addr)
+ }
+
+ stakedActorSendErr := m.stakedActorRouter.Send(poktEnvelopeBz, addr)
+
+ // Peer is not a staked actor.
+ if errors.Is(stakedActorSendErr, typesP2P.ErrUnknownPeer) {
+ m.logger.Warn().
+ Str("address", addr.String()).
+ Msgf("attempting to send to unstaked actor")
+
+ return m.unstakedActorRouter.Send(poktEnvelopeBz, addr)
+ }
+ return nil
}
// TECHDEBT(#348): Define what the node identity is throughout the codebase
@@ -273,6 +333,9 @@ func (m *p2pModule) setupCurrentHeightProvider() error {
m.logger.Debug().Msg("setupCurrentHeightProvider")
currentHeightProviderModule, err := m.GetBus().GetModulesRegistry().GetModule(current_height_provider.ModuleName)
if err != nil {
+ // TECHDEBT(#810): add a `consensusCurrentHeightProvider` submodule to wrap
+ // the consensus module usage (similar to how `persistencePeerstoreProvider`
+ // wraps persistence).
currentHeightProviderModule = m.GetBus().GetConsensusModule()
}
@@ -305,11 +368,37 @@ func (m *p2pModule) setupNonceDeduper() error {
return nil
}
-// setupRouter instantiates the configured router implementation.
-func (m *p2pModule) setupRouter() (err error) {
- // TECHDEBT(#810): register the router to the module registry instead of
+// setupRouters instantiates the configured router implementations.
+func (m *p2pModule) setupRouters() (err error) {
+ // TECHDEBT(#810): register the routers to the module registry instead of
// holding a reference in the module struct. This will improve testability.
- m.router, err = raintree.NewRainTreeRouter(
+ if err := m.setupStakedRouter(); err != nil {
+ return err
+ }
+
+ if err := m.setupUnstakedRouter(); err != nil {
+ return err
+ }
+ return nil
+}
+
+// setupStakedRouter initializes the staked actor router ONLY IF this node is
+// a staked actor, exclusively for use between staked actors.
+func (m *p2pModule) setupStakedRouter() (err error) {
+ // `nstakedActorRouter` may already be initialized via a `ModuleOption`.
+ if m.stakedActorRouter != nil {
+ m.logger.Debug().Msg("staked actor router already initialized")
+ return nil
+ }
+
+ if isStaked, err := m.isStakedActor(); err != nil {
+ return err
+ } else if !isStaked {
+ return nil
+ }
+
+ m.logger.Debug().Msg("setting up staked actor router")
+ m.stakedActorRouter, err = raintree.NewRainTreeRouter(
m.GetBus(),
&config.RainTreeConfig{
Addr: m.address,
@@ -319,10 +408,39 @@ func (m *p2pModule) setupRouter() (err error) {
Handler: m.handlePocketEnvelope,
},
)
- return err
+ if err != nil {
+ return fmt.Errorf("setting up staked actor router: %w", err)
+ }
+ return nil
+}
+
+// setupUnstakedRouter initializes the unstaked actor router for use with the
+// entire P2P network.
+func (m *p2pModule) setupUnstakedRouter() (err error) {
+ // `unstakedActorRouter` may already be initialized via a `ModuleOption`.
+ if m.unstakedActorRouter != nil {
+ m.logger.Debug().Msg("unstaked actor router already initialized")
+ return nil
+ }
+
+ m.logger.Debug().Msg("setting up unstaked actor router")
+ m.unstakedActorRouter, err = background.Create(
+ m.GetBus(),
+ &config.BackgroundConfig{
+ Addr: m.address,
+ CurrentHeightProvider: m.currentHeightProvider,
+ PeerstoreProvider: m.pstoreProvider,
+ Host: m.host,
+ Handler: m.handlePocketEnvelope,
+ },
+ )
+ if err != nil {
+ return fmt.Errorf("unstaked actor router: %w", err)
+ }
+ return nil
}
-// setupHost creates a new libp2p host and assignes it to `m.host`. Libp2p host
+// setupHost creates a new libp2p host and assigns it to `m.host`. Libp2p host
// starts listening upon instantiation.
func (m *p2pModule) setupHost() (err error) {
m.logger.Debug().Msg("creating new libp2p host")
@@ -437,3 +555,24 @@ func (m *p2pModule) getMultiaddr() (multiaddr.Multiaddr, error) {
"%s:%d", m.cfg.Hostname, m.cfg.Port,
))
}
+
+func (m *p2pModule) getStakedPeerstore() (typesP2P.Peerstore, error) {
+ return m.pstoreProvider.GetStakedPeerstoreAtHeight(
+ m.currentHeightProvider.CurrentHeight(),
+ )
+}
+
+// isStakedActor returns whether the current node is a staked actor at the current height.
+// Return an error if a peerstore can't be provided.
+func (m *p2pModule) isStakedActor() (bool, error) {
+ pstore, err := m.getStakedPeerstore()
+ if err != nil {
+ return false, fmt.Errorf("getting staked peerstore: %w", err)
+ }
+
+ // Ensure self address is present in current height's staked actor set.
+ if self := pstore.GetPeer(m.address); self != nil {
+ return true, nil
+ }
+ return false, nil
+}
diff --git a/p2p/module_raintree_test.go b/p2p/module_raintree_test.go
index 9bd873913..3dc98a988 100644
--- a/p2p/module_raintree_test.go
+++ b/p2p/module_raintree_test.go
@@ -9,18 +9,14 @@ import (
"regexp"
"sort"
"strconv"
- "strings"
"sync"
"testing"
- libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/anypb"
"github.com/pokt-network/pocket/internal/testutil"
- "github.com/pokt-network/pocket/p2p/protocol"
- "github.com/pokt-network/pocket/p2p/raintree"
)
// TODO(#314): Add the tooling and instructions on how to generate unit tests in this file.
@@ -220,11 +216,13 @@ func TestRainTreeNetworkCompleteTwentySevenNodes(t *testing.T) {
// 1. It creates and configures a "real" P2P module where all the other components of the node are mocked.
// 2. It then triggers a single message and waits for all of the expected messages transmission to complete before announcing failure.
func testRainTreeCalls(t *testing.T, origNode string, networkSimulationConfig TestNetworkSimulationConfig) {
+ var readWriteWaitGroup sync.WaitGroup
+
// Configure & prepare test module
numValidators := len(networkSimulationConfig)
runtimeConfigs := createMockRuntimeMgrs(t, numValidators)
genesisMock := runtimeConfigs[0].GetGenesis()
- busMocks := createMockBuses(t, runtimeConfigs)
+ busMocks := createMockBuses(t, runtimeConfigs, &readWriteWaitGroup)
valIds := make([]string, 0, numValidators)
for valId := range networkSimulationConfig {
@@ -241,7 +239,6 @@ func testRainTreeCalls(t *testing.T, origNode string, networkSimulationConfig Te
// Create connection and bus mocks along with a shared WaitGroup to track the number of expected
// reads and writes throughout the mocked local network
- var wg sync.WaitGroup
for i, valId := range valIds {
expectedCall := networkSimulationConfig[valId]
expectedReads := expectedCall.numNetworkReads
@@ -249,50 +246,41 @@ func testRainTreeCalls(t *testing.T, origNode string, networkSimulationConfig Te
log.Printf("[valId: %s] expected reads: %d\n", valId, expectedReads)
log.Printf("[valId: %s] expected writes: %d\n", valId, expectedWrites)
- wg.Add(expectedReads)
- wg.Add(expectedWrites)
+ readWriteWaitGroup.Add(expectedReads)
+ readWriteWaitGroup.Add(expectedWrites)
persistenceMock := preparePersistenceMock(t, busMocks[i], genesisMock)
consensusMock := prepareConsensusMock(t, busMocks[i])
- telemetryMock := prepareTelemetryMock(t, busMocks[i], valId, &wg, expectedWrites)
+ telemetryMock := prepareTelemetryMock(t, busMocks[i], valId, &readWriteWaitGroup, expectedWrites)
prepareBusMock(busMocks[i], persistenceMock, consensusMock, telemetryMock)
}
libp2pMockNet := mocknet.New()
- defer func() {
- err := libp2pMockNet.Close()
- require.NoError(t, err)
- }()
// Inject the connection and bus mocks into the P2P modules
p2pModules := createP2PModules(t, busMocks, libp2pMockNet)
- for serviceURL, p2pMod := range p2pModules {
+ for _, p2pMod := range p2pModules {
err := p2pMod.Start()
require.NoError(t, err)
-
- sURL := strings.Clone(serviceURL)
- mod := *p2pMod
- p2pMod.host.SetStreamHandler(protocol.PoktProtocolID, func(stream libp2pNetwork.Stream) {
- log.Printf("[valID: %s] Read\n", sURL)
- (&mod).router.(*raintree.RainTreeRouter).HandleStream(stream)
- wg.Done()
- })
}
// Wait for completion
- defer waitForNetworkSimulationCompletion(t, &wg)
+ defer waitForNetworkSimulationCompletion(t, &readWriteWaitGroup)
t.Cleanup(func() {
// Stop all p2p modules
for _, p2pMod := range p2pModules {
err := p2pMod.Stop()
require.NoError(t, err)
}
+
+ err := libp2pMockNet.Close()
+ require.NoError(t, err)
})
// Send the first message (by the originator) to trigger a RainTree broadcast
- p := &anypb.Any{}
+ p := &anypb.Any{TypeUrl: "test"}
p2pMod := p2pModules[origNode]
require.NoError(t, p2pMod.Broadcast(p))
}
diff --git a/p2p/module_test.go b/p2p/module_test.go
index 79cd17066..45477bd1d 100644
--- a/p2p/module_test.go
+++ b/p2p/module_test.go
@@ -111,7 +111,7 @@ func Test_Create_configureBootstrapNodes(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockRuntimeMgr := mockModules.NewMockRuntimeMgr(ctrl)
- mockBus := createMockBus(t, mockRuntimeMgr)
+ mockBus := createMockBus(t, mockRuntimeMgr, nil)
genesisStateMock := createMockGenesisState(keys)
persistenceMock := preparePersistenceMock(t, mockBus, genesisStateMock)
@@ -137,7 +137,7 @@ func Test_Create_configureBootstrapNodes(t *testing.T) {
}
host := newLibp2pMockNetHost(t, privKey, peer)
- p2pMod, err := Create(mockBus, WithHostOption(host))
+ p2pMod, err := Create(mockBus, WithHost(host))
if (err != nil) != tt.wantErr {
t.Errorf("p2pModule.Create() error = %v, wantErr %v", err, tt.wantErr)
}
@@ -155,7 +155,7 @@ func TestP2pModule_WithHostOption_Restart(t *testing.T) {
privKey := cryptoPocket.GetPrivKeySeed(1)
mockRuntimeMgr := mockModules.NewMockRuntimeMgr(ctrl)
- mockBus := createMockBus(t, mockRuntimeMgr)
+ mockBus := createMockBus(t, mockRuntimeMgr, nil)
genesisStateMock := createMockGenesisState(nil)
persistenceMock := preparePersistenceMock(t, mockBus, genesisStateMock)
@@ -184,7 +184,7 @@ func TestP2pModule_WithHostOption_Restart(t *testing.T) {
}
mockNetHost := newLibp2pMockNetHost(t, privKey, peer)
- p2pMod, err := Create(mockBus, WithHostOption(mockNetHost))
+ p2pMod, err := Create(mockBus, WithHost(mockNetHost))
require.NoError(t, err)
mod, ok := p2pMod.(*p2pModule)
diff --git a/p2p/protocol/protocol.go b/p2p/protocol/protocol.go
index 81737e9a8..c4463bb45 100644
--- a/p2p/protocol/protocol.go
+++ b/p2p/protocol/protocol.go
@@ -3,15 +3,17 @@ package protocol
import "github.com/libp2p/go-libp2p/core/protocol"
const (
- // PoktProtocolID is the libp2p protocol ID used when opening a new stream
- // to a remote peer and setting the stream handler for the local peer.
- // Libp2p APIs use this to distinguish which multiplexed protocols/streams to consider.
- PoktProtocolID = protocol.ID("pokt/v1.0.0")
+ // RaintreeProtocolID is the libp2p protocol ID used in the Raintree router
+ // when opening a new stream to a remote peer and setting the stream handler
+ // for the local peer. Libp2p APIs use this to distinguish which multiplexed
+ // protocols/streams to consider.
+ RaintreeProtocolID = protocol.ID("pokt/raintree/v1.0.0")
+ // BackgroundProtocolID is the libp2p protocol ID used in the Background router
+ // when opening a new stream to a remote peer and setting the stream handler
+ // for the local peer. Libp2p APIs use this to distinguish which multiplexed
+ // protocols/streams to consider.
+ BackgroundProtocolID = protocol.ID("pokt/background/v1.0.0")
// BackgroundTopicStr is a "default" pubsub topic string used when
// subscribing and broadcasting.
BackgroundTopicStr = "pokt/background"
- // PeerDiscoveryNamespace used by both advertiser and discoverer to rendezvous
- // during peer discovery. Advertiser(s) and discoverer(s) MUST have matching
- // discovery namespaces to find one another.
- PeerDiscoveryNamespace = "pokt/peer_discovery"
)
diff --git a/p2p/raintree/peers_manager_test.go b/p2p/raintree/peers_manager_test.go
index 7fa01a3d3..9e3fa2502 100644
--- a/p2p/raintree/peers_manager_test.go
+++ b/p2p/raintree/peers_manager_test.go
@@ -9,8 +9,11 @@ import (
"github.com/foxcpp/go-mockdns"
"github.com/golang/mock/gomock"
+ libp2pPeer "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
+ "github.com/stretchr/testify/require"
+
"github.com/pokt-network/pocket/internal/testutil"
"github.com/pokt-network/pocket/p2p/config"
typesP2P "github.com/pokt-network/pocket/p2p/types"
@@ -18,7 +21,6 @@ import (
"github.com/pokt-network/pocket/runtime/configs"
cryptoPocket "github.com/pokt-network/pocket/shared/crypto"
mockModules "github.com/pokt-network/pocket/shared/modules/mocks"
- "github.com/stretchr/testify/require"
)
const (
@@ -26,6 +28,8 @@ const (
addrAlphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ["
)
+var noopHandler = func(_ []byte) error { return nil }
+
type ExpectedRainTreeRouterConfig struct {
numNodes int
numExpectedLevels int
@@ -101,6 +105,7 @@ func TestRainTree_Peerstore_HandleUpdate(t *testing.T) {
Addr: pubKey.Address(),
PeerstoreProvider: pstoreProviderMock,
CurrentHeightProvider: currentHeightProviderMock,
+ Handler: noopHandler,
}
router, err := NewRainTreeRouter(mockBus, rtCfg)
@@ -168,6 +173,7 @@ func BenchmarkPeerstoreUpdates(b *testing.B) {
Addr: pubKey.Address(),
PeerstoreProvider: pstoreProviderMock,
CurrentHeightProvider: currentHeightProviderMock,
+ Handler: noopHandler,
}
router, err := NewRainTreeRouter(mockBus, rtCfg)
@@ -286,13 +292,15 @@ func testRainTreeMessageTargets(t *testing.T, expectedMsgProp *ExpectedRainTreeM
hostMock := mocksP2P.NewMockHost(ctrl)
hostMock.EXPECT().Peerstore().Return(libp2pPStore).AnyTimes()
- hostMock.EXPECT().SetStreamHandler(gomock.Any(), gomock.Any()).Times(1)
+ hostMock.EXPECT().SetStreamHandler(gomock.Any(), gomock.Any()).AnyTimes()
+ hostMock.EXPECT().ID().Return(libp2pPeer.ID("")).AnyTimes()
rtCfg := &config.RainTreeConfig{
Host: hostMock,
Addr: []byte{expectedMsgProp.orig},
PeerstoreProvider: pstoreProviderMock,
CurrentHeightProvider: currentHeightProviderMock,
+ Handler: noopHandler,
}
router, err := NewRainTreeRouter(busMock, rtCfg)
diff --git a/p2p/raintree/router.go b/p2p/raintree/router.go
index fe11c9120..d79e5be76 100644
--- a/p2p/raintree/router.go
+++ b/p2p/raintree/router.go
@@ -54,9 +54,7 @@ func NewRainTreeRouter(bus modules.Bus, cfg *config.RainTreeConfig) (typesP2P.Ro
}
func (*rainTreeRouter) Create(bus modules.Bus, cfg *config.RainTreeConfig) (typesP2P.Router, error) {
- routerLogger := logger.Global.CreateLoggerForModule("router")
- routerLogger.Info().Msg("Initializing rainTreeRouter")
-
+ rainTreeLogger := logger.Global.CreateLoggerForModule("rainTreeRouter")
if err := cfg.IsValid(); err != nil {
return nil, err
}
@@ -66,11 +64,24 @@ func (*rainTreeRouter) Create(bus modules.Bus, cfg *config.RainTreeConfig) (type
selfAddr: cfg.Addr,
pstoreProvider: cfg.PeerstoreProvider,
currentHeightProvider: cfg.CurrentHeightProvider,
- logger: routerLogger,
+ logger: rainTreeLogger,
handler: cfg.Handler,
}
rtr.SetBus(bus)
+ height := rtr.currentHeightProvider.CurrentHeight()
+ pstore, err := rtr.pstoreProvider.GetStakedPeerstoreAtHeight(height)
+ if err != nil {
+ return nil, fmt.Errorf("getting staked peerstore at height %d: %w", height, err)
+ }
+ rainTreeLogger.Info().Fields(map[string]any{
+ "address": cfg.Addr,
+ "host_id": cfg.Host.ID(),
+ "protocol_id": protocol.BackgroundProtocolID,
+ "current_height": height,
+ "peerstore_size": pstore.Size(),
+ }).Msg("initializing raintree router")
+
if err := rtr.setupDependencies(); err != nil {
return nil, err
}
@@ -78,6 +89,10 @@ func (*rainTreeRouter) Create(bus modules.Bus, cfg *config.RainTreeConfig) (type
return typesP2P.Router(rtr), nil
}
+func (rtr *rainTreeRouter) Close() error {
+ return nil
+}
+
// NetworkBroadcast implements the respective member of `typesP2P.Router`.
func (rtr *rainTreeRouter) Broadcast(data []byte) error {
return rtr.broadcastAtLevel(data, rtr.peersManager.GetMaxNumLevels())
@@ -151,15 +166,14 @@ func (rtr *rainTreeRouter) sendInternal(data []byte, address cryptoPocket.Addres
peer := rtr.peersManager.GetPeerstore().GetPeer(address)
if peer == nil {
- return fmt.Errorf("no known peer with pokt address %s", address)
+ return fmt.Errorf("%w: with pokt address %s", typesP2P.ErrUnknownPeer, address)
}
// debug logging
hostname := rtr.getHostname()
utils.LogOutgoingMsg(rtr.logger, hostname, peer)
- if err := utils.Libp2pSendToPeer(rtr.host, data, peer); err != nil {
- rtr.logger.Debug().Err(err).Msg("from libp2pSendInternal")
+ if err := utils.Libp2pSendToPeer(rtr.host, protocol.RaintreeProtocolID, data, peer); err != nil {
return err
}
@@ -200,14 +214,13 @@ func (rtr *rainTreeRouter) handleRainTreeMsg(rainTreeMsgBz []byte) error {
var rainTreeMsg typesP2P.RainTreeMessage
if err := proto.Unmarshal(rainTreeMsgBz, &rainTreeMsg); err != nil {
+ // TECHDEBT: add telemetry
return err
}
- // TECHDEBT(#763): refactor as "pre-propagation validation"
- networkMessage := messaging.PocketEnvelope{}
- if err := proto.Unmarshal(rainTreeMsg.Data, &networkMessage); err != nil {
- rtr.logger.Error().Err(err).Msg("Error decoding network message")
- return err
+ if err := rtr.validateRainTreeMsg(&rainTreeMsg); err != nil {
+ // TECHDEBT: add telemetry
+ return fmt.Errorf("validating raintree message: %w", err)
}
// Continue RainTree propagation
@@ -231,6 +244,13 @@ func (rtr *rainTreeRouter) handleRainTreeMsg(rainTreeMsgBz []byte) error {
return nil
}
+// validateRainTreeMsg ensures that the `data` contained within the RainTree message
+// is a valid `PocketEnvelope` by attempting to deserialize it.
+func (rtr *rainTreeRouter) validateRainTreeMsg(rainTreeMsg *typesP2P.RainTreeMessage) error {
+ networkMessage := messaging.PocketEnvelope{}
+ return proto.Unmarshal(rainTreeMsg.Data, &networkMessage)
+}
+
// GetPeerstore implements the respective member of `typesP2P.Router`.
func (rtr *rainTreeRouter) GetPeerstore() typesP2P.Peerstore {
return rtr.peersManager.GetPeerstore()
@@ -284,7 +304,7 @@ func (rtr *rainTreeRouter) setupUnicastRouter() error {
unicastRouterCfg := config.UnicastRouterConfig{
Logger: rtr.logger,
Host: rtr.host,
- ProtocolID: protocol.PoktProtocolID,
+ ProtocolID: protocol.RaintreeProtocolID,
MessageHandler: rtr.handleRainTreeMsg,
PeerHandler: rtr.AddPeer,
}
diff --git a/p2p/raintree/router_test.go b/p2p/raintree/router_test.go
index 1ef093a20..2865a584b 100644
--- a/p2p/raintree/router_test.go
+++ b/p2p/raintree/router_test.go
@@ -57,6 +57,7 @@ func TestRainTreeRouter_AddPeer(t *testing.T) {
Addr: selfAddr,
PeerstoreProvider: peerstoreProviderMock,
CurrentHeightProvider: currentHeightProviderMock,
+ Handler: noopHandler,
}
router, err := NewRainTreeRouter(busMock, rtCfg)
@@ -119,6 +120,7 @@ func TestRainTreeRouter_RemovePeer(t *testing.T) {
Addr: selfAddr,
PeerstoreProvider: peerstoreProviderMock,
CurrentHeightProvider: currentHeightProviderMock,
+ Handler: noopHandler,
}
router, err := NewRainTreeRouter(busMock, rtCfg)
diff --git a/p2p/testutil.go b/p2p/testutil.go
new file mode 100644
index 000000000..808f2d497
--- /dev/null
+++ b/p2p/testutil.go
@@ -0,0 +1,48 @@
+//go:build test
+
+package p2p
+
+import (
+ libp2pHost "github.com/libp2p/go-libp2p/core/host"
+ typesP2P "github.com/pokt-network/pocket/p2p/types"
+ "github.com/pokt-network/pocket/shared/modules"
+)
+
+// WithHost associates an existing (i.e. "started") libp2p `host.Host`
+// with this module, instead of creating a new one on `#Start()`.
+// Primarily intended for testing.
+func WithHost(host libp2pHost.Host) modules.ModuleOption {
+ return func(m modules.InitializableModule) {
+ mod, ok := m.(*p2pModule)
+ if ok {
+ mod.host = host
+ mod.logger.Debug().Msg("using host provided via `WithHost`")
+ }
+ }
+}
+
+// WithUnstakedActorRouter assigns the given router to the P2P modules
+// `#unstakedActor` field, used to communicate between unstaked actors
+// and the rest of the network, plus as a redundancy to the staked actor
+// router when broadcasting.
+func WithUnstakedActorRouter(router typesP2P.Router) modules.ModuleOption {
+ return func(m modules.InitializableModule) {
+ mod, ok := m.(*p2pModule)
+ if ok {
+ mod.unstakedActorRouter = router
+ mod.logger.Debug().Msg("using unstaked actor router provided via `WithUnstakeActorRouter`")
+ }
+ }
+}
+
+// WithStakedActorRouter assigns the given router to the P2P modules'
+// `#stakedActor` field, exclusively used to communicate between staked actors.
+func WithStakedActorRouter(router typesP2P.Router) modules.ModuleOption {
+ return func(m modules.InitializableModule) {
+ mod, ok := m.(*p2pModule)
+ if ok {
+ mod.stakedActorRouter = router
+ mod.logger.Debug().Msg("using staked actor router provided via `WithStakeActorRouter`")
+ }
+ }
+}
diff --git a/p2p/transport_encryption_test.go b/p2p/transport_encryption_test.go
index d95cb6496..799340a17 100644
--- a/p2p/transport_encryption_test.go
+++ b/p2p/transport_encryption_test.go
@@ -14,6 +14,7 @@ import (
"github.com/pokt-network/pocket/internal/testutil"
"github.com/pokt-network/pocket/p2p/protocol"
typesP2P "github.com/pokt-network/pocket/p2p/types"
+ mock_types "github.com/pokt-network/pocket/p2p/types/mocks"
"github.com/pokt-network/pocket/p2p/utils"
"github.com/pokt-network/pocket/runtime/configs"
"github.com/pokt-network/pocket/runtime/configs/types"
@@ -23,7 +24,7 @@ import (
mockModules "github.com/pokt-network/pocket/shared/modules/mocks"
)
-func TestP2pModule_Insecure_Error(t *testing.T) {
+func TestP2pModule_RainTreeRouter_Insecure_Error(t *testing.T) {
// TECHDEBT(#609): refactor mock setup with similar test utilities.
ctrl := gomock.NewController(t)
hostname := "127.0.0.1"
@@ -54,7 +55,7 @@ func TestP2pModule_Insecure_Error(t *testing.T) {
telemetryMock.EXPECT().GetEventMetricsAgent().Return(eventMetricsAgentMock).AnyTimes()
telemetryMock.EXPECT().GetModuleName().Return(modules.TelemetryModuleName).AnyTimes()
- busMock := createMockBus(t, runtimeMgrMock)
+ busMock := createMockBus(t, runtimeMgrMock, nil)
busMock.EXPECT().GetConsensusModule().Return(mockConsensusModule).AnyTimes()
busMock.EXPECT().GetRuntimeMgr().Return(runtimeMgrMock).AnyTimes()
busMock.EXPECT().GetTelemetryModule().Return(telemetryMock).AnyTimes()
@@ -73,7 +74,10 @@ func TestP2pModule_Insecure_Error(t *testing.T) {
dnsDone := testutil.PrepareDNSMockFromServiceURLs(t, serviceURLs)
t.Cleanup(dnsDone)
- p2pMod, err := Create(busMock)
+ routerMock := mock_types.NewMockRouter(ctrl)
+ routerMock.EXPECT().Close().Times(1)
+
+ p2pMod, err := Create(busMock, WithUnstakedActorRouter(routerMock))
require.NoError(t, err)
err = p2pMod.Start()
@@ -114,6 +118,6 @@ func TestP2pModule_Insecure_Error(t *testing.T) {
require.NoError(t, err)
ctx := context.Background()
- _, err = clearNode.NewStream(ctx, libp2pPeerInfo.ID, protocol.PoktProtocolID)
+ _, err = clearNode.NewStream(ctx, libp2pPeerInfo.ID, protocol.RaintreeProtocolID)
require.ErrorContains(t, err, "failed to negotiate security protocol: protocols not supported:")
}
diff --git a/p2p/types/errors.go b/p2p/types/errors.go
index 0688e5757..632bdd534 100644
--- a/p2p/types/errors.go
+++ b/p2p/types/errors.go
@@ -1,6 +1,13 @@
package types
-import "fmt"
+import (
+ "errors"
+ "fmt"
+)
+
+var (
+ ErrUnknownPeer = errors.New("unknown peer")
+)
func ErrUnknownEventType(msg any) error {
return fmt.Errorf("unknown event type: %v", msg)
diff --git a/p2p/types/proto/background.proto b/p2p/types/proto/background.proto
new file mode 100644
index 000000000..da1e138e7
--- /dev/null
+++ b/p2p/types/proto/background.proto
@@ -0,0 +1,13 @@
+syntax = "proto3";
+package background;
+
+option go_package = "github.com/pokt-network/pocket/p2p/types";
+
+// BackgroundMessage is intended to be used with the background router for
+// communication with unstaked actors. For unstaked actors, this is the only
+// means of communication with the network. For staked actors, this functions
+// as a redundancy for broadcast propagation (in addition to the staked actor
+// router broadcast message - i.e. `RainTreeMessage`).
+message BackgroundMessage {
+ bytes data = 1;
+}
diff --git a/p2p/raintree/types/proto/raintree.proto b/p2p/types/proto/raintree.proto
similarity index 100%
rename from p2p/raintree/types/proto/raintree.proto
rename to p2p/types/proto/raintree.proto
diff --git a/p2p/types/router.go b/p2p/types/router.go
index 37080acbd..0aacf29f8 100644
--- a/p2p/types/router.go
+++ b/p2p/types/router.go
@@ -14,6 +14,7 @@ type Router interface {
Broadcast(data []byte) error
Send(data []byte, address cryptoPocket.Address) error
+ Close() error
// GetPeerstore is used by the P2P module to update the staked actor router's
// (`RainTreeRouter`) peerstore.
diff --git a/p2p/utils/host.go b/p2p/utils/host.go
index e9c6e130b..3597856b7 100644
--- a/p2p/utils/host.go
+++ b/p2p/utils/host.go
@@ -6,10 +6,11 @@ import (
"time"
libp2pHost "github.com/libp2p/go-libp2p/core/host"
+ libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
+ "go.uber.org/multierr"
+
"github.com/pokt-network/pocket/logger"
- "github.com/pokt-network/pocket/p2p/protocol"
typesP2P "github.com/pokt-network/pocket/p2p/types"
- "go.uber.org/multierr"
)
const (
@@ -76,7 +77,7 @@ func RemovePeerFromLibp2pHost(host libp2pHost.Host, peer typesP2P.Peer) error {
}
// Libp2pSendToPeer sends data to the given pocket peer from the given libp2p host.
-func Libp2pSendToPeer(host libp2pHost.Host, data []byte, peer typesP2P.Peer) error {
+func Libp2pSendToPeer(host libp2pHost.Host, protocolID libp2pProtocol.ID, data []byte, peer typesP2P.Peer) error {
// TECHDEBT(#595): add ctx to interface methods and propagate down.
ctx := context.TODO()
@@ -94,7 +95,7 @@ func Libp2pSendToPeer(host libp2pHost.Host, data []byte, peer typesP2P.Peer) err
logger.Global.Debug().Err(err).Msg("logging resource scope stats")
}
- stream, err := host.NewStream(ctx, peerInfo.ID, protocol.PoktProtocolID)
+ stream, err := host.NewStream(ctx, peerInfo.ID, protocolID)
if err != nil {
return fmt.Errorf("opening stream: %w", err)
}
diff --git a/p2p/utils_test.go b/p2p/utils_test.go
index 633ea1425..26ddbeb80 100644
--- a/p2p/utils_test.go
+++ b/p2p/utils_test.go
@@ -20,6 +20,7 @@ import (
"github.com/pokt-network/pocket/p2p/providers/current_height_provider"
"github.com/pokt-network/pocket/p2p/providers/peerstore_provider"
typesP2P "github.com/pokt-network/pocket/p2p/types"
+ mock_types "github.com/pokt-network/pocket/p2p/types/mocks"
"github.com/pokt-network/pocket/p2p/utils"
"github.com/pokt-network/pocket/runtime"
"github.com/pokt-network/pocket/runtime/configs"
@@ -29,6 +30,7 @@ import (
"github.com/pokt-network/pocket/runtime/test_artifacts"
coreTypes "github.com/pokt-network/pocket/shared/core/types"
cryptoPocket "github.com/pokt-network/pocket/shared/crypto"
+ "github.com/pokt-network/pocket/shared/messaging"
"github.com/pokt-network/pocket/shared/modules"
mockModules "github.com/pokt-network/pocket/shared/modules/mocks"
"github.com/pokt-network/pocket/telemetry"
@@ -107,10 +109,20 @@ func waitForNetworkSimulationCompletion(t *testing.T, wg *sync.WaitGroup) {
func createP2PModules(t *testing.T, busMocks []*mockModules.MockBus, netMock mocknet.Mocknet) (p2pModules map[string]*p2pModule) {
peerIDs := setupMockNetPeers(t, netMock, len(busMocks))
+ ctrl := gomock.NewController(t)
+ noopBackgroundRouterMock := mock_types.NewMockRouter(ctrl)
+ noopBackgroundRouterMock.EXPECT().Broadcast(gomock.Any()).Times(1)
+ noopBackgroundRouterMock.EXPECT().Close().Times(len(busMocks))
+
p2pModules = make(map[string]*p2pModule, len(busMocks))
for i := range busMocks {
host := netMock.Host(peerIDs[i])
- p2pMod, err := Create(busMocks[i], WithHostOption(host))
+ p2pMod, err := Create(
+ busMocks[i],
+ WithHost(host),
+ // mock background router to prevent & ignore background message propagation.
+ WithUnstakedActorRouter(noopBackgroundRouterMock),
+ )
require.NoError(t, err)
p2pModules[validatorId(i+1)] = p2pMod.(*p2pModule)
}
@@ -180,15 +192,23 @@ func createMockRuntimeMgrs(t *testing.T, numValidators int) []modules.RuntimeMgr
return mockRuntimeMgrs
}
-func createMockBuses(t *testing.T, runtimeMgrs []modules.RuntimeMgr) []*mockModules.MockBus {
+func createMockBuses(
+ t *testing.T,
+ runtimeMgrs []modules.RuntimeMgr,
+ readWriteWaitGroup *sync.WaitGroup,
+) []*mockModules.MockBus {
mockBuses := make([]*mockModules.MockBus, len(runtimeMgrs))
for i := range mockBuses {
- mockBuses[i] = createMockBus(t, runtimeMgrs[i])
+ mockBuses[i] = createMockBus(t, runtimeMgrs[i], readWriteWaitGroup)
}
return mockBuses
}
-func createMockBus(t *testing.T, runtimeMgr modules.RuntimeMgr) *mockModules.MockBus {
+func createMockBus(
+ t *testing.T,
+ runtimeMgr modules.RuntimeMgr,
+ readWriteWaitGroup *sync.WaitGroup,
+) *mockModules.MockBus {
ctrl := gomock.NewController(t)
mockBus := mockModules.NewMockBus(ctrl)
mockBus.EXPECT().GetRuntimeMgr().Return(runtimeMgr).AnyTimes()
@@ -199,6 +219,14 @@ func createMockBus(t *testing.T, runtimeMgr modules.RuntimeMgr) *mockModules.Moc
mockModulesRegistry.EXPECT().GetModule(peerstore_provider.ModuleName).Return(nil, runtime.ErrModuleNotRegistered(peerstore_provider.ModuleName)).AnyTimes()
mockModulesRegistry.EXPECT().GetModule(current_height_provider.ModuleName).Return(nil, runtime.ErrModuleNotRegistered(current_height_provider.ModuleName)).AnyTimes()
mockBus.EXPECT().GetModulesRegistry().Return(mockModulesRegistry).AnyTimes()
+ mockBus.EXPECT().PublishEventToBus(gomock.AssignableToTypeOf(&messaging.PocketEnvelope{})).
+ Do(func(envelope *messaging.PocketEnvelope) {
+ fmt.Println("[valId: unknown] Read")
+ fmt.Printf("content type: %s\n", envelope.Content.GetTypeUrl())
+ if readWriteWaitGroup != nil {
+ readWriteWaitGroup.Done()
+ }
+ }).AnyTimes() // TECHDEBT: assert number of times. Consider `waitForEventsInternal` or similar as in consensus.
mockBus.EXPECT().PublishEventToBus(gomock.Any()).AnyTimes()
return mockBus
}
@@ -313,11 +341,32 @@ func prepareEventMetricsAgentMock(t *testing.T, valId string, wg *sync.WaitGroup
ctrl := gomock.NewController(t)
eventMetricsAgentMock := mockModules.NewMockEventMetricsAgent(ctrl)
- eventMetricsAgentMock.EXPECT().EmitEvent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
- eventMetricsAgentMock.EXPECT().EmitEvent(gomock.Any(), gomock.Any(), gomock.Eq(telemetry.P2P_RAINTREE_MESSAGE_EVENT_METRIC_SEND_LABEL), gomock.Any()).Do(func(n, e any, l ...any) {
+ // TECHDEBT(#886): The number of times each telemetry event is expected
+ // (below) is dependent on the number of redundant messages all validators see,
+ // which is a function of the network size. Until this function is derived and
+ // implemented, we cannot predict the number of times each event is expected.
+ _ = expectedNumNetworkWrites
+
+ eventMetricsAgentMock.EXPECT().EmitEvent(
+ gomock.Any(),
+ gomock.Any(),
+ gomock.Eq(telemetry.P2P_RAINTREE_MESSAGE_EVENT_METRIC_SEND_LABEL),
+ gomock.Any(),
+ ).Do(func(n, e any, l ...any) {
+ log.Printf("[valId: %s] Write\n", valId)
+ wg.Done()
+ }).AnyTimes() // TECHDEBT: expect specific number of non-redundant writes once known.
+ eventMetricsAgentMock.EXPECT().EmitEvent(
+ gomock.Any(),
+ gomock.Eq(telemetry.P2P_BROADCAST_MESSAGE_REDUNDANCY_PER_BLOCK_EVENT_METRIC_NAME),
+ gomock.Any(),
+ gomock.Any(), // nonce
+ gomock.Any(),
+ gomock.Any(), // blockHeight
+ ).Do(func(n, e any, l ...any) {
log.Printf("[valId: %s] Write\n", valId)
wg.Done()
- }).Times(expectedNumNetworkWrites)
+ }).AnyTimes() // TECHDEBT: expect specific number of redundant writes once known.
eventMetricsAgentMock.EXPECT().EmitEvent(gomock.Any(), gomock.Any(), gomock.Not(telemetry.P2P_RAINTREE_MESSAGE_EVENT_METRIC_SEND_LABEL), gomock.Any()).AnyTimes()
return eventMetricsAgentMock
diff --git a/shared/messaging/envelope.go b/shared/messaging/envelope.go
index 4150fbfba..f65304007 100644
--- a/shared/messaging/envelope.go
+++ b/shared/messaging/envelope.go
@@ -4,6 +4,8 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/known/anypb"
+
+ cryptoPocket "github.com/pokt-network/pocket/shared/crypto"
)
// PackMessage returns a *PocketEnvelope after having packed the message supplied as an argument
@@ -12,7 +14,10 @@ func PackMessage(message proto.Message) (*PocketEnvelope, error) {
if err != nil {
return nil, err
}
- return &PocketEnvelope{Content: anyMsg}, nil
+ return &PocketEnvelope{
+ Content: anyMsg,
+ Nonce: cryptoPocket.GetNonce(),
+ }, nil
}
// UnpackMessage extracts the message inside the PocketEnvelope decorating it with typing information