diff --git a/coord/coordinator.go b/coord/coordinator.go index b15b050..3eef338 100644 --- a/coord/coordinator.go +++ b/coord/coordinator.go @@ -15,12 +15,49 @@ import ( "github.com/plprobelab/go-kademlia/network/address" "github.com/plprobelab/go-kademlia/network/endpoint" "github.com/plprobelab/go-kademlia/query" + "github.com/plprobelab/go-kademlia/routing" "github.com/plprobelab/go-kademlia/util" ) +// A StateMachine progresses through a set of states in response to transition events. +type StateMachine[S any, E any] interface { + // Advance advances the state of the state machine. + Advance(context.Context, E) S +} + +// eventQueue is a bounded, typed queue for events +// NOTE: this type is incompatible with the semantics of event.Queue which blocks on Dequeue +type eventQueue[E any] struct { + events chan E +} + +func newEventQueue[E any](capacity int) *eventQueue[E] { + return &eventQueue[E]{ + events: make(chan E, capacity), + } +} + +// Enqueue adds an event to the queue. It blocks if the queue is at capacity. +func (q *eventQueue[E]) Enqueue(ctx context.Context, e E) { + q.events <- e +} + +// Dequeue reads an event from the queue. It returns the event and true if +// an event was read, or the zero value of the event type and false if no +// event was read. This method is non-blocking. +func (q *eventQueue[E]) Dequeue(ctx context.Context) (E, bool) { + select { + case e := <-q.events: + return e, true + default: + var v E + return v, false + } +} + // A Coordinator coordinates the state machines that comprise a Kademlia DHT -// Currently this is only queries but will expand to include other state machines such as routing table refresh, -// and reproviding. +// Currently this is only queries and bootstrapping but will expand to include other state machines such as +// routing table refresh and reproviding. 
type Coordinator[K kad.Key[K], A kad.Address[A]] struct { // self is the node id of the system the coordinator is running on self kad.NodeID[K] @@ -28,7 +65,17 @@ type Coordinator[K kad.Key[K], A kad.Address[A]] struct { // cfg is a copy of the optional configuration supplied to the coordinator cfg Config - qp *query.Pool[K, A] + // pool is the query pool state machine, responsible for running user-submitted queries + pool StateMachine[query.PoolState, query.PoolEvent] + + // poolEvents is a fifo queue of events that are to be processed by the pool state machine + poolEvents *eventQueue[query.PoolEvent] + + // bootstrap is the bootstrap state machine, responsible for bootstrapping the routing table + bootstrap StateMachine[routing.BootstrapState, routing.BootstrapEvent] + + // bootstrapEvents is a fifo queue of events that are to be processed by the bootstrap state machine + bootstrapEvents *eventQueue[routing.BootstrapEvent] // rt is the routing table used to look up nodes by distance rt kad.RoutingTable[K, kad.NodeID[K]] @@ -36,9 +83,10 @@ type Coordinator[K kad.Key[K], A kad.Address[A]] struct { // ep is the message endpoint used to send requests ep endpoint.Endpoint[K, A] - peerstoreTTL time.Duration + // queue not used + queue event.EventQueue - queue event.EventQueue + // planner not used planner event.AwareActionPlanner outboundEvents chan KademliaEvent @@ -47,10 +95,15 @@ type Coordinator[K kad.Key[K], A kad.Address[A]] struct { const DefaultChanqueueCapacity = 1024 type Config struct { - // TODO: review if this is needed here PeerstoreTTL time.Duration // duration for which a peer is kept in the peerstore Clock clock.Clock // a clock that may replaced by a mock when testing + + QueryConcurrency int // the maximum number of queries that may be waiting for message responses at any one time + QueryTimeout time.Duration // the time to wait before terminating a query that is not making progress + + RequestConcurrency int // the maximum number of concurrent requests that each query may have in flight + RequestTimeout time.Duration // the timeout queries should use for contacting a single node } // Validate checks the configuration options and returns an error if any have invalid values. 
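The four new Config fields feed the query pool and bootstrap state machines constructed in NewCoordinator below. As a rough usage sketch (self, ep and rt stand in for a node id, endpoint and routing table built elsewhere, e.g. with the sim/kadtest helpers used in the tests; the values shown are illustrative, not recommendations):

```go
// Sketch only: tuning the new Config fields before constructing a coordinator.
ccfg := DefaultConfig()
ccfg.PeerstoreTTL = 10 * time.Minute // how long peers are retained in the peerstore
ccfg.QueryConcurrency = 3            // queries allowed to wait on responses at once
ccfg.QueryTimeout = 5 * time.Minute  // give up on queries that stop making progress
ccfg.RequestConcurrency = 3          // per-query limit on in-flight requests
ccfg.RequestTimeout = time.Minute    // timeout for contacting a single node

c, err := NewCoordinator[key.Key8, kadtest.StrAddr](self, ep, rt, ccfg)
if err != nil {
	// invalid values are reported via Config.Validate as a kaderr.ConfigurationError
	log.Fatalf("new coordinator: %v", err)
}
```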
@@ -62,13 +115,43 @@ func (cfg *Config) Validate() error { } } + if cfg.QueryConcurrency < 1 { + return &kaderr.ConfigurationError{ + Component: "CoordinatorConfig", + Err: fmt.Errorf("query concurrency must be greater than zero"), + } + } + if cfg.QueryTimeout < 1 { + return &kaderr.ConfigurationError{ + Component: "CoordinatorConfig", + Err: fmt.Errorf("query timeout must be greater than zero"), + } + } + + if cfg.RequestConcurrency < 1 { + return &kaderr.ConfigurationError{ + Component: "CoordinatorConfig", + Err: fmt.Errorf("request concurrency must be greater than zero"), + } + } + + if cfg.RequestTimeout < 1 { + return &kaderr.ConfigurationError{ + Component: "CoordinatorConfig", + Err: fmt.Errorf("request timeout must be greater than zero"), + } + } return nil } func DefaultConfig() *Config { return &Config{ - Clock: clock.New(), // use standard time - PeerstoreTTL: 10 * time.Minute, + Clock: clock.New(), // use standard time + PeerstoreTTL: 10 * time.Minute, + QueryConcurrency: 3, + QueryTimeout: 5 * time.Minute, + RequestConcurrency: 3, + RequestTimeout: time.Minute, } } @@ -81,20 +164,38 @@ func NewCoordinator[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], ep endpo qpCfg := query.DefaultPoolConfig() qpCfg.Clock = cfg.Clock + qpCfg.Concurrency = cfg.QueryConcurrency + qpCfg.Timeout = cfg.QueryTimeout + qpCfg.QueryConcurrency = cfg.RequestConcurrency + qpCfg.RequestTimeout = cfg.RequestTimeout qp, err := query.NewPool[K, A](self, qpCfg) if err != nil { return nil, fmt.Errorf("query pool: %w", err) } + + bootstrapCfg := routing.DefaultBootstrapConfig[K, A]() + bootstrapCfg.Clock = cfg.Clock + bootstrapCfg.Timeout = cfg.QueryTimeout + bootstrapCfg.RequestConcurrency = cfg.RequestConcurrency + bootstrapCfg.RequestTimeout = cfg.RequestTimeout + + bootstrap, err := routing.NewBootstrap(self, bootstrapCfg) + if err != nil { + return nil, fmt.Errorf("bootstrap: %w", err) + } return &Coordinator[K, A]{ - self: self, - cfg: *cfg, - ep: ep, - rt: rt, - qp: qp, - outboundEvents: make(chan KademliaEvent, 20), - queue: event.NewChanQueue(DefaultChanqueueCapacity), - planner: event.NewSimplePlanner(cfg.Clock), + self: self, + cfg: *cfg, + ep: ep, + rt: rt, + pool: qp, + poolEvents: newEventQueue[query.PoolEvent](20), // 20 is arbitrary, move to config + bootstrap: bootstrap, + bootstrapEvents: newEventQueue[routing.BootstrapEvent](20), // 20 is arbitrary, move to config + outboundEvents: make(chan KademliaEvent, 20), + queue: event.NewChanQueue(DefaultChanqueueCapacity), + planner: event.NewSimplePlanner(cfg.Clock), }, nil } @@ -102,71 +203,51 @@ func (c *Coordinator[K, A]) Events() <-chan KademliaEvent { return c.outboundEvents } -func (c *Coordinator[K, A]) handleInboundEvent(ctx context.Context, ev interface{}) { - switch tev := ev.(type) { - case *eventUnroutablePeer[K]: - // TODO: remove from routing table - c.dispatchQueryPoolEvent(ctx, nil) +func (c *Coordinator[K, A]) RunOne(ctx context.Context) bool { + ctx, span := util.StartSpan(ctx, "Coordinator.RunOne") + defer span.End() - case *eventMessageFailed[K]: - qev := &query.EventPoolMessageFailure[K]{ - QueryID: tev.QueryID, - NodeID: tev.NodeID, - Error: tev.Error, - } + // Give the bootstrap state machine priority + // No queries can be run while a bootstrap is in progress + bev, ok := c.bootstrapEvents.Dequeue(ctx) + if !ok { + bev = &routing.EventBootstrapPoll{} + } - c.dispatchQueryPoolEvent(ctx, qev) + bstate := c.bootstrap.Advance(ctx, bev) + switch st := bstate.(type) { + case *routing.StateBootstrapMessage[K, A]: + 
c.sendBootstrapMessage(ctx, st.ProtocolID, st.NodeID, st.Message, st.QueryID, st.Stats) + return true - case *eventMessageResponse[K, A]: - if tev.Response != nil { - candidates := tev.Response.CloserNodes() - if len(candidates) > 0 { - // ignore error here - c.AddNodes(ctx, candidates) - } - } + case *routing.StateBootstrapWaiting: + // bootstrap waiting for a message response, don't proceed with other state machines + return false - // notify caller so they have chance to stop query - c.outboundEvents <- &KademliaOutboundQueryProgressedEvent[K, A]{ - NodeID: tev.NodeID, - QueryID: tev.QueryID, - Response: tev.Response, - Stats: tev.Stats, + case *routing.StateBootstrapFinished: + c.outboundEvents <- &KademliaBootstrapFinishedEvent{ + Stats: st.Stats, } + return true - qev := &query.EventPoolMessageResponse[K, A]{ - QueryID: tev.QueryID, - NodeID: tev.NodeID, - Response: tev.Response, - } - c.dispatchQueryPoolEvent(ctx, qev) - case *eventAddQuery[K, A]: - qev := &query.EventPoolAddQuery[K, A]{ - QueryID: tev.QueryID, - Target: tev.Target, - ProtocolID: tev.ProtocolID, - Message: tev.Message, - KnownClosestNodes: tev.KnownClosestPeers, - } - c.dispatchQueryPoolEvent(ctx, qev) - case *eventStopQuery[K]: - qev := &query.EventPoolStopQuery{ - QueryID: tev.QueryID, - } - c.dispatchQueryPoolEvent(ctx, qev) + case *routing.StateBootstrapIdle: + // bootstrap not running, can proceed to other state machines + break default: - panic(fmt.Sprintf("unexpected event: %T", tev)) + panic(fmt.Sprintf("unexpected bootstrap state: %T", st)) } -} -func (c *Coordinator[K, A]) dispatchQueryPoolEvent(ctx context.Context, ev query.PoolEvent) { - ctx, span := util.StartSpan(ctx, "Coordinator.dispatchQueryPoolEvent") - defer span.End() - // attempt to advance the query state machine - state := c.qp.Advance(ctx, ev) + // Attempt to advance an outbound query + pev, ok := c.poolEvents.Dequeue(ctx) + if !ok { + pev = &query.EventPoolPoll{} + } + + state := c.pool.Advance(ctx, pev) switch st := state.(type) { case *query.StatePoolQueryMessage[K, A]: - c.attemptSendMessage(ctx, st.ProtocolID, st.NodeID, st.Message, st.QueryID, st.Stats) + c.sendQueryMessage(ctx, st.ProtocolID, st.NodeID, st.Message, st.QueryID, st.Stats) + return true case *query.StatePoolWaitingAtCapacity: // TODO case *query.StatePoolWaitingWithCapacity: @@ -176,6 +257,7 @@ func (c *Coordinator[K, A]) dispatchQueryPoolEvent(ctx context.Context, ev query QueryID: st.QueryID, Stats: st.Stats, } + return true // TODO case *query.StatePoolQueryTimeout: @@ -183,28 +265,83 @@ func (c *Coordinator[K, A]) dispatchQueryPoolEvent(ctx context.Context, ev query case *query.StatePoolIdle: // TODO default: - panic(fmt.Sprintf("unexpected state: %T", st)) + panic(fmt.Sprintf("unexpected pool state: %T", st)) } + + return false } -func (c *Coordinator[K, A]) attemptSendMessage(ctx context.Context, protoID address.ProtocolID, to kad.NodeID[K], msg kad.Request[K, A], queryID query.QueryID, stats query.QueryStats) { - ctx, span := util.StartSpan(ctx, "Coordinator.attemptSendMessage") +func (c *Coordinator[K, A]) sendQueryMessage(ctx context.Context, protoID address.ProtocolID, to kad.NodeID[K], msg kad.Request[K, A], queryID query.QueryID, stats query.QueryStats) { + ctx, span := util.StartSpan(ctx, "Coordinator.sendQueryMessage") defer span.End() onSendError := func(ctx context.Context, err error) { if errors.Is(err, endpoint.ErrCannotConnect) { // here we can notify that the peer is unroutable, which would feed into peerstore and routing table - c.queue.Enqueue(ctx, 
&eventUnroutablePeer[K]{ - NodeID: to, - }) + // TODO: remove from routing table return } - c.queue.Enqueue(ctx, &eventMessageFailed[K]{ + + qev := &query.EventPoolMessageFailure[K]{ NodeID: to, QueryID: queryID, - Stats: stats, Error: err, - }) + } + c.poolEvents.Enqueue(ctx, qev) + } + + onMessageResponse := func(ctx context.Context, resp kad.Response[K, A], err error) { + if err != nil { + onSendError(ctx, err) + return + } + + if resp != nil { + candidates := resp.CloserNodes() + if len(candidates) > 0 { + // ignore error here + c.AddNodes(ctx, candidates) + } + } + + // notify caller so they have a chance to stop query + c.outboundEvents <- &KademliaOutboundQueryProgressedEvent[K, A]{ + NodeID: to, + QueryID: queryID, + Response: resp, + Stats: stats, + } + + qev := &query.EventPoolMessageResponse[K, A]{ + NodeID: to, + QueryID: queryID, + Response: resp, + } + c.poolEvents.Enqueue(ctx, qev) + } + + err := c.ep.SendRequestHandleResponse(ctx, protoID, to, msg, msg.EmptyResponse(), 0, onMessageResponse) + if err != nil { + onSendError(ctx, err) + } +} + +func (c *Coordinator[K, A]) sendBootstrapMessage(ctx context.Context, protoID address.ProtocolID, to kad.NodeID[K], msg kad.Request[K, A], queryID query.QueryID, stats query.QueryStats) { + ctx, span := util.StartSpan(ctx, "Coordinator.sendBootstrapMessage") + defer span.End() + + onSendError := func(ctx context.Context, err error) { + if errors.Is(err, endpoint.ErrCannotConnect) { + // here we can notify that the peer is unroutable, which would feed into peerstore and routing table + // TODO: remove from routing table + return + } + + bev := &routing.EventBootstrapMessageFailure[K]{ + NodeID: to, + Error: err, + } + c.bootstrapEvents.Enqueue(ctx, bev) } onMessageResponse := func(ctx context.Context, resp kad.Response[K, A], err error) { @@ -212,12 +349,28 @@ func (c *Coordinator[K, A]) attemptSendMessage(ctx context.Context, protoID addr onSendError(ctx, err) return } - c.queue.Enqueue(ctx, &eventMessageResponse[K, A]{ + + if resp != nil { + candidates := resp.CloserNodes() + if len(candidates) > 0 { + // ignore error here + c.AddNodes(ctx, candidates) + } + } + + // notify caller so they have a chance to stop query + c.outboundEvents <- &KademliaOutboundQueryProgressedEvent[K, A]{ NodeID: to, QueryID: queryID, Response: resp, Stats: stats, - }) + } + + bev := &routing.EventBootstrapMessageResponse[K, A]{ + NodeID: to, + Response: resp, + } + c.bootstrapEvents.Enqueue(ctx, bev) } err := c.ep.SendRequestHandleResponse(ctx, protoID, to, msg, msg.EmptyResponse(), 0, onMessageResponse) @@ -229,27 +382,22 @@ func (c *Coordinator[K, A]) attemptSendMessage(ctx context.Context, protoID addr func (c *Coordinator[K, A]) StartQuery(ctx context.Context, queryID query.QueryID, protocolID address.ProtocolID, msg kad.Request[K, A]) error { knownClosestPeers := c.rt.NearestNodes(msg.Target(), 20) - ev := &eventAddQuery[K, A]{ + qev := &query.EventPoolAddQuery[K, A]{ QueryID: queryID, Target: msg.Target(), ProtocolID: protocolID, Message: msg, - KnownClosestPeers: knownClosestPeers, + KnownClosestNodes: knownClosestPeers, } - - c.queue.Enqueue(ctx, ev) - // c.inboundEvents <- ev - + c.poolEvents.Enqueue(ctx, qev) return nil } func (c *Coordinator[K, A]) StopQuery(ctx context.Context, queryID query.QueryID) error { - ev := &eventStopQuery[K]{ + qev := &query.EventPoolStopQuery{ QueryID: queryID, } - - c.queue.Enqueue(ctx, ev) - // c.inboundEvents <- ev + c.poolEvents.Enqueue(ctx, qev) return nil } @@ -261,7 +409,7 @@ func (c *Coordinator[K, A]) AddNodes(ctx 
context.Context, infos []kad.NodeInfo[K continue } isNew := c.rt.AddNode(info.ID()) - c.ep.MaybeAddToPeerstore(ctx, info, c.peerstoreTTL) + c.ep.MaybeAddToPeerstore(ctx, info, c.cfg.PeerstoreTTL) if isNew { c.outboundEvents <- &KademliaRoutingUpdatedEvent[K, A]{ @@ -273,6 +421,26 @@ func (c *Coordinator[K, A]) AddNodes(ctx context.Context, infos []kad.NodeInfo[K return nil } +// FindNodeRequestFunc is a function that creates a request to find the supplied node id +// TODO: consider this being a first class method of the Endpoint +type FindNodeRequestFunc[K kad.Key[K], A kad.Address[A]] func(kad.NodeID[K]) (address.ProtocolID, kad.Request[K, A]) + +// Bootstrap instructs the coordinator to begin bootstrapping the routing table. +// While bootstrap is in progress, no other queries will make progress. +func (c *Coordinator[K, A]) Bootstrap(ctx context.Context, seeds []kad.NodeID[K], fn FindNodeRequestFunc[K, A]) error { + protoID, msg := fn(c.self) + + bev := &routing.EventBootstrapStart[K, A]{ + ProtocolID: protoID, + Message: msg, + KnownClosestNodes: seeds, + } + + c.bootstrapEvents.Enqueue(ctx, bev) + + return nil +} + // Kademlia events emitted by the Coordinator, intended for consumption by clients of the package type KademliaEvent interface { @@ -304,62 +472,19 @@ type KademliaUnroutablePeerEvent[K kad.Key[K]] struct{} type KademliaRoutablePeerEvent[K kad.Key[K]] struct{} +// KademliaBootstrapFinishedEvent is emitted by the coordinator when a bootstrap has finished, either through +// running to completion or by being canceled. +type KademliaBootstrapFinishedEvent struct { + Stats query.QueryStats +} + // kademliaEvent() ensures that only Kademlia events can be assigned to a KademliaEvent. func (*KademliaRoutingUpdatedEvent[K, A]) kademliaEvent() {} func (*KademliaOutboundQueryProgressedEvent[K, A]) kademliaEvent() {} func (*KademliaUnroutablePeerEvent[K]) kademliaEvent() {} func (*KademliaRoutablePeerEvent[K]) kademliaEvent() {} func (*KademliaOutboundQueryFinishedEvent) kademliaEvent() {} - -// Internal events for the Coordiinator - -type coordinatorInternalEvent interface { - coordinatorInternalEvent() - Run(ctx context.Context) -} - -type eventUnroutablePeer[K kad.Key[K]] struct { - NodeID kad.NodeID[K] -} - -type eventMessageFailed[K kad.Key[K]] struct { - NodeID kad.NodeID[K] // the node the message was sent to - QueryID query.QueryID // the id of the query that sent the message - Stats query.QueryStats // stats for the query sending the message - Error error // the error that caused the failure, if any -} - -type eventMessageResponse[K kad.Key[K], A kad.Address[A]] struct { - NodeID kad.NodeID[K] // the node the message was sent to - QueryID query.QueryID // the id of the query that sent the message - Response kad.Response[K, A] // the message response sent by the node - Stats query.QueryStats // stats for the query sending the message -} - -type eventAddQuery[K kad.Key[K], A kad.Address[A]] struct { - QueryID query.QueryID - Target K - ProtocolID address.ProtocolID - Message kad.Request[K, A] - KnownClosestPeers []kad.NodeID[K] -} - -type eventStopQuery[K kad.Key[K]] struct { - QueryID query.QueryID -} - -// coordinatorInternalEvent() ensures that only an internal coordinator event can be assigned to the coordinatorInternalEvent interface. 
-func (*eventUnroutablePeer[K]) coordinatorInternalEvent() {} -func (*eventMessageFailed[K]) coordinatorInternalEvent() {} -func (*eventMessageResponse[K, A]) coordinatorInternalEvent() {} -func (*eventAddQuery[K, A]) coordinatorInternalEvent() {} -func (*eventStopQuery[K]) coordinatorInternalEvent() {} - -func (*eventUnroutablePeer[K]) Run(context.Context) {} -func (*eventMessageFailed[K]) Run(context.Context) {} -func (*eventMessageResponse[K, A]) Run(context.Context) {} -func (*eventAddQuery[K, A]) Run(context.Context) {} -func (*eventStopQuery[K]) Run(context.Context) {} +func (*KademliaBootstrapFinishedEvent) kademliaEvent() {} // var _ scheduler.Scheduler = (*Coordinator[key.Key8])(nil) func (c *Coordinator[K, A]) Clock() clock.Clock { @@ -382,31 +507,9 @@ func (c *Coordinator[K, A]) RemovePlannedAction(ctx context.Context, a event.Pla return c.planner.RemoveAction(ctx, a) } -func (c *Coordinator[K, A]) RunOne(ctx context.Context) bool { - c.moveOverdueActions(ctx) - if a := c.queue.Dequeue(ctx); a != nil { - c.handleInboundEvent(ctx, a) - return true - } - return false -} - -// moveOverdueActions moves all overdue actions from the planner to the queue. -func (c *Coordinator[K, A]) moveOverdueActions(ctx context.Context) { - overdue := c.planner.PopOverdueActions(ctx) - - event.EnqueueMany(ctx, c.queue, overdue) -} - // NextActionTime returns the time of the next action to run, or the current // time if there are actions to be run in the queue, or util.MaxTime if there // are no scheduled to run. func (c *Coordinator[K, A]) NextActionTime(ctx context.Context) time.Time { - c.moveOverdueActions(ctx) - nextScheduled := c.planner.NextActionTime(ctx) - - if !event.Empty(c.queue) { - return c.cfg.Clock.Now() - } - return nextScheduled + return c.cfg.Clock.Now() } diff --git a/coord/coordinator_test.go b/coord/coordinator_test.go index 0e2a341..85c94f4 100644 --- a/coord/coordinator_test.go +++ b/coord/coordinator_test.go @@ -22,14 +22,6 @@ import ( "github.com/plprobelab/go-kademlia/sim" ) -var ( - _ coordinatorInternalEvent = &eventUnroutablePeer[key.Key8]{} - _ coordinatorInternalEvent = &eventMessageFailed[key.Key8]{} - _ coordinatorInternalEvent = &eventMessageResponse[key.Key8, kadtest.StrAddr]{} - _ coordinatorInternalEvent = &eventAddQuery[key.Key8, kadtest.StrAddr]{} - _ coordinatorInternalEvent = &eventStopQuery[key.Key8]{} -) - func setupSimulation(t *testing.T, ctx context.Context) ([]kad.NodeInfo[key.Key8, kadtest.StrAddr], []*sim.Endpoint[key.Key8, kadtest.StrAddr], []kad.RoutingTable[key.Key8, kad.NodeID[key.Key8]], *sim.LiteSimulator) { // create node identifiers nodeCount := 4 @@ -116,10 +108,12 @@ const peerstoreTTL = 10 * time.Minute var protoID = address.ProtocolID("/statemachine/1.0.0") // protocol ID for the test // expectEventType selects on the event channel until an event of the expected type is sent. -func expectEventType(ctx context.Context, events <-chan KademliaEvent, expected KademliaEvent) (KademliaEvent, error) { +func expectEventType(t *testing.T, ctx context.Context, events <-chan KademliaEvent, expected KademliaEvent) (KademliaEvent, error) { + t.Helper() for { select { case ev := <-events: + t.Logf("saw event: %T\n", ev) if reflect.TypeOf(ev) == reflect.TypeOf(expected) { return ev, nil } @@ -129,23 +123,6 @@ func expectEventType(ctx context.Context, events <-chan KademliaEvent, expected } } -// Ctx returns a Context and a CancelFunc. 
The context will be -// cancelled just before the test binary deadline (as -// specified by the -timeout flag when running the test). The -// CancelFunc may be called to cancel the context earlier than -// the deadline. -func Ctx(t *testing.T) (context.Context, context.CancelFunc) { - t.Helper() - - deadline, ok := t.Deadline() - if !ok { - deadline = time.Now().Add(time.Minute) - } else { - deadline = deadline.Add(-time.Second) - } - return context.WithDeadline(context.Background(), deadline) -} - func TestConfigValidate(t *testing.T) { t.Run("default is valid", func(t *testing.T) { cfg := DefaultConfig() @@ -157,10 +134,42 @@ func TestConfigValidate(t *testing.T) { cfg.Clock = nil require.Error(t, cfg.Validate()) }) + + t.Run("query concurrency positive", func(t *testing.T) { + cfg := DefaultConfig() + cfg.QueryConcurrency = 0 + require.Error(t, cfg.Validate()) + cfg.QueryConcurrency = -1 + require.Error(t, cfg.Validate()) + }) + + t.Run("query timeout positive", func(t *testing.T) { + cfg := DefaultConfig() + cfg.QueryTimeout = 0 + require.Error(t, cfg.Validate()) + cfg.QueryTimeout = -1 + require.Error(t, cfg.Validate()) + }) + + t.Run("request concurrency positive", func(t *testing.T) { + cfg := DefaultConfig() + cfg.RequestConcurrency = 0 + require.Error(t, cfg.Validate()) + cfg.RequestConcurrency = -1 + require.Error(t, cfg.Validate()) + }) + + t.Run("request timeout positive", func(t *testing.T) { + cfg := DefaultConfig() + cfg.RequestTimeout = 0 + require.Error(t, cfg.Validate()) + cfg.RequestTimeout = -1 + require.Error(t, cfg.Validate()) + }) } func TestExhaustiveQuery(t *testing.T) { - ctx, cancel := Ctx(t) + ctx, cancel := kadtest.Ctx(t) defer cancel() nodes, eps, rts, siml := setupSimulation(t, ctx) @@ -201,7 +210,7 @@ func TestExhaustiveQuery(t *testing.T) { } // the query run by the coordinator should have received a response from nodes[1] - ev, err := expectEventType(ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{}) + ev, err := expectEventType(t, ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{}) require.NoError(t, err) tev := ev.(*KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]) @@ -209,7 +218,7 @@ func TestExhaustiveQuery(t *testing.T) { require.Equal(t, nodes[1].ID(), tev.NodeID) require.Equal(t, queryID, tev.QueryID) // the query run by the coordinator should have received a response from nodes[2] - ev, err = expectEventType(ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{}) + ev, err = expectEventType(t, ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{}) require.NoError(t, err) tev = ev.(*KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]) @@ -217,7 +226,7 @@ func TestExhaustiveQuery(t *testing.T) { require.Equal(t, nodes[2].ID(), tev.NodeID) require.Equal(t, queryID, tev.QueryID) // the query run by the coordinator should have received a response from nodes[3] - ev, err = expectEventType(ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{}) + ev, err = expectEventType(t, ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{}) require.NoError(t, err) tev = ev.(*KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]) @@ -225,7 +234,7 @@ func TestExhaustiveQuery(t *testing.T) { require.Equal(t, nodes[3].ID(), tev.NodeID) require.Equal(t, queryID, tev.QueryID) // the query run by the coordinator should have completed - ev, err = expectEventType(ctx, events, &KademliaOutboundQueryFinishedEvent{}) + ev, err = expectEventType(t, ctx, events, &KademliaOutboundQueryFinishedEvent{})
require.NoError(t, err) require.IsType(t, &KademliaOutboundQueryFinishedEvent{}, ev) @@ -237,7 +246,7 @@ func TestExhaustiveQuery(t *testing.T) { } func TestRoutingUpdatedEventEmittedForCloserNodes(t *testing.T) { - ctx, cancel := Ctx(t) + ctx, cancel := kadtest.Ctx(t) defer cancel() nodes, eps, rts, siml := setupSimulation(t, ctx) @@ -279,7 +288,7 @@ func TestRoutingUpdatedEventEmittedForCloserNodes(t *testing.T) { // the query run by the coordinator should have received a response from nodes[1] with closer nodes // nodes[0] and nodes[2] which should trigger a routing table update - ev, err := expectEventType(ctx, events, &KademliaRoutingUpdatedEvent[key.Key8, kadtest.StrAddr]{}) + ev, err := expectEventType(t, ctx, events, &KademliaRoutingUpdatedEvent[key.Key8, kadtest.StrAddr]{}) require.NoError(t, err) tev := ev.(*KademliaRoutingUpdatedEvent[key.Key8, kadtest.StrAddr]) @@ -289,13 +298,93 @@ func TestRoutingUpdatedEventEmittedForCloserNodes(t *testing.T) { // the query continues and should have received a response from nodes[2] with closer nodes // nodes[1] and nodes[3] which should trigger a routing table update - ev, err = expectEventType(ctx, events, &KademliaRoutingUpdatedEvent[key.Key8, kadtest.StrAddr]{}) + ev, err = expectEventType(t, ctx, events, &KademliaRoutingUpdatedEvent[key.Key8, kadtest.StrAddr]{}) require.NoError(t, err) tev = ev.(*KademliaRoutingUpdatedEvent[key.Key8, kadtest.StrAddr]) require.Equal(t, nodes[3].ID(), tev.NodeInfo.ID()) // the query run by the coordinator should have completed - _, err = expectEventType(ctx, events, &KademliaOutboundQueryFinishedEvent{}) + _, err = expectEventType(t, ctx, events, &KademliaOutboundQueryFinishedEvent{}) require.NoError(t, err) } + +var findNodeFn = func(n kad.NodeID[key.Key8]) (address.ProtocolID, kad.Request[key.Key8, kadtest.StrAddr]) { + return protoID, sim.NewRequest[key.Key8, kadtest.StrAddr](n.Key()) +} + +func TestBootstrap(t *testing.T) { + ctx, cancel := kadtest.Ctx(t) + defer cancel() + + nodes, eps, rts, siml := setupSimulation(t, ctx) + + clk := siml.Clock() + + ccfg := DefaultConfig() + ccfg.Clock = clk + ccfg.PeerstoreTTL = peerstoreTTL + + go func(ctx context.Context) { + for { + select { + case <-time.After(10 * time.Millisecond): + siml.Run(ctx) + case <-ctx.Done(): + return + } + } + }(ctx) + + self := nodes[0].ID() + c, err := NewCoordinator[key.Key8, kadtest.StrAddr](self, eps[0], rts[0], ccfg) + if err != nil { + log.Fatalf("unexpected error creating coordinator: %v", err) + } + siml.Add(c) + events := c.Events() + + queryID := query.QueryID("bootstrap") + + seeds := []kad.NodeID[key.Key8]{ + nodes[1].ID(), + } + err = c.Bootstrap(ctx, seeds, findNodeFn) + if err != nil { + t.Fatalf("failed to initiate bootstrap: %v", err) + } + + // the query run by the coordinator should have received a response from nodes[1] + ev, err := expectEventType(t, ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{}) + require.NoError(t, err) + + tev := ev.(*KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]) + require.Equal(t, nodes[1].ID(), tev.NodeID) + require.Equal(t, queryID, tev.QueryID) + + // the query run by the coordinator should have received a response from nodes[2] + ev, err = expectEventType(t, ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{}) + require.NoError(t, err) + + tev = ev.(*KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]) + require.Equal(t, nodes[2].ID(), tev.NodeID) + require.Equal(t, queryID, tev.QueryID) + + // 
the query run by the coordinator should have received a response from nodes[3] + ev, err = expectEventType(t, ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{}) + require.NoError(t, err) + + tev = ev.(*KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]) + require.Equal(t, nodes[3].ID(), tev.NodeID) + require.Equal(t, queryID, tev.QueryID) + + // the query run by the coordinator should have completed + ev, err = expectEventType(t, ctx, events, &KademliaBootstrapFinishedEvent{}) + require.NoError(t, err) + + require.IsType(t, &KademliaBootstrapFinishedEvent{}, ev) + tevf := ev.(*KademliaBootstrapFinishedEvent) + require.Equal(t, 3, tevf.Stats.Requests) + require.Equal(t, 3, tevf.Stats.Success) + require.Equal(t, 0, tevf.Stats.Failure) +} diff --git a/internal/kadtest/context.go b/internal/kadtest/context.go new file mode 100644 index 0000000..e11e032 --- /dev/null +++ b/internal/kadtest/context.go @@ -0,0 +1,24 @@ +package kadtest + +import ( + "context" + "testing" + "time" +) + +// Ctx returns a Context and a CancelFunc. The context will be +// cancelled just before the test binary deadline (as +// specified by the -timeout flag when running the test). The +// CancelFunc may be called to cancel the context earlier than +// the deadline. +func Ctx(t *testing.T) (context.Context, context.CancelFunc) { + t.Helper() + + deadline, ok := t.Deadline() + if !ok { + deadline = time.Now().Add(time.Minute) + } else { + deadline = deadline.Add(-time.Second) + } + return context.WithDeadline(context.Background(), deadline) +} diff --git a/query/pool.go b/query/pool.go index 7030bad..7e6a469 100644 --- a/query/pool.go +++ b/query/pool.go @@ -154,9 +154,8 @@ func (p *Pool[K, A]) Advance(ctx context.Context, ev PoolEvent) PoolState { } eventQueryID = qry.id } - case nil: - // TEMPORARY: no event to process - // TODO: introduce EventPoolPoll? + case *EventPoolPoll: + // no event to process default: panic(fmt.Sprintf("unexpected event: %T", tev)) } @@ -279,7 +278,7 @@ type PoolState interface { // StatePoolIdle indicates that the pool is idle, i.e. there are no queries to process. type StatePoolIdle struct{} -// StatePoolQueryMessage indicates that at a query is waiting to message a node. +// StatePoolQueryMessage indicates that a pool query is waiting to message a node. type StatePoolQueryMessage[K kad.Key[K], A kad.Address[A]] struct { QueryID QueryID NodeID kad.NodeID[K] @@ -302,7 +301,7 @@ type StatePoolQueryFinished struct { Stats QueryStats } -// StatePoolQueryTimeout indicates that at a query has timed out. +// StatePoolQueryTimeout indicates that a query has timed out. type StatePoolQueryTimeout struct { QueryID QueryID Stats QueryStats @@ -349,8 +348,12 @@ type EventPoolMessageFailure[K kad.Key[K]] struct { Error error // the error that caused the failure, if any } +// EventPoolPoll is an event that signals the pool that it can perform housekeeping work such as time out queries. +type EventPoolPoll struct{} + // poolEvent() ensures that only Pool events can be assigned to the PoolEvent interface. 
func (*EventPoolAddQuery[K, A]) poolEvent() {} func (*EventPoolStopQuery) poolEvent() {} func (*EventPoolMessageResponse[K, A]) poolEvent() {} func (*EventPoolMessageFailure[K]) poolEvent() {} +func (*EventPoolPoll) poolEvent() {} diff --git a/query/pool_test.go b/query/pool_test.go index ea96184..3e8da96 100644 --- a/query/pool_test.go +++ b/query/pool_test.go @@ -76,7 +76,7 @@ func TestPoolStartsIdle(t *testing.T) { p, err := NewPool[key.Key8, kadtest.StrAddr](self, cfg) require.NoError(t, err) - state := p.Advance(ctx, nil) + state := p.Advance(ctx, &EventPoolPoll{}) require.IsType(t, &StatePoolIdle{}, state) } @@ -90,7 +90,7 @@ func TestPoolStopWhenNoQueries(t *testing.T) { p, err := NewPool[key.Key8, kadtest.StrAddr](self, cfg) require.NoError(t, err) - state := p.Advance(ctx, &EventPoolStopQuery{}) + state := p.Advance(ctx, &EventPoolPoll{}) require.IsType(t, &StatePoolIdle{}, state) } @@ -137,7 +137,7 @@ func TestPoolAddQueryStartsIfCapacity(t *testing.T) { require.Equal(t, msg, st.Message) // now the pool reports that it is waiting - state = p.Advance(ctx, nil) + state = p.Advance(ctx, &EventPoolPoll{}) require.IsType(t, &StatePoolWaitingWithCapacity{}, state) } @@ -244,14 +244,14 @@ func TestPoolPrefersRunningQueriesOverNewOnes(t *testing.T) { require.Equal(t, b, st.NodeID) // advance the pool again, the first query should continue its operation in preference to starting the new query - state = p.Advance(ctx, nil) + state = p.Advance(ctx, &EventPoolPoll{}) require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StatePoolQueryMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, queryID1, st.QueryID) require.Equal(t, c, st.NodeID) // advance the pool again, the first query is at capacity so the second query can start - state = p.Advance(ctx, nil) + state = p.Advance(ctx, &EventPoolPoll{}) require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StatePoolQueryMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, queryID2, st.QueryID) @@ -362,7 +362,7 @@ func TestPoolRespectsConcurrency(t *testing.T) { require.Equal(t, queryID1, stf.QueryID) // advancing pool again allows query 3 to start - state = p.Advance(ctx, nil) + state = p.Advance(ctx, &EventPoolPoll{}) require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StatePoolQueryMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, queryID3, st.QueryID) diff --git a/query/query.go b/query/query.go index cad8ff3..f637f21 100644 --- a/query/query.go +++ b/query/query.go @@ -373,6 +373,7 @@ func (q *Query[K, A]) onMessageFailure(ctx context.Context, node kad.NodeID[K]) switch st := ni.State.(type) { case *StateNodeWaiting: q.inFlight-- + q.stats.Failure++ case *StateNodeUnresponsive: // update node state to failed break diff --git a/routing/bootstrap.go b/routing/bootstrap.go new file mode 100644 index 0000000..5dc4135 --- /dev/null +++ b/routing/bootstrap.go @@ -0,0 +1,252 @@ +package routing + +import ( + "context" + "fmt" + "time" + + "github.com/benbjohnson/clock" + + "github.com/plprobelab/go-kademlia/kad" + "github.com/plprobelab/go-kademlia/kaderr" + "github.com/plprobelab/go-kademlia/network/address" + "github.com/plprobelab/go-kademlia/query" + "github.com/plprobelab/go-kademlia/util" +) + +type Bootstrap[K kad.Key[K], A kad.Address[A]] struct { + // self is the node id of the system the bootstrap is running on + self kad.NodeID[K] + + // qry is the query used by the bootstrap process + qry *query.Query[K, A] + + // cfg is a 
copy of the optional configuration supplied to the Bootstrap + cfg BootstrapConfig[K, A] +} + +// BootstrapConfig specifies optional configuration for a Bootstrap +type BootstrapConfig[K kad.Key[K], A kad.Address[A]] struct { + Timeout time.Duration // the time to wait before terminating a query that is not making progress + RequestConcurrency int // the maximum number of concurrent requests that each query may have in flight + RequestTimeout time.Duration // the timeout queries should use for contacting a single node + Clock clock.Clock // a clock that may be replaced by a mock when testing +} + +// Validate checks the configuration options and returns an error if any have invalid values. +func (cfg *BootstrapConfig[K, A]) Validate() error { + if cfg.Clock == nil { + return &kaderr.ConfigurationError{ + Component: "BootstrapConfig", + Err: fmt.Errorf("clock must not be nil"), + } + } + + if cfg.Timeout < 1 { + return &kaderr.ConfigurationError{ + Component: "BootstrapConfig", + Err: fmt.Errorf("timeout must be greater than zero"), + } + } + + if cfg.RequestConcurrency < 1 { + return &kaderr.ConfigurationError{ + Component: "BootstrapConfig", + Err: fmt.Errorf("request concurrency must be greater than zero"), + } + } + + if cfg.RequestTimeout < 1 { + return &kaderr.ConfigurationError{ + Component: "BootstrapConfig", + Err: fmt.Errorf("request timeout must be greater than zero"), + } + } + + return nil +} + +// DefaultBootstrapConfig returns the default configuration options for a Bootstrap. +// Options may be overridden before passing to NewBootstrap +func DefaultBootstrapConfig[K kad.Key[K], A kad.Address[A]]() *BootstrapConfig[K, A] { + return &BootstrapConfig[K, A]{ + Clock: clock.New(), // use standard time + Timeout: 5 * time.Minute, + RequestConcurrency: 3, + RequestTimeout: time.Minute, + } +} + +func NewBootstrap[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], cfg *BootstrapConfig[K, A]) (*Bootstrap[K, A], error) { + if cfg == nil { + cfg = DefaultBootstrapConfig[K, A]() + } else if err := cfg.Validate(); err != nil { + return nil, err + } + + return &Bootstrap[K, A]{ + self: self, + cfg: *cfg, + }, nil +} + +// Advance advances the state of the bootstrap by attempting to advance its query if running. 
+func (b *Bootstrap[K, A]) Advance(ctx context.Context, ev BootstrapEvent) BootstrapState { + ctx, span := util.StartSpan(ctx, "Bootstrap.Advance") + defer span.End() + + switch tev := ev.(type) { + case *EventBootstrapStart[K, A]: + // TODO: ignore start event if query is already in progress + iter := query.NewClosestNodesIter(b.self.Key()) + + qryCfg := query.DefaultQueryConfig[K]() + qryCfg.Clock = b.cfg.Clock + qryCfg.Concurrency = b.cfg.RequestConcurrency + qryCfg.RequestTimeout = b.cfg.RequestTimeout + + queryID := query.QueryID("bootstrap") + + qry, err := query.NewQuery[K](b.self, queryID, tev.ProtocolID, tev.Message, iter, tev.KnownClosestNodes, qryCfg) + if err != nil { + // TODO: don't panic + panic(err) + } + b.qry = qry + return b.advanceQuery(ctx, nil) + + case *EventBootstrapMessageResponse[K, A]: + return b.advanceQuery(ctx, &query.EventQueryMessageResponse[K, A]{ + NodeID: tev.NodeID, + Response: tev.Response, + }) + case *EventBootstrapMessageFailure[K]: + return b.advanceQuery(ctx, &query.EventQueryMessageFailure[K]{ + NodeID: tev.NodeID, + Error: tev.Error, + }) + + case *EventBootstrapPoll: + // ignore, nothing to do + default: + panic(fmt.Sprintf("unexpected event: %T", tev)) + } + + if b.qry != nil { + return b.advanceQuery(ctx, nil) + } + + return &StateBootstrapIdle{} +} + +func (b *Bootstrap[K, A]) advanceQuery(ctx context.Context, qev query.QueryEvent) BootstrapState { + state := b.qry.Advance(ctx, qev) + switch st := state.(type) { + case *query.StateQueryWaitingMessage[K, A]: + return &StateBootstrapMessage[K, A]{ + QueryID: st.QueryID, + Stats: st.Stats, + NodeID: st.NodeID, + ProtocolID: st.ProtocolID, + Message: st.Message, + } + case *query.StateQueryFinished: + return &StateBootstrapFinished{ + Stats: st.Stats, + } + case *query.StateQueryWaitingAtCapacity: + elapsed := b.cfg.Clock.Since(st.Stats.Start) + if elapsed > b.cfg.Timeout { + return &StateBootstrapTimeout{ + Stats: st.Stats, + } + } + return &StateBootstrapWaiting{ + Stats: st.Stats, + } + case *query.StateQueryWaitingWithCapacity: + elapsed := b.cfg.Clock.Since(st.Stats.Start) + if elapsed > b.cfg.Timeout { + return &StateBootstrapTimeout{ + Stats: st.Stats, + } + } + return &StateBootstrapWaiting{ + Stats: st.Stats, + } + default: + panic(fmt.Sprintf("unexpected state: %T", st)) + } +} + +// BootstrapState is the state of a bootstrap. +type BootstrapState interface { + bootstrapState() +} + +// StateBootstrapMessage indicates that the bootstrap query is waiting to message a node. +type StateBootstrapMessage[K kad.Key[K], A kad.Address[A]] struct { + QueryID query.QueryID + NodeID kad.NodeID[K] + ProtocolID address.ProtocolID + Message kad.Request[K, A] + Stats query.QueryStats +} + +// StateBootstrapIdle indicates that the bootstrap is not running its query. +type StateBootstrapIdle struct{} + +// StateBootstrapFinished indicates that the bootstrap has finished. +type StateBootstrapFinished struct { + Stats query.QueryStats +} + +// StateBootstrapTimeout indicates that the bootstrap query has timed out. +type StateBootstrapTimeout struct { + Stats query.QueryStats +} + +// StateBootstrapWaiting indicates that the bootstrap query is waiting for a response. +type StateBootstrapWaiting struct { + Stats query.QueryStats +} + +// bootstrapState() ensures that only Bootstrap states can be assigned to a BootstrapState. 
+func (*StateBootstrapMessage[K, A]) bootstrapState() {} +func (*StateBootstrapIdle) bootstrapState() {} +func (*StateBootstrapFinished) bootstrapState() {} +func (*StateBootstrapTimeout) bootstrapState() {} +func (*StateBootstrapWaiting) bootstrapState() {} + +// BootstrapEvent is an event intended to advance the state of a bootstrap. +type BootstrapEvent interface { + bootstrapEvent() +} + +// EventBootstrapPoll is an event that signals the bootstrap that it can perform housekeeping work such as time out queries. +type EventBootstrapPoll struct{} + +// EventBootstrapStart is an event that attempts to start a new bootstrap +type EventBootstrapStart[K kad.Key[K], A kad.Address[A]] struct { + ProtocolID address.ProtocolID + Message kad.Request[K, A] + KnownClosestNodes []kad.NodeID[K] +} + +// EventBootstrapMessageResponse notifies a bootstrap that a sent message has received a successful response. +type EventBootstrapMessageResponse[K kad.Key[K], A kad.Address[A]] struct { + NodeID kad.NodeID[K] // the node the message was sent to + Response kad.Response[K, A] // the message response sent by the node +} + +// EventBootstrapMessageFailure notifies a bootstrap that an attempt to send a message has failed. +type EventBootstrapMessageFailure[K kad.Key[K]] struct { + NodeID kad.NodeID[K] // the node the message was sent to + Error error // the error that caused the failure, if any +} + +// bootstrapEvent() ensures that only Bootstrap events can be assigned to the BootstrapEvent interface. +func (*EventBootstrapPoll) bootstrapEvent() {} +func (*EventBootstrapStart[K, A]) bootstrapEvent() {} +func (*EventBootstrapMessageResponse[K, A]) bootstrapEvent() {} +func (*EventBootstrapMessageFailure[K]) bootstrapEvent() {} diff --git a/routing/bootstrap_test.go b/routing/bootstrap_test.go new file mode 100644 index 0000000..43dddf7 --- /dev/null +++ b/routing/bootstrap_test.go @@ -0,0 +1,242 @@ +package routing + +import ( + "context" + "testing" + + "github.com/benbjohnson/clock" + "github.com/stretchr/testify/require" + + "github.com/plprobelab/go-kademlia/internal/kadtest" + "github.com/plprobelab/go-kademlia/kad" + "github.com/plprobelab/go-kademlia/key" + "github.com/plprobelab/go-kademlia/network/address" + "github.com/plprobelab/go-kademlia/query" +) + +func TestBootstrapConfigValidate(t *testing.T) { + t.Run("default is valid", func(t *testing.T) { + cfg := DefaultBootstrapConfig[key.Key8, kadtest.StrAddr]() + require.NoError(t, cfg.Validate()) + }) + + t.Run("clock is not nil", func(t *testing.T) { + cfg := DefaultBootstrapConfig[key.Key8, kadtest.StrAddr]() + cfg.Clock = nil + require.Error(t, cfg.Validate()) + }) + + t.Run("timeout positive", func(t *testing.T) { + cfg := DefaultBootstrapConfig[key.Key8, kadtest.StrAddr]() + cfg.Timeout = 0 + require.Error(t, cfg.Validate()) + cfg.Timeout = -1 + require.Error(t, cfg.Validate()) + }) + + t.Run("request concurrency positive", func(t *testing.T) { + cfg := DefaultBootstrapConfig[key.Key8, kadtest.StrAddr]() + cfg.RequestConcurrency = 0 + require.Error(t, cfg.Validate()) + cfg.RequestConcurrency = -1 + require.Error(t, cfg.Validate()) + }) + + t.Run("request timeout positive", func(t *testing.T) { + cfg := DefaultBootstrapConfig[key.Key8, kadtest.StrAddr]() + cfg.RequestTimeout = 0 + require.Error(t, cfg.Validate()) + cfg.RequestTimeout = -1 + require.Error(t, cfg.Validate()) + }) +} + +func TestBootstrapStartsIdle(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + cfg := DefaultBootstrapConfig[key.Key8, kadtest.StrAddr]() + 
cfg.Clock = clk + + self := kadtest.NewID(key.Key8(0)) + bs, err := NewBootstrap[key.Key8, kadtest.StrAddr](self, cfg) + require.NoError(t, err) + + state := bs.Advance(ctx, &EventBootstrapPoll{}) + require.IsType(t, &StateBootstrapIdle{}, state) +} + +func TestBootstrapStart(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + cfg := DefaultBootstrapConfig[key.Key8, kadtest.StrAddr]() + cfg.Clock = clk + + self := kadtest.NewID(key.Key8(0)) + bs, err := NewBootstrap[key.Key8, kadtest.StrAddr](self, cfg) + require.NoError(t, err) + + a := kadtest.NewID(key.Key8(0b00000100)) // 4 + + msg := kadtest.NewRequest("1", self.Key()) + protocolID := address.ProtocolID("testprotocol") + + // start the bootstrap + state := bs.Advance(ctx, &EventBootstrapStart[key.Key8, kadtest.StrAddr]{ + ProtocolID: protocolID, + Message: msg, + KnownClosestNodes: []kad.NodeID[key.Key8]{a}, + }) + require.IsType(t, &StateBootstrapMessage[key.Key8, kadtest.StrAddr]{}, state) + + // the query should attempt to contact the node it was given + st := state.(*StateBootstrapMessage[key.Key8, kadtest.StrAddr]) + + // the query should be the one just added + require.Equal(t, query.QueryID("bootstrap"), st.QueryID) + + // the query should attempt to contact the node it was given + require.Equal(t, a, st.NodeID) + + // with the correct protocol ID + require.Equal(t, protocolID, st.ProtocolID) + + // with the correct message + require.Equal(t, msg, st.Message) + + // now the bootstrap reports that it is waiting + state = bs.Advance(ctx, &EventBootstrapPoll{}) + require.IsType(t, &StateBootstrapWaiting{}, state) +} + +func TestBootstrapMessageResponse(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + cfg := DefaultBootstrapConfig[key.Key8, kadtest.StrAddr]() + cfg.Clock = clk + + self := kadtest.NewID(key.Key8(0)) + bs, err := NewBootstrap[key.Key8, kadtest.StrAddr](self, cfg) + require.NoError(t, err) + + a := kadtest.NewID(key.Key8(0b00000100)) // 4 + + msg := kadtest.NewRequest("1", self.Key()) + protocolID := address.ProtocolID("testprotocol") + + // start the bootstrap + state := bs.Advance(ctx, &EventBootstrapStart[key.Key8, kadtest.StrAddr]{ + ProtocolID: protocolID, + Message: msg, + KnownClosestNodes: []kad.NodeID[key.Key8]{a}, + }) + require.IsType(t, &StateBootstrapMessage[key.Key8, kadtest.StrAddr]{}, state) + + // the bootstrap should attempt to contact the node it was given + st := state.(*StateBootstrapMessage[key.Key8, kadtest.StrAddr]) + require.Equal(t, query.QueryID("bootstrap"), st.QueryID) + require.Equal(t, a, st.NodeID) + + // notify bootstrap that node was contacted successfully, but no closer nodes + state = bs.Advance(ctx, &EventBootstrapMessageResponse[key.Key8, kadtest.StrAddr]{ + NodeID: a, + }) + + // bootstrap should respond that its query has finished + require.IsType(t, &StateBootstrapFinished{}, state) + + stf := state.(*StateBootstrapFinished) + require.Equal(t, 1, stf.Stats.Requests) + require.Equal(t, 1, stf.Stats.Success) +} + +func TestBootstrapProgress(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + cfg := DefaultBootstrapConfig[key.Key8, kadtest.StrAddr]() + cfg.Clock = clk + cfg.RequestConcurrency = 3 // 1 less than the 4 nodes to be visited + + self := kadtest.NewID(key.Key8(0)) + bs, err := NewBootstrap[key.Key8, kadtest.StrAddr](self, cfg) + require.NoError(t, err) + + a := kadtest.NewID(key.Key8(0b00000100)) // 4 + b := kadtest.NewID(key.Key8(0b00001000)) // 8 + c := kadtest.NewID(key.Key8(0b00010000)) // 16 + d := 
kadtest.NewID(key.Key8(0b00100000)) // 32 + + // ensure the order of the known nodes + require.True(t, self.Key().Xor(a.Key()).Compare(self.Key().Xor(b.Key())) == -1) + require.True(t, self.Key().Xor(b.Key()).Compare(self.Key().Xor(c.Key())) == -1) + require.True(t, self.Key().Xor(c.Key()).Compare(self.Key().Xor(d.Key())) == -1) + + msg := kadtest.NewRequest("1", self.Key()) + protocolID := address.ProtocolID("testprotocol") + + // start the bootstrap + state := bs.Advance(ctx, &EventBootstrapStart[key.Key8, kadtest.StrAddr]{ + ProtocolID: protocolID, + Message: msg, + KnownClosestNodes: []kad.NodeID[key.Key8]{d, a, b, c}, + }) + + // the bootstrap should attempt to contact the closest node it was given + require.IsType(t, &StateBootstrapMessage[key.Key8, kadtest.StrAddr]{}, state) + st := state.(*StateBootstrapMessage[key.Key8, kadtest.StrAddr]) + require.Equal(t, query.QueryID("bootstrap"), st.QueryID) + require.Equal(t, a, st.NodeID) + + // next the bootstrap attempts to contact second nearest node + state = bs.Advance(ctx, &EventBootstrapPoll{}) + require.IsType(t, &StateBootstrapMessage[key.Key8, kadtest.StrAddr]{}, state) + st = state.(*StateBootstrapMessage[key.Key8, kadtest.StrAddr]) + require.Equal(t, b, st.NodeID) + + // next the bootstrap attempts to contact third nearest node + state = bs.Advance(ctx, &EventBootstrapPoll{}) + require.IsType(t, &StateBootstrapMessage[key.Key8, kadtest.StrAddr]{}, state) + st = state.(*StateBootstrapMessage[key.Key8, kadtest.StrAddr]) + require.Equal(t, c, st.NodeID) + + // now the bootstrap should be waiting since it is at request capacity + state = bs.Advance(ctx, &EventBootstrapPoll{}) + require.IsType(t, &StateBootstrapWaiting{}, state) + + // notify bootstrap that node was contacted successfully, but no closer nodes + state = bs.Advance(ctx, &EventBootstrapMessageResponse[key.Key8, kadtest.StrAddr]{ + NodeID: a, + }) + + // now the bootstrap has capacity to contact fourth nearest node + require.IsType(t, &StateBootstrapMessage[key.Key8, kadtest.StrAddr]{}, state) + st = state.(*StateBootstrapMessage[key.Key8, kadtest.StrAddr]) + require.Equal(t, d, st.NodeID) + + // notify bootstrap that a node was contacted successfully + state = bs.Advance(ctx, &EventBootstrapMessageResponse[key.Key8, kadtest.StrAddr]{ + NodeID: b, + }) + + // bootstrap should respond that it is waiting for messages + require.IsType(t, &StateBootstrapWaiting{}, state) + + // notify bootstrap that a node was contacted successfully + state = bs.Advance(ctx, &EventBootstrapMessageResponse[key.Key8, kadtest.StrAddr]{ + NodeID: c, + }) + + // bootstrap should respond that it is waiting for last message + require.IsType(t, &StateBootstrapWaiting{}, state) + + // notify bootstrap that the final node was contacted successfully + state = bs.Advance(ctx, &EventBootstrapMessageResponse[key.Key8, kadtest.StrAddr]{ + NodeID: d, + }) + + // bootstrap should respond that its query has finished + require.IsType(t, &StateBootstrapFinished{}, state) + + stf := state.(*StateBootstrapFinished) + require.Equal(t, 4, stf.Stats.Requests) + require.Equal(t, 4, stf.Stats.Success) +}
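Taken together, the intended call pattern for the new bootstrap flow is: seed it via Coordinator.Bootstrap, then keep driving RunOne until a KademliaBootstrapFinishedEvent is emitted; RunOne gives the bootstrap state machine priority, so user queries only progress once it reports idle. A minimal driving-loop sketch (the helper name, seed node and busy-poll pacing are illustrative, not part of this PR; in TestBootstrap the sim.LiteSimulator drives RunOne instead):

```go
// waitForBootstrap is a hypothetical helper sketching how a caller might
// drive the coordinator outside the simulator. findNodeFn is the same
// request constructor used by TestBootstrap; seed is any known peer.
func waitForBootstrap(ctx context.Context, c *Coordinator[key.Key8, kadtest.StrAddr], seed kad.NodeID[key.Key8]) error {
	if err := c.Bootstrap(ctx, []kad.NodeID[key.Key8]{seed}, findNodeFn); err != nil {
		return err
	}
	for {
		// RunOne advances the bootstrap state machine first and only
		// services the query pool once the bootstrap is idle.
		c.RunOne(ctx)
		select {
		case ev := <-c.Events():
			if _, ok := ev.(*KademliaBootstrapFinishedEvent); ok {
				return nil
			}
		case <-ctx.Done():
			return ctx.Err()
		default:
			// nothing ready yet; loop and advance again
		}
	}
}
```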