diff --git a/crates/katana/core/src/backend/mod.rs b/crates/katana/core/src/backend/mod.rs index 47908816bc..ff0a45914a 100644 --- a/crates/katana/core/src/backend/mod.rs +++ b/crates/katana/core/src/backend/mod.rs @@ -32,7 +32,7 @@ use crate::utils::get_current_timestamp; pub(crate) const LOG_TARGET: &str = "katana::core::backend"; #[derive(Debug)] -pub struct Backend { +pub struct Backend { pub chain_spec: Arc, /// stores all block related data in memory pub blockchain: Blockchain, diff --git a/crates/katana/core/src/service/block_producer.rs b/crates/katana/core/src/service/block_producer.rs index 848ae60789..8a4455749f 100644 --- a/crates/katana/core/src/service/block_producer.rs +++ b/crates/katana/core/src/service/block_producer.rs @@ -12,16 +12,23 @@ use katana_executor::{BlockExecutor, ExecutionResult, ExecutionStats, ExecutorFa use katana_pool::validation::stateful::TxValidator; use katana_primitives::block::{BlockHashOrNumber, ExecutableBlock, PartialHeader}; use katana_primitives::da::L1DataAvailabilityMode; +use katana_primitives::env::BlockEnv; use katana_primitives::receipt::Receipt; use katana_primitives::trace::TxExecInfo; use katana_primitives::transaction::{ExecutableTxWithHash, TxHash, TxWithHash}; use katana_provider::error::ProviderError; use katana_provider::traits::block::{BlockHashProvider, BlockNumberProvider}; use katana_provider::traits::env::BlockEnvProvider; -use katana_provider::traits::state::StateFactoryProvider; +use katana_provider::traits::pending::PendingBlockProvider; +use katana_provider::traits::state::{StateFactoryProvider, StateProvider}; +use katana_provider::traits::transaction::{ + ReceiptProvider, TransactionProvider, TransactionTraceProvider, +}; +use katana_provider::ProviderResult; use katana_tasks::{BlockingTaskPool, BlockingTaskResult}; use parking_lot::lock_api::RawMutex; use parking_lot::{Mutex, RwLock}; +use starknet::core::types::{TransactionExecutionStatus, TransactionStatus}; use tokio::time::{interval_at, Instant, Interval}; use tracing::{error, info, trace, warn}; @@ -699,3 +706,255 @@ impl Stream for InstantBlockProducer { Poll::Pending } } + +impl PendingBlockProvider for BlockProducer { + fn pending_block(&self) -> ProviderResult<()> { + match &*self.producer.read() { + BlockProducerMode::Instant(bp) => bp.pending_block(), + BlockProducerMode::Interval(bp) => bp.pending_block(), + } + } + + fn pending_transaction(&self, hash: TxHash) -> ProviderResult> { + match &*self.producer.read() { + BlockProducerMode::Instant(bp) => bp.pending_transaction(hash), + BlockProducerMode::Interval(bp) => bp.pending_transaction(hash), + } + } + + fn pending_receipt(&self, hash: TxHash) -> ProviderResult> { + match &*self.producer.read() { + BlockProducerMode::Instant(bp) => bp.pending_receipt(hash), + BlockProducerMode::Interval(bp) => bp.pending_receipt(hash), + } + } + + fn pending_block_env(&self) -> ProviderResult { + match &*self.producer.read() { + BlockProducerMode::Instant(bp) => bp.pending_block_env(), + BlockProducerMode::Interval(bp) => bp.pending_block_env(), + } + } + + fn pending_state(&self) -> ProviderResult> { + match &*self.producer.read() { + BlockProducerMode::Instant(bp) => bp.pending_state(), + BlockProducerMode::Interval(bp) => bp.pending_state(), + } + } + + fn pending_transaction_status( + &self, + hash: TxHash, + ) -> ProviderResult> { + match &*self.producer.read() { + BlockProducerMode::Instant(bp) => bp.pending_transaction_status(hash), + BlockProducerMode::Interval(bp) => bp.pending_transaction_status(hash), + } + } + + fn pending_transactions(&self) -> ProviderResult> { + match &*self.producer.read() { + BlockProducerMode::Instant(bp) => bp.pending_transactions(), + BlockProducerMode::Interval(bp) => bp.pending_transactions(), + } + } + + fn pending_receipts(&self) -> ProviderResult> { + match &*self.producer.read() { + BlockProducerMode::Instant(bp) => bp.pending_receipts(), + BlockProducerMode::Interval(bp) => bp.pending_receipts(), + } + } + + fn pending_transaction_trace(&self, hash: TxHash) -> ProviderResult> { + match &*self.producer.read() { + BlockProducerMode::Instant(bp) => bp.pending_transaction_trace(hash), + BlockProducerMode::Interval(bp) => bp.pending_transaction_trace(hash), + } + } + + fn pending_transaction_traces(&self) -> ProviderResult> { + match &*self.producer.read() { + BlockProducerMode::Instant(bp) => bp.pending_transaction_traces(), + BlockProducerMode::Interval(bp) => bp.pending_transaction_traces(), + } + } +} + +impl PendingBlockProvider for InstantBlockProducer { + fn pending_transaction(&self, hash: TxHash) -> ProviderResult> { + self.backend.blockchain.provider().transaction_by_hash(hash) + } + + fn pending_receipt(&self, hash: TxHash) -> ProviderResult> { + self.backend.blockchain.provider().receipt_by_hash(hash) + } + + fn pending_block_env(&self) -> ProviderResult { + // In instant mining mode, we don't have a notion of 'pending' block as a block is mined + // instantly when a valid transaction is submitted. So we need to mimic how a new + // block env is created in interval mining mode. + // + // It is important that we update the block env here especially for the block timestamp. + // Because if a fee estimate is requested using the pending block in instant mode, + // and the timestamp is NOT UPDATED, it will be estimated using the block timestamp + // of the last mined block. This is not ideal as the fee estimate should be based on + // the block timestamp of the block that the transaction may + + let num = self.backend.blockchain.provider().latest_number()?; + let env = self.backend.blockchain.provider().block_env_at(num.into())?; + + let mut env = env.expect("latest block env must exist"); + self.backend.update_block_env(&mut env); + + Ok(env) + } + + fn pending_state(&self) -> ProviderResult> { + self.backend.blockchain.provider().latest() + } + + fn pending_block(&self) -> ProviderResult<()> { + todo!() + } + + fn pending_transaction_status( + &self, + hash: TxHash, + ) -> ProviderResult> { + if let Some(receipt) = dbg!(self.backend.blockchain.provider().receipt_by_hash(hash)?) { + if receipt.is_reverted() { + Ok(Some(TransactionStatus::AcceptedOnL2(TransactionExecutionStatus::Reverted))) + } else { + Ok(Some(TransactionStatus::AcceptedOnL2(TransactionExecutionStatus::Succeeded))) + } + } else { + Ok(None) + } + } + + fn pending_transactions(&self) -> ProviderResult> { + Ok(Vec::new()) + } + + fn pending_receipts(&self) -> ProviderResult> { + Ok(Vec::new()) + } + + fn pending_transaction_trace(&self, hash: TxHash) -> ProviderResult> { + self.backend.blockchain.provider().transaction_execution(hash) + } + + fn pending_transaction_traces(&self) -> ProviderResult> { + let number = self.backend.blockchain.provider().latest_number()?; + let traces = + self.backend.blockchain.provider().transaction_executions_by_block(number.into())?; + Ok(traces.expect("traces for latest block must exist")) + } +} + +impl PendingBlockProvider for IntervalBlockProducer { + fn pending_transaction(&self, hash: TxHash) -> ProviderResult> { + let transaction = self + .executor() + .read() + .transactions() + .iter() + .find_map(|(tx, _)| if tx.hash == hash { Some(tx.clone()) } else { None }); + + Ok(transaction) + } + + fn pending_receipt(&self, hash: TxHash) -> ProviderResult> { + let receipt = self + .executor() + .read() + .transactions() + .iter() + .find_map(|(tx, res)| if tx.hash == hash { res.receipt().cloned() } else { None }); + + Ok(receipt) + } + + fn pending_transactions(&self) -> ProviderResult> { + Ok(self.executor().read().transactions().iter().map(|(tx, _)| tx.clone()).collect()) + } + + fn pending_state(&self) -> ProviderResult> { + Ok(self.executor().read().state()) + } + + fn pending_block_env(&self) -> ProviderResult { + Ok(self.executor().read().block_env()) + } + + fn pending_block(&self) -> ProviderResult<()> { + todo!() + } + + fn pending_receipts(&self) -> ProviderResult> { + let receipts = self.executor().read().transactions().iter().fold( + Vec::new(), + |mut acc, (_, result)| { + if let Some(receipt) = result.receipt() { + acc.push(receipt.clone()); + } + acc + }, + ); + + Ok(receipts) + } + + fn pending_transaction_status( + &self, + hash: TxHash, + ) -> ProviderResult> { + if let Some((_, res)) = + self.executor().read().transactions().iter().find(|(tx, _)| tx.hash == hash) + { + // TODO: should impl From for TransactionStatus + let status = match res { + ExecutionResult::Failed { .. } => TransactionStatus::Rejected, + ExecutionResult::Success { receipt, .. } => { + if receipt.is_reverted() { + TransactionStatus::AcceptedOnL2(TransactionExecutionStatus::Reverted) + } else { + TransactionStatus::AcceptedOnL2(TransactionExecutionStatus::Succeeded) + } + } + }; + + Ok(Some(status)) + } else { + Ok(None) + } + } + + fn pending_transaction_trace(&self, hash: TxHash) -> ProviderResult> { + let result = self + .executor() + .read() + .transactions() + .iter() + .find(|(t, _)| t.hash == hash) + .map(|(_, res)| res) + .and_then(|res| res.trace()) + .cloned(); + + Ok(result) + } + + fn pending_transaction_traces(&self) -> ProviderResult> { + let traces = self + .executor() + .read() + .transactions() + .iter() + .filter_map(|(_, r)| r.trace().cloned()) + .collect::>(); + + Ok(traces) + } +} diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index 8d8d822d06..a9b945cd61 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -279,7 +279,7 @@ pub async fn spawn( cfg, ) } else { - StarknetApi::new(backend.clone(), pool.clone(), Some(block_producer.clone()), cfg) + StarknetApi::new(backend.clone(), pool.clone(), block_producer.clone(), cfg) }; methods.merge(StarknetApiServer::into_rpc(server.clone()))?; diff --git a/crates/katana/rpc/rpc/src/starknet/mod.rs b/crates/katana/rpc/rpc/src/starknet/mod.rs index c0d4256987..a5f0379535 100644 --- a/crates/katana/rpc/rpc/src/starknet/mod.rs +++ b/crates/katana/rpc/rpc/src/starknet/mod.rs @@ -3,8 +3,7 @@ use std::sync::Arc; use katana_core::backend::Backend; -use katana_core::service::block_producer::{BlockProducer, BlockProducerMode, PendingExecutor}; -use katana_executor::{ExecutionResult, ExecutorFactory}; +use katana_executor::ExecutorFactory; use katana_pool::{TransactionPool, TxPool}; use katana_primitives::block::{ BlockHash, BlockHashOrNumber, BlockIdOrTag, BlockNumber, BlockTag, FinalityStatus, @@ -20,6 +19,7 @@ use katana_primitives::Felt; use katana_provider::traits::block::{BlockHashProvider, BlockIdReader, BlockNumberProvider}; use katana_provider::traits::contract::ContractClassProvider; use katana_provider::traits::env::BlockEnvProvider; +use katana_provider::traits::pending::PendingBlockProvider; use katana_provider::traits::state::{StateFactoryProvider, StateProvider}; use katana_provider::traits::transaction::{ ReceiptProvider, TransactionProvider, TransactionStatusProvider, @@ -62,68 +62,63 @@ type StarknetApiResult = Result; /// [write](katana_rpc_api::starknet::StarknetWriteApi), and /// [trace](katana_rpc_api::starknet::StarknetTraceApi) APIs. #[allow(missing_debug_implementations)] -pub struct StarknetApi -where - EF: ExecutorFactory, -{ - inner: Arc>, +pub struct StarknetApi { + inner: Arc>, } -struct StarknetApiInner -where - EF: ExecutorFactory, -{ +struct StarknetApiInner { pool: TxPool, backend: Arc>, + pending_provider: Option

