Skip to content

Commit

Permalink
[dag] Smoke tests and related fixes (aptos-labs#11977)
Browse files Browse the repository at this point in the history
* [dag][bugfix] only fetch nodes in window
* [dag] try order all nodes after a fetch
* [dag] smoke tests
  • Loading branch information
ibalajiarun authored Feb 23, 2024
1 parent 271bdab commit ae70232
Show file tree
Hide file tree
Showing 12 changed files with 314 additions and 22 deletions.
5 changes: 2 additions & 3 deletions consensus/src/dag/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,8 @@ impl OrderedNotifier for OrderedNotifierAdapter {

NUM_NODES_PER_BLOCK.observe(ordered_nodes.len() as f64);
let rounds_between = {
let anchor_node = ordered_nodes.first().map_or(0, |node| node.round());
let lowest_round_node = ordered_nodes.last().map_or(0, |node| node.round());
anchor_node.saturating_sub(lowest_round_node)
let lowest_round_node = ordered_nodes.first().map_or(0, |node| node.round());
round.saturating_sub(lowest_round_node)
};
NUM_ROUNDS_PER_BLOCK.observe((rounds_between + 1) as f64);

Expand Down
4 changes: 4 additions & 0 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,10 @@ impl DagDriver {
observe_round(prev_round_timestamp, RoundStage::Finished);
}
}

/// Asks the order rule to process everything it currently can.
///
/// Takes the `order_rule` lock and calls `process_all()` on it; the lock
/// guard is a temporary and is released at the end of the statement.
pub fn try_order_all(&self) {
self.order_rule.lock().process_all();
}
}

