From fba5cb8c6f8766d5b6d1b565a1e9532af6ae8cfe Mon Sep 17 00:00:00 2001 From: Mohammad Nassar Date: Thu, 30 May 2024 16:52:35 +0300 Subject: [PATCH] refactor: update priority queue to use ThinPriorityTransaction --- crates/mempool/src/mempool.rs | 60 ++++++++++++------------- crates/mempool/src/mempool_test.rs | 23 ++++++---- crates/mempool/src/priority_queue.rs | 66 +++++++++++++++++++++------- 3 files changed, 91 insertions(+), 58 deletions(-) diff --git a/crates/mempool/src/mempool.rs b/crates/mempool/src/mempool.rs index 13609878f..739c33b1b 100644 --- a/crates/mempool/src/mempool.rs +++ b/crates/mempool/src/mempool.rs @@ -23,6 +23,7 @@ pub mod mempool_test; pub struct Mempool { // TODO: add docstring explaining visibility and coupling of the fields. txs_queue: TransactionPriorityQueue, + // All transactions currently held in the mempool. tx_store: TransactionStore, state: HashMap, } @@ -35,35 +36,25 @@ impl Mempool { state: HashMap::default(), }; - mempool.txs_queue = TransactionPriorityQueue::from( - inputs - .into_iter() - .map(|input| { - // Attempts to insert a key-value pair into the mempool's state. Returns `None` - // if the key was not present, otherwise returns the old value while updating - // the new value. - let prev_value = - mempool.state.insert(input.account.sender_address, input.account.state); - assert!( - prev_value.is_none(), - "Sender address: {:?} already exists in the mempool. Can't add {:?} to \ - the mempool.", - input.account.sender_address, - input.tx - ); - - // Insert the transaction into the tx_store. - let res = mempool.tx_store.push(input.tx.clone()); - assert!( - res.is_ok(), - "Transaction: {:?} already exists in the mempool.", - input.tx.tx_hash - ); - - input.tx - }) - .collect::>(), - ); + for MempoolInput { tx, account: Account { sender_address, state } } in inputs.into_iter() { + // Attempts to insert a key-value pair into the mempool's state. Returns `None` + // if the key was not present, otherwise returns the old value while updating + // the new value. + let existing_account_state = mempool.state.insert(sender_address, state); + assert!( + existing_account_state.is_none(), + "Sender address: {:?} already exists in the mempool. Can't add {:?} to the \ + mempool.", + sender_address, + tx + ); + + // Insert the transaction into the tx_store. + let res = mempool.tx_store.push(tx.clone()); + assert!(res.is_ok(), "Transaction: {:?} already exists in the mempool.", tx.tx_hash); + + mempool.txs_queue.push(tx.clone().into()); + } mempool } @@ -78,10 +69,13 @@ impl Mempool { // back. TODO: Consider renaming to `pop_txs` to be more consistent with the standard // library. pub fn get_txs(&mut self, n_txs: usize) -> MempoolResult> { - let txs = self.txs_queue.pop_last_chunk(n_txs); - for tx in &txs { + let pq_txs = self.txs_queue.pop_last_chunk(n_txs); + + let mut txs: Vec = Vec::default(); + for pq_tx in &pq_txs { + let tx = self.tx_store.remove(&pq_tx.tx_hash)?; self.state.remove(&tx.sender_address); - self.tx_store.remove(&tx.tx_hash)?; + txs.push(tx); } Ok(txs) @@ -96,7 +90,7 @@ impl Mempool { Vacant(entry) => { entry.insert(account.state); // TODO(Mohammad): use `handle_tx`. - self.txs_queue.push(tx.clone()); + self.txs_queue.push(tx.clone().into()); self.tx_store.push(tx)?; Ok(()) diff --git a/crates/mempool/src/mempool_test.rs b/crates/mempool/src/mempool_test.rs index cae92f2ee..ec2017711 100644 --- a/crates/mempool/src/mempool_test.rs +++ b/crates/mempool/src/mempool_test.rs @@ -9,6 +9,7 @@ use starknet_mempool_types::errors::MempoolError; use starknet_mempool_types::mempool_types::ThinTransaction; use crate::mempool::{Account, Mempool, MempoolInput}; +use crate::priority_queue::PrioritizedTransaction; /// Creates a valid input for mempool's `add_tx` with optional default value for /// `sender_address`. @@ -120,10 +121,10 @@ fn test_add_tx(mut mempool: Mempool) { mempool.state.contains_key(&account2.sender_address); mempool.state.contains_key(&account3.sender_address); - check_mempool_txs_eq( + assert!(check_mempool_txs_eq( &mempool, - &[tx_tip_50_address_0, tx_tip_80_address_2, tx_tip_100_address_1], - ) + &[tx_tip_50_address_0, tx_tip_80_address_2, tx_tip_100_address_1] + )); } #[rstest] @@ -137,15 +138,19 @@ fn test_add_same_tx(mut mempool: Mempool) { Err(MempoolError::DuplicateTransaction { tx_hash: TransactionHash(StarkFelt::ONE) }) ); // Assert that the original tx remains in the pool after the failed attempt. - check_mempool_txs_eq(&mempool, &[tx]) + assert!(check_mempool_txs_eq(&mempool, &[tx])); } // Asserts that the transactions in the mempool are in ascending order as per the expected // transactions. -fn check_mempool_txs_eq(mempool: &Mempool, expected_txs: &[ThinTransaction]) { +fn check_mempool_txs_eq(mempool: &Mempool, expected_txs: &[ThinTransaction]) -> bool { let mempool_txs = mempool.txs_queue.iter(); - // Deref the inner mempool tx type. - expected_txs.iter().zip(mempool_txs).all(|(a, b)| *a == **b); + + // Convert and compare transactions + expected_txs.iter().zip(mempool_txs).all(|(expected, actual)| { + let expected_converted: PrioritizedTransaction = expected.clone().into(); + expected_converted == *actual + }) } #[rstest] @@ -162,7 +167,7 @@ fn test_add_tx_with_identical_tip_succeeds(mut mempool: Mempool) { // TODO: currently hash comparison tie-breaks the two. Once more robust tie-breaks are added // replace this assertion with a dedicated test. - check_mempool_txs_eq(&mempool, &[tx2, tx1]); + assert!(check_mempool_txs_eq(&mempool, &[tx2, tx1])); } #[rstest] @@ -176,5 +181,5 @@ fn test_tip_priority_over_tx_hash(mut mempool: Mempool) { assert!(mempool.add_tx(tx_big_tip_small_hash.clone(), account1).is_ok()); assert!(mempool.add_tx(tx_small_tip_big_hash.clone(), account2).is_ok()); - check_mempool_txs_eq(&mempool, &[tx_big_tip_small_hash, tx_small_tip_big_hash]) + assert!(check_mempool_txs_eq(&mempool, &[tx_small_tip_big_hash, tx_big_tip_small_hash])); } diff --git a/crates/mempool/src/priority_queue.rs b/crates/mempool/src/priority_queue.rs index ad7c3a799..4e8f0a2c2 100644 --- a/crates/mempool/src/priority_queue.rs +++ b/crates/mempool/src/priority_queue.rs @@ -1,38 +1,61 @@ use std::cmp::Ordering; -use std::collections::BTreeSet; +#[cfg(any(feature = "testing", test))] +use std::collections::btree_set::Iter; +use std::collections::{BTreeSet, HashMap}; +use starknet_api::core::{ContractAddress, Nonce}; +use starknet_api::transaction::{Tip, TransactionHash}; use starknet_mempool_types::mempool_types::ThinTransaction; // Assumption: for the MVP only one transaction from the same contract class can be in the mempool // at a time. When this changes, saving the transactions themselves on the queu might no longer be // appropriate, because we'll also need to stores transactions without indexing them. For example, // transactions with future nonces will need to be stored, and potentially indexed on block commits. -#[derive(Clone, Debug, Default, derive_more::Deref, derive_more::DerefMut)] -pub struct TransactionPriorityQueue(BTreeSet); +#[derive(Clone, Debug, Default)] +pub struct TransactionPriorityQueue { + // Priority queue of transactions with associated priority. + queue: BTreeSet, + // FIX: Set of account addresses for efficient existence checks. + address_to_nonce: HashMap, +} impl TransactionPriorityQueue { /// Adds a transaction to the mempool, ensuring unique keys. /// Panics: if given a duplicate tx. - pub fn push(&mut self, tx: ThinTransaction) { - let mempool_tx = PrioritizedTransaction(tx); - assert!(self.insert(mempool_tx), "Keys should be unique; duplicates are checked prior."); + pub fn push(&mut self, tx: PrioritizedTransaction) { + self.address_to_nonce.insert(tx.address, tx.nonce); + assert!(self.queue.insert(tx), "Keys should be unique; duplicates are checked prior."); } // TODO(gilad): remove collect - pub fn pop_last_chunk(&mut self, n_txs: usize) -> Vec { - (0..n_txs).filter_map(|_| self.pop_last().map(|tx| tx.0)).collect() + pub fn pop_last_chunk(&mut self, n_txs: usize) -> Vec { + let txs: Vec = + (0..n_txs).filter_map(|_| self.queue.pop_last()).collect(); + for tx in txs.iter() { + self.address_to_nonce.remove(&tx.address); + } + txs + } + + #[cfg(any(feature = "testing", test))] + pub fn iter(&self) -> Iter<'_, PrioritizedTransaction> { + self.queue.iter() } -} -impl From> for TransactionPriorityQueue { - fn from(transactions: Vec) -> Self { - TransactionPriorityQueue(BTreeSet::from_iter( - transactions.into_iter().map(PrioritizedTransaction), - )) + // TODO(Mohammad): delete once the mempool is used. It will be used in Mempool's + // `get_next_eligible_tx`. + #[allow(dead_code)] + pub fn get_nonce(&self, address: &ContractAddress) -> Option<&Nonce> { + self.address_to_nonce.get(address) } } -#[derive(Clone, Debug, derive_more::Deref, derive_more::From)] -pub struct PrioritizedTransaction(pub ThinTransaction); +#[derive(Clone, Debug, Default)] +pub struct PrioritizedTransaction { + pub address: ContractAddress, + pub nonce: Nonce, + pub tx_hash: TransactionHash, + pub tip: Tip, +} /// Compare transactions based only on their tip, a uint, using the Eq trait. It ensures that two /// tips are either exactly equal or not. @@ -59,6 +82,17 @@ impl PartialOrd for PrioritizedTransaction { } } +impl From for PrioritizedTransaction { + fn from(tx: ThinTransaction) -> Self { + PrioritizedTransaction { + address: tx.sender_address, + nonce: tx.nonce, + tx_hash: tx.tx_hash, + tip: tx.tip, + } + } +} + // TODO: remove when is used. #[allow(dead_code)] // Assumption: there are no gaps, and the transactions are received in order.