Skip to content

Commit

Permalink
hotfix(katana): make sure validator state is synced with block produc…
Browse files Browse the repository at this point in the history
…er (#2353)

* wip

* wip

* wip

* fix blockifier patch

* fix

* clippy

* fix

* fmt

* fmt

* fix
  • Loading branch information
kariy authored Aug 28, 2024
1 parent 94e4508 commit 76dae5e
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 121 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

191 changes: 119 additions & 72 deletions crates/katana/core/src/service/block_producer.rs

Large diffs are not rendered by default.

3 changes: 0 additions & 3 deletions crates/katana/core/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ pub struct NodeService<EF: ExecutorFactory> {
pub(crate) messaging: Option<MessagingService<EF>>,
/// Metrics for recording the service operations
metrics: ServiceMetrics,
// validator: StatefulValidator
}

impl<EF: ExecutorFactory> NodeService<EF> {
Expand Down Expand Up @@ -100,8 +99,6 @@ impl<EF: ExecutorFactory> Future for NodeService<EF> {
let steps_used = outcome.stats.cairo_steps_used;
metrics.l1_gas_processed_total.increment(gas_used as u64);
metrics.cairo_steps_processed_total.increment(steps_used as u64);

pin.block_producer.update_validator().expect("failed to update validator");
}

Err(err) => {
Expand Down
2 changes: 1 addition & 1 deletion crates/katana/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ starknet = { workspace = true, optional = true }
thiserror.workspace = true
tracing.workspace = true

blockifier = { git = "https://github.com/dojoengine/blockifier", branch = "cairo-2.7-new", features = [ "testing" ], optional = true }
blockifier = { git = "https://github.com/dojoengine/blockifier", branch = "cairo-2.7-newer", features = [ "testing" ], optional = true }
katana-cairo = { workspace = true, optional = true }

[dev-dependencies]
Expand Down
19 changes: 12 additions & 7 deletions crates/katana/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use katana_core::service::{NodeService, TransactionMiner};
use katana_executor::implementation::blockifier::BlockifierFactory;
use katana_executor::{ExecutorFactory, SimulationFlag};
use katana_pool::ordering::FiFo;
use katana_pool::validation::stateful::TxValidator;
use katana_pool::{TransactionPool, TxPool};
use katana_primitives::block::FinalityStatus;
use katana_primitives::env::{CfgEnv, FeeTokenAddressses};
Expand Down Expand Up @@ -167,8 +168,8 @@ pub async fn start(

// --- build transaction pool and miner

let validator = block_producer.validator().clone();
let pool = TxPool::new(validator, FiFo::new());
let validator = block_producer.validator();
let pool = TxPool::new(validator.clone(), FiFo::new());
let miner = TransactionMiner::new(pool.add_listener());

// --- build metrics service
Expand Down Expand Up @@ -212,18 +213,18 @@ pub async fn start(

// --- spawn rpc server

let node_components = (pool, backend.clone(), block_producer);
let node_components = (pool, backend.clone(), block_producer, validator);
let rpc_handle = spawn(node_components, server_config).await?;

Ok((rpc_handle, backend))
}

// Moved from `katana_rpc` crate
pub async fn spawn<EF: ExecutorFactory>(
node_components: (TxPool, Arc<Backend<EF>>, Arc<BlockProducer<EF>>),
node_components: (TxPool, Arc<Backend<EF>>, Arc<BlockProducer<EF>>, TxValidator),
config: ServerConfig,
) -> Result<NodeHandle> {
let (pool, backend, block_producer) = node_components;
let (pool, backend, block_producer, validator) = node_components;

let mut methods = RpcModule::new(());
methods.register_method("health", |_, _| Ok(serde_json::json!({ "health": true })))?;
Expand All @@ -232,8 +233,12 @@ pub async fn spawn<EF: ExecutorFactory>(
match api {
ApiKind::Starknet => {
// TODO: merge these into a single logic.
let server =
StarknetApi::new(backend.clone(), pool.clone(), block_producer.clone());
let server = StarknetApi::new(
backend.clone(),
pool.clone(),
block_producer.clone(),
validator.clone(),
);
methods.merge(StarknetApiServer::into_rpc(server.clone()))?;
methods.merge(StarknetWriteApiServer::into_rpc(server.clone()))?;
methods.merge(StarknetTraceApiServer::into_rpc(server))?;
Expand Down
5 changes: 3 additions & 2 deletions crates/katana/pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,15 @@ where
Ok(hash)
}

ValidationOutcome::Invalid { tx, error } => {
warn!(hash = format!("{:#x}", tx.hash()), "Invalid transaction.");
ValidationOutcome::Invalid { error, .. } => {
warn!(hash = format!("{hash:#x}"), "Invalid transaction.");
Err(PoolError::InvalidTransaction(Box::new(error)))
}

// return as error for now but ideally we should kept the tx in a separate
// queue and revalidate it when the parent tx is added to the pool
ValidationOutcome::Dependent { tx, tx_nonce, current_nonce } => {
info!(hash = format!("{hash:#x}"), "Dependent transaction.");
let err = InvalidTransactionError::InvalidNonce {
address: tx.sender(),
current_nonce,
Expand Down
56 changes: 35 additions & 21 deletions crates/katana/pool/src/validation/stateful.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@ use crate::tx::PoolTransaction;
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct TxValidator {
inner: Arc<Inner>,
}

struct Inner {
cfg_env: CfgEnv,
execution_flags: SimulationFlag,
validator: Arc<Mutex<StatefulValidatorAdapter>>,
validator: Mutex<StatefulValidatorAdapter>,
permit: Arc<Mutex<()>>,
}

impl TxValidator {
Expand All @@ -35,16 +40,28 @@ impl TxValidator {
execution_flags: SimulationFlag,
cfg_env: CfgEnv,
block_env: &BlockEnv,
permit: Arc<Mutex<()>>,
) -> Self {
let inner = StatefulValidatorAdapter::new(state, block_env, &cfg_env);
Self { cfg_env, execution_flags, validator: Arc::new(Mutex::new(inner)) }
let validator = StatefulValidatorAdapter::new(state, block_env, &cfg_env);
Self {
inner: Arc::new(Inner {
permit,
cfg_env,
execution_flags,
validator: Mutex::new(validator),
}),
}
}

/// Reset the state of the validator with the given params. This method is used to update the
/// validator's state with a new state and block env after a block is mined.
pub fn update(&self, state: Box<dyn StateProvider>, block_env: &BlockEnv) {
let updated = StatefulValidatorAdapter::new(state, block_env, &self.cfg_env);
*self.validator.lock() = updated;
pub fn update(&self, new_state: Box<dyn StateProvider>, block_env: &BlockEnv) {
let mut validator = self.inner.validator.lock();

let mut state = validator.inner.tx_executor.block_state.take().unwrap();
state.state = StateProviderDb::new(new_state);

*validator = StatefulValidatorAdapter::new_inner(state, block_env, &self.inner.cfg_env);
}

// NOTE:
Expand All @@ -54,7 +71,7 @@ impl TxValidator {
// safety is not guaranteed by TransactionExecutor itself.
pub fn get_nonce(&self, address: ContractAddress) -> Nonce {
let address = to_blk_address(address);
let nonce = self.validator.lock().inner.get_nonce(address).expect("state err");
let nonce = self.inner.validator.lock().inner.get_nonce(address).expect("state err");
nonce.0
}
}
Expand All @@ -65,23 +82,19 @@ struct StatefulValidatorAdapter {
}

impl StatefulValidatorAdapter {
fn new(
state: Box<dyn StateProvider>,
block_env: &BlockEnv,
cfg_env: &CfgEnv,
) -> StatefulValidatorAdapter {
let inner = Self::new_inner(state, block_env, cfg_env);
Self { inner }
fn new(state: Box<dyn StateProvider>, block_env: &BlockEnv, cfg_env: &CfgEnv) -> Self {
let state = CachedState::new(StateProviderDb::new(state));
Self::new_inner(state, block_env, cfg_env)
}

fn new_inner(
state: Box<dyn StateProvider>,
state: CachedState<StateProviderDb<'static>>,
block_env: &BlockEnv,
cfg_env: &CfgEnv,
) -> StatefulValidator<StateProviderDb<'static>> {
let state = CachedState::new(StateProviderDb::new(state));
) -> Self {
let block_context = block_context_from_envs(block_env, cfg_env);
StatefulValidator::create(state, block_context)
let inner = StatefulValidator::create(state, block_context, Default::default());
Self { inner }
}

/// Used only in the [`Validator::validate`] trait
Expand Down Expand Up @@ -125,7 +138,8 @@ impl Validator for TxValidator {
type Transaction = ExecutableTxWithHash;

fn validate(&self, tx: Self::Transaction) -> ValidationResult<Self::Transaction> {
let this = &mut *self.validator.lock();
let _permit = self.inner.permit.lock();
let this = &mut *self.inner.validator.lock();

// Check if validation of an invoke transaction should be skipped due to deploy_account not
// being proccessed yet. This feature is used to improve UX for users sending
Expand All @@ -145,8 +159,8 @@ impl Validator for TxValidator {
StatefulValidatorAdapter::validate(
this,
tx,
self.execution_flags.skip_validate || skip_validate,
self.execution_flags.skip_fee_transfer,
self.inner.execution_flags.skip_validate || skip_validate,
self.inner.execution_flags.skip_fee_transfer,
)
}
}
Expand Down
8 changes: 5 additions & 3 deletions crates/katana/rpc/rpc/src/starknet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use anyhow::Result;
use katana_core::backend::Backend;
use katana_core::service::block_producer::{BlockProducer, BlockProducerMode, PendingExecutor};
use katana_executor::{ExecutionResult, ExecutorFactory};
use katana_pool::validation::stateful::TxValidator;
use katana_pool::TxPool;
use katana_primitives::block::{
BlockHash, BlockHashOrNumber, BlockIdOrTag, BlockNumber, BlockTag, FinalityStatus,
Expand Down Expand Up @@ -53,6 +54,7 @@ impl<EF: ExecutorFactory> Clone for StarknetApi<EF> {
}

struct Inner<EF: ExecutorFactory> {
validator: TxValidator,
pool: TxPool,
backend: Arc<Backend<EF>>,
block_producer: Arc<BlockProducer<EF>>,
Expand All @@ -64,11 +66,12 @@ impl<EF: ExecutorFactory> StarknetApi<EF> {
backend: Arc<Backend<EF>>,
pool: TxPool,
block_producer: Arc<BlockProducer<EF>>,
validator: TxValidator,
) -> Self {
let blocking_task_pool =
BlockingTaskPool::new().expect("failed to create blocking task pool");

let inner = Inner { pool, backend, block_producer, blocking_task_pool };
let inner = Inner { pool, backend, block_producer, blocking_task_pool, validator };

Self { inner: Arc::new(inner) }
}
Expand Down Expand Up @@ -296,8 +299,7 @@ impl<EF: ExecutorFactory> StarknetApi<EF> {
// TODO: this is a temporary solution, we should have a better way to handle this.
// perhaps a pending/pool state provider that implements all the state provider traits.
if let BlockIdOrTag::Tag(BlockTag::Pending) = block_id {
let validator = this.inner.block_producer.validator();
let pool_nonce = validator.get_nonce(contract_address);
let pool_nonce = this.inner.validator.get_nonce(contract_address);
return Ok(pool_nonce);
}

Expand Down
3 changes: 2 additions & 1 deletion crates/katana/rpc/rpc/src/starknet/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ impl<EF: ExecutorFactory> StarknetApi<EF> {

let tx = tx.into_tx_with_chain_id(this.inner.backend.chain_id);
let tx = ExecutableTxWithHash::new(ExecutableTx::Invoke(tx));
let hash = this.inner.pool.add_transaction(tx)?;
let hash =
this.inner.pool.add_transaction(tx).inspect_err(|e| println!("Error: {:?}", e))?;

Ok(hash.into())
})
Expand Down
42 changes: 33 additions & 9 deletions crates/katana/rpc/rpc/tests/starknet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use starknet::core::utils::{get_contract_address, get_selector_from_name};
use starknet::macros::felt;
use starknet::providers::{Provider, ProviderError};
use starknet::signers::{LocalWallet, SigningKey};
use tokio::sync::Mutex;

mod common;

Expand Down Expand Up @@ -230,8 +231,8 @@ async fn estimate_fee() -> Result<()> {
}

#[rstest::rstest]
#[tokio::test]
async fn rapid_transactions_submissions(
#[tokio::test(flavor = "multi_thread")]
async fn concurrent_transactions_submissions(
#[values(None, Some(1000))] block_time: Option<u64>,
) -> Result<()> {
// setup test sequencer with the given configuration
Expand All @@ -240,33 +241,56 @@ async fn rapid_transactions_submissions(

let sequencer = TestSequencer::start(sequencer_config, starknet_config).await;
let provider = sequencer.provider();
let account = sequencer.account();
let account = Arc::new(sequencer.account());

// setup test contract to interact with.
abigen_legacy!(Contract, "crates/katana/rpc/rpc/tests/test_data/erc20.json");
let contract = Contract::new(DEFAULT_FEE_TOKEN_ADDRESS.into(), &account);

// function call params
let recipient = Felt::ONE;
let amount = Uint256 { low: Felt::ONE, high: Felt::ZERO };

const N: usize = 10;
let mut txs = IndexSet::with_capacity(N);
let initial_nonce =
provider.get_nonce(BlockId::Tag(BlockTag::Pending), sequencer.account().address()).await?;

const N: usize = 100;
let nonce = Arc::new(Mutex::new(initial_nonce));
let txs = Arc::new(Mutex::new(IndexSet::with_capacity(N)));

let mut handles = Vec::with_capacity(N);

for _ in 0..N {
let res = contract.transfer(&recipient, &amount).send().await?;
txs.insert(res.transaction_hash);
let txs = txs.clone();
let nonce = nonce.clone();
let amount = amount.clone();
let account = account.clone();

let handle = tokio::spawn(async move {
let mut nonce = nonce.lock().await;
let contract = Contract::new(DEFAULT_FEE_TOKEN_ADDRESS.into(), account);
let res = contract.transfer(&recipient, &amount).nonce(*nonce).send().await.unwrap();
txs.lock().await.insert(res.transaction_hash);
*nonce += Felt::ONE;
});

handles.push(handle);
}

// wait for all txs to be submitted
for handle in handles {
handle.await?;
}

// Wait only for the last transaction to be accepted
let txs = txs.lock().await;
let last_tx = txs.last().unwrap();
dojo_utils::TransactionWaiter::new(*last_tx, &provider).await?;

// we should've submitted ITERATION transactions
assert_eq!(txs.len(), N);

// check the status of each txs
for hash in txs {
for hash in txs.iter() {
let receipt = provider.get_transaction_receipt(hash).await?;
assert_eq!(receipt.receipt.execution_result(), &ExecutionResult::Succeeded);
assert_eq!(receipt.receipt.finality_status(), &TransactionFinalityStatus::AcceptedOnL2);
Expand Down

0 comments on commit 76dae5e

Please sign in to comment.