Skip to content

Commit

Permalink
Fix rare leadership transfer failures when writes happen during trans…
Browse files Browse the repository at this point in the history
…fer (#581)

After initiating a leadership transfer, for up to electiontimeout duration, the old leader will no longer permit writes to occur.  Allowing writes to occur in this scenario can result in the transfer failing, due to some node other than the target having the highest index.  Moreover the writes might be rolled back anyway by the new leader if insufficiently widely committed.
  • Loading branch information
ncabatoff authored Dec 4, 2023
1 parent 1462fd5 commit 62eaa1c
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 7 deletions.
15 changes: 14 additions & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,8 +692,21 @@ func (r *Raft) leaderLoop() {
case err := <-doneCh:
if err != nil {
r.logger.Debug(err.Error())
future.respond(err)
} else {
// Wait for up to ElectionTimeout before flagging the
// leadership transfer as done and unblocking applies in
// the leaderLoop.
select {
case <-time.After(r.config().ElectionTimeout):
err := fmt.Errorf("leadership transfer timeout")
r.logger.Debug(err.Error())
future.respond(err)
case <-leftLeaderLoop:
r.logger.Debug("lost leadership during transfer (expected)")
future.respond(nil)
}
}
future.respond(err)
}
}()

Expand Down
60 changes: 57 additions & 3 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2337,17 +2337,71 @@ func TestRaft_LeadershipTransferWithOneNode(t *testing.T) {
}
}

func TestRaft_LeadershipTransferWithWrites(t *testing.T) {
conf := inmemConfig(t)
conf.Logger = hclog.New(&hclog.LoggerOptions{Level: hclog.Trace})
c := MakeCluster(7, t, conf)
defer c.Close()

doneCh := make(chan struct{})
var writerErr error
var wg sync.WaitGroup
var writes int
wg.Add(1)
leader := c.Leader()
go func() {
defer wg.Done()
for {
select {
case <-doneCh:
return
default:
future := leader.Apply([]byte("test"), 0)
switch err := future.Error(); {
case errors.Is(err, ErrRaftShutdown):
return
case errors.Is(err, ErrNotLeader):
leader = c.Leader()
case errors.Is(err, ErrLeadershipTransferInProgress):
continue
case errors.Is(err, ErrLeadershipLost):
continue
case err == nil:
writes++
default:
writerErr = err
}
time.Sleep(time.Millisecond)
}
}
}()

follower := c.Followers()[0]
future := c.Leader().LeadershipTransferToServer(follower.localID, follower.localAddr)
if future.Error() != nil {
t.Fatalf("Didn't expect error: %v", future.Error())
}
if follower.localID != c.Leader().localID {
t.Error("Leadership should have been transitioned to specified server.")
}
close(doneCh)
wg.Wait()
if writerErr != nil {
t.Fatal(writerErr)
}
t.Logf("writes: %d", writes)
}

func TestRaft_LeadershipTransferWithSevenNodes(t *testing.T) {
c := MakeCluster(7, t, nil)
defer c.Close()

oldLeader := c.Leader().localID
follower := c.GetInState(Follower)[0]
future := c.Leader().LeadershipTransferToServer(follower.localID, follower.localAddr)
if future.Error() != nil {
t.Fatalf("Didn't expect error: %v", future.Error())
}
if oldLeader == c.Leader().localID {
if follower.localID != c.Leader().localID {
t.Error("Leadership should have been transitioned to specified server.")
}
}
Expand Down Expand Up @@ -2510,7 +2564,7 @@ func TestRaft_LeadershipTransferIgnoresNonvoters(t *testing.T) {
}

func TestRaft_LeadershipTransferStopRightAway(t *testing.T) {
r := Raft{leaderState: leaderState{}}
r := Raft{leaderState: leaderState{}, logger: hclog.New(nil)}
r.setupLeaderState()

stopCh := make(chan struct{})
Expand Down
7 changes: 4 additions & 3 deletions testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func (c *cluster) GetInState(s RaftState) []*Raft {
// restart the timer.
pollStartTime := time.Now()
for {
inState, highestTerm := c.pollState(s)
_, highestTerm := c.pollState(s)
inStateTime := time.Now()

// Sometimes this routine is called very early on before the
Expand Down Expand Up @@ -479,8 +479,9 @@ func (c *cluster) GetInState(s RaftState) []*Raft {
c.t.Fatalf("timer channel errored")
}

c.logger.Info(fmt.Sprintf("stable state for %s reached at %s (%d nodes), %s from start of poll, %s from cluster start. Timeout at %s, %s after stability",
s, inStateTime, len(inState), inStateTime.Sub(pollStartTime), inStateTime.Sub(c.startTime), t, t.Sub(inStateTime)))
inState, highestTerm := c.pollState(s)
c.logger.Info(fmt.Sprintf("stable state for %s reached at %s (%d nodes), highestTerm is %d, %s from start of poll, %s from cluster start. Timeout at %s, %s after stability",
s, inStateTime, len(inState), highestTerm, inStateTime.Sub(pollStartTime), inStateTime.Sub(c.startTime), t, t.Sub(inStateTime)))
return inState
}
}
Expand Down

0 comments on commit 62eaa1c

Please sign in to comment.