Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(katana): pool validation #2344

Merged
merged 11 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

142 changes: 87 additions & 55 deletions crates/katana/core/src/service/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use futures::stream::{Stream, StreamExt};
use futures::FutureExt;
use katana_executor::{BlockExecutor, ExecutionResult, ExecutionStats, ExecutorFactory};
use katana_pool::validation::stateful::TxValidator;
use katana_primitives::block::{BlockHashOrNumber, ExecutableBlock, PartialHeader};
use katana_primitives::receipt::Receipt;
use katana_primitives::trace::TxExecInfo;
Expand Down Expand Up @@ -72,37 +73,36 @@
#[allow(missing_debug_implementations)]
pub struct BlockProducer<EF: ExecutorFactory> {
/// The inner mode of mining.
pub inner: RwLock<BlockProducerMode<EF>>,
pub producer: RwLock<BlockProducerMode<EF>>,
/// validator used in the tx pool
// the validator needs to always be built against the state of the block producer, so
// im putting here for now until we find a better way to handle this.
validator: TxValidator,
}

impl<EF: ExecutorFactory> BlockProducer<EF> {
/// Creates a block producer that mines a new block every `interval` milliseconds.
pub fn interval(backend: Arc<Backend<EF>>, interval: u64) -> Self {
Self {
inner: RwLock::new(BlockProducerMode::Interval(IntervalBlockProducer::new(
backend, interval,
))),
}
let (prod, validator) = IntervalBlockProducer::new(backend, Some(interval));
Self { producer: BlockProducerMode::Interval(prod).into(), validator }
}

/// Creates a new block producer that will only be possible to mine by calling the
/// `katana_generateBlock` RPC method.
pub fn on_demand(backend: Arc<Backend<EF>>) -> Self {
Self {
inner: RwLock::new(BlockProducerMode::Interval(IntervalBlockProducer::new_no_mining(
backend,
))),
}
let (prod, validator) = IntervalBlockProducer::new(backend, None);
Self { producer: BlockProducerMode::Interval(prod).into(), validator }
}

/// Creates a block producer that mines a new block as soon as there are ready transactions in
/// the transactions pool.
pub fn instant(backend: Arc<Backend<EF>>) -> Self {
Self { inner: RwLock::new(BlockProducerMode::Instant(InstantBlockProducer::new(backend))) }
let (prod, validator) = InstantBlockProducer::new(backend);
Self { producer: BlockProducerMode::Instant(prod).into(), validator }
}

pub(super) fn queue(&self, transactions: Vec<ExecutableTxWithHash>) {
let mut mode = self.inner.write();
let mut mode = self.producer.write();
match &mut *mode {
BlockProducerMode::Instant(producer) => producer.queued.push_back(transactions),
BlockProducerMode::Interval(producer) => producer.queued.push_back(transactions),
Expand All @@ -111,26 +111,57 @@

/// Returns `true` if the block producer is running in _interval_ mode. Otherwise, `fales`.
pub fn is_interval_mining(&self) -> bool {
matches!(*self.inner.read(), BlockProducerMode::Interval(_))
matches!(*self.producer.read(), BlockProducerMode::Interval(_))

Check warning on line 114 in crates/katana/core/src/service/block_producer.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/core/src/service/block_producer.rs#L114

Added line #L114 was not covered by tests
}

/// Returns `true` if the block producer is running in _instant_ mode. Otherwise, `fales`.
pub fn is_instant_mining(&self) -> bool {
matches!(*self.inner.read(), BlockProducerMode::Instant(_))
matches!(*self.producer.read(), BlockProducerMode::Instant(_))

Check warning on line 119 in crates/katana/core/src/service/block_producer.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/core/src/service/block_producer.rs#L119

Added line #L119 was not covered by tests
}

// Handler for the `katana_generateBlock` RPC method.
pub fn force_mine(&self) {
trace!(target: LOG_TARGET, "Scheduling force block mining.");
let mut mode = self.inner.write();
let mut mode = self.producer.write();
match &mut *mode {
BlockProducerMode::Instant(producer) => producer.force_mine(),
BlockProducerMode::Interval(producer) => producer.force_mine(),
}
}

pub fn validator(&self) -> &TxValidator {
&self.validator
}

pub fn update_validator(&self) -> Result<(), ProviderError> {
let mut mode = self.producer.write();

match &mut *mode {
BlockProducerMode::Instant(pd) => {
let provider = pd.backend.blockchain.provider();
let state = provider.latest()?;

let latest_num = provider.latest_number()?;
let block_env = provider.block_env_at(latest_num.into())?.expect("latest");

self.validator.update(state, &block_env)
}

BlockProducerMode::Interval(pd) => {
let pending_state = pd.executor.0.read();

let state = pending_state.state();
let block_env = pending_state.block_env();

self.validator.update(state, &block_env)
}
};

Ok(())
}

pub(super) fn poll_next(&self, cx: &mut Context<'_>) -> Poll<Option<BlockProductionResult>> {
let mut mode = self.inner.write();
let mut mode = self.producer.write();
match &mut *mode {
BlockProducerMode::Instant(producer) => producer.poll_next_unpin(cx),
BlockProducerMode::Interval(producer) => producer.poll_next_unpin(cx),
Expand Down Expand Up @@ -184,13 +215,13 @@
}

impl<EF: ExecutorFactory> IntervalBlockProducer<EF> {
pub fn new(backend: Arc<Backend<EF>>, interval: u64) -> Self {
let interval = {
let duration = Duration::from_millis(interval);
pub fn new(backend: Arc<Backend<EF>>, interval: Option<u64>) -> (Self, TxValidator) {
let interval = interval.map(|time| {
let duration = Duration::from_millis(time);
let mut interval = interval_at(Instant::now() + duration, duration);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
interval
};
});

let provider = backend.blockchain.provider();

Expand All @@ -199,49 +230,33 @@
backend.update_block_env(&mut block_env);

let state = provider.latest().unwrap();
let executor = backend.executor_factory.with_state_and_block_env(state, block_env);
let executor = PendingExecutor::new(executor);
let executor = backend.executor_factory.with_state_and_block_env(state, block_env.clone());

let blocking_task_spawner = BlockingTaskPool::new().unwrap();
// -- build the validator using the same state and envs as the executor
let state = executor.state();
let cfg = backend.executor_factory.cfg();
let flags = backend.executor_factory.execution_flags();
let validator = TxValidator::new(state, flags.clone(), cfg.clone(), &block_env);

Self {
let producer = Self {
backend,
executor,
interval,
ongoing_mining: None,
blocking_task_spawner,
ongoing_execution: None,
interval: Some(interval),
queued: VecDeque::default(),
executor: PendingExecutor::new(executor),
tx_execution_listeners: RwLock::new(vec![]),
}
blocking_task_spawner: BlockingTaskPool::new().unwrap(),
};

(producer, validator)
}

/// Creates a new [IntervalBlockProducer] with no `interval`. This mode will not produce blocks
/// for every fixed interval, although it will still execute all queued transactions and
/// keep hold of the pending state.
pub fn new_no_mining(backend: Arc<Backend<EF>>) -> Self {
let provider = backend.blockchain.provider();

let latest_num = provider.latest_number().unwrap();
let mut block_env = provider.block_env_at(latest_num.into()).unwrap().unwrap();
backend.update_block_env(&mut block_env);

let state = provider.latest().unwrap();
let executor = backend.executor_factory.with_state_and_block_env(state, block_env);
let executor = PendingExecutor::new(executor);

let blocking_task_spawner = BlockingTaskPool::new().unwrap();

Self {
backend,
executor,
interval: None,
ongoing_mining: None,
queued: VecDeque::default(),
blocking_task_spawner,
ongoing_execution: None,
tx_execution_listeners: RwLock::new(vec![]),
}
pub fn new_no_mining(backend: Arc<Backend<EF>>) -> (Self, TxValidator) {
Self::new(backend, None)

Check warning on line 259 in crates/katana/core/src/service/block_producer.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/core/src/service/block_producer.rs#L258-L259

Added lines #L258 - L259 were not covered by tests
}

pub fn executor(&self) -> PendingExecutor {
Expand Down Expand Up @@ -463,14 +478,29 @@
}

impl<EF: ExecutorFactory> InstantBlockProducer<EF> {
pub fn new(backend: Arc<Backend<EF>>) -> Self {
Self {
pub fn new(backend: Arc<Backend<EF>>) -> (Self, TxValidator) {
let provider = backend.blockchain.provider();

let latest_num = provider.latest_number().expect("latest block num");
let block_env = provider
.block_env_at(latest_num.into())
.expect("provider error")
.expect("latest block env");

let state = provider.latest().expect("latest state");
let cfg = backend.executor_factory.cfg();
let flags = backend.executor_factory.execution_flags();
let validator = TxValidator::new(state, flags.clone(), cfg.clone(), &block_env);

let producer = Self {
backend,
block_mining: None,
queued: VecDeque::default(),
blocking_task_pool: BlockingTaskPool::new().unwrap(),
tx_execution_listeners: RwLock::new(vec![]),
}
};

(producer, validator)
}

pub fn force_mine(&mut self) {
Expand Down Expand Up @@ -528,6 +558,8 @@

let outcome = backend.do_mine_block(&block_env, execution_output)?;

// update pool validator state here

trace!(target: LOG_TARGET, block_number = %outcome.block_number, "Created new block.");

Ok((outcome, txs_outcomes))
Expand Down
10 changes: 8 additions & 2 deletions crates/katana/core/src/service/messaging/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@
txs.into_iter().for_each(|tx| {
let hash = tx.calculate_hash();
trace_l1_handler_tx_exec(hash, &tx);
pool.add_transaction(ExecutableTxWithHash { hash, transaction: tx.into() })

// ignore result because L1Handler tx will always be valid
let _ =
pool.add_transaction(ExecutableTxWithHash { hash, transaction: tx.into() });

Check warning on line 97 in crates/katana/core/src/service/messaging/service.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/core/src/service/messaging/service.rs#L94-L97

Added lines #L94 - L97 were not covered by tests
});

Ok((block_num, txs_count))
Expand All @@ -106,7 +109,10 @@
txs.into_iter().for_each(|tx| {
let hash = tx.calculate_hash();
trace_l1_handler_tx_exec(hash, &tx);
pool.add_transaction(ExecutableTxWithHash { hash, transaction: tx.into() })

// ignore result because L1Handler tx will always be valid
let tx = ExecutableTxWithHash { hash, transaction: tx.into() };
let _ = pool.add_transaction(tx);

Check warning on line 115 in crates/katana/core/src/service/messaging/service.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/core/src/service/messaging/service.rs#L112-L115

Added lines #L112 - L115 were not covered by tests
});

Ok((block_num, txs_count))
Expand Down
3 changes: 3 additions & 0 deletions crates/katana/core/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct NodeService<EF: ExecutorFactory> {
pub(crate) messaging: Option<MessagingService<EF>>,
/// Metrics for recording the service operations
metrics: ServiceMetrics,
// validator: StatefulValidator
}

impl<EF: ExecutorFactory> NodeService<EF> {
Expand Down Expand Up @@ -99,6 +100,8 @@ impl<EF: ExecutorFactory> Future for NodeService<EF> {
let steps_used = outcome.stats.cairo_steps_used;
metrics.l1_gas_processed_total.increment(gas_used as u64);
metrics.cairo_steps_processed_total.increment(steps_used as u64);

pin.block_producer.update_validator().expect("failed to update validator");
}

Err(err) => {
Expand Down
2 changes: 1 addition & 1 deletion crates/katana/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ starknet = { workspace = true, optional = true }
thiserror.workspace = true
tracing.workspace = true

blockifier = { git = "https://github.com/dojoengine/blockifier", branch = "cairo-2.7", features = [ "testing" ], optional = true }
blockifier = { git = "https://github.com/dojoengine/blockifier", branch = "cairo-2.7-new", features = [ "testing" ], optional = true }
katana-cairo = { workspace = true, optional = true }

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/katana/executor/benches/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn blockifier(
|| {
// setup state
let state = provider.latest().expect("failed to get latest state");
let state = CachedState::new(StateProviderDb::from(state));
let state = CachedState::new(StateProviderDb::new(state));

(state, &block_context, execution_flags, tx.clone())
},
Expand Down
3 changes: 3 additions & 0 deletions crates/katana/executor/src/abstraction/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ pub trait ExecutorFactory: Send + Sync + 'static + core::fmt::Debug {

/// Returns the configuration environment of the factory.
fn cfg(&self) -> &CfgEnv;

/// Returns the execution flags set by the factory.
fn execution_flags(&self) -> &SimulationFlag;
}

/// An executor that can execute a block of transactions.
Expand Down
6 changes: 3 additions & 3 deletions crates/katana/executor/src/abstraction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,10 @@ pub struct ResultAndStates {
/// A wrapper around a boxed [StateProvider] for implementing the executor's own state reader
/// traits.
#[derive(Debug)]
pub struct StateProviderDb<'a>(pub(crate) Box<dyn StateProvider + 'a>);
pub struct StateProviderDb<'a>(Box<dyn StateProvider + 'a>);

impl From<Box<dyn StateProvider>> for StateProviderDb<'_> {
fn from(provider: Box<dyn StateProvider>) -> Self {
impl<'a> StateProviderDb<'a> {
pub fn new(provider: Box<dyn StateProvider + 'a>) -> Self {
Self(provider)
}
}
Expand Down
10 changes: 9 additions & 1 deletion crates/katana/executor/src/implementation/blockifier/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Re-export the blockifier crate.
pub use blockifier;

mod error;
mod state;
pub mod utils;
Expand Down Expand Up @@ -63,6 +66,11 @@ impl ExecutorFactory for BlockifierFactory {
fn cfg(&self) -> &CfgEnv {
&self.cfg
}

/// Returns the execution flags set by the factory.
fn execution_flags(&self) -> &SimulationFlag {
&self.flags
}
}

#[derive(Debug)]
Expand All @@ -83,7 +91,7 @@ impl<'a> StarknetVMProcessor<'a> {
) -> Self {
let transactions = Vec::new();
let block_context = utils::block_context_from_envs(&block_env, &cfg_env);
let state = state::CachedState::new(StateProviderDb(state));
let state = state::CachedState::new(StateProviderDb::new(state));
Self { block_context, state, transactions, simulation_flags, stats: Default::default() }
}

Expand Down
Loading
Loading