Skip to content

Commit

Permalink
debug: peer discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite committed Jul 12, 2023
1 parent 9d678e6 commit c0f6c1a
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 7 deletions.
43 changes: 40 additions & 3 deletions p2p/background/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,42 @@ func (rtr *backgroundRouter) AddPeer(peer typesP2P.Peer) error {
// Noop if peer with the pokt address already exists in the peerstore.
// TECHDEBT: add method(s) to update peers.
if p := rtr.pstore.GetPeer(peer.GetAddress()); p != nil {
rtr.logger.Warn().
Str("pokt_address", peer.GetAddress().String()).
Msg("peer already in peerstore")
return nil
}

// addrInfo, err := utils.Libp2pAddrInfoFromPeer(peer)
//if err != nil {
// return fmt.Errorf("converting peer to libp2p addr info: %w", err)
//}
//
//// TECHDEBT: revisit `isReplacable` parameter below.
//// see: https://github.com/libp2p/go-libp2p-kbucket/blob/v0.6.3/table.go#L192
//if _, err := rtr.kadDHT.RoutingTable().TryAddPeer(
// addrInfo.ID,
// true,
// false,
//); err != nil {
// return fmt.Errorf("adding peer to kademlia routing table: %w", err)
//}

if err := utils.AddPeerToLibp2pHost(rtr.host, peer); err != nil {
return err
}

//// TECHDEBT(#595): add ctx to interface methods and propagate down.
// peerInfo, err := utils.Libp2pAddrInfoFromPeer(peer)
//if err != nil {
// return err
//}
//
//if err := rtr.host.Connect(context.TODO(), peerInfo); err != nil {
// // TDOO_THIS_COMMIT: what to do?
// return err
//}

return rtr.pstore.AddPeer(peer)
}

Expand Down Expand Up @@ -288,9 +317,9 @@ func (rtr *backgroundRouter) setupPeerstore(ctx context.Context) (err error) {
func (rtr *backgroundRouter) setupPeerDiscovery(ctx context.Context) (err error) {
dhtMode := dht.ModeAutoServer
// NB: don't act as a bootstrap node in peer discovery in client debug mode
if isClientDebugMode(rtr.GetBus()) {
dhtMode = dht.ModeClient
}
// if isClientDebugMode(rtr.GetBus()) {
// dhtMode = dht.ModeClient
//}

rtr.kadDHT, err = dht.New(ctx, rtr.host, dht.Mode(dhtMode))
return err
Expand Down Expand Up @@ -337,6 +366,10 @@ func (rtr *backgroundRouter) setupSubscription() (err error) {

// TECHDEBT(#859): integrate with `p2pModule#bootstrap()`.
func (rtr *backgroundRouter) bootstrap(ctx context.Context) error {
rtr.logger.Warn().Fields(map[string]any{
"bootstrap_peers_count": len(rtr.pstore.GetPeerList()),
}).Msg("bootstrapping...")

// CONSIDERATION: add `GetPeers` method, which returns a map,
// to the `PeerstoreProvider` interface to simplify this loop.
for _, peer := range rtr.pstore.GetPeerList() {
Expand Down Expand Up @@ -408,6 +441,7 @@ func (rtr *backgroundRouter) topicValidator(_ context.Context, _ libp2pPeer.ID,
// readSubscription is a while loop for receiving and handling messages from the
// subscription. It is intended to be called as a goroutine.
func (rtr *backgroundRouter) readSubscription(ctx context.Context) {
rtr.logger.Warn().Msg("reading subscription...")
for {
if err := ctx.Err(); err != nil {
if err != context.Canceled {
Expand All @@ -417,6 +451,7 @@ func (rtr *backgroundRouter) readSubscription(ctx context.Context) {
return
}
msg, err := rtr.subscription.Next(ctx)
rtr.logger.Warn().Msg("next read from subscription")

if err != nil {
rtr.logger.Error().Err(err).
Expand All @@ -433,6 +468,7 @@ func (rtr *backgroundRouter) readSubscription(ctx context.Context) {
}

func (rtr *backgroundRouter) handleBackgroundMsg(backgroundMsgBz []byte) error {
rtr.logger.Warn().Msg("handling background message")
var backgroundMsg typesP2P.BackgroundMessage
if err := proto.Unmarshal(backgroundMsgBz, &backgroundMsg); err != nil {
return err
Expand All @@ -441,6 +477,7 @@ func (rtr *backgroundRouter) handleBackgroundMsg(backgroundMsgBz []byte) error {
// There was no error, but we don't need to forward this to the app-specific bus.
// For example, the message has already been handled by the application.
if backgroundMsg.Data == nil {
rtr.logger.Warn().Msg("background message has no data")
return nil
}

Expand Down
12 changes: 8 additions & 4 deletions p2p/background/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,23 +358,27 @@ func bootstrap(t *testing.T, ctx context.Context, testHosts []libp2pHost.Host) {
t.Log("bootstrapping...")
bootstrapHost := testHosts[0]
bootstrapAddr := bootstrapHost.Addrs()[0]
// ==> /ipv4/10.0.0.4/tcp/42069
for _, h := range testHosts {
// Don't connect `bootsrapHost` to itself.
if h.ID() == bootstrapHost.ID() {
continue
}

p2pAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", bootstrapHost.ID()))
require.NoError(t, err)
// p2pAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", bootstrapHost.ID()))
//require.NoError(t, err)

addrInfo := libp2pPeer.AddrInfo{
ID: bootstrapHost.ID(),
Addrs: []multiaddr.Multiaddr{
bootstrapAddr.Encapsulate(p2pAddr),
bootstrapAddr,
// bootstrapAddr.Encapsulate(p2pAddr),
// ==> /ipv4/10.0.0.4/tcp/42069/p2p/a2x3b4c5d6e7f8g9h0i...
},
}

t.Logf("connecting to %s...", addrInfo.ID.String())
err = h.Connect(ctx, addrInfo)
err := h.Connect(ctx, addrInfo)
require.NoError(t, err)
}
}
Expand Down
2 changes: 2 additions & 0 deletions p2p/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func (m *p2pModule) configureBootstrapNodes() error {
// bootstrap attempts to bootstrap from a bootstrap node
// TECHDEBT(#859): refactor bootstrapping.
func (m *p2pModule) bootstrap() error {
m.logger.Warn().Msg("bootstrapping...")

var pstore typesP2P.Peerstore

for _, bootstrapNode := range m.bootstrapNodes {
Expand Down

0 comments on commit c0f6c1a

Please sign in to comment.