diff --git a/routing/bootstrap.go b/routing/bootstrap.go index 5dc4135..96e0a4d 100644 --- a/routing/bootstrap.go +++ b/routing/bootstrap.go @@ -97,6 +97,7 @@ func (b *Bootstrap[K, A]) Advance(ctx context.Context, ev BootstrapEvent) Bootst switch tev := ev.(type) { case *EventBootstrapStart[K, A]: + // TODO: ignore start event if query is already in progress iter := query.NewClosestNodesIter(b.self.Key()) diff --git a/routing/probe.go b/routing/probe.go new file mode 100644 index 0000000..572024d --- /dev/null +++ b/routing/probe.go @@ -0,0 +1,503 @@ +package routing + +import ( + "container/heap" + "context" + "errors" + "fmt" + "time" + + "github.com/benbjohnson/clock" + "go.opentelemetry.io/otel/attribute" + + "github.com/plprobelab/go-kademlia/kad" + "github.com/plprobelab/go-kademlia/kaderr" + "github.com/plprobelab/go-kademlia/key" + "github.com/plprobelab/go-kademlia/util" +) + +type RoutingTableCpl[K kad.Key[K], N kad.NodeID[K]] interface { + kad.RoutingTable[K, N] + + // Cpl returns the longest common prefix length the supplied key shares with the table's key. + Cpl(kk K) int + + // CplSize returns the number of nodes in the table whose longest common prefix with the table's key is of length cpl. + CplSize(cpl int) int +} + +// The Probe state machine performs regular connectivity checks for nodes in a routing table. +// +// The state machine is notified of a new entry in the routing table via the [EventProbeAdd] event. This adds the node +// to an internal list and sets a time for a check to be performed, based on the current time plus a configurable +// interval. +// +// Connectivity checks are performed in time order, so older nodes are processed first. The connectivity check performed +// is the same as for the [Include] state machine: ask the node for closest nodes to itself and confirm that the node +// returns at least one node in the list of closer nodes. The state machine emits the [StateProbeConnectivityCheck] +// state when it wants to check the status of a node. +// +// The state machine expects to be notified either with the [EventProbeMessageResponse] or the +// [EventProbeMessageFailure] events to determine the outcome of the check. If neither are received within a +// configurable timeout the node is marked as failed. +// +// Nodes that receive a successful response have their next check time updated to the current time plus the configured +// [ProbeConfig.CheckInterval]. +// +// Nodes that fail a connectivity check, or are timed out, are removed from the routing table and from the list of nodes +// to check. The state machine emits the [StateProbeNodeFailure] state to notify callers of this event. +// +// The state machine accepts a [EventProbePoll] event to check for outstanding work such as initiating a new check or +// timing out an existing one. +// +// The [EventProbeRemove] event may be used to remove a node from the check list and from the routing table. +// +// The state machine accepts the [EventProbeNotifyConnectivity] event as a notification that an external system has +// performed a suitable connectivity check, such as when the node responds to a query. The probe state machine treats +// these events as if a successful response had been received from a check by advancing the time of the next check. +type Probe[K kad.Key[K], A kad.Address[A]] struct { + rt RoutingTableCpl[K, kad.NodeID[K]] + + // nvl is a list of nodes with information about their connectivity checks + // TODO: this will be expanded with more general scoring information related to their utility + nvl *nodeValueList[K] + + // cfg is a copy of the optional configuration supplied to the Probe + cfg ProbeConfig +} + +// ProbeConfig specifies optional configuration for a Probe +type ProbeConfig struct { + CheckInterval time.Duration // the minimum time interval between checks for a node + Concurrency int // the maximum number of probe checks that may be in progress at any one time + Timeout time.Duration // the time to wait before terminating a check that is not making progress + Clock clock.Clock // a clock that may be replaced by a mock when testing +} + +// Validate checks the configuration options and returns an error if any have invalid values. +func (cfg *ProbeConfig) Validate() error { + if cfg.Clock == nil { + return &kaderr.ConfigurationError{ + Component: "ProbeConfig", + Err: fmt.Errorf("clock must not be nil"), + } + } + + if cfg.Concurrency < 1 { + return &kaderr.ConfigurationError{ + Component: "ProbeConfig", + Err: fmt.Errorf("concurrency must be greater than zero"), + } + } + + if cfg.Timeout < 1 { + return &kaderr.ConfigurationError{ + Component: "ProbeConfig", + Err: fmt.Errorf("timeout must be greater than zero"), + } + } + + if cfg.CheckInterval < 1 { + return &kaderr.ConfigurationError{ + Component: "ProbeConfig", + Err: fmt.Errorf("revisit interval must be greater than zero"), + } + } + + return nil +} + +// DefaultProbeConfig returns the default configuration options for a Probe. +// Options may be overridden before passing to NewProbe +func DefaultProbeConfig() *ProbeConfig { + return &ProbeConfig{ + Clock: clock.New(), // use standard time + Concurrency: 3, // MAGIC + Timeout: time.Minute, // MAGIC + CheckInterval: 6 * time.Hour, // MAGIC + } +} + +func NewProbe[K kad.Key[K], A kad.Address[A]](rt RoutingTableCpl[K, kad.NodeID[K]], cfg *ProbeConfig) (*Probe[K, A], error) { + if cfg == nil { + cfg = DefaultProbeConfig() + } else if err := cfg.Validate(); err != nil { + return nil, err + } + + return &Probe[K, A]{ + cfg: *cfg, + rt: rt, + nvl: NewNodeValueList[K](), + }, nil +} + +// Advance advances the state of the probe state machine by attempting to advance its query if running. +func (p *Probe[K, A]) Advance(ctx context.Context, ev ProbeEvent) ProbeState { + _, span := util.StartSpan(ctx, "Probe.Advance") + defer span.End() + + switch tev := ev.(type) { + case *EventProbePoll: + // ignore, nothing to do + span.SetAttributes(attribute.String("event", "EventProbePoll")) + case *EventProbeAdd[K]: + // check presence in routing table + span.SetAttributes(attribute.String("event", "EventProbeAdd"), attribute.String("nodeid", tev.NodeID.String())) + if _, found := p.rt.GetNode(tev.NodeID.Key()); !found { + // ignore if not in routing table + span.RecordError(errors.New("node not in routing table")) + break + } + + // add a node to the value list + nv := &nodeValue[K]{ + NodeID: tev.NodeID, + NextCheckDue: p.cfg.Clock.Now().Add(p.cfg.CheckInterval), + Cpl: p.rt.Cpl(tev.NodeID.Key()), + } + // TODO: if node was in ongoing list return a state that can signal the caller to cancel any prior outbound message + p.nvl.Put(nv) + case *EventProbeRemove[K]: + span.SetAttributes(attribute.String("event", "EventProbeRemove"), attribute.String("nodeid", tev.NodeID.String())) + p.nvl.Remove(tev.NodeID) + case *EventProbeMessageResponse[K, A]: + span.SetAttributes(attribute.String("event", "EventProbeMessageResponse"), attribute.String("nodeid", tev.NodeInfo.ID().String())) + nv, found := p.nvl.Get(tev.NodeInfo.ID()) + if !found { + // ignore message for unknown node, which might have been removed + span.RecordError(errors.New("node not in node value list")) + break + } + // update next check time + nv.NextCheckDue = p.cfg.Clock.Now().Add(p.cfg.CheckInterval) + + // put into list, which will clear any ongoing check too + p.nvl.Put(nv) + + case *EventProbeMessageFailure[K, A]: + // probe failed, so remove from routing table and from list + span.SetAttributes(attribute.String("event", "EventProbeMessageFailure"), attribute.String("nodeid", tev.NodeInfo.ID().String())) + span.RecordError(tev.Error) + p.rt.RemoveKey(tev.NodeInfo.ID().Key()) + p.nvl.Remove(tev.NodeInfo.ID()) + return &StateProbeNodeFailure[K]{ + NodeID: tev.NodeInfo.ID(), + } + case *EventProbeNotifyConnectivity[K]: + span.SetAttributes(attribute.String("event", "EventProbeNotifyConnectivity"), attribute.String("nodeid", tev.NodeID.String())) + nv, found := p.nvl.Get(tev.NodeID) + if !found { + // ignore message for unknown node, which might have been removed + break + } + // update next check time + nv.NextCheckDue = p.cfg.Clock.Now().Add(p.cfg.CheckInterval) + + // put into list, which will clear any ongoing check too + p.nvl.Put(nv) + + default: + panic(fmt.Sprintf("unexpected event: %T", tev)) + } + + // Check if there is capacity + if p.cfg.Concurrency <= p.nvl.OngoingCount() { + // see if a check can be timed out to free capacity + candidate, found := p.nvl.FindCheckPastDeadline(p.cfg.Clock.Now()) + if !found { + // nothing suitable for time out + return &StateProbeWaitingAtCapacity{} + } + + // mark the node as failed since it timed out + p.rt.RemoveKey(candidate.Key()) + p.nvl.Remove(candidate) + return &StateProbeNodeFailure[K]{ + NodeID: candidate, + } + + } + + // there is capacity to start a new check + next, ok := p.nvl.PeekNext(p.cfg.Clock.Now()) + if !ok { + if p.nvl.OngoingCount() > 0 { + // waiting for a check but nothing else to do + return &StateProbeWaitingWithCapacity{} + } + // nothing happening and nothing to do + return &StateProbeIdle{} + } + + p.nvl.MarkOngoing(next.NodeID, p.cfg.Clock.Now().Add(p.cfg.Timeout)) + + // Ask the node to find itself + return &StateProbeConnectivityCheck[K]{ + NodeID: next.NodeID, + } +} + +// ProbeState is the state of the [Probe] state machine. +type ProbeState interface { + probeState() +} + +// StateProbeConnectivityCheck indicates that the probe subsystem is waiting to send a connectivity check to a node. +// A find node message should be sent to the node, with the target being the node's key. +type StateProbeConnectivityCheck[K kad.Key[K]] struct { + NodeID kad.NodeID[K] // the node to send the message to +} + +// StateProbeIdle indicates that the probe state machine is not running any checks. +type StateProbeIdle struct{} + +// StateProbeWaitingAtCapacity indicates that the probe state machine is waiting for responses for checks and +// the maximum number of concurrent checks has been reached. +type StateProbeWaitingAtCapacity struct{} + +// StateProbeWaitingWithCapacity indicates that the probe state machine is waiting for responses for checks +// but has capacity to perform more. +type StateProbeWaitingWithCapacity struct{} + +// StateProbeNodeFailure indicates a node has failed a connectivity check been removed from the routing table and the probe list +type StateProbeNodeFailure[K kad.Key[K]] struct { + NodeID kad.NodeID[K] +} + +// probeState() ensures that only Probe states can be assigned to the ProbeState interface. +func (*StateProbeConnectivityCheck[K]) probeState() {} +func (*StateProbeIdle) probeState() {} +func (*StateProbeWaitingAtCapacity) probeState() {} +func (*StateProbeWaitingWithCapacity) probeState() {} +func (*StateProbeNodeFailure[K]) probeState() {} + +// ProbeEvent is an event intended to advance the state of a probe. +type ProbeEvent interface { + probeEvent() +} + +// EventProbePoll is an event that signals the probe that it can perform housekeeping work such as time out queries. +type EventProbePoll struct{} + +// EventProbeAdd notifies a probe that a node should be added to its list of nodes. +type EventProbeAdd[K kad.Key[K]] struct { + NodeID kad.NodeID[K] // the node to be probed +} + +// EventProbeRemove notifies a probe that a node should be removed from its list of nodes and the routing table. +type EventProbeRemove[K kad.Key[K]] struct { + NodeID kad.NodeID[K] // the node to be removed +} + +// EventProbeMessageResponse notifies a probe that a sent message has received a successful response. +type EventProbeMessageResponse[K kad.Key[K], A kad.Address[A]] struct { + NodeInfo kad.NodeInfo[K, A] // the node the message was sent to + Response kad.Response[K, A] // the message response sent by the node +} + +// EventProbeMessageFailure notifiesa probe that an attempt to send a message has failed. +type EventProbeMessageFailure[K kad.Key[K], A kad.Address[A]] struct { + NodeInfo kad.NodeInfo[K, A] // the node the message was sent to + Error error // the error that caused the failure, if any +} + +// EventProbeNotifyConnectivity notifies a probe that a node has confirmed connectivity from another source such as a query. +type EventProbeNotifyConnectivity[K kad.Key[K]] struct { + NodeID kad.NodeID[K] +} + +// probeEvent() ensures that only Probe events can be assigned to the ProbeEvent interface. +func (*EventProbePoll) probeEvent() {} +func (*EventProbeAdd[K]) probeEvent() {} +func (*EventProbeRemove[K]) probeEvent() {} +func (*EventProbeMessageResponse[K, A]) probeEvent() {} +func (*EventProbeMessageFailure[K, A]) probeEvent() {} +func (*EventProbeNotifyConnectivity[K]) probeEvent() {} + +type nodeValue[K kad.Key[K]] struct { + NodeID kad.NodeID[K] + Cpl int // the longest common prefix length the node shares with the routing table's key + NextCheckDue time.Time + CheckDeadline time.Time + Index int // the index of the item in the ordering +} + +type nodeValueEntry[K kad.Key[K]] struct { + nv *nodeValue[K] + index int // the index of the item in the ordering +} + +type nodeValueList[K kad.Key[K]] struct { + nodes map[string]*nodeValueEntry[K] + pending *nodeValuePendingList[K] + // ongoing is a list of nodes with ongoing/in-progress probes, loosely ordered earliest to most recent + ongoing []kad.NodeID[K] +} + +func NewNodeValueList[K kad.Key[K]]() *nodeValueList[K] { + return &nodeValueList[K]{ + nodes: make(map[string]*nodeValueEntry[K]), + ongoing: make([]kad.NodeID[K], 0), + pending: new(nodeValuePendingList[K]), + } +} + +// Put adds a node value to the list, replacing any existing value. +// It is added to the pending list and removed from the ongoing list if it was already present there. +func (l *nodeValueList[K]) Put(nv *nodeValue[K]) { + mk := key.HexString(nv.NodeID.Key()) + nve, exists := l.nodes[mk] + if !exists { + nve = &nodeValueEntry[K]{ + nv: nv, + } + } else { + nve.nv = nv + heap.Remove(l.pending, nve.index) + } + heap.Push(l.pending, nve) + l.nodes[mk] = nve + heap.Fix(l.pending, nve.index) + l.removeFromOngoing(nv.NodeID) +} + +func (l *nodeValueList[K]) Get(n kad.NodeID[K]) (*nodeValue[K], bool) { + mk := key.HexString(n.Key()) + nve, found := l.nodes[mk] + if !found { + return nil, false + } + return nve.nv, true +} + +func (l *nodeValueList[K]) PendingCount() int { + return len(*l.pending) +} + +func (l *nodeValueList[K]) OngoingCount() int { + return len(l.ongoing) +} + +func (l *nodeValueList[K]) NodeCount() int { + return len(l.nodes) +} + +// Put removes a node value from the list, deleting its information. +// It is removed from the pending list andongoing list if it was already present in either. +func (l *nodeValueList[K]) Remove(n kad.NodeID[K]) { + mk := key.HexString(n.Key()) + nve, ok := l.nodes[mk] + if !ok { + return + } + delete(l.nodes, mk) + if nve.index >= 0 { + heap.Remove(l.pending, nve.index) + } + l.removeFromOngoing(n) +} + +// FindCheckPastDeadline looks for the first node in the ongoing list whose deadline is +// before the supplied timestamp. +func (l *nodeValueList[K]) FindCheckPastDeadline(ts time.Time) (kad.NodeID[K], bool) { + // ongoing is in start time order, oldest first + for _, n := range l.ongoing { + mk := key.HexString(n.Key()) + nve, ok := l.nodes[mk] + if !ok { + // somehow the node doesn't exist so this is an obvious candidate for removal + return n, true + } + if !nve.nv.CheckDeadline.After(ts) { + return n, true + } + } + return nil, false +} + +func (l *nodeValueList[K]) removeFromOngoing(n kad.NodeID[K]) { + // ongoing list is expected to be small, so linear search is ok + for i := range l.ongoing { + if key.Equal(n.Key(), l.ongoing[i].Key()) { + if len(l.ongoing) > 1 { + // swap with last entry + l.ongoing[i], l.ongoing[len(l.ongoing)-1] = l.ongoing[len(l.ongoing)-1], l.ongoing[i] + } + // remove last entry + l.ongoing[len(l.ongoing)-1] = nil + l.ongoing = l.ongoing[:len(l.ongoing)-1] + return + } + } +} + +// PeekNext returns the next node that is due a connectivity check without removing it +// from the pending list. +func (l *nodeValueList[K]) PeekNext(ts time.Time) (*nodeValue[K], bool) { + if len(*l.pending) == 0 { + return nil, false + } + + nve := (*l.pending)[0] + + // Is the check due yet? + if nve.nv.NextCheckDue.After(ts) { + return nil, false + } + + return (*l.pending)[0].nv, true +} + +// MarkOngoing marks a node as having an ongoing connectivity check. +// It has no effect if the node is not already present in the list. +func (l *nodeValueList[K]) MarkOngoing(n kad.NodeID[K], deadline time.Time) { + mk := key.HexString(n.Key()) + nve, ok := l.nodes[mk] + if !ok { + return + } + nve.nv.CheckDeadline = deadline + l.nodes[mk] = nve + heap.Remove(l.pending, nve.index) + l.ongoing = append(l.ongoing, nve.nv.NodeID) +} + +// nodeValuePendingList is a min-heap of NodeValue ordered by NextCheckDue +type nodeValuePendingList[K kad.Key[K]] []*nodeValueEntry[K] + +func (o nodeValuePendingList[K]) Len() int { return len(o) } +func (o nodeValuePendingList[K]) Less(i, j int) bool { + // if due times are equal, then sort higher cpls first + if o[i].nv.NextCheckDue.Equal(o[j].nv.NextCheckDue) { + return o[i].nv.Cpl > o[j].nv.Cpl + } + + return o[i].nv.NextCheckDue.Before(o[j].nv.NextCheckDue) +} + +func (o nodeValuePendingList[K]) Swap(i, j int) { + o[i], o[j] = o[j], o[i] + o[i].index = i + o[j].index = j +} + +func (o *nodeValuePendingList[K]) Push(x any) { + n := len(*o) + v := x.(*nodeValueEntry[K]) + v.index = n + *o = append(*o, v) +} + +func (o *nodeValuePendingList[K]) Pop() any { + if len(*o) == 0 { + return nil + } + old := *o + n := len(old) + v := old[n-1] + old[n-1] = nil + v.index = -1 + *o = old[0 : n-1] + return v +} diff --git a/routing/probe_test.go b/routing/probe_test.go new file mode 100644 index 0000000..a376809 --- /dev/null +++ b/routing/probe_test.go @@ -0,0 +1,857 @@ +package routing + +import ( + "container/heap" + "context" + "testing" + "time" + + "github.com/benbjohnson/clock" + "github.com/stretchr/testify/require" + + "github.com/plprobelab/go-kademlia/internal/kadtest" + "github.com/plprobelab/go-kademlia/kad" + "github.com/plprobelab/go-kademlia/key" + "github.com/plprobelab/go-kademlia/routing/simplert" +) + +var _ heap.Interface = (*nodeValuePendingList[key.Key8])(nil) + +type unaddressedNodeInfo[K kad.Key[K], A kad.Address[A]] struct { + NodeID kad.NodeID[K] +} + +func (u unaddressedNodeInfo[K, A]) ID() kad.NodeID[K] { return u.NodeID } +func (u unaddressedNodeInfo[K, A]) Addresses() []A { return nil } + +func TestProbeConfigValidate(t *testing.T) { + t.Run("default is valid", func(t *testing.T) { + cfg := DefaultProbeConfig() + require.NoError(t, cfg.Validate()) + }) + + t.Run("clock is not nil", func(t *testing.T) { + cfg := DefaultProbeConfig() + cfg.Clock = nil + require.Error(t, cfg.Validate()) + }) + + t.Run("timeout positive", func(t *testing.T) { + cfg := DefaultProbeConfig() + cfg.Timeout = 0 + require.Error(t, cfg.Validate()) + cfg.Timeout = -1 + require.Error(t, cfg.Validate()) + }) + + t.Run("request concurrency positive", func(t *testing.T) { + cfg := DefaultProbeConfig() + cfg.Concurrency = 0 + require.Error(t, cfg.Validate()) + cfg.Concurrency = -1 + require.Error(t, cfg.Validate()) + }) + + t.Run("revisit interval positive", func(t *testing.T) { + cfg := DefaultProbeConfig() + cfg.CheckInterval = 0 + require.Error(t, cfg.Validate()) + cfg.CheckInterval = -1 + require.Error(t, cfg.Validate()) + }) +} + +func TestProbeStartsIdle(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + cfg := DefaultProbeConfig() + cfg.Clock = clk + + rt := simplert.New[key.Key8, kad.NodeID[key.Key8]](kadtest.NewID(key.Key8(128)), 5) + + bs, err := NewProbe[key.Key8, kadtest.StrAddr](rt, cfg) + require.NoError(t, err) + + state := bs.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeIdle{}, state) +} + +func TestProbeAddChecksPresenceInRoutingTable(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + + cfg := DefaultProbeConfig() + cfg.Clock = clk + cfg.CheckInterval = 10 * time.Minute + + // Set concurrency to allow one check to run + cfg.Concurrency = 1 + + rt := simplert.New[key.Key8, kad.NodeID[key.Key8]](kadtest.NewID(key.Key8(128)), 5) + sm, err := NewProbe[key.Key8, kadtest.StrAddr](rt, cfg) + require.NoError(t, err) + + // Add node that isn't in routing table + state := sm.Advance(ctx, &EventProbeAdd[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + }) + require.IsType(t, &StateProbeIdle{}, state) + + // advance time by one revisit interval + clk.Add(cfg.CheckInterval) + + // remains idle since probes aren't run unless node in routing table + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeIdle{}, state) +} + +func TestProbeAddStartsCheckIfCapacity(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + + cfg := DefaultProbeConfig() + cfg.Clock = clk + cfg.CheckInterval = 10 * time.Minute + + // Set concurrency to allow one check to run + cfg.Concurrency = 1 + + rt := simplert.New[key.Key8, kad.NodeID[key.Key8]](kadtest.NewID(key.Key8(128)), 5) + rt.AddNode(kadtest.NewID(key.Key8(4))) + + sm, err := NewProbe[key.Key8, kadtest.StrAddr](rt, cfg) + require.NoError(t, err) + + // after adding first node the probe should be idle since the + // connectivity check will be scheduled for the future + state := sm.Advance(ctx, &EventProbeAdd[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + }) + require.IsType(t, &StateProbeIdle{}, state) + + // remains idle + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeIdle{}, state) + + // advance time by one revisit interval + clk.Add(cfg.CheckInterval) + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeConnectivityCheck[key.Key8]{}, state) + + // the probe state machine should attempt to contact the next node + st := state.(*StateProbeConnectivityCheck[key.Key8]) + + // the connectivity check should be for the right node + require.True(t, key.Equal(key.Key8(4), st.NodeID.Key())) +} + +func TestProbeAddManyStartsChecksIfCapacity(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + + cfg := DefaultProbeConfig() + cfg.Clock = clk + cfg.CheckInterval = 10 * time.Minute + + // Set concurrency lower than the number of nodes + cfg.Concurrency = 2 + + rt := simplert.New[key.Key8, kad.NodeID[key.Key8]](kadtest.NewID(key.Key8(128)), 5) + rt.AddNode(kadtest.NewID(key.Key8(4))) + rt.AddNode(kadtest.NewID(key.Key8(3))) + rt.AddNode(kadtest.NewID(key.Key8(2))) + + sm, err := NewProbe[key.Key8, kadtest.StrAddr](rt, cfg) + require.NoError(t, err) + + // after adding first node the probe should be idle since the + // connectivity check will be scheduled for the future + state := sm.Advance(ctx, &EventProbeAdd[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + }) + require.IsType(t, &StateProbeIdle{}, state) + + // after adding second node the probe should still be idle since the + // connectivity check will be scheduled for the future + state = sm.Advance(ctx, &EventProbeAdd[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(3)), + }) + require.IsType(t, &StateProbeIdle{}, state) + + // after adding third node the probe should still be idle since the + // connectivity check will be scheduled for the future + state = sm.Advance(ctx, &EventProbeAdd[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(2)), + }) + require.IsType(t, &StateProbeIdle{}, state) + + // advance time by one revisit interval + clk.Add(cfg.CheckInterval) + + // Poll the state machine, it should now attempt to contact a node + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeConnectivityCheck[key.Key8]{}, state) + + // the connectivity check should be for the right node + st := state.(*StateProbeConnectivityCheck[key.Key8]) + require.True(t, key.Equal(key.Key8(4), st.NodeID.Key())) + + // Poll the state machine, it should now attempt to contact another node + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeConnectivityCheck[key.Key8]{}, state) + + // the connectivity check should be for the right node + st = state.(*StateProbeConnectivityCheck[key.Key8]) + require.True(t, key.Equal(key.Key8(2), st.NodeID.Key())) + + // Poll the state machine, it should now be at capacity + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeWaitingAtCapacity{}, state) +} + +func TestProbeAddReportsCapacity(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + + cfg := DefaultProbeConfig() + cfg.Clock = clk + cfg.CheckInterval = 10 * time.Minute + + // Set concurrency to allow more than one check to run + cfg.Concurrency = 2 + + rt := simplert.New[key.Key8, kad.NodeID[key.Key8]](kadtest.NewID(key.Key8(128)), 5) + rt.AddNode(kadtest.NewID(key.Key8(4))) + + sm, err := NewProbe[key.Key8, kadtest.StrAddr](rt, cfg) + require.NoError(t, err) + + // after adding first node the probe should be idle since the + // connectivity check will be scheduled for the future + state := sm.Advance(ctx, &EventProbeAdd[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + }) + require.IsType(t, &StateProbeIdle{}, state) + + // remains idle + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeIdle{}, state) + + // advance time by one revisit interval + clk.Add(cfg.CheckInterval) + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeConnectivityCheck[key.Key8]{}, state) + + // the probe state machine should attempt to contact the next node + st := state.(*StateProbeConnectivityCheck[key.Key8]) + + // the connectivity check should be for the right node + require.True(t, key.Equal(key.Key8(4), st.NodeID.Key())) + + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeWaitingWithCapacity{}, state) +} + +func TestProbeRemoveDeletesNodeValue(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + + cfg := DefaultProbeConfig() + cfg.Clock = clk + cfg.CheckInterval = 10 * time.Minute + + // Set concurrency to allow more than one check to run + cfg.Concurrency = 2 + + rt := simplert.New[key.Key8, kad.NodeID[key.Key8]](kadtest.NewID(key.Key8(128)), 5) + rt.AddNode(kadtest.NewID(key.Key8(4))) + + sm, err := NewProbe[key.Key8, kadtest.StrAddr](rt, cfg) + require.NoError(t, err) + + // after adding first node the probe should be idle since the + // connectivity check will be scheduled for the future + state := sm.Advance(ctx, &EventProbeAdd[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + }) + require.IsType(t, &StateProbeIdle{}, state) + + // remove the node + state = sm.Advance(ctx, &EventProbeRemove[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + }) + + // state should remain idle + require.IsType(t, &StateProbeIdle{}, state) + + // advance time by one revisit interval + clk.Add(cfg.CheckInterval) + + // state remains idle since there are no nodes to probe + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeIdle{}, state) +} + +func TestNodeValueList(t *testing.T) { + t.Run("put new", func(t *testing.T) { + t.Parallel() + + clk := clock.NewMock() + l := NewNodeValueList[key.Key8]() + nv := &nodeValue[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + NextCheckDue: clk.Now(), + } + + l.Put(nv) + + got, found := l.Get(kadtest.NewID(key.Key8(4))) + require.True(t, found) + require.True(t, key.Equal(got.NodeID.Key(), key.Key8(4))) + }) + + t.Run("put replace before", func(t *testing.T) { + t.Parallel() + + clk := clock.NewMock() + l := NewNodeValueList[key.Key8]() + nv1 := &nodeValue[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + NextCheckDue: clk.Now(), + } + + l.Put(nv1) + + nv2 := &nodeValue[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + NextCheckDue: clk.Now().Add(-time.Minute), + } + l.Put(nv2) + + got, found := l.Get(kadtest.NewID(key.Key8(4))) + require.True(t, found) + require.True(t, key.Equal(got.NodeID.Key(), key.Key8(4))) + require.Equal(t, nv2.NextCheckDue, got.NextCheckDue) + }) + + t.Run("put replace after", func(t *testing.T) { + t.Parallel() + + clk := clock.NewMock() + l := NewNodeValueList[key.Key8]() + nv1 := &nodeValue[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + NextCheckDue: clk.Now(), + } + + l.Put(nv1) + + nv2 := &nodeValue[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + NextCheckDue: clk.Now().Add(time.Minute), + } + l.Put(nv2) + + got, found := l.Get(kadtest.NewID(key.Key8(4))) + require.True(t, found) + require.True(t, key.Equal(got.NodeID.Key(), key.Key8(4))) + require.Equal(t, nv2.NextCheckDue, got.NextCheckDue) + }) + + t.Run("remove existing", func(t *testing.T) { + t.Parallel() + + clk := clock.NewMock() + l := NewNodeValueList[key.Key8]() + nv := &nodeValue[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + NextCheckDue: clk.Now(), + } + + l.Put(nv) + + require.Equal(t, 1, l.PendingCount()) + require.Equal(t, 1, l.NodeCount()) + + _, found := l.Get(kadtest.NewID(key.Key8(4))) + require.True(t, found) + + l.Remove(kadtest.NewID(key.Key8(4))) + _, found = l.Get(kadtest.NewID(key.Key8(4))) + require.False(t, found) + + require.Equal(t, 0, l.PendingCount()) + require.Equal(t, 0, l.NodeCount()) + }) + + t.Run("remove not-existing", func(t *testing.T) { + t.Parallel() + + clk := clock.NewMock() + l := NewNodeValueList[key.Key8]() + nv := &nodeValue[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + NextCheckDue: clk.Now(), + } + + l.Put(nv) + + l.Remove(kadtest.NewID(key.Key8(5))) + _, found := l.Get(kadtest.NewID(key.Key8(4))) + require.True(t, found) + }) + + t.Run("next empty list", func(t *testing.T) { + t.Parallel() + + clk := clock.NewMock() + l := NewNodeValueList[key.Key8]() + got, found := l.PeekNext(clk.Now()) + require.False(t, found) + require.Nil(t, got) + }) + + t.Run("next one entry", func(t *testing.T) { + t.Parallel() + + clk := clock.NewMock() + l := NewNodeValueList[key.Key8]() + nv := &nodeValue[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + NextCheckDue: clk.Now(), + } + l.Put(nv) + + got, found := l.PeekNext(clk.Now()) + require.True(t, found) + require.True(t, key.Equal(got.NodeID.Key(), key.Key8(4))) + }) + + t.Run("next sorts by next check due", func(t *testing.T) { + t.Parallel() + + clk := clock.NewMock() + l := NewNodeValueList[key.Key8]() + nv1 := &nodeValue[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(5)), + NextCheckDue: clk.Now().Add(-time.Minute), + } + l.Put(nv1) + nv2 := &nodeValue[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + NextCheckDue: clk.Now().Add(-2 * time.Minute), + } + l.Put(nv2) + + got, found := l.PeekNext(clk.Now()) + require.True(t, found) + require.True(t, key.Equal(got.NodeID.Key(), nv2.NodeID.Key())) + + nv2.NextCheckDue = clk.Now() + l.Put(nv2) + + got, found = l.PeekNext(clk.Now()) + require.True(t, found) + require.True(t, key.Equal(got.NodeID.Key(), nv1.NodeID.Key())) + }) + + t.Run("next sorts by cpl descending after time", func(t *testing.T) { + t.Parallel() + + clk := clock.NewMock() + l := NewNodeValueList[key.Key8]() + nv1 := &nodeValue[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(5)), + Cpl: 1, + NextCheckDue: clk.Now().Add(-time.Minute), + } + l.Put(nv1) + nv2 := &nodeValue[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + Cpl: 2, + NextCheckDue: clk.Now().Add(-time.Minute), + } + l.Put(nv2) + + got, found := l.PeekNext(clk.Now()) + require.True(t, found) + require.True(t, key.Equal(got.NodeID.Key(), nv2.NodeID.Key())) + + nv2.NextCheckDue = clk.Now() + l.Put(nv2) + + got, found = l.PeekNext(clk.Now()) + require.True(t, found) + require.True(t, key.Equal(got.NodeID.Key(), nv1.NodeID.Key())) + }) + + t.Run("next not due", func(t *testing.T) { + t.Parallel() + + clk := clock.NewMock() + l := NewNodeValueList[key.Key8]() + nv1 := &nodeValue[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(5)), + NextCheckDue: clk.Now().Add(time.Minute), + } + l.Put(nv1) + nv2 := &nodeValue[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + NextCheckDue: clk.Now().Add(2 * time.Minute), + } + l.Put(nv2) + + got, found := l.PeekNext(clk.Now()) + require.False(t, found) + require.Nil(t, got) + }) + + t.Run("mark ongoing", func(t *testing.T) { + t.Parallel() + + clk := clock.NewMock() + l := NewNodeValueList[key.Key8]() + nv1 := &nodeValue[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(5)), + NextCheckDue: clk.Now().Add(time.Minute), + } + l.Put(nv1) + require.Equal(t, 1, l.PendingCount()) + require.Equal(t, 0, l.OngoingCount()) + require.Equal(t, 1, l.NodeCount()) + + l.MarkOngoing(kadtest.NewID(key.Key8(5)), clk.Now().Add(time.Minute)) + require.Equal(t, 0, l.PendingCount()) + require.Equal(t, 1, l.OngoingCount()) + require.Equal(t, 1, l.NodeCount()) + }) + + t.Run("mark ongoing changes next", func(t *testing.T) { + t.Parallel() + + clk := clock.NewMock() + l := NewNodeValueList[key.Key8]() + nv1 := &nodeValue[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(5)), + NextCheckDue: clk.Now().Add(-2 * time.Minute), + } + l.Put(nv1) + + nv2 := &nodeValue[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + NextCheckDue: clk.Now().Add(-1 * time.Minute), + } + l.Put(nv2) + + require.Equal(t, 2, l.PendingCount()) + require.Equal(t, 0, l.OngoingCount()) + require.Equal(t, 2, l.NodeCount()) + + // nv1 is the next node due + got, found := l.PeekNext(clk.Now()) + require.True(t, found) + require.True(t, key.Equal(got.NodeID.Key(), nv1.NodeID.Key())) + + l.MarkOngoing(nv1.NodeID, clk.Now().Add(time.Minute)) + require.Equal(t, 1, l.PendingCount()) + require.Equal(t, 1, l.OngoingCount()) + require.Equal(t, 2, l.NodeCount()) + + // nv2 is now the next node due + got, found = l.PeekNext(clk.Now()) + require.True(t, found) + require.True(t, key.Equal(got.NodeID.Key(), nv2.NodeID.Key())) + }) + + t.Run("put removes from ongoing", func(t *testing.T) { + t.Parallel() + + clk := clock.NewMock() + l := NewNodeValueList[key.Key8]() + nv1 := &nodeValue[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + NextCheckDue: clk.Now(), + } + l.Put(nv1) + + require.Equal(t, 1, l.PendingCount()) + require.Equal(t, 0, l.OngoingCount()) + require.Equal(t, 1, l.NodeCount()) + + l.MarkOngoing(nv1.NodeID, clk.Now().Add(time.Minute)) + + require.Equal(t, 0, l.PendingCount()) + require.Equal(t, 1, l.OngoingCount()) + require.Equal(t, 1, l.NodeCount()) + + l.Put(nv1) + + require.Equal(t, 1, l.PendingCount()) + require.Equal(t, 0, l.OngoingCount()) + require.Equal(t, 1, l.NodeCount()) + }) +} + +func TestProbeMessageResponse(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + + cfg := DefaultProbeConfig() + cfg.Clock = clk + cfg.CheckInterval = 10 * time.Minute + + // Set concurrency to allow more than one check to run + cfg.Concurrency = 2 + + rt := simplert.New[key.Key8, kad.NodeID[key.Key8]](kadtest.NewID(key.Key8(128)), 5) + rt.AddNode(kadtest.NewID(key.Key8(4))) + + sm, err := NewProbe[key.Key8, kadtest.StrAddr](rt, cfg) + require.NoError(t, err) + + // after adding first node the probe should be idle since the + // connectivity check will be scheduled for the future + state := sm.Advance(ctx, &EventProbeAdd[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + }) + require.IsType(t, &StateProbeIdle{}, state) + + // advance time by one revisit interval + clk.Add(cfg.CheckInterval) + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeConnectivityCheck[key.Key8]{}, state) + + // the probe state machine should attempt to contact the next node + st := state.(*StateProbeConnectivityCheck[key.Key8]) + + // notify that node was contacted successfully, with no closer nodes + state = sm.Advance(ctx, &EventProbeMessageResponse[key.Key8, kadtest.StrAddr]{ + NodeInfo: unaddressedNodeInfo[key.Key8, kadtest.StrAddr]{ + NodeID: st.NodeID, + }, + Response: kadtest.NewResponse("resp", []kad.NodeInfo[key.Key8, kadtest.StrAddr]{ + kadtest.NewInfo(kadtest.NewID(key.Key8(4)), []kadtest.StrAddr{"addr_4"}), + kadtest.NewInfo(kadtest.NewID(key.Key8(6)), []kadtest.StrAddr{"addr_6"}), + }), + }) + + // node remains in routing table + _, found := rt.GetNode(key.Key8(4)) + require.True(t, found) + + // state machine now idle + require.IsType(t, &StateProbeIdle{}, state) + + // advance time by another revisit interval + clk.Add(cfg.CheckInterval) + + // the probe state machine should attempt to contact node again, now it is time + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeConnectivityCheck[key.Key8]{}, state) + + // the connectivity check should be for the right node + require.True(t, key.Equal(key.Key8(4), st.NodeID.Key())) + + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeWaitingWithCapacity{}, state) +} + +func TestProbeMessageFailure(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + + cfg := DefaultProbeConfig() + cfg.Clock = clk + cfg.CheckInterval = 10 * time.Minute + + // Set concurrency to allow more than one check to run + cfg.Concurrency = 2 + + rt := simplert.New[key.Key8, kad.NodeID[key.Key8]](kadtest.NewID(key.Key8(128)), 5) + rt.AddNode(kadtest.NewID(key.Key8(4))) + + sm, err := NewProbe[key.Key8, kadtest.StrAddr](rt, cfg) + require.NoError(t, err) + + // after adding first node the probe should be idle since the + // connectivity check will be scheduled for the future + state := sm.Advance(ctx, &EventProbeAdd[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + }) + require.IsType(t, &StateProbeIdle{}, state) + + // advance time by one revisit interval + clk.Add(cfg.CheckInterval) + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeConnectivityCheck[key.Key8]{}, state) + + // the probe state machine should attempt to contact the next node + st := state.(*StateProbeConnectivityCheck[key.Key8]) + + // notify that node was contacted successfully, with no closer nodes + state = sm.Advance(ctx, &EventProbeMessageFailure[key.Key8, kadtest.StrAddr]{ + NodeInfo: unaddressedNodeInfo[key.Key8, kadtest.StrAddr]{ + NodeID: st.NodeID, + }, + }) + + // state machine announces node failure + require.IsType(t, &StateProbeNodeFailure[key.Key8]{}, state) + stf := state.(*StateProbeNodeFailure[key.Key8]) + + // the failure should be for the right node + require.True(t, key.Equal(key.Key8(4), stf.NodeID.Key())) + + // node has been removed from routing table + _, found := rt.GetNode(key.Key8(4)) + require.False(t, found) + + // advance time by another revisit interval + clk.Add(cfg.CheckInterval) + + // state machine still idle since node was removed + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeIdle{}, state) +} + +func TestProbeNotifyConnectivity(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + + cfg := DefaultProbeConfig() + cfg.Clock = clk + cfg.CheckInterval = 10 * time.Minute + cfg.Concurrency = 2 + + rt := simplert.New[key.Key8, kad.NodeID[key.Key8]](kadtest.NewID(key.Key8(128)), 5) + rt.AddNode(kadtest.NewID(key.Key8(4))) + rt.AddNode(kadtest.NewID(key.Key8(3))) + + sm, err := NewProbe[key.Key8, kadtest.StrAddr](rt, cfg) + require.NoError(t, err) + + // after adding first node the probe should be idle since the + // connectivity check will be scheduled for the future (t0+10) + state := sm.Advance(ctx, &EventProbeAdd[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + }) + + // not time for a check yet + require.IsType(t, &StateProbeIdle{}, state) + + // advance time by less than the revisit interval + // time is now (t0+2) + clk.Add(2 * time.Minute) + + // add a second node, which will be second in the probe list since it's + // time of next check will be later (t0+2+10=t0+12) + state = sm.Advance(ctx, &EventProbeAdd[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(3)), + }) + + // still not time for a check + require.IsType(t, &StateProbeIdle{}, state) + + // advance time past the first node's check time but before the second node's + // time is now (t0+2+9=t0+11) + clk.Add(9 * time.Minute) + + // notify that the node with key 4 was connected to successfully by another process + // this will delay the time for the next check to t0+11+10=to+21 + state = sm.Advance(ctx, &EventProbeNotifyConnectivity[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + }) + + // still not time for a check + require.IsType(t, &StateProbeIdle{}, state) + + // advance time past second node's check time + // time is now (t0+2+9+4=t0+15) + clk.Add(4 * time.Minute) + + // Poll the state machine, it should now attempt to contact a node + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeConnectivityCheck[key.Key8]{}, state) + + // the connectivity check should be for the right node, which is the one + // that did not get a connectivity notification + st := state.(*StateProbeConnectivityCheck[key.Key8]) + require.True(t, key.Equal(key.Key8(3), st.NodeID.Key())) + + // Poll the state machine, it should now waiting for a response but still have capacity + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeWaitingWithCapacity{}, state) +} + +func TestProbeTimeout(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + + cfg := DefaultProbeConfig() + cfg.Clock = clk + cfg.CheckInterval = 10 * time.Minute + cfg.Timeout = 3 * time.Minute + cfg.Concurrency = 1 // one probe at a time, timeouts will be used to free capacity if there are more requests + + rt := simplert.New[key.Key8, kad.NodeID[key.Key8]](kadtest.NewID(key.Key8(128)), 5) + rt.AddNode(kadtest.NewID(key.Key8(4))) + rt.AddNode(kadtest.NewID(key.Key8(3))) + + sm, err := NewProbe[key.Key8, kadtest.StrAddr](rt, cfg) + require.NoError(t, err) + + // add a node + state := sm.Advance(ctx, &EventProbeAdd[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(4)), + }) + + // not time for a check yet + require.IsType(t, &StateProbeIdle{}, state) + + // advance time a little + clk.Add(time.Minute) + + // add another node + state = sm.Advance(ctx, &EventProbeAdd[key.Key8]{ + NodeID: kadtest.NewID(key.Key8(3)), + }) + + // not time for a check yet + require.IsType(t, &StateProbeIdle{}, state) + + // advance time by check interval + clk.Add(cfg.CheckInterval) + + // poll state machine + state = sm.Advance(ctx, &EventProbePoll{}) + + // the connectivity check should start + require.IsType(t, &StateProbeConnectivityCheck[key.Key8]{}, state) + stm := state.(*StateProbeConnectivityCheck[key.Key8]) + require.True(t, key.Equal(key.Key8(4), stm.NodeID.Key())) + + // Poll the state machine, it should now waiting for a response with no capacity + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeWaitingAtCapacity{}, state) + + // advance time past the timeout + clk.Add(cfg.Timeout) + + // state machine announces node failure + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeNodeFailure[key.Key8]{}, state) + stf := state.(*StateProbeNodeFailure[key.Key8]) + + // the failure should be for the right node + require.True(t, key.Equal(key.Key8(4), stf.NodeID.Key())) + + // node has been removed from routing table + _, found := rt.GetNode(key.Key8(4)) + require.False(t, found) + + // state machine starts check for next node now there is capacity + state = sm.Advance(ctx, &EventProbePoll{}) + require.IsType(t, &StateProbeConnectivityCheck[key.Key8]{}, state) + stm = state.(*StateProbeConnectivityCheck[key.Key8]) + require.True(t, key.Equal(key.Key8(3), stm.NodeID.Key())) +} diff --git a/routing/simplert/table.go b/routing/simplert/table.go index fc43433..16a7b04 100644 --- a/routing/simplert/table.go +++ b/routing/simplert/table.go @@ -232,3 +232,19 @@ func min(a, b int) int { } return b } + +// Cpl returns the longest common prefix length the supplied key shares with the table's key. +func (rt *SimpleRT[K, N]) Cpl(kk K) int { + return rt.self.CommonPrefixLength(kk) +} + +// CplSize returns the number of nodes in the table whose longest common prefix with the table's key is of length cpl. +func (rt *SimpleRT[K, N]) CplSize(cpl int) int { + bid := cpl // cpl is simply the bucket id + nBuckets := len(rt.buckets) + if bid >= nBuckets { + bid = nBuckets - 1 + } + + return len(rt.buckets[bid]) +}