Skip to content
This repository has been archived by the owner on Sep 7, 2023. It is now read-only.

Commit

Permalink
routing: integrate probe state machine
Browse files Browse the repository at this point in the history
  • Loading branch information
iand committed Aug 31, 2023
1 parent 2bf54bd commit ab50ba9
Show file tree
Hide file tree
Showing 6 changed files with 488 additions and 57 deletions.
3 changes: 2 additions & 1 deletion internal/nettest/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (
"github.com/plprobelab/go-kademlia/kad"
"github.com/plprobelab/go-kademlia/key"
"github.com/plprobelab/go-kademlia/network/address"
"github.com/plprobelab/go-kademlia/routing"
)

type Node[K kad.Key[K], A kad.Address[A]] struct {
NodeInfo kad.NodeInfo[K, A]
Router *Router[K, A]
RoutingTable kad.RoutingTable[K, kad.NodeID[K]]
RoutingTable routing.RoutingTableCpl[K, kad.NodeID[K]]
}

type Topology[K kad.Key[K], A kad.Address[A]] struct {
Expand Down
5 changes: 5 additions & 0 deletions kademlia/behaviour.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@ type Behaviour[I DhtEvent, O DhtEvent] interface {

// Notify informs the behaviour of an event. The behaviour may perform the event
// immediately and queue the result, causing the behaviour to become ready.
// It is safe to call Notify from the Perform method.
Notify(ctx context.Context, ev I)

// Perform gives the behaviour the opportunity to perform work or to return a queued
// result as an event.
Perform(ctx context.Context) (O, bool)
}

type SM[E any, S any] interface {
Advance(context.Context, E) S
}

type WorkQueueFunc[E DhtEvent] func(context.Context, E) bool

// WorkQueue is buffered queue of work to be performed.
Expand Down
54 changes: 54 additions & 0 deletions kademlia/behaviour_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package kademlia

import (
"context"
"fmt"
"reflect"
"testing"
)

type NullSM[E any, S any] struct{}

func (NullSM[E, S]) Advance(context.Context, E) S {
var v S
return v
}

type RecordingSM[E any, S any] struct {
State S
Received E
}

func NewRecordingSM[E any, S any](response S) *RecordingSM[E, S] {
return &RecordingSM[E, S]{
State: response,
}
}

func (r *RecordingSM[E, S]) Advance(ctx context.Context, e E) S {
r.Received = e
return r.State
}

// expectBehaviourEvent selects on a behaviour's ready channel until it becomes ready and then checks the perform
// mehtod for the expected event type. Unexpected events are ignored and selecting resumes.
// The function returns when an event matching the type of expected is received or when the context is cancelled.
func expectBehaviourEvent[I DhtEvent, O DhtEvent](t *testing.T, ctx context.Context, b Behaviour[I, O], expected O) (O, error) {
t.Helper()
for {
select {
case <-b.Ready():
ev, ok := b.Perform(ctx)
if !ok {
continue
}
t.Logf("saw event: %T\n", ev)
if reflect.TypeOf(ev) == reflect.TypeOf(expected) {
return ev, nil
}
case <-ctx.Done():
var v O
return v, fmt.Errorf("test deadline exceeded")
}
}
}
15 changes: 13 additions & 2 deletions kademlia/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func DefaultConfig() *Config {
}
}

func NewDht[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], rtr Router[K, A], rt kad.RoutingTable[K, kad.NodeID[K]], cfg *Config) (*Dht[K, A], error) {
func NewDht[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], rtr Router[K, A], rt routing.RoutingTableCpl[K, kad.NodeID[K]], cfg *Config) (*Dht[K, A], error) {
if cfg == nil {
cfg = DefaultConfig()
} else if err := cfg.Validate(); err != nil {
Expand Down Expand Up @@ -168,7 +168,18 @@ func NewDht[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], rtr Router[K, A]
return nil, fmt.Errorf("include: %w", err)
}

routingBehaviour := NewRoutingBehaviour(self, bootstrap, include, cfg.Logger)
probeCfg := routing.DefaultProbeConfig()
probeCfg.Clock = cfg.Clock
probeCfg.Timeout = cfg.QueryTimeout

// TODO: expose config
// probeCfg.Concurrency = cfg.ProbeConcurrency
probe, err := routing.NewProbe[K, A](rt, probeCfg)
if err != nil {
return nil, fmt.Errorf("include: %w", err)
}

routingBehaviour := NewRoutingBehaviour[K, A](self, bootstrap, include, probe, cfg.Logger)

networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger)

Expand Down
165 changes: 111 additions & 54 deletions kademlia/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,21 @@ import (
"github.com/plprobelab/go-kademlia/key"
"github.com/plprobelab/go-kademlia/routing"
"github.com/plprobelab/go-kademlia/util"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/exp/slog"
)