, forked_client: Option, blocking_task_pool: BlockingTaskPool, - block_producer: Option>, config: StarknetApiConfig, } -impl StarknetApi +impl StarknetApi where EF: ExecutorFactory, + P: PendingBlockProvider, { pub fn new( backend: Arc>, pool: TxPool, - block_producer: Option>, + pending_provider: P, config: StarknetApiConfig, ) -> Self { - Self::new_inner(backend, pool, block_producer, None, config) + Self::new_inner(backend, pool, Some(pending_provider), None, config) } pub fn new_forked( backend: Arc>, pool: TxPool, - block_producer: BlockProducer, + pending_provider: P, forked_client: ForkedClient, config: StarknetApiConfig, ) -> Self { - Self::new_inner(backend, pool, Some(block_producer), Some(forked_client), config) + Self::new_inner(backend, pool, Some(pending_provider), Some(forked_client), config) } fn new_inner( backend: Arc>, pool: TxPool, - block_producer: Option>, + pending_provider: Option

, forked_client: Option, config: StarknetApiConfig, ) -> Self { let blocking_task_pool = BlockingTaskPool::new().expect("failed to create blocking task pool"); - let inner = StarknetApiInner { - pool, - backend, - block_producer, - blocking_task_pool, - forked_client, - config, - }; - - Self { inner: Arc::new(inner) } + Self { + inner: Arc::new(StarknetApiInner { + pool, + config, + backend, + forked_client, + pending_provider, + blocking_task_pool, + }), + } } async fn on_cpu_blocking_task(&self, func: F) -> T @@ -185,12 +180,8 @@ where Ok(estimates) } - /// Returns the pending state if the sequencer is running in _interval_ mode. Otherwise `None`. - fn pending_executor(&self) -> Option { - self.inner.block_producer.as_ref().and_then(|bp| match &*bp.producer.read() { - BlockProducerMode::Instant(_) => None, - BlockProducerMode::Interval(producer) => Some(producer.executor()), - }) + fn pending_provider(&self) -> Option<&P> { + self.inner.pending_provider.as_ref() } fn state(&self, block_id: &BlockIdOrTag) -> StarknetApiResult> { @@ -200,8 +191,8 @@ where BlockIdOrTag::Tag(BlockTag::Latest) => Some(provider.latest()?), BlockIdOrTag::Tag(BlockTag::Pending) => { - if let Some(exec) = self.pending_executor() { - Some(exec.read().state()) + if let Some(provider) = self.pending_provider() { + Some(provider.pending_state()?) } else { Some(provider.latest()?) } @@ -220,8 +211,8 @@ where let env = match block_id { BlockIdOrTag::Tag(BlockTag::Pending) => { // If there is a pending block, use the block env of the pending block. - if let Some(exec) = self.pending_executor() { - Some(exec.read().block_env()) + if let Some(provider) = self.pending_provider() { + Some(provider.pending_block_env()?) } // else, we create a new block env and update the values to reflect the current // state. @@ -315,9 +306,9 @@ where let provider = this.inner.backend.blockchain.provider(); let block_id: BlockHashOrNumber = match block_id { - BlockIdOrTag::Tag(BlockTag::Pending) => match this.pending_executor() { - Some(exec) => { - let count = exec.read().transactions().len() as u64; + BlockIdOrTag::Tag(BlockTag::Pending) => match this.pending_provider() { + Some(provider) => { + let count = provider.pending_transactions()?.len() as u64; return Ok(Some(count)); } None => provider.latest_hash()?.into(), @@ -381,13 +372,11 @@ where .on_io_blocking_task(move |this| { // TEMP: have to handle pending tag independently for now let tx = if BlockIdOrTag::Tag(BlockTag::Pending) == block_id { - let Some(executor) = this.pending_executor() else { + let Some(provider) = this.pending_provider() else { return Err(StarknetApiError::BlockNotFound); }; - let executor = executor.read(); - let pending_txs = executor.transactions(); - pending_txs.get(index as usize).map(|(tx, _)| tx.clone()) + provider.pending_transactions()?.get(index as usize).cloned() } else { let provider = &this.inner.backend.blockchain.provider(); @@ -426,13 +415,10 @@ where tx @ Some(_) => tx, None => { // check if the transaction is in the pending block - this.pending_executor().as_ref().and_then(|exec| { - exec.read() - .transactions() - .iter() - .find(|(tx, _)| tx.hash == hash) - .map(|(tx, _)| Tx::from(tx.clone())) - }) + this.pending_provider() + .and_then(|pv| pv.pending_transaction(hash).transpose()) + .transpose()? + .map(Tx::from) } }; @@ -461,27 +447,12 @@ where match receipt { Some(receipt) => Ok(Some(receipt)), None => { - let executor = this.pending_executor(); - // If there's a pending executor - let pending_receipt = executor.and_then(|executor| { - // Find the transaction in the pending block that matches the hash - executor.read().transactions().iter().find_map(|(tx, res)| { - if tx.hash == hash { - // If the transaction is found, only return the receipt if it's - // successful - match res { - ExecutionResult::Success { receipt, .. } => { - Some(receipt.clone()) - } - ExecutionResult::Failed { .. } => None, - } - } else { - None - } - }) - }); + let receipt = this + .pending_provider() + .and_then(|pv| pv.pending_receipt(hash).transpose()) + .transpose()?; - if let Some(receipt) = pending_receipt { + if let Some(receipt) = receipt { let receipt = TxReceiptWithBlockInfo::new( ReceiptBlock::Pending, hash, @@ -542,31 +513,8 @@ where } // seach in the pending block if the transaction is not found - if let Some(pending_executor) = this.pending_executor() { - let pending_executor = pending_executor.read(); - let pending_txs = pending_executor.transactions(); - let (_, res) = pending_txs - .iter() - .find(|(tx, _)| tx.hash == hash) - .ok_or(StarknetApiError::TxnHashNotFound)?; - - // TODO: should impl From for TransactionStatus - let status = match res { - ExecutionResult::Failed { .. } => TransactionStatus::Rejected, - ExecutionResult::Success { receipt, .. } => { - if receipt.is_reverted() { - TransactionStatus::AcceptedOnL2( - TransactionExecutionStatus::Reverted, - ) - } else { - TransactionStatus::AcceptedOnL2( - TransactionExecutionStatus::Succeeded, - ) - } - } - }; - - Ok(Some(status)) + if let Some(provider) = this.pending_provider() { + Ok(provider.pending_transaction_status(hash)?) } else { // Err(StarknetApiError::TxnHashNotFound) Ok(None) @@ -593,9 +541,9 @@ where let provider = this.inner.backend.blockchain.provider(); if BlockIdOrTag::Tag(BlockTag::Pending) == block_id { - if let Some(executor) = this.pending_executor() { - let block_env = executor.read().block_env(); - let latest_hash = provider.latest_hash().map_err(StarknetApiError::from)?; + if let Some(pending_provider) = this.pending_provider() { + let block_env = pending_provider.pending_block_env()?; + let latest_hash = provider.latest_hash()?; let l1_gas_prices = block_env.l1_gas_prices.clone(); let l1_data_gas_prices = block_env.l1_data_gas_prices.clone(); @@ -616,14 +564,7 @@ where // A block should only include successful transactions, we filter out the // failed ones (didn't pass validation stage). - let transactions = executor - .read() - .transactions() - .iter() - .filter(|(_, receipt)| receipt.is_success()) - .map(|(tx, _)| tx.clone()) - .collect::>(); - + let transactions = pending_provider.pending_transactions()?; let block = PendingBlockWithTxs::new(header, transactions); return Ok(Some(MaybePendingBlockWithTxs::Pending(block))); } @@ -659,8 +600,8 @@ where let provider = this.inner.backend.blockchain.provider(); if BlockIdOrTag::Tag(BlockTag::Pending) == block_id { - if let Some(executor) = this.pending_executor() { - let block_env = executor.read().block_env(); + if let Some(pending_provider) = this.pending_provider() { + let block_env = pending_provider.pending_block_env()?; let latest_hash = provider.latest_hash()?; let l1_gas_prices = block_env.l1_gas_prices.clone(); @@ -677,19 +618,11 @@ where protocol_version: this.inner.backend.chain_spec.version.clone(), }; - let receipts = executor - .read() - .transactions() - .iter() - .filter_map(|(tx, result)| match result { - ExecutionResult::Success { receipt, .. } => { - Some((tx.clone(), receipt.clone())) - } - ExecutionResult::Failed { .. } => None, - }) - .collect::>(); + let transactions = pending_provider.pending_transactions()?; + let receipts = pending_provider.pending_receipts()?; + let tx_receipt_pairs = transactions.into_iter().zip(receipts.into_iter()); - let block = PendingBlockWithReceipts::new(header, receipts.into_iter()); + let block = PendingBlockWithReceipts::new(header, tx_receipt_pairs); return Ok(Some(MaybePendingBlockWithReceipts::Pending(block))); } } @@ -724,8 +657,8 @@ where let provider = this.inner.backend.blockchain.provider(); if BlockIdOrTag::Tag(BlockTag::Pending) == block_id { - if let Some(executor) = this.pending_executor() { - let block_env = executor.read().block_env(); + if let Some(pending_provider) = this.pending_provider() { + let block_env = pending_provider.pending_block_env()?; let latest_hash = provider.latest_hash().map_err(StarknetApiError::from)?; let l1_gas_prices = block_env.l1_gas_prices.clone(); @@ -747,15 +680,13 @@ where // A block should only include successful transactions, we filter out the // failed ones (didn't pass validation stage). - let transactions = executor - .read() - .transactions() - .iter() - .filter(|(_, receipt)| receipt.is_success()) - .map(|(tx, _)| tx.hash) + let hashes = pending_provider + .pending_transactions()? + .into_iter() + .map(|tx| tx.hash) .collect::>(); - let block = PendingBlockWithTxHashes::new(header, transactions); + let block = PendingBlockWithTxHashes::new(header, hashes); return Ok(Some(MaybePendingBlockWithTxHashes::Pending(block))); } } @@ -1039,9 +970,9 @@ where return Ok(EventsPage { events, continuation_token }); } - if let Some(executor) = self.pending_executor() { + if let Some(provider) = self.pending_provider() { let cursor = utils::events::fetch_pending_events( - &executor, + provider, &filter, chunk_size, cursor, @@ -1058,10 +989,10 @@ where } (EventBlockId::Pending, EventBlockId::Pending) => { - if let Some(executor) = self.pending_executor() { + if let Some(provider) = self.pending_provider() { let cursor = continuation_token.and_then(|t| t.to_token().map(|t| t.into())); let new_cursor = utils::events::fetch_pending_events( - &executor, + provider, &filter, chunk_size, cursor, @@ -1126,10 +1057,7 @@ where } } -impl Clone for StarknetApi -where - EF: ExecutorFactory, -{ +impl Clone for StarknetApi { fn clone(&self) -> Self { Self { inner: Arc::clone(&self.inner) } } diff --git a/crates/katana/rpc/rpc/src/starknet/read.rs b/crates/katana/rpc/rpc/src/starknet/read.rs index 89322967ba..c53d42e535 100644 --- a/crates/katana/rpc/rpc/src/starknet/read.rs +++ b/crates/katana/rpc/rpc/src/starknet/read.rs @@ -3,6 +3,7 @@ use katana_executor::{EntryPointCall, ExecutorFactory}; use katana_primitives::block::BlockIdOrTag; use katana_primitives::transaction::{ExecutableTx, ExecutableTxWithHash, TxHash}; use katana_primitives::Felt; +use katana_provider::traits::pending::PendingBlockProvider; use katana_rpc_api::starknet::StarknetApiServer; use katana_rpc_types::block::{ BlockHashAndNumber, MaybePendingBlockWithReceipts, MaybePendingBlockWithTxHashes, @@ -21,7 +22,11 @@ use starknet::core::types::TransactionStatus; use super::StarknetApi; #[async_trait] -impl StarknetApiServer for StarknetApi { +impl StarknetApiServer for StarknetApi +where + EF: ExecutorFactory, + P: PendingBlockProvider, +{ async fn chain_id(&self) -> RpcResult { Ok(self.inner.backend.chain_spec.id.id().into()) } diff --git a/crates/katana/rpc/rpc/src/starknet/trace.rs b/crates/katana/rpc/rpc/src/starknet/trace.rs index 5f0594bd6f..77c75dd42a 100644 --- a/crates/katana/rpc/rpc/src/starknet/trace.rs +++ b/crates/katana/rpc/rpc/src/starknet/trace.rs @@ -5,6 +5,7 @@ use katana_primitives::fee::TxFeeInfo; use katana_primitives::trace::{BuiltinCounters, TxExecInfo}; use katana_primitives::transaction::{ExecutableTx, ExecutableTxWithHash, TxHash, TxType}; use katana_provider::traits::block::{BlockNumberProvider, BlockProvider}; +use katana_provider::traits::pending::PendingBlockProvider; use katana_provider::traits::transaction::{TransactionTraceProvider, TransactionsProviderExt}; use katana_rpc_api::starknet::StarknetTraceApiServer; use katana_rpc_types::error::starknet::StarknetApiError; @@ -20,7 +21,11 @@ use starknet::core::types::{ use super::StarknetApi; -impl StarknetApi { +impl StarknetApi +where + EF: ExecutorFactory, + P: PendingBlockProvider, +{ fn simulate_txs( &self, block_id: BlockIdOrTag, @@ -116,22 +121,22 @@ impl StarknetApi { let provider = self.inner.backend.blockchain.provider(); let block_id: BlockHashOrNumber = match block_id { - BlockIdOrTag::Tag(BlockTag::Pending) => match self.pending_executor() { - Some(state) => { - let pending_block = state.read(); - - // extract the txs from the pending block - let traces = pending_block.transactions().iter().filter_map(|(t, r)| { - if let Some(trace) = r.trace() { - let transaction_hash = t.hash; - let trace_root = to_rpc_trace(trace.clone()); - Some(TransactionTraceWithHash { transaction_hash, trace_root }) - } else { - None - } - }); - - return Ok(traces.collect::>()); + BlockIdOrTag::Tag(BlockTag::Pending) => match self.pending_provider() { + Some(pending_provider) => { + let transactions = pending_provider.pending_transactions()?; + let traces = pending_provider.pending_transaction_traces()?; + + let traces = transactions + .into_iter() + .map(|tx| tx.hash) + .zip(traces) + .map(|(hash, trace)| { + let trace_root = to_rpc_trace(trace); + TransactionTraceWithHash { transaction_hash: hash, trace_root } + }) + .collect::>(); + + return Ok(traces); } // if there is no pending block, return the latest block @@ -162,12 +167,9 @@ impl StarknetApi { use StarknetApiError::TxnHashNotFound; // Check in the pending block first - if let Some(state) = self.pending_executor() { - let pending_block = state.read(); - let tx = pending_block.transactions().iter().find(|(t, _)| t.hash == tx_hash); - - if let Some(trace) = tx.and_then(|(_, res)| res.trace()) { - return Ok(to_rpc_trace(trace.clone())); + if let Some(provider) = self.pending_provider() { + if let Some(trace) = provider.pending_transaction_trace(tx_hash)? { + return Ok(to_rpc_trace(trace)); } } @@ -180,7 +182,11 @@ impl StarknetApi { } #[async_trait] -impl StarknetTraceApiServer for StarknetApi { +impl StarknetTraceApiServer for StarknetApi +where + EF: ExecutorFactory, + P: PendingBlockProvider, +{ async fn trace_transaction(&self, transaction_hash: TxHash) -> RpcResult { self.on_io_blocking_task(move |this| Ok(this.trace(transaction_hash)?)).await } diff --git a/crates/katana/rpc/rpc/src/starknet/write.rs b/crates/katana/rpc/rpc/src/starknet/write.rs index 775b95fbce..dc80455634 100644 --- a/crates/katana/rpc/rpc/src/starknet/write.rs +++ b/crates/katana/rpc/rpc/src/starknet/write.rs @@ -2,6 +2,7 @@ use jsonrpsee::core::{async_trait, RpcResult}; use katana_executor::ExecutorFactory; use katana_pool::TransactionPool; use katana_primitives::transaction::{ExecutableTx, ExecutableTxWithHash}; +use katana_provider::traits::pending::PendingBlockProvider; use katana_rpc_api::starknet::StarknetWriteApiServer; use katana_rpc_types::error::starknet::StarknetApiError; use katana_rpc_types::transaction::{ @@ -11,7 +12,11 @@ use katana_rpc_types::transaction::{ use super::StarknetApi; -impl StarknetApi { +impl StarknetApi +where + EF: ExecutorFactory, + P: PendingBlockProvider, +{ async fn add_invoke_transaction_impl( &self, tx: BroadcastedInvokeTx, @@ -74,7 +79,11 @@ impl StarknetApi { } #[async_trait] -impl StarknetWriteApiServer for StarknetApi { +impl StarknetWriteApiServer for StarknetApi +where + EF: ExecutorFactory, + P: PendingBlockProvider, +{ async fn add_invoke_transaction( &self, invoke_transaction: BroadcastedInvokeTx, diff --git a/crates/katana/rpc/rpc/src/utils/events.rs b/crates/katana/rpc/rpc/src/utils/events.rs index fe03ab9fd2..b1126f5b11 100644 --- a/crates/katana/rpc/rpc/src/utils/events.rs +++ b/crates/katana/rpc/rpc/src/utils/events.rs @@ -2,7 +2,6 @@ use std::cmp::Ordering; use std::ops::RangeInclusive; use anyhow::Context; -use katana_core::service::block_producer::PendingExecutor; use katana_primitives::block::{BlockHash, BlockNumber}; use katana_primitives::contract::ContractAddress; use katana_primitives::event::ContinuationToken; @@ -11,6 +10,7 @@ use katana_primitives::transaction::TxHash; use katana_primitives::Felt; use katana_provider::error::ProviderError; use katana_provider::traits::block::BlockProvider; +use katana_provider::traits::pending::PendingBlockProvider; use katana_provider::traits::transaction::ReceiptProvider; use katana_rpc_types::error::starknet::StarknetApiError; use starknet::core::types::EmittedEvent; @@ -86,23 +86,23 @@ impl PartialCursor { } pub fn fetch_pending_events( - pending_executor: &PendingExecutor, + pending_provider: &impl PendingBlockProvider, filter: &Filter, chunk_size: u64, cursor: Option, buffer: &mut Vec, ) -> EventQueryResult { - let pending_block = pending_executor.read(); - - let block_env = pending_block.block_env(); - let txs = pending_block.transactions(); + let block_env = pending_provider.pending_block_env()?; + let txs = pending_provider.pending_transactions()?; + let receipts = pending_provider.pending_receipts()?; let cursor = cursor.unwrap_or(Cursor::new_block(block_env.number)); // process individual transactions in the block. // the iterator will start with txn index == cursor.txn.idx for (tx_idx, (tx_hash, events)) in txs .iter() - .filter_map(|(tx, res)| res.receipt().map(|receipt| (tx.hash, receipt.events()))) + .zip(receipts.iter()) + .map(|(tx, receipt)| (tx.hash, receipt.events())) .enumerate() .skip(cursor.txn.idx) { diff --git a/crates/katana/storage/provider/src/traits/mod.rs b/crates/katana/storage/provider/src/traits/mod.rs index eb9f5df0eb..e8eaa07189 100644 --- a/crates/katana/storage/provider/src/traits/mod.rs +++ b/crates/katana/storage/provider/src/traits/mod.rs @@ -5,6 +5,7 @@ pub mod block; pub mod contract; pub mod env; +pub mod pending; pub mod stage; pub mod state; pub mod state_update; diff --git a/crates/katana/storage/provider/src/traits/pending.rs b/crates/katana/storage/provider/src/traits/pending.rs new file mode 100644 index 0000000000..7101c50cda --- /dev/null +++ b/crates/katana/storage/provider/src/traits/pending.rs @@ -0,0 +1,46 @@ +use katana_primitives::env::BlockEnv; +use katana_primitives::receipt::Receipt; +use katana_primitives::trace::TxExecInfo; +use katana_primitives::transaction::{TxHash, TxWithHash}; +use starknet::core::types::TransactionStatus; + +use super::state::StateProvider; +use crate::ProviderResult; + +/// A provider for pending block data ie., header, transactions, receipts, traces (if any). +// In the context of a full node, where the node doesn't produce the blocks itself, how it can +// provide the pending block data could be from a remote sequencer or feeder gateway. But, if the +// node itself is a sequencer, it can provide the pending block data from its own local state. So, +// the main motivation for this trait is to provide a common interface for both cases. +// +// TODO: Maybe more to rpc crate as this is mainly gonna be used in the rpc side. +#[auto_impl::auto_impl(&, Box, Arc)] +pub trait PendingBlockProvider: Send + Sync + 'static { + fn pending_block_env(&self) -> ProviderResult; + + fn pending_block(&self) -> ProviderResult<()>; + + /// Returns all the transactions that are currently in the pending block. + fn pending_transactions(&self) -> ProviderResult>; + + /// Returns all the receipts that are currently in the pending block. + fn pending_receipts(&self) -> ProviderResult>; + + /// Returns all the transaction traces that are currently in the pending block. + fn pending_transaction_traces(&self) -> ProviderResult>; + + /// Returns a transaction in the pending block by its hash. + fn pending_transaction(&self, hash: TxHash) -> ProviderResult>; + + /// Returns a receipt in the pending block by its hash. + fn pending_receipt(&self, hash: TxHash) -> ProviderResult>; + + /// Returns a transaction trace in the pending block by its hash. + fn pending_transaction_trace(&self, hash: TxHash) -> ProviderResult>; + + fn pending_transaction_status(&self, hash: TxHash) + -> ProviderResult>; + + /// Returns a [`StateProvider`] for the pending state. + fn pending_state(&self) -> ProviderResult>; +}