Add bootstrap state machine (#90)
This adds a state machine for running the bootstrap query described in
#45. The state machine
is very simple (it runs a query that attempts to find the self node),
but it drives the design of the coordinator in a number of ways:

- The coordinator now manages two state machines: bootstrap and user
queries. It enforces the constraint that no user queries can be
progressed while the bootstrap is running. This establishes the pattern
for managing a set of state machines.
- Priority is simple: the coordinator first attempts to advance the
bootstrap state machine, and only if it is idle, indicating the bootstrap
has no further work, will it proceed to advance the query pool state
machine (see the sketch after this list).
- This changes the design of the state machine. Previously the state
machine reacted to an incoming event passed to the `Advance` method.
However, this causes complications in the presence of multiple state
machines. What should happen if the bootstrap is waiting for a message
response but the caller attempts to start a user query? The coordinator
needs to remember the "add query" event until the bootstrap is complete,
so that event would need to remain on the coordinator's queue. But the
coordinator needs to read from the queue to detect if an incoming
message response is available for the bootstrap, without removing the
"add query" event. Thus we need separate queues for the two state
machines. Rather than manage those in the coordinator, we give each
state machine its own event queue. External callers enqueue events and
the state machine dequeues the next one each time it attempts to advance
state.
- ~The above change leads to a simple interface for state machines: an
Enqueue method for notifying a new event and an Advance method that
returns the next state.~
- **update 2023-08-15**: The above change leads to a simple interface
for state machines: an Advance method that accepts an event and returns
the next state.
- Coordinator methods like StartQuery and StopQuery now enqueue an event
for the query pool state machine
- A new Bootstrap method enqueues an event for the bootstrap state machine
- **update 2023-08-15**: the queues for the state machines are managed
by the coordinator, which allows state machines to be composed more
cleanly into hierarchies (for example, the state machine managing the
routing table's include queue will use a query pool state machine, and
this change eliminates the need to manage the event queues of child
state machines)
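
To make the pattern concrete, here is a minimal sketch of the shape this
design gives the coordinator. The type and field names here are
illustrative only; the concrete types in coord/coordinator.go differ.

```go
package coord

import "context"

// Illustrative event and state types; the concrete names in the PR differ.
type BootstrapEvent interface{}
type BootstrapState interface{}
type StateBootstrapIdle struct{}

type PoolEvent interface{}
type PoolState interface{}
type EventPoolPoll struct{}

// Each state machine exposes a single Advance method that accepts the
// next event and returns the next state.
type bootstrapSM interface {
	Advance(ctx context.Context, ev BootstrapEvent) BootstrapState
}

type poolSM interface {
	Advance(ctx context.Context, ev PoolEvent) PoolState
}

type coordinator struct {
	bootstrap bootstrapSM
	pool      poolSM

	// the coordinator owns one event queue per child state machine
	bootstrapEvents []BootstrapEvent
	poolEvents      []PoolEvent
}

// advance applies the priority rule: the bootstrap state machine always
// runs first, and the query pool is advanced only once the bootstrap
// reports that it is idle.
func (c *coordinator) advance(ctx context.Context) {
	var bev BootstrapEvent // nil stands in for a poll when the queue is empty
	if len(c.bootstrapEvents) > 0 {
		bev, c.bootstrapEvents = c.bootstrapEvents[0], c.bootstrapEvents[1:]
	}
	if _, idle := c.bootstrap.Advance(ctx, bev).(*StateBootstrapIdle); !idle {
		return // user queries are held back while the bootstrap has work
	}
	var pev PoolEvent = &EventPoolPoll{} // explicit poll when nothing is queued
	if len(c.poolEvents) > 0 {
		pev, c.poolEvents = c.poolEvents[0], c.poolEvents[1:]
	}
	c.pool.Advance(ctx, pev)
}
```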

There are still some ugly parts which I may be able to fix within the
scope of this PR:
- the coordinator implements a number of unused methods to conform to
the scheduler.Scheduler interface. All that is needed is the RunOne
method.
- ~the name of the bootstrap query needs to be factored into a constant
or remembered by the coordinator~ - the coordinator now uses a separate
callback to deal with the bootstrap query instead of checking the query ID
- ~events are action.Action interfaces so they can use the standard
queue interface. The Run method is unused. The queue could simply be a
channel or we could modify the queue interface to be parameterised by
type, allowing us to have a queue of BootstrapEvents~ (**removed
2023-08-15**)
- currently the bootstrap method expects a function that generates a
FindNode request for the given node. FindNode is such a fundamental DHT
operation that I think it should be provided as a method by the Endpoint
(a hypothetical sketch follows this list)
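
As an illustration of that last point, here is a hypothetical shape for
such an Endpoint method. This interface does not exist in the PR (which
instead passes a request-generating function, like the findNodeFn in the
test diff below), and the import paths are assumptions based on the
repository layout.

```go
package coord

import (
	"github.com/plprobelab/go-kademlia/kad"
	"github.com/plprobelab/go-kademlia/network/address"
)

// FindNodeRequester is a hypothetical endpoint extension: FindNode is
// fundamental enough that the endpoint could construct the request itself
// rather than requiring callers to supply a generator function.
type FindNodeRequester[K kad.Key[K], A kad.Address[A]] interface {
	// FindNodeRequest returns the protocol ID and request message used to
	// ask node n for the nodes it knows that are closest to n's own key.
	FindNodeRequest(n kad.NodeID[K]) (address.ProtocolID, kad.Request[K, A])
}
```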


Fixes #47
iand authored Aug 16, 2023
1 parent 16fd9b8 commit accf5ea
Showing 8 changed files with 925 additions and 211 deletions.
433 changes: 268 additions & 165 deletions coord/coordinator.go

Large diffs are not rendered by default.

159 changes: 124 additions & 35 deletions coord/coordinator_test.go
@@ -22,14 +22,6 @@ import (
"github.com/plprobelab/go-kademlia/sim"
)

var (
_ coordinatorInternalEvent = &eventUnroutablePeer[key.Key8]{}
_ coordinatorInternalEvent = &eventMessageFailed[key.Key8]{}
_ coordinatorInternalEvent = &eventMessageResponse[key.Key8, kadtest.StrAddr]{}
_ coordinatorInternalEvent = &eventAddQuery[key.Key8, kadtest.StrAddr]{}
_ coordinatorInternalEvent = &eventStopQuery[key.Key8]{}
)

func setupSimulation(t *testing.T, ctx context.Context) ([]kad.NodeInfo[key.Key8, kadtest.StrAddr], []*sim.Endpoint[key.Key8, kadtest.StrAddr], []kad.RoutingTable[key.Key8, kad.NodeID[key.Key8]], *sim.LiteSimulator) {
// create node identifiers
nodeCount := 4
@@ -116,10 +108,12 @@ const peerstoreTTL = 10 * time.Minute
var protoID = address.ProtocolID("/statemachine/1.0.0") // protocol ID for the test

// expectEventType selects on the event channel until an event of the expected type is sent.
func expectEventType(ctx context.Context, events <-chan KademliaEvent, expected KademliaEvent) (KademliaEvent, error) {
func expectEventType(t *testing.T, ctx context.Context, events <-chan KademliaEvent, expected KademliaEvent) (KademliaEvent, error) {
t.Helper()
for {
select {
case ev := <-events:
t.Logf("saw event: %T\n", ev)
if reflect.TypeOf(ev) == reflect.TypeOf(expected) {
return ev, nil
}
@@ -129,23 +123,6 @@ func expectEventType(ctx context.Context, events <-chan KademliaEvent, expected
}
}

// Ctx returns a Context and a CancelFunc. The context will be
// cancelled just before the test binary deadline (as
// specified by the -timeout flag when running the test). The
// CancelFunc may be called to cancel the context earlier than
// the deadline.
func Ctx(t *testing.T) (context.Context, context.CancelFunc) {
t.Helper()

deadline, ok := t.Deadline()
if !ok {
deadline = time.Now().Add(time.Minute)
} else {
deadline = deadline.Add(-time.Second)
}
return context.WithDeadline(context.Background(), deadline)
}

func TestConfigValidate(t *testing.T) {
t.Run("default is valid", func(t *testing.T) {
cfg := DefaultConfig()
@@ -157,10 +134,42 @@ func TestConfigValidate(t *testing.T) {
cfg.Clock = nil
require.Error(t, cfg.Validate())
})

t.Run("query concurrency positive", func(t *testing.T) {
cfg := DefaultConfig()
cfg.QueryConcurrency = 0
require.Error(t, cfg.Validate())
cfg.QueryConcurrency = -1
require.Error(t, cfg.Validate())
})

t.Run("query timeout positive", func(t *testing.T) {
cfg := DefaultConfig()
cfg.QueryTimeout = 0
require.Error(t, cfg.Validate())
cfg.QueryTimeout = -1
require.Error(t, cfg.Validate())
})

t.Run("request concurrency positive", func(t *testing.T) {
cfg := DefaultConfig()
cfg.RequestConcurrency = 0
require.Error(t, cfg.Validate())
cfg.QueryConcurrency = -1
require.Error(t, cfg.Validate())
})

t.Run("request timeout positive", func(t *testing.T) {
cfg := DefaultConfig()
cfg.RequestTimeout = 0
require.Error(t, cfg.Validate())
cfg.RequestTimeout = -1
require.Error(t, cfg.Validate())
})
}

func TestExhaustiveQuery(t *testing.T) {
ctx, cancel := Ctx(t)
ctx, cancel := kadtest.Ctx(t)
defer cancel()

nodes, eps, rts, siml := setupSimulation(t, ctx)
@@ -201,31 +210,31 @@ func TestExhaustiveQuery(t *testing.T) {
}

// the query run by the coordinator should have received a response from nodes[1]
ev, err := expectEventType(ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{})
ev, err := expectEventType(t, ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{})
require.NoError(t, err)

tev := ev.(*KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr])
require.Equal(t, nodes[1].ID(), tev.NodeID)
require.Equal(t, queryID, tev.QueryID)

// the query run by the coordinator should have received a response from nodes[2]
ev, err = expectEventType(ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{})
ev, err = expectEventType(t, ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{})
require.NoError(t, err)

tev = ev.(*KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr])
require.Equal(t, nodes[2].ID(), tev.NodeID)
require.Equal(t, queryID, tev.QueryID)

// the query run by the coordinator should have received a response from nodes[3]
ev, err = expectEventType(ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{})
ev, err = expectEventType(t, ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{})
require.NoError(t, err)

tev = ev.(*KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr])
require.Equal(t, nodes[3].ID(), tev.NodeID)
require.Equal(t, queryID, tev.QueryID)

// the query run by the coordinator should have completed
ev, err = expectEventType(ctx, events, &KademliaOutboundQueryFinishedEvent{})
ev, err = expectEventType(t, ctx, events, &KademliaOutboundQueryFinishedEvent{})
require.NoError(t, err)

require.IsType(t, &KademliaOutboundQueryFinishedEvent{}, ev)
@@ -237,7 +246,7 @@ func TestExhaustiveQuery(t *testing.T) {
}

func TestRoutingUpdatedEventEmittedForCloserNodes(t *testing.T) {
ctx, cancel := Ctx(t)
ctx, cancel := kadtest.Ctx(t)
defer cancel()

nodes, eps, rts, siml := setupSimulation(t, ctx)
@@ -279,7 +288,7 @@ func TestRoutingUpdatedEventEmittedForCloserNodes(t *testing.T) {

// the query run by the coordinator should have received a response from nodes[1] with closer nodes
// nodes[0] and nodes[2] which should trigger a routing table update
ev, err := expectEventType(ctx, events, &KademliaRoutingUpdatedEvent[key.Key8, kadtest.StrAddr]{})
ev, err := expectEventType(t, ctx, events, &KademliaRoutingUpdatedEvent[key.Key8, kadtest.StrAddr]{})
require.NoError(t, err)

tev := ev.(*KademliaRoutingUpdatedEvent[key.Key8, kadtest.StrAddr])
@@ -289,13 +298,93 @@ func TestRoutingUpdatedEventEmittedForCloserNodes(t *testing.T) {

// the query continues and should have received a response from nodes[2] with closer nodes
// nodes[1] and nodes[3] which should trigger a routing table update
ev, err = expectEventType(ctx, events, &KademliaRoutingUpdatedEvent[key.Key8, kadtest.StrAddr]{})
ev, err = expectEventType(t, ctx, events, &KademliaRoutingUpdatedEvent[key.Key8, kadtest.StrAddr]{})
require.NoError(t, err)

tev = ev.(*KademliaRoutingUpdatedEvent[key.Key8, kadtest.StrAddr])
require.Equal(t, nodes[3].ID(), tev.NodeInfo.ID())

// the query run by the coordinator should have completed
_, err = expectEventType(ctx, events, &KademliaOutboundQueryFinishedEvent{})
_, err = expectEventType(t, ctx, events, &KademliaOutboundQueryFinishedEvent{})
require.NoError(t, err)
}

var findNodeFn = func(n kad.NodeID[key.Key8]) (address.ProtocolID, kad.Request[key.Key8, kadtest.StrAddr]) {
return protoID, sim.NewRequest[key.Key8, kadtest.StrAddr](n.Key())
}

func TestBootstrap(t *testing.T) {
ctx, cancel := kadtest.Ctx(t)
defer cancel()

nodes, eps, rts, siml := setupSimulation(t, ctx)

clk := siml.Clock()

ccfg := DefaultConfig()
ccfg.Clock = clk
ccfg.PeerstoreTTL = peerstoreTTL

go func(ctx context.Context) {
for {
select {
case <-time.After(10 * time.Millisecond):
siml.Run(ctx)
case <-ctx.Done():
return
}
}
}(ctx)

self := nodes[0].ID()
c, err := NewCoordinator[key.Key8, kadtest.StrAddr](self, eps[0], rts[0], ccfg)
if err != nil {
log.Fatalf("unexpected error creating coordinator: %v", err)
}
siml.Add(c)
events := c.Events()

queryID := query.QueryID("bootstrap")

seeds := []kad.NodeID[key.Key8]{
nodes[1].ID(),
}
err = c.Bootstrap(ctx, seeds, findNodeFn)
if err != nil {
t.Fatalf("failed to initiate bootstrap: %v", err)
}

// the query run by the coordinator should have received a response from nodes[1]
ev, err := expectEventType(t, ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{})
require.NoError(t, err)

tev := ev.(*KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr])
require.Equal(t, nodes[1].ID(), tev.NodeID)
require.Equal(t, queryID, tev.QueryID)

// the query run by the coordinator should have received a response from nodes[2]
ev, err = expectEventType(t, ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{})
require.NoError(t, err)

tev = ev.(*KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr])
require.Equal(t, nodes[2].ID(), tev.NodeID)
require.Equal(t, queryID, tev.QueryID)

// the query run by the coordinator should have received a response from nodes[3]
ev, err = expectEventType(t, ctx, events, &KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr]{})
require.NoError(t, err)

tev = ev.(*KademliaOutboundQueryProgressedEvent[key.Key8, kadtest.StrAddr])
require.Equal(t, nodes[3].ID(), tev.NodeID)
require.Equal(t, queryID, tev.QueryID)

// the query run by the coordinator should have completed
ev, err = expectEventType(t, ctx, events, &KademliaBootstrapFinishedEvent{})
require.NoError(t, err)

require.IsType(t, &KademliaBootstrapFinishedEvent{}, ev)
tevf := ev.(*KademliaBootstrapFinishedEvent)
require.Equal(t, 3, tevf.Stats.Requests)
require.Equal(t, 3, tevf.Stats.Success)
require.Equal(t, 0, tevf.Stats.Failure)
}
24 changes: 24 additions & 0 deletions internal/kadtest/context.go
@@ -0,0 +1,24 @@
package kadtest

import (
"context"
"testing"
"time"
)

// Ctx returns a Context and a CancelFunc. The context will be
// cancelled just before the test binary deadline (as
// specified by the -timeout flag when running the test). The
// CancelFunc may be called to cancel the context earlier than
// the deadline.
func Ctx(t *testing.T) (context.Context, context.CancelFunc) {
t.Helper()

deadline, ok := t.Deadline()
if !ok {
deadline = time.Now().Add(time.Minute)
} else {
deadline = deadline.Add(-time.Second)
}
return context.WithDeadline(context.Background(), deadline)
}
13 changes: 8 additions & 5 deletions query/pool.go
@@ -154,9 +154,8 @@ func (p *Pool[K, A]) Advance(ctx context.Context, ev PoolEvent) PoolState {
}
eventQueryID = qry.id
}
case nil:
// TEMPORARY: no event to process
// TODO: introduce EventPoolPoll?
case *EventPoolPoll:
// no event to process
default:
panic(fmt.Sprintf("unexpected event: %T", tev))
}
@@ -279,7 +278,7 @@ type PoolState interface {
// StatePoolIdle indicates that the pool is idle, i.e. there are no queries to process.
type StatePoolIdle struct{}

// StatePoolQueryMessage indicates that at a query is waiting to message a node.
// StatePoolQueryMessage indicates that a pool query is waiting to message a node.
type StatePoolQueryMessage[K kad.Key[K], A kad.Address[A]] struct {
QueryID QueryID
NodeID kad.NodeID[K]
@@ -302,7 +301,7 @@ type StatePoolQueryFinished struct {
Stats QueryStats
}

// StatePoolQueryTimeout indicates that at a query has timed out.
// StatePoolQueryTimeout indicates that a query has timed out.
type StatePoolQueryTimeout struct {
QueryID QueryID
Stats QueryStats
@@ -349,8 +348,12 @@ type EventPoolMessageFailure[K kad.Key[K]] struct {
Error error // the error that caused the failure, if any
}

// EventPoolPoll is an event that signals the pool that it can perform housekeeping work such as time out queries.
type EventPoolPoll struct{}

// poolEvent() ensures that only Pool events can be assigned to the PoolEvent interface.
func (*EventPoolAddQuery[K, A]) poolEvent() {}
func (*EventPoolStopQuery) poolEvent() {}
func (*EventPoolMessageResponse[K, A]) poolEvent() {}
func (*EventPoolMessageFailure[K]) poolEvent() {}
func (*EventPoolPoll) poolEvent() {}
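
The change above replaces the temporary nil-event case with an explicit
EventPoolPoll. Below is a minimal sketch of a caller using it to let the
pool do housekeeping when there is no external event to deliver; the
import paths are assumptions based on this repository's layout, and the
message send path is elided.

```go
package coord

import (
	"context"

	"github.com/plprobelab/go-kademlia/kad"
	"github.com/plprobelab/go-kademlia/query"
)

// pollPool advances a pool that has no new external event, using the
// explicit EventPoolPoll introduced in this diff instead of a nil event.
func pollPool[K kad.Key[K], A kad.Address[A]](ctx context.Context, p *query.Pool[K, A]) {
	switch st := p.Advance(ctx, &query.EventPoolPoll{}).(type) {
	case *query.StatePoolIdle:
		// no queries to progress
	case *query.StatePoolQueryMessage[K, A]:
		// the pool wants st.Message sent to st.NodeID for query st.QueryID
		_ = st
	default:
		// other states (waiting, finished, timeout) are handled elsewhere
	}
}
```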
12 changes: 6 additions & 6 deletions query/pool_test.go
@@ -76,7 +76,7 @@ func TestPoolStartsIdle(t *testing.T) {
p, err := NewPool[key.Key8, kadtest.StrAddr](self, cfg)
require.NoError(t, err)

state := p.Advance(ctx, nil)
state := p.Advance(ctx, &EventPoolPoll{})
require.IsType(t, &StatePoolIdle{}, state)
}

@@ -90,7 +90,7 @@ func TestPoolStopWhenNoQueries(t *testing.T) {
p, err := NewPool[key.Key8, kadtest.StrAddr](self, cfg)
require.NoError(t, err)

state := p.Advance(ctx, &EventPoolStopQuery{})
state := p.Advance(ctx, &EventPoolPoll{})
require.IsType(t, &StatePoolIdle{}, state)
}

@@ -137,7 +137,7 @@ func TestPoolAddQueryStartsIfCapacity(t *testing.T) {
require.Equal(t, msg, st.Message)

// now the pool reports that it is waiting
state = p.Advance(ctx, nil)
state = p.Advance(ctx, &EventPoolPoll{})
require.IsType(t, &StatePoolWaitingWithCapacity{}, state)
}

@@ -244,14 +244,14 @@ func TestPoolPrefersRunningQueriesOverNewOnes(t *testing.T) {
require.Equal(t, b, st.NodeID)

// advance the pool again, the first query should continue its operation in preference to starting the new query
state = p.Advance(ctx, nil)
state = p.Advance(ctx, &EventPoolPoll{})
require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state)
st = state.(*StatePoolQueryMessage[key.Key8, kadtest.StrAddr])
require.Equal(t, queryID1, st.QueryID)
require.Equal(t, c, st.NodeID)

// advance the pool again, the first query is at capacity so the second query can start
state = p.Advance(ctx, nil)
state = p.Advance(ctx, &EventPoolPoll{})
require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state)
st = state.(*StatePoolQueryMessage[key.Key8, kadtest.StrAddr])
require.Equal(t, queryID2, st.QueryID)
@@ -362,7 +362,7 @@ func TestPoolRespectsConcurrency(t *testing.T) {
require.Equal(t, queryID1, stf.QueryID)

// advancing pool again allows query 3 to start
state = p.Advance(ctx, nil)
state = p.Advance(ctx, &EventPoolPoll{})
require.IsType(t, &StatePoolQueryMessage[key.Key8, kadtest.StrAddr]{}, state)
st = state.(*StatePoolQueryMessage[key.Key8, kadtest.StrAddr])
require.Equal(t, queryID3, st.QueryID)
1 change: 1 addition & 0 deletions query/query.go
@@ -373,6 +373,7 @@ func (q *Query[K, A]) onMessageFailure(ctx context.Context, node kad.NodeID[K])
switch st := ni.State.(type) {
case *StateNodeWaiting:
q.inFlight--
q.stats.Failure++
case *StateNodeUnresponsive:
// update node state to failed
break