Skip to content

Commit

Permalink
feat: add peer list subcommand
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite committed Jul 12, 2023
1 parent 2394dc2 commit 76234d2
Show file tree
Hide file tree
Showing 7 changed files with 372 additions and 8 deletions.
108 changes: 108 additions & 0 deletions app/client/cli/peer/list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package peer

import (
"fmt"

"github.com/spf13/cobra"
"google.golang.org/protobuf/types/known/anypb"

"github.com/pokt-network/pocket/app/client/cli/helpers"
"github.com/pokt-network/pocket/logger"
"github.com/pokt-network/pocket/p2p"
"github.com/pokt-network/pocket/p2p/debug"
"github.com/pokt-network/pocket/shared/messaging"
)

var (
listCmd = &cobra.Command{
Use: "list",
Short: "Print addresses and service URLs of known peers",
RunE: listRunE,
}

ErrRouterType = fmt.Errorf("must specify one of --staked, --unstaked, or --all")
)

func init() {
PeerCmd.AddCommand(listCmd)
}

func listRunE(cmd *cobra.Command, _ []string) error {
var routerType debug.RouterType

bus, err := helpers.GetBusFromCmd(cmd)
if err != nil {
return err
}

switch {
case stakedFlag:
if unstakedFlag || allFlag {
return ErrRouterType
}
routerType = debug.StakedRouterType
case unstakedFlag:
if stakedFlag || allFlag {
return ErrRouterType
}
routerType = debug.UnstakedRouterType
case allFlag:
if stakedFlag || unstakedFlag {
return ErrRouterType
}
routerType = debug.AllRouterTypes
default:
return ErrRouterType
}

debugMsg := &messaging.DebugMessage{
Action: messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST,
Type: messaging.DebugMessageRoutingType_DEBUG_MESSAGE_TYPE_BROADCAST,
Message: &anypb.Any{
Value: []byte(routerType),
},
}
debugMsgAny, err := anypb.New(debugMsg)
if err != nil {
return fmt.Errorf("creating anypb from debug message: %w", err)
}

if localFlag {
if err := p2p.PrintPeerList(bus, routerType); err != nil {
return fmt.Errorf("printing peer list: %w", err)
}
return nil
}

// TECHDEBT(#810, #811): will need to wait for DHT bootstrapping to complete before
// p2p broadcast can be used with to reach unstaked actors.
// CONSIDERATION: add the peer commands to the interactive CLI as the P2P module
// instance could persist between commands. Other interactive CLI commands which
// rely on unstaked actor router broadcast are working as expected.

// TECHDEBT(#810, #811): use broadcast instead to reach all peers.
return sendToStakedPeers(cmd, debugMsgAny)
}

func sendToStakedPeers(cmd *cobra.Command, debugMsgAny *anypb.Any) error {
bus, err := helpers.GetBusFromCmd(cmd)
if err != nil {
return err
}

pstore, err := helpers.FetchPeerstore(cmd)
if err != nil {
logger.Global.Fatal().Err(err).Msg("Unable to retrieve the pstore")
}

if pstore.Size() == 0 {
logger.Global.Fatal().Msg("No validators found")
}

for _, peer := range pstore.GetPeerList() {
if err := bus.GetP2PModule().Send(peer.GetAddress(), debugMsgAny); err != nil {
logger.Global.Error().Err(err).Msg("Failed to send debug message")
}
}
return nil
}
4 changes: 2 additions & 2 deletions app/client/cli/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ var (
)

func init() {
PeerCmd.PersistentFlags().BoolVarP(&allFlag, "all", "a", false, "operations apply to both staked & unstaked router peerstores")
PeerCmd.PersistentFlags().BoolVarP(&allFlag, "all", "a", true, "operations apply to both staked & unstaked router peerstores (default)")
PeerCmd.PersistentFlags().BoolVarP(&stakedFlag, "staked", "s", false, "operations only apply to staked router peerstore (i.e. raintree)")
PeerCmd.PersistentFlags().BoolVarP(&unstakedFlag, "unstaked", "u", false, "operations only apply to unstaked router peerstore (i.e. gossipsub)")
PeerCmd.PersistentFlags().BoolVarP(&localFlag, "local", "l", false, "operations apply to the local (CLI binary's) P2P module rather than being sent to the --remote_cli_url")
PeerCmd.PersistentFlags().BoolVarP(&localFlag, "local", "l", false, "operations apply to the local (CLI binary's) P2P module instead of being broadcast")
}
128 changes: 128 additions & 0 deletions p2p/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package p2p

import (
"fmt"
"os"

"github.com/pokt-network/pocket/p2p/debug"
"github.com/pokt-network/pocket/p2p/providers/peerstore_provider"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/shared/messaging"
"github.com/pokt-network/pocket/shared/modules"
)

func (m *p2pModule) handleDebugMessage(msg *messaging.DebugMessage) error {
switch msg.Action {
case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST:
if !m.cfg.EnablePeerDiscoveryDebugRpc {
return typesP2P.ErrPeerDiscoveryDebugRPCDisabled
}
default:
// This debug message isn't intended for the P2P module, ignore it.
return nil
}

switch msg.Action {
case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST:
routerType := debug.RouterType(msg.Message.Value)
return PrintPeerList(m.GetBus(), routerType)
default:
return fmt.Errorf("unsupported P2P debug message action: %s", msg.Action)
}
}

