Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow raft apply committed logs before they are persisted #537

Merged
merged 14 commits into from
Mar 28, 2024
4 changes: 2 additions & 2 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5286,7 +5286,7 @@ fn test_group_commit_consistent() {
/// of the election with both priority and log.
#[test]
fn test_election_with_priority_log() {
let tests = vec![
let tests = [
// log is up to date or not 1..3, priority 1..3, id, state
(true, false, false, 3, 1, 1, 1, StateRole::Leader),
(true, false, false, 2, 2, 2, 1, StateRole::Leader),
Expand All @@ -5301,7 +5301,7 @@ fn test_election_with_priority_log() {
(false, false, true, 1, 1, 3, 1, StateRole::Leader),
];

for &(l1, l2, l3, p1, p2, p3, id, state) in tests.iter() {
for (l1, l2, l3, p1, p2, p3, id, state) in tests {
let l = default_logger();
let mut n1 = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), &l);
let mut n2 = new_test_raft(2, vec![1, 2, 3], 10, 1, new_storage(), &l);
Expand Down
5 changes: 5 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ pub struct Config {

/// Max size for committed entries in a `Ready`.
pub max_committed_size_per_ready: u64,

/// Maximum raft log number that can be applied after commit but before persist.
/// The default value is 0, which means apply after both commit and persist.
pub max_apply_unpersisted_log_limit: u64,
}

impl Default for Config {
Expand All @@ -120,6 +124,7 @@ impl Default for Config {
priority: 0,
max_uncommitted_size: NO_LIMIT,
max_committed_size_per_ready: NO_LIMIT,
max_apply_unpersisted_log_limit: 0,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,8 @@ before taking old, removed peers offline.
#![deny(clippy::all)]
#![deny(missing_docs)]
#![recursion_limit = "128"]
// TODO: remove this when we update the mininum rust compatible version.
#![allow(unused_imports)]
// This is necessary to support prost and rust-protobuf at the same time.
#![allow(clippy::useless_conversion)]
// This lint recommends some bad choices sometimes.
Expand Down
35 changes: 31 additions & 4 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ impl<T: Storage> Raft<T> {
r: RaftCore {
id: c.id,
read_states: Default::default(),
raft_log: RaftLog::new(store, logger.clone()),
raft_log: RaftLog::new(store, logger.clone(), c),
max_inflight: c.max_inflight_msgs,
max_msg_size: c.max_size_per_msg,
pending_request_snapshot: INVALID_INDEX,
Expand Down Expand Up @@ -378,7 +378,9 @@ impl<T: Storage> Raft<T> {
r.load_state(&raft_state.hard_state);
}
if c.applied > 0 {
r.commit_apply(c.applied);
// at initialize, it is possible that applied_index > committed_index,
// so we should skip the check at `commit_apply`.
r.commit_apply_internal(c.applied, true);
}
r.become_follower(r.term, INVALID_ID);

Expand Down Expand Up @@ -596,6 +598,11 @@ impl<T: Storage> Raft<T> {
pub fn set_check_quorum(&mut self, check_quorum: bool) {
self.check_quorum = check_quorum;
}

/// Set the maximum limit that applied index can be ahead of persisted index.
pub fn set_max_apply_unpersisted_log_limit(&mut self, limit: u64) {
self.raft_log.max_apply_unpersisted_log_limit = limit;
}
}

impl<T: Storage> RaftCore<T> {
Expand Down Expand Up @@ -946,10 +953,30 @@ impl<T: Storage> Raft<T> {
/// # Hooks
///
/// * Post: Checks to see if it's time to finalize a Joint Consensus state.
#[inline]
pub fn commit_apply(&mut self, applied: u64) {
self.commit_apply_internal(applied, false)
}

/// Commit that the Raft peer has applied up to the given index.
///
/// Registers the new applied index to the Raft log.
/// if `skip_check` is true, will skip the applied_index check, this is only
/// used at initialization.
///
/// # Hooks
///
/// * Post: Checks to see if it's time to finalize a Joint Consensus state.
fn commit_apply_internal(&mut self, applied: u64, skip_check: bool) {
let old_applied = self.raft_log.applied;
#[allow(deprecated)]
self.raft_log.applied_to(applied);
if !skip_check {
#[allow(deprecated)]
self.raft_log.applied_to(applied);
} else {
// skip applied_index check at initialization.
assert!(applied > 0);
self.raft_log.applied_to_unchecked(applied);
}

// TODO: it may never auto_leave if leader steps down before enter joint is applied.
if self.prs.conf().auto_leave
Expand Down
Loading
Loading