Skip to content

Commit

Permalink
fix subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
kariy committed Nov 5, 2024
1 parent 942f6f1 commit bc83f1a
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 95 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 5 additions & 35 deletions crates/katana/core/src/service/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
// TODO: remove the messaging feature flag
// TODO: move the tasks to a separate module

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use block_producer::BlockProductionError;
use futures::channel::mpsc::Receiver;
use futures::stream::{Fuse, Stream, StreamExt};
use futures::stream::StreamExt;
use katana_executor::ExecutorFactory;
use katana_pool::ordering::PoolOrd;
use katana_pool::pending::PendingTransactions;
use katana_pool::{TransactionPool, TxPool};
use katana_primitives::transaction::ExecutableTxWithHash;
use katana_primitives::Felt;
use tracing::{error, info};

use self::block_producer::BlockProducer;
Expand Down Expand Up @@ -114,53 +109,28 @@ pub struct TransactionMiner<O>
where
O: PoolOrd<Transaction = ExecutableTxWithHash>,
{
/// stores whether there are pending transacions (if known)
has_pending_txs: Option<bool>,
/// Receives hashes of transactions that are ready from the pool
rx: Fuse<Receiver<Felt>>,

pending_txs: PendingTransactions<ExecutableTxWithHash, O>,
}

