From ab50ba9494339d1cb16960b5ed8756bbe6aef48b Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Thu, 31 Aug 2023 14:03:50 +0100 Subject: [PATCH] routing: integrate probe state machine --- internal/nettest/topology.go | 3 +- kademlia/behaviour.go | 5 + kademlia/behaviour_test.go | 54 +++++++ kademlia/dht.go | 15 +- kademlia/routing.go | 165 ++++++++++++------- kademlia/routing_test.go | 303 +++++++++++++++++++++++++++++++++++ 6 files changed, 488 insertions(+), 57 deletions(-) create mode 100644 kademlia/behaviour_test.go create mode 100644 kademlia/routing_test.go diff --git a/internal/nettest/topology.go b/internal/nettest/topology.go index 56b5d74..eb8710f 100644 --- a/internal/nettest/topology.go +++ b/internal/nettest/topology.go @@ -9,12 +9,13 @@ import ( "github.com/plprobelab/go-kademlia/kad" "github.com/plprobelab/go-kademlia/key" "github.com/plprobelab/go-kademlia/network/address" + "github.com/plprobelab/go-kademlia/routing" ) type Node[K kad.Key[K], A kad.Address[A]] struct { NodeInfo kad.NodeInfo[K, A] Router *Router[K, A] - RoutingTable kad.RoutingTable[K, kad.NodeID[K]] + RoutingTable routing.RoutingTableCpl[K, kad.NodeID[K]] } type Topology[K kad.Key[K], A kad.Address[A]] struct { diff --git a/kademlia/behaviour.go b/kademlia/behaviour.go index b9c146a..6d847ea 100644 --- a/kademlia/behaviour.go +++ b/kademlia/behaviour.go @@ -22,6 +22,7 @@ type Behaviour[I DhtEvent, O DhtEvent] interface { // Notify informs the behaviour of an event. The behaviour may perform the event // immediately and queue the result, causing the behaviour to become ready. + // It is safe to call Notify from the Perform method. Notify(ctx context.Context, ev I) // Perform gives the behaviour the opportunity to perform work or to return a queued @@ -29,6 +30,10 @@ type Behaviour[I DhtEvent, O DhtEvent] interface { Perform(ctx context.Context) (O, bool) } +type SM[E any, S any] interface { + Advance(context.Context, E) S +} + type WorkQueueFunc[E DhtEvent] func(context.Context, E) bool // WorkQueue is buffered queue of work to be performed. diff --git a/kademlia/behaviour_test.go b/kademlia/behaviour_test.go new file mode 100644 index 0000000..0efd04a --- /dev/null +++ b/kademlia/behaviour_test.go @@ -0,0 +1,54 @@ +package kademlia + +import ( + "context" + "fmt" + "reflect" + "testing" +) + +type NullSM[E any, S any] struct{} + +func (NullSM[E, S]) Advance(context.Context, E) S { + var v S + return v +} + +type RecordingSM[E any, S any] struct { + State S + Received E +} + +func NewRecordingSM[E any, S any](response S) *RecordingSM[E, S] { + return &RecordingSM[E, S]{ + State: response, + } +} + +func (r *RecordingSM[E, S]) Advance(ctx context.Context, e E) S { + r.Received = e + return r.State +} + +// expectBehaviourEvent selects on a behaviour's ready channel until it becomes ready and then checks the perform +// mehtod for the expected event type. Unexpected events are ignored and selecting resumes. +// The function returns when an event matching the type of expected is received or when the context is cancelled. +func expectBehaviourEvent[I DhtEvent, O DhtEvent](t *testing.T, ctx context.Context, b Behaviour[I, O], expected O) (O, error) { + t.Helper() + for { + select { + case <-b.Ready(): + ev, ok := b.Perform(ctx) + if !ok { + continue + } + t.Logf("saw event: %T\n", ev) + if reflect.TypeOf(ev) == reflect.TypeOf(expected) { + return ev, nil + } + case <-ctx.Done(): + var v O + return v, fmt.Errorf("test deadline exceeded") + } + } +} diff --git a/kademlia/dht.go b/kademlia/dht.go index 8d66868..ed9bc21 100644 --- a/kademlia/dht.go +++ b/kademlia/dht.go @@ -123,7 +123,7 @@ func DefaultConfig() *Config { } } -func NewDht[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], rtr Router[K, A], rt kad.RoutingTable[K, kad.NodeID[K]], cfg *Config) (*Dht[K, A], error) { +func NewDht[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], rtr Router[K, A], rt routing.RoutingTableCpl[K, kad.NodeID[K]], cfg *Config) (*Dht[K, A], error) { if cfg == nil { cfg = DefaultConfig() } else if err := cfg.Validate(); err != nil { @@ -168,7 +168,18 @@ func NewDht[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], rtr Router[K, A] return nil, fmt.Errorf("include: %w", err) } - routingBehaviour := NewRoutingBehaviour(self, bootstrap, include, cfg.Logger) + probeCfg := routing.DefaultProbeConfig() + probeCfg.Clock = cfg.Clock + probeCfg.Timeout = cfg.QueryTimeout + + // TODO: expose config + // probeCfg.Concurrency = cfg.ProbeConcurrency + probe, err := routing.NewProbe[K, A](rt, probeCfg) + if err != nil { + return nil, fmt.Errorf("include: %w", err) + } + + routingBehaviour := NewRoutingBehaviour[K, A](self, bootstrap, include, probe, cfg.Logger) networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger) diff --git a/kademlia/routing.go b/kademlia/routing.go index d5929d4..310d567 100644 --- a/kademlia/routing.go +++ b/kademlia/routing.go @@ -9,6 +9,7 @@ import ( "github.com/plprobelab/go-kademlia/key" "github.com/plprobelab/go-kademlia/routing" "github.com/plprobelab/go-kademlia/util" + "go.opentelemetry.io/otel/attribute" "golang.org/x/exp/slog" ) @@ -16,8 +17,13 @@ type RoutingBehaviour[K kad.Key[K], A kad.Address[A]] struct { // self is the node id of the system the dht is running on self kad.NodeID[K] // bootstrap is the bootstrap state machine, responsible for bootstrapping the routing table - bootstrap *routing.Bootstrap[K, A] - include *routing.Include[K, A] + bootstrap SM[routing.BootstrapEvent, routing.BootstrapState] + + // include is the inclusion state machine, responsible for vetting nodes before including them in the routing table + include SM[routing.IncludeEvent, routing.IncludeState] + + // probe is the node probing state machine, responsible for periodically checking connectivity of nodes in the routing table + probe SM[routing.ProbeEvent, routing.ProbeState] pendingMu sync.Mutex pending []DhtEvent @@ -26,11 +32,12 @@ type RoutingBehaviour[K kad.Key[K], A kad.Address[A]] struct { logger *slog.Logger } -func NewRoutingBehaviour[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], bootstrap *routing.Bootstrap[K, A], include *routing.Include[K, A], logger *slog.Logger) *RoutingBehaviour[K, A] { +func NewRoutingBehaviour[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], bootstrap SM[routing.BootstrapEvent, routing.BootstrapState], include SM[routing.IncludeEvent, routing.IncludeState], probe SM[routing.ProbeEvent, routing.ProbeState], logger *slog.Logger) *RoutingBehaviour[K, A] { r := &RoutingBehaviour[K, A]{ self: self, bootstrap: bootstrap, include: include, + probe: probe, ready: make(chan struct{}, 1), logger: logger, } @@ -38,14 +45,21 @@ func NewRoutingBehaviour[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], boo } func (r *RoutingBehaviour[K, A]) Notify(ctx context.Context, ev DhtEvent) { - ctx, span := util.StartSpan(ctx, "RoutingBehaviour.Perform") + ctx, span := util.StartSpan(ctx, "RoutingBehaviour.Notify") defer span.End() r.pendingMu.Lock() defer r.pendingMu.Unlock() + r.notify(ctx, ev) +} +// notify must only be called while r.pendingMu is held +func (r *RoutingBehaviour[K, A]) notify(ctx context.Context, ev DhtEvent) { + ctx, span := util.StartSpan(ctx, "RoutingBehaviour.notify") + defer span.End() switch ev := ev.(type) { case *EventDhtStartBootstrap[K, A]: + span.SetAttributes(attribute.String("event", "EventDhtStartBootstrap")) cmd := &routing.EventBootstrapStart[K, A]{ ProtocolID: ev.ProtocolID, Message: ev.Message, @@ -58,6 +72,7 @@ func (r *RoutingBehaviour[K, A]) Notify(ctx context.Context, ev DhtEvent) { } case *EventDhtAddNodeInfo[K, A]: + span.SetAttributes(attribute.String("event", "EventDhtAddNodeInfo")) // Ignore self if key.Equal(ev.NodeInfo.ID().Key(), r.self.Key()) { break @@ -71,7 +86,19 @@ func (r *RoutingBehaviour[K, A]) Notify(ctx context.Context, ev DhtEvent) { r.pending = append(r.pending, next) } + case *EventRoutingUpdated[K, A]: + span.SetAttributes(attribute.String("event", "EventRoutingUpdated")) + cmd := &routing.EventProbeAdd[K]{ + NodeID: ev.NodeInfo.ID(), + } + // attempt to advance the probe state machine + next, ok := r.advanceProbe(ctx, cmd) + if ok { + r.pending = append(r.pending, next) + } + case *EventGetClosestNodesSuccess[K, A]: + span.SetAttributes(attribute.String("event", "EventGetClosestNodesFailure"), attribute.String("queryid", string(ev.QueryID)), attribute.String("nodeid", string(ev.To.ID().String()))) switch ev.QueryID { case "bootstrap": for _, info := range ev.ClosestNodes { @@ -101,10 +128,23 @@ func (r *RoutingBehaviour[K, A]) Notify(ctx context.Context, ev DhtEvent) { r.pending = append(r.pending, next) } + case "probe": + cmd := &routing.EventProbeMessageResponse[K, A]{ + NodeInfo: ev.To, + Response: ClosestNodesFakeResponse(ev.Target, ev.ClosestNodes), + } + // attempt to advance the probe state machine + next, ok := r.advanceProbe(ctx, cmd) + if ok { + r.pending = append(r.pending, next) + } + default: panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID)) } case *EventGetClosestNodesFailure[K, A]: + span.SetAttributes(attribute.String("event", "EventGetClosestNodesFailure"), attribute.String("queryid", string(ev.QueryID)), attribute.String("nodeid", string(ev.To.ID().String()))) + span.RecordError(ev.Err) switch ev.QueryID { case "bootstrap": cmd := &routing.EventBootstrapMessageFailure[K]{ @@ -121,11 +161,21 @@ func (r *RoutingBehaviour[K, A]) Notify(ctx context.Context, ev DhtEvent) { NodeInfo: ev.To, Error: ev.Err, } - // attempt to advance the include + // attempt to advance the include state machine next, ok := r.advanceInclude(ctx, cmd) if ok { r.pending = append(r.pending, next) } + case "probe": + cmd := &routing.EventProbeMessageFailure[K, A]{ + NodeInfo: ev.To, + Error: ev.Err, + } + // attempt to advance the probe state machine + next, ok := r.advanceProbe(ctx, cmd) + if ok { + r.pending = append(r.pending, next) + } default: panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID)) @@ -169,59 +219,24 @@ func (r *RoutingBehaviour[K, A]) Perform(ctx context.Context) (DhtEvent, bool) { return ev, true } - // attempt to advance the bootstrap state machine - bstate := r.bootstrap.Advance(ctx, &routing.EventBootstrapPoll{}) - switch st := bstate.(type) { - - case *routing.StateBootstrapMessage[K, A]: - return &EventOutboundGetClosestNodes[K, A]{ - QueryID: "bootstrap", - To: NewNodeAddr[K, A](st.NodeID, nil), - Target: st.Message.Target(), - Notifiee: r, - }, true - - case *routing.StateBootstrapWaiting: - // bootstrap waiting for a message response, nothing to do - case *routing.StateBootstrapFinished: - return &EventBootstrapFinished{ - Stats: st.Stats, - }, true - case *routing.StateBootstrapIdle: - // bootstrap not running, nothing to do - default: - panic(fmt.Sprintf("unexpected bootstrap state: %T", st)) + // poll the child state machines in priority order to give each an opportunity to perform work + + ev, ok := r.advanceBootstrap(ctx, &routing.EventBootstrapPoll{}) + if ok { + return ev, true } - // attempt to advance the include state machine - istate := r.include.Advance(ctx, &routing.EventIncludePoll{}) - switch st := istate.(type) { - case *routing.StateIncludeFindNodeMessage[K, A]: - // include wants to send a find node message to a node - return &EventOutboundGetClosestNodes[K, A]{ - QueryID: "include", - To: st.NodeInfo, - Target: st.NodeInfo.ID().Key(), - Notifiee: r, - }, true - - case *routing.StateIncludeRoutingUpdated[K, A]: - // a node has been included in the routing table - return &EventRoutingUpdated[K, A]{ - NodeInfo: st.NodeInfo, - }, true - case *routing.StateIncludeWaitingAtCapacity: - // nothing to do except wait for message response or timeout - case *routing.StateIncludeWaitingWithCapacity: - // nothing to do except wait for message response or timeout - case *routing.StateIncludeWaitingFull: - // nothing to do except wait for message response or timeout - case *routing.StateIncludeIdle: - // nothing to do except wait for message response or timeout - default: - panic(fmt.Sprintf("unexpected include state: %T", st)) + ev, ok = r.advanceInclude(ctx, &routing.EventIncludePoll{}) + if ok { + return ev, true + } + + ev, ok = r.advanceProbe(ctx, &routing.EventProbePoll{}) + if ok { + return ev, true } + // finally check if any pending events were accumulated in the meantime if len(r.pending) == 0 { return nil, false } @@ -273,6 +288,13 @@ func (r *RoutingBehaviour[K, A]) advanceInclude(ctx context.Context, ev routing. case *routing.StateIncludeRoutingUpdated[K, A]: // a node has been included in the routing table + + // notify other routing state machines that there is a new node in the routing table + r.notify(ctx, &EventRoutingUpdated[K, A]{ + NodeInfo: st.NodeInfo, + }) + + // return the event to notify outwards too return &EventRoutingUpdated[K, A]{ NodeInfo: st.NodeInfo, }, true @@ -290,3 +312,38 @@ func (r *RoutingBehaviour[K, A]) advanceInclude(ctx context.Context, ev routing. return nil, false } + +func (r *RoutingBehaviour[K, A]) advanceProbe(ctx context.Context, ev routing.ProbeEvent) (DhtEvent, bool) { + ctx, span := util.StartSpan(ctx, "RoutingBehaviour.advanceProbe") + defer span.End() + st := r.probe.Advance(ctx, ev) + switch st := st.(type) { + case *routing.StateProbeConnectivityCheck[K, A]: + // include wants to send a find node message to a node + return &EventOutboundGetClosestNodes[K, A]{ + QueryID: "probe", + To: st.NodeInfo, + Target: st.NodeInfo.ID().Key(), + Notifiee: r, + }, true + case *routing.StateProbeNodeFailure[K, A]: + // a node has failed a connectivity check been removed from the routing table and the probe list + // add the node to the inclusion list for a second chance + r.notify(ctx, &EventDhtAddNodeInfo[K, A]{ + NodeInfo: st.NodeInfo, + }) + case *routing.StateProbeWaitingAtCapacity: + // the probe state machine is waiting for responses for checks and the maximum number of concurrent checks has been reached. + // nothing to do except wait for message response or timeout + case *routing.StateProbeWaitingWithCapacity: + // the probe state machine is waiting for responses for checks but has capacity to perform more + // nothing to do except wait for message response or timeout + case *routing.StateProbeIdle: + // the probe state machine is not running any checks. + // nothing to do except wait for message response or timeout + default: + panic(fmt.Sprintf("unexpected include state: %T", st)) + } + + return nil, false +} diff --git a/kademlia/routing_test.go b/kademlia/routing_test.go new file mode 100644 index 0000000..f97aebf --- /dev/null +++ b/kademlia/routing_test.go @@ -0,0 +1,303 @@ +package kademlia + +import ( + "errors" + "testing" + "time" + + "github.com/benbjohnson/clock" + "github.com/iand/zikade/internal/kadtest" + "github.com/iand/zikade/internal/nettest" + "github.com/plprobelab/go-kademlia/kad" + "github.com/plprobelab/go-kademlia/key" + "github.com/plprobelab/go-kademlia/network/address" + "github.com/plprobelab/go-kademlia/query" + "github.com/plprobelab/go-kademlia/routing" + "github.com/stretchr/testify/require" + + "golang.org/x/exp/slog" +) + +func TestRoutingBehaviour(t *testing.T) { +} + +func TestRoutingStartBootstrapSendsEvent(t *testing.T) { + ctx, cancel := kadtest.CtxShort(t) + defer cancel() + + clk := clock.NewMock() + _, nodes := nettest.LinearTopology(4, clk) + + self := nodes[0].NodeInfo.ID() + + // records the event passed to bootstrap + bootstrap := NewRecordingSM[routing.BootstrapEvent, routing.BootstrapState](&routing.StateBootstrapIdle{}) + include := new(NullSM[routing.IncludeEvent, routing.IncludeState]) + probe := new(NullSM[routing.ProbeEvent, routing.ProbeState]) + + routingBehaviour := NewRoutingBehaviour[key.Key8, kadtest.StrAddr](self, bootstrap, include, probe, slog.Default()) + + ev := &EventDhtStartBootstrap[key.Key8, kadtest.StrAddr]{ + ProtocolID: address.ProtocolID("test"), + Message: kadtest.NewRequest("1", self.Key()), + SeedNodes: []kad.NodeID[key.Key8]{nodes[1].NodeInfo.ID()}, + } + + routingBehaviour.Notify(ctx, ev) + + // the event that should be passed to the bootstrap state machine + expected := &routing.EventBootstrapStart[key.Key8, kadtest.StrAddr]{ + ProtocolID: ev.ProtocolID, + Message: ev.Message, + KnownClosestNodes: ev.SeedNodes, + } + require.Equal(t, expected, bootstrap.Received) +} + +func TestRoutingBootstrapGetClosestNodesSuccess(t *testing.T) { + ctx, cancel := kadtest.CtxShort(t) + defer cancel() + + clk := clock.NewMock() + _, nodes := nettest.LinearTopology(4, clk) + + self := nodes[0].NodeInfo.ID() + + // records the event passed to bootstrap + bootstrap := NewRecordingSM[routing.BootstrapEvent, routing.BootstrapState](&routing.StateBootstrapIdle{}) + include := new(NullSM[routing.IncludeEvent, routing.IncludeState]) + probe := new(NullSM[routing.ProbeEvent, routing.ProbeState]) + + routingBehaviour := NewRoutingBehaviour[key.Key8, kadtest.StrAddr](self, bootstrap, include, probe, slog.Default()) + + ev := &EventGetClosestNodesSuccess[key.Key8, kadtest.StrAddr]{ + QueryID: query.QueryID("bootstrap"), + To: nodes[1].NodeInfo, + Target: nodes[0].NodeInfo.ID().Key(), + ClosestNodes: []kad.NodeInfo[key.Key8, kadtest.StrAddr]{nodes[2].NodeInfo}, + } + + routingBehaviour.Notify(ctx, ev) + + // bootstrap should receive message response event + require.IsType(t, &routing.EventBootstrapMessageResponse[key.Key8, kadtest.StrAddr]{}, bootstrap.Received) + + rev := bootstrap.Received.(*routing.EventBootstrapMessageResponse[key.Key8, kadtest.StrAddr]) + require.Equal(t, nodes[1].NodeInfo.ID(), rev.NodeID) + require.Equal(t, ev.ClosestNodes, rev.Response.CloserNodes()) +} + +func TestRoutingBootstrapGetClosestNodesFailure(t *testing.T) { + ctx, cancel := kadtest.CtxShort(t) + defer cancel() + + clk := clock.NewMock() + _, nodes := nettest.LinearTopology(4, clk) + + self := nodes[0].NodeInfo.ID() + + // records the event passed to bootstrap + bootstrap := NewRecordingSM[routing.BootstrapEvent, routing.BootstrapState](&routing.StateBootstrapIdle{}) + include := new(NullSM[routing.IncludeEvent, routing.IncludeState]) + probe := new(NullSM[routing.ProbeEvent, routing.ProbeState]) + + routingBehaviour := NewRoutingBehaviour[key.Key8, kadtest.StrAddr](self, bootstrap, include, probe, slog.Default()) + + failure := errors.New("failed") + ev := &EventGetClosestNodesFailure[key.Key8, kadtest.StrAddr]{ + QueryID: query.QueryID("bootstrap"), + To: nodes[1].NodeInfo, + Target: nodes[0].NodeInfo.ID().Key(), + Err: failure, + } + + routingBehaviour.Notify(ctx, ev) + + // bootstrap should receive message response event + require.IsType(t, &routing.EventBootstrapMessageFailure[key.Key8]{}, bootstrap.Received) + + rev := bootstrap.Received.(*routing.EventBootstrapMessageFailure[key.Key8]) + require.Equal(t, nodes[1].NodeInfo.ID(), rev.NodeID) + require.Equal(t, failure, rev.Error) +} + +func TestRoutingAddNodeInfoSendsEvent(t *testing.T) { + ctx, cancel := kadtest.CtxShort(t) + defer cancel() + + clk := clock.NewMock() + _, nodes := nettest.LinearTopology(4, clk) + + self := nodes[0].NodeInfo.ID() + + // records the event passed to include + include := NewRecordingSM[routing.IncludeEvent, routing.IncludeState](&routing.StateIncludeIdle{}) + + bootstrap := new(NullSM[routing.BootstrapEvent, routing.BootstrapState]) + probe := new(NullSM[routing.ProbeEvent, routing.ProbeState]) + + routingBehaviour := NewRoutingBehaviour[key.Key8, kadtest.StrAddr](self, bootstrap, include, probe, slog.Default()) + + ev := &EventDhtAddNodeInfo[key.Key8, kadtest.StrAddr]{ + NodeInfo: nodes[2].NodeInfo, + } + + routingBehaviour.Notify(ctx, ev) + + // the event that should be passed to the include state machine + expected := &routing.EventIncludeAddCandidate[key.Key8, kadtest.StrAddr]{ + NodeInfo: ev.NodeInfo, + } + require.Equal(t, expected, include.Received) +} + +func TestRoutingIncludeGetClosestNodesSuccess(t *testing.T) { + ctx, cancel := kadtest.CtxShort(t) + defer cancel() + + clk := clock.NewMock() + _, nodes := nettest.LinearTopology(4, clk) + + self := nodes[0].NodeInfo.ID() + + // records the event passed to include + include := NewRecordingSM[routing.IncludeEvent, routing.IncludeState](&routing.StateIncludeIdle{}) + + bootstrap := new(NullSM[routing.BootstrapEvent, routing.BootstrapState]) + probe := new(NullSM[routing.ProbeEvent, routing.ProbeState]) + + routingBehaviour := NewRoutingBehaviour[key.Key8, kadtest.StrAddr](self, bootstrap, include, probe, slog.Default()) + + ev := &EventGetClosestNodesSuccess[key.Key8, kadtest.StrAddr]{ + QueryID: query.QueryID("include"), + To: nodes[1].NodeInfo, + Target: nodes[0].NodeInfo.ID().Key(), + ClosestNodes: []kad.NodeInfo[key.Key8, kadtest.StrAddr]{nodes[2].NodeInfo}, + } + + routingBehaviour.Notify(ctx, ev) + + // include should receive message response event + require.IsType(t, &routing.EventIncludeMessageResponse[key.Key8, kadtest.StrAddr]{}, include.Received) + + rev := include.Received.(*routing.EventIncludeMessageResponse[key.Key8, kadtest.StrAddr]) + require.Equal(t, nodes[1].NodeInfo, rev.NodeInfo) + require.Equal(t, ev.ClosestNodes, rev.Response.CloserNodes()) +} + +func TestRoutingIncludeGetClosestNodesFailure(t *testing.T) { + ctx, cancel := kadtest.CtxShort(t) + defer cancel() + + clk := clock.NewMock() + _, nodes := nettest.LinearTopology(4, clk) + + self := nodes[0].NodeInfo.ID() + + // records the event passed to include + include := NewRecordingSM[routing.IncludeEvent, routing.IncludeState](&routing.StateIncludeIdle{}) + + bootstrap := new(NullSM[routing.BootstrapEvent, routing.BootstrapState]) + probe := new(NullSM[routing.ProbeEvent, routing.ProbeState]) + + routingBehaviour := NewRoutingBehaviour[key.Key8, kadtest.StrAddr](self, bootstrap, include, probe, slog.Default()) + + failure := errors.New("failed") + ev := &EventGetClosestNodesFailure[key.Key8, kadtest.StrAddr]{ + QueryID: query.QueryID("include"), + To: nodes[1].NodeInfo, + Target: nodes[0].NodeInfo.ID().Key(), + Err: failure, + } + + routingBehaviour.Notify(ctx, ev) + + // include should receive message response event + require.IsType(t, &routing.EventIncludeMessageFailure[key.Key8, kadtest.StrAddr]{}, include.Received) + + rev := include.Received.(*routing.EventIncludeMessageFailure[key.Key8, kadtest.StrAddr]) + require.Equal(t, nodes[1].NodeInfo, rev.NodeInfo) + require.Equal(t, failure, rev.Error) +} + +func TestRoutingIncludedNodeAddToProbeList(t *testing.T) { + ctx, cancel := kadtest.CtxShort(t) + defer cancel() + + clk := clock.NewMock() + _, nodes := nettest.LinearTopology(4, clk) + + self := nodes[0].NodeInfo.ID() + rt := nodes[0].RoutingTable + + includeCfg := routing.DefaultIncludeConfig() + includeCfg.Clock = clk + include, err := routing.NewInclude[key.Key8, kadtest.StrAddr](rt, includeCfg) + require.NoError(t, err) + + probeCfg := routing.DefaultProbeConfig() + probeCfg.Clock = clk + probeCfg.CheckInterval = 5 * time.Minute + probe, err := routing.NewProbe[key.Key8, kadtest.StrAddr](rt, probeCfg) + require.NoError(t, err) + + // ensure bootstrap is always idle + bootstrap := NewRecordingSM[routing.BootstrapEvent, routing.BootstrapState](&routing.StateBootstrapIdle{}) + + routingBehaviour := NewRoutingBehaviour[key.Key8, kadtest.StrAddr](self, bootstrap, include, probe, slog.Default()) + + // a new node to be included + candidate := nodes[len(nodes)-1].NodeInfo + + // the routing table should not contain the node yet + _, intable := rt.GetNode(candidate.ID().Key()) + require.False(t, intable) + + // notify that there is a new node to be included + routingBehaviour.Notify(ctx, &EventDhtAddNodeInfo[key.Key8, kadtest.StrAddr]{ + NodeInfo: candidate, + }) + + // collect the result of the notify + dev, ok := routingBehaviour.Perform(ctx) + require.True(t, ok) + + // include should be asking to send a message to the node + require.IsType(t, &EventOutboundGetClosestNodes[key.Key8, kadtest.StrAddr]{}, dev) + + oev := dev.(*EventOutboundGetClosestNodes[key.Key8, kadtest.StrAddr]) + + // advance time a little + clk.Add(time.Second) + + // notify a successful response back (best to use the notify included in the event even though it will be the behaviour's Notify method) + oev.Notifiee.Notify(ctx, &EventGetClosestNodesSuccess[key.Key8, kadtest.StrAddr]{ + QueryID: oev.QueryID, + To: oev.To, + Target: oev.Target, + ClosestNodes: []kad.NodeInfo[key.Key8, kadtest.StrAddr]{nodes[1].NodeInfo}, // must include one for include check to pass + }) + + // the routing table should now contain the node + _, intable = rt.GetNode(candidate.ID().Key()) + require.True(t, intable) + + // routing update event should be emitted from the include state machine + dev, ok = routingBehaviour.Perform(ctx) + require.True(t, ok) + require.IsType(t, &EventRoutingUpdated[key.Key8, kadtest.StrAddr]{}, dev) + + // advance time past the probe check interval + clk.Add(probeCfg.CheckInterval) + + // routing update event should be emitted from the include state machine + dev, ok = routingBehaviour.Perform(ctx) + require.True(t, ok) + require.IsType(t, &EventOutboundGetClosestNodes[key.Key8, kadtest.StrAddr]{}, dev) + + // confirm that the message is for the correct node + oev = dev.(*EventOutboundGetClosestNodes[key.Key8, kadtest.StrAddr]) + require.Equal(t, query.QueryID("probe"), oev.QueryID) + require.Equal(t, candidate.ID(), oev.To.ID()) + require.Equal(t, candidate.ID().Key(), oev.Target) +}