From b2781bff67c5842aed4d38ba3f5118f5f6565f7a Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Thu, 10 Oct 2024 21:36:13 -0700 Subject: [PATCH] [consensus] fallback heuristics for optimistic quorum store (#14346) * [consensus] Fallback heuristics for optimistic quorum store * [optqs] set minimum batch age for optimistic batch proposals * new unit tests and existing test fixes * allow handling OptQS payload by default --- consensus/consensus-types/src/common.rs | 4 +- consensus/consensus-types/src/lib.rs | 1 + .../src/payload_pull_params.rs | 91 ++++++++ .../consensus-types/src/request_response.rs | 5 +- .../consensus-types/src/round_timeout.rs | 9 +- consensus/src/block_storage/block_store.rs | 16 +- consensus/src/block_storage/block_tree.rs | 34 ++- consensus/src/dag/dag_driver.rs | 5 +- consensus/src/dag/tests/helpers.rs | 3 +- consensus/src/epoch_manager.rs | 12 + consensus/src/lib.rs | 2 + consensus/src/liveness/mod.rs | 1 + consensus/src/liveness/proposal_generator.rs | 13 +- .../src/liveness/proposal_generator_test.rs | 14 ++ .../src/liveness/proposal_status_tracker.rs | 210 ++++++++++++++++++ consensus/src/liveness/round_state.rs | 29 ++- consensus/src/liveness/round_state_test.rs | 21 +- consensus/src/payload_client/mixed.rs | 11 +- consensus/src/payload_client/mod.rs | 75 +------ consensus/src/payload_client/user/mod.rs | 3 +- .../user/quorum_store_client.rs | 13 +- consensus/src/payload_manager.rs | 39 ++-- consensus/src/pending_votes.rs | 160 +++++++++++-- consensus/src/pending_votes_test.rs | 161 ++++++++++++++ .../src/quorum_store/batch_proof_queue.rs | 40 +++- consensus/src/quorum_store/proof_manager.rs | 58 ++--- .../src/quorum_store/quorum_store_builder.rs | 3 +- .../tests/batch_proof_queue_test.rs | 12 +- .../tests/direct_mempool_quorum_store_test.rs | 2 +- .../quorum_store/tests/proof_manager_test.rs | 4 +- consensus/src/round_manager.rs | 84 ++++--- consensus/src/round_manager_fuzzing.rs | 6 +- consensus/src/round_manager_test.rs | 15 +- .../src/test_utils/mock_payload_manager.rs | 7 +- consensus/src/test_utils/mod.rs | 19 ++ .../src/bounded_vec_deque.rs | 8 + .../tests/staged/consensus.yaml | 7 + types/src/validator_verifier.rs | 21 +- 38 files changed, 980 insertions(+), 238 deletions(-) create mode 100644 consensus/consensus-types/src/payload_pull_params.rs create mode 100644 consensus/src/liveness/proposal_status_tracker.rs create mode 100644 consensus/src/pending_votes_test.rs diff --git a/consensus/consensus-types/src/common.rs b/consensus/consensus-types/src/common.rs index 7dbc1888b7203..db20a4fd9f3fd 100644 --- a/consensus/consensus-types/src/common.rs +++ b/consensus/consensus-types/src/common.rs @@ -6,7 +6,6 @@ use crate::{ payload::{OptQuorumStorePayload, PayloadExecutionLimit}, proof_of_store::{BatchInfo, ProofCache, ProofOfStore}, }; -use anyhow::bail; use aptos_crypto::{ hash::{CryptoHash, CryptoHasher}, HashValue, @@ -520,8 +519,7 @@ impl Payload { (true, Payload::OptQuorumStore(opt_quorum_store)) => { let proof_with_data = opt_quorum_store.proof_with_data(); Self::verify_with_cache(&proof_with_data.batch_summary, validator, proof_cache)?; - // TODO(ibalajiarun): Remove this log when OptQS is enabled. - bail!("OptQuorumStore Payload is not expected yet"); + Ok(()) }, (_, _) => Err(anyhow::anyhow!( "Wrong payload type. 
Expected Payload::InQuorumStore {} got {} ",
diff --git a/consensus/consensus-types/src/lib.rs b/consensus/consensus-types/src/lib.rs
index 5d351d279bf8e..27ca8b6f92874 100644
--- a/consensus/consensus-types/src/lib.rs
+++ b/consensus/consensus-types/src/lib.rs
@@ -13,6 +13,7 @@ pub mod order_vote;
 pub mod order_vote_msg;
 pub mod order_vote_proposal;
 pub mod payload;
+pub mod payload_pull_params;
 pub mod pipeline;
 pub mod pipeline_execution_result;
 pub mod pipelined_block;
diff --git a/consensus/consensus-types/src/payload_pull_params.rs b/consensus/consensus-types/src/payload_pull_params.rs
new file mode 100644
index 0000000000000..682f9b2185194
--- /dev/null
+++ b/consensus/consensus-types/src/payload_pull_params.rs
@@ -0,0 +1,91 @@
+// Copyright (c) Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::{
+    common::{Author, PayloadFilter},
+    utils::PayloadTxnsSize,
+};
+use std::{collections::HashSet, time::Duration};
+
+#[derive(Clone)]
+pub struct OptQSPayloadPullParams {
+    pub exclude_authors: HashSet<Author>,
+    pub minimum_batch_age_usecs: u64,
+}
+
+pub struct PayloadPullParameters {
+    pub max_poll_time: Duration,
+    pub max_txns: PayloadTxnsSize,
+    pub max_txns_after_filtering: u64,
+    pub soft_max_txns_after_filtering: u64,
+    pub max_inline_txns: PayloadTxnsSize,
+    pub user_txn_filter: PayloadFilter,
+    pub pending_ordering: bool,
+    pub pending_uncommitted_blocks: usize,
+    pub recent_max_fill_fraction: f32,
+    pub block_timestamp: Duration,
+    pub maybe_optqs_payload_pull_params: Option<OptQSPayloadPullParams>,
+}
+
+impl std::fmt::Debug for OptQSPayloadPullParams {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("OptQSPayloadPullParams")
+            .field("exclude_authors", &self.exclude_authors)
+            .field("minimum_batch_age_usecs", &self.minimum_batch_age_usecs)
+            .finish()
+    }
+}
+
+impl PayloadPullParameters {
+    pub fn new_for_test(
+        max_poll_time: Duration,
+        max_txns: u64,
+        max_txns_bytes: u64,
+        max_txns_after_filtering: u64,
+        soft_max_txns_after_filtering: u64,
+        max_inline_txns: u64,
+        max_inline_txns_bytes: u64,
+        user_txn_filter: PayloadFilter,
+        pending_ordering: bool,
+        pending_uncommitted_blocks: usize,
+        recent_max_fill_fraction: f32,
+        block_timestamp: Duration,
+    ) -> Self {
+        Self {
+            max_poll_time,
+            max_txns: PayloadTxnsSize::new(max_txns, max_txns_bytes),
+            max_txns_after_filtering,
+            soft_max_txns_after_filtering,
+            max_inline_txns: PayloadTxnsSize::new(max_inline_txns, max_inline_txns_bytes),
+            user_txn_filter,
+            pending_ordering,
+            pending_uncommitted_blocks,
+            recent_max_fill_fraction,
+            block_timestamp,
+            maybe_optqs_payload_pull_params: None,
+        }
+    }
+}
+
+impl std::fmt::Debug for PayloadPullParameters {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PayloadPullParameters")
+            .field("max_poll_time", &self.max_poll_time)
+            .field("max_items", &self.max_txns)
+            .field("max_unique_items", &self.max_txns_after_filtering)
+            .field(
+                "soft_max_txns_after_filtering",
+                &self.soft_max_txns_after_filtering,
+            )
+            .field("max_inline_items", &self.max_inline_txns)
+            .field("pending_ordering", &self.pending_ordering)
+            .field(
+                "pending_uncommitted_blocks",
+                &self.pending_uncommitted_blocks,
+            )
+            .field("recent_max_fill_fraction", &self.recent_max_fill_fraction)
+            .field("block_timestamp", &self.block_timestamp)
+            .field("optqs_params", &self.maybe_optqs_payload_pull_params)
+            .finish()
+    }
+}
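For orientation, a caller opting into optimistic batches fills in the new parameters along these lines (illustrative sketch only, not part of the patch; the 50-second minimum age mirrors the constant `OptQSPullParamsProvider` uses later in this change):

    use std::{collections::HashSet, time::Duration};

    // Hypothetical call site: allow optimistic batches from every author
    // (no exclusions), but only consider batches at least 50s old.
    let optqs_params = OptQSPayloadPullParams {
        exclude_authors: HashSet::new(),
        minimum_batch_age_usecs: Duration::from_secs(50).as_micros() as u64,
    };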
diff --git a/consensus/consensus-types/src/request_response.rs b/consensus/consensus-types/src/request_response.rs
index c650141e7878a..f10e35285e532 100644
--- a/consensus/consensus-types/src/request_response.rs
+++ b/consensus/consensus-types/src/request_response.rs
@@ -3,6 +3,7 @@
 use crate::{
     common::{Payload, PayloadFilter},
+    payload_pull_params::OptQSPayloadPullParams,
     utils::PayloadTxnsSize,
 };
 use anyhow::Result;
@@ -16,8 +17,8 @@ pub struct GetPayloadRequest {
     pub max_txns_after_filtering: u64,
     // soft max number of transactions after filtering in the block (i.e. include one that crosses it)
     pub soft_max_txns_after_filtering: u64,
-    // target txns with opt batches in max_txns as pct
-    pub opt_batch_txns_pct: u8,
+    // opt payload pull params
+    pub maybe_optqs_payload_pull_params: Option<OptQSPayloadPullParams>,
     // max number of inline transactions (transactions without a proof of store)
     pub max_inline_txns: PayloadTxnsSize,
     // return non full
diff --git a/consensus/consensus-types/src/round_timeout.rs b/consensus/consensus-types/src/round_timeout.rs
index c4596fc2a9d5b..e16d718f7dd38 100644
--- a/consensus/consensus-types/src/round_timeout.rs
+++ b/consensus/consensus-types/src/round_timeout.rs
@@ -7,15 +7,18 @@ use crate::{
     timeout_2chain::TwoChainTimeout,
 };
 use anyhow::{ensure, Context};
+use aptos_bitvec::BitVec;
 use aptos_crypto::bls12381;
 use aptos_short_hex_str::AsShortHexStr;
 use aptos_types::validator_verifier::ValidatorVerifier;
 use serde::{Deserialize, Serialize};
 
-#[derive(Deserialize, Serialize, Clone, PartialEq, Eq)]
+#[derive(Deserialize, Serialize, Clone, PartialEq, Eq, Hash, Debug)]
 pub enum RoundTimeoutReason {
     Unknown,
     ProposalNotReceived,
+    PayloadUnavailable { missing_authors: BitVec },
+    NoQC,
 }
 
 impl std::fmt::Display for RoundTimeoutReason {
@@ -23,6 +26,10 @@ impl std::fmt::Display for RoundTimeoutReason {
         match self {
             RoundTimeoutReason::Unknown => write!(f, "Unknown"),
             RoundTimeoutReason::ProposalNotReceived => write!(f, "ProposalNotReceived"),
+            RoundTimeoutReason::PayloadUnavailable { .. } => {
+                write!(f, "PayloadUnavailable")
+            },
+            RoundTimeoutReason::NoQC => write!(f, "NoQC"),
         }
     }
 }
diff --git a/consensus/src/block_storage/block_store.rs b/consensus/src/block_storage/block_store.rs
index 8670e161602da..f56dfa80c23eb 100644
--- a/consensus/src/block_storage/block_store.rs
+++ b/consensus/src/block_storage/block_store.rs
@@ -18,6 +18,7 @@ use crate::{
     util::time_service::TimeService,
 };
 use anyhow::{bail, ensure, format_err, Context};
+use aptos_bitvec::BitVec;
 use aptos_consensus_types::{
     block::Block,
     common::Round,
@@ -472,18 +473,19 @@ impl BlockStore {
         self.pending_blocks.clone()
     }
 
-    pub async fn wait_for_payload(&self, block: &Block) -> anyhow::Result<()> {
-        tokio::time::timeout(
-            Duration::from_secs(1),
-            self.payload_manager.get_transactions(block),
-        )
-        .await??;
+    pub async fn wait_for_payload(&self, block: &Block, deadline: Duration) -> anyhow::Result<()> {
+        let duration = deadline.saturating_sub(self.time_service.get_current_timestamp());
+        tokio::time::timeout(duration, self.payload_manager.get_transactions(block)).await??;
         Ok(())
     }
 
-    pub fn check_payload(&self, proposal: &Block) -> bool {
+    pub fn check_payload(&self, proposal: &Block) -> Result<(), BitVec> {
         self.payload_manager.check_payload_availability(proposal)
     }
+
+    pub fn get_block_for_round(&self, round: Round) -> Option<Arc<PipelinedBlock>> {
+        self.inner.read().get_block_for_round(round)
+    }
 }
 
 impl BlockReader for BlockStore {
diff --git a/consensus/src/block_storage/block_tree.rs b/consensus/src/block_storage/block_tree.rs
index 0edb607579c72..5d1df54149cbf 100644
--- a/consensus/src/block_storage/block_tree.rs
+++ b/consensus/src/block_storage/block_tree.rs
@@ -15,10 +15,13 @@ use aptos_consensus_types::{
 };
 use aptos_crypto::HashValue;
 use aptos_logger::prelude::*;
-use aptos_types::{block_info::BlockInfo, ledger_info::LedgerInfoWithSignatures};
+use aptos_types::{
+    block_info::{BlockInfo, Round},
+    ledger_info::LedgerInfoWithSignatures,
+};
 use mirai_annotations::{checked_verify_eq, precondition};
 use std::{
-    collections::{vec_deque::VecDeque, HashMap, HashSet},
+    collections::{vec_deque::VecDeque, BTreeMap, HashMap, HashSet},
     sync::Arc,
 };
 
@@ -89,6 +92,9 @@ pub struct BlockTree {
     pruned_block_ids: VecDeque<HashValue>,
     /// Num pruned blocks to keep in memory.
     max_pruned_blocks_in_mem: usize,
+
+    /// Round to Block index. We expect only one block per round.
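+    /// If a second block ever arrives for the same round, the first mapping is
+    /// kept and a warning is logged (see `insert_block` below).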
+    round_to_ids: BTreeMap<Round, HashValue>,
 }
 
 impl BlockTree {
@@ -108,6 +114,8 @@ impl BlockTree {
         let root_id = root.id();
 
         let mut id_to_block = HashMap::new();
+        let mut round_to_ids = BTreeMap::new();
+        round_to_ids.insert(root.round(), root_id);
         id_to_block.insert(root_id, LinkableBlock::new(root));
         counters::NUM_BLOCKS_IN_TREE.set(1);
 
@@ -132,6 +140,7 @@ impl BlockTree {
             pruned_block_ids,
             max_pruned_blocks_in_mem,
             highest_2chain_timeout_cert,
+            round_to_ids,
         }
     }
 
@@ -165,7 +174,10 @@ impl BlockTree {
 
     fn remove_block(&mut self, block_id: HashValue) {
         // Remove the block from the store
-        self.id_to_block.remove(&block_id);
+        if let Some(block) = self.id_to_block.remove(&block_id) {
+            let round = block.executed_block().round();
+            self.round_to_ids.remove(&round);
+        };
         self.id_to_quorum_cert.remove(&block_id);
     }
 
@@ -178,6 +190,12 @@ impl BlockTree {
             .map(|lb| lb.executed_block().clone())
     }
 
+    pub(super) fn get_block_for_round(&self, round: Round) -> Option<Arc<PipelinedBlock>> {
+        self.round_to_ids
+            .get(&round)
+            .and_then(|block_id| self.get_block(block_id))
+    }
+
     pub(super) fn ordered_root(&self) -> Arc<PipelinedBlock> {
         self.get_block(&self.ordered_root_id)
             .expect("Root must exist")
@@ -241,6 +259,16 @@ impl BlockTree {
         let linkable_block = LinkableBlock::new(block);
         let arc_block = Arc::clone(linkable_block.executed_block());
         assert!(self.id_to_block.insert(block_id, linkable_block).is_none());
+        // Note: the assumption is that we have/enforce unequivocal proposer election.
+        if let Some(old_block_id) = self.round_to_ids.get(&arc_block.round()) {
+            warn!(
+                "Multiple blocks received for round {}. Previous block id: {}",
+                arc_block.round(),
+                old_block_id
+            );
+        } else {
+            self.round_to_ids.insert(arc_block.round(), block_id);
+        }
         counters::NUM_BLOCKS_IN_TREE.inc();
         Ok(arc_block)
     }
diff --git a/consensus/src/dag/dag_driver.rs b/consensus/src/dag/dag_driver.rs
index 2395ba30ef264..fa0caee1faa8a 100644
--- a/consensus/src/dag/dag_driver.rs
+++ b/consensus/src/dag/dag_driver.rs
@@ -21,13 +21,14 @@ use crate::{
         },
         DAGRpcResult, RpcHandler,
     },
-    payload_client::{PayloadClient, PayloadPullParameters},
+    payload_client::PayloadClient,
 };
 use anyhow::{bail, ensure};
 use aptos_collections::BoundedVecDeque;
 use aptos_config::config::DagPayloadConfig;
 use aptos_consensus_types::{
     common::{Author, Payload, PayloadFilter},
+    payload_pull_params::PayloadPullParameters,
     utils::PayloadTxnsSize,
 };
 use aptos_crypto::hash::CryptoHash;
@@ -266,7 +267,7 @@ impl DagDriver {
             max_txns_after_filtering: max_txns,
             soft_max_txns_after_filtering: max_txns,
             max_inline_txns: PayloadTxnsSize::new(100, 100 * 1024),
-            opt_batch_txns_pct: 0,
+            maybe_optqs_payload_pull_params: None,
             user_txn_filter: payload_filter,
             pending_ordering: false,
             pending_uncommitted_blocks: 0,
diff --git a/consensus/src/dag/tests/helpers.rs b/consensus/src/dag/tests/helpers.rs
index ff19b6876e2db..dab407099c303 100644
--- a/consensus/src/dag/tests/helpers.rs
+++ b/consensus/src/dag/tests/helpers.rs
@@ -8,6 +8,7 @@ use crate::{
     },
     payload_manager::TPayloadManager,
 };
+use aptos_bitvec::BitVec;
 use aptos_consensus_types::{
     block::Block,
     common::{Author, Payload, Round},
@@ -26,7 +27,7 @@ impl TPayloadManager for MockPayloadManager {
 
     fn notify_commit(&self, _block_timestamp: u64, _payloads: Vec<Payload>) {}
 
-    fn check_payload_availability(&self, _block: &Block) -> bool {
+    fn check_payload_availability(&self, _block: &Block) -> Result<(), BitVec> {
         unimplemented!()
     }
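The availability check changes from a `bool` to a `Result<(), BitVec>` so a timing-out node can report exactly which batch authors are missing. A minimal sketch of how a test double might satisfy the new contract (illustrative only; `StaticAvailabilityOracle` is a hypothetical name, not part of this patch):

    use aptos_bitvec::BitVec;

    // Hypothetical helper mirroring the new check_payload_availability
    // contract: Ok(()) means every batch is locally available, Err carries
    // one bit per validator index whose batch could not be found.
    struct StaticAvailabilityOracle {
        num_validators: u16,
        missing_validator_indices: Vec<u16>,
    }

    impl StaticAvailabilityOracle {
        fn check(&self) -> Result<(), BitVec> {
            if self.missing_validator_indices.is_empty() {
                return Ok(());
            }
            let mut missing = BitVec::with_num_bits(self.num_validators);
            for &idx in &self.missing_validator_indices {
                missing.set(idx);
            }
            Err(missing)
        }
    }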
diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs
index c0e1ba3ac7ecb..9dea39ef66045 100644
--- a/consensus/src/epoch_manager.rs
+++ b/consensus/src/epoch_manager.rs
@@ -21,6 +21,7 @@ use crate::{
         proposal_generator::{
             ChainHealthBackoffConfig, PipelineBackpressureConfig, ProposalGenerator,
         },
+        proposal_status_tracker::{ExponentialWindowFailureTracker, OptQSPullParamsProvider},
         proposer_election::ProposerElection,
         rotating_proposer_election::{choose_leader, RotatingProposer},
         round_proposer_election::RoundProposer,
@@ -826,6 +827,15 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
             self.pending_blocks.clone(),
         ));
 
+        let failures_tracker = Arc::new(Mutex::new(ExponentialWindowFailureTracker::new(
+            100,
+            epoch_state.verifier.get_ordered_account_addresses(),
+        )));
+        let opt_qs_payload_param_provider = Arc::new(OptQSPullParamsProvider::new(
+            self.config.quorum_store.enable_opt_quorum_store,
+            failures_tracker.clone(),
+        ));
+
         info!(epoch = epoch, "Create ProposalGenerator");
         // txn manager is required both by proposal generator (to pull the proposers)
         // and by event processor (to update their status).
@@ -854,6 +864,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
                 self.config
                     .quorum_store
                     .allow_batches_without_pos_in_proposal,
+                opt_qs_payload_param_provider,
             );
         let (round_manager_tx, round_manager_rx) = aptos_channel::new(
             QueueStyle::KLAST,
@@ -887,6 +898,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
             onchain_randomness_config,
             onchain_jwk_consensus_config,
             fast_rand_config,
+            failures_tracker,
         );
 
         round_manager.init(last_vote).await;
diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs
index f8545073966bd..3660afb3b49f5 100644
--- a/consensus/src/lib.rs
+++ b/consensus/src/lib.rs
@@ -31,6 +31,8 @@ mod network_tests;
 mod payload_client;
 mod pending_order_votes;
 mod pending_votes;
+#[cfg(test)]
+mod pending_votes_test;
 pub mod persistent_liveness_storage;
 mod pipeline;
 pub mod quorum_store;
diff --git a/consensus/src/liveness/mod.rs b/consensus/src/liveness/mod.rs
index f7e8f11bceb05..effa52291246f 100644
--- a/consensus/src/liveness/mod.rs
+++ b/consensus/src/liveness/mod.rs
@@ -5,6 +5,7 @@
 pub(crate) mod cached_proposer_election;
 pub(crate) mod leader_reputation;
 pub(crate) mod proposal_generator;
+pub(crate) mod proposal_status_tracker;
 pub(crate) mod proposer_election;
 pub(crate) mod rotating_proposer_election;
 pub(crate) mod round_proposer_election;
diff --git a/consensus/src/liveness/proposal_generator.rs b/consensus/src/liveness/proposal_generator.rs
index 334b0a76fbf4e..47bdea4c9ce95 100644
--- a/consensus/src/liveness/proposal_generator.rs
+++ b/consensus/src/liveness/proposal_generator.rs
@@ -2,7 +2,9 @@
 // Parts of the project are originally copyright © Meta Platforms, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
-use super::proposer_election::ProposerElection;
+use super::{
+    proposal_status_tracker::TOptQSPullParamsProvider, proposer_election::ProposerElection,
+};
 use crate::{
     block_storage::BlockReader,
     counters::{
@@ -12,7 +14,7 @@ use crate::{
         PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE, PROPOSER_PENDING_BLOCKS_COUNT,
         PROPOSER_PENDING_BLOCKS_FILL_FRACTION,
     },
-    payload_client::{PayloadClient, PayloadPullParameters},
+    payload_client::PayloadClient,
     util::time_service::TimeService,
 };
 use anyhow::{bail, ensure, format_err, Context};
@@ -23,6 +25,7 @@ use aptos_consensus_types::{
     block::Block,
     block_data::BlockData,
     common::{Author, Payload, PayloadFilter, Round},
+    payload_pull_params::PayloadPullParameters,
     pipelined_block::ExecutionSummary,
     quorum_cert::QuorumCert,
     utils::PayloadTxnsSize,
@@ -267,6 +270,7 @@ pub struct ProposalGenerator {
     vtxn_config: ValidatorTxnConfig,
     allow_batches_without_pos_in_proposal: bool,
+    opt_qs_payload_param_provider: Arc<dyn TOptQSPullParamsProvider>,
 }
 
 impl ProposalGenerator {
@@ -287,6 +291,7 @@ impl ProposalGenerator {
         quorum_store_enabled: bool,
         vtxn_config: ValidatorTxnConfig,
         allow_batches_without_pos_in_proposal: bool,
+        opt_qs_payload_param_provider: Arc<dyn TOptQSPullParamsProvider>,
     ) -> Self {
         Self {
             author,
@@ -305,6 +310,7 @@ impl ProposalGenerator {
             quorum_store_enabled,
             vtxn_config,
             allow_batches_without_pos_in_proposal,
+            opt_qs_payload_param_provider,
         }
     }
 
@@ -353,6 +359,7 @@ impl ProposalGenerator {
                 bail!("Already proposed in the round {}", round);
             }
         }
+        let maybe_optqs_payload_pull_params = self.opt_qs_payload_param_provider.get_params();
 
         let hqc = self.ensure_highest_quorum_cert(round)?;
 
@@ -456,7 +463,7 @@ impl ProposalGenerator {
                 soft_max_txns_after_filtering: max_txns_from_block_to_execute
                     .unwrap_or(max_block_txns_after_filtering),
                 max_inline_txns: self.max_inline_txns,
-                opt_batch_txns_pct: 0,
+                maybe_optqs_payload_pull_params,
                 user_txn_filter: payload_filter,
                 pending_ordering,
                 pending_uncommitted_blocks: pending_blocks.len(),
diff --git a/consensus/src/liveness/proposal_generator_test.rs b/consensus/src/liveness/proposal_generator_test.rs
index aae56dc864644..5aa907d7fe672 100644
--- a/consensus/src/liveness/proposal_generator_test.rs
+++ b/consensus/src/liveness/proposal_generator_test.rs
@@ -8,6 +8,7 @@ use crate::{
         proposal_generator::{
             ChainHealthBackoffConfig, PipelineBackpressureConfig, ProposalGenerator,
         },
+        proposal_status_tracker::TOptQSPullParamsProvider,
         rotating_proposer_election::RotatingProposer,
         unequivocal_proposer_election::UnequivocalProposerElection,
     },
@@ -17,6 +18,7 @@ use crate::{
 use aptos_consensus_types::{
     block::{block_test_utils::certificate_for_genesis, Block},
     common::Author,
+    payload_pull_params::OptQSPayloadPullParams,
     utils::PayloadTxnsSize,
 };
 use aptos_types::{on_chain_config::ValidatorTxnConfig, validator_signer::ValidatorSigner};
@@ -27,6 +29,14 @@ fn empty_callback() -> BoxFuture<'static, ()> {
     async move {}.boxed()
 }
 
+struct MockOptQSPayloadProvider {}
+
+impl TOptQSPullParamsProvider for MockOptQSPayloadProvider {
+    fn get_params(&self) -> Option<OptQSPayloadPullParams> {
+        None
+    }
+}
+
 #[tokio::test]
 async fn test_proposal_generation_empty_tree() {
     let signer = ValidatorSigner::random(None);
@@ -47,6 +57,7 @@ async fn test_proposal_generation_empty_tree() {
         false,
         ValidatorTxnConfig::default_disabled(),
         true,
+        Arc::new(MockOptQSPayloadProvider {}),
     );
     let proposer_election = Arc::new(UnequivocalProposerElection::new(Arc::new(
         RotatingProposer::new(vec![signer.author()], 1),
@@ -92,6 +103,7 @@ async fn test_proposal_generation_parent() {
         false,
         ValidatorTxnConfig::default_disabled(),
         true,
+        Arc::new(MockOptQSPayloadProvider {}),
     );
     let proposer_election = Arc::new(UnequivocalProposerElection::new(Arc::new(
         RotatingProposer::new(vec![inserter.signer().author()], 1),
@@ -167,6 +179,7 @@ async fn test_old_proposal_generation() {
         false,
         ValidatorTxnConfig::default_disabled(),
         true,
+        Arc::new(MockOptQSPayloadProvider {}),
     );
     let proposer_election = Arc::new(UnequivocalProposerElection::new(Arc::new(
         RotatingProposer::new(vec![inserter.signer().author()], 1),
@@ -207,6 +220,7 @@ async fn test_correct_failed_authors() {
         false,
         ValidatorTxnConfig::default_disabled(),
         true,
+        Arc::new(MockOptQSPayloadProvider {}),
     );
     let proposer_election = Arc::new(UnequivocalProposerElection::new(Arc::new(
         RotatingProposer::new(vec![author, peer1, peer2], 1),
diff --git a/consensus/src/liveness/proposal_status_tracker.rs b/consensus/src/liveness/proposal_status_tracker.rs
new file mode 100644
index 0000000000000..23f635e260fb8
--- /dev/null
+++ b/consensus/src/liveness/proposal_status_tracker.rs
@@ -0,0 +1,210 @@
+// Copyright (c) Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use super::round_state::NewRoundReason;
+use aptos_collections::BoundedVecDeque;
+use aptos_consensus_types::{
+    common::Author, payload_pull_params::OptQSPayloadPullParams, round_timeout::RoundTimeoutReason,
+};
+use aptos_infallible::Mutex;
+use std::{collections::HashSet, sync::Arc};
+
+pub trait TPastProposalStatusTracker: Send + Sync {
+    fn push(&self, status: NewRoundReason);
+}
+
+pub trait TOptQSPullParamsProvider: Send + Sync {
+    fn get_params(&self) -> Option<OptQSPayloadPullParams>;
+}
+
+/// An exponential window-based algorithm that decides whether to propose optimistically,
+/// based on a configurable number of past proposal statuses
+///
+/// Initialize the window at 2.
+/// - For each proposal failure, double the window up to a MAX size
+/// - If there are no failures within the window, then propose optimistic batch
+/// - If there are no failures up to MAX proposals, reset the window to 2.
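+///
+/// Example (illustrative): with MAX = 100, a round that times out because its
+/// payload was unavailable doubles the window 2 -> 4 -> 8 -> ... capped at 100;
+/// optimistic proposals resume only after `window` consecutive clean rounds,
+/// and once the whole tracked history is clean the window shrinks back to 2.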
+pub struct ExponentialWindowFailureTracker {
+    window: usize,
+    max_window: usize,
+    past_round_statuses: BoundedVecDeque<NewRoundReason>,
+    last_consecutive_success_count: usize,
+    ordered_authors: Vec<Author>,
+}
+
+impl ExponentialWindowFailureTracker {
+    pub(crate) fn new(max_window: usize, ordered_authors: Vec<Author>) -> Self {
+        Self {
+            window: 2,
+            max_window,
+            past_round_statuses: BoundedVecDeque::new(max_window),
+            last_consecutive_success_count: 0,
+            ordered_authors,
+        }
+    }
+
+    pub(crate) fn push(&mut self, status: NewRoundReason) {
+        self.past_round_statuses.push_back(status);
+        self.compute_failure_window();
+    }
+
+    fn last_consecutive_statuses_matching<F>(&self, matcher: F) -> usize
+    where
+        F: Fn(&NewRoundReason) -> bool,
+    {
+        self.past_round_statuses
+            .iter()
+            .rev()
+            .take_while(|reason| matcher(reason))
+            .count()
+    }
+
+    fn compute_failure_window(&mut self) {
+        self.last_consecutive_success_count = self.last_consecutive_statuses_matching(|reason| {
+            !matches!(
+                reason,
+                NewRoundReason::Timeout(RoundTimeoutReason::PayloadUnavailable { .. })
+            )
+        });
+        if self.last_consecutive_success_count == 0 {
+            self.window *= 2;
+            self.window = self.window.min(self.max_window);
+        } else if self.last_consecutive_success_count == self.past_round_statuses.len() {
+            self.window = 2;
+        }
+    }
+
+    fn get_exclude_authors(&self) -> HashSet<Author> {
+        let mut exclude_authors = HashSet::new();
+
+        let limit = self.window;
+        for round_reason in self.past_round_statuses.iter().rev().take(limit) {
+            if let NewRoundReason::Timeout(RoundTimeoutReason::PayloadUnavailable {
+                missing_authors,
+            }) = round_reason
+            {
+                for author_idx in missing_authors.iter_ones() {
+                    if let Some(author) = self.ordered_authors.get(author_idx) {
+                        exclude_authors.insert(*author);
+                    }
+                }
+            }
+        }
+
+        exclude_authors
+    }
+}
+
+impl TPastProposalStatusTracker for Mutex<ExponentialWindowFailureTracker> {
+    fn push(&self, status: NewRoundReason) {
+        self.lock().push(status)
+    }
+}
+
+pub struct OptQSPullParamsProvider {
+    enable_opt_qs: bool,
+    failure_tracker: Arc<Mutex<ExponentialWindowFailureTracker>>,
+}
+
+impl OptQSPullParamsProvider {
+    pub fn new(
+        enable_opt_qs: bool,
+        failure_tracker: Arc<Mutex<ExponentialWindowFailureTracker>>,
+    ) -> Self {
+        Self {
+            enable_opt_qs,
+            failure_tracker,
+        }
+    }
+}
+
+impl TOptQSPullParamsProvider for OptQSPullParamsProvider {
+    fn get_params(&self) -> Option<OptQSPayloadPullParams> {
+        if !self.enable_opt_qs {
+            return None;
+        }
+
+        let tracker = self.failure_tracker.lock();
+
+        if tracker.last_consecutive_success_count < tracker.window {
+            return None;
+        }
+
+        let exclude_authors = tracker.get_exclude_authors();
+        Some(OptQSPayloadPullParams {
+            exclude_authors,
+            minimum_batch_age_usecs: 50_000_000,
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::ExponentialWindowFailureTracker;
+    use crate::liveness::round_state::NewRoundReason;
+    use aptos_bitvec::BitVec;
+    use aptos_consensus_types::round_timeout::RoundTimeoutReason;
+    use aptos_types::validator_verifier::random_validator_verifier;
+
+    #[test]
+    fn test_exponential_window_failure_tracker() {
+        let (_signers, verifier) = random_validator_verifier(4, None, false);
+        let mut tracker =
+            ExponentialWindowFailureTracker::new(100, verifier.get_ordered_account_addresses());
+        assert_eq!(tracker.max_window, 100);
+
+        tracker.push(NewRoundReason::QCReady);
+        assert_eq!(tracker.window, 2);
+        assert_eq!(tracker.last_consecutive_success_count, 1);
+
+        tracker.push(NewRoundReason::QCReady);
+        assert_eq!(tracker.window, 2);
+        assert_eq!(tracker.last_consecutive_success_count, 2);
+
+        tracker.push(NewRoundReason::QCReady);
+        assert_eq!(tracker.window, 2);
assert_eq!(tracker.last_consecutive_success_count, 3); + + tracker.push(NewRoundReason::Timeout( + RoundTimeoutReason::ProposalNotReceived, + )); + assert_eq!(tracker.window, 2); + assert_eq!(tracker.last_consecutive_success_count, 4); + + tracker.push(NewRoundReason::Timeout(RoundTimeoutReason::NoQC)); + assert_eq!(tracker.window, 2); + assert_eq!(tracker.last_consecutive_success_count, 5); + + tracker.push(NewRoundReason::Timeout(RoundTimeoutReason::Unknown)); + assert_eq!(tracker.window, 2); + assert_eq!(tracker.last_consecutive_success_count, 6); + + tracker.push(NewRoundReason::Timeout( + RoundTimeoutReason::PayloadUnavailable { + missing_authors: BitVec::with_num_bits(4), + }, + )); + assert_eq!(tracker.window, 4); + assert_eq!(tracker.last_consecutive_success_count, 0); + + tracker.push(NewRoundReason::QCReady); + assert_eq!(tracker.window, 4); + assert_eq!(tracker.last_consecutive_success_count, 1); + + // Check that the window does not grow beyond max_window + for _ in 0..10 { + tracker.push(NewRoundReason::Timeout( + RoundTimeoutReason::PayloadUnavailable { + missing_authors: BitVec::with_num_bits(4), + }, + )); + } + assert_eq!(tracker.window, tracker.max_window); + + for _ in 0..tracker.max_window { + tracker.push(NewRoundReason::QCReady); + } + assert_eq!(tracker.window, 2); + assert_eq!(tracker.last_consecutive_success_count, tracker.max_window); + } +} diff --git a/consensus/src/liveness/round_state.rs b/consensus/src/liveness/round_state.rs index 37912ecbdaaa7..2c7ea4c198da6 100644 --- a/consensus/src/liveness/round_state.rs +++ b/consensus/src/liveness/round_state.rs @@ -8,8 +8,11 @@ use crate::{ util::time_service::{SendTask, TimeService}, }; use aptos_consensus_types::{ - common::Round, round_timeout::RoundTimeout, sync_info::SyncInfo, - timeout_2chain::TwoChainTimeoutWithPartialSignatures, vote::Vote, + common::Round, + round_timeout::{RoundTimeout, RoundTimeoutReason}, + sync_info::SyncInfo, + timeout_2chain::TwoChainTimeoutWithPartialSignatures, + vote::Vote, }; use aptos_crypto::HashValue; use aptos_logger::{prelude::*, Schema}; @@ -19,17 +22,17 @@ use serde::Serialize; use std::{fmt, sync::Arc, time::Duration}; /// A reason for starting a new round: introduced for monitoring / debug purposes. -#[derive(Serialize, Debug, PartialEq, Eq)] +#[derive(Serialize, Debug, PartialEq, Eq, Clone)] pub enum NewRoundReason { QCReady, - Timeout, + Timeout(RoundTimeoutReason), } impl fmt::Display for NewRoundReason { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { NewRoundReason::QCReady => write!(f, "QCReady"), - NewRoundReason::Timeout => write!(f, "TCReady"), + NewRoundReason::Timeout(_) => write!(f, "TCReady"), } } } @@ -240,7 +243,11 @@ impl RoundState { /// Notify the RoundState about the potentially new QC, TC, and highest ordered round. /// Note that some of these values might not be available by the caller. 
-    pub fn process_certificates(&mut self, sync_info: SyncInfo) -> Option<NewRoundEvent> {
+    pub fn process_certificates(
+        &mut self,
+        sync_info: SyncInfo,
+        verifier: &ValidatorVerifier,
+    ) -> Option<NewRoundEvent> {
         if sync_info.highest_ordered_round() > self.highest_ordered_round {
             self.highest_ordered_round = sync_info.highest_ordered_round();
         }
@@ -254,13 +261,21 @@ impl RoundState {
             self.vote_sent = None;
             self.timeout_sent = None;
             let timeout = self.setup_timeout(1);
+
+            let (prev_round_timeout_votes, prev_round_timeout_reason) = prev_round_timeout_votes
+                .map(|votes| votes.unpack_aggregate(verifier))
+                .unzip();
+
             // The new round reason is QCReady in case both QC.round + 1 == new_round, otherwise
             // it's Timeout and TC.round + 1 == new_round.
             let new_round_reason = if sync_info.highest_certified_round() + 1 == new_round {
                 NewRoundReason::QCReady
             } else {
-                NewRoundReason::Timeout
+                let prev_round_timeout_reason =
+                    prev_round_timeout_reason.unwrap_or(RoundTimeoutReason::Unknown);
+                NewRoundReason::Timeout(prev_round_timeout_reason)
             };
+
             let new_round_event = NewRoundEvent {
                 round: self.current_round,
                 reason: new_round_reason,
diff --git a/consensus/src/liveness/round_state_test.rs b/consensus/src/liveness/round_state_test.rs
index ad2eec8809e53..10027e86c351e 100644
--- a/consensus/src/liveness/round_state_test.rs
+++ b/consensus/src/liveness/round_state_test.rs
@@ -11,6 +11,7 @@ use crate::{
 use aptos_consensus_types::{
     common::Round,
     quorum_cert::QuorumCert,
+    round_timeout::RoundTimeoutReason,
     sync_info::SyncInfo,
     timeout_2chain::{TwoChainTimeout, TwoChainTimeoutCertificate},
     vote_data::VoteData,
@@ -20,6 +21,7 @@ use aptos_types::{
     aggregate_signature::AggregateSignature,
     block_info::BlockInfo,
     ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
+    validator_verifier::random_validator_verifier,
 };
 use futures::StreamExt;
 use std::{sync::Arc, time::Duration};
@@ -40,10 +42,11 @@ fn test_round_time_interval() {
 #[tokio::test]
 /// Verify that RoundState properly outputs local timeout events upon timeout
 async fn test_basic_timeout() {
+    let (_, verifier) = random_validator_verifier(1, None, false);
     let (mut pm, mut timeout_rx) = make_round_state();
 
     // jump start the round_state
-    pm.process_certificates(generate_sync_info(Some(0), None, None));
+    pm.process_certificates(generate_sync_info(Some(0), None, None), &verifier);
     for _ in 0..2 {
         let round = timeout_rx.next().await.unwrap();
         // Here we just test timeout send retry,
@@ -55,30 +58,31 @@ async fn test_basic_timeout() {
 
 #[test]
 fn test_round_event_generation() {
+    let (_, verifier) = random_validator_verifier(1, None, false);
     let (mut pm, _) = make_round_state();
     // Happy path with new QC
     expect_qc(
         2,
-        pm.process_certificates(generate_sync_info(Some(1), None, None)),
+        pm.process_certificates(generate_sync_info(Some(1), None, None), &verifier),
     );
     // Old QC does not generate anything
     assert!(pm
-        .process_certificates(generate_sync_info(Some(1), None, None))
+        .process_certificates(generate_sync_info(Some(1), None, None), &verifier)
         .is_none());
     // A TC for a higher round
     expect_timeout(
         3,
-        pm.process_certificates(generate_sync_info(None, Some(2), None)),
+        pm.process_certificates(generate_sync_info(None, Some(2), None), &verifier),
     );
     // In case both QC and TC are present choose the one with the higher value
     expect_timeout(
         4,
-        pm.process_certificates(generate_sync_info(Some(2), Some(3), None)),
+        pm.process_certificates(generate_sync_info(Some(2), Some(3), None), &verifier),
     );
     // In case both QC and TC are present with the same value, choose QC
     expect_qc(
         5,
-        pm.process_certificates(generate_sync_info(Some(4), Some(4), None)),
+        pm.process_certificates(generate_sync_info(Some(4), Some(4), None), &verifier),
     );
 }
 
@@ -101,7 +105,10 @@ fn expect_qc(round: Round, event: Option<NewRoundEvent>) {
 fn expect_timeout(round: Round, event: Option<NewRoundEvent>) {
     let event = event.unwrap();
     assert_eq!(round, event.round);
-    assert_eq!(event.reason, NewRoundReason::Timeout);
+    assert_eq!(
+        event.reason,
+        NewRoundReason::Timeout(RoundTimeoutReason::Unknown)
+    );
 }
 
 fn generate_sync_info(
diff --git a/consensus/src/payload_client/mixed.rs b/consensus/src/payload_client/mixed.rs
index 5e35b5aff6bae..afc981ab4a3b8 100644
--- a/consensus/src/payload_client/mixed.rs
+++ b/consensus/src/payload_client/mixed.rs
@@ -1,12 +1,13 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0
 
-use super::PayloadPullParameters;
 use crate::{
     error::QuorumStoreError,
     payload_client::{user::UserPayloadClient, PayloadClient},
 };
-use aptos_consensus_types::{common::Payload, utils::PayloadTxnsSize};
+use aptos_consensus_types::{
+    common::Payload, payload_pull_params::PayloadPullParameters, utils::PayloadTxnsSize,
+};
 use aptos_logger::debug;
 use aptos_types::{on_chain_config::ValidatorTxnConfig, validator_txn::ValidatorTransaction};
 use aptos_validator_transaction_pool::TransactionFilter;
@@ -112,9 +113,11 @@ impl PayloadClient for MixedPayloadClient {
 mod tests {
     use crate::payload_client::{
         mixed::MixedPayloadClient, user, validator::DummyValidatorTxnClient, PayloadClient,
-        PayloadPullParameters,
     };
-    use aptos_consensus_types::common::{Payload, PayloadFilter};
+    use aptos_consensus_types::{
+        common::{Payload, PayloadFilter},
+        payload_pull_params::PayloadPullParameters,
+    };
     use aptos_types::{on_chain_config::ValidatorTxnConfig, validator_txn::ValidatorTransaction};
     use aptos_validator_transaction_pool as vtxn_pool;
     use std::{collections::HashSet, sync::Arc, time::Duration};
diff --git a/consensus/src/payload_client/mod.rs b/consensus/src/payload_client/mod.rs
index 1b769faa9c36a..e38ba3194329f 100644
--- a/consensus/src/payload_client/mod.rs
+++ b/consensus/src/payload_client/mod.rs
@@ -2,88 +2,15 @@
 // SPDX-License-Identifier: Apache-2.0
 
 use crate::error::QuorumStoreError;
-use aptos_consensus_types::{
-    common::{Payload, PayloadFilter},
-    utils::PayloadTxnsSize,
-};
+use aptos_consensus_types::{common::Payload, payload_pull_params::PayloadPullParameters};
 use aptos_types::validator_txn::ValidatorTransaction;
 use aptos_validator_transaction_pool::TransactionFilter;
-use core::fmt;
 use futures::future::BoxFuture;
-use std::time::Duration;
 
 pub mod mixed;
 pub mod user;
 pub mod validator;
 
-pub struct PayloadPullParameters {
-    pub max_poll_time: Duration,
-    pub max_txns: PayloadTxnsSize,
-    pub max_txns_after_filtering: u64,
-    pub soft_max_txns_after_filtering: u64,
-    pub max_inline_txns: PayloadTxnsSize,
-    pub opt_batch_txns_pct: u8,
-    pub user_txn_filter: PayloadFilter,
-    pub pending_ordering: bool,
-    pub pending_uncommitted_blocks: usize,
-    pub recent_max_fill_fraction: f32,
-    pub block_timestamp: Duration,
-}
-
-impl PayloadPullParameters {
-    #[cfg(test)]
-    fn new_for_test(
-        max_poll_time: Duration,
-        max_txns: u64,
-        max_txns_bytes: u64,
-        max_txns_after_filtering: u64,
-        soft_max_txns_after_filtering: u64,
-        max_inline_txns: u64,
-        max_inline_txns_bytes: u64,
-        user_txn_filter: PayloadFilter,
-        pending_ordering: bool,
-        pending_uncommitted_blocks: usize,
-        recent_max_fill_fraction: f32,
-        block_timestamp: Duration,
-    ) -> Self {
-        Self {
-            max_poll_time,
-            max_txns: PayloadTxnsSize::new(max_txns, max_txns_bytes),
-            max_txns_after_filtering,
-            soft_max_txns_after_filtering,
-            max_inline_txns: PayloadTxnsSize::new(max_inline_txns, max_inline_txns_bytes),
-            opt_batch_txns_pct: 0,
-            user_txn_filter,
-            pending_ordering,
-            pending_uncommitted_blocks,
-            recent_max_fill_fraction,
-            block_timestamp,
-        }
-    }
-}
-
-impl fmt::Debug for PayloadPullParameters {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("PayloadPullParameters")
-            .field("max_poll_time", &self.max_poll_time)
-            .field("max_items", &self.max_txns)
-            .field("max_unique_items", &self.max_txns_after_filtering)
-            .field(
-                "soft_max_txns_after_filtering",
-                &self.soft_max_txns_after_filtering,
-            )
-            .field("max_inline_items", &self.max_inline_txns)
-            .field("pending_ordering", &self.pending_ordering)
-            .field(
-                "pending_uncommitted_blocks",
-                &self.pending_uncommitted_blocks,
-            )
-            .field("recent_max_fill_fraction", &self.recent_max_fill_fraction)
-            .field("block_timestamp", &self.block_timestamp)
-            .finish()
-    }
-}
-
 #[async_trait::async_trait]
 pub trait PayloadClient: Send + Sync {
     async fn pull_payload(
diff --git a/consensus/src/payload_client/user/mod.rs b/consensus/src/payload_client/user/mod.rs
index 9d6cafbed2322..e3f2ca8acba43 100644
--- a/consensus/src/payload_client/user/mod.rs
+++ b/consensus/src/payload_client/user/mod.rs
@@ -1,9 +1,8 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0
 
-use super::PayloadPullParameters;
 use crate::error::QuorumStoreError;
-use aptos_consensus_types::common::Payload;
+use aptos_consensus_types::{common::Payload, payload_pull_params::PayloadPullParameters};
 #[cfg(test)]
 use aptos_types::transaction::SignedTransaction;
 use futures::future::BoxFuture;
diff --git a/consensus/src/payload_client/user/quorum_store_client.rs b/consensus/src/payload_client/user/quorum_store_client.rs
index b145ba1f76f61..c8c541208c863 100644
--- a/consensus/src/payload_client/user/quorum_store_client.rs
+++ b/consensus/src/payload_client/user/quorum_store_client.rs
@@ -2,13 +2,12 @@
 // SPDX-License-Identifier: Apache-2.0
 
 use crate::{
-    counters::WAIT_FOR_FULL_BLOCKS_TRIGGERED,
-    error::QuorumStoreError,
-    monitor,
-    payload_client::{user::UserPayloadClient, PayloadPullParameters},
+    counters::WAIT_FOR_FULL_BLOCKS_TRIGGERED, error::QuorumStoreError, monitor,
+    payload_client::user::UserPayloadClient,
 };
 use aptos_consensus_types::{
     common::{Payload, PayloadFilter},
+    payload_pull_params::{OptQSPayloadPullParams, PayloadPullParameters},
     request_response::{GetPayloadCommand, GetPayloadRequest, GetPayloadResponse},
     utils::PayloadTxnsSize,
 };
@@ -52,7 +51,7 @@ impl QuorumStoreClient {
         max_txns_after_filtering: u64,
         soft_max_txns_after_filtering: u64,
         max_inline_txns: PayloadTxnsSize,
-        txns_with_proofs_pct: u8,
+        maybe_optqs_payload_pull_params: Option<OptQSPayloadPullParams>,
         return_non_full: bool,
         exclude_payloads: PayloadFilter,
         block_timestamp: Duration,
@@ -62,7 +61,7 @@ impl QuorumStoreClient {
             max_txns,
             max_txns_after_filtering,
             soft_max_txns_after_filtering,
-            opt_batch_txns_pct: txns_with_proofs_pct,
+            maybe_optqs_payload_pull_params,
             max_inline_txns,
             filter: exclude_payloads,
             return_non_full,
@@ -119,7 +118,7 @@ impl UserPayloadClient for QuorumStoreClient {
             params.max_txns_after_filtering,
             params.soft_max_txns_after_filtering,
             params.max_inline_txns,
-            params.opt_batch_txns_pct,
+            params.maybe_optqs_payload_pull_params.clone(),
             return_non_full || return_empty || done,
             params.user_txn_filter.clone(),
             params.block_timestamp,
diff --git a/consensus/src/payload_manager.rs b/consensus/src/payload_manager.rs
index ca0fe612c0f8a..16f3e305ea585 100644
--- a/consensus/src/payload_manager.rs
+++ b/consensus/src/payload_manager.rs
@@ -10,6 +10,7 @@ use crate::{
     counters,
     quorum_store::{batch_store::BatchReader, quorum_store_coordinator::CoordinatorCommand},
 };
+use aptos_bitvec::BitVec;
 use aptos_consensus_types::{
     block::Block,
     common::{DataStatus, Payload, ProofWithData, Round},
@@ -28,7 +29,7 @@ use async_trait::async_trait;
 use futures::{channel::mpsc::Sender, FutureExt};
 use itertools::Itertools;
 use std::{
-    collections::{btree_map::Entry, BTreeMap},
+    collections::{btree_map::Entry, BTreeMap, HashMap},
     ops::Deref,
     sync::Arc,
 };
@@ -49,7 +50,7 @@ pub trait TPayloadManager: Send + Sync {
     /// Check if the transactions corresponding are available. This is specific to payload
     /// manager implementations. For optimistic quorum store, we only check if optimistic
     /// batches are available locally.
-    fn check_payload_availability(&self, block: &Block) -> bool;
+    fn check_payload_availability(&self, block: &Block) -> Result<(), BitVec>;
 
     /// Get the transactions in a block's payload. This function returns a vector of transactions.
     async fn get_transactions(
@@ -73,8 +74,8 @@ impl TPayloadManager for DirectMempoolPayloadManager {
 
     fn prefetch_payload_data(&self, _payload: &Payload, _timestamp: u64) {}
 
-    fn check_payload_availability(&self, _block: &Block) -> bool {
-        true
+    fn check_payload_availability(&self, _block: &Block) -> Result<(), BitVec> {
+        Ok(())
     }
 
     async fn get_transactions(
@@ -104,6 +105,7 @@ pub struct QuorumStorePayloadManager {
     coordinator_tx: Sender<CoordinatorCommand>,
     maybe_consensus_publisher: Option<Arc<ConsensusPublisher>>,
     ordered_authors: Vec<Author>,
+    address_to_validator_index: HashMap<AccountAddress, usize>,
 }
 
 impl QuorumStorePayloadManager {
@@ -112,12 +114,14 @@ impl QuorumStorePayloadManager {
         coordinator_tx: Sender<CoordinatorCommand>,
         maybe_consensus_publisher: Option<Arc<ConsensusPublisher>>,
         ordered_authors: Vec<Author>,
+        address_to_validator_index: HashMap<AccountAddress, usize>,
     ) -> Self {
         Self {
             batch_reader,
             coordinator_tx,
             maybe_consensus_publisher,
             ordered_authors,
+            address_to_validator_index,
         }
     }
 
@@ -295,17 +299,17 @@ impl TPayloadManager for QuorumStorePayloadManager {
         };
     }
 
-    fn check_payload_availability(&self, block: &Block) -> bool {
+    fn check_payload_availability(&self, block: &Block) -> Result<(), BitVec> {
         let Some(payload) = block.payload() else {
-            return true;
+            return Ok(());
         };
 
         match payload {
             Payload::DirectMempool(_) => {
                 unreachable!("QuorumStore doesn't support DirectMempool payload")
             },
-            Payload::InQuorumStore(_) => true,
-            Payload::InQuorumStoreWithLimit(_) => true,
+            Payload::InQuorumStore(_) => Ok(()),
+            Payload::InQuorumStoreWithLimit(_) => Ok(()),
             Payload::QuorumStoreInlineHybrid(inline_batches, proofs, _) => {
                 fn update_availability_metrics<'a>(
                     batch_reader: &Arc<dyn BatchReader>,
@@ -352,15 +356,24 @@ impl TPayloadManager for QuorumStorePayloadManager {
 
                 // The payload is considered available because it contains only proofs that guarantee network availabiliy
                 // or inlined transactions.
-                true
+                Ok(())
             },
             Payload::OptQuorumStore(opt_qs_payload) => {
+                let mut missing_authors = BitVec::with_num_bits(self.ordered_authors.len() as u16);
                 for batch in opt_qs_payload.opt_batches().deref() {
                     if self.batch_reader.exists(batch.digest()).is_none() {
-                        return false;
+                        let index = *self
+                            .address_to_validator_index
+                            .get(&batch.author())
+                            .expect("Payload author should have been verified");
+                        missing_authors.set(index as u16);
                     }
                 }
-                true
+                if missing_authors.all_zeros() {
+                    Ok(())
+                } else {
+                    Err(missing_authors)
+                }
             },
         }
     }
@@ -450,7 +463,7 @@ impl TPayloadManager for QuorumStorePayloadManager {
             )
             .await?;
             let inline_batch_txns = opt_qs_payload.inline_batches().transactions();
-            let all_txns = [opt_batch_txns, proof_batch_txns, inline_batch_txns].concat();
+            let all_txns = [proof_batch_txns, opt_batch_txns, inline_batch_txns].concat();
             BlockTransactionPayload::new_opt_quorum_store(
                 all_txns,
                 opt_qs_payload.proof_with_data().deref().clone(),
@@ -733,7 +746,7 @@ impl TPayloadManager for ConsensusObserverPayloadManager {
         // noop
     }
 
-    fn check_payload_availability(&self, _block: &Block) -> bool {
+    fn check_payload_availability(&self, _block: &Block) -> Result<(), BitVec> {
         unreachable!("this method isn't used in ConsensusObserver")
     }
 
diff --git a/consensus/src/pending_votes.rs b/consensus/src/pending_votes.rs
index 221ed6bca1970..e6651681766b3 100644
--- a/consensus/src/pending_votes.rs
+++ b/consensus/src/pending_votes.rs
@@ -9,14 +9,17 @@
 //! Votes are automatically dropped when the structure goes out of scope.
 
 use crate::counters;
+use aptos_bitvec::BitVec;
 use aptos_consensus_types::{
     common::Author,
     quorum_cert::QuorumCert,
-    round_timeout::RoundTimeout,
-    timeout_2chain::{TwoChainTimeoutCertificate, TwoChainTimeoutWithPartialSignatures},
+    round_timeout::{RoundTimeout, RoundTimeoutReason},
+    timeout_2chain::{
+        TwoChainTimeout, TwoChainTimeoutCertificate, TwoChainTimeoutWithPartialSignatures,
+    },
     vote::Vote,
 };
-use aptos_crypto::{hash::CryptoHash, HashValue};
+use aptos_crypto::{bls12381, hash::CryptoHash, HashValue};
 use aptos_logger::prelude::*;
 use aptos_types::{
     ledger_info::{LedgerInfoWithSignatures, LedgerInfoWithUnverifiedSignatures},
@@ -59,6 +62,106 @@ pub enum VoteStatus {
     NotEnoughVotes(LedgerInfoWithUnverifiedSignatures),
 }
 
+#[derive(Debug)]
+pub(super) struct TwoChainTimeoutVotes {
+    timeout_reason: HashMap<Author, RoundTimeoutReason>,
+    partial_2chain_tc: TwoChainTimeoutWithPartialSignatures,
+}
+
+impl TwoChainTimeoutVotes {
+    pub(super) fn new(timeout: TwoChainTimeout) -> Self {
+        Self {
+            partial_2chain_tc: TwoChainTimeoutWithPartialSignatures::new(timeout.clone()),
+            timeout_reason: HashMap::new(),
+        }
+    }
+
+    pub(super) fn add(
+        &mut self,
+        author: Author,
+        timeout: TwoChainTimeout,
+        signature: bls12381::Signature,
+        reason: RoundTimeoutReason,
+    ) {
+        self.partial_2chain_tc.add(author, timeout, signature);
+        self.timeout_reason.entry(author).or_insert(reason);
+    }
+
+    pub(super) fn partial_2chain_tc_mut(&mut self) -> &mut TwoChainTimeoutWithPartialSignatures {
+        &mut self.partial_2chain_tc
+    }
+
+    fn aggregated_timeout_reason(&self, verifier: &ValidatorVerifier) -> RoundTimeoutReason {
+        let mut reason_voting_power: HashMap<RoundTimeoutReason, u128> = HashMap::new();
+        let mut missing_batch_authors: HashMap<usize, u128> = HashMap::new();
+        for (author, reason) in &self.timeout_reason {
+            // To aggregate the reason, we only care about the variant type itself and
+            // exclude any data within the variants.
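+            // Reporters rarely agree on the exact missing set, so the bitvec cannot
+            // serve as a map key directly: voting power is tallied per missing author
+            // index, and the variant is keyed with a placeholder bitvec so that all
+            // PayloadUnavailable votes aggregate together.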
+            let reason_key = match reason {
+                reason @ RoundTimeoutReason::Unknown
+                | reason @ RoundTimeoutReason::ProposalNotReceived
+                | reason @ RoundTimeoutReason::NoQC => reason.clone(),
+                RoundTimeoutReason::PayloadUnavailable { missing_authors } => {
+                    for missing_idx in missing_authors.iter_ones() {
+                        *missing_batch_authors.entry(missing_idx).or_default() +=
+                            verifier.get_voting_power(author).unwrap_or_default() as u128;
+                    }
+                    RoundTimeoutReason::PayloadUnavailable {
+                        // Since we care only about the variant type, we replace the bitvec
+                        // with a placeholder.
+                        missing_authors: BitVec::with_num_bits(verifier.len() as u16),
+                    }
+                },
+            };
+            *reason_voting_power.entry(reason_key).or_default() +=
+                verifier.get_voting_power(author).unwrap_or_default() as u128;
+        }
+        // The aggregated timeout reason is the reason with the most voting power received from
+        // at least f+1 peers by voting power. If such voting power does not exist, then the
+        // reason is unknown.
+
+        reason_voting_power
+            .into_iter()
+            .max_by_key(|(_, voting_power)| *voting_power)
+            .filter(|(_, voting_power)| {
+                verifier
+                    .check_aggregated_voting_power(*voting_power, false)
+                    .is_ok()
+            })
+            .map(|(reason, _)| {
+                // If the aggregated reason is due to unavailable payload, we will compute the
+                // aggregated missing authors bitvec counting batch authors that have been reported
+                // missing by minority peers.
+                if matches!(reason, RoundTimeoutReason::PayloadUnavailable { .. }) {
+                    let mut aggregated_bitvec = BitVec::with_num_bits(verifier.len() as u16);
+                    for (author_idx, voting_power) in missing_batch_authors {
+                        if verifier
+                            .check_aggregated_voting_power(voting_power, false)
+                            .is_ok()
+                        {
+                            aggregated_bitvec.set(author_idx as u16);
+                        }
+                    }
+                    RoundTimeoutReason::PayloadUnavailable {
+                        missing_authors: aggregated_bitvec,
+                    }
+                } else {
+                    reason
+                }
+            })
+            .unwrap_or(RoundTimeoutReason::Unknown)
+    }
+
+    pub(crate) fn unpack_aggregate(
+        self,
+        verifier: &ValidatorVerifier,
+    ) -> (TwoChainTimeoutWithPartialSignatures, RoundTimeoutReason) {
+        let aggregated_reason = self.aggregated_timeout_reason(verifier);
+        (self.partial_2chain_tc, aggregated_reason)
+    }
+}
+
 /// A PendingVotes structure keep track of votes
 pub struct PendingVotes {
     /// Maps LedgerInfo digest to associated signatures.
@@ -66,7 +169,7 @@ pub struct PendingVotes {
     /// or due to different NIL proposals (clients can have a different view of what block to extend).
     li_digest_to_votes: HashMap<HashValue, (usize, VoteStatus)>,
     /// Tracks all the signatures of the 2-chain timeout for the given round.
-    maybe_partial_2chain_tc: Option<TwoChainTimeoutWithPartialSignatures>,
+    maybe_2chain_timeout_votes: Option<TwoChainTimeoutVotes>,
     /// Map of Author to (vote, li_digest). This is useful to discard multiple votes.
     author_to_vote: HashMap<Author, (Vote, HashValue)>,
     /// Whether we have echoed timeout for this round.
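A worked example of the aggregation rule (mirroring `test_two_chain_timeout_aggregate_missing_authors` below, and assuming the usual Aptos thresholds where quorum power is floor(2n/3) + 1): with voting powers [3, 3, 2, 1], total power is 9, quorum is 7, and the minority threshold total - quorum + 1 is 3. Two power-3 validators reporting PayloadUnavailable give that reason 6 >= 3, so it wins. A lone power-2 reporter stays below the threshold, so the aggregate falls back to Unknown. And when the power-2 and power-1 validators blame different batch authors, PayloadUnavailable itself clears the bar (2 + 1 = 3) but neither author's bit does, so the aggregated bitvec comes back empty.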
@@ -78,7 +181,7 @@ impl PendingVotes {
     pub fn new() -> Self {
         PendingVotes {
             li_digest_to_votes: HashMap::new(),
-            maybe_partial_2chain_tc: None,
+            maybe_2chain_timeout_votes: None,
             author_to_vote: HashMap::new(),
             echo_timeout: false,
         }
@@ -119,10 +222,17 @@ impl PendingVotes {
             .with_label_values(&[&round_timeout.author().to_string()])
             .set(cur_round as i64);
 
-        let partial_tc = self
-            .maybe_partial_2chain_tc
-            .get_or_insert_with(|| TwoChainTimeoutWithPartialSignatures::new(timeout.clone()));
-        partial_tc.add(round_timeout.author(), timeout.clone(), signature.clone());
+        let two_chain_votes = self
+            .maybe_2chain_timeout_votes
+            .get_or_insert_with(|| TwoChainTimeoutVotes::new(timeout.clone()));
+        two_chain_votes.add(
+            round_timeout.author(),
+            timeout.clone(),
+            signature.clone(),
+            round_timeout.reason().clone(),
+        );
+
+        let partial_tc = two_chain_votes.partial_2chain_tc_mut();
         let tc_voting_power =
             match validator_verifier.check_voting_power(partial_tc.signers(), true) {
                 Ok(_) => {
@@ -319,10 +429,17 @@ impl PendingVotes {
             .with_label_values(&[&vote.author().to_string()])
             .set(cur_round);
 
-        let partial_tc = self
-            .maybe_partial_2chain_tc
-            .get_or_insert_with(|| TwoChainTimeoutWithPartialSignatures::new(timeout.clone()));
-        partial_tc.add(vote.author(), timeout.clone(), signature.clone());
+        let two_chain_votes = self
+            .maybe_2chain_timeout_votes
+            .get_or_insert_with(|| TwoChainTimeoutVotes::new(timeout.clone()));
+        two_chain_votes.add(
+            vote.author(),
+            timeout.clone(),
+            signature.clone(),
+            RoundTimeoutReason::Unknown,
+        );
+
+        let partial_tc = two_chain_votes.partial_2chain_tc_mut();
         let tc_voting_power =
             match validator_verifier.check_voting_power(partial_tc.signers(), true) {
                 Ok(_) => {
@@ -362,12 +479,7 @@ impl PendingVotes {
         VoteReceptionResult::VoteAdded(voting_power)
     }
 
-    pub fn drain_votes(
-        &mut self,
-    ) -> (
-        Vec<(HashValue, VoteStatus)>,
-        Option<TwoChainTimeoutWithPartialSignatures>,
-    ) {
+    pub fn drain_votes(&mut self) -> (Vec<(HashValue, VoteStatus)>, Option<TwoChainTimeoutVotes>) {
         for (hash_index, _) in self.li_digest_to_votes.values() {
             let hash_index_str = hash_index_to_str(*hash_index);
             for author in self.author_to_vote.keys() {
@@ -376,8 +488,8 @@ impl PendingVotes {
                     .set(0_f64);
             }
         }
-        if let Some(partial_tc) = &self.maybe_partial_2chain_tc {
-            for author in partial_tc.signers() {
+        if let Some(votes) = &self.maybe_2chain_timeout_votes {
+            for author in votes.partial_2chain_tc.signers() {
                 counters::CONSENSUS_CURRENT_ROUND_TIMEOUT_VOTED_POWER
                     .with_label_values(&[&author.to_string()])
                     .set(0_f64);
@@ -389,7 +501,7 @@ impl PendingVotes {
                 .drain()
                 .map(|(key, (_, vote_status))| (key, vote_status))
                 .collect(),
-            self.maybe_partial_2chain_tc.take(),
+            self.maybe_2chain_timeout_votes.take(),
         )
     }
 }
@@ -429,9 +541,9 @@ impl fmt::Display for PendingVotes {
 
         // collect timeout votes
         let timeout_votes = self
-            .maybe_partial_2chain_tc
+            .maybe_2chain_timeout_votes
             .as_ref()
-            .map(|partial_tc| partial_tc.signers().collect::<Vec<_>>());
+            .map(|votes| votes.partial_2chain_tc.signers().collect::<Vec<_>>());
 
         if let Some(authors) = timeout_votes {
             write!(f, "{} timeout {:?}", authors.len(), authors)?;
diff --git a/consensus/src/pending_votes_test.rs b/consensus/src/pending_votes_test.rs
new file mode 100644
index 0000000000000..e2fc1de8d3ff6
--- /dev/null
+++ b/consensus/src/pending_votes_test.rs
@@ -0,0 +1,161 @@
+// Copyright (c) Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::pending_votes::TwoChainTimeoutVotes;
+use aptos_bitvec::BitVec;
+use aptos_consensus_types::{
+    quorum_cert::QuorumCert,
round_timeout::RoundTimeoutReason, timeout_2chain::TwoChainTimeout, +}; +use aptos_types::validator_verifier::{ + random_validator_verifier, random_validator_verifier_with_voting_power, +}; +use itertools::Itertools; + +#[test] +fn test_two_chain_timeout_votes_aggregation() { + let epoch = 1; + let round = 10; + let (signers, verifier) = random_validator_verifier(4, None, false); + let all_reasons = [ + RoundTimeoutReason::NoQC, + RoundTimeoutReason::ProposalNotReceived, + RoundTimeoutReason::Unknown, + RoundTimeoutReason::PayloadUnavailable { + missing_authors: BitVec::with_num_bits(signers.len() as u16), + }, + ]; + + // Majority nodes timeout with same reason + for reason in &all_reasons { + let timeout = TwoChainTimeout::new(epoch, round, QuorumCert::dummy()); + let mut two_chain_timeout_votes = TwoChainTimeoutVotes::new(timeout); + for signer in signers.iter().take(3) { + let author = signer.author(); + let timeout = TwoChainTimeout::new(epoch, round, QuorumCert::dummy()); + let signature = signer.sign(&timeout.signing_format()).unwrap(); + two_chain_timeout_votes.add(author, timeout, signature, reason.clone()); + } + let (_, aggregate_timeout_reason) = two_chain_timeout_votes.unpack_aggregate(&verifier); + assert_eq!(aggregate_timeout_reason, reason.clone()); + } + + // Minority nodes timeout with same reason and one with different reason + for permut in all_reasons.iter().permutations(2) { + let timeout = TwoChainTimeout::new(epoch, round, QuorumCert::dummy()); + let mut two_chain_timeout_votes = TwoChainTimeoutVotes::new(timeout); + for signer in signers.iter().take(2) { + let author = signer.author(); + let timeout = TwoChainTimeout::new(epoch, round, QuorumCert::dummy()); + let signature = signer.sign(&timeout.signing_format()).unwrap(); + two_chain_timeout_votes.add(author, timeout, signature, permut[0].clone()); + } + + let author = signers[2].author(); + let timeout = TwoChainTimeout::new(epoch, round, QuorumCert::dummy()); + let signature = signers[2].sign(&timeout.signing_format()).unwrap(); + two_chain_timeout_votes.add(author, timeout, signature, permut[1].clone()); + + let (_, aggregate_timeout_reason) = two_chain_timeout_votes.unpack_aggregate(&verifier); + assert_eq!(aggregate_timeout_reason, permut[0].clone()); + } +} + +#[test] +fn test_two_chain_timeout_aggregate_missing_authors() { + let epoch = 1; + let round = 10; + let (signers, verifier) = + random_validator_verifier_with_voting_power(4, None, false, &[3, 3, 2, 1]); + + let permutations = [true, true, false, false] + .iter() + .copied() + .permutations(4) + .unique(); + + // Minority nodes report the same set of missing authors + for permut in permutations { + let timeout = TwoChainTimeout::new(epoch, round, QuorumCert::dummy()); + let mut two_chain_timeout_votes = TwoChainTimeoutVotes::new(timeout); + for signer in signers.iter().take(2) { + let author = signer.author(); + let timeout = TwoChainTimeout::new(epoch, round, QuorumCert::dummy()); + let signature = signer.sign(&timeout.signing_format()).unwrap(); + let reason = RoundTimeoutReason::PayloadUnavailable { + missing_authors: permut.clone().into(), + }; + two_chain_timeout_votes.add(author, timeout, signature, reason); + } + + let author = signers[2].author(); + let timeout = TwoChainTimeout::new(epoch, round, QuorumCert::dummy()); + let signature = signers[2].sign(&timeout.signing_format()).unwrap(); + two_chain_timeout_votes.add(author, timeout, signature, RoundTimeoutReason::Unknown); + + let (_, aggregate_timeout_reason) = 
+            two_chain_timeout_votes.unpack_aggregate(&verifier);
+
+        assert_eq!(
+            aggregate_timeout_reason,
+            RoundTimeoutReason::PayloadUnavailable {
+                missing_authors: permut.clone().into()
+            }
+        );
+    }
+
+    // Not enough votes to form a valid timeout reason
+    let timeout = TwoChainTimeout::new(epoch, round, QuorumCert::dummy());
+    let mut two_chain_timeout_votes = TwoChainTimeoutVotes::new(timeout);
+
+    let author = signers[2].author();
+    let timeout = TwoChainTimeout::new(epoch, round, QuorumCert::dummy());
+    let signature = signers[2].sign(&timeout.signing_format()).unwrap();
+    two_chain_timeout_votes.add(
+        author,
+        timeout,
+        signature,
+        RoundTimeoutReason::PayloadUnavailable {
+            missing_authors: vec![true, false, false, false].into(),
+        },
+    );
+
+    let (_, aggregate_timeout_reason) = two_chain_timeout_votes.unpack_aggregate(&verifier);
+
+    assert_eq!(aggregate_timeout_reason, RoundTimeoutReason::Unknown);
+
+    // Not enough nodes vote for the same node.
+    let timeout = TwoChainTimeout::new(epoch, round, QuorumCert::dummy());
+    let mut two_chain_timeout_votes = TwoChainTimeoutVotes::new(timeout);
+
+    let author = signers[2].author();
+    let timeout = TwoChainTimeout::new(epoch, round, QuorumCert::dummy());
+    let signature = signers[2].sign(&timeout.signing_format()).unwrap();
+    two_chain_timeout_votes.add(
+        author,
+        timeout,
+        signature,
+        RoundTimeoutReason::PayloadUnavailable {
+            missing_authors: vec![false, true, false, false].into(),
+        },
+    );
+
+    let author = signers[3].author();
+    let timeout = TwoChainTimeout::new(epoch, round, QuorumCert::dummy());
+    let signature = signers[3].sign(&timeout.signing_format()).unwrap();
+    two_chain_timeout_votes.add(
+        author,
+        timeout,
+        signature,
+        RoundTimeoutReason::PayloadUnavailable {
+            missing_authors: vec![false, false, false, true].into(),
+        },
+    );
+
+    let (_, aggregate_timeout_reason) = two_chain_timeout_votes.unpack_aggregate(&verifier);
+
+    assert_eq!(
+        aggregate_timeout_reason,
+        RoundTimeoutReason::PayloadUnavailable {
+            missing_authors: BitVec::with_num_bits(4)
+        }
+    );
+}
diff --git a/consensus/src/quorum_store/batch_proof_queue.rs b/consensus/src/quorum_store/batch_proof_queue.rs
index cfff3bb9c7061..c542d97d7d9e3 100644
--- a/consensus/src/quorum_store/batch_proof_queue.rs
+++ b/consensus/src/quorum_store/batch_proof_queue.rs
@@ -7,7 +7,7 @@ use super::{
 };
 use crate::quorum_store::counters;
 use aptos_consensus_types::{
-    common::TxnSummaryWithExpiration,
+    common::{Author, TxnSummaryWithExpiration},
     payload::TDataInfo,
     proof_of_store::{BatchInfo, ProofOfStore},
     utils::PayloadTxnsSize,
@@ -69,10 +69,16 @@ pub struct BatchProofQueue {
     remaining_proofs: u64,
     remaining_local_txns: u64,
     remaining_local_proofs: u64,
+
+    batch_expiry_gap_when_init_usecs: u64,
 }
 
 impl BatchProofQueue {
-    pub(crate) fn new(my_peer_id: PeerId, batch_store: Arc<BatchStore>) -> Self {
+    pub(crate) fn new(
+        my_peer_id: PeerId,
+        batch_store: Arc<BatchStore>,
+        batch_expiry_gap_when_init_usecs: u64,
+    ) -> Self {
         Self {
             my_peer_id,
             author_to_batches: HashMap::new(),
@@ -85,6 +91,7 @@ impl BatchProofQueue {
             remaining_proofs: 0,
             remaining_local_txns: 0,
             remaining_local_proofs: 0,
+            batch_expiry_gap_when_init_usecs,
         }
     }
 
@@ -389,11 +396,13 @@ impl BatchProofQueue {
         let (result, all_txns, unique_txns, is_full) = self.pull_internal(
             false,
             excluded_batches,
+            &HashSet::new(),
             max_txns,
             max_txns_after_filtering,
             soft_max_txns_after_filtering,
             return_non_full,
             block_timestamp,
+            None,
         );
         let proof_of_stores: Vec<_> = result
             .into_iter()
@@ -429,20 +438,24 @@ impl BatchProofQueue {
     pub fn pull_batches(
pull_batches( &mut self, excluded_batches: &HashSet, + exclude_authors: &HashSet, max_txns: PayloadTxnsSize, max_txns_after_filtering: u64, soft_max_txns_after_filtering: u64, return_non_full: bool, block_timestamp: Duration, + minimum_batch_age_usecs: Option, ) -> (Vec, PayloadTxnsSize, u64) { let (result, all_txns, unique_txns, _) = self.pull_internal( true, excluded_batches, + exclude_authors, max_txns, max_txns_after_filtering, soft_max_txns_after_filtering, return_non_full, block_timestamp, + minimum_batch_age_usecs, ); let batches = result.into_iter().map(|item| item.info.clone()).collect(); (batches, all_txns, unique_txns) @@ -463,11 +476,13 @@ impl BatchProofQueue { ) { let (batches, all_txns, unique_txns) = self.pull_batches( excluded_batches, + &HashSet::new(), max_txns, max_txns_after_filtering, soft_max_txns_after_filtering, return_non_full, block_timestamp, + None, ); let mut result = Vec::new(); for batch in batches.into_iter() { @@ -489,11 +504,13 @@ impl BatchProofQueue { &mut self, batches_without_proofs: bool, excluded_batches: &HashSet, + exclude_authors: &HashSet, max_txns: PayloadTxnsSize, max_txns_after_filtering: u64, soft_max_txns_after_filtering: u64, return_non_full: bool, block_timestamp: Duration, + min_batch_age_usecs: Option, ) -> (Vec<&QueueItem>, PayloadTxnsSize, u64, bool) { let mut result = Vec::new(); let mut cur_unique_txns = 0; @@ -515,10 +532,27 @@ impl BatchProofQueue { } } + let max_batch_creation_ts_usecs = min_batch_age_usecs + .map(|min_age| aptos_infallible::duration_since_epoch().as_micros() as u64 - min_age); let mut iters = vec![]; - for (_, batches) in self.author_to_batches.iter() { + for (_, batches) in self + .author_to_batches + .iter() + .filter(|(author, _)| !exclude_authors.contains(author)) + { let batch_iter = batches.iter().rev().filter_map(|(sort_key, info)| { if let Some(item) = self.items.get(&sort_key.batch_key) { + let batch_create_ts_usecs = + item.info.expiration() - self.batch_expiry_gap_when_init_usecs; + + // Ensure that the batch was created at least `min_batch_age_usecs` ago to + // reduce the chance of inline fetches. 
+ if max_batch_creation_ts_usecs + .is_some_and(|max_create_ts| batch_create_ts_usecs > max_create_ts) + { + return None; + } + if item.is_committed() { return None; } diff --git a/consensus/src/quorum_store/proof_manager.rs b/consensus/src/quorum_store/proof_manager.rs index a33e0c1165292..7df9ab38b2783 100644 --- a/consensus/src/quorum_store/proof_manager.rs +++ b/consensus/src/quorum_store/proof_manager.rs @@ -34,7 +34,6 @@ pub struct ProofManager { back_pressure_total_proof_limit: u64, remaining_total_proof_num: u64, allow_batches_without_pos_in_proposal: bool, - enable_opt_quorum_store: bool, } impl ProofManager { @@ -44,16 +43,19 @@ impl ProofManager { back_pressure_total_proof_limit: u64, batch_store: Arc, allow_batches_without_pos_in_proposal: bool, - enable_opt_quorum_store: bool, + batch_expiry_gap_when_init_usecs: u64, ) -> Self { Self { - batch_proof_queue: BatchProofQueue::new(my_peer_id, batch_store), + batch_proof_queue: BatchProofQueue::new( + my_peer_id, + batch_store, + batch_expiry_gap_when_init_usecs, + ), back_pressure_total_txn_limit, remaining_total_txn_num: 0, back_pressure_total_proof_limit, remaining_total_proof_num: 0, allow_batches_without_pos_in_proposal, - enable_opt_quorum_store, } } @@ -106,10 +108,6 @@ impl ProofManager { PayloadFilter::InQuorumStore(proofs) => proofs, }; - let max_txns_with_proof = request - .max_txns - .compute_pct(100 - request.opt_batch_txns_pct); - let ( proof_block, txns_with_proof_size, @@ -117,7 +115,7 @@ impl ProofManager { proof_queue_fully_utilized, ) = self.batch_proof_queue.pull_proofs( &excluded_batches, - max_txns_with_proof, + request.max_txns, request.max_txns_after_filtering, request.soft_max_txns_after_filtering, request.return_non_full, @@ -129,26 +127,30 @@ impl ProofManager { counters::PROOF_QUEUE_FULLY_UTILIZED .observe(if proof_queue_fully_utilized { 1.0 } else { 0.0 }); - let (opt_batches, opt_batch_txns_size) = if self.enable_opt_quorum_store { + let (opt_batches, opt_batch_txns_size) = // TODO(ibalajiarun): Support unique txn calculation - let max_opt_batch_txns_size = request.max_txns - txns_with_proof_size; - let (opt_batches, opt_payload_size, _) = self.batch_proof_queue.pull_batches( - &excluded_batches - .iter() - .cloned() - .chain(proof_block.iter().map(|proof| proof.info().clone())) - .collect(), - max_opt_batch_txns_size, - request.max_txns_after_filtering, - request.soft_max_txns_after_filtering, - request.return_non_full, - request.block_timestamp, - ); + if let Some(ref params) = request.maybe_optqs_payload_pull_params { + let max_opt_batch_txns_size = request.max_txns - txns_with_proof_size; + let (opt_batches, opt_payload_size, _) = + self.batch_proof_queue.pull_batches( + &excluded_batches + .iter() + .cloned() + .chain(proof_block.iter().map(|proof| proof.info().clone())) + .collect(), + ¶ms.exclude_authors, + max_opt_batch_txns_size, + request.max_txns_after_filtering, + request.soft_max_txns_after_filtering, + request.return_non_full, + request.block_timestamp, + Some(params.minimum_batch_age_usecs), + ); - (opt_batches, opt_payload_size) - } else { - (Vec::new(), PayloadTxnsSize::zero()) - }; + (opt_batches, opt_payload_size) + } else { + (Vec::new(), PayloadTxnsSize::zero()) + }; let cur_txns = txns_with_proof_size + opt_batch_txns_size; let (inline_block, inline_block_size) = @@ -183,7 +185,7 @@ impl ProofManager { counters::NUM_INLINE_BATCHES.observe(inline_block.len() as f64); counters::NUM_INLINE_TXNS.observe(inline_block_size.count() as f64); - let response = if 
self.enable_opt_quorum_store { + let response = if request.maybe_optqs_payload_pull_params.is_some() { let inline_batches = inline_block.into(); Payload::OptQuorumStore(OptQuorumStorePayload::new( inline_batches, diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs index 74615b7a733ba..34eeb93233e98 100644 --- a/consensus/src/quorum_store/quorum_store_builder.rs +++ b/consensus/src/quorum_store/quorum_store_builder.rs @@ -365,7 +365,7 @@ impl InnerBuilder { * self.num_validators, self.batch_store.clone().unwrap(), self.config.allow_batches_without_pos_in_proposal, - self.config.enable_opt_quorum_store, + self.config.batch_expiry_gap_when_init_usecs, ); spawn_named!( "proof_manager", @@ -446,6 +446,7 @@ impl InnerBuilder { self.coordinator_tx.clone(), consensus_publisher, self.verifier.get_ordered_account_addresses(), + self.verifier.address_to_validator_index().clone(), )), Some(self.quorum_store_msg_tx.clone()), ) diff --git a/consensus/src/quorum_store/tests/batch_proof_queue_test.rs b/consensus/src/quorum_store/tests/batch_proof_queue_test.rs index 2741ea3a6a912..96ab5414ab120 100644 --- a/consensus/src/quorum_store/tests/batch_proof_queue_test.rs +++ b/consensus/src/quorum_store/tests/batch_proof_queue_test.rs @@ -62,7 +62,7 @@ fn proof_of_store_with_size( fn test_proof_queue_sorting() { let my_peer_id = PeerId::random(); let batch_store = batch_store_for_test(5 * 1024 * 1024); - let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store); + let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1); let author_0 = PeerId::random(); let author_1 = PeerId::random(); @@ -149,7 +149,7 @@ fn test_proof_queue_sorting() { fn test_proof_calculate_remaining_txns_and_proofs() { let my_peer_id = PeerId::random(); let batch_store = batch_store_for_test(5 * 1024 * 1024); - let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store); + let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1); let now_in_secs = aptos_infallible::duration_since_epoch().as_secs() as u64; let now_in_usecs = aptos_infallible::duration_since_epoch().as_micros() as u64; let author_0 = PeerId::random(); @@ -409,7 +409,7 @@ fn test_proof_calculate_remaining_txns_and_proofs() { fn test_proof_pull_proofs_with_duplicates() { let my_peer_id = PeerId::random(); let batch_store = batch_store_for_test(5 * 1024 * 1024); - let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store); + let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1); let now_in_secs = aptos_infallible::duration_since_epoch().as_secs() as u64; let now_in_usecs = now_in_secs * 1_000_000; let txns = vec![ @@ -660,7 +660,7 @@ fn test_proof_pull_proofs_with_duplicates() { fn test_proof_queue_soft_limit() { let my_peer_id = PeerId::random(); let batch_store = batch_store_for_test(5 * 1024 * 1024); - let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store); + let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1); let author = PeerId::random(); @@ -702,7 +702,7 @@ fn test_proof_queue_soft_limit() { fn test_proof_queue_insert_after_commit() { let my_peer_id = PeerId::random(); let batch_store = batch_store_for_test(5 * 1024); - let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store); + let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1); let author = PeerId::random(); let author_batches = vec![ @@ -734,7 +734,7 @@ fn test_proof_queue_insert_after_commit() { fn 
test_proof_queue_pull_full_utilization() { let my_peer_id = PeerId::random(); let batch_store = batch_store_for_test(5 * 1024); - let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store); + let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1); let author = PeerId::random(); let author_batches = vec![ diff --git a/consensus/src/quorum_store/tests/direct_mempool_quorum_store_test.rs b/consensus/src/quorum_store/tests/direct_mempool_quorum_store_test.rs index 7f04c4abf71ca..aa90aa5f03546 100644 --- a/consensus/src/quorum_store/tests/direct_mempool_quorum_store_test.rs +++ b/consensus/src/quorum_store/tests/direct_mempool_quorum_store_test.rs @@ -35,11 +35,11 @@ async fn test_block_request_no_txns() { max_txns_after_filtering: 100, soft_max_txns_after_filtering: 100, max_inline_txns: PayloadTxnsSize::new(50, 500), - opt_batch_txns_pct: 0, return_non_full: true, filter: PayloadFilter::DirectMempool(vec![]), callback: consensus_callback, block_timestamp: aptos_infallible::duration_since_epoch(), + maybe_optqs_payload_pull_params: None, })) .unwrap(); diff --git a/consensus/src/quorum_store/tests/proof_manager_test.rs b/consensus/src/quorum_store/tests/proof_manager_test.rs index cf87abfecba84..3eebe4c667937 100644 --- a/consensus/src/quorum_store/tests/proof_manager_test.rs +++ b/consensus/src/quorum_store/tests/proof_manager_test.rs @@ -17,7 +17,7 @@ use std::{cmp::max, collections::HashSet}; fn create_proof_manager() -> ProofManager { let batch_store = batch_store_for_test(5 * 1024 * 1024); - ProofManager::new(PeerId::random(), 10, 10, batch_store, true, false) + ProofManager::new(PeerId::random(), 10, 10, batch_store, true, 1) } fn create_proof(author: PeerId, expiration: u64, batch_sequence: u64) -> ProofOfStore { @@ -62,8 +62,8 @@ async fn get_proposal( filter: PayloadFilter::InQuorumStore(filter_set), callback: callback_tx, block_timestamp: aptos_infallible::duration_since_epoch(), - opt_batch_txns_pct: 0, return_non_full: true, + maybe_optqs_payload_pull_params: None, }); proof_manager.handle_proposal_request(req); let GetPayloadResponse::GetPayloadResponse(payload) = callback_rx.await.unwrap().unwrap(); diff --git a/consensus/src/round_manager.rs b/consensus/src/round_manager.rs index fedfd10e30c53..3c8cdb6993159 100644 --- a/consensus/src/round_manager.rs +++ b/consensus/src/round_manager.rs @@ -16,6 +16,7 @@ use crate::{ error::{error_kind, VerifyError}, liveness::{ proposal_generator::ProposalGenerator, + proposal_status_tracker::TPastProposalStatusTracker, proposer_election::ProposerElection, round_state::{NewRoundEvent, NewRoundReason, RoundState, RoundStateLogSchema}, unequivocal_proposer_election::UnequivocalProposerElection, @@ -267,6 +268,7 @@ pub struct RoundManager { futures: FuturesUnordered< Pin, Block, Instant)> + Send>>, >, + proposal_status_tracker: Arc, } impl RoundManager { @@ -286,6 +288,7 @@ impl RoundManager { randomness_config: OnChainRandomnessConfig, jwk_consensus_config: OnChainJWKConsensusConfig, fast_rand_config: Option, + proposal_status_tracker: Arc, ) -> Self { // when decoupled execution is false, // the counter is still static. 
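The `proposal_status_tracker` wired into `RoundManager` above is the core of the fallback heuristic, but the new file consensus/src/liveness/proposal_status_tracker.rs is not part of this excerpt. As a rough illustration of the intended flow, here is a minimal, hypothetical sketch: the tracker remembers recent `NewRoundReason`s (pushed from `process_new_round_event`, see the hunk below) and a params provider excludes recently missing batch authors from optimistic pulls, falling back to proofs-only proposals when recent rounds keep timing out. The type definitions, the `window` parameter, and the exact policy are this sketch's assumptions, not the patch's actual implementation.

use std::collections::{HashSet, VecDeque};

// Illustrative stand-ins for the crate's types; the real definitions live in
// consensus-types and round_state.rs.
#[derive(Clone)]
pub enum RoundTimeoutReason {
    Unknown,
    ProposalNotReceived,
    PayloadUnavailable { missing_authors: Vec<u16> }, // indices into the validator set
    NoQC,
}

#[derive(Clone)]
pub enum NewRoundReason {
    QCReady,
    Timeout(RoundTimeoutReason),
}

/// Remembers the outcome of the last `window` round transitions.
pub struct PastProposalStatusTracker {
    window: usize,
    statuses: VecDeque<NewRoundReason>,
}

impl PastProposalStatusTracker {
    pub fn new(window: usize) -> Self {
        Self { window, statuses: VecDeque::new() }
    }

    /// Called on every new round event, mirroring the `push` in
    /// `process_new_round_event` below.
    pub fn push(&mut self, status: NewRoundReason) {
        if self.statuses.len() == self.window {
            self.statuses.pop_front();
        }
        self.statuses.push_back(status);
    }

    /// Fallback heuristic (hypothetical policy): if every recent round ended in
    /// a timeout, return None to disable optimistic batch proposals entirely;
    /// otherwise allow them but exclude authors whose batches were recently
    /// reported missing via `PayloadUnavailable`.
    pub fn optqs_exclude_authors(&self) -> Option<HashSet<u16>> {
        let all_timeouts = !self.statuses.is_empty()
            && self.statuses.iter().all(|s| matches!(s, NewRoundReason::Timeout(_)));
        if all_timeouts {
            return None;
        }
        let mut exclude = HashSet::new();
        for status in &self.statuses {
            if let NewRoundReason::Timeout(RoundTimeoutReason::PayloadUnavailable {
                missing_authors,
            }) = status
            {
                exclude.extend(missing_authors.iter().copied());
            }
        }
        Some(exclude)
    }
}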
@@ -316,6 +319,7 @@ impl RoundManager { pending_order_votes: PendingOrderVotes::new(), blocks_with_broadcasted_fast_shares: LruCache::new(5), futures: FuturesUnordered::new(), + proposal_status_tracker, } } @@ -356,7 +360,7 @@ impl RoundManager { NewRoundReason::QCReady => { counters::QC_ROUNDS_COUNT.inc(); }, - NewRoundReason::Timeout => { + NewRoundReason::Timeout(_) => { counters::TIMEOUT_ROUNDS_COUNT.inc(); }, }; @@ -367,6 +371,9 @@ impl RoundManager { self.pending_order_votes .garbage_collect(self.block_store.sync_info().highest_ordered_round()); + self.proposal_status_tracker + .push(new_round_event.reason.clone()); + if self .proposer_election .is_valid_proposer(self.proposal_generator.author(), new_round_event.round) @@ -405,10 +412,9 @@ impl RoundManager { safety_rules: Arc>, proposer_election: Arc, ) -> anyhow::Result<()> { - let epoch = epoch_state.epoch; Self::log_collected_vote_stats(epoch_state.clone(), &new_round_event); let proposal_msg = Self::generate_proposal( - epoch, + epoch_state.clone(), new_round_event, sync_info, network.clone(), @@ -514,9 +520,8 @@ impl RoundManager { &self, new_round_event: NewRoundEvent, ) -> anyhow::Result { - let epoch = self.epoch_state.epoch; Self::generate_proposal( - epoch, + self.epoch_state.clone(), new_round_event, self.block_store.sync_info(), self.network.clone(), @@ -528,7 +533,7 @@ impl RoundManager { } async fn generate_proposal( - epoch: u64, + epoch_state: Arc, new_round_event: NewRoundEvent, sync_info: SyncInfo, network: Arc, @@ -551,7 +556,11 @@ impl RoundManager { Block::new_proposal_from_block_data_and_signature(proposal, signature); observe_block(signed_proposal.timestamp_usecs(), BlockStage::SIGNED); info!( - Self::new_log_with_round_epoch(LogEvent::Propose, new_round_event.round, epoch), + Self::new_log_with_round_epoch( + LogEvent::Propose, + new_round_event.round, + epoch_state.epoch + ), "{}", signed_proposal ); Ok(ProposalMsg::new(signed_proposal, sync_info)) @@ -704,6 +713,23 @@ impl RoundManager { sync_or_not } + fn compute_timeout_reason(&self, round: Round) -> RoundTimeoutReason { + if self.round_state().vote_sent().is_some() { + return RoundTimeoutReason::NoQC; + } + + match self.block_store.get_block_for_round(round) { + None => RoundTimeoutReason::ProposalNotReceived, + Some(block) => { + if let Err(missing_authors) = self.block_store.check_payload(block.block()) { + RoundTimeoutReason::PayloadUnavailable { missing_authors } + } else { + RoundTimeoutReason::Unknown + } + }, + } + } + /// The replica broadcasts a "timeout vote message", which includes the round signature, which /// can be aggregated to a TimeoutCertificate. /// The timeout vote message can be one of the following three options: @@ -742,8 +768,8 @@ impl RoundManager { ) .context("[RoundManager] SafetyRules signs 2-chain timeout")?; - // TODO(ibalajiarun): placeholder, update with proper reason. - let timeout_reason = RoundTimeoutReason::Unknown; + let timeout_reason = self.compute_timeout_reason(round); + RoundTimeout::new( timeout, self.proposal_generator.author(), @@ -814,7 +840,11 @@ impl RoundManager { /// This function is called only after all the dependencies of the given QC have been retrieved. 
async fn process_certificates(&mut self) -> anyhow::Result<()> { let sync_info = self.block_store.sync_info(); - if let Some(new_round_event) = self.round_state.process_certificates(sync_info) { + let epoch_state = self.epoch_state.clone(); + if let Some(new_round_event) = self + .round_state + .process_certificates(sync_info, &epoch_state.verifier) + { self.process_new_round_event(new_round_event).await?; } Ok(()) @@ -941,16 +971,30 @@ impl RoundManager { observe_block(proposal.timestamp_usecs(), BlockStage::SYNCED); + // Since processing of the proposal may be delayed by backpressure or payload availability, + // we add the block to the block store now so that we don't need to fetch it from a remote + // peer once the backpressure clears. Note that delayed processing of the proposal is not + // guaranteed to add the block to the block store if we don't get out of the backpressure + // before the timeout, so this insert is needed to ensure that the proposed block is added + // to the block store regardless. Also, delayed processing of the proposal may try to add + // the same block again, which is okay because the `execute_and_insert_block` call + // is idempotent. + self.block_store + .insert_block(proposal.clone()) + .await + .context("[RoundManager] Failed to insert the block into BlockStore")?; + let block_store = self.block_store.clone(); - if !block_store.check_payload(&proposal) { + if block_store.check_payload(&proposal).is_err() { debug!("Payload not available locally for block: {}", proposal.id()); counters::CONSENSUS_PROPOSAL_PAYLOAD_AVAILABILITY .with_label_values(&["missing"]) .inc(); let start_time = Instant::now(); + let deadline = self.round_state.current_round_deadline(); let future = async move { ( - block_store.wait_for_payload(&proposal).await, + block_store.wait_for_payload(&proposal, deadline).await, proposal, start_time, ) @@ -978,18 +1022,7 @@ impl RoundManager { if self.block_store.vote_back_pressure() { counters::CONSENSUS_WITHOLD_VOTE_BACKPRESSURE_TRIGGERED.observe(1.0); // In case of back pressure, we delay processing proposal. This is done by resending the - // same proposal to self after some time. Even if processing proposal is delayed, we add - // the block to the block store so that we don't need to fetch it from remote once we - // are out of the backpressure. Please note that delayed processing of proposal is not - // guaranteed to add the block to the block store if we don't get out of the backpressure - // before the timeout, so this is needed to ensure that the proposed block is added to - // the block store irrespective. Also, it is possible that delayed processing of proposal - // tries to add the same block again, which is okay as `execute_and_insert_block` call - // is idempotent. - self.block_store - .insert_block(proposal.clone()) - .await - .context("[RoundManager] Failed to execute_and_insert the block")?; + // same proposal to self after some time. Self::resend_verified_proposal_to_self( self.block_store.clone(), self.buffered_proposal_tx.clone(), @@ -1601,9 +1634,10 @@ impl RoundManager { /// To jump start new round with the current certificates we have. 
pub async fn init(&mut self, last_vote_sent: Option) { + let epoch_state = self.epoch_state.clone(); let new_round_event = self .round_state - .process_certificates(self.block_store.sync_info()) + .process_certificates(self.block_store.sync_info(), &epoch_state.verifier) .expect("Can not jump start a round_state from existing certificates."); if let Some(vote) = last_vote_sent { self.round_state.record_vote(vote); diff --git a/consensus/src/round_manager_fuzzing.rs b/consensus/src/round_manager_fuzzing.rs index 3132e644762c3..16e98b94a3f19 100644 --- a/consensus/src/round_manager_fuzzing.rs +++ b/consensus/src/round_manager_fuzzing.rs @@ -20,7 +20,9 @@ use crate::{ persistent_liveness_storage::{PersistentLivenessStorage, RecoveryData}, pipeline::execution_client::DummyExecutionClient, round_manager::RoundManager, - test_utils::{MockPayloadManager, MockStorage}, + test_utils::{ + MockOptQSPayloadProvider, MockPastProposalStatusTracker, MockPayloadManager, MockStorage, + }, util::{mock_time_service::SimulatedTimeService, time_service::TimeService}, }; use aptos_channels::{self, aptos_channel, message_queues::QueueStyle}; @@ -180,6 +182,7 @@ fn create_node_for_fuzzing() -> RoundManager { false, ValidatorTxnConfig::default_disabled(), true, + Arc::new(MockOptQSPayloadProvider {}), ); // @@ -209,6 +212,7 @@ fn create_node_for_fuzzing() -> RoundManager { OnChainRandomnessConfig::default_enabled(), OnChainJWKConsensusConfig::default_enabled(), None, + Arc::new(MockPastProposalStatusTracker {}), ) } diff --git a/consensus/src/round_manager_test.rs b/consensus/src/round_manager_test.rs index b17787865dd9e..29716947991bd 100644 --- a/consensus/src/round_manager_test.rs +++ b/consensus/src/round_manager_test.rs @@ -23,8 +23,8 @@ use crate::{ round_manager::RoundManager, test_utils::{ consensus_runtime, create_vec_signed_transactions, - mock_execution_client::MockExecutionClient, timed_block_on, MockPayloadManager, - MockStorage, TreeInserter, + mock_execution_client::MockExecutionClient, timed_block_on, MockOptQSPayloadProvider, + MockPastProposalStatusTracker, MockPayloadManager, MockStorage, TreeInserter, }, util::time_service::{ClockTimeService, TimeService}, }; @@ -305,6 +305,7 @@ impl NodeSetup { false, onchain_consensus_config.effective_validator_txn_config(), true, + Arc::new(MockOptQSPayloadProvider {}), ); let round_state = Self::create_round_state(time_service); @@ -332,6 +333,7 @@ impl NodeSetup { onchain_randomness_config.clone(), onchain_jwk_consensus_config.clone(), None, + Arc::new(MockPastProposalStatusTracker {}), ); block_on(round_manager.init(last_vote_sent)); Self { @@ -995,13 +997,14 @@ fn sync_info_carried_on_timeout_vote() { .insert_single_quorum_cert(block_0_quorum_cert.clone()) .unwrap(); - node.round_manager - .round_state - .process_certificates(SyncInfo::new( + node.round_manager.round_state.process_certificates( + SyncInfo::new( block_0_quorum_cert.clone(), block_0_quorum_cert.into_wrapped_ledger_info(), None, - )); + ), + &generate_validator_verifier(&[node.signer.clone()]), + ); node.round_manager .process_local_timeout(2) .await diff --git a/consensus/src/test_utils/mock_payload_manager.rs b/consensus/src/test_utils/mock_payload_manager.rs index e62ec85b1ea9a..cfac0aaa458a3 100644 --- a/consensus/src/test_utils/mock_payload_manager.rs +++ b/consensus/src/test_utils/mock_payload_manager.rs @@ -3,13 +3,12 @@ use crate::{ error::QuorumStoreError, - payload_client::{ - user::quorum_store_client::QuorumStoreClient, PayloadClient, PayloadPullParameters, - }, + 
payload_client::{user::quorum_store_client::QuorumStoreClient, PayloadClient}, }; use anyhow::Result; use aptos_consensus_types::{ - block::block_test_utils::random_payload, common::Payload, request_response::GetPayloadCommand, + block::block_test_utils::random_payload, common::Payload, + payload_pull_params::PayloadPullParameters, request_response::GetPayloadCommand, }; use aptos_types::{ transaction::{ExecutionStatus, TransactionStatus}, diff --git a/consensus/src/test_utils/mod.rs b/consensus/src/test_utils/mod.rs index b556d9bfc7ed8..5744a1ab0b090 100644 --- a/consensus/src/test_utils/mod.rs +++ b/consensus/src/test_utils/mod.rs @@ -5,11 +5,16 @@ #![allow(clippy::unwrap_used)] use crate::{ block_storage::{BlockReader, BlockStore}, + liveness::{ + proposal_status_tracker::{TOptQSPullParamsProvider, TPastProposalStatusTracker}, + round_state::NewRoundReason, + }, payload_manager::DirectMempoolPayloadManager, }; use aptos_consensus_types::{ block::{block_test_utils::certificate_for_genesis, Block}, common::{Author, Round}, + payload_pull_params::OptQSPayloadPullParams, pipelined_block::PipelinedBlock, quorum_cert::QuorumCert, sync_info::SyncInfo, @@ -270,3 +275,17 @@ pub(crate) fn create_vec_signed_transactions_with_gas( .map(|_| create_signed_transaction(gas_unit_price)) .collect() } + +pub struct MockOptQSPayloadProvider {} + +impl TOptQSPullParamsProvider for MockOptQSPayloadProvider { + fn get_params(&self) -> Option { + None + } +} + +pub struct MockPastProposalStatusTracker {} + +impl TPastProposalStatusTracker for MockPastProposalStatusTracker { + fn push(&self, _status: NewRoundReason) {} +} diff --git a/crates/aptos-collections/src/bounded_vec_deque.rs b/crates/aptos-collections/src/bounded_vec_deque.rs index 6435b7371f331..5da8bdd16e047 100644 --- a/crates/aptos-collections/src/bounded_vec_deque.rs +++ b/crates/aptos-collections/src/bounded_vec_deque.rs @@ -52,6 +52,14 @@ impl BoundedVecDeque { pub fn iter(&self) -> Iter<'_, T> { self.inner.iter() } + + pub fn len(&self) -> usize { + self.inner.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } impl IntoIterator for BoundedVecDeque { diff --git a/testsuite/generate-format/tests/staged/consensus.yaml b/testsuite/generate-format/tests/staged/consensus.yaml index 1e81da1a2692b..f9bb0aaf27da6 100644 --- a/testsuite/generate-format/tests/staged/consensus.yaml +++ b/testsuite/generate-format/tests/staged/consensus.yaml @@ -854,6 +854,13 @@ RoundTimeoutReason: Unknown: UNIT 1: ProposalNotReceived: UNIT + 2: + PayloadUnavailable: + STRUCT: + - missing_authors: + TYPENAME: BitVec + 3: + NoQC: UNIT Script: STRUCT: - code: BYTES diff --git a/types/src/validator_verifier.rs b/types/src/validator_verifier.rs index 95ebfa637e16f..50aa7b69c4767 100644 --- a/types/src/validator_verifier.rs +++ b/types/src/validator_verifier.rs @@ -627,6 +627,25 @@ pub fn random_validator_verifier( count: usize, custom_voting_power_quorum: Option, pseudo_random_account_address: bool, +) -> (Vec, ValidatorVerifier) { + random_validator_verifier_with_voting_power( + count, + custom_voting_power_quorum, + pseudo_random_account_address, + &[], + ) +} + +/// Helper function to get random validator signers and a corresponding validator verifier for +/// testing. If custom_voting_power_quorum is not None, set a custom voting power quorum amount. +/// With pseudo_random_account_address enabled, logs show `0 -> [0000]`, `1 -> [1000]`. +/// Each validator's voting power is taken from `voting_power` by index; missing entries (including when it is empty) default to 1. 
+#[cfg(any(test, feature = "fuzzing"))] +pub fn random_validator_verifier_with_voting_power( + count: usize, + custom_voting_power_quorum: Option, + pseudo_random_account_address: bool, + voting_power: &[u64], ) -> (Vec, ValidatorVerifier) { let mut signers = Vec::new(); let mut validator_infos = vec![]; @@ -639,7 +658,7 @@ pub fn random_validator_verifier( validator_infos.push(ValidatorConsensusInfo::new( random_signer.author(), random_signer.public_key(), - 1, + *voting_power.get(i).unwrap_or(&1), )); signers.push(random_signer); }
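One subtlety in the batch-age gate added to `pull_internal` earlier in this patch: a batch's creation time is not stored on `BatchInfo`, so it is reconstructed as `expiration - batch_expiry_gap_when_init_usecs`. Below is a standalone restatement of that check with a worked example; the function name and the saturating arithmetic are this sketch's additions, not the patch's actual code.

use std::time::{SystemTime, UNIX_EPOCH};

/// Restates the age gate from `BatchProofQueue::pull_internal`: a batch is
/// eligible for an optimistic proposal only if it was created at least
/// `minimum_batch_age_usecs` ago, reducing the chance that a freshly created
/// batch has not yet propagated and must be fetched inline.
fn batch_is_old_enough(
    batch_expiration_usecs: u64,
    batch_expiry_gap_when_init_usecs: u64,
    minimum_batch_age_usecs: u64,
) -> bool {
    let now_usecs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("time went backwards")
        .as_micros() as u64;
    // Recover the creation time, assuming the expiry gap was applied at init.
    let created_usecs =
        batch_expiration_usecs.saturating_sub(batch_expiry_gap_when_init_usecs);
    // Accept only batches created on or before `now - minimum_batch_age`.
    created_usecs <= now_usecs.saturating_sub(minimum_batch_age_usecs)
}

fn main() {
    // Worked example: a batch initialized with a 60s expiry gap that expires
    // 59.9s from now was created ~0.1s ago, so a 250ms minimum age rejects it.
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_micros() as u64;
    let expiration = now + 59_900_000;
    assert!(!batch_is_old_enough(expiration, 60_000_000, 250_000));
}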