Skip to content

Commit

Permalink
fix: change KatanaHooker to AsyncRwLock instead of Arc + add hook for…
Browse files Browse the repository at this point in the history
… incoming messages
  • Loading branch information
glihm committed Nov 27, 2023
1 parent 3d8a935 commit 491a380
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 47 deletions.
46 changes: 30 additions & 16 deletions crates/katana/core/src/hooker.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,34 @@
//! This module contains a hooker trait, that is added to katana in order to
//! allow external code to react at some precise moment of katana processing.
//!
use std::sync::Arc;

use async_trait::async_trait;
use starknet::core::types::BroadcastedInvokeTransaction;
use starknet::accounts::Call;
use starknet::core::types::{BroadcastedInvokeTransaction, FieldElement};

use crate::sequencer::KatanaSequencer;

#[async_trait]
pub trait KatanaHooker {
/// Sets a reference to the underlying sequencer.
fn set_sequencer(&mut self, sequencer: Arc<KatanaSequencer>);

/// Runs code right before a message from the L1 is converted
/// into a `L1HandlerTransaction`. This hook is usefull to
/// apply conditions on the message being captured.
///
/// # Arguments
///
/// * `from` - The contract on L2 sending the message.
/// * `to` - The recipient contract on the appchain.
/// * `selector` - The l1_handler of the appchain contract to execute.
async fn verify_message_to_appchain(
&self,
from: FieldElement,
to: FieldElement,
selector: FieldElement,
) -> bool;

/// Runs code right before an invoke transaction
/// is being added to the pool.
/// Returns true if the transaction should be included
Expand All @@ -15,10 +37,8 @@ pub trait KatanaHooker {
/// # Arguments
///
/// * `transaction` - The invoke transaction to be verified.
async fn verify_invoke_tx_before_pool(
&self,
transaction: BroadcastedInvokeTransaction,
) -> bool;
async fn verify_invoke_tx_before_pool(&self, transaction: BroadcastedInvokeTransaction)
-> bool;

/// Runs code right before a message to starknet
/// is being sent via a direct transaction.
Expand All @@ -29,20 +49,14 @@ pub trait KatanaHooker {
///
/// * `call` - The `Call` to inspect, built from the
/// message.
async fn verify_message_to_starknet_before_tx(
&self,
call: Call,
) -> bool;
async fn verify_message_to_starknet_before_tx(&self, call: Call) -> bool;

/// Runs when Solis attempts to execute an order on Starknet,
/// but it fails.
///
/// # Arguments
///
/// * `call` - The `Call` of the transaction that has failed.
/// Usually the same as `verify_message_to_starknet_before_tx`.
async fn react_on_starknet_tx_failed(
&self,
call: Call,
);
/// * `call` - The `Call` of the transaction that has failed. Usually the same as
/// `verify_message_to_starknet_before_tx`.
async fn react_on_starknet_tx_failed(&self, call: Call);
}
21 changes: 16 additions & 5 deletions crates/katana/core/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@ use starknet::core::types::{
use starknet_api::core::{ChainId, ClassHash, ContractAddress, Nonce};
use starknet_api::hash::StarkFelt;
use starknet_api::state::StorageKey;
use tokio::sync::RwLock as AsyncRwLock;

use crate::backend::config::StarknetConfig;
use crate::backend::contract::StarknetContract;
use crate::backend::storage::block::{ExecutedBlock, PartialBlock, PartialHeader};
use crate::backend::storage::transaction::{
DeclareTransaction, DeployAccountTransaction, InvokeTransaction, KnownTransaction,
PendingTransaction, Transaction,
L1HandlerTransaction, PendingTransaction, Transaction,
};
use crate::backend::{Backend, ExternalFunctionCall};
use crate::db::{AsStateRefDb, StateExtRef, StateRefDb};
use crate::execution::{MaybeInvalidExecutedTransaction, PendingState};
use crate::hooker::KatanaHooker;
use crate::pool::TransactionPool;
use crate::sequencer_error::SequencerError;
use crate::service::block_producer::{BlockProducer, BlockProducerMode};
Expand All @@ -33,7 +35,6 @@ use crate::service::messaging::MessagingConfig;
use crate::service::messaging::MessagingService;
use crate::service::{NodeService, TransactionMiner};
use crate::utils::event::{ContinuationToken, ContinuationTokenError};
use crate::hooker::KatanaHooker;
type SequencerResult<T> = Result<T, SequencerError>;

#[derive(Debug, Default)]
Expand All @@ -49,11 +50,15 @@ pub struct KatanaSequencer {
pub pool: Arc<TransactionPool>,
pub backend: Arc<Backend>,
pub block_producer: BlockProducer,
pub hooker: Arc<dyn KatanaHooker + Send + Sync>,
pub hooker: Arc<AsyncRwLock<dyn KatanaHooker + Send + Sync>>,
}

impl KatanaSequencer {
pub async fn new(config: SequencerConfig, starknet_config: StarknetConfig, hooker: Arc<dyn KatanaHooker + Send + Sync>) -> Self {
pub async fn new(
config: SequencerConfig,
starknet_config: StarknetConfig,
hooker: Arc<AsyncRwLock<dyn KatanaHooker + Send + Sync>>,
) -> Self {
let backend = Arc::new(Backend::new(starknet_config).await);

let pool = Arc::new(TransactionPool::new());
Expand All @@ -73,7 +78,9 @@ impl KatanaSequencer {

#[cfg(feature = "messaging")]
let messaging = if let Some(config) = config.messaging.clone() {
MessagingService::new(config, Arc::clone(&pool), Arc::clone(&backend), Arc::clone(&hooker)).await.ok()
MessagingService::new(config, Arc::clone(&pool), Arc::clone(&backend), hooker.clone())
.await
.ok()
} else {
None
};
Expand Down Expand Up @@ -165,6 +172,10 @@ impl KatanaSequencer {
self.pool.add_transaction(Transaction::Invoke(transaction))
}

pub fn add_l1_handler_transaction(&self, transaction: L1HandlerTransaction) {
self.pool.add_transaction(Transaction::L1Handler(transaction))
}

pub async fn estimate_fee(
&self,
transactions: Vec<Transaction>,
Expand Down
5 changes: 3 additions & 2 deletions crates/katana/core/src/service/messaging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@ use async_trait::async_trait;
use ethereum::EthereumMessaging;
use ethers::providers::ProviderError as EthereumProviderError;
use serde::Deserialize;
use tokio::sync::RwLock as AsyncRwLock;
use tracing::{error, info};
use crate::hooker::KatanaHooker;

pub use self::service::{MessagingOutcome, MessagingService};
#[cfg(feature = "starknet-messaging")]
use self::starknet::StarknetMessaging;
use crate::hooker::KatanaHooker;

pub(crate) const LOG_TARGET: &str = "messaging";
pub(crate) const CONFIG_CHAIN_ETHEREUM: &str = "ethereum";
Expand Down Expand Up @@ -172,7 +173,7 @@ pub enum MessengerMode {
impl MessengerMode {
pub async fn from_config(
config: MessagingConfig,
hooker: Arc<dyn KatanaHooker + Send + Sync>
hooker: Arc<AsyncRwLock<dyn KatanaHooker + Send + Sync>>,
) -> MessengerResult<Self> {
match config.chain.as_str() {
CONFIG_CHAIN_ETHEREUM => match EthereumMessaging::new(config).await {
Expand Down
5 changes: 3 additions & 2 deletions crates/katana/core/src/service/messaging/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ use std::time::Duration;

use ::starknet::core::types::{FieldElement, MsgToL1};
use futures::{Future, FutureExt, Stream};
use tokio::sync::RwLock as AsyncRwLock;
use tokio::time::{interval_at, Instant, Interval};
use tracing::{error, info};

use super::{MessagingConfig, Messenger, MessengerMode, MessengerResult, LOG_TARGET};
use crate::backend::storage::transaction::{L1HandlerTransaction, Transaction};
use crate::backend::Backend;
use crate::pool::TransactionPool;
use crate::hooker::KatanaHooker;
use crate::pool::TransactionPool;

type MessagingFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
type MessageGatheringFuture = MessagingFuture<MessengerResult<(u64, usize)>>;
Expand Down Expand Up @@ -42,7 +43,7 @@ impl MessagingService {
config: MessagingConfig,
pool: Arc<TransactionPool>,
backend: Arc<Backend>,
hooker: Arc<dyn KatanaHooker + Send + Sync>,
hooker: Arc<AsyncRwLock<dyn KatanaHooker + Send + Sync>>,
) -> anyhow::Result<Self> {
let gather_from_block = config.from_block;
let interval = interval_from_seconds(config.interval);
Expand Down
74 changes: 53 additions & 21 deletions crates/katana/core/src/service/messaging/starknet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ use starknet_api::stark_felt;
use starknet_api::transaction::{
Calldata, L1HandlerTransaction as ApiL1HandlerTransaction, TransactionHash, TransactionVersion,
};
use tokio::sync::RwLock as AsyncRwLock;
use tracing::{debug, error, trace, warn};
use url::Url;

use super::{Error, MessagingConfig, Messenger, MessengerResult, LOG_TARGET};
use crate::backend::storage::transaction::L1HandlerTransaction;
use crate::utils::transaction::compute_l1_handler_transaction_hash_felts;
use crate::hooker::KatanaHooker;
use crate::utils::transaction::compute_l1_handler_transaction_hash_felts;

/// As messaging in starknet is only possible with EthAddress in the `to_address`
/// field, we have to set magic value to understand what the user want to do.
Expand All @@ -40,11 +41,14 @@ pub struct StarknetMessaging {
wallet: LocalWallet,
sender_account_address: FieldElement,
messaging_contract_address: FieldElement,
hooker: Arc<dyn KatanaHooker + Send + Sync>,
hooker: Arc<AsyncRwLock<dyn KatanaHooker + Send + Sync>>,
}

impl StarknetMessaging {
pub async fn new(config: MessagingConfig, hooker: Arc<dyn KatanaHooker + Send + Sync>) -> Result<StarknetMessaging> {
pub async fn new(
config: MessagingConfig,
hooker: Arc<AsyncRwLock<dyn KatanaHooker + Send + Sync>>,
) -> Result<StarknetMessaging> {
let provider = AnyProvider::JsonRpcHttp(JsonRpcClient::new(HttpTransport::new(
Url::parse(&config.rpc_url)?,
)));
Expand Down Expand Up @@ -198,25 +202,44 @@ impl Messenger for StarknetMessaging {

let mut l1_handler_txs: Vec<L1HandlerTransaction> = vec![];

self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block))
for (block_number, block_events) in self
.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block))
.await
.map_err(|_| Error::SendError)
.unwrap()
.iter()
.for_each(|(block_number, block_events)| {
debug!(
target: LOG_TARGET,
"Converting events of block {} into L1HandlerTx ({} events)",
block_number,
block_events.len(),
);

block_events.iter().for_each(|e| {
if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) {
{
debug!(
target: LOG_TARGET,
"Converting events of block {} into L1HandlerTx ({} events)",
block_number,
block_events.len(),
);

for e in block_events.iter() {
if let Ok((tx, info)) = l1_handler_tx_from_event(e, chain_id) {
let is_message_accepted = self
.hooker
.read()
.await
.verify_message_to_appchain(
info.from_address,
info.to_address,
info.selector,
)
.await;

if is_message_accepted {
l1_handler_txs.push(tx)
} else {
warn!(target: LOG_TARGET, "Message to appchain rejected by hooker: from:{:?} to:{:?} selector:{:?}",
info.from_address,
info.to_address,
info.selector);
}
})
});
}
}
}

Ok((to_block, l1_handler_txs))
}
Expand All @@ -230,7 +253,7 @@ impl Messenger for StarknetMessaging {

for call in &calls {
// 1. Verify before TX.
if !self.hooker.verify_message_to_starknet_before_tx(call.clone()).await {
if !self.hooker.read().await.verify_message_to_starknet_before_tx(call.clone()).await {
continue;
}

Expand All @@ -240,7 +263,7 @@ impl Messenger for StarknetMessaging {
}
Err(e) => {
// 2. React on TX error.
self.hooker.react_on_starknet_tx_failed(call.clone()).await;
self.hooker.read().await.react_on_starknet_tx_failed(call.clone()).await;
error!("Error sending invoke tx on Starknet: {:?}", e);
}
};
Expand Down Expand Up @@ -322,10 +345,17 @@ fn parse_messages(messages: &[MsgToL1]) -> MessengerResult<(Vec<FieldElement>, V
Ok((hashes, calls))
}

// TODO: this will be reworked in Katana main to use `MsgToL2` from starknet-rs.
struct MessageToL2Info {
pub from_address: FieldElement,
pub to_address: FieldElement,
pub selector: FieldElement,
}

fn l1_handler_tx_from_event(
event: &EmittedEvent,
chain_id: FieldElement,
) -> Result<L1HandlerTransaction> {
) -> Result<(L1HandlerTransaction, MessageToL2Info)> {
if event.keys[0] != selector!("MessageSentToAppchain") {
debug!(
target: LOG_TARGET,
Expand All @@ -345,6 +375,8 @@ fn l1_handler_tx_from_event(
let nonce = event.data[1];
let version = 0_u32;

let info = MessageToL2Info { from_address, to_address, selector };

// Skip the length of the serialized array for the payload which is data[2].
// Payload starts at data[3].
let mut calldata = vec![from_address];
Expand Down Expand Up @@ -378,7 +410,7 @@ fn l1_handler_tx_from_event(
paid_l1_fee: 30000_u128,
};

Ok(tx)
Ok((tx, info))
}

#[cfg(test)]
Expand Down Expand Up @@ -497,7 +529,7 @@ mod tests {
paid_l1_fee: 30000_u128,
};

let tx = l1_handler_tx_from_event(&event, chain_id).unwrap();
let (tx, _) = l1_handler_tx_from_event(&event, chain_id).unwrap();

assert_eq!(tx.inner, expected.inner);
}
Expand Down
9 changes: 8 additions & 1 deletion crates/katana/rpc/src/starknet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,14 @@ impl StarknetApiServer for StarknetApi {
let chain_id = FieldElement::from_hex_be(&self.sequencer.chain_id().await.as_hex())
.map_err(|_| StarknetApiError::UnexpectedError)?;

if !self.sequencer.hooker.verify_invoke_tx_before_pool(invoke_transaction.clone()).await {
if !self
.sequencer
.hooker
.read()
.await
.verify_invoke_tx_before_pool(invoke_transaction.clone())
.await
{
return Err(StarknetApiError::SolisAssetFault.into());
}

Expand Down

0 comments on commit 491a380

Please sign in to comment.