[Consensus] remove PeerState from CommitSyncer (#19696)
## Description 

- With less fetch parallelism, picking target authorities at random should be
acceptable for load distribution, and it is easier to reason about than tracking
per-peer state (see the sketch below).
- Add a metric for fetch failures per peer.
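
A minimal, self-contained sketch of the randomized target-selection pattern this change adopts; `AuthorityIndex`, `fetch_once`, and the committee size here are illustrative stand-ins for the real consensus types, not the actual `CommitSyncer` code:

```rust
use rand::seq::SliceRandom;

/// Hypothetical stand-in for the real `AuthorityIndex`.
type AuthorityIndex = usize;

/// Placeholder for the real per-peer fetch; always fails in this sketch.
fn fetch_once(peer: AuthorityIndex) -> Result<Vec<u8>, String> {
    Err(format!("peer {peer} unreachable"))
}

fn fetch_from_random_peers(own_index: AuthorityIndex, committee_size: usize) -> Option<Vec<u8>> {
    // Collect every authority except ourselves, then shuffle so repeated
    // sync attempts spread load across the committee.
    let mut peers: Vec<AuthorityIndex> =
        (0..committee_size).filter(|&i| i != own_index).collect();
    peers.shuffle(&mut rand::thread_rng());

    // Try peers in the randomized order until one succeeds.
    for peer in peers {
        match fetch_once(peer) {
            Ok(commits) => return Some(commits),
            Err(e) => eprintln!("fetch from {peer} failed: {e}"),
        }
    }
    None
}

fn main() {
    // With the failing stub above, no peer can serve the request.
    assert!(fetch_from_random_peers(0, 4).is_none());
}
```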

## Test plan 

CI
PT

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
mwtian authored Oct 4, 2024
1 parent 59f115b commit 0a29301
Showing 3 changed files with 50 additions and 110 deletions.
155 changes: 49 additions & 106 deletions consensus/core/src/commit_syncer.rs
@@ -35,12 +35,12 @@ use consensus_config::AuthorityIndex;
use futures::{stream::FuturesOrdered, StreamExt as _};
use itertools::Itertools as _;
use mysten_metrics::spawn_logged_monitored_task;
use parking_lot::{Mutex, RwLock};
use rand::prelude::SliceRandom as _;
use parking_lot::RwLock;
use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
use tokio::{
sync::oneshot,
task::{JoinHandle, JoinSet},
time::{sleep, Instant, MissedTickBehavior},
time::{sleep, MissedTickBehavior},
};
use tracing::{debug, info, warn};

@@ -81,8 +81,6 @@ pub(crate) struct CommitSyncer<C: NetworkClient> {

// Shared components wrapper.
inner: Arc<Inner<C>>,
// State of peers shared by fetch tasks, to determine the next peer to fetch against.
peer_state: Arc<Mutex<PeerState>>,

// States only used by the scheduler.

@@ -113,7 +111,6 @@ impl<C: NetworkClient> CommitSyncer<C> {
block_verifier: Arc<dyn BlockVerifier>,
dag_state: Arc<RwLock<DagState>>,
) -> Self {
let peer_state = Arc::new(Mutex::new(PeerState::new(&context)));
let inner = Arc::new(Inner {
context,
core_thread_dispatcher,
@@ -126,7 +123,6 @@ impl<C: NetworkClient> CommitSyncer<C> {
let synced_commit_index = inner.dag_state.read().last_commit_index();
CommitSyncer {
inner,
peer_state,
inflight_fetches: JoinSet::new(),
pending_fetches: BTreeSet::new(),
fetched_ranges: BTreeMap::new(),
@@ -354,11 +350,8 @@ impl<C: NetworkClient> CommitSyncer<C> {
let Some(commit_range) = self.pending_fetches.pop_first() else {
break;
};
self.inflight_fetches.spawn(Self::fetch_loop(
self.inner.clone(),
self.peer_state.clone(),
commit_range,
));
self.inflight_fetches
.spawn(Self::fetch_loop(self.inner.clone(), commit_range));
}

let metrics = &self.inner.context.metrics.node_metrics;
@@ -378,7 +371,6 @@ impl<C: NetworkClient> CommitSyncer<C> {
// Returns the fetched commits and blocks referenced by the commits.
async fn fetch_loop(
inner: Arc<Inner<C>>,
peer_state: Arc<Mutex<PeerState>>,
commit_range: CommitRange,
) -> (CommitIndex, Vec<TrustedCommit>, Vec<VerifiedBlock>) {
let _timer = inner
@@ -389,21 +381,42 @@ impl<C: NetworkClient> CommitSyncer<C> {
.start_timer();
info!("Starting to fetch commits in {commit_range:?} ...",);
loop {
match Self::fetch_once(inner.clone(), peer_state.clone(), commit_range.clone()).await {
Ok((commits, blocks)) => {
info!("Finished fetching commits in {commit_range:?}",);
return (commit_range.end(), commits, blocks);
}
Err(e) => {
warn!("Failed to fetch: {}", e);
let error: &'static str = e.into();
inner
.context
.metrics
.node_metrics
.commit_sync_fetch_once_errors
.with_label_values(&[error])
.inc();
let mut all_authorities = inner
.context
.committee
.authorities()
.filter_map(|(i, _)| {
if i != inner.context.own_index {
Some(i)
} else {
None
}
})
.collect_vec();
all_authorities.shuffle(&mut ThreadRng::default());
for authority in all_authorities {
match Self::fetch_once(inner.clone(), authority, commit_range.clone()).await {
Ok((commits, blocks)) => {
info!("Finished fetching commits in {commit_range:?}",);
return (commit_range.end(), commits, blocks);
}
Err(e) => {
let hostname = inner
.context
.committee
.authority(authority)
.hostname
.clone();
warn!("Failed to fetch from {hostname}: {}", e);
let error: &'static str = e.into();
inner
.context
.metrics
.node_metrics
.commit_sync_fetch_once_errors
.with_label_values(&[&hostname, error])
.inc();
}
}
}
}
@@ -414,14 +427,11 @@ impl<C: NetworkClient> CommitSyncer<C> {
// and sent to Core for processing.
async fn fetch_once(
inner: Arc<Inner<C>>,
peer_state: Arc<Mutex<PeerState>>,
target_authority: AuthorityIndex,
commit_range: CommitRange,
) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
const FETCH_COMMITS_TIMEOUT: Duration = Duration::from_secs(30);
const FETCH_COMMITS_TIMEOUT: Duration = Duration::from_secs(15);
const FETCH_BLOCKS_TIMEOUT: Duration = Duration::from_secs(120);
const FETCH_RETRY_BASE_INTERVAL: Duration = Duration::from_secs(1);
const FETCH_RETRY_INTERVAL_LIMIT: u32 = 30;
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(1);

let _timer = inner
.context
@@ -430,50 +440,17 @@ impl<C: NetworkClient> CommitSyncer<C> {
.commit_sync_fetch_once_latency
.start_timer();

// 1. Find an available authority to fetch commits and blocks from, and wait
// if it is not yet ready.
let Some((available_time, retries, target_authority)) =
peer_state.lock().available_authorities.pop_first()
else {
sleep(MAX_RETRY_INTERVAL).await;
return Err(ConsensusError::NoAvailableAuthorityToFetchCommits);
};
let now = Instant::now();
if now < available_time {
sleep(available_time - now).await;
}

// 2. Fetch commits in the commit range from the selected authority.
let (serialized_commits, serialized_blocks) = match inner
// 1. Fetch commits in the commit range from the target authority.
let (serialized_commits, serialized_blocks) = inner
.network_client
.fetch_commits(
target_authority,
commit_range.clone(),
FETCH_COMMITS_TIMEOUT,
)
.await
{
Ok(result) => {
let mut peer_state = peer_state.lock();
let now = Instant::now();
peer_state
.available_authorities
.insert((now, 0, target_authority));
result
}
Err(e) => {
let mut peer_state = peer_state.lock();
let now = Instant::now();
peer_state.available_authorities.insert((
now + FETCH_RETRY_BASE_INTERVAL * retries.min(FETCH_RETRY_INTERVAL_LIMIT),
retries.saturating_add(1),
target_authority,
));
return Err(e);
}
};
.await?;

// 3. Verify the response contains blocks that can certify the last returned commit,
// 2. Verify the response contains blocks that can certify the last returned commit,
// and the returned commits are chained by digest, so earlier commits are certified
// as well.
let commits = inner.verify_commits(
Expand All @@ -483,7 +460,7 @@ impl<C: NetworkClient> CommitSyncer<C> {
serialized_blocks,
)?;

// 4. Fetch blocks referenced by the commits, from the same authority.
// 3. Fetch blocks referenced by the commits, from the same authority.
let block_refs: Vec<_> = commits.iter().flat_map(|c| c.blocks()).cloned().collect();
let mut requests: FuturesOrdered<_> = block_refs
.chunks(inner.context.parameters.max_blocks_per_fetch)
Expand All @@ -492,7 +469,7 @@ impl<C: NetworkClient> CommitSyncer<C> {
let i = i as u32;
let inner = inner.clone();
async move {
// Pipeline the requests to avoid overloading the target.
// 4. Send out pipelined fetch requests to avoid overloading the target authority.
sleep(Duration::from_millis(200) * i).await;
// TODO: add some retries.
let serialized_blocks = inner
@@ -703,40 +680,6 @@ impl<C: NetworkClient> Inner<C> {
}
}

struct PeerState {
// The value is a tuple of
// - the next available time for the authority to fetch from,
// - count of current consecutive failures fetching from the authority, reset on success,
// - authority index.
// TODO: move this to a separate module, add load balancing, add throttling, and consider
// health of peer via previous request failures and leader scores.
available_authorities: BTreeSet<(Instant, u32, AuthorityIndex)>,
}

impl PeerState {
fn new(context: &Context) -> Self {
// Randomize the initial order of authorities.
let mut shuffled_authority_indices: Vec<_> = context
.committee
.authorities()
.filter_map(|(index, _)| {
if index != context.own_index {
Some(index)
} else {
None
}
})
.collect();
shuffled_authority_indices.shuffle(&mut rand::thread_rng());
Self {
available_authorities: shuffled_authority_indices
.into_iter()
.map(|i| (Instant::now(), 0, i))
.collect(),
}
}
}

#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
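
The block-fetch path above keeps pipelining its chunked requests, delaying each request slightly longer than the previous one so the target authority is not hit with every chunk at once. A simplified, self-contained sketch of that staggering pattern, assuming a placeholder `fetch_chunk` in place of the real `fetch_blocks` network call and plain `u32`s instead of block refs:

```rust
use std::time::Duration;

use futures::{stream::FuturesOrdered, StreamExt};
use tokio::time::sleep;

/// Placeholder for the real per-chunk network fetch.
async fn fetch_chunk(chunk: Vec<u32>) -> Vec<u32> {
    chunk
}

/// Issue one request per chunk, each delayed a bit longer than the previous
/// one, and collect the responses in order.
async fn fetch_all(refs: Vec<u32>, max_per_fetch: usize) -> Vec<u32> {
    let mut requests: FuturesOrdered<_> = refs
        .chunks(max_per_fetch)
        .enumerate()
        .map(|(i, chunk)| {
            let chunk = chunk.to_vec();
            async move {
                // Stagger requests to avoid overloading the target authority.
                sleep(Duration::from_millis(200) * i as u32).await;
                fetch_chunk(chunk).await
            }
        })
        .collect();

    let mut blocks = Vec::new();
    while let Some(mut fetched) = requests.next().await {
        blocks.append(&mut fetched);
    }
    blocks
}

#[tokio::main]
async fn main() {
    let blocks = fetch_all((0..10).collect(), 4).await;
    assert_eq!(blocks.len(), 10);
}
```
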
3 changes: 0 additions & 3 deletions consensus/core/src/error.rs
@@ -126,9 +126,6 @@ pub(crate) enum ConsensusError {
block_timestamp_ms: u64,
},

#[error("No available authority to fetch commits")]
NoAvailableAuthorityToFetchCommits,

#[error("Received no commit from peer {peer}")]
NoCommitReceived { peer: AuthorityIndex },

2 changes: 1 addition & 1 deletion consensus/core/src/metrics.rs
@@ -602,7 +602,7 @@ impl NodeMetrics {
commit_sync_fetch_once_errors: register_int_counter_vec_with_registry!(
"commit_sync_fetch_once_errors",
"Number of errors when attempting to fetch commits and blocks from single authority during commit sync.",
&["error"],
&["authority", "error"],
registry
).unwrap(),
round_prober_quorum_round_gaps: register_int_gauge_vec_with_registry!(

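With the added label, one time series is kept per `(authority, error)` pair, so failures can be attributed to a specific peer. A minimal sketch of such a labeled counter using the `prometheus` crate directly; the metric name, help text, and label set mirror the diff, while the wiring through `NodeMetrics` and the hostname values are omitted or illustrative:

```rust
use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry};

fn main() {
    let registry = Registry::new();
    // Same label set the commit syncer now uses: one series per (authority, error).
    let fetch_errors: IntCounterVec = register_int_counter_vec_with_registry!(
        "commit_sync_fetch_once_errors",
        "Number of errors when attempting to fetch commits and blocks from single authority during commit sync.",
        &["authority", "error"],
        registry
    )
    .unwrap();

    // On a failed fetch, record which peer failed and why.
    fetch_errors
        .with_label_values(&["validator-1", "NetworkRequestTimeout"])
        .inc();

    assert_eq!(
        fetch_errors
            .with_label_values(&["validator-1", "NetworkRequestTimeout"])
            .get(),
        1
    );
}
```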