Skip to content

Commit

Permalink
allow raft apply committed logs before they are persisted
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv committed Mar 11, 2024
1 parent 65a0062 commit 7d8dc02
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 5 deletions.
5 changes: 5 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ pub struct Config {

/// Max size for committed entries in a `Ready`.
pub max_committed_size_per_ready: u64,

/// Maximum raft log number that can be applied after commit but before persist.
/// The default value is 0, which means apply after both commit and persist.
pub applied_unpersisted_log_limit: u64,
}

impl Default for Config {
Expand All @@ -120,6 +124,7 @@ impl Default for Config {
priority: 0,
max_uncommitted_size: NO_LIMIT,
max_committed_size_per_ready: NO_LIMIT,
applied_unpersisted_log_limit: 0,
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ impl<T: Storage> Raft<T> {
max_committed_size_per_ready: c.max_committed_size_per_ready,
},
};
r.raft_log.apply_unpersisted_log_limit = c.apply_unpersisted_log_limit;
confchange::restore(&mut r.prs, r.r.raft_log.last_index(), conf_state)?;
let new_cs = r.post_conf_change();
if !raft_proto::conf_state_eq(&new_cs, conf_state) {
Expand Down Expand Up @@ -599,6 +600,10 @@ impl<T: Storage> Raft<T> {
pub fn set_check_quorum(&mut self, check_quorum: bool) {
self.check_quorum = check_quorum;
}

pub fn set_applied_unpersisted_log_limit(&mut self, limit: u64) {
self.raft_log.apply_unpersisted_log_limit = limit;
}
}

impl<T: Storage> RaftCore<T> {
Expand Down
112 changes: 107 additions & 5 deletions src/raft_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,18 @@ pub struct RaftLog<T: Storage> {
/// storage. It's used for limiting the upper bound of committed and
/// persisted entries.
///
/// Invariant: persisted < unstable.offset && applied <= persisted
/// Invariant: persisted < unstable.offset
pub persisted: u64,

/// The highest log position that the application has been instructed
/// to apply to its state machine.
///
/// Invariant: applied <= min(committed, persisted)
/// Invariant: applied <= min(committed, persisted + `apply_unpersisted_log_limit`)
pub applied: u64,

/// the maximum log gap between persisted_index and applied_index.
/// Caller should ensure the value won't lead to the upper bound overflow.
pub apply_unpersisted_log_limit: u64,
}

impl<T> ToString for RaftLog<T>
Expand Down Expand Up @@ -87,6 +91,7 @@ impl<T: Storage> RaftLog<T> {
persisted: last_index,
applied: first_index - 1,
unstable: Unstable::new(last_index + 1, logger),
apply_unpersisted_log_limit: 0,
}
}

Expand Down Expand Up @@ -310,7 +315,11 @@ impl<T: Storage> RaftLog<T> {
if idx == 0 {
return;
}
if idx > cmp::min(self.committed, self.persisted) || idx < self.applied {
// Do not check idx with committed or persisted index here becase when `apply_unpersisted_log_limit` > 0:
// 1. then it is possible idx > persisted.
// 2. when the application restart after applied but before committed entried(and committed index) is persisted
// then it is also possible idx > committed.
if idx < self.applied {
fatal!(
self.unstable.logger,
"applied({}) is out of range [prev_applied({}), min(committed({}), persisted({}))]",
Expand Down Expand Up @@ -422,7 +431,7 @@ impl<T: Storage> RaftLog<T> {
/// Returns committed and persisted entries since max(`since_idx` + 1, first_index).
pub fn next_entries_since(&self, since_idx: u64, max_size: Option<u64>) -> Option<Vec<Entry>> {
let offset = cmp::max(since_idx + 1, self.first_index());
let high = cmp::min(self.committed, self.persisted) + 1;
let high = self.next_entries_upper_bound();
if high > offset {
match self.slice(
offset,
Expand All @@ -437,6 +446,14 @@ impl<T: Storage> RaftLog<T> {
None
}

#[inline]
fn next_entries_upper_bound(&self) -> u64 {
std::cmp::min(
self.committed,
self.persisted + self.apply_unpersisted_log_limit,
) + 1
}

/// Returns all the available entries for execution.
/// If applied is smaller than the index of snapshot, it returns all committed
/// entries after the index of snapshot.
Expand All @@ -448,7 +465,7 @@ impl<T: Storage> RaftLog<T> {
/// max(`since_idx` + 1, first_index).
pub fn has_next_entries_since(&self, since_idx: u64) -> bool {
let offset = cmp::max(since_idx + 1, self.first_index());
let high = cmp::min(self.committed, self.persisted) + 1;
let high = self.next_entries_upper_bound();
high > offset
}

Expand Down Expand Up @@ -1185,6 +1202,91 @@ mod test {
);
}
}

let ents = [
new_entry(4, 1),
new_entry(5, 1),
new_entry(6, 1),
new_entry(7, 1),
new_entry(8, 1),
new_entry(9, 1),
new_entry(10, 1),
];
const MAX: u64 = u32::MAX as u64;
let tests = vec![
(0, 3, 3, 0, None),
(0, 3, 4, 0, None),
(0, 3, 4, MAX, Some(&ents[..1])),
(0, 4, 6, 0, Some(&ents[..1])),
(0, 4, 6, 2, Some(&ents[..3])),
(0, 4, 6, 6, Some(&ents[..3])),
(0, 4, 10, 0, Some(&ents[..1])),
(0, 4, 10, 2, Some(&ents[..3])),
(0, 4, 10, 6, Some(&ents)),
(0, 4, 10, 7, Some(&ents)),
(0, 6, 4, 0, Some(&ents[..1])),
(0, 6, 4, MAX, Some(&ents[..1])),
(0, 5, 5, 0, Some(&ents[..2])),
(3, 4, 3, MAX, None),
(3, 5, 5, MAX, Some(&ents[..2])),
(3, 6, 7, MAX, Some(&ents[..4])),
(3, 7, 6, MAX, Some(&ents[..3])),
(4, 5, 5, MAX, Some(&ents[1..2])),
(4, 5, 5, MAX, Some(&ents[1..2])),
(4, 5, 7, MAX, Some(&ents[1..4])),
(4, 5, 9, MAX, Some(&ents[1..6])),
(4, 5, 10, MAX, Some(&ents[1..])),
(4, 7, 5, MAX, Some(&ents[1..2])),
(4, 7, 7, 0, Some(&ents[1..4])),
(5, 5, 5, 0, None),
(5, 7, 7, MAX, Some(&ents[2..4])),
(7, 7, 7, MAX, None),
];
for (i, &(applied, persisted, committed, limit, ref expect_entries)) in
tests.iter().enumerate()
{
let store = MemStorage::new();
store.wl().apply_snapshot(new_snapshot(3, 1)).expect("");
let mut raft_log = RaftLog::new(store, l.clone());
raft_log.apply_unpersisted_log_limit = limit;
raft_log.append(&ents);
let unstable = raft_log.unstable_entries().to_vec();
if let Some(e) = unstable.last() {
raft_log.stable_entries(e.get_index(), e.get_term());
raft_log.mut_store().wl().append(&unstable).expect("");
}
raft_log.maybe_persist(persisted, 1);
assert_eq!(
persisted, raft_log.persisted,
"#{}: persisted = {}, want {}",
i, raft_log.persisted, persisted
);
raft_log.maybe_commit(committed, 1);
assert_eq!(
committed, raft_log.committed,
"#{}: committed = {}, want {}",
i, raft_log.committed, committed
);
#[allow(deprecated)]
raft_log.applied_to(applied);

let expect_has_next = expect_entries.is_some();
let actual_has_next = raft_log.has_next_entries();
if actual_has_next != expect_has_next {
panic!(
"#{}: hasNext = {}, want {}",
i, actual_has_next, expect_has_next
);
}

let next_entries = raft_log.next_entries(None);
if next_entries != expect_entries.map(|n| n.to_vec()) {
panic!(
"#{}: next_entries = {:?}, want {:?}",
i, next_entries, expect_entries
);
}
}
}

#[test]
Expand Down

0 comments on commit 7d8dc02

Please sign in to comment.