diff --git a/raft/raft.go b/raft/raft.go index 70a260dbe65..7be4407ee2b 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -823,6 +823,11 @@ func stepLeader(r *raft, m pb.Message) { return case pb.MsgReadIndex: if r.quorum() > 1 { + if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term { + // Reject read-only requests when this leader has not committed any log entry at its term. + return + } + // thinking: use an interally defined context instead of the user given context. // We can express this in terms of the term and index instead of a user-supplied value. // This would allow multiple reads to piggyback on the same message. diff --git a/raft/raft_test.go b/raft/raft_test.go index a6fc4d98a40..c429651284d 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1856,6 +1856,77 @@ func TestReadOnlyOptionLeaseWithoutCheckQuorum(t *testing.T) { } } +// TestReadOnlyForNewLeader ensures that a leader only accepts a MsgReadIndex message +// after it has committed at least one log entry at its term. +func TestReadOnlyForNewLeader(t *testing.T) { + cfg := newTestConfig(1, []uint64{1, 2, 3}, 10, 1, + &MemoryStorage{ + ents: []pb.Entry{{}, {Index: 1, Term: 1}, {Index: 2, Term: 1}}, + hardState: pb.HardState{Commit: 1, Term: 1}, + }) + cfg.Applied = 1 + a := newRaft(cfg) + cfg = newTestConfig(2, []uint64{1, 2, 3}, 10, 1, + &MemoryStorage{ + ents: []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, + hardState: pb.HardState{Commit: 2, Term: 1}, + }) + cfg.Applied = 2 + b := newRaft(cfg) + cfg = newTestConfig(2, []uint64{1, 2, 3}, 10, 1, + &MemoryStorage{ + ents: []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, + hardState: pb.HardState{Commit: 2, Term: 1}, + }) + cfg.Applied = 2 + c := newRaft(cfg) + nt := newNetwork(a, b, c) + + // Drop MsgApp to prevent peer a from committing any log entry at its term after it becomes leader. + nt.ignore(pb.MsgApp) + // Force peer a to become leader. 
+ nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + if a.state != StateLeader { + t.Fatalf("state = %s, want %s", a.state, StateLeader) + } + + // Ensure peer a drops the read-only request. + var windex uint64 = 4 + wctx := []byte("ctx") + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}}) + if len(a.readStates) != 0 { + t.Fatalf("len(readStates) = %d, want zero", len(a.readStates)) + } + + nt.recover() + + // Force peer a to commit a log entry at its term. + for i := 0; i < a.heartbeatTimeout; i++ { + a.tick() + } + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) + if a.raftLog.committed != 4 { + t.Fatalf("committed = %d, want 4", a.raftLog.committed) + } + lastLogTerm := a.raftLog.zeroTermOnErrCompacted(a.raftLog.term(a.raftLog.committed)) + if lastLogTerm != a.Term { + t.Fatalf("last log term = %d, want %d", lastLogTerm, a.Term) + } + + // Ensure peer a accepts the read-only request after it commits an entry at its term. + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}}) + if len(a.readStates) != 1 { + t.Fatalf("len(readStates) = %d, want 1", len(a.readStates)) + } + rs := a.readStates[0] + if rs.Index != windex { + t.Fatalf("readIndex = %d, want %d", rs.Index, windex) + } + if !bytes.Equal(rs.RequestCtx, wctx) { + t.Fatalf("requestCtx = %v, want %v", rs.RequestCtx, wctx) + } +} + func TestLeaderAppResp(t *testing.T) { // initial progress: match = 0; next = 3 tests := []struct {