From 29e5bb423ba561545a51ee82e683b45d9e450c34 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 17 Oct 2024 19:08:33 -0400 Subject: [PATCH] Support checking assertion of Leadership This allows clients of this library to check if the current node, assuming it is the Leader, has explicitly asserted leadership in the current term. "Asserting" is defined as having committed at least one log entry in the current term, as per the Raft paper. This change adds a new channel, which is only serviced when in Leadership mode. This channel will respond with the asserted status via a Future. Clients are recommended to call this function only until they have confirmed that the leadership has been asserted. Once asserted for a given term the state can be cached until the end of the term. This code is necessary to properly implement the Read Optimization outlined in the Raft paper and dissertation. The dissertation explains that it is not sufficient to simply check the Leader's commit index. Instead a client must first confirm that the Leader has explicitly asserted leadership by committing a log entry in the current term. Only then is the commit index valid. --- api.go | 24 ++++++++++++++++ future.go | 26 +++++++++++++++++ raft.go | 21 ++++++++++++++ raft_test.go | 81 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 152 insertions(+) diff --git a/api.go b/api.go index 4ddeb36a..ebf41165 100644 --- a/api.go +++ b/api.go @@ -188,6 +188,10 @@ type Raft struct { // to verify we are still the leader verifyCh chan *verifyFuture + // assertedCh is used to check if the leader has fully asserted leadership. + // A leader does this by committing an entry to the log in the current term. + assertedCh chan *assertedFuture + // configurationsCh is used to get the configuration data safely from // outside of the main thread. 
configurationsCh chan *configurationsFuture @@ -558,6 +562,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna stable: stable, trans: trans, verifyCh: make(chan *verifyFuture, 64), + assertedCh: make(chan *assertedFuture, 64), configurationsCh: make(chan *configurationsFuture, 8), bootstrapCh: make(chan *bootstrapFuture), observers: make(map[uint64]*Observer), @@ -883,6 +888,25 @@ func (r *Raft) VerifyLeader() Future { } } +// VerifyAssertedLeadership is used to check if the node, if acting as a leader, +// has fully asserted leadership by committing an entry in this term. The returned +// future indicates the term for which leadership has been asserted. +// +// Since this call may be relatively slow, it is not suitable for code paths that +// require low latency. One suggested way to use it is to call this once per term, +// storing the returned state in a variable. Once leadership has been asserted for +// a term, the state will not change, so there is no need to call this again for +// the same term. +// +// Must be run on the leader, or it will fail. +func (r *Raft) VerifyAssertedLeadership() AssertedFuture { + metrics.IncrCounter([]string{"raft", "verify_asserted_leadership"}, 1) + assertedFuture := &assertedFuture{} + assertedFuture.init() + r.assertedCh <- assertedFuture + return assertedFuture +} + // GetConfiguration returns the latest configuration. This may not yet be // committed. The main loop can access this directly. func (r *Raft) GetConfiguration() ConfigurationFuture { diff --git a/future.go b/future.go index 303da448..cc25a474 100644 --- a/future.go +++ b/future.go @@ -44,6 +44,18 @@ type ApplyFuture interface { Response() interface{} } +// AssertedFuture is used to check if the current node has asserted leadership. +// A Leader asserts leadership by committing at least one log in this term. +type AssertedFuture interface { + Future + + // Asserted returns true if the current node has asserted leadership. 
+ Asserted() bool + + // Term returns the term in which the leadership was asserted. + Term() uint64 +} + // ConfigurationFuture is used for GetConfiguration and can return the // latest configuration in use by Raft. type ConfigurationFuture interface { @@ -243,6 +255,20 @@ type verifyFuture struct { voteLock sync.Mutex } +type assertedFuture struct { + deferError + asserted bool + term uint64 +} + +func (a *assertedFuture) Asserted() bool { + return a.asserted +} + +func (a *assertedFuture) Term() uint64 { + return a.term +} + // leadershipTransferFuture is used to track the progress of a leadership // transfer internally. type leadershipTransferFuture struct { diff --git a/raft.go b/raft.go index cbc9a59a..0ce6ff7c 100644 --- a/raft.go +++ b/raft.go @@ -185,6 +185,11 @@ func (r *Raft) runFollower() { // Reject any operations since we are not the leader v.respond(ErrNotLeader) + case v := <-r.assertedCh: + r.mainThreadSaturation.working() + // Reject any operations since we are not the leader + v.respond(ErrNotLeader) + case ur := <-r.userRestoreCh: r.mainThreadSaturation.working() // Reject any restores since we are not the leader @@ -398,6 +403,11 @@ func (r *Raft) runCandidate() { // Reject any operations since we are not the leader v.respond(ErrNotLeader) + case v := <-r.assertedCh: + r.mainThreadSaturation.working() + // Reject any operations since we are not the leader + v.respond(ErrNotLeader) + case ur := <-r.userRestoreCh: r.mainThreadSaturation.working() // Reject any restores since we are not the leader @@ -676,6 +686,10 @@ func (r *Raft) leaderLoop() { // based on the current config value. lease := time.After(r.config().LeaderLeaseTimeout) + // Track whether leadership has been asserted. This is true as long + // as the leader has committed at least one log in this term. 
+ leadershipAsserted := false + for r.getState() == Leader { r.mainThreadSaturation.sleeping() @@ -788,6 +802,7 @@ func (r *Raft) leaderLoop() { oldCommitIndex := r.getCommitIndex() commitIndex := r.leaderState.commitment.getCommitIndex() r.setCommitIndex(commitIndex) + leadershipAsserted = true // New configuration has been committed, set it as the committed // value. @@ -869,6 +884,12 @@ func (r *Raft) leaderLoop() { v.respond(nil) } + case v := <-r.assertedCh: + r.mainThreadSaturation.working() + v.asserted = leadershipAsserted + v.term = r.getCurrentTerm() + v.respond(nil) + case future := <-r.userRestoreCh: r.mainThreadSaturation.working() if r.getLeadershipTransferInProgress() { diff --git a/raft_test.go b/raft_test.go index 2db115b6..88972791 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1971,6 +1971,87 @@ func TestRaft_VerifyLeader_PartialConnect(t *testing.T) { } } +func TestRaft_VerifyAssertedLeadership(t *testing.T) { + // Make the cluster + c := MakeCluster(3, t, nil) + defer c.Close() + + // Get the leader + leader := c.Leader() + + time.Sleep(c.propagateTimeout) + + // Check that leadership has been asserted. + asserted := leader.VerifyAssertedLeadership() + + // Wait for the asserted to return + if err := asserted.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + // Verify that leadership has been asserted for the + // correct term. + if asserted.Asserted() != true { + t.Fatalf("expected leadership to be asserted") + } + if asserted.Term() != leader.getCurrentTerm() { + t.Fatalf("expected term %d, got %d", leader.getCurrentTerm(), asserted.Term()) + } +} + +func TestRaft_VerifyAssertedLeadership_Single(t *testing.T) { + // Make the cluster + c := MakeCluster(1, t, nil) + defer c.Close() + + // Get the leader + leader := c.Leader() + + time.Sleep(c.propagateTimeout) + + // Check that leadership has been asserted. 
+ asserted := leader.VerifyAssertedLeadership() + + // Wait for the asserted to return + if err := asserted.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + // Verify that leadership has been asserted for the + // correct term. + if asserted.Asserted() != true { + t.Fatalf("expected leadership to be asserted") + } + if asserted.Term() != leader.getCurrentTerm() { + t.Fatalf("expected term %d, got %d", leader.getCurrentTerm(), asserted.Term()) + } +} + +func TestRaft_VerifyAssertedLeadership_Fail(t *testing.T) { + // Make the cluster + c := MakeCluster(3, t, nil) + defer c.Close() + + // Get the leader + leader := c.Leader() + + time.Sleep(c.propagateTimeout) + + // Check that leadership has been asserted. + asserted := leader.VerifyAssertedLeadership() + + // Wait for the asserted to return + if err := asserted.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + // Confirm that an error is received when called on a Follower. + follower := c.Followers()[0] + if err := follower.VerifyAssertedLeadership().Error(); err == nil { + t.Fatal("expected error asserting leadership on a Follower") + } +} + func TestRaft_NotifyCh(t *testing.T) { ch := make(chan bool, 1) conf := inmemConfig(t)