From 55fe695fbdaaab9aeeb3cca9c4bb4f3486dcf6e7 Mon Sep 17 00:00:00 2001 From: FabijanC Date: Wed, 27 Nov 2024 12:40:00 +0100 Subject: [PATCH] Support subscribing to events via ws (#652) * Simplify event receiving via new util function --- .../src/starknet/events.rs | 2 +- .../starknet-devnet-core/src/starknet/mod.rs | 13 +- .../src/api/json_rpc/endpoints_ws.rs | 59 ++- .../src/api/json_rpc/mod.rs | 19 +- .../src/api/json_rpc/models.rs | 12 +- .../starknet-devnet-server/src/subscribe.rs | 18 +- .../src/rpc/emitted_event.rs | 10 + crates/starknet-devnet/tests/common/utils.rs | 13 + .../tests/test_subscription_to_events.rs | 462 ++++++++++++++++++ 9 files changed, 589 insertions(+), 19 deletions(-) create mode 100644 crates/starknet-devnet/tests/test_subscription_to_events.rs diff --git a/crates/starknet-devnet-core/src/starknet/events.rs b/crates/starknet-devnet-core/src/starknet/events.rs index 34e2b263a..52145a043 100644 --- a/crates/starknet-devnet-core/src/starknet/events.rs +++ b/crates/starknet-devnet-core/src/starknet/events.rs @@ -86,7 +86,7 @@ pub(crate) fn get_events( /// * `address` - Optional. The address to filter the event by. /// * `keys_filter` - Optional. The keys to filter the event by. /// * `event` - The event to check if it applies to the filters. -fn check_if_filter_applies_for_event( +pub fn check_if_filter_applies_for_event( address: &Option, keys_filter: &Option>>, event: &Event, diff --git a/crates/starknet-devnet-core/src/starknet/mod.rs b/crates/starknet-devnet-core/src/starknet/mod.rs index db6bb94e9..0c1919a74 100644 --- a/crates/starknet-devnet-core/src/starknet/mod.rs +++ b/crates/starknet-devnet-core/src/starknet/mod.rs @@ -85,7 +85,7 @@ mod add_l1_handler_transaction; mod cheats; pub(crate) mod defaulter; mod estimations; -mod events; +pub mod events; mod get_class_impls; mod predeployed; pub mod starknet_config; @@ -1034,6 +1034,17 @@ impl Starknet { .ok_or(Error::NoTransaction) } + pub fn get_unlimited_events( + &self, + from_block: Option, + to_block: Option, + address: Option, + keys: Option>>, + ) -> DevnetResult> { + events::get_events(self, from_block, to_block, address, keys, 0, None) + .map(|(emitted_events, _)| emitted_events) + } + pub fn get_events( &self, from_block: Option, diff --git a/crates/starknet-devnet-server/src/api/json_rpc/endpoints_ws.rs b/crates/starknet-devnet-server/src/api/json_rpc/endpoints_ws.rs index c6ed7bb27..d88cd175f 100644 --- a/crates/starknet-devnet-server/src/api/json_rpc/endpoints_ws.rs +++ b/crates/starknet-devnet-server/src/api/json_rpc/endpoints_ws.rs @@ -6,7 +6,8 @@ use starknet_types::starknet_api::block::{BlockNumber, BlockStatus}; use super::error::ApiError; use super::models::{ - BlockInput, PendingTransactionsSubscriptionInput, SubscriptionIdInput, TransactionBlockInput, + BlockInput, EventsSubscriptionInput, PendingTransactionsSubscriptionInput, SubscriptionIdInput, + TransactionBlockInput, }; use super::{JsonRpcHandler, JsonRpcSubscriptionRequest}; use crate::rpc_core::request::Id; @@ -33,7 +34,9 @@ impl JsonRpcHandler { JsonRpcSubscriptionRequest::PendingTransactions(data) => { self.subscribe_pending_txs(data, rpc_request_id, socket_id).await } - JsonRpcSubscriptionRequest::Events => todo!(), + JsonRpcSubscriptionRequest::Events(data) => { + self.subscribe_events(data, rpc_request_id, socket_id).await + } JsonRpcSubscriptionRequest::Unsubscribe(SubscriptionIdInput { subscription_id }) => { let mut sockets = self.api.sockets.lock().await; let socket_context = sockets.get_mut(&socket_id).ok_or( @@ -42,15 +45,14 @@ impl JsonRpcHandler { }), )?; - socket_context.unsubscribe(rpc_request_id, subscription_id).await?; - Ok(()) + socket_context.unsubscribe(rpc_request_id, subscription_id).await } } } /// Returns (starting block number, latest block number). Returns an error in case the starting /// block does not exist or there are too many blocks. - async fn convert_to_block_number_range( + async fn get_validated_block_number_range( &self, mut starting_block_id: BlockId, ) -> Result<(u64, u64), ApiError> { @@ -105,7 +107,7 @@ impl JsonRpcHandler { }; let (query_block_number, latest_block_number) = - self.convert_to_block_number_range(block_id).await?; + self.get_validated_block_number_range(block_id).await?; // perform the actual subscription let mut sockets = self.api.sockets.lock().await; @@ -233,7 +235,7 @@ impl JsonRpcHandler { }; let (query_block_number, latest_block_number) = - self.convert_to_block_number_range(query_block_id).await?; + self.get_validated_block_number_range(query_block_id).await?; // perform the actual subscription let mut sockets = self.api.sockets.lock().await; @@ -280,4 +282,47 @@ impl JsonRpcHandler { Ok(()) } + + async fn subscribe_events( + &self, + maybe_subscription_input: Option, + rpc_request_id: Id, + socket_id: SocketId, + ) -> Result<(), ApiError> { + let address = maybe_subscription_input + .as_ref() + .and_then(|subscription_input| subscription_input.from_address); + + let starting_block_id = maybe_subscription_input + .as_ref() + .and_then(|subscription_input| subscription_input.block.as_ref()) + .map(|b| b.0) + .unwrap_or(BlockId::Tag(BlockTag::Latest)); + + self.get_validated_block_number_range(starting_block_id).await?; + + let keys_filter = + maybe_subscription_input.and_then(|subscription_input| subscription_input.keys); + + let mut sockets = self.api.sockets.lock().await; + let socket_context = sockets.get_mut(&socket_id).ok_or(ApiError::StarknetDevnetError( + Error::UnexpectedInternalError { msg: format!("Unregistered socket ID: {socket_id}") }, + ))?; + + let subscription = Subscription::Events { address, keys_filter: keys_filter.clone() }; + let subscription_id = socket_context.subscribe(rpc_request_id, subscription).await; + + let events = self.api.starknet.lock().await.get_unlimited_events( + Some(starting_block_id), + Some(BlockId::Tag(BlockTag::Latest)), + address, + keys_filter, + )?; + + for event in events { + socket_context.notify(subscription_id, SubscriptionNotification::Event(event)).await; + } + + Ok(()) + } } diff --git a/crates/starknet-devnet-server/src/api/json_rpc/mod.rs b/crates/starknet-devnet-server/src/api/json_rpc/mod.rs index 140e1d903..b6c6254de 100644 --- a/crates/starknet-devnet-server/src/api/json_rpc/mod.rs +++ b/crates/starknet-devnet-server/src/api/json_rpc/mod.rs @@ -17,9 +17,9 @@ use futures::stream::SplitSink; use futures::{SinkExt, StreamExt}; use models::{ BlockAndClassHashInput, BlockAndContractAddressInput, BlockAndIndexInput, BlockInput, - CallInput, EstimateFeeInput, EventsInput, GetStorageInput, L1TransactionHashInput, - PendingTransactionsSubscriptionInput, SubscriptionIdInput, TransactionBlockInput, - TransactionHashInput, TransactionHashOutput, + CallInput, EstimateFeeInput, EventsInput, EventsSubscriptionInput, GetStorageInput, + L1TransactionHashInput, PendingTransactionsSubscriptionInput, SubscriptionIdInput, + TransactionBlockInput, TransactionHashInput, TransactionHashOutput, }; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; @@ -311,6 +311,17 @@ impl JsonRpcHandler { }), )); } + + let events = starknet.get_unlimited_events( + Some(BlockId::Tag(BlockTag::Latest)), + Some(BlockId::Tag(BlockTag::Latest)), + None, + None, + )?; + + for event in events { + notifications.push(SubscriptionNotification::Event(event)); + } } let sockets = self.api.sockets.lock().await; @@ -757,7 +768,7 @@ pub enum JsonRpcSubscriptionRequest { #[serde(rename = "starknet_subscribePendingTransactions", with = "optional_params")] PendingTransactions(Option), #[serde(rename = "starknet_subscribeEvents")] - Events, + Events(Option), #[serde(rename = "starknet_unsubscribe")] Unsubscribe(SubscriptionIdInput), } diff --git a/crates/starknet-devnet-server/src/api/json_rpc/models.rs b/crates/starknet-devnet-server/src/api/json_rpc/models.rs index 0e57a3801..6a126e210 100644 --- a/crates/starknet-devnet-server/src/api/json_rpc/models.rs +++ b/crates/starknet-devnet-server/src/api/json_rpc/models.rs @@ -1,5 +1,7 @@ use serde::{Deserialize, Serialize}; -use starknet_rs_core::types::{Hash256, TransactionExecutionStatus, TransactionFinalityStatus}; +use starknet_rs_core::types::{ + Felt, Hash256, TransactionExecutionStatus, TransactionFinalityStatus, +}; use starknet_types::contract_address::ContractAddress; use starknet_types::felt::{BlockHash, ClassHash, TransactionHash}; use starknet_types::patricia_key::PatriciaKey; @@ -205,6 +207,14 @@ pub struct PendingTransactionsSubscriptionInput { pub sender_address: Option>, } +#[derive(Deserialize, Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct EventsSubscriptionInput { + pub block: Option, + pub from_address: Option, + pub keys: Option>>, +} + #[cfg(test)] mod tests { use starknet_rs_core::types::{BlockId as ImportedBlockId, BlockTag, Felt}; diff --git a/crates/starknet-devnet-server/src/subscribe.rs b/crates/starknet-devnet-server/src/subscribe.rs index 6344aa206..8fdf15904 100644 --- a/crates/starknet-devnet-server/src/subscribe.rs +++ b/crates/starknet-devnet-server/src/subscribe.rs @@ -5,8 +5,10 @@ use axum::extract::ws::{Message, WebSocket}; use futures::stream::SplitSink; use futures::SinkExt; use serde::{self, Serialize}; -use starknet_rs_core::types::BlockTag; +use starknet_core::starknet::events::check_if_filter_applies_for_event; +use starknet_rs_core::types::{BlockTag, Felt}; use starknet_types::contract_address::ContractAddress; +use starknet_types::emitted_event::EmittedEvent; use starknet_types::felt::TransactionHash; use starknet_types::rpc::block::BlockHeader; use starknet_types::rpc::transactions::{TransactionStatus, TransactionWithHash}; @@ -39,7 +41,7 @@ pub enum Subscription { TransactionStatus { tag: BlockTag, transaction_hash: TransactionHash }, PendingTransactionsFull { address_filter: AddressFilter }, PendingTransactionsHash { address_filter: AddressFilter }, - Events, + Events { address: Option, keys_filter: Option>> }, } impl Subscription { @@ -51,7 +53,7 @@ impl Subscription { | Subscription::PendingTransactionsHash { .. } => { SubscriptionConfirmation::NewSubscription(id) } - Subscription::Events => SubscriptionConfirmation::NewSubscription(id), + Subscription::Events { .. } => SubscriptionConfirmation::NewSubscription(id), } } @@ -90,7 +92,11 @@ impl Subscription { }; } } - Subscription::Events => todo!(), + Subscription::Events { address, keys_filter } => { + if let SubscriptionNotification::Event(event) = notification { + return check_if_filter_applies_for_event(address, keys_filter, &event.into()); + } + } } false @@ -141,6 +147,7 @@ pub enum SubscriptionNotification { NewHeads(Box), TransactionStatus(NewTransactionStatus), PendingTransaction(PendingTransactionNotification), + Event(EmittedEvent), } impl SubscriptionNotification { @@ -152,7 +159,8 @@ impl SubscriptionNotification { } SubscriptionNotification::PendingTransaction(_) => { "starknet_subscriptionPendingTransactions" - } // SubscriptionNotification::Events => "starknet_subscriptionEvents", + } + SubscriptionNotification::Event(_) => "starknet_subscriptionEvents", } } } diff --git a/crates/starknet-devnet-types/src/rpc/emitted_event.rs b/crates/starknet-devnet-types/src/rpc/emitted_event.rs index 454a46700..db3e8357f 100644 --- a/crates/starknet-devnet-types/src/rpc/emitted_event.rs +++ b/crates/starknet-devnet-types/src/rpc/emitted_event.rs @@ -41,3 +41,13 @@ impl From<&blockifier::execution::call_info::OrderedEvent> for OrderedEvent { } } } + +impl From<&EmittedEvent> for Event { + fn from(emitted_event: &EmittedEvent) -> Self { + Self { + from_address: emitted_event.from_address, + keys: emitted_event.keys.clone(), + data: emitted_event.data.clone(), + } + } +} diff --git a/crates/starknet-devnet/tests/common/utils.rs b/crates/starknet-devnet/tests/common/utils.rs index f24f4082f..eb3e2b7b2 100644 --- a/crates/starknet-devnet/tests/common/utils.rs +++ b/crates/starknet-devnet/tests/common/utils.rs @@ -444,6 +444,19 @@ pub async fn receive_rpc_via_ws( Ok(serde_json::from_str(&msg.into_text()?)?) } +/// Extract `result` from the notification and assert general properties +pub async fn receive_notification( + ws: &mut WebSocketStream>, + method: &str, + expected_subscription_id: i64, +) -> Result { + let mut notification = receive_rpc_via_ws(ws).await?; + assert_eq!(notification["jsonrpc"], "2.0"); + assert_eq!(notification["method"], method); + assert_eq!(notification["params"]["subscription_id"], expected_subscription_id); + Ok(notification["params"].take()["result"].take()) +} + pub async fn assert_no_notifications(ws: &mut WebSocketStream>) { match receive_rpc_via_ws(ws).await { Ok(resp) => panic!("Expected no notifications; found: {resp}"), diff --git a/crates/starknet-devnet/tests/test_subscription_to_events.rs b/crates/starknet-devnet/tests/test_subscription_to_events.rs new file mode 100644 index 000000000..b8fe0a233 --- /dev/null +++ b/crates/starknet-devnet/tests/test_subscription_to_events.rs @@ -0,0 +1,462 @@ +#![cfg(test)] +pub mod common; + +mod event_subscription_support { + use serde::Serialize; + use serde_json::json; + use starknet_core::constants::{STRK_ERC20_CONTRACT_ADDRESS, UDC_CONTRACT_ADDRESS}; + use starknet_rs_accounts::{Account, ExecutionEncoding, SingleOwnerAccount}; + use starknet_rs_core::types::{BlockId, BlockTag, Call, Felt, InvokeTransactionResult}; + use starknet_rs_core::utils::get_selector_from_name; + use starknet_rs_providers::jsonrpc::HttpTransport; + use starknet_rs_providers::JsonRpcClient; + use starknet_rs_signers::LocalWallet; + use starknet_types::contract_address::ContractAddress; + use tokio::net::TcpStream; + use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; + + use crate::common::background_devnet::BackgroundDevnet; + use crate::common::constants; + use crate::common::utils::{ + assert_no_notifications, declare_v3_deploy_v3, + get_events_contract_in_sierra_and_compiled_class_hash, receive_notification, + receive_rpc_via_ws, send_text_rpc_via_ws, unsubscribe, + }; + + async fn subscribe_events( + ws: &mut WebSocketStream>, + params: serde_json::Value, + ) -> Result { + let subscription_confirmation = + send_text_rpc_via_ws(ws, "starknet_subscribeEvents", params).await?; + subscription_confirmation["result"] + .as_i64() + .ok_or(anyhow::Error::msg("Subscription did not return a numeric ID")) + } + + async fn receive_event( + ws: &mut WebSocketStream>, + subscription_id: i64, + ) -> Result { + receive_notification(ws, "starknet_subscriptionEvents", subscription_id).await + } + + #[derive(Serialize)] + struct EventParams { + from_address: Option, + keys: Option>>, + block: Option, + } + + async fn get_single_owner_account( + devnet: &BackgroundDevnet, + ) -> SingleOwnerAccount<&JsonRpcClient, LocalWallet> { + let (signer, account_address) = devnet.get_first_predeployed_account().await; + + SingleOwnerAccount::new( + &devnet.json_rpc_client, + signer, + account_address, + constants::CHAIN_ID, + ExecutionEncoding::New, + ) + } + + /// Returns deployment address. + async fn deploy_events_contract( + account: &SingleOwnerAccount<&JsonRpcClient, LocalWallet>, + ) -> Felt { + let (sierra, casm_hash) = get_events_contract_in_sierra_and_compiled_class_hash(); + + let (_, address) = declare_v3_deploy_v3(account, sierra, casm_hash, &[]).await.unwrap(); + address + } + + async fn emit_static_event( + account: &SingleOwnerAccount<&JsonRpcClient, LocalWallet>, + contract_address: Felt, + ) -> Result { + account + .execute_v3(vec![Call { + to: contract_address, + selector: get_selector_from_name("emit_event").unwrap(), + calldata: vec![Felt::ZERO], // what kind of event to emit + }]) + .send() + .await + .map_err(|e| anyhow::Error::msg(e.to_string())) + } + + fn static_event_key() -> Felt { + get_selector_from_name("StaticEvent").unwrap() + } + + #[tokio::test] + async fn event_subscription_with_no_params_until_unsubscription() { + let devnet = BackgroundDevnet::spawn().await.unwrap(); + let (mut ws, _) = connect_async(devnet.ws_url()).await.unwrap(); + + let subscription_id = subscribe_events(&mut ws, json!({})).await.unwrap(); + + let account = get_single_owner_account(&devnet).await; + let contract_address = deploy_events_contract(&account).await; + + // discard notifications emitted by system contracts - asserted in a separate test + receive_rpc_via_ws(&mut ws).await.unwrap(); // erc20 - fee charge + receive_rpc_via_ws(&mut ws).await.unwrap(); // udc - deployment + receive_rpc_via_ws(&mut ws).await.unwrap(); // erc20 - fee charge + + let invocation = emit_static_event(&account, contract_address).await.unwrap(); + let latest_block = devnet.get_latest_block_with_tx_hashes().await.unwrap(); + + let event = receive_event(&mut ws, subscription_id).await.unwrap(); + assert_eq!( + event, + json!({ + "transaction_hash": invocation.transaction_hash, + "block_hash": latest_block.block_hash, + "block_number": latest_block.block_number, + "from_address": contract_address, + "keys": [static_event_key()], + "data": [] + }) + ); + + receive_rpc_via_ws(&mut ws).await.unwrap(); // erc20 - fee charge + assert_no_notifications(&mut ws).await; + + unsubscribe(&mut ws, subscription_id).await.unwrap(); + + emit_static_event(&account, contract_address).await.unwrap(); + assert_no_notifications(&mut ws).await; + } + + #[tokio::test] + async fn should_notify_only_from_filtered_address() { + let devnet = BackgroundDevnet::spawn().await.unwrap(); + let (mut ws, _) = connect_async(devnet.ws_url()).await.unwrap(); + + let account = get_single_owner_account(&devnet).await; + let contract_address = deploy_events_contract(&account).await; + + let subscription_params = json!({ "from_address": contract_address }); + let subscription_id = subscribe_events(&mut ws, subscription_params).await.unwrap(); + + let invocation = emit_static_event(&account, contract_address).await.unwrap(); + let latest_block = devnet.get_latest_block_with_tx_hashes().await.unwrap(); + + let event = receive_event(&mut ws, subscription_id).await.unwrap(); + assert_eq!( + event, + json!({ + "transaction_hash": invocation.transaction_hash, + "block_hash": latest_block.block_hash, + "block_number": latest_block.block_number, + "from_address": contract_address, + "keys": [static_event_key()], + "data": [] + }) + ); + + assert_no_notifications(&mut ws).await; + } + + #[tokio::test] + async fn should_notify_of_new_events_only_from_filtered_key() { + let devnet = BackgroundDevnet::spawn().await.unwrap(); + let (mut ws, _) = connect_async(devnet.ws_url()).await.unwrap(); + + let account = get_single_owner_account(&devnet).await; + let contract_address = deploy_events_contract(&account).await; + + let subscription_params = json!({ "keys": [[static_event_key()]] }); + let subscription_id = subscribe_events(&mut ws, subscription_params).await.unwrap(); + + let invocation = emit_static_event(&account, contract_address).await.unwrap(); + let latest_block = devnet.get_latest_block_with_tx_hashes().await.unwrap(); + + let event = receive_event(&mut ws, subscription_id).await.unwrap(); + assert_eq!( + event, + json!({ + "transaction_hash": invocation.transaction_hash, + "block_hash": latest_block.block_hash, + "block_number": latest_block.block_number, + "from_address": contract_address, + "keys": [static_event_key()], + "data": [] + }) + ); + + assert_no_notifications(&mut ws).await; + } + + #[tokio::test] + async fn should_notify_if_already_in_latest_block_in_on_tx_mode() { + let devnet = BackgroundDevnet::spawn().await.unwrap(); + let (mut ws, _) = connect_async(devnet.ws_url()).await.unwrap(); + + let account = get_single_owner_account(&devnet).await; + let contract_address = deploy_events_contract(&account).await; + let invocation = emit_static_event(&account, contract_address).await.unwrap(); + let latest_block = devnet.get_latest_block_with_tx_hashes().await.unwrap(); + + let subscription_id = + subscribe_events(&mut ws, json!({ "from_address": contract_address })).await.unwrap(); + + let event = receive_event(&mut ws, subscription_id).await.unwrap(); + assert_eq!( + event, + json!({ + "transaction_hash": invocation.transaction_hash, + "block_hash": latest_block.block_hash, + "block_number": latest_block.block_number, + "from_address": contract_address, + "keys": [static_event_key()], + "data": [] + }) + ); + + assert_no_notifications(&mut ws).await; + } + + #[tokio::test] + async fn should_notify_if_already_in_latest_block_in_on_demand_mode() { + let devnet_args = ["--block-generation-on", "demand"]; + let devnet = BackgroundDevnet::spawn_with_additional_args(&devnet_args).await.unwrap(); + let (mut ws, _) = connect_async(devnet.ws_url()).await.unwrap(); + + let mut account = get_single_owner_account(&devnet).await; + account.set_block_id(BlockId::Tag(BlockTag::Pending)); // for correct nonce in deployment + + let contract_address = deploy_events_contract(&account).await; + // to have declare+deploy in one block and invoke in another + devnet.create_block().await.unwrap(); + + let invocation = emit_static_event(&account, contract_address).await.unwrap(); + let latest_block_hash = devnet.create_block().await.unwrap(); + + let subscription_id = + subscribe_events(&mut ws, json!({ "from_address": contract_address })).await.unwrap(); + + let event = receive_event(&mut ws, subscription_id).await.unwrap(); + assert_eq!( + event, + json!({ + "transaction_hash": invocation.transaction_hash, + "block_hash": latest_block_hash, + "block_number": 2, // genesis = 0, then two block creations + "from_address": contract_address, + "keys": [static_event_key()], + "data": [] + }) + ); + + assert_no_notifications(&mut ws).await; + } + + #[tokio::test] + async fn should_notify_only_when_moved_from_pending_to_latest_block() { + let devnet_args = ["--block-generation-on", "demand"]; + let devnet = BackgroundDevnet::spawn_with_additional_args(&devnet_args).await.unwrap(); + let (mut ws, _) = connect_async(devnet.ws_url()).await.unwrap(); + + let mut account = get_single_owner_account(&devnet).await; + account.set_block_id(BlockId::Tag(BlockTag::Pending)); // for correct nonce in deployment + + let contract_address = deploy_events_contract(&account).await; + // to have declare+deploy in one block and invoke in another + devnet.create_block().await.unwrap(); + + let subscription_id = + subscribe_events(&mut ws, json!({ "from_address": contract_address })).await.unwrap(); + + // only receive event when pending->latest + let invocation = emit_static_event(&account, contract_address).await.unwrap(); + assert_no_notifications(&mut ws).await; + let latest_block_hash = devnet.create_block().await.unwrap(); + + let event = receive_event(&mut ws, subscription_id).await.unwrap(); + assert_eq!( + event, + json!({ + "transaction_hash": invocation.transaction_hash, + "block_hash": latest_block_hash, + "block_number": 2, // genesis = 0, then two block creations + "from_address": contract_address, + "keys": [static_event_key()], + "data": [] + }) + ); + + assert_no_notifications(&mut ws).await; + } + + #[tokio::test] + async fn should_notify_of_events_in_old_blocks_with_no_filters() { + let devnet = BackgroundDevnet::spawn().await.unwrap(); + let (mut ws, _) = connect_async(devnet.ws_url()).await.unwrap(); + + let account = get_single_owner_account(&devnet).await; + let contract_address = deploy_events_contract(&account).await; + + let invocation = emit_static_event(&account, contract_address).await.unwrap(); + let latest_block = devnet.get_latest_block_with_tx_hashes().await.unwrap(); + + // The declaration happens at block_number=1 so we query from there to latest + let subscription_params = json!({ "block": BlockId::Number(1) }); + let subscription_id = subscribe_events(&mut ws, subscription_params).await.unwrap(); + + // declaration of events contract fee charge + let declaration_fee_event = receive_event(&mut ws, subscription_id).await.unwrap(); + assert_eq!(declaration_fee_event["block_number"], 1); + assert_eq!(declaration_fee_event["from_address"], json!(STRK_ERC20_CONTRACT_ADDRESS)); + + // deployment of events contract: udc invocation + let deployment_udc_event = receive_event(&mut ws, subscription_id).await.unwrap(); + assert_eq!(deployment_udc_event["block_number"], 2); + assert_eq!(deployment_udc_event["from_address"], json!(UDC_CONTRACT_ADDRESS)); + + // deployment of events contract: fee charge + let deployment_fee_event = receive_event(&mut ws, subscription_id).await.unwrap(); + assert_eq!(deployment_fee_event["block_number"], 2); + assert_eq!(deployment_fee_event["from_address"], json!(STRK_ERC20_CONTRACT_ADDRESS),); + + // invocation of events contract + let invocation_event = receive_event(&mut ws, subscription_id).await.unwrap(); + assert_eq!( + invocation_event, + json!({ + "transaction_hash": invocation.transaction_hash, + "block_hash": latest_block.block_hash, + "block_number": latest_block.block_number, + "from_address": contract_address, + "keys": [static_event_key()], + "data": [] + }) + ); + + // invocation of events contract fee charge + let invocation_fee_event = receive_event(&mut ws, subscription_id).await.unwrap(); + assert_eq!(invocation_fee_event["block_number"], latest_block.block_number); + assert_eq!(invocation_fee_event["from_address"], json!(STRK_ERC20_CONTRACT_ADDRESS)); + + assert_no_notifications(&mut ws).await; + } + + #[tokio::test] + async fn should_notify_of_old_and_new_events_with_address_filter() { + let devnet = BackgroundDevnet::spawn().await.unwrap(); + let (mut ws, _) = connect_async(devnet.ws_url()).await.unwrap(); + + let account = get_single_owner_account(&devnet).await; + let contract_address = deploy_events_contract(&account).await; + + let old_invocation = emit_static_event(&account, contract_address).await.unwrap(); + let block_before_subscription = devnet.get_latest_block_with_tx_hashes().await.unwrap(); + + // The declaration happens at block_number=1, but only invocation should be notified of + let subscription_params = + json!({ "block": BlockId::Number(1), "from_address": contract_address }); + let subscription_id = subscribe_events(&mut ws, subscription_params).await.unwrap(); + + // assert presence of old event (event that was triggered before the subscription) + let old_invocation_event = receive_event(&mut ws, subscription_id).await.unwrap(); + assert_eq!( + old_invocation_event, + json!({ + "transaction_hash": old_invocation.transaction_hash, + "block_hash": block_before_subscription.block_hash, + "block_number": block_before_subscription.block_number, + "from_address": contract_address, + "keys": [static_event_key()], + "data": [] + }) + ); + assert_no_notifications(&mut ws).await; + + // new event (after subscription) + let new_invocation = emit_static_event(&account, contract_address).await.unwrap(); + let latest_block = devnet.get_latest_block_with_tx_hashes().await.unwrap(); + let new_invocation_event = receive_event(&mut ws, subscription_id).await.unwrap(); + assert_eq!( + new_invocation_event, + json!({ + "transaction_hash": new_invocation.transaction_hash, + "block_hash": latest_block.block_hash, + "block_number": latest_block.block_number, + "from_address": contract_address, + "keys": [static_event_key()], + "data": [] + }) + ); + assert_no_notifications(&mut ws).await; + } + + #[tokio::test] + async fn should_notify_of_old_and_new_events_with_key_filter() { + let devnet = BackgroundDevnet::spawn().await.unwrap(); + let (mut ws, _) = connect_async(devnet.ws_url()).await.unwrap(); + + let account = get_single_owner_account(&devnet).await; + let contract_address = deploy_events_contract(&account).await; + + let old_invocation = emit_static_event(&account, contract_address).await.unwrap(); + let block_before_subscription = devnet.get_latest_block_with_tx_hashes().await.unwrap(); + + // The declaration happens at block_number=1, but only invocation should be notified of + let subscription_params = + json!({ "block": BlockId::Number(1), "keys": [[static_event_key()]] }); + let subscription_id = subscribe_events(&mut ws, subscription_params).await.unwrap(); + + // assert presence of old event (event that was triggered before the subscription) + let invocation_event = receive_event(&mut ws, subscription_id).await.unwrap(); + assert_eq!( + invocation_event, + json!({ + "transaction_hash": old_invocation.transaction_hash, + "block_hash": block_before_subscription.block_hash, + "block_number": block_before_subscription.block_number, + "from_address": contract_address, + "keys": [static_event_key()], + "data": [] + }) + ); + assert_no_notifications(&mut ws).await; + + // new event (after subscription) + let new_invocation = emit_static_event(&account, contract_address).await.unwrap(); + let latest_block = devnet.get_latest_block_with_tx_hashes().await.unwrap(); + let invocation_event = receive_event(&mut ws, subscription_id).await.unwrap(); + assert_eq!( + invocation_event, + json!({ + "transaction_hash": new_invocation.transaction_hash, + "block_hash": latest_block.block_hash, + "block_number": latest_block.block_number, + "from_address": contract_address, + "keys": [static_event_key()], + "data": [] + }) + ); + assert_no_notifications(&mut ws).await; + } + + #[tokio::test] + async fn should_not_notify_of_events_in_too_old_blocks() { + let devnet = BackgroundDevnet::spawn().await.unwrap(); + let (mut ws, _) = connect_async(devnet.ws_url()).await.unwrap(); + + let account = get_single_owner_account(&devnet).await; + let contract_address = deploy_events_contract(&account).await; + + emit_static_event(&account, contract_address).await.unwrap(); + let last_block_hash = devnet.create_block().await.unwrap(); + + subscribe_events(&mut ws, json!({ "block": BlockId::Hash(last_block_hash) })) + .await + .unwrap(); + + assert_no_notifications(&mut ws).await; + } +}