diff --git a/consensus/core/src/commit_syncer.rs b/consensus/core/src/commit_syncer.rs index c106d9445bec3..5110d95d002ae 100644 --- a/consensus/core/src/commit_syncer.rs +++ b/consensus/core/src/commit_syncer.rs @@ -35,12 +35,12 @@ use consensus_config::AuthorityIndex; use futures::{stream::FuturesOrdered, StreamExt as _}; use itertools::Itertools as _; use mysten_metrics::spawn_logged_monitored_task; -use parking_lot::{Mutex, RwLock}; -use rand::prelude::SliceRandom as _; +use parking_lot::RwLock; +use rand::{prelude::SliceRandom as _, rngs::ThreadRng}; use tokio::{ sync::oneshot, task::{JoinHandle, JoinSet}, - time::{sleep, Instant, MissedTickBehavior}, + time::{sleep, MissedTickBehavior}, }; use tracing::{debug, info, warn}; @@ -81,8 +81,6 @@ pub(crate) struct CommitSyncer { // Shared components wrapper. inner: Arc>, - // State of peers shared by fetch tasks, to determine the next peer to fetch against. - peer_state: Arc>, // States only used by the scheduler. @@ -113,7 +111,6 @@ impl CommitSyncer { block_verifier: Arc, dag_state: Arc>, ) -> Self { - let peer_state = Arc::new(Mutex::new(PeerState::new(&context))); let inner = Arc::new(Inner { context, core_thread_dispatcher, @@ -126,7 +123,6 @@ impl CommitSyncer { let synced_commit_index = inner.dag_state.read().last_commit_index(); CommitSyncer { inner, - peer_state, inflight_fetches: JoinSet::new(), pending_fetches: BTreeSet::new(), fetched_ranges: BTreeMap::new(), @@ -354,11 +350,8 @@ impl CommitSyncer { let Some(commit_range) = self.pending_fetches.pop_first() else { break; }; - self.inflight_fetches.spawn(Self::fetch_loop( - self.inner.clone(), - self.peer_state.clone(), - commit_range, - )); + self.inflight_fetches + .spawn(Self::fetch_loop(self.inner.clone(), commit_range)); } let metrics = &self.inner.context.metrics.node_metrics; @@ -378,7 +371,6 @@ impl CommitSyncer { // Returns the fetched commits and blocks referenced by the commits. async fn fetch_loop( inner: Arc>, - peer_state: Arc>, commit_range: CommitRange, ) -> (CommitIndex, Vec, Vec) { let _timer = inner @@ -389,21 +381,42 @@ impl CommitSyncer { .start_timer(); info!("Starting to fetch commits in {commit_range:?} ...",); loop { - match Self::fetch_once(inner.clone(), peer_state.clone(), commit_range.clone()).await { - Ok((commits, blocks)) => { - info!("Finished fetching commits in {commit_range:?}",); - return (commit_range.end(), commits, blocks); - } - Err(e) => { - warn!("Failed to fetch: {}", e); - let error: &'static str = e.into(); - inner - .context - .metrics - .node_metrics - .commit_sync_fetch_once_errors - .with_label_values(&[error]) - .inc(); + let mut all_authorities = inner + .context + .committee + .authorities() + .filter_map(|(i, _)| { + if i != inner.context.own_index { + Some(i) + } else { + None + } + }) + .collect_vec(); + all_authorities.shuffle(&mut ThreadRng::default()); + for authority in all_authorities { + match Self::fetch_once(inner.clone(), authority, commit_range.clone()).await { + Ok((commits, blocks)) => { + info!("Finished fetching commits in {commit_range:?}",); + return (commit_range.end(), commits, blocks); + } + Err(e) => { + let hostname = inner + .context + .committee + .authority(authority) + .hostname + .clone(); + warn!("Failed to fetch from {hostname}: {}", e); + let error: &'static str = e.into(); + inner + .context + .metrics + .node_metrics + .commit_sync_fetch_once_errors + .with_label_values(&[&hostname, error]) + .inc(); + } } } } @@ -414,14 +427,11 @@ impl CommitSyncer { // and sent to Core for processing. async fn fetch_once( inner: Arc>, - peer_state: Arc>, + target_authority: AuthorityIndex, commit_range: CommitRange, ) -> ConsensusResult<(Vec, Vec)> { - const FETCH_COMMITS_TIMEOUT: Duration = Duration::from_secs(30); + const FETCH_COMMITS_TIMEOUT: Duration = Duration::from_secs(15); const FETCH_BLOCKS_TIMEOUT: Duration = Duration::from_secs(120); - const FETCH_RETRY_BASE_INTERVAL: Duration = Duration::from_secs(1); - const FETCH_RETRY_INTERVAL_LIMIT: u32 = 30; - const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(1); let _timer = inner .context @@ -430,50 +440,17 @@ impl CommitSyncer { .commit_sync_fetch_once_latency .start_timer(); - // 1. Find an available authority to fetch commits and blocks from, and wait - // if it is not yet ready. - let Some((available_time, retries, target_authority)) = - peer_state.lock().available_authorities.pop_first() - else { - sleep(MAX_RETRY_INTERVAL).await; - return Err(ConsensusError::NoAvailableAuthorityToFetchCommits); - }; - let now = Instant::now(); - if now < available_time { - sleep(available_time - now).await; - } - - // 2. Fetch commits in the commit range from the selected authority. - let (serialized_commits, serialized_blocks) = match inner + // 1. Fetch commits in the commit range from the target authority. + let (serialized_commits, serialized_blocks) = inner .network_client .fetch_commits( target_authority, commit_range.clone(), FETCH_COMMITS_TIMEOUT, ) - .await - { - Ok(result) => { - let mut peer_state = peer_state.lock(); - let now = Instant::now(); - peer_state - .available_authorities - .insert((now, 0, target_authority)); - result - } - Err(e) => { - let mut peer_state = peer_state.lock(); - let now = Instant::now(); - peer_state.available_authorities.insert(( - now + FETCH_RETRY_BASE_INTERVAL * retries.min(FETCH_RETRY_INTERVAL_LIMIT), - retries.saturating_add(1), - target_authority, - )); - return Err(e); - } - }; + .await?; - // 3. Verify the response contains blocks that can certify the last returned commit, + // 2. Verify the response contains blocks that can certify the last returned commit, // and the returned commits are chained by digest, so earlier commits are certified // as well. let commits = inner.verify_commits( @@ -483,7 +460,7 @@ impl CommitSyncer { serialized_blocks, )?; - // 4. Fetch blocks referenced by the commits, from the same authority. + // 3. Fetch blocks referenced by the commits, from the same authority. let block_refs: Vec<_> = commits.iter().flat_map(|c| c.blocks()).cloned().collect(); let mut requests: FuturesOrdered<_> = block_refs .chunks(inner.context.parameters.max_blocks_per_fetch) @@ -492,7 +469,7 @@ impl CommitSyncer { let i = i as u32; let inner = inner.clone(); async move { - // Pipeline the requests to avoid overloading the target. + // 4. Send out pipelined fetch requests to avoid overloading the target authority. sleep(Duration::from_millis(200) * i).await; // TODO: add some retries. let serialized_blocks = inner @@ -703,40 +680,6 @@ impl Inner { } } -struct PeerState { - // The value is a tuple of - // - the next available time for the authority to fetch from, - // - count of current consecutive failures fetching from the authority, reset on success, - // - authority index. - // TODO: move this to a separate module, add load balancing, add throttling, and consider - // health of peer via previous request failures and leader scores. - available_authorities: BTreeSet<(Instant, u32, AuthorityIndex)>, -} - -impl PeerState { - fn new(context: &Context) -> Self { - // Randomize the initial order of authorities. - let mut shuffled_authority_indices: Vec<_> = context - .committee - .authorities() - .filter_map(|(index, _)| { - if index != context.own_index { - Some(index) - } else { - None - } - }) - .collect(); - shuffled_authority_indices.shuffle(&mut rand::thread_rng()); - Self { - available_authorities: shuffled_authority_indices - .into_iter() - .map(|i| (Instant::now(), 0, i)) - .collect(), - } - } -} - #[cfg(test)] mod tests { use std::{sync::Arc, time::Duration}; diff --git a/consensus/core/src/error.rs b/consensus/core/src/error.rs index 32336d363e2b2..d78915b0c3097 100644 --- a/consensus/core/src/error.rs +++ b/consensus/core/src/error.rs @@ -126,9 +126,6 @@ pub(crate) enum ConsensusError { block_timestamp_ms: u64, }, - #[error("No available authority to fetch commits")] - NoAvailableAuthorityToFetchCommits, - #[error("Received no commit from peer {peer}")] NoCommitReceived { peer: AuthorityIndex }, diff --git a/consensus/core/src/metrics.rs b/consensus/core/src/metrics.rs index 9588658fbc8ea..39d0d26d1b601 100644 --- a/consensus/core/src/metrics.rs +++ b/consensus/core/src/metrics.rs @@ -602,7 +602,7 @@ impl NodeMetrics { commit_sync_fetch_once_errors: register_int_counter_vec_with_registry!( "commit_sync_fetch_once_errors", "Number of errors when attempting to fetch commits and blocks from single authority during commit sync.", - &["error"], + &["authority", "error"], registry ).unwrap(), round_prober_quorum_round_gaps: register_int_gauge_vec_with_registry!(