Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[P2P] [Tooling] Peer discovery peer connections subcommand #801

Open
wants to merge 2 commits into
base: feat/peer-discovery-list
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions app/client/cli/peer/connections.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package peer

import (
"fmt"

"github.com/spf13/cobra"
"google.golang.org/protobuf/types/known/anypb"

"github.com/pokt-network/pocket/app/client/cli/helpers"
"github.com/pokt-network/pocket/p2p/debug"
"github.com/pokt-network/pocket/shared/messaging"
)

var (
connectionsCmd = &cobra.Command{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everywhere else we have a function that returns a new instance of cobra.Command.

I think we should either:

  1. Stay consisstent
  2. Stay consistent + add a TODO to change the others
  3. Change all of them

Use: "connections",
Short: "Print open peer connections",
Comment on lines +16 to +17
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Use: "connections",
Short: "Print open peer connections",
Use: "Connections",
Short: "Print open peer connections",
Long: "Prints ..." // list out printConnectionsHeader
Aliases: []string{"conns"},

RunE: connectionsRunE,
}
)

func init() {
PeerCmd.AddCommand(connectionsCmd)
}

func connectionsRunE(cmd *cobra.Command, _ []string) error {
var routerType debug.RouterType

bus, err := helpers.GetBusFromCmd(cmd)
if err != nil {
return err
}

switch {
case stakedFlag:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OPTIONAL: The way I would've done this is by having a smaller private helper function like below that returns the router typ:

func foo() routerType {
	switch
	case stakedFlag && !unstakedFlag && !allFlag {
		return debug.StakeRouterType
	}
	case unstakedFlag && !stakedFlag && !allFlag {
		return debug.UnstakedRouterType
	}
	case stakedFlag || unstakedFlag {
		return ErrRouterType
	}
	// even if `allFlag` is false, we still want to print all connections
	default:
		return debug.AllRouterTypes
	}
}

Wdyt?

if unstakedFlag || allFlag {
return ErrRouterType
}
routerType = debug.StakedRouterType
case unstakedFlag:
if stakedFlag || allFlag {
return ErrRouterType
}
routerType = debug.UnstakedRouterType
// even if `allFlag` is false, we still want to print all connections
default:
if stakedFlag || unstakedFlag {
return ErrRouterType
}
routerType = debug.AllRouterTypes
}

debugMsg := &messaging.DebugMessage{
Action: messaging.DebugMessageAction_DEBUG_P2P_PEER_CONNECTIONS,
Type: messaging.DebugMessageRoutingType_DEBUG_MESSAGE_TYPE_BROADCAST,
Message: &anypb.Any{
Value: []byte(routerType),
},
}
debugMsgAny, err := anypb.New(debugMsg)
if err != nil {
return fmt.Errorf("creating anypb from debug message: %w", err)
}

if localFlag {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if this is false we don't print anything?

I don't fully understand its use. Is it like a "verbose" flag?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This localFlag determines whether we print locally ie to the terminal that ran the command or on the node itself AFAIK. With -l we will see the output in our terminal, otherwise it will be logged by the node

if err := debug.PrintPeerConnections(bus, routerType); err != nil {
return fmt.Errorf("printing peer list: %w", err)
}
return nil
}

// TECHDEBT(#810, #811): will need to wait for DHT bootstrapping to complete before
// p2p broadcast can be used with to reach unstaked actors.
// CONSIDERATION: add the peer commands to the interactive CLI as the P2P module
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So why don't we do this?

// instance could persist between commands. Other interactive CLI commands which
// rely on unstaked actor router broadcast are working as expected.

// TECHDEBT(#810, #811): use broadcast instead to reach all peers.
return sendToStakedPeers(cmd, debugMsgAny)
}
1 change: 1 addition & 0 deletions charts/pocket/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ config:
use_rain_tree: true
is_empty_connection_type: false
private_key: "" # @ignored This value is needed but ignored - use privateKeySecretKeyRef instead
# TODO: I think this has been renamed to `max_nonces`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cna you please update it everywhere else as well?

Screenshot 2023-07-13 at 4 14 38 PM
Screenshot 2023-07-13 at 4 14 31 PM
Screenshot 2023-07-13 at 4 14 23 PM

