Skip to content

Commit

Permalink
revert changes
Browse files Browse the repository at this point in the history
  • Loading branch information
mask-pp committed Oct 25, 2024
1 parent 8458b33 commit 09a492f
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 28 deletions.
101 changes: 74 additions & 27 deletions crates/taiko/consensus/proposer/src/task.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::{Storage, TaskArgs};
use futures_util::{future::BoxFuture, FutureExt};
use reth_chainspec::ChainSpec;
use reth_evm::execute::BlockExecutorProvider;
use reth_primitives::IntoRecoveredTransaction;
use reth_provider::{BlockReaderIdExt, StateProviderFactory};
use reth_transaction_pool::TransactionPool;
use reth_transaction_pool::{TransactionPool, ValidPoolTransaction};
use std::{
collections::VecDeque,
future::Future,
pin::Pin,
sync::Arc,
Expand All @@ -19,8 +21,16 @@ pub struct ProposerTask<Provider, Pool: TransactionPool, Executor> {
chain_spec: Arc<ChainSpec>,
/// The client used to interact with the state
provider: Provider,
/// Single active future that inserts a new block into `storage`
insert_task: Option<BoxFuture<'static, ()>>,
/// Pool where transactions are stored
pool: Pool,
/// backlog of sets of transactions ready to be mined
#[allow(clippy::type_complexity)]
queued: VecDeque<(
TaskArgs,
Vec<Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>,
)>,
/// The type used for block execution
block_executor: Executor,
trigger_args_rx: UnboundedReceiver<TaskArgs>,
Expand All @@ -38,7 +48,15 @@ impl<Executor, Provider, Pool: TransactionPool> ProposerTask<Provider, Pool, Exe
block_executor: Executor,
trigger_args_rx: UnboundedReceiver<TaskArgs>,
) -> Self {
Self { chain_spec, provider, pool, block_executor, trigger_args_rx }
Self {
chain_spec,
provider,
insert_task: None,
pool,
queued: Default::default(),
block_executor,
trigger_args_rx,
}
}
}

Expand All @@ -56,32 +74,50 @@ where

// this drives block production and
loop {
match this.trigger_args_rx.poll_recv(cx) {
Poll::Pending => return Poll::Pending,
if let Some(trigger_args) = match this.trigger_args_rx.poll_recv(cx) {
Poll::Ready(Some(args)) => Some(args),
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(args)) => {
let mut best_txs = this.pool.best_transactions();
best_txs.skip_blobs();
debug!(target: "taiko::proposer", txs = ?best_txs.size_hint(), "Proposer get best transactions");
let (mut local_txs, remote_txs): (Vec<_>, Vec<_>) = best_txs
.filter(|tx| {
tx.effective_tip_per_gas(args.base_fee)
.map_or(false, |tip| tip >= args.min_tip as u128)
})
.partition(|tx| {
args.local_accounts
.as_ref()
.map(|local_accounts| local_accounts.contains(&tx.sender()))
.unwrap_or_default()
});
local_txs.extend(remote_txs);
debug!(target: "taiko::proposer", txs = ?local_txs.len(), "Proposer filter best transactions");
_ => None,
} {
let mut best_txs = this.pool.best_transactions();
best_txs.skip_blobs();
debug!(target: "taiko::proposer", txs = ?best_txs.size_hint(), "Proposer get best transactions");
let (mut local_txs, remote_txs): (Vec<_>, Vec<_>) = best_txs
.filter(|tx| {
tx.effective_tip_per_gas(trigger_args.base_fee)
.map_or(false, |tip| tip >= trigger_args.min_tip as u128)
})
.partition(|tx| {
trigger_args
.local_accounts
.as_ref()
.map(|local_accounts| local_accounts.contains(&tx.sender()))
.unwrap_or_default()
});
local_txs.extend(remote_txs);
debug!(target: "taiko::proposer", txs = ?local_txs.len(), "Proposer filter best transactions");

// miner returned a set of transaction that we feed to the producer
this.queued.push_back((trigger_args, local_txs));
};

if this.insert_task.is_none() {
if this.queued.is_empty() {
// nothing to insert
break;
}

// ready to queue in new insert task;
let (trigger_args, txs) = this.queued.pop_front().expect("not empty");

let client = this.provider.clone();
let chain_spec = Arc::clone(&this.chain_spec);
let executor = this.block_executor.clone();
let client = this.provider.clone();
let chain_spec = Arc::clone(&this.chain_spec);
let executor = this.block_executor.clone();

let txs: Vec<_> = local_txs
// Create the mining future that creates a block, notifies the engine that drives
// the pipeline
this.insert_task = Some(Box::pin(async move {
let txs: Vec<_> = txs
.into_iter()
.map(|tx| tx.to_recovered_transaction().into_signed())
.collect();
Expand All @@ -95,8 +131,7 @@ where
max_transactions_lists,
base_fee,
..
} = args;

} = trigger_args;
let res = Storage::build_and_execute(
txs.clone(),
ommers,
Expand All @@ -110,9 +145,21 @@ where
base_fee,
);
let _ = tx.send(res);
}));
}

if let Some(mut fut) = this.insert_task.take() {
match fut.poll_unpin(cx) {
Poll::Ready(_) => {}
Poll::Pending => {
this.insert_task = Some(fut);
break;
}
}
}
}

Poll::Pending
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/taiko/evm/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use taiko_reth_beacon_consensus::{
use alloc::{sync::Arc, vec, vec::Vec};
use std::io;
use std::io::Write;
use tracing::{debug, info};
use tracing::debug;

use reth_evm::execute::TaskResult;
use reth_primitives::transaction::TransactionSignedList;
Expand Down

0 comments on commit 09a492f

Please sign in to comment.