From 419d1cfdbbd1fe276e57a6f2bebc8811d65bf6e4 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Mon, 26 Aug 2024 10:41:29 -0700 Subject: [PATCH] feat(katana): pool validation (#2344) add a validator for validating incoming transaction on the rpc level. the validation will happen right before the transaction is added to the pool, and its result will determine whether the tx will get added to the pool or not. before this all txs regardless of their validity are added to the pool. the validation will happen asynchronously where txs will get validated the same time as it is being executed. this happen later in the execution stage of the pipeline which is detached from the rpc side, at this point the rpc has already finished its request and has no context of the tx validity (hence what it means by asyncly). this doesn't follow the same behaviour as mainnet, which would invalidate the txs on the rpc level. meaning if your tx, eg has invalid signatures, the sequencer would return the validation error as the response in the same `add_*_transaction` request you used to send the invalid tx. this change also requires some changes on the blockifier side. added the necessary changes in a new branch [**blockifier:cairo-2.7-new**](https://github.com/dojoengine/blockifier/tree/cairo-2.7-new) --- Cargo.lock | 6 +- .../katana/core/src/service/block_producer.rs | 142 +++++---- .../core/src/service/messaging/service.rs | 10 +- crates/katana/core/src/service/mod.rs | 3 + crates/katana/executor/Cargo.toml | 2 +- crates/katana/executor/benches/execution.rs | 2 +- .../executor/src/abstraction/executor.rs | 3 + crates/katana/executor/src/abstraction/mod.rs | 6 +- .../src/implementation/blockifier/mod.rs | 10 +- .../src/implementation/blockifier/state.rs | 20 +- .../src/implementation/blockifier/utils.rs | 2 +- .../executor/src/implementation/noop.rs | 5 + crates/katana/node/src/lib.rs | 12 +- crates/katana/pool/Cargo.toml | 1 + crates/katana/pool/src/lib.rs | 18 +- crates/katana/pool/src/pool.rs | 76 +++-- crates/katana/pool/src/tx.rs | 11 +- crates/katana/pool/src/validation.rs | 65 ----- crates/katana/pool/src/validation/mod.rs | 130 +++++++++ crates/katana/pool/src/validation/stateful.rs | 206 +++++++++++++ crates/katana/rpc/rpc-types/Cargo.toml | 1 + .../rpc/rpc-types/src/error/starknet.rs | 46 ++- crates/katana/rpc/rpc/Cargo.toml | 2 + crates/katana/rpc/rpc/src/dev.rs | 2 +- crates/katana/rpc/rpc/src/saya.rs | 2 +- crates/katana/rpc/rpc/src/starknet/mod.rs | 12 +- crates/katana/rpc/rpc/src/starknet/write.rs | 15 +- crates/katana/rpc/rpc/src/torii.rs | 6 +- crates/katana/rpc/rpc/tests/starknet.rs | 276 +++++++++++++++++- 29 files changed, 889 insertions(+), 203 deletions(-) delete mode 100644 crates/katana/pool/src/validation.rs create mode 100644 crates/katana/pool/src/validation/mod.rs create mode 100644 crates/katana/pool/src/validation/stateful.rs diff --git a/Cargo.lock b/Cargo.lock index d620677e21..f4a8b47779 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1859,7 +1859,7 @@ dependencies = [ [[package]] name = "blockifier" version = "0.8.0-dev.2" -source = "git+https://github.com/dojoengine/blockifier?branch=cairo-2.7#031eef1b54766bc9799e97c43f63e36b63af30ee" +source = "git+https://github.com/dojoengine/blockifier?branch=cairo-2.7-new#42b2b5e28fd47bdfa0d807109360c41a92edafe4" dependencies = [ "anyhow", "ark-ec", @@ -7920,6 +7920,7 @@ dependencies = [ "futures", "katana-executor", "katana-primitives", + "katana-provider", "parking_lot 0.12.3", "rand", "thiserror", @@ -7990,6 +7991,7 @@ dependencies = [ "dojo-utils", "dojo-world", "futures", + "indexmap 2.2.6", "jsonrpsee 0.16.3", "katana-cairo", "katana-core", @@ -8006,6 +8008,7 @@ dependencies = [ "metrics", "num-traits 0.2.19", "rand", + "rstest 0.18.2", "serde", "serde_json", "starknet 0.11.0", @@ -8038,6 +8041,7 @@ dependencies = [ "katana-cairo", "katana-core", "katana-executor", + "katana-pool", "katana-primitives", "katana-provider", "num-traits 0.2.19", diff --git a/crates/katana/core/src/service/block_producer.rs b/crates/katana/core/src/service/block_producer.rs index 2d72cefa3a..b8a47b3524 100644 --- a/crates/katana/core/src/service/block_producer.rs +++ b/crates/katana/core/src/service/block_producer.rs @@ -9,6 +9,7 @@ use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::stream::{Stream, StreamExt}; use futures::FutureExt; use katana_executor::{BlockExecutor, ExecutionResult, ExecutionStats, ExecutorFactory}; +use katana_pool::validation::stateful::TxValidator; use katana_primitives::block::{BlockHashOrNumber, ExecutableBlock, PartialHeader}; use katana_primitives::receipt::Receipt; use katana_primitives::trace::TxExecInfo; @@ -72,37 +73,36 @@ type BlockProductionWithTxnsFuture = #[allow(missing_debug_implementations)] pub struct BlockProducer { /// The inner mode of mining. - pub inner: RwLock>, + pub producer: RwLock>, + /// validator used in the tx pool + // the validator needs to always be built against the state of the block producer, so + // im putting here for now until we find a better way to handle this. + validator: TxValidator, } impl BlockProducer { /// Creates a block producer that mines a new block every `interval` milliseconds. pub fn interval(backend: Arc>, interval: u64) -> Self { - Self { - inner: RwLock::new(BlockProducerMode::Interval(IntervalBlockProducer::new( - backend, interval, - ))), - } + let (prod, validator) = IntervalBlockProducer::new(backend, Some(interval)); + Self { producer: BlockProducerMode::Interval(prod).into(), validator } } /// Creates a new block producer that will only be possible to mine by calling the /// `katana_generateBlock` RPC method. pub fn on_demand(backend: Arc>) -> Self { - Self { - inner: RwLock::new(BlockProducerMode::Interval(IntervalBlockProducer::new_no_mining( - backend, - ))), - } + let (prod, validator) = IntervalBlockProducer::new(backend, None); + Self { producer: BlockProducerMode::Interval(prod).into(), validator } } /// Creates a block producer that mines a new block as soon as there are ready transactions in /// the transactions pool. pub fn instant(backend: Arc>) -> Self { - Self { inner: RwLock::new(BlockProducerMode::Instant(InstantBlockProducer::new(backend))) } + let (prod, validator) = InstantBlockProducer::new(backend); + Self { producer: BlockProducerMode::Instant(prod).into(), validator } } pub(super) fn queue(&self, transactions: Vec) { - let mut mode = self.inner.write(); + let mut mode = self.producer.write(); match &mut *mode { BlockProducerMode::Instant(producer) => producer.queued.push_back(transactions), BlockProducerMode::Interval(producer) => producer.queued.push_back(transactions), @@ -111,26 +111,57 @@ impl BlockProducer { /// Returns `true` if the block producer is running in _interval_ mode. Otherwise, `fales`. pub fn is_interval_mining(&self) -> bool { - matches!(*self.inner.read(), BlockProducerMode::Interval(_)) + matches!(*self.producer.read(), BlockProducerMode::Interval(_)) } /// Returns `true` if the block producer is running in _instant_ mode. Otherwise, `fales`. pub fn is_instant_mining(&self) -> bool { - matches!(*self.inner.read(), BlockProducerMode::Instant(_)) + matches!(*self.producer.read(), BlockProducerMode::Instant(_)) } // Handler for the `katana_generateBlock` RPC method. pub fn force_mine(&self) { trace!(target: LOG_TARGET, "Scheduling force block mining."); - let mut mode = self.inner.write(); + let mut mode = self.producer.write(); match &mut *mode { BlockProducerMode::Instant(producer) => producer.force_mine(), BlockProducerMode::Interval(producer) => producer.force_mine(), } } + pub fn validator(&self) -> &TxValidator { + &self.validator + } + + pub fn update_validator(&self) -> Result<(), ProviderError> { + let mut mode = self.producer.write(); + + match &mut *mode { + BlockProducerMode::Instant(pd) => { + let provider = pd.backend.blockchain.provider(); + let state = provider.latest()?; + + let latest_num = provider.latest_number()?; + let block_env = provider.block_env_at(latest_num.into())?.expect("latest"); + + self.validator.update(state, &block_env) + } + + BlockProducerMode::Interval(pd) => { + let pending_state = pd.executor.0.read(); + + let state = pending_state.state(); + let block_env = pending_state.block_env(); + + self.validator.update(state, &block_env) + } + }; + + Ok(()) + } + pub(super) fn poll_next(&self, cx: &mut Context<'_>) -> Poll> { - let mut mode = self.inner.write(); + let mut mode = self.producer.write(); match &mut *mode { BlockProducerMode::Instant(producer) => producer.poll_next_unpin(cx), BlockProducerMode::Interval(producer) => producer.poll_next_unpin(cx), @@ -184,13 +215,13 @@ pub struct IntervalBlockProducer { } impl IntervalBlockProducer { - pub fn new(backend: Arc>, interval: u64) -> Self { - let interval = { - let duration = Duration::from_millis(interval); + pub fn new(backend: Arc>, interval: Option) -> (Self, TxValidator) { + let interval = interval.map(|time| { + let duration = Duration::from_millis(time); let mut interval = interval_at(Instant::now() + duration, duration); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); interval - }; + }); let provider = backend.blockchain.provider(); @@ -199,49 +230,33 @@ impl IntervalBlockProducer { backend.update_block_env(&mut block_env); let state = provider.latest().unwrap(); - let executor = backend.executor_factory.with_state_and_block_env(state, block_env); - let executor = PendingExecutor::new(executor); + let executor = backend.executor_factory.with_state_and_block_env(state, block_env.clone()); - let blocking_task_spawner = BlockingTaskPool::new().unwrap(); + // -- build the validator using the same state and envs as the executor + let state = executor.state(); + let cfg = backend.executor_factory.cfg(); + let flags = backend.executor_factory.execution_flags(); + let validator = TxValidator::new(state, flags.clone(), cfg.clone(), &block_env); - Self { + let producer = Self { backend, - executor, + interval, ongoing_mining: None, - blocking_task_spawner, ongoing_execution: None, - interval: Some(interval), queued: VecDeque::default(), + executor: PendingExecutor::new(executor), tx_execution_listeners: RwLock::new(vec![]), - } + blocking_task_spawner: BlockingTaskPool::new().unwrap(), + }; + + (producer, validator) } /// Creates a new [IntervalBlockProducer] with no `interval`. This mode will not produce blocks /// for every fixed interval, although it will still execute all queued transactions and /// keep hold of the pending state. - pub fn new_no_mining(backend: Arc>) -> Self { - let provider = backend.blockchain.provider(); - - let latest_num = provider.latest_number().unwrap(); - let mut block_env = provider.block_env_at(latest_num.into()).unwrap().unwrap(); - backend.update_block_env(&mut block_env); - - let state = provider.latest().unwrap(); - let executor = backend.executor_factory.with_state_and_block_env(state, block_env); - let executor = PendingExecutor::new(executor); - - let blocking_task_spawner = BlockingTaskPool::new().unwrap(); - - Self { - backend, - executor, - interval: None, - ongoing_mining: None, - queued: VecDeque::default(), - blocking_task_spawner, - ongoing_execution: None, - tx_execution_listeners: RwLock::new(vec![]), - } + pub fn new_no_mining(backend: Arc>) -> (Self, TxValidator) { + Self::new(backend, None) } pub fn executor(&self) -> PendingExecutor { @@ -463,14 +478,29 @@ pub struct InstantBlockProducer { } impl InstantBlockProducer { - pub fn new(backend: Arc>) -> Self { - Self { + pub fn new(backend: Arc>) -> (Self, TxValidator) { + let provider = backend.blockchain.provider(); + + let latest_num = provider.latest_number().expect("latest block num"); + let block_env = provider + .block_env_at(latest_num.into()) + .expect("provider error") + .expect("latest block env"); + + let state = provider.latest().expect("latest state"); + let cfg = backend.executor_factory.cfg(); + let flags = backend.executor_factory.execution_flags(); + let validator = TxValidator::new(state, flags.clone(), cfg.clone(), &block_env); + + let producer = Self { backend, block_mining: None, queued: VecDeque::default(), blocking_task_pool: BlockingTaskPool::new().unwrap(), tx_execution_listeners: RwLock::new(vec![]), - } + }; + + (producer, validator) } pub fn force_mine(&mut self) { @@ -528,6 +558,8 @@ impl InstantBlockProducer { let outcome = backend.do_mine_block(&block_env, execution_output)?; + // update pool validator state here + trace!(target: LOG_TARGET, block_number = %outcome.block_number, "Created new block."); Ok((outcome, txs_outcomes)) diff --git a/crates/katana/core/src/service/messaging/service.rs b/crates/katana/core/src/service/messaging/service.rs index 2850a11cf8..df7bc6f8a7 100644 --- a/crates/katana/core/src/service/messaging/service.rs +++ b/crates/katana/core/src/service/messaging/service.rs @@ -91,7 +91,10 @@ impl MessagingService { txs.into_iter().for_each(|tx| { let hash = tx.calculate_hash(); trace_l1_handler_tx_exec(hash, &tx); - pool.add_transaction(ExecutableTxWithHash { hash, transaction: tx.into() }) + + // ignore result because L1Handler tx will always be valid + let _ = + pool.add_transaction(ExecutableTxWithHash { hash, transaction: tx.into() }); }); Ok((block_num, txs_count)) @@ -106,7 +109,10 @@ impl MessagingService { txs.into_iter().for_each(|tx| { let hash = tx.calculate_hash(); trace_l1_handler_tx_exec(hash, &tx); - pool.add_transaction(ExecutableTxWithHash { hash, transaction: tx.into() }) + + // ignore result because L1Handler tx will always be valid + let tx = ExecutableTxWithHash { hash, transaction: tx.into() }; + let _ = pool.add_transaction(tx); }); Ok((block_num, txs_count)) diff --git a/crates/katana/core/src/service/mod.rs b/crates/katana/core/src/service/mod.rs index 0dce5669cd..3a9cbe541b 100644 --- a/crates/katana/core/src/service/mod.rs +++ b/crates/katana/core/src/service/mod.rs @@ -44,6 +44,7 @@ pub struct NodeService { pub(crate) messaging: Option>, /// Metrics for recording the service operations metrics: ServiceMetrics, + // validator: StatefulValidator } impl NodeService { @@ -99,6 +100,8 @@ impl Future for NodeService { let steps_used = outcome.stats.cairo_steps_used; metrics.l1_gas_processed_total.increment(gas_used as u64); metrics.cairo_steps_processed_total.increment(steps_used as u64); + + pin.block_producer.update_validator().expect("failed to update validator"); } Err(err) => { diff --git a/crates/katana/executor/Cargo.toml b/crates/katana/executor/Cargo.toml index 2c028baaa6..28b23c070a 100644 --- a/crates/katana/executor/Cargo.toml +++ b/crates/katana/executor/Cargo.toml @@ -15,7 +15,7 @@ starknet = { workspace = true, optional = true } thiserror.workspace = true tracing.workspace = true -blockifier = { git = "https://github.com/dojoengine/blockifier", branch = "cairo-2.7", features = [ "testing" ], optional = true } +blockifier = { git = "https://github.com/dojoengine/blockifier", branch = "cairo-2.7-new", features = [ "testing" ], optional = true } katana-cairo = { workspace = true, optional = true } [dev-dependencies] diff --git a/crates/katana/executor/benches/execution.rs b/crates/katana/executor/benches/execution.rs index d68c6a263e..932393d4b5 100644 --- a/crates/katana/executor/benches/execution.rs +++ b/crates/katana/executor/benches/execution.rs @@ -45,7 +45,7 @@ fn blockifier( || { // setup state let state = provider.latest().expect("failed to get latest state"); - let state = CachedState::new(StateProviderDb::from(state)); + let state = CachedState::new(StateProviderDb::new(state)); (state, &block_context, execution_flags, tx.clone()) }, diff --git a/crates/katana/executor/src/abstraction/executor.rs b/crates/katana/executor/src/abstraction/executor.rs index 7395c03882..fee93b0ee5 100644 --- a/crates/katana/executor/src/abstraction/executor.rs +++ b/crates/katana/executor/src/abstraction/executor.rs @@ -28,6 +28,9 @@ pub trait ExecutorFactory: Send + Sync + 'static + core::fmt::Debug { /// Returns the configuration environment of the factory. fn cfg(&self) -> &CfgEnv; + + /// Returns the execution flags set by the factory. + fn execution_flags(&self) -> &SimulationFlag; } /// An executor that can execute a block of transactions. diff --git a/crates/katana/executor/src/abstraction/mod.rs b/crates/katana/executor/src/abstraction/mod.rs index 0dcf0e9b50..b98c5599bb 100644 --- a/crates/katana/executor/src/abstraction/mod.rs +++ b/crates/katana/executor/src/abstraction/mod.rs @@ -149,10 +149,10 @@ pub struct ResultAndStates { /// A wrapper around a boxed [StateProvider] for implementing the executor's own state reader /// traits. #[derive(Debug)] -pub struct StateProviderDb<'a>(pub(crate) Box); +pub struct StateProviderDb<'a>(Box); -impl From> for StateProviderDb<'_> { - fn from(provider: Box) -> Self { +impl<'a> StateProviderDb<'a> { + pub fn new(provider: Box) -> Self { Self(provider) } } diff --git a/crates/katana/executor/src/implementation/blockifier/mod.rs b/crates/katana/executor/src/implementation/blockifier/mod.rs index ff25f043d0..eaeb94951b 100644 --- a/crates/katana/executor/src/implementation/blockifier/mod.rs +++ b/crates/katana/executor/src/implementation/blockifier/mod.rs @@ -1,3 +1,6 @@ +// Re-export the blockifier crate. +pub use blockifier; + mod error; mod state; pub mod utils; @@ -63,6 +66,11 @@ impl ExecutorFactory for BlockifierFactory { fn cfg(&self) -> &CfgEnv { &self.cfg } + + /// Returns the execution flags set by the factory. + fn execution_flags(&self) -> &SimulationFlag { + &self.flags + } } #[derive(Debug)] @@ -83,7 +91,7 @@ impl<'a> StarknetVMProcessor<'a> { ) -> Self { let transactions = Vec::new(); let block_context = utils::block_context_from_envs(&block_env, &cfg_env); - let state = state::CachedState::new(StateProviderDb(state)); + let state = state::CachedState::new(StateProviderDb::new(state)); Self { block_context, state, transactions, simulation_flags, stats: Default::default() } } diff --git a/crates/katana/executor/src/implementation/blockifier/state.rs b/crates/katana/executor/src/implementation/blockifier/state.rs index 3258f94a72..af801d1ebe 100644 --- a/crates/katana/executor/src/implementation/blockifier/state.rs +++ b/crates/katana/executor/src/implementation/blockifier/state.rs @@ -27,8 +27,7 @@ impl<'a> StateReader for StateProviderDb<'a> { &self, contract_address: katana_cairo::starknet_api::core::ContractAddress, ) -> StateResult { - self.0 - .class_hash_of_contract(utils::to_address(contract_address)) + self.class_hash_of_contract(utils::to_address(contract_address)) .map(|v| ClassHash(v.unwrap_or_default())) .map_err(|e| StateError::StateReadError(e.to_string())) } @@ -38,7 +37,6 @@ impl<'a> StateReader for StateProviderDb<'a> { class_hash: katana_cairo::starknet_api::core::ClassHash, ) -> StateResult { if let Some(hash) = self - .0 .compiled_class_hash_of_class_hash(class_hash.0) .map_err(|e| StateError::StateReadError(e.to_string()))? { @@ -53,7 +51,7 @@ impl<'a> StateReader for StateProviderDb<'a> { class_hash: ClassHash, ) -> StateResult { if let Some(class) = - self.0.class(class_hash.0).map_err(|e| StateError::StateReadError(e.to_string()))? + self.class(class_hash.0).map_err(|e| StateError::StateReadError(e.to_string()))? { let class = utils::to_class(class).map_err(|e| StateError::StateReadError(e.to_string()))?; @@ -68,8 +66,7 @@ impl<'a> StateReader for StateProviderDb<'a> { &self, contract_address: katana_cairo::starknet_api::core::ContractAddress, ) -> StateResult { - self.0 - .nonce(utils::to_address(contract_address)) + self.nonce(utils::to_address(contract_address)) .map(|n| Nonce(n.unwrap_or_default())) .map_err(|e| StateError::StateReadError(e.to_string())) } @@ -79,15 +76,14 @@ impl<'a> StateReader for StateProviderDb<'a> { contract_address: katana_cairo::starknet_api::core::ContractAddress, key: katana_cairo::starknet_api::state::StorageKey, ) -> StateResult { - self.0 - .storage(utils::to_address(contract_address), *key.0.key()) + self.storage(utils::to_address(contract_address), *key.0.key()) .map(|v| v.unwrap_or_default()) .map_err(|e| StateError::StateReadError(e.to_string())) } } #[derive(Debug)] -pub(super) struct CachedState(pub(super) Arc>>); +pub struct CachedState(pub(super) Arc>>); impl Clone for CachedState { fn clone(&self) -> Self { @@ -286,7 +282,7 @@ mod tests { #[test] fn can_fetch_from_inner_state_provider() -> anyhow::Result<()> { let state = state_provider(); - let cached_state = CachedState::new(StateProviderDb(state)); + let cached_state = CachedState::new(StateProviderDb::new(state)); let address = ContractAddress::from(felt!("0x67")); let legacy_class_hash = felt!("0x111"); @@ -357,7 +353,7 @@ mod tests { assert_eq!(actual_new_compiled_class_hash, None, "data shouldn't exist"); assert_eq!(actual_new_legacy_compiled_hash, None, "data shouldn't exist"); - let cached_state = CachedState::new(StateProviderDb(sp)); + let cached_state = CachedState::new(StateProviderDb::new(sp)); // insert some data to the cached state { @@ -472,7 +468,7 @@ mod tests { let sp = db.latest()?; - let cached_state = CachedState::new(StateProviderDb(sp)); + let cached_state = CachedState::new(StateProviderDb::new(sp)); let api_address = utils::to_blk_address(address); let api_storage_key = StorageKey(storage_key.try_into().unwrap()); diff --git a/crates/katana/executor/src/implementation/blockifier/utils.rs b/crates/katana/executor/src/implementation/blockifier/utils.rs index 61bcf6c8a9..cc202d3695 100644 --- a/crates/katana/executor/src/implementation/blockifier/utils.rs +++ b/crates/katana/executor/src/implementation/blockifier/utils.rs @@ -179,7 +179,7 @@ pub fn call( Ok(res.execution.retdata.0) } -fn to_executor_tx(tx: ExecutableTxWithHash) -> Transaction { +pub fn to_executor_tx(tx: ExecutableTxWithHash) -> Transaction { let hash = tx.hash; match tx.transaction { diff --git a/crates/katana/executor/src/implementation/noop.rs b/crates/katana/executor/src/implementation/noop.rs index d9f1e205e0..e8551c4e48 100644 --- a/crates/katana/executor/src/implementation/noop.rs +++ b/crates/katana/executor/src/implementation/noop.rs @@ -19,6 +19,7 @@ use crate::ExecutionError; #[derive(Debug, Default)] pub struct NoopExecutorFactory { cfg: CfgEnv, + execution_flags: SimulationFlag, } impl NoopExecutorFactory { @@ -53,6 +54,10 @@ impl ExecutorFactory for NoopExecutorFactory { fn cfg(&self) -> &CfgEnv { &self.cfg } + + fn execution_flags(&self) -> &SimulationFlag { + &self.execution_flags + } } #[derive(Debug, Default)] diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index fe72ac1aaa..4454474d01 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -24,7 +24,6 @@ use katana_core::service::{NodeService, TransactionMiner}; use katana_executor::implementation::blockifier::BlockifierFactory; use katana_executor::{ExecutorFactory, SimulationFlag}; use katana_pool::ordering::FiFo; -use katana_pool::validation::NoopValidator; use katana_pool::{TransactionPool, TxPool}; use katana_primitives::block::FinalityStatus; use katana_primitives::env::{CfgEnv, FeeTokenAddressses}; @@ -154,11 +153,6 @@ pub async fn start( config: starknet_config, }); - // --- build transaction pool and miner - - let pool = TxPool::new(NoopValidator::new(), FiFo::new()); - let miner = TransactionMiner::new(pool.add_listener()); - // --- build block producer service let block_producer = if sequencer_config.block_time.is_some() || sequencer_config.no_mining { @@ -171,6 +165,12 @@ pub async fn start( BlockProducer::instant(Arc::clone(&backend)) }; + // --- build transaction pool and miner + + let validator = block_producer.validator().clone(); + let pool = TxPool::new(validator, FiFo::new()); + let miner = TransactionMiner::new(pool.add_listener()); + // --- build metrics service // Metrics recorder must be initialized before calling any of the metrics macros, in order for diff --git a/crates/katana/pool/Cargo.toml b/crates/katana/pool/Cargo.toml index 824edc7de8..256f6ce928 100644 --- a/crates/katana/pool/Cargo.toml +++ b/crates/katana/pool/Cargo.toml @@ -10,6 +10,7 @@ version.workspace = true futures.workspace = true katana-executor.workspace = true katana-primitives.workspace = true +katana-provider.workspace = true parking_lot.workspace = true thiserror.workspace = true tracing.workspace = true diff --git a/crates/katana/pool/src/lib.rs b/crates/katana/pool/src/lib.rs index 3a9f484864..465f679732 100644 --- a/crates/katana/pool/src/lib.rs +++ b/crates/katana/pool/src/lib.rs @@ -12,11 +12,21 @@ use katana_primitives::transaction::{ExecutableTxWithHash, TxHash}; use ordering::{FiFo, PoolOrd}; use pool::Pool; use tx::{PendingTx, PoolTransaction}; -use validation::{NoopValidator, Validator}; +use validation::stateful::TxValidator; +use validation::{InvalidTransactionError, Validator}; /// Katana default transacstion pool type. -pub type TxPool = - Pool, FiFo>; +pub type TxPool = Pool>; + +pub type PoolResult = Result; + +#[derive(Debug, thiserror::Error)] +pub enum PoolError { + #[error("Invalid transaction: {0}")] + InvalidTransaction(Box), + #[error("Internal error: {0}")] + Internal(Box), +} /// Represents a complete transaction pool. pub trait TransactionPool { @@ -31,7 +41,7 @@ pub trait TransactionPool { type Validator: Validator; /// Add a new transaction to the pool. - fn add_transaction(&self, tx: Self::Transaction); + fn add_transaction(&self, tx: Self::Transaction) -> PoolResult; fn take_transactions( &self, diff --git a/crates/katana/pool/src/pool.rs b/crates/katana/pool/src/pool.rs index eb4722ae93..ba7b7d8ed8 100644 --- a/crates/katana/pool/src/pool.rs +++ b/crates/katana/pool/src/pool.rs @@ -1,3 +1,4 @@ +use core::fmt; use std::sync::Arc; use std::vec::IntoIter; @@ -8,8 +9,8 @@ use tracing::{error, info, warn}; use crate::ordering::PoolOrd; use crate::tx::{PendingTx, PoolTransaction, TxId}; -use crate::validation::{ValidationOutcome, Validator}; -use crate::TransactionPool; +use crate::validation::{InvalidTransactionError, ValidationOutcome, Validator}; +use crate::{PoolError, PoolResult, TransactionPool}; #[derive(Debug)] pub struct Pool @@ -84,7 +85,7 @@ where impl TransactionPool for Pool where - T: PoolTransaction, + T: PoolTransaction + fmt::Debug, V: Validator, O: PoolOrd, { @@ -92,34 +93,48 @@ where type Validator = V; type Ordering = O; - fn add_transaction(&self, tx: T) { + fn add_transaction(&self, tx: T) -> PoolResult { + let hash = tx.hash(); let id = TxId::new(tx.sender(), tx.nonce()); + info!(hash = format!("{hash:#x}"), "Transaction received."); + match self.inner.validator.validate(tx) { Ok(outcome) => { - let hash = match outcome { + match outcome { ValidationOutcome::Valid(tx) => { // get the priority of the validated tx let priority = self.inner.ordering.priority(&tx); - - let tx = PendingTx::new(id.clone(), tx, priority); - let hash = tx.tx.hash(); + let tx = PendingTx::new(id, tx, priority); // insert the tx in the pool self.inner.transactions.write().push(tx); self.notify_listener(hash); - hash + + Ok(hash) } - // for now, this variant is a no-op - ValidationOutcome::Invalid { tx, .. } => tx.hash(), - }; + ValidationOutcome::Invalid { tx, error } => { + warn!(hash = format!("{:#x}", tx.hash()), "Invalid transaction."); + Err(PoolError::InvalidTransaction(Box::new(error))) + } - info!(hash = format!("{hash:#x}"), "Transaction received."); + // return as error for now but ideally we should kept the tx in a separate + // queue and revalidate it when the parent tx is added to the pool + ValidationOutcome::Dependent { tx, tx_nonce, current_nonce } => { + let err = InvalidTransactionError::InvalidNonce { + address: tx.sender(), + current_nonce, + tx_nonce, + }; + Err(PoolError::InvalidTransaction(Box::new(err))) + } + } } - Err(error @ crate::validation::Error { hash, .. }) => { - error!(hash = format!("{hash:#x}"), %error, "Failed to validate transaction."); + Err(e @ crate::validation::Error { hash, .. }) => { + error!(hash = format!("{hash:#x}"), %e, "Failed to validate transaction."); + Err(PoolError::Internal(e.error)) } } } @@ -193,7 +208,6 @@ where #[cfg(test)] pub(crate) mod test_utils { - use katana_executor::ExecutionError; use katana_primitives::contract::{ContractAddress, Nonce}; use katana_primitives::FieldElement; use rand::Rng; @@ -288,8 +302,11 @@ pub(crate) mod test_utils { fn validate(&self, tx: Self::Transaction) -> ValidationResult { if tx.tip() < self.threshold { return ValidationResult::Ok(ValidationOutcome::Invalid { + error: InvalidTransactionError::InsufficientFunds { + balance: FieldElement::ONE, + max_fee: tx.max_fee(), + }, tx, - error: ExecutionError::Other("tip too low".to_string()), }); } @@ -341,7 +358,9 @@ mod tests { assert!(pool.inner.transactions.read().is_empty()); // add all the txs to the pool - txs.iter().for_each(|tx| pool.add_transaction(tx.clone())); + txs.iter().for_each(|tx| { + let _ = pool.add_transaction(tx.clone()); + }); // all the txs should be in the pool assert_eq!(pool.size(), txs.len()); @@ -389,7 +408,9 @@ mod tests { let mut listener = pool.add_listener(); // start adding txs to the pool - txs.iter().for_each(|tx| pool.add_transaction(tx.clone())); + txs.iter().for_each(|tx| { + let _ = pool.add_transaction(tx.clone()); + }); // the channel should contain all the added txs let mut counter = 0; @@ -434,13 +455,20 @@ mod tests { acc.1.push(tx); acc } + + ValidationOutcome::Dependent { tx, .. } => { + acc.0.push(tx); + acc + } }); assert_eq!(expected_valids.len(), 3); assert_eq!(expected_invalids.len(), 4); // Add all transactions to the pool - all.iter().for_each(|tx| pool.add_transaction(tx.clone())); + all.iter().for_each(|tx| { + let _ = pool.add_transaction(tx.clone()); + }); // Check that all transactions should be in the pool regardless of validity assert!(all.iter().all(|tx| pool.get(tx.hash()).is_some())); @@ -481,7 +509,9 @@ mod tests { let pool = Pool::new(NoopValidator::new(), ordering::Tip::new()); // Add transactions to the pool - txs.iter().for_each(|tx| pool.add_transaction(tx.clone())); + txs.iter().for_each(|tx| { + let _ = pool.add_transaction(tx.clone()); + }); // Get pending transactions let pending = pool.take_transactions().collect::>(); @@ -509,7 +539,9 @@ mod tests { .collect(); // Add all transactions to the pool - txs.iter().for_each(|tx| pool.add_transaction(tx.clone())); + txs.iter().for_each(|tx| { + let _ = pool.add_transaction(tx.clone()); + }); // Get pending transactions let pending = pool.take_transactions().collect::>(); diff --git a/crates/katana/pool/src/tx.rs b/crates/katana/pool/src/tx.rs index 2487aa3e72..7282ad6f32 100644 --- a/crates/katana/pool/src/tx.rs +++ b/crates/katana/pool/src/tx.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::time::Instant; use katana_primitives::contract::{ContractAddress, Nonce}; use katana_primitives::transaction::{ @@ -56,11 +57,12 @@ pub struct PendingTx { pub id: TxId, pub tx: Arc, pub priority: O::PriorityValue, + pub added_at: std::time::Instant, } impl PendingTx { pub fn new(id: TxId, tx: T, priority: O::PriorityValue) -> Self { - Self { id, tx: Arc::new(tx), priority } + Self { id, tx: Arc::new(tx), priority, added_at: Instant::now() } } } @@ -69,7 +71,12 @@ impl PendingTx { impl Clone for PendingTx { fn clone(&self) -> Self { - Self { id: self.id.clone(), tx: Arc::clone(&self.tx), priority: self.priority.clone() } + Self { + id: self.id.clone(), + added_at: self.added_at, + tx: Arc::clone(&self.tx), + priority: self.priority.clone(), + } } } diff --git a/crates/katana/pool/src/validation.rs b/crates/katana/pool/src/validation.rs deleted file mode 100644 index ad7ba895eb..0000000000 --- a/crates/katana/pool/src/validation.rs +++ /dev/null @@ -1,65 +0,0 @@ -use katana_executor::ExecutionError; -use katana_primitives::transaction::TxHash; - -use crate::tx::PoolTransaction; - -#[derive(Debug, thiserror::Error)] -#[error("{error}")] -pub struct Error { - /// The hash of the transaction that failed validation. - pub hash: TxHash, - /// The error that caused the transaction to fail validation. - pub error: Box, -} - -pub type ValidationResult = Result, Error>; - -/// A trait for validating transactions before they are added to the transaction pool. -pub trait Validator { - type Transaction: PoolTransaction; - - /// Validate a transaction. - fn validate(&self, tx: Self::Transaction) -> ValidationResult; - - /// Validate a batch of transactions. - fn validate_all( - &self, - txs: Vec, - ) -> Vec> { - txs.into_iter().map(|tx| self.validate(tx)).collect() - } -} - -// outcome of the validation phase. the variant of this enum determines on which pool -// the tx should be inserted into. -#[derive(Debug)] -pub enum ValidationOutcome { - /// tx that is or may eventually be valid after some nonce changes. - Valid(T), - /// tx that will never be valid, eg. due to invalid signature, nonce lower than current, etc. - Invalid { tx: T, error: ExecutionError }, -} - -/// A no-op validator that does nothing and assume all incoming transactions are valid. -#[derive(Debug)] -pub struct NoopValidator(std::marker::PhantomData); - -impl NoopValidator { - pub fn new() -> Self { - Self(std::marker::PhantomData) - } -} - -impl Validator for NoopValidator { - type Transaction = T; - - fn validate(&self, tx: Self::Transaction) -> ValidationResult { - ValidationResult::Ok(ValidationOutcome::Valid(tx)) - } -} - -impl Default for NoopValidator { - fn default() -> Self { - Self::new() - } -} diff --git a/crates/katana/pool/src/validation/mod.rs b/crates/katana/pool/src/validation/mod.rs new file mode 100644 index 0000000000..68943ecd73 --- /dev/null +++ b/crates/katana/pool/src/validation/mod.rs @@ -0,0 +1,130 @@ +pub mod stateful; + +use katana_primitives::class::ClassHash; +use katana_primitives::contract::{ContractAddress, Nonce}; +use katana_primitives::transaction::TxHash; +use katana_primitives::FieldElement; + +use crate::tx::PoolTransaction; + +#[derive(Debug, thiserror::Error)] +#[error("{error}")] +pub struct Error { + /// The hash of the transaction that failed validation. + pub hash: TxHash, + /// The actual error object. + pub error: Box, +} + +// TODO: figure out how to combine this with ExecutionError +#[derive(Debug, thiserror::Error)] +pub enum InvalidTransactionError { + /// Error when the account's balance is insufficient to cover the specified transaction fee. + #[error("Max fee ({max_fee}) exceeds balance ({balance}).")] + InsufficientFunds { + /// The specified transaction fee. + max_fee: u128, + /// The account's balance of the fee token. + balance: FieldElement, + }, + + /// Error when the specified transaction fee is insufficient to cover the minimum fee required. + #[error("The specified tx max fee is insufficient")] + InsufficientMaxFee { min_fee: u128, max_fee: u128 }, + + /// Error when the account's validation logic fails (ie __validate__ function). + #[error("{error}")] + ValidationFailure { + /// The address of the contract that failed validation. + address: ContractAddress, + /// The class hash of the account contract. + class_hash: ClassHash, + /// The error message returned by Blockifier. + // TODO: this should be a more specific error type. + error: String, + }, + + /// Error when the transaction's sender is not an account contract. + #[error("sender is not an account")] + NonAccount { + /// The address of the contract that is not an account. + address: ContractAddress, + }, + + /// Error when the transaction is using a nonexpected nonce. + #[error( + "Invalid transaction nonce of contract at address {address}. Account nonce: \ + {current_nonce:#x}; got: {tx_nonce:#x}." + )] + InvalidNonce { + /// The address of the contract that has the invalid nonce. + address: ContractAddress, + /// The current nonce of the sender's account. + current_nonce: Nonce, + /// The nonce that the tx is using. + tx_nonce: Nonce, + }, +} + +pub type ValidationResult = Result, Error>; + +/// A trait for validating transactions before they are added to the transaction pool. +pub trait Validator { + type Transaction: PoolTransaction; + + /// Validate a transaction. + fn validate(&self, tx: Self::Transaction) -> ValidationResult; + + /// Validate a batch of transactions. + fn validate_all( + &self, + txs: Vec, + ) -> Vec> { + txs.into_iter().map(|tx| self.validate(tx)).collect() + } +} + +// outcome of the validation phase. the variant of this enum determines on which pool +// the tx should be inserted into. +#[derive(Debug)] +pub enum ValidationOutcome { + /// tx that is or may eventually be valid after some nonce changes. + Valid(T), + + /// tx that will never be valid, eg. due to invalid signature, nonce lower than current, etc. + Invalid { tx: T, error: InvalidTransactionError }, + + /// tx that is dependent on another tx ie. when the tx nonce is higher than the current account + /// nonce. + Dependent { + tx: T, + /// The nonce that the tx is using. + tx_nonce: Nonce, + /// The current nonce of the sender's account. + current_nonce: Nonce, + }, +} + +/// A no-op validator that does nothing and assume all incoming transactions are valid. +#[derive(Debug)] +pub struct NoopValidator(std::marker::PhantomData); + +impl NoopValidator { + pub fn new() -> Self { + Self(std::marker::PhantomData) + } +} + +impl Validator for NoopValidator { + type Transaction = T; + + fn validate(&self, tx: Self::Transaction) -> ValidationResult { + ValidationResult::Ok(ValidationOutcome::Valid(tx)) + } +} + +impl Default for NoopValidator { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/katana/pool/src/validation/stateful.rs b/crates/katana/pool/src/validation/stateful.rs new file mode 100644 index 0000000000..1b57bd376a --- /dev/null +++ b/crates/katana/pool/src/validation/stateful.rs @@ -0,0 +1,206 @@ +use std::sync::Arc; + +use katana_executor::implementation::blockifier::blockifier::blockifier::stateful_validator::{ + StatefulValidator, StatefulValidatorError, +}; +use katana_executor::implementation::blockifier::blockifier::state::cached_state::CachedState; +use katana_executor::implementation::blockifier::blockifier::transaction::errors::{ + TransactionExecutionError, TransactionFeeError, TransactionPreValidationError, +}; +use katana_executor::implementation::blockifier::blockifier::transaction::transaction_execution::Transaction; +use katana_executor::implementation::blockifier::utils::{ + block_context_from_envs, to_address, to_blk_address, to_executor_tx, +}; +use katana_executor::{SimulationFlag, StateProviderDb}; +use katana_primitives::contract::{ContractAddress, Nonce}; +use katana_primitives::env::{BlockEnv, CfgEnv}; +use katana_primitives::transaction::{ExecutableTx, ExecutableTxWithHash}; +use katana_provider::traits::state::StateProvider; +use parking_lot::Mutex; + +use super::{Error, InvalidTransactionError, ValidationOutcome, ValidationResult, Validator}; +use crate::tx::PoolTransaction; + +#[allow(missing_debug_implementations)] +#[derive(Clone)] +pub struct TxValidator { + cfg_env: CfgEnv, + execution_flags: SimulationFlag, + validator: Arc>, +} + +impl TxValidator { + pub fn new( + state: Box, + execution_flags: SimulationFlag, + cfg_env: CfgEnv, + block_env: &BlockEnv, + ) -> Self { + let inner = StatefulValidatorAdapter::new(state, block_env, &cfg_env); + Self { cfg_env, execution_flags, validator: Arc::new(Mutex::new(inner)) } + } + + /// Reset the state of the validator with the given params. This method is used to update the + /// validator's state with a new state and block env after a block is mined. + pub fn update(&self, state: Box, block_env: &BlockEnv) { + let updated = StatefulValidatorAdapter::new(state, block_env, &self.cfg_env); + *self.validator.lock() = updated; + } + + // NOTE: + // If you check the get_nonce method of StatefulValidator in blockifier, under the hood it + // unwraps the Option to get the state of the TransactionExecutor struct. StatefulValidator + // guaranteees that the state will always be present so it is safe to uwnrap. However, this + // safety is not guaranteed by TransactionExecutor itself. + pub fn get_nonce(&self, address: ContractAddress) -> Nonce { + let address = to_blk_address(address); + let nonce = self.validator.lock().inner.get_nonce(address).expect("state err"); + nonce.0 + } +} + +#[allow(missing_debug_implementations)] +struct StatefulValidatorAdapter { + inner: StatefulValidator>, +} + +impl StatefulValidatorAdapter { + fn new( + state: Box, + block_env: &BlockEnv, + cfg_env: &CfgEnv, + ) -> StatefulValidatorAdapter { + let inner = Self::new_inner(state, block_env, cfg_env); + Self { inner } + } + + fn new_inner( + state: Box, + block_env: &BlockEnv, + cfg_env: &CfgEnv, + ) -> StatefulValidator> { + let state = CachedState::new(StateProviderDb::new(state)); + let block_context = block_context_from_envs(block_env, cfg_env); + StatefulValidator::create(state, block_context) + } + + /// Used only in the [`Validator::validate`] trait + fn validate( + &mut self, + tx: ExecutableTxWithHash, + skip_validate: bool, + skip_fee_check: bool, + ) -> ValidationResult { + match to_executor_tx(tx.clone()) { + Transaction::AccountTransaction(blockifier_tx) => { + // Check if the transaction nonce is higher than the current account nonce, + // if yes, dont't run its validation logic but tag it as dependent + let account = to_blk_address(tx.sender()); + let account_nonce = self.inner.get_nonce(account).expect("state err"); + + if tx.nonce() > account_nonce.0 { + return Ok(ValidationOutcome::Dependent { + current_nonce: account_nonce.0, + tx_nonce: tx.nonce(), + tx, + }); + } + + match self.inner.perform_validations(blockifier_tx, skip_validate, skip_fee_check) { + Ok(()) => Ok(ValidationOutcome::Valid(tx)), + Err(e) => match map_invalid_tx_err(e) { + Ok(error) => Ok(ValidationOutcome::Invalid { tx, error }), + Err(error) => Err(Error { hash: tx.hash, error }), + }, + } + } + + // we skip validation for L1HandlerTransaction + Transaction::L1HandlerTransaction(_) => Ok(ValidationOutcome::Valid(tx)), + } + } +} + +impl Validator for TxValidator { + type Transaction = ExecutableTxWithHash; + + fn validate(&self, tx: Self::Transaction) -> ValidationResult { + let this = &mut *self.validator.lock(); + + // Check if validation of an invoke transaction should be skipped due to deploy_account not + // being proccessed yet. This feature is used to improve UX for users sending + // deploy_account + invoke at once. + let skip_validate = match tx.transaction { + // we skip validation for invoke tx with nonce 1 and nonce 0 in the state, this + ExecutableTx::DeployAccount(_) | ExecutableTx::Declare(_) => false, + + // we skip validation for invoke tx with nonce 1 and nonce 0 in the state, this + _ => { + let address = to_blk_address(tx.sender()); + let account_nonce = this.inner.get_nonce(address).expect("state err"); + tx.nonce() == Nonce::ONE && account_nonce.0 == Nonce::ZERO + } + }; + + StatefulValidatorAdapter::validate( + this, + tx, + self.execution_flags.skip_validate || skip_validate, + self.execution_flags.skip_fee_transfer, + ) + } +} + +fn map_invalid_tx_err( + err: StatefulValidatorError, +) -> Result> { + match err { + StatefulValidatorError::TransactionExecutionError(err) => match err { + e @ TransactionExecutionError::ValidateTransactionError { + storage_address, + class_hash, + .. + } => { + let address = to_address(storage_address); + let class_hash = class_hash.0; + let error = e.to_string(); + Ok(InvalidTransactionError::ValidationFailure { address, class_hash, error }) + } + + _ => Err(Box::new(err)), + }, + + StatefulValidatorError::TransactionPreValidationError(err) => match err { + TransactionPreValidationError::InvalidNonce { + address, + account_nonce, + incoming_tx_nonce, + } => { + let address = to_address(address); + let current_nonce = account_nonce.0; + let tx_nonce = incoming_tx_nonce.0; + Ok(InvalidTransactionError::InvalidNonce { address, current_nonce, tx_nonce }) + } + + TransactionPreValidationError::TransactionFeeError(err) => match err { + TransactionFeeError::MaxFeeExceedsBalance { max_fee, balance } => { + let max_fee = max_fee.0; + let balance = balance.into(); + Ok(InvalidTransactionError::InsufficientFunds { max_fee, balance }) + } + + TransactionFeeError::MaxFeeTooLow { min_fee, max_fee } => { + let max_fee = max_fee.0; + let min_fee = min_fee.0; + Ok(InvalidTransactionError::InsufficientMaxFee { max_fee, min_fee }) + } + + _ => Err(Box::new(err)), + }, + + _ => Err(Box::new(err)), + }, + + _ => Err(Box::new(err)), + } +} diff --git a/crates/katana/rpc/rpc-types/Cargo.toml b/crates/katana/rpc/rpc-types/Cargo.toml index 5a89d5c0b0..52cb37c664 100644 --- a/crates/katana/rpc/rpc-types/Cargo.toml +++ b/crates/katana/rpc/rpc-types/Cargo.toml @@ -9,6 +9,7 @@ version.workspace = true [dependencies] katana-cairo.workspace = true katana-core.workspace = true +katana-pool.workspace = true katana-executor.workspace = true katana-primitives.workspace = true katana-provider.workspace = true diff --git a/crates/katana/rpc/rpc-types/src/error/starknet.rs b/crates/katana/rpc/rpc-types/src/error/starknet.rs index 9a5d32ed68..a01dcb701a 100644 --- a/crates/katana/rpc/rpc-types/src/error/starknet.rs +++ b/crates/katana/rpc/rpc-types/src/error/starknet.rs @@ -1,9 +1,12 @@ use jsonrpsee::core::Error; use jsonrpsee::types::error::CallError; use jsonrpsee::types::ErrorObject; +use katana_pool::validation::InvalidTransactionError; +use katana_pool::PoolError; use katana_primitives::event::ContinuationTokenError; use katana_provider::error::ProviderError; use serde::Serialize; +use serde_json::Value; /// Possible list of errors that can be returned by the Starknet API according to the spec: . #[derive(Debug, thiserror::Error, Clone, Serialize)] @@ -52,7 +55,7 @@ pub enum StarknetApiError { #[error("Account balance is smaller than the transaction's max_fee")] InsufficientAccountBalance, #[error("Account validation failed")] - ValidationFailure, + ValidationFailure { reason: String }, #[error("Compilation failed")] CompilationFailed, #[error("Contract class size is too large")] @@ -100,7 +103,7 @@ impl StarknetApiError { StarknetApiError::InvalidTransactionNonce => 52, StarknetApiError::InsufficientMaxFee => 53, StarknetApiError::InsufficientAccountBalance => 54, - StarknetApiError::ValidationFailure => 55, + StarknetApiError::ValidationFailure { .. } => 55, StarknetApiError::CompilationFailed => 56, StarknetApiError::ContractClassSizeIsTooLarge => 57, StarknetApiError::NonAccount => 58, @@ -122,6 +125,11 @@ impl StarknetApiError { StarknetApiError::ContractError { .. } | StarknetApiError::UnexpectedError { .. } | StarknetApiError::TransactionExecutionError { .. } => Some(serde_json::json!(self)), + + StarknetApiError::ValidationFailure { reason } => { + Some(Value::String(reason.to_string())) + } + _ => None, } } @@ -155,6 +163,31 @@ impl From for StarknetApiError { } } +impl From for StarknetApiError { + fn from(error: PoolError) -> Self { + match error { + PoolError::InvalidTransaction(err) => err.into(), + PoolError::Internal(err) => { + StarknetApiError::UnexpectedError { reason: err.to_string() } + } + } + } +} + +impl From> for StarknetApiError { + fn from(error: Box) -> Self { + match error.as_ref() { + InvalidTransactionError::InsufficientFunds { .. } => Self::InsufficientAccountBalance, + InvalidTransactionError::InsufficientMaxFee { .. } => Self::InsufficientMaxFee, + InvalidTransactionError::InvalidNonce { .. } => Self::InvalidTransactionNonce, + InvalidTransactionError::NonAccount { .. } => Self::NonAccount, + InvalidTransactionError::ValidationFailure { error, .. } => { + Self::ValidationFailure { reason: error.to_string() } + } + } + } +} + #[cfg(test)] mod tests { use rstest::rstest; @@ -171,7 +204,6 @@ mod tests { #[case(StarknetApiError::CompilationFailed, 56, "Compilation failed")] #[case(StarknetApiError::ClassHashNotFound, 28, "Class hash not found")] #[case(StarknetApiError::TxnHashNotFound, 29, "Transaction hash not found")] - #[case(StarknetApiError::ValidationFailure, 55, "Account validation failed")] #[case(StarknetApiError::ClassAlreadyDeclared, 51, "Class already declared")] #[case(StarknetApiError::InvalidContractClass, 50, "Invalid contract class")] #[case(StarknetApiError::PageSizeTooBig, 31, "Requested page size is too big")] @@ -240,6 +272,14 @@ mod tests { "reason": "Unexpected error reason".to_string() }), )] + #[case( + StarknetApiError::ValidationFailure { + reason: "Invalid signature".to_string() + }, + 55, + "Account validation failed", + Value::String("Invalid signature".to_string()) + )] fn test_starknet_api_error_to_error_conversion_data_some( #[case] starknet_error: StarknetApiError, #[case] expected_code: i32, diff --git a/crates/katana/rpc/rpc/Cargo.toml b/crates/katana/rpc/rpc/Cargo.toml index 919d2b8e4c..20e1b84af4 100644 --- a/crates/katana/rpc/rpc/Cargo.toml +++ b/crates/katana/rpc/rpc/Cargo.toml @@ -32,11 +32,13 @@ cainome.workspace = true dojo-test-utils.workspace = true dojo-utils.workspace = true dojo-world.workspace = true +indexmap.workspace = true jsonrpsee = { workspace = true, features = [ "client" ] } katana-cairo.workspace = true katana-node = { workspace = true, features = [ "messaging" ] } katana-rpc-api = { workspace = true, features = [ "client" ] } katana-runner.workspace = true +rstest.workspace = true num-traits.workspace = true rand.workspace = true serde.workspace = true diff --git a/crates/katana/rpc/rpc/src/dev.rs b/crates/katana/rpc/rpc/src/dev.rs index ba6d16940c..1d3d98df7e 100644 --- a/crates/katana/rpc/rpc/src/dev.rs +++ b/crates/katana/rpc/rpc/src/dev.rs @@ -21,7 +21,7 @@ impl DevApi { /// Returns the pending state if the sequencer is running in _interval_ mode. Otherwise `None`. fn pending_executor(&self) -> Option { - match &*self.block_producer.inner.read() { + match &*self.block_producer.producer.read() { BlockProducerMode::Instant(_) => None, BlockProducerMode::Interval(producer) => Some(producer.executor()), } diff --git a/crates/katana/rpc/rpc/src/saya.rs b/crates/katana/rpc/rpc/src/saya.rs index 9c29b92382..c93d4188ae 100644 --- a/crates/katana/rpc/rpc/src/saya.rs +++ b/crates/katana/rpc/rpc/src/saya.rs @@ -44,7 +44,7 @@ impl SayaApi { /// Returns the pending state if the sequencer is running in _interval_ mode. Otherwise `None`. fn pending_executor(&self) -> Option { - match &*self.block_producer.inner.read() { + match &*self.block_producer.producer.read() { BlockProducerMode::Instant(_) => None, BlockProducerMode::Interval(producer) => Some(producer.executor()), } diff --git a/crates/katana/rpc/rpc/src/starknet/mod.rs b/crates/katana/rpc/rpc/src/starknet/mod.rs index a78c31415f..acdcd43f56 100644 --- a/crates/katana/rpc/rpc/src/starknet/mod.rs +++ b/crates/katana/rpc/rpc/src/starknet/mod.rs @@ -131,7 +131,7 @@ impl StarknetApi { /// Returns the pending state if the sequencer is running in _interval_ mode. Otherwise `None`. fn pending_executor(&self) -> Option { - match &*self.inner.block_producer.inner.read() { + match &*self.inner.block_producer.producer.read() { BlockProducerMode::Instant(_) => None, BlockProducerMode::Interval(producer) => Some(producer.executor()), } @@ -291,6 +291,16 @@ impl StarknetApi { contract_address: ContractAddress, ) -> Result { self.on_io_blocking_task(move |this| { + // read from the pool state if pending block + // + // TODO: this is a temporary solution, we should have a better way to handle this. + // perhaps a pending/pool state provider that implements all the state provider traits. + if let BlockIdOrTag::Tag(BlockTag::Pending) = block_id { + let validator = this.inner.block_producer.validator(); + let pool_nonce = validator.get_nonce(contract_address); + return Ok(pool_nonce); + } + let state = this.state(&block_id)?; let nonce = state.nonce(contract_address)?.ok_or(StarknetApiError::ContractNotFound)?; Ok(nonce) diff --git a/crates/katana/rpc/rpc/src/starknet/write.rs b/crates/katana/rpc/rpc/src/starknet/write.rs index 7ecf445db0..e30e213b1a 100644 --- a/crates/katana/rpc/rpc/src/starknet/write.rs +++ b/crates/katana/rpc/rpc/src/starknet/write.rs @@ -23,10 +23,9 @@ impl StarknetApi { let tx = tx.into_tx_with_chain_id(this.inner.backend.chain_id); let tx = ExecutableTxWithHash::new(ExecutableTx::Invoke(tx)); - let tx_hash = tx.hash; + let hash = this.inner.pool.add_transaction(tx)?; - this.inner.pool.add_transaction(tx); - Ok(tx_hash.into()) + Ok(hash.into()) }) .await } @@ -46,10 +45,9 @@ impl StarknetApi { let class_hash = tx.class_hash(); let tx = ExecutableTxWithHash::new(ExecutableTx::Declare(tx)); - let tx_hash = tx.hash; + let hash = this.inner.pool.add_transaction(tx)?; - this.inner.pool.add_transaction(tx); - Ok((tx_hash, class_hash).into()) + Ok((hash, class_hash).into()) }) .await } @@ -67,10 +65,9 @@ impl StarknetApi { let contract_address = tx.contract_address(); let tx = ExecutableTxWithHash::new(ExecutableTx::DeployAccount(tx)); - let tx_hash = tx.hash; + let hash = this.inner.pool.add_transaction(tx)?; - this.inner.pool.add_transaction(tx); - Ok((tx_hash, contract_address).into()) + Ok((hash, contract_address).into()) }) .await } diff --git a/crates/katana/rpc/rpc/src/torii.rs b/crates/katana/rpc/rpc/src/torii.rs index 63e09f8b7f..e8db499384 100644 --- a/crates/katana/rpc/rpc/src/torii.rs +++ b/crates/katana/rpc/rpc/src/torii.rs @@ -55,7 +55,7 @@ impl ToriiApi { /// Returns the pending state if the sequencer is running in _interval_ mode. Otherwise `None`. fn pending_executor(&self) -> Option { - match &*self.block_producer.inner.read() { + match &*self.block_producer.producer.read() { BlockProducerMode::Instant(_) => None, BlockProducerMode::Interval(producer) => Some(producer.executor()), } @@ -152,7 +152,7 @@ impl ToriiApiServer for ToriiApi { // If there are no transactions after the index in the pending block if pending_transactions.is_empty() { // Wait for a new transaction to be executed - let inner = this.block_producer.inner.read(); + let inner = this.block_producer.producer.read(); let block_producer = match &*inner { BlockProducerMode::Interval(block_producer) => block_producer, _ => panic!( @@ -204,7 +204,7 @@ impl ToriiApiServer for ToriiApi { if transactions.is_empty() { // Wait for a new transaction to be executed - let inner = this.block_producer.inner.read(); + let inner = this.block_producer.producer.read(); let block_producer = match &*inner { BlockProducerMode::Instant(block_producer) => block_producer, _ => { diff --git a/crates/katana/rpc/rpc/tests/starknet.rs b/crates/katana/rpc/rpc/tests/starknet.rs index 25175a73be..fd4f51f37b 100644 --- a/crates/katana/rpc/rpc/tests/starknet.rs +++ b/crates/katana/rpc/rpc/tests/starknet.rs @@ -6,20 +6,27 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Result; +use assert_matches::assert_matches; use cainome::rs::abigen_legacy; use dojo_test_utils::sequencer::{get_default_test_starknet_config, TestSequencer}; +use indexmap::IndexSet; use katana_core::sequencer::SequencerConfig; -use katana_primitives::genesis::constant::DEFAULT_FEE_TOKEN_ADDRESS; +use katana_primitives::genesis::constant::{ + DEFAULT_FEE_TOKEN_ADDRESS, DEFAULT_PREFUNDED_ACCOUNT_BALANCE, +}; use katana_rpc_types::receipt::ReceiptBlock; -use starknet::accounts::{Account, Call, ConnectedAccount}; +use starknet::accounts::{ + Account, AccountError, Call, ConnectedAccount, ExecutionEncoding, SingleOwnerAccount, +}; use starknet::core::types::contract::legacy::LegacyContractClass; use starknet::core::types::{ - BlockId, BlockTag, DeclareTransactionReceipt, Felt, TransactionFinalityStatus, - TransactionReceipt, + BlockId, BlockTag, DeclareTransactionReceipt, ExecutionResult, Felt, StarknetError, + TransactionFinalityStatus, TransactionReceipt, }; use starknet::core::utils::{get_contract_address, get_selector_from_name}; use starknet::macros::felt; -use starknet::providers::Provider; +use starknet::providers::{Provider, ProviderError}; +use starknet::signers::{LocalWallet, SigningKey}; mod common; @@ -198,10 +205,8 @@ async fn estimate_fee() -> Result<()> { // send a valid transaction first to increment the nonce (so that we can test nonce < current // nonce later) - let result = contract.transfer(&recipient, &amount).send().await?; - - // wait until the tx is included in a block - dojo_utils::TransactionWaiter::new(result.transaction_hash, &provider).await?; + let res = contract.transfer(&recipient, &amount).send().await?; + dojo_utils::TransactionWaiter::new(res.transaction_hash, &provider).await?; // estimate fee with current nonce (the expected nonce) let nonce = provider.get_nonce(BlockId::Tag(BlockTag::Pending), account.address()).await?; @@ -223,3 +228,256 @@ async fn estimate_fee() -> Result<()> { Ok(()) } + +#[rstest::rstest] +#[tokio::test] +async fn rapid_transactions_submissions( + #[values(None, Some(1000))] block_time: Option, +) -> Result<()> { + // setup test sequencer with the given configuration + let starknet_config = get_default_test_starknet_config(); + let sequencer_config = SequencerConfig { block_time, ..Default::default() }; + + let sequencer = TestSequencer::start(sequencer_config, starknet_config).await; + let provider = sequencer.provider(); + let account = sequencer.account(); + + // setup test contract to interact with. + abigen_legacy!(Contract, "crates/katana/rpc/rpc/tests/test_data/erc20.json"); + let contract = Contract::new(DEFAULT_FEE_TOKEN_ADDRESS.into(), &account); + + // function call params + let recipient = Felt::ONE; + let amount = Uint256 { low: Felt::ONE, high: Felt::ZERO }; + + const N: usize = 10; + let mut txs = IndexSet::with_capacity(N); + + for _ in 0..N { + let res = contract.transfer(&recipient, &amount).send().await?; + txs.insert(res.transaction_hash); + } + + // Wait only for the last transaction to be accepted + let last_tx = txs.last().unwrap(); + dojo_utils::TransactionWaiter::new(*last_tx, &provider).await?; + + // we should've submitted ITERATION transactions + assert_eq!(txs.len(), N); + + // check the status of each txs + for hash in txs { + let receipt = provider.get_transaction_receipt(hash).await?; + assert_eq!(receipt.receipt.execution_result(), &ExecutionResult::Succeeded); + assert_eq!(receipt.receipt.finality_status(), &TransactionFinalityStatus::AcceptedOnL2); + } + + let nonce = account.get_nonce().await?; + assert_eq!(nonce, Felt::from(N), "Nonce should be incremented by {N} time"); + + Ok(()) +} + +/// Macro used to assert that the given error is a Starknet error. +macro_rules! assert_starknet_err { + ($err:expr, $api_err:pat) => { + assert_matches!($err, AccountError::Provider(ProviderError::StarknetError($api_err))) + }; +} + +#[rstest::rstest] +#[tokio::test] +async fn send_txs_with_insufficient_fee( + #[values(true, false)] disable_fee: bool, + #[values(None, Some(1000))] block_time: Option, +) -> Result<()> { + // setup test sequencer with the given configuration + let mut starknet_config = get_default_test_starknet_config(); + starknet_config.disable_fee = disable_fee; + let sequencer_config = SequencerConfig { block_time, ..Default::default() }; + + let sequencer = TestSequencer::start(sequencer_config, starknet_config).await; + + // setup test contract to interact with. + abigen_legacy!(Contract, "crates/katana/rpc/rpc/tests/test_data/erc20.json"); + let contract = Contract::new(DEFAULT_FEE_TOKEN_ADDRESS.into(), sequencer.account()); + + // function call params + let recipient = Felt::ONE; + let amount = Uint256 { low: Felt::ONE, high: Felt::ZERO }; + + // initial sender's account nonce. use to assert how the txs validity change the account nonce. + let initial_nonce = sequencer.account().get_nonce().await?; + + // ----------------------------------------------------------------------- + // transaction with low max fee (underpriced). + + let res = contract.transfer(&recipient, &amount).max_fee(Felt::TWO).send().await; + + if disable_fee { + // in no fee mode, setting the max fee (which translates to the tx run resources) lower + // than the amount required would result in a validation failure. due to insufficient + // resources. + assert_starknet_err!(res.unwrap_err(), StarknetError::ValidationFailure(_)); + } else { + assert_starknet_err!(res.unwrap_err(), StarknetError::InsufficientMaxFee); + } + + let nonce = sequencer.account().get_nonce().await?; + assert_eq!(initial_nonce, nonce, "Nonce shouldn't change after invalid tx"); + + // ----------------------------------------------------------------------- + // transaction with insufficient balance. + + let fee = Felt::from(DEFAULT_PREFUNDED_ACCOUNT_BALANCE + 1); + let res = contract.transfer(&recipient, &amount).max_fee(fee).send().await; + + if disable_fee { + // in no fee mode, account balance is ignored. as long as the max fee (aka resources) is + // enough to at least run the account validation, the tx should be accepted. + // Wait for the transaction to be accepted + dojo_utils::TransactionWaiter::new(res?.transaction_hash, &sequencer.provider()).await?; + + // nonce should be incremented by 1 after a valid tx. + let nonce = sequencer.account().get_nonce().await?; + assert_eq!(initial_nonce + 1, nonce); + } else { + assert_starknet_err!(res.unwrap_err(), StarknetError::InsufficientAccountBalance); + + // nonce shouldn't change for an invalid tx. + let nonce = sequencer.account().get_nonce().await?; + assert_eq!(initial_nonce, nonce); + } + + Ok(()) +} + +#[rstest::rstest] +#[tokio::test] +async fn send_txs_with_invalid_signature( + #[values(true, false)] disable_validate: bool, + #[values(None, Some(1000))] block_time: Option, +) -> Result<()> { + // setup test sequencer with the given configuration + let mut starknet_config = get_default_test_starknet_config(); + starknet_config.disable_validate = disable_validate; + let sequencer_config = SequencerConfig { block_time, ..Default::default() }; + + let sequencer = TestSequencer::start(sequencer_config, starknet_config).await; + + // starknet-rs doesn't provide a way to manually set the signatures so instead we create an + // account with random signer to simulate invalid signatures. + + let account = SingleOwnerAccount::new( + sequencer.provider(), + LocalWallet::from(SigningKey::from_random()), + sequencer.account().address(), + sequencer.provider().chain_id().await?, + ExecutionEncoding::New, + ); + + // setup test contract to interact with. + abigen_legacy!(Contract, "crates/katana/rpc/rpc/tests/test_data/erc20.json"); + let contract = Contract::new(DEFAULT_FEE_TOKEN_ADDRESS.into(), &account); + + // function call params + let recipient = Felt::ONE; + let amount = Uint256 { low: Felt::ONE, high: Felt::ZERO }; + + // initial sender's account nonce. use to assert how the txs validity change the account nonce. + let initial_nonce = account.get_nonce().await?; + + // ----------------------------------------------------------------------- + // transaction with invalid signatures. + + // we set the max fee manually here to skip fee estimation. we want to test the pool validator. + let res = contract.transfer(&recipient, &amount).max_fee(felt!("0x1111111111")).send().await; + + if disable_validate { + // Wait for the transaction to be accepted + dojo_utils::TransactionWaiter::new(res?.transaction_hash, &sequencer.provider()).await?; + + // nonce should be incremented by 1 after a valid tx. + let nonce = sequencer.account().get_nonce().await?; + assert_eq!(initial_nonce + 1, nonce); + } else { + assert_starknet_err!(res.unwrap_err(), StarknetError::ValidationFailure(_)); + + // nonce shouldn't change for an invalid tx. + let nonce = sequencer.account().get_nonce().await?; + assert_eq!(initial_nonce, nonce); + } + + Ok(()) +} + +#[rstest::rstest] +#[tokio::test] +async fn send_txs_with_invalid_nonces( + #[values(None, Some(1000))] block_time: Option, +) -> Result<()> { + // setup test sequencer with the given configuration + let starknet_config = get_default_test_starknet_config(); + let sequencer_config = SequencerConfig { block_time, ..Default::default() }; + + let sequencer = TestSequencer::start(sequencer_config, starknet_config).await; + let provider = sequencer.provider(); + let account = sequencer.account(); + + // setup test contract to interact with. + abigen_legacy!(Contract, "crates/katana/rpc/rpc/tests/test_data/erc20.json"); + let contract = Contract::new(DEFAULT_FEE_TOKEN_ADDRESS.into(), &account); + + // function call params + let recipient = Felt::ONE; + let amount = Uint256 { low: Felt::ONE, high: Felt::ZERO }; + + // set the fee manually here to skip fee estimation. we want to test the pool validator. + let fee = felt!("0x11111111111"); + + // send a valid transaction first to increment the nonce (so that we can test nonce < current + // nonce later) + let res = contract.transfer(&recipient, &amount).send().await?; + dojo_utils::TransactionWaiter::new(res.transaction_hash, &provider).await?; + + // initial sender's account nonce. use to assert how the txs validity change the account nonce. + let initial_nonce = account.get_nonce().await?; + assert_eq!(initial_nonce, Felt::ONE, "Initial nonce after sending 1st tx should be 1."); + + // ----------------------------------------------------------------------- + // transaction with nonce < account nonce. + + let old_nonce = initial_nonce - Felt::ONE; + let res = contract.transfer(&recipient, &amount).nonce(old_nonce).max_fee(fee).send().await; + assert_starknet_err!(res.unwrap_err(), StarknetError::InvalidTransactionNonce); + + let nonce = account.get_nonce().await?; + assert_eq!(nonce, initial_nonce, "Nonce shouldn't change on invalid tx."); + + // ----------------------------------------------------------------------- + // transaction with nonce = account nonce. + + let curr_nonce = initial_nonce; + let res = contract.transfer(&recipient, &amount).nonce(curr_nonce).max_fee(fee).send().await?; + dojo_utils::TransactionWaiter::new(res.transaction_hash, &provider).await?; + + let nonce = account.get_nonce().await?; + assert_eq!(nonce, Felt::TWO, "Nonce should be 2 after sending two valid txs."); + + // ----------------------------------------------------------------------- + // transaction with nonce >= account nonce. + // + // ideally, tx with nonce >= account nonce should be considered as valid BUT not to be executed + // immediately and should be kept around in the pool until the nonce is reached. however, + // katana doesn't support this feature yet so the current behaviour is to treat the tx as + // invalid with nonce mismatch error. + + let new_nonce = felt!("0x100"); + let res = contract.transfer(&recipient, &amount).nonce(new_nonce).max_fee(fee).send().await; + assert_starknet_err!(res.unwrap_err(), StarknetError::InvalidTransactionNonce); + + let nonce = account.get_nonce().await?; + assert_eq!(nonce, Felt::TWO, "Nonce shouldn't change bcs the tx is still invalid."); + + Ok(()) +}