diff --git a/go.mod b/go.mod index dd6990e..801721a 100644 --- a/go.mod +++ b/go.mod @@ -9,8 +9,9 @@ require ( github.com/libp2p/go-msgio v0.3.0 github.com/multiformats/go-multiaddr v0.11.0 github.com/multiformats/go-multihash v0.2.3 - github.com/plprobelab/go-kademlia v0.0.0-20230823114513-9b9e606066c9 + github.com/plprobelab/go-kademlia v0.0.0-20230901130940-286ab4ceca60 github.com/stretchr/testify v1.8.4 + go.opentelemetry.io/otel v1.16.0 go.uber.org/zap/exp v0.1.0 golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 google.golang.org/protobuf v1.31.0 @@ -84,7 +85,6 @@ require ( github.com/quic-go/webtransport-go v0.5.3 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect - go.opentelemetry.io/otel v1.16.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect go.opentelemetry.io/otel/trace v1.16.0 // indirect go.uber.org/dig v1.17.0 // indirect diff --git a/go.sum b/go.sum index cc2c839..de7bd08 100644 --- a/go.sum +++ b/go.sum @@ -55,6 +55,7 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -114,8 +115,11 @@ github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABoLk/+KKHggpk= github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= @@ -179,6 +183,7 @@ github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dz github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= @@ -206,6 +211,7 @@ github.com/multiformats/go-multistream v0.4.1/go.mod h1:Mz5eykRVAjJWckE2U78c6xqd github.com/multiformats/go-varint v0.0.1/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU= @@ -222,6 +228,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/plprobelab/go-kademlia v0.0.0-20230823114513-9b9e606066c9 h1:qqrJgUNOCAozZDkL0gH57FUi+aXj/d/SdldaLAZUFUU= github.com/plprobelab/go-kademlia v0.0.0-20230823114513-9b9e606066c9/go.mod h1:OMu6Kyh5AetV3uLRVSZlp6WcwrZUn3nyRFaRuJxVWJQ= +github.com/plprobelab/go-kademlia v0.0.0-20230901130940-286ab4ceca60 h1:fgo8NhFeL+p7atahZNtvo1BfWClUNRvAjzC2ikEwvsY= +github.com/plprobelab/go-kademlia v0.0.0-20230901130940-286ab4ceca60/go.mod h1:OMu6Kyh5AetV3uLRVSZlp6WcwrZUn3nyRFaRuJxVWJQ= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= @@ -279,6 +287,7 @@ github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -450,6 +459,7 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/internal/nettest/topology.go b/internal/nettest/topology.go index 56b5d74..eb8710f 100644 --- a/internal/nettest/topology.go +++ b/internal/nettest/topology.go @@ -9,12 +9,13 @@ import ( "github.com/plprobelab/go-kademlia/kad" "github.com/plprobelab/go-kademlia/key" "github.com/plprobelab/go-kademlia/network/address" + "github.com/plprobelab/go-kademlia/routing" ) type Node[K kad.Key[K], A kad.Address[A]] struct { NodeInfo kad.NodeInfo[K, A] Router *Router[K, A] - RoutingTable kad.RoutingTable[K, kad.NodeID[K]] + RoutingTable routing.RoutingTableCpl[K, kad.NodeID[K]] } type Topology[K kad.Key[K], A kad.Address[A]] struct { diff --git a/kademlia/behaviour.go b/kademlia/behaviour.go index b9c146a..6d847ea 100644 --- a/kademlia/behaviour.go +++ b/kademlia/behaviour.go @@ -22,6 +22,7 @@ type Behaviour[I DhtEvent, O DhtEvent] interface { // Notify informs the behaviour of an event. The behaviour may perform the event // immediately and queue the result, causing the behaviour to become ready. + // It is safe to call Notify from the Perform method. Notify(ctx context.Context, ev I) // Perform gives the behaviour the opportunity to perform work or to return a queued @@ -29,6 +30,10 @@ type Behaviour[I DhtEvent, O DhtEvent] interface { Perform(ctx context.Context) (O, bool) } +type SM[E any, S any] interface { + Advance(context.Context, E) S +} + type WorkQueueFunc[E DhtEvent] func(context.Context, E) bool // WorkQueue is buffered queue of work to be performed. diff --git a/kademlia/behaviour_test.go b/kademlia/behaviour_test.go new file mode 100644 index 0000000..0efd04a --- /dev/null +++ b/kademlia/behaviour_test.go @@ -0,0 +1,54 @@ +package kademlia + +import ( + "context" + "fmt" + "reflect" + "testing" +) + +type NullSM[E any, S any] struct{} + +func (NullSM[E, S]) Advance(context.Context, E) S { + var v S + return v +} + +type RecordingSM[E any, S any] struct { + State S + Received E +} + +func NewRecordingSM[E any, S any](response S) *RecordingSM[E, S] { + return &RecordingSM[E, S]{ + State: response, + } +} + +func (r *RecordingSM[E, S]) Advance(ctx context.Context, e E) S { + r.Received = e + return r.State +} + +// expectBehaviourEvent selects on a behaviour's ready channel until it becomes ready and then checks the perform +// mehtod for the expected event type. Unexpected events are ignored and selecting resumes. +// The function returns when an event matching the type of expected is received or when the context is cancelled. +func expectBehaviourEvent[I DhtEvent, O DhtEvent](t *testing.T, ctx context.Context, b Behaviour[I, O], expected O) (O, error) { + t.Helper() + for { + select { + case <-b.Ready(): + ev, ok := b.Perform(ctx) + if !ok { + continue + } + t.Logf("saw event: %T\n", ev) + if reflect.TypeOf(ev) == reflect.TypeOf(expected) { + return ev, nil + } + case <-ctx.Done(): + var v O + return v, fmt.Errorf("test deadline exceeded") + } + } +} diff --git a/kademlia/dht.go b/kademlia/dht.go index 8d66868..ed9bc21 100644 --- a/kademlia/dht.go +++ b/kademlia/dht.go @@ -123,7 +123,7 @@ func DefaultConfig() *Config { } } -func NewDht[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], rtr Router[K, A], rt kad.RoutingTable[K, kad.NodeID[K]], cfg *Config) (*Dht[K, A], error) { +func NewDht[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], rtr Router[K, A], rt routing.RoutingTableCpl[K, kad.NodeID[K]], cfg *Config) (*Dht[K, A], error) { if cfg == nil { cfg = DefaultConfig() } else if err := cfg.Validate(); err != nil { @@ -168,7 +168,18 @@ func NewDht[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], rtr Router[K, A] return nil, fmt.Errorf("include: %w", err) } - routingBehaviour := NewRoutingBehaviour(self, bootstrap, include, cfg.Logger) + probeCfg := routing.DefaultProbeConfig() + probeCfg.Clock = cfg.Clock + probeCfg.Timeout = cfg.QueryTimeout + + // TODO: expose config + // probeCfg.Concurrency = cfg.ProbeConcurrency + probe, err := routing.NewProbe[K, A](rt, probeCfg) + if err != nil { + return nil, fmt.Errorf("include: %w", err) + } + + routingBehaviour := NewRoutingBehaviour[K, A](self, bootstrap, include, probe, cfg.Logger) networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger) diff --git a/kademlia/routing.go b/kademlia/routing.go index d5929d4..94d6bdf 100644 --- a/kademlia/routing.go +++ b/kademlia/routing.go @@ -9,6 +9,7 @@ import ( "github.com/plprobelab/go-kademlia/key" "github.com/plprobelab/go-kademlia/routing" "github.com/plprobelab/go-kademlia/util" + "go.opentelemetry.io/otel/attribute" "golang.org/x/exp/slog" ) @@ -16,8 +17,13 @@ type RoutingBehaviour[K kad.Key[K], A kad.Address[A]] struct { // self is the node id of the system the dht is running on self kad.NodeID[K] // bootstrap is the bootstrap state machine, responsible for bootstrapping the routing table - bootstrap *routing.Bootstrap[K, A] - include *routing.Include[K, A] + bootstrap SM[routing.BootstrapEvent, routing.BootstrapState] + + // include is the inclusion state machine, responsible for vetting nodes before including them in the routing table + include SM[routing.IncludeEvent, routing.IncludeState] + + // probe is the node probing state machine, responsible for periodically checking connectivity of nodes in the routing table + probe SM[routing.ProbeEvent, routing.ProbeState] pendingMu sync.Mutex pending []DhtEvent @@ -26,11 +32,12 @@ type RoutingBehaviour[K kad.Key[K], A kad.Address[A]] struct { logger *slog.Logger } -func NewRoutingBehaviour[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], bootstrap *routing.Bootstrap[K, A], include *routing.Include[K, A], logger *slog.Logger) *RoutingBehaviour[K, A] { +func NewRoutingBehaviour[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], bootstrap SM[routing.BootstrapEvent, routing.BootstrapState], include SM[routing.IncludeEvent, routing.IncludeState], probe SM[routing.ProbeEvent, routing.ProbeState], logger *slog.Logger) *RoutingBehaviour[K, A] { r := &RoutingBehaviour[K, A]{ self: self, bootstrap: bootstrap, include: include, + probe: probe, ready: make(chan struct{}, 1), logger: logger, } @@ -38,14 +45,21 @@ func NewRoutingBehaviour[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], boo } func (r *RoutingBehaviour[K, A]) Notify(ctx context.Context, ev DhtEvent) { - ctx, span := util.StartSpan(ctx, "RoutingBehaviour.Perform") + ctx, span := util.StartSpan(ctx, "RoutingBehaviour.Notify") defer span.End() r.pendingMu.Lock() defer r.pendingMu.Unlock() + r.notify(ctx, ev) +} +// notify must only be called while r.pendingMu is held +func (r *RoutingBehaviour[K, A]) notify(ctx context.Context, ev DhtEvent) { + ctx, span := util.StartSpan(ctx, "RoutingBehaviour.notify") + defer span.End() switch ev := ev.(type) { case *EventDhtStartBootstrap[K, A]: + span.SetAttributes(attribute.String("event", "EventDhtStartBootstrap")) cmd := &routing.EventBootstrapStart[K, A]{ ProtocolID: ev.ProtocolID, Message: ev.Message, @@ -58,6 +72,7 @@ func (r *RoutingBehaviour[K, A]) Notify(ctx context.Context, ev DhtEvent) { } case *EventDhtAddNodeInfo[K, A]: + span.SetAttributes(attribute.String("event", "EventDhtAddNodeInfo")) // Ignore self if key.Equal(ev.NodeInfo.ID().Key(), r.self.Key()) { break @@ -71,7 +86,19 @@ func (r *RoutingBehaviour[K, A]) Notify(ctx context.Context, ev DhtEvent) { r.pending = append(r.pending, next) } + case *EventRoutingUpdated[K, A]: + span.SetAttributes(attribute.String("event", "EventRoutingUpdated")) + cmd := &routing.EventProbeAdd[K]{ + NodeID: ev.NodeInfo.ID(), + } + // attempt to advance the probe state machine + next, ok := r.advanceProbe(ctx, cmd) + if ok { + r.pending = append(r.pending, next) + } + case *EventGetClosestNodesSuccess[K, A]: + span.SetAttributes(attribute.String("event", "EventGetClosestNodesFailure"), attribute.String("queryid", string(ev.QueryID)), attribute.String("nodeid", string(ev.To.ID().String()))) switch ev.QueryID { case "bootstrap": for _, info := range ev.ClosestNodes { @@ -101,10 +128,23 @@ func (r *RoutingBehaviour[K, A]) Notify(ctx context.Context, ev DhtEvent) { r.pending = append(r.pending, next) } + case "probe": + cmd := &routing.EventProbeMessageResponse[K, A]{ + NodeInfo: ev.To, + Response: ClosestNodesFakeResponse(ev.Target, ev.ClosestNodes), + } + // attempt to advance the probe state machine + next, ok := r.advanceProbe(ctx, cmd) + if ok { + r.pending = append(r.pending, next) + } + default: panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID)) } case *EventGetClosestNodesFailure[K, A]: + span.SetAttributes(attribute.String("event", "EventGetClosestNodesFailure"), attribute.String("queryid", string(ev.QueryID)), attribute.String("nodeid", string(ev.To.ID().String()))) + span.RecordError(ev.Err) switch ev.QueryID { case "bootstrap": cmd := &routing.EventBootstrapMessageFailure[K]{ @@ -121,11 +161,21 @@ func (r *RoutingBehaviour[K, A]) Notify(ctx context.Context, ev DhtEvent) { NodeInfo: ev.To, Error: ev.Err, } - // attempt to advance the include + // attempt to advance the include state machine next, ok := r.advanceInclude(ctx, cmd) if ok { r.pending = append(r.pending, next) } + case "probe": + cmd := &routing.EventProbeMessageFailure[K, A]{ + NodeInfo: ev.To, + Error: ev.Err, + } + // attempt to advance the probe state machine + next, ok := r.advanceProbe(ctx, cmd) + if ok { + r.pending = append(r.pending, next) + } default: panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID)) @@ -169,59 +219,24 @@ func (r *RoutingBehaviour[K, A]) Perform(ctx context.Context) (DhtEvent, bool) { return ev, true } - // attempt to advance the bootstrap state machine - bstate := r.bootstrap.Advance(ctx, &routing.EventBootstrapPoll{}) - switch st := bstate.(type) { - - case *routing.StateBootstrapMessage[K, A]: - return &EventOutboundGetClosestNodes[K, A]{ - QueryID: "bootstrap", - To: NewNodeAddr[K, A](st.NodeID, nil), - Target: st.Message.Target(), - Notifiee: r, - }, true - - case *routing.StateBootstrapWaiting: - // bootstrap waiting for a message response, nothing to do - case *routing.StateBootstrapFinished: - return &EventBootstrapFinished{ - Stats: st.Stats, - }, true - case *routing.StateBootstrapIdle: - // bootstrap not running, nothing to do - default: - panic(fmt.Sprintf("unexpected bootstrap state: %T", st)) + // poll the child state machines in priority order to give each an opportunity to perform work + + ev, ok := r.advanceBootstrap(ctx, &routing.EventBootstrapPoll{}) + if ok { + return ev, true } - // attempt to advance the include state machine - istate := r.include.Advance(ctx, &routing.EventIncludePoll{}) - switch st := istate.(type) { - case *routing.StateIncludeFindNodeMessage[K, A]: - // include wants to send a find node message to a node - return &EventOutboundGetClosestNodes[K, A]{ - QueryID: "include", - To: st.NodeInfo, - Target: st.NodeInfo.ID().Key(), - Notifiee: r, - }, true - - case *routing.StateIncludeRoutingUpdated[K, A]: - // a node has been included in the routing table - return &EventRoutingUpdated[K, A]{ - NodeInfo: st.NodeInfo, - }, true - case *routing.StateIncludeWaitingAtCapacity: - // nothing to do except wait for message response or timeout - case *routing.StateIncludeWaitingWithCapacity: - // nothing to do except wait for message response or timeout - case *routing.StateIncludeWaitingFull: - // nothing to do except wait for message response or timeout - case *routing.StateIncludeIdle: - // nothing to do except wait for message response or timeout - default: - panic(fmt.Sprintf("unexpected include state: %T", st)) + ev, ok = r.advanceInclude(ctx, &routing.EventIncludePoll{}) + if ok { + return ev, true + } + + ev, ok = r.advanceProbe(ctx, &routing.EventProbePoll{}) + if ok { + return ev, true } + // finally check if any pending events were accumulated in the meantime if len(r.pending) == 0 { return nil, false } @@ -273,6 +288,13 @@ func (r *RoutingBehaviour[K, A]) advanceInclude(ctx context.Context, ev routing. case *routing.StateIncludeRoutingUpdated[K, A]: // a node has been included in the routing table + + // notify other routing state machines that there is a new node in the routing table + r.notify(ctx, &EventRoutingUpdated[K, A]{ + NodeInfo: st.NodeInfo, + }) + + // return the event to notify outwards too return &EventRoutingUpdated[K, A]{ NodeInfo: st.NodeInfo, }, true @@ -290,3 +312,45 @@ func (r *RoutingBehaviour[K, A]) advanceInclude(ctx context.Context, ev routing. return nil, false } + +func (r *RoutingBehaviour[K, A]) advanceProbe(ctx context.Context, ev routing.ProbeEvent) (DhtEvent, bool) { + ctx, span := util.StartSpan(ctx, "RoutingBehaviour.advanceProbe") + defer span.End() + st := r.probe.Advance(ctx, ev) + switch st := st.(type) { + case *routing.StateProbeConnectivityCheck[K]: + // include wants to send a find node message to a node + return &EventOutboundGetClosestNodes[K, A]{ + QueryID: "probe", + To: unaddressedNodeInfo[K, A]{NodeID: st.NodeID}, + Target: st.NodeID.Key(), + Notifiee: r, + }, true + case *routing.StateProbeNodeFailure[K]: + // a node has failed a connectivity check been removed from the routing table and the probe list + // add the node to the inclusion list for a second chance + r.notify(ctx, &EventDhtAddNodeInfo[K, A]{ + NodeInfo: unaddressedNodeInfo[K, A]{NodeID: st.NodeID}, + }) + case *routing.StateProbeWaitingAtCapacity: + // the probe state machine is waiting for responses for checks and the maximum number of concurrent checks has been reached. + // nothing to do except wait for message response or timeout + case *routing.StateProbeWaitingWithCapacity: + // the probe state machine is waiting for responses for checks but has capacity to perform more + // nothing to do except wait for message response or timeout + case *routing.StateProbeIdle: + // the probe state machine is not running any checks. + // nothing to do except wait for message response or timeout + default: + panic(fmt.Sprintf("unexpected include state: %T", st)) + } + + return nil, false +} + +type unaddressedNodeInfo[K kad.Key[K], A kad.Address[A]] struct { + NodeID kad.NodeID[K] +} + +func (u unaddressedNodeInfo[K, A]) ID() kad.NodeID[K] { return u.NodeID } +func (u unaddressedNodeInfo[K, A]) Addresses() []A { return nil } diff --git a/kademlia/routing_test.go b/kademlia/routing_test.go new file mode 100644 index 0000000..f97aebf --- /dev/null +++ b/kademlia/routing_test.go @@ -0,0 +1,303 @@ +package kademlia + +import ( + "errors" + "testing" + "time" + + "github.com/benbjohnson/clock" + "github.com/iand/zikade/internal/kadtest" + "github.com/iand/zikade/internal/nettest" + "github.com/plprobelab/go-kademlia/kad" + "github.com/plprobelab/go-kademlia/key" + "github.com/plprobelab/go-kademlia/network/address" + "github.com/plprobelab/go-kademlia/query" + "github.com/plprobelab/go-kademlia/routing" + "github.com/stretchr/testify/require" + + "golang.org/x/exp/slog" +) + +func TestRoutingBehaviour(t *testing.T) { +} + +func TestRoutingStartBootstrapSendsEvent(t *testing.T) { + ctx, cancel := kadtest.CtxShort(t) + defer cancel() + + clk := clock.NewMock() + _, nodes := nettest.LinearTopology(4, clk) + + self := nodes[0].NodeInfo.ID() + + // records the event passed to bootstrap + bootstrap := NewRecordingSM[routing.BootstrapEvent, routing.BootstrapState](&routing.StateBootstrapIdle{}) + include := new(NullSM[routing.IncludeEvent, routing.IncludeState]) + probe := new(NullSM[routing.ProbeEvent, routing.ProbeState]) + + routingBehaviour := NewRoutingBehaviour[key.Key8, kadtest.StrAddr](self, bootstrap, include, probe, slog.Default()) + + ev := &EventDhtStartBootstrap[key.Key8, kadtest.StrAddr]{ + ProtocolID: address.ProtocolID("test"), + Message: kadtest.NewRequest("1", self.Key()), + SeedNodes: []kad.NodeID[key.Key8]{nodes[1].NodeInfo.ID()}, + } + + routingBehaviour.Notify(ctx, ev) + + // the event that should be passed to the bootstrap state machine + expected := &routing.EventBootstrapStart[key.Key8, kadtest.StrAddr]{ + ProtocolID: ev.ProtocolID, + Message: ev.Message, + KnownClosestNodes: ev.SeedNodes, + } + require.Equal(t, expected, bootstrap.Received) +} + +func TestRoutingBootstrapGetClosestNodesSuccess(t *testing.T) { + ctx, cancel := kadtest.CtxShort(t) + defer cancel() + + clk := clock.NewMock() + _, nodes := nettest.LinearTopology(4, clk) + + self := nodes[0].NodeInfo.ID() + + // records the event passed to bootstrap + bootstrap := NewRecordingSM[routing.BootstrapEvent, routing.BootstrapState](&routing.StateBootstrapIdle{}) + include := new(NullSM[routing.IncludeEvent, routing.IncludeState]) + probe := new(NullSM[routing.ProbeEvent, routing.ProbeState]) + + routingBehaviour := NewRoutingBehaviour[key.Key8, kadtest.StrAddr](self, bootstrap, include, probe, slog.Default()) + + ev := &EventGetClosestNodesSuccess[key.Key8, kadtest.StrAddr]{ + QueryID: query.QueryID("bootstrap"), + To: nodes[1].NodeInfo, + Target: nodes[0].NodeInfo.ID().Key(), + ClosestNodes: []kad.NodeInfo[key.Key8, kadtest.StrAddr]{nodes[2].NodeInfo}, + } + + routingBehaviour.Notify(ctx, ev) + + // bootstrap should receive message response event + require.IsType(t, &routing.EventBootstrapMessageResponse[key.Key8, kadtest.StrAddr]{}, bootstrap.Received) + + rev := bootstrap.Received.(*routing.EventBootstrapMessageResponse[key.Key8, kadtest.StrAddr]) + require.Equal(t, nodes[1].NodeInfo.ID(), rev.NodeID) + require.Equal(t, ev.ClosestNodes, rev.Response.CloserNodes()) +} + +func TestRoutingBootstrapGetClosestNodesFailure(t *testing.T) { + ctx, cancel := kadtest.CtxShort(t) + defer cancel() + + clk := clock.NewMock() + _, nodes := nettest.LinearTopology(4, clk) + + self := nodes[0].NodeInfo.ID() + + // records the event passed to bootstrap + bootstrap := NewRecordingSM[routing.BootstrapEvent, routing.BootstrapState](&routing.StateBootstrapIdle{}) + include := new(NullSM[routing.IncludeEvent, routing.IncludeState]) + probe := new(NullSM[routing.ProbeEvent, routing.ProbeState]) + + routingBehaviour := NewRoutingBehaviour[key.Key8, kadtest.StrAddr](self, bootstrap, include, probe, slog.Default()) + + failure := errors.New("failed") + ev := &EventGetClosestNodesFailure[key.Key8, kadtest.StrAddr]{ + QueryID: query.QueryID("bootstrap"), + To: nodes[1].NodeInfo, + Target: nodes[0].NodeInfo.ID().Key(), + Err: failure, + } + + routingBehaviour.Notify(ctx, ev) + + // bootstrap should receive message response event + require.IsType(t, &routing.EventBootstrapMessageFailure[key.Key8]{}, bootstrap.Received) + + rev := bootstrap.Received.(*routing.EventBootstrapMessageFailure[key.Key8]) + require.Equal(t, nodes[1].NodeInfo.ID(), rev.NodeID) + require.Equal(t, failure, rev.Error) +} + +func TestRoutingAddNodeInfoSendsEvent(t *testing.T) { + ctx, cancel := kadtest.CtxShort(t) + defer cancel() + + clk := clock.NewMock() + _, nodes := nettest.LinearTopology(4, clk) + + self := nodes[0].NodeInfo.ID() + + // records the event passed to include + include := NewRecordingSM[routing.IncludeEvent, routing.IncludeState](&routing.StateIncludeIdle{}) + + bootstrap := new(NullSM[routing.BootstrapEvent, routing.BootstrapState]) + probe := new(NullSM[routing.ProbeEvent, routing.ProbeState]) + + routingBehaviour := NewRoutingBehaviour[key.Key8, kadtest.StrAddr](self, bootstrap, include, probe, slog.Default()) + + ev := &EventDhtAddNodeInfo[key.Key8, kadtest.StrAddr]{ + NodeInfo: nodes[2].NodeInfo, + } + + routingBehaviour.Notify(ctx, ev) + + // the event that should be passed to the include state machine + expected := &routing.EventIncludeAddCandidate[key.Key8, kadtest.StrAddr]{ + NodeInfo: ev.NodeInfo, + } + require.Equal(t, expected, include.Received) +} + +func TestRoutingIncludeGetClosestNodesSuccess(t *testing.T) { + ctx, cancel := kadtest.CtxShort(t) + defer cancel() + + clk := clock.NewMock() + _, nodes := nettest.LinearTopology(4, clk) + + self := nodes[0].NodeInfo.ID() + + // records the event passed to include + include := NewRecordingSM[routing.IncludeEvent, routing.IncludeState](&routing.StateIncludeIdle{}) + + bootstrap := new(NullSM[routing.BootstrapEvent, routing.BootstrapState]) + probe := new(NullSM[routing.ProbeEvent, routing.ProbeState]) + + routingBehaviour := NewRoutingBehaviour[key.Key8, kadtest.StrAddr](self, bootstrap, include, probe, slog.Default()) + + ev := &EventGetClosestNodesSuccess[key.Key8, kadtest.StrAddr]{ + QueryID: query.QueryID("include"), + To: nodes[1].NodeInfo, + Target: nodes[0].NodeInfo.ID().Key(), + ClosestNodes: []kad.NodeInfo[key.Key8, kadtest.StrAddr]{nodes[2].NodeInfo}, + } + + routingBehaviour.Notify(ctx, ev) + + // include should receive message response event + require.IsType(t, &routing.EventIncludeMessageResponse[key.Key8, kadtest.StrAddr]{}, include.Received) + + rev := include.Received.(*routing.EventIncludeMessageResponse[key.Key8, kadtest.StrAddr]) + require.Equal(t, nodes[1].NodeInfo, rev.NodeInfo) + require.Equal(t, ev.ClosestNodes, rev.Response.CloserNodes()) +} + +func TestRoutingIncludeGetClosestNodesFailure(t *testing.T) { + ctx, cancel := kadtest.CtxShort(t) + defer cancel() + + clk := clock.NewMock() + _, nodes := nettest.LinearTopology(4, clk) + + self := nodes[0].NodeInfo.ID() + + // records the event passed to include + include := NewRecordingSM[routing.IncludeEvent, routing.IncludeState](&routing.StateIncludeIdle{}) + + bootstrap := new(NullSM[routing.BootstrapEvent, routing.BootstrapState]) + probe := new(NullSM[routing.ProbeEvent, routing.ProbeState]) + + routingBehaviour := NewRoutingBehaviour[key.Key8, kadtest.StrAddr](self, bootstrap, include, probe, slog.Default()) + + failure := errors.New("failed") + ev := &EventGetClosestNodesFailure[key.Key8, kadtest.StrAddr]{ + QueryID: query.QueryID("include"), + To: nodes[1].NodeInfo, + Target: nodes[0].NodeInfo.ID().Key(), + Err: failure, + } + + routingBehaviour.Notify(ctx, ev) + + // include should receive message response event + require.IsType(t, &routing.EventIncludeMessageFailure[key.Key8, kadtest.StrAddr]{}, include.Received) + + rev := include.Received.(*routing.EventIncludeMessageFailure[key.Key8, kadtest.StrAddr]) + require.Equal(t, nodes[1].NodeInfo, rev.NodeInfo) + require.Equal(t, failure, rev.Error) +} + +func TestRoutingIncludedNodeAddToProbeList(t *testing.T) { + ctx, cancel := kadtest.CtxShort(t) + defer cancel() + + clk := clock.NewMock() + _, nodes := nettest.LinearTopology(4, clk) + + self := nodes[0].NodeInfo.ID() + rt := nodes[0].RoutingTable + + includeCfg := routing.DefaultIncludeConfig() + includeCfg.Clock = clk + include, err := routing.NewInclude[key.Key8, kadtest.StrAddr](rt, includeCfg) + require.NoError(t, err) + + probeCfg := routing.DefaultProbeConfig() + probeCfg.Clock = clk + probeCfg.CheckInterval = 5 * time.Minute + probe, err := routing.NewProbe[key.Key8, kadtest.StrAddr](rt, probeCfg) + require.NoError(t, err) + + // ensure bootstrap is always idle + bootstrap := NewRecordingSM[routing.BootstrapEvent, routing.BootstrapState](&routing.StateBootstrapIdle{}) + + routingBehaviour := NewRoutingBehaviour[key.Key8, kadtest.StrAddr](self, bootstrap, include, probe, slog.Default()) + + // a new node to be included + candidate := nodes[len(nodes)-1].NodeInfo + + // the routing table should not contain the node yet + _, intable := rt.GetNode(candidate.ID().Key()) + require.False(t, intable) + + // notify that there is a new node to be included + routingBehaviour.Notify(ctx, &EventDhtAddNodeInfo[key.Key8, kadtest.StrAddr]{ + NodeInfo: candidate, + }) + + // collect the result of the notify + dev, ok := routingBehaviour.Perform(ctx) + require.True(t, ok) + + // include should be asking to send a message to the node + require.IsType(t, &EventOutboundGetClosestNodes[key.Key8, kadtest.StrAddr]{}, dev) + + oev := dev.(*EventOutboundGetClosestNodes[key.Key8, kadtest.StrAddr]) + + // advance time a little + clk.Add(time.Second) + + // notify a successful response back (best to use the notify included in the event even though it will be the behaviour's Notify method) + oev.Notifiee.Notify(ctx, &EventGetClosestNodesSuccess[key.Key8, kadtest.StrAddr]{ + QueryID: oev.QueryID, + To: oev.To, + Target: oev.Target, + ClosestNodes: []kad.NodeInfo[key.Key8, kadtest.StrAddr]{nodes[1].NodeInfo}, // must include one for include check to pass + }) + + // the routing table should now contain the node + _, intable = rt.GetNode(candidate.ID().Key()) + require.True(t, intable) + + // routing update event should be emitted from the include state machine + dev, ok = routingBehaviour.Perform(ctx) + require.True(t, ok) + require.IsType(t, &EventRoutingUpdated[key.Key8, kadtest.StrAddr]{}, dev) + + // advance time past the probe check interval + clk.Add(probeCfg.CheckInterval) + + // routing update event should be emitted from the include state machine + dev, ok = routingBehaviour.Perform(ctx) + require.True(t, ok) + require.IsType(t, &EventOutboundGetClosestNodes[key.Key8, kadtest.StrAddr]{}, dev) + + // confirm that the message is for the correct node + oev = dev.(*EventOutboundGetClosestNodes[key.Key8, kadtest.StrAddr]) + require.Equal(t, query.QueryID("probe"), oev.QueryID) + require.Equal(t, candidate.ID(), oev.To.ID()) + require.Equal(t, candidate.ID().Key(), oev.Target) +}