diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index 3b73287c..a62266d2 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -5286,7 +5286,7 @@ fn test_group_commit_consistent() { /// of the election with both priority and log. #[test] fn test_election_with_priority_log() { - let tests = vec![ + let tests = [ // log is up to date or not 1..3, priority 1..3, id, state (true, false, false, 3, 1, 1, 1, StateRole::Leader), (true, false, false, 2, 2, 2, 1, StateRole::Leader), @@ -5301,7 +5301,7 @@ fn test_election_with_priority_log() { (false, false, true, 1, 1, 3, 1, StateRole::Leader), ]; - for &(l1, l2, l3, p1, p2, p3, id, state) in tests.iter() { + for (l1, l2, l3, p1, p2, p3, id, state) in tests { let l = default_logger(); let mut n1 = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), &l); let mut n2 = new_test_raft(2, vec![1, 2, 3], 10, 1, new_storage(), &l); diff --git a/src/config.rs b/src/config.rs index eed62efa..1bc6d323 100644 --- a/src/config.rs +++ b/src/config.rs @@ -98,6 +98,10 @@ pub struct Config { /// Max size for committed entries in a `Ready`. pub max_committed_size_per_ready: u64, + + /// Maximum raft log number that can be applied after commit but before persist. + /// The default value is 0, which means apply after both commit and persist. + pub max_apply_unpersisted_log_limit: u64, } impl Default for Config { @@ -120,6 +124,7 @@ impl Default for Config { priority: 0, max_uncommitted_size: NO_LIMIT, max_committed_size_per_ready: NO_LIMIT, + max_apply_unpersisted_log_limit: 0, } } } diff --git a/src/lib.rs b/src/lib.rs index 131d716d..49444b56 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -479,6 +479,8 @@ before taking old, removed peers offline. #![deny(clippy::all)] #![deny(missing_docs)] #![recursion_limit = "128"] +// TODO: remove this when we update the mininum rust compatible version. +#![allow(unused_imports)] // This is necessary to support prost and rust-protobuf at the same time. #![allow(clippy::useless_conversion)] // This lint recommends some bad choices sometimes. diff --git a/src/raft.rs b/src/raft.rs index 30f18930..b63e3fd1 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -330,7 +330,7 @@ impl Raft { r: RaftCore { id: c.id, read_states: Default::default(), - raft_log: RaftLog::new(store, logger.clone()), + raft_log: RaftLog::new(store, logger.clone(), c), max_inflight: c.max_inflight_msgs, max_msg_size: c.max_size_per_msg, pending_request_snapshot: INVALID_INDEX, @@ -378,7 +378,9 @@ impl Raft { r.load_state(&raft_state.hard_state); } if c.applied > 0 { - r.commit_apply(c.applied); + // at initialize, it is possible that applied_index > committed_index, + // so we should skip the check at `commit_apply`. + r.commit_apply_internal(c.applied, true); } r.become_follower(r.term, INVALID_ID); @@ -596,6 +598,11 @@ impl Raft { pub fn set_check_quorum(&mut self, check_quorum: bool) { self.check_quorum = check_quorum; } + + /// Set the maximum limit that applied index can be ahead of persisted index. + pub fn set_max_apply_unpersisted_log_limit(&mut self, limit: u64) { + self.raft_log.max_apply_unpersisted_log_limit = limit; + } } impl RaftCore { @@ -946,10 +953,30 @@ impl Raft { /// # Hooks /// /// * Post: Checks to see if it's time to finalize a Joint Consensus state. + #[inline] pub fn commit_apply(&mut self, applied: u64) { + self.commit_apply_internal(applied, false) + } + + /// Commit that the Raft peer has applied up to the given index. + /// + /// Registers the new applied index to the Raft log. + /// if `skip_check` is true, will skip the applied_index check, this is only + /// used at initialization. + /// + /// # Hooks + /// + /// * Post: Checks to see if it's time to finalize a Joint Consensus state. + fn commit_apply_internal(&mut self, applied: u64, skip_check: bool) { let old_applied = self.raft_log.applied; - #[allow(deprecated)] - self.raft_log.applied_to(applied); + if !skip_check { + #[allow(deprecated)] + self.raft_log.applied_to(applied); + } else { + // skip applied_index check at initialization. + assert!(applied > 0); + self.raft_log.applied_to_unchecked(applied); + } // TODO: it may never auto_leave if leader steps down before enter joint is applied. if self.prs.conf().auto_leave diff --git a/src/raft_log.rs b/src/raft_log.rs index dfff305c..28aa3f37 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -19,6 +19,7 @@ use std::cmp; use slog::warn; use slog::Logger; +use crate::config::Config; use crate::eraftpb::{Entry, Snapshot}; use crate::errors::{Error, Result, StorageError}; use crate::log_unstable::Unstable; @@ -42,20 +43,30 @@ pub struct RaftLog { /// on a quorum of nodes. /// /// Invariant: applied <= committed + /// NOTE: this invariant can be break after restart if max_apply_unpersisted_log_limit > 0, + /// but once the committed catches up with applied, it should never fall behind again. pub committed: u64, /// The highest log position that is known to be persisted in stable /// storage. It's used for limiting the upper bound of committed and /// persisted entries. /// - /// Invariant: persisted < unstable.offset && applied <= persisted + /// Invariant: persisted < unstable.offset pub persisted: u64, /// The highest log position that the application has been instructed /// to apply to its state machine. /// - /// Invariant: applied <= min(committed, persisted) + /// Invariant: applied <= committed. + /// NOTE: + /// - this invariant can be break after restart if max_apply_unpersisted_log_limit > 0, + /// but once the committed catches up with applied, it should never fall behind again. + /// - if `max_apply_unpersisted_log_limit` is 0, applied < persisted is also ensured + /// (if it is changed from >0 to 0, it is ensured after persisted catching up with applied). pub applied: u64, + + /// The maximum log gap between persisted and applied. + pub max_apply_unpersisted_log_limit: u64, } impl ToString for RaftLog @@ -76,7 +87,7 @@ where impl RaftLog { /// Creates a new raft log with a given storage and tag. - pub fn new(store: T, logger: Logger) -> RaftLog { + pub fn new(store: T, logger: Logger, cfg: &Config) -> RaftLog { let first_index = store.first_index().unwrap(); let last_index = store.last_index().unwrap(); @@ -87,6 +98,7 @@ impl RaftLog { persisted: last_index, applied: first_index - 1, unstable: Unstable::new(last_index + 1, logger), + max_apply_unpersisted_log_limit: cfg.max_apply_unpersisted_log_limit, } } @@ -310,7 +322,7 @@ impl RaftLog { if idx == 0 { return; } - if idx > cmp::min(self.committed, self.persisted) || idx < self.applied { + if idx > self.applied_index_upper_bound() || idx < self.applied { fatal!( self.unstable.logger, "applied({}) is out of range [prev_applied({}), min(committed({}), persisted({}))]", @@ -320,6 +332,11 @@ impl RaftLog { self.persisted, ) } + self.applied_to_unchecked(idx); + } + + #[inline] + pub(crate) fn applied_to_unchecked(&mut self, idx: u64) { self.applied = idx; } @@ -422,7 +439,7 @@ impl RaftLog { /// Returns committed and persisted entries since max(`since_idx` + 1, first_index). pub fn next_entries_since(&self, since_idx: u64, max_size: Option) -> Option> { let offset = cmp::max(since_idx + 1, self.first_index()); - let high = cmp::min(self.committed, self.persisted) + 1; + let high = self.applied_index_upper_bound() + 1; if high > offset { match self.slice( offset, @@ -437,6 +454,14 @@ impl RaftLog { None } + #[inline] + fn applied_index_upper_bound(&self) -> u64 { + std::cmp::min( + self.committed, + self.persisted + self.max_apply_unpersisted_log_limit, + ) + } + /// Returns all the available entries for execution. /// If applied is smaller than the index of snapshot, it returns all committed /// entries after the index of snapshot. @@ -448,7 +473,7 @@ impl RaftLog { /// max(`since_idx` + 1, first_index). pub fn has_next_entries_since(&self, since_idx: u64) -> bool { let offset = cmp::max(since_idx + 1, self.first_index()); - let high = cmp::min(self.committed, self.persisted) + 1; + let high = self.applied_index_upper_bound() + 1; high > offset } @@ -702,6 +727,7 @@ mod test { panic::{self, AssertUnwindSafe}, }; + use crate::config::Config; use crate::default_logger; use crate::eraftpb; use crate::errors::{Error, StorageError}; @@ -775,7 +801,7 @@ mod test { ]; for (i, &(ref ents, wconflict)) in tests.iter().enumerate() { let store = MemStorage::new(); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); raft_log.append(&previous_ents); let gconflict = raft_log.find_conflict(ents); if gconflict != wconflict { @@ -788,7 +814,7 @@ mod test { fn test_is_up_to_date() { let previous_ents = vec![new_entry(1, 1), new_entry(2, 2), new_entry(3, 3)]; let store = MemStorage::new(); - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); raft_log.append(&previous_ents); let tests = vec![ // greater term, ignore lastIndex @@ -837,7 +863,7 @@ mod test { for (i, &(ref ents, windex, ref wents, wunstable)) in tests.iter().enumerate() { let store = MemStorage::new(); store.wl().append(&previous_ents).expect("append failed"); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); let index = raft_log.append(ents); if index != windex { panic!("#{}: last_index = {}, want {}", i, index, windex); @@ -867,7 +893,7 @@ mod test { .append(&[new_entry(i, i)]) .expect("append failed"); } - let mut raft_log = RaftLog::new(storage, default_logger()); + let mut raft_log = RaftLog::new(storage, default_logger(), &Config::default()); for i in unstable_index..last_index { raft_log.append(&[new_entry(i + 1, i + 1)]); } @@ -914,7 +940,7 @@ mod test { .wl() .apply_snapshot(new_snapshot(storagesnapi, 1)) .expect("apply failed."); - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); raft_log.restore(new_snapshot(unstablesnapi, 1)); assert_eq!(raft_log.committed, unstablesnapi); assert_eq!(raft_log.persisted, storagesnapi); @@ -947,7 +973,7 @@ mod test { .wl() .apply_snapshot(new_snapshot(offset, 1)) .expect("apply failed."); - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); for i in 1..num { raft_log.append(&[new_entry(offset + i, i)]); } @@ -978,7 +1004,7 @@ mod test { .expect("apply failed."); let entries = vec![new_entry(index + 1, term), new_entry(index + 2, term + 1)]; store.wl().append(&entries).expect(""); - let raft_log = RaftLog::new(store, default_logger()); + let raft_log = RaftLog::new(store, default_logger(), &Config::default()); assert_eq!(raft_log.all_entries(), entries); assert_eq!(index + 1, raft_log.first_index()); @@ -1047,7 +1073,7 @@ mod test { .wl() .apply_snapshot(new_snapshot(snap_index, snap_term)) .expect(""); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); assert_eq!(raft_log.persisted, snap_index); raft_log.append(new_ents); let unstable = raft_log.unstable_entries().to_vec(); @@ -1065,7 +1091,7 @@ mod test { } } - let mut raft_log = RaftLog::new(MemStorage::new(), default_logger()); + let mut raft_log = RaftLog::new(MemStorage::new(), default_logger(), &Config::default()); raft_log.restore(new_snapshot(100, 1)); assert_eq!(raft_log.unstable.offset, 101); raft_log.append(&[new_entry(101, 1)]); @@ -1095,7 +1121,7 @@ mod test { .expect(""); // append unstable entries to raftlog - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); raft_log.append(&previous_ents[(unstable - 1)..]); let ents = raft_log.unstable_entries().to_vec(); @@ -1146,7 +1172,92 @@ mod test { for (i, &(applied, persisted, committed, ref expect_entries)) in tests.iter().enumerate() { let store = MemStorage::new(); store.wl().apply_snapshot(new_snapshot(3, 1)).expect(""); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); + raft_log.append(&ents); + let unstable = raft_log.unstable_entries().to_vec(); + if let Some(e) = unstable.last() { + raft_log.stable_entries(e.get_index(), e.get_term()); + raft_log.mut_store().wl().append(&unstable).expect(""); + } + raft_log.maybe_persist(persisted, 1); + assert_eq!( + persisted, raft_log.persisted, + "#{}: persisted = {}, want {}", + i, raft_log.persisted, persisted + ); + raft_log.maybe_commit(committed, 1); + assert_eq!( + committed, raft_log.committed, + "#{}: committed = {}, want {}", + i, raft_log.committed, committed + ); + #[allow(deprecated)] + raft_log.applied_to(applied); + + let expect_has_next = expect_entries.is_some(); + let actual_has_next = raft_log.has_next_entries(); + if actual_has_next != expect_has_next { + panic!( + "#{}: hasNext = {}, want {}", + i, actual_has_next, expect_has_next + ); + } + + let next_entries = raft_log.next_entries(None); + if next_entries != expect_entries.map(|n| n.to_vec()) { + panic!( + "#{}: next_entries = {:?}, want {:?}", + i, next_entries, expect_entries + ); + } + } + + let ents = [ + new_entry(4, 1), + new_entry(5, 1), + new_entry(6, 1), + new_entry(7, 1), + new_entry(8, 1), + new_entry(9, 1), + new_entry(10, 1), + ]; + const UNLIMITED: u64 = u32::MAX as u64; + let tests = vec![ + (0, 3, 3, 0, None), + (0, 3, 4, 0, None), + (0, 3, 4, UNLIMITED, Some(&ents[..1])), + (0, 4, 6, 0, Some(&ents[..1])), + (0, 4, 6, 2, Some(&ents[..3])), + (0, 4, 6, 6, Some(&ents[..3])), + (0, 4, 10, 0, Some(&ents[..1])), + (0, 4, 10, 2, Some(&ents[..3])), + (0, 4, 10, 6, Some(&ents)), + (0, 4, 10, 7, Some(&ents)), + (0, 6, 4, 0, Some(&ents[..1])), + (0, 6, 4, UNLIMITED, Some(&ents[..1])), + (0, 5, 5, 0, Some(&ents[..2])), + (3, 4, 3, UNLIMITED, None), + (3, 5, 5, UNLIMITED, Some(&ents[..2])), + (3, 6, 7, UNLIMITED, Some(&ents[..4])), + (3, 7, 6, UNLIMITED, Some(&ents[..3])), + (4, 5, 5, UNLIMITED, Some(&ents[1..2])), + (4, 5, 5, UNLIMITED, Some(&ents[1..2])), + (4, 5, 7, UNLIMITED, Some(&ents[1..4])), + (4, 5, 9, UNLIMITED, Some(&ents[1..6])), + (4, 5, 10, UNLIMITED, Some(&ents[1..])), + (4, 7, 5, UNLIMITED, Some(&ents[1..2])), + (4, 7, 7, 0, Some(&ents[1..4])), + (5, 5, 5, 0, None), + (5, 7, 7, UNLIMITED, Some(&ents[2..4])), + (7, 7, 7, UNLIMITED, None), + ]; + for (i, &(applied, persisted, committed, limit, ref expect_entries)) in + tests.iter().enumerate() + { + let store = MemStorage::new(); + store.wl().apply_snapshot(new_snapshot(3, 1)).expect(""); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); + raft_log.max_apply_unpersisted_log_limit = limit; raft_log.append(&ents); let unstable = raft_log.unstable_entries().to_vec(); if let Some(e) = unstable.last() { @@ -1206,7 +1317,7 @@ mod test { .append(&[new_entry(offset + i, offset + i)]) .expect(""); } - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); for i in (num / 2)..num { raft_log.append(&[new_entry(offset + i, offset + i)]); } @@ -1345,7 +1456,7 @@ mod test { let store = MemStorage::new(); store.wl().apply_snapshot(new_snapshot(offset, 0)).unwrap(); store.wl().append(&entries(offset + 1, half)).unwrap(); - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); raft_log.append(&entries(half, last)); // Test that scan() returns the same entries as slice(), on all inputs. @@ -1594,7 +1705,7 @@ mod test { tests.iter().enumerate() { let store = MemStorage::new(); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); raft_log.append(&previous_ents); raft_log.committed = commit; raft_log.persisted = persist; @@ -1649,7 +1760,7 @@ mod test { ]; for (i, &(commit, wcommit, wpanic)) in tests.iter().enumerate() { let store = MemStorage::new(); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); raft_log.append(&previous_ents); raft_log.committed = previous_commit; let has_panic = @@ -1686,7 +1797,7 @@ mod test { for i in 1u64..index { store.wl().append(&[new_entry(i, 0)]).expect(""); } - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); raft_log.maybe_commit(index - 1, 0); let committed = raft_log.committed; #[allow(deprecated)] @@ -1716,7 +1827,7 @@ mod test { .wl() .apply_snapshot(new_snapshot(offset, 0)) .expect(""); - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); for i in 1u64..=num { raft_log.append(&[new_entry(i + offset, 0)]); } @@ -1766,7 +1877,7 @@ mod test { fn test_restore_snap() { let store = MemStorage::new(); store.wl().apply_snapshot(new_snapshot(100, 1)).expect(""); - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); assert_eq!(raft_log.committed, 100); assert_eq!(raft_log.persisted, 100); raft_log.restore(new_snapshot(200, 1));