diff --git a/app/client/cli/peer/list.go b/app/client/cli/peer/list.go new file mode 100644 index 000000000..a4afc1da8 --- /dev/null +++ b/app/client/cli/peer/list.go @@ -0,0 +1,108 @@ +package peer + +import ( + "fmt" + + "github.com/spf13/cobra" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/pokt-network/pocket/app/client/cli/helpers" + "github.com/pokt-network/pocket/logger" + "github.com/pokt-network/pocket/p2p" + "github.com/pokt-network/pocket/p2p/debug" + "github.com/pokt-network/pocket/shared/messaging" +) + +var ( + listCmd = &cobra.Command{ + Use: "list", + Short: "Print addresses and service URLs of known peers", + RunE: listRunE, + } + + ErrRouterType = fmt.Errorf("must specify one of --staked, --unstaked, or --all") +) + +func init() { + PeerCmd.AddCommand(listCmd) +} + +func listRunE(cmd *cobra.Command, _ []string) error { + var routerType debug.RouterType + + bus, err := helpers.GetBusFromCmd(cmd) + if err != nil { + return err + } + + switch { + case stakedFlag: + if unstakedFlag || allFlag { + return ErrRouterType + } + routerType = debug.StakedRouterType + case unstakedFlag: + if stakedFlag || allFlag { + return ErrRouterType + } + routerType = debug.UnstakedRouterType + case allFlag: + if stakedFlag || unstakedFlag { + return ErrRouterType + } + routerType = debug.AllRouterTypes + default: + return ErrRouterType + } + + debugMsg := &messaging.DebugMessage{ + Action: messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST, + Type: messaging.DebugMessageRoutingType_DEBUG_MESSAGE_TYPE_BROADCAST, + Message: &anypb.Any{ + Value: []byte(routerType), + }, + } + debugMsgAny, err := anypb.New(debugMsg) + if err != nil { + return fmt.Errorf("creating anypb from debug message: %w", err) + } + + if localFlag { + if err := p2p.PrintPeerList(bus, routerType); err != nil { + return fmt.Errorf("printing peer list: %w", err) + } + return nil + } + + // TECHDEBT(#810, #811): will need to wait for DHT bootstrapping to complete before + // p2p broadcast can be used with to reach unstaked actors. + // CONSIDERATION: add the peer commands to the interactive CLI as the P2P module + // instance could persist between commands. Other interactive CLI commands which + // rely on unstaked actor router broadcast are working as expected. + + // TECHDEBT(#810, #811): use broadcast instead to reach all peers. + return sendToStakedPeers(cmd, debugMsgAny) +} + +func sendToStakedPeers(cmd *cobra.Command, debugMsgAny *anypb.Any) error { + bus, err := helpers.GetBusFromCmd(cmd) + if err != nil { + return err + } + + pstore, err := helpers.FetchPeerstore(cmd) + if err != nil { + logger.Global.Fatal().Err(err).Msg("Unable to retrieve the pstore") + } + + if pstore.Size() == 0 { + logger.Global.Fatal().Msg("No validators found") + } + + for _, peer := range pstore.GetPeerList() { + if err := bus.GetP2PModule().Send(peer.GetAddress(), debugMsgAny); err != nil { + logger.Global.Error().Err(err).Msg("Failed to send debug message") + } + } + return nil +} diff --git a/app/client/cli/peer/peer.go b/app/client/cli/peer/peer.go index 1d9ddb2c8..f7164c693 100644 --- a/app/client/cli/peer/peer.go +++ b/app/client/cli/peer/peer.go @@ -20,8 +20,8 @@ var ( ) func init() { - PeerCmd.PersistentFlags().BoolVarP(&allFlag, "all", "a", false, "operations apply to both staked & unstaked router peerstores") + PeerCmd.PersistentFlags().BoolVarP(&allFlag, "all", "a", true, "operations apply to both staked & unstaked router peerstores (default)") PeerCmd.PersistentFlags().BoolVarP(&stakedFlag, "staked", "s", false, "operations only apply to staked router peerstore (i.e. raintree)") PeerCmd.PersistentFlags().BoolVarP(&unstakedFlag, "unstaked", "u", false, "operations only apply to unstaked router peerstore (i.e. gossipsub)") - PeerCmd.PersistentFlags().BoolVarP(&localFlag, "local", "l", false, "operations apply to the local (CLI binary's) P2P module rather than being sent to the --remote_cli_url") + PeerCmd.PersistentFlags().BoolVarP(&localFlag, "local", "l", false, "operations apply to the local (CLI binary's) P2P module instead of being broadcast") } diff --git a/p2p/debug.go b/p2p/debug.go new file mode 100644 index 000000000..2f8700105 --- /dev/null +++ b/p2p/debug.go @@ -0,0 +1,128 @@ +package p2p + +import ( + "fmt" + "os" + + "github.com/pokt-network/pocket/p2p/debug" + "github.com/pokt-network/pocket/p2p/providers/peerstore_provider" + typesP2P "github.com/pokt-network/pocket/p2p/types" + "github.com/pokt-network/pocket/shared/messaging" + "github.com/pokt-network/pocket/shared/modules" +) + +func (m *p2pModule) handleDebugMessage(msg *messaging.DebugMessage) error { + switch msg.Action { + case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST: + if !m.cfg.EnablePeerDiscoveryDebugRpc { + return typesP2P.ErrPeerDiscoveryDebugRPCDisabled + } + default: + // This debug message isn't intended for the P2P module, ignore it. + return nil + } + + switch msg.Action { + case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST: + routerType := debug.RouterType(msg.Message.Value) + return PrintPeerList(m.GetBus(), routerType) + default: + return fmt.Errorf("unsupported P2P debug message action: %s", msg.Action) + } +} + +func PrintPeerList(bus modules.Bus, routerType debug.RouterType) error { + var ( + peers typesP2P.PeerList + pstorePlurality = "" + ) + + currentHeightProvider := bus.GetCurrentHeightProvider() + + // TECHDEBT(#810, #811): use `bus.GetPeerstoreProvider()` after peerstore provider + // is retrievable as a proper submodule. + pstoreProviderModule, err := bus.GetModulesRegistry(). + GetModule(peerstore_provider.PeerstoreProviderSubmoduleName) + if err != nil { + return fmt.Errorf("getting peerstore provider: %w", err) + } + pstoreProvider, ok := pstoreProviderModule.(peerstore_provider.PeerstoreProvider) + if !ok { + return fmt.Errorf("unknown peerstore provider type: %T", pstoreProviderModule) + } + //-- + + switch routerType { + case debug.StakedRouterType: + // TECHDEBT: add `PeerstoreProvider#GetStakedPeerstoreAtCurrentHeight()` + // interface method. + currentHeight := currentHeightProvider.CurrentHeight() + pstore, err := pstoreProvider.GetStakedPeerstoreAtHeight(currentHeight) + if err != nil { + return fmt.Errorf("getting unstaked peerstore: %v", err) + } + + peers = pstore.GetPeerList() + case debug.UnstakedRouterType: + pstore, err := pstoreProvider.GetUnstakedPeerstore() + if err != nil { + return fmt.Errorf("getting unstaked peerstore: %v", err) + } + + peers = pstore.GetPeerList() + case debug.AllRouterTypes: + pstorePlurality = "s" + + // TECHDEBT: add `PeerstoreProvider#GetStakedPeerstoreAtCurrentHeight()` + currentHeight := currentHeightProvider.CurrentHeight() + stakedPStore, err := pstoreProvider.GetStakedPeerstoreAtHeight(currentHeight) + if err != nil { + return fmt.Errorf("getting unstaked peerstore: %v", err) + } + unstakedPStore, err := pstoreProvider.GetUnstakedPeerstore() + if err != nil { + return fmt.Errorf("getting unstaked peerstore: %v", err) + } + + unstakedPeers := unstakedPStore.GetPeerList() + stakedPeers := stakedPStore.GetPeerList() + additionalPeers, _ := unstakedPeers.Delta(stakedPeers) + + // NB: there should never be any "additional" peers. This would represent + // a staked actor who is not participating in background gossip for some + // reason. It's possible that a staked actor node which has restarted + // recently and hasn't yet completed background router bootstrapping may + // result in peers experiencing this state. + if len(additionalPeers) == 0 { + return debug.PrintPeerListTable(unstakedPeers) + } + + allPeers := append(typesP2P.PeerList{}, unstakedPeers...) + allPeers = append(allPeers, additionalPeers...) + peers = allPeers + default: + return fmt.Errorf("unsupported router type: %s", routerType) + } + + if err := debug.LogSelfAddress(bus); err != nil { + return fmt.Errorf("printing self address: %w", err) + } + + // NB: Intentionally printing with `fmt` instead of the logger to match + // `utils.PrintPeerListTable` which does not use the logger due to + // incompatibilities with the tabwriter. + // (This doesn't seem to work as expected; i.e. not printing at all in tilt.) + if _, err := fmt.Fprintf( + os.Stdout, + "%s router peerstore%s:\n", + routerType, + pstorePlurality, + ); err != nil { + return fmt.Errorf("printing to stdout: %w", err) + } + + if err := debug.PrintPeerListTable(peers); err != nil { + return fmt.Errorf("printing peer list: %w", err) + } + return nil +} diff --git a/p2p/debug/peers.go b/p2p/debug/peers.go new file mode 100644 index 000000000..56afd9e89 --- /dev/null +++ b/p2p/debug/peers.go @@ -0,0 +1,62 @@ +package debug + +import ( + "fmt" + "os" + + "github.com/pokt-network/pocket/p2p/types" + "github.com/pokt-network/pocket/p2p/utils" + "github.com/pokt-network/pocket/shared/modules" +) + +type RouterType string + +const ( + StakedRouterType RouterType = "staked" + UnstakedRouterType RouterType = "unstaked" + AllRouterTypes RouterType = "all" +) + +var peerListTableHeader = []string{"Peer ID", "Pokt Address", "ServiceURL"} + +func LogSelfAddress(bus modules.Bus) error { + p2pModule := bus.GetP2PModule() + if p2pModule == nil { + return fmt.Errorf("no p2p module found on the bus") + } + + selfAddr, err := p2pModule.GetAddress() + if err != nil { + return fmt.Errorf("getting self address: %w", err) + } + + _, err = fmt.Fprintf(os.Stdout, "self address: %s", selfAddr.String()) + return err +} + +// PrintPeerListTable prints a table of the passed peers to stdout. Header row is defined +// by `peerListTableHeader`. Row printing behavior is defined by `peerListRowConsumerFactory`. +func PrintPeerListTable(peers types.PeerList) error { + return utils.PrintTable(peerListTableHeader, peerListRowConsumerFactory(peers)) +} + +func peerListRowConsumerFactory(peers types.PeerList) utils.RowConsumer { + return func(provideRow utils.RowProvider) error { + for _, peer := range peers { + libp2pAddrInfo, err := utils.Libp2pAddrInfoFromPeer(peer) + if err != nil { + return fmt.Errorf("converting peer to libp2p addr info: %w", err) + } + + err = provideRow( + libp2pAddrInfo.ID.String(), + peer.GetAddress().String(), + peer.GetServiceURL(), + ) + if err != nil { + return err + } + } + return nil + } +} diff --git a/p2p/event_handler.go b/p2p/event_handler.go index 60e8afca4..2c612ab27 100644 --- a/p2p/event_handler.go +++ b/p2p/event_handler.go @@ -2,7 +2,6 @@ package p2p import ( "fmt" - "google.golang.org/protobuf/types/known/anypb" "github.com/pokt-network/pocket/shared/codec" @@ -100,7 +99,3 @@ func (m *p2pModule) HandleEvent(event *anypb.Any) error { return nil } - -func (m *p2pModule) handleDebugMessage(msg *messaging.DebugMessage) error { - return nil -} diff --git a/p2p/utils/logging.go b/p2p/utils/logging.go index ac999c62a..ea9a787c6 100644 --- a/p2p/utils/logging.go +++ b/p2p/utils/logging.go @@ -1,14 +1,27 @@ package utils import ( + "fmt" "net" + "os" + "text/tabwriter" "github.com/libp2p/go-libp2p/core/network" + "github.com/rs/zerolog" + "github.com/pokt-network/pocket/p2p/types" "github.com/pokt-network/pocket/shared/modules" - "github.com/rs/zerolog" ) +// RowProvider is a function which receives a variadic number of "column" values. +// It is intended to be passed to a `RowConsumer` so that the consumer can operate +// on the column values, row-by-row, without having to know how to produce them. +type RowProvider func(columns ...string) error + +// RowConsumer is any function which receives a `RowProvider` in order to consume +// its column values, row-by-row. +type RowConsumer func(RowProvider) error + type scopeCallback func(scope network.ResourceScope) error // LogScopeStatFactory returns a function which prints the given scope stat fields @@ -41,6 +54,63 @@ func LogIncomingMsg(logger *modules.Logger, hostname string, peer types.Peer) { logMessage(logger, msg, hostname, peer) } +// Print table prints a table to stdout. Header row is defined by `header`. Row printing +// behavior is defined by `consumeRows`. Header length SHOULD match row length. +func PrintTable(header []string, consumeRows RowConsumer) error { + w := new(tabwriter.Writer) + w.Init(os.Stdout, 0, 0, 1, ' ', 0) + + // Print header + for _, col := range header { + if _, err := fmt.Fprintf(w, "| %s\t", col); err != nil { + return err + } + } + if _, err := fmt.Fprintln(w, "|"); err != nil { + return err + } + + // Print separator + for _, col := range header { + if _, err := fmt.Fprintf(w, "| "); err != nil { + return err + } + for range col { + if _, err := fmt.Fprintf(w, "-"); err != nil { + return err + } + } + if _, err := fmt.Fprintf(w, "\t"); err != nil { + return err + } + } + if _, err := fmt.Fprintln(w, "|"); err != nil { + return err + } + + // Print rows -- `consumeRows` will call this function once for each row. + if err := consumeRows(func(row ...string) error { + for _, col := range row { + if _, err := fmt.Fprintf(w, "| %s\t", col); err != nil { + return err + } + } + if _, err := fmt.Fprintln(w, "|"); err != nil { + return err + } + return nil + }); err != nil { + return err + } + + // Flush the buffer and print the table + if err := w.Flush(); err != nil { + return err + } + + return nil +} + func logMessage(logger *modules.Logger, msg, hostname string, peer types.Peer) { remoteHostname, _, err := net.SplitHostPort(peer.GetServiceURL()) if err != nil { diff --git a/shared/messaging/proto/debug_message.proto b/shared/messaging/proto/debug_message.proto index 7ce079afa..55a87c695 100644 --- a/shared/messaging/proto/debug_message.proto +++ b/shared/messaging/proto/debug_message.proto @@ -22,6 +22,7 @@ enum DebugMessageAction { DEBUG_PERSISTENCE_CLEAR_STATE = 8; DEBUG_PERSISTENCE_RESET_TO_GENESIS = 9; + DEBUG_P2P_PEER_LIST = 10; } message DebugMessage {