Skip to content

Commit

Permalink
feat(katana): pipeline execution loop with checkpointing (#2741)
Browse files Browse the repository at this point in the history
  • Loading branch information
kariy authored Nov 30, 2024
1 parent e191569 commit 0f72d24
Show file tree
Hide file tree
Showing 18 changed files with 334 additions and 41 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions crates/katana/cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ impl NodeArgs {
}

fn init_logging(&self) -> Result<()> {
const DEFAULT_LOG_FILTER: &str = "info,tasks=debug,executor=trace,forking::backend=trace,\
blockifier=off,jsonrpsee_server=off,hyper=off,\
messaging=debug,node=error";
const DEFAULT_LOG_FILTER: &str =
"pipeline=debug,stage=debug,info,tasks=debug,executor=trace,forking::backend=trace,\
blockifier=off,jsonrpsee_server=off,hyper=off,messaging=debug,node=error";

let filter = if self.development.dev {
&format!("{DEFAULT_LOG_FILTER},server=debug")
Expand Down
5 changes: 4 additions & 1 deletion crates/katana/core/src/backend/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use katana_provider::providers::fork::ForkedProvider;
use katana_provider::traits::block::{BlockProvider, BlockWriter};
use katana_provider::traits::contract::ContractClassWriter;
use katana_provider::traits::env::BlockEnvProvider;
use katana_provider::traits::stage::StageCheckpointProvider;
use katana_provider::traits::state::{StateFactoryProvider, StateRootProvider, StateWriter};
use katana_provider::traits::state_update::StateUpdateProvider;
use katana_provider::traits::transaction::{
Expand Down Expand Up @@ -46,6 +47,7 @@ pub trait Database:
+ BlockEnvProvider
+ ClassTrieWriter
+ ContractTrieWriter
+ StageCheckpointProvider
+ 'static
+ Send
+ Sync
Expand All @@ -69,14 +71,15 @@ impl<T> Database for T where
+ BlockEnvProvider
+ ClassTrieWriter
+ ContractTrieWriter
+ StageCheckpointProvider
+ 'static
+ Send
+ Sync
+ core::fmt::Debug
{
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Blockchain {
inner: BlockchainProvider<Box<dyn Database>>,
}
Expand Down
35 changes: 35 additions & 0 deletions crates/katana/docs/pipeline.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Syncing pipeline

```mermaid
flowchart TD
A[Start Pipeline Run] --> B[Initialize chunk_tip]
B --> D{Process Blocks in Chunks}
D --> E[run_once_until]
%% run_once_until subflow
E --> S1[For each Stage]
S1 --> S2[Get Stage Checkpoint]
S2 --> S3{Checkpoint >= Target?}
S3 -->|Yes| S4[Skip Stage]
S3 -->|No| S5[Execute Stage<br>from checkpoint+1 to target]
S5 --> S6[Update Stage Checkpoint]
S6 --> S1
S4 --> S1
S1 -->|All Stages Complete| F{Reached Target Tip?}
F -->|No| G[Increment chunk_tip by<br>chunk_size]
G --> D
F -->|Yes| H[Wait for New Tip]
H -->|New Tip Received| D
H -->|Channel Closed| I[Pipeline Complete]
style A fill:#f9f,stroke:#333
style I fill:#f96,stroke:#333
%% Example annotations
classDef note fill:#fff,stroke:#333,stroke-dasharray: 5 5
N1[For example: Tip=1000<br>chunk_size=100<br>Processes: 0-100, 100-200, etc]:::note
N1 -.-> D
```
2 changes: 1 addition & 1 deletion crates/katana/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ tower = { workspace = true, features = [ "full" ] }
tower-http = { workspace = true, features = [ "full" ] }
tracing.workspace = true

const_format = "0.2.33"
strum.workspace = true
strum_macros.workspace = true
const_format = "0.2.33"

[build-dependencies]
vergen = { version = "9.0.0", features = [ "build", "cargo", "emit_and_set" ] }
Expand Down
2 changes: 2 additions & 0 deletions crates/katana/pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ version.workspace = true
katana-core.workspace = true
katana-executor.workspace = true
katana-pool.workspace = true
katana-primitives.workspace = true
katana-provider = { workspace = true, features = [ "test-utils" ] }
katana-tasks.workspace = true

anyhow.workspace = true
Expand Down
180 changes: 160 additions & 20 deletions crates/katana/pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ pub mod stage;
use core::future::IntoFuture;

use futures::future::BoxFuture;
use stage::Stage;
use katana_primitives::block::BlockNumber;
use katana_provider::error::ProviderError;
use katana_provider::traits::stage::StageCheckpointProvider;
use stage::{Stage, StageExecutionInput};
use tokio::sync::watch;
use tracing::{error, info};

/// The result of a pipeline execution.
Expand All @@ -16,45 +20,129 @@ pub type PipelineFut = BoxFuture<'static, PipelineResult>;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Stage not found: {id}")]
StageNotFound { id: String },

#[error(transparent)]
Stage(#[from] stage::Error),

#[error(transparent)]
Provider(#[from] ProviderError),
}

/// Manages the execution of stages.
#[derive(Debug)]
pub struct PipelineHandle {
tx: watch::Sender<Option<BlockNumber>>,
}

impl PipelineHandle {
pub fn set_tip(&self, tip: BlockNumber) {
self.tx.send(Some(tip)).expect("channel closed");
}
}

/// Syncing pipeline.
///
/// The pipeline drives the execution of stages, running each stage to completion in the order they
/// were added.
///
/// Inspired by [`reth`]'s staged sync pipeline.
///
/// [`reth`]: https://github.com/paradigmxyz/reth/blob/c7aebff0b6bc19cd0b73e295497d3c5150d40ed8/crates/stages/api/src/pipeline/mod.rs#L66
pub struct Pipeline {
pub struct Pipeline<P> {
chunk_size: u64,
provider: P,
stages: Vec<Box<dyn Stage>>,
tip: watch::Receiver<Option<BlockNumber>>,
}

impl Pipeline {
impl<P> Pipeline<P> {
/// Create a new empty pipeline.
pub fn new() -> Self {
Self { stages: Vec::new() }
pub fn new(provider: P, chunk_size: u64) -> (Self, PipelineHandle) {
let (tx, rx) = watch::channel(None);
let handle = PipelineHandle { tx };
let pipeline = Self { stages: Vec::new(), tip: rx, provider, chunk_size };
(pipeline, handle)
}

/// Insert a new stage into the pipeline.
pub fn add_stage(&mut self, stage: Box<dyn Stage>) {
self.stages.push(stage);
pub fn add_stage<S: Stage + 'static>(&mut self, stage: S) {
self.stages.push(Box::new(stage));
}

/// Insert multiple stages into the pipeline.
///
/// The stages will be executed in the order they are appear in the iterator.
pub fn add_stages(&mut self, stages: impl Iterator<Item = Box<dyn Stage>>) {
self.stages.extend(stages);
}
}

/// Start the pipeline.
impl<P: StageCheckpointProvider> Pipeline<P> {
/// Run the pipeline in a loop.
pub async fn run(&mut self) -> PipelineResult {
for stage in &mut self.stages {
info!(target: "pipeline", id = %stage.id(), "Executing stage.");
stage.execute().await?;
let mut current_chunk_tip = self.chunk_size;

loop {
let tip = *self.tip.borrow_and_update();

loop {
if let Some(tip) = tip {
let to = current_chunk_tip.min(tip);
self.run_once_until(to).await?;

if to >= tip {
info!(target: "pipeline", %tip, "Finished processing until tip.");
break;
} else {
current_chunk_tip = (current_chunk_tip + self.chunk_size).min(tip);
}
}
}

// If we reach here, that means we have run the pipeline up until the `tip`.
// So, wait until the tip has changed.
if self.tip.changed().await.is_err() {
break;
}
}

info!(target: "pipeline", "Pipeline finished.");

Ok(())
}

/// Run the pipeline once, until the given block number.
async fn run_once_until(&mut self, to: BlockNumber) -> PipelineResult {
for stage in &mut self.stages {
let id = stage.id();

// Get the checkpoint for the stage, otherwise default to block number 0
let checkpoint = self.provider.checkpoint(id)?.unwrap_or_default();

// Skip the stage if the checkpoint is greater than or equal to the target block number
if checkpoint >= to {
info!(target: "pipeline", %id, "Skipping stage.");
continue;
}

info!(target: "pipeline", %id, from = %checkpoint, %to, "Executing stage.");

// plus 1 because the checkpoint is inclusive
let input = StageExecutionInput { from: checkpoint + 1, to };
stage.execute(&input).await?;
self.provider.set_checkpoint(id, to)?;

info!(target: "pipeline", %id, from = %checkpoint, %to, "Stage execution completed.");
}
Ok(())
}
}

impl IntoFuture for Pipeline {
impl<P> IntoFuture for Pipeline<P>
where
P: StageCheckpointProvider + 'static,
{
type Output = PipelineResult;
type IntoFuture = PipelineFut;

Expand All @@ -67,16 +155,68 @@ impl IntoFuture for Pipeline {
}
}

impl core::default::Default for Pipeline {
fn default() -> Self {
Self::new()
}
}

impl core::fmt::Debug for Pipeline {
impl<P> core::fmt::Debug for Pipeline<P>
where
P: core::fmt::Debug,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("Pipeline")
.field("tip", &self.tip)
.field("provider", &self.provider)
.field("chunk_size", &self.chunk_size)
.field("stages", &self.stages.iter().map(|s| s.id()).collect::<Vec<_>>())
.finish()
}
}

#[cfg(test)]
mod tests {
use katana_provider::test_utils::test_provider;
use katana_provider::traits::stage::StageCheckpointProvider;

use super::{Pipeline, Stage, StageExecutionInput};
use crate::stage::StageResult;

struct MockStage;

#[async_trait::async_trait]
impl Stage for MockStage {
fn id(&self) -> &'static str {
"Mock"
}

async fn execute(&mut self, _: &StageExecutionInput) -> StageResult {
Ok(())
}
}

#[tokio::test]
async fn stage_checkpoint() {
let provider = test_provider();

let (mut pipeline, _handle) = Pipeline::new(&provider, 10);
pipeline.add_stage(MockStage);

// check that the checkpoint was set
let initial_checkpoint = provider.checkpoint("Mock").unwrap();
assert_eq!(initial_checkpoint, None);

pipeline.run_once_until(5).await.expect("failed to run the pipeline once");

// check that the checkpoint was set
let actual_checkpoint = provider.checkpoint("Mock").unwrap();
assert_eq!(actual_checkpoint, Some(5));

pipeline.run_once_until(10).await.expect("failed to run the pipeline once");

// check that the checkpoint was set
let actual_checkpoint = provider.checkpoint("Mock").unwrap();
assert_eq!(actual_checkpoint, Some(10));

pipeline.run_once_until(10).await.expect("failed to run the pipeline once");

// check that the checkpoint doesn't change
let actual_checkpoint = provider.checkpoint("Mock").unwrap();
assert_eq!(actual_checkpoint, Some(10));
}
}
26 changes: 15 additions & 11 deletions crates/katana/pipeline/src/stage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,38 @@
use katana_primitives::block::BlockNumber;
use katana_provider::error::ProviderError;

mod sequencing;

pub use sequencing::Sequencing;

/// The result type of a stage execution. See [Stage::execute].
pub type StageResult = Result<(), Error>;

#[derive(Debug, Clone, Copy)]
pub enum StageId {
Sequencing,
#[derive(Debug, Default, Clone)]
pub struct StageExecutionInput {
pub from: BlockNumber,
pub to: BlockNumber,
}

impl core::fmt::Display for StageId {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
StageId::Sequencing => write!(f, "Sequencing"),
}
}
#[derive(Debug, Default)]
pub struct StageExecutionOutput {
pub last_block_processed: BlockNumber,
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Provider(#[from] ProviderError),

#[error(transparent)]
Other(#[from] anyhow::Error),
}

#[async_trait::async_trait]
pub trait Stage: Send + Sync {
/// Returns the id which uniquely identifies the stage.
fn id(&self) -> StageId;
fn id(&self) -> &'static str;

/// Executes the stage.
async fn execute(&mut self) -> StageResult;
async fn execute(&mut self, input: &StageExecutionInput) -> StageResult;
}
Loading

0 comments on commit 0f72d24

Please sign in to comment.