Skip to content

Commit

Permalink
[pipeline] switch from broadcast channel to shared future
Browse files Browse the repository at this point in the history
  • Loading branch information
Zekun Li authored and zekun000 committed Dec 3, 2024
1 parent 9afd783 commit 1b0fb83
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 43 deletions.
8 changes: 4 additions & 4 deletions consensus/consensus-types/src/pipelined_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ impl PipelineFutures {
pub struct PipelineInputTx {
pub rand_tx: Option<oneshot::Sender<Option<Randomness>>>,
pub order_vote_tx: Option<oneshot::Sender<()>>,
pub order_proof_tx: tokio::sync::broadcast::Sender<()>,
pub commit_proof_tx: tokio::sync::broadcast::Sender<LedgerInfoWithSignatures>,
pub order_proof_tx: Option<oneshot::Sender<()>>,
pub commit_proof_tx: Option<oneshot::Sender<LedgerInfoWithSignatures>>,
}

pub struct PipelineInputRx {
pub rand_rx: oneshot::Receiver<Option<Randomness>>,
pub order_vote_rx: oneshot::Receiver<()>,
pub order_proof_rx: tokio::sync::broadcast::Receiver<()>,
pub commit_proof_rx: tokio::sync::broadcast::Receiver<LedgerInfoWithSignatures>,
pub order_proof_fut: TaskFuture<()>,
pub commit_proof_fut: TaskFuture<LedgerInfoWithSignatures>,
}

/// A representation of a block that has been added to the execution pipeline. It might either be in ordered
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/pipeline/execution_schedule_phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl StatelessPipeline for ExecutionSchedulePhase {
for b in &ordered_blocks {
if let Some(tx) = b.pipeline_tx().lock().as_mut() {
tx.rand_tx.take().map(|tx| tx.send(b.randomness().cloned()));
let _ = tx.order_proof_tx.send(());
tx.order_proof_tx.take().map(|tx| tx.send(()));
}
}

Expand Down
4 changes: 3 additions & 1 deletion consensus/src/pipeline/persisting_phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ impl StatelessPipeline for PersistingPhase {
{
for b in &blocks {
if let Some(tx) = b.pipeline_tx().lock().as_mut() {
let _ = tx.commit_proof_tx.send(commit_ledger_info.clone());
tx.commit_proof_tx
.take()
.map(|tx| tx.send(commit_ledger_info.clone()));
}
b.wait_for_commit_ledger().await;
}
Expand Down
80 changes: 43 additions & 37 deletions consensus/src/pipeline/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,23 +165,39 @@ impl PipelineBuilder {
}
}

fn channel() -> (PipelineInputTx, PipelineInputRx) {
fn channel(abort_handles: &mut Vec<AbortHandle>) -> (PipelineInputTx, PipelineInputRx) {
let (rand_tx, rand_rx) = oneshot::channel();
let (order_vote_tx, order_vote_rx) = oneshot::channel();
let (order_proof_tx, order_proof_rx) = tokio::sync::broadcast::channel(1);
let (commit_proof_tx, commit_proof_rx) = tokio::sync::broadcast::channel(1);
let (order_proof_tx, order_proof_fut) = oneshot::channel();
let (commit_proof_tx, commit_proof_fut) = oneshot::channel();
let order_proof_fut = spawn_shared_fut(
async move {
order_proof_fut
.await
.map_err(|_| TaskError::from(anyhow!("order proof tx cancelled")))
},
abort_handles,
);
let commit_proof_fut = spawn_shared_fut(
async move {
commit_proof_fut
.await
.map_err(|_| TaskError::from(anyhow!("commit proof tx cancelled")))
},
abort_handles,
);
(
PipelineInputTx {
rand_tx: Some(rand_tx),
order_vote_tx: Some(order_vote_tx),
order_proof_tx,
commit_proof_tx,
order_proof_tx: Some(order_proof_tx),
commit_proof_tx: Some(commit_proof_tx),
},
PipelineInputRx {
rand_rx,
order_vote_rx,
order_proof_rx,
commit_proof_rx,
order_proof_fut,
commit_proof_fut,
},
)
}
Expand Down Expand Up @@ -242,16 +258,15 @@ impl PipelineBuilder {
block: Arc<Block>,
block_store_callback: Box<dyn FnOnce(LedgerInfoWithSignatures) + Send + Sync>,
) -> (PipelineFutures, PipelineInputTx, Vec<AbortHandle>) {
let (tx, rx) = Self::channel();
let mut abort_handles = vec![];
let (tx, rx) = Self::channel(&mut abort_handles);
let PipelineInputRx {
rand_rx,
order_vote_rx,
order_proof_rx,
commit_proof_rx,
order_proof_fut,
commit_proof_fut,
} = rx;

let mut abort_handles = vec![];

let prepare_fut = spawn_shared_fut(
Self::prepare(self.block_preparer.clone(), block.clone()),
&mut abort_handles,
Expand Down Expand Up @@ -282,8 +297,8 @@ impl PipelineBuilder {
Self::sign_commit_vote(
ledger_update_fut.clone(),
order_vote_rx,
order_proof_rx.resubscribe(),
commit_proof_rx.resubscribe(),
order_proof_fut.clone(),
commit_proof_fut.clone(),
self.signer.clone(),
block.clone(),
),
Expand All @@ -293,8 +308,8 @@ impl PipelineBuilder {
Self::pre_commit(
ledger_update_fut.clone(),
parent.pre_commit_fut.clone(),
order_proof_rx,
commit_proof_rx.resubscribe(),
order_proof_fut,
commit_proof_fut.clone(),
self.executor.clone(),
block.clone(),
),
Expand All @@ -303,7 +318,7 @@ impl PipelineBuilder {
let commit_ledger_fut = spawn_shared_fut(
Self::commit_ledger(
pre_commit_fut.clone(),
commit_proof_rx,
commit_proof_fut,
parent.commit_ledger_fut.clone(),
self.executor.clone(),
block.clone(),
Expand Down Expand Up @@ -530,19 +545,19 @@ impl PipelineBuilder {
async fn sign_commit_vote(
ledger_update_phase: TaskFuture<LedgerUpdateResult>,
order_vote_rx: oneshot::Receiver<()>,
mut order_proof_rx: tokio::sync::broadcast::Receiver<()>,
mut commit_proof_rx: tokio::sync::broadcast::Receiver<LedgerInfoWithSignatures>,
order_proof_fut: TaskFuture<()>,
commit_proof_fut: TaskFuture<LedgerInfoWithSignatures>,
signer: Arc<ValidatorSigner>,
block: Arc<Block>,
) -> TaskResult<CommitVoteResult> {
let (compute_result, _, epoch_end_timestamp) = ledger_update_phase.await?;
// either order_vote_rx or order_proof_rx can trigger the next phase
// either order_vote_rx or order_proof_fut can trigger the next phase
select! {
Ok(_) = order_vote_rx => {
}
Ok(_) = order_proof_rx.recv() => {
Ok(_) = order_proof_fut => {
}
Ok(_) = commit_proof_rx.recv() => {
Ok(_) = commit_proof_fut => {
}
else => {
return Err(anyhow!("all receivers dropped"))?;
Expand Down Expand Up @@ -580,24 +595,18 @@ impl PipelineBuilder {
ledger_update_phase: TaskFuture<LedgerUpdateResult>,
// TODO bound parent_commit_ledger too
parent_block_pre_commit_phase: TaskFuture<PreCommitResult>,
mut order_proof_rx: tokio::sync::broadcast::Receiver<()>,
mut commit_proof_rx: tokio::sync::broadcast::Receiver<LedgerInfoWithSignatures>,
order_proof_fut: TaskFuture<()>,
commit_proof_fut: TaskFuture<LedgerInfoWithSignatures>,
executor: Arc<dyn BlockExecutorTrait>,
block: Arc<Block>,
) -> TaskResult<PreCommitResult> {
let (compute_result, _, _) = ledger_update_phase.await?;
parent_block_pre_commit_phase.await?;

order_proof_rx
.recv()
.await
.map_err(|_| anyhow!("order proof tx cancelled"))?;
order_proof_fut.await?;

if compute_result.has_reconfiguration() {
commit_proof_rx
.recv()
.await
.map_err(|_| anyhow!("commit proof tx cancelled"))?;
commit_proof_fut.await?;
}

let _tracker = Tracker::new("pre_commit", &block);
Expand Down Expand Up @@ -644,17 +653,14 @@ impl PipelineBuilder {
/// What it does: Commit the ledger info to storage, this makes the data visible for clients
async fn commit_ledger(
pre_commit_fut: TaskFuture<PreCommitResult>,
mut commit_proof_rx: tokio::sync::broadcast::Receiver<LedgerInfoWithSignatures>,
commit_proof_fut: TaskFuture<LedgerInfoWithSignatures>,
parent_block_commit_phase: TaskFuture<CommitLedgerResult>,
executor: Arc<dyn BlockExecutorTrait>,
block: Arc<Block>,
) -> TaskResult<CommitLedgerResult> {
parent_block_commit_phase.await?;
pre_commit_fut.await?;
let ledger_info_with_sigs = commit_proof_rx
.recv()
.await
.map_err(|_| anyhow!("commit rx cancelled"))?;
let ledger_info_with_sigs = commit_proof_fut.await?;

// it's committed as prefix
if ledger_info_with_sigs.commit_info().id() != block.id() {
Expand Down

0 comments on commit 1b0fb83

Please sign in to comment.