From accf5eaaab0fcc91cd5960acfa21ec6bf82406ff Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Wed, 16 Aug 2023 16:18:49 +0100 Subject: [PATCH] Add bootstrap state machine (#90) This adds a state machine for running the bootstrap query described in https://github.com/plprobelab/go-kademlia/issues/45. The state machine is very simple (it runs a query that attempts to find the self node), but it drives the design of the coordinator in a number of ways: - The coordinator now manages two state machines: bootstrap and user queries. It enforces the constraint that no user queries can be progressed while the bootstrap is running. This establishes the pattern for managing a set of state machines. - Priority is simple: the coordinator first attempts to advance the bootstrap state machine and only if it is idle, indicating the bootstrap has no further work, will it proceed to advance the query pool state machine. - This changes the design of the state machine. Previously the state machine reacted to an incoming event passed to the `Advance` method. However this causes complications in the presence of multiple state machines. What should happen if the bootstrap is waiting for a message response but the caller attempts to start a user query? The coordinator needs to remember the "add query" event until the bootstrap is complete, so that event would need to remain on the coordinator's queue. But the coordinator needs to read from the queue to detect if an incoming message response is available for the bootstrap, without removing the "add query" event. Thus we need separate queues for the two state machines. Rather than manage those in the coordinator, we give each state machine its own event queue. External callers enqueue events and the state machine dequeues the next one each time it attempts to advance state. - ~The above change leads to a simple interface for state machines: an Enqueue method for notifying a new event and an Advance method that returns the next state.~ - **update 2023-08-15**: The above change leads to a simple interface for state machines:an Advance method that accepts an event and returns the next state. - Coordinator methods like StartQuery and StopQuery now enqueue an event for query pool - A new Bootstrap method enqueues an event for bootstrap state machine - **update 2023-08-15**: the queues for the state machines are managed by the coordinator, which allows state machines to be more cleanly composed into hierarchies (for example, the state machine managing the routing table include queue will use a query pool state machine and this change eliminates the need to manage event queues of child state machines) There are still some ugly parts which I may be able to fix within the scope of this PR: - the coordinator implements a number of unused methods to conform to the scheduler.Scheduler interface. All that is needed is the RunOne method. - ~the name of the bootstrap query needs to be factored into a constant or remembered by the coordinator~ - coordinator now uses a separate callback to deal with bootstrap query instead of checking query id - ~events are action.Action interfaces so they can use the standard queue interface. The Run method is unused. The queue could simply be a channel or we could modify the queue interface to be parameterised by type, allowing us to have a queue of BootstrapEvents~ (**removed 2023-08-15**) - currently the bootstrap method expects a function that generates a FindNode request for the given node. FindNode is such a fundamental DHT operation that I think it should be provided as a method by the Endpoint Fixes #47 --- coord/coordinator.go | 433 ++++++++++++++++++++++-------------- coord/coordinator_test.go | 159 ++++++++++--- internal/kadtest/context.go | 24 ++ query/pool.go | 13 +- query/pool_test.go | 12 +- query/query.go | 1 + routing/bootstrap.go | 252 +++++++++++++++++++++ routing/bootstrap_test.go | 242 ++++++++++++++++++++ 8 files changed, 925 insertions(+), 211 deletions(-) create mode 100644 internal/kadtest/context.go create mode 100644 routing/bootstrap.go create mode 100644 routing/bootstrap_test.go diff --git a/coord/coordinator.go b/coord/coordinator.go index b15b050..3eef338 100644 --- a/coord/coordinator.go +++ b/coord/coordinator.go @@ -15,12 +15,49 @@ import ( "github.com/plprobelab/go-kademlia/network/address" "github.com/plprobelab/go-kademlia/network/endpoint" "github.com/plprobelab/go-kademlia/query" + "github.com/plprobelab/go-kademlia/routing" "github.com/plprobelab/go-kademlia/util" ) +// A StateMachine progresses through a set of states in response to transition events. +type StateMachine[S any, E any] interface { + // Advance advances the state of the state machine. + Advance(context.Context, E) S +} + +// eventQueue is a bounded, typed queue for events +// NOTE: this type is incompatible with the semantics of event.Queue which blocks on Dequeue +type eventQueue[E any] struct { + events chan E +} + +func newEventQueue[E any](capacity int) *eventQueue[E] { + return &eventQueue[E]{ + events: make(chan E, capacity), + } +} + +// Enqueue adds an event to the queue. It blocks if the queue is at capacity. +func (q *eventQueue[E]) Enqueue(ctx context.Context, e E) { + q.events <- e +} + +// Dequeue reads an event from the queue. It returns the event and a true value +// if an event was read or the zero value if the event type and false if no event +// was read. This method is non-blocking. +func (q *eventQueue[E]) Dequeue(ctx context.Context) (E, bool) { + select { + case e := <-q.events: + return e, true + default: + var v E + return v, false + } +} + // A Coordinator coordinates the state machines that comprise a Kademlia DHT -// Currently this is only queries but will expand to include other state machines such as routing table refresh, -// and reproviding. +// Currently this is only queries and bootstrapping but will expand to include other state machines such as +// routing table refresh, and reproviding. type Coordinator[K kad.Key[K], A kad.Address[A]] struct { // self is the node id of the system the coordinator is running on self kad.NodeID[K] @@ -28,7 +65,17 @@ type Coordinator[K kad.Key[K], A kad.Address[A]] struct { // cfg is a copy of the optional configuration supplied to the coordinator cfg Config - qp *query.Pool[K, A] + // pool is the query pool state machine, responsible for running user-submitted queries + pool StateMachine[query.PoolState, query.PoolEvent] + + // poolEvents is a fifo queue of events that are to be processed by the pool state machine + poolEvents *eventQueue[query.PoolEvent] + + // bootstrap is the bootstrap state machine, responsible for bootstrapping the routing table + bootstrap StateMachine[routing.BootstrapState, routing.BootstrapEvent] + + // bootstrapEvents is a fifo queue of events that are to be processed by the bootstrap state machine + bootstrapEvents *eventQueue[routing.BootstrapEvent] // rt is the routing table used to look up nodes by distance rt kad.RoutingTable[K, kad.NodeID[K]] @@ -36,9 +83,10 @@ type Coordinator[K kad.Key[K], A kad.Address[A]] struct { // ep is the message endpoint used to send requests ep endpoint.Endpoint[K, A] - peerstoreTTL time.Duration + // queue not used + queue event.EventQueue - queue event.EventQueue + // planner not used planner event.AwareActionPlanner outboundEvents chan KademliaEvent @@ -47,10 +95,15 @@ type Coordinator[K kad.Key[K], A kad.Address[A]] struct { const DefaultChanqueueCapacity = 1024 type Config struct { - // TODO: review if this is needed here PeerstoreTTL time.Duration // duration for which a peer is kept in the peerstore Clock clock.Clock // a clock that may replaced by a mock when testing + + QueryConcurrency int // the maximum number of queries that may be waiting for message responses at any one time + QueryTimeout time.Duration // the time to wait before terminating a query that is not making progress + + RequestConcurrency int // the maximum number of concurrent requests that each query may have in flight + RequestTimeout time.Duration // the timeout queries should use for contacting a single node } // Validate checks the configuration options and returns an error if any have invalid values. @@ -62,13 +115,43 @@ func (cfg *Config) Validate() error { } } + if cfg.QueryConcurrency < 1 { + return &kaderr.ConfigurationError{ + Component: "CoordinatorConfig", + Err: fmt.Errorf("query concurrency must be greater than zero"), + } + } + if cfg.QueryTimeout < 1 { + return &kaderr.ConfigurationError{ + Component: "CoordinatorConfig", + Err: fmt.Errorf("query timeout must be greater than zero"), + } + } + + if cfg.RequestConcurrency < 1 { + return &kaderr.ConfigurationError{ + Component: "CoordinatorConfig", + Err: fmt.Errorf("request concurrency must be greater than zero"), + } + } + + if cfg.RequestTimeout < 1 { + return &kaderr.ConfigurationError{ + Component: "CoordinatorConfig", + Err: fmt.Errorf("request timeout must be greater than zero"), + } + } return nil } func DefaultConfig() *Config { return &Config{ - Clock: clock.New(), // use standard time - PeerstoreTTL: 10 * time.Minute, + Clock: clock.New(), // use standard time + PeerstoreTTL: 10 * time.Minute, + QueryConcurrency: 3, + QueryTimeout: 5 * time.Minute, + RequestConcurrency: 3, + RequestTimeout: time.Minute, } } @@ -81,20 +164,38 @@ func NewCoordinator[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], ep endpo qpCfg := query.DefaultPoolConfig() qpCfg.Clock = cfg.Clock + qpCfg.Concurrency = cfg.QueryConcurrency + qpCfg.Timeout = cfg.QueryTimeout + qpCfg.QueryConcurrency = cfg.RequestConcurrency + qpCfg.RequestTimeout = cfg.RequestTimeout qp, err := query.NewPool[K, A](self, qpCfg) if err != nil { return nil, fmt.Errorf("query pool: %w", err) } + + bootstrapCfg := routing.DefaultBootstrapConfig[K, A]() + bootstrapCfg.Clock = cfg.Clock + bootstrapCfg.Timeout = cfg.QueryTimeout + bootstrapCfg.RequestConcurrency = cfg.RequestConcurrency + bootstrapCfg.RequestTimeout = cfg.RequestTimeout + + bootstrap, err := routing.NewBootstrap(self, bootstrapCfg) + if err != nil { + return nil, fmt.Errorf("query pool: %w", err) + } return &Coordinator[K, A]{ - self: self, - cfg: *cfg, - ep: ep, - rt: rt, - qp: qp, - outboundEvents: make(chan KademliaEvent, 20), - queue: event.NewChanQueue(DefaultChanqueueCapacity), - planner: event.NewSimplePlanner(cfg.Clock), + self: self, + cfg: *cfg, + ep: ep, + rt: rt, + pool: qp, + poolEvents: newEventQueue[query.PoolEvent](20), // 20 is abitrary, move to config + bootstrap: bootstrap, + bootstrapEvents: newEventQueue[routing.BootstrapEvent](20), // 20 is abitrary, move to config + outboundEvents: make(chan KademliaEvent, 20), + queue: event.NewChanQueue(DefaultChanqueueCapacity), + planner: event.NewSimplePlanner(cfg.Clock), }, nil } @@ -102,71 +203,51 @@ func (c *Coordinator[K, A]) Events() <-chan KademliaEvent { return c.outboundEvents } -func (c *Coordinator[K, A]) handleInboundEvent(ctx context.Context, ev interface{}) { - switch tev := ev.(type) { - case *eventUnroutablePeer[K]: - // TODO: remove from routing table - c.dispatchQueryPoolEvent(ctx, nil) +func (c *Coordinator[K, A]) RunOne(ctx context.Context) bool { + ctx, span := util.StartSpan(ctx, "Coordinator.RunOne") + defer span.End() - case *eventMessageFailed[K]: - qev := &query.EventPoolMessageFailure[K]{ - QueryID: tev.QueryID, - NodeID: tev.NodeID, - Error: tev.Error, - } + // Give the bootstrap state machine priority + // No queries can be run while a bootstrap is in progress + bev, ok := c.bootstrapEvents.Dequeue(ctx) + if !ok { + bev = &routing.EventBootstrapPoll{} + } - c.dispatchQueryPoolEvent(ctx, qev) + bstate := c.bootstrap.Advance(ctx, bev) + switch st := bstate.(type) { + case *routing.StateBootstrapMessage[K, A]: + c.sendBootstrapMessage(ctx, st.ProtocolID, st.NodeID, st.Message, st.QueryID, st.Stats) + return true - case *eventMessageResponse[K, A]: - if tev.Response != nil { - candidates := tev.Response.CloserNodes() - if len(candidates) > 0 { - // ignore error here - c.AddNodes(ctx, candidates) - } - } + case *routing.StateBootstrapWaiting: + // bootstrap waiting for a message response, don't proceed with other state machines + return false - // notify caller so they have chance to stop query - c.outboundEvents <- &KademliaOutboundQueryProgressedEvent[K, A]{ - NodeID: tev.NodeID, - QueryID: tev.QueryID, - Response: tev.Response, - Stats: tev.Stats, + case *routing.StateBootstrapFinished: + c.outboundEvents <- &KademliaBootstrapFinishedEvent{ + Stats: st.Stats, } + return true - qev := &query.EventPoolMessageResponse[K, A]{ - QueryID: tev.QueryID, - NodeID: tev.NodeID, - Response: tev.Response, - } - c.dispatchQueryPoolEvent(ctx, qev) - case *eventAddQuery[K, A]: - qev := &query.EventPoolAddQuery[K, A]{ - QueryID: tev.QueryID, - Target: tev.Target, - ProtocolID: tev.ProtocolID, - Message: tev.Message, - KnownClosestNodes: tev.KnownClosestPeers, - } - c.dispatchQueryPoolEvent(ctx, qev) - case *eventStopQuery[K]: - qev := &query.EventPoolStopQuery{ - QueryID: tev.QueryID, - } - c.dispatchQueryPoolEvent(ctx, qev) + case *routing.StateBootstrapIdle: + // bootstrap not running, can proceed to other state machines + break default: - panic(fmt.Sprintf("unexpected event: %T", tev)) + panic(fmt.Sprintf("unexpected bootstrap state: %T", st)) } -} -func (c *Coordinator[K, A]) dispatchQueryPoolEvent(ctx context.Context, ev query.PoolEvent) { - ctx, span := util.StartSpan(ctx, "Coordinator.dispatchQueryPoolEvent") - defer span.End() - // attempt to advance the query state machine - state := c.qp.Advance(ctx, ev) + // Attempt to advance an outbound query + pev, ok := c.poolEvents.Dequeue(ctx) + if !ok { + pev = &query.EventPoolPoll{} + } + + state := c.pool.Advance(ctx, pev) switch st := state.(type) { case *query.StatePoolQueryMessage[K, A]: - c.attemptSendMessage(ctx, st.ProtocolID, st.NodeID, st.Message, st.QueryID, st.Stats) + c.sendQueryMessage(ctx, st.ProtocolID, st.NodeID, st.Message, st.QueryID, st.Stats) + return true case *query.StatePoolWaitingAtCapacity: // TODO case *query.StatePoolWaitingWithCapacity: @@ -176,6 +257,7 @@ func (c *Coordinator[K, A]) dispatchQueryPoolEvent(ctx context.Context, ev query QueryID: st.QueryID, Stats: st.Stats, } + return true // TODO case *query.StatePoolQueryTimeout: @@ -183,28 +265,83 @@ func (c *Coordinator[K, A]) dispatchQueryPoolEvent(ctx context.Context, ev query case *query.StatePoolIdle: // TODO default: - panic(fmt.Sprintf("unexpected state: %T", st)) + panic(fmt.Sprintf("unexpected pool state: %T", st)) } + + return false } -func (c *Coordinator[K, A]) attemptSendMessage(ctx context.Context, protoID address.ProtocolID, to kad.NodeID[K], msg kad.Request[K, A], queryID query.QueryID, stats query.QueryStats) { - ctx, span := util.StartSpan(ctx, "Coordinator.attemptSendMessage") +func (c *Coordinator[K, A]) sendQueryMessage(ctx context.Context, protoID address.ProtocolID, to kad.NodeID[K], msg kad.Request[K, A], queryID query.QueryID, stats query.QueryStats) { + ctx, span := util.StartSpan(ctx, "Coordinator.sendQueryMessage") defer span.End() onSendError := func(ctx context.Context, err error) { if errors.Is(err, endpoint.ErrCannotConnect) { // here we can notify that the peer is unroutable, which would feed into peerstore and routing table - c.queue.Enqueue(ctx, &eventUnroutablePeer[K]{ - NodeID: to, - }) + // TODO: remove from routing table return } - c.queue.Enqueue(ctx, &eventMessageFailed[K]{ + + qev := &query.EventPoolMessageFailure[K]{ NodeID: to, QueryID: queryID, - Stats: stats, Error: err, - }) + } + c.poolEvents.Enqueue(ctx, qev) + } + + onMessageResponse := func(ctx context.Context, resp kad.Response[K, A], err error) { + if err != nil { + onSendError(ctx, err) + return + } + + if resp != nil { + candidates := resp.CloserNodes() + if len(candidates) > 0 { + // ignore error here + c.AddNodes(ctx, candidates) + } + } + + // notify caller so they have chance to stop query + c.outboundEvents <- &KademliaOutboundQueryProgressedEvent[K, A]{ + NodeID: to, + QueryID: queryID, + Response: resp, + Stats: stats, + } + + qev := &query.EventPoolMessageResponse[K, A]{ + NodeID: to, + QueryID: queryID, + Response: resp, + } + c.poolEvents.Enqueue(ctx, qev) + } + + err := c.ep.SendRequestHandleResponse(ctx, protoID, to, msg, msg.EmptyResponse(), 0, onMessageResponse) + if err != nil { + onSendError(ctx, err) + } +} + +func (c *Coordinator[K, A]) sendBootstrapMessage(ctx context.Context, protoID address.ProtocolID, to kad.NodeID[K], msg kad.Request[K, A], queryID query.QueryID, stats query.QueryStats) { + ctx, span := util.StartSpan(ctx, "Coordinator.sendBootstrapMessage") + defer span.End() + + onSendError := func(ctx context.Context, err error) { + if errors.Is(err, endpoint.ErrCannotConnect) { + // here we can notify that the peer is unroutable, which would feed into peerstore and routing table + // TODO: remove from routing table + return + } + + bev := &routing.EventBootstrapMessageFailure[K]{ + NodeID: to, + Error: err, + } + c.bootstrapEvents.Enqueue(ctx, bev) } onMessageResponse := func(ctx context.Context, resp kad.Response[K, A], err error) { @@ -212,12 +349,28 @@ func (c *Coordinator[K, A]) attemptSendMessage(ctx context.Context, protoID addr onSendError(ctx, err) return } - c.queue.Enqueue(ctx, &eventMessageResponse[K, A]{ + + if resp != nil { + candidates := resp.CloserNodes() + if len(candidates) > 0 { + // ignore error here + c.AddNodes(ctx, candidates) + } + } + + // notify caller so they have chance to stop query + c.outboundEvents <- &KademliaOutboundQueryProgressedEvent[K, A]{ NodeID: to, QueryID: queryID, Response: resp, Stats: stats, - }) + } + + bev := &routing.EventBootstrapMessageResponse[K, A]{ + NodeID: to, + Response: resp, + } + c.bootstrapEvents.Enqueue(ctx, bev) } err := c.ep.SendRequestHandleResponse(ctx, protoID, to, msg, msg.EmptyResponse(), 0, onMessageResponse) @@ -229,27 +382,22 @@ func (c *Coordinator[K, A]) attemptSendMessage(ctx context.Context, protoID addr func (c *Coordinator[K, A]) StartQuery(ctx context.Context, queryID query.QueryID, protocolID address.ProtocolID, msg kad.Request[K, A]) error { knownClosestPeers := c.rt.NearestNodes(msg.Target(), 20) - ev := &eventAddQuery[K, A]{ + qev := &query.EventPoolAddQuery[K, A]{ QueryID: queryID, Target: msg.Target(), ProtocolID: protocolID, Message: msg, - KnownClosestPeers: knownClosestPeers, + KnownClosestNodes: knownClosestPeers, } - - c.queue.Enqueue(ctx, ev) - // c.inboundEvents <- ev - + c.poolEvents.Enqueue(ctx, qev) return nil } func (c *Coordinator[K, A]) StopQuery(ctx context.Context, queryID query.QueryID) error { - ev := &eventStopQuery[K]{ + qev := &query.EventPoolStopQuery{ QueryID: queryID, } - - c.queue.Enqueue(ctx, ev) - // c.inboundEvents <- ev + c.poolEvents.Enqueue(ctx, qev) return nil } @@ -261,7 +409,7 @@ func (c *Coordinator[K, A]) AddNodes(ctx context.Context, infos []kad.NodeInfo[K continue } isNew := c.rt.AddNode(info.ID()) - c.ep.MaybeAddToPeerstore(ctx, info, c.peerstoreTTL) + c.ep.MaybeAddToPeerstore(ctx, info, c.cfg.PeerstoreTTL) if isNew { c.outboundEvents <- &KademliaRoutingUpdatedEvent[K, A]{ @@ -273,6 +421,26 @@ func (c *Coordinator[K, A]) AddNodes(ctx context.Context, infos []kad.NodeInfo[K return nil } +// FindNodeRequestFunc is a function that creates a request to find the supplied node id +// TODO: consider this being a first class method of the Endpoint +type FindNodeRequestFunc[K kad.Key[K], A kad.Address[A]] func(kad.NodeID[K]) (address.ProtocolID, kad.Request[K, A]) + +// Bootstrap instructs the coordinator to begin bootstrapping the routing table. +// While bootstrap is in progress, no other queries will make progress. +func (c *Coordinator[K, A]) Bootstrap(ctx context.Context, seeds []kad.NodeID[K], fn FindNodeRequestFunc[K, A]) error { + protoID, msg := fn(c.self) + + bev := &routing.EventBootstrapStart[K, A]{ + ProtocolID: protoID, + Message: msg, + KnownClosestNodes: seeds, + } + + c.bootstrapEvents.Enqueue(ctx, bev) + + return nil +} + // Kademlia events emitted by the Coordinator, intended for consumption by clients of the package type KademliaEvent interface { @@ -304,62 +472,19 @@ type KademliaUnroutablePeerEvent[K kad.Key[K]] struct{} type KademliaRoutablePeerEvent[K kad.Key[K]] struct{} +// KademliaBootstrapFinishedEvent is emitted by the coordinator when a bootstrap has finished, either through +// running to completion or by being canceled. +type KademliaBootstrapFinishedEvent struct { + Stats query.QueryStats +} + // kademliaEvent() ensures that only Kademlia events can be assigned to a KademliaEvent. func (*KademliaRoutingUpdatedEvent[K, A]) kademliaEvent() {} func (*KademliaOutboundQueryProgressedEvent[K, A]) kademliaEvent() {} func (*KademliaUnroutablePeerEvent[K]) kademliaEvent() {} func (*KademliaRoutablePeerEvent[K]) kademliaEvent() {} func (*KademliaOutboundQueryFinishedEvent) kademliaEvent() {} - -// Internal events for the Coordiinator - -type coordinatorInternalEvent interface { - coordinatorInternalEvent() - Run(ctx context.Context) -} - -type eventUnroutablePeer[K kad.Key[K]] struct { - NodeID kad.NodeID[K] -} - -type eventMessageFailed[K kad.Key[K]] struct { - NodeID kad.NodeID[K] // the node the message was sent to - QueryID query.QueryID // the id of the query that sent the message - Stats query.QueryStats // stats for the query sending the message - Error error // the error that caused the failure, if any -} - -type eventMessageResponse[K kad.Key[K], A kad.Address[A]] struct { - NodeID kad.NodeID[K] // the node the message was sent to - QueryID query.QueryID // the id of the query that sent the message - Response kad.Response[K, A] // the message response sent by the node - Stats query.QueryStats // stats for the query sending the message -} - -type eventAddQuery[K kad.Key[K], A kad.Address[A]] struct { - QueryID query.QueryID - Target K - ProtocolID address.ProtocolID - Message kad.Request[K, A] - KnownClosestPeers []kad.NodeID[K] -} - -type eventStopQuery[K kad.Key[K]] struct { - QueryID query.QueryID -} - -// coordinatorInternalEvent() ensures that only an internal coordinator event can be assigned to the coordinatorInternalEvent interface. -func (*eventUnroutablePeer[K]) coordinatorInternalEvent() {} -func (*eventMessageFailed[K]) coordinatorInternalEvent() {} -func (*eventMessageResponse[K, A]) coordinatorInternalEvent() {} -func (*eventAddQuery[K, A]) coordinatorInternalEvent() {} -func (*eventStopQuery[K]) coordinatorInternalEvent() {} - -func (*eventUnroutablePeer[K]) Run(context.Context) {} -func (*eventMessageFailed[K]) Run(context.Context) {} -func (*eventMessageResponse[K, A]) Run(context.Context) {} -func (*eventAddQuery[K, A]) Run(context.Context) {} -func (*eventStopQuery[K]) Run(context.Context) {} +func (*KademliaBootstrapFinishedEvent) kademliaEvent() {} // var _ scheduler.Scheduler = (*Coordinator[key.Key8])(nil) func (c *Coordinator[K, A]) Clock() clock.Clock { @@ -382,31 +507,9 @@ func (c *Coordinator[K, A]) RemovePlannedAction(ctx context.Context, a event.Pla return c.planner.RemoveAction(ctx, a) } -func (c *Coordinator[K, A]) RunOne(ctx context.Context) bool { - c.moveOverdueActions(ctx) - if a := c.queue.Dequeue(ctx); a != nil { - c.handleInboundEvent(ctx, a) - return true - } - return false -} - -// moveOverdueActions moves all overdue actions from the planner to the queue. -func (c *Coordinator[K, A]) moveOverdueActions(ctx context.Context) { - overdue := c.planner.PopOverdueActions(ctx) - - event.EnqueueMany(ctx, c.queue, overdue) -} - // NextActionTime returns the time of the next action to run, or the current // time if there are actions to be run in the queue, or util.MaxTime if there // are no scheduled to run. func (c *Coordinator[K, A]) NextActionTime(ctx context.Context) time.Time { - c.moveOverdueActions(ctx) - nextScheduled := c.planner.NextActionTime(ctx) - - if !event.Empty(c.queue) { - return c.cfg.Clock.Now() - } - return nextScheduled + return c.cfg.Clock.Now() } diff --git a/coord/coordinator_test.go b/coord/coordinator_test.go index 0e2a341..85c94f4 100644 --- a/coord/coordinator_test.go +++ b/coord/coordinator_test.go @@ -22,14 +22,6 @@ import ( "github.com/plprobelab/go-kademlia/sim" ) -var ( - _ coordinatorInternalEvent = &eventUnroutablePeer[key.Key8]{} - _ coordinatorInternalEvent = &eventMessageFailed[key.Key8]{} - _ coordinatorInternalEvent = &eventMessageResponse[key.Key8, kadtest.StrAddr]{} - _ coordinatorInternalEvent = &eventAddQuery[key.Key8, kadtest.StrAddr]{} - _ coordinatorInternalEvent = &eventStopQuery[key.Key8]{} -) - func setupSimulation(t *testing.T, ctx context.Context) ([]kad.NodeInfo[key.Key8, kadtest.StrAddr], []*sim.Endpoint[key.Key8, kadtest.StrAddr], []kad.RoutingTable[key.Key8, kad.NodeID[key.Key8]], *sim.LiteSimulator) { // create node identifiers nodeCount := 4 @@ -116,10 +108,12 @@ const peerstoreTTL = 10 * time.Minute var protoID = address.ProtocolID("/statemachine/1.0.0") // protocol ID for the test // expectEventType selects on the event channel until an event of the expected type is sent. -func expectEventType(ctx context.Context, events <-chan KademliaEvent, expected KademliaEvent) (KademliaEvent, error) { +func expectEventType(t *testing.T, ctx context.Context, events <-chan KademliaEvent, expected KademliaEvent) (KademliaEvent, error) { + t.Helper() for { select { case ev := <-events: + t.Logf("saw event: %T\n", ev) if reflect.TypeOf(ev) == reflect.TypeOf(expected) { return ev, nil } @@ -129,23 +123,6 @@ func expectEventType(ctx context.Context, events <-chan KademliaEvent, expected } } -// Ctx returns a Context and a CancelFunc. The context will be -// cancelled just before the test binary deadline (as -// specified by the -timeout flag when running the test). The -// CancelFunc may be called to cancel the context earlier than -// the deadline. -func Ctx(t *testing.T) (context.Context, context.CancelFunc) { - t.Helper() - - deadline, ok := t.Deadline() - if !ok { - deadline = time.Now().Add(time.Minute) - } else { - deadline = deadline.Add(-time.Second) - } - return context.WithDeadline(context.Background(), deadline) -} - func TestConfigValidate(t *testing.T) { t.Run("default is valid", func(t *testing.T) { cfg := DefaultConfig() @@ -157,10 +134,42 @@ func TestConfigValidate(t *testing.T) { cfg.Clock = nil require.Error(t, cfg.Validate()) }) + + t.Run("query concurrency positive", func(t *testing.T) { + cfg := DefaultConfig() + cfg.QueryConcurrency = 0 + require.Error(t, cfg.Validate()) + cfg.QueryConcurrency = -1 + require.Error(t, cfg.Validate()) + }) + + t.Run("query timeout positive", func(t *testing.T) { + cfg := DefaultConfig() + cfg.QueryTimeout = 0 + require.Error(t, cfg.Validate()) + cfg.QueryTimeout = -1 + require.Error(t, cfg.Validate()) + }) + + t.Run("request concurrency positive", func(t *testing.T) { + cfg := DefaultConfig() + cfg.RequestConcurrency = 0 + require.Error(t, cfg.Validate()) + cfg.QueryConcurrency = -1 + require.Error(t, cfg.Validate()) + }) + + t.Run("request timeout positive", func(t *testing.T) { + cfg := DefaultConfig() + cfg.RequestTimeout = 0 + require.Error(t, cfg.Validate()) + cfg.RequestTimeout = -1 + require.Error(t, cfg.Validate()) + }) } func TestExhaustiveQuery(t *testing.T) { - ctx, cancel := Ctx(t) + ctx, cancel := kadtest.Ctx(t) defer cancel() nodes, eps, rts, siml := setupSimulation(t, ctx) @@ -201,7 +210,7 @@ func TestExhaustiveQuery(t *testing.T) { } // the query run by the coordinator should have received a response from nodes[1] - ev, err := expectEventType(ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{}) + ev, err := expectEventType(t, ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{}) require.NoError(t, err) tev := ev.(*KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]) @@ -209,7 +218,7 @@ func TestExhaustiveQuery(t *testing.T) { require.Equal(t, queryID, tev.QueryID) // the query run by the coordinator should have received a response from nodes[2] - ev, err = expectEventType(ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{}) + ev, err = expectEventType(t, ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{}) require.NoError(t, err) tev = ev.(*KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]) @@ -217,7 +226,7 @@ func TestExhaustiveQuery(t *testing.T) { require.Equal(t, queryID, tev.QueryID) // the query run by the coordinator should have received a response from nodes[3] - ev, err = expectEventType(ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{}) + ev, err = expectEventType(t, ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{}) require.NoError(t, err) tev = ev.(*KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]) @@ -225,7 +234,7 @@ func TestExhaustiveQuery(t *testing.T) { require.Equal(t, queryID, tev.QueryID) // the query run by the coordinator should have completed - ev, err = expectEventType(ctx, events, &KademliaOutboundQueryFinishedEvent{}) + ev, err = expectEventType(t, ctx, events, &KademliaOutboundQueryFinishedEvent{}) require.NoError(t, err) require.IsType(t, &KademliaOutboundQueryFinishedEvent{}, ev) @@ -237,7 +246,7 @@ func TestExhaustiveQuery(t *testing.T) { } func TestRoutingUpdatedEventEmittedForCloserNodes(t *testing.T) { - ctx, cancel := Ctx(t) + ctx, cancel := kadtest.Ctx(t) defer cancel() nodes, eps, rts, siml := setupSimulation(t, ctx) @@ -279,7 +288,7 @@ func TestRoutingUpdatedEventEmittedForCloserNodes(t *testing.T) { // the query run by the coordinator should have received a response from nodes[1] with closer nodes // nodes[0] and nodes[2] which should trigger a routing table update - ev, err := expectEventType(ctx, events, &KademliaRoutingUpdatedEvent[key.Key8, kadtest.StrAddr]{}) + ev, err := expectEventType(t, ctx, events, &KademliaRoutingUpdatedEvent[key.Key8, kadtest.StrAddr]{}) require.NoError(t, err) tev := ev.(*KademliaRoutingUpdatedEvent[key.Key8, kadtest.StrAddr]) @@ -289,13 +298,93 @@ func TestRoutingUpdatedEventEmittedForCloserNodes(t *testing.T) { // the query continues and should have received a response from nodes[2] with closer nodes // nodes[1] and nodes[3] which should trigger a routing table update - ev, err = expectEventType(ctx, events, &KademliaRoutingUpdatedEvent[key.Key8, kadtest.StrAddr]{}) + ev, err = expectEventType(t, ctx, events, &KademliaRoutingUpdatedEvent[key.Key8, kadtest.StrAddr]{}) require.NoError(t, err) tev = ev.(*KademliaRoutingUpdatedEvent[key.Key8, kadtest.StrAddr]) require.Equal(t, nodes[3].ID(), tev.NodeInfo.ID()) // the query run by the coordinator should have completed - _, err = expectEventType(ctx, events, &KademliaOutboundQueryFinishedEvent{}) + _, err = expectEventType(t, ctx, events, &KademliaOutboundQueryFinishedEvent{}) require.NoError(t, err) } + +var findNodeFn = func(n kad.NodeID[key.Key8]) (address.ProtocolID, kad.Request[key.Key8, kadtest.StrAddr]) { + return protoID, sim.NewRequest[key.Key8, kadtest.StrAddr](n.Key()) +} + +func TestBootstrap(t *testing.T) { + ctx, cancel := kadtest.Ctx(t) + defer cancel() + + nodes, eps, rts, siml := setupSimulation(t, ctx) + + clk := siml.Clock() + + ccfg := DefaultConfig() + ccfg.Clock = clk + ccfg.PeerstoreTTL = peerstoreTTL + + go func(ctx context.Context) { + for { + select { + case <-time.After(10 * time.Millisecond): + siml.Run(ctx) + case <-ctx.Done(): + return + } + } + }(ctx) + + self := nodes[0].ID() + c, err := NewCoordinator[key.Key8, kadtest.StrAddr](self, eps[0], rts[0], ccfg) + if err != nil { + log.Fatalf("unexpected error creating coordinator: %v", err) + } + siml.Add(c) + events := c.Events() + + queryID := query.QueryID("bootstrap") + + seeds := []kad.NodeID[key.Key8]{ + nodes[1].ID(), + } + err = c.Bootstrap(ctx, seeds, findNodeFn) + if err != nil { + t.Fatalf("failed to initiate bootstrap: %v", err) + } + + // the query run by the coordinator should have received a response from nodes[1] + ev, err := expectEventType(t, ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{}) + require.NoError(t, err) + + tev := ev.(*KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]) + require.Equal(t, nodes[1].ID(), tev.NodeID) + require.Equal(t, queryID, tev.QueryID) + + // the query run by the coordinator should have received a response from nodes[2] + ev, err = expectEventType(t, ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{}) + require.NoError(t, err) + + tev = ev.(*KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]) + require.Equal(t, nodes[2].ID(), tev.NodeID) + require.Equal(t, queryID, tev.QueryID) + + // the query run by the coordinator should have received a response from nodes[3] + ev, err = expectEventType(t, ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{}) + require.NoError(t, err) + + tev = ev.(*KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]) + require.Equal(t, nodes[3].ID(), tev.NodeID) + require.Equal(t, queryID, tev.QueryID) + + // the query run by the coordinator should have completed + ev, err = expectEventType(t, ctx, events, &KademliaBootstrapFinishedEvent{}) + require.NoError(t, err) + + require.IsType(t, &KademliaBootstrapFinishedEvent{}, ev) + tevf := ev.(*KademliaBootstrapFinishedEvent) + require.Equal(t, 3, tevf.Stats.Requests) + require.Equal(t, 3, tevf.Stats.Success) + require.Equal(t, 0, tevf.Stats.Failure) +} diff --git a/internal/kadtest/context.go b/internal/kadtest/context.go new file mode 100644 index 0000000..e11e032 --- /dev/null +++ b/internal/kadtest/context.go @@ -0,0 +1,24 @@ +package kadtest + +import ( + "context" + "testing" + "time" +) + +// Ctx returns a Context and a CancelFunc. The context will be +// cancelled just before the test binary deadline (as +// specified by the -timeout flag when running the test). The +// CancelFunc may be called to cancel the context earlier than +// the deadline. +func Ctx(t *testing.T) (context.Context, context.CancelFunc) { + t.Helper() + + deadline, ok := t.Deadline() + if !ok { + deadline = time.Now().Add(time.Minute) + } else { + deadline = deadline.Add(-time.Second) + } + return context.WithDeadline(context.Background(), deadline) +} diff --git a/query/pool.go b/query/pool.go index 7030bad..7e6a469 100644 --- a/query/pool.go +++ b/query/pool.go @@ -154,9 +154,8 @@ func (p *Pool[K, A]) Advance(ctx context.Context, ev PoolEvent) PoolState { } eventQueryID = qry.id } - case nil: - // TEMPORARY: no event to process - // TODO: introduce EventPoolPoll? + case *EventPoolPoll: + // no event to process default: panic(fmt.Sprintf("unexpected event: %T", tev)) } @@ -279,7 +278,7 @@ type PoolState interface { // StatePoolIdle indicates that the pool is idle, i.e. there are no queries to process. type StatePoolIdle struct{} -// StatePoolQueryMessage indicates that at a query is waiting to message a node. +// StatePoolQueryMessage indicates that a pool query is waiting to message a node. type StatePoolQueryMessage[K kad.Key[K], A kad.Address[A]] struct { QueryID QueryID NodeID kad.NodeID[K] @@ -302,7 +301,7 @@ type StatePoolQueryFinished struct { Stats QueryStats } -// StatePoolQueryTimeout indicates that at a query has timed out. +// StatePoolQueryTimeout indicates that a query has timed out. type StatePoolQueryTimeout struct { QueryID QueryID Stats QueryStats @@ -349,8 +348,12 @@ type EventPoolMessageFailure[K kad.Key[K]] struct { Error error // the error that caused the failure, if any } +// EventPoolPoll is an event that signals the pool that it can perform housekeeping work such as time out queries. +type EventPoolPoll struct{} + // poolEvent() ensures that only Pool events can be assigned to the PoolEvent interface. func (*EventPoolAddQuery[K, A]) poolEvent() {} func (*EventPoolStopQuery) poolEvent() {} func (*EventPoolMessageResponse[K, A]) poolEvent() {} func (*EventPoolMessageFailure[K]) poolEvent() {} +func (*EventPoolPoll) poolEvent() {} diff --git a/query/pool_test.go b/query/pool_test.go index ea96184..3e8da96 100644 --- a/query/pool_test.go +++ b/query/pool_test.go @@ -76,7 +76,7 @@ func TestPoolStartsIdle(t *testing.T) { p, err := NewPool[key.Key8, kadtest.StrAddr](self, cfg) require.NoError(t, err) - state := p.Advance(ctx, nil) + state := p.Advance(ctx, &EventPoolPoll{}) require.IsType(t, &StatePoolIdle{}, state) } @@ -90,7 +90,7 @@ func TestPoolStopWhenNoQueries(t *testing.T) { p, err := NewPool[key.Key8, kadtest.StrAddr](self, cfg) require.NoError(t, err) - state := p.Advance(ctx, &EventPoolStopQuery{}) + state := p.Advance(ctx, &EventPoolPoll{}) require.IsType(t, &StatePoolIdle{}, state) } @@ -137,7 +137,7 @@ func TestPoolAddQueryStartsIfCapacity(t *testing.T) { require.Equal(t, msg, st.Message) // now the pool reports that it is waiting - state = p.Advance(ctx, nil) + state = p.Advance(ctx, &EventPoolPoll{}) require.IsType(t, &StatePoolWaitingWithCapacity{}, state) } @@ -244,14 +244,14 @@ func TestPoolPrefersRunningQueriesOverNewOnes(t *testing.T) { require.Equal(t, b, st.NodeID) // advance the pool again, the first query should continue its operation in preference to starting the new query - state = p.Advance(ctx, nil) + state = p.Advance(ctx, &EventPoolPoll{}) require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StatePoolQueryMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, queryID1, st.QueryID) require.Equal(t, c, st.NodeID) // advance the pool again, the first query is at capacity so the second query can start - state = p.Advance(ctx, nil) + state = p.Advance(ctx, &EventPoolPoll{}) require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StatePoolQueryMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, queryID2, st.QueryID) @@ -362,7 +362,7 @@ func TestPoolRespectsConcurrency(t *testing.T) { require.Equal(t, queryID1, stf.QueryID) // advancing pool again allows query 3 to start - state = p.Advance(ctx, nil) + state = p.Advance(ctx, &EventPoolPoll{}) require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state) st = state.(*StatePoolQueryMessage[key.Key8, kadtest.StrAddr]) require.Equal(t, queryID3, st.QueryID) diff --git a/query/query.go b/query/query.go index cad8ff3..f637f21 100644 --- a/query/query.go +++ b/query/query.go @@ -373,6 +373,7 @@ func (q *Query[K, A]) onMessageFailure(ctx context.Context, node kad.NodeID[K]) switch st := ni.State.(type) { case *StateNodeWaiting: q.inFlight-- + q.stats.Failure++ case *StateNodeUnresponsive: // update node state to failed break diff --git a/routing/bootstrap.go b/routing/bootstrap.go new file mode 100644 index 0000000..5dc4135 --- /dev/null +++ b/routing/bootstrap.go @@ -0,0 +1,252 @@ +package routing + +import ( + "context" + "fmt" + "time" + + "github.com/benbjohnson/clock" + + "github.com/plprobelab/go-kademlia/kad" + "github.com/plprobelab/go-kademlia/kaderr" + "github.com/plprobelab/go-kademlia/network/address" + "github.com/plprobelab/go-kademlia/query" + "github.com/plprobelab/go-kademlia/util" +) + +type Bootstrap[K kad.Key[K], A kad.Address[A]] struct { + // self is the node id of the system the bootstrap is running on + self kad.NodeID[K] + + // qry is the query used by the bootstrap process + qry *query.Query[K, A] + + // cfg is a copy of the optional configuration supplied to the Bootstrap + cfg BootstrapConfig[K, A] +} + +// BootstrapConfig specifies optional configuration for a Bootstrap +type BootstrapConfig[K kad.Key[K], A kad.Address[A]] struct { + Timeout time.Duration // the time to wait before terminating a query that is not making progress + RequestConcurrency int // the maximum number of concurrent requests that each query may have in flight + RequestTimeout time.Duration // the timeout queries should use for contacting a single node + Clock clock.Clock // a clock that may replaced by a mock when testing +} + +// Validate checks the configuration options and returns an error if any have invalid values. +func (cfg *BootstrapConfig[K, A]) Validate() error { + if cfg.Clock == nil { + return &kaderr.ConfigurationError{ + Component: "BootstrapConfig", + Err: fmt.Errorf("clock must not be nil"), + } + } + + if cfg.Timeout < 1 { + return &kaderr.ConfigurationError{ + Component: "BootstrapConfig", + Err: fmt.Errorf("timeout must be greater than zero"), + } + } + + if cfg.RequestConcurrency < 1 { + return &kaderr.ConfigurationError{ + Component: "BootstrapConfig", + Err: fmt.Errorf("request concurrency must be greater than zero"), + } + } + + if cfg.RequestTimeout < 1 { + return &kaderr.ConfigurationError{ + Component: "BootstrapConfig", + Err: fmt.Errorf("request timeout must be greater than zero"), + } + } + + return nil +} + +// DefaultBootstrapConfig returns the default configuration options for a Bootstrap. +// Options may be overridden before passing to NewBootstrap +func DefaultBootstrapConfig[K kad.Key[K], A kad.Address[A]]() *BootstrapConfig[K, A] { + return &BootstrapConfig[K, A]{ + Clock: clock.New(), // use standard time + Timeout: 5 * time.Minute, + RequestConcurrency: 3, + RequestTimeout: time.Minute, + } +} + +func NewBootstrap[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], cfg *BootstrapConfig[K, A]) (*Bootstrap[K, A], error) { + if cfg == nil { + cfg = DefaultBootstrapConfig[K, A]() + } else if err := cfg.Validate(); err != nil { + return nil, err + } + + return &Bootstrap[K, A]{ + self: self, + cfg: *cfg, + }, nil +} + +// Advance advances the state of the bootstrap by attempting to advance its query if running. +func (b *Bootstrap[K, A]) Advance(ctx context.Context, ev BootstrapEvent) BootstrapState { + ctx, span := util.StartSpan(ctx, "Bootstrap.Advance") + defer span.End() + + switch tev := ev.(type) { + case *EventBootstrapStart[K, A]: + // TODO: ignore start event if query is already in progress + iter := query.NewClosestNodesIter(b.self.Key()) + + qryCfg := query.DefaultQueryConfig[K]() + qryCfg.Clock = b.cfg.Clock + qryCfg.Concurrency = b.cfg.RequestConcurrency + qryCfg.RequestTimeout = b.cfg.RequestTimeout + + queryID := query.QueryID("bootstrap") + + qry, err := query.NewQuery[K](b.self, queryID, tev.ProtocolID, tev.Message, iter, tev.KnownClosestNodes, qryCfg) + if err != nil { + // TODO: don't panic + panic(err) + } + b.qry = qry + return b.advanceQuery(ctx, nil) + + case *EventBootstrapMessageResponse[K, A]: + return b.advanceQuery(ctx, &query.EventQueryMessageResponse[K, A]{ + NodeID: tev.NodeID, + Response: tev.Response, + }) + case *EventBootstrapMessageFailure[K]: + return b.advanceQuery(ctx, &query.EventQueryMessageFailure[K]{ + NodeID: tev.NodeID, + Error: tev.Error, + }) + + case *EventBootstrapPoll: + // ignore, nothing to do + default: + panic(fmt.Sprintf("unexpected event: %T", tev)) + } + + if b.qry != nil { + return b.advanceQuery(ctx, nil) + } + + return &StateBootstrapIdle{} +} + +func (b *Bootstrap[K, A]) advanceQuery(ctx context.Context, qev query.QueryEvent) BootstrapState { + state := b.qry.Advance(ctx, qev) + switch st := state.(type) { + case *query.StateQueryWaitingMessage[K, A]: + return &StateBootstrapMessage[K, A]{ + QueryID: st.QueryID, + Stats: st.Stats, + NodeID: st.NodeID, + ProtocolID: st.ProtocolID, + Message: st.Message, + } + case *query.StateQueryFinished: + return &StateBootstrapFinished{ + Stats: st.Stats, + } + case *query.StateQueryWaitingAtCapacity: + elapsed := b.cfg.Clock.Since(st.Stats.Start) + if elapsed > b.cfg.Timeout { + return &StateBootstrapTimeout{ + Stats: st.Stats, + } + } + return &StateBootstrapWaiting{ + Stats: st.Stats, + } + case *query.StateQueryWaitingWithCapacity: + elapsed := b.cfg.Clock.Since(st.Stats.Start) + if elapsed > b.cfg.Timeout { + return &StateBootstrapTimeout{ + Stats: st.Stats, + } + } + return &StateBootstrapWaiting{ + Stats: st.Stats, + } + default: + panic(fmt.Sprintf("unexpected state: %T", st)) + } +} + +// BootstrapState is the state of a bootstrap. +type BootstrapState interface { + bootstrapState() +} + +// StateBootstrapMessage indicates that the bootstrap query is waiting to message a node. +type StateBootstrapMessage[K kad.Key[K], A kad.Address[A]] struct { + QueryID query.QueryID + NodeID kad.NodeID[K] + ProtocolID address.ProtocolID + Message kad.Request[K, A] + Stats query.QueryStats +} + +// StateBootstrapIdle indicates that the bootstrap is not running its query. +type StateBootstrapIdle struct{} + +// StateBootstrapFinished indicates that the bootstrap has finished. +type StateBootstrapFinished struct { + Stats query.QueryStats +} + +// StateBootstrapTimeout indicates that the bootstrap query has timed out. +type StateBootstrapTimeout struct { + Stats query.QueryStats +} + +// StateBootstrapWaiting indicates that the bootstrap query is waiting for a response. +type StateBootstrapWaiting struct { + Stats query.QueryStats +} + +// bootstrapState() ensures that only Bootstrap states can be assigned to a BootstrapState. +func (*StateBootstrapMessage[K, A]) bootstrapState() {} +func (*StateBootstrapIdle) bootstrapState() {} +func (*StateBootstrapFinished) bootstrapState() {} +func (*StateBootstrapTimeout) bootstrapState() {} +func (*StateBootstrapWaiting) bootstrapState() {} + +// BootstrapEvent is an event intended to advance the state of a bootstrap. +type BootstrapEvent interface { + bootstrapEvent() +} + +// EventBootstrapPoll is an event that signals the bootstrap that it can perform housekeeping work such as time out queries. +type EventBootstrapPoll struct{} + +// EventBootstrapStart is an event that attempts to start a new bootstrap +type EventBootstrapStart[K kad.Key[K], A kad.Address[A]] struct { + ProtocolID address.ProtocolID + Message kad.Request[K, A] + KnownClosestNodes []kad.NodeID[K] +} + +// EventBootstrapMessageResponse notifies a bootstrap that a sent message has received a successful response. +type EventBootstrapMessageResponse[K kad.Key[K], A kad.Address[A]] struct { + NodeID kad.NodeID[K] // the node the message was sent to + Response kad.Response[K, A] // the message response sent by the node +} + +// EventBootstrapMessageFailure notifiesa bootstrap that an attempt to send a message has failed. +type EventBootstrapMessageFailure[K kad.Key[K]] struct { + NodeID kad.NodeID[K] // the node the message was sent to + Error error // the error that caused the failure, if any +} + +// bootstrapEvent() ensures that only Bootstrap events can be assigned to the BootstrapEvent interface. +func (*EventBootstrapPoll) bootstrapEvent() {} +func (*EventBootstrapStart[K, A]) bootstrapEvent() {} +func (*EventBootstrapMessageResponse[K, A]) bootstrapEvent() {} +func (*EventBootstrapMessageFailure[K]) bootstrapEvent() {} diff --git a/routing/bootstrap_test.go b/routing/bootstrap_test.go new file mode 100644 index 0000000..43dddf7 --- /dev/null +++ b/routing/bootstrap_test.go @@ -0,0 +1,242 @@ +package routing + +import ( + "context" + "testing" + + "github.com/benbjohnson/clock" + "github.com/stretchr/testify/require" + + "github.com/plprobelab/go-kademlia/internal/kadtest" + "github.com/plprobelab/go-kademlia/kad" + "github.com/plprobelab/go-kademlia/key" + "github.com/plprobelab/go-kademlia/network/address" + "github.com/plprobelab/go-kademlia/query" +) + +func TestBootstrapConfigValidate(t *testing.T) { + t.Run("default is valid", func(t *testing.T) { + cfg := DefaultBootstrapConfig[key.Key8, kadtest.StrAddr]() + require.NoError(t, cfg.Validate()) + }) + + t.Run("clock is not nil", func(t *testing.T) { + cfg := DefaultBootstrapConfig[key.Key8, kadtest.StrAddr]() + cfg.Clock = nil + require.Error(t, cfg.Validate()) + }) + + t.Run("timeout positive", func(t *testing.T) { + cfg := DefaultBootstrapConfig[key.Key8, kadtest.StrAddr]() + cfg.Timeout = 0 + require.Error(t, cfg.Validate()) + cfg.Timeout = -1 + require.Error(t, cfg.Validate()) + }) + + t.Run("request concurrency positive", func(t *testing.T) { + cfg := DefaultBootstrapConfig[key.Key8, kadtest.StrAddr]() + cfg.RequestConcurrency = 0 + require.Error(t, cfg.Validate()) + cfg.RequestConcurrency = -1 + require.Error(t, cfg.Validate()) + }) + + t.Run("request timeout positive", func(t *testing.T) { + cfg := DefaultBootstrapConfig[key.Key8, kadtest.StrAddr]() + cfg.RequestTimeout = 0 + require.Error(t, cfg.Validate()) + cfg.RequestTimeout = -1 + require.Error(t, cfg.Validate()) + }) +} + +func TestBootstrapStartsIdle(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + cfg := DefaultBootstrapConfig[key.Key8, kadtest.StrAddr]() + cfg.Clock = clk + + self := kadtest.NewID(key.Key8(0)) + bs, err := NewBootstrap[key.Key8, kadtest.StrAddr](self, cfg) + require.NoError(t, err) + + state := bs.Advance(ctx, &EventBootstrapPoll{}) + require.IsType(t, &StateBootstrapIdle{}, state) +} + +func TestBootstrapStart(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + cfg := DefaultBootstrapConfig[key.Key8, kadtest.StrAddr]() + cfg.Clock = clk + + self := kadtest.NewID(key.Key8(0)) + bs, err := NewBootstrap[key.Key8, kadtest.StrAddr](self, cfg) + require.NoError(t, err) + + a := kadtest.NewID(key.Key8(0b00000100)) // 4 + + msg := kadtest.NewRequest("1", self.Key()) + protocolID := address.ProtocolID("testprotocol") + + // start the bootstrap + state := bs.Advance(ctx, &EventBootstrapStart[key.Key8, kadtest.StrAddr]{ + ProtocolID: protocolID, + Message: msg, + KnownClosestNodes: []kad.NodeID[key.Key8]{a}, + }) + require.IsType(t, &StateBootstrapMessage[key.Key8, kadtest.StrAddr]{}, state) + + // the query should attempt to contact the node it was given + st := state.(*StateBootstrapMessage[key.Key8, kadtest.StrAddr]) + + // the query should be the one just added + require.Equal(t, query.QueryID("bootstrap"), st.QueryID) + + // the query should attempt to contact the node it was given + require.Equal(t, a, st.NodeID) + + // with the correct protocol ID + require.Equal(t, protocolID, st.ProtocolID) + + // with the correct message + require.Equal(t, msg, st.Message) + + // now the bootstrap reports that it is waiting + state = bs.Advance(ctx, &EventBootstrapPoll{}) + require.IsType(t, &StateBootstrapWaiting{}, state) +} + +func TestBootstrapMessageResponse(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + cfg := DefaultBootstrapConfig[key.Key8, kadtest.StrAddr]() + cfg.Clock = clk + + self := kadtest.NewID(key.Key8(0)) + bs, err := NewBootstrap[key.Key8, kadtest.StrAddr](self, cfg) + require.NoError(t, err) + + a := kadtest.NewID(key.Key8(0b00000100)) // 4 + + msg := kadtest.NewRequest("1", self.Key()) + protocolID := address.ProtocolID("testprotocol") + + // start the bootstrap + state := bs.Advance(ctx, &EventBootstrapStart[key.Key8, kadtest.StrAddr]{ + ProtocolID: protocolID, + Message: msg, + KnownClosestNodes: []kad.NodeID[key.Key8]{a}, + }) + require.IsType(t, &StateBootstrapMessage[key.Key8, kadtest.StrAddr]{}, state) + + // the bootstrap should attempt to contact the node it was given + st := state.(*StateBootstrapMessage[key.Key8, kadtest.StrAddr]) + require.Equal(t, query.QueryID("bootstrap"), st.QueryID) + require.Equal(t, a, st.NodeID) + + // notify bootstrap that node was contacted successfully, but no closer nodes + state = bs.Advance(ctx, &EventBootstrapMessageResponse[key.Key8, kadtest.StrAddr]{ + NodeID: a, + }) + + // bootstrap should respond that its query has finished + require.IsType(t, &StateBootstrapFinished{}, state) + + stf := state.(*StateBootstrapFinished) + require.Equal(t, 1, stf.Stats.Requests) + require.Equal(t, 1, stf.Stats.Success) +} + +func TestBootstrapProgress(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + cfg := DefaultBootstrapConfig[key.Key8, kadtest.StrAddr]() + cfg.Clock = clk + cfg.RequestConcurrency = 3 // 1 less than the 4 nodes to be visited + + self := kadtest.NewID(key.Key8(0)) + bs, err := NewBootstrap[key.Key8, kadtest.StrAddr](self, cfg) + require.NoError(t, err) + + a := kadtest.NewID(key.Key8(0b00000100)) // 4 + b := kadtest.NewID(key.Key8(0b00001000)) // 8 + c := kadtest.NewID(key.Key8(0b00010000)) // 16 + d := kadtest.NewID(key.Key8(0b00100000)) // 32 + + // ensure the order of the known nodes + require.True(t, self.Key().Xor(a.Key()).Compare(self.Key().Xor(b.Key())) == -1) + require.True(t, self.Key().Xor(b.Key()).Compare(self.Key().Xor(c.Key())) == -1) + require.True(t, self.Key().Xor(c.Key()).Compare(self.Key().Xor(d.Key())) == -1) + + msg := kadtest.NewRequest("1", self.Key()) + protocolID := address.ProtocolID("testprotocol") + + // start the bootstrap + state := bs.Advance(ctx, &EventBootstrapStart[key.Key8, kadtest.StrAddr]{ + ProtocolID: protocolID, + Message: msg, + KnownClosestNodes: []kad.NodeID[key.Key8]{d, a, b, c}, + }) + + // the bootstrap should attempt to contact the closest node it was given + require.IsType(t, &StateBootstrapMessage[key.Key8, kadtest.StrAddr]{}, state) + st := state.(*StateBootstrapMessage[key.Key8, kadtest.StrAddr]) + require.Equal(t, query.QueryID("bootstrap"), st.QueryID) + require.Equal(t, a, st.NodeID) + + // next the bootstrap attempts to contact second nearest node + state = bs.Advance(ctx, &EventBootstrapPoll{}) + require.IsType(t, &StateBootstrapMessage[key.Key8, kadtest.StrAddr]{}, state) + st = state.(*StateBootstrapMessage[key.Key8, kadtest.StrAddr]) + require.Equal(t, b, st.NodeID) + + // next the bootstrap attempts to contact third nearest node + state = bs.Advance(ctx, &EventBootstrapPoll{}) + require.IsType(t, &StateBootstrapMessage[key.Key8, kadtest.StrAddr]{}, state) + st = state.(*StateBootstrapMessage[key.Key8, kadtest.StrAddr]) + require.Equal(t, c, st.NodeID) + + // now the bootstrap should be waiting since it is at request capacity + state = bs.Advance(ctx, &EventBootstrapPoll{}) + require.IsType(t, &StateBootstrapWaiting{}, state) + + // notify bootstrap that node was contacted successfully, but no closer nodes + state = bs.Advance(ctx, &EventBootstrapMessageResponse[key.Key8, kadtest.StrAddr]{ + NodeID: a, + }) + + // now the bootstrap has capacity to contact fourth nearest node + require.IsType(t, &StateBootstrapMessage[key.Key8, kadtest.StrAddr]{}, state) + st = state.(*StateBootstrapMessage[key.Key8, kadtest.StrAddr]) + require.Equal(t, d, st.NodeID) + + // notify bootstrap that a node was contacted successfully + state = bs.Advance(ctx, &EventBootstrapMessageResponse[key.Key8, kadtest.StrAddr]{ + NodeID: b, + }) + + // bootstrap should respond that it is waiting for messages + require.IsType(t, &StateBootstrapWaiting{}, state) + + // notify bootstrap that a node was contacted successfully + state = bs.Advance(ctx, &EventBootstrapMessageResponse[key.Key8, kadtest.StrAddr]{ + NodeID: c, + }) + + // bootstrap should respond that it is waiting for last message + require.IsType(t, &StateBootstrapWaiting{}, state) + + // notify bootstrap that the final node was contacted successfully + state = bs.Advance(ctx, &EventBootstrapMessageResponse[key.Key8, kadtest.StrAddr]{ + NodeID: d, + }) + + // bootstrap should respond that its query has finished + require.IsType(t, &StateBootstrapFinished{}, state) + + stf := state.(*StateBootstrapFinished) + require.Equal(t, 4, stf.Stats.Requests) + require.Equal(t, 4, stf.Stats.Success) +}