Skip to content

Commit

Permalink
feat(katana): pool validation (#2344)
Browse files Browse the repository at this point in the history
add a validator for validating incoming transaction on the rpc level. the validation will happen right before the transaction is added to the pool, and its result will determine whether the tx will get added to the pool or not.

before this all txs regardless of their validity are added to the pool. the validation will happen asynchronously where txs will get validated the same time as it is being executed. this happen later in the execution stage of the pipeline which is detached from the rpc side, at this point the rpc has already finished its request and has no context of the tx validity (hence what it means by asyncly).

this doesn't follow the same behaviour as mainnet, which would invalidate the txs on the rpc level. meaning if your tx, eg has invalid signatures, the sequencer would return the validation error as the response in the same `add_*_transaction` request you used to send the invalid tx.

this change also requires some changes on the blockifier side. added the necessary changes in a new branch [**blockifier:cairo-2.7-new**](https://github.com/dojoengine/blockifier/tree/cairo-2.7-new)
  • Loading branch information
kariy authored Aug 26, 2024
1 parent f9606d8 commit 419d1cf
Show file tree
Hide file tree
Showing 29 changed files with 889 additions and 203 deletions.
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

142 changes: 87 additions & 55 deletions crates/katana/core/src/service/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::stream::{Stream, StreamExt};
use futures::FutureExt;
use katana_executor::{BlockExecutor, ExecutionResult, ExecutionStats, ExecutorFactory};
use katana_pool::validation::stateful::TxValidator;
use katana_primitives::block::{BlockHashOrNumber, ExecutableBlock, PartialHeader};
use katana_primitives::receipt::Receipt;
use katana_primitives::trace::TxExecInfo;
Expand Down Expand Up @@ -72,37 +73,36 @@ type BlockProductionWithTxnsFuture =
#[allow(missing_debug_implementations)]
pub struct BlockProducer<EF: ExecutorFactory> {
/// The inner mode of mining.
pub inner: RwLock<BlockProducerMode<EF>>,
pub producer: RwLock<BlockProducerMode<EF>>,
/// validator used in the tx pool
// the validator needs to always be built against the state of the block producer, so
// im putting here for now until we find a better way to handle this.
validator: TxValidator,
}

impl<EF: ExecutorFactory> BlockProducer<EF> {
/// Creates a block producer that mines a new block every `interval` milliseconds.
pub fn interval(backend: Arc<Backend<EF>>, interval: u64) -> Self {
Self {
inner: RwLock::new(BlockProducerMode::Interval(IntervalBlockProducer::new(
backend, interval,
))),
}
let (prod, validator) = IntervalBlockProducer::new(backend, Some(interval));
Self { producer: BlockProducerMode::Interval(prod).into(), validator }
}

/// Creates a new block producer that will only be possible to mine by calling the
/// `katana_generateBlock` RPC method.
pub fn on_demand(backend: Arc<Backend<EF>>) -> Self {
Self {
inner: RwLock::new(BlockProducerMode::Interval(IntervalBlockProducer::new_no_mining(
backend,
))),
}
let (prod, validator) = IntervalBlockProducer::new(backend, None);
Self { producer: BlockProducerMode::Interval(prod).into(), validator }
}

/// Creates a block producer that mines a new block as soon as there are ready transactions in
/// the transactions pool.
pub fn instant(backend: Arc<Backend<EF>>) -> Self {
Self { inner: RwLock::new(BlockProducerMode::Instant(InstantBlockProducer::new(backend))) }
let (prod, validator) = InstantBlockProducer::new(backend);
Self { producer: BlockProducerMode::Instant(prod).into(), validator }
}

pub(super) fn queue(&self, transactions: Vec<ExecutableTxWithHash>) {
let mut mode = self.inner.write();
let mut mode = self.producer.write();
match &mut *mode {
BlockProducerMode::Instant(producer) => producer.queued.push_back(transactions),
BlockProducerMode::Interval(producer) => producer.queued.push_back(transactions),
Expand All @@ -111,26 +111,57 @@ impl<EF: ExecutorFactory> BlockProducer<EF> {

/// Returns `true` if the block producer is running in _interval_ mode. Otherwise, `fales`.
pub fn is_interval_mining(&self) -> bool {
matches!(*self.inner.read(), BlockProducerMode::Interval(_))
matches!(*self.producer.read(), BlockProducerMode::Interval(_))
}

/// Returns `true` if the block producer is running in _instant_ mode. Otherwise, `fales`.
pub fn is_instant_mining(&self) -> bool {
matches!(*self.inner.read(), BlockProducerMode::Instant(_))
matches!(*self.producer.read(), BlockProducerMode::Instant(_))
}

// Handler for the `katana_generateBlock` RPC method.
pub fn force_mine(&self) {
trace!(target: LOG_TARGET, "Scheduling force block mining.");
let mut mode = self.inner.write();
let mut mode = self.producer.write();
match &mut *mode {
BlockProducerMode::Instant(producer) => producer.force_mine(),
BlockProducerMode::Interval(producer) => producer.force_mine(),
}
}

pub fn validator(&self) -> &TxValidator {
&self.validator
}

pub fn update_validator(&self) -> Result<(), ProviderError> {
let mut mode = self.producer.write();

match &mut *mode {
BlockProducerMode::Instant(pd) => {
let provider = pd.backend.blockchain.provider();
let state = provider.latest()?;

let latest_num = provider.latest_number()?;
let block_env = provider.block_env_at(latest_num.into())?.expect("latest");

self.validator.update(state, &block_env)
}

BlockProducerMode::Interval(pd) => {
let pending_state = pd.executor.0.read();

let state = pending_state.state();
let block_env = pending_state.block_env();

self.validator.update(state, &block_env)
}
};

Ok(())
}

pub(super) fn poll_next(&self, cx: &mut Context<'_>) -> Poll<Option<BlockProductionResult>> {
let mut mode = self.inner.write();
let mut mode = self.producer.write();
match &mut *mode {
BlockProducerMode::Instant(producer) => producer.poll_next_unpin(cx),
BlockProducerMode::Interval(producer) => producer.poll_next_unpin(cx),
Expand Down Expand Up @@ -184,13 +215,13 @@ pub struct IntervalBlockProducer<EF: ExecutorFactory> {
}

impl<EF: ExecutorFactory> IntervalBlockProducer<EF> {
pub fn new(backend: Arc<Backend<EF>>, interval: u64) -> Self {
let interval = {
let duration = Duration::from_millis(interval);
pub fn new(backend: Arc<Backend<EF>>, interval: Option<u64>) -> (Self, TxValidator) {
let interval = interval.map(|time| {
let duration = Duration::from_millis(time);
let mut interval = interval_at(Instant::now() + duration, duration);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
interval
};
});

let provider = backend.blockchain.provider();

Expand All @@ -199,49 +230,33 @@ impl<EF: ExecutorFactory> IntervalBlockProducer<EF> {
backend.update_block_env(&mut block_env);

let state = provider.latest().unwrap();
let executor = backend.executor_factory.with_state_and_block_env(state, block_env);
let executor = PendingExecutor::new(executor);
let executor = backend.executor_factory.with_state_and_block_env(state, block_env.clone());

let blocking_task_spawner = BlockingTaskPool::new().unwrap();
// -- build the validator using the same state and envs as the executor
let state = executor.state();
let cfg = backend.executor_factory.cfg();
let flags = backend.executor_factory.execution_flags();
let validator = TxValidator::new(state, flags.clone(), cfg.clone(), &block_env);

Self {
let producer = Self {
backend,
executor,
interval,
ongoing_mining: None,
blocking_task_spawner,
ongoing_execution: None,
interval: Some(interval),
queued: VecDeque::default(),
executor: PendingExecutor::new(executor),
tx_execution_listeners: RwLock::new(vec![]),
}
blocking_task_spawner: BlockingTaskPool::new().unwrap(),
};

(producer, validator)
}

/// Creates a new [IntervalBlockProducer] with no `interval`. This mode will not produce blocks
/// for every fixed interval, although it will still execute all queued transactions and
/// keep hold of the pending state.
pub fn new_no_mining(backend: Arc<Backend<EF>>) -> Self {
let provider = backend.blockchain.provider();

let latest_num = provider.latest_number().unwrap();
let mut block_env = provider.block_env_at(latest_num.into()).unwrap().unwrap();
backend.update_block_env(&mut block_env);

let state = provider.latest().unwrap();
let executor = backend.executor_factory.with_state_and_block_env(state, block_env);
let executor = PendingExecutor::new(executor);

let blocking_task_spawner = BlockingTaskPool::new().unwrap();

Self {
backend,
executor,
interval: None,
ongoing_mining: None,
queued: VecDeque::default(),
blocking_task_spawner,
ongoing_execution: None,
tx_execution_listeners: RwLock::new(vec![]),
}
pub fn new_no_mining(backend: Arc<Backend<EF>>) -> (Self, TxValidator) {
Self::new(backend, None)
}

pub fn executor(&self) -> PendingExecutor {
Expand Down Expand Up @@ -463,14 +478,29 @@ pub struct InstantBlockProducer<EF: ExecutorFactory> {
}

impl<EF: ExecutorFactory> InstantBlockProducer<EF> {
pub fn new(backend: Arc<Backend<EF>>) -> Self {
Self {
pub fn new(backend: Arc<Backend<EF>>) -> (Self, TxValidator) {
let provider = backend.blockchain.provider();

let latest_num = provider.latest_number().expect("latest block num");
let block_env = provider
.block_env_at(latest_num.into())
.expect("provider error")
.expect("latest block env");

let state = provider.latest().expect("latest state");
let cfg = backend.executor_factory.cfg();
let flags = backend.executor_factory.execution_flags();
let validator = TxValidator::new(state, flags.clone(), cfg.clone(), &block_env);

let producer = Self {
backend,
block_mining: None,
queued: VecDeque::default(),
blocking_task_pool: BlockingTaskPool::new().unwrap(),
tx_execution_listeners: RwLock::new(vec![]),
}
};

(producer, validator)
}

pub fn force_mine(&mut self) {
Expand Down Expand Up @@ -528,6 +558,8 @@ impl<EF: ExecutorFactory> InstantBlockProducer<EF> {

let outcome = backend.do_mine_block(&block_env, execution_output)?;

// update pool validator state here

trace!(target: LOG_TARGET, block_number = %outcome.block_number, "Created new block.");

Ok((outcome, txs_outcomes))
Expand Down
10 changes: 8 additions & 2 deletions crates/katana/core/src/service/messaging/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
txs.into_iter().for_each(|tx| {
let hash = tx.calculate_hash();
trace_l1_handler_tx_exec(hash, &tx);
pool.add_transaction(ExecutableTxWithHash { hash, transaction: tx.into() })

// ignore result because L1Handler tx will always be valid
let _ =
pool.add_transaction(ExecutableTxWithHash { hash, transaction: tx.into() });
});

Ok((block_num, txs_count))
Expand All @@ -106,7 +109,10 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
txs.into_iter().for_each(|tx| {
let hash = tx.calculate_hash();
trace_l1_handler_tx_exec(hash, &tx);
pool.add_transaction(ExecutableTxWithHash { hash, transaction: tx.into() })

// ignore result because L1Handler tx will always be valid
let tx = ExecutableTxWithHash { hash, transaction: tx.into() };
let _ = pool.add_transaction(tx);
});

Ok((block_num, txs_count))
Expand Down
3 changes: 3 additions & 0 deletions crates/katana/core/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct NodeService<EF: ExecutorFactory> {
pub(crate) messaging: Option<MessagingService<EF>>,
/// Metrics for recording the service operations
metrics: ServiceMetrics,
// validator: StatefulValidator
}

impl<EF: ExecutorFactory> NodeService<EF> {
Expand Down Expand Up @@ -99,6 +100,8 @@ impl<EF: ExecutorFactory> Future for NodeService<EF> {
let steps_used = outcome.stats.cairo_steps_used;
metrics.l1_gas_processed_total.increment(gas_used as u64);
metrics.cairo_steps_processed_total.increment(steps_used as u64);

pin.block_producer.update_validator().expect("failed to update validator");
}

Err(err) => {
Expand Down
2 changes: 1 addition & 1 deletion crates/katana/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ starknet = { workspace = true, optional = true }
thiserror.workspace = true
tracing.workspace = true

blockifier = { git = "https://github.com/dojoengine/blockifier", branch = "cairo-2.7", features = [ "testing" ], optional = true }
blockifier = { git = "https://github.com/dojoengine/blockifier", branch = "cairo-2.7-new", features = [ "testing" ], optional = true }
katana-cairo = { workspace = true, optional = true }

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/katana/executor/benches/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn blockifier(
|| {
// setup state
let state = provider.latest().expect("failed to get latest state");
let state = CachedState::new(StateProviderDb::from(state));
let state = CachedState::new(StateProviderDb::new(state));

(state, &block_context, execution_flags, tx.clone())
},
Expand Down
3 changes: 3 additions & 0 deletions crates/katana/executor/src/abstraction/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ pub trait ExecutorFactory: Send + Sync + 'static + core::fmt::Debug {

/// Returns the configuration environment of the factory.
fn cfg(&self) -> &CfgEnv;

/// Returns the execution flags set by the factory.
fn execution_flags(&self) -> &SimulationFlag;
}

/// An executor that can execute a block of transactions.
Expand Down
6 changes: 3 additions & 3 deletions crates/katana/executor/src/abstraction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,10 @@ pub struct ResultAndStates {
/// A wrapper around a boxed [StateProvider] for implementing the executor's own state reader
/// traits.
#[derive(Debug)]
pub struct StateProviderDb<'a>(pub(crate) Box<dyn StateProvider + 'a>);
pub struct StateProviderDb<'a>(Box<dyn StateProvider + 'a>);

impl From<Box<dyn StateProvider>> for StateProviderDb<'_> {
fn from(provider: Box<dyn StateProvider>) -> Self {
impl<'a> StateProviderDb<'a> {
pub fn new(provider: Box<dyn StateProvider + 'a>) -> Self {
Self(provider)
}
}
Expand Down
10 changes: 9 additions & 1 deletion crates/katana/executor/src/implementation/blockifier/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Re-export the blockifier crate.
pub use blockifier;

mod error;
mod state;
pub mod utils;
Expand Down Expand Up @@ -63,6 +66,11 @@ impl ExecutorFactory for BlockifierFactory {
fn cfg(&self) -> &CfgEnv {
&self.cfg
}

/// Returns the execution flags set by the factory.
fn execution_flags(&self) -> &SimulationFlag {
&self.flags
}
}

#[derive(Debug)]
Expand All @@ -83,7 +91,7 @@ impl<'a> StarknetVMProcessor<'a> {
) -> Self {
let transactions = Vec::new();
let block_context = utils::block_context_from_envs(&block_env, &cfg_env);
let state = state::CachedState::new(StateProviderDb(state));
let state = state::CachedState::new(StateProviderDb::new(state));
Self { block_context, state, transactions, simulation_flags, stats: Default::default() }
}

Expand Down
Loading

0 comments on commit 419d1cf

Please sign in to comment.