Skip to content

Commit

Permalink
[Fastpath] execute transactions certified via fastpath (#19638)
Browse files Browse the repository at this point in the history
## Description 

- Add support to execute certified transactions from consensus output.
- Refactor how consensus commit handler processes blocks from consensus
commits, so the logic can be reused for consensus transaction handler.
- Small refactors in CommitConsumer and DagState.

## Test plan 

CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
mwtian authored Oct 8, 2024
1 parent a7863b7 commit ace69fa
Show file tree
Hide file tree
Showing 25 changed files with 677 additions and 273 deletions.
10 changes: 4 additions & 6 deletions consensus/core/src/authority_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ mod tests {
use std::{collections::BTreeSet, sync::Arc, time::Duration};

use consensus_config::{local_committee_and_keys, Parameters};
use mysten_metrics::monitored_mpsc::{unbounded_channel, UnboundedReceiver};
use mysten_metrics::monitored_mpsc::UnboundedReceiver;
use prometheus::Registry;
use rstest::rstest;
use sui_protocol_config::ProtocolConfig;
Expand Down Expand Up @@ -457,8 +457,7 @@ mod tests {
let protocol_keypair = keypairs[own_index].1.clone();
let network_keypair = keypairs[own_index].0.clone();

let (sender, _receiver) = unbounded_channel("consensus_output");
let commit_consumer = CommitConsumer::new(sender, 0);
let (commit_consumer, _, _) = CommitConsumer::new(0);

let authority = ConsensusAuthority::start(
network_type,
Expand Down Expand Up @@ -735,8 +734,7 @@ mod tests {
let protocol_keypair = keypairs[index].1.clone();
let network_keypair = keypairs[index].0.clone();

let (sender, receiver) = unbounded_channel("consensus_output");
let commit_consumer = CommitConsumer::new(sender, 0);
let (commit_consumer, commit_receiver, _) = CommitConsumer::new(0);

let authority = ConsensusAuthority::start(
network_type,
Expand All @@ -753,6 +751,6 @@ mod tests {
)
.await;

(authority, receiver)
(authority, commit_receiver)
}
}
10 changes: 10 additions & 0 deletions consensus/core/src/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
block::{BlockAPI, BlockRef, BlockTimestampMs, Round, Slot, VerifiedBlock},
leader_scoring::ReputationScores,
storage::Store,
TransactionIndex,
};

/// Index of a commit among all consensus commits.
Expand Down Expand Up @@ -109,6 +110,7 @@ pub(crate) struct CommitV1 {
leader: BlockRef,
/// Refs to committed blocks, in the commit order.
blocks: Vec<BlockRef>,
// TODO(fastpath): record rejected transactions.
}

impl CommitAPI for CommitV1 {
Expand Down Expand Up @@ -293,6 +295,8 @@ pub struct CommittedSubDag {
pub leader: BlockRef,
/// All the committed blocks that are part of this sub-dag
pub blocks: Vec<VerifiedBlock>,
/// Indices of rejected transactions in each block.
pub rejected_transactions_by_block: Vec<Vec<TransactionIndex>>,
/// The timestamp of the commit, obtained from the timestamp of the leader block.
pub timestamp_ms: BlockTimestampMs,
/// The reference of the commit.
Expand All @@ -309,13 +313,16 @@ impl CommittedSubDag {
pub fn new(
leader: BlockRef,
blocks: Vec<VerifiedBlock>,
rejected_transactions_by_block: Vec<Vec<TransactionIndex>>,
timestamp_ms: BlockTimestampMs,
commit_ref: CommitRef,
reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
) -> Self {
assert_eq!(blocks.len(), rejected_transactions_by_block.len());
Self {
leader,
blocks,
rejected_transactions_by_block,
timestamp_ms,
commit_ref,
reputation_scores_desc,
Expand Down Expand Up @@ -386,11 +393,14 @@ pub fn load_committed_subdag_from_store(
commit_block
})
.collect::<Vec<_>>();
// TODO(fastpath): recover rejected transaction indices from commit.
let rejected_transactions = vec![vec![]; blocks.len()];
let leader_block_idx = leader_block_idx.expect("Leader block must be in the sub-dag");
let leader_block_ref = blocks[leader_block_idx].reference();
CommittedSubDag::new(
leader_block_ref,
blocks,
rejected_transactions,
commit.timestamp_ms(),
commit.reference(),
reputation_scores_desc,
Expand Down
40 changes: 29 additions & 11 deletions consensus/core/src/commit_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,20 @@
use std::sync::{Arc, RwLock};
use tokio::sync::watch;

use mysten_metrics::monitored_mpsc::UnboundedSender;
use mysten_metrics::monitored_mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

use crate::{CommitIndex, CommittedSubDag};
use crate::{CommitIndex, CommittedSubDag, TransactionIndex, VerifiedBlock};

#[derive(Clone)]
pub struct CommitConsumer {
// A channel to send the committed sub dags through
pub(crate) sender: UnboundedSender<CommittedSubDag>,
// A channel to output the committed sub dags.
pub(crate) commit_sender: UnboundedSender<CommittedSubDag>,
// A channel to output certified and rejected transactions by batches of blocks.
// Each tuple contains the block containing transactions, and indices of rejected transactions.
// In each block, transactions that are not rejected are considered certified.
// Batches of blocks are sent together, to improve efficiency.
#[allow(unused)]
pub(crate) transaction_sender: UnboundedSender<Vec<(VerifiedBlock, Vec<TransactionIndex>)>>,
// Index of the last commit that the consumer has processed. This is useful for
// crash/recovery so mysticeti can replay the commits from the next index.
// First commit in the replayed sequence will have index last_processed_commit_index + 1.
Expand All @@ -22,15 +29,26 @@ pub struct CommitConsumer {

impl CommitConsumer {
pub fn new(
sender: UnboundedSender<CommittedSubDag>,
last_processed_commit_index: CommitIndex,
) -> Self {
) -> (
Self,
UnboundedReceiver<CommittedSubDag>,
UnboundedReceiver<Vec<(VerifiedBlock, Vec<TransactionIndex>)>>,
) {
let (commit_sender, commit_receiver) = unbounded_channel("consensus_output");
let (transaction_sender, transaction_receiver) = unbounded_channel("consensus_certified");

let monitor = Arc::new(CommitConsumerMonitor::new(last_processed_commit_index));
Self {
sender,
last_processed_commit_index,
monitor,
}
(
Self {
commit_sender,
transaction_sender,
last_processed_commit_index,
monitor,
},
commit_receiver,
transaction_receiver,
)
}

pub fn monitor(&self) -> Arc<CommitConsumerMonitor> {
Expand Down
50 changes: 28 additions & 22 deletions consensus/core/src/commit_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl CommitObserver {
let mut observer = Self {
context,
commit_interpreter: Linearizer::new(dag_state.clone(), leader_schedule.clone()),
sender: commit_consumer.sender,
sender: commit_consumer.commit_sender,
store,
leader_schedule,
};
Expand Down Expand Up @@ -207,7 +207,7 @@ impl CommitObserver {

#[cfg(test)]
mod tests {
use mysten_metrics::monitored_mpsc::{unbounded_channel, UnboundedReceiver};
use mysten_metrics::monitored_mpsc::UnboundedReceiver;
use parking_lot::RwLock;

use super::*;
Expand All @@ -227,7 +227,8 @@ mod tests {
mem_store.clone(),
)));
let last_processed_commit_index = 0;
let (sender, mut receiver) = unbounded_channel("consensus_output");
let (commit_consumer, mut commit_receiver, _transaction_receiver) =
CommitConsumer::new(last_processed_commit_index);

let leader_schedule = Arc::new(LeaderSchedule::from_store(
context.clone(),
Expand All @@ -236,7 +237,7 @@ mod tests {

let mut observer = CommitObserver::new(
context.clone(),
CommitConsumer::new(sender, last_processed_commit_index),
commit_consumer,
dag_state.clone(),
mem_store.clone(),
leader_schedule,
Expand Down Expand Up @@ -289,7 +290,7 @@ mod tests {

// Check commits sent over consensus output channel is accurate
let mut processed_subdag_index = 0;
while let Ok(subdag) = receiver.try_recv() {
while let Ok(subdag) = commit_receiver.try_recv() {
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
processed_subdag_index = subdag.commit_ref.index as usize;
Expand All @@ -299,7 +300,7 @@ mod tests {
}
assert_eq!(processed_subdag_index, leaders.len());

verify_channel_empty(&mut receiver);
verify_channel_empty(&mut commit_receiver);

// Check commits have been persisted to storage
let last_commit = mem_store.read_last_commit().unwrap().unwrap();
Expand All @@ -326,16 +327,16 @@ mod tests {
mem_store.clone(),
)));
let last_processed_commit_index = 0;
let (sender, mut receiver) = unbounded_channel("consensus_output");

let (commit_consumer, mut commit_receiver, _transaction_receiver) =
CommitConsumer::new(last_processed_commit_index);
let leader_schedule = Arc::new(LeaderSchedule::from_store(
context.clone(),
dag_state.clone(),
));

let mut observer = CommitObserver::new(
context.clone(),
CommitConsumer::new(sender.clone(), last_processed_commit_index),
commit_consumer,
dag_state.clone(),
mem_store.clone(),
leader_schedule.clone(),
Expand Down Expand Up @@ -370,7 +371,7 @@ mod tests {

// Check commits sent over consensus output channel is accurate
let mut processed_subdag_index = 0;
while let Ok(subdag) = receiver.try_recv() {
while let Ok(subdag) = commit_receiver.try_recv() {
tracing::info!("Processed {subdag}");
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
Expand All @@ -381,7 +382,7 @@ mod tests {
}
assert_eq!(processed_subdag_index, expected_last_processed_index);

verify_channel_empty(&mut receiver);
verify_channel_empty(&mut commit_receiver);

// Check last stored commit is correct
let last_commit = mem_store.read_last_commit().unwrap().unwrap();
Expand All @@ -406,7 +407,7 @@ mod tests {
);

let expected_last_sent_index = num_rounds as usize;
while let Ok(subdag) = receiver.try_recv() {
while let Ok(subdag) = commit_receiver.try_recv() {
tracing::info!("{subdag} was sent but not processed by consumer");
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
Expand All @@ -417,7 +418,7 @@ mod tests {
}
assert_eq!(processed_subdag_index, expected_last_sent_index);

verify_channel_empty(&mut receiver);
verify_channel_empty(&mut commit_receiver);

// Check last stored commit is correct. We should persist the last commit
// that was sent over the channel regardless of how the consumer handled
Expand All @@ -427,9 +428,11 @@ mod tests {

// Re-create commit observer starting from index 2 which represents the
// last processed index from the consumer over consensus output channel
let (commit_consumer, mut commit_receiver, _transaction_receiver) =
CommitConsumer::new(expected_last_processed_index as CommitIndex);
let _observer = CommitObserver::new(
context.clone(),
CommitConsumer::new(sender, expected_last_processed_index as CommitIndex),
commit_consumer,
dag_state.clone(),
mem_store.clone(),
leader_schedule,
Expand All @@ -438,7 +441,7 @@ mod tests {
// Check commits sent over consensus output channel is accurate starting
// from last processed index of 2 and finishing at last sent index of 3.
processed_subdag_index = expected_last_processed_index;
while let Ok(subdag) = receiver.try_recv() {
while let Ok(subdag) = commit_receiver.try_recv() {
tracing::info!("Processed {subdag} on resubmission");
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
Expand All @@ -449,7 +452,7 @@ mod tests {
}
assert_eq!(processed_subdag_index, expected_last_sent_index);

verify_channel_empty(&mut receiver);
verify_channel_empty(&mut commit_receiver);
}

#[tokio::test]
Expand All @@ -463,7 +466,8 @@ mod tests {
mem_store.clone(),
)));
let last_processed_commit_index = 0;
let (sender, mut receiver) = unbounded_channel("consensus_output");
let (commit_consumer, mut commit_receiver, _transaction_receiver) =
CommitConsumer::new(last_processed_commit_index);

let leader_schedule = Arc::new(LeaderSchedule::from_store(
context.clone(),
Expand All @@ -472,7 +476,7 @@ mod tests {

let mut observer = CommitObserver::new(
context.clone(),
CommitConsumer::new(sender.clone(), last_processed_commit_index),
commit_consumer,
dag_state.clone(),
mem_store.clone(),
leader_schedule.clone(),
Expand All @@ -499,7 +503,7 @@ mod tests {

// Check commits sent over consensus output channel is accurate
let mut processed_subdag_index = 0;
while let Ok(subdag) = receiver.try_recv() {
while let Ok(subdag) = commit_receiver.try_recv() {
tracing::info!("Processed {subdag}");
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
Expand All @@ -510,7 +514,7 @@ mod tests {
}
assert_eq!(processed_subdag_index, expected_last_processed_index);

verify_channel_empty(&mut receiver);
verify_channel_empty(&mut commit_receiver);

// Check last stored commit is correct
let last_commit = mem_store.read_last_commit().unwrap().unwrap();
Expand All @@ -521,17 +525,19 @@ mod tests {

// Re-create commit observer starting from index 3 which represents the
// last processed index from the consumer over consensus output channel
let (commit_consumer, mut commit_receiver, _transaction_receiver) =
CommitConsumer::new(expected_last_processed_index as CommitIndex);
let _observer = CommitObserver::new(
context.clone(),
CommitConsumer::new(sender, expected_last_processed_index as CommitIndex),
commit_consumer,
dag_state.clone(),
mem_store.clone(),
leader_schedule,
);

// No commits should be resubmitted as consensus store's last commit index
// is equal to last processed index by consumer
verify_channel_empty(&mut receiver);
verify_channel_empty(&mut commit_receiver);
}

/// After receiving all expected subdags, ensure channel is empty
Expand Down
Loading

0 comments on commit ace69fa

Please sign in to comment.