Skip to content

Commit

Permalink
refactor: update priority queue to use ThinPriorityTransaction
Browse files Browse the repository at this point in the history
  • Loading branch information
MohammadNassar1 committed Jun 12, 2024
1 parent 0f7462a commit ec0dae6
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 60 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/mempool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ workspace = true

[dependencies]
async-trait.workspace = true
derive_more.workspace = true
starknet_mempool_infra = { path = "../mempool_infra", version = "0.0" }
starknet_api.workspace = true
starknet_mempool_types = { path = "../mempool_types", version = "0.0" }
Expand Down
60 changes: 27 additions & 33 deletions crates/mempool/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub mod mempool_test;
pub struct Mempool {
// TODO: add docstring explaining visibility and coupling of the fields.
txs_queue: TransactionPriorityQueue,
// All transactions currently held in the mempool.
tx_store: TransactionStore,
state: HashMap<ContractAddress, AccountState>,
}
Expand All @@ -35,35 +36,25 @@ impl Mempool {
state: HashMap::default(),
};

mempool.txs_queue = TransactionPriorityQueue::from(
inputs
.into_iter()
.map(|input| {
// Attempts to insert a key-value pair into the mempool's state. Returns `None`
// if the key was not present, otherwise returns the old value while updating
// the new value.
let prev_value =
mempool.state.insert(input.account.sender_address, input.account.state);
assert!(
prev_value.is_none(),
"Sender address: {:?} already exists in the mempool. Can't add {:?} to \
the mempool.",
input.account.sender_address,
input.tx
);

// Insert the transaction into the tx_store.
let res = mempool.tx_store.push(input.tx.clone());
assert!(
res.is_ok(),
"Transaction: {:?} already exists in the mempool.",
input.tx.tx_hash
);

input.tx
})
.collect::<Vec<ThinTransaction>>(),
);
for MempoolInput { tx, account: Account { sender_address, state } } in inputs.into_iter() {
// Attempts to insert a key-value pair into the mempool's state. Returns `None`
// if the key was not present, otherwise returns the old value while updating
// the new value.
let existing_account_state = mempool.state.insert(sender_address, state);
assert!(
existing_account_state.is_none(),
"Sender address: {:?} already exists in the mempool. Can't add {:?} to the \
mempool.",
sender_address,
tx
);

// Insert the transaction into the tx_store.
let res = mempool.tx_store.push(tx.clone());
assert!(res.is_ok(), "Transaction: {:?} already exists in the mempool.", tx.tx_hash);

mempool.txs_queue.push(tx.clone().into());
}

