Skip to content

Commit

Permalink
feat(katana): implement struct for in memory provider
Browse files Browse the repository at this point in the history
  • Loading branch information
ybensacq committed Sep 19, 2024
1 parent ca9277b commit fe2c75b
Show file tree
Hide file tree
Showing 15 changed files with 170 additions and 128 deletions.
6 changes: 3 additions & 3 deletions crates/katana/core/src/backend/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use katana_provider::providers::db::DbProvider;
use katana_provider::traits::block::{BlockProvider, BlockWriter};
use katana_provider::traits::contract::ContractClassWriter;
use katana_provider::traits::env::BlockEnvProvider;
use katana_provider::traits::messaging::MessagingProvider;
use katana_provider::traits::messaging::MessagingCheckpointProvider;
use katana_provider::traits::state::{StateFactoryProvider, StateRootProvider, StateWriter};
use katana_provider::traits::state_update::StateUpdateProvider;
use katana_provider::traits::transaction::{
Expand All @@ -30,7 +30,7 @@ pub trait Database:
+ ContractClassWriter
+ StateFactoryProvider
+ BlockEnvProvider
+ MessagingProvider
+ MessagingCheckpointProvider
+ 'static
+ Send
+ Sync
Expand All @@ -52,7 +52,7 @@ impl<T> Database for T where
+ ContractClassWriter
+ StateFactoryProvider
+ BlockEnvProvider
+ MessagingProvider
+ MessagingCheckpointProvider
+ 'static
+ Send
+ Sync
Expand Down
34 changes: 12 additions & 22 deletions crates/katana/core/src/service/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use crate::backend::Backend;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::stream::{Stream, StreamExt};
use futures::FutureExt;
Expand All @@ -13,22 +14,20 @@ use katana_pool::validation::stateful::TxValidator;
use katana_primitives::block::{BlockHashOrNumber, ExecutableBlock, PartialHeader};
use katana_primitives::receipt::Receipt;
use katana_primitives::trace::TxExecInfo;
use katana_primitives::transaction::{ExecutableTxWithHash, TxHash, TxWithHash, Tx};
use katana_primitives::transaction::{ExecutableTxWithHash, Tx, TxHash, TxWithHash};
use katana_primitives::version::CURRENT_STARKNET_VERSION;
use katana_primitives::FieldElement;
use katana_primitives::Felt;
use katana_provider::error::ProviderError;
use katana_provider::traits::messaging::MessagingProvider;
use katana_provider::traits::block::{BlockHashProvider, BlockNumberProvider};
use katana_provider::traits::env::BlockEnvProvider;
use katana_provider::traits::messaging::MessagingCheckpointProvider;
use katana_provider::traits::state::StateFactoryProvider;
use katana_tasks::{BlockingTaskPool, BlockingTaskResult};
use parking_lot::lock_api::RawMutex;
use parking_lot::{Mutex, RwLock};
use tokio::time::{interval_at, Instant, Interval};
use tracing::{error, info, trace, warn};

use crate::backend::Backend;

pub(crate) const LOG_TARGET: &str = "miner";

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -322,24 +321,15 @@ impl<EF: ExecutorFactory> IntervalBlockProducer<EF> {
// get stored nonce from message hash
let message_hash_bytes = l1_tx.message_hash;
let message_hash_bytes: [u8; 32] = *message_hash_bytes;
match FieldElement::from_bytes_be(&message_hash_bytes) {
Ok(message_hash) => {
match provider.get_nonce_from_message_hash(message_hash) {
Ok(Some(nonce)) => provider.set_gather_message_nonce(nonce),
Ok(None) => {
Ok(())
},
Err(_e) => {
Ok(())
}
}
},
Err(_e) => {
Ok(())
}

let message_hash = Felt::from_bytes_be(&message_hash_bytes);
match provider.get_nonce_from_message_hash(message_hash) {
Ok(Some(nonce)) => provider.set_gather_message_nonce(nonce),
Ok(None) => Ok(()),
Err(_e) => Ok(()),
}
},
_ => Ok({})
}
_ => Ok(()),
};

