diff --git a/api.go b/api.go index 4ddeb36a..1e9be6f9 100644 --- a/api.go +++ b/api.go @@ -188,6 +188,10 @@ type Raft struct { // to verify we are still the leader verifyCh chan *verifyFuture + // assertedCh is used to check if the leader has fully asserted leadership + // by committing an entry in this term. + assertedCh chan *assertedFuture + // configurationsCh is used to get the configuration data safely from // outside of the main thread. configurationsCh chan *configurationsFuture @@ -558,6 +562,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna stable: stable, trans: trans, verifyCh: make(chan *verifyFuture, 64), + assertedCh: make(chan *assertedFuture, 64), configurationsCh: make(chan *configurationsFuture, 8), bootstrapCh: make(chan *bootstrapFuture), observers: make(map[uint64]*Observer), @@ -883,6 +888,24 @@ func (r *Raft) VerifyLeader() Future { } } +// VerifyAssertedLeadership is used to check if the node, if acting as a leader, +// has fully asserted leadership by committing an entry in this term. The returned +// future indicates the term for which leadership has been asserted. +// +// Since this call may be relatively slow, it is not suitable for code paths that requir +// low latency. One suggested way to use it is to call this once per term, storing the +// returned state in a variable. Once leadership has been asserted for a term, the state +// will not change, so there is no need to call this again for the same term. +// +// Must be run on the leader, or it will fail. +func (r *Raft) VerifyAssertedLeadership() AssertedFuture { + metrics.IncrCounter([]string{"raft", "verify_asserted_leadership"}, 1) + assertedFuture := &assertedFuture{} + assertedFuture.init() + r.assertedCh <- assertedFuture + return assertedFuture +} + // GetConfiguration returns the latest configuration. This may not yet be // committed. The main loop can access this directly. func (r *Raft) GetConfiguration() ConfigurationFuture { diff --git a/future.go b/future.go index 303da448..5ce68bb6 100644 --- a/future.go +++ b/future.go @@ -44,6 +44,18 @@ type ApplyFuture interface { Response() interface{} } +// AssertedFuture is used to check if the current node has asserted leadership +// for the current term by committing at least one entry in this term. +type AssertedFuture interface { + Future + + // Asserted returns true if the current node has asserted leadership. + Asserted() bool + + // Term returns the term in which the leadership was asserted. + Term() uint64 +} + // ConfigurationFuture is used for GetConfiguration and can return the // latest configuration in use by Raft. type ConfigurationFuture interface { @@ -243,6 +255,20 @@ type verifyFuture struct { voteLock sync.Mutex } +type assertedFuture struct { + deferError + asserted bool + term uint64 +} + +func (a *assertedFuture) Asserted() bool { + return a.asserted +} + +func (a *assertedFuture) Term() uint64 { + return a.term +} + // leadershipTransferFuture is used to track the progress of a leadership // transfer internally. type leadershipTransferFuture struct { diff --git a/raft.go b/raft.go index cbc9a59a..23de2625 100644 --- a/raft.go +++ b/raft.go @@ -185,6 +185,11 @@ func (r *Raft) runFollower() { // Reject any operations since we are not the leader v.respond(ErrNotLeader) + case v := <-r.assertedCh: + r.mainThreadSaturation.working() + // Reject any operations since we are not the leader + v.respond(ErrNotLeader) + case ur := <-r.userRestoreCh: r.mainThreadSaturation.working() // Reject any restores since we are not the leader @@ -398,6 +403,11 @@ func (r *Raft) runCandidate() { // Reject any operations since we are not the leader v.respond(ErrNotLeader) + case v := <-r.assertedCh: + r.mainThreadSaturation.working() + // Reject any operations since we are not the leader + v.respond(ErrNotLeader) + case ur := <-r.userRestoreCh: r.mainThreadSaturation.working() // Reject any restores since we are not the leader @@ -676,6 +686,8 @@ func (r *Raft) leaderLoop() { // based on the current config value. lease := time.After(r.config().LeaderLeaseTimeout) + leadershipAsserted := false + for r.getState() == Leader { r.mainThreadSaturation.sleeping() @@ -788,6 +800,9 @@ func (r *Raft) leaderLoop() { oldCommitIndex := r.getCommitIndex() commitIndex := r.leaderState.commitment.getCommitIndex() r.setCommitIndex(commitIndex) + if !leadershipAsserted { + leadershipAsserted = true + } // New configuration has been committed, set it as the committed // value. @@ -869,6 +884,12 @@ func (r *Raft) leaderLoop() { v.respond(nil) } + case v := <-r.assertedCh: + r.mainThreadSaturation.working() + v.asserted = leadershipAsserted + v.term = r.getCurrentTerm() + v.respond(nil) + case future := <-r.userRestoreCh: r.mainThreadSaturation.working() if r.getLeadershipTransferInProgress() { diff --git a/raft_test.go b/raft_test.go index 2db115b6..88972791 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1971,6 +1971,87 @@ func TestRaft_VerifyLeader_PartialConnect(t *testing.T) { } } +func TestRaft_VerifyAssertedLeadership(t *testing.T) { + // Make the cluster + c := MakeCluster(3, t, nil) + defer c.Close() + + // Get the leader + leader := c.Leader() + + time.Sleep(c.propagateTimeout) + + // Check that leadership has been asserted. + asserted := leader.VerifyAssertedLeadership() + + // Wait for the asserted to return + if err := asserted.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + // Verify that leadership has been asserted for the + // correct term. + if asserted.Asserted() != true { + t.Fatalf("expected leadership to be asserted") + } + if asserted.Term() != leader.getCurrentTerm() { + t.Fatalf("expected term %d, got %d", leader.getCurrentTerm(), asserted.Term()) + } +} + +func TestRaft_VerifyAssertedLeadership_Single(t *testing.T) { + // Make the cluster + c := MakeCluster(1, t, nil) + defer c.Close() + + // Get the leader + leader := c.Leader() + + time.Sleep(c.propagateTimeout) + + // Check that leadership has been asserted. + asserted := leader.VerifyAssertedLeadership() + + // Wait for the asserted to return + if err := asserted.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + // Verify that leadership has been asserted for the + // correct term. + if asserted.Asserted() != true { + t.Fatalf("expected leadership to be asserted") + } + if asserted.Term() != leader.getCurrentTerm() { + t.Fatalf("expected term %d, got %d", leader.getCurrentTerm(), asserted.Term()) + } +} + +func TestRaft_VerifyAssertedLeadership_Fail(t *testing.T) { + // Make the cluster + c := MakeCluster(3, t, nil) + defer c.Close() + + // Get the leader + leader := c.Leader() + + time.Sleep(c.propagateTimeout) + + // Check that leadership has been asserted. + asserted := leader.VerifyAssertedLeadership() + + // Wait for the asserted to return + if err := asserted.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + // Confirm that an error is received when called on a Follower. + follower := c.Followers()[0] + if err := follower.VerifyAssertedLeadership().Error(); err == nil { + t.Fatal("expected error asserting leadership on a Follower") + } +} + func TestRaft_NotifyCh(t *testing.T) { ch := make(chan bool, 1) conf := inmemConfig(t)