func PrintPeerList(bus modules.Bus, routerType debug.RouterType) error {
var (
peers typesP2P.PeerList
pstorePlurality = ""
)

currentHeightProvider := bus.GetCurrentHeightProvider()

// TECHDEBT(#810, #811): use `bus.GetPeerstoreProvider()` after peerstore provider
// is retrievable as a proper submodule.
pstoreProviderModule, err := bus.GetModulesRegistry().
GetModule(peerstore_provider.PeerstoreProviderSubmoduleName)
if err != nil {
return fmt.Errorf("getting peerstore provider: %w", err)
}
pstoreProvider, ok := pstoreProviderModule.(peerstore_provider.PeerstoreProvider)
if !ok {
return fmt.Errorf("unknown peerstore provider type: %T", pstoreProviderModule)
}
//--

switch routerType {
case debug.StakedRouterType:
// TECHDEBT: add `PeerstoreProvider#GetStakedPeerstoreAtCurrentHeight()`
// interface method.
currentHeight := currentHeightProvider.CurrentHeight()
pstore, err := pstoreProvider.GetStakedPeerstoreAtHeight(currentHeight)
if err != nil {
return fmt.Errorf("getting unstaked peerstore: %v", err)
}

peers = pstore.GetPeerList()
case debug.UnstakedRouterType:
pstore, err := pstoreProvider.GetUnstakedPeerstore()
if err != nil {
return fmt.Errorf("getting unstaked peerstore: %v", err)
}

peers = pstore.GetPeerList()
case debug.AllRouterTypes:
pstorePlurality = "s"

// TECHDEBT: add `PeerstoreProvider#GetStakedPeerstoreAtCurrentHeight()`
currentHeight := currentHeightProvider.CurrentHeight()
stakedPStore, err := pstoreProvider.GetStakedPeerstoreAtHeight(currentHeight)
if err != nil {
return fmt.Errorf("getting unstaked peerstore: %v", err)
}
unstakedPStore, err := pstoreProvider.GetUnstakedPeerstore()
if err != nil {
return fmt.Errorf("getting unstaked peerstore: %v", err)
}

unstakedPeers := unstakedPStore.GetPeerList()
stakedPeers := stakedPStore.GetPeerList()
additionalPeers, _ := unstakedPeers.Delta(stakedPeers)

// NB: there should never be any "additional" peers. This would represent
// a staked actor who is not participating in background gossip for some
// reason. It's possible that a staked actor node which has restarted
// recently and hasn't yet completed background router bootstrapping may
// result in peers experiencing this state.
if len(additionalPeers) == 0 {
return debug.PrintPeerListTable(unstakedPeers)
}

allPeers := append(typesP2P.PeerList{}, unstakedPeers...)
allPeers = append(allPeers, additionalPeers...)
peers = allPeers
default:
return fmt.Errorf("unsupported router type: %s", routerType)
}

if err := debug.LogSelfAddress(bus); err != nil {
return fmt.Errorf("printing self address: %w", err)
}

// NB: Intentionally printing with `fmt` instead of the logger to match
// `utils.PrintPeerListTable` which does not use the logger due to
// incompatibilities with the tabwriter.
// (This doesn't seem to work as expected; i.e. not printing at all in tilt.)
if _, err := fmt.Fprintf(
os.Stdout,
"%s router peerstore%s:\n",
routerType,
pstorePlurality,
); err != nil {
return fmt.Errorf("printing to stdout: %w", err)
}

if err := debug.PrintPeerListTable(peers); err != nil {
return fmt.Errorf("printing peer list: %w", err)
}
return nil
}
62 changes: 62 additions & 0 deletions p2p/debug/peers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package debug

import (
"fmt"
"os"

"github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/p2p/utils"
"github.com/pokt-network/pocket/shared/modules"
)

type RouterType string

const (
StakedRouterType RouterType = "staked"
UnstakedRouterType RouterType = "unstaked"
AllRouterTypes RouterType = "all"
)

var peerListTableHeader = []string{"Peer ID", "Pokt Address", "ServiceURL"}

func LogSelfAddress(bus modules.Bus) error {
p2pModule := bus.GetP2PModule()
if p2pModule == nil {
return fmt.Errorf("no p2p module found on the bus")
}

selfAddr, err := p2pModule.GetAddress()
if err != nil {
return fmt.Errorf("getting self address: %w", err)
}

_, err = fmt.Fprintf(os.Stdout, "self address: %s", selfAddr.String())
return err
}

// PrintPeerListTable prints a table of the passed peers to stdout. Header row is defined
// by `peerListTableHeader`. Row printing behavior is defined by `peerListRowConsumerFactory`.
func PrintPeerListTable(peers types.PeerList) error {
return utils.PrintTable(peerListTableHeader, peerListRowConsumerFactory(peers))
}

func peerListRowConsumerFactory(peers types.PeerList) utils.RowConsumer {
return func(provideRow utils.RowProvider) error {
for _, peer := range peers {
libp2pAddrInfo, err := utils.Libp2pAddrInfoFromPeer(peer)
if err != nil {
return fmt.Errorf("converting peer to libp2p addr info: %w", err)
}

err = provideRow(
libp2pAddrInfo.ID.String(),
peer.GetAddress().String(),
peer.GetServiceURL(),
)
if err != nil {
return err
}
}
return nil
}
}
5 changes: 0 additions & 5 deletions p2p/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package p2p

import (
"fmt"

"google.golang.org/protobuf/types/known/anypb"

"github.com/pokt-network/pocket/shared/codec"
Expand Down Expand Up @@ -100,7 +99,3 @@ func (m *p2pModule) HandleEvent(event *anypb.Any) error {

return nil
}

func (m *p2pModule) handleDebugMessage(msg *messaging.DebugMessage) error {
return nil
}
Loading

0 comments on commit 76234d2

Please sign in to comment.