Skip to content

Commit

Permalink
Support checking assertion of Leadership
Browse files Browse the repository at this point in the history
  • Loading branch information
otoolep committed Oct 16, 2024
1 parent 2b0032e commit dc5e14c
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 0 deletions.
23 changes: 23 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ type Raft struct {
// to verify we are still the leader
verifyCh chan *verifyFuture

// assertedCh is used to check if the leader has fully asserted leadership
// by committing an entry in this term.
assertedCh chan *assertedFuture

// configurationsCh is used to get the configuration data safely from
// outside of the main thread.
configurationsCh chan *configurationsFuture
Expand Down Expand Up @@ -558,6 +562,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
stable: stable,
trans: trans,
verifyCh: make(chan *verifyFuture, 64),
assertedCh: make(chan *assertedFuture, 64),
configurationsCh: make(chan *configurationsFuture, 8),
bootstrapCh: make(chan *bootstrapFuture),
observers: make(map[uint64]*Observer),
Expand Down Expand Up @@ -883,6 +888,24 @@ func (r *Raft) VerifyLeader() Future {
}
}

// VerifyAssertedLeadership is used to check if the node, if acting as a leader,
// has fully asserted leadership by committing an entry in this term. The returned
// future indicates the term for which leadership has been asserted.
//
// Since this call may be relatively slow, it is not suitable for code paths that requir
// low latency. One suggested way to use it is to call this once per term, storing the
// returned state in a variable. Once leadership has been asserted for a term, the state
// will not change, so there is no need to call this again for the same term.
//
// Must be run on the leader, or it will fail.
func (r *Raft) VerifyAssertedLeadership() AssertedFuture {
metrics.IncrCounter([]string{"raft", "verify_asserted_leadership"}, 1)
assertedFuture := &assertedFuture{}
assertedFuture.init()
r.assertedCh <- assertedFuture
return assertedFuture
}

// GetConfiguration returns the latest configuration. This may not yet be
// committed. The main loop can access this directly.
func (r *Raft) GetConfiguration() ConfigurationFuture {
Expand Down
26 changes: 26 additions & 0 deletions future.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ type ApplyFuture interface {
Response() interface{}
}

// AssertedFuture is used to check if the current node has asserted leadership
// for the current term by committing at least one entry in this term.
type AssertedFuture interface {
Future

// Asserted returns true if the current node has asserted leadership.
Asserted() bool

// Term returns the term in which the leadership was asserted.
Term() uint64
}

// ConfigurationFuture is used for GetConfiguration and can return the
// latest configuration in use by Raft.
type ConfigurationFuture interface {
Expand Down Expand Up @@ -243,6 +255,20 @@ type verifyFuture struct {
voteLock sync.Mutex
}

type assertedFuture struct {
deferError
asserted bool
term uint64
}

func (a *assertedFuture) Asserted() bool {
return a.asserted
}

func (a *assertedFuture) Term() uint64 {
return a.term
}

// leadershipTransferFuture is used to track the progress of a leadership
// transfer internally.
type leadershipTransferFuture struct {
Expand Down
21 changes: 21 additions & 0 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ func (r *Raft) runFollower() {
// Reject any operations since we are not the leader
v.respond(ErrNotLeader)

case v := <-r.assertedCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
v.respond(ErrNotLeader)

case ur := <-r.userRestoreCh:
r.mainThreadSaturation.working()
// Reject any restores since we are not the leader
Expand Down Expand Up @@ -398,6 +403,11 @@ func (r *Raft) runCandidate() {
// Reject any operations since we are not the leader
v.respond(ErrNotLeader)

case v := <-r.assertedCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
v.respond(ErrNotLeader)

case ur := <-r.userRestoreCh:
r.mainThreadSaturation.working()
// Reject any restores since we are not the leader
Expand Down Expand Up @@ -676,6 +686,8 @@ func (r *Raft) leaderLoop() {
// based on the current config value.
lease := time.After(r.config().LeaderLeaseTimeout)

leadershipAsserted := false

for r.getState() == Leader {
r.mainThreadSaturation.sleeping()

Expand Down Expand Up @@ -788,6 +800,9 @@ func (r *Raft) leaderLoop() {
oldCommitIndex := r.getCommitIndex()
commitIndex := r.leaderState.commitment.getCommitIndex()
r.setCommitIndex(commitIndex)
if !leadershipAsserted {
leadershipAsserted = true
}

// New configuration has been committed, set it as the committed
// value.
Expand Down Expand Up @@ -869,6 +884,12 @@ func (r *Raft) leaderLoop() {
v.respond(nil)
}

case v := <-r.assertedCh:
r.mainThreadSaturation.working()
v.asserted = leadershipAsserted
v.term = r.getCurrentTerm()
v.respond(nil)

case future := <-r.userRestoreCh:
r.mainThreadSaturation.working()
if r.getLeadershipTransferInProgress() {
Expand Down
81 changes: 81 additions & 0 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1971,6 +1971,87 @@ func TestRaft_VerifyLeader_PartialConnect(t *testing.T) {
}
}

func TestRaft_VerifyAssertedLeadership(t *testing.T) {
// Make the cluster
c := MakeCluster(3, t, nil)
defer c.Close()

// Get the leader
leader := c.Leader()

time.Sleep(c.propagateTimeout)

// Check that leadership has been asserted.
asserted := leader.VerifyAssertedLeadership()

// Wait for the asserted to return
if err := asserted.Error(); err != nil {
t.Fatalf("err: %v", err)
}

// Verify that leadership has been asserted for the
// correct term.
if asserted.Asserted() != true {
t.Fatalf("expected leadership to be asserted")
}
if asserted.Term() != leader.getCurrentTerm() {
t.Fatalf("expected term %d, got %d", leader.getCurrentTerm(), asserted.Term())
}
}

func TestRaft_VerifyAssertedLeadership_Single(t *testing.T) {
// Make the cluster
c := MakeCluster(1, t, nil)
defer c.Close()

// Get the leader
leader := c.Leader()

time.Sleep(c.propagateTimeout)

// Check that leadership has been asserted.
asserted := leader.VerifyAssertedLeadership()

// Wait for the asserted to return
if err := asserted.Error(); err != nil {
t.Fatalf("err: %v", err)
}

// Verify that leadership has been asserted for the
// correct term.
if asserted.Asserted() != true {
t.Fatalf("expected leadership to be asserted")
}
if asserted.Term() != leader.getCurrentTerm() {
t.Fatalf("expected term %d, got %d", leader.getCurrentTerm(), asserted.Term())
}
}

func TestRaft_VerifyAssertedLeadership_Fail(t *testing.T) {
// Make the cluster
c := MakeCluster(3, t, nil)
defer c.Close()

// Get the leader
leader := c.Leader()

time.Sleep(c.propagateTimeout)

// Check that leadership has been asserted.
asserted := leader.VerifyAssertedLeadership()

// Wait for the asserted to return
if err := asserted.Error(); err != nil {
t.Fatalf("err: %v", err)
}

// Confirm that an error is received when called on a Follower.
follower := c.Followers()[0]
if err := follower.VerifyAssertedLeadership().Error(); err == nil {
t.Fatal("expected error asserting leadership on a Follower")
}
}

func TestRaft_NotifyCh(t *testing.T) {
ch := make(chan bool, 1)
conf := inmemConfig(t)
Expand Down

0 comments on commit dc5e14c

Please sign in to comment.