Skip to content

Commit

Permalink
raft: send up-to-date commit index in heartbeats
Browse files Browse the repository at this point in the history
TODO: describe why it is now safe

Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Jan 26, 2024
1 parent 9bffaa4 commit ffbd5f7
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 40 deletions.
15 changes: 3 additions & 12 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,21 +672,12 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {

// sendHeartbeat sends a heartbeat RPC to the given peer.
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
// Attach the commit as min(to.matched, r.committed).
// When the leader sends out heartbeat message,
// the receiver(follower) might not be matched with the leader
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
commit := min(r.trk.Progress[to].Match, r.raftLog.committed)
m := pb.Message{
r.send(pb.Message{
To: to,
Type: pb.MsgHeartbeat,
Commit: commit,
Commit: r.raftLog.committed,
Context: ctx,
}

r.send(m)
})
}

// bcastAppend sends RPC, with entries to all peers that are not up-to-date
Expand Down
13 changes: 2 additions & 11 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2699,10 +2699,6 @@ func TestBcastBeat(t *testing.T) {
if len(msgs) != 2 {
t.Fatalf("len(msgs) = %v, want 2", len(msgs))
}
wantCommitMap := map[uint64]uint64{
2: min(sm.raftLog.committed, sm.trk.Progress[2].Match),
3: min(sm.raftLog.committed, sm.trk.Progress[3].Match),
}
for i, m := range msgs {
if m.Type != pb.MsgHeartbeat {
t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgHeartbeat)
Expand All @@ -2713,13 +2709,8 @@ func TestBcastBeat(t *testing.T) {
if m.LogTerm != 0 {
t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0)
}
if wantCommitMap[m.To] == 0 {
t.Fatalf("#%d: unexpected to %d", i, m.To)
} else {
if m.Commit != wantCommitMap[m.To] {
t.Fatalf("#%d: commit = %d, want %d", i, m.Commit, wantCommitMap[m.To])
}
delete(wantCommitMap, m.To)
if m.Commit != sm.raftLog.committed {
t.Fatalf("#%d: commit = %d, want %d", i, m.Commit, sm.raftLog.committed)
}
if len(m.Entries) != 0 {
t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))
Expand Down
14 changes: 7 additions & 7 deletions testdata/async_storage_writes_append_aba_race.txt
Original file line number Diff line number Diff line change
Expand Up @@ -346,16 +346,16 @@ process-ready 4
----
Ready MustSync=false:
Messages:
4->1 MsgHeartbeat Term:3 Log:0/0
4->2 MsgHeartbeat Term:3 Log:0/0
4->3 MsgHeartbeat Term:3 Log:0/0
4->5 MsgHeartbeat Term:3 Log:0/0
4->6 MsgHeartbeat Term:3 Log:0/0
4->7 MsgHeartbeat Term:3 Log:0/0
4->1 MsgHeartbeat Term:3 Log:0/0 Commit:11
4->2 MsgHeartbeat Term:3 Log:0/0 Commit:11
4->3 MsgHeartbeat Term:3 Log:0/0 Commit:11
4->5 MsgHeartbeat Term:3 Log:0/0 Commit:11
4->6 MsgHeartbeat Term:3 Log:0/0 Commit:11
4->7 MsgHeartbeat Term:3 Log:0/0 Commit:11

deliver-msgs 1
----
4->1 MsgHeartbeat Term:3 Log:0/0
4->1 MsgHeartbeat Term:3 Log:0/0 Commit:11
INFO 1 [term: 2] received a MsgHeartbeat message with higher term from 4 [term: 3]
INFO 1 became follower at term 3

Expand Down
14 changes: 8 additions & 6 deletions testdata/lagging_commit.txt
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ process-ready 1
Ready MustSync=false:
Messages:
1->2 MsgHeartbeat Term:1 Log:0/0 Commit:13
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:13

# Since the heartbeat message does not bump the follower's commit index, it will
# take another roundtrip with the leader to update it. As such, the total time
Expand All @@ -146,12 +146,18 @@ Messages:
# leader sent the up-to-date commit index in the heartbeat message.
#
# See https://github.com/etcd-io/raft/issues/138 which aims to fix this.
#
# Now this is fixed!
stabilize 1 3
----
> 3 receiving messages
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:13
> 3 handling Ready
Ready MustSync=false:
HardState Term:1 Vote:1 Commit:13
CommittedEntries:
1/12 EntryNormal "data1"
1/13 EntryNormal "data2"
Messages:
3->1 MsgHeartbeatResp Term:1 Log:0/0
> 1 receiving messages
Expand All @@ -164,10 +170,6 @@ stabilize 1 3
1->3 MsgApp Term:1 Log:1/13 Commit:13
> 3 handling Ready
Ready MustSync=false:
HardState Term:1 Vote:1 Commit:13
CommittedEntries:
1/12 EntryNormal "data1"
1/13 EntryNormal "data2"
Messages:
3->1 MsgAppResp Term:1 Log:0/13
> 1 receiving messages
Expand Down
4 changes: 2 additions & 2 deletions testdata/replicate_pause.txt
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,14 @@ stabilize 1
Ready MustSync=false:
Messages:
1->2 MsgHeartbeat Term:1 Log:0/0 Commit:17
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:17

stabilize 2 3
----
> 2 receiving messages
1->2 MsgHeartbeat Term:1 Log:0/0 Commit:17
> 3 receiving messages
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:17
> 2 handling Ready
Ready MustSync=false:
Messages:
Expand Down
4 changes: 2 additions & 2 deletions testdata/snapshot_succeed_via_app_resp.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ process-ready 1
Ready MustSync=false:
Messages:
1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11
1->3 MsgHeartbeat Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11

# Iterate until no more work is done by the new peer. It receives the heartbeat
# and responds.
stabilize 3
----
> 3 receiving messages
1->3 MsgHeartbeat Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11
INFO 3 [term: 0] received a MsgHeartbeat message with higher term from 1 [term: 1]
INFO 3 became follower at term 1
> 3 handling Ready
Expand Down

0 comments on commit ffbd5f7

Please sign in to comment.