feat(katana): pipeline execution loop with checkpointing #2741
Conversation
Walkthrough

Ohayo, sensei! This pull request introduces several modifications across various files in the Katana project.
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (9)
crates/katana/storage/db/src/models/stage.rs (1)

5-5: Consider using a newtype for StageId for improved type safety, sensei.

Defining StageId as a type alias to String may lead to type confusion elsewhere in the codebase. Wrapping it in a newtype struct can provide better type safety and clarity. Consider modifying it as:

```rust
pub struct StageId(pub String);
```

This way, StageId becomes a distinct type, preventing accidental misuse of plain strings where a StageId is expected.
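To make that benefit concrete, here is a hypothetical snippet (the `checkpoint_for` helper and its body are invented purely for illustration): with the newtype, a bare `String` no longer type-checks where a `StageId` is expected.

```rust
pub struct StageId(pub String);

// Hypothetical helper used only to illustrate the signature; not part of katana.
fn checkpoint_for(id: &StageId) -> Option<u64> {
    let _ = id; // lookup elided
    None
}

fn main() {
    let id = StageId("blocks".to_string());
    let _ = checkpoint_for(&id);
    // let _ = checkpoint_for(&"blocks".to_string()); // error: expected `&StageId`, found `&String`
}
```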
crates/katana/pipeline/src/stage/mod.rs (3)

11-14: Introduce StageExecutionInput with clear documentation

The new struct StageExecutionInput enhances the clarity of stage execution parameters. Consider adding documentation comments to explain the purpose of the from and to fields for better code maintainability. Apply this diff to add documentation:

```diff
 #[derive(Debug, Default, Clone)]
+/// Input parameters for stage execution, defining the block range.
 pub struct StageExecutionInput {
+    /// Starting block number.
     pub from: BlockNumber,
+    /// Ending block number.
     pub to: BlockNumber,
 }
```
17-19: Add documentation for StageExecutionOutput

Providing documentation for StageExecutionOutput will help other developers understand what last_block_processed represents. Apply this diff to include documentation:

```diff
 #[derive(Debug, Default)]
+/// Output result after stage execution, indicating the last processed block.
 pub struct StageExecutionOutput {
+    /// The last block number that was processed.
     pub last_block_processed: BlockNumber,
 }
```
34-34: Consider using a strongly typed identifier for stage IDs

Changing the id method to return &'static str instead of a StageId enum reduces type safety. Using a strongly typed identifier, such as an enum or newtype, helps prevent typos and ensures compile-time checks for stage identifiers.
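As a sketch of that suggestion (the variant names below are invented; katana's actual stages may be named differently), a small enum with a stable string form keeps stage identifiers checked at compile time while still providing a string key for the checkpoints table:

```rust
use std::fmt;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StageId {
    Blocks,  // hypothetical stage names, for illustration only
    Classes,
}

impl StageId {
    /// Stable string form, usable e.g. as the StageCheckpoints table key.
    pub fn as_str(&self) -> &'static str {
        match self {
            StageId::Blocks => "Blocks",
            StageId::Classes => "Classes",
        }
    }
}

impl fmt::Display for StageId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
```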
crates/katana/storage/db/src/codecs/mod.rs (1)

75-80: Ohayo, sensei! Consider optimizing Encode implementation for String.

Currently, encode consumes the String, which might lead to unnecessary allocations. Implementing Encode for &str could improve performance by avoiding ownership transfer when possible. Here's how you might adjust the implementation:

```rust
impl Encode for &str {
    type Encoded = Vec<u8>;

    fn encode(self) -> Self::Encoded {
        self.as_bytes().to_vec()
    }
}
```

crates/katana/pipeline/src/lib.rs (1)
61-65: Consider using a builder pattern for Pipeline::new.

Returning a tuple (Self, PipelineHandle) might be less intuitive for users. A builder pattern or a dedicated struct could improve clarity and usability.
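A minimal sketch of that idea, with simplified stand-in types (everything other than the chunk_size and tip names is invented here; the real Pipeline in crates/katana/pipeline/src/lib.rs differs):

```rust
use tokio::sync::watch;

pub struct PipelineHandle {
    tip_tx: watch::Sender<Option<u64>>, // assumed: the handle updates the tip
}

pub struct Pipeline<P> {
    provider: P,
    chunk_size: u64,
    tip: watch::Receiver<Option<u64>>,
}

/// Named return value instead of a bare tuple, so call sites stay self-documenting.
pub struct PipelineParts<P> {
    pub pipeline: Pipeline<P>,
    pub handle: PipelineHandle,
}

pub struct PipelineBuilder<P> {
    provider: P,
    chunk_size: u64,
}

impl<P> PipelineBuilder<P> {
    pub fn new(provider: P) -> Self {
        Self { provider, chunk_size: 1024 } // assumed default chunk size
    }

    pub fn chunk_size(mut self, chunk_size: u64) -> Self {
        self.chunk_size = chunk_size;
        self
    }

    pub fn build(self) -> PipelineParts<P> {
        let (tip_tx, tip) = watch::channel(None);
        PipelineParts {
            pipeline: Pipeline { provider: self.provider, chunk_size: self.chunk_size, tip },
            handle: PipelineHandle { tip_tx },
        }
    }
}
```

Construction then reads as a fluent chain, e.g. `PipelineBuilder::new(provider).chunk_size(64).build()`, and the named `PipelineParts` fields make the destructuring explicit.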
crates/katana/storage/provider/src/lib.rs (1)

56-58: Ohayo sensei! Consider deriving Clone instead of manually implementing it

Since all fields in BlockchainProvider<Db> implement Clone, you can derive Clone directly to simplify the code.

crates/katana/storage/db/src/tables.rs (1)
51-51: Ohayo sensei! Ensure NUM_TABLES constant remains consistent

Updating NUM_TABLES to 27 reflects the addition of StageCheckpoints. Please ensure that all related tests and usages of NUM_TABLES are updated accordingly to prevent inconsistencies.

Also applies to: 152-152
crates/katana/cli/src/args.rs (1)
137-139: Adjust Logging Levels for Consistent Verbosity

Ohayo, sensei! The updated DEFAULT_LOG_FILTER includes pipeline=debug and stage=debug, which increases the logging verbosity for these components. Please verify if this level of detail is intended for production environments, as excessive debug logs may affect performance and clutter log files.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
⛔ Files ignored due to path filters (1)
- Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (16)
- crates/katana/cli/src/args.rs (1 hunks)
- crates/katana/core/src/backend/storage.rs (3 hunks)
- crates/katana/node/Cargo.toml (1 hunks)
- crates/katana/pipeline/Cargo.toml (1 hunks)
- crates/katana/pipeline/src/lib.rs (3 hunks)
- crates/katana/pipeline/src/stage/mod.rs (1 hunks)
- crates/katana/storage/db/src/codecs/mod.rs (1 hunks)
- crates/katana/storage/db/src/codecs/postcard.rs (2 hunks)
- crates/katana/storage/db/src/models/mod.rs (1 hunks)
- crates/katana/storage/db/src/models/stage.rs (1 hunks)
- crates/katana/storage/db/src/tables.rs (4 hunks)
- crates/katana/storage/provider/src/lib.rs (4 hunks)
- crates/katana/storage/provider/src/providers/db/mod.rs (4 hunks)
- crates/katana/storage/provider/src/providers/fork/mod.rs (2 hunks)
- crates/katana/storage/provider/src/traits/mod.rs (1 hunks)
- crates/katana/storage/provider/src/traits/stage.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- crates/katana/storage/db/src/models/mod.rs
🔇 Additional comments (25)
crates/katana/storage/provider/src/traits/mod.rs (1)
8-8: Ohayo, sensei! Great addition of the stage module.

The introduction of pub mod stage; enhances modularity and prepares the codebase for stage checkpoint management. This aligns well with the project's architecture.
crates/katana/storage/provider/src/traits/stage.rs (1)
5-11: Implementing the StageCheckpointProvider trait looks solid, sensei!

The methods checkpoint and set_checkpoint are well-defined and provide essential functionality for managing stage checkpoints in the pipeline. This will enhance the extensibility and reliability of the system.
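The trait itself is not quoted in this review; based on the method names and the StageCheckpoint model discussed elsewhere in the PR, it plausibly looks something like the sketch below (the exact signatures and the simplified stand-in types are assumptions):

```rust
// Simplified stand-ins so the sketch is self-contained; the real code uses
// katana_primitives::block::BlockNumber and katana_provider::error::ProviderError.
type BlockNumber = u64;
type StageId = String;

#[derive(Debug)]
pub struct ProviderError;

pub type ProviderResult<T> = Result<T, ProviderError>;

/// Assumed shape of the trait added in crates/katana/storage/provider/src/traits/stage.rs.
pub trait StageCheckpointProvider {
    /// Returns the last checkpointed block for a stage, if one has been recorded.
    fn checkpoint(&self, id: &StageId) -> ProviderResult<Option<BlockNumber>>;

    /// Records the checkpoint for a stage after it finishes a block range.
    fn set_checkpoint(&self, id: &StageId, block_number: BlockNumber) -> ProviderResult<()>;
}
```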
crates/katana/storage/db/src/models/stage.rs (1)
8-13: The StageCheckpoint struct is well-defined and ready to roll, sensei!

Deriving traits like Serialize, Deserialize, Debug, and others ensures that the checkpoint data can be easily managed, serialized, and compared. This is essential for effective checkpoint handling within the pipeline stages.
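For context, a struct with that set of derives might look like the following; the review does not show StageCheckpoint's fields, so the single block field here is an assumption:

```rust
use serde::{Deserialize, Serialize};

/// Illustrative only; the real definition lives in crates/katana/storage/db/src/models/stage.rs.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct StageCheckpoint {
    /// Last block number processed by the stage (field name assumed).
    pub block: u64,
}
```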
crates/katana/pipeline/Cargo.toml (1)
12-13: Ohayo, sensei! Confirm inclusion of the "test-utils" feature in katana-provider

Including the "test-utils" feature in the katana-provider dependency may introduce testing utilities into the production build, which could lead to unintended side effects or increased binary size. Please verify if this feature is necessary for the production environment.
crates/katana/pipeline/src/stage/mod.rs (3)
1-2: Ohayo, sensei! Validate the necessity of new imports

The imports katana_primitives::block::BlockNumber and katana_provider::error::ProviderError have been added. Ensure these imports are required and used appropriately within the code to avoid unused import warnings.

24-25: Ohayo, sensei! Properly handle provider errors in Error enum

Adding the Provider variant to the Error enum improves error granularity and allows for more precise error handling related to provider operations.

37-37: Ensure all implementations of execute method are updated

The method signature for execute now includes input: &StageExecutionInput. Please verify that all implementations of the Stage trait have been updated accordingly to match this new signature.
Run the following script to identify implementations that need updating:
✅ Verification successful
Let me verify this further with a more comprehensive search to ensure we catch all implementations.
All Stage trait implementations are correctly updated, sensei!

The search results show that there is only one implementation of the Stage trait's execute method in crates/katana/pipeline/src/lib.rs, and it correctly uses the &StageExecutionInput parameter. All other execute functions found in the codebase are unrelated methods belonging to different types and contexts.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:

```bash
#!/bin/bash
# Description: Find all implementations of the `Stage` trait and check for methods named `execute`.
# Test: Search for `impl Stage` blocks and display the `execute` method signatures.
# Expect: All `execute` methods should accept `&StageExecutionInput` as a parameter.
rg -U -A 5 'impl\s+Stage\s+for' --glob '**/*.rs' | rg 'async\s+fn\s+execute\(' -A 1
```

Length of output: 197

Script:

```bash
#!/bin/bash
# Search for Stage trait implementations using ast-grep for more accurate results
ast-grep --pattern 'impl Stage for $_ {
  $$$
  async fn execute($$$) $$$
  $$$
}'

# Also search for any execute methods that might not be caught
rg -U "async fn execute" --glob '**/*.rs' -A 2
```

Length of output: 2013
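Taken together, the comments and verification above imply a Stage trait shaped roughly like the sketch below. The input and output structs match the diffs shown earlier; whether execute takes &mut self, returns StageExecutionOutput, and is declared via async_trait are assumptions:

```rust
use katana_primitives::block::BlockNumber;
use katana_provider::error::ProviderError;

/// Error type for stage execution; the review only confirms that a `Provider` variant exists.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error(transparent)]
    Provider(#[from] ProviderError),
}

#[derive(Debug, Default, Clone)]
pub struct StageExecutionInput {
    pub from: BlockNumber,
    pub to: BlockNumber,
}

#[derive(Debug, Default)]
pub struct StageExecutionOutput {
    pub last_block_processed: BlockNumber,
}

/// Sketch of the Stage trait discussed above; details may differ from the real one.
#[async_trait::async_trait]
pub trait Stage: Send + Sync {
    /// Identifier of the stage (the review suggests a typed id instead of &'static str).
    fn id(&self) -> &'static str;

    /// Executes the stage over the block range described by `input`.
    async fn execute(&mut self, input: &StageExecutionInput) -> Result<StageExecutionOutput, Error>;
}
```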
crates/katana/node/Cargo.toml (1)
30-30: Ohayo, sensei! Verify the addition of const_format dependency

The const_format crate version "0.2.33" has been added. Ensure that this crate is compatible with the current Rust edition and does not introduce any known issues or conflicts with other dependencies.
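For readers unfamiliar with the crate, const_format assembles &'static str values at compile time; a typical use (whether katana-node uses it exactly this way is an assumption) looks like:

```rust
use const_format::concatcp;

// Hypothetical example: building a default tracing filter from constant pieces
// at compile time, so no runtime allocation or formatting is needed.
const BASE_FILTER: &str = "info,katana=debug";
const DEFAULT_LOG_FILTER: &str = concatcp!(BASE_FILTER, ",pipeline=debug,stage=debug");

fn main() {
    assert_eq!(DEFAULT_LOG_FILTER, "info,katana=debug,pipeline=debug,stage=debug");
}
```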
crates/katana/storage/db/src/codecs/postcard.rs (2)
14-14: Ohayo, sensei! Good addition of StageCheckpoint import.

Including StageCheckpoint ensures that the compression and decompression implementations are available for this type.

46-46: Including StageCheckpoint in the macro is appropriate.

By adding StageCheckpoint to impl_compress_and_decompress_for_table_values!, you enable seamless serialization and deserialization using postcard.
crates/katana/storage/db/src/codecs/mod.rs (1)
82-86: Decoding String handles errors correctly.

The decode method properly handles invalid UTF-8 sequences by returning a CodecError, ensuring robust error handling during deserialization.
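The pattern being praised is roughly the following; the trait and error type here are simplified stand-ins for the ones in crates/katana/storage/db/src/codecs/mod.rs, and the variant name is an assumption:

```rust
#[derive(Debug)]
pub enum CodecError {
    InvalidUtf8(String), // variant name assumed for this sketch
}

pub trait Decode: Sized {
    fn decode<B: AsRef<[u8]>>(bytes: B) -> Result<Self, CodecError>;
}

impl Decode for String {
    fn decode<B: AsRef<[u8]>>(bytes: B) -> Result<Self, CodecError> {
        // Invalid UTF-8 is surfaced as a codec error instead of panicking.
        String::from_utf8(bytes.as_ref().to_vec())
            .map_err(|e| CodecError::InvalidUtf8(e.to_string()))
    }
}
```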
crates/katana/pipeline/src/lib.rs (7)
8-12: Ohayo, sensei! Necessary imports enhance functionality.

Adding imports for BlockNumber, ProviderError, StageCheckpointProvider, and tokio::sync::watch is appropriate for implementing checkpointing and synchronization features.

23-24: Properly handling missing stages with StageNotFound error variant.

Introducing StageNotFound improves error reporting when a requested stage is not found in the pipeline.

29-30: Including ProviderError in Error enum for comprehensive error handling.

This change allows ProviderErrors to be propagated, enhancing the pipeline's error management.

52-56: Ohayo, sensei! Updating Pipeline struct enhances capabilities.

Introducing the generic provider P and adding chunk_size and tip fields improve the pipeline's flexibility and control over execution.

69-71: Generic add_stage method enhances flexibility.

Allowing stages that implement Stage + 'static increases the pipeline's extensibility.

73-77: Adding add_stages method improves convenience.

This method allows adding multiple stages efficiently, enhancing the developer experience.

81-113: Review the loop logic in the run method to prevent potential infinite loops.

The current implementation relies on the tip changing to exit the outer loop. If self.tip.changed().await consistently errors or the tip doesn't change, the loop may run indefinitely. Consider implementing additional conditions or timeouts to ensure the loop can exit gracefully in all scenarios.
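One way to make the exit conditions explicit, sketched here with simplified types (the real run method in crates/katana/pipeline/src/lib.rs is structured differently): treat a closed tip channel as a shutdown signal and bound the wait with a timeout.

```rust
use std::time::Duration;
use tokio::sync::watch;

/// Simplified stand-in for the pipeline's outer loop; stage execution and
/// error handling from the real implementation are elided.
async fn run(mut tip: watch::Receiver<Option<u64>>) {
    loop {
        // ... execute stages up to the current tip here ...

        match tokio::time::timeout(Duration::from_secs(30), tip.changed()).await {
            // Tip advanced: run another round of stages.
            Ok(Ok(())) => continue,
            // Every sender (e.g. the PipelineHandle) was dropped: shut down gracefully.
            Ok(Err(_closed)) => break,
            // Timed out waiting for a new tip: re-evaluate other exit conditions, then loop.
            Err(_elapsed) => {}
        }
    }
}
```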
crates/katana/storage/provider/src/lib.rs (1)
421-432: Well done, sensei! The implementation of StageCheckpointProvider looks solid

The methods correctly delegate to self.provider, ensuring the checkpoint functionality is properly extended.
crates/katana/core/src/backend/storage.rs (1)
82-82: Well done, sensei! Deriving Clone for Blockchain

Adding Clone to the derive list for Blockchain simplifies cloning and aligns with best practices.
crates/katana/storage/provider/src/providers/fork/mod.rs (1)
35-35: Importing StageCheckpointProvider Trait

Ohayo, sensei! The inclusion of the StageCheckpointProvider trait is appropriate here to implement checkpoint functionality for the ForkedProvider.
crates/katana/storage/provider/src/providers/db/mod.rs (4)
17-17: Importing StageCheckpoint Model

Ohayo, sensei! The import of StageCheckpoint from katana_db::models::stage is necessary for managing stage checkpoints within the DbProvider.

42-42: Including StageCheckpointProvider Trait

Ohayo, sensei! Adding the StageCheckpointProvider trait enhances the functionality of DbProvider by enabling checkpoint management.

779-798: Implementing Stage Checkpoint Functionality

Ohayo, sensei! The implementation of StageCheckpointProvider for DbProvider correctly adds checkpoint management. Make sure that error handling and database transactions are properly managed to prevent data inconsistency.

53-53: Deriving Clone for DbProvider

Ohayo, sensei! Deriving Clone for DbProvider allows instances to be cloned. Ensure that cloning the database provider doesn't lead to unintended consequences, such as duplicate connections or conflicts. Please confirm that cloning is safe in this context. Run the following script to search for potential issues with cloning DbProvider:
✅ Verification successful
Let me gather more information about the DbProvider implementation to understand its cloning behavior better.

Let me check one more thing about the database implementation to understand how connections are managed.

Deriving Clone for DbProvider is safe

Ohayo! After reviewing the implementation, deriving Clone for DbProvider is completely safe because:

- The underlying DbEnv uses Arc (atomic reference counting) for its inner state:

```rust
pub struct DbEnv {
    inner: Arc<DbEnvInner>,
}
```

- All database operations use read-only transactions (tx()) or explicit mutable transactions (tx_mut()), which are properly managed and committed.
- The database connections are thread-safe and designed for concurrent access through the Arc wrapper.
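In other words, a clone only bumps a reference count; a minimal illustration of the pattern (type names simplified) is:

```rust
use std::sync::Arc;

// Cloning DbProvider just clones the Arc inside DbEnv, so every clone shares
// the same underlying database environment rather than opening a new one.
#[derive(Clone)]
struct DbEnv {
    inner: Arc<str>, // stand-in for Arc<DbEnvInner>
}

#[derive(Clone)]
struct DbProvider {
    db: DbEnv,
}

fn main() {
    let provider = DbProvider { db: DbEnv { inner: Arc::from("environment") } };
    let cloned = provider.clone();
    // Both handles point at the same shared state.
    assert!(Arc::ptr_eq(&provider.db.inner, &cloned.db.inner));
}
```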
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:

```bash
#!/bin/bash
# Description: Search for usage of cloned `DbProvider` that might cause conflicts.
# Test: Find all instances where `DbProvider` is cloned.
# Expect: Cloned instances are safely used without causing database conflicts.
rg -A 5 'let\s+\w+\s*=\s*\w+\.clone\(\)' crates/katana/storage/
```

Length of output: 6273

Script:

```bash
#!/bin/bash
# Search for DbProvider implementation and its fields
ast-grep --pattern 'struct DbProvider {
  $$$
}'

# Search for any transaction or connection handling in DbProvider
rg -A 10 "impl DbProvider" crates/katana/storage/provider/src/providers/db/
```

Length of output: 1175

Script:

```bash
#!/bin/bash
# Check the database implementation details
rg -A 10 "struct DbEnv" crates/katana/storage/
rg -A 10 "impl.*Database" crates/katana/storage/
```

Length of output: 30076
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
crates/katana/storage/db/src/tables.rs (1)
177-184: Ohayo! Consider enhancing the documentation, sensei!

While the table definition is correct, the documentation could be more descriptive. Consider adding:
- Purpose of stage checkpoints
- Relationship between StageId and StageCheckpoint
- Lifecycle of checkpoint data
Consider updating the documentation like this:

```diff
-    /// Pipeline stages checkpoint
+    /// Stores checkpoint information for pipeline stages.
+    /// Each stage (identified by StageId) maintains a single checkpoint
+    /// that tracks its execution progress and recovery state.
     StageCheckpoints: (StageId) => StageCheckpoint,
```
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (1)
- crates/katana/storage/db/src/tables.rs (5 hunks)
🔇 Additional comments (3)
crates/katana/storage/db/src/tables.rs (3)
12-12: Ohayo! Import looks good, sensei!

The new import follows the existing pattern and is correctly grouped with other model imports.

283-284: Ohayo! Test coverage looks solid, sensei!

The test cases are properly updated to include the new StageCheckpoints table, verifying both its name and type consistently with other tables.

Also applies to: 311-312

51-51: Ohayo! Let's verify the table count, sensei!

The increment from 26 to 27 matches the addition of StageCheckpoints table. Let's verify all table definitions are accounted for.

✅ Verification successful

Ohayo! The table count is perfectly balanced, sensei!

The verification shows exactly 27 table definitions in the codebase, which perfectly matches the updated NUM_TABLES constant value. The increment from 26 to 27 is accurate and consistent with the codebase state.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:

```bash
#!/bin/bash
# Description: Verify that NUM_TABLES matches the actual number of table definitions
# Count the number of table entries in define_tables_enum! macro
echo "Checking table definitions..."
rg -U "\(\w+,\s*TableType::\w+\)" "crates/katana/storage/db/src/tables.rs" | wc -l
```

Length of output: 154
Codecov Report

Attention: Patch coverage is …

Additional details and impacted files

```
@@            Coverage Diff             @@
##             main    #2741      +/-   ##
==========================================
+ Coverage   56.31%   56.35%   +0.04%
==========================================
  Files         423      425       +2
  Lines       54159    54271     +112
==========================================
+ Hits        30499    30585      +86
- Misses      23660    23686      +26
```

☔ View full report in Codecov by Sentry.
The whole pipeline runs until a given block number (the tip) is reached: it executes a series of stages, and the stages are executed over specific block ranges (chunks).
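A minimal sketch of that chunked, checkpointed loop, using simplified stand-in types (the real implementation in crates/katana/pipeline/src/lib.rs is async, provider-backed, and differs in detail):

```rust
use std::collections::HashMap;

// Simplified stand-ins for the pipeline types; a real stage would read
// input.from / input.to and process those blocks.
struct StageExecutionInput {
    from: u64,
    to: u64,
}

trait Stage {
    fn id(&self) -> &'static str;
    fn execute(&mut self, input: &StageExecutionInput) -> Result<(), String>;
}

/// Drives every stage from block 0 up to `tip`, one fixed-size block range
/// (chunk) at a time, recording a checkpoint after each chunk so a restart
/// resumes where it left off instead of from genesis.
fn run_to_tip(
    stages: &mut [Box<dyn Stage>],
    checkpoints: &mut HashMap<&'static str, u64>,
    tip: u64,
    chunk_size: u64,
) -> Result<(), String> {
    let mut from = 0u64;
    while from <= tip {
        // Clamp the chunk so the last range ends exactly at the tip.
        let to = (from + chunk_size.saturating_sub(1)).min(tip);
        for stage in stages.iter_mut() {
            // Skip ranges this stage has already checkpointed past (e.g. after a restart).
            if checkpoints.get(stage.id()).map_or(false, |&done| done >= to) {
                continue;
            }
            stage.execute(&StageExecutionInput { from, to })?;
            // Persist progress; in katana this goes through StageCheckpointProvider.
            checkpoints.insert(stage.id(), to);
        }
        from = to + 1;
    }
    Ok(())
}
```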