impl<O> TransactionMiner<O>
where
O: PoolOrd<Transaction = ExecutableTxWithHash>,
{
pub fn new(
pending_txs: PendingTransactions<ExecutableTxWithHash, O>,
rx: Receiver<Felt>,
) -> Self {
Self { pending_txs, rx: rx.fuse(), has_pending_txs: None }
pub fn new(pending_txs: PendingTransactions<ExecutableTxWithHash, O>) -> Self {
Self { pending_txs }
}

fn poll(
&mut self,
// pool: &TxPool,
cx: &mut Context<'_>,
) -> Poll<Vec<ExecutableTxWithHash>> {
// drain the notification stream
while let Poll::Ready(Some(_)) = Pin::new(&mut self.rx).poll_next(cx) {
self.has_pending_txs = Some(true);
}

if self.has_pending_txs == Some(false) {
return Poll::Pending;
}

fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Vec<ExecutableTxWithHash>> {
let mut transactions = Vec::new();

while let Poll::Ready(Some(tx)) = self.pending_txs.poll_next_unpin(cx) {
transactions.push(tx.tx.as_ref().clone());
}

// take all the transactions from the pool
// let transactions =
// pool.take_transactions().map(|tx| tx.tx.as_ref().clone()).collect::<Vec<_>>();

if transactions.is_empty() {
return Poll::Pending;
}

self.has_pending_txs = Some(false);
Poll::Ready(transactions)
}
}
7 changes: 3 additions & 4 deletions crates/katana/pipeline/src/stage/sequencing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,10 @@ impl<EF: ExecutorFactory> Sequencing<EF> {
}

fn run_block_production(&self) -> TaskHandle<Result<(), BlockProductionError>> {
let pool = self.pool.clone();
let miner = TransactionMiner::new(pool.pending_transactions(), pool.add_listener());
// Create a new transaction miner with a subscription to the pool's pending transactions.
let miner = TransactionMiner::new(self.pool.pending_transactions());
let block_producer = self.block_producer.clone();

let service = BlockProductionTask::new(pool, miner, block_producer);
let service = BlockProductionTask::new(self.pool.clone(), miner, block_producer);
self.task_spawner.build_task().name("Block production").spawn(service)
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/katana/pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ tokio = { workspace = true, features = [ "sync" ] }
tracing.workspace = true

[dev-dependencies]
futures-util.workspace = true
rand.workspace = true
3 changes: 3 additions & 0 deletions crates/katana/pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub trait TransactionPool {
/// Add a new transaction to the pool.
fn add_transaction(&self, tx: Self::Transaction) -> PoolResult<TxHash>;

/// Returns a [`Stream`](futures::Stream) which yields pending transactions - transactions that
/// can be executed - from the pool.
fn pending_transactions(&self) -> PendingTransactions<Self::Transaction, Self::Ordering>;

/// Check if the pool contains a transaction with the given hash.
Expand All @@ -57,6 +59,7 @@ pub trait TransactionPool {

fn add_listener(&self) -> Receiver<TxHash>;

/// Removes a list of transactions from the pool according to their hashes.
fn remove_transactions(&self, hashes: &[TxHash]);

/// Get the total number of transactions in the pool.
Expand Down
8 changes: 5 additions & 3 deletions crates/katana/pool/src/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ impl<T> Default for TipOrdering<T> {
#[cfg(test)]
mod tests {

use futures::executor;

use crate::ordering::{self, FiFo};
use crate::pool::test_utils::*;
use crate::tx::PoolTransaction;
Expand All @@ -145,10 +147,10 @@ mod tests {
});

// Get pending transactions
let pendings = pool.pending_transactions().collect::<Vec<_>>();
let pendings = executor::block_on_stream(pool.pending_transactions());

// Assert that the transactions are in the order they were added (first to last)
pendings.iter().zip(txs).for_each(|(pending, tx)| {
pendings.into_iter().zip(txs).for_each(|(pending, tx)| {
assert_eq!(pending.tx.as_ref(), &tx);
});
}
Expand Down Expand Up @@ -177,7 +179,7 @@ mod tests {
});

// Get pending transactions
let pending = pool.pending_transactions().collect::<Vec<_>>();
let pending = executor::block_on_stream(pool.pending_transactions()).collect::<Vec<_>>();
assert_eq!(pending.len(), txs.len());

// Assert that the transactions are ordered by tip (highest to lowest)
Expand Down
116 changes: 104 additions & 12 deletions crates/katana/pool/src/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ use std::task::{Context, Poll};
use futures::{Stream, StreamExt};

use crate::ordering::PoolOrd;
use crate::subscription::PoolSubscription;
use crate::subscription::Subscription;
use crate::tx::{PendingTx, PoolTransaction};

/// an iterator that yields transactions from the pool that can be included in a block, sorted by
/// An iterator that yields transactions from the pool that can be included in a block, sorted by
/// by its priority.
#[derive(Debug)]
pub struct PendingTransactions<T, O: PoolOrd> {
/// Iterator over all the pending transactions at the time of the creation of this struct.
pub(crate) all: IntoIter<PendingTx<T, O>>,
pub(crate) subscription: PoolSubscription<T, O>,
/// Subscription to the pool to get notified when new transactions are added. This is used to
/// wait on the new transactions after exhausting the `all` iterator.
pub(crate) subscription: Subscription<T, O>,
}

impl<T, O> Stream for PendingTransactions<T, O>
Expand All @@ -25,7 +28,6 @@ where

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

if let Some(tx) = this.all.next() {
Poll::Ready(Some(tx))
} else {
Expand All @@ -34,14 +36,104 @@ where
}
}

impl<T, O> Iterator for PendingTransactions<T, O>
where
T: PoolTransaction,
O: PoolOrd<Transaction = T>,
{
type Item = PendingTx<T, O>;
#[cfg(test)]
mod tests {

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use futures::StreamExt;
use tokio::task::yield_now;

use crate::pool::test_utils::PoolTx;
use crate::pool::Pool;
use crate::validation::NoopValidator;
use crate::{ordering, PoolTransaction, TransactionPool};

#[tokio::test]
async fn pending_transactions() {
let pool = Pool::new(NoopValidator::<PoolTx>::new(), ordering::FiFo::new());

let first_batch = [
PoolTx::new(),
PoolTx::new(),
PoolTx::new(),
PoolTx::new(),
PoolTx::new(),
PoolTx::new(),
];

for tx in &first_batch {
pool.add_transaction(tx.clone()).expect("failed to add tx");
}

let mut pendings = pool.pending_transactions();

// exhaust all the first batch transactions
for expected in &first_batch {
let actual = pendings.next().await.map(|t| t.tx).unwrap();
assert_eq!(expected, actual.as_ref());
}

fn next(&mut self) -> Option<Self::Item> {
self.all.next()
let second_batch = [
PoolTx::new(),
PoolTx::new(),
PoolTx::new(),
PoolTx::new(),
PoolTx::new(),
PoolTx::new(),
];

for tx in &second_batch {
pool.add_transaction(tx.clone()).expect("failed to add tx");
}

// exhaust all the first batch transactions
for expected in &second_batch {
let actual = pendings.next().await.map(|t| t.tx).unwrap();
assert_eq!(expected, actual.as_ref());
}

// Check that all the added transaction is still in the pool because we haven't removed it
// yet.
let all = [first_batch, second_batch].concat();
for tx in all {
assert!(pool.contains(tx.hash()));
}
}

#[tokio::test(flavor = "multi_thread")]
async fn subscription_stream_wakeup() {
let pool = Pool::new(NoopValidator::<PoolTx>::new(), ordering::FiFo::new());
let mut pending = pool.pending_transactions();

// Spawn a task that will add a transaction after a delay
let pool_clone = pool.clone();

let txs = [PoolTx::new(), PoolTx::new(), PoolTx::new()];
let txs_clone = txs.clone();

let has_polled_once = Arc::new(AtomicBool::new(false));
let has_polled_once_clone = has_polled_once.clone();

tokio::spawn(async move {
while !has_polled_once_clone.load(Ordering::SeqCst) {
yield_now().await;
}

for tx in txs_clone {
pool_clone.add_transaction(tx).expect("failed to add tx");
}
});

// Check that first poll_next returns Pending because no pending transaction has been added
// to the pool yet
assert!(futures_util::poll!(pending.next()).is_pending());
has_polled_once.store(true, Ordering::SeqCst);

for tx in txs {
let received = pending.next().await.unwrap();
assert_eq!(&tx, received.tx.as_ref());
}
}
}
Loading

0 comments on commit bc83f1a

Please sign in to comment.