[Consensus] Disable periodic synchronizer when commit is lagging and far missing blocks (#19763)

## Description 

This fixes/improves the behaviour of the periodic synchronizer to:
(1) allow the backfilling of missing blocks when the commit is lagging but
the missing blocks are within a tolerable threshold range of the current
highest accepted round (see the sketch below)
(2) stop the periodic synchronizer when the commit is lagging and the
missing blocks are too far in the future
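
The resulting scheduling decision, condensed from the `start_fetch_missing_blocks_task` change in `consensus/core/src/synchronizer.rs` below (metrics plumbing, error mapping, and the spawned fetch task are elided):

```rust
let mut missing_blocks = self
    .core_dispatcher
    .get_missing_blocks()
    .await?;

let (commit_lagging, last_commit_index, quorum_commit_index) = self.is_commit_lagging();
if commit_lagging {
    // (1) While the commit is lagging, only attempt missing blocks close enough to the
    // highest accepted round. BlockRefs sort by round first, so take_while stops at the
    // threshold.
    let highest_accepted_round = self.dag_state.read().highest_accepted_round();
    missing_blocks = missing_blocks
        .into_iter()
        .take_while(|b| b.round <= highest_accepted_round + SYNC_MISSING_BLOCK_ROUND_THRESHOLD)
        .collect::<BTreeSet<_>>();

    // (2) If nothing is within the threshold, skip the scheduler for this run.
    if missing_blocks.is_empty() {
        trace!("Scheduled synchronizer temporarily disabled: local commit {last_commit_index} << quorum {quorum_commit_index} and missing blocks are too far in the future.");
        return Ok(());
    }
}
// Otherwise schedule fetches for `missing_blocks` as before.
```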

Point (1) allows bypassing a recently observed issue where equivocating
blocks make their way into the DAG but are never committed (the current
commit rule does not allow it). Nodes that use the commit syncer to catch
up might then come across blocks that appear in the fetched committed
sub-dags but have ancestor dependencies on equivocated blocks that never
got committed. Currently, as the synchronizer is disabled, nothing
attempts to fetch those missing blocks, effectively stalling the catch-up.
With change (1) the node will be able to fetch them.
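
As an illustrative example (numbers hypothetical): with `SYNC_MISSING_BLOCK_ROUND_THRESHOLD = 50`, a commit-lagging node whose highest accepted round is 1000 will still fetch a missing ancestor at round 1030, e.g. an equivocated, never-committed block referenced from a fetched committed sub-dag, while missing blocks at rounds above 1050 stay out of scope for the periodic synchronizer and are left to the commit syncer.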

## Test plan 

CI/PT

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
akichidis authored Oct 11, 2024
1 parent a4d9207 commit 35221c7
Showing 3 changed files with 162 additions and 35 deletions.
@@ -31,3 +31,4 @@ tonic:
connection_buffer_size: 33554432
excessive_message_size: 16777216
message_size_limit: 67108864

8 changes: 6 additions & 2 deletions consensus/core/src/commit_syncer.rs
@@ -291,7 +291,8 @@ impl<C: NetworkClient> CommitSyncer<C> {
}

debug!(
"Fetched certified blocks: {}",
"Fetched certified blocks for commit range {:?}: {}",
fetched_commit_range,
blocks.iter().map(|b| b.reference().to_string()).join(","),
);
// If core thread cannot handle the incoming blocks, it is ok to block here.
@@ -301,7 +302,10 @@ impl<C: NetworkClient> CommitSyncer<C> {
match self.inner.core_thread_dispatcher.add_blocks(blocks).await {
Ok(missing) => {
if !missing.is_empty() {
warn!("Fetched blocks have missing ancestors: {:?}", missing);
warn!(
"Fetched blocks have missing ancestors: {:?} for commit range {:?}",
missing, fetched_commit_range
);
}
}
Err(e) => {
188 changes: 155 additions & 33 deletions consensus/core/src/synchronizer.rs
@@ -52,6 +52,12 @@ const MAX_BLOCKS_PER_FETCH: usize = 32;

const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;

/// The number of rounds above the highest accepted round for which we are still willing to fetch missing blocks via the periodic
/// synchronizer. Any missing blocks of higher rounds are considered too far in the future to fetch. This property is taken into
/// account only when it's detected that the node has fallen behind on its commit compared to the rest of the network; otherwise the
/// scheduler will attempt to fetch any missing block.
const SYNC_MISSING_BLOCK_ROUND_THRESHOLD: u32 = 50;

struct BlocksGuard {
map: Arc<InflightBlocksMap>,
block_refs: BTreeSet<BlockRef>,
@@ -827,19 +833,7 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
}

async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
let (commit_lagging, last_commit_index, quorum_commit_index) = self.is_commit_lagging();
if commit_lagging {
trace!("Scheduled synchronizer temporarily disabled as local commit is falling behind from quorum {last_commit_index} << {quorum_commit_index}");
self.context
.metrics
.node_metrics
.fetch_blocks_scheduler_skipped
.with_label_values(&["commit_lagging"])
.inc();
return Ok(());
}

let missing_blocks = self
let mut missing_blocks = self
.core_dispatcher
.get_missing_blocks()
.await
@@ -859,6 +853,31 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
let commands_sender = self.commands_sender.clone();
let dag_state = self.dag_state.clone();

let (commit_lagging, last_commit_index, quorum_commit_index) = self.is_commit_lagging();
if commit_lagging {
// As the node is commit lagging, try to sync only the missing blocks that are within the acceptable round threshold. The rest we don't
// attempt to sync yet.
let highest_accepted_round = dag_state.read().highest_accepted_round();
missing_blocks = missing_blocks
.into_iter()
.take_while(|b| {
b.round <= highest_accepted_round + SYNC_MISSING_BLOCK_ROUND_THRESHOLD
})
.collect::<BTreeSet<_>>();

// If no missing blocks are within the acceptable threshold to sync while commit lagging, then we disable the scheduler completely for this run.
if missing_blocks.is_empty() {
trace!("Scheduled synchronizer temporarily disabled as local commit is falling behind from quorum {last_commit_index} << {quorum_commit_index} and missing blocks are too far in the future.");
self.context
.metrics
.node_metrics
.fetch_blocks_scheduler_skipped
.with_label_values(&["commit_lagging"])
.inc();
return Ok(());
}
}

self.fetch_blocks_scheduler_task
.spawn(monitored_future!(async move {
let _scope = monitored_scope("FetchMissingBlocksScheduler");
@@ -1036,8 +1055,11 @@ mod tests {
use parking_lot::RwLock;
use tokio::{sync::Mutex, time::sleep};

use crate::commit::{CommitVote, TrustedCommit};
use crate::{authority_service::COMMIT_LAG_MULTIPLIER, core_thread::MockCoreThreadDispatcher};
use crate::{
authority_service::COMMIT_LAG_MULTIPLIER,
core_thread::MockCoreThreadDispatcher,
synchronizer::{MAX_BLOCKS_PER_FETCH, SYNC_MISSING_BLOCK_ROUND_THRESHOLD},
};
use crate::{
block::{BlockDigest, BlockRef, Round, TestBlock, VerifiedBlock},
block_verifier::NoopBlockVerifier,
@@ -1054,6 +1076,10 @@
},
CommitDigest, CommitIndex,
};
use crate::{
commit::{CommitVote, TrustedCommit},
BlockAPI,
};

type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
@@ -1444,7 +1470,8 @@
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn synchronizer_periodic_task_skip_when_commit_lagging() {
async fn synchronizer_periodic_task_when_commit_lagging_with_missing_blocks_in_acceptable_thresholds(
) {
// GIVEN
let (context, _) = Context::new_for_test(4);
let context = Arc::new(context);
@@ -1455,8 +1482,96 @@
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));

// AND stub some missing blocks
let expected_blocks = (0..10)
// AND stub some missing blocks. The highest accepted round is 0. Create some blocks that are below and above the sync threshold.
let expected_blocks = (0..SYNC_MISSING_BLOCK_ROUND_THRESHOLD * 2)
.map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
.collect::<Vec<_>>();

let missing_blocks = expected_blocks
.iter()
.map(|block| block.reference())
.collect::<BTreeSet<_>>();
core_dispatcher.stub_missing_blocks(missing_blocks).await;

// AND stub the requests for authority 1 & 2
// Make the first authority timeout, so the second will be called. "We" are authority = 0, so
// we are skipped anyways.
let mut expected_blocks = expected_blocks
.into_iter()
.filter(|block| block.round() <= SYNC_MISSING_BLOCK_ROUND_THRESHOLD)
.collect::<Vec<_>>();

for chunk in expected_blocks.chunks(MAX_BLOCKS_PER_FETCH) {
network_client
.stub_fetch_blocks(
chunk.to_vec(),
AuthorityIndex::new_for_test(1),
Some(FETCH_REQUEST_TIMEOUT),
)
.await;

network_client
.stub_fetch_blocks(chunk.to_vec(), AuthorityIndex::new_for_test(2), None)
.await;
}

// Now create some blocks to simulate a commit lag
let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
let commit_index: CommitIndex = round - 1;
let blocks = (0..4)
.map(|authority| {
let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
let block = TestBlock::new(round, authority)
.set_commit_votes(commit_votes)
.build();

VerifiedBlock::new_for_test(block)
})
.collect::<Vec<_>>();

// Pass them through the commit vote monitor - so now there will be a big commit lag to prevent
// the scheduled synchronizer from running
for block in blocks {
commit_vote_monitor.observe_block(&block);
}

// WHEN we start the synchronizer and wait for a couple of seconds, during which the synchronizer should normally have kicked in.
let _handle = Synchronizer::start(
network_client.clone(),
context.clone(),
core_dispatcher.clone(),
commit_vote_monitor.clone(),
block_verifier.clone(),
dag_state.clone(),
false,
);

sleep(4 * FETCH_REQUEST_TIMEOUT).await;

// We should be in commit lag mode, but since there are missing blocks within the acceptable round threshold, those should be fetched, and nothing above.
let mut added_blocks = core_dispatcher.get_add_blocks().await;

added_blocks.sort_by_key(|block| block.reference());
expected_blocks.sort_by_key(|block| block.reference());

assert_eq!(added_blocks, expected_blocks);
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
// GIVEN
let (context, _) = Context::new_for_test(4);
let context = Arc::new(context);
let block_verifier = Arc::new(NoopBlockVerifier {});
let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
let network_client = Arc::new(MockNetworkClient::default());
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));

// AND stub some missing blocks. The highest accepted round is 0. Create blocks that are above the sync threshold.
let mut expected_blocks = (SYNC_MISSING_BLOCK_ROUND_THRESHOLD * 2
..SYNC_MISSING_BLOCK_ROUND_THRESHOLD * 3)
.map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
.collect::<Vec<_>>();
let missing_blocks = expected_blocks
Expand All @@ -1470,20 +1585,18 @@ mod tests {
// AND stub the requests for authority 1 & 2
// Make the first authority timeout, so the second will be called. "We" are authority = 0, so
// we are skipped anyways.
network_client
.stub_fetch_blocks(
expected_blocks.clone(),
AuthorityIndex::new_for_test(1),
Some(FETCH_REQUEST_TIMEOUT),
)
.await;
network_client
.stub_fetch_blocks(
expected_blocks.clone(),
AuthorityIndex::new_for_test(2),
None,
)
.await;
for chunk in expected_blocks.chunks(MAX_BLOCKS_PER_FETCH) {
network_client
.stub_fetch_blocks(
chunk.to_vec(),
AuthorityIndex::new_for_test(1),
Some(FETCH_REQUEST_TIMEOUT),
)
.await;
network_client
.stub_fetch_blocks(chunk.to_vec(), AuthorityIndex::new_for_test(2), None)
.await;
}

// Now create some blocks to simulate a commit lag
let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
@@ -1540,10 +1653,19 @@
);
}

// Now stub again the missing blocks to fetch the exact same ones.
core_dispatcher
.stub_missing_blocks(missing_blocks.clone())
.await;

sleep(2 * FETCH_REQUEST_TIMEOUT).await;

// THEN the missing blocks should now be fetched and added to core
let added_blocks = core_dispatcher.get_add_blocks().await;
let mut added_blocks = core_dispatcher.get_add_blocks().await;

added_blocks.sort_by_key(|block| block.reference());
expected_blocks.sort_by_key(|block| block.reference());

assert_eq!(added_blocks, expected_blocks);
}

