Skip to content

Commit

Permalink
feat(katana): add torii_getTransactions rpc (#1529)
Browse files Browse the repository at this point in the history
  • Loading branch information
tarrencev authored Feb 20, 2024
1 parent 716520e commit 64992c9
Show file tree
Hide file tree
Showing 18 changed files with 716 additions and 52 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bin/katana/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl KatanaArgs {
}

pub fn server_config(&self) -> ServerConfig {
let mut apis = vec![ApiKind::Starknet, ApiKind::Katana];
let mut apis = vec![ApiKind::Starknet, ApiKind::Katana, ApiKind::Torii];
// only enable `katana` API in dev mode
if self.dev {
apis.push(ApiKind::Dev);
Expand Down
2 changes: 1 addition & 1 deletion crates/dojo-test-utils/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl TestSequencer {
port: 0,
host: "127.0.0.1".into(),
max_connections: 100,
apis: vec![ApiKind::Starknet, ApiKind::Katana, ApiKind::Dev],
apis: vec![ApiKind::Starknet, ApiKind::Katana, ApiKind::Torii, ApiKind::Dev],
},
)
.await
Expand Down
129 changes: 113 additions & 16 deletions crates/katana/core/src/service/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::stream::{Stream, StreamExt};
use futures::FutureExt;
use katana_executor::blockifier::outcome::TxReceiptWithExecInfo;
Expand All @@ -24,7 +25,7 @@ use katana_provider::traits::env::BlockEnvProvider;
use katana_provider::traits::state::StateFactoryProvider;
use parking_lot::RwLock;
use tokio::time::{interval_at, Instant, Interval};
use tracing::trace;
use tracing::{trace, warn};

use crate::backend::Backend;

Expand All @@ -42,6 +43,9 @@ type ServiceFuture<T> = Pin<Box<dyn Future<Output = T> + Send + Sync>>;

type BlockProductionResult = Result<MinedBlockOutcome, BlockProductionError>;
type BlockProductionFuture = ServiceFuture<BlockProductionResult>;
type BlockProductionWithTxnsFuture =
ServiceFuture<Result<(Vec<TxWithHashAndReceiptPair>, MinedBlockOutcome), BlockProductionError>>;
pub type TxWithHashAndReceiptPair = (TxWithHash, Receipt);

/// The type which responsible for block production.
#[must_use = "BlockProducer does nothing unless polled"]
Expand Down Expand Up @@ -161,6 +165,8 @@ pub struct IntervalBlockProducer {
queued: VecDeque<Vec<ExecutableTxWithHash>>,
/// The state of the pending block after executing all the transactions within the interval.
state: Arc<PendingState>,
/// Listeners notified when a new executed tx is added.
tx_execution_listeners: RwLock<Vec<Sender<Vec<TxWithHashAndReceiptPair>>>>,
}

impl IntervalBlockProducer {
Expand All @@ -185,6 +191,7 @@ impl IntervalBlockProducer {
block_mining: None,
interval: Some(interval),
queued: VecDeque::default(),
tx_execution_listeners: RwLock::new(vec![]),
}
}

Expand All @@ -198,7 +205,14 @@ impl IntervalBlockProducer {
) -> Self {
let state = Arc::new(PendingState::new(db, block_exec_envs.0, block_exec_envs.1));

Self { state, backend, interval: None, block_mining: None, queued: VecDeque::default() }
Self {
state,
backend,
interval: None,
block_mining: None,
queued: VecDeque::default(),
tx_execution_listeners: RwLock::new(vec![]),
}
}

pub fn state(&self) -> Arc<PendingState> {
Expand Down Expand Up @@ -267,7 +281,41 @@ impl IntervalBlockProducer {
.collect::<Vec<_>>()
};

self.state.executed_txs.write().extend(results);
self.state.executed_txs.write().extend(results.clone());
self.notify_listener(results.into_iter().map(|(tx, info)| (tx, info.receipt)).collect());
}

pub fn add_listener(&self) -> Receiver<Vec<TxWithHashAndReceiptPair>> {
const TX_LISTENER_BUFFER_SIZE: usize = 2048;
let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE);
self.tx_execution_listeners.write().push(tx);
rx
}

/// notifies all listeners about the transaction
fn notify_listener(&self, txs: Vec<TxWithHashAndReceiptPair>) {
let mut listener = self.tx_execution_listeners.write();
// this is basically a retain but with mut reference
for n in (0..listener.len()).rev() {
let mut listener_tx = listener.swap_remove(n);
let retain = match listener_tx.try_send(txs.clone()) {
Ok(()) => true,
Err(e) => {
if e.is_full() {
warn!(
target: "miner",
"failed to send new txs notification because channel is full",
);
true
} else {
false
}
}
};
if retain {
listener.push(listener_tx)
}
}
}

fn outcome(&self) -> StateUpdatesWithDeclaredClasses {
Expand Down Expand Up @@ -320,14 +368,21 @@ pub struct InstantBlockProducer {
/// Holds the backend if no block is being mined
backend: Arc<Backend>,
/// Single active future that mines a new block
block_mining: Option<BlockProductionFuture>,
block_mining: Option<BlockProductionWithTxnsFuture>,
/// Backlog of sets of transactions ready to be mined
queued: VecDeque<Vec<ExecutableTxWithHash>>,
/// Listeners notified when a new executed tx is added.
tx_execution_listeners: RwLock<Vec<Sender<Vec<TxWithHashAndReceiptPair>>>>,
}

impl InstantBlockProducer {
pub fn new(backend: Arc<Backend>) -> Self {
Self { backend, block_mining: None, queued: VecDeque::default() }
Self {
backend,
block_mining: None,
queued: VecDeque::default(),
tx_execution_listeners: RwLock::new(vec![]),
}
}

pub fn force_mine(&mut self) {
Expand All @@ -342,7 +397,7 @@ impl InstantBlockProducer {
fn do_mine(
backend: Arc<Backend>,
transactions: Vec<ExecutableTxWithHash>,
) -> Result<MinedBlockOutcome, BlockProductionError> {
) -> Result<(Vec<TxWithHashAndReceiptPair>, MinedBlockOutcome), BlockProductionError> {
trace!(target: "miner", "creating new block");

let provider = backend.blockchain.provider();
Expand All @@ -359,7 +414,7 @@ impl InstantBlockProducer {

let txs = transactions.iter().map(TxWithHash::from);

let tx_receipt_pairs: Vec<(TxWithHash, Receipt)> = TransactionExecutor::new(
let tx_receipt_pairs: Vec<TxWithHashAndReceiptPair> = TransactionExecutor::new(
&state,
&block_context,
!backend.config.disable_fee,
Expand All @@ -372,8 +427,8 @@ impl InstantBlockProducer {
.zip(txs)
.filter_map(|(res, tx)| {
if let Ok(info) = res {
let receipt = TxReceiptWithExecInfo::new(&tx, info);
Some((tx, receipt.receipt))
let info = TxReceiptWithExecInfo::new(&tx, info);
Some((tx, info.receipt))
} else {
None
}
Expand All @@ -382,19 +437,52 @@ impl InstantBlockProducer {

let outcome = backend.do_mine_block(
&block_env,
tx_receipt_pairs,
tx_receipt_pairs.clone(),
get_state_update_from_cached_state(&state),
)?;

trace!(target: "miner", "created new block: {}", outcome.block_number);

Ok(outcome)
Ok((tx_receipt_pairs, outcome))
}

pub fn add_listener(&self) -> Receiver<Vec<TxWithHashAndReceiptPair>> {
const TX_LISTENER_BUFFER_SIZE: usize = 2048;
let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE);
self.tx_execution_listeners.write().push(tx);
rx
}

/// notifies all listeners about the transaction
fn notify_listener(&self, txs: Vec<TxWithHashAndReceiptPair>) {
let mut listener = self.tx_execution_listeners.write();
// this is basically a retain but with mut reference
for n in (0..listener.len()).rev() {
let mut listener_tx = listener.swap_remove(n);
let retain = match listener_tx.try_send(txs.clone()) {
Ok(()) => true,
Err(e) => {
if e.is_full() {
warn!(
target: "miner",
"failed to send new txs notification because channel is full",
);
true
} else {
false
}
}
};
if retain {
listener.push(listener_tx)
}
}
}
}

impl Stream for InstantBlockProducer {
// mined block outcome and the new state
type Item = BlockProductionResult;
type Item = Result<MinedBlockOutcome, BlockProductionError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let pin = self.get_mut();
Expand All @@ -410,10 +498,19 @@ impl Stream for InstantBlockProducer {

// poll the mining future
if let Some(mut mining) = pin.block_mining.take() {
if let Poll::Ready(outcome) = mining.poll_unpin(cx) {
return Poll::Ready(Some(outcome));
} else {
pin.block_mining = Some(mining)
match mining.poll_unpin(cx) {
Poll::Ready(Ok((txs, outcome))) => {
pin.notify_listener(txs);
return Poll::Ready(Some(Ok(outcome)));
}

Poll::Ready(Err(e)) => {
return Poll::Ready(Some(Err(e)));
}

Poll::Pending => {
pin.block_mining = Some(mining);
}
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/katana/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ katana-provider = { path = "../storage/provider" }

anyhow.workspace = true
convert_case.workspace = true
futures.workspace = true
parking_lot.workspace = true
starknet.workspace = true
tracing.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/katana/executor/src/blockifier/outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use katana_primitives::transaction::Tx;

use super::utils::{events_from_exec_info, l2_to_l1_messages_from_exec_info};

#[derive(Clone)]
pub struct TxReceiptWithExecInfo {
pub receipt: Receipt,
pub execution_info: TransactionExecutionInfo,
Expand Down
2 changes: 2 additions & 0 deletions crates/katana/rpc/rpc-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
pub mod dev;
pub mod katana;
pub mod starknet;
pub mod torii;

/// List of APIs supported by Katana.
#[derive(Debug, Copy, Clone)]
pub enum ApiKind {
Starknet,
Katana,
Torii,
Dev,
}
11 changes: 11 additions & 0 deletions crates/katana/rpc/rpc-api/src/torii.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use jsonrpsee::core::RpcResult;
use jsonrpsee::proc_macros::rpc;
use katana_rpc_types::transaction::{TransactionsPage, TransactionsPageCursor};

#[cfg_attr(not(feature = "client"), rpc(server, namespace = "torii"))]
#[cfg_attr(feature = "client", rpc(client, server, namespace = "torii"))]
pub trait ToriiApi {
#[method(name = "getTransactions")]
async fn get_transactions(&self, cursor: TransactionsPageCursor)
-> RpcResult<TransactionsPage>;
}
3 changes: 2 additions & 1 deletion crates/katana/rpc/rpc-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ katana-core = { path = "../../core" }
katana-primitives = { path = "../../primitives" }
katana-provider = { path = "../../storage/provider" }

ethers = "2.0.11"
anyhow.workspace = true
derive_more.workspace = true
ethers = "2.0.11"
futures.workspace = true
jsonrpsee = { workspace = true, features = [ "macros", "server" ] }
serde.workspace = true
serde_with.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/katana/rpc/rpc-types/src/error/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod katana;
pub mod starknet;
pub mod torii;
70 changes: 70 additions & 0 deletions crates/katana/rpc/rpc-types/src/error/torii.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use futures::channel::mpsc::Receiver;
use jsonrpsee::core::Error;
use jsonrpsee::types::error::CallError;
use jsonrpsee::types::ErrorObject;
use katana_core::sequencer_error::SequencerError;
use katana_primitives::receipt::Receipt;
use katana_primitives::transaction::TxWithHash;
use katana_provider::error::ProviderError;

use crate::transaction::TransactionsPageCursor;

#[derive(Debug, thiserror::Error)]
#[repr(i32)]
pub enum ToriiApiError {
#[error("Block not found")]
BlockNotFound,
#[error("Transaction index out of bounds")]
TransactionOutOfBounds,
#[error("Transaction not found")]
TransactionNotFound,
#[error("Transaction receipt not found")]
TransactionReceiptNotFound,
#[error("Transactions not ready")]
TransactionsNotReady {
rx: Receiver<Vec<(TxWithHash, Receipt)>>,
cursor: TransactionsPageCursor,
},
#[error("Long poll expired")]
ChannelDisconnected,
#[error("An unexpected error occured: {reason}")]
UnexpectedError { reason: String },
}

impl ToriiApiError {
fn code(&self) -> i32 {
match self {
ToriiApiError::BlockNotFound => 24,
ToriiApiError::TransactionOutOfBounds => 34,
ToriiApiError::TransactionNotFound => 35,
ToriiApiError::TransactionReceiptNotFound => 36,
ToriiApiError::TransactionsNotReady { .. } => 37,
ToriiApiError::ChannelDisconnected => 42,
ToriiApiError::UnexpectedError { .. } => 63,
}
}
}

impl From<ProviderError> for ToriiApiError {
fn from(value: ProviderError) -> Self {
ToriiApiError::UnexpectedError { reason: value.to_string() }
}
}

impl From<SequencerError> for ToriiApiError {
fn from(value: SequencerError) -> Self {
match value {
SequencerError::BlockNotFound(_) => ToriiApiError::BlockNotFound,
err => ToriiApiError::UnexpectedError { reason: err.to_string() },
}
}
}

impl From<ToriiApiError> for Error {
fn from(err: ToriiApiError) -> Self {
let code = err.code();
let message = err.to_string();
let err = ErrorObject::owned(code, message, None::<()>);
Error::Call(CallError::Custom(err))
}
}
Loading

0 comments on commit 64992c9

Please sign in to comment.