Skip to content

Commit

Permalink
fix applying max_user_txns_to_execute
Browse files Browse the repository at this point in the history
  • Loading branch information
bchocho committed Nov 5, 2024
1 parent 9f90b71 commit 1f25ce0
Showing 1 changed file with 42 additions and 40 deletions.
82 changes: 42 additions & 40 deletions consensus/src/execution_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,51 +295,55 @@ impl ExecutionPipeline {
}
});

let (mut txns, blocking_txns_provider) =
monitor!("execute_filter_block_committed_transactions", {
// TODO: Find a better way to do this.
match block.transactions {
ExecutableTransactions::Unsharded(txns) => {
let transactions: Vec<_> = txns
.into_iter()
.filter(|txn| {
if let Valid(UserTransaction(user_txn)) = txn {
!committed_transactions.contains(&user_txn.committed_hash())
} else {
true
}
})
.collect();
let transactions_len = transactions.len();
(
transactions,
Arc::new(BlockingTxnsProvider::new(
max_txns_to_execute.unwrap_or(transactions_len as u64) as usize,
)),
)
},
ExecutableTransactions::UnshardedBlocking(_) => {
unimplemented!("Not expecting this yet.")
},
ExecutableTransactions::Sharded(_) => {
unimplemented!("Sharded transactions are not supported yet.")
},
}
});

let num_txns_to_execute = blocking_txns_provider.num_txns();
let blocking_txns_writer = blocking_txns_provider.clone();
let join_shuffle = tokio::task::spawn_blocking(move || {
// TODO: keep this previously split so we don't have to re-split it here
if let Some((first_user_txn_idx, _)) = txns.iter().find_position(|txn| {
let mut txns = monitor!("execute_filter_block_committed_transactions", {
// TODO: Find a better way to do this.
match block.transactions {
ExecutableTransactions::Unsharded(txns) => {
let transactions: Vec<_> = txns
.into_iter()
.filter(|txn| {
if let Valid(UserTransaction(user_txn)) = txn {
!committed_transactions.contains(&user_txn.committed_hash())
} else {
true
}
})
.collect();
transactions
},
ExecutableTransactions::UnshardedBlocking(_) => {
unimplemented!("Not expecting this yet.")
},
ExecutableTransactions::Sharded(_) => {
unimplemented!("Sharded transactions are not supported yet.")
},
}
});
let num_validator_txns = if let Some((first_user_txn_idx, _)) =
txns.iter().find_position(|txn| {
let txn = match txn {
Valid(txn) => txn,
Invalid(txn) => txn,
};
matches!(txn, UserTransaction(_))
}) {
first_user_txn_idx
} else {
txns.len()
};
let num_txns_to_execute = if let Some(max_user_txns_to_execute) = max_txns_to_execute {
txns.len()
.min(num_validator_txns.saturating_add(max_user_txns_to_execute as usize))
} else {
txns.len()
};
let blocking_txns_provider = Arc::new(BlockingTxnsProvider::new(num_txns_to_execute));
let blocking_txns_writer = blocking_txns_provider.clone();
let join_shuffle = tokio::task::spawn_blocking(move || {
// TODO: keep this previously split so we don't have to re-split it here
if num_txns_to_execute > num_validator_txns {
let timer = Instant::now();
let validator_txns: Vec<_> = txns.drain(0..first_user_txn_idx).collect();
let validator_txns: Vec<_> = txns.drain(0..num_validator_txns).collect();
info!(
"Execution: Split validator txns from user txns in {} micros",
timer.elapsed().as_micros()
Expand All @@ -359,8 +363,6 @@ impl ExecutionPipeline {
blocking_txns_writer.set_txn(idx as TxnIndex, txn);
}
} else {
// TODO: could num_txns_to_execute < number of validator txns?
// No user transactions in the block.
for (idx, txn) in txns.into_iter().take(num_txns_to_execute).enumerate() {
blocking_txns_writer.set_txn(idx as TxnIndex, txn);
}
Expand Down

0 comments on commit 1f25ce0

Please sign in to comment.