Skip to content

Commit

Permalink
refactor: update priority queue to use PrioritizedTransaction
Browse files Browse the repository at this point in the history
  • Loading branch information
MohammadNassar1 committed Jun 25, 2024
1 parent ccd75ea commit eca4165
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 55 deletions.
53 changes: 38 additions & 15 deletions crates/mempool/src/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::collections::HashMap;

use starknet_api::core::ContractAddress;
use starknet_api::transaction::TransactionHash;
use starknet_api::core::{ContractAddress, Nonce};
use starknet_api::transaction::{Tip, TransactionHash};
use starknet_mempool_types::errors::MempoolError;
use starknet_mempool_types::mempool_types::{
Account, AccountState, MempoolInput, MempoolResult, ThinTransaction,
};

use crate::priority_queue::TransactionPriorityQueue;
use crate::priority_queue::TransactionQueue;
use crate::transaction_pool::TransactionPool;

#[cfg(test)]
Expand All @@ -18,7 +18,9 @@ pub mod mempool_test;
#[derive(Debug, Default)]
pub struct Mempool {
// TODO: add docstring explaining visibility and coupling of the fields.
txs_queue: TransactionPriorityQueue,
// Transactions eligible for sequencing.
tx_queue: TransactionQueue,
// All transactions currently held in the mempool.
tx_pool: TransactionPool,
state: HashMap<ContractAddress, AccountState>,
}
Expand All @@ -27,7 +29,7 @@ impl Mempool {
pub fn new(inputs: impl IntoIterator<Item = MempoolInput>) -> Self {
let mut mempool = Mempool::empty();

for MempoolInput { tx, account: Account { sender_address, state } } in inputs.into_iter() {
for MempoolInput { tx, account: Account { sender_address, state } } in inputs {
// Attempts to insert a key-value pair into the mempool's state. Returns `None`
// if the key was not present, otherwise returns the old value while updating
// the new value.
Expand All @@ -38,15 +40,16 @@ impl Mempool {
sender_address, tx
);
}
// Attempt to push the transaction into the tx_pool
if let Err(err) = mempool.tx_pool.push(tx.clone()) {

mempool.tx_queue.push((&tx).into());

let tx_hash = tx.tx_hash;
if let Err(err) = mempool.tx_pool.push(tx) {
panic!(
"Transaction: {:?} already exists in the mempool. Error: {:?}",
tx.tx_hash, err
tx_hash, err
);
}

mempool.txs_queue.push(tx);
}

mempool
Expand All @@ -62,13 +65,14 @@ impl Mempool {
// back. TODO: Consider renaming to `pop_txs` to be more consistent with the standard
// library.
pub fn get_txs(&mut self, n_txs: usize) -> MempoolResult<Vec<ThinTransaction>> {
let txs = self.txs_queue.pop_last_chunk(n_txs);
for tx in &txs {
let mut eligible_txs: Vec<ThinTransaction> = Vec::with_capacity(n_txs);
for tx_hash in self.tx_queue.pop_last_chunk(n_txs).into_iter() {
let tx = self.tx_pool.remove(tx_hash)?;
self.state.remove(&tx.sender_address);
self.tx_pool.remove(tx.tx_hash)?;
eligible_txs.push(tx);
}

Ok(txs)
Ok(eligible_txs)
}

/// Adds a new transaction to the mempool.
Expand All @@ -80,7 +84,7 @@ impl Mempool {
Vacant(entry) => {
entry.insert(account.state);
// TODO(Mohammad): use `handle_tx`.
self.txs_queue.push(tx.clone());
self.tx_queue.push((&tx).into());
self.tx_pool.push(tx)?;

Ok(())
Expand All @@ -101,3 +105,22 @@ impl Mempool {
todo!()
}
}

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct TransactionReference {
pub sender_address: ContractAddress,
pub nonce: Nonce,
pub tx_hash: TransactionHash,
pub tip: Tip,
}

impl From<&ThinTransaction> for TransactionReference {
fn from(tx: &ThinTransaction) -> Self {
TransactionReference {
sender_address: tx.sender_address,
nonce: tx.nonce,
tx_hash: tx.tx_hash,
tip: tx.tip,
}
}
}
9 changes: 5 additions & 4 deletions crates/mempool/src/mempool_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use starknet_api::{contract_address, patricia_key};
use starknet_mempool_types::errors::MempoolError;
use starknet_mempool_types::mempool_types::ThinTransaction;

use crate::mempool::{Account, Mempool, MempoolInput};
use crate::mempool::{Account, Mempool, MempoolInput, TransactionReference};

/// Creates a valid input for mempool's `add_tx` with optional default value for
/// `sender_address`.
Expand Down Expand Up @@ -51,12 +51,13 @@ fn mempool() -> Mempool {
// transactions.
#[track_caller]
fn check_mempool_txs_eq(mempool: &Mempool, expected_txs: &[ThinTransaction]) {
let mempool_txs = mempool.txs_queue.iter();
let mempool_txs = mempool.tx_queue.iter();
let expected_txs = expected_txs.iter().map(TransactionReference::from);

assert!(
zip_eq(expected_txs, mempool_txs)
// Deref the inner mempool tx type.
.all(|(expected_tx, mempool_tx)| *expected_tx == **mempool_tx)
.all(|(expected_tx, mempool_tx)| expected_tx == *mempool_tx)
);
}

Expand Down Expand Up @@ -169,7 +170,7 @@ fn test_add_tx_with_identical_tip_succeeds(mut mempool: Mempool) {

// TODO: currently hash comparison tie-breaks the two. Once more robust tie-breaks are added
// replace this assertion with a dedicated test.
check_mempool_txs_eq(&mempool, &[tx2, tx1]);
check_mempool_txs_eq(&mempool, &[tx2, tx1])
}

#[rstest]
Expand Down
42 changes: 21 additions & 21 deletions crates/mempool/src/priority_queue.rs
Original file line number Diff line number Diff line change
@@ -1,59 +1,59 @@
use std::cmp::Ordering;
use std::collections::{BTreeSet, VecDeque};

use starknet_api::transaction::TransactionHash;
use starknet_mempool_types::mempool_types::ThinTransaction;

use crate::mempool::TransactionReference;
// Assumption: for the MVP only one transaction from the same contract class can be in the mempool
// at a time. When this changes, saving the transactions themselves on the queu might no longer be
// appropriate, because we'll also need to stores transactions without indexing them. For example,
// transactions with future nonces will need to be stored, and potentially indexed on block commits.
#[derive(Clone, Debug, Default, derive_more::Deref, derive_more::DerefMut)]
pub struct TransactionPriorityQueue(BTreeSet<PrioritizedTransaction>);

impl TransactionPriorityQueue {
#[derive(Clone, Debug, Default)]
pub struct TransactionQueue(BTreeSet<QueuedTransaction>);

impl TransactionQueue {
/// Adds a transaction to the mempool, ensuring unique keys.
/// Panics: if given a duplicate tx.
pub fn push(&mut self, tx: ThinTransaction) {
let mempool_tx = PrioritizedTransaction(tx);
assert!(self.insert(mempool_tx), "Keys should be unique; duplicates are checked prior.");
pub fn push(&mut self, tx: TransactionReference) {
assert!(self.0.insert(tx.into()), "Keys should be unique; duplicates are checked prior.");
}

// TODO(gilad): remove collect
pub fn pop_last_chunk(&mut self, n_txs: usize) -> Vec<ThinTransaction> {
(0..n_txs).filter_map(|_| self.pop_last().map(|tx| tx.0)).collect()
pub fn pop_last_chunk(&mut self, n_txs: usize) -> Vec<TransactionHash> {
(0..n_txs).filter_map(|_| self.0.pop_last().map(|tx| tx.tx_hash)).collect()
}
}

impl From<Vec<ThinTransaction>> for TransactionPriorityQueue {
fn from(transactions: Vec<ThinTransaction>) -> Self {
TransactionPriorityQueue(BTreeSet::from_iter(
transactions.into_iter().map(PrioritizedTransaction),
))
#[cfg(any(feature = "testing", test))]
pub fn iter(&self) -> impl Iterator<Item = &TransactionReference> {
self.0.iter().map(|queued_tx| &queued_tx.0)
}
}

#[derive(Clone, Debug, derive_more::Deref, derive_more::From)]
pub struct PrioritizedTransaction(pub ThinTransaction);
#[derive(Clone, Debug, Default, derive_more::Deref, derive_more::DerefMut, derive_more::From)]
struct QueuedTransaction(TransactionReference);

/// Compare transactions based only on their tip, a uint, using the Eq trait. It ensures that two
/// tips are either exactly equal or not.
impl PartialEq for PrioritizedTransaction {
fn eq(&self, other: &PrioritizedTransaction) -> bool {
impl PartialEq for QueuedTransaction {
fn eq(&self, other: &QueuedTransaction) -> bool {
self.tip == other.tip && self.tx_hash == other.tx_hash
}
}

/// Marks this struct as capable of strict equality comparisons, signaling to the compiler it
/// adheres to equality semantics.
// Note: this depends on the implementation of `PartialEq`, see its docstring.
impl Eq for PrioritizedTransaction {}
impl Eq for QueuedTransaction {}

impl Ord for PrioritizedTransaction {
impl Ord for QueuedTransaction {
fn cmp(&self, other: &Self) -> Ordering {
self.tip.cmp(&other.tip).then_with(|| self.tx_hash.cmp(&other.tx_hash))
}
}

impl PartialOrd for PrioritizedTransaction {
impl PartialOrd for QueuedTransaction {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
Expand Down
30 changes: 15 additions & 15 deletions crates/mempool/src/transaction_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use starknet_api::transaction::TransactionHash;
use starknet_mempool_types::errors::MempoolError;
use starknet_mempool_types::mempool_types::{MempoolResult, ThinTransaction};

use crate::priority_queue::PrioritizedTransaction;
use crate::mempool::TransactionReference;

/// Contains all transactions currently held in the mempool.
/// Invariant: both data structures are consistent regarding the existence of transactions:
Expand All @@ -16,33 +16,33 @@ pub struct TransactionPool {
// Holds the complete transaction objects; it should be the sole entity that does so.
tx_pool: HashMap<TransactionHash, ThinTransaction>,
// Transactions organized by account address, sorted by ascending nonce values.
txs_by_account: HashMap<ContractAddress, BTreeMap<Nonce, PrioritizedTransaction>>,
txs_by_account: HashMap<ContractAddress, BTreeMap<Nonce, TransactionReference>>,
}

impl TransactionPool {
// TODO(Mohammad): Remove the cloning of tx once the PrioritizedTransaction is updated.
pub fn push(&mut self, tx: ThinTransaction) -> MempoolResult<()> {
let tx_hash = tx.tx_hash;

// Insert transaction to pool, if it is new.
if let hash_map::Entry::Vacant(entry) = self.tx_pool.entry(tx_hash) {
entry.insert(tx.clone());
} else {
return Err(MempoolError::DuplicateTransaction { tx_hash });
}

let txs_from_account_entry = self.txs_by_account.entry(tx.sender_address).or_default();
match txs_from_account_entry.entry(tx.nonce) {
btree_map::Entry::Vacant(txs_from_account) => {
txs_from_account.insert(tx.into());
txs_from_account.insert((&tx).into());
}
btree_map::Entry::Occupied(_) => {
panic!(
"Transaction pool consistency error: transaction with hash {tx_hash} does not \
appear in main mapping, but it appears in the account mapping"
);
return Err(MempoolError::DuplicateTransaction { tx_hash });
}
}

// Insert transaction to pool, if it is new.
if let hash_map::Entry::Vacant(entry) = self.tx_pool.entry(tx_hash) {
entry.insert(tx);
} else {
panic!(
"Transaction pool consistency error: transaction with hash {tx_hash} does not \
appear in the account mapping, but it appears in the main mapping"
);
}

Ok(())
}

Expand Down

0 comments on commit eca4165

Please sign in to comment.