diff --git a/app/client/cli/peer/connections.go b/app/client/cli/peer/connections.go new file mode 100644 index 000000000..ef99a3190 --- /dev/null +++ b/app/client/cli/peer/connections.go @@ -0,0 +1,80 @@ +package peer + +import ( + "fmt" + + "github.com/spf13/cobra" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/pokt-network/pocket/app/client/cli/helpers" + "github.com/pokt-network/pocket/p2p/debug" + "github.com/pokt-network/pocket/shared/messaging" +) + +var ( + connectionsCmd = &cobra.Command{ + Use: "connections", + Short: "Print open peer connections", + RunE: connectionsRunE, + } +) + +func init() { + PeerCmd.AddCommand(connectionsCmd) +} + +func connectionsRunE(cmd *cobra.Command, _ []string) error { + var routerType debug.RouterType + + bus, err := helpers.GetBusFromCmd(cmd) + if err != nil { + return err + } + + switch { + case stakedFlag: + if unstakedFlag || allFlag { + return ErrRouterType + } + routerType = debug.StakedRouterType + case unstakedFlag: + if stakedFlag || allFlag { + return ErrRouterType + } + routerType = debug.UnstakedRouterType + // even if `allFlag` is false, we still want to print all connections + default: + if stakedFlag || unstakedFlag { + return ErrRouterType + } + routerType = debug.AllRouterTypes + } + + debugMsg := &messaging.DebugMessage{ + Action: messaging.DebugMessageAction_DEBUG_P2P_PEER_CONNECTIONS, + Type: messaging.DebugMessageRoutingType_DEBUG_MESSAGE_TYPE_BROADCAST, + Message: &anypb.Any{ + Value: []byte(routerType), + }, + } + debugMsgAny, err := anypb.New(debugMsg) + if err != nil { + return fmt.Errorf("creating anypb from debug message: %w", err) + } + + if localFlag { + if err := debug.PrintPeerConnections(bus, routerType); err != nil { + return fmt.Errorf("printing peer list: %w", err) + } + return nil + } + + // TECHDEBT(#810, #811): will need to wait for DHT bootstrapping to complete before + // p2p broadcast can be used with to reach unstaked actors. + // CONSIDERATION: add the peer commands to the interactive CLI as the P2P module + // instance could persist between commands. Other interactive CLI commands which + // rely on unstaked actor router broadcast are working as expected. + + // TECHDEBT(#810, #811): use broadcast instead to reach all peers. + return sendToStakedPeers(cmd, debugMsgAny) +} diff --git a/charts/pocket/values.yaml b/charts/pocket/values.yaml index afa383c2f..745e46bfb 100644 --- a/charts/pocket/values.yaml +++ b/charts/pocket/values.yaml @@ -100,6 +100,7 @@ config: use_rain_tree: true is_empty_connection_type: false private_key: "" # @ignored This value is needed but ignored - use privateKeySecretKeyRef instead + # TODO: I think this has been renamed to `max_nonces` max_mempool_count: 100000 enable_peer_discovery_debug_rpc: false telemetry: diff --git a/p2p/debug.go b/p2p/debug.go index 7c352eb5a..e4da506a1 100644 --- a/p2p/debug.go +++ b/p2p/debug.go @@ -10,7 +10,8 @@ import ( func (m *p2pModule) handleDebugMessage(msg *messaging.DebugMessage) error { switch msg.Action { - case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST: + case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST, + messaging.DebugMessageAction_DEBUG_P2P_PEER_CONNECTIONS: if !m.cfg.EnablePeerDiscoveryDebugRpc { return typesP2P.ErrPeerDiscoveryDebugRPCDisabled } @@ -23,6 +24,9 @@ func (m *p2pModule) handleDebugMessage(msg *messaging.DebugMessage) error { case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST: routerType := debug.RouterType(msg.Message.Value) return debug.PrintPeerList(m.GetBus(), routerType) + case messaging.DebugMessageAction_DEBUG_P2P_PEER_CONNECTIONS: + routerType := debug.RouterType(msg.Message.Value) + return debug.PrintPeerConnections(m.GetBus(), routerType) default: return fmt.Errorf("unsupported P2P debug message action: %s", msg.Action) } diff --git a/p2p/debug/connections.go b/p2p/debug/connections.go new file mode 100644 index 000000000..5232c87c1 --- /dev/null +++ b/p2p/debug/connections.go @@ -0,0 +1,149 @@ +package debug + +import ( + "fmt" + "os" + "strconv" + + libp2pNetwork "github.com/libp2p/go-libp2p/core/network" + libp2pPeer "github.com/libp2p/go-libp2p/core/peer" + + "github.com/pokt-network/pocket/p2p/providers/peerstore_provider" + typesP2P "github.com/pokt-network/pocket/p2p/types" + "github.com/pokt-network/pocket/p2p/utils" + "github.com/pokt-network/pocket/shared/modules" +) + +var printConnectionsHeader = []string{"Peer ID", "Multiaddr", "Opened", "Direction", "NumStreams"} + +func PrintPeerConnections(bus modules.Bus, routerType RouterType) error { + var ( + connections []libp2pNetwork.Conn + routerPlurality = "" + ) + + if routerType == AllRouterTypes { + routerPlurality = "s" + } + + connections, err := getFilteredConnections(bus, routerType) + if err != nil { + return fmt.Errorf("getting connecions: %w", err) + } + + if err := LogSelfAddress(bus); err != nil { + return fmt.Errorf("printing self address: %w", err) + } + + // NB: Intentionally printing with `fmt` instead of the logger to match + // `utils.PrintPeerListTable` which does not use the logger due to + // incompatibilities with the tabwriter. + // (This doesn't seem to work as expected; i.e. not printing at all in tilt.) + if _, err := fmt.Fprintf( + os.Stdout, + "%s router peerstore%s:\n", + routerType, + routerPlurality, + ); err != nil { + return fmt.Errorf("printing to stdout: %w", err) + } + + if err := PrintConnectionsTable(connections); err != nil { + return fmt.Errorf("printing peer list: %w", err) + } + return nil +} + +func PrintConnectionsTable(conns []libp2pNetwork.Conn) error { + return utils.PrintTable(printConnectionsHeader, peerConnsRowConsumerFactory(conns)) +} + +func getFilteredConnections( + bus modules.Bus, + routerType RouterType, +) ([]libp2pNetwork.Conn, error) { + var ( + pstore typesP2P.Peerstore + idsToInclude map[libp2pPeer.ID]struct{} + p2pModule = bus.GetP2PModule() + connections = p2pModule.GetConnections() + ) + + // TECHDEBT(#810, #811): use `bus.GetPeerstoreProvider()` after peerstore provider + // is retrievable as a proper submodule. + pstoreProviderModule, err := bus.GetModulesRegistry(). + GetModule(peerstore_provider.PeerstoreProviderSubmoduleName) + if err != nil { + return nil, fmt.Errorf("getting peerstore provider: %w", err) + } + pstoreProvider, ok := pstoreProviderModule.(peerstore_provider.PeerstoreProvider) + if !ok { + return nil, fmt.Errorf("unknown peerstore provider type: %T", pstoreProviderModule) + } + //-- + + switch routerType { + case AllRouterTypes: + // return early; no need to filter + return connections, nil + case StakedRouterType: + pstore, err = pstoreProvider.GetStakedPeerstoreAtCurrentHeight() + if err != nil { + return nil, fmt.Errorf("getting staked peerstore: %w", err) + } + case UnstakedRouterType: + pstore, err = pstoreProvider.GetUnstakedPeerstore() + if err != nil { + return nil, fmt.Errorf("getting unstaked peerstore: %w", err) + } + } + + idsToInclude, err = getPeerIDs(pstore.GetPeerList()) + if err != nil { + return nil, fmt.Errorf("getting peer IDs: %w", err) + } + + var filteredConnections []libp2pNetwork.Conn + for _, conn := range connections { + if _, ok := idsToInclude[conn.RemotePeer()]; ok { + filteredConnections = append(filteredConnections, conn) + } + } + return filteredConnections, nil +} + +func peerConnsRowConsumerFactory(conns []libp2pNetwork.Conn) utils.RowConsumer { + return func(provideRow utils.RowProvider) error { + for _, conn := range conns { + if err := provideRow( + conn.RemotePeer().String(), + conn.RemoteMultiaddr().String(), + conn.Stat().Opened.String(), + conn.Stat().Direction.String(), + strconv.Itoa(conn.Stat().NumStreams), + ); err != nil { + return err + } + } + return nil + } +} + +func getPeerIDs(peers []typesP2P.Peer) (map[libp2pPeer.ID]struct{}, error) { + ids := make(map[libp2pPeer.ID]struct{}) + for _, peer := range peers { + addrInfo, err := utils.Libp2pAddrInfoFromPeer(peer) + if err != nil { + return nil, err + } + + // ID already in set; continue + if _, ok := ids[addrInfo.ID]; ok { + continue + } + + // add ID to set + ids[addrInfo.ID] = struct{}{} + } + return ids, nil +} diff --git a/p2p/module.go b/p2p/module.go index 6bb8f479a..58f64322e 100644 --- a/p2p/module.go +++ b/p2p/module.go @@ -7,6 +7,7 @@ import ( "github.com/libp2p/go-libp2p" libp2pHost "github.com/libp2p/go-libp2p/core/host" + libp2pNetwork "github.com/libp2p/go-libp2p/core/network" "github.com/multiformats/go-multiaddr" "go.uber.org/multierr" "google.golang.org/protobuf/proto" @@ -274,6 +275,11 @@ func (m *p2pModule) GetAddress() (cryptoPocket.Address, error) { return m.address, nil } +// GetConnections implements the respective `modules.P2PModule` interface method. +func (m *p2pModule) GetConnections() []libp2pNetwork.Conn { + return m.host.Network().Conns() +} + // setupDependencies sets up the module's current height and peerstore providers. func (m *p2pModule) setupDependencies() error { if err := m.setupPeerstoreProvider(); err != nil { diff --git a/shared/messaging/proto/debug_message.proto b/shared/messaging/proto/debug_message.proto index 55a87c695..9019ab85e 100644 --- a/shared/messaging/proto/debug_message.proto +++ b/shared/messaging/proto/debug_message.proto @@ -23,6 +23,7 @@ enum DebugMessageAction { DEBUG_PERSISTENCE_CLEAR_STATE = 8; DEBUG_PERSISTENCE_RESET_TO_GENESIS = 9; DEBUG_P2P_PEER_LIST = 10; + DEBUG_P2P_PEER_CONNECTIONS = 11; } message DebugMessage { diff --git a/shared/modules/p2p_module.go b/shared/modules/p2p_module.go index e306d4fd2..097b557cb 100644 --- a/shared/modules/p2p_module.go +++ b/shared/modules/p2p_module.go @@ -3,8 +3,10 @@ package modules //go:generate mockgen -destination=./mocks/p2p_module_mock.go github.com/pokt-network/pocket/shared/modules P2PModule import ( - cryptoPocket "github.com/pokt-network/pocket/shared/crypto" + libp2pNetwork "github.com/libp2p/go-libp2p/core/network" "google.golang.org/protobuf/types/known/anypb" + + cryptoPocket "github.com/pokt-network/pocket/shared/crypto" ) const P2PModuleName = "p2p" @@ -17,6 +19,10 @@ type P2PModule interface { // TECHDEBT(#811): uncomment after moving `typesP2P.Peerstore` interface to a shared package // GetUnstakedPeerstore() (typesP2P.Peerstore, error) + // GetConnections returns a list of all connections between the local libp2p + // host and connected remote peers. + GetConnections() []libp2pNetwork.Conn + // A network broadcast to all staked actors on the network using RainTree Broadcast(msg *anypb.Any) error