diff --git a/crates/mempool/src/lib.rs b/crates/mempool/src/lib.rs index fb92ed2d..6d5d6f00 100644 --- a/crates/mempool/src/lib.rs +++ b/crates/mempool/src/lib.rs @@ -1,2 +1,3 @@ pub mod mempool; pub(crate) mod priority_queue; +pub mod transaction_store; diff --git a/crates/mempool/src/mempool.rs b/crates/mempool/src/mempool.rs index 6da16953..3916b80f 100644 --- a/crates/mempool/src/mempool.rs +++ b/crates/mempool/src/mempool.rs @@ -16,6 +16,7 @@ use starknet_mempool_types::mempool_types::{ use tokio::select; use crate::priority_queue::TransactionPriorityQueue; +use crate::transaction_store::TransactionStore; #[cfg(test)] #[path = "mempool_test.rs"] @@ -26,6 +27,7 @@ pub struct Mempool { pub gateway_network: MempoolNetworkComponent, batcher_network: BatcherToMempoolChannels, txs_queue: TransactionPriorityQueue, + tx_store: TransactionStore, state: HashMap, } @@ -37,6 +39,7 @@ impl Mempool { ) -> Self { let mut mempool = Mempool { txs_queue: TransactionPriorityQueue::default(), + tx_store: TransactionStore::default(), state: HashMap::default(), gateway_network, batcher_network, @@ -58,6 +61,15 @@ impl Mempool { input.account.address, input.tx ); + + // Insert the transaction into the tx_store. + let res = mempool.tx_store.push(input.tx.clone()); + assert!( + res.is_ok(), + "Transaction: {:?} already exists in the mempool.", + input.tx.tx_hash + ); + input.tx }) .collect::>(), @@ -82,6 +94,7 @@ impl Mempool { let txs = self.txs_queue.pop_last_chunk(n_txs); for tx in &txs { self.state.remove(&tx.sender_address); + self.tx_store.remove(&tx.tx_hash)?; } Ok(txs) @@ -95,7 +108,9 @@ impl Mempool { Occupied(_) => Err(MempoolError::DuplicateTransaction { tx_hash: tx.tx_hash }), Vacant(entry) => { entry.insert(account.state); - self.txs_queue.push(tx); + self.txs_queue.push(tx.clone()); + self.tx_store.push(tx)?; + Ok(()) } } diff --git a/crates/mempool/src/transaction_store.rs b/crates/mempool/src/transaction_store.rs new file mode 100644 index 00000000..a01a9667 --- /dev/null +++ b/crates/mempool/src/transaction_store.rs @@ -0,0 +1,46 @@ +use std::collections::{BTreeMap, HashMap}; + +use starknet_api::core::{ContractAddress, Nonce}; +use starknet_api::transaction::TransactionHash; +use starknet_mempool_types::errors::MempoolError; +use starknet_mempool_types::mempool_types::ThinTransaction; + +#[derive(Clone, Debug, Default)] +pub struct TransactionStore { + store: HashMap>, + tx_hash_2_tx: HashMap, +} + +impl TransactionStore { + pub fn push(&mut self, tx: ThinTransaction) -> Result<(), MempoolError> { + let account_store = self.store.entry(tx.sender_address).or_default(); + // TODO(Mohammad): Allow overriding a previous transaction. + if account_store.contains_key(&tx.nonce) { + return Err(MempoolError::DuplicateTransaction { tx_hash: tx.tx_hash }); + } + account_store.insert(tx.nonce, tx.clone()); + self.tx_hash_2_tx.insert(tx.tx_hash, (tx.sender_address, tx.nonce)); + Ok(()) + } + + pub fn remove(&mut self, tx_hash: &TransactionHash) -> Result { + if let Some((address, nonce)) = self.tx_hash_2_tx.remove(tx_hash) { + if let Some(tree_map) = self.store.get_mut(&address) { + if let Some(tx) = tree_map.remove(&nonce) { + return Ok(tx); + } + } + } + Err(MempoolError::TransactionNotFound { tx_hash: *tx_hash }) + } + + pub fn get(&mut self, tx_hash: &TransactionHash) -> Result<&ThinTransaction, MempoolError> { + let (address, nonce) = self.tx_hash_2_tx.get(tx_hash).unwrap(); + if let Some(tree_map) = self.store.get(address) { + if let Some(tx) = tree_map.get(nonce) { + return Ok(tx); + } + } + Err(MempoolError::TransactionNotFound { tx_hash: *tx_hash }) + } +} diff --git a/crates/mempool_types/src/errors.rs b/crates/mempool_types/src/errors.rs index bdc458b2..ff676545 100644 --- a/crates/mempool_types/src/errors.rs +++ b/crates/mempool_types/src/errors.rs @@ -5,4 +5,6 @@ use thiserror::Error; pub enum MempoolError { #[error("Duplicate transaction, of hash: {tx_hash}")] DuplicateTransaction { tx_hash: TransactionHash }, + #[error("Transaction of hash: {tx_hash} is not found")] + TransactionNotFound { tx_hash: TransactionHash }, }