Skip to content

Commit

Permalink
feat(katana): retain transactions in pool until mined (#2630)
Browse files Browse the repository at this point in the history
Every block interval, the node would take transactions from the pool - removing it directly from the pool. this creates a small window (depending on the machine) that the transaction appears nonexistent. this is due to how the tx flows from the pool and executor. this applies for both instant and interval block production mode. 

For instant mining, the window is between tx being picked up from the pool and the tx being committed to db. while for interval, tx being picked up from the pool and the tx being [inserted into the pending block](https://github.com/dojoengine/dojo/blob/d09cbcffd8c8f2745770888f9d3f30d07b8555ae/crates/katana/executor/src/implementation/blockifier/mod.rs#L208).

When a tx is being queried thru the rpc, the node will first check if the it exist in the db, else find in the pending block (if interval mode). this pr adds a new (last) step, which is to try finding the tx in the pool if it doesn't exist anywhere else.
  • Loading branch information
kariy authored Nov 5, 2024
1 parent 24964ce commit 7bf9be3
Show file tree
Hide file tree
Showing 10 changed files with 412 additions and 110 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 43 additions & 32 deletions crates/katana/core/src/service/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
// TODO: remove the messaging feature flag
// TODO: move the tasks to a separate module

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use block_producer::BlockProductionError;
use futures::channel::mpsc::Receiver;
use futures::stream::{Fuse, Stream, StreamExt};
use futures::stream::StreamExt;
use katana_executor::ExecutorFactory;
use katana_pool::ordering::PoolOrd;
use katana_pool::pending::PendingTransactions;
use katana_pool::{TransactionPool, TxPool};
use katana_primitives::transaction::ExecutableTxWithHash;
use katana_primitives::Felt;
use tracing::{error, info};

use self::block_producer::BlockProducer;
Expand All @@ -30,24 +27,40 @@ pub(crate) const LOG_TARGET: &str = "node";
/// to construct a new block.
#[must_use = "BlockProductionTask does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct BlockProductionTask<EF: ExecutorFactory> {
pub struct BlockProductionTask<EF, O>
where
EF: ExecutorFactory,
O: PoolOrd<Transaction = ExecutableTxWithHash>,
{
/// creates new blocks
pub(crate) block_producer: BlockProducer<EF>,
/// the miner responsible to select transactions from the `pool´
pub(crate) miner: TransactionMiner,
pub(crate) miner: TransactionMiner<O>,
/// the pool that holds all transactions
pub(crate) pool: TxPool,
/// Metrics for recording the service operations
metrics: BlockProducerMetrics,
}

impl<EF: ExecutorFactory> BlockProductionTask<EF> {
pub fn new(pool: TxPool, miner: TransactionMiner, block_producer: BlockProducer<EF>) -> Self {
impl<EF, O> BlockProductionTask<EF, O>
where
EF: ExecutorFactory,
O: PoolOrd<Transaction = ExecutableTxWithHash>,
{
pub fn new(
pool: TxPool,
miner: TransactionMiner<O>,
block_producer: BlockProducer<EF>,
) -> Self {
Self { block_producer, miner, pool, metrics: BlockProducerMetrics::default() }
}
}

impl<EF: ExecutorFactory> Future for BlockProductionTask<EF> {
impl<EF, O> Future for BlockProductionTask<EF, O>
where
EF: ExecutorFactory,
O: PoolOrd<Transaction = ExecutableTxWithHash>,
{
type Output = Result<(), BlockProductionError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand All @@ -65,6 +78,9 @@ impl<EF: ExecutorFactory> Future for BlockProductionTask<EF> {
let steps_used = outcome.stats.cairo_steps_used;
this.metrics.l1_gas_processed_total.increment(gas_used as u64);
this.metrics.cairo_steps_processed_total.increment(steps_used as u64);

// remove mined transactions from the pool
this.pool.remove_transactions(&outcome.txs);
}

Err(error) => {
Expand All @@ -74,7 +90,7 @@ impl<EF: ExecutorFactory> Future for BlockProductionTask<EF> {
}
}

if let Poll::Ready(pool_txs) = this.miner.poll(&this.pool, cx) {
if let Poll::Ready(pool_txs) = this.miner.poll(cx) {
// miner returned a set of transaction that we feed to the producer
this.block_producer.queue(pool_txs);
} else {
Expand All @@ -89,37 +105,32 @@ impl<EF: ExecutorFactory> Future for BlockProductionTask<EF> {

/// The type which takes the transaction from the pool and feeds them to the block producer.
#[derive(Debug)]
pub struct TransactionMiner {
/// stores whether there are pending transacions (if known)
has_pending_txs: Option<bool>,
/// Receives hashes of transactions that are ready from the pool
rx: Fuse<Receiver<Felt>>,
pub struct TransactionMiner<O>
where
O: PoolOrd<Transaction = ExecutableTxWithHash>,
{
pending_txs: PendingTransactions<ExecutableTxWithHash, O>,
}

impl TransactionMiner {
pub fn new(rx: Receiver<Felt>) -> Self {
Self { rx: rx.fuse(), has_pending_txs: None }
impl<O> TransactionMiner<O>
where
O: PoolOrd<Transaction = ExecutableTxWithHash>,
{
pub fn new(pending_txs: PendingTransactions<ExecutableTxWithHash, O>) -> Self {
Self { pending_txs }
}

fn poll(&mut self, pool: &TxPool, cx: &mut Context<'_>) -> Poll<Vec<ExecutableTxWithHash>> {
// drain the notification stream
while let Poll::Ready(Some(_)) = Pin::new(&mut self.rx).poll_next(cx) {
self.has_pending_txs = Some(true);
}
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Vec<ExecutableTxWithHash>> {
let mut transactions = Vec::new();

if self.has_pending_txs == Some(false) {
return Poll::Pending;
while let Poll::Ready(Some(tx)) = self.pending_txs.poll_next_unpin(cx) {
transactions.push(tx.tx.as_ref().clone());
}

// take all the transactions from the pool
let transactions =
pool.take_transactions().map(|tx| tx.tx.as_ref().clone()).collect::<Vec<_>>();

if transactions.is_empty() {
return Poll::Pending;
}

self.has_pending_txs = Some(false);
Poll::Ready(transactions)
}
}
7 changes: 3 additions & 4 deletions crates/katana/pipeline/src/stage/sequencing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,10 @@ impl<EF: ExecutorFactory> Sequencing<EF> {
}

fn run_block_production(&self) -> TaskHandle<Result<(), BlockProductionError>> {
let pool = self.pool.clone();
let miner = TransactionMiner::new(pool.add_listener());
// Create a new transaction miner with a subscription to the pool's pending transactions.
let miner = TransactionMiner::new(self.pool.pending_transactions());
let block_producer = self.block_producer.clone();

let service = BlockProductionTask::new(pool, miner, block_producer);
let service = BlockProductionTask::new(self.pool.clone(), miner, block_producer);
self.task_spawner.build_task().name("Block production").spawn(service)
}
}
Expand Down
3 changes: 2 additions & 1 deletion crates/katana/pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ katana-primitives.workspace = true
katana-provider.workspace = true
parking_lot.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = [ "sync" ] }
tracing.workspace = true

[dev-dependencies]
futures-util.workspace = true
rand.workspace = true
tokio.workspace = true
14 changes: 10 additions & 4 deletions crates/katana/pool/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

pub mod ordering;
pub mod pending;
pub mod pool;
pub mod subscription;
pub mod tx;
pub mod validation;

Expand All @@ -10,8 +12,9 @@ use std::sync::Arc;
use futures::channel::mpsc::Receiver;
use katana_primitives::transaction::{ExecutableTxWithHash, TxHash};
use ordering::{FiFo, PoolOrd};
use pending::PendingTransactions;
use pool::Pool;
use tx::{PendingTx, PoolTransaction};
use tx::PoolTransaction;
use validation::error::InvalidTransactionError;
use validation::stateful::TxValidator;
use validation::Validator;
Expand Down Expand Up @@ -44,9 +47,9 @@ pub trait TransactionPool {
/// Add a new transaction to the pool.
fn add_transaction(&self, tx: Self::Transaction) -> PoolResult<TxHash>;

fn take_transactions(
&self,
) -> impl Iterator<Item = PendingTx<Self::Transaction, Self::Ordering>>;
/// Returns a [`Stream`](futures::Stream) which yields pending transactions - transactions that
/// can be executed - from the pool.
fn pending_transactions(&self) -> PendingTransactions<Self::Transaction, Self::Ordering>;

/// Check if the pool contains a transaction with the given hash.
fn contains(&self, hash: TxHash) -> bool;
Expand All @@ -56,6 +59,9 @@ pub trait TransactionPool {

fn add_listener(&self) -> Receiver<TxHash>;

/// Removes a list of transactions from the pool according to their hashes.
fn remove_transactions(&self, hashes: &[TxHash]);

/// Get the total number of transactions in the pool.
fn size(&self) -> usize;

Expand Down
66 changes: 38 additions & 28 deletions crates/katana/pool/src/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,16 @@ impl<T> Default for TipOrdering<T> {
#[cfg(test)]
mod tests {

use futures::StreamExt;

use crate::ordering::{self, FiFo};
use crate::pool::test_utils::*;
use crate::tx::PoolTransaction;
use crate::validation::NoopValidator;
use crate::{Pool, TransactionPool};

#[test]
fn fifo_ordering() {
#[tokio::test]
async fn fifo_ordering() {
// Create mock transactions
let txs = [PoolTx::new(), PoolTx::new(), PoolTx::new(), PoolTx::new(), PoolTx::new()];

Expand All @@ -145,16 +147,17 @@ mod tests {
});

// Get pending transactions
let pendings = pool.take_transactions().collect::<Vec<_>>();
let mut pendings = pool.pending_transactions();

// Assert that the transactions are in the order they were added (first to last)
pendings.iter().zip(txs).for_each(|(pending, tx)| {
for tx in txs {
let pending = pendings.next().await.unwrap();
assert_eq!(pending.tx.as_ref(), &tx);
});
}
}

#[test]
fn tip_based_ordering() {
#[tokio::test]
async fn tip_based_ordering() {
// Create mock transactions with different tips and in random order
let txs = [
PoolTx::new().with_tip(2),
Expand All @@ -176,36 +179,43 @@ mod tests {
let _ = pool.add_transaction(tx.clone());
});

// Get pending transactions
let pending = pool.take_transactions().collect::<Vec<_>>();
assert_eq!(pending.len(), txs.len());
let mut pending = pool.pending_transactions();

// Assert that the transactions are ordered by tip (highest to lowest)
assert_eq!(pending[0].tx.tip(), 7);
assert_eq!(pending[0].tx.hash(), txs[8].hash());
let tx = pending.next().await.unwrap();
assert_eq!(tx.tx.tip(), 7);
assert_eq!(tx.tx.hash(), txs[8].hash());

assert_eq!(pending[1].tx.tip(), 6);
assert_eq!(pending[1].tx.hash(), txs[2].hash());
let tx = pending.next().await.unwrap();
assert_eq!(tx.tx.tip(), 6);
assert_eq!(tx.tx.hash(), txs[2].hash());

assert_eq!(pending[2].tx.tip(), 5);
assert_eq!(pending[2].tx.hash(), txs[6].hash());
let tx = pending.next().await.unwrap();
assert_eq!(tx.tx.tip(), 5);
assert_eq!(tx.tx.hash(), txs[6].hash());

assert_eq!(pending[3].tx.tip(), 4);
assert_eq!(pending[3].tx.hash(), txs[7].hash());
let tx = pending.next().await.unwrap();
assert_eq!(tx.tx.tip(), 4);
assert_eq!(tx.tx.hash(), txs[7].hash());

assert_eq!(pending[4].tx.tip(), 3);
assert_eq!(pending[4].tx.hash(), txs[3].hash());
let tx = pending.next().await.unwrap();
assert_eq!(tx.tx.tip(), 3);
assert_eq!(tx.tx.hash(), txs[3].hash());

assert_eq!(pending[5].tx.tip(), 2);
assert_eq!(pending[5].tx.hash(), txs[0].hash());
let tx = pending.next().await.unwrap();
assert_eq!(tx.tx.tip(), 2);
assert_eq!(tx.tx.hash(), txs[0].hash());

assert_eq!(pending[6].tx.tip(), 2);
assert_eq!(pending[6].tx.hash(), txs[4].hash());
let tx = pending.next().await.unwrap();
assert_eq!(tx.tx.tip(), 2);
assert_eq!(tx.tx.hash(), txs[4].hash());

assert_eq!(pending[7].tx.tip(), 2);
assert_eq!(pending[7].tx.hash(), txs[5].hash());
let tx = pending.next().await.unwrap();
assert_eq!(tx.tx.tip(), 2);
assert_eq!(tx.tx.hash(), txs[5].hash());

assert_eq!(pending[8].tx.tip(), 1);
assert_eq!(pending[8].tx.hash(), txs[1].hash());
let tx = pending.next().await.unwrap();
assert_eq!(tx.tx.tip(), 1);
assert_eq!(tx.tx.hash(), txs[1].hash());
}
}
Loading

0 comments on commit 7bf9be3

Please sign in to comment.