mempool
}
Expand All @@ -78,10 +69,13 @@ impl Mempool {
// back. TODO: Consider renaming to `pop_txs` to be more consistent with the standard
// library.
pub fn get_txs(&mut self, n_txs: usize) -> MempoolResult<Vec<ThinTransaction>> {
let txs = self.txs_queue.pop_last_chunk(n_txs);
for tx in &txs {
let pq_txs = self.txs_queue.pop_last_chunk(n_txs);

let mut txs: Vec<ThinTransaction> = Vec::default();
for pq_tx in &pq_txs {
let tx = self.tx_store.remove(&pq_tx.tx_hash)?;
self.state.remove(&tx.sender_address);
self.tx_store.remove(&tx.tx_hash)?;
txs.push(tx);
}

Ok(txs)
Expand All @@ -96,7 +90,7 @@ impl Mempool {
Vacant(entry) => {
entry.insert(account.state);
// TODO(Mohammad): use `handle_tx`.
self.txs_queue.push(tx.clone());
self.txs_queue.push(tx.clone().into());
self.tx_store.push(tx)?;

Ok(())
Expand Down
23 changes: 14 additions & 9 deletions crates/mempool/src/mempool_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use starknet_mempool_types::errors::MempoolError;
use starknet_mempool_types::mempool_types::ThinTransaction;

use crate::mempool::{Account, Mempool, MempoolInput};
use crate::priority_queue::PrioritizedTransaction;

/// Creates a valid input for mempool's `add_tx` with optional default value for
/// `sender_address`.
Expand Down Expand Up @@ -120,10 +121,10 @@ fn test_add_tx(mut mempool: Mempool) {
mempool.state.contains_key(&account2.sender_address);
mempool.state.contains_key(&account3.sender_address);

check_mempool_txs_eq(
assert!(check_mempool_txs_eq(
&mempool,
&[tx_tip_50_address_0, tx_tip_80_address_2, tx_tip_100_address_1],
)
&[tx_tip_50_address_0, tx_tip_80_address_2, tx_tip_100_address_1]
));
}

#[rstest]
Expand All @@ -137,15 +138,19 @@ fn test_add_same_tx(mut mempool: Mempool) {
Err(MempoolError::DuplicateTransaction { tx_hash: TransactionHash(StarkFelt::ONE) })
);
// Assert that the original tx remains in the pool after the failed attempt.
check_mempool_txs_eq(&mempool, &[tx])
assert!(check_mempool_txs_eq(&mempool, &[tx]));
}

// Asserts that the transactions in the mempool are in ascending order as per the expected
// transactions.
fn check_mempool_txs_eq(mempool: &Mempool, expected_txs: &[ThinTransaction]) {
fn check_mempool_txs_eq(mempool: &Mempool, expected_txs: &[ThinTransaction]) -> bool {
let mempool_txs = mempool.txs_queue.iter();
// Deref the inner mempool tx type.
expected_txs.iter().zip(mempool_txs).all(|(a, b)| *a == **b);

// Convert and compare transactions
expected_txs.iter().zip(mempool_txs).all(|(expected, actual)| {
let expected_converted: PrioritizedTransaction = expected.clone().into();
expected_converted == *actual
})
}

#[rstest]
Expand All @@ -162,7 +167,7 @@ fn test_add_tx_with_identical_tip_succeeds(mut mempool: Mempool) {

// TODO: currently hash comparison tie-breaks the two. Once more robust tie-breaks are added
// replace this assertion with a dedicated test.
check_mempool_txs_eq(&mempool, &[tx2, tx1]);
assert!(check_mempool_txs_eq(&mempool, &[tx2, tx1]));
}

#[rstest]
Expand All @@ -176,5 +181,5 @@ fn test_tip_priority_over_tx_hash(mut mempool: Mempool) {

assert!(mempool.add_tx(tx_big_tip_small_hash.clone(), account1).is_ok());
assert!(mempool.add_tx(tx_small_tip_big_hash.clone(), account2).is_ok());
check_mempool_txs_eq(&mempool, &[tx_big_tip_small_hash, tx_small_tip_big_hash])
assert!(check_mempool_txs_eq(&mempool, &[tx_small_tip_big_hash, tx_big_tip_small_hash]));
}
66 changes: 50 additions & 16 deletions crates/mempool/src/priority_queue.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,61 @@
use std::cmp::Ordering;
use std::collections::BTreeSet;
#[cfg(any(feature = "testing", test))]
use std::collections::btree_set::Iter;
use std::collections::{BTreeSet, HashMap};

use starknet_api::core::{ContractAddress, Nonce};
use starknet_api::transaction::{Tip, TransactionHash};
use starknet_mempool_types::mempool_types::ThinTransaction;
// Assumption: for the MVP only one transaction from the same contract class can be in the mempool
// at a time. When this changes, saving the transactions themselves on the queu might no longer be
// appropriate, because we'll also need to stores transactions without indexing them. For example,
// transactions with future nonces will need to be stored, and potentially indexed on block commits.
#[derive(Clone, Debug, Default, derive_more::Deref, derive_more::DerefMut)]
pub struct TransactionPriorityQueue(BTreeSet<PrioritizedTransaction>);
#[derive(Clone, Debug, Default)]
pub struct TransactionPriorityQueue {
// Priority queue of transactions with associated priority.
queue: BTreeSet<PrioritizedTransaction>,
// FIX: Set of account addresses for efficient existence checks.
address_to_nonce: HashMap<ContractAddress, Nonce>,
}

impl TransactionPriorityQueue {
/// Adds a transaction to the mempool, ensuring unique keys.
/// Panics: if given a duplicate tx.
pub fn push(&mut self, tx: ThinTransaction) {
let mempool_tx = PrioritizedTransaction(tx);
assert!(self.insert(mempool_tx), "Keys should be unique; duplicates are checked prior.");
pub fn push(&mut self, tx: PrioritizedTransaction) {
self.address_to_nonce.insert(tx.address, tx.nonce);
assert!(self.queue.insert(tx), "Keys should be unique; duplicates are checked prior.");
}

// TODO(gilad): remove collect
pub fn pop_last_chunk(&mut self, n_txs: usize) -> Vec<ThinTransaction> {
(0..n_txs).filter_map(|_| self.pop_last().map(|tx| tx.0)).collect()
pub fn pop_last_chunk(&mut self, n_txs: usize) -> Vec<PrioritizedTransaction> {
let txs: Vec<PrioritizedTransaction> =
(0..n_txs).filter_map(|_| self.queue.pop_last()).collect();
for tx in txs.iter() {
self.address_to_nonce.remove(&tx.address);
}
txs
}

#[cfg(any(feature = "testing", test))]
pub fn iter(&self) -> Iter<'_, PrioritizedTransaction> {
self.queue.iter()
}
}

impl From<Vec<ThinTransaction>> for TransactionPriorityQueue {
fn from(transactions: Vec<ThinTransaction>) -> Self {
TransactionPriorityQueue(BTreeSet::from_iter(
transactions.into_iter().map(PrioritizedTransaction),
))
// TODO(Mohammad): delete once the mempool is used. It will be used in Mempool's
// `get_next_eligible_tx`.
#[allow(dead_code)]
pub fn get_nonce(&self, address: &ContractAddress) -> Option<&Nonce> {
self.address_to_nonce.get(address)
}
}

#[derive(Clone, Debug, derive_more::Deref, derive_more::From)]
pub struct PrioritizedTransaction(pub ThinTransaction);
#[derive(Clone, Debug, Default)]
pub struct PrioritizedTransaction {
pub address: ContractAddress,
pub nonce: Nonce,
pub tx_hash: TransactionHash,
pub tip: Tip,
}

/// Compare transactions based only on their tip, a uint, using the Eq trait. It ensures that two
/// tips are either exactly equal or not.
Expand All @@ -59,6 +82,17 @@ impl PartialOrd for PrioritizedTransaction {
}
}

impl From<ThinTransaction> for PrioritizedTransaction {
fn from(tx: ThinTransaction) -> Self {
PrioritizedTransaction {
address: tx.sender_address,
nonce: tx.nonce,
tx_hash: tx.tx_hash,
tip: tx.tip,
}
}
}

// TODO: remove when is used.
#[allow(dead_code)]
// Assumption: there are no gaps, and the transactions are received in order.
Expand Down

0 comments on commit ec0dae6

Please sign in to comment.