diff --git a/consensus/src/execution_pipeline.rs b/consensus/src/execution_pipeline.rs index ed07b2aca19ca..c46d6d867f9ae 100644 --- a/consensus/src/execution_pipeline.rs +++ b/consensus/src/execution_pipeline.rs @@ -295,51 +295,55 @@ impl ExecutionPipeline { } }); - let (mut txns, blocking_txns_provider) = - monitor!("execute_filter_block_committed_transactions", { - // TODO: Find a better way to do this. - match block.transactions { - ExecutableTransactions::Unsharded(txns) => { - let transactions: Vec<_> = txns - .into_iter() - .filter(|txn| { - if let Valid(UserTransaction(user_txn)) = txn { - !committed_transactions.contains(&user_txn.committed_hash()) - } else { - true - } - }) - .collect(); - let transactions_len = transactions.len(); - ( - transactions, - Arc::new(BlockingTxnsProvider::new( - max_txns_to_execute.unwrap_or(transactions_len as u64) as usize, - )), - ) - }, - ExecutableTransactions::UnshardedBlocking(_) => { - unimplemented!("Not expecting this yet.") - }, - ExecutableTransactions::Sharded(_) => { - unimplemented!("Sharded transactions are not supported yet.") - }, - } - }); - - let num_txns_to_execute = blocking_txns_provider.num_txns(); - let blocking_txns_writer = blocking_txns_provider.clone(); - let join_shuffle = tokio::task::spawn_blocking(move || { - // TODO: keep this previously split so we don't have to re-split it here - if let Some((first_user_txn_idx, _)) = txns.iter().find_position(|txn| { + let mut txns = monitor!("execute_filter_block_committed_transactions", { + // TODO: Find a better way to do this. + match block.transactions { + ExecutableTransactions::Unsharded(txns) => { + let transactions: Vec<_> = txns + .into_iter() + .filter(|txn| { + if let Valid(UserTransaction(user_txn)) = txn { + !committed_transactions.contains(&user_txn.committed_hash()) + } else { + true + } + }) + .collect(); + transactions + }, + ExecutableTransactions::UnshardedBlocking(_) => { + unimplemented!("Not expecting this yet.") + }, + ExecutableTransactions::Sharded(_) => { + unimplemented!("Sharded transactions are not supported yet.") + }, + } + }); + let num_validator_txns = if let Some((first_user_txn_idx, _)) = + txns.iter().find_position(|txn| { let txn = match txn { Valid(txn) => txn, Invalid(txn) => txn, }; matches!(txn, UserTransaction(_)) }) { + first_user_txn_idx + } else { + txns.len() + }; + let num_txns_to_execute = if let Some(max_user_txns_to_execute) = max_txns_to_execute { + txns.len() + .min(num_validator_txns.saturating_add(max_user_txns_to_execute as usize)) + } else { + txns.len() + }; + let blocking_txns_provider = Arc::new(BlockingTxnsProvider::new(num_txns_to_execute)); + let blocking_txns_writer = blocking_txns_provider.clone(); + let join_shuffle = tokio::task::spawn_blocking(move || { + // TODO: keep this previously split so we don't have to re-split it here + if num_txns_to_execute > num_validator_txns { let timer = Instant::now(); - let validator_txns: Vec<_> = txns.drain(0..first_user_txn_idx).collect(); + let validator_txns: Vec<_> = txns.drain(0..num_validator_txns).collect(); info!( "Execution: Split validator txns from user txns in {} micros", timer.elapsed().as_micros() @@ -359,8 +363,6 @@ impl ExecutionPipeline { blocking_txns_writer.set_txn(idx as TxnIndex, txn); } } else { - // TODO: could num_txns_to_execute < number of validator txns? - // No user transactions in the block. for (idx, txn) in txns.into_iter().take(num_txns_to_execute).enumerate() { blocking_txns_writer.set_txn(idx as TxnIndex, txn); }