From eca416574cf5bb4bca9a65cce3fefb6a3c4a399f Mon Sep 17 00:00:00 2001 From: Mohammad Nassar Date: Thu, 30 May 2024 16:52:35 +0300 Subject: [PATCH] refactor: update priority queue to use PrioritizedTransaction --- crates/mempool/src/mempool.rs | 53 ++++++++++++++++++-------- crates/mempool/src/mempool_test.rs | 9 +++-- crates/mempool/src/priority_queue.rs | 42 ++++++++++---------- crates/mempool/src/transaction_pool.rs | 30 +++++++-------- 4 files changed, 79 insertions(+), 55 deletions(-) diff --git a/crates/mempool/src/mempool.rs b/crates/mempool/src/mempool.rs index f59619ff..75e96e90 100644 --- a/crates/mempool/src/mempool.rs +++ b/crates/mempool/src/mempool.rs @@ -1,14 +1,14 @@ use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::HashMap; -use starknet_api::core::ContractAddress; -use starknet_api::transaction::TransactionHash; +use starknet_api::core::{ContractAddress, Nonce}; +use starknet_api::transaction::{Tip, TransactionHash}; use starknet_mempool_types::errors::MempoolError; use starknet_mempool_types::mempool_types::{ Account, AccountState, MempoolInput, MempoolResult, ThinTransaction, }; -use crate::priority_queue::TransactionPriorityQueue; +use crate::priority_queue::TransactionQueue; use crate::transaction_pool::TransactionPool; #[cfg(test)] @@ -18,7 +18,9 @@ pub mod mempool_test; #[derive(Debug, Default)] pub struct Mempool { // TODO: add docstring explaining visibility and coupling of the fields. - txs_queue: TransactionPriorityQueue, + // Transactions eligible for sequencing. + tx_queue: TransactionQueue, + // All transactions currently held in the mempool. tx_pool: TransactionPool, state: HashMap, } @@ -27,7 +29,7 @@ impl Mempool { pub fn new(inputs: impl IntoIterator) -> Self { let mut mempool = Mempool::empty(); - for MempoolInput { tx, account: Account { sender_address, state } } in inputs.into_iter() { + for MempoolInput { tx, account: Account { sender_address, state } } in inputs { // Attempts to insert a key-value pair into the mempool's state. Returns `None` // if the key was not present, otherwise returns the old value while updating // the new value. @@ -38,15 +40,16 @@ impl Mempool { sender_address, tx ); } - // Attempt to push the transaction into the tx_pool - if let Err(err) = mempool.tx_pool.push(tx.clone()) { + + mempool.tx_queue.push((&tx).into()); + + let tx_hash = tx.tx_hash; + if let Err(err) = mempool.tx_pool.push(tx) { panic!( "Transaction: {:?} already exists in the mempool. Error: {:?}", - tx.tx_hash, err + tx_hash, err ); } - - mempool.txs_queue.push(tx); } mempool @@ -62,13 +65,14 @@ impl Mempool { // back. TODO: Consider renaming to `pop_txs` to be more consistent with the standard // library. pub fn get_txs(&mut self, n_txs: usize) -> MempoolResult> { - let txs = self.txs_queue.pop_last_chunk(n_txs); - for tx in &txs { + let mut eligible_txs: Vec = Vec::with_capacity(n_txs); + for tx_hash in self.tx_queue.pop_last_chunk(n_txs).into_iter() { + let tx = self.tx_pool.remove(tx_hash)?; self.state.remove(&tx.sender_address); - self.tx_pool.remove(tx.tx_hash)?; + eligible_txs.push(tx); } - Ok(txs) + Ok(eligible_txs) } /// Adds a new transaction to the mempool. @@ -80,7 +84,7 @@ impl Mempool { Vacant(entry) => { entry.insert(account.state); // TODO(Mohammad): use `handle_tx`. - self.txs_queue.push(tx.clone()); + self.tx_queue.push((&tx).into()); self.tx_pool.push(tx)?; Ok(()) @@ -101,3 +105,22 @@ impl Mempool { todo!() } } + +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub struct TransactionReference { + pub sender_address: ContractAddress, + pub nonce: Nonce, + pub tx_hash: TransactionHash, + pub tip: Tip, +} + +impl From<&ThinTransaction> for TransactionReference { + fn from(tx: &ThinTransaction) -> Self { + TransactionReference { + sender_address: tx.sender_address, + nonce: tx.nonce, + tx_hash: tx.tx_hash, + tip: tx.tip, + } + } +} diff --git a/crates/mempool/src/mempool_test.rs b/crates/mempool/src/mempool_test.rs index 887c87dc..47adbe3b 100644 --- a/crates/mempool/src/mempool_test.rs +++ b/crates/mempool/src/mempool_test.rs @@ -9,7 +9,7 @@ use starknet_api::{contract_address, patricia_key}; use starknet_mempool_types::errors::MempoolError; use starknet_mempool_types::mempool_types::ThinTransaction; -use crate::mempool::{Account, Mempool, MempoolInput}; +use crate::mempool::{Account, Mempool, MempoolInput, TransactionReference}; /// Creates a valid input for mempool's `add_tx` with optional default value for /// `sender_address`. @@ -51,12 +51,13 @@ fn mempool() -> Mempool { // transactions. #[track_caller] fn check_mempool_txs_eq(mempool: &Mempool, expected_txs: &[ThinTransaction]) { - let mempool_txs = mempool.txs_queue.iter(); + let mempool_txs = mempool.tx_queue.iter(); + let expected_txs = expected_txs.iter().map(TransactionReference::from); assert!( zip_eq(expected_txs, mempool_txs) // Deref the inner mempool tx type. - .all(|(expected_tx, mempool_tx)| *expected_tx == **mempool_tx) + .all(|(expected_tx, mempool_tx)| expected_tx == *mempool_tx) ); } @@ -169,7 +170,7 @@ fn test_add_tx_with_identical_tip_succeeds(mut mempool: Mempool) { // TODO: currently hash comparison tie-breaks the two. Once more robust tie-breaks are added // replace this assertion with a dedicated test. - check_mempool_txs_eq(&mempool, &[tx2, tx1]); + check_mempool_txs_eq(&mempool, &[tx2, tx1]) } #[rstest] diff --git a/crates/mempool/src/priority_queue.rs b/crates/mempool/src/priority_queue.rs index ff53086b..789fc4a8 100644 --- a/crates/mempool/src/priority_queue.rs +++ b/crates/mempool/src/priority_queue.rs @@ -1,43 +1,43 @@ use std::cmp::Ordering; use std::collections::{BTreeSet, VecDeque}; +use starknet_api::transaction::TransactionHash; use starknet_mempool_types::mempool_types::ThinTransaction; + +use crate::mempool::TransactionReference; // Assumption: for the MVP only one transaction from the same contract class can be in the mempool // at a time. When this changes, saving the transactions themselves on the queu might no longer be // appropriate, because we'll also need to stores transactions without indexing them. For example, // transactions with future nonces will need to be stored, and potentially indexed on block commits. -#[derive(Clone, Debug, Default, derive_more::Deref, derive_more::DerefMut)] -pub struct TransactionPriorityQueue(BTreeSet); -impl TransactionPriorityQueue { +#[derive(Clone, Debug, Default)] +pub struct TransactionQueue(BTreeSet); + +impl TransactionQueue { /// Adds a transaction to the mempool, ensuring unique keys. /// Panics: if given a duplicate tx. - pub fn push(&mut self, tx: ThinTransaction) { - let mempool_tx = PrioritizedTransaction(tx); - assert!(self.insert(mempool_tx), "Keys should be unique; duplicates are checked prior."); + pub fn push(&mut self, tx: TransactionReference) { + assert!(self.0.insert(tx.into()), "Keys should be unique; duplicates are checked prior."); } // TODO(gilad): remove collect - pub fn pop_last_chunk(&mut self, n_txs: usize) -> Vec { - (0..n_txs).filter_map(|_| self.pop_last().map(|tx| tx.0)).collect() + pub fn pop_last_chunk(&mut self, n_txs: usize) -> Vec { + (0..n_txs).filter_map(|_| self.0.pop_last().map(|tx| tx.tx_hash)).collect() } -} -impl From> for TransactionPriorityQueue { - fn from(transactions: Vec) -> Self { - TransactionPriorityQueue(BTreeSet::from_iter( - transactions.into_iter().map(PrioritizedTransaction), - )) + #[cfg(any(feature = "testing", test))] + pub fn iter(&self) -> impl Iterator { + self.0.iter().map(|queued_tx| &queued_tx.0) } } -#[derive(Clone, Debug, derive_more::Deref, derive_more::From)] -pub struct PrioritizedTransaction(pub ThinTransaction); +#[derive(Clone, Debug, Default, derive_more::Deref, derive_more::DerefMut, derive_more::From)] +struct QueuedTransaction(TransactionReference); /// Compare transactions based only on their tip, a uint, using the Eq trait. It ensures that two /// tips are either exactly equal or not. -impl PartialEq for PrioritizedTransaction { - fn eq(&self, other: &PrioritizedTransaction) -> bool { +impl PartialEq for QueuedTransaction { + fn eq(&self, other: &QueuedTransaction) -> bool { self.tip == other.tip && self.tx_hash == other.tx_hash } } @@ -45,15 +45,15 @@ impl PartialEq for PrioritizedTransaction { /// Marks this struct as capable of strict equality comparisons, signaling to the compiler it /// adheres to equality semantics. // Note: this depends on the implementation of `PartialEq`, see its docstring. -impl Eq for PrioritizedTransaction {} +impl Eq for QueuedTransaction {} -impl Ord for PrioritizedTransaction { +impl Ord for QueuedTransaction { fn cmp(&self, other: &Self) -> Ordering { self.tip.cmp(&other.tip).then_with(|| self.tx_hash.cmp(&other.tx_hash)) } } -impl PartialOrd for PrioritizedTransaction { +impl PartialOrd for QueuedTransaction { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } diff --git a/crates/mempool/src/transaction_pool.rs b/crates/mempool/src/transaction_pool.rs index 382266b7..3d6cbe62 100644 --- a/crates/mempool/src/transaction_pool.rs +++ b/crates/mempool/src/transaction_pool.rs @@ -5,7 +5,7 @@ use starknet_api::transaction::TransactionHash; use starknet_mempool_types::errors::MempoolError; use starknet_mempool_types::mempool_types::{MempoolResult, ThinTransaction}; -use crate::priority_queue::PrioritizedTransaction; +use crate::mempool::TransactionReference; /// Contains all transactions currently held in the mempool. /// Invariant: both data structures are consistent regarding the existence of transactions: @@ -16,33 +16,33 @@ pub struct TransactionPool { // Holds the complete transaction objects; it should be the sole entity that does so. tx_pool: HashMap, // Transactions organized by account address, sorted by ascending nonce values. - txs_by_account: HashMap>, + txs_by_account: HashMap>, } impl TransactionPool { - // TODO(Mohammad): Remove the cloning of tx once the PrioritizedTransaction is updated. pub fn push(&mut self, tx: ThinTransaction) -> MempoolResult<()> { let tx_hash = tx.tx_hash; - // Insert transaction to pool, if it is new. - if let hash_map::Entry::Vacant(entry) = self.tx_pool.entry(tx_hash) { - entry.insert(tx.clone()); - } else { - return Err(MempoolError::DuplicateTransaction { tx_hash }); - } - let txs_from_account_entry = self.txs_by_account.entry(tx.sender_address).or_default(); match txs_from_account_entry.entry(tx.nonce) { btree_map::Entry::Vacant(txs_from_account) => { - txs_from_account.insert(tx.into()); + txs_from_account.insert((&tx).into()); } btree_map::Entry::Occupied(_) => { - panic!( - "Transaction pool consistency error: transaction with hash {tx_hash} does not \ - appear in main mapping, but it appears in the account mapping" - ); + return Err(MempoolError::DuplicateTransaction { tx_hash }); } } + + // Insert transaction to pool, if it is new. + if let hash_map::Entry::Vacant(entry) = self.tx_pool.entry(tx_hash) { + entry.insert(tx); + } else { + panic!( + "Transaction pool consistency error: transaction with hash {tx_hash} does not \ + appear in the account mapping, but it appears in the main mapping" + ); + } + Ok(()) }