From 0f72d24bba8c5aa73fef6c8941eee26b3eac82b2 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Sat, 30 Nov 2024 15:25:26 -0500 Subject: [PATCH] feat(katana): pipeline execution loop with checkpointing (#2741) --- Cargo.lock | 2 + crates/katana/cli/src/args.rs | 6 +- crates/katana/core/src/backend/storage.rs | 5 +- crates/katana/docs/pipeline.md | 35 ++++ crates/katana/node/Cargo.toml | 2 +- crates/katana/pipeline/Cargo.toml | 2 + crates/katana/pipeline/src/lib.rs | 180 ++++++++++++++++-- crates/katana/pipeline/src/stage/mod.rs | 26 +-- crates/katana/storage/db/src/codecs/mod.rs | 13 ++ .../katana/storage/db/src/codecs/postcard.rs | 2 + crates/katana/storage/db/src/models/mod.rs | 1 + crates/katana/storage/db/src/models/stage.rs | 13 ++ crates/katana/storage/db/src/tables.rs | 13 +- crates/katana/storage/provider/src/lib.rs | 25 ++- .../storage/provider/src/providers/db/mod.rs | 24 ++- .../provider/src/providers/fork/mod.rs | 14 ++ .../katana/storage/provider/src/traits/mod.rs | 1 + .../storage/provider/src/traits/stage.rs | 11 ++ 18 files changed, 334 insertions(+), 41 deletions(-) create mode 100644 crates/katana/docs/pipeline.md create mode 100644 crates/katana/storage/db/src/models/stage.rs create mode 100644 crates/katana/storage/provider/src/traits/stage.rs diff --git a/Cargo.lock b/Cargo.lock index 5673493b1a..6b6939c18b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8495,6 +8495,8 @@ dependencies = [ "katana-core", "katana-executor", "katana-pool", + "katana-primitives", + "katana-provider", "katana-tasks", "thiserror", "tokio", diff --git a/crates/katana/cli/src/args.rs b/crates/katana/cli/src/args.rs index aa852432dc..0976880839 100644 --- a/crates/katana/cli/src/args.rs +++ b/crates/katana/cli/src/args.rs @@ -134,9 +134,9 @@ impl NodeArgs { } fn init_logging(&self) -> Result<()> { - const DEFAULT_LOG_FILTER: &str = "info,tasks=debug,executor=trace,forking::backend=trace,\ - blockifier=off,jsonrpsee_server=off,hyper=off,\ - messaging=debug,node=error"; + const DEFAULT_LOG_FILTER: &str = + "pipeline=debug,stage=debug,info,tasks=debug,executor=trace,forking::backend=trace,\ + blockifier=off,jsonrpsee_server=off,hyper=off,messaging=debug,node=error"; let filter = if self.development.dev { &format!("{DEFAULT_LOG_FILTER},server=debug") diff --git a/crates/katana/core/src/backend/storage.rs b/crates/katana/core/src/backend/storage.rs index 8bc7b81c3d..1123b9d8f4 100644 --- a/crates/katana/core/src/backend/storage.rs +++ b/crates/katana/core/src/backend/storage.rs @@ -14,6 +14,7 @@ use katana_provider::providers::fork::ForkedProvider; use katana_provider::traits::block::{BlockProvider, BlockWriter}; use katana_provider::traits::contract::ContractClassWriter; use katana_provider::traits::env::BlockEnvProvider; +use katana_provider::traits::stage::StageCheckpointProvider; use katana_provider::traits::state::{StateFactoryProvider, StateRootProvider, StateWriter}; use katana_provider::traits::state_update::StateUpdateProvider; use katana_provider::traits::transaction::{ @@ -46,6 +47,7 @@ pub trait Database: + BlockEnvProvider + ClassTrieWriter + ContractTrieWriter + + StageCheckpointProvider + 'static + Send + Sync @@ -69,6 +71,7 @@ impl Database for T where + BlockEnvProvider + ClassTrieWriter + ContractTrieWriter + + StageCheckpointProvider + 'static + Send + Sync @@ -76,7 +79,7 @@ impl Database for T where { } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Blockchain { inner: BlockchainProvider>, } diff --git a/crates/katana/docs/pipeline.md b/crates/katana/docs/pipeline.md new file mode 100644 index 0000000000..0c79474646 --- /dev/null +++ b/crates/katana/docs/pipeline.md @@ -0,0 +1,35 @@ +# Syncing pipeline + +```mermaid +flowchart TD + A[Start Pipeline Run] --> B[Initialize chunk_tip] + + B --> D{Process Blocks in Chunks} + D --> E[run_once_until] + + %% run_once_until subflow + E --> S1[For each Stage] + S1 --> S2[Get Stage Checkpoint] + S2 --> S3{Checkpoint >= Target?} + S3 -->|Yes| S4[Skip Stage] + S3 -->|No| S5[Execute Stage
from checkpoint+1 to target] + S5 --> S6[Update Stage Checkpoint] + S6 --> S1 + S4 --> S1 + + S1 -->|All Stages Complete| F{Reached Target Tip?} + F -->|No| G[Increment chunk_tip by
chunk_size] + G --> D + + F -->|Yes| H[Wait for New Tip] + H -->|New Tip Received| D + H -->|Channel Closed| I[Pipeline Complete] + + style A fill:#f9f,stroke:#333 + style I fill:#f96,stroke:#333 + +%% Example annotations + classDef note fill:#fff,stroke:#333,stroke-dasharray: 5 5 + N1[For example: Tip=1000
chunk_size=100
Processes: 0-100, 100-200, etc]:::note + N1 -.-> D +``` diff --git a/crates/katana/node/Cargo.toml b/crates/katana/node/Cargo.toml index fd55d0f9c7..7db858dd07 100644 --- a/crates/katana/node/Cargo.toml +++ b/crates/katana/node/Cargo.toml @@ -27,9 +27,9 @@ tower = { workspace = true, features = [ "full" ] } tower-http = { workspace = true, features = [ "full" ] } tracing.workspace = true +const_format = "0.2.33" strum.workspace = true strum_macros.workspace = true -const_format = "0.2.33" [build-dependencies] vergen = { version = "9.0.0", features = [ "build", "cargo", "emit_and_set" ] } diff --git a/crates/katana/pipeline/Cargo.toml b/crates/katana/pipeline/Cargo.toml index 5988c7aa0c..adf8b33552 100644 --- a/crates/katana/pipeline/Cargo.toml +++ b/crates/katana/pipeline/Cargo.toml @@ -9,6 +9,8 @@ version.workspace = true katana-core.workspace = true katana-executor.workspace = true katana-pool.workspace = true +katana-primitives.workspace = true +katana-provider = { workspace = true, features = [ "test-utils" ] } katana-tasks.workspace = true anyhow.workspace = true diff --git a/crates/katana/pipeline/src/lib.rs b/crates/katana/pipeline/src/lib.rs index 7850fe4974..081eec88e3 100644 --- a/crates/katana/pipeline/src/lib.rs +++ b/crates/katana/pipeline/src/lib.rs @@ -5,7 +5,11 @@ pub mod stage; use core::future::IntoFuture; use futures::future::BoxFuture; -use stage::Stage; +use katana_primitives::block::BlockNumber; +use katana_provider::error::ProviderError; +use katana_provider::traits::stage::StageCheckpointProvider; +use stage::{Stage, StageExecutionInput}; +use tokio::sync::watch; use tracing::{error, info}; /// The result of a pipeline execution. @@ -16,11 +20,28 @@ pub type PipelineFut = BoxFuture<'static, PipelineResult>; #[derive(Debug, thiserror::Error)] pub enum Error { + #[error("Stage not found: {id}")] + StageNotFound { id: String }, + #[error(transparent)] Stage(#[from] stage::Error), + + #[error(transparent)] + Provider(#[from] ProviderError), } -/// Manages the execution of stages. +#[derive(Debug)] +pub struct PipelineHandle { + tx: watch::Sender>, +} + +impl PipelineHandle { + pub fn set_tip(&self, tip: BlockNumber) { + self.tx.send(Some(tip)).expect("channel closed"); + } +} + +/// Syncing pipeline. /// /// The pipeline drives the execution of stages, running each stage to completion in the order they /// were added. @@ -28,33 +49,100 @@ pub enum Error { /// Inspired by [`reth`]'s staged sync pipeline. /// /// [`reth`]: https://github.com/paradigmxyz/reth/blob/c7aebff0b6bc19cd0b73e295497d3c5150d40ed8/crates/stages/api/src/pipeline/mod.rs#L66 -pub struct Pipeline { +pub struct Pipeline

{ + chunk_size: u64, + provider: P, stages: Vec>, + tip: watch::Receiver>, } -impl Pipeline { +impl

Pipeline

{ /// Create a new empty pipeline. - pub fn new() -> Self { - Self { stages: Vec::new() } + pub fn new(provider: P, chunk_size: u64) -> (Self, PipelineHandle) { + let (tx, rx) = watch::channel(None); + let handle = PipelineHandle { tx }; + let pipeline = Self { stages: Vec::new(), tip: rx, provider, chunk_size }; + (pipeline, handle) } /// Insert a new stage into the pipeline. - pub fn add_stage(&mut self, stage: Box) { - self.stages.push(stage); + pub fn add_stage(&mut self, stage: S) { + self.stages.push(Box::new(stage)); + } + + /// Insert multiple stages into the pipeline. + /// + /// The stages will be executed in the order they are appear in the iterator. + pub fn add_stages(&mut self, stages: impl Iterator>) { + self.stages.extend(stages); } +} - /// Start the pipeline. +impl Pipeline

{ + /// Run the pipeline in a loop. pub async fn run(&mut self) -> PipelineResult { - for stage in &mut self.stages { - info!(target: "pipeline", id = %stage.id(), "Executing stage."); - stage.execute().await?; + let mut current_chunk_tip = self.chunk_size; + + loop { + let tip = *self.tip.borrow_and_update(); + + loop { + if let Some(tip) = tip { + let to = current_chunk_tip.min(tip); + self.run_once_until(to).await?; + + if to >= tip { + info!(target: "pipeline", %tip, "Finished processing until tip."); + break; + } else { + current_chunk_tip = (current_chunk_tip + self.chunk_size).min(tip); + } + } + } + + // If we reach here, that means we have run the pipeline up until the `tip`. + // So, wait until the tip has changed. + if self.tip.changed().await.is_err() { + break; + } } + info!(target: "pipeline", "Pipeline finished."); + + Ok(()) + } + + /// Run the pipeline once, until the given block number. + async fn run_once_until(&mut self, to: BlockNumber) -> PipelineResult { + for stage in &mut self.stages { + let id = stage.id(); + + // Get the checkpoint for the stage, otherwise default to block number 0 + let checkpoint = self.provider.checkpoint(id)?.unwrap_or_default(); + + // Skip the stage if the checkpoint is greater than or equal to the target block number + if checkpoint >= to { + info!(target: "pipeline", %id, "Skipping stage."); + continue; + } + + info!(target: "pipeline", %id, from = %checkpoint, %to, "Executing stage."); + + // plus 1 because the checkpoint is inclusive + let input = StageExecutionInput { from: checkpoint + 1, to }; + stage.execute(&input).await?; + self.provider.set_checkpoint(id, to)?; + + info!(target: "pipeline", %id, from = %checkpoint, %to, "Stage execution completed."); + } Ok(()) } } -impl IntoFuture for Pipeline { +impl

IntoFuture for Pipeline

+where + P: StageCheckpointProvider + 'static, +{ type Output = PipelineResult; type IntoFuture = PipelineFut; @@ -67,16 +155,68 @@ impl IntoFuture for Pipeline { } } -impl core::default::Default for Pipeline { - fn default() -> Self { - Self::new() - } -} - -impl core::fmt::Debug for Pipeline { +impl

core::fmt::Debug for Pipeline

+where + P: core::fmt::Debug, +{ fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("Pipeline") + .field("tip", &self.tip) + .field("provider", &self.provider) + .field("chunk_size", &self.chunk_size) .field("stages", &self.stages.iter().map(|s| s.id()).collect::>()) .finish() } } + +#[cfg(test)] +mod tests { + use katana_provider::test_utils::test_provider; + use katana_provider::traits::stage::StageCheckpointProvider; + + use super::{Pipeline, Stage, StageExecutionInput}; + use crate::stage::StageResult; + + struct MockStage; + + #[async_trait::async_trait] + impl Stage for MockStage { + fn id(&self) -> &'static str { + "Mock" + } + + async fn execute(&mut self, _: &StageExecutionInput) -> StageResult { + Ok(()) + } + } + + #[tokio::test] + async fn stage_checkpoint() { + let provider = test_provider(); + + let (mut pipeline, _handle) = Pipeline::new(&provider, 10); + pipeline.add_stage(MockStage); + + // check that the checkpoint was set + let initial_checkpoint = provider.checkpoint("Mock").unwrap(); + assert_eq!(initial_checkpoint, None); + + pipeline.run_once_until(5).await.expect("failed to run the pipeline once"); + + // check that the checkpoint was set + let actual_checkpoint = provider.checkpoint("Mock").unwrap(); + assert_eq!(actual_checkpoint, Some(5)); + + pipeline.run_once_until(10).await.expect("failed to run the pipeline once"); + + // check that the checkpoint was set + let actual_checkpoint = provider.checkpoint("Mock").unwrap(); + assert_eq!(actual_checkpoint, Some(10)); + + pipeline.run_once_until(10).await.expect("failed to run the pipeline once"); + + // check that the checkpoint doesn't change + let actual_checkpoint = provider.checkpoint("Mock").unwrap(); + assert_eq!(actual_checkpoint, Some(10)); + } +} diff --git a/crates/katana/pipeline/src/stage/mod.rs b/crates/katana/pipeline/src/stage/mod.rs index 6d50761ffc..d01c8811bb 100644 --- a/crates/katana/pipeline/src/stage/mod.rs +++ b/crates/katana/pipeline/src/stage/mod.rs @@ -1,3 +1,6 @@ +use katana_primitives::block::BlockNumber; +use katana_provider::error::ProviderError; + mod sequencing; pub use sequencing::Sequencing; @@ -5,21 +8,22 @@ pub use sequencing::Sequencing; /// The result type of a stage execution. See [Stage::execute]. pub type StageResult = Result<(), Error>; -#[derive(Debug, Clone, Copy)] -pub enum StageId { - Sequencing, +#[derive(Debug, Default, Clone)] +pub struct StageExecutionInput { + pub from: BlockNumber, + pub to: BlockNumber, } -impl core::fmt::Display for StageId { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - match self { - StageId::Sequencing => write!(f, "Sequencing"), - } - } +#[derive(Debug, Default)] +pub struct StageExecutionOutput { + pub last_block_processed: BlockNumber, } #[derive(Debug, thiserror::Error)] pub enum Error { + #[error(transparent)] + Provider(#[from] ProviderError), + #[error(transparent)] Other(#[from] anyhow::Error), } @@ -27,8 +31,8 @@ pub enum Error { #[async_trait::async_trait] pub trait Stage: Send + Sync { /// Returns the id which uniquely identifies the stage. - fn id(&self) -> StageId; + fn id(&self) -> &'static str; /// Executes the stage. - async fn execute(&mut self) -> StageResult; + async fn execute(&mut self, input: &StageExecutionInput) -> StageResult; } diff --git a/crates/katana/storage/db/src/codecs/mod.rs b/crates/katana/storage/db/src/codecs/mod.rs index 615a027079..f777f3899a 100644 --- a/crates/katana/storage/db/src/codecs/mod.rs +++ b/crates/katana/storage/db/src/codecs/mod.rs @@ -72,6 +72,19 @@ macro_rules! impl_encode_and_decode_for_felts { impl_encode_and_decode_for_uints!(u64); impl_encode_and_decode_for_felts!(Felt, ContractAddress); +impl Encode for String { + type Encoded = Vec; + fn encode(self) -> Self::Encoded { + self.into_bytes() + } +} + +impl Decode for String { + fn decode>(bytes: B) -> Result { + String::from_utf8(bytes.as_ref().to_vec()).map_err(|e| CodecError::Decode(e.to_string())) + } +} + impl Compress for ContractClass { type Compressed = Vec; fn compress(self) -> Self::Compressed { diff --git a/crates/katana/storage/db/src/codecs/postcard.rs b/crates/katana/storage/db/src/codecs/postcard.rs index 2a154fb756..5b5dfb27c5 100644 --- a/crates/katana/storage/db/src/codecs/postcard.rs +++ b/crates/katana/storage/db/src/codecs/postcard.rs @@ -11,6 +11,7 @@ use crate::error::CodecError; use crate::models::block::StoredBlockBodyIndices; use crate::models::contract::ContractInfoChangeList; use crate::models::list::BlockList; +use crate::models::stage::StageCheckpoint; use crate::models::trie::TrieDatabaseValue; macro_rules! impl_compress_and_decompress_for_table_values { @@ -42,6 +43,7 @@ impl_compress_and_decompress_for_table_values!( TrieDatabaseValue, ContractAddress, BlockList, + StageCheckpoint, GenericContractInfo, StoredBlockBodyIndices, ContractInfoChangeList diff --git a/crates/katana/storage/db/src/models/mod.rs b/crates/katana/storage/db/src/models/mod.rs index 09fe7d1e0c..426c7ea447 100644 --- a/crates/katana/storage/db/src/models/mod.rs +++ b/crates/katana/storage/db/src/models/mod.rs @@ -2,5 +2,6 @@ pub mod block; pub mod class; pub mod contract; pub mod list; +pub mod stage; pub mod storage; pub mod trie; diff --git a/crates/katana/storage/db/src/models/stage.rs b/crates/katana/storage/db/src/models/stage.rs new file mode 100644 index 0000000000..7bd32d2220 --- /dev/null +++ b/crates/katana/storage/db/src/models/stage.rs @@ -0,0 +1,13 @@ +use katana_primitives::block::BlockNumber; +use serde::{Deserialize, Serialize}; + +/// Unique identifier for a pipeline stage. +pub type StageId = String; + +/// Pipeline stage checkpoint. +#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] +#[cfg_attr(test, derive(::arbitrary::Arbitrary))] +pub struct StageCheckpoint { + /// The block number that the stage has processed up to. + pub block: BlockNumber, +} diff --git a/crates/katana/storage/db/src/tables.rs b/crates/katana/storage/db/src/tables.rs index ff6c28c48c..e4223d38cf 100644 --- a/crates/katana/storage/db/src/tables.rs +++ b/crates/katana/storage/db/src/tables.rs @@ -9,6 +9,7 @@ use crate::codecs::{Compress, Decode, Decompress, Encode}; use crate::models::block::StoredBlockBodyIndices; use crate::models::contract::{ContractClassChange, ContractInfoChangeList, ContractNonceChange}; use crate::models::list::BlockList; +use crate::models::stage::{StageCheckpoint, StageId}; use crate::models::storage::{ContractStorageEntry, ContractStorageKey, StorageEntry}; use crate::models::trie::{TrieDatabaseKey, TrieDatabaseValue}; @@ -47,7 +48,7 @@ pub enum TableType { DupSort, } -pub const NUM_TABLES: usize = 26; +pub const NUM_TABLES: usize = 27; /// Macro to declare `libmdbx` tables. #[macro_export] @@ -173,10 +174,14 @@ define_tables_enum! {[ (StorageChangeSet, TableType::Table), (ClassTrie, TableType::Table), (ContractTrie, TableType::Table), - (ContractStorageTrie, TableType::Table) + (ContractStorageTrie, TableType::Table), + (StageCheckpoints, TableType::Table) ]} tables! { + /// Pipeline stages checkpoint + StageCheckpoints: (StageId) => StageCheckpoint, + /// Store canonical block headers Headers: (BlockNumber) => Header, /// Stores block hashes according to its block number @@ -275,6 +280,8 @@ mod tests { assert_eq!(Tables::ALL[22].name(), StorageChangeSet::NAME); assert_eq!(Tables::ALL[23].name(), ClassTrie::NAME); assert_eq!(Tables::ALL[24].name(), ContractTrie::NAME); + assert_eq!(Tables::ALL[25].name(), ContractStorageTrie::NAME); + assert_eq!(Tables::ALL[26].name(), StageCheckpoints::NAME); assert_eq!(Tables::Headers.table_type(), TableType::Table); assert_eq!(Tables::BlockHashes.table_type(), TableType::Table); @@ -301,6 +308,8 @@ mod tests { assert_eq!(Tables::StorageChangeSet.table_type(), TableType::Table); assert_eq!(Tables::ClassTrie.table_type(), TableType::Table); assert_eq!(Tables::ContractTrie.table_type(), TableType::Table); + assert_eq!(Tables::ContractStorageTrie.table_type(), TableType::Table); + assert_eq!(Tables::StageCheckpoints.table_type(), TableType::Table); } use katana_primitives::address; diff --git a/crates/katana/storage/provider/src/lib.rs b/crates/katana/storage/provider/src/lib.rs index 3b737d55e3..9c8c3b9488 100644 --- a/crates/katana/storage/provider/src/lib.rs +++ b/crates/katana/storage/provider/src/lib.rs @@ -1,5 +1,6 @@ use std::collections::BTreeMap; use std::ops::{Range, RangeInclusive}; +use std::sync::Arc; use katana_db::models::block::StoredBlockBodyIndices; use katana_primitives::block::{ @@ -17,6 +18,7 @@ use katana_primitives::Felt; use traits::block::{BlockIdReader, BlockStatusProvider, BlockWriter}; use traits::contract::{ContractClassProvider, ContractClassWriter, ContractClassWriterExt}; use traits::env::BlockEnvProvider; +use traits::stage::StageCheckpointProvider; use traits::state::{StateRootProvider, StateWriter}; use traits::transaction::{TransactionStatusProvider, TransactionTraceProvider}; use traits::trie::{ClassTrieWriter, ContractTrieWriter}; @@ -42,12 +44,18 @@ pub type ProviderResult = Result; /// operation is done through this provider. #[derive(Debug)] pub struct BlockchainProvider { - provider: Db, + provider: Arc, } impl BlockchainProvider { pub fn new(provider: Db) -> Self { - Self { provider } + Self { provider: Arc::new(provider) } + } +} + +impl Clone for BlockchainProvider { + fn clone(&self) -> Self { + Self { provider: self.provider.clone() } } } @@ -409,3 +417,16 @@ where self.provider.insert_updates(block_number, state_updates) } } + +impl StageCheckpointProvider for BlockchainProvider +where + Db: StageCheckpointProvider, +{ + fn checkpoint(&self, id: &str) -> ProviderResult> { + self.provider.checkpoint(id) + } + + fn set_checkpoint(&self, id: &str, block_number: BlockNumber) -> ProviderResult<()> { + self.provider.set_checkpoint(id, block_number) + } +} diff --git a/crates/katana/storage/provider/src/providers/db/mod.rs b/crates/katana/storage/provider/src/providers/db/mod.rs index b8ac71eef9..fc8053b375 100644 --- a/crates/katana/storage/provider/src/providers/db/mod.rs +++ b/crates/katana/storage/provider/src/providers/db/mod.rs @@ -14,6 +14,7 @@ use katana_db::models::contract::{ ContractClassChange, ContractInfoChangeList, ContractNonceChange, }; use katana_db::models::list::BlockList; +use katana_db::models::stage::StageCheckpoint; use katana_db::models::storage::{ContractStorageEntry, ContractStorageKey, StorageEntry}; use katana_db::tables::{self, DupSort, Table}; use katana_db::utils::KeyValue; @@ -38,6 +39,7 @@ use crate::traits::block::{ HeaderProvider, }; use crate::traits::env::BlockEnvProvider; +use crate::traits::stage::StageCheckpointProvider; use crate::traits::state::{StateFactoryProvider, StateProvider, StateRootProvider}; use crate::traits::state_update::StateUpdateProvider; use crate::traits::transaction::{ @@ -48,7 +50,7 @@ use crate::ProviderResult; /// A provider implementation that uses a persistent database as the backend. // TODO: remove the default generic type -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DbProvider(Db); impl DbProvider { @@ -774,6 +776,26 @@ impl BlockWriter for DbProvider { } } +impl StageCheckpointProvider for DbProvider { + fn checkpoint(&self, id: &str) -> ProviderResult> { + let tx = self.0.tx()?; + let result = tx.get::(id.to_string())?; + tx.commit()?; + Ok(result.map(|x| x.block)) + } + + fn set_checkpoint(&self, id: &str, block_number: BlockNumber) -> ProviderResult<()> { + let tx = self.0.tx_mut()?; + + let key = id.to_string(); + let value = StageCheckpoint { block: block_number }; + tx.put::(key, value)?; + + tx.commit()?; + Ok(()) + } +} + #[cfg(test)] mod tests { use std::collections::BTreeMap; diff --git a/crates/katana/storage/provider/src/providers/fork/mod.rs b/crates/katana/storage/provider/src/providers/fork/mod.rs index 5b2c4833d1..4acb26eede 100644 --- a/crates/katana/storage/provider/src/providers/fork/mod.rs +++ b/crates/katana/storage/provider/src/providers/fork/mod.rs @@ -32,6 +32,7 @@ use crate::traits::block::{ }; use crate::traits::contract::{ContractClassWriter, ContractClassWriterExt}; use crate::traits::env::BlockEnvProvider; +use crate::traits::stage::StageCheckpointProvider; use crate::traits::state::{StateFactoryProvider, StateProvider, StateRootProvider, StateWriter}; use crate::traits::state_update::StateUpdateProvider; use crate::traits::transaction::{ @@ -607,3 +608,16 @@ impl ContractTrieWriter for ForkedProvider { Ok(Felt::ZERO) } } + +impl StageCheckpointProvider for ForkedProvider { + fn checkpoint(&self, id: &str) -> ProviderResult> { + let _ = id; + unimplemented!("syncing is not supported for forked provider") + } + + fn set_checkpoint(&self, id: &str, block_number: BlockNumber) -> ProviderResult<()> { + let _ = id; + let _ = block_number; + unimplemented!("syncing is not supported for forked provider") + } +} diff --git a/crates/katana/storage/provider/src/traits/mod.rs b/crates/katana/storage/provider/src/traits/mod.rs index 7274975da3..eb9f5df0eb 100644 --- a/crates/katana/storage/provider/src/traits/mod.rs +++ b/crates/katana/storage/provider/src/traits/mod.rs @@ -5,6 +5,7 @@ pub mod block; pub mod contract; pub mod env; +pub mod stage; pub mod state; pub mod state_update; pub mod transaction; diff --git a/crates/katana/storage/provider/src/traits/stage.rs b/crates/katana/storage/provider/src/traits/stage.rs new file mode 100644 index 0000000000..1171c7b6a8 --- /dev/null +++ b/crates/katana/storage/provider/src/traits/stage.rs @@ -0,0 +1,11 @@ +use katana_primitives::block::BlockNumber; + +use crate::ProviderResult; + +#[auto_impl::auto_impl(&, Box, Arc)] +pub trait StageCheckpointProvider: Send + Sync { + /// Returns the number of the last block that was successfully processed by the stage. + fn checkpoint(&self, id: &str) -> ProviderResult>; + + fn set_checkpoint(&self, id: &str, block_number: BlockNumber) -> ProviderResult<()>; +}