type RoutingBehaviour[K kad.Key[K], A kad.Address[A]] struct {
// self is the node id of the system the dht is running on
self kad.NodeID[K]
// bootstrap is the bootstrap state machine, responsible for bootstrapping the routing table
bootstrap *routing.Bootstrap[K, A]
include *routing.Include[K, A]
bootstrap SM[routing.BootstrapEvent, routing.BootstrapState]

// include is the inclusion state machine, responsible for vetting nodes before including them in the routing table
include SM[routing.IncludeEvent, routing.IncludeState]

// probe is the node probing state machine, responsible for periodically checking connectivity of nodes in the routing table
probe SM[routing.ProbeEvent, routing.ProbeState]

pendingMu sync.Mutex
pending []DhtEvent
Expand All @@ -26,26 +32,34 @@ type RoutingBehaviour[K kad.Key[K], A kad.Address[A]] struct {
logger *slog.Logger
}

func NewRoutingBehaviour[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], bootstrap *routing.Bootstrap[K, A], include *routing.Include[K, A], logger *slog.Logger) *RoutingBehaviour[K, A] {
func NewRoutingBehaviour[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], bootstrap SM[routing.BootstrapEvent, routing.BootstrapState], include SM[routing.IncludeEvent, routing.IncludeState], probe SM[routing.ProbeEvent, routing.ProbeState], logger *slog.Logger) *RoutingBehaviour[K, A] {
r := &RoutingBehaviour[K, A]{
self: self,
bootstrap: bootstrap,
include: include,
probe: probe,
ready: make(chan struct{}, 1),
logger: logger,
}
return r
}

func (r *RoutingBehaviour[K, A]) Notify(ctx context.Context, ev DhtEvent) {
ctx, span := util.StartSpan(ctx, "RoutingBehaviour.Perform")
ctx, span := util.StartSpan(ctx, "RoutingBehaviour.Notify")
defer span.End()

r.pendingMu.Lock()
defer r.pendingMu.Unlock()
r.notify(ctx, ev)
}

