From 74ffa67a668746a6ffdeb1b850fb5e19b12527f7 Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Fri, 22 Sep 2023 13:27:29 +0200 Subject: [PATCH] Add broadcast state machine for storing records in the DHT (#930) Co-authored-by: Ian Davis <18375+iand@users.noreply.github.com> --- v2/dht.go | 9 - v2/dht_test.go | 4 +- v2/go.mod | 4 +- v2/handlers.go | 4 +- v2/internal/coord/behaviour.go | 4 - v2/internal/coord/brdcst.go | 230 +++++++++++++ v2/internal/coord/brdcst/brdcst.go | 143 ++++++++ v2/internal/coord/brdcst/brdcst_test.go | 35 ++ v2/internal/coord/brdcst/config.go | 74 +++++ v2/internal/coord/brdcst/config_test.go | 44 +++ v2/internal/coord/brdcst/doc.go | 5 + v2/internal/coord/brdcst/followup.go | 252 ++++++++++++++ v2/internal/coord/brdcst/pool.go | 347 ++++++++++++++++++++ v2/internal/coord/brdcst/pool_test.go | 301 +++++++++++++++++ v2/internal/coord/brdcst_events.go | 34 ++ v2/internal/coord/coordinator.go | 131 ++++++-- v2/internal/coord/coordinator_test.go | 8 +- v2/internal/coord/{ => coordt}/coretypes.go | 11 +- v2/internal/coord/event.go | 31 +- v2/internal/coord/network.go | 37 ++- v2/internal/coord/query.go | 35 +- v2/internal/coord/query/pool.go | 67 ++-- v2/internal/coord/query/pool_test.go | 67 ++-- v2/internal/coord/query/query.go | 25 +- v2/internal/coord/query/query_test.go | 45 +-- v2/internal/coord/routing.go | 16 +- v2/internal/coord/routing/bootstrap.go | 5 +- v2/internal/coord/routing/bootstrap_test.go | 8 +- v2/internal/coord/routing/include_test.go | 3 +- v2/internal/coord/routing_test.go | 13 +- v2/kadt/kadt.go | 57 +++- v2/pb/msg.aux.go | 29 +- v2/pb/msg.aux_test.go | 23 ++ v2/query_test.go | 6 +- v2/router.go | 23 +- v2/routing.go | 77 ++++- v2/routing_test.go | 36 +- 37 files changed, 1992 insertions(+), 251 deletions(-) create mode 100644 v2/internal/coord/brdcst.go create mode 100644 v2/internal/coord/brdcst/brdcst.go create mode 100644 v2/internal/coord/brdcst/brdcst_test.go create mode 100644 v2/internal/coord/brdcst/config.go create mode 100644 v2/internal/coord/brdcst/config_test.go create mode 100644 v2/internal/coord/brdcst/doc.go create mode 100644 v2/internal/coord/brdcst/followup.go create mode 100644 v2/internal/coord/brdcst/pool.go create mode 100644 v2/internal/coord/brdcst/pool_test.go create mode 100644 v2/internal/coord/brdcst_events.go rename v2/internal/coord/{ => coordt}/coretypes.go (92%) diff --git a/v2/dht.go b/v2/dht.go index 0afeb408..1dbcfecc 100644 --- a/v2/dht.go +++ b/v2/dht.go @@ -2,7 +2,6 @@ package dht import ( "context" - "crypto/sha256" "fmt" "io" "sync" @@ -13,7 +12,6 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - "github.com/plprobelab/go-kademlia/key" "golang.org/x/exp/slog" "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord" @@ -339,13 +337,6 @@ func (d *DHT) AddAddresses(ctx context.Context, ais []peer.AddrInfo, ttl time.Du return d.kad.AddNodes(ctx, ids) } -// newSHA256Key returns a [kadt.KadKey] that conforms to the [kad.Key] interface by -// SHA256 hashing the given bytes and wrapping them in a [kadt.KadKey]. -func newSHA256Key(data []byte) kadt.Key { - h := sha256.Sum256(data) - return key.NewKey256(h[:]) -} - // typedBackend returns the backend at the given namespace. It is casted to the // provided type. If the namespace doesn't exist or the type cast failed, this // function returns an error. 
Can't be a method on [DHT] because of the generic diff --git a/v2/dht_test.go b/v2/dht_test.go index 44d68bc4..0ee635df 100644 --- a/v2/dht_test.go +++ b/v2/dht_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" "github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest" "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" ) @@ -82,7 +82,7 @@ func TestAddAddresses(t *testing.T) { // local routing table should not contain the node _, err := local.kad.GetNode(ctx, kadt.PeerID(remote.host.ID())) - require.ErrorIs(t, err, coord.ErrNodeNotFound) + require.ErrorIs(t, err, coordt.ErrNodeNotFound) remoteAddrInfo := peer.AddrInfo{ ID: remote.host.ID(), diff --git a/v2/go.mod b/v2/go.mod index cd8a0748..5eb07d61 100644 --- a/v2/go.mod +++ b/v2/go.mod @@ -15,6 +15,8 @@ require ( github.com/libp2p/go-msgio v0.3.0 github.com/multiformats/go-base32 v0.1.0 github.com/multiformats/go-multiaddr v0.11.0 + github.com/multiformats/go-multihash v0.2.3 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/plprobelab/go-kademlia v0.0.0-20230913171354-443ec1f56080 github.com/prometheus/client_golang v1.16.0 // indirect github.com/stretchr/testify v1.8.4 @@ -84,14 +86,12 @@ require ( github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect github.com/multiformats/go-multibase v0.2.0 // indirect github.com/multiformats/go-multicodec v0.9.0 // indirect - github.com/multiformats/go-multihash v0.2.3 // indirect github.com/multiformats/go-multistream v0.4.1 // indirect github.com/multiformats/go-varint v0.0.7 // indirect github.com/nxadm/tail v1.4.8 // indirect github.com/onsi/ginkgo/v2 v2.11.0 // indirect github.com/opencontainers/runtime-spec v1.1.0 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/polydawn/refmt v0.89.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect diff --git a/v2/handlers.go b/v2/handlers.go index 5b8536f3..74e55a2b 100644 --- a/v2/handlers.go +++ b/v2/handlers.go @@ -110,7 +110,7 @@ func (d *DHT) handleGetValue(ctx context.Context, remote peer.ID, req *pb.Messag resp := &pb.Message{ Type: pb.Message_GET_VALUE, Key: req.GetKey(), - CloserPeers: d.closerPeers(ctx, remote, newSHA256Key(req.GetKey())), + CloserPeers: d.closerPeers(ctx, remote, kadt.NewKey(req.GetKey())), } ns, path, err := record.SplitKey(k) // get namespace (prefix of the key) @@ -226,7 +226,7 @@ func (d *DHT) handleGetProviders(ctx context.Context, remote peer.ID, req *pb.Me resp := &pb.Message{ Type: pb.Message_GET_PROVIDERS, Key: k, - CloserPeers: d.closerPeers(ctx, remote, newSHA256Key(k)), + CloserPeers: d.closerPeers(ctx, remote, kadt.NewKey(k)), ProviderPeers: pbProviders, } diff --git a/v2/internal/coord/behaviour.go b/v2/internal/coord/behaviour.go index aa69917f..74609944 100644 --- a/v2/internal/coord/behaviour.go +++ b/v2/internal/coord/behaviour.go @@ -37,10 +37,6 @@ type Behaviour[I BehaviourEvent, O BehaviourEvent] interface { Perform(ctx context.Context) (O, bool) } -type SM[E any, S any] interface { - Advance(context.Context, E) S -} - type WorkQueueFunc[E BehaviourEvent] func(context.Context, E) bool // WorkQueue is buffered queue of work to be performed. 
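For reference: the SM interface removed from behaviour.go above is superseded by a generic state-machine interface that now lives in the renamed coordt package (coretypes.go moves there in this patch). Its definition is not shown in this diff; the following is only a minimal sketch, assuming it keeps the shape of the removed SM interface and of the coordt.StateMachine[E, S] usages elsewhere in the patch:

package coordt

import "context"

// StateMachine is assumed to mirror the removed SM interface: it consumes a
// single event E and returns the resulting state S.
type StateMachine[E any, S any] interface {
	Advance(context.Context, E) S
}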
diff --git a/v2/internal/coord/brdcst.go b/v2/internal/coord/brdcst.go new file mode 100644 index 00000000..0c4dadfc --- /dev/null +++ b/v2/internal/coord/brdcst.go @@ -0,0 +1,230 @@ +package coord + +import ( + "context" + "sync" + + "go.opentelemetry.io/otel/trace" + "golang.org/x/exp/slog" + + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/brdcst" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" + "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" + "github.com/libp2p/go-libp2p-kad-dht/v2/pb" + "github.com/libp2p/go-libp2p-kad-dht/v2/tele" +) + +type PooledBroadcastBehaviour struct { + pool coordt.StateMachine[brdcst.PoolEvent, brdcst.PoolState] + waiters map[coordt.QueryID]NotifyCloser[BehaviourEvent] + + pendingMu sync.Mutex + pending []BehaviourEvent + ready chan struct{} + + logger *slog.Logger + tracer trace.Tracer +} + +var _ Behaviour[BehaviourEvent, BehaviourEvent] = (*PooledBroadcastBehaviour)(nil) + +func NewPooledBroadcastBehaviour(brdcstPool *brdcst.Pool[kadt.Key, kadt.PeerID, *pb.Message], logger *slog.Logger, tracer trace.Tracer) *PooledBroadcastBehaviour { + b := &PooledBroadcastBehaviour{ + pool: brdcstPool, + waiters: make(map[coordt.QueryID]NotifyCloser[BehaviourEvent]), + ready: make(chan struct{}, 1), + logger: logger.With("behaviour", "pooledBroadcast"), + tracer: tracer, + } + return b +} + +func (b *PooledBroadcastBehaviour) Ready() <-chan struct{} { + return b.ready +} + +func (b *PooledBroadcastBehaviour) Notify(ctx context.Context, ev BehaviourEvent) { + ctx, span := b.tracer.Start(ctx, "PooledBroadcastBehaviour.Notify") + defer span.End() + + b.pendingMu.Lock() + defer b.pendingMu.Unlock() + + var cmd brdcst.PoolEvent + switch ev := ev.(type) { + case *EventStartBroadcast: + cmd = &brdcst.EventPoolStartBroadcast[kadt.Key, kadt.PeerID, *pb.Message]{ + QueryID: ev.QueryID, + Target: ev.Target, + Message: ev.Message, + Seed: ev.Seed, + Config: ev.Config, + } + if ev.Notify != nil { + b.waiters[ev.QueryID] = ev.Notify + } + + case *EventGetCloserNodesSuccess: + for _, info := range ev.CloserNodes { + b.pending = append(b.pending, &EventAddNode{ + NodeID: info, + }) + } + + waiter, ok := b.waiters[ev.QueryID] + if ok { + waiter.Notify(ctx, &EventQueryProgressed{ + NodeID: ev.To, + QueryID: ev.QueryID, + }) + } + + cmd = &brdcst.EventPoolGetCloserNodesSuccess[kadt.Key, kadt.PeerID]{ + NodeID: ev.To, + QueryID: ev.QueryID, + Target: ev.Target, + CloserNodes: ev.CloserNodes, + } + + case *EventGetCloserNodesFailure: + // queue an event that will notify the routing behaviour of a failed node + b.pending = append(b.pending, &EventNotifyNonConnectivity{ + ev.To, + }) + + cmd = &brdcst.EventPoolGetCloserNodesFailure[kadt.Key, kadt.PeerID]{ + NodeID: ev.To, + QueryID: ev.QueryID, + Target: ev.Target, + Error: ev.Err, + } + + case *EventSendMessageSuccess: + for _, info := range ev.CloserNodes { + b.pending = append(b.pending, &EventAddNode{ + NodeID: info, + }) + } + waiter, ok := b.waiters[ev.QueryID] + if ok { + waiter.Notify(ctx, &EventQueryProgressed{ + NodeID: ev.To, + QueryID: ev.QueryID, + Response: ev.Response, + }) + } + // TODO: How do we know it's a StoreRecord response? 
+ cmd = &brdcst.EventPoolStoreRecordSuccess[kadt.Key, kadt.PeerID, *pb.Message]{ + QueryID: ev.QueryID, + NodeID: ev.To, + Request: ev.Request, + Response: ev.Response, + } + + case *EventSendMessageFailure: + // queue an event that will notify the routing behaviour of a failed node + b.pending = append(b.pending, &EventNotifyNonConnectivity{ + ev.To, + }) + + // TODO: How do we know it's a StoreRecord response? + cmd = &brdcst.EventPoolStoreRecordFailure[kadt.Key, kadt.PeerID, *pb.Message]{ + NodeID: ev.To, + QueryID: ev.QueryID, + Request: ev.Request, + Error: ev.Err, + } + + case *EventStopQuery: + cmd = &brdcst.EventPoolStopBroadcast{ + QueryID: ev.QueryID, + } + } + + // attempt to advance the broadcast pool + ev, ok := b.advancePool(ctx, cmd) + if ok { + b.pending = append(b.pending, ev) + } + if len(b.pending) > 0 { + select { + case b.ready <- struct{}{}: + default: + } + } +} + +func (b *PooledBroadcastBehaviour) Perform(ctx context.Context) (BehaviourEvent, bool) { + ctx, span := b.tracer.Start(ctx, "PooledBroadcastBehaviour.Perform") + defer span.End() + + // No inbound work can be done until Perform is complete + b.pendingMu.Lock() + defer b.pendingMu.Unlock() + + for { + // drain queued events first. + if len(b.pending) > 0 { + var ev BehaviourEvent + ev, b.pending = b.pending[0], b.pending[1:] + + if len(b.pending) > 0 { + select { + case b.ready <- struct{}{}: + default: + } + } + return ev, true + } + + ev, ok := b.advancePool(ctx, &brdcst.EventPoolPoll{}) + if ok { + return ev, true + } + + // finally check if any pending events were accumulated in the meantime + if len(b.pending) == 0 { + return nil, false + } + } +} + +func (b *PooledBroadcastBehaviour) advancePool(ctx context.Context, ev brdcst.PoolEvent) (out BehaviourEvent, term bool) { + ctx, span := b.tracer.Start(ctx, "PooledBroadcastBehaviour.advancePool", trace.WithAttributes(tele.AttrInEvent(ev))) + defer func() { + span.SetAttributes(tele.AttrOutEvent(out)) + span.End() + }() + + pstate := b.pool.Advance(ctx, ev) + switch st := pstate.(type) { + case *brdcst.StatePoolIdle: + // nothing to do + case *brdcst.StatePoolFindCloser[kadt.Key, kadt.PeerID]: + return &EventOutboundGetCloserNodes{ + QueryID: st.QueryID, + To: st.NodeID, + Target: st.Target, + Notify: b, + }, true + case *brdcst.StatePoolStoreRecord[kadt.Key, kadt.PeerID, *pb.Message]: + return &EventOutboundSendMessage{ + QueryID: st.QueryID, + To: st.NodeID, + Message: st.Message, + Notify: b, + }, true + case *brdcst.StatePoolBroadcastFinished[kadt.Key, kadt.PeerID]: + waiter, ok := b.waiters[st.QueryID] + if ok { + waiter.Notify(ctx, &EventBroadcastFinished{ + QueryID: st.QueryID, + Contacted: st.Contacted, + Errors: st.Errors, + }) + waiter.Close() + } + } + + return nil, false +} diff --git a/v2/internal/coord/brdcst/brdcst.go b/v2/internal/coord/brdcst/brdcst.go new file mode 100644 index 00000000..5d16b973 --- /dev/null +++ b/v2/internal/coord/brdcst/brdcst.go @@ -0,0 +1,143 @@ +package brdcst + +import ( + "github.com/plprobelab/go-kademlia/kad" + + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" +) + +// BroadcastState must be implemented by all states that a [Broadcast] state +// machine can reach. There are multiple different broadcast state machines that +// all have in common to "emit" a [BroadcastState] and accept a +// [BroadcastEvent]. Recall, states are basically the "events" that a state +// machine emits which other state machines or behaviours could react upon. 
+type BroadcastState interface { + broadcastState() +} + +// StateBroadcastFindCloser indicates to the broadcast [Pool] or any other upper +// layer that a [Broadcast] state machine wants to query the given node (NodeID) +// for closer nodes to the target key (Target). +type StateBroadcastFindCloser[K kad.Key[K], N kad.NodeID[K]] struct { + QueryID coordt.QueryID // the id of the broadcast operation that wants to send the message + NodeID N // the node to send the message to + Target K // the key that the query wants to find closer nodes for +} + +// StateBroadcastStoreRecord indicates to the broadcast [Pool] or any other +// upper layer that a [Broadcast] state machine wants to store a record using +// the given Message with the given NodeID. +type StateBroadcastStoreRecord[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct { + QueryID coordt.QueryID // the id of the broadcast operation that wants to send the message + NodeID N // the node to send the message to + Message M // the message the broadcast behaviour wants to send +} + +// StateBroadcastWaiting indicates that a [Broadcast] state machine is waiting +// for network I/O to finish. It means the state machine isn't idle, but that +// there are operations in-flight that it is waiting on to finish. +type StateBroadcastWaiting struct { + QueryID coordt.QueryID // the id of the broadcast operation that is waiting +} + +// StateBroadcastFinished indicates that a [Broadcast] state machine has +// finished its operation. During that operation, all nodes in Contacted have +// been contacted to store the record. The Contacted slice does not contain +// the nodes we have queried to find the closest nodes to the target key - only +// the ones that we eventually contacted to store the record. The Errors map +// maps the string representation of any node N in the Contacted slice to a +// potential error struct that contains the original Node and error. In the best +// case, this Errors map is empty. +type StateBroadcastFinished[K kad.Key[K], N kad.NodeID[K]] struct { + QueryID coordt.QueryID // the id of the broadcast operation that has finished + Contacted []N // all nodes we contacted to store the record (successful or not) + Errors map[string]struct { // any error that occurred for any node that we contacted + Node N // a node from the Contacted slice + Err error // the error that happened when contacting that Node + } +} + +// StateBroadcastIdle means that a [Broadcast] state machine has finished all of +// its operation. This state will be emitted if the state machine is polled to +// advance its state but has already finished its operation. The last meaningful +// state will be [StateBroadcastFinished]. Being idle is different from waiting +// for network I/O to finish (see [StateBroadcastWaiting]). +type StateBroadcastIdle struct{} + +func (*StateBroadcastFindCloser[K, N]) broadcastState() {} +func (*StateBroadcastStoreRecord[K, N, M]) broadcastState() {} +func (*StateBroadcastWaiting) broadcastState() {} +func (*StateBroadcastFinished[K, N]) broadcastState() {} +func (*StateBroadcastIdle) broadcastState() {} + +// BroadcastEvent is an event intended to advance the state of a [Broadcast] +// state machine. [Broadcast] state machines only operate on events that +// implement this interface. An "Event" is the opposite of a "State." An "Event" +// flows into the state machine and a "State" flows out of it. +// +// Currently, there are the [FollowUp] and [Optimistic] state machines. 
+type BroadcastEvent interface { + broadcastEvent() +} + +// EventBroadcastPoll is an event that signals a [Broadcast] state machine that +// it can perform housekeeping work such as time out queries. +type EventBroadcastPoll struct{} + +// EventBroadcastStart is an event that instructs a broadcast state machine to +// start the operation. +type EventBroadcastStart[K kad.Key[K], N kad.NodeID[K]] struct { + Target K // the key we want to store the record for + Seed []N // the closest nodes we know so far and from where we start the operation +} + +// EventBroadcastStop notifies a [Broadcast] state machine to stop the +// operation. This comprises all in-flight queries. +type EventBroadcastStop struct{} + +// EventBroadcastNodeResponse notifies a [Broadcast] state machine that a remote +// node (NodeID) has successfully responded with closer nodes (CloserNodes) to +// the Target key that's stored on the [Broadcast] state machine +type EventBroadcastNodeResponse[K kad.Key[K], N kad.NodeID[K]] struct { + NodeID N // the node the message was sent to and that replied + CloserNodes []N // the closer nodes sent by the node +} + +// EventBroadcastNodeFailure notifies a [Broadcast] state machine that a remote +// node (NodeID) has failed responding with closer nodes to the target key. +type EventBroadcastNodeFailure[K kad.Key[K], N kad.NodeID[K]] struct { + NodeID N // the node the message was sent to and that has replied + Error error // the error that caused the failure, if any +} + +// EventBroadcastStoreRecordSuccess notifies a broadcast [Broadcast] state +// machine that storing a record with a remote node (NodeID) was successful. The +// message that was sent is held in Request, and the returned value is contained +// in Response. However, in the case of the Amino DHT, nodes do not respond with +// a confirmation, so Response will always be nil. Check out +// [pb.Message.ExpectResponse] for information about which requests should +// receive a response. +type EventBroadcastStoreRecordSuccess[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct { + NodeID N // the node the message was sent to + Request M // the message that was sent to the remote node + Response M // the reply we got from the remote node (nil in many cases of the Amino DHT) +} + +// EventBroadcastStoreRecordFailure notifies a broadcast [Broadcast] state +// machine that storing a record with a remote node (NodeID) has failed. The +// message that was sent is held in Request, and the error will be in Error. +type EventBroadcastStoreRecordFailure[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct { + NodeID N // the node the message was sent to + Request M // the message that was sent to the remote node + Error error // the error that caused the failure, if any +} + +// broadcastEvent() ensures that only events accepted by a [Broadcast] state +// machine can be assigned to the [BroadcastEvent] interface. 
+func (*EventBroadcastStop) broadcastEvent() {} +func (*EventBroadcastPoll) broadcastEvent() {} +func (*EventBroadcastStart[K, N]) broadcastEvent() {} +func (*EventBroadcastNodeResponse[K, N]) broadcastEvent() {} +func (*EventBroadcastNodeFailure[K, N]) broadcastEvent() {} +func (*EventBroadcastStoreRecordSuccess[K, N, M]) broadcastEvent() {} +func (*EventBroadcastStoreRecordFailure[K, N, M]) broadcastEvent() {} diff --git a/v2/internal/coord/brdcst/brdcst_test.go b/v2/internal/coord/brdcst/brdcst_test.go new file mode 100644 index 00000000..baf1d4bc --- /dev/null +++ b/v2/internal/coord/brdcst/brdcst_test.go @@ -0,0 +1,35 @@ +package brdcst + +import ( + "testing" + + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/internal/tiny" +) + +func TestBroadcastState_interface_conformance(t *testing.T) { + states := []BroadcastState{ + &StateBroadcastIdle{}, + &StateBroadcastWaiting{}, + &StateBroadcastStoreRecord[tiny.Key, tiny.Node, tiny.Message]{}, + &StateBroadcastFindCloser[tiny.Key, tiny.Node]{}, + &StateBroadcastFinished[tiny.Key, tiny.Node]{}, + } + for _, st := range states { + st.broadcastState() // drives test coverage + } +} + +func TestBroadcastEvent_interface_conformance(t *testing.T) { + events := []BroadcastEvent{ + &EventBroadcastStop{}, + &EventBroadcastPoll{}, + &EventBroadcastStart[tiny.Key, tiny.Node]{}, + &EventBroadcastNodeResponse[tiny.Key, tiny.Node]{}, + &EventBroadcastNodeFailure[tiny.Key, tiny.Node]{}, + &EventBroadcastStoreRecordSuccess[tiny.Key, tiny.Node, tiny.Message]{}, + &EventBroadcastStoreRecordFailure[tiny.Key, tiny.Node, tiny.Message]{}, + } + for _, ev := range events { + ev.broadcastEvent() // drives test coverage + } +} diff --git a/v2/internal/coord/brdcst/config.go b/v2/internal/coord/brdcst/config.go new file mode 100644 index 00000000..4d6d425b --- /dev/null +++ b/v2/internal/coord/brdcst/config.go @@ -0,0 +1,74 @@ +package brdcst + +import ( + "fmt" + + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/query" +) + +// ConfigPool specifies the configuration for a broadcast [Pool]. +type ConfigPool struct { + pCfg *query.PoolConfig +} + +// Validate checks the configuration options and returns an error if any have +// invalid values. +func (cfg *ConfigPool) Validate() error { + if cfg.pCfg == nil { + return fmt.Errorf("query pool config must not be nil") + } + + return nil +} + +// DefaultConfigPool returns the default configuration options for a Pool. +// Options may be overridden before passing to NewPool +func DefaultConfigPool() *ConfigPool { + return &ConfigPool{ + pCfg: query.DefaultPoolConfig(), + } +} + +// Config is an interface that all broadcast configurations must implement. +// Because we have multiple ways of broadcasting records to the network, like +// [FollowUp] or [Optimistic], the [EventPoolStartBroadcast] has a configuration +// field that depending on the concrete type of [Config] initializes the +// respective state machine. Then the broadcast operation will performed based +// on the encoded rules in that state machine. +type Config interface { + broadcastConfig() +} + +func (c *ConfigFollowUp) broadcastConfig() {} +func (c *ConfigOptimistic) broadcastConfig() {} + +// ConfigFollowUp specifies the configuration for the [FollowUp] state machine. +type ConfigFollowUp struct{} + +// Validate checks the configuration options and returns an error if any have +// invalid values. 
+func (c *ConfigFollowUp) Validate() error { + return nil +} + +// DefaultConfigFollowUp returns the default configuration options for the +// [FollowUp] state machine. +func DefaultConfigFollowUp() *ConfigFollowUp { + return &ConfigFollowUp{} +} + +// ConfigOptimistic specifies the configuration for the [Optimistic] state +// machine. +type ConfigOptimistic struct{} + +// Validate checks the configuration options and returns an error if any have +// invalid values. +func (c *ConfigOptimistic) Validate() error { + return nil +} + +// DefaultConfigOptimistic returns the default configuration options for the +// [Optimistic] state machine. +func DefaultConfigOptimistic() *ConfigOptimistic { + return &ConfigOptimistic{} +} diff --git a/v2/internal/coord/brdcst/config_test.go b/v2/internal/coord/brdcst/config_test.go new file mode 100644 index 00000000..68447a1f --- /dev/null +++ b/v2/internal/coord/brdcst/config_test.go @@ -0,0 +1,44 @@ +package brdcst + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestConfigPool_Validate(t *testing.T) { + t.Run("default is valid", func(t *testing.T) { + cfg := DefaultConfigPool() + assert.NoError(t, cfg.Validate()) + }) + + t.Run("nil pool config", func(t *testing.T) { + cfg := DefaultConfigPool() + cfg.pCfg = nil + assert.Error(t, cfg.Validate()) + }) +} + +func TestConfigFollowUp_Validate(t *testing.T) { + t.Run("default is valid", func(t *testing.T) { + cfg := DefaultConfigFollowUp() + assert.NoError(t, cfg.Validate()) + }) +} + +func TestConfigOptimistic_Validate(t *testing.T) { + t.Run("default is valid", func(t *testing.T) { + cfg := DefaultConfigOptimistic() + assert.NoError(t, cfg.Validate()) + }) +} + +func TestConfig_interface_conformance(t *testing.T) { + configs := []Config{ + &ConfigFollowUp{}, + &ConfigOptimistic{}, + } + for _, c := range configs { + c.broadcastConfig() // drives test coverage + } +} diff --git a/v2/internal/coord/brdcst/doc.go b/v2/internal/coord/brdcst/doc.go new file mode 100644 index 00000000..847849a0 --- /dev/null +++ b/v2/internal/coord/brdcst/doc.go @@ -0,0 +1,5 @@ +/* +Package brdcst contains state machines that implement algorithms for +broadcasting records into the DHT network. +*/ +package brdcst diff --git a/v2/internal/coord/brdcst/followup.go b/v2/internal/coord/brdcst/followup.go new file mode 100644 index 00000000..27d14d30 --- /dev/null +++ b/v2/internal/coord/brdcst/followup.go @@ -0,0 +1,252 @@ +package brdcst + +import ( + "context" + "fmt" + + "github.com/plprobelab/go-kademlia/kad" + "go.opentelemetry.io/otel/trace" + + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/query" + "github.com/libp2p/go-libp2p-kad-dht/v2/tele" +) + +// FollowUp is a [Broadcast] state machine and encapsulates the logic around +// doing a "classic" put operation. This mimics the algorithm employed in the +// original go-libp2p-kad-dht v1 code base. It first queries the closest nodes +// to a certain target key, and after they were discovered, it "follows up" with +// storing the record with these closest nodes. +type FollowUp[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct { + // the unique ID for this broadcast operation + queryID coordt.QueryID + + // a struct holding configuration options + cfg *ConfigFollowUp + + // a reference to the query pool in which the "get closer nodes" queries + // will be spawned. This pool is governed by the broadcast [Pool]. 
+ // Unfortunately, having a reference here breaks the hierarchy but it makes + // the logic much easier to implement. + pool *query.Pool[K, N, M] + + // the message that we will send to the closest nodes in the follow-up phase + msg M + + // the closest nodes to the target key. This will be filled after the query + // for the closest nodes has finished (when the query pool emits a + // [query.StatePoolQueryFinished] event). + closest []N + + // nodes we still need to store records with. This map will be filled with + // all the closest nodes after the query has finished. + todo map[string]N + + // nodes we have contacted to store the record but haven't heard a response yet + waiting map[string]N + + // nodes that successfully hold the record for us + success map[string]N + + // nodes that failed to hold the record for us + failed map[string]struct { + Node N + Err error + } +} + +// NewFollowUp initializes a new [FollowUp] struct. +func NewFollowUp[K kad.Key[K], N kad.NodeID[K], M coordt.Message](qid coordt.QueryID, pool *query.Pool[K, N, M], msg M, cfg *ConfigFollowUp) *FollowUp[K, N, M] { + return &FollowUp[K, N, M]{ + queryID: qid, + cfg: cfg, + pool: pool, + msg: msg, + todo: map[string]N{}, + waiting: map[string]N{}, + success: map[string]N{}, + failed: map[string]struct { + Node N + Err error + }{}, + } +} + +// Advance advances the state of the [FollowUp] [Broadcast] state machine. It +// first handles the event by mapping it to a potential event for the query +// pool. If the [BroadcastEvent] maps to a [query.PoolEvent], it gets forwarded +// to the query pool and handled in [FollowUp.advancePool]. If it doesn't map to +// a query pool event, we check if there are any nodes we should contact to hold +// the record for us and emit that instruction instead. Similarly, if we're +// waiting on responses or are completely finished, we return that as well. +func (f *FollowUp[K, N, M]) Advance(ctx context.Context, ev BroadcastEvent) (out BroadcastState) { + ctx, span := tele.StartSpan(ctx, "FollowUp.Advance", trace.WithAttributes(tele.AttrInEvent(ev))) + defer func() { + span.SetAttributes(tele.AttrOutEvent(out)) + span.End() + }() + + pev := f.handleEvent(ctx, ev) + if pev != nil { + if state, terminal := f.advancePool(ctx, pev); terminal { + return state + } + } + + _, isStopEvent := ev.(*EventBroadcastStop) + if isStopEvent { + for _, n := range f.todo { + delete(f.todo, n.String()) + f.failed[n.String()] = struct { + Node N + Err error + }{Node: n, Err: fmt.Errorf("cancelled")} + } + + for _, n := range f.waiting { + delete(f.waiting, n.String()) + f.failed[n.String()] = struct { + Node N + Err error + }{Node: n, Err: fmt.Errorf("cancelled")} + } + } + + for k, n := range f.todo { + delete(f.todo, k) + f.waiting[k] = n + return &StateBroadcastStoreRecord[K, N, M]{ + QueryID: f.queryID, + NodeID: n, + Message: f.msg, + } + } + + if len(f.waiting) > 0 { + return &StateBroadcastWaiting{} + } + + if isStopEvent || (len(f.todo) == 0 && len(f.closest) != 0) { + return &StateBroadcastFinished[K, N]{ + QueryID: f.queryID, + Contacted: f.closest, + Errors: f.failed, + } + } + + return &StateBroadcastIdle{} +} + +// handleEvent receives a [BroadcastEvent] and returns the corresponding query +// pool event ([query.PoolEvent]). Some [BroadcastEvent] events don't map to +// a query pool event, in which case this method handles that event and returns +// nil. 
+func (f *FollowUp[K, N, M]) handleEvent(ctx context.Context, ev BroadcastEvent) (out query.PoolEvent) { + _, span := tele.StartSpan(ctx, "FollowUp.handleEvent", trace.WithAttributes(tele.AttrInEvent(ev))) + defer func() { + span.SetAttributes(tele.AttrOutEvent(out)) + span.End() + }() + + switch ev := ev.(type) { + case *EventBroadcastStart[K, N]: + return &query.EventPoolAddFindCloserQuery[K, N]{ + QueryID: f.queryID, + Target: ev.Target, + Seed: ev.Seed, + } + case *EventBroadcastStop: + if f.isQueryDone() { + return nil + } + + return &query.EventPoolStopQuery{ + QueryID: f.queryID, + } + case *EventBroadcastNodeResponse[K, N]: + return &query.EventPoolNodeResponse[K, N]{ + QueryID: f.queryID, + NodeID: ev.NodeID, + CloserNodes: ev.CloserNodes, + } + case *EventBroadcastNodeFailure[K, N]: + return &query.EventPoolNodeFailure[K, N]{ + QueryID: f.queryID, + NodeID: ev.NodeID, + Error: ev.Error, + } + case *EventBroadcastStoreRecordSuccess[K, N, M]: + delete(f.waiting, ev.NodeID.String()) + f.success[ev.NodeID.String()] = ev.NodeID + case *EventBroadcastStoreRecordFailure[K, N, M]: + delete(f.waiting, ev.NodeID.String()) + f.failed[ev.NodeID.String()] = struct { + Node N + Err error + }{Node: ev.NodeID, Err: ev.Error} + case *EventBroadcastPoll: + // ignore, nothing to do + return &query.EventPoolPoll{} + default: + panic(fmt.Sprintf("unexpected event: %T", ev)) + } + + return nil +} + +// advancePool advances the query pool with the given query pool event that was +// returned by [FollowUp.handleEvent]. The additional boolean value indicates +// whether the returned [BroadcastState] should be ignored. +func (f *FollowUp[K, N, M]) advancePool(ctx context.Context, ev query.PoolEvent) (out BroadcastState, term bool) { + ctx, span := tele.StartSpan(ctx, "FollowUp.advanceQuery", trace.WithAttributes(tele.AttrInEvent(ev))) + defer func() { + span.SetAttributes(tele.AttrOutEvent(out)) + span.End() + }() + + state := f.pool.Advance(ctx, ev) + switch st := state.(type) { + case *query.StatePoolFindCloser[K, N]: + return &StateBroadcastFindCloser[K, N]{ + QueryID: st.QueryID, + NodeID: st.NodeID, + Target: st.Target, + }, true + case *query.StatePoolWaitingAtCapacity: + return &StateBroadcastWaiting{ + QueryID: f.queryID, + }, true + case *query.StatePoolWaitingWithCapacity: + return &StateBroadcastWaiting{ + QueryID: f.queryID, + }, true + case *query.StatePoolQueryFinished[K, N]: + f.closest = st.ClosestNodes + + for _, n := range st.ClosestNodes { + f.todo[n.String()] = n + } + + case *query.StatePoolQueryTimeout: + return &StateBroadcastFinished[K, N]{ + QueryID: f.queryID, + Contacted: make([]N, 0), + Errors: map[string]struct { + Node N + Err error + }{}, + }, true + case *query.StatePoolIdle: + // nothing to do + default: + panic(fmt.Sprintf("unexpected pool state: %T", st)) + } + + return nil, false +} + +// isQueryDone returns true if the DHT walk/ query phase has finished. +// This is indicated by the fact that the [FollowUp.closest] slice is filled. 
+func (f *FollowUp[K, N, M]) isQueryDone() bool { + return len(f.closest) != 0 +} diff --git a/v2/internal/coord/brdcst/pool.go b/v2/internal/coord/brdcst/pool.go new file mode 100644 index 00000000..bba83dad --- /dev/null +++ b/v2/internal/coord/brdcst/pool.go @@ -0,0 +1,347 @@ +package brdcst + +import ( + "context" + "fmt" + + "github.com/plprobelab/go-kademlia/kad" + "go.opentelemetry.io/otel/trace" + + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/query" + "github.com/libp2p/go-libp2p-kad-dht/v2/tele" +) + +// Broadcast is a type alias for a specific kind of state machine that any +// kind of broadcast strategy state machine must implement. Currently, there +// are the [FollowUp] and [Optimistic] state machines. +type Broadcast = coordt.StateMachine[BroadcastEvent, BroadcastState] + +// Pool is a [coordt.StateMachine] that manages all running broadcast +// operations. In the future it could limit the number of concurrent operations, +// but right now it is just keeping track of all running broadcasts. The +// referenced [query.Pool] is passed down to the respective broadcast state +// machines. This is not nice because it breaks the hierarchy but makes things +// way easier. +// +// Conceptually, a broadcast consists of finding the closest nodes to a certain +// key and then storing the record with them. There are a few different +// strategies that can be applied. For now, these are the [FollowUp] and the [Optimistic] +// strategies. In the future, we also want to support [Reprovide Sweep]. +// However, this requires a different type of query as we are not looking for +// the closest nodes but rather enumerating the keyspace. In any case, this +// broadcast [Pool] would keep track of all running broadcasts. +// +// [Reprovide Sweep]: https://www.notion.so/pl-strflt/DHT-Reprovide-Sweep-3108adf04e9d4086bafb727b17ae033d?pvs=4 +type Pool[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct { + qp *query.Pool[K, N, M] // the query pool of "get closer peers" queries + bcs map[coordt.QueryID]Broadcast // all currently running broadcast operations + cfg ConfigPool // cfg is a copy of the optional configuration supplied to the Pool +} + +// NewPool initializes a new broadcast pool. If cfg is nil, the +// [DefaultConfigPool] will be used. Each broadcast pool creates its own query +// pool ([query.Pool]). A query pool limits the number of concurrent queries +// and already exists "stand-alone" beneath the [coord.PooledQueryBehaviour]. +// We are initializing a new one in here because: +// 1. it allows us to apply different limits to either broadcast or ordinary +// "get closer nodes" queries +// 2. the query pool logic will stay simpler +// 3. we don't need to cross-communicate from the broadcast to the query pool +func NewPool[K kad.Key[K], N kad.NodeID[K], M coordt.Message](self N, cfg *ConfigPool) (*Pool[K, N, M], error) { + if cfg == nil { + cfg = DefaultConfigPool() + } else if err := cfg.Validate(); err != nil { + return nil, fmt.Errorf("validate pool config: %w", err) + } + + qp, err := query.NewPool[K, N, M](self, cfg.pCfg) + if err != nil { + return nil, fmt.Errorf("new query pool: %w", err) + } + + return &Pool[K, N, M]{ + qp: qp, + bcs: map[coordt.QueryID]Broadcast{}, + cfg: *cfg, + }, nil +} + +// Advance advances the state of the broadcast [Pool].
It first handles the +// event by extracting the broadcast state machine that should handle this event +// from the [Pool.bcs] map and constructing the correct event for that broadcast +// state machine. If either the state machine wasn't found (shouldn't happen) or +// there's no corresponding broadcast event ([EventPoolPoll] for example) don't +// do anything and instead try to advance the other broadcast state machines. +func (p *Pool[K, N, M]) Advance(ctx context.Context, ev PoolEvent) (out PoolState) { + ctx, span := tele.StartSpan(ctx, "Pool.Advance", trace.WithAttributes(tele.AttrInEvent(ev))) + defer func() { + span.SetAttributes(tele.AttrOutEvent(out)) + span.End() + }() + + sm, bev := p.handleEvent(ctx, ev) + if sm != nil && bev != nil { + if state, terminal := p.advanceBroadcast(ctx, sm, bev); terminal { + return state + } + } + + // advance other state machines until we have reached a terminal state in any + for _, bsm := range p.bcs { + if sm == bsm { + continue + } + + state, terminal := p.advanceBroadcast(ctx, bsm, &EventBroadcastPoll{}) + if terminal { + return state + } + } + + return &StatePoolIdle{} +} + +// handleEvent receives a broadcast [PoolEvent] and returns the corresponding +// broadcast state machine [FollowUp] or [Optimistic] plus the event for that +// state machine. If any return parameter is nil, either the pool event was for +// an unknown query or the event doesn't need to be forwarded to the state +// machine. +func (p *Pool[K, N, M]) handleEvent(ctx context.Context, ev PoolEvent) (sm Broadcast, out BroadcastEvent) { + _, span := tele.StartSpan(ctx, "Pool.handleEvent", trace.WithAttributes(tele.AttrInEvent(ev))) + defer func() { + span.SetAttributes(tele.AttrOutEvent(out)) + span.End() + }() + + switch ev := ev.(type) { + case *EventPoolStartBroadcast[K, N, M]: + // first initialize the state machine for the broadcast desired strategy + switch cfg := ev.Config.(type) { + case *ConfigFollowUp: + p.bcs[ev.QueryID] = NewFollowUp(ev.QueryID, p.qp, ev.Message, cfg) + case *ConfigOptimistic: + panic("implement me") + } + + // start the new state machine + return p.bcs[ev.QueryID], &EventBroadcastStart[K, N]{ + Target: ev.Target, + Seed: ev.Seed, + } + + case *EventPoolStopBroadcast: + return p.bcs[ev.QueryID], &EventBroadcastStop{} + + case *EventPoolGetCloserNodesSuccess[K, N]: + return p.bcs[ev.QueryID], &EventBroadcastNodeResponse[K, N]{ + NodeID: ev.NodeID, + CloserNodes: ev.CloserNodes, + } + + case *EventPoolGetCloserNodesFailure[K, N]: + return p.bcs[ev.QueryID], &EventBroadcastNodeFailure[K, N]{ + NodeID: ev.NodeID, + Error: ev.Error, + } + + case *EventPoolStoreRecordSuccess[K, N, M]: + return p.bcs[ev.QueryID], &EventBroadcastStoreRecordSuccess[K, N, M]{ + NodeID: ev.NodeID, + Request: ev.Request, + Response: ev.Response, + } + + case *EventPoolStoreRecordFailure[K, N, M]: + return p.bcs[ev.QueryID], &EventBroadcastStoreRecordFailure[K, N, M]{ + NodeID: ev.NodeID, + Request: ev.Request, + Error: ev.Error, + } + + case *EventPoolPoll: + // no event to process + + default: + panic(fmt.Sprintf("unexpected event: %T", ev)) + } + + return nil, nil +} + +// advanceBroadcast advances the given broadcast state machine ([FollowUp] or +// [Optimistic]) and returns the new [Pool] state ([PoolState]). The additional +// boolean value indicates whether the returned [PoolState] should be ignored. 
+func (p *Pool[K, N, M]) advanceBroadcast(ctx context.Context, sm Broadcast, bev BroadcastEvent) (PoolState, bool) { + ctx, span := tele.StartSpan(ctx, "Pool.advanceBroadcast", trace.WithAttributes(tele.AttrInEvent(bev))) + defer span.End() + + state := sm.Advance(ctx, bev) + switch st := state.(type) { + case *StateBroadcastFindCloser[K, N]: + return &StatePoolFindCloser[K, N]{ + QueryID: st.QueryID, + NodeID: st.NodeID, + Target: st.Target, + }, true + case *StateBroadcastWaiting: + return &StatePoolWaiting{}, true + case *StateBroadcastStoreRecord[K, N, M]: + return &StatePoolStoreRecord[K, N, M]{ + QueryID: st.QueryID, + NodeID: st.NodeID, + Message: st.Message, + }, true + case *StateBroadcastFinished[K, N]: + delete(p.bcs, st.QueryID) + return &StatePoolBroadcastFinished[K, N]{ + QueryID: st.QueryID, + Contacted: st.Contacted, + Errors: st.Errors, + }, true + } + + return nil, false +} + +// PoolState must be implemented by all states that a [Pool] can reach. States +// are basically the events that the [Pool] emits that other state machines or +// behaviours could react upon. +type PoolState interface { + poolState() +} + +// StatePoolFindCloser indicates to the broadcast behaviour that a broadcast +// state machine and indirectly the broadcast pool wants to query the given node +// (NodeID) for closer nodes to the target key (Target). +type StatePoolFindCloser[K kad.Key[K], N kad.NodeID[K]] struct { + QueryID coordt.QueryID // the id of the broadcast operation that wants to send the message + Target K // the key that the query wants to find closer nodes for + NodeID N // the node to send the message to +} + +// StatePoolWaiting indicates that the broadcast [Pool] is waiting for network +// I/O to finish. It means the [Pool] isn't idle, but there are operations +// in-flight that it is waiting on to finish. +type StatePoolWaiting struct{} + +// StatePoolStoreRecord indicates to the upper layer that the broadcast [Pool] +// wants to store a record using the given Message with the given NodeID. The +// network behaviour should take over and notify the [coord.PooledBroadcastBehaviour] +// about updates. +type StatePoolStoreRecord[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct { + QueryID coordt.QueryID // the id of the broadcast operation that wants to send the message + NodeID N // the node to send the message to + Message M // the message that should be sent to the remote node +} + +// StatePoolBroadcastFinished indicates that the broadcast operation with the +// id QueryID has finished. During that operation, all nodes in Contacted have +// been contacted to store the record. The Contacted slice does not contain +// the nodes we have queried to find the closest nodes to the target key - only +// the ones that we eventually contacted to store the record. The Errors map +// maps the string representation of any node N in the Contacted slice to a +// potential error struct that contains the original Node and error. In the best +// case, this Errors map is empty. 
+type StatePoolBroadcastFinished[K kad.Key[K], N kad.NodeID[K]] struct { + QueryID coordt.QueryID // the id of the broadcast operation that has finished + Contacted []N // all nodes we contacted to store the record (successful or not) + Errors map[string]struct { // any error that occurred for any node that we contacted + Node N // a node from the Contacted slice + Err error // the error that happened when contacting that Node + } +} + +// StatePoolIdle means that the broadcast [Pool] is not managing any broadcast +// operations at this time. +type StatePoolIdle struct{} + +// poolState() ensures that only [PoolState]s can be returned by advancing the +// [Pool] state machine. +func (*StatePoolFindCloser[K, N]) poolState() {} +func (*StatePoolWaiting) poolState() {} +func (*StatePoolStoreRecord[K, N, M]) poolState() {} +func (*StatePoolBroadcastFinished[K, N]) poolState() {} +func (*StatePoolIdle) poolState() {} + +// PoolEvent is an event intended to advance the state of the broadcast [Pool] +// state machine. The [Pool] state machine only operates on events that +// implement this interface. An "Event" is the opposite of a "State." An "Event" +// flows into the state machine and a "State" flows out of it. +type PoolEvent interface { + poolEvent() +} + +// EventPoolPoll is an event that signals the broadcast [Pool] state machine +// that it can perform housekeeping work such as time out queries. +type EventPoolPoll struct{} + +// EventPoolStartBroadcast is an event that attempts to start a new broadcast +// operation. This is the entry point. +type EventPoolStartBroadcast[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct { + QueryID coordt.QueryID // the unique ID for this operation + Target K // the key we want to store the record for + Message M // the message that we want to send to the closest peers (this encapsulates the payload we want to store) + Seed []N // the closest nodes we know so far and from where we start the operation + Config Config // the configuration for this operation. Most importantly, this defines the broadcast strategy ([FollowUp] or [Optimistic]) +} + +// EventPoolStopBroadcast notifies the broadcast [Pool] to stop a broadcast +// operation. +type EventPoolStopBroadcast struct { + QueryID coordt.QueryID // the id of the broadcast operation that should be stopped +} + +// EventPoolGetCloserNodesSuccess notifies a [Pool] that a remote node (NodeID) +// has successfully responded with closer nodes (CloserNodes) to the Target key +// for the broadcast operation with the given id (QueryID). +type EventPoolGetCloserNodesSuccess[K kad.Key[K], N kad.NodeID[K]] struct { + QueryID coordt.QueryID // the id of the broadcast operation that this response belongs to + NodeID N // the node the message was sent to and that replied + Target K // the key we are searching closer nodes for + CloserNodes []N // the closer nodes sent by the node NodeID +} + +// EventPoolGetCloserNodesFailure notifies a [Pool] that a remote node (NodeID) +// has failed responding with closer nodes to the Target key for the broadcast +// operation with the given id (QueryID).
+type EventPoolGetCloserNodesFailure[K kad.Key[K], N kad.NodeID[K]] struct { + QueryID coordt.QueryID // the id of the query that sent the message + NodeID N // the node the message was sent to and that has replied + Target K // the key we are searching closer nodes for + Error error // the error that caused the failure, if any +} + +// EventPoolStoreRecordSuccess notifies the broadcast [Pool] that storing a record +// with a remote node (NodeID) was successful. The message that was sent is held +// in Request, and the returned value is contained in Response. However, in the +// case of the Amino DHT, nodes do not respond with a confirmation, so Response +// will always be nil. Check out [pb.Message.ExpectResponse] for information +// about which requests should receive a response. +type EventPoolStoreRecordSuccess[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct { + QueryID coordt.QueryID // the id of the query that sent the message + NodeID N // the node the message was sent to + Request M // the message that was sent to the remote node + Response M // the reply we got from the remote node (nil in many cases of the Amino DHT) +} + +// EventPoolStoreRecordFailure notifies the broadcast [Pool] that storing a record +// with a remote node (NodeID) has failed. The message that was sent is held +// in Request, and the error will be in Error. +type EventPoolStoreRecordFailure[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct { + QueryID coordt.QueryID // the id of the query that sent the message + NodeID N // the node the message was sent to + Request M // the message that was sent to the remote node + Error error // the error that caused the failure +} + +// poolEvent() ensures that only events accepted by a broadcast [Pool] can be +// assigned to the [PoolEvent] interface. +func (*EventPoolStopBroadcast) poolEvent() {} +func (*EventPoolPoll) poolEvent() {} +func (*EventPoolStartBroadcast[K, N, M]) poolEvent() {} +func (*EventPoolGetCloserNodesSuccess[K, N]) poolEvent() {} +func (*EventPoolGetCloserNodesFailure[K, N]) poolEvent() {} +func (*EventPoolStoreRecordSuccess[K, N, M]) poolEvent() {} +func (*EventPoolStoreRecordFailure[K, N, M]) poolEvent() {} diff --git a/v2/internal/coord/brdcst/pool_test.go b/v2/internal/coord/brdcst/pool_test.go new file mode 100644 index 00000000..f9404f3a --- /dev/null +++ b/v2/internal/coord/brdcst/pool_test.go @@ -0,0 +1,301 @@ +package brdcst + +import ( + "context" + "fmt" + "testing" + + "github.com/plprobelab/go-kademlia/key" + "github.com/stretchr/testify/require" + + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/internal/tiny" +) + +// Assert that Pool implements the common state machine interface +var _ coordt.StateMachine[PoolEvent, PoolState] = (*Pool[tiny.Key, tiny.Node, tiny.Message])(nil) + +func TestPoolStopWhenNoQueries(t *testing.T) { + ctx := context.Background() + cfg := DefaultConfigPool() + + self := tiny.NewNode(0) + + p, err := NewPool[tiny.Key, tiny.Node, tiny.Message](self, cfg) + require.NoError(t, err) + + state := p.Advance(ctx, &EventPoolPoll{}) + require.IsType(t, &StatePoolIdle{}, state) +} + +func TestPool_FollowUp_lifecycle(t *testing.T) { + // This test attempts to cover the whole lifecycle of
+ // + // We have a network of three peers: a, b, and, c + // First, we query all three while peer c fails to respond + // Second, we store the record with the remaining a and b, while b fails to respond + + ctx := context.Background() + cfg := DefaultConfigPool() + + self := tiny.NewNode(0) + + p, err := NewPool[tiny.Key, tiny.Node, tiny.Message](self, cfg) + require.NoError(t, err) + + msg := tiny.Message{Content: "store this"} + target := tiny.Key(0b00000001) + a := tiny.NewNode(0b00000100) // 4 + b := tiny.NewNode(0b00000011) // 3 + c := tiny.NewNode(0b00000010) // 2 + + queryID := coordt.QueryID("test") + + state := p.Advance(ctx, &EventPoolStartBroadcast[tiny.Key, tiny.Node, tiny.Message]{ + QueryID: queryID, + Target: target, + Message: msg, + Seed: []tiny.Node{a}, + Config: DefaultConfigFollowUp(), + }) + + // the query should attempt to contact the node it was given + st, ok := state.(*StatePoolFindCloser[tiny.Key, tiny.Node]) + require.True(t, ok) + + require.Equal(t, queryID, st.QueryID) // the query should be the one just added + require.Equal(t, a, st.NodeID) // the query should attempt to contact the node it was given + require.True(t, key.Equal(target, st.Target)) // with the correct target + + // polling the state machine returns waiting + state = p.Advance(ctx, &EventPoolPoll{}) + require.IsType(t, &StatePoolWaiting{}, state) + + // notify pool that the node was contacted successfully + // with a single closer node. + state = p.Advance(ctx, &EventPoolGetCloserNodesSuccess[tiny.Key, tiny.Node]{ + QueryID: queryID, + Target: target, + NodeID: a, + CloserNodes: []tiny.Node{a, b}, + }) + + // the query should attempt to contact the single closer node it has found + st, ok = state.(*StatePoolFindCloser[tiny.Key, tiny.Node]) + require.True(t, ok, "state is %T", state) + + require.Equal(t, queryID, st.QueryID) // the query should be the same + require.Equal(t, b, st.NodeID) // the query should attempt to contact the newly discovered node + require.True(t, key.Equal(target, st.Target)) // with the correct target + + // notify pool that the node was contacted successfully + // with no new node. 
+ state = p.Advance(ctx, &EventPoolGetCloserNodesSuccess[tiny.Key, tiny.Node]{ + QueryID: queryID, + Target: target, + NodeID: b, + CloserNodes: []tiny.Node{b, c}, // returns additional node + }) + + // the query should attempt to contact the newly closer node it has found + st, ok = state.(*StatePoolFindCloser[tiny.Key, tiny.Node]) + require.True(t, ok) + + require.Equal(t, queryID, st.QueryID) // the query should be the same + require.Equal(t, c, st.NodeID) // the query should attempt to contact the newly discovered node + require.True(t, key.Equal(target, st.Target)) // with the correct target + + // this last node times out -> start contacting the other two + timeoutErr := fmt.Errorf("timeout") + state = p.Advance(ctx, &EventPoolGetCloserNodesFailure[tiny.Key, tiny.Node]{ + QueryID: queryID, + NodeID: c, + Target: target, + Error: timeoutErr, + }) + + // This means we should start the follow-up phase + srState, ok := state.(*StatePoolStoreRecord[tiny.Key, tiny.Node, tiny.Message]) + require.True(t, ok, "state is %T", state) + + require.Equal(t, queryID, srState.QueryID) + firstContactedNode := srState.NodeID + require.True(t, a == srState.NodeID || b == srState.NodeID) // we should contact either node - there's no inherent order + require.Equal(t, msg.Content, srState.Message.Content) + + // polling the state machine should trigger storing the record with + // the second node + state = p.Advance(ctx, &EventPoolPoll{}) + srState, ok = state.(*StatePoolStoreRecord[tiny.Key, tiny.Node, tiny.Message]) + require.True(t, ok, "state is %T", state) + + require.Equal(t, queryID, srState.QueryID) + require.True(t, a == srState.NodeID || b == srState.NodeID) // we should contact either node - there's no inherent order + require.NotEqual(t, firstContactedNode, srState.NodeID) // should be the other one now + require.Equal(t, msg.Content, srState.Message.Content) + + // since we have two requests in-flight, polling should return a waiting state machine + state = p.Advance(ctx, &EventPoolPoll{}) + require.IsType(t, &StatePoolWaiting{}, state) + + // first response from storing the record comes back + state = p.Advance(ctx, &EventPoolStoreRecordSuccess[tiny.Key, tiny.Node, tiny.Message]{ + QueryID: queryID, + NodeID: a, + Request: msg, + }) + require.IsType(t, &StatePoolWaiting{}, state) + + // second response from storing the record comes back and it failed! 
+ state = p.Advance(ctx, &EventPoolStoreRecordFailure[tiny.Key, tiny.Node, tiny.Message]{ + QueryID: queryID, + NodeID: b, + Request: msg, + Error: timeoutErr, + }) + + // since we have contacted all nodes we knew, the broadcast has finished + finishState, ok := state.(*StatePoolBroadcastFinished[tiny.Key, tiny.Node]) + require.True(t, ok, "state is %T", state) + + require.Equal(t, queryID, finishState.QueryID) + require.Len(t, finishState.Contacted, 2) + require.Len(t, finishState.Errors, 1) + require.Equal(t, finishState.Errors[b.String()].Node, b) + require.Equal(t, finishState.Errors[b.String()].Err, timeoutErr) + + state = p.Advance(ctx, &EventPoolPoll{}) + require.IsType(t, &StatePoolIdle{}, state) + + require.Nil(t, p.bcs[queryID]) // should have been removed +} + +func TestPool_FollowUp_stop_during_query(t *testing.T) { + // This test attempts to cover the case where a followup broadcast operation + // is cancelled during the query phase + + ctx := context.Background() + cfg := DefaultConfigPool() + + self := tiny.NewNode(0) + + p, err := NewPool[tiny.Key, tiny.Node, tiny.Message](self, cfg) + require.NoError(t, err) + + msg := tiny.Message{Content: "store this"} + target := tiny.Key(0b00000001) + a := tiny.NewNode(0b00000100) // 4 + + queryID := coordt.QueryID("test") + + state := p.Advance(ctx, &EventPoolStartBroadcast[tiny.Key, tiny.Node, tiny.Message]{ + QueryID: queryID, + Target: target, + Message: msg, + Seed: []tiny.Node{a}, + Config: DefaultConfigFollowUp(), + }) + + // the query should attempt to contact the node it was given + st, ok := state.(*StatePoolFindCloser[tiny.Key, tiny.Node]) + require.True(t, ok, "state is %T", state) + + require.Equal(t, queryID, st.QueryID) // the query should be the one just added + require.Equal(t, a, st.NodeID) // the query should attempt to contact the node it was given + require.True(t, key.Equal(target, st.Target)) // with the correct target + + // polling the state machine returns waiting + state = p.Advance(ctx, &EventPoolPoll{}) + require.IsType(t, &StatePoolWaiting{}, state) + + state = p.Advance(ctx, &EventPoolStopBroadcast{ + QueryID: queryID, + }) + finish, ok := state.(*StatePoolBroadcastFinished[tiny.Key, tiny.Node]) + require.True(t, ok, "state is %T", state) + require.Len(t, finish.Contacted, 0) +} + +func TestPool_FollowUp_stop_during_followup_phase(t *testing.T) { + ctx := context.Background() + cfg := DefaultConfigPool() + + self := tiny.NewNode(0) + + p, err := NewPool[tiny.Key, tiny.Node, tiny.Message](self, cfg) + require.NoError(t, err) + + msg := tiny.Message{Content: "store this"} + target := tiny.Key(0b00000001) + a := tiny.NewNode(0b00000100) // 4 + b := tiny.NewNode(0b00000011) // 3 + + queryID := coordt.QueryID("test") + + state := p.Advance(ctx, &EventPoolStartBroadcast[tiny.Key, tiny.Node, tiny.Message]{ + QueryID: queryID, + Target: target, + Message: msg, + Seed: []tiny.Node{a, b}, + Config: DefaultConfigFollowUp(), + }) + + require.IsType(t, &StatePoolFindCloser[tiny.Key, tiny.Node]{}, state) + state = p.Advance(ctx, &EventPoolPoll{}) + require.IsType(t, &StatePoolFindCloser[tiny.Key, tiny.Node]{}, state) + + state = p.Advance(ctx, &EventPoolGetCloserNodesSuccess[tiny.Key, tiny.Node]{ + QueryID: queryID, + Target: target, + NodeID: a, + CloserNodes: []tiny.Node{a, b}, + }) + require.IsType(t, &StatePoolWaiting{}, state) + + state = p.Advance(ctx, &EventPoolGetCloserNodesSuccess[tiny.Key, tiny.Node]{ + QueryID: queryID, + Target: target, + NodeID: b, + CloserNodes: []tiny.Node{a, b}, + }) + require.IsType(t, 
&StatePoolStoreRecord[tiny.Key, tiny.Node, tiny.Message]{}, state) + + state = p.Advance(ctx, &EventPoolStopBroadcast{ + QueryID: queryID, + }) + + st, ok := state.(*StatePoolBroadcastFinished[tiny.Key, tiny.Node]) + require.True(t, ok, "state is %T", state) + require.Equal(t, st.QueryID, queryID) + require.Len(t, st.Contacted, 2) + require.Len(t, st.Errors, 2) +} + +func TestPoolState_interface_conformance(t *testing.T) { + states := []PoolState{ + &StatePoolIdle{}, + &StatePoolWaiting{}, + &StatePoolStoreRecord[tiny.Key, tiny.Node, tiny.Message]{}, + &StatePoolFindCloser[tiny.Key, tiny.Node]{}, + &StatePoolBroadcastFinished[tiny.Key, tiny.Node]{}, + } + for _, st := range states { + st.poolState() // drives test coverage + } +} + +func TestPoolEvent_interface_conformance(t *testing.T) { + events := []PoolEvent{ + &EventPoolStopBroadcast{}, + &EventPoolPoll{}, + &EventPoolStartBroadcast[tiny.Key, tiny.Node, tiny.Message]{}, + &EventPoolGetCloserNodesSuccess[tiny.Key, tiny.Node]{}, + &EventPoolGetCloserNodesFailure[tiny.Key, tiny.Node]{}, + &EventPoolStoreRecordSuccess[tiny.Key, tiny.Node, tiny.Message]{}, + &EventPoolStoreRecordFailure[tiny.Key, tiny.Node, tiny.Message]{}, + } + for _, ev := range events { + ev.poolEvent() // drives test coverage + } +} diff --git a/v2/internal/coord/brdcst_events.go b/v2/internal/coord/brdcst_events.go new file mode 100644 index 00000000..ac016c25 --- /dev/null +++ b/v2/internal/coord/brdcst_events.go @@ -0,0 +1,34 @@ +package coord + +import ( + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/brdcst" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" + "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" + "github.com/libp2p/go-libp2p-kad-dht/v2/pb" +) + +// EventStartBroadcast starts a new +type EventStartBroadcast struct { + QueryID coordt.QueryID + Target kadt.Key + Message *pb.Message + Seed []kadt.PeerID + Config brdcst.Config + Notify NotifyCloser[BehaviourEvent] +} + +func (*EventStartBroadcast) behaviourEvent() {} + +// EventBroadcastFinished is emitted by the coordinator when a broadcasting +// a record to the network has finished, either through running to completion or +// by being canceled. 
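+// Contacted lists the nodes that were reached during the broadcast. Errors is
+// keyed by the string representation of a node and records, for each node, the
+// error encountered while trying to store the record with it.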
+type EventBroadcastFinished struct { + QueryID coordt.QueryID + Contacted []kadt.PeerID + Errors map[string]struct { + Node kadt.PeerID + Err error + } +} + +func (*EventBroadcastFinished) behaviourEvent() {} diff --git a/v2/internal/coord/coordinator.go b/v2/internal/coord/coordinator.go index d3d619b2..18e09467 100644 --- a/v2/internal/coord/coordinator.go +++ b/v2/internal/coord/coordinator.go @@ -20,6 +20,8 @@ import ( "go.uber.org/zap/exp/zapslog" "golang.org/x/exp/slog" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/brdcst" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/query" "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/routing" "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" @@ -46,7 +48,7 @@ type Coordinator struct { rt kad.RoutingTable[kadt.Key, kadt.PeerID] // rtr is the message router used to send messages - rtr Router[kadt.Key, kadt.PeerID, *pb.Message] + rtr coordt.Router[kadt.Key, kadt.PeerID, *pb.Message] // networkBehaviour is the behaviour responsible for communicating with the network networkBehaviour *NetworkBehaviour @@ -57,6 +59,9 @@ type Coordinator struct { // queryBehaviour is the behaviour responsible for running user-submitted queries queryBehaviour Behaviour[BehaviourEvent, BehaviourEvent] + // brdcstBehaviour is the behaviour responsible for running user-submitted queries to store records with nodes + brdcstBehaviour Behaviour[BehaviourEvent, BehaviourEvent] + // tele provides tracing and metric reporting capabilities tele *Telemetry @@ -162,7 +167,7 @@ func DefaultCoordinatorConfig() *CoordinatorConfig { } } -func NewCoordinator(self kadt.PeerID, rtr Router[kadt.Key, kadt.PeerID, *pb.Message], rt routing.RoutingTableCpl[kadt.Key, kadt.PeerID], cfg *CoordinatorConfig) (*Coordinator, error) { +func NewCoordinator(self kadt.PeerID, rtr coordt.Router[kadt.Key, kadt.PeerID, *pb.Message], rt routing.RoutingTableCpl[kadt.Key, kadt.PeerID], cfg *CoordinatorConfig) (*Coordinator, error) { if cfg == nil { cfg = DefaultCoordinatorConfig() } else if err := cfg.Validate(); err != nil { @@ -194,7 +199,7 @@ func NewCoordinator(self kadt.PeerID, rtr Router[kadt.Key, kadt.PeerID, *pb.Mess bootstrapCfg.RequestConcurrency = cfg.RequestConcurrency bootstrapCfg.RequestTimeout = cfg.RequestTimeout - bootstrap, err := routing.NewBootstrap(kadt.PeerID(self), bootstrapCfg) + bootstrap, err := routing.NewBootstrap(self, bootstrapCfg) if err != nil { return nil, fmt.Errorf("bootstrap: %w", err) } @@ -228,6 +233,13 @@ func NewCoordinator(self kadt.PeerID, rtr Router[kadt.Key, kadt.PeerID, *pb.Mess networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger, tele.Tracer) + b, err := brdcst.NewPool[kadt.Key, kadt.PeerID, *pb.Message](self, nil) + if err != nil { + return nil, fmt.Errorf("broadcast: %w", err) + } + + brdcstBehaviour := NewPooledBroadcastBehaviour(b, cfg.Logger, tele.Tracer) + ctx, cancel := context.WithCancel(context.Background()) d := &Coordinator{ @@ -242,7 +254,9 @@ func NewCoordinator(self kadt.PeerID, rtr Router[kadt.Key, kadt.PeerID, *pb.Mess networkBehaviour: networkBehaviour, routingBehaviour: routingBehaviour, queryBehaviour: queryBehaviour, - routingNotifier: nullRoutingNotifier{}, + brdcstBehaviour: brdcstBehaviour, + + routingNotifier: nullRoutingNotifier{}, } go d.eventLoop(ctx) @@ -266,9 +280,11 @@ func (c *Coordinator) eventLoop(ctx context.Context) { ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.eventLoop") defer span.End() + for { var ev BehaviourEvent var ok bool + 
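+		// wait until one of the behaviours (network, routing, query or broadcast)
+		// signals that it has an event ready to be performed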
select { case <-ctx.Done(): // coordinator is closing @@ -279,6 +295,8 @@ func (c *Coordinator) eventLoop(ctx context.Context) { ev, ok = c.routingBehaviour.Perform(ctx) case <-c.queryBehaviour.Ready(): ev, ok = c.queryBehaviour.Perform(ctx) + case <-c.brdcstBehaviour.Ready(): + ev, ok = c.brdcstBehaviour.Perform(ctx) } if ok { @@ -296,6 +314,8 @@ func (c *Coordinator) dispatchEvent(ctx context.Context, ev BehaviourEvent) { c.networkBehaviour.Notify(ctx, ev) case QueryCommand: c.queryBehaviour.Notify(ctx, ev) + case BrdcstCommand: + c.brdcstBehaviour.Notify(ctx, ev) case RoutingCommand: c.routingBehaviour.Notify(ctx, ev) case RoutingNotification: @@ -316,11 +336,11 @@ func (c *Coordinator) SetRoutingNotifier(rn RoutingNotifier) { // GetNode retrieves the node associated with the given node id from the DHT's local routing table. // If the node isn't found in the table, it returns ErrNodeNotFound. -func (c *Coordinator) GetNode(ctx context.Context, id kadt.PeerID) (Node, error) { +func (c *Coordinator) GetNode(ctx context.Context, id kadt.PeerID) (coordt.Node, error) { ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.GetNode") defer span.End() if _, exists := c.rt.GetNode(id.Key()); !exists { - return nil, ErrNodeNotFound + return nil, coordt.ErrNodeNotFound } nh, err := c.networkBehaviour.getNodeHandler(ctx, id) @@ -331,11 +351,11 @@ func (c *Coordinator) GetNode(ctx context.Context, id kadt.PeerID) (Node, error) } // GetClosestNodes requests the n closest nodes to the key from the node's local routing table. -func (c *Coordinator) GetClosestNodes(ctx context.Context, k kadt.Key, n int) ([]Node, error) { +func (c *Coordinator) GetClosestNodes(ctx context.Context, k kadt.Key, n int) ([]coordt.Node, error) { ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.GetClosestNodes") defer span.End() closest := c.rt.NearestNodes(k, n) - nodes := make([]Node, 0, len(closest)) + nodes := make([]coordt.Node, 0, len(closest)) for _, id := range closest { nh, err := c.networkBehaviour.getNodeHandler(ctx, id) if err != nil { @@ -348,13 +368,13 @@ func (c *Coordinator) GetClosestNodes(ctx context.Context, k kadt.Key, n int) ([ // GetValue requests that the node return any value associated with the supplied key. // If the node does not have a value for the key it returns ErrValueNotFound. -func (c *Coordinator) GetValue(ctx context.Context, k kadt.Key) (Value, error) { +func (c *Coordinator) GetValue(ctx context.Context, k kadt.Key) (coordt.Value, error) { panic("not implemented") } // PutValue requests that the node stores a value to be associated with the supplied key. // If the node cannot or chooses not to store the value for the key it returns ErrValueNotAccepted. -func (c *Coordinator) PutValue(ctx context.Context, r Value, q int) error { +func (c *Coordinator) PutValue(ctx context.Context, r coordt.Value, q int) error { panic("not implemented") } @@ -369,7 +389,7 @@ func (c *Coordinator) PutValue(ctx context.Context, r Value, q int) error { // numResults specifies the minimum number of nodes to successfully contact before considering iteration complete. // The query is considered to be exhausted when it has received responses from at least this number of nodes // and there are no closer nodes remaining to be contacted. A default of 20 is used if this value is less than 1. 
-func (c *Coordinator) QueryClosest(ctx context.Context, target kadt.Key, fn QueryFunc, numResults int) ([]kadt.PeerID, QueryStats, error) { +func (c *Coordinator) QueryClosest(ctx context.Context, target kadt.Key, fn coordt.QueryFunc, numResults int) ([]kadt.PeerID, coordt.QueryStats, error) { ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.Query") defer span.End() @@ -378,16 +398,16 @@ func (c *Coordinator) QueryClosest(ctx context.Context, target kadt.Key, fn Quer seeds, err := c.GetClosestNodes(ctx, target, 20) if err != nil { - return nil, QueryStats{}, err + return nil, coordt.QueryStats{}, err } seedIDs := make([]kadt.PeerID, 0, len(seeds)) for _, s := range seeds { - seedIDs = append(seedIDs, kadt.PeerID(s.ID())) + seedIDs = append(seedIDs, s.ID()) } waiter := NewWaiter[BehaviourEvent]() - queryID := c.newQueryID() + queryID := c.newOperationID() cmd := &EventStartFindCloserQuery{ QueryID: queryID, @@ -414,7 +434,7 @@ func (c *Coordinator) QueryClosest(ctx context.Context, target kadt.Key, fn Quer // numResults specifies the minimum number of nodes to successfully contact before considering iteration complete. // The query is considered to be exhausted when it has received responses from at least this number of nodes // and there are no closer nodes remaining to be contacted. A default of 20 is used if this value is less than 1. -func (c *Coordinator) QueryMessage(ctx context.Context, msg *pb.Message, fn QueryFunc, numResults int) (QueryStats, error) { +func (c *Coordinator) QueryMessage(ctx context.Context, msg *pb.Message, fn coordt.QueryFunc, numResults int) (coordt.QueryStats, error) { ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.QueryMessage") defer span.End() @@ -427,16 +447,16 @@ func (c *Coordinator) QueryMessage(ctx context.Context, msg *pb.Message, fn Quer seeds, err := c.GetClosestNodes(ctx, msg.Target(), numResults) if err != nil { - return QueryStats{}, err + return coordt.QueryStats{}, err } seedIDs := make([]kadt.PeerID, 0, len(seeds)) for _, s := range seeds { - seedIDs = append(seedIDs, kadt.PeerID(s.ID())) + seedIDs = append(seedIDs, s.ID()) } waiter := NewWaiter[BehaviourEvent]() - queryID := c.newQueryID() + queryID := c.newOperationID() cmd := &EventStartMessageQuery{ QueryID: queryID, @@ -454,8 +474,47 @@ func (c *Coordinator) QueryMessage(ctx context.Context, msg *pb.Message, fn Quer return stats, err } -func (c *Coordinator) waitForQuery(ctx context.Context, queryID query.QueryID, waiter *Waiter[BehaviourEvent], fn QueryFunc) ([]kadt.PeerID, QueryStats, error) { - var lastStats QueryStats +func (c *Coordinator) BroadcastRecord(ctx context.Context, msg *pb.Message) error { + ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.BroadcastRecord") + defer span.End() + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + seeds, err := c.GetClosestNodes(ctx, msg.Target(), 20) + if err != nil { + return err + } + + seedIDs := make([]kadt.PeerID, 0, len(seeds)) + for _, s := range seeds { + seedIDs = append(seedIDs, s.ID()) + } + + waiter := NewWaiter[BehaviourEvent]() + queryID := c.newOperationID() + + cmd := &EventStartBroadcast{ + QueryID: queryID, + Target: msg.Target(), + Message: msg, + Seed: seedIDs, + Notify: waiter, + Config: brdcst.DefaultConfigFollowUp(), + } + + // queue the start of the query + c.brdcstBehaviour.Notify(ctx, cmd) + + contacted, errs, err := c.waitForBroadcast(ctx, waiter) + fmt.Println(contacted) + fmt.Println(errs) + + return err +} + +func (c *Coordinator) waitForQuery(ctx context.Context, queryID coordt.QueryID, 
waiter *Waiter[BehaviourEvent], fn coordt.QueryFunc) ([]kadt.PeerID, coordt.QueryStats, error) { + var lastStats coordt.QueryStats for { select { case <-ctx.Done(): @@ -464,20 +523,20 @@ func (c *Coordinator) waitForQuery(ctx context.Context, queryID query.QueryID, w ctx, ev := wev.Ctx, wev.Event switch ev := ev.(type) { case *EventQueryProgressed: - lastStats = QueryStats{ + lastStats = coordt.QueryStats{ Start: ev.Stats.Start, Requests: ev.Stats.Requests, Success: ev.Stats.Success, Failure: ev.Stats.Failure, } - nh, err := c.networkBehaviour.getNodeHandler(ctx, kadt.PeerID(ev.NodeID)) + nh, err := c.networkBehaviour.getNodeHandler(ctx, ev.NodeID) if err != nil { // ignore unknown node break } err = fn(ctx, nh.ID(), ev.Response, lastStats) - if errors.Is(err, ErrSkipRemaining) { + if errors.Is(err, coordt.ErrSkipRemaining) { // done c.queryBehaviour.Notify(ctx, &EventStopQuery{QueryID: queryID}) return nil, lastStats, nil @@ -500,6 +559,28 @@ func (c *Coordinator) waitForQuery(ctx context.Context, queryID query.QueryID, w } } +func (c *Coordinator) waitForBroadcast(ctx context.Context, waiter *Waiter[BehaviourEvent]) ([]kadt.PeerID, map[string]struct { + Node kadt.PeerID + Err error +}, error, +) { + for { + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + case wev := <-waiter.Chan(): + switch ev := wev.Event.(type) { + case *EventQueryProgressed: + case *EventBroadcastFinished: + return ev.Contacted, ev.Errors, nil + + default: + panic(fmt.Sprintf("unexpected event: %T", ev)) + } + } + } +} + // AddNodes suggests new DHT nodes to be added to the routing table. // If the routing table is updated as a result of this operation an EventRoutingUpdated notification // is emitted on the routing notification channel. @@ -559,9 +640,9 @@ func (c *Coordinator) NotifyNonConnectivity(ctx context.Context, id kadt.PeerID) return nil } -func (c *Coordinator) newQueryID() query.QueryID { +func (c *Coordinator) newOperationID() coordt.QueryID { next := c.lastQueryID.Add(1) - return query.QueryID(fmt.Sprintf("%016x", next)) + return coordt.QueryID(fmt.Sprintf("%016x", next)) } // A BufferedRoutingNotifier is a [RoutingNotifier] that buffers [RoutingNotification] events and provides methods diff --git a/v2/internal/coord/coordinator_test.go b/v2/internal/coord/coordinator_test.go index c267b4a0..bbec6a38 100644 --- a/v2/internal/coord/coordinator_test.go +++ b/v2/internal/coord/coordinator_test.go @@ -5,6 +5,8 @@ import ( "log" "testing" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" + "github.com/benbjohnson/clock" "github.com/stretchr/testify/require" @@ -106,7 +108,7 @@ func TestExhaustiveQuery(t *testing.T) { visited := make(map[string]int) // Record the nodes as they are visited - qfn := func(ctx context.Context, id kadt.PeerID, msg *pb.Message, stats QueryStats) error { + qfn := func(ctx context.Context, id kadt.PeerID, msg *pb.Message, stats coordt.QueryStats) error { visited[id.String()]++ return nil } @@ -144,7 +146,7 @@ func TestRoutingUpdatedEventEmittedForCloserNodes(t *testing.T) { rn := NewBufferedRoutingNotifier() c.SetRoutingNotifier(rn) - qfn := func(ctx context.Context, id kadt.PeerID, msg *pb.Message, stats QueryStats) error { + qfn := func(ctx context.Context, id kadt.PeerID, msg *pb.Message, stats coordt.QueryStats) error { return nil } @@ -255,7 +257,7 @@ func TestIncludeNode(t *testing.T) { // the routing table should not contain the node yet _, err = d.GetNode(ctx, candidate) - require.ErrorIs(t, err, ErrNodeNotFound) + require.ErrorIs(t, err, 
coordt.ErrNodeNotFound) // inject a new node err = d.AddNodes(ctx, []kadt.PeerID{candidate}) diff --git a/v2/internal/coord/coretypes.go b/v2/internal/coord/coordt/coretypes.go similarity index 92% rename from v2/internal/coord/coretypes.go rename to v2/internal/coord/coordt/coretypes.go index 12c9ba26..2e000c81 100644 --- a/v2/internal/coord/coretypes.go +++ b/v2/internal/coord/coordt/coretypes.go @@ -1,4 +1,4 @@ -package coord +package coordt import ( "context" @@ -11,6 +11,15 @@ import ( "github.com/libp2p/go-libp2p-kad-dht/v2/pb" ) +// TODO: rename to something like OperationID. This type isn't only used to identify queries but also other operations like broadcasts. +type QueryID string + +const InvalidQueryID QueryID = "" + +type StateMachine[E any, S any] interface { + Advance(context.Context, E) S +} + // Value is a value that may be stored in the DHT. type Value interface { Key() kadt.Key diff --git a/v2/internal/coord/event.go b/v2/internal/coord/event.go index a0037732..fddc40ef 100644 --- a/v2/internal/coord/event.go +++ b/v2/internal/coord/event.go @@ -1,6 +1,7 @@ package coord import ( + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/query" "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" "github.com/libp2p/go-libp2p-kad-dht/v2/pb" @@ -28,6 +29,12 @@ type QueryCommand interface { queryCommand() } +// BrdcstCommand is a type of [BehaviourEvent] that instructs a [BrdcstBehaviour] to perform an action. +type BrdcstCommand interface { + BehaviourEvent + brdcstCommand() +} + type NodeHandlerRequest interface { BehaviourEvent nodeHandlerRequest() @@ -51,7 +58,7 @@ func (*EventStartBootstrap) behaviourEvent() {} func (*EventStartBootstrap) routingCommand() {} type EventOutboundGetCloserNodes struct { - QueryID query.QueryID + QueryID coordt.QueryID To kadt.PeerID Target kadt.Key Notify Notify[BehaviourEvent] @@ -62,7 +69,7 @@ func (*EventOutboundGetCloserNodes) nodeHandlerRequest() {} func (*EventOutboundGetCloserNodes) networkCommand() {} type EventOutboundSendMessage struct { - QueryID query.QueryID + QueryID coordt.QueryID To kadt.PeerID Message *pb.Message Notify Notify[BehaviourEvent] @@ -73,7 +80,7 @@ func (*EventOutboundSendMessage) nodeHandlerRequest() {} func (*EventOutboundSendMessage) networkCommand() {} type EventStartMessageQuery struct { - QueryID query.QueryID + QueryID coordt.QueryID Target kadt.Key Message *pb.Message KnownClosestNodes []kadt.PeerID @@ -85,7 +92,7 @@ func (*EventStartMessageQuery) behaviourEvent() {} func (*EventStartMessageQuery) queryCommand() {} type EventStartFindCloserQuery struct { - QueryID query.QueryID + QueryID coordt.QueryID Target kadt.Key KnownClosestNodes []kadt.PeerID Notify NotifyCloser[BehaviourEvent] @@ -96,7 +103,7 @@ func (*EventStartFindCloserQuery) behaviourEvent() {} func (*EventStartFindCloserQuery) queryCommand() {} type EventStopQuery struct { - QueryID query.QueryID + QueryID coordt.QueryID } func (*EventStopQuery) behaviourEvent() {} @@ -113,7 +120,7 @@ func (*EventAddNode) routingCommand() {} // EventGetCloserNodesSuccess notifies a behaviour that a GetCloserNodes request, initiated by an // [EventOutboundGetCloserNodes] event has produced a successful response. type EventGetCloserNodesSuccess struct { - QueryID query.QueryID + QueryID coordt.QueryID To kadt.PeerID // To is the peer that the GetCloserNodes request was sent to. 
Target kadt.Key CloserNodes []kadt.PeerID @@ -125,7 +132,7 @@ func (*EventGetCloserNodesSuccess) nodeHandlerResponse() {} // EventGetCloserNodesFailure notifies a behaviour that a GetCloserNodes request, initiated by an // [EventOutboundGetCloserNodes] event has failed to produce a valid response. type EventGetCloserNodesFailure struct { - QueryID query.QueryID + QueryID coordt.QueryID To kadt.PeerID // To is the peer that the GetCloserNodes request was sent to. Target kadt.Key Err error @@ -137,7 +144,8 @@ func (*EventGetCloserNodesFailure) nodeHandlerResponse() {} // EventSendMessageSuccess notifies a behaviour that a SendMessage request, initiated by an // [EventOutboundSendMessage] event has produced a successful response. type EventSendMessageSuccess struct { - QueryID query.QueryID + QueryID coordt.QueryID + Request *pb.Message To kadt.PeerID // To is the peer that the SendMessage request was sent to. Response *pb.Message CloserNodes []kadt.PeerID @@ -149,7 +157,8 @@ func (*EventSendMessageSuccess) nodeHandlerResponse() {} // EventSendMessageFailure notifies a behaviour that a SendMessage request, initiated by an // [EventOutboundSendMessage] event has failed to produce a valid response. type EventSendMessageFailure struct { - QueryID query.QueryID + QueryID coordt.QueryID + Request *pb.Message To kadt.PeerID // To is the peer that the SendMessage request was sent to. Target kadt.Key Err error @@ -161,7 +170,7 @@ func (*EventSendMessageFailure) nodeHandlerResponse() {} // EventQueryProgressed is emitted by the coordinator when a query has received a // response from a node. type EventQueryProgressed struct { - QueryID query.QueryID + QueryID coordt.QueryID NodeID kadt.PeerID Response *pb.Message Stats query.QueryStats @@ -172,7 +181,7 @@ func (*EventQueryProgressed) behaviourEvent() {} // EventQueryFinished is emitted by the coordinator when a query has finished, either through // running to completion or by being canceled. 
type EventQueryFinished struct { - QueryID query.QueryID + QueryID coordt.QueryID Stats query.QueryStats ClosestNodes []kadt.PeerID } diff --git a/v2/internal/coord/network.go b/v2/internal/coord/network.go index d4087564..487a2506 100644 --- a/v2/internal/coord/network.go +++ b/v2/internal/coord/network.go @@ -5,18 +5,19 @@ import ( "fmt" "sync" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" + "github.com/plprobelab/go-kademlia/key" "go.opentelemetry.io/otel/trace" "golang.org/x/exp/slog" - "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/query" "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" "github.com/libp2p/go-libp2p-kad-dht/v2/pb" ) type NetworkBehaviour struct { // rtr is the message router used to send messages - rtr Router[kadt.Key, kadt.PeerID, *pb.Message] + rtr coordt.Router[kadt.Key, kadt.PeerID, *pb.Message] nodeHandlersMu sync.Mutex nodeHandlers map[kadt.PeerID]*NodeHandler // TODO: garbage collect node handlers @@ -29,7 +30,7 @@ type NetworkBehaviour struct { tracer trace.Tracer } -func NewNetworkBehaviour(rtr Router[kadt.Key, kadt.PeerID, *pb.Message], logger *slog.Logger, tracer trace.Tracer) *NetworkBehaviour { +func NewNetworkBehaviour(rtr coordt.Router[kadt.Key, kadt.PeerID, *pb.Message], logger *slog.Logger, tracer trace.Tracer) *NetworkBehaviour { b := &NetworkBehaviour{ rtr: rtr, nodeHandlers: make(map[kadt.PeerID]*NodeHandler), @@ -51,21 +52,19 @@ func (b *NetworkBehaviour) Notify(ctx context.Context, ev BehaviourEvent) { switch ev := ev.(type) { case *EventOutboundGetCloserNodes: b.nodeHandlersMu.Lock() - p := kadt.PeerID(ev.To) - nh, ok := b.nodeHandlers[p] + nh, ok := b.nodeHandlers[ev.To] if !ok { - nh = NewNodeHandler(p, b.rtr, b.logger, b.tracer) - b.nodeHandlers[p] = nh + nh = NewNodeHandler(ev.To, b.rtr, b.logger, b.tracer) + b.nodeHandlers[ev.To] = nh } b.nodeHandlersMu.Unlock() nh.Notify(ctx, ev) case *EventOutboundSendMessage: b.nodeHandlersMu.Lock() - p := kadt.PeerID(ev.To) - nh, ok := b.nodeHandlers[p] + nh, ok := b.nodeHandlers[ev.To] if !ok { - nh = NewNodeHandler(p, b.rtr, b.logger, b.tracer) - b.nodeHandlers[p] = nh + nh = NewNodeHandler(ev.To, b.rtr, b.logger, b.tracer) + b.nodeHandlers[ev.To] = nh } b.nodeHandlersMu.Unlock() nh.Notify(ctx, ev) @@ -122,13 +121,13 @@ func (b *NetworkBehaviour) getNodeHandler(ctx context.Context, id kadt.PeerID) ( type NodeHandler struct { self kadt.PeerID - rtr Router[kadt.Key, kadt.PeerID, *pb.Message] + rtr coordt.Router[kadt.Key, kadt.PeerID, *pb.Message] queue *WorkQueue[NodeHandlerRequest] logger *slog.Logger tracer trace.Tracer } -func NewNodeHandler(self kadt.PeerID, rtr Router[kadt.Key, kadt.PeerID, *pb.Message], logger *slog.Logger, tracer trace.Tracer) *NodeHandler { +func NewNodeHandler(self kadt.PeerID, rtr coordt.Router[kadt.Key, kadt.PeerID, *pb.Message], logger *slog.Logger, tracer trace.Tracer) *NodeHandler { h := &NodeHandler{ self: self, rtr: rtr, @@ -179,6 +178,7 @@ func (h *NodeHandler) send(ctx context.Context, ev NodeHandlerRequest) bool { cmd.Notify.Notify(ctx, &EventSendMessageFailure{ QueryID: cmd.QueryID, To: h.self, + Request: cmd.Message, Err: fmt.Errorf("NodeHandler: %w", err), }) return false @@ -187,6 +187,7 @@ func (h *NodeHandler) send(ctx context.Context, ev NodeHandlerRequest) bool { cmd.Notify.Notify(ctx, &EventSendMessageSuccess{ QueryID: cmd.QueryID, To: h.self, + Request: cmd.Message, Response: resp, CloserNodes: resp.CloserNodes(), }) @@ -203,13 +204,13 @@ func (h *NodeHandler) ID() kadt.PeerID { // GetClosestNodes requests the n closest nodes to the 
key from the node's local routing table. // The node may return fewer nodes than requested. -func (h *NodeHandler) GetClosestNodes(ctx context.Context, k kadt.Key, n int) ([]Node, error) { +func (h *NodeHandler) GetClosestNodes(ctx context.Context, k kadt.Key, n int) ([]coordt.Node, error) { ctx, span := h.tracer.Start(ctx, "NodeHandler.GetClosestNodes") defer span.End() w := NewWaiter[BehaviourEvent]() ev := &EventOutboundGetCloserNodes{ - QueryID: query.QueryID(key.HexString(k)), + QueryID: coordt.QueryID(key.HexString(k)), To: h.self, Target: k, Notify: w, @@ -224,7 +225,7 @@ func (h *NodeHandler) GetClosestNodes(ctx context.Context, k kadt.Key, n int) ([ switch res := we.Event.(type) { case *EventGetCloserNodesSuccess: - nodes := make([]Node, 0, len(res.CloserNodes)) + nodes := make([]coordt.Node, 0, len(res.CloserNodes)) for _, info := range res.CloserNodes { // TODO use a global registry of node handlers nodes = append(nodes, NewNodeHandler(info, h.rtr, h.logger, h.tracer)) @@ -245,12 +246,12 @@ func (h *NodeHandler) GetClosestNodes(ctx context.Context, k kadt.Key, n int) ([ // GetValue requests that the node return any value associated with the supplied key. // If the node does not have a value for the key it returns ErrValueNotFound. -func (h *NodeHandler) GetValue(ctx context.Context, key kadt.Key) (Value, error) { +func (h *NodeHandler) GetValue(ctx context.Context, key kadt.Key) (coordt.Value, error) { panic("not implemented") } // PutValue requests that the node stores a value to be associated with the supplied key. // If the node cannot or chooses not to store the value for the key it returns ErrValueNotAccepted. -func (h *NodeHandler) PutValue(ctx context.Context, r Value, q int) error { +func (h *NodeHandler) PutValue(ctx context.Context, r coordt.Value, q int) error { panic("not implemented") } diff --git a/v2/internal/coord/query.go b/v2/internal/coord/query.go index 91cbab09..5d1df302 100644 --- a/v2/internal/coord/query.go +++ b/v2/internal/coord/query.go @@ -5,17 +5,19 @@ import ( "fmt" "sync" - "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" - "github.com/libp2p/go-libp2p-kad-dht/v2/pb" "go.opentelemetry.io/otel/trace" "golang.org/x/exp/slog" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/query" + "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" + "github.com/libp2p/go-libp2p-kad-dht/v2/pb" + "github.com/libp2p/go-libp2p-kad-dht/v2/tele" ) type PooledQueryBehaviour struct { pool *query.Pool[kadt.Key, kadt.PeerID, *pb.Message] - waiters map[query.QueryID]NotifyCloser[BehaviourEvent] + waiters map[coordt.QueryID]NotifyCloser[BehaviourEvent] pendingMu sync.Mutex pending []BehaviourEvent @@ -28,7 +30,7 @@ type PooledQueryBehaviour struct { func NewPooledQueryBehaviour(pool *query.Pool[kadt.Key, kadt.PeerID, *pb.Message], logger *slog.Logger, tracer trace.Tracer) *PooledQueryBehaviour { h := &PooledQueryBehaviour{ pool: pool, - waiters: make(map[query.QueryID]NotifyCloser[BehaviourEvent]), + waiters: make(map[coordt.QueryID]NotifyCloser[BehaviourEvent]), ready: make(chan struct{}, 1), logger: logger.With("behaviour", "query"), tracer: tracer, @@ -47,29 +49,27 @@ func (p *PooledQueryBehaviour) Notify(ctx context.Context, ev BehaviourEvent) { switch ev := ev.(type) { case *EventStartFindCloserQuery: cmd = &query.EventPoolAddFindCloserQuery[kadt.Key, kadt.PeerID]{ - QueryID: ev.QueryID, - Target: ev.Target, - KnownClosestNodes: ev.KnownClosestNodes, + QueryID: ev.QueryID, + Target: ev.Target, + Seed: 
ev.KnownClosestNodes, } if ev.Notify != nil { p.waiters[ev.QueryID] = ev.Notify } case *EventStartMessageQuery: cmd = &query.EventPoolAddQuery[kadt.Key, kadt.PeerID, *pb.Message]{ - QueryID: ev.QueryID, - Target: ev.Target, - Message: ev.Message, - KnownClosestNodes: ev.KnownClosestNodes, + QueryID: ev.QueryID, + Target: ev.Target, + Message: ev.Message, + Seed: ev.KnownClosestNodes, } if ev.Notify != nil { p.waiters[ev.QueryID] = ev.Notify } - case *EventStopQuery: cmd = &query.EventPoolStopQuery{ QueryID: ev.QueryID, } - case *EventGetCloserNodesSuccess: for _, info := range ev.CloserNodes { // TODO: do this after advancing pool @@ -189,9 +189,12 @@ func (p *PooledQueryBehaviour) Perform(ctx context.Context) (BehaviourEvent, boo } } -func (p *PooledQueryBehaviour) advancePool(ctx context.Context, ev query.PoolEvent) (BehaviourEvent, bool) { - ctx, span := p.tracer.Start(ctx, "PooledQueryBehaviour.advancePool") - defer span.End() +func (p *PooledQueryBehaviour) advancePool(ctx context.Context, ev query.PoolEvent) (out BehaviourEvent, term bool) { + ctx, span := p.tracer.Start(ctx, "PooledQueryBehaviour.advancePool", trace.WithAttributes(tele.AttrInEvent(ev))) + defer func() { + span.SetAttributes(tele.AttrOutEvent(out)) + span.End() + }() pstate := p.pool.Advance(ctx, ev) switch st := pstate.(type) { diff --git a/v2/internal/coord/query/pool.go b/v2/internal/coord/query/pool.go index a94566cb..d6edea86 100644 --- a/v2/internal/coord/query/pool.go +++ b/v2/internal/coord/query/pool.go @@ -9,16 +9,15 @@ import ( "github.com/plprobelab/go-kademlia/kad" "github.com/plprobelab/go-kademlia/kaderr" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" "github.com/libp2p/go-libp2p-kad-dht/v2/tele" ) -type Message interface{} - -type Pool[K kad.Key[K], N kad.NodeID[K], M Message] struct { +type Pool[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct { // self is the node id of the system the pool is running on self N queries []*Query[K, N, M] - queryIndex map[QueryID]*Query[K, N, M] + queryIndex map[coordt.QueryID]*Query[K, N, M] // cfg is a copy of the optional configuration supplied to the pool cfg PoolConfig @@ -94,7 +93,7 @@ func DefaultPoolConfig() *PoolConfig { } } -func NewPool[K kad.Key[K], N kad.NodeID[K], M Message](self N, cfg *PoolConfig) (*Pool[K, N, M], error) { +func NewPool[K kad.Key[K], N kad.NodeID[K], M coordt.Message](self N, cfg *PoolConfig) (*Pool[K, N, M], error) { if cfg == nil { cfg = DefaultPoolConfig() } else if err := cfg.Validate(); err != nil { @@ -105,7 +104,7 @@ func NewPool[K kad.Key[K], N kad.NodeID[K], M Message](self N, cfg *PoolConfig) self: self, cfg: *cfg, queries: make([]*Query[K, N, M], 0), - queryIndex: make(map[QueryID]*Query[K, N, M]), + queryIndex: make(map[coordt.QueryID]*Query[K, N, M]), }, nil } @@ -119,13 +118,13 @@ func (p *Pool[K, N, M]) Advance(ctx context.Context, ev PoolEvent) PoolState { // eventQueryID keeps track of a query that was advanced via a specific event, to avoid it // being advanced twice - eventQueryID := InvalidQueryID + eventQueryID := coordt.InvalidQueryID switch tev := ev.(type) { case *EventPoolAddFindCloserQuery[K, N]: - p.addFindCloserQuery(ctx, tev.QueryID, tev.Target, tev.KnownClosestNodes, tev.NumResults) + p.addFindCloserQuery(ctx, tev.QueryID, tev.Target, tev.Seed, tev.NumResults) case *EventPoolAddQuery[K, N, M]: - p.addQuery(ctx, tev.QueryID, tev.Target, tev.Message, tev.KnownClosestNodes, tev.NumResults) + p.addQuery(ctx, tev.QueryID, tev.Target, tev.Message, tev.Seed, tev.NumResults) // TODO: return 
error as state case *EventPoolStopQuery: if qry, ok := p.queryIndex[tev.QueryID]; ok { @@ -242,7 +241,7 @@ func (p *Pool[K, N, M]) advanceQuery(ctx context.Context, qry *Query[K, N, M], q return nil, false } -func (p *Pool[K, N, M]) removeQuery(queryID QueryID) { +func (p *Pool[K, N, M]) removeQuery(queryID coordt.QueryID) { for i := range p.queries { if p.queries[i].id != queryID { continue @@ -258,7 +257,7 @@ func (p *Pool[K, N, M]) removeQuery(queryID QueryID) { // addQuery adds a query to the pool, returning the new query id // TODO: remove target argument and use msg.Target -func (p *Pool[K, N, M]) addQuery(ctx context.Context, queryID QueryID, target K, msg M, knownClosestNodes []N, numResults int) error { +func (p *Pool[K, N, M]) addQuery(ctx context.Context, queryID coordt.QueryID, target K, msg M, knownClosestNodes []N, numResults int) error { if _, exists := p.queryIndex[queryID]; exists { return fmt.Errorf("query id already in use") } @@ -285,7 +284,7 @@ func (p *Pool[K, N, M]) addQuery(ctx context.Context, queryID QueryID, target K, } // addQuery adds a find closer query to the pool, returning the new query id -func (p *Pool[K, N, M]) addFindCloserQuery(ctx context.Context, queryID QueryID, target K, knownClosestNodes []N, numResults int) error { +func (p *Pool[K, N, M]) addFindCloserQuery(ctx context.Context, queryID coordt.QueryID, target K, knownClosestNodes []N, numResults int) error { if _, exists := p.queryIndex[queryID]; exists { return fmt.Errorf("query id already in use") } @@ -322,15 +321,15 @@ type StatePoolIdle struct{} // StatePoolFindCloser indicates that a pool query wants to send a find closer nodes message to a node. type StatePoolFindCloser[K kad.Key[K], N kad.NodeID[K]] struct { - QueryID QueryID + QueryID coordt.QueryID Target K // the key that the query wants to find closer nodes for NodeID N // the node to send the message to Stats QueryStats } // StatePoolSendMessage indicates that a pool query wants to send a message to a node. -type StatePoolSendMessage[K kad.Key[K], N kad.NodeID[K], M Message] struct { - QueryID QueryID +type StatePoolSendMessage[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct { + QueryID coordt.QueryID NodeID N // the node to send the message to Message M Stats QueryStats @@ -346,14 +345,14 @@ type StatePoolWaitingWithCapacity struct{} // StatePoolQueryFinished indicates that a query has finished. type StatePoolQueryFinished[K kad.Key[K], N kad.NodeID[K]] struct { - QueryID QueryID + QueryID coordt.QueryID Stats QueryStats ClosestNodes []N } // StatePoolQueryTimeout indicates that a query has timed out. type StatePoolQueryTimeout struct { - QueryID QueryID + QueryID coordt.QueryID Stats QueryStats } @@ -373,38 +372,38 @@ type PoolEvent interface { // EventPoolAddQuery is an event that attempts to add a new query that finds closer nodes to a target key. 
type EventPoolAddFindCloserQuery[K kad.Key[K], N kad.NodeID[K]] struct { - QueryID QueryID // the id to use for the new query - Target K // the target key for the query - KnownClosestNodes []N // an initial set of close nodes the query should use - NumResults int // the minimum number of nodes to successfully contact before considering iteration complete + QueryID coordt.QueryID // the id to use for the new query + Target K // the target key for the query + Seed []N // an initial set of close nodes the query should use + NumResults int // the minimum number of nodes to successfully contact before considering iteration complete } // EventPoolAddQuery is an event that attempts to add a new query that sends a message. -type EventPoolAddQuery[K kad.Key[K], N kad.NodeID[K], M Message] struct { - QueryID QueryID // the id to use for the new query - Target K // the target key for the query - Message M // message to be sent to each node - KnownClosestNodes []N // an initial set of close nodes the query should use - NumResults int // the minimum number of nodes to successfully contact before considering iteration complete +type EventPoolAddQuery[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct { + QueryID coordt.QueryID // the id to use for the new query + Target K // the target key for the query + Message M // message to be sent to each node + Seed []N // an initial set of close nodes the query should use + NumResults int // the minimum number of nodes to successfully contact before considering iteration complete } // EventPoolStopQuery notifies a [Pool] to stop a query. type EventPoolStopQuery struct { - QueryID QueryID // the id of the query that should be stopped + QueryID coordt.QueryID // the id of the query that should be stopped } // EventPoolNodeResponse notifies a [Pool] that an attempt to contact a node has received a successful response. type EventPoolNodeResponse[K kad.Key[K], N kad.NodeID[K]] struct { - QueryID QueryID // the id of the query that sent the message - NodeID N // the node the message was sent to - CloserNodes []N // the closer nodes sent by the node + QueryID coordt.QueryID // the id of the query that sent the message + NodeID N // the node the message was sent to + CloserNodes []N // the closer nodes sent by the node } // EventPoolNodeFailure notifies a [Pool] that an attempt to contact a node has failed. type EventPoolNodeFailure[K kad.Key[K], N kad.NodeID[K]] struct { - QueryID QueryID // the id of the query that sent the message - NodeID N // the node the message was sent to - Error error // the error that caused the failure, if any + QueryID coordt.QueryID // the id of the query that sent the message + NodeID N // the node the message was sent to + Error error // the error that caused the failure, if any } // EventPoolPoll is an event that signals the pool that it can perform housekeeping work such as time out queries. 
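For orientation, the following is a minimal, illustrative sketch of how these renamed events drive a pool. It mirrors the style of the tests in the next file and assumes the same helpers (package query, the internal tiny test types and the coordt package); it is a sketch rather than part of the change itself.

func ExamplePool() {
	ctx := context.Background()

	self := tiny.NewNode(0)
	p, err := NewPool[tiny.Key, tiny.Node, tiny.Message](self, DefaultPoolConfig())
	if err != nil {
		panic(err)
	}

	// Adding a find-closer query seeds the pool and immediately yields a
	// StatePoolFindCloser naming the first node to contact.
	queryID := coordt.QueryID("example")
	state := p.Advance(ctx, &EventPoolAddFindCloserQuery[tiny.Key, tiny.Node]{
		QueryID: queryID,
		Target:  tiny.Key(0b00000001),
		Seed:    []tiny.Node{tiny.NewNode(0b00000100)},
	})

	if st, ok := state.(*StatePoolFindCloser[tiny.Key, tiny.Node]); ok {
		// After contacting st.NodeID, the caller reports the outcome back to
		// the pool, which then advances the query or finishes it.
		p.Advance(ctx, &EventPoolNodeResponse[tiny.Key, tiny.Node]{
			QueryID:     queryID,
			NodeID:      st.NodeID,
			CloserNodes: nil,
		})
	}
}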
diff --git a/v2/internal/coord/query/pool_test.go b/v2/internal/coord/query/pool_test.go index d54c6d23..88f30091 100644 --- a/v2/internal/coord/query/pool_test.go +++ b/v2/internal/coord/query/pool_test.go @@ -8,6 +8,7 @@ import ( "github.com/plprobelab/go-kademlia/key" "github.com/stretchr/testify/require" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/internal/tiny" ) @@ -105,13 +106,13 @@ func TestPoolAddFindCloserQueryStartsIfCapacity(t *testing.T) { target := tiny.Key(0b00000001) a := tiny.NewNode(0b00000100) // 4 - queryID := QueryID("test") + queryID := coordt.QueryID("test") // first thing the new pool should do is start the query state := p.Advance(ctx, &EventPoolAddFindCloserQuery[tiny.Key, tiny.Node]{ - QueryID: queryID, - Target: target, - KnownClosestNodes: []tiny.Node{a}, + QueryID: queryID, + Target: target, + Seed: []tiny.Node{a}, }) require.IsType(t, &StatePoolFindCloser[tiny.Key, tiny.Node]{}, state) @@ -145,14 +146,14 @@ func TestPoolAddQueryStartsIfCapacity(t *testing.T) { target := tiny.Key(0b00000001) a := tiny.NewNode(0b00000100) // 4 - queryID := QueryID("test") + queryID := coordt.QueryID("test") msg := tiny.Message{Content: "msg"} // first thing the new pool should do is start the query state := p.Advance(ctx, &EventPoolAddQuery[tiny.Key, tiny.Node, tiny.Message]{ - QueryID: queryID, - Target: target, - Message: msg, - KnownClosestNodes: []tiny.Node{a}, + QueryID: queryID, + Target: target, + Message: msg, + Seed: []tiny.Node{a}, }) require.IsType(t, &StatePoolSendMessage[tiny.Key, tiny.Node, tiny.Message]{}, state) @@ -186,13 +187,13 @@ func TestPoolNodeResponse(t *testing.T) { target := tiny.Key(0b00000001) a := tiny.NewNode(0b00000100) // 4 - queryID := QueryID("test") + queryID := coordt.QueryID("test") // first thing the new pool should do is start the query state := p.Advance(ctx, &EventPoolAddFindCloserQuery[tiny.Key, tiny.Node]{ - QueryID: queryID, - Target: target, - KnownClosestNodes: []tiny.Node{a}, + QueryID: queryID, + Target: target, + Seed: []tiny.Node{a}, }) require.IsType(t, &StatePoolFindCloser[tiny.Key, tiny.Node]{}, state) @@ -234,11 +235,11 @@ func TestPoolPrefersRunningQueriesOverNewOnes(t *testing.T) { d := tiny.NewNode(0b00100000) // 32 // Add the first query - queryID1 := QueryID("1") + queryID1 := coordt.QueryID("1") state := p.Advance(ctx, &EventPoolAddFindCloserQuery[tiny.Key, tiny.Node]{ - QueryID: queryID1, - Target: target, - KnownClosestNodes: []tiny.Node{a, b, c, d}, + QueryID: queryID1, + Target: target, + Seed: []tiny.Node{a, b, c, d}, }) require.IsType(t, &StatePoolFindCloser[tiny.Key, tiny.Node]{}, state) @@ -248,11 +249,11 @@ func TestPoolPrefersRunningQueriesOverNewOnes(t *testing.T) { require.Equal(t, a, st.NodeID) // Add the second query - queryID2 := QueryID("2") + queryID2 := coordt.QueryID("2") state = p.Advance(ctx, &EventPoolAddFindCloserQuery[tiny.Key, tiny.Node]{ - QueryID: queryID2, - Target: target, - KnownClosestNodes: []tiny.Node{a, b, c, d}, + QueryID: queryID2, + Target: target, + Seed: []tiny.Node{a, b, c, d}, }) // the first query should continue its operation in preference to starting the new query @@ -316,11 +317,11 @@ func TestPoolRespectsConcurrency(t *testing.T) { a := tiny.NewNode(0b00000100) // 4 // Add the first query - queryID1 := QueryID("1") + queryID1 := coordt.QueryID("1") state := p.Advance(ctx, &EventPoolAddFindCloserQuery[tiny.Key, tiny.Node]{ - QueryID: queryID1, - Target: target, - KnownClosestNodes: []tiny.Node{a}, + 
QueryID: queryID1, + Target: target, + Seed: []tiny.Node{a}, }) require.IsType(t, &StatePoolFindCloser[tiny.Key, tiny.Node]{}, state) @@ -330,11 +331,11 @@ func TestPoolRespectsConcurrency(t *testing.T) { require.Equal(t, a, st.NodeID) // Add the second query - queryID2 := QueryID("2") + queryID2 := coordt.QueryID("2") state = p.Advance(ctx, &EventPoolAddFindCloserQuery[tiny.Key, tiny.Node]{ - QueryID: queryID2, - Target: target, - KnownClosestNodes: []tiny.Node{a}, + QueryID: queryID2, + Target: target, + Seed: []tiny.Node{a}, }) // the second query should start since the first query has a request in flight @@ -344,11 +345,11 @@ func TestPoolRespectsConcurrency(t *testing.T) { require.Equal(t, a, st.NodeID) // Add a third query - queryID3 := QueryID("3") + queryID3 := coordt.QueryID("3") state = p.Advance(ctx, &EventPoolAddFindCloserQuery[tiny.Key, tiny.Node]{ - QueryID: queryID3, - Target: target, - KnownClosestNodes: []tiny.Node{a}, + QueryID: queryID3, + Target: target, + Seed: []tiny.Node{a}, }) // the third query should wait since the pool has reached maximum concurrency diff --git a/v2/internal/coord/query/query.go b/v2/internal/coord/query/query.go index b0003a83..00168082 100644 --- a/v2/internal/coord/query/query.go +++ b/v2/internal/coord/query/query.go @@ -11,13 +11,10 @@ import ( "github.com/plprobelab/go-kademlia/key" "go.opentelemetry.io/otel/trace" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" "github.com/libp2p/go-libp2p-kad-dht/v2/tele" ) -type QueryID string - -const InvalidQueryID QueryID = "" - type QueryStats struct { Start time.Time End time.Time @@ -74,9 +71,9 @@ func DefaultQueryConfig() *QueryConfig { } } -type Query[K kad.Key[K], N kad.NodeID[K], M Message] struct { +type Query[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct { self N - id QueryID + id coordt.QueryID // cfg is a copy of the optional configuration supplied to the query cfg QueryConfig @@ -99,7 +96,7 @@ type Query[K kad.Key[K], N kad.NodeID[K], M Message] struct { inFlight int } -func NewFindCloserQuery[K kad.Key[K], N kad.NodeID[K], M Message](self N, id QueryID, target K, iter NodeIter[K, N], knownClosestNodes []N, cfg *QueryConfig) (*Query[K, N, M], error) { +func NewFindCloserQuery[K kad.Key[K], N kad.NodeID[K], M coordt.Message](self N, id coordt.QueryID, target K, iter NodeIter[K, N], knownClosestNodes []N, cfg *QueryConfig) (*Query[K, N, M], error) { var empty M q, err := NewQuery[K, N, M](self, id, target, empty, iter, knownClosestNodes, cfg) if err != nil { @@ -109,7 +106,7 @@ func NewFindCloserQuery[K kad.Key[K], N kad.NodeID[K], M Message](self N, id Que return q, nil } -func NewQuery[K kad.Key[K], N kad.NodeID[K], M Message](self N, id QueryID, target K, msg M, iter NodeIter[K, N], knownClosestNodes []N, cfg *QueryConfig) (*Query[K, N, M], error) { +func NewQuery[K kad.Key[K], N kad.NodeID[K], M coordt.Message](self N, id coordt.QueryID, target K, msg M, iter NodeIter[K, N], knownClosestNodes []N, cfg *QueryConfig) (*Query[K, N, M], error) { if cfg == nil { cfg = DefaultQueryConfig() } else if err := cfg.Validate(); err != nil { @@ -385,22 +382,22 @@ type QueryState interface { // StateQueryFinished indicates that the [Query] has finished. type StateQueryFinished[K kad.Key[K], N kad.NodeID[K]] struct { - QueryID QueryID + QueryID coordt.QueryID Stats QueryStats ClosestNodes []N // contains the closest nodes to the target key that were found } // StateQueryFindCloser indicates that the [Query] wants to send a find closer nodes message to a node. 
type StateQueryFindCloser[K kad.Key[K], N kad.NodeID[K]] struct { - QueryID QueryID + QueryID coordt.QueryID Target K // the key that the query wants to find closer nodes for NodeID N // the node to send the message to Stats QueryStats } // StateQuerySendMessage indicates that the [Query] wants to send a message to a node. -type StateQuerySendMessage[K kad.Key[K], N kad.NodeID[K], M Message] struct { - QueryID QueryID +type StateQuerySendMessage[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct { + QueryID coordt.QueryID NodeID N // the node to send the message to Message M Stats QueryStats @@ -408,13 +405,13 @@ type StateQuerySendMessage[K kad.Key[K], N kad.NodeID[K], M Message] struct { // StateQueryWaitingAtCapacity indicates that the [Query] is waiting for results and is at capacity. type StateQueryWaitingAtCapacity struct { - QueryID QueryID + QueryID coordt.QueryID Stats QueryStats } // StateQueryWaitingWithCapacity indicates that the [Query] is waiting for results but has no further nodes to contact. type StateQueryWaitingWithCapacity struct { - QueryID QueryID + QueryID coordt.QueryID Stats QueryStats } diff --git a/v2/internal/coord/query/query_test.go b/v2/internal/coord/query/query_test.go index 6cb1d9d1..f1a211a2 100644 --- a/v2/internal/coord/query/query_test.go +++ b/v2/internal/coord/query/query_test.go @@ -9,6 +9,7 @@ import ( "github.com/plprobelab/go-kademlia/key" "github.com/stretchr/testify/require" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/internal/tiny" ) @@ -65,7 +66,7 @@ func TestQueryMessagesNode(t *testing.T) { cfg := DefaultQueryConfig() cfg.Clock = clk - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) qry, err := NewFindCloserQuery[tiny.Key, tiny.Node, tiny.Message](self, queryID, target, iter, knownNodes, cfg) @@ -114,7 +115,7 @@ func TestQueryFindCloserNearest(t *testing.T) { cfg := DefaultQueryConfig() cfg.Clock = clk - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) qry, err := NewFindCloserQuery[tiny.Key, tiny.Node, tiny.Message](self, queryID, target, iter, knownNodes, cfg) @@ -145,7 +146,7 @@ func TestQueryCancelFinishesQuery(t *testing.T) { cfg := DefaultQueryConfig() cfg.Clock = clk - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) qry, err := NewFindCloserQuery[tiny.Key, tiny.Node, tiny.Message](self, queryID, target, iter, knownNodes, cfg) @@ -188,7 +189,7 @@ func TestQueryNoClosest(t *testing.T) { cfg := DefaultQueryConfig() cfg.Clock = clk - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) qry, err := NewFindCloserQuery[tiny.Key, tiny.Node, tiny.Message](self, queryID, target, iter, knownNodes, cfg) @@ -232,7 +233,7 @@ func TestQueryWaitsAtCapacity(t *testing.T) { cfg.Clock = clk cfg.Concurrency = 2 - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) qry, err := NewFindCloserQuery[tiny.Key, tiny.Node, tiny.Message](self, queryID, target, iter, knownNodes, cfg) @@ -286,7 +287,7 @@ func TestQueryTimedOutNodeMakesCapacity(t *testing.T) { cfg.RequestTimeout = 3 * time.Minute cfg.Concurrency = len(knownNodes) - 1 // one less than the number of initial nodes - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) qry, err := NewFindCloserQuery[tiny.Key, tiny.Node, tiny.Message](self, queryID, target, iter, knownNodes, cfg) @@ -391,7 +392,7 @@ func 
TestQueryFindCloserResponseMakesCapacity(t *testing.T) { cfg.Clock = clk cfg.Concurrency = len(knownNodes) - 1 // one less than the number of initial nodes - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) qry, err := NewFindCloserQuery[tiny.Key, tiny.Node, tiny.Message](self, queryID, target, iter, knownNodes, cfg) @@ -475,7 +476,7 @@ func TestQueryCloserNodesAreAddedToIteration(t *testing.T) { cfg.Clock = clk cfg.Concurrency = 2 - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) qry, err := NewFindCloserQuery[tiny.Key, tiny.Node, tiny.Message](self, queryID, target, iter, knownNodes, cfg) @@ -531,7 +532,7 @@ func TestQueryCloserNodesIgnoresDuplicates(t *testing.T) { cfg.Clock = clk cfg.Concurrency = 2 - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) qry, err := NewFindCloserQuery[tiny.Key, tiny.Node, tiny.Message](self, queryID, target, iter, knownNodes, cfg) @@ -585,7 +586,7 @@ func TestQueryCancelFinishesIteration(t *testing.T) { cfg.Clock = clk cfg.Concurrency = 2 - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) qry, err := NewFindCloserQuery[tiny.Key, tiny.Node, tiny.Message](self, queryID, target, iter, knownNodes, cfg) @@ -623,7 +624,7 @@ func TestQueryFinishedIgnoresLaterEvents(t *testing.T) { cfg.Clock = clk cfg.Concurrency = 2 - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) qry, err := NewFindCloserQuery[tiny.Key, tiny.Node, tiny.Message](self, queryID, target, iter, knownNodes, cfg) @@ -680,7 +681,7 @@ func TestQueryWithCloserIterIgnoresMessagesFromUnknownNodes(t *testing.T) { cfg.Clock = clk cfg.Concurrency = 2 - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) qry, err := NewFindCloserQuery[tiny.Key, tiny.Node, tiny.Message](self, queryID, target, iter, knownNodes, cfg) @@ -732,7 +733,7 @@ func TestQueryWithCloserIterFinishesWhenNumResultsReached(t *testing.T) { cfg.Concurrency = 4 cfg.NumResults = 2 - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) qry, err := NewFindCloserQuery[tiny.Key, tiny.Node, tiny.Message](self, queryID, target, iter, knownNodes, cfg) @@ -792,7 +793,7 @@ func TestQueryWithCloserIterContinuesUntilNumResultsReached(t *testing.T) { cfg.Concurrency = 4 cfg.NumResults = 2 - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) qry, err := NewFindCloserQuery[tiny.Key, tiny.Node, tiny.Message](self, queryID, target, iter, knownNodes, cfg) @@ -864,7 +865,7 @@ func TestQueryNotContactedMakesCapacity(t *testing.T) { cfg.Clock = clk cfg.Concurrency = len(knownNodes) - 1 // one less than the number of initial nodes - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) qry, err := NewFindCloserQuery[tiny.Key, tiny.Node, tiny.Message](self, queryID, target, iter, knownNodes, cfg) @@ -922,7 +923,7 @@ func TestFindCloserQueryAllNotContactedFinishes(t *testing.T) { cfg.Clock = clk cfg.Concurrency = len(knownNodes) // allow all to be contacted at once - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) qry, err := NewFindCloserQuery[tiny.Key, tiny.Node, tiny.Message](self, queryID, target, iter, knownNodes, cfg) @@ -981,7 +982,7 @@ func TestQueryAllContactedFinishes(t *testing.T) { cfg.Concurrency = len(knownNodes) // allow all to be contacted at once cfg.NumResults = len(knownNodes) + 1 // one 
more than the size of the network - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) qry, err := NewFindCloserQuery[tiny.Key, tiny.Node, tiny.Message](self, queryID, target, iter, knownNodes, cfg) @@ -1040,7 +1041,7 @@ func TestQueryNeverMessagesSelf(t *testing.T) { cfg.Clock = clk cfg.Concurrency = 2 - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := a qry, err := NewFindCloserQuery[tiny.Key, tiny.Node, tiny.Message](self, queryID, target, iter, knownNodes, cfg) @@ -1090,7 +1091,7 @@ func TestQueryMessagesNearest(t *testing.T) { cfg := DefaultQueryConfig() cfg.Clock = clk - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) msg := tiny.Message{Content: "msg"} @@ -1131,7 +1132,7 @@ func TestQueryMessageResponseMakesCapacity(t *testing.T) { cfg.Clock = clk cfg.Concurrency = len(knownNodes) - 1 // one less than the number of initial nodes - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) msg := tiny.Message{Content: "msg"} @@ -1206,7 +1207,7 @@ func TestQueryAllNotContactedFinishes(t *testing.T) { cfg.Clock = clk cfg.Concurrency = len(knownNodes) // allow all to be contacted at once - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) msg := tiny.Message{Content: "msg"} @@ -1269,7 +1270,7 @@ func TestFindCloserQueryIncludesPartialClosestNodesWhenCancelled(t *testing.T) { cfg.Concurrency = 4 cfg.NumResults = 4 - queryID := QueryID("test") + queryID := coordt.QueryID("test") self := tiny.NewNode(0) qry, err := NewFindCloserQuery[tiny.Key, tiny.Node, tiny.Message](self, queryID, target, iter, knownNodes, cfg) diff --git a/v2/internal/coord/routing.go b/v2/internal/coord/routing.go index 832bfa64..70e64868 100644 --- a/v2/internal/coord/routing.go +++ b/v2/internal/coord/routing.go @@ -5,6 +5,7 @@ import ( "fmt" "sync" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "golang.org/x/exp/slog" @@ -19,13 +20,13 @@ type RoutingBehaviour struct { self kadt.PeerID // bootstrap is the bootstrap state machine, responsible for bootstrapping the routing table - bootstrap SM[routing.BootstrapEvent, routing.BootstrapState] + bootstrap coordt.StateMachine[routing.BootstrapEvent, routing.BootstrapState] // include is the inclusion state machine, responsible for vetting nodes before including them in the routing table - include SM[routing.IncludeEvent, routing.IncludeState] + include coordt.StateMachine[routing.IncludeEvent, routing.IncludeState] // probe is the node probing state machine, responsible for periodically checking connectivity of nodes in the routing table - probe SM[routing.ProbeEvent, routing.ProbeState] + probe coordt.StateMachine[routing.ProbeEvent, routing.ProbeState] pendingMu sync.Mutex pending []BehaviourEvent @@ -35,7 +36,14 @@ type RoutingBehaviour struct { tracer trace.Tracer } -func NewRoutingBehaviour(self kadt.PeerID, bootstrap SM[routing.BootstrapEvent, routing.BootstrapState], include SM[routing.IncludeEvent, routing.IncludeState], probe SM[routing.ProbeEvent, routing.ProbeState], logger *slog.Logger, tracer trace.Tracer) *RoutingBehaviour { +func NewRoutingBehaviour( + self kadt.PeerID, + bootstrap coordt.StateMachine[routing.BootstrapEvent, routing.BootstrapState], + include coordt.StateMachine[routing.IncludeEvent, routing.IncludeState], + probe coordt.StateMachine[routing.ProbeEvent, routing.ProbeState], + logger *slog.Logger, 
+ tracer trace.Tracer, +) *RoutingBehaviour { r := &RoutingBehaviour{ self: self, bootstrap: bootstrap, diff --git a/v2/internal/coord/routing/bootstrap.go b/v2/internal/coord/routing/bootstrap.go index 914f9615..8580fbc1 100644 --- a/v2/internal/coord/routing/bootstrap.go +++ b/v2/internal/coord/routing/bootstrap.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/query" "github.com/libp2p/go-libp2p-kad-dht/v2/tele" ) @@ -106,7 +107,7 @@ func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) Bootst qryCfg.Concurrency = b.cfg.RequestConcurrency qryCfg.RequestTimeout = b.cfg.RequestTimeout - queryID := query.QueryID("bootstrap") + queryID := coordt.QueryID("bootstrap") qry, err := query.NewFindCloserQuery[K, N, any](b.self, queryID, b.self.Key(), iter, tev.KnownClosestNodes, qryCfg) if err != nil { @@ -195,7 +196,7 @@ type BootstrapState interface { // StateBootstrapFindCloser indicates that the bootstrap query wants to send a find closer nodes message to a node. type StateBootstrapFindCloser[K kad.Key[K], N kad.NodeID[K]] struct { - QueryID query.QueryID + QueryID coordt.QueryID Target K // the key that the query wants to find closer nodes for NodeID N // the node to send the message to Stats query.QueryStats diff --git a/v2/internal/coord/routing/bootstrap_test.go b/v2/internal/coord/routing/bootstrap_test.go index 70c8b6f0..29123980 100644 --- a/v2/internal/coord/routing/bootstrap_test.go +++ b/v2/internal/coord/routing/bootstrap_test.go @@ -8,8 +8,8 @@ import ( "github.com/plprobelab/go-kademlia/key" "github.com/stretchr/testify/require" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/internal/tiny" - "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/query" ) func TestBootstrapConfigValidate(t *testing.T) { @@ -85,7 +85,7 @@ func TestBootstrapStart(t *testing.T) { st := state.(*StateBootstrapFindCloser[tiny.Key, tiny.Node]) // the query should be the one just added - require.Equal(t, query.QueryID("bootstrap"), st.QueryID) + require.Equal(t, coordt.QueryID("bootstrap"), st.QueryID) // the query should attempt to contact the node it was given require.Equal(t, a, st.NodeID) @@ -118,7 +118,7 @@ func TestBootstrapMessageResponse(t *testing.T) { // the bootstrap should attempt to contact the node it was given st := state.(*StateBootstrapFindCloser[tiny.Key, tiny.Node]) - require.Equal(t, query.QueryID("bootstrap"), st.QueryID) + require.Equal(t, coordt.QueryID("bootstrap"), st.QueryID) require.Equal(t, a, st.NodeID) // notify bootstrap that node was contacted successfully, but no closer nodes @@ -163,7 +163,7 @@ func TestBootstrapProgress(t *testing.T) { // the bootstrap should attempt to contact the closest node it was given require.IsType(t, &StateBootstrapFindCloser[tiny.Key, tiny.Node]{}, state) st := state.(*StateBootstrapFindCloser[tiny.Key, tiny.Node]) - require.Equal(t, query.QueryID("bootstrap"), st.QueryID) + require.Equal(t, coordt.QueryID("bootstrap"), st.QueryID) require.Equal(t, a, st.NodeID) // next the bootstrap attempts to contact second nearest node diff --git a/v2/internal/coord/routing/include_test.go b/v2/internal/coord/routing/include_test.go index a565521a..a02def62 100644 --- a/v2/internal/coord/routing/include_test.go +++ b/v2/internal/coord/routing/include_test.go @@ -5,10 +5,11 @@ import ( "testing" 
"github.com/benbjohnson/clock" - "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/internal/tiny" "github.com/plprobelab/go-kademlia/key" "github.com/plprobelab/go-kademlia/routing/simplert" "github.com/stretchr/testify/require" + + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/internal/tiny" ) func TestIncludeConfigValidate(t *testing.T) { diff --git a/v2/internal/coord/routing_test.go b/v2/internal/coord/routing_test.go index c789c9dc..545680da 100644 --- a/v2/internal/coord/routing_test.go +++ b/v2/internal/coord/routing_test.go @@ -5,6 +5,8 @@ import ( "testing" "time" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" + "go.opentelemetry.io/otel" "github.com/benbjohnson/clock" @@ -13,7 +15,6 @@ import ( "golang.org/x/exp/slog" "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/internal/nettest" - "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/query" "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/routing" "github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest" "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" @@ -65,7 +66,7 @@ func TestRoutingBootstrapGetClosestNodesSuccess(t *testing.T) { routingBehaviour := NewRoutingBehaviour(self, bootstrap, include, probe, slog.Default(), otel.Tracer("test")) ev := &EventGetCloserNodesSuccess{ - QueryID: query.QueryID("bootstrap"), + QueryID: coordt.QueryID("bootstrap"), To: nodes[1].NodeID, Target: nodes[0].NodeID.Key(), CloserNodes: []kadt.PeerID{nodes[2].NodeID}, @@ -99,7 +100,7 @@ func TestRoutingBootstrapGetClosestNodesFailure(t *testing.T) { failure := errors.New("failed") ev := &EventGetCloserNodesFailure{ - QueryID: query.QueryID("bootstrap"), + QueryID: coordt.QueryID("bootstrap"), To: nodes[1].NodeID, Target: nodes[0].NodeID.Key(), Err: failure, @@ -163,7 +164,7 @@ func TestRoutingIncludeGetClosestNodesSuccess(t *testing.T) { routingBehaviour := NewRoutingBehaviour(self, bootstrap, include, probe, slog.Default(), otel.Tracer("test")) ev := &EventGetCloserNodesSuccess{ - QueryID: query.QueryID("include"), + QueryID: coordt.QueryID("include"), To: nodes[1].NodeID, Target: nodes[0].NodeID.Key(), CloserNodes: []kadt.PeerID{nodes[2].NodeID}, @@ -197,7 +198,7 @@ func TestRoutingIncludeGetClosestNodesFailure(t *testing.T) { failure := errors.New("failed") ev := &EventGetCloserNodesFailure{ - QueryID: query.QueryID("include"), + QueryID: coordt.QueryID("include"), To: nodes[1].NodeID, Target: nodes[0].NodeID.Key(), Err: failure, @@ -290,6 +291,6 @@ func TestRoutingIncludedNodeAddToProbeList(t *testing.T) { // confirm that the message is for the correct node oev = dev.(*EventOutboundGetCloserNodes) - require.Equal(t, query.QueryID("probe"), oev.QueryID) + require.Equal(t, coordt.QueryID("probe"), oev.QueryID) require.Equal(t, candidate, oev.To) } diff --git a/v2/kadt/kadt.go b/v2/kadt/kadt.go index 2ad4bbef..6e6aa8c2 100644 --- a/v2/kadt/kadt.go +++ b/v2/kadt/kadt.go @@ -11,8 +11,58 @@ import ( ) // Key is a type alias for the type of key that's used with this DHT -// implementation. -type Key = key.Key256 +// implementation. In the Amino DHT, we are sending around the preimage +// of the actual key that's used for calculating Kademlia distance. That's +// why this Key struct also holds the preimage bytes. +type Key struct { + key key.Key256 + preimage []byte +} + +var _ kad.Key[Key] = (*Key)(nil) + +// NewKey initializes a new key struct based on the given preimage bytes. 
These +// bytes are SHA256 hashed and stored as the actual Kademlia key that's used +// to calculate distances in the XOR keyspace. +func NewKey(preimage []byte) Key { + h := sha256.Sum256(preimage) + return Key{ + key: key.NewKey256(h[:]), + preimage: preimage, + } +} + +// MsgKey returns the bytes that should be used inside Kademlia RPC messages. +// The returned value is the preimage to the actual Kademlia key. To arrive +// at the Kademlia key, these MsgKey bytes must be SHA256 hashed +func (k Key) MsgKey() []byte { + return k.preimage +} + +func (k Key) BitLen() int { + return k.key.BitLen() +} + +func (k Key) Bit(i int) uint { + return k.key.Bit(i) +} + +func (k Key) Xor(other Key) Key { + return Key{key: k.key.Xor(other.key)} +} + +func (k Key) CommonPrefixLength(other Key) int { + return k.key.CommonPrefixLength(other.key) +} + +func (k Key) Compare(other Key) int { + return k.key.Compare(other.key) +} + +// HexString returns a string containing the hexadecimal representation of the key. +func (k Key) HexString() string { + return k.key.HexString() +} // PeerID is a type alias for [peer.ID] that implements the [kad.NodeID] // interface. This means we can use PeerID for any operation that interfaces @@ -26,8 +76,7 @@ var _ kad.NodeID[Key] = PeerID("") // SHA256 hashes of, in this case, peer.IDs. This means this Key method takes // the [peer.ID], hashes it and constructs a 256-bit key. func (p PeerID) Key() Key { - h := sha256.Sum256([]byte(p)) - return key.NewKey256(h[:]) + return NewKey([]byte(p)) } // String calls String on the underlying [peer.ID] and returns a string like diff --git a/v2/pb/msg.aux.go b/v2/pb/msg.aux.go index 68ac067a..d8066dfe 100644 --- a/v2/pb/msg.aux.go +++ b/v2/pb/msg.aux.go @@ -2,12 +2,11 @@ package pb import ( "bytes" - "crypto/sha256" + "fmt" "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" "github.com/libp2p/go-libp2p/core/peer" ma "github.com/multiformats/go-multiaddr" - "github.com/plprobelab/go-kademlia/key" "golang.org/x/exp/slog" ) @@ -15,8 +14,30 @@ import ( // It is used to let these types conform to interfaces or add convenience methods. func (m *Message) Target() kadt.Key { - b := sha256.Sum256(m.Key) - return key.NewKey256(b[:]) + return kadt.NewKey(m.Key) +} + +// ExpectResponse returns true if we expect a response from the remote peer if +// we sent a message with the given type to them. For example, when a peer sends +// a PUT_VALUE message to another peer, that other peer won't respond with +// anything. +func (m *Message) ExpectResponse() bool { + switch m.Type { + case Message_PUT_VALUE: + return false + case Message_GET_VALUE: + return true + case Message_ADD_PROVIDER: + return false + case Message_GET_PROVIDERS: + return true + case Message_FIND_NODE: + return true + case Message_PING: + return true + default: + panic(fmt.Sprintf("unexpected message type %d", m.Type)) + } } // FromAddrInfo constructs a [Message_Peer] from the given [peer.AddrInfo]. 
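[Editor's sketch, not part of the patch] The preimage-carrying kadt.Key above separates what goes on the wire (the preimage, via MsgKey) from what is used for XOR distance (the SHA256 of that preimage). A minimal illustrative test of that round trip, assuming only the exported kadt API added in this patch (the test name is an assumption):

package kadt_test

import (
	"bytes"
	"testing"

	"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
)

func TestKeyPreimageRoundTrip(t *testing.T) {
	preimage := []byte("/pk/example-peer-id")
	k := kadt.NewKey(preimage)

	// MsgKey hands back the preimage unchanged; this is the value that is
	// placed into Kademlia RPC messages.
	if !bytes.Equal(k.MsgKey(), preimage) {
		t.Fatal("MsgKey should return the original preimage")
	}

	// Two keys built from the same preimage hash to the same 256-bit key,
	// so they compare equal and sit at XOR distance zero.
	if k.Compare(kadt.NewKey(preimage)) != 0 {
		t.Fatal("keys derived from the same preimage should be equal")
	}
}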
diff --git a/v2/pb/msg.aux_test.go b/v2/pb/msg.aux_test.go index dc3bd016..757b5c00 100644 --- a/v2/pb/msg.aux_test.go +++ b/v2/pb/msg.aux_test.go @@ -2,8 +2,31 @@ package pb import ( "testing" + + "github.com/stretchr/testify/assert" ) +func TestMessage_ExpectResponse(t *testing.T) { + t.Run("all covered", func(t *testing.T) { + defer func() { + assert.Nil(t, recover()) + }() + + for msgTypeInt := range Message_MessageType_name { + msg := &Message{Type: Message_MessageType(msgTypeInt)} + msg.ExpectResponse() + } + }) + + t.Run("unexpected type", func(t *testing.T) { + defer func() { + assert.NotNil(t, recover()) + }() + msg := &Message{Type: Message_MessageType(-1)} + msg.ExpectResponse() + }) +} + func TestMessage_Peer_invalid_maddr(t *testing.T) { msg := Message_Peer{ Addrs: [][]byte{[]byte("invalid-maddr")}, diff --git a/v2/query_test.go b/v2/query_test.go index 86ea55c2..bb628a0c 100644 --- a/v2/query_test.go +++ b/v2/query_test.go @@ -5,7 +5,7 @@ import ( "github.com/stretchr/testify/require" - "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" "github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest" "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" ) @@ -22,11 +22,11 @@ func TestRTAdditionOnSuccessfulQuery(t *testing.T) { // d3 does not know about d1 _, err := d3.kad.GetNode(ctx, kadt.PeerID(d1.host.ID())) - require.ErrorIs(t, err, coord.ErrNodeNotFound) + require.ErrorIs(t, err, coordt.ErrNodeNotFound) // d1 does not know about d3 _, err = d1.kad.GetNode(ctx, kadt.PeerID(d3.host.ID())) - require.ErrorIs(t, err, coord.ErrNodeNotFound) + require.ErrorIs(t, err, coordt.ErrNodeNotFound) // // but when d3 queries d2, d1 and d3 discover each other _, _ = d3.FindPeer(ctx, "something") diff --git a/v2/router.go b/v2/router.go index 70bd69ca..bc586a39 100644 --- a/v2/router.go +++ b/v2/router.go @@ -13,7 +13,7 @@ import ( "github.com/libp2p/go-msgio/pbio" "google.golang.org/protobuf/proto" - "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" "github.com/libp2p/go-libp2p-kad-dht/v2/pb" ) @@ -26,15 +26,7 @@ type router struct { ProtocolID protocol.ID } -var _ coord.Router[kadt.Key, kadt.PeerID, *pb.Message] = (*router)(nil) - -func FindKeyRequest(k kadt.Key) *pb.Message { - marshalledKey, _ := k.MarshalBinary() - return &pb.Message{ - Type: pb.Message_FIND_NODE, - Key: marshalledKey, - } -} +var _ coordt.Router[kadt.Key, kadt.PeerID, *pb.Message] = (*router)(nil) func (r *router) SendMessage(ctx context.Context, to kadt.PeerID, req *pb.Message) (*pb.Message, error) { // TODO: what to do with addresses in peer.AddrInfo? 
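[Editor's sketch, not part of the patch] The next hunk makes SendMessage return early with a nil response and nil error for message types that elicit no reply. A hedged caller-side sketch of how that contract is meant to be consumed via Message.ExpectResponse; the helper name and the anonymous interface are assumptions, not the patch's coordt.Router definition:

package dht

import (
	"context"
	"fmt"

	"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
	"github.com/libp2p/go-libp2p-kad-dht/v2/pb"
)

// sendRecord shows the expected handling of fire-and-forget messages.
func sendRecord(ctx context.Context, rtr interface {
	SendMessage(context.Context, kadt.PeerID, *pb.Message) (*pb.Message, error)
}, to kadt.PeerID, req *pb.Message,
) error {
	resp, err := rtr.SendMessage(ctx, to, req)
	if err != nil {
		return fmt.Errorf("send message: %w", err)
	}
	if !req.ExpectResponse() {
		// PUT_VALUE and ADD_PROVIDER elicit no reply, so resp is nil here
		// and must not be dereferenced.
		return nil
	}
	_ = resp // e.g. inspect resp.CloserPeers for FIND_NODE / GET_* requests
	return nil
}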
@@ -63,6 +55,10 @@ func (r *router) SendMessage(ctx context.Context, to kadt.PeerID, req *pb.Messag return nil, fmt.Errorf("write message: %w", err) } + if !req.ExpectResponse() { + return nil, nil + } + data, err := reader.ReadMsg() if err != nil { return nil, fmt.Errorf("read message: %w", err) @@ -80,7 +76,12 @@ func (r *router) SendMessage(ctx context.Context, to kadt.PeerID, req *pb.Messag } func (r *router) GetClosestNodes(ctx context.Context, to kadt.PeerID, target kadt.Key) ([]kadt.PeerID, error) { - resp, err := r.SendMessage(ctx, to, FindKeyRequest(target)) + req := &pb.Message{ + Type: pb.Message_FIND_NODE, + Key: target.MsgKey(), + } + + resp, err := r.SendMessage(ctx, to, req) if err != nil { return nil, err } diff --git a/v2/routing.go b/v2/routing.go index 569e66b9..eec85c30 100644 --- a/v2/routing.go +++ b/v2/routing.go @@ -8,16 +8,18 @@ import ( "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" - "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord" - "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" - "github.com/libp2p/go-libp2p-kad-dht/v2/pb" record "github.com/libp2p/go-libp2p-record" recpb "github.com/libp2p/go-libp2p-record/pb" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/routing" "go.opentelemetry.io/otel/attribute" otel "go.opentelemetry.io/otel/trace" + + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" + "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" + "github.com/libp2p/go-libp2p-kad-dht/v2/pb" ) var _ routing.Routing = (*DHT)(nil) @@ -42,10 +44,10 @@ func (d *DHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) { target := kadt.PeerID(id) var foundPeer peer.ID - fn := func(ctx context.Context, visited kadt.PeerID, msg *pb.Message, stats coord.QueryStats) error { + fn := func(ctx context.Context, visited kadt.PeerID, msg *pb.Message, stats coordt.QueryStats) error { if peer.ID(visited) == id { foundPeer = peer.ID(visited) - return coord.ErrSkipRemaining + return coordt.ErrSkipRemaining } return nil } @@ -89,8 +91,22 @@ func (d *DHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error { return nil } - // TODO reach out to Zikade - panic("implement me") + // construct message + addrInfo := peer.AddrInfo{ + ID: d.host.ID(), + Addrs: d.host.Addrs(), + } + + msg := &pb.Message{ + Type: pb.Message_ADD_PROVIDER, + Key: c.Hash(), + ProviderPeers: []*pb.Message_Peer{ + pb.FromAddrInfo(addrInfo), + }, + } + + // finally, find the closest peers to the target key. + return d.kad.BroadcastRecord(ctx, msg) } func (d *DHT) FindProvidersAsync(ctx context.Context, c cid.Cid, count int) <-chan peer.AddrInfo { @@ -110,15 +126,46 @@ func (d *DHT) FindProvidersAsync(ctx context.Context, c cid.Cid, count int) <-ch panic("implement me") } -func (d *DHT) PutValue(ctx context.Context, key string, value []byte, option ...routing.Option) error { +// PutValue satisfies the [routing.Routing] interface and will add the given +// value to the k-closest nodes to keyStr. The parameter keyStr should have the +// format `/$namespace/$binary_id`. Namespace examples are `pk` or `ipns`. To +// identify the closest peers to keyStr, that complete string will be SHA256 +// hashed. 
+func (d *DHT) PutValue(ctx context.Context, keyStr string, value []byte, opts ...routing.Option) error { ctx, span := d.tele.Tracer.Start(ctx, "DHT.PutValue") defer span.End() - if err := d.putValueLocal(ctx, key, value); err != nil { + // first parse the routing options + rOpt := routing.Options{} // routing config + if err := rOpt.Apply(opts...); err != nil { + return fmt.Errorf("apply routing options: %w", err) + } + + // then always store the given value locally + if err := d.putValueLocal(ctx, keyStr, value); err != nil { return fmt.Errorf("put value locally: %w", err) } - panic("implement me") + // if the routing system should operate in offline mode, stop here + if rOpt.Offline { + return nil + } + + // construct Kademlia-key. Yes, we hash the complete key string which + // includes the namespace prefix. + msg := &pb.Message{ + Type: pb.Message_PUT_VALUE, + Key: []byte(keyStr), + Record: record.MakePutRecord(keyStr, value), + } + + // finally, find the closest peers to the target key. + err := d.kad.BroadcastRecord(ctx, msg) + if err != nil { + return fmt.Errorf("query error: %w", err) + } + + return nil } // putValueLocal stores a value in the local datastore without querying the network. @@ -166,7 +213,7 @@ func (d *DHT) GetValue(ctx context.Context, key string, option ...routing.Option // TODO: quorum var value []byte - fn := func(ctx context.Context, id kadt.PeerID, resp *pb.Message, stats coord.QueryStats) error { + fn := func(ctx context.Context, id kadt.PeerID, resp *pb.Message, stats coordt.QueryStats) error { if resp == nil { return nil } @@ -181,7 +228,7 @@ func (d *DHT) GetValue(ctx context.Context, key string, option ...routing.Option value = resp.GetRecord().GetValue() - return coord.ErrSkipRemaining + return coordt.ErrSkipRemaining } _, err = d.kad.QueryMessage(ctx, req, fn, d.cfg.BucketSize) @@ -216,6 +263,7 @@ func (d *DHT) getValueLocal(ctx context.Context, key string) ([]byte, error) { if !ok { return nil, fmt.Errorf("expected *recpb.Record from backend, got: %T", val) } + return rec.GetValue(), nil } @@ -233,7 +281,10 @@ func (d *DHT) Bootstrap(ctx context.Context) error { seed := make([]kadt.PeerID, len(d.cfg.BootstrapPeers)) for i, addrInfo := range d.cfg.BootstrapPeers { seed[i] = kadt.PeerID(addrInfo.ID) - d.host.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, time.Hour) // TODO: TTL + // TODO: how to handle TTL if BootstrapPeers become dynamic and don't + // point to stable peers or consist of ephemeral peers that we have + // observed during a previous run. 
+ d.host.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, peerstore.PermanentAddrTTL) } return d.kad.Bootstrap(ctx, seed) diff --git a/v2/routing_test.go b/v2/routing_test.go index ec80da31..8647b56e 100644 --- a/v2/routing_test.go +++ b/v2/routing_test.go @@ -1,11 +1,12 @@ package dht import ( - "fmt" "testing" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/routing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest" @@ -21,9 +22,38 @@ func makePkKeyValue(t *testing.T) (string, []byte) { id, err := peer.IDFromPublicKey(pub) require.NoError(t, err) - key := fmt.Sprintf("/pk/%s", string(id)) + return routing.KeyForPublicKey(id), v +} + +func TestDHT_PutValue_local_only(t *testing.T) { + ctx := kadtest.CtxShort(t) + + top := NewTopology(t) + d := top.AddServer(nil) + + key, v := makePkKeyValue(t) + + err := d.PutValue(ctx, key, v, routing.Offline) + require.NoError(t, err) +} + +func TestDHT_PutValue_invalid_key(t *testing.T) { + ctx := kadtest.CtxShort(t) + + top := NewTopology(t) + d := top.AddClient(nil) + + _, v := makePkKeyValue(t) + + t.Run("unknown namespace", func(t *testing.T) { + err := d.PutValue(ctx, "/unknown/some_key", v) + assert.ErrorIs(t, err, routing.ErrNotSupported) + }) - return key, v + t.Run("no namespace", func(t *testing.T) { + err := d.PutValue(ctx, "no namespace", v) + assert.ErrorContains(t, err, "splitting key") + }) } func TestGetSetValueLocal(t *testing.T) {