Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(katana): pipeline execution loop with checkpointing #2741

Merged
merged 7 commits into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions crates/katana/cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ impl NodeArgs {
}

fn init_logging(&self) -> Result<()> {
const DEFAULT_LOG_FILTER: &str = "info,tasks=debug,executor=trace,forking::backend=trace,\
blockifier=off,jsonrpsee_server=off,hyper=off,\
messaging=debug,node=error";
const DEFAULT_LOG_FILTER: &str =
"pipeline=debug,stage=debug,info,tasks=debug,executor=trace,forking::backend=trace,\
blockifier=off,jsonrpsee_server=off,hyper=off,messaging=debug,node=error";

let filter = if self.development.dev {
&format!("{DEFAULT_LOG_FILTER},server=debug")
Expand Down
5 changes: 4 additions & 1 deletion crates/katana/core/src/backend/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use katana_provider::providers::fork::ForkedProvider;
use katana_provider::traits::block::{BlockProvider, BlockWriter};
use katana_provider::traits::contract::ContractClassWriter;
use katana_provider::traits::env::BlockEnvProvider;
use katana_provider::traits::stage::StageCheckpointProvider;
use katana_provider::traits::state::{StateFactoryProvider, StateRootProvider, StateWriter};
use katana_provider::traits::state_update::StateUpdateProvider;
use katana_provider::traits::transaction::{
Expand Down Expand Up @@ -46,6 +47,7 @@ pub trait Database:
+ BlockEnvProvider
+ ClassTrieWriter
+ ContractTrieWriter
+ StageCheckpointProvider
kariy marked this conversation as resolved.
Show resolved Hide resolved
+ 'static
+ Send
+ Sync
Expand All @@ -69,14 +71,15 @@ impl<T> Database for T where
+ BlockEnvProvider
+ ClassTrieWriter
+ ContractTrieWriter
+ StageCheckpointProvider
+ 'static
+ Send
+ Sync
+ core::fmt::Debug
{
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Blockchain {
inner: BlockchainProvider<Box<dyn Database>>,
}
Expand Down
35 changes: 35 additions & 0 deletions crates/katana/docs/pipeline.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Syncing pipeline

```mermaid
flowchart TD
A[Start Pipeline Run] --> B[Initialize chunk_tip]

B --> D{Process Blocks in Chunks}
D --> E[run_once_until]

%% run_once_until subflow
E --> S1[For each Stage]
S1 --> S2[Get Stage Checkpoint]
S2 --> S3{Checkpoint >= Target?}
S3 -->|Yes| S4[Skip Stage]
S3 -->|No| S5[Execute Stage<br>from checkpoint+1 to target]
S5 --> S6[Update Stage Checkpoint]
S6 --> S1
S4 --> S1

S1 -->|All Stages Complete| F{Reached Target Tip?}
F -->|No| G[Increment chunk_tip by<br>chunk_size]
G --> D

F -->|Yes| H[Wait for New Tip]
H -->|New Tip Received| D
H -->|Channel Closed| I[Pipeline Complete]

style A fill:#f9f,stroke:#333
style I fill:#f96,stroke:#333

%% Example annotations
classDef note fill:#fff,stroke:#333,stroke-dasharray: 5 5
N1[For example: Tip=1000<br>chunk_size=100<br>Processes: 0-100, 100-200, etc]:::note
N1 -.-> D
```
2 changes: 1 addition & 1 deletion crates/katana/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ tower = { workspace = true, features = [ "full" ] }
tower-http = { workspace = true, features = [ "full" ] }
tracing.workspace = true

const_format = "0.2.33"
strum.workspace = true
strum_macros.workspace = true
const_format = "0.2.33"

[build-dependencies]
vergen = { version = "9.0.0", features = [ "build", "cargo", "emit_and_set" ] }
Expand Down
2 changes: 2 additions & 0 deletions crates/katana/pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ version.workspace = true
katana-core.workspace = true
katana-executor.workspace = true
katana-pool.workspace = true
katana-primitives.workspace = true
katana-provider = { workspace = true, features = [ "test-utils" ] }
katana-tasks.workspace = true

anyhow.workspace = true
Expand Down
180 changes: 160 additions & 20 deletions crates/katana/pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
use core::future::IntoFuture;

use futures::future::BoxFuture;
use stage::Stage;
use katana_primitives::block::BlockNumber;
use katana_provider::error::ProviderError;
use katana_provider::traits::stage::StageCheckpointProvider;
use stage::{Stage, StageExecutionInput};
use tokio::sync::watch;
use tracing::{error, info};

/// The result of a pipeline execution.
Expand All @@ -16,45 +20,129 @@

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Stage not found: {id}")]
StageNotFound { id: String },

#[error(transparent)]
Stage(#[from] stage::Error),

#[error(transparent)]
Provider(#[from] ProviderError),
}

/// Manages the execution of stages.
#[derive(Debug)]
pub struct PipelineHandle {
tx: watch::Sender<Option<BlockNumber>>,
}

impl PipelineHandle {
pub fn set_tip(&self, tip: BlockNumber) {
self.tx.send(Some(tip)).expect("channel closed");
}

Check warning on line 41 in crates/katana/pipeline/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/lib.rs#L39-L41

Added lines #L39 - L41 were not covered by tests
}
kariy marked this conversation as resolved.
Show resolved Hide resolved

/// Syncing pipeline.
///
/// The pipeline drives the execution of stages, running each stage to completion in the order they
/// were added.
///
/// Inspired by [`reth`]'s staged sync pipeline.
///
/// [`reth`]: https://github.com/paradigmxyz/reth/blob/c7aebff0b6bc19cd0b73e295497d3c5150d40ed8/crates/stages/api/src/pipeline/mod.rs#L66
pub struct Pipeline {
pub struct Pipeline<P> {
chunk_size: u64,
provider: P,
stages: Vec<Box<dyn Stage>>,
tip: watch::Receiver<Option<BlockNumber>>,
}

impl Pipeline {
impl<P> Pipeline<P> {
/// Create a new empty pipeline.
pub fn new() -> Self {
Self { stages: Vec::new() }
pub fn new(provider: P, chunk_size: u64) -> (Self, PipelineHandle) {
let (tx, rx) = watch::channel(None);
let handle = PipelineHandle { tx };
let pipeline = Self { stages: Vec::new(), tip: rx, provider, chunk_size };
(pipeline, handle)
}

/// Insert a new stage into the pipeline.
pub fn add_stage(&mut self, stage: Box<dyn Stage>) {
self.stages.push(stage);
pub fn add_stage<S: Stage + 'static>(&mut self, stage: S) {
self.stages.push(Box::new(stage));
}

/// Insert multiple stages into the pipeline.
///
/// The stages will be executed in the order they are appear in the iterator.
pub fn add_stages(&mut self, stages: impl Iterator<Item = Box<dyn Stage>>) {
self.stages.extend(stages);

Check warning on line 77 in crates/katana/pipeline/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/lib.rs#L76-L77

Added lines #L76 - L77 were not covered by tests
}
}

/// Start the pipeline.
impl<P: StageCheckpointProvider> Pipeline<P> {
/// Run the pipeline in a loop.
pub async fn run(&mut self) -> PipelineResult {
for stage in &mut self.stages {
info!(target: "pipeline", id = %stage.id(), "Executing stage.");
stage.execute().await?;
let mut current_chunk_tip = self.chunk_size;

Check warning on line 84 in crates/katana/pipeline/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/lib.rs#L84

Added line #L84 was not covered by tests

loop {
let tip = *self.tip.borrow_and_update();

Check warning on line 87 in crates/katana/pipeline/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/lib.rs#L86-L87

Added lines #L86 - L87 were not covered by tests

loop {
if let Some(tip) = tip {
let to = current_chunk_tip.min(tip);
self.run_once_until(to).await?;

Check warning on line 92 in crates/katana/pipeline/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/lib.rs#L90-L92

Added lines #L90 - L92 were not covered by tests

if to >= tip {
info!(target: "pipeline", %tip, "Finished processing until tip.");
break;
} else {
current_chunk_tip = (current_chunk_tip + self.chunk_size).min(tip);
}
}

Check warning on line 100 in crates/katana/pipeline/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/lib.rs#L94-L100

Added lines #L94 - L100 were not covered by tests
}

// If we reach here, that means we have run the pipeline up until the `tip`.
// So, wait until the tip has changed.
if self.tip.changed().await.is_err() {
break;
}

Check warning on line 107 in crates/katana/pipeline/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/lib.rs#L105-L107

Added lines #L105 - L107 were not covered by tests
}

info!(target: "pipeline", "Pipeline finished.");

Ok(())
}

Check warning on line 113 in crates/katana/pipeline/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/lib.rs#L112-L113

Added lines #L112 - L113 were not covered by tests

/// Run the pipeline once, until the given block number.
async fn run_once_until(&mut self, to: BlockNumber) -> PipelineResult {
for stage in &mut self.stages {
let id = stage.id();

// Get the checkpoint for the stage, otherwise default to block number 0
let checkpoint = self.provider.checkpoint(id)?.unwrap_or_default();

// Skip the stage if the checkpoint is greater than or equal to the target block number
if checkpoint >= to {
info!(target: "pipeline", %id, "Skipping stage.");
continue;
}

info!(target: "pipeline", %id, from = %checkpoint, %to, "Executing stage.");

// plus 1 because the checkpoint is inclusive
let input = StageExecutionInput { from: checkpoint + 1, to };
stage.execute(&input).await?;
self.provider.set_checkpoint(id, to)?;

info!(target: "pipeline", %id, from = %checkpoint, %to, "Stage execution completed.");
}
Ok(())
}
}

impl IntoFuture for Pipeline {
impl<P> IntoFuture for Pipeline<P>
where
P: StageCheckpointProvider + 'static,
{
type Output = PipelineResult;
type IntoFuture = PipelineFut;

Expand All @@ -67,16 +155,68 @@
}
}

impl core::default::Default for Pipeline {
fn default() -> Self {
Self::new()
}
}

impl core::fmt::Debug for Pipeline {
impl<P> core::fmt::Debug for Pipeline<P>
where
P: core::fmt::Debug,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("Pipeline")
.field("tip", &self.tip)
.field("provider", &self.provider)
.field("chunk_size", &self.chunk_size)

Check warning on line 166 in crates/katana/pipeline/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pipeline/src/lib.rs#L164-L166

Added lines #L164 - L166 were not covered by tests
.field("stages", &self.stages.iter().map(|s| s.id()).collect::<Vec<_>>())
.finish()
}
}

#[cfg(test)]
mod tests {
use katana_provider::test_utils::test_provider;
use katana_provider::traits::stage::StageCheckpointProvider;

use super::{Pipeline, Stage, StageExecutionInput};
use crate::stage::StageResult;

struct MockStage;

#[async_trait::async_trait]
impl Stage for MockStage {
fn id(&self) -> &'static str {
"Mock"
}

async fn execute(&mut self, _: &StageExecutionInput) -> StageResult {
Ok(())
}
}

#[tokio::test]
async fn stage_checkpoint() {
let provider = test_provider();

let (mut pipeline, _handle) = Pipeline::new(&provider, 10);
pipeline.add_stage(MockStage);

// check that the checkpoint was set
let initial_checkpoint = provider.checkpoint("Mock").unwrap();
assert_eq!(initial_checkpoint, None);

pipeline.run_once_until(5).await.expect("failed to run the pipeline once");

// check that the checkpoint was set
let actual_checkpoint = provider.checkpoint("Mock").unwrap();
assert_eq!(actual_checkpoint, Some(5));

pipeline.run_once_until(10).await.expect("failed to run the pipeline once");

// check that the checkpoint was set
let actual_checkpoint = provider.checkpoint("Mock").unwrap();
assert_eq!(actual_checkpoint, Some(10));

pipeline.run_once_until(10).await.expect("failed to run the pipeline once");

// check that the checkpoint doesn't change
let actual_checkpoint = provider.checkpoint("Mock").unwrap();
assert_eq!(actual_checkpoint, Some(10));
}
}
26 changes: 15 additions & 11 deletions crates/katana/pipeline/src/stage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,38 @@
use katana_primitives::block::BlockNumber;
use katana_provider::error::ProviderError;

mod sequencing;

pub use sequencing::Sequencing;

/// The result type of a stage execution. See [Stage::execute].
pub type StageResult = Result<(), Error>;

#[derive(Debug, Clone, Copy)]
pub enum StageId {
Sequencing,
#[derive(Debug, Default, Clone)]
pub struct StageExecutionInput {
pub from: BlockNumber,
pub to: BlockNumber,
}

impl core::fmt::Display for StageId {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
StageId::Sequencing => write!(f, "Sequencing"),
}
}
#[derive(Debug, Default)]
pub struct StageExecutionOutput {
pub last_block_processed: BlockNumber,
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Provider(#[from] ProviderError),

#[error(transparent)]
Other(#[from] anyhow::Error),
}

#[async_trait::async_trait]
pub trait Stage: Send + Sync {
/// Returns the id which uniquely identifies the stage.
fn id(&self) -> StageId;
fn id(&self) -> &'static str;

/// Executes the stage.
async fn execute(&mut self) -> StageResult;
async fn execute(&mut self, input: &StageExecutionInput) -> StageResult;
}
Loading
Loading