// notify must only be called while r.pendingMu is held
func (r *RoutingBehaviour[K, A]) notify(ctx context.Context, ev DhtEvent) {
ctx, span := util.StartSpan(ctx, "RoutingBehaviour.notify")
defer span.End()
switch ev := ev.(type) {
case *EventDhtStartBootstrap[K, A]:
span.SetAttributes(attribute.String("event", "EventDhtStartBootstrap"))
cmd := &routing.EventBootstrapStart[K, A]{
ProtocolID: ev.ProtocolID,
Message: ev.Message,
Expand All @@ -58,6 +72,7 @@ func (r *RoutingBehaviour[K, A]) Notify(ctx context.Context, ev DhtEvent) {
}

case *EventDhtAddNodeInfo[K, A]:
span.SetAttributes(attribute.String("event", "EventDhtAddNodeInfo"))
// Ignore self
if key.Equal(ev.NodeInfo.ID().Key(), r.self.Key()) {
break
Expand All @@ -71,7 +86,19 @@ func (r *RoutingBehaviour[K, A]) Notify(ctx context.Context, ev DhtEvent) {
r.pending = append(r.pending, next)
}

case *EventRoutingUpdated[K, A]:
span.SetAttributes(attribute.String("event", "EventRoutingUpdated"))
cmd := &routing.EventProbeAdd[K]{
NodeID: ev.NodeInfo.ID(),
}
// attempt to advance the probe state machine
next, ok := r.advanceProbe(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

case *EventGetClosestNodesSuccess[K, A]:
span.SetAttributes(attribute.String("event", "EventGetClosestNodesFailure"), attribute.String("queryid", string(ev.QueryID)), attribute.String("nodeid", string(ev.To.ID().String())))
switch ev.QueryID {
case "bootstrap":
for _, info := range ev.ClosestNodes {
Expand Down Expand Up @@ -101,10 +128,23 @@ func (r *RoutingBehaviour[K, A]) Notify(ctx context.Context, ev DhtEvent) {
r.pending = append(r.pending, next)
}

case "probe":
cmd := &routing.EventProbeMessageResponse[K, A]{
NodeInfo: ev.To,
Response: ClosestNodesFakeResponse(ev.Target, ev.ClosestNodes),
}
// attempt to advance the probe state machine
next, ok := r.advanceProbe(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

default:
panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID))
}
case *EventGetClosestNodesFailure[K, A]:
span.SetAttributes(attribute.String("event", "EventGetClosestNodesFailure"), attribute.String("queryid", string(ev.QueryID)), attribute.String("nodeid", string(ev.To.ID().String())))
span.RecordError(ev.Err)
switch ev.QueryID {
case "bootstrap":
cmd := &routing.EventBootstrapMessageFailure[K]{
Expand All @@ -121,11 +161,21 @@ func (r *RoutingBehaviour[K, A]) Notify(ctx context.Context, ev DhtEvent) {
NodeInfo: ev.To,
Error: ev.Err,
}
// attempt to advance the include
// attempt to advance the include state machine
next, ok := r.advanceInclude(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}
case "probe":
cmd := &routing.EventProbeMessageFailure[K, A]{
NodeInfo: ev.To,
Error: ev.Err,
}
// attempt to advance the probe state machine
next, ok := r.advanceProbe(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

default:
panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID))
Expand Down Expand Up @@ -169,59 +219,24 @@ func (r *RoutingBehaviour[K, A]) Perform(ctx context.Context) (DhtEvent, bool) {
return ev, true
}

// attempt to advance the bootstrap state machine
bstate := r.bootstrap.Advance(ctx, &routing.EventBootstrapPoll{})
switch st := bstate.(type) {

case *routing.StateBootstrapMessage[K, A]:
return &EventOutboundGetClosestNodes[K, A]{
QueryID: "bootstrap",
To: NewNodeAddr[K, A](st.NodeID, nil),
Target: st.Message.Target(),
Notifiee: r,
}, true

case *routing.StateBootstrapWaiting:
// bootstrap waiting for a message response, nothing to do
case *routing.StateBootstrapFinished:
return &EventBootstrapFinished{
Stats: st.Stats,
}, true
case *routing.StateBootstrapIdle:
// bootstrap not running, nothing to do
default:
panic(fmt.Sprintf("unexpected bootstrap state: %T", st))
// poll the child state machines in priority order to give each an opportunity to perform work

ev, ok := r.advanceBootstrap(ctx, &routing.EventBootstrapPoll{})
if ok {
return ev, true
}

// attempt to advance the include state machine
istate := r.include.Advance(ctx, &routing.EventIncludePoll{})
switch st := istate.(type) {
case *routing.StateIncludeFindNodeMessage[K, A]:
// include wants to send a find node message to a node
return &EventOutboundGetClosestNodes[K, A]{
QueryID: "include",
To: st.NodeInfo,
Target: st.NodeInfo.ID().Key(),
Notifiee: r,
}, true

case *routing.StateIncludeRoutingUpdated[K, A]:
// a node has been included in the routing table
return &EventRoutingUpdated[K, A]{
NodeInfo: st.NodeInfo,
}, true
case *routing.StateIncludeWaitingAtCapacity:
// nothing to do except wait for message response or timeout
case *routing.StateIncludeWaitingWithCapacity:
// nothing to do except wait for message response or timeout
case *routing.StateIncludeWaitingFull:
// nothing to do except wait for message response or timeout
case *routing.StateIncludeIdle:
// nothing to do except wait for message response or timeout
default:
panic(fmt.Sprintf("unexpected include state: %T", st))
ev, ok = r.advanceInclude(ctx, &routing.EventIncludePoll{})
if ok {
return ev, true
}

ev, ok = r.advanceProbe(ctx, &routing.EventProbePoll{})
if ok {
return ev, true
}

// finally check if any pending events were accumulated in the meantime
if len(r.pending) == 0 {
return nil, false
}
Expand Down Expand Up @@ -273,6 +288,13 @@ func (r *RoutingBehaviour[K, A]) advanceInclude(ctx context.Context, ev routing.

case *routing.StateIncludeRoutingUpdated[K, A]:
// a node has been included in the routing table

// notify other routing state machines that there is a new node in the routing table
r.notify(ctx, &EventRoutingUpdated[K, A]{
NodeInfo: st.NodeInfo,
})

// return the event to notify outwards too
return &EventRoutingUpdated[K, A]{
NodeInfo: st.NodeInfo,
}, true
Expand All @@ -290,3 +312,38 @@ func (r *RoutingBehaviour[K, A]) advanceInclude(ctx context.Context, ev routing.

return nil, false
}

func (r *RoutingBehaviour[K, A]) advanceProbe(ctx context.Context, ev routing.ProbeEvent) (DhtEvent, bool) {
ctx, span := util.StartSpan(ctx, "RoutingBehaviour.advanceProbe")
defer span.End()
st := r.probe.Advance(ctx, ev)
switch st := st.(type) {
case *routing.StateProbeConnectivityCheck[K, A]:
// include wants to send a find node message to a node
return &EventOutboundGetClosestNodes[K, A]{
QueryID: "probe",
To: st.NodeInfo,
Target: st.NodeInfo.ID().Key(),
Notifiee: r,
}, true
case *routing.StateProbeNodeFailure[K, A]:
// a node has failed a connectivity check been removed from the routing table and the probe list
// add the node to the inclusion list for a second chance
r.notify(ctx, &EventDhtAddNodeInfo[K, A]{
NodeInfo: st.NodeInfo,
})
case *routing.StateProbeWaitingAtCapacity:
// the probe state machine is waiting for responses for checks and the maximum number of concurrent checks has been reached.
// nothing to do except wait for message response or timeout
case *routing.StateProbeWaitingWithCapacity:
// the probe state machine is waiting for responses for checks but has capacity to perform more
// nothing to do except wait for message response or timeout
case *routing.StateProbeIdle:
// the probe state machine is not running any checks.
// nothing to do except wait for message response or timeout
default:
panic(fmt.Sprintf("unexpected include state: %T", st))
}

return nil, false
}
Loading

0 comments on commit ab50ba9

Please sign in to comment.