match res {
Expand Down
24 changes: 15 additions & 9 deletions crates/katana/core/src/service/messaging/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,20 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
Ok(Some(block)) => block,
Ok(None) => 0,
Err(_) => {
anyhow::bail!("Messaging could not be initialized.\nVerify that the messaging target node \
(anvil or other katana) is running.\n")
anyhow::bail!(
"Messaging could not be initialized.\nVerify that the messaging target node \
(anvil or other katana) is running.\n"
)
}
};
let send_from_block = match provider.get_send_from_block() {
Ok(Some(block)) => block,
Ok(None) => 0,
Err(_) => {
anyhow::bail!("Messaging could not be initialized.\nVerify that the messaging target node \
(anvil or other katana) is running.\n")
anyhow::bail!(
"Messaging could not be initialized.\nVerify that the messaging target node \
(anvil or other katana) is running.\n"
)
}
};

Expand All @@ -78,8 +82,10 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
let messenger = match MessengerMode::from_config(config).await {
Ok(m) => Arc::new(m),
Err(_) => {
anyhow::bail!("Messaging could not be initialized.\nVerify that the messaging target node \
(anvil or other katana) is running.\n")
anyhow::bail!(
"Messaging could not be initialized.\nVerify that the messaging target node \
(anvil or other katana) is running.\n"
)
}
};

