Skip to content

Commit

Permalink
Add include state machine (#95)
Browse files Browse the repository at this point in the history
Based on sm-bootstrap branch (#90)

This adds a state machine for running the include process described in
#45. The state machine
manages a queue of candidates nodes and processes them by checking
whether they respond to a find node request. Candidates that respond
with one or more closer nodes are considered live and added to the
routing table. Nodes that do not respond or do not provide any suggested
closer nodes are dropped from the queue. The number of concurrent checks
in flight is configurable.

Not done yet:

- [ ] check timeouts
- [ ] removing nodes failing checks from routing table
- [ ] notifying of unroutable nodes
  • Loading branch information
iand authored Aug 16, 2023
1 parent accf5ea commit 5d7f68c
Show file tree
Hide file tree
Showing 5 changed files with 872 additions and 45 deletions.
200 changes: 169 additions & 31 deletions coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (q *eventQueue[E]) Enqueue(ctx context.Context, e E) {
}

// Dequeue reads an event from the queue. It returns the event and a true value
// if an event was read or the zero value if the event type and false if no event
// if an event was read or the zero value of the event type and false if no event
// was read. This method is non-blocking.
func (q *eventQueue[E]) Dequeue(ctx context.Context) (E, bool) {
select {
Expand All @@ -55,6 +55,10 @@ func (q *eventQueue[E]) Dequeue(ctx context.Context) (E, bool) {
}
}

// FindNodeRequestFunc is a function that creates a request to find the supplied node id
// TODO: consider this being a first class method of the Endpoint
type FindNodeRequestFunc[K kad.Key[K], A kad.Address[A]] func(kad.NodeID[K]) (address.ProtocolID, kad.Request[K, A])

// A Coordinator coordinates the state machines that comprise a Kademlia DHT
// Currently this is only queries and bootstrapping but will expand to include other state machines such as
// routing table refresh, and reproviding.
Expand All @@ -77,12 +81,22 @@ type Coordinator[K kad.Key[K], A kad.Address[A]] struct {
// bootstrapEvents is a fifo queue of events that are to be processed by the bootstrap state machine
bootstrapEvents *eventQueue[routing.BootstrapEvent]

// include is the include state machine, responsible for including candidate nodes into the routing table
include StateMachine[routing.IncludeState, routing.IncludeEvent]

// includeEvents is a fifo queue of events that are to be processed by the include state machine
includeEvents *eventQueue[routing.IncludeEvent]

// rt is the routing table used to look up nodes by distance
rt kad.RoutingTable[K, kad.NodeID[K]]

// ep is the message endpoint used to send requests
ep endpoint.Endpoint[K, A]

// findNodeFn is a function that creates a find node request that may be understod by the endpoint
// TODO: thiis should be a function of the endpoint
findNodeFn FindNodeRequestFunc[K, A]

// queue not used
queue event.EventQueue

Expand Down Expand Up @@ -155,7 +169,7 @@ func DefaultConfig() *Config {
}
}

func NewCoordinator[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], ep endpoint.Endpoint[K, A], rt kad.RoutingTable[K, kad.NodeID[K]], cfg *Config) (*Coordinator[K, A], error) {
func NewCoordinator[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], ep endpoint.Endpoint[K, A], fn FindNodeRequestFunc[K, A], rt kad.RoutingTable[K, kad.NodeID[K]], cfg *Config) (*Coordinator[K, A], error) {
if cfg == nil {
cfg = DefaultConfig()
} else if err := cfg.Validate(); err != nil {
Expand All @@ -182,17 +196,34 @@ func NewCoordinator[K kad.Key[K], A kad.Address[A]](self kad.NodeID[K], ep endpo

bootstrap, err := routing.NewBootstrap(self, bootstrapCfg)
if err != nil {
return nil, fmt.Errorf("query pool: %w", err)
return nil, fmt.Errorf("bootstrap: %w", err)
}

includeCfg := routing.DefaultIncludeConfig()
includeCfg.Clock = cfg.Clock
includeCfg.Timeout = cfg.QueryTimeout

// TODO: expose config
// includeCfg.QueueCapacity = cfg.IncludeQueueCapacity
// includeCfg.Concurrency = cfg.IncludeConcurrency
// includeCfg.Timeout = cfg.IncludeTimeout

include, err := routing.NewInclude[K, A](rt, includeCfg)
if err != nil {
return nil, fmt.Errorf("include: %w", err)
}
return &Coordinator[K, A]{
self: self,
cfg: *cfg,
ep: ep,
findNodeFn: fn,
rt: rt,
pool: qp,
poolEvents: newEventQueue[query.PoolEvent](20), // 20 is abitrary, move to config
bootstrap: bootstrap,
bootstrapEvents: newEventQueue[routing.BootstrapEvent](20), // 20 is abitrary, move to config
include: include,
includeEvents: newEventQueue[routing.IncludeEvent](20), // 20 is abitrary, move to config
outboundEvents: make(chan KademliaEvent, 20),
queue: event.NewChanQueue(DefaultChanqueueCapacity),
planner: event.NewSimplePlanner(cfg.Clock),
Expand All @@ -207,8 +238,28 @@ func (c *Coordinator[K, A]) RunOne(ctx context.Context) bool {
ctx, span := util.StartSpan(ctx, "Coordinator.RunOne")
defer span.End()

// Process state machines in priority order

// Give the bootstrap state machine priority
// No queries can be run while a bootstrap is in progress
if c.advanceBootstrap(ctx) {
return true
}

// Attempt to advance the include state machine so candidate nodes
// are added to the routing table
if c.advanceInclude(ctx) {
return true
}

// Attempt to advance an outbound query
if c.advancePool(ctx) {
return true
}

return false
}

func (c *Coordinator[K, A]) advanceBootstrap(ctx context.Context) bool {
bev, ok := c.bootstrapEvents.Dequeue(ctx)
if !ok {
bev = &routing.EventBootstrapPoll{}
Expand All @@ -217,11 +268,11 @@ func (c *Coordinator[K, A]) RunOne(ctx context.Context) bool {
bstate := c.bootstrap.Advance(ctx, bev)
switch st := bstate.(type) {
case *routing.StateBootstrapMessage[K, A]:
c.sendBootstrapMessage(ctx, st.ProtocolID, st.NodeID, st.Message, st.QueryID, st.Stats)
c.sendBootstrapFindNode(ctx, st.NodeID, st.QueryID, st.Stats)
return true

case *routing.StateBootstrapWaiting:
// bootstrap waiting for a message response, don't proceed with other state machines
// bootstrap waiting for a message response, proceed with other state machines
return false

case *routing.StateBootstrapFinished:
Expand All @@ -232,12 +283,49 @@ func (c *Coordinator[K, A]) RunOne(ctx context.Context) bool {

case *routing.StateBootstrapIdle:
// bootstrap not running, can proceed to other state machines
break
return false
default:
panic(fmt.Sprintf("unexpected bootstrap state: %T", st))
}
}

// Attempt to advance an outbound query
func (c *Coordinator[K, A]) advanceInclude(ctx context.Context) bool {
// Attempt to advance the include state machine so candidate nodes
// are added to the routing table
iev, ok := c.includeEvents.Dequeue(ctx)
if !ok {
iev = &routing.EventIncludePoll{}
}
istate := c.include.Advance(ctx, iev)
switch st := istate.(type) {
case *routing.StateIncludeFindNodeMessage[K, A]:
// include wants to send a find node message to a node
c.sendIncludeFindNode(ctx, st.NodeInfo)
return true
case *routing.StateIncludeRoutingUpdated[K, A]:
// a node has been included in the routing table
c.outboundEvents <- &KademliaRoutingUpdatedEvent[K, A]{
NodeInfo: st.NodeInfo,
}
return true
case *routing.StateIncludeWaitingAtCapacity:
// nothing to do except wait for message response or timeout
return false
case *routing.StateIncludeWaitingWithCapacity:
// nothing to do except wait for message response or timeout
return false
case *routing.StateIncludeWaitingFull:
// nothing to do except wait for message response or timeout
return false
case *routing.StateIncludeIdle:
// nothing to do except wait for message response or timeout
return false
default:
panic(fmt.Sprintf("unexpected include state: %T", st))
}
}

func (c *Coordinator[K, A]) advancePool(ctx context.Context) bool {
pev, ok := c.poolEvents.Dequeue(ctx)
if !ok {
pev = &query.EventPoolPoll{}
Expand All @@ -249,26 +337,26 @@ func (c *Coordinator[K, A]) RunOne(ctx context.Context) bool {
c.sendQueryMessage(ctx, st.ProtocolID, st.NodeID, st.Message, st.QueryID, st.Stats)
return true
case *query.StatePoolWaitingAtCapacity:
// TODO
// nothing to do except wait for message response or timeout
return false
case *query.StatePoolWaitingWithCapacity:
// TODO
// nothing to do except wait for message response or timeout
return false
case *query.StatePoolQueryFinished:
c.outboundEvents <- &KademliaOutboundQueryFinishedEvent{
QueryID: st.QueryID,
Stats: st.Stats,
}
return true

// TODO
case *query.StatePoolQueryTimeout:
// TODO
return false
case *query.StatePoolIdle:
// TODO
// nothing to do
return false
default:
panic(fmt.Sprintf("unexpected pool state: %T", st))
}

return false
}

func (c *Coordinator[K, A]) sendQueryMessage(ctx context.Context, protoID address.ProtocolID, to kad.NodeID[K], msg kad.Request[K, A], queryID query.QueryID, stats query.QueryStats) {
Expand Down Expand Up @@ -326,8 +414,8 @@ func (c *Coordinator[K, A]) sendQueryMessage(ctx context.Context, protoID addres
}
}

func (c *Coordinator[K, A]) sendBootstrapMessage(ctx context.Context, protoID address.ProtocolID, to kad.NodeID[K], msg kad.Request[K, A], queryID query.QueryID, stats query.QueryStats) {
ctx, span := util.StartSpan(ctx, "Coordinator.sendBootstrapMessage")
func (c *Coordinator[K, A]) sendBootstrapFindNode(ctx context.Context, to kad.NodeID[K], queryID query.QueryID, stats query.QueryStats) {
ctx, span := util.StartSpan(ctx, "Coordinator.sendBootstrapFindNode")
defer span.End()

onSendError := func(ctx context.Context, err error) {
Expand Down Expand Up @@ -373,13 +461,65 @@ func (c *Coordinator[K, A]) sendBootstrapMessage(ctx context.Context, protoID ad
c.bootstrapEvents.Enqueue(ctx, bev)
}

protoID, msg := c.findNodeFn(c.self)
err := c.ep.SendRequestHandleResponse(ctx, protoID, to, msg, msg.EmptyResponse(), 0, onMessageResponse)
if err != nil {
onSendError(ctx, err)
}
}

func (c *Coordinator[K, A]) sendIncludeFindNode(ctx context.Context, to kad.NodeInfo[K, A]) {
ctx, span := util.StartSpan(ctx, "Coordinator.sendIncludeFindNode")
defer span.End()

onSendError := func(ctx context.Context, err error) {
if errors.Is(err, endpoint.ErrCannotConnect) {
// here we can notify that the peer is unroutable, which would feed into peerstore and routing table
// TODO: remove from routing table
return
}

iev := &routing.EventIncludeMessageFailure[K, A]{
NodeInfo: to,
Error: err,
}
c.includeEvents.Enqueue(ctx, iev)
}

onMessageResponse := func(ctx context.Context, resp kad.Response[K, A], err error) {
if err != nil {
onSendError(ctx, err)
return
}

iev := &routing.EventIncludeMessageResponse[K, A]{
NodeInfo: to,
Response: resp,
}
c.includeEvents.Enqueue(ctx, iev)

if resp != nil {
candidates := resp.CloserNodes()
if len(candidates) > 0 {
// ignore error here
c.AddNodes(ctx, candidates)
}
}
}

// this might be new node addressing info
c.ep.MaybeAddToPeerstore(ctx, to, c.cfg.PeerstoreTTL)

protoID, msg := c.findNodeFn(c.self)
err := c.ep.SendRequestHandleResponse(ctx, protoID, to.ID(), msg, msg.EmptyResponse(), 0, onMessageResponse)
if err != nil {
onSendError(ctx, err)
}
}

func (c *Coordinator[K, A]) StartQuery(ctx context.Context, queryID query.QueryID, protocolID address.ProtocolID, msg kad.Request[K, A]) error {
ctx, span := util.StartSpan(ctx, "Coordinator.StartQuery")
defer span.End()
knownClosestPeers := c.rt.NearestNodes(msg.Target(), 20)

qev := &query.EventPoolAddQuery[K, A]{
Expand All @@ -394,6 +534,8 @@ func (c *Coordinator[K, A]) StartQuery(ctx context.Context, queryID query.QueryI
}

func (c *Coordinator[K, A]) StopQuery(ctx context.Context, queryID query.QueryID) error {
ctx, span := util.StartSpan(ctx, "Coordinator.StopQuery")
defer span.End()
qev := &query.EventPoolStopQuery{
QueryID: queryID,
}
Expand All @@ -402,33 +544,29 @@ func (c *Coordinator[K, A]) StopQuery(ctx context.Context, queryID query.QueryID
}

// AddNodes suggests new DHT nodes and their associated addresses to be added to the routing table.
// If the routing table is been updated as a result of this operation a KademliaRoutingUpdatedEvent event is emitted.
// If the routing table is updated as a result of this operation a KademliaRoutingUpdatedEvent event is emitted.
func (c *Coordinator[K, A]) AddNodes(ctx context.Context, infos []kad.NodeInfo[K, A]) error {
ctx, span := util.StartSpan(ctx, "Coordinator.AddNodes")
defer span.End()
for _, info := range infos {
if key.Equal(info.ID().Key(), c.self.Key()) {
// skip self
continue
}
isNew := c.rt.AddNode(info.ID())
c.ep.MaybeAddToPeerstore(ctx, info, c.cfg.PeerstoreTTL)

if isNew {
c.outboundEvents <- &KademliaRoutingUpdatedEvent[K, A]{
NodeInfo: info,
}
// inject a new node into the coordinator's includeEvents queue
iev := &routing.EventIncludeAddCandidate[K, A]{
NodeInfo: info,
}
c.includeEvents.Enqueue(ctx, iev)
}

return nil
}

// FindNodeRequestFunc is a function that creates a request to find the supplied node id
// TODO: consider this being a first class method of the Endpoint
type FindNodeRequestFunc[K kad.Key[K], A kad.Address[A]] func(kad.NodeID[K]) (address.ProtocolID, kad.Request[K, A])

// Bootstrap instructs the coordinator to begin bootstrapping the routing table.
// While bootstrap is in progress, no other queries will make progress.
func (c *Coordinator[K, A]) Bootstrap(ctx context.Context, seeds []kad.NodeID[K], fn FindNodeRequestFunc[K, A]) error {
protoID, msg := fn(c.self)
func (c *Coordinator[K, A]) Bootstrap(ctx context.Context, seeds []kad.NodeID[K]) error {
protoID, msg := c.findNodeFn(c.self)

bev := &routing.EventBootstrapStart[K, A]{
ProtocolID: protoID,
Expand Down
Loading

0 comments on commit 5d7f68c

Please sign in to comment.