diff --git a/src/config.rs b/src/config.rs index eed62efa..b03da44d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -98,6 +98,10 @@ pub struct Config { /// Max size for committed entries in a `Ready`. pub max_committed_size_per_ready: u64, + + /// Maximum number of raft log entries that can be applied after commit but before persist. + /// The default value is 0, which means entries are applied only after both commit and persist. + pub applied_unpersisted_log_limit: u64, } impl Default for Config { @@ -120,6 +124,7 @@ impl Default for Config { priority: 0, max_uncommitted_size: NO_LIMIT, max_committed_size_per_ready: NO_LIMIT, + applied_unpersisted_log_limit: 0, } } } diff --git a/src/raft.rs b/src/raft.rs index 44bea42a..58650c44 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -366,6 +366,7 @@ impl Raft { max_committed_size_per_ready: c.max_committed_size_per_ready, }, }; + r.raft_log.apply_unpersisted_log_limit = c.applied_unpersisted_log_limit; confchange::restore(&mut r.prs, r.r.raft_log.last_index(), conf_state)?; let new_cs = r.post_conf_change(); if !raft_proto::conf_state_eq(&new_cs, conf_state) { @@ -599,6 +600,11 @@ impl Raft { pub fn set_check_quorum(&mut self, check_quorum: bool) { self.check_quorum = check_quorum; } + + /// Sets the maximum number of log entries that may be applied before they are persisted. + pub fn set_applied_unpersisted_log_limit(&mut self, limit: u64) { + self.raft_log.apply_unpersisted_log_limit = limit; + } } impl RaftCore { diff --git a/src/raft_log.rs b/src/raft_log.rs index 2024ba60..5dcaa398 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -48,14 +48,18 @@ pub struct RaftLog { /// storage. It's used for limiting the upper bound of committed and /// persisted entries. /// - /// Invariant: persisted < unstable.offset && applied <= persisted + /// Invariant: persisted < unstable.offset pub persisted: u64, /// The highest log position that the application has been instructed /// to apply to its state machine. 
/// - /// Invariant: applied <= min(committed, persisted) + /// Invariant: applied <= min(committed, persisted + `apply_unpersisted_log_limit`) pub applied: u64, + + /// The maximum log gap allowed between persisted_index and applied_index. + /// It is added to `persisted` with saturating arithmetic, so huge values cannot overflow. + pub apply_unpersisted_log_limit: u64, } impl ToString for RaftLog @@ -87,6 +91,7 @@ impl RaftLog { persisted: last_index, applied: first_index - 1, unstable: Unstable::new(last_index + 1, logger), + apply_unpersisted_log_limit: 0, } } @@ -310,7 +315,11 @@ impl RaftLog { if idx == 0 { return; } - if idx > cmp::min(self.committed, self.persisted) || idx < self.applied { + // Do not check idx against the committed or persisted index here, because when `apply_unpersisted_log_limit` > 0: + // 1. it is possible that idx > persisted. + // 2. if the application restarts after applying but before the committed entries (and committed index) are persisted, + // then it is also possible that idx > committed. + if idx < self.applied { fatal!( self.unstable.logger, "applied({}) is out of range [prev_applied({}), min(committed({}), persisted({}))]", @@ -422,7 +431,7 @@ impl RaftLog { /// Returns committed and persisted entries since max(`since_idx` + 1, first_index). pub fn next_entries_since(&self, since_idx: u64, max_size: Option) -> Option> { let offset = cmp::max(since_idx + 1, self.first_index()); - let high = cmp::min(self.committed, self.persisted) + 1; + let high = self.next_entries_upper_bound(); if high > offset { match self.slice( offset, @@ -437,6 +446,14 @@ impl RaftLog { None } + /// Returns the exclusive upper bound of the log indexes that may be handed out to apply. + #[inline] + fn next_entries_upper_bound(&self) -> u64 { + // Saturate so that a very large limit (e.g. `u64::MAX`) cannot overflow. + let unpersisted_bound = self.persisted.saturating_add(self.apply_unpersisted_log_limit); + cmp::min(self.committed, unpersisted_bound) + 1 + } + /// Returns all the available entries for execution. /// If applied is smaller than the index of snapshot, it returns all committed /// entries after the index of snapshot. @@ -448,7 +465,7 @@ impl RaftLog { /// max(`since_idx` + 1, first_index). 
pub fn has_next_entries_since(&self, since_idx: u64) -> bool { let offset = cmp::max(since_idx + 1, self.first_index()); - let high = cmp::min(self.committed, self.persisted) + 1; + let high = self.next_entries_upper_bound(); high > offset } @@ -1185,6 +1202,91 @@ mod test { ); } } + + let ents = [ + new_entry(4, 1), + new_entry(5, 1), + new_entry(6, 1), + new_entry(7, 1), + new_entry(8, 1), + new_entry(9, 1), + new_entry(10, 1), + ]; + const MAX: u64 = u32::MAX as u64; /* far larger than any index used below, yet `persisted + MAX` still fits in a u64 */ + let tests = vec![ /* each row: (applied, persisted, committed, limit, expected next entries) */ + (0, 3, 3, 0, None), + (0, 3, 4, 0, None), + (0, 3, 4, MAX, Some(&ents[..1])), + (0, 4, 6, 0, Some(&ents[..1])), + (0, 4, 6, 2, Some(&ents[..3])), + (0, 4, 6, 6, Some(&ents[..3])), + (0, 4, 10, 0, Some(&ents[..1])), + (0, 4, 10, 2, Some(&ents[..3])), + (0, 4, 10, 6, Some(&ents)), + (0, 4, 10, 7, Some(&ents)), + (0, 6, 4, 0, Some(&ents[..1])), + (0, 6, 4, MAX, Some(&ents[..1])), + (0, 5, 5, 0, Some(&ents[..2])), + (3, 4, 3, MAX, None), + (3, 5, 5, MAX, Some(&ents[..2])), + (3, 6, 7, MAX, Some(&ents[..4])), + (3, 7, 6, MAX, Some(&ents[..3])), + (4, 5, 5, MAX, Some(&ents[1..2])), + (4, 5, 5, MAX, Some(&ents[1..2])), + (4, 5, 7, MAX, Some(&ents[1..4])), + (4, 5, 9, MAX, Some(&ents[1..6])), + (4, 5, 10, MAX, Some(&ents[1..])), + (4, 7, 5, MAX, Some(&ents[1..2])), + (4, 7, 7, 0, Some(&ents[1..4])), + (5, 5, 5, 0, None), + (5, 7, 7, MAX, Some(&ents[2..4])), + (7, 7, 7, MAX, None), + ]; + for (i, &(applied, persisted, committed, limit, ref expect_entries)) in + tests.iter().enumerate() + { + let store = MemStorage::new(); + store.wl().apply_snapshot(new_snapshot(3, 1)).expect(""); + let mut raft_log = RaftLog::new(store, l.clone()); + raft_log.apply_unpersisted_log_limit = limit; + raft_log.append(&ents); + let unstable = raft_log.unstable_entries().to_vec(); + if let Some(e) = unstable.last() { + raft_log.stable_entries(e.get_index(), e.get_term()); + raft_log.mut_store().wl().append(&unstable).expect(""); + } + raft_log.maybe_persist(persisted, 1); + assert_eq!( + persisted, 
raft_log.persisted, + "#{}: persisted = {}, want {}", + i, raft_log.persisted, persisted + ); + raft_log.maybe_commit(committed, 1); + assert_eq!( + committed, raft_log.committed, + "#{}: committed = {}, want {}", + i, raft_log.committed, committed + ); + #[allow(deprecated)] /* NOTE(review): presumably `applied_to` is the deprecated item being allowed here - confirm */ + raft_log.applied_to(applied); + + let expect_has_next = expect_entries.is_some(); + let actual_has_next = raft_log.has_next_entries(); + if actual_has_next != expect_has_next { + panic!( + "#{}: hasNext = {}, want {}", + i, actual_has_next, expect_has_next + ); + } + + let next_entries = raft_log.next_entries(None); + if next_entries != expect_entries.map(|n| n.to_vec()) { + panic!( + "#{}: next_entries = {:?}, want {:?}", + i, next_entries, expect_entries + ); + } + } } #[test]