Expand All @@ -105,7 +111,6 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
max_block: u64,
chunk_size: u64,
) -> MessengerResult<(u64, usize)> {

match messenger.as_ref() {
MessengerMode::Ethereum(inner) => {
let (block_num, txs) =
Expand All @@ -126,8 +131,9 @@ impl<EF: ExecutorFactory> MessagingService<EF> {

#[cfg(feature = "starknet-messaging")]
MessengerMode::Starknet(inner) => {
let (block_num, txs) =
inner.gather_messages(from_block, max_block, chunk_size, backend.chain_id).await?;
let (block_num, txs) = inner
.gather_messages(from_block, max_block, chunk_size, backend.chain_id)
.await?;
let txs_count = txs.len();

txs.into_iter().for_each(|tx| {
Expand Down
10 changes: 7 additions & 3 deletions crates/katana/core/src/service/messaging/starknet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ impl Messenger for StarknetMessaging {
);
block_events.iter().for_each(|e| {
if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) {
let last_processed_nonce = self.provider.get_gather_message_nonce().unwrap_or(0.into());
let last_processed_nonce =
self.provider.get_gather_message_nonce().unwrap_or(0.into());
if tx.nonce > last_processed_nonce {
l1_handler_txs.push(tx)
}
Expand Down Expand Up @@ -239,12 +240,15 @@ impl Messenger for StarknetMessaging {
};
}

self.send_hashes(hashes.clone()).await?;
for (index, hash) in hashes.iter().enumerate() {
let stored_index = self.provider.get_send_from_index();
self.send_hashes(std::slice::from_ref(hash)).await?;
self.provider.set_send_from_index(*hash, index as u64).await?;
self.provider.set_send_from_index((index as u64) + 1).await?;
}

// reset the index
self.provider.set_send_from_index(0).await?;

Ok(hashes)
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/katana/primitives/src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub type StorageValue = Felt;
pub type Nonce = Felt;

/// Represents the type for a message hash.
pub type MessageHash = FieldElement;
pub type MessageHash = Felt;

/// Represents a contract address.
#[derive(Default, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Hash, Debug, Deref)]
Expand Down
4 changes: 2 additions & 2 deletions crates/katana/primitives/src/genesis/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ pub struct GenesisJson {
pub accounts: HashMap<ContractAddress, GenesisAccountJson>,
#[serde(default)]
pub contracts: HashMap<ContractAddress, GenesisContractJson>,
pub gather_from_block: BlockNumber,
pub settlement_block_number: BlockNumber,
}

impl GenesisJson {
Expand Down Expand Up @@ -612,7 +612,7 @@ impl TryFrom<GenesisJson> for Genesis {
gas_prices: value.gas_prices,
state_root: value.state_root,
parent_hash: value.parent_hash,
gather_from_block: value.gather_from_block,
settlement_block_number: value.settlement_block_number,
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/katana/primitives/src/genesis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub struct Genesis {
/// The genesis contract allocations.
pub allocations: BTreeMap<ContractAddress, GenesisAllocation>,
/// The block on settlement chain from where Katana will start fetching messages.
pub gather_from_block: BlockNumber,
pub settlement_block_number: BlockNumber,
}

impl Genesis {
Expand Down Expand Up @@ -311,7 +311,7 @@ impl Default for Genesis {
allocations: BTreeMap::new(),
fee_token,
universal_deployer: Some(universal_deployer),
gather_from_block: 0,
settlement_block_number: 0,
}
}
}
Expand Down
37 changes: 37 additions & 0 deletions crates/katana/storage/db/src/models/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,43 @@ impl Decompress for ContractStorageEntry {
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MessagingCheckpointId {
SendBlock,
SendIndex,
GatherBlock,
GatherNonce,
}


impl Encode for MessagingCheckpointId {
type Encoded = [u8; 1];
fn encode(self) -> Self::Encoded {
let mut buf = [0u8; 1];
buf[0] = match self {
MessagingCheckpointId::SendBlock => 1,
MessagingCheckpointId::SendIndex => 2,
MessagingCheckpointId::GatherBlock => 3,
MessagingCheckpointId::GatherNonce => 4,
};
buf
}
}

impl Decode for MessagingCheckpointId {
fn decode<B: AsRef<[u8]>>(bytes: B) -> Result<Self, CodecError> {
let bytes = bytes.as_ref();
match bytes[0] {
1 => Ok(MessagingCheckpointId::SendBlock),
2 => Ok(MessagingCheckpointId::SendIndex),
3 => Ok(MessagingCheckpointId::GatherBlock),
4 => Ok(MessagingCheckpointId::GatherNonce),
_ => Err(CodecError::Decode("Invalid MessagingCheckpointId".into())),
}
}
}


#[cfg(test)]
mod tests {
use starknet::macros::felt;
Expand Down
29 changes: 15 additions & 14 deletions crates/katana/storage/db/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::codecs::{Compress, Decode, Decompress, Encode};
use crate::models::block::StoredBlockBodyIndices;
use crate::models::contract::{ContractClassChange, ContractInfoChangeList, ContractNonceChange};
use crate::models::list::BlockList;
use crate::models::storage::{ContractStorageEntry, ContractStorageKey, StorageEntry};
use crate::models::storage::{ContractStorageEntry, ContractStorageKey, MessagingCheckpointId, StorageEntry};

pub trait Key: Encode + Decode + Clone + std::fmt::Debug {}
pub trait Value: Compress + Decompress + std::fmt::Debug {}
Expand Down Expand Up @@ -168,10 +168,10 @@ define_tables_enum! {[
(ClassChangeHistory, TableType::DupSort),
(StorageChangeHistory, TableType::DupSort),
(StorageChangeSet, TableType::Table),
(MessagingInfo, TableType::Table),
(MessagingNonceInfo, TableType::Table),
(MessagingCheckpointBlock, TableType::Table),
(MessagingCheckpointNonce, TableType::Table),
(MessagingMessageNonceMapping, TableType::Table),
(MessagingIndexInfo, TableType::Table),
(MessagingCheckpointIndex, TableType::Table)
]}

tables! {
Expand Down Expand Up @@ -230,16 +230,16 @@ tables! {
StorageChangeHistory: (BlockNumber, ContractStorageKey) => ContractStorageEntry,

/// Stores the block number related to messaging service
MessagingInfo: (u64) => BlockNumber,
MessagingCheckpointBlock: (MessagingCheckpointId) => BlockNumber,

/// Stores the nonce related to messaging service
MessagingNonceInfo: (u64) => Nonce,
MessagingCheckpointNonce: (MessagingCheckpointId) => Nonce,

/// Map a message hash to a message nonce
MessagingMessageNonceMapping: (TxHash) => Nonce,

/// Stores the index of the messaging service
MessagingIndexInfo: (u64) => u64,
MessagingCheckpointIndex: (MessagingCheckpointId) => u64

}

Expand Down Expand Up @@ -274,10 +274,10 @@ mod tests {
assert_eq!(Tables::ALL[20].name(), ClassChangeHistory::NAME);
assert_eq!(Tables::ALL[21].name(), StorageChangeHistory::NAME);
assert_eq!(Tables::ALL[22].name(), StorageChangeSet::NAME);
assert_eq!(Tables::ALL[23].name(), MessagingInfo::NAME);
assert_eq!(Tables::ALL[24].name(), MessagingNonceInfo::NAME);
assert_eq!(Tables::ALL[23].name(), MessagingCheckpointBlock::NAME);
assert_eq!(Tables::ALL[24].name(), MessagingCheckpointNonce::NAME);
assert_eq!(Tables::ALL[25].name(), MessagingMessageNonceMapping::NAME);
assert_eq!(Tables::ALL[26].name(), MessagingMessageNonceMapping::NAME);
assert_eq!(Tables::ALL[26].name(), MessagingCheckpointIndex::NAME);

assert_eq!(Tables::Headers.table_type(), TableType::Table);
assert_eq!(Tables::BlockHashes.table_type(), TableType::Table);
Expand All @@ -302,10 +302,10 @@ mod tests {
assert_eq!(Tables::ClassChangeHistory.table_type(), TableType::DupSort);
assert_eq!(Tables::StorageChangeHistory.table_type(), TableType::DupSort);
assert_eq!(Tables::StorageChangeSet.table_type(), TableType::Table);
assert_eq!(Tables::MessagingInfo.table_type(), TableType::Table);
assert_eq!(Tables::MessagingNonceInfo.table_type(), TableType::Table);
assert_eq!(Tables::MessagingMessageNonMapping.table_type(), TableType::Table);
assert_eq!(Tables::MessagingIndexInfo.table_type(), TableType::Table);
assert_eq!(Tables::MessagingCheckpointBlock.table_type(), TableType::Table);
assert_eq!(Tables::MessagingCheckpointNonce.table_type(), TableType::Table);
assert_eq!(Tables::MessagingMessageNonceMapping.table_type(), TableType::Table);
assert_eq!(Tables::MessagingCheckpointIndex.table_type(), TableType::Table);
}

use katana_primitives::block::{BlockHash, BlockNumber, FinalityStatus, Header};
Expand All @@ -325,6 +325,7 @@ mod tests {
};
use crate::models::list::BlockList;
use crate::models::storage::{ContractStorageEntry, ContractStorageKey, StorageEntry};
use crate::tables::Tables::{MessagingCheckpointBlock, MessagingCheckpointNonce};

macro_rules! assert_key_encode_decode {
{ $( ($name:ty, $key:expr) ),* } => {
Expand Down
10 changes: 5 additions & 5 deletions crates/katana/storage/provider/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use katana_primitives::Felt;
use traits::block::{BlockIdReader, BlockStatusProvider, BlockWriter};
use traits::contract::{ContractClassProvider, ContractClassWriter};
use traits::env::BlockEnvProvider;
use traits::messaging::MessagingProvider;
use traits::messaging::MessagingCheckpointProvider;
use traits::state::{StateRootProvider, StateWriter};
use traits::transaction::{TransactionStatusProvider, TransactionTraceProvider};

Expand Down Expand Up @@ -382,9 +382,9 @@ where
}
}

impl<Db> MessagingProvider for BlockchainProvider<Db>
impl<Db> MessagingCheckpointProvider for BlockchainProvider<Db>
where
Db: MessagingProvider,
Db: MessagingCheckpointProvider,
{
fn get_send_from_block(&self) -> ProviderResult<Option<BlockNumber>> {
self.provider.get_send_from_block()
Expand Down Expand Up @@ -422,7 +422,7 @@ where
self.provider.get_send_from_index()
}

fn set_send_from_index(&self, _send_from_index: u64) -> ProviderResult<()> {
self.provider.get_send_from_index(send_from_index)
fn set_send_from_index(&self, send_from_index: u64) -> ProviderResult<()> {
self.provider.set_send_from_index(send_from_index)
}
}
Loading

0 comments on commit fe2c75b

Please sign in to comment.