From 64992c9ce2ca941ccdde91acea222725f13ed7f7 Mon Sep 17 00:00:00 2001 From: Tarrence van As Date: Tue, 20 Feb 2024 05:36:09 -0500 Subject: [PATCH] feat(katana): add `torii_getTransactions` rpc (#1529) --- Cargo.lock | 2 + bin/katana/src/args.rs | 2 +- crates/dojo-test-utils/src/sequencer.rs | 2 +- .../katana/core/src/service/block_producer.rs | 129 ++++++++-- crates/katana/executor/Cargo.toml | 1 + .../katana/executor/src/blockifier/outcome.rs | 1 + crates/katana/rpc/rpc-api/src/lib.rs | 2 + crates/katana/rpc/rpc-api/src/torii.rs | 11 + crates/katana/rpc/rpc-types/Cargo.toml | 3 +- crates/katana/rpc/rpc-types/src/error/mod.rs | 1 + .../katana/rpc/rpc-types/src/error/torii.rs | 70 +++++ .../katana/rpc/rpc-types/src/transaction.rs | 14 + crates/katana/rpc/rpc/Cargo.toml | 2 + crates/katana/rpc/rpc/src/lib.rs | 6 + crates/katana/rpc/rpc/src/torii.rs | 208 +++++++++++++++ crates/katana/rpc/rpc/tests/common/mod.rs | 35 +++ crates/katana/rpc/rpc/tests/starknet.rs | 40 +-- crates/katana/rpc/rpc/tests/torii.rs | 239 ++++++++++++++++++ 18 files changed, 716 insertions(+), 52 deletions(-) create mode 100644 crates/katana/rpc/rpc-api/src/torii.rs create mode 100644 crates/katana/rpc/rpc-types/src/error/torii.rs create mode 100644 crates/katana/rpc/rpc/src/torii.rs create mode 100644 crates/katana/rpc/rpc/tests/common/mod.rs create mode 100644 crates/katana/rpc/rpc/tests/torii.rs diff --git a/Cargo.lock b/Cargo.lock index 0038279ed1..dd3aa6e355 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6617,6 +6617,7 @@ dependencies = [ "anyhow", "blockifier", "convert_case 0.6.0", + "futures", "katana-primitives", "katana-provider", "parking_lot 0.12.1", @@ -6728,6 +6729,7 @@ dependencies = [ "anyhow", "derive_more", "ethers", + "futures", "jsonrpsee 0.16.3", "katana-core", "katana-primitives", diff --git a/bin/katana/src/args.rs b/bin/katana/src/args.rs index e5c062f90c..940d4f75b5 100644 --- a/bin/katana/src/args.rs +++ b/bin/katana/src/args.rs @@ -212,7 +212,7 @@ impl KatanaArgs { } pub fn server_config(&self) -> ServerConfig { - let mut apis = vec![ApiKind::Starknet, ApiKind::Katana]; + let mut apis = vec![ApiKind::Starknet, ApiKind::Katana, ApiKind::Torii]; // only enable `katana` API in dev mode if self.dev { apis.push(ApiKind::Dev); diff --git a/crates/dojo-test-utils/src/sequencer.rs b/crates/dojo-test-utils/src/sequencer.rs index f675eceb75..9e006c0e59 100644 --- a/crates/dojo-test-utils/src/sequencer.rs +++ b/crates/dojo-test-utils/src/sequencer.rs @@ -43,7 +43,7 @@ impl TestSequencer { port: 0, host: "127.0.0.1".into(), max_connections: 100, - apis: vec![ApiKind::Starknet, ApiKind::Katana, ApiKind::Dev], + apis: vec![ApiKind::Starknet, ApiKind::Katana, ApiKind::Torii, ApiKind::Dev], }, ) .await diff --git a/crates/katana/core/src/service/block_producer.rs b/crates/katana/core/src/service/block_producer.rs index 53d9714373..30a67fc5b2 100644 --- a/crates/katana/core/src/service/block_producer.rs +++ b/crates/katana/core/src/service/block_producer.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; +use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::stream::{Stream, StreamExt}; use futures::FutureExt; use katana_executor::blockifier::outcome::TxReceiptWithExecInfo; @@ -24,7 +25,7 @@ use katana_provider::traits::env::BlockEnvProvider; use katana_provider::traits::state::StateFactoryProvider; use parking_lot::RwLock; use tokio::time::{interval_at, Instant, Interval}; -use tracing::trace; +use tracing::{trace, warn}; use crate::backend::Backend; 
@@ -42,6 +43,9 @@ type ServiceFuture = Pin + Send + Sync>>; type BlockProductionResult = Result; type BlockProductionFuture = ServiceFuture; +type BlockProductionWithTxnsFuture = + ServiceFuture, MinedBlockOutcome), BlockProductionError>>; +pub type TxWithHashAndReceiptPair = (TxWithHash, Receipt); /// The type which responsible for block production. #[must_use = "BlockProducer does nothing unless polled"] @@ -161,6 +165,8 @@ pub struct IntervalBlockProducer { queued: VecDeque>, /// The state of the pending block after executing all the transactions within the interval. state: Arc, + /// Listeners notified when a new executed tx is added. + tx_execution_listeners: RwLock>>>, } impl IntervalBlockProducer { @@ -185,6 +191,7 @@ impl IntervalBlockProducer { block_mining: None, interval: Some(interval), queued: VecDeque::default(), + tx_execution_listeners: RwLock::new(vec![]), } } @@ -198,7 +205,14 @@ impl IntervalBlockProducer { ) -> Self { let state = Arc::new(PendingState::new(db, block_exec_envs.0, block_exec_envs.1)); - Self { state, backend, interval: None, block_mining: None, queued: VecDeque::default() } + Self { + state, + backend, + interval: None, + block_mining: None, + queued: VecDeque::default(), + tx_execution_listeners: RwLock::new(vec![]), + } } pub fn state(&self) -> Arc { @@ -267,7 +281,41 @@ impl IntervalBlockProducer { .collect::>() }; - self.state.executed_txs.write().extend(results); + self.state.executed_txs.write().extend(results.clone()); + self.notify_listener(results.into_iter().map(|(tx, info)| (tx, info.receipt)).collect()); + } + + pub fn add_listener(&self) -> Receiver> { + const TX_LISTENER_BUFFER_SIZE: usize = 2048; + let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE); + self.tx_execution_listeners.write().push(tx); + rx + } + + /// notifies all listeners about the transaction + fn notify_listener(&self, txs: Vec) { + let mut listener = self.tx_execution_listeners.write(); + // this is basically a retain but with mut reference + for n in (0..listener.len()).rev() { + let mut listener_tx = listener.swap_remove(n); + let retain = match listener_tx.try_send(txs.clone()) { + Ok(()) => true, + Err(e) => { + if e.is_full() { + warn!( + target: "miner", + "failed to send new txs notification because channel is full", + ); + true + } else { + false + } + } + }; + if retain { + listener.push(listener_tx) + } + } } fn outcome(&self) -> StateUpdatesWithDeclaredClasses { @@ -320,14 +368,21 @@ pub struct InstantBlockProducer { /// Holds the backend if no block is being mined backend: Arc, /// Single active future that mines a new block - block_mining: Option, + block_mining: Option, /// Backlog of sets of transactions ready to be mined queued: VecDeque>, + /// Listeners notified when a new executed tx is added. 
+ tx_execution_listeners: RwLock>>>, } impl InstantBlockProducer { pub fn new(backend: Arc) -> Self { - Self { backend, block_mining: None, queued: VecDeque::default() } + Self { + backend, + block_mining: None, + queued: VecDeque::default(), + tx_execution_listeners: RwLock::new(vec![]), + } } pub fn force_mine(&mut self) { @@ -342,7 +397,7 @@ impl InstantBlockProducer { fn do_mine( backend: Arc, transactions: Vec, - ) -> Result { + ) -> Result<(Vec, MinedBlockOutcome), BlockProductionError> { trace!(target: "miner", "creating new block"); let provider = backend.blockchain.provider(); @@ -359,7 +414,7 @@ impl InstantBlockProducer { let txs = transactions.iter().map(TxWithHash::from); - let tx_receipt_pairs: Vec<(TxWithHash, Receipt)> = TransactionExecutor::new( + let tx_receipt_pairs: Vec = TransactionExecutor::new( &state, &block_context, !backend.config.disable_fee, @@ -372,8 +427,8 @@ impl InstantBlockProducer { .zip(txs) .filter_map(|(res, tx)| { if let Ok(info) = res { - let receipt = TxReceiptWithExecInfo::new(&tx, info); - Some((tx, receipt.receipt)) + let info = TxReceiptWithExecInfo::new(&tx, info); + Some((tx, info.receipt)) } else { None } @@ -382,19 +437,52 @@ impl InstantBlockProducer { let outcome = backend.do_mine_block( &block_env, - tx_receipt_pairs, + tx_receipt_pairs.clone(), get_state_update_from_cached_state(&state), )?; trace!(target: "miner", "created new block: {}", outcome.block_number); - Ok(outcome) + Ok((tx_receipt_pairs, outcome)) + } + + pub fn add_listener(&self) -> Receiver> { + const TX_LISTENER_BUFFER_SIZE: usize = 2048; + let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE); + self.tx_execution_listeners.write().push(tx); + rx + } + + /// notifies all listeners about the transaction + fn notify_listener(&self, txs: Vec) { + let mut listener = self.tx_execution_listeners.write(); + // this is basically a retain but with mut reference + for n in (0..listener.len()).rev() { + let mut listener_tx = listener.swap_remove(n); + let retain = match listener_tx.try_send(txs.clone()) { + Ok(()) => true, + Err(e) => { + if e.is_full() { + warn!( + target: "miner", + "failed to send new txs notification because channel is full", + ); + true + } else { + false + } + } + }; + if retain { + listener.push(listener_tx) + } + } } } impl Stream for InstantBlockProducer { // mined block outcome and the new state - type Item = BlockProductionResult; + type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let pin = self.get_mut(); @@ -410,10 +498,19 @@ impl Stream for InstantBlockProducer { // poll the mining future if let Some(mut mining) = pin.block_mining.take() { - if let Poll::Ready(outcome) = mining.poll_unpin(cx) { - return Poll::Ready(Some(outcome)); - } else { - pin.block_mining = Some(mining) + match mining.poll_unpin(cx) { + Poll::Ready(Ok((txs, outcome))) => { + pin.notify_listener(txs); + return Poll::Ready(Some(Ok(outcome))); + } + + Poll::Ready(Err(e)) => { + return Poll::Ready(Some(Err(e))); + } + + Poll::Pending => { + pin.block_mining = Some(mining); + } } } diff --git a/crates/katana/executor/Cargo.toml b/crates/katana/executor/Cargo.toml index 10240980a7..a862e7a71a 100644 --- a/crates/katana/executor/Cargo.toml +++ b/crates/katana/executor/Cargo.toml @@ -12,6 +12,7 @@ katana-provider = { path = "../storage/provider" } anyhow.workspace = true convert_case.workspace = true +futures.workspace = true parking_lot.workspace = true starknet.workspace = true tracing.workspace = true diff --git 
a/crates/katana/executor/src/blockifier/outcome.rs b/crates/katana/executor/src/blockifier/outcome.rs index d3cfe82581..eb87e0b82f 100644 --- a/crates/katana/executor/src/blockifier/outcome.rs +++ b/crates/katana/executor/src/blockifier/outcome.rs @@ -9,6 +9,7 @@ use katana_primitives::transaction::Tx; use super::utils::{events_from_exec_info, l2_to_l1_messages_from_exec_info}; +#[derive(Clone)] pub struct TxReceiptWithExecInfo { pub receipt: Receipt, pub execution_info: TransactionExecutionInfo, diff --git a/crates/katana/rpc/rpc-api/src/lib.rs b/crates/katana/rpc/rpc-api/src/lib.rs index c027a5dd79..6f381ac7be 100644 --- a/crates/katana/rpc/rpc-api/src/lib.rs +++ b/crates/katana/rpc/rpc-api/src/lib.rs @@ -1,11 +1,13 @@ pub mod dev; pub mod katana; pub mod starknet; +pub mod torii; /// List of APIs supported by Katana. #[derive(Debug, Copy, Clone)] pub enum ApiKind { Starknet, Katana, + Torii, Dev, } diff --git a/crates/katana/rpc/rpc-api/src/torii.rs b/crates/katana/rpc/rpc-api/src/torii.rs new file mode 100644 index 0000000000..272497041a --- /dev/null +++ b/crates/katana/rpc/rpc-api/src/torii.rs @@ -0,0 +1,11 @@ +use jsonrpsee::core::RpcResult; +use jsonrpsee::proc_macros::rpc; +use katana_rpc_types::transaction::{TransactionsPage, TransactionsPageCursor}; + +#[cfg_attr(not(feature = "client"), rpc(server, namespace = "torii"))] +#[cfg_attr(feature = "client", rpc(client, server, namespace = "torii"))] +pub trait ToriiApi { + #[method(name = "getTransactions")] + async fn get_transactions(&self, cursor: TransactionsPageCursor) + -> RpcResult; +} diff --git a/crates/katana/rpc/rpc-types/Cargo.toml b/crates/katana/rpc/rpc-types/Cargo.toml index cc1542b1eb..4bff449aaa 100644 --- a/crates/katana/rpc/rpc-types/Cargo.toml +++ b/crates/katana/rpc/rpc-types/Cargo.toml @@ -11,9 +11,10 @@ katana-core = { path = "../../core" } katana-primitives = { path = "../../primitives" } katana-provider = { path = "../../storage/provider" } -ethers = "2.0.11" anyhow.workspace = true derive_more.workspace = true +ethers = "2.0.11" +futures.workspace = true jsonrpsee = { workspace = true, features = [ "macros", "server" ] } serde.workspace = true serde_with.workspace = true diff --git a/crates/katana/rpc/rpc-types/src/error/mod.rs b/crates/katana/rpc/rpc-types/src/error/mod.rs index 608feeefd5..4ba97df32c 100644 --- a/crates/katana/rpc/rpc-types/src/error/mod.rs +++ b/crates/katana/rpc/rpc-types/src/error/mod.rs @@ -1,2 +1,3 @@ pub mod katana; pub mod starknet; +pub mod torii; diff --git a/crates/katana/rpc/rpc-types/src/error/torii.rs b/crates/katana/rpc/rpc-types/src/error/torii.rs new file mode 100644 index 0000000000..8beeb260c4 --- /dev/null +++ b/crates/katana/rpc/rpc-types/src/error/torii.rs @@ -0,0 +1,70 @@ +use futures::channel::mpsc::Receiver; +use jsonrpsee::core::Error; +use jsonrpsee::types::error::CallError; +use jsonrpsee::types::ErrorObject; +use katana_core::sequencer_error::SequencerError; +use katana_primitives::receipt::Receipt; +use katana_primitives::transaction::TxWithHash; +use katana_provider::error::ProviderError; + +use crate::transaction::TransactionsPageCursor; + +#[derive(Debug, thiserror::Error)] +#[repr(i32)] +pub enum ToriiApiError { + #[error("Block not found")] + BlockNotFound, + #[error("Transaction index out of bounds")] + TransactionOutOfBounds, + #[error("Transaction not found")] + TransactionNotFound, + #[error("Transaction receipt not found")] + TransactionReceiptNotFound, + #[error("Transactions not ready")] + TransactionsNotReady { + rx: Receiver>, + cursor: 
TransactionsPageCursor, + }, + #[error("Long poll expired")] + ChannelDisconnected, + #[error("An unexpected error occured: {reason}")] + UnexpectedError { reason: String }, +} + +impl ToriiApiError { + fn code(&self) -> i32 { + match self { + ToriiApiError::BlockNotFound => 24, + ToriiApiError::TransactionOutOfBounds => 34, + ToriiApiError::TransactionNotFound => 35, + ToriiApiError::TransactionReceiptNotFound => 36, + ToriiApiError::TransactionsNotReady { .. } => 37, + ToriiApiError::ChannelDisconnected => 42, + ToriiApiError::UnexpectedError { .. } => 63, + } + } +} + +impl From for ToriiApiError { + fn from(value: ProviderError) -> Self { + ToriiApiError::UnexpectedError { reason: value.to_string() } + } +} + +impl From for ToriiApiError { + fn from(value: SequencerError) -> Self { + match value { + SequencerError::BlockNotFound(_) => ToriiApiError::BlockNotFound, + err => ToriiApiError::UnexpectedError { reason: err.to_string() }, + } + } +} + +impl From for Error { + fn from(err: ToriiApiError) -> Self { + let code = err.code(); + let message = err.to_string(); + let err = ErrorObject::owned(code, message, None::<()>); + Error::Call(CallError::Custom(err)) + } +} diff --git a/crates/katana/rpc/rpc-types/src/transaction.rs b/crates/katana/rpc/rpc-types/src/transaction.rs index a7445ffcaf..6cb30fdbe6 100644 --- a/crates/katana/rpc/rpc-types/src/transaction.rs +++ b/crates/katana/rpc/rpc-types/src/transaction.rs @@ -21,6 +21,8 @@ use starknet::core::types::{ }; use starknet::core::utils::get_contract_address; +use crate::receipt::MaybePendingTxReceipt; + #[derive(Debug, Clone, Serialize, Deserialize, Deref)] #[serde(transparent)] pub struct BroadcastedInvokeTx(BroadcastedInvokeTransaction); @@ -306,3 +308,15 @@ impl From for DeployAccountTx { } } } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TransactionsPageCursor { + pub block_number: u64, + pub transaction_index: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TransactionsPage { + pub transactions: Vec<(TxWithHash, MaybePendingTxReceipt)>, + pub cursor: TransactionsPageCursor, +} diff --git a/crates/katana/rpc/rpc/Cargo.toml b/crates/katana/rpc/rpc/Cargo.toml index 99051c2006..2feade480e 100644 --- a/crates/katana/rpc/rpc/Cargo.toml +++ b/crates/katana/rpc/rpc/Cargo.toml @@ -37,4 +37,6 @@ tracing.workspace = true [dev-dependencies] assert_matches = "1.5.0" dojo-test-utils = { path = "../../../dojo-test-utils" } +jsonrpsee = { version = "0.16.2", features = [ "client" ] } +katana-rpc-api = { workspace = true, features = [ "client" ] } url.workspace = true diff --git a/crates/katana/rpc/rpc/src/lib.rs b/crates/katana/rpc/rpc/src/lib.rs index ff5b57092c..2a536b8aa2 100644 --- a/crates/katana/rpc/rpc/src/lib.rs +++ b/crates/katana/rpc/rpc/src/lib.rs @@ -2,6 +2,7 @@ pub mod config; pub mod dev; pub mod katana; pub mod starknet; +pub mod torii; use std::net::SocketAddr; use std::sync::Arc; @@ -20,12 +21,14 @@ use katana_core::sequencer::KatanaSequencer; use katana_rpc_api::dev::DevApiServer; use katana_rpc_api::katana::KatanaApiServer; use katana_rpc_api::starknet::StarknetApiServer; +use katana_rpc_api::torii::ToriiApiServer; use katana_rpc_api::ApiKind; use tower_http::cors::{Any, CorsLayer}; use crate::dev::DevApi; use crate::katana::KatanaApi; use crate::starknet::StarknetApi; +use crate::torii::ToriiApi; pub async fn spawn(sequencer: Arc, config: ServerConfig) -> Result { let mut methods = RpcModule::new(()); @@ -42,6 +45,9 @@ pub async fn spawn(sequencer: Arc, config: ServerConfig) -> Res 
ApiKind::Dev => { methods.merge(DevApi::new(sequencer.clone()).into_rpc())?; } + ApiKind::Torii => { + methods.merge(ToriiApi::new(sequencer.clone()).into_rpc())?; + } } } diff --git a/crates/katana/rpc/rpc/src/torii.rs b/crates/katana/rpc/rpc/src/torii.rs new file mode 100644 index 0000000000..a3a253f4af --- /dev/null +++ b/crates/katana/rpc/rpc/src/torii.rs @@ -0,0 +1,208 @@ +use std::sync::Arc; + +use futures::StreamExt; +use jsonrpsee::core::{async_trait, RpcResult}; +use katana_core::sequencer::KatanaSequencer; +use katana_core::service::block_producer::BlockProducerMode; +use katana_primitives::block::BlockHashOrNumber; +use katana_provider::traits::transaction::TransactionProvider; +use katana_rpc_api::torii::ToriiApiServer; +use katana_rpc_types::error::torii::ToriiApiError; +use katana_rpc_types::receipt::{MaybePendingTxReceipt, PendingTxReceipt}; +use katana_rpc_types::transaction::{TransactionsPage, TransactionsPageCursor}; +use katana_rpc_types_builder::ReceiptBuilder; +use katana_tasks::TokioTaskSpawner; + +const MAX_PAGE_SIZE: usize = 100; + +#[derive(Clone)] +pub struct ToriiApi { + sequencer: Arc, +} + +impl ToriiApi { + pub fn new(sequencer: Arc) -> Self { + Self { sequencer } + } + + async fn on_io_blocking_task(&self, func: F) -> T + where + F: FnOnce(Self) -> T + Send + 'static, + T: Send + 'static, + { + let this = self.clone(); + TokioTaskSpawner::new().unwrap().spawn_blocking(move || func(this)).await.unwrap() + } +} + +#[async_trait] +impl ToriiApiServer for ToriiApi { + async fn get_transactions( + &self, + cursor: TransactionsPageCursor, + ) -> RpcResult { + match self + .on_io_blocking_task(move |this| { + let mut transactions = Vec::new(); + let mut next_cursor = cursor.clone(); + + let provider = this.sequencer.backend.blockchain.provider(); + let latest_block_number = + this.sequencer.block_number().map_err(ToriiApiError::from)?; + + if cursor.block_number > latest_block_number + 1 { + return Err(ToriiApiError::BlockNotFound); + } + + if latest_block_number >= cursor.block_number { + for block_number in cursor.block_number..=latest_block_number { + let mut block_transactions = provider + .transactions_by_block(BlockHashOrNumber::Num(block_number)) + .map_err(ToriiApiError::from)? 
+ .ok_or(ToriiApiError::BlockNotFound)?; + + // If the block_number is the cursor block, slice the transactions from the + // txn offset + if block_number == cursor.block_number { + block_transactions = block_transactions + .into_iter() + .skip(cursor.transaction_index as usize) + .collect(); + } + + let block_transactions = block_transactions + .into_iter() + .map(|tx| { + let receipt = ReceiptBuilder::new(tx.hash, provider) + .build() + .expect("Receipt should exist for tx") + .expect("Receipt should exist for tx"); + (tx, MaybePendingTxReceipt::Receipt(receipt)) + }) + .collect::>(); + + // Add transactions to the total and break if MAX_PAGE_SIZE is reached + for transaction in block_transactions { + transactions.push(transaction); + if transactions.len() >= MAX_PAGE_SIZE { + next_cursor.block_number = block_number; + next_cursor.transaction_index = MAX_PAGE_SIZE as u64; + return Ok(TransactionsPage { transactions, cursor: next_cursor }); + } + } + } + } + + if let Some(pending_state) = this.sequencer.pending_state() { + let remaining = MAX_PAGE_SIZE - transactions.len(); + + // If cursor is in the pending block + if cursor.block_number == latest_block_number + 1 { + let pending_transactions = pending_state + .executed_txs + .read() + .iter() + .skip(cursor.transaction_index as usize) + .take(remaining) + .map(|(tx, info)| { + ( + tx.clone(), + MaybePendingTxReceipt::Pending(PendingTxReceipt::new( + tx.hash, + info.receipt.clone(), + )), + ) + }) + .collect::>(); + + // If there are no transactions after the index in the pending block + if pending_transactions.is_empty() { + // Wait for a new transaction to be executed + let inner = this.sequencer.block_producer().inner.read(); + let block_producer = match &*inner { + BlockProducerMode::Interval(block_producer) => block_producer, + _ => panic!( + "Expected BlockProducerMode::Interval, found something else" + ), + }; + + return Err(ToriiApiError::TransactionsNotReady { + rx: block_producer.add_listener(), + cursor: next_cursor, + }); + } + + next_cursor.transaction_index += pending_transactions.len() as u64; + transactions.extend(pending_transactions); + } else { + let pending_transactions = pending_state + .executed_txs + .read() + .iter() + .take(remaining) + .map(|(tx, info)| { + ( + tx.clone(), + MaybePendingTxReceipt::Pending(PendingTxReceipt::new( + tx.hash, + info.receipt.clone(), + )), + ) + }) + .collect::>(); + next_cursor.block_number += 1; + next_cursor.transaction_index = pending_transactions.len() as u64; + transactions.extend(pending_transactions); + }; + } else { + // If there is no pending state, we are instant mining. + next_cursor.block_number += 1; + next_cursor.transaction_index = 0; + + if transactions.is_empty() { + // Wait for a new transaction to be executed + let inner = this.sequencer.block_producer().inner.read(); + let block_producer = match &*inner { + BlockProducerMode::Instant(block_producer) => block_producer, + _ => { + panic!("Expected BlockProducerMode::Instant, found something else") + } + }; + + return Err(ToriiApiError::TransactionsNotReady { + rx: block_producer.add_listener(), + cursor: next_cursor, + }); + } + } + + Ok(TransactionsPage { transactions, cursor: next_cursor }) + }) + .await + { + Ok(result) => Ok(result), + Err(e) => match e { + ToriiApiError::TransactionsNotReady { mut rx, cursor } => { + let transactions = rx + .next() + .await + .ok_or(ToriiApiError::ChannelDisconnected)? 
+ .into_iter() + .map(|(tx, receipt)| { + ( + tx.clone(), + MaybePendingTxReceipt::Pending(PendingTxReceipt::new( + tx.hash, receipt, + )), + ) + }) + .collect::>(); + let mut next_cursor = cursor; + next_cursor.transaction_index += transactions.len() as u64; + Ok(TransactionsPage { transactions, cursor: next_cursor }) + } + _ => Err(e.into()), + }, + } + } +} diff --git a/crates/katana/rpc/rpc/tests/common/mod.rs b/crates/katana/rpc/rpc/tests/common/mod.rs new file mode 100644 index 0000000000..1d214569b5 --- /dev/null +++ b/crates/katana/rpc/rpc/tests/common/mod.rs @@ -0,0 +1,35 @@ +use std::fs::File; +use std::path::PathBuf; + +use anyhow::{anyhow, Result}; +use cairo_lang_starknet::casm_contract_class::CasmContractClass; +use cairo_lang_starknet::contract_class::ContractClass; +use katana_primitives::conversion::rpc::CompiledClass; +use starknet::core::types::contract::SierraClass; +use starknet::core::types::{FieldElement, FlattenedSierraClass}; + +pub fn prepare_contract_declaration_params( + artifact_path: &PathBuf, +) -> Result<(FlattenedSierraClass, FieldElement)> { + let flattened_class = get_flattened_class(artifact_path) + .map_err(|e| anyhow!("error flattening the contract class: {e}"))?; + let compiled_class_hash = get_compiled_class_hash(artifact_path) + .map_err(|e| anyhow!("error computing compiled class hash: {e}"))?; + Ok((flattened_class, compiled_class_hash)) +} + +fn get_flattened_class(artifact_path: &PathBuf) -> Result { + let file = File::open(artifact_path)?; + let contract_artifact: SierraClass = serde_json::from_reader(&file)?; + Ok(contract_artifact.flatten()?) +} + +fn get_compiled_class_hash(artifact_path: &PathBuf) -> Result { + let file = File::open(artifact_path)?; + let casm_contract_class: ContractClass = serde_json::from_reader(file)?; + let casm_contract = CasmContractClass::from_contract_class(casm_contract_class, true) + .map_err(|e| anyhow!("CasmContractClass from ContractClass error: {e}"))?; + let res = serde_json::to_string_pretty(&casm_contract)?; + let compiled_class: CompiledClass = serde_json::from_str(&res)?; + Ok(compiled_class.class_hash()?) 
+} diff --git a/crates/katana/rpc/rpc/tests/starknet.rs b/crates/katana/rpc/rpc/tests/starknet.rs index 08d29ca2a7..9daff5b2fa 100644 --- a/crates/katana/rpc/rpc/tests/starknet.rs +++ b/crates/katana/rpc/rpc/tests/starknet.rs @@ -1,23 +1,23 @@ -use std::fs::{self, File}; +use std::fs::{self}; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; -use anyhow::{anyhow, Result}; -use cairo_lang_starknet::casm_contract_class::CasmContractClass; -use cairo_lang_starknet::contract_class::ContractClass; use dojo_test_utils::sequencer::{get_default_test_starknet_config, TestSequencer}; use katana_core::sequencer::SequencerConfig; use starknet::accounts::{Account, Call, ConnectedAccount}; use starknet::core::types::contract::legacy::LegacyContractClass; -use starknet::core::types::contract::{CompiledClass, SierraClass}; use starknet::core::types::{ - BlockId, BlockTag, DeclareTransactionReceipt, FieldElement, FlattenedSierraClass, - MaybePendingTransactionReceipt, TransactionFinalityStatus, TransactionReceipt, + BlockId, BlockTag, DeclareTransactionReceipt, FieldElement, MaybePendingTransactionReceipt, + TransactionFinalityStatus, TransactionReceipt, }; use starknet::core::utils::{get_contract_address, get_selector_from_name}; use starknet::providers::Provider; +use crate::common::prepare_contract_declaration_params; + +mod common; + const WAIT_TX_DELAY_MILLIS: u64 = 1000; #[tokio::test(flavor = "multi_thread")] @@ -175,29 +175,3 @@ async fn test_send_declare_and_deploy_legacy_contract() { sequencer.stop().expect("failed to stop sequencer"); } - -fn prepare_contract_declaration_params( - artifact_path: &PathBuf, -) -> Result<(FlattenedSierraClass, FieldElement)> { - let flattened_class = get_flattened_class(artifact_path) - .map_err(|e| anyhow!("error flattening the contract class: {e}"))?; - let compiled_class_hash = get_compiled_class_hash(artifact_path) - .map_err(|e| anyhow!("error computing compiled class hash: {e}"))?; - Ok((flattened_class, compiled_class_hash)) -} - -fn get_flattened_class(artifact_path: &PathBuf) -> Result { - let file = File::open(artifact_path)?; - let contract_artifact: SierraClass = serde_json::from_reader(&file)?; - Ok(contract_artifact.flatten()?) -} - -fn get_compiled_class_hash(artifact_path: &PathBuf) -> Result { - let file = File::open(artifact_path)?; - let casm_contract_class: ContractClass = serde_json::from_reader(file)?; - let casm_contract = CasmContractClass::from_contract_class(casm_contract_class, true) - .map_err(|e| anyhow!("CasmContractClass from ContractClass error: {e}"))?; - let res = serde_json::to_string_pretty(&casm_contract)?; - let compiled_class: CompiledClass = serde_json::from_str(&res)?; - Ok(compiled_class.class_hash()?) 
-} diff --git a/crates/katana/rpc/rpc/tests/torii.rs b/crates/katana/rpc/rpc/tests/torii.rs new file mode 100644 index 0000000000..05781323b3 --- /dev/null +++ b/crates/katana/rpc/rpc/tests/torii.rs @@ -0,0 +1,239 @@ +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; + +use dojo_test_utils::sequencer::{get_default_test_starknet_config, TestSequencer}; +use jsonrpsee::http_client::HttpClientBuilder; +use katana_core::sequencer::SequencerConfig; +use katana_rpc_api::dev::DevApiClient; +use katana_rpc_api::torii::ToriiApiClient; +use katana_rpc_types::transaction::{TransactionsPage, TransactionsPageCursor}; +use starknet::accounts::{Account, Call}; +use starknet::core::types::FieldElement; +use starknet::core::utils::get_selector_from_name; +use tokio::time::sleep; + +use crate::common::prepare_contract_declaration_params; + +mod common; + +pub const ENOUGH_GAS: &str = "0x100000000000000000"; + +#[tokio::test(flavor = "multi_thread")] +async fn test_get_transactions() { + let sequencer = TestSequencer::start( + SequencerConfig { block_time: None, no_mining: true, ..Default::default() }, + get_default_test_starknet_config(), + ) + .await; + + let client = HttpClientBuilder::default().build(sequencer.url()).unwrap(); + + let account = sequencer.account(); + + let path: PathBuf = PathBuf::from("tests/test_data/cairo1_contract.json"); + let (contract, compiled_class_hash) = prepare_contract_declaration_params(&path).unwrap(); + let contract = Arc::new(contract); + + // Should return successfully when no transactions have been mined. + let cursor = TransactionsPageCursor { block_number: 0, transaction_index: 0 }; + + let response: TransactionsPage = client.get_transactions(cursor).await.unwrap(); + + assert!(response.transactions.is_empty()); + assert!(response.cursor.block_number == 1); + assert!(response.cursor.transaction_index == 0); + + let declare_res = account.declare(contract.clone(), compiled_class_hash).send().await.unwrap(); + + // Should return successfully with single pending txn. + let response: TransactionsPage = client.get_transactions(response.cursor).await.unwrap(); + + assert!(response.transactions.len() == 1); + assert!(response.cursor.block_number == 1); + assert!(response.cursor.transaction_index == 1); + + // Create block 1. + let _: () = client.generate_block().await.unwrap(); + + // Should properly increment to new empty pending block + let response: TransactionsPage = client.get_transactions(response.cursor).await.unwrap(); + + assert!(response.transactions.is_empty()); + assert!(response.cursor.block_number == 2); + assert!(response.cursor.transaction_index == 0); + + // Should block on cursor at end of page and return on new txn + let long_poll_future = client.get_transactions(response.cursor); + let deploy_call = build_deploy_contract_call(declare_res.class_hash, FieldElement::ZERO); + let deploy_txn = account.execute(vec![deploy_call]); + let deploy_txn_future = deploy_txn.send(); + + tokio::select! { + result = long_poll_future => { + let long_poll_result = result.unwrap(); + assert!(long_poll_result.transactions.len() == 1); + assert!(long_poll_result.cursor.block_number == 2); + assert!(long_poll_result.cursor.transaction_index == 1); + } + result = deploy_txn_future => { + // The declare transaction has completed, but we don't need to do anything with it here. + result.expect("Should succeed"); + } + } + + // Create block 2. 
+ let _: () = client.generate_block().await.unwrap(); + + let deploy_call = build_deploy_contract_call(declare_res.class_hash, FieldElement::ONE); + let deploy_txn = account.execute(vec![deploy_call]); + let deploy_txn_future = deploy_txn.send().await.unwrap(); + + // Should properly increment to new pending block + let response: TransactionsPage = client + .get_transactions(TransactionsPageCursor { block_number: 2, transaction_index: 1 }) + .await + .unwrap(); + + assert!(response.transactions.len() == 1); + assert!(response.transactions[0].0.hash == deploy_txn_future.transaction_hash); + assert!(response.cursor.block_number == 3); + assert!(response.cursor.transaction_index == 1); + + // Create block 3. + let _: () = client.generate_block().await.unwrap(); + + let max_fee = FieldElement::from_hex_be(ENOUGH_GAS).unwrap(); + let mut nonce = FieldElement::THREE; + // Test only returns first 100 txns from pending block + for i in 0..101 { + let deploy_call = build_deploy_contract_call(declare_res.class_hash, (i + 2_u32).into()); + let deploy_txn = account.execute(vec![deploy_call]).nonce(nonce).max_fee(max_fee); + deploy_txn.send().await.unwrap(); + nonce += FieldElement::ONE; + } + + // Wait until all pending txs have been mined. + // @kairy is there a more deterministic approach here? + sleep(Duration::from_millis(5000)).await; + + let start_cursor = response.cursor; + let response: TransactionsPage = client.get_transactions(start_cursor.clone()).await.unwrap(); + assert!(response.transactions.len() == 100); + assert!(response.cursor.block_number == 4); + assert!(response.cursor.transaction_index == 100); + + // Should get one more + let response: TransactionsPage = client.get_transactions(response.cursor).await.unwrap(); + assert!(response.transactions.len() == 1); + assert!(response.cursor.block_number == 4); + assert!(response.cursor.transaction_index == 101); + + // Create block 4. + let _: () = client.generate_block().await.unwrap(); + + let response: TransactionsPage = client.get_transactions(start_cursor.clone()).await.unwrap(); + assert!(response.transactions.len() == 100); + assert!(response.cursor.block_number == 4); + assert!(response.cursor.transaction_index == 100); + + // Should get one more + let response: TransactionsPage = client.get_transactions(response.cursor).await.unwrap(); + assert!(response.transactions.len() == 1); + assert!(response.cursor.block_number == 5); + assert!(response.cursor.transaction_index == 0); + + sequencer.stop().expect("failed to stop sequencer"); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_get_transactions_with_instant_mining() { + let sequencer = TestSequencer::start( + SequencerConfig { block_time: None, no_mining: false, ..Default::default() }, + get_default_test_starknet_config(), + ) + .await; + + let client = HttpClientBuilder::default().build(sequencer.url()).unwrap(); + + let account = sequencer.account(); + + let path: PathBuf = PathBuf::from("tests/test_data/cairo1_contract.json"); + let (contract, compiled_class_hash) = prepare_contract_declaration_params(&path).unwrap(); + let contract = Arc::new(contract); + + // Should return successfully when no transactions have been mined. + let cursor = TransactionsPageCursor { block_number: 0, transaction_index: 0 }; + + let declare_res = account.declare(contract.clone(), compiled_class_hash).send().await.unwrap(); + + sleep(Duration::from_millis(1000)).await; + + // Should return successfully with single txn. 
+ let response: TransactionsPage = client.get_transactions(cursor).await.unwrap(); + + assert!(response.transactions.len() == 1); + assert!(response.cursor.block_number == 1); + assert!(response.cursor.transaction_index == 0); + + // Should block on cursor at end of page and return on new txn + let long_poll_future = client.get_transactions(response.cursor); + let deploy_call = build_deploy_contract_call(declare_res.class_hash, FieldElement::ZERO); + let deploy_txn = account.execute(vec![deploy_call]); + let deploy_txn_future = deploy_txn.send(); + + tokio::select! { + result = long_poll_future => { + let long_poll_result = result.unwrap(); + assert!(long_poll_result.transactions.len() == 1); + assert!(long_poll_result.cursor.block_number == 2); + assert!(long_poll_result.cursor.transaction_index == 0); + } + result = deploy_txn_future => { + // The declare transaction has completed, but we don't need to do anything with it here. + result.expect("Should succeed"); + } + } + + let deploy_call = build_deploy_contract_call(declare_res.class_hash, FieldElement::ONE); + let deploy_txn = account.execute(vec![deploy_call]); + let deploy_txn_future = deploy_txn.send().await.unwrap(); + + // Should properly increment to new pending block + let response: TransactionsPage = client + .get_transactions(TransactionsPageCursor { block_number: 2, transaction_index: 1 }) + .await + .unwrap(); + + assert!(response.transactions.len() == 1); + assert!(response.transactions[0].0.hash == deploy_txn_future.transaction_hash); + assert!(response.cursor.block_number == 3); + assert!(response.cursor.transaction_index == 1); + + sequencer.stop().expect("failed to stop sequencer"); +} + +fn build_deploy_contract_call(class_hash: FieldElement, salt: FieldElement) -> Call { + let constructor_calldata = vec![FieldElement::from(1_u32), FieldElement::from(2_u32)]; + + let calldata = [ + vec![ + class_hash, // class hash + salt, // salt + FieldElement::ZERO, // unique + FieldElement::from(constructor_calldata.len()), // constructor calldata len + ], + constructor_calldata.clone(), + ] + .concat(); + + Call { + calldata, + // devnet UDC address + to: FieldElement::from_hex_be( + "0x41a78e741e5af2fec34b695679bc6891742439f7afb8484ecd7766661ad02bf", + ) + .unwrap(), + selector: get_selector_from_name("deployContract").unwrap(), + } +}
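
Usage note (not part of the patch): the sketch below distills the call pattern exercised by the tests above — page through `torii_getTransactions` with the cursor the server returns, and rely on the server-side long poll once the cursor catches up to the head of the chain. It is a minimal sketch, assuming the `client` feature of `katana-rpc-api` is enabled and a Katana node with the Torii API is reachable; the URL, the `tokio` scaffolding, and `main` are illustrative placeholders.

use std::error::Error;

use jsonrpsee::http_client::HttpClientBuilder;
use katana_rpc_api::torii::ToriiApiClient;
use katana_rpc_types::transaction::{TransactionsPage, TransactionsPageCursor};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Placeholder URL; point this at a running Katana instance.
    let client = HttpClientBuilder::default().build("http://127.0.0.1:5050")?;

    // Start from genesis; each response carries the cursor for the next page.
    let mut cursor = TransactionsPageCursor { block_number: 0, transaction_index: 0 };

    loop {
        // Returns up to MAX_PAGE_SIZE (100) transactions with their receipts.
        // If the cursor points past the last executed transaction, the server
        // long-polls and answers once new transactions have been executed.
        let page: TransactionsPage = client.get_transactions(cursor).await?;

        for (tx, _receipt) in &page.transactions {
            println!("indexed tx {:#x}", tx.hash);
        }

        cursor = page.cursor;
    }
}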
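
The long-poll path in `torii.rs` rests on the `add_listener` hook this patch adds to both block producers. The snippet below is a stripped-down illustration of the consumer side of that hook, assuming you already hold a reference to an `InstantBlockProducer` (the `IntervalBlockProducer` exposes the same method); it sketches the pattern the RPC handler follows and is not additional API introduced by the patch.

use futures::StreamExt;
use katana_core::service::block_producer::{InstantBlockProducer, TxWithHashAndReceiptPair};

/// Waits for the next batch of executed transactions from the producer.
/// Returns `None` once the producer's side of the channel has been dropped,
/// which the RPC handler surfaces as `ToriiApiError::ChannelDisconnected`.
async fn next_executed_batch(
    producer: &InstantBlockProducer,
) -> Option<Vec<TxWithHashAndReceiptPair>> {
    // `add_listener` registers a bounded (2048-entry) mpsc sender with the
    // producer; every mined batch of (TxWithHash, Receipt) pairs is broadcast
    // to all registered listeners.
    let mut rx = producer.add_listener();
    rx.next().await
}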