#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/dag_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl DagFetcherService {
let remote_request = {
let dag_reader = self.dag.read();
ensure!(
node.round() > dag_reader.lowest_incomplete_round(),
node.round() >= dag_reader.lowest_incomplete_round(),
"Already synced beyond requested round {}, lowest incomplete round {}",
node.round(),
dag_reader.lowest_incomplete_round()
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/dag/dag_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ impl NetworkHandler {
Ok(certified_node) => {
if let Err(e) = dag_driver_clone.process(certified_node).await {
warn!(error = ?e, "error processing certified node fetch notification");
} else {
dag_driver_clone.try_order_all();
}
},
Err(e) => {
Expand Down
16 changes: 11 additions & 5 deletions consensus/src/dag/dag_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,20 +375,26 @@ impl InMemDag {
}

pub fn bitmask(&self, target_round: Round) -> DagSnapshotBitmask {
let lowest_round = self.lowest_incomplete_round();

let from_round = if self.is_empty() {
self.lowest_round()
} else {
target_round
.saturating_sub(self.window_size)
.max(self.lowest_incomplete_round())
.max(self.lowest_round())
};
let mut bitmask: Vec<_> = self
.nodes_by_round
.range(lowest_round..=target_round)
.range(from_round..=target_round)
.map(|(_, round_nodes)| round_nodes.iter().map(|node| node.is_some()).collect())
.collect();

bitmask.resize(
(target_round - lowest_round + 1) as usize,
(target_round - from_round + 1) as usize,
vec![false; self.author_to_index.len()],
);

DagSnapshotBitmask::new(lowest_round, bitmask)
DagSnapshotBitmask::new(from_round, bitmask)
}

pub(super) fn prune(&mut self) -> BTreeMap<u64, Vec<Option<NodeStatus>>> {
Expand Down
12 changes: 11 additions & 1 deletion consensus/src/dag/order_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ impl OrderRule {
{
return Some(anchor_node.clone());
}
} else {
debug!(
anchor = anchor_author,
"Anchor not found for round {}", start_round
);
}
start_round += 2;
}
Expand Down Expand Up @@ -218,8 +223,13 @@ impl OrderRule {
/// Check if this node can trigger anchors to be ordered
pub fn process_new_node(&self, node_metadata: &NodeMetadata) {
let lowest_unordered_anchor_round = *self.lowest_unordered_anchor_round.read();

let round = node_metadata.round();

debug!(
lowest_unordered_round = lowest_unordered_anchor_round,
node_round = round,
"Trigger Ordering"
);
// If the node comes from the proposal round in the current instance, it can't trigger any ordering
if round <= lowest_unordered_anchor_round
|| Self::check_parity(round, lowest_unordered_anchor_round)
Expand Down
24 changes: 15 additions & 9 deletions consensus/src/dag/tests/dag_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,36 +253,42 @@ fn test_dag_bitmask() {
let (signers, epoch_state, dag, _) = setup();

assert_eq!(
dag.read().bitmask(15),
DagSnapshotBitmask::new(1, vec![vec![false; 4]; 15])
dag.read().bitmask(TEST_DAG_WINDOW),
DagSnapshotBitmask::new(1, vec![vec![false; 4]; TEST_DAG_WINDOW as usize])
);

for round in 1..5 {
let parents = dag
.read()
.get_strong_links_for_round(round, &epoch_state.verifier)
.get_strong_links_for_round(round - 1, &epoch_state.verifier)
.unwrap_or_default();
if round > 1 {
assert!(!parents.is_empty());
}
for signer in &signers[0..3] {
let node = new_certified_node(round, signer.author(), parents.clone());
assert!(dag.write().add_node_for_test(node).is_ok());
}
}
let mut bitmask = vec![vec![true, true, true, false]; 4];
bitmask.resize(15, vec![false; 4]);
assert_eq!(dag.read().bitmask(15), DagSnapshotBitmask::new(1, bitmask));
let mut bitmask = vec![vec![true, true, true, false]; 2];
bitmask.resize(TEST_DAG_WINDOW as usize + 1, vec![false; 4]);
assert_eq!(dag.read().bitmask(8), DagSnapshotBitmask::new(3, bitmask));

// Populate the fourth author for all rounds
for round in 1..5 {
let parents = dag
.read()
.get_strong_links_for_round(round, &epoch_state.verifier)
.get_strong_links_for_round(round - 1, &epoch_state.verifier)
.unwrap_or_default();
if round > 1 {
assert!(!parents.is_empty());
}
let node = new_certified_node(round, signers[3].author(), parents.clone());
assert!(dag.write().add_node_for_test(node).is_ok());
}
assert_eq!(
dag.read().bitmask(15),
DagSnapshotBitmask::new(5, vec![vec![false; 4]; 11])
dag.read().bitmask(10),
DagSnapshotBitmask::new(5, vec![vec![false; 4]; 6])
);
assert_eq!(
dag.read().bitmask(6),
Expand Down
3 changes: 3 additions & 0 deletions consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,9 @@ impl NetworkSender {
msg: ConsensusMsg,
timeout_duration: Duration,
) -> anyhow::Result<ConsensusMsg> {
fail_point!("consensus::send::any", |_| {
Err(anyhow::anyhow!("Injected error in send_rpc"))
});
counters::CONSENSUS_SENT_MSGS
.with_label_values(&[msg.name()])
.inc();
Expand Down
10 changes: 7 additions & 3 deletions testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ pub async fn create_swarm(num_nodes: usize, max_block_txns: u64) -> LocalSwarm {
swarm
}

struct ActiveTrafficGuard {
finish_traffic: Arc<AtomicBool>,
/// Guard value returned by `start_traffic`.
///
/// NOTE(review): this type implements `Drop`, but the body is not visible in
/// this view; presumably dropping the guard signals the traffic to finish —
/// confirm against the `Drop` impl.
pub struct ActiveTrafficGuard {
/// Atomic boolean flag held behind an `Arc`.
pub finish_traffic: Arc<AtomicBool>,
}

impl Drop for ActiveTrafficGuard {
Expand All @@ -70,7 +70,11 @@ impl Drop for ActiveTrafficGuard {
}
}

async fn start_traffic(num_accounts: usize, tps: f32, swarm: &mut dyn Swarm) -> ActiveTrafficGuard {
pub async fn start_traffic(
num_accounts: usize,
tps: f32,
swarm: &mut dyn Swarm,
) -> ActiveTrafficGuard {
let validator_clients = swarm.get_all_nodes_clients_with_names();

let finish = Arc::new(AtomicBool::new(false));
Expand Down
Loading

0 comments on commit ae70232

Please sign in to comment.