max_mempool_count: 100000
enable_peer_discovery_debug_rpc: false
telemetry:
Expand Down
6 changes: 5 additions & 1 deletion p2p/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (

func (m *p2pModule) handleDebugMessage(msg *messaging.DebugMessage) error {
switch msg.Action {
case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST:
case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST,
messaging.DebugMessageAction_DEBUG_P2P_PEER_CONNECTIONS:
if !m.cfg.EnablePeerDiscoveryDebugRpc {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we can skip the first switch statement and just return (with a debug log or error) if if !m.cfg.EnablePeerDiscoveryDebugRpc {

return typesP2P.ErrPeerDiscoveryDebugRPCDisabled
}
Expand All @@ -23,6 +24,9 @@ func (m *p2pModule) handleDebugMessage(msg *messaging.DebugMessage) error {
case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST:
routerType := debug.RouterType(msg.Message.Value)
return debug.PrintPeerList(m.GetBus(), routerType)
case messaging.DebugMessageAction_DEBUG_P2P_PEER_CONNECTIONS:
routerType := debug.RouterType(msg.Message.Value)
return debug.PrintPeerConnections(m.GetBus(), routerType)
default:
return fmt.Errorf("unsupported P2P debug message action: %s", msg.Action)
}
Expand Down
149 changes: 149 additions & 0 deletions p2p/debug/connections.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package debug
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a debug build tag?


import (
"fmt"
"os"
"strconv"

libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
libp2pPeer "github.com/libp2p/go-libp2p/core/peer"

"github.com/pokt-network/pocket/p2p/providers/peerstore_provider"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/p2p/utils"
"github.com/pokt-network/pocket/shared/modules"
)

var printConnectionsHeader = []string{"Peer ID", "Multiaddr", "Opened", "Direction", "NumStreams"}

func PrintPeerConnections(bus modules.Bus, routerType RouterType) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add godoco comments for functions exposed outside of this pacakge

var (
connections []libp2pNetwork.Conn
routerPlurality = ""
)

if routerType == AllRouterTypes {
routerPlurality = "s"
}

connections, err := getFilteredConnections(bus, routerType)
if err != nil {
return fmt.Errorf("getting connecions: %w", err)
}

if err := LogSelfAddress(bus); err != nil {
return fmt.Errorf("printing self address: %w", err)
}

// NB: Intentionally printing with `fmt` instead of the logger to match
// `utils.PrintPeerListTable` which does not use the logger due to
// incompatibilities with the tabwriter.
// (This doesn't seem to work as expected; i.e. not printing at all in tilt.)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this imply logger or fmt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To rephrase, it doesn't seem like writing to stdout with anything other than tabwriter produces any output in the logs.

if _, err := fmt.Fprintf(
os.Stdout,
"%s router peerstore%s:\n",
routerType,
routerPlurality,
); err != nil {
return fmt.Errorf("printing to stdout: %w", err)
}

if err := PrintConnectionsTable(connections); err != nil {
return fmt.Errorf("printing peer list: %w", err)
}
return nil
}

func PrintConnectionsTable(conns []libp2pNetwork.Conn) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does PrintConnectionsTable need to be publically exposed given that we use PrintPeerConnections in the cli?

return utils.PrintTable(printConnectionsHeader, peerConnsRowConsumerFactory(conns))
}

func getFilteredConnections(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you comment what filter does?

For example: Retrieve connections based on the router type and only return connections that are active (i.e. open) w.r.t the remote host

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returns only connections with identities matching those found in the given router type.

bus modules.Bus,
routerType RouterType,
) ([]libp2pNetwork.Conn, error) {
var (
pstore typesP2P.Peerstore
idsToInclude map[libp2pPeer.ID]struct{}
p2pModule = bus.GetP2PModule()
connections = p2pModule.GetConnections()
)

// TECHDEBT(#810, #811): use `bus.GetPeerstoreProvider()` after peerstore provider
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

810 is resolved . Ditto elsewhere

// is retrievable as a proper submodule.
pstoreProviderModule, err := bus.GetModulesRegistry().
GetModule(peerstore_provider.PeerstoreProviderSubmoduleName)
if err != nil {
return nil, fmt.Errorf("getting peerstore provider: %w", err)
}
pstoreProvider, ok := pstoreProviderModule.(peerstore_provider.PeerstoreProvider)
if !ok {
return nil, fmt.Errorf("unknown peerstore provider type: %T", pstoreProviderModule)
}
//--
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rm?


switch routerType {
case AllRouterTypes:
// return early; no need to filter
return connections, nil
case StakedRouterType:
pstore, err = pstoreProvider.GetStakedPeerstoreAtCurrentHeight()
if err != nil {
return nil, fmt.Errorf("getting staked peerstore: %w", err)
}
case UnstakedRouterType:
pstore, err = pstoreProvider.GetUnstakedPeerstore()
if err != nil {
return nil, fmt.Errorf("getting unstaked peerstore: %w", err)
}
}

idsToInclude, err = getPeerIDs(pstore.GetPeerList())
if err != nil {
return nil, fmt.Errorf("getting peer IDs: %w", err)
}

var filteredConnections []libp2pNetwork.Conn
for _, conn := range connections {
if _, ok := idsToInclude[conn.RemotePeer()]; ok {
filteredConnections = append(filteredConnections, conn)
}
}
return filteredConnections, nil
}

func peerConnsRowConsumerFactory(conns []libp2pNetwork.Conn) utils.RowConsumer {
return func(provideRow utils.RowProvider) error {
for _, conn := range conns {
if err := provideRow(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if we don't return an error when provideRow errors? Is a partial list not an option?

conn.RemotePeer().String(),
conn.RemoteMultiaddr().String(),
conn.Stat().Opened.String(),
conn.Stat().Direction.String(),
strconv.Itoa(conn.Stat().NumStreams),
); err != nil {
return err
}
}
return nil
}
}

func getPeerIDs(peers []typesP2P.Peer) (map[libp2pPeer.ID]struct{}, error) {
ids := make(map[libp2pPeer.ID]struct{})
for _, peer := range peers {
addrInfo, err := utils.Libp2pAddrInfoFromPeer(peer)
if err != nil {
return nil, err
}

// ID already in set; continue
if _, ok := ids[addrInfo.ID]; ok {
continue
}

// add ID to set
ids[addrInfo.ID] = struct{}{}
}
return ids, nil
}
6 changes: 6 additions & 0 deletions p2p/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/libp2p/go-libp2p"
libp2pHost "github.com/libp2p/go-libp2p/core/host"
libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/multiformats/go-multiaddr"
"go.uber.org/multierr"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -274,6 +275,11 @@ func (m *p2pModule) GetAddress() (cryptoPocket.Address, error) {
return m.address, nil
}

// GetConnections implements the respective `modules.P2PModule` interface method.
func (m *p2pModule) GetConnections() []libp2pNetwork.Conn {
return m.host.Network().Conns()
}

// setupDependencies sets up the module's current height and peerstore providers.
func (m *p2pModule) setupDependencies() error {
if err := m.setupPeerstoreProvider(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions shared/messaging/proto/debug_message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ enum DebugMessageAction {
DEBUG_PERSISTENCE_CLEAR_STATE = 8;
DEBUG_PERSISTENCE_RESET_TO_GENESIS = 9;
DEBUG_P2P_PEER_LIST = 10;
DEBUG_P2P_PEER_CONNECTIONS = 11;
}

message DebugMessage {
Expand Down
8 changes: 7 additions & 1 deletion shared/modules/p2p_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package modules
//go:generate mockgen -destination=./mocks/p2p_module_mock.go github.com/pokt-network/pocket/shared/modules P2PModule

import (
cryptoPocket "github.com/pokt-network/pocket/shared/crypto"
libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
"google.golang.org/protobuf/types/known/anypb"

cryptoPocket "github.com/pokt-network/pocket/shared/crypto"
)

const P2PModuleName = "p2p"
Expand All @@ -17,6 +19,10 @@ type P2PModule interface {
// TECHDEBT(#811): uncomment after moving `typesP2P.Peerstore` interface to a shared package
// GetUnstakedPeerstore() (typesP2P.Peerstore, error)

// GetConnections returns a list of all connections between the local libp2p
// host and connected remote peers.
GetConnections() []libp2pNetwork.Conn

// A network broadcast to all staked actors on the network using RainTree
Broadcast(msg *anypb.Any) error

Expand Down