diff --git a/crates/taiko/consensus/proposer/src/task.rs b/crates/taiko/consensus/proposer/src/task.rs index ced1903f767a..e262b07e94f5 100644 --- a/crates/taiko/consensus/proposer/src/task.rs +++ b/crates/taiko/consensus/proposer/src/task.rs @@ -1,10 +1,12 @@ use crate::{Storage, TaskArgs}; +use futures_util::{future::BoxFuture, FutureExt}; use reth_chainspec::ChainSpec; use reth_evm::execute::BlockExecutorProvider; use reth_primitives::IntoRecoveredTransaction; use reth_provider::{BlockReaderIdExt, StateProviderFactory}; -use reth_transaction_pool::TransactionPool; +use reth_transaction_pool::{TransactionPool, ValidPoolTransaction}; use std::{ + collections::VecDeque, future::Future, pin::Pin, sync::Arc, @@ -19,8 +21,16 @@ pub struct ProposerTask { chain_spec: Arc, /// The client used to interact with the state provider: Provider, + /// Single active future that inserts a new block into `storage` + insert_task: Option>, /// Pool where transactions are stored pool: Pool, + /// backlog of sets of transactions ready to be mined + #[allow(clippy::type_complexity)] + queued: VecDeque<( + TaskArgs, + Vec::Transaction>>>, + )>, /// The type used for block execution block_executor: Executor, trigger_args_rx: UnboundedReceiver, @@ -38,7 +48,15 @@ impl ProposerTask, ) -> Self { - Self { chain_spec, provider, pool, block_executor, trigger_args_rx } + Self { + chain_spec, + provider, + insert_task: None, + pool, + queued: Default::default(), + block_executor, + trigger_args_rx, + } } } @@ -56,32 +74,50 @@ where // this drives block production and loop { - match this.trigger_args_rx.poll_recv(cx) { - Poll::Pending => return Poll::Pending, + if let Some(trigger_args) = match this.trigger_args_rx.poll_recv(cx) { + Poll::Ready(Some(args)) => Some(args), Poll::Ready(None) => return Poll::Ready(()), - Poll::Ready(Some(args)) => { - let mut best_txs = this.pool.best_transactions(); - best_txs.skip_blobs(); - debug!(target: "taiko::proposer", txs = ?best_txs.size_hint(), "Proposer get best transactions"); - let (mut local_txs, remote_txs): (Vec<_>, Vec<_>) = best_txs - .filter(|tx| { - tx.effective_tip_per_gas(args.base_fee) - .map_or(false, |tip| tip >= args.min_tip as u128) - }) - .partition(|tx| { - args.local_accounts - .as_ref() - .map(|local_accounts| local_accounts.contains(&tx.sender())) - .unwrap_or_default() - }); - local_txs.extend(remote_txs); - debug!(target: "taiko::proposer", txs = ?local_txs.len(), "Proposer filter best transactions"); + _ => None, + } { + let mut best_txs = this.pool.best_transactions(); + best_txs.skip_blobs(); + debug!(target: "taiko::proposer", txs = ?best_txs.size_hint(), "Proposer get best transactions"); + let (mut local_txs, remote_txs): (Vec<_>, Vec<_>) = best_txs + .filter(|tx| { + tx.effective_tip_per_gas(trigger_args.base_fee) + .map_or(false, |tip| tip >= trigger_args.min_tip as u128) + }) + .partition(|tx| { + trigger_args + .local_accounts + .as_ref() + .map(|local_accounts| local_accounts.contains(&tx.sender())) + .unwrap_or_default() + }); + local_txs.extend(remote_txs); + debug!(target: "taiko::proposer", txs = ?local_txs.len(), "Proposer filter best transactions"); + + // miner returned a set of transaction that we feed to the producer + this.queued.push_back((trigger_args, local_txs)); + }; + + if this.insert_task.is_none() { + if this.queued.is_empty() { + // nothing to insert + break; + } + + // ready to queue in new insert task; + let (trigger_args, txs) = this.queued.pop_front().expect("not empty"); - let client = this.provider.clone(); - let chain_spec = Arc::clone(&this.chain_spec); - let executor = this.block_executor.clone(); + let client = this.provider.clone(); + let chain_spec = Arc::clone(&this.chain_spec); + let executor = this.block_executor.clone(); - let txs: Vec<_> = local_txs + // Create the mining future that creates a block, notifies the engine that drives + // the pipeline + this.insert_task = Some(Box::pin(async move { + let txs: Vec<_> = txs .into_iter() .map(|tx| tx.to_recovered_transaction().into_signed()) .collect(); @@ -95,8 +131,7 @@ where max_transactions_lists, base_fee, .. - } = args; - + } = trigger_args; let res = Storage::build_and_execute( txs.clone(), ommers, @@ -110,9 +145,21 @@ where base_fee, ); let _ = tx.send(res); + })); + } + + if let Some(mut fut) = this.insert_task.take() { + match fut.poll_unpin(cx) { + Poll::Ready(_) => {} + Poll::Pending => { + this.insert_task = Some(fut); + break; + } } } } + + Poll::Pending } } diff --git a/crates/taiko/evm/src/execute.rs b/crates/taiko/evm/src/execute.rs index 94d3365984a2..ef13df84e916 100644 --- a/crates/taiko/evm/src/execute.rs +++ b/crates/taiko/evm/src/execute.rs @@ -42,7 +42,7 @@ use taiko_reth_beacon_consensus::{ use alloc::{sync::Arc, vec, vec::Vec}; use std::io; use std::io::Write; -use tracing::{debug, info}; +use tracing::debug; use reth_evm::execute::TaskResult; use reth_primitives::transaction::TransactionSignedList;