From 0a293017773ee27fd09b9c8b85017f0a259e0d3e Mon Sep 17 00:00:00 2001
From: mwtian <81660174+mwtian@users.noreply.github.com>
Date: Thu, 3 Oct 2024 17:10:06 -0700
Subject: [PATCH] [Consensus] remove PeerState from CommitSyncer (#19696)

## Description

- With less fetch parallelism, randomized target authorities should be acceptable for load distribution. And it is easier to reason about.
- Add a metric for fetch failures per peer.

## Test plan

CI PT

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required.

For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates.

- [ ] Protocol:
- [ ] Nodes (Validators and Full nodes):
- [ ] Indexer:
- [ ] JSON-RPC:
- [ ] GraphQL:
- [ ] CLI:
- [ ] Rust SDK:
- [ ] REST API:
---
 consensus/core/src/commit_syncer.rs | 155 +++++++++-------------------
 consensus/core/src/error.rs         |   3 -
 consensus/core/src/metrics.rs       |   2 +-
 3 files changed, 50 insertions(+), 110 deletions(-)

diff --git a/consensus/core/src/commit_syncer.rs b/consensus/core/src/commit_syncer.rs
index c106d9445bec3..5110d95d002ae 100644
--- a/consensus/core/src/commit_syncer.rs
+++ b/consensus/core/src/commit_syncer.rs
@@ -35,12 +35,12 @@ use consensus_config::AuthorityIndex;
 use futures::{stream::FuturesOrdered, StreamExt as _};
 use itertools::Itertools as _;
 use mysten_metrics::spawn_logged_monitored_task;
-use parking_lot::{Mutex, RwLock};
-use rand::prelude::SliceRandom as _;
+use parking_lot::RwLock;
+use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
 use tokio::{
     sync::oneshot,
     task::{JoinHandle, JoinSet},
-    time::{sleep, Instant, MissedTickBehavior},
+    time::{sleep, MissedTickBehavior},
 };
 use tracing::{debug, info, warn};
 
@@ -81,8 +81,6 @@ pub(crate) struct CommitSyncer<C: NetworkClient> {
     // Shared components wrapper.
     inner: Arc<Inner<C>>,
-    // State of peers shared by fetch tasks, to determine the next peer to fetch against.
-    peer_state: Arc<Mutex<PeerState>>,
 
     // States only used by the scheduler.
 
@@ -113,7 +111,6 @@ impl<C: NetworkClient> CommitSyncer<C> {
         block_verifier: Arc<dyn BlockVerifier>,
         dag_state: Arc<RwLock<DagState>>,
     ) -> Self {
-        let peer_state = Arc::new(Mutex::new(PeerState::new(&context)));
         let inner = Arc::new(Inner {
             context,
             core_thread_dispatcher,
@@ -126,7 +123,6 @@ impl<C: NetworkClient> CommitSyncer<C> {
         let synced_commit_index = inner.dag_state.read().last_commit_index();
         CommitSyncer {
             inner,
-            peer_state,
             inflight_fetches: JoinSet::new(),
             pending_fetches: BTreeSet::new(),
             fetched_ranges: BTreeMap::new(),
@@ -354,11 +350,8 @@ impl<C: NetworkClient> CommitSyncer<C> {
             let Some(commit_range) = self.pending_fetches.pop_first() else {
                 break;
             };
-            self.inflight_fetches.spawn(Self::fetch_loop(
-                self.inner.clone(),
-                self.peer_state.clone(),
-                commit_range,
-            ));
+            self.inflight_fetches
+                .spawn(Self::fetch_loop(self.inner.clone(), commit_range));
         }
 
         let metrics = &self.inner.context.metrics.node_metrics;
@@ -378,7 +371,6 @@ impl<C: NetworkClient> CommitSyncer<C> {
     // Returns the fetched commits and blocks referenced by the commits.
     async fn fetch_loop(
         inner: Arc<Inner<C>>,
-        peer_state: Arc<Mutex<PeerState>>,
         commit_range: CommitRange,
     ) -> (CommitIndex, Vec<TrustedCommit>, Vec<VerifiedBlock>) {
         let _timer = inner
@@ -389,21 +381,42 @@ impl<C: NetworkClient> CommitSyncer<C> {
             .start_timer();
         info!("Starting to fetch commits in {commit_range:?} ...",);
         loop {
-            match Self::fetch_once(inner.clone(), peer_state.clone(), commit_range.clone()).await {
-                Ok((commits, blocks)) => {
-                    info!("Finished fetching commits in {commit_range:?}",);
-                    return (commit_range.end(), commits, blocks);
-                }
-                Err(e) => {
-                    warn!("Failed to fetch: {}", e);
-                    let error: &'static str = e.into();
-                    inner
-                        .context
-                        .metrics
-                        .node_metrics
-                        .commit_sync_fetch_once_errors
-                        .with_label_values(&[error])
-                        .inc();
+            let mut all_authorities = inner
+                .context
+                .committee
+                .authorities()
+                .filter_map(|(i, _)| {
+                    if i != inner.context.own_index {
+                        Some(i)
+                    } else {
+                        None
+                    }
+                })
+                .collect_vec();
+            all_authorities.shuffle(&mut ThreadRng::default());
+            for authority in all_authorities {
+                match Self::fetch_once(inner.clone(), authority, commit_range.clone()).await {
+                    Ok((commits, blocks)) => {
+                        info!("Finished fetching commits in {commit_range:?}",);
+                        return (commit_range.end(), commits, blocks);
+                    }
+                    Err(e) => {
+                        let hostname = inner
+                            .context
+                            .committee
+                            .authority(authority)
+                            .hostname
+                            .clone();
+                        warn!("Failed to fetch from {hostname}: {}", e);
+                        let error: &'static str = e.into();
+                        inner
+                            .context
+                            .metrics
+                            .node_metrics
+                            .commit_sync_fetch_once_errors
+                            .with_label_values(&[&hostname, error])
+                            .inc();
+                    }
                 }
             }
         }
@@ -414,14 +427,11 @@ impl<C: NetworkClient> CommitSyncer<C> {
     // and sent to Core for processing.
     async fn fetch_once(
         inner: Arc<Inner<C>>,
-        peer_state: Arc<Mutex<PeerState>>,
+        target_authority: AuthorityIndex,
         commit_range: CommitRange,
     ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
-        const FETCH_COMMITS_TIMEOUT: Duration = Duration::from_secs(30);
+        const FETCH_COMMITS_TIMEOUT: Duration = Duration::from_secs(15);
         const FETCH_BLOCKS_TIMEOUT: Duration = Duration::from_secs(120);
-        const FETCH_RETRY_BASE_INTERVAL: Duration = Duration::from_secs(1);
-        const FETCH_RETRY_INTERVAL_LIMIT: u32 = 30;
-        const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(1);
 
         let _timer = inner
             .context
@@ -430,50 +440,17 @@ impl<C: NetworkClient> CommitSyncer<C> {
             .commit_sync_fetch_once_latency
             .start_timer();
 
-        // 1. Find an available authority to fetch commits and blocks from, and wait
-        // if it is not yet ready.
-        let Some((available_time, retries, target_authority)) =
-            peer_state.lock().available_authorities.pop_first()
-        else {
-            sleep(MAX_RETRY_INTERVAL).await;
-            return Err(ConsensusError::NoAvailableAuthorityToFetchCommits);
-        };
-        let now = Instant::now();
-        if now < available_time {
-            sleep(available_time - now).await;
-        }
-
-        // 2. Fetch commits in the commit range from the selected authority.
-        let (serialized_commits, serialized_blocks) = match inner
+        // 1. Fetch commits in the commit range from the target authority.
+        let (serialized_commits, serialized_blocks) = inner
            .network_client
            .fetch_commits(
                target_authority,
                commit_range.clone(),
                FETCH_COMMITS_TIMEOUT,
            )
-            .await
-        {
-            Ok(result) => {
-                let mut peer_state = peer_state.lock();
-                let now = Instant::now();
-                peer_state
-                    .available_authorities
-                    .insert((now, 0, target_authority));
-                result
-            }
-            Err(e) => {
-                let mut peer_state = peer_state.lock();
-                let now = Instant::now();
-                peer_state.available_authorities.insert((
-                    now + FETCH_RETRY_BASE_INTERVAL * retries.min(FETCH_RETRY_INTERVAL_LIMIT),
-                    retries.saturating_add(1),
-                    target_authority,
-                ));
-                return Err(e);
-            }
-        };
+            .await?;
 
-        // 3. Verify the response contains blocks that can certify the last returned commit,
+        // 2. Verify the response contains blocks that can certify the last returned commit,
         // and the returned commits are chained by digest, so earlier commits are certified
         // as well.
         let commits = inner.verify_commits(
@@ -483,7 +460,7 @@ impl<C: NetworkClient> CommitSyncer<C> {
             serialized_blocks,
         )?;
 
-        // 4. Fetch blocks referenced by the commits, from the same authority.
+        // 3. Fetch blocks referenced by the commits, from the same authority.
         let block_refs: Vec<_> = commits.iter().flat_map(|c| c.blocks()).cloned().collect();
         let mut requests: FuturesOrdered<_> = block_refs
             .chunks(inner.context.parameters.max_blocks_per_fetch)
@@ -492,7 +469,7 @@ impl<C: NetworkClient> CommitSyncer<C> {
                 let i = i as u32;
                 let inner = inner.clone();
                 async move {
-                    // Pipeline the requests to avoid overloading the target.
+                    // 4. Send out pipelined fetch requests to avoid overloading the target authority.
                     sleep(Duration::from_millis(200) * i).await;
                     // TODO: add some retries.
                     let serialized_blocks = inner
@@ -703,40 +680,6 @@ impl<C: NetworkClient> Inner<C> {
     }
 }
 
-struct PeerState {
-    // The value is a tuple of
-    // - the next available time for the authority to fetch from,
-    // - count of current consecutive failures fetching from the authority, reset on success,
-    // - authority index.
-    // TODO: move this to a separate module, add load balancing, add throttling, and consider
-    // health of peer via previous request failures and leader scores.
-    available_authorities: BTreeSet<(Instant, u32, AuthorityIndex)>,
-}
-
-impl PeerState {
-    fn new(context: &Context) -> Self {
-        // Randomize the initial order of authorities.
-        let mut shuffled_authority_indices: Vec<_> = context
-            .committee
-            .authorities()
-            .filter_map(|(index, _)| {
-                if index != context.own_index {
-                    Some(index)
-                } else {
-                    None
-                }
-            })
-            .collect();
-        shuffled_authority_indices.shuffle(&mut rand::thread_rng());
-        Self {
-            available_authorities: shuffled_authority_indices
-                .into_iter()
-                .map(|i| (Instant::now(), 0, i))
-                .collect(),
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::{sync::Arc, time::Duration};
diff --git a/consensus/core/src/error.rs b/consensus/core/src/error.rs
index 32336d363e2b2..d78915b0c3097 100644
--- a/consensus/core/src/error.rs
+++ b/consensus/core/src/error.rs
@@ -126,9 +126,6 @@ pub(crate) enum ConsensusError {
         block_timestamp_ms: u64,
     },
 
-    #[error("No available authority to fetch commits")]
-    NoAvailableAuthorityToFetchCommits,
-
     #[error("Received no commit from peer {peer}")]
     NoCommitReceived { peer: AuthorityIndex },
 
diff --git a/consensus/core/src/metrics.rs b/consensus/core/src/metrics.rs
index 9588658fbc8ea..39d0d26d1b601 100644
--- a/consensus/core/src/metrics.rs
+++ b/consensus/core/src/metrics.rs
@@ -602,7 +602,7 @@ impl NodeMetrics {
             commit_sync_fetch_once_errors: register_int_counter_vec_with_registry!(
                 "commit_sync_fetch_once_errors",
                 "Number of errors when attempting to fetch commits and blocks from single authority during commit sync.",
-                &["error"],
+                &["authority", "error"],
                 registry
             ).unwrap(),
             round_prober_quorum_round_gaps: register_int_gauge_vec_with_registry!(