From e338309ac02c858de89928b0e4a47b805253a931 Mon Sep 17 00:00:00 2001 From: ncabatoff Date: Thu, 23 Nov 2023 14:47:38 -0500 Subject: [PATCH 1/7] Reproduce leadership transfer failures that can occur when writes are happening during the transfer. Fix the problem partially: it can still occur if we dispatchLogs after the transfer message has been sent to the target, before it wins the election. --- raft.go | 40 ++++++++++++++++++++++++++++++++-------- raft_test.go | 30 ++++++++++++++++++++++++++---- testing.go | 6 +++++- 3 files changed, 63 insertions(+), 13 deletions(-) diff --git a/raft.go b/raft.go index 439fe6e2..7e9ebe74 100644 --- a/raft.go +++ b/raft.go @@ -6,7 +6,9 @@ package raft import ( "bytes" "container/list" + "context" "fmt" + "golang.org/x/sync/semaphore" "io" "sync/atomic" "time" @@ -92,6 +94,7 @@ type leaderState struct { replState map[ServerID]*followerReplication notify map[*verifyFuture]struct{} stepDown chan struct{} + applyable *semaphore.Weighted } // setLeader is used to modify the current leader Address and ID of the cluster @@ -401,7 +404,7 @@ func (r *Raft) setLeadershipTransferInProgress(v bool) { func (r *Raft) getLeadershipTransferInProgress() bool { v := atomic.LoadInt32(&r.leaderState.leadershipTransferInProgress) - return v == 1 + return v > 0 } func (r *Raft) setupLeaderState() { @@ -413,6 +416,7 @@ func (r *Raft) setupLeaderState() { r.leaderState.replState = make(map[ServerID]*followerReplication) r.leaderState.notify = make(map[*verifyFuture]struct{}) r.leaderState.stepDown = make(chan struct{}, 1) + r.leaderState.applyable = semaphore.NewWeighted(1) } // runLeader runs the main loop while in leader state. Do the setup here and drop into @@ -483,6 +487,7 @@ func (r *Raft) runLeader() { r.leaderState.replState = nil r.leaderState.notify = nil r.leaderState.stepDown = nil + r.leaderState.applyable = nil // If we are stepping down for some reason, no known leader. // We may have stepped down due to an RPC call, which would @@ -521,7 +526,9 @@ func (r *Raft) runLeader() { // maintain that there exists at most one uncommitted configuration entry in // any log, so we have to do proper no-ops here. noop := &logFuture{log: Log{Type: LogNoop}} + r.leaderState.applyable.Acquire(context.Background(), 1) r.dispatchLogs([]*logFuture{noop}) + r.leaderState.applyable.Release(1) // Sit in the leader loop until we step down r.leaderLoop() @@ -648,7 +655,7 @@ func (r *Raft) leaderLoop() { continue } - r.logger.Debug("starting leadership transfer", "id", future.ID, "address", future.Address) + r.logger.Debug("starting leadership transfer", "id", *future.ID, "address", *future.Address) // When we are leaving leaderLoop, we are no longer // leader, so we should stop transferring. @@ -843,10 +850,13 @@ func (r *Raft) leaderLoop() { case newLog := <-r.applyCh: r.mainThreadSaturation.working() - if r.getLeadershipTransferInProgress() { + //if r.getLeadershipTransferInProgress() { + //} + if !r.leaderState.applyable.TryAcquire(1) { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) newLog.respond(ErrLeadershipTransferInProgress) continue + } // Group commit, gather all the ready commits ready := []*logFuture{newLog} @@ -869,6 +879,7 @@ func (r *Raft) leaderLoop() { } else { r.dispatchLogs(ready) } + r.leaderState.applyable.Release(1) case <-lease: r.mainThreadSaturation.working() @@ -928,13 +939,18 @@ func (r *Raft) verifyLeader(v *verifyFuture) { // leadershipTransfer is doing the heavy lifting for the leadership transfer. 
func (r *Raft) leadershipTransfer(id ServerID, address ServerAddress, repl *followerReplication, stopCh chan struct{}, doneCh chan error) { // make sure we are not already stopped - select { - case <-stopCh: - doneCh <- nil - return - default: + + for !r.leaderState.applyable.TryAcquire(1) { + select { + case <-stopCh: + doneCh <- nil + return + default: + } } + defer r.leaderState.applyable.Release(1) + r.logger.Trace("leadership transfer progress", "my_last_index", r.getLastIndex(), "follower_next_index", repl.nextIndex) for atomic.LoadUint64(&repl.nextIndex) <= r.getLastIndex() { err := &deferError{} err.init() @@ -950,6 +966,7 @@ func (r *Raft) leadershipTransfer(id ServerID, address ServerAddress, repl *foll return } } + r.logger.Trace("leadership transfer progress", "my_last_index", r.getLastIndex(), "follower_next_index", repl.nextIndex) // Step ?: the thesis describes in chap 6.4.1: Using clocks to reduce // messaging for read-only queries. If this is implemented, the lease @@ -1140,6 +1157,12 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error { // configuration entry to the log. This must only be called from the // main thread. func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) { + if !r.leaderState.applyable.TryAcquire(1) { + future.respond(fmt.Errorf("can't apply, semaphor held")) + return + } + defer r.leaderState.applyable.Release(1) + configuration, err := nextConfiguration(r.configurations.latest, r.configurations.latestIndex, future.req) if err != nil { future.respond(err) @@ -1212,6 +1235,7 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) { } r.leaderState.commitment.match(r.localID, lastIndex) + r.logger.Trace("dispatchLogs", "lastIndex", lastIndex) // Update the last log since it's on disk now r.setLastLog(lastIndex, term) diff --git a/raft_test.go b/raft_test.go index 8e1a00e3..578db51b 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2338,16 +2338,38 @@ func TestRaft_LeadershipTransferWithOneNode(t *testing.T) { } func TestRaft_LeadershipTransferWithSevenNodes(t *testing.T) { - c := MakeCluster(7, t, nil) + conf := inmemConfig(t) + conf.Logger = hclog.New(&hclog.LoggerOptions{Level: hclog.Trace}) + c := MakeCluster(7, t, conf) defer c.Close() - oldLeader := c.Leader().localID + // Wait for a leader + leader := c.Leader() + + doneCh := make(chan struct{}) + defer close(doneCh) + go func() { + for { + select { + case <-doneCh: + return + default: + future := leader.Apply([]byte("test"), 0) + if err := future.Error(); err != nil { + t.Logf("[ERR] err: %v", err) + } + time.Sleep(time.Microsecond / 10) + } + } + }() + time.Sleep(time.Second / 2) + follower := c.GetInState(Follower)[0] - future := c.Leader().LeadershipTransferToServer(follower.localID, follower.localAddr) + future := leader.LeadershipTransferToServer(follower.localID, follower.localAddr) if future.Error() != nil { t.Fatalf("Didn't expect error: %v", future.Error()) } - if oldLeader == c.Leader().localID { + if follower.localID != c.Leader().localID { t.Error("Leadership should have been transitioned to specified server.") } } diff --git a/testing.go b/testing.go index 91cf6e76..044005dd 100644 --- a/testing.go +++ b/testing.go @@ -211,12 +211,16 @@ func newTestLogger(tb testing.TB) hclog.Logger { // is logged after the test is complete. 
func newTestLoggerWithPrefix(tb testing.TB, prefix string) hclog.Logger { if testing.Verbose() { - return hclog.New(&hclog.LoggerOptions{Name: prefix}) + return hclog.New(&hclog.LoggerOptions{ + Name: prefix, + Level: hclog.Trace, + }) } return hclog.New(&hclog.LoggerOptions{ Name: prefix, Output: &testLoggerAdapter{tb: tb, prefix: prefix}, + Level: hclog.Trace, }) } From a76a6f869b3462ab31c5ca9e687df9065b36d57b Mon Sep 17 00:00:00 2001 From: ncabatoff Date: Fri, 24 Nov 2023 09:25:43 -0500 Subject: [PATCH 2/7] Fix some collateral damage I introduced. Also a bug in leaderTransfer changes: we weren't properly respecting stopCh. --- go.mod | 1 + go.sum | 1 + raft.go | 24 +++++++++++++++++------- raft_test.go | 9 +++++++-- 4 files changed, 26 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 4cdae5d3..aa2bb985 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/hashicorp/go-hclog v1.5.0 github.com/hashicorp/go-msgpack/v2 v2.1.1 github.com/stretchr/testify v1.8.4 + golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e ) require ( diff --git a/go.sum b/go.sum index a670dd67..7f331842 100644 --- a/go.sum +++ b/go.sum @@ -97,6 +97,7 @@ golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/raft.go b/raft.go index 7e9ebe74..ae072af1 100644 --- a/raft.go +++ b/raft.go @@ -655,7 +655,7 @@ func (r *Raft) leaderLoop() { continue } - r.logger.Debug("starting leadership transfer", "id", *future.ID, "address", *future.Address) + r.logger.Debug("starting leadership transfer", "id", future.ID, "address", future.Address) // When we are leaving leaderLoop, we are no longer // leader, so we should stop transferring. @@ -850,9 +850,7 @@ func (r *Raft) leaderLoop() { case newLog := <-r.applyCh: r.mainThreadSaturation.working() - //if r.getLeadershipTransferInProgress() { - //} - if !r.leaderState.applyable.TryAcquire(1) { + if r.getLeadershipTransferInProgress() || !r.leaderState.applyable.TryAcquire(1) { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) newLog.respond(ErrLeadershipTransferInProgress) continue @@ -939,18 +937,30 @@ func (r *Raft) verifyLeader(v *verifyFuture) { // leadershipTransfer is doing the heavy lifting for the leadership transfer. 
func (r *Raft) leadershipTransfer(id ServerID, address ServerAddress, repl *followerReplication, stopCh chan struct{}, doneCh chan error) { // make sure we are not already stopped + r.logger.Debug("leadershipTransfer", "id", id, "address", address) - for !r.leaderState.applyable.TryAcquire(1) { + select { + case <-stopCh: + doneCh <- nil + return + default: + } + +LOOP: + for { select { case <-stopCh: doneCh <- nil return default: + if r.leaderState.applyable.TryAcquire(1) { + break LOOP + } } } defer r.leaderState.applyable.Release(1) - r.logger.Trace("leadership transfer progress", "my_last_index", r.getLastIndex(), "follower_next_index", repl.nextIndex) + r.logger.Trace("leadershipTransfer", "my_last_index", r.getLastIndex(), "follower_next_index", atomic.LoadUint64(&repl.nextIndex)) for atomic.LoadUint64(&repl.nextIndex) <= r.getLastIndex() { err := &deferError{} err.init() @@ -966,7 +976,7 @@ func (r *Raft) leadershipTransfer(id ServerID, address ServerAddress, repl *foll return } } - r.logger.Trace("leadership transfer progress", "my_last_index", r.getLastIndex(), "follower_next_index", repl.nextIndex) + r.logger.Trace("leadershipTransfer", "my_last_index", r.getLastIndex(), "follower_next_index", repl.nextIndex) // Step ?: the thesis describes in chap 6.4.1: Using clocks to reduce // messaging for read-only queries. If this is implemented, the lease diff --git a/raft_test.go b/raft_test.go index 578db51b..c284edbd 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2347,7 +2347,9 @@ func TestRaft_LeadershipTransferWithSevenNodes(t *testing.T) { leader := c.Leader() doneCh := make(chan struct{}) - defer close(doneCh) + t.Cleanup(func() { + close(doneCh) + }) go func() { for { select { @@ -2356,6 +2358,9 @@ func TestRaft_LeadershipTransferWithSevenNodes(t *testing.T) { default: future := leader.Apply([]byte("test"), 0) if err := future.Error(); err != nil { + if errors.Is(err, ErrRaftShutdown) || errors.Is(err, ErrNotLeader) { + return + } t.Logf("[ERR] err: %v", err) } time.Sleep(time.Microsecond / 10) @@ -2532,7 +2537,7 @@ func TestRaft_LeadershipTransferIgnoresNonvoters(t *testing.T) { } func TestRaft_LeadershipTransferStopRightAway(t *testing.T) { - r := Raft{leaderState: leaderState{}} + r := Raft{leaderState: leaderState{}, logger: hclog.New(nil)} r.setupLeaderState() stopCh := make(chan struct{}) From 8cecc28cf547c5ff66e43c2b47d0fc2ca52c8f39 Mon Sep 17 00:00:00 2001 From: ncabatoff Date: Fri, 24 Nov 2023 09:34:28 -0500 Subject: [PATCH 3/7] Attempt at a "fix": after we send the TimeoutNow, wait for up to ElectionTimeout before resuming applies. Fix is in scare quotes because I'm not all sure this is acceptable. --- raft.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/raft.go b/raft.go index ae072af1..4e76d2a1 100644 --- a/raft.go +++ b/raft.go @@ -701,6 +701,19 @@ func (r *Raft) leaderLoop() { r.logger.Debug(err.Error()) } future.respond(err) + if err == nil { + // Wait for up to ElectionTimeout before flagging the + // leadership transfer as done and unblocking applies in + // the leaderLoop. 
+ select { + case <-time.After(r.config().ElectionTimeout): + err := fmt.Errorf("leadership transfer timeout") + r.logger.Debug(err.Error()) + case <-leftLeaderLoop: + err := fmt.Errorf("lost leadership during transfer (expected)") + r.logger.Debug(err.Error()) + } + } } }() @@ -854,8 +867,8 @@ func (r *Raft) leaderLoop() { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) newLog.respond(ErrLeadershipTransferInProgress) continue - } + // Group commit, gather all the ready commits ready := []*logFuture{newLog} GROUP_COMMIT_LOOP: From 477cf7e54c5aa5248e442c5112a970453882c893 Mon Sep 17 00:00:00 2001 From: ncabatoff Date: Fri, 24 Nov 2023 09:42:18 -0500 Subject: [PATCH 4/7] Slow down writes a bit to make this more amenable to testing in parallel, and improve test log output. --- raft_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/raft_test.go b/raft_test.go index c284edbd..ec077bfd 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2361,9 +2361,11 @@ func TestRaft_LeadershipTransferWithSevenNodes(t *testing.T) { if errors.Is(err, ErrRaftShutdown) || errors.Is(err, ErrNotLeader) { return } - t.Logf("[ERR] err: %v", err) + if !errors.Is(err, ErrLeadershipTransferInProgress) { + t.Logf("[ERR] err: %v", err) + } } - time.Sleep(time.Microsecond / 10) + time.Sleep(time.Microsecond) } } }() From 2b715ac126cf16f5783ccfcd0c0c61454c1cd7dc Mon Sep 17 00:00:00 2001 From: ncabatoff Date: Fri, 24 Nov 2023 13:07:39 -0500 Subject: [PATCH 5/7] Revert changes that turned out be unneeded. --- raft.go | 37 ++----------------------------------- testing.go | 6 +----- 2 files changed, 3 insertions(+), 40 deletions(-) diff --git a/raft.go b/raft.go index 4e76d2a1..ce532450 100644 --- a/raft.go +++ b/raft.go @@ -6,9 +6,7 @@ package raft import ( "bytes" "container/list" - "context" "fmt" - "golang.org/x/sync/semaphore" "io" "sync/atomic" "time" @@ -94,7 +92,6 @@ type leaderState struct { replState map[ServerID]*followerReplication notify map[*verifyFuture]struct{} stepDown chan struct{} - applyable *semaphore.Weighted } // setLeader is used to modify the current leader Address and ID of the cluster @@ -404,7 +401,7 @@ func (r *Raft) setLeadershipTransferInProgress(v bool) { func (r *Raft) getLeadershipTransferInProgress() bool { v := atomic.LoadInt32(&r.leaderState.leadershipTransferInProgress) - return v > 0 + return v == 1 } func (r *Raft) setupLeaderState() { @@ -416,7 +413,6 @@ func (r *Raft) setupLeaderState() { r.leaderState.replState = make(map[ServerID]*followerReplication) r.leaderState.notify = make(map[*verifyFuture]struct{}) r.leaderState.stepDown = make(chan struct{}, 1) - r.leaderState.applyable = semaphore.NewWeighted(1) } // runLeader runs the main loop while in leader state. Do the setup here and drop into @@ -487,7 +483,6 @@ func (r *Raft) runLeader() { r.leaderState.replState = nil r.leaderState.notify = nil r.leaderState.stepDown = nil - r.leaderState.applyable = nil // If we are stepping down for some reason, no known leader. // We may have stepped down due to an RPC call, which would @@ -526,9 +521,7 @@ func (r *Raft) runLeader() { // maintain that there exists at most one uncommitted configuration entry in // any log, so we have to do proper no-ops here. 
noop := &logFuture{log: Log{Type: LogNoop}} - r.leaderState.applyable.Acquire(context.Background(), 1) r.dispatchLogs([]*logFuture{noop}) - r.leaderState.applyable.Release(1) // Sit in the leader loop until we step down r.leaderLoop() @@ -863,7 +856,7 @@ func (r *Raft) leaderLoop() { case newLog := <-r.applyCh: r.mainThreadSaturation.working() - if r.getLeadershipTransferInProgress() || !r.leaderState.applyable.TryAcquire(1) { + if r.getLeadershipTransferInProgress() { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) newLog.respond(ErrLeadershipTransferInProgress) continue @@ -890,7 +883,6 @@ func (r *Raft) leaderLoop() { } else { r.dispatchLogs(ready) } - r.leaderState.applyable.Release(1) case <-lease: r.mainThreadSaturation.working() @@ -950,8 +942,6 @@ func (r *Raft) verifyLeader(v *verifyFuture) { // leadershipTransfer is doing the heavy lifting for the leadership transfer. func (r *Raft) leadershipTransfer(id ServerID, address ServerAddress, repl *followerReplication, stopCh chan struct{}, doneCh chan error) { // make sure we are not already stopped - r.logger.Debug("leadershipTransfer", "id", id, "address", address) - select { case <-stopCh: doneCh <- nil @@ -959,21 +949,6 @@ func (r *Raft) leadershipTransfer(id ServerID, address ServerAddress, repl *foll default: } -LOOP: - for { - select { - case <-stopCh: - doneCh <- nil - return - default: - if r.leaderState.applyable.TryAcquire(1) { - break LOOP - } - } - } - defer r.leaderState.applyable.Release(1) - - r.logger.Trace("leadershipTransfer", "my_last_index", r.getLastIndex(), "follower_next_index", atomic.LoadUint64(&repl.nextIndex)) for atomic.LoadUint64(&repl.nextIndex) <= r.getLastIndex() { err := &deferError{} err.init() @@ -989,7 +964,6 @@ LOOP: return } } - r.logger.Trace("leadershipTransfer", "my_last_index", r.getLastIndex(), "follower_next_index", repl.nextIndex) // Step ?: the thesis describes in chap 6.4.1: Using clocks to reduce // messaging for read-only queries. If this is implemented, the lease @@ -1180,12 +1154,6 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error { // configuration entry to the log. This must only be called from the // main thread. func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) { - if !r.leaderState.applyable.TryAcquire(1) { - future.respond(fmt.Errorf("can't apply, semaphor held")) - return - } - defer r.leaderState.applyable.Release(1) - configuration, err := nextConfiguration(r.configurations.latest, r.configurations.latestIndex, future.req) if err != nil { future.respond(err) @@ -1258,7 +1226,6 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) { } r.leaderState.commitment.match(r.localID, lastIndex) - r.logger.Trace("dispatchLogs", "lastIndex", lastIndex) // Update the last log since it's on disk now r.setLastLog(lastIndex, term) diff --git a/testing.go b/testing.go index 044005dd..91cf6e76 100644 --- a/testing.go +++ b/testing.go @@ -211,16 +211,12 @@ func newTestLogger(tb testing.TB) hclog.Logger { // is logged after the test is complete. 
func newTestLoggerWithPrefix(tb testing.TB, prefix string) hclog.Logger { if testing.Verbose() { - return hclog.New(&hclog.LoggerOptions{ - Name: prefix, - Level: hclog.Trace, - }) + return hclog.New(&hclog.LoggerOptions{Name: prefix}) } return hclog.New(&hclog.LoggerOptions{ Name: prefix, Output: &testLoggerAdapter{tb: tb, prefix: prefix}, - Level: hclog.Trace, }) } From 04fdca60bc4364a3f4f849da3684429c90bab348 Mon Sep 17 00:00:00 2001 From: ncabatoff Date: Fri, 24 Nov 2023 13:16:57 -0500 Subject: [PATCH 6/7] A bit more cleanup, plus now we wait for the transfer to complete or fail before responding to the transfer request. --- go.mod | 1 - go.sum | 1 - raft.go | 11 +++++------ 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index aa2bb985..4cdae5d3 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/hashicorp/go-hclog v1.5.0 github.com/hashicorp/go-msgpack/v2 v2.1.1 github.com/stretchr/testify v1.8.4 - golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e ) require ( diff --git a/go.sum b/go.sum index 7f331842..a670dd67 100644 --- a/go.sum +++ b/go.sum @@ -97,7 +97,6 @@ golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/raft.go b/raft.go index ce532450..28c11283 100644 --- a/raft.go +++ b/raft.go @@ -692,9 +692,8 @@ func (r *Raft) leaderLoop() { case err := <-doneCh: if err != nil { r.logger.Debug(err.Error()) - } - future.respond(err) - if err == nil { + future.respond(err) + } else { // Wait for up to ElectionTimeout before flagging the // leadership transfer as done and unblocking applies in // the leaderLoop. @@ -702,9 +701,10 @@ func (r *Raft) leaderLoop() { case <-time.After(r.config().ElectionTimeout): err := fmt.Errorf("leadership transfer timeout") r.logger.Debug(err.Error()) + future.respond(err) case <-leftLeaderLoop: - err := fmt.Errorf("lost leadership during transfer (expected)") - r.logger.Debug(err.Error()) + r.logger.Debug("lost leadership during transfer (expected)") + future.respond(nil) } } } @@ -861,7 +861,6 @@ func (r *Raft) leaderLoop() { newLog.respond(ErrLeadershipTransferInProgress) continue } - // Group commit, gather all the ready commits ready := []*logFuture{newLog} GROUP_COMMIT_LOOP: From cb622973cd2c65dd2752c49d0520f2a3894b2d91 Mon Sep 17 00:00:00 2001 From: ncabatoff Date: Fri, 24 Nov 2023 15:15:39 -0500 Subject: [PATCH 7/7] Make the test more robust. Part of this required modifying GetInState, which is racy in that in between calling pollState and setting up event monitoring, the cluster could've elected a leader. When that happens, Leader() errors returning 0 leaders. 
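Roughly, the fix below amounts to re-sampling after the stability wait rather than trusting the first poll. As a sketch only (sample and waitStable are made-up stand-ins for the real pollState/timer plumbing in testing.go; the actual change follows in the diff):

    package testingsketch

    // stableNodesInState shows the ordering GetInState moves toward: the snapshot
    // taken before the stability wait may already be stale, so the value that is
    // logged and returned is re-sampled after the cluster has settled.
    func stableNodesInState[T any](sample func() []T, waitStable func()) []T {
        _ = sample()    // prime the poll; this early snapshot may already be stale
        waitStable()    // block until no state/term changes for the stability window
        return sample() // re-sample so callers see the settled cluster
    }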
--- raft_test.go | 59 +++++++++++++++++++++++++++++++++++++--------------- testing.go | 7 ++++--- 2 files changed, 46 insertions(+), 20 deletions(-) diff --git a/raft_test.go b/raft_test.go index ec077bfd..3eaf1e3c 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2337,42 +2337,67 @@ func TestRaft_LeadershipTransferWithOneNode(t *testing.T) { } } -func TestRaft_LeadershipTransferWithSevenNodes(t *testing.T) { +func TestRaft_LeadershipTransferWithWrites(t *testing.T) { conf := inmemConfig(t) conf.Logger = hclog.New(&hclog.LoggerOptions{Level: hclog.Trace}) c := MakeCluster(7, t, conf) defer c.Close() - // Wait for a leader - leader := c.Leader() - doneCh := make(chan struct{}) - t.Cleanup(func() { - close(doneCh) - }) + var writerErr error + var wg sync.WaitGroup + var writes int + wg.Add(1) + leader := c.Leader() go func() { + defer wg.Done() for { select { case <-doneCh: return default: future := leader.Apply([]byte("test"), 0) - if err := future.Error(); err != nil { - if errors.Is(err, ErrRaftShutdown) || errors.Is(err, ErrNotLeader) { - return - } - if !errors.Is(err, ErrLeadershipTransferInProgress) { - t.Logf("[ERR] err: %v", err) - } + switch err := future.Error(); { + case errors.Is(err, ErrRaftShutdown): + return + case errors.Is(err, ErrNotLeader): + leader = c.Leader() + case errors.Is(err, ErrLeadershipTransferInProgress): + continue + case errors.Is(err, ErrLeadershipLost): + continue + case err == nil: + writes++ + default: + writerErr = err } - time.Sleep(time.Microsecond) + time.Sleep(time.Millisecond) } } }() - time.Sleep(time.Second / 2) + + follower := c.Followers()[0] + future := c.Leader().LeadershipTransferToServer(follower.localID, follower.localAddr) + if future.Error() != nil { + t.Fatalf("Didn't expect error: %v", future.Error()) + } + if follower.localID != c.Leader().localID { + t.Error("Leadership should have been transitioned to specified server.") + } + close(doneCh) + wg.Wait() + if writerErr != nil { + t.Fatal(writerErr) + } + t.Logf("writes: %d", writes) +} + +func TestRaft_LeadershipTransferWithSevenNodes(t *testing.T) { + c := MakeCluster(7, t, nil) + defer c.Close() follower := c.GetInState(Follower)[0] - future := leader.LeadershipTransferToServer(follower.localID, follower.localAddr) + future := c.Leader().LeadershipTransferToServer(follower.localID, follower.localAddr) if future.Error() != nil { t.Fatalf("Didn't expect error: %v", future.Error()) } diff --git a/testing.go b/testing.go index 91cf6e76..e0885714 100644 --- a/testing.go +++ b/testing.go @@ -433,7 +433,7 @@ func (c *cluster) GetInState(s RaftState) []*Raft { // restart the timer. pollStartTime := time.Now() for { - inState, highestTerm := c.pollState(s) + _, highestTerm := c.pollState(s) inStateTime := time.Now() // Sometimes this routine is called very early on before the @@ -479,8 +479,9 @@ func (c *cluster) GetInState(s RaftState) []*Raft { c.t.Fatalf("timer channel errored") } - c.logger.Info(fmt.Sprintf("stable state for %s reached at %s (%d nodes), %s from start of poll, %s from cluster start. Timeout at %s, %s after stability", - s, inStateTime, len(inState), inStateTime.Sub(pollStartTime), inStateTime.Sub(c.startTime), t, t.Sub(inStateTime))) + inState, highestTerm := c.pollState(s) + c.logger.Info(fmt.Sprintf("stable state for %s reached at %s (%d nodes), highestTerm is %d, %s from start of poll, %s from cluster start. 
Timeout at %s, %s after stability", + s, inStateTime, len(inState), highestTerm, inStateTime.Sub(pollStartTime), inStateTime.Sub(c.startTime), t, t.Sub(inStateTime))) return inState } }
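For downstream callers, the upshot of this series is that Apply returns ErrLeadershipTransferInProgress while a transfer is in flight. A minimal sketch of how a client might handle that, mirroring the writer goroutine in TestRaft_LeadershipTransferWithWrites (the helper name and backoff policy are illustrative, not part of this library):

    package example

    import (
        "errors"
        "time"

        "github.com/hashicorp/raft"
    )

    // applyWithTransferRetry retries an Apply that fails only because a leadership
    // transfer is in progress; terminal errors are returned to the caller.
    func applyWithTransferRetry(r *raft.Raft, data []byte, timeout time.Duration) error {
        for {
            err := r.Apply(data, timeout).Error()
            switch {
            case err == nil:
                return nil
            case errors.Is(err, raft.ErrLeadershipTransferInProgress),
                errors.Is(err, raft.ErrLeadershipLost):
                time.Sleep(time.Millisecond) // transient during a transfer; back off and retry
            default:
                return err // ErrNotLeader, ErrRaftShutdown, etc.: redirect or stop
            }
        }
    }

Treating the transfer errors as transient matches the test's expectation: writes issued before and after the transfer succeed, while writes racing the transfer itself are simply refused rather than lost.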