Skip to content

Commit

Permalink
feat(katana): fetch forked block data (#2592)
Browse files Browse the repository at this point in the history
Enable fetching block data from the forked chain. current forking feature only covers up to state data only. meaning doing any operations that requires only doing contract executions - estimate fee, tx execution - are allowed.

This is a pretty simple solution as it just forwards the request to the forked network provider but doesn't do any caching of the requested data.
  • Loading branch information
kariy authored Oct 29, 2024
1 parent 1914a4a commit 6f845a7
Show file tree
Hide file tree
Showing 17 changed files with 1,396 additions and 419 deletions.
11 changes: 8 additions & 3 deletions crates/katana/core/src/backend/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use anyhow::{anyhow, bail, Context, Result};
use katana_db::mdbx::DbEnv;
use katana_primitives::block::{
BlockHashOrNumber, BlockIdOrTag, FinalityStatus, SealedBlockWithStatus,
BlockHashOrNumber, BlockIdOrTag, BlockNumber, FinalityStatus, SealedBlockWithStatus,
};
use katana_primitives::chain_spec::ChainSpec;
use katana_primitives::da::L1DataAvailabilityMode;
Expand Down Expand Up @@ -119,7 +119,7 @@ impl Blockchain {
fork_url: Url,
fork_block: Option<BlockHashOrNumber>,
chain: &mut ChainSpec,
) -> Result<Self> {
) -> Result<(Self, BlockNumber)> {
let provider = JsonRpcClient::new(HttpTransport::new(fork_url));
let chain_id = provider.chain_id().await.context("failed to fetch forked network id")?;

Expand Down Expand Up @@ -149,6 +149,8 @@ impl Blockchain {
bail!("forking a pending block is not allowed")
};

let block_num = forked_block.block_number;

chain.id = chain_id.into();
chain.version = ProtocolVersion::parse(&forked_block.starknet_version)?;

Expand All @@ -172,6 +174,8 @@ impl Blockchain {
_ => bail!("qed; block status shouldn't be pending"),
};

// TODO: convert this to block number instead of BlockHashOrNumber so that it is easier to
// check if the requested block is within the supported range or not.
let database = ForkedProvider::new(Arc::new(provider), block_id)?;

// update the genesis block with the forked block's data
Expand All @@ -193,7 +197,8 @@ impl Blockchain {
let block = block.seal_with_hash_and_status(forked_block.block_hash, status);
let state_updates = chain.state_updates();

Self::new_with_genesis_block_and_state(database, block, state_updates)
let blockchain = Self::new_with_genesis_block_and_state(database, block, state_updates)?;
Ok((blockchain, block_num))
}

pub fn provider(&self) -> &BlockchainProvider<Box<dyn Database>> {
Expand Down
88 changes: 53 additions & 35 deletions crates/katana/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use katana_primitives::env::{CfgEnv, FeeTokenAddressses};
use katana_rpc::dev::DevApi;
use katana_rpc::metrics::RpcServerMetrics;
use katana_rpc::saya::SayaApi;
use katana_rpc::starknet::forking::ForkedClient;
use katana_rpc::starknet::StarknetApi;
use katana_rpc::torii::ToriiApi;
use katana_rpc_api::dev::DevApiServer;
Expand Down Expand Up @@ -87,13 +88,14 @@ pub struct Node {
pub metrics_config: Option<MetricsConfig>,
pub sequencing_config: SequencingConfig,
pub messaging_config: Option<MessagingConfig>,
forked_client: Option<ForkedClient>,
}

impl Node {
/// Start the node.
///
/// This method will start all the node process, running them until the node is stopped.
pub async fn launch(self) -> Result<LaunchedNode> {
pub async fn launch(mut self) -> Result<LaunchedNode> {
let chain = self.backend.chain_spec.id;
info!(%chain, "Starting node.");

Expand Down Expand Up @@ -139,7 +141,7 @@ impl Node {
.name("Pipeline")
.spawn(pipeline.into_future());

let node_components = (pool, backend, block_producer, validator);
let node_components = (pool, backend, block_producer, validator, self.forked_client.take());
let rpc = spawn(node_components, self.rpc_config.clone()).await?;

Ok(LaunchedNode { node: self, rpc })
Expand Down Expand Up @@ -178,15 +180,20 @@ pub async fn build(mut config: Config) -> Result<Node> {

// --- build backend

let (blockchain, db) = if let Some(cfg) = config.forking {
let bc = Blockchain::new_from_forked(cfg.url.clone(), cfg.block, &mut config.chain).await?;
(bc, None)
let (blockchain, db, forked_client) = if let Some(cfg) = &config.forking {
let (bc, block_num) =
Blockchain::new_from_forked(cfg.url.clone(), cfg.block, &mut config.chain).await?;

// TODO: it'd bee nice if the client can be shared on both the rpc and forked backend side
let forked_client = ForkedClient::new_http(cfg.url.clone(), block_num);

(bc, None, Some(forked_client))
} else if let Some(db_path) = &config.db.dir {
let db = katana_db::init_db(db_path)?;
(Blockchain::new_with_db(db.clone(), &config.chain)?, Some(db))
(Blockchain::new_with_db(db.clone(), &config.chain)?, Some(db), None)
} else {
let db = katana_db::init_ephemeral_db()?;
(Blockchain::new_with_db(db.clone(), &config.chain)?, Some(db))
(Blockchain::new_with_db(db.clone(), &config.chain)?, Some(db), None)
};

let block_context_generator = BlockContextGenerator::default().into();
Expand Down Expand Up @@ -218,6 +225,7 @@ pub async fn build(mut config: Config) -> Result<Node> {
db,
pool,
backend,
forked_client,
block_producer,
rpc_config: config.rpc,
metrics_config: config.metrics,
Expand All @@ -231,40 +239,50 @@ pub async fn build(mut config: Config) -> Result<Node> {

// Moved from `katana_rpc` crate
pub async fn spawn<EF: ExecutorFactory>(
node_components: (TxPool, Arc<Backend<EF>>, BlockProducer<EF>, TxValidator),
node_components: (
TxPool,
Arc<Backend<EF>>,
BlockProducer<EF>,
TxValidator,
Option<ForkedClient>,
),
config: RpcConfig,
) -> Result<RpcServer> {
let (pool, backend, block_producer, validator) = node_components;
let (pool, backend, block_producer, validator, forked_client) = node_components;

let mut methods = RpcModule::new(());
methods.register_method("health", |_, _| Ok(serde_json::json!({ "health": true })))?;

for api in &config.apis {
match api {
ApiKind::Starknet => {
// TODO: merge these into a single logic.
let server = StarknetApi::new(
backend.clone(),
pool.clone(),
block_producer.clone(),
validator.clone(),
);
methods.merge(StarknetApiServer::into_rpc(server.clone()))?;
methods.merge(StarknetWriteApiServer::into_rpc(server.clone()))?;
methods.merge(StarknetTraceApiServer::into_rpc(server))?;
}
ApiKind::Dev => {
methods.merge(DevApi::new(backend.clone(), block_producer.clone()).into_rpc())?;
}
ApiKind::Torii => {
methods.merge(
ToriiApi::new(backend.clone(), pool.clone(), block_producer.clone()).into_rpc(),
)?;
}
ApiKind::Saya => {
methods.merge(SayaApi::new(backend.clone(), block_producer.clone()).into_rpc())?;
}
}
if config.apis.contains(&ApiKind::Starknet) {
let server = if let Some(client) = forked_client {
StarknetApi::new_forked(
backend.clone(),
pool.clone(),
block_producer.clone(),
validator,
client,
)
} else {
StarknetApi::new(backend.clone(), pool.clone(), block_producer.clone(), validator)
};

methods.merge(StarknetApiServer::into_rpc(server.clone()))?;
methods.merge(StarknetWriteApiServer::into_rpc(server.clone()))?;
methods.merge(StarknetTraceApiServer::into_rpc(server))?;
}

if config.apis.contains(&ApiKind::Dev) {
methods.merge(DevApi::new(backend.clone(), block_producer.clone()).into_rpc())?;
}

if config.apis.contains(&ApiKind::Torii) {
methods.merge(
ToriiApi::new(backend.clone(), pool.clone(), block_producer.clone()).into_rpc(),
)?;
}

if config.apis.contains(&ApiKind::Saya) {
methods.merge(SayaApi::new(backend.clone(), block_producer.clone()).into_rpc())?;
}

let cors = CorsLayer::new()
Expand Down
4 changes: 2 additions & 2 deletions crates/katana/rpc/rpc-api/src/starknet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use katana_rpc_types::block::{
use katana_rpc_types::event::{EventFilterWithPage, EventsPage};
use katana_rpc_types::message::MsgFromL1;
use katana_rpc_types::receipt::TxReceiptWithBlockInfo;
use katana_rpc_types::state_update::StateUpdate;
use katana_rpc_types::state_update::MaybePendingStateUpdate;
use katana_rpc_types::transaction::{
BroadcastedDeclareTx, BroadcastedDeployAccountTx, BroadcastedInvokeTx, BroadcastedTx,
DeclareTxResult, DeployAccountTxResult, InvokeTxResult, Tx,
Expand Down Expand Up @@ -61,7 +61,7 @@ pub trait StarknetApi {

/// Get the information about the result of executing the requested block.
#[method(name = "getStateUpdate")]
async fn get_state_update(&self, block_id: BlockIdOrTag) -> RpcResult<StateUpdate>;
async fn get_state_update(&self, block_id: BlockIdOrTag) -> RpcResult<MaybePendingStateUpdate>;

/// Get the value of the storage at the given address and key
#[method(name = "getStorageAt")]
Expand Down
6 changes: 4 additions & 2 deletions crates/katana/rpc/rpc-types-builder/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,19 @@ where
}

pub fn build_with_receipts(self) -> ProviderResult<Option<BlockWithReceipts>> {
let Some(block) = BlockProvider::block(&self.provider, self.block_id)? else {
let Some(hash) = BlockHashProvider::block_hash_by_id(&self.provider, self.block_id)? else {
return Ok(None);
};

let block = BlockProvider::block(&self.provider, self.block_id)?
.expect("should exist if block exists");
let finality_status = BlockStatusProvider::block_status(&self.provider, self.block_id)?
.expect("should exist if block exists");
let receipts = ReceiptProvider::receipts_by_block(&self.provider, self.block_id)?
.expect("should exist if block exists");

let receipts_with_txs = block.body.into_iter().zip(receipts);

Ok(Some(BlockWithReceipts::new(block.header, finality_status, receipts_with_txs)))
Ok(Some(BlockWithReceipts::new(hash, block.header, finality_status, receipts_with_txs)))
}
}
42 changes: 41 additions & 1 deletion crates/katana/rpc/rpc-types/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,19 @@ pub enum MaybePendingBlockWithTxs {
Block(BlockWithTxs),
}

impl From<starknet::core::types::MaybePendingBlockWithTxs> for MaybePendingBlockWithTxs {
fn from(value: starknet::core::types::MaybePendingBlockWithTxs) -> Self {
match value {
starknet::core::types::MaybePendingBlockWithTxs::PendingBlock(block) => {
MaybePendingBlockWithTxs::Pending(PendingBlockWithTxs(block))
}
starknet::core::types::MaybePendingBlockWithTxs::Block(block) => {
MaybePendingBlockWithTxs::Block(BlockWithTxs(block))
}
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct BlockWithTxHashes(starknet::core::types::BlockWithTxHashes);
Expand Down Expand Up @@ -181,6 +194,19 @@ pub enum MaybePendingBlockWithTxHashes {
Block(BlockWithTxHashes),
}

impl From<starknet::core::types::MaybePendingBlockWithTxHashes> for MaybePendingBlockWithTxHashes {
fn from(value: starknet::core::types::MaybePendingBlockWithTxHashes) -> Self {
match value {
starknet::core::types::MaybePendingBlockWithTxHashes::PendingBlock(block) => {
MaybePendingBlockWithTxHashes::Pending(PendingBlockWithTxHashes(block))
}
starknet::core::types::MaybePendingBlockWithTxHashes::Block(block) => {
MaybePendingBlockWithTxHashes::Block(BlockWithTxHashes(block))
}
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct BlockHashAndNumber(starknet::core::types::BlockHashAndNumber);
Expand All @@ -203,6 +229,7 @@ pub struct BlockWithReceipts(starknet::core::types::BlockWithReceipts);

impl BlockWithReceipts {
pub fn new(
hash: BlockHash,
header: Header,
finality_status: FinalityStatus,
receipts: impl Iterator<Item = (TxWithHash, Receipt)>,
Expand Down Expand Up @@ -230,7 +257,7 @@ impl BlockWithReceipts {
FinalityStatus::AcceptedOnL1 => BlockStatus::AcceptedOnL1,
FinalityStatus::AcceptedOnL2 => BlockStatus::AcceptedOnL2,
},
block_hash: header.parent_hash,
block_hash: hash,
parent_hash: header.parent_hash,
block_number: header.number,
new_root: header.state_root,
Expand Down Expand Up @@ -297,3 +324,16 @@ pub enum MaybePendingBlockWithReceipts {
Pending(PendingBlockWithReceipts),
Block(BlockWithReceipts),
}

impl From<starknet::core::types::MaybePendingBlockWithReceipts> for MaybePendingBlockWithReceipts {
fn from(value: starknet::core::types::MaybePendingBlockWithReceipts) -> Self {
match value {
starknet::core::types::MaybePendingBlockWithReceipts::PendingBlock(block) => {
MaybePendingBlockWithReceipts::Pending(PendingBlockWithReceipts(block))
}
starknet::core::types::MaybePendingBlockWithReceipts::Block(block) => {
MaybePendingBlockWithReceipts::Block(BlockWithReceipts(block))
}
}
}
}
67 changes: 66 additions & 1 deletion crates/katana/rpc/rpc-types/src/error/starknet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use katana_primitives::event::ContinuationTokenError;
use katana_provider::error::ProviderError;
use serde::Serialize;
use serde_json::Value;
use starknet::core::types::StarknetError as StarknetRsError;
use starknet::providers::ProviderError as StarknetRsProviderError;

/// Possible list of errors that can be returned by the Starknet API according to the spec: <https://github.com/starkware-libs/starknet-specs>.
#[derive(Debug, thiserror::Error, Clone, Serialize)]
Expand Down Expand Up @@ -40,7 +42,7 @@ pub enum StarknetApiError {
#[error("Transaction execution error")]
TransactionExecutionError {
/// The index of the first transaction failing in a sequence of given transactions.
transaction_index: usize,
transaction_index: u64,
/// The revert error with the execution trace up to the point of failure.
execution_error: String,
},
Expand Down Expand Up @@ -195,6 +197,69 @@ impl From<Box<InvalidTransactionError>> for StarknetApiError {
}
}

// ---- Forking client error conversion

impl From<StarknetRsError> for StarknetApiError {
fn from(value: StarknetRsError) -> Self {
match value {
StarknetRsError::FailedToReceiveTransaction => Self::FailedToReceiveTxn,
StarknetRsError::NoBlocks => Self::NoBlocks,
StarknetRsError::NonAccount => Self::NonAccount,
StarknetRsError::BlockNotFound => Self::BlockNotFound,
StarknetRsError::PageSizeTooBig => Self::PageSizeTooBig,
StarknetRsError::DuplicateTx => Self::DuplicateTransaction,
StarknetRsError::ContractNotFound => Self::ContractNotFound,
StarknetRsError::CompilationFailed => Self::CompilationFailed,
StarknetRsError::ClassHashNotFound => Self::ClassHashNotFound,
StarknetRsError::InsufficientMaxFee => Self::InsufficientMaxFee,
StarknetRsError::TooManyKeysInFilter => Self::TooManyKeysInFilter,
StarknetRsError::InvalidTransactionIndex => Self::InvalidTxnIndex,
StarknetRsError::TransactionHashNotFound => Self::TxnHashNotFound,
StarknetRsError::ClassAlreadyDeclared => Self::ClassAlreadyDeclared,
StarknetRsError::UnexpectedError(reason) => Self::UnexpectedError { reason },
StarknetRsError::InvalidContinuationToken => Self::InvalidContinuationToken,
StarknetRsError::UnsupportedTxVersion => Self::UnsupportedTransactionVersion,
StarknetRsError::CompiledClassHashMismatch => Self::CompiledClassHashMismatch,
StarknetRsError::InsufficientAccountBalance => Self::InsufficientAccountBalance,
StarknetRsError::ValidationFailure(reason) => Self::ValidationFailure { reason },
StarknetRsError::ContractClassSizeIsTooLarge => Self::ContractClassSizeIsTooLarge,
StarknetRsError::ContractError(data) => {
Self::ContractError { revert_error: data.revert_error }
}
StarknetRsError::TransactionExecutionError(data) => Self::TransactionExecutionError {
execution_error: data.execution_error,
transaction_index: data.transaction_index,
},
StarknetRsError::InvalidTransactionNonce => {
Self::InvalidTransactionNonce { reason: "".to_string() }
}
StarknetRsError::UnsupportedContractClassVersion => {
Self::UnsupportedContractClassVersion
}
StarknetRsError::NoTraceAvailable(_) => {
Self::UnexpectedError { reason: "No trace available".to_string() }
}
}
}
}

impl From<StarknetRsProviderError> for StarknetApiError {
fn from(value: StarknetRsProviderError) -> Self {
match value {
StarknetRsProviderError::StarknetError(error) => error.into(),
StarknetRsProviderError::Other(error) => {
Self::UnexpectedError { reason: error.to_string() }
}
StarknetRsProviderError::ArrayLengthMismatch { .. } => Self::UnexpectedError {
reason: "Forking client: Array length mismatch".to_string(),
},
StarknetRsProviderError::RateLimited { .. } => {
Self::UnexpectedError { reason: "Forking client: Rate limited".to_string() }
}
}
}
}

#[cfg(test)]
mod tests {
use rstest::rstest;
Expand Down
Loading

0 comments on commit 6f845a7

Please sign in to comment.