diff --git a/crates/starknet-devnet-server/src/api/json_rpc/mod.rs b/crates/starknet-devnet-server/src/api/json_rpc/mod.rs index b6c6254de..602cc7cd9 100644 --- a/crates/starknet-devnet-server/src/api/json_rpc/mod.rs +++ b/crates/starknet-devnet-server/src/api/json_rpc/mod.rs @@ -28,7 +28,7 @@ use starknet_core::starknet::starknet_config::{DumpOn, StarknetConfig}; use starknet_core::{CasmContractClass, StarknetBlock}; use starknet_rs_core::types::{BlockId, BlockTag, ContractClass as CodegenContractClass, Felt}; use starknet_types::messaging::{MessageToL1, MessageToL2}; -use starknet_types::rpc::block::{Block, PendingBlock}; +use starknet_types::rpc::block::{Block, PendingBlock, ReorgData}; use starknet_types::rpc::estimate_message_fee::{ EstimateMessageFeeRequestWrapper, FeeEstimateWrapper, }; @@ -213,7 +213,7 @@ impl JsonRpcHandler { /// The latest and pending block are always defined, so to avoid having to deal with Err/None in /// places where this method is called, it is defined to return an empty accepted block, /// even though that case should never happen. - async fn get_block(&self, tag: BlockTag) -> StarknetBlock { + async fn get_block_by_tag(&self, tag: BlockTag) -> StarknetBlock { let starknet = self.api.starknet.lock().await; match starknet.get_block(&BlockId::Tag(tag)) { Ok(block) => block.clone(), @@ -225,7 +225,7 @@ impl JsonRpcHandler { &self, old_pending_block: StarknetBlock, ) -> Result<(), error::ApiError> { - let new_pending_block = self.get_block(BlockTag::Pending).await; + let new_pending_block = self.get_block_by_tag(BlockTag::Pending).await; let old_pending_txs = old_pending_block.get_transactions(); let new_pending_txs = new_pending_block.get_transactions(); @@ -337,21 +337,62 @@ impl JsonRpcHandler { /// Notify subscribers of what they are subscribed to. async fn broadcast_changes( &self, - old_latest_block: StarknetBlock, + old_latest_block: Option, old_pending_block: Option, ) -> Result<(), error::ApiError> { - let new_latest_block = self.get_block(BlockTag::Latest).await; + let old_latest_block = if let Some(block) = old_latest_block { + block + } else { + return Ok(()); + }; if let Some(old_pending_block) = old_pending_block { self.broadcast_pending_tx_changes(old_pending_block).await?; } - if new_latest_block.block_number() > old_latest_block.block_number() { - self.broadcast_latest_changes(new_latest_block).await?; - } else { - // TODO - possible only if an immutable request came or one of the following happened: - // blocks aborted, devnet restarted, devnet loaded. Or should loading cause websockets - // to be restarted too, thus not requiring notification? + let new_latest_block = self.get_block_by_tag(BlockTag::Latest).await; + + match new_latest_block.block_number().cmp(&old_latest_block.block_number()) { + std::cmp::Ordering::Less => { + self.broadcast_reorg(old_latest_block, new_latest_block).await? + } + std::cmp::Ordering::Equal => { /* no changes required */ } + std::cmp::Ordering::Greater => self.broadcast_latest_changes(new_latest_block).await?, + } + + Ok(()) + } + + async fn broadcast_reorg( + &self, + old_latest_block: StarknetBlock, + new_latest_block: StarknetBlock, + ) -> Result<(), error::ApiError> { + // Since it is impossible to determine the hash of the former successor of new_latest_block + // directly, we iterate from old_latest_block all the way to the aborted successor of + // new_latest_block. + let new_latest_hash = new_latest_block.block_hash(); + let mut orphan_starting_block_hash = old_latest_block.block_hash(); + let starknet = self.api.starknet.lock().await; + loop { + let orphan_block = starknet.get_block(&BlockId::Hash(orphan_starting_block_hash))?; + let parent_hash = orphan_block.parent_hash(); + if parent_hash == new_latest_hash { + break; + } + orphan_starting_block_hash = parent_hash; + } + + let notification = SubscriptionNotification::Reorg(ReorgData { + starting_block_hash: orphan_starting_block_hash, + starting_block_number: new_latest_block.block_number().unchecked_next(), + ending_block_hash: old_latest_block.block_hash(), + ending_block_number: old_latest_block.block_number(), + }); + + let sockets = self.api.sockets.lock().await; + for (_, socket_context) in sockets.iter() { + socket_context.notify_subscribers(¬ification).await; } Ok(()) @@ -366,13 +407,19 @@ impl JsonRpcHandler { trace!(target: "JsonRpcHandler::execute", "executing starknet request"); // for later comparison and subscription notifications - let old_latest_block = self.get_block(BlockTag::Latest).await; - let old_pending_block = if self.starknet_config.uses_pending_block() { - Some(self.get_block(BlockTag::Pending).await) + let old_latest_block = if request.requires_notifying() { + Some(self.get_block_by_tag(BlockTag::Latest).await) } else { None }; + let old_pending_block = + if request.requires_notifying() && self.starknet_config.uses_pending_block() { + Some(self.get_block_by_tag(BlockTag::Pending).await) + } else { + None + }; + // true if origin should be tried after request fails; relevant in forking mode let mut forwardable = true; @@ -530,8 +577,6 @@ impl JsonRpcHandler { } } - // TODO if request.modifies_state() { ... } - also in the beginning of this method to avoid - // unnecessary lock acquiring if let Err(e) = self.broadcast_changes(old_latest_block, old_pending_block).await { return ResponseResult::Error(e.api_error_to_rpc_error()); } @@ -757,6 +802,65 @@ pub enum JsonRpcRequest { DevnetConfig, } +impl JsonRpcRequest { + pub fn requires_notifying(&self) -> bool { + #![warn(clippy::wildcard_enum_match_arm)] + match self { + JsonRpcRequest::SpecVersion => false, + JsonRpcRequest::BlockWithTransactionHashes(_) => false, + JsonRpcRequest::BlockWithFullTransactions(_) => false, + JsonRpcRequest::BlockWithReceipts(_) => false, + JsonRpcRequest::StateUpdate(_) => false, + JsonRpcRequest::StorageAt(_) => false, + JsonRpcRequest::TransactionByHash(_) => false, + JsonRpcRequest::TransactionByBlockAndIndex(_) => false, + JsonRpcRequest::TransactionReceiptByTransactionHash(_) => false, + JsonRpcRequest::TransactionStatusByHash(_) => false, + JsonRpcRequest::MessagesStatusByL1Hash(_) => false, + JsonRpcRequest::ClassByHash(_) => false, + JsonRpcRequest::CompiledCasmByClassHash(_) => false, + JsonRpcRequest::ClassHashAtContractAddress(_) => false, + JsonRpcRequest::ClassAtContractAddress(_) => false, + JsonRpcRequest::BlockTransactionCount(_) => false, + JsonRpcRequest::Call(_) => false, + JsonRpcRequest::EstimateFee(_) => false, + JsonRpcRequest::BlockNumber => false, + JsonRpcRequest::BlockHashAndNumber => false, + JsonRpcRequest::ChainId => false, + JsonRpcRequest::Syncing => false, + JsonRpcRequest::Events(_) => false, + JsonRpcRequest::ContractNonce(_) => false, + JsonRpcRequest::AddDeclareTransaction(_) => true, + JsonRpcRequest::AddDeployAccountTransaction(_) => true, + JsonRpcRequest::AddInvokeTransaction(_) => true, + JsonRpcRequest::EstimateMessageFee(_) => false, + JsonRpcRequest::SimulateTransactions(_) => false, + JsonRpcRequest::TraceTransaction(_) => false, + JsonRpcRequest::BlockTransactionTraces(_) => false, + JsonRpcRequest::ImpersonateAccount(_) => false, + JsonRpcRequest::StopImpersonateAccount(_) => false, + JsonRpcRequest::AutoImpersonate => false, + JsonRpcRequest::StopAutoImpersonate => false, + JsonRpcRequest::Dump(_) => false, + JsonRpcRequest::Load(_) => false, + JsonRpcRequest::PostmanLoadL1MessagingContract(_) => false, + JsonRpcRequest::PostmanFlush(_) => true, + JsonRpcRequest::PostmanSendMessageToL2(_) => true, + JsonRpcRequest::PostmanConsumeMessageFromL2(_) => false, + JsonRpcRequest::CreateBlock => true, + JsonRpcRequest::AbortBlocks(_) => true, + JsonRpcRequest::SetGasPrice(_) => false, + JsonRpcRequest::Restart(_) => false, + JsonRpcRequest::SetTime(_) => true, + JsonRpcRequest::IncreaseTime(_) => true, + JsonRpcRequest::PredeployedAccounts(_) => false, + JsonRpcRequest::AccountBalance(_) => false, + JsonRpcRequest::Mint(_) => true, + JsonRpcRequest::DevnetConfig => false, + } + } +} + #[derive(Deserialize, AllVariantsSerdeRenames, VariantName)] #[cfg_attr(test, derive(Debug))] #[serde(tag = "method", content = "params")] diff --git a/crates/starknet-devnet-server/src/api/json_rpc/write_endpoints.rs b/crates/starknet-devnet-server/src/api/json_rpc/write_endpoints.rs index 49f3c7417..e9062baaa 100644 --- a/crates/starknet-devnet-server/src/api/json_rpc/write_endpoints.rs +++ b/crates/starknet-devnet-server/src/api/json_rpc/write_endpoints.rs @@ -168,6 +168,9 @@ impl JsonRpcHandler { let restart_params = data.unwrap_or_default(); self.api.starknet.lock().await.restart(restart_params.restart_l1_to_l2_messaging)?; + self.api.sockets.lock().await.clear(); + tracing::info!("Websocket memory cleared. No subscribers."); + Ok(super::JsonRpcResponse::Empty) } diff --git a/crates/starknet-devnet-server/src/subscribe.rs b/crates/starknet-devnet-server/src/subscribe.rs index 8fdf15904..b136ac638 100644 --- a/crates/starknet-devnet-server/src/subscribe.rs +++ b/crates/starknet-devnet-server/src/subscribe.rs @@ -10,7 +10,7 @@ use starknet_rs_core::types::{BlockTag, Felt}; use starknet_types::contract_address::ContractAddress; use starknet_types::emitted_event::EmittedEvent; use starknet_types::felt::TransactionHash; -use starknet_types::rpc::block::BlockHeader; +use starknet_types::rpc::block::{BlockHeader, ReorgData}; use starknet_types::rpc::transactions::{TransactionStatus, TransactionWithHash}; use tokio::sync::Mutex; @@ -58,48 +58,45 @@ impl Subscription { } pub fn matches(&self, notification: &SubscriptionNotification) -> bool { - match self { - Subscription::NewHeads => { - if let SubscriptionNotification::NewHeads(_) = notification { - return true; - } - } - Subscription::TransactionStatus { tag, transaction_hash: subscription_hash } => { - if let SubscriptionNotification::TransactionStatus(notification) = notification { - return tag == ¬ification.origin_tag - && subscription_hash == ¬ification.transaction_hash; - } - } - Subscription::PendingTransactionsFull { address_filter, .. } => { - if let SubscriptionNotification::PendingTransaction( - PendingTransactionNotification::Full(tx), - ) = notification - { - return match tx.get_sender_address() { - Some(address) => address_filter.passes(&address), - None => true, - }; - } - } - Subscription::PendingTransactionsHash { address_filter } => { - if let SubscriptionNotification::PendingTransaction( - PendingTransactionNotification::Hash(hash_wrapper), - ) = notification - { - return match hash_wrapper.sender_address { - Some(address) => address_filter.passes(&address), - None => true, - }; - } - } - Subscription::Events { address, keys_filter } => { - if let SubscriptionNotification::Event(event) = notification { - return check_if_filter_applies_for_event(address, keys_filter, &event.into()); - } + match (self, notification) { + (Subscription::NewHeads, SubscriptionNotification::NewHeads(_)) => true, + ( + Subscription::TransactionStatus { tag, transaction_hash: subscription_hash }, + SubscriptionNotification::TransactionStatus(notification), + ) => { + tag == ¬ification.origin_tag + && subscription_hash == ¬ification.transaction_hash } + ( + Subscription::PendingTransactionsFull { address_filter }, + SubscriptionNotification::PendingTransaction(PendingTransactionNotification::Full( + tx, + )), + ) => match tx.get_sender_address() { + Some(address) => address_filter.passes(&address), + None => true, + }, + ( + Subscription::PendingTransactionsHash { address_filter }, + SubscriptionNotification::PendingTransaction(PendingTransactionNotification::Hash( + hash_wrapper, + )), + ) => match hash_wrapper.sender_address { + Some(address) => address_filter.passes(&address), + None => true, + }, + ( + Subscription::Events { address, keys_filter }, + SubscriptionNotification::Event(event), + ) => check_if_filter_applies_for_event(address, keys_filter, &event.into()), + ( + Subscription::NewHeads + | Subscription::TransactionStatus { .. } + | Subscription::Events { .. }, + SubscriptionNotification::Reorg(_), + ) => true, // any subscription other than pending tx requires reorg notification + _ => false, } - - false } } @@ -148,6 +145,7 @@ pub enum SubscriptionNotification { TransactionStatus(NewTransactionStatus), PendingTransaction(PendingTransactionNotification), Event(EmittedEvent), + Reorg(ReorgData), } impl SubscriptionNotification { @@ -161,6 +159,7 @@ impl SubscriptionNotification { "starknet_subscriptionPendingTransactions" } SubscriptionNotification::Event(_) => "starknet_subscriptionEvents", + SubscriptionNotification::Reorg(_) => "starknet_subscriptionReorg", } } } diff --git a/crates/starknet-devnet-types/src/rpc/block.rs b/crates/starknet-devnet-types/src/rpc/block.rs index da99d5c2b..c95b83621 100644 --- a/crates/starknet-devnet-types/src/rpc/block.rs +++ b/crates/starknet-devnet-types/src/rpc/block.rs @@ -118,3 +118,17 @@ pub struct ResourcePrice { pub price_in_fri: Felt, pub price_in_wei: Felt, } + +#[derive(Debug, Clone, Serialize)] +#[serde(deny_unknown_fields)] +/// Data about reorganized blocks, starting and ending block number and hash +pub struct ReorgData { + /// Hash of the first known block of the orphaned chain + pub starting_block_hash: BlockHash, + /// Number of the first known block of the orphaned chain + pub starting_block_number: BlockNumber, + /// The last known block of the orphaned chain + pub ending_block_hash: BlockHash, + /// Number of the last known block of the orphaned chain + pub ending_block_number: BlockNumber, +} diff --git a/crates/starknet-devnet/tests/common/background_devnet.rs b/crates/starknet-devnet/tests/common/background_devnet.rs index cbd5a1edf..046712712 100644 --- a/crates/starknet-devnet/tests/common/background_devnet.rs +++ b/crates/starknet-devnet/tests/common/background_devnet.rs @@ -370,6 +370,30 @@ impl BackgroundDevnet { } } + pub async fn abort_blocks( + &self, + starting_block_id: &BlockId, + ) -> Result, anyhow::Error> { + let mut aborted_blocks = self + .send_custom_rpc( + "devnet_abortBlocks", + json!({ "starting_block_id" : starting_block_id }), + ) + .await + .map_err(|e| anyhow::Error::msg(e.to_string()))?; + + let aborted_blocks = aborted_blocks["aborted"] + .take() + .as_array() + .ok_or(anyhow::Error::msg("Invalid abort response"))? + .clone(); + + Ok(aborted_blocks + .into_iter() + .map(|block_hash| serde_json::from_value(block_hash).unwrap()) + .collect()) + } + pub async fn get_config(&self) -> serde_json::Value { self.send_custom_rpc("devnet_getConfig", json!({})).await.unwrap() } diff --git a/crates/starknet-devnet/tests/common/utils.rs b/crates/starknet-devnet/tests/common/utils.rs index eb3e2b7b2..876d1e70b 100644 --- a/crates/starknet-devnet/tests/common/utils.rs +++ b/crates/starknet-devnet/tests/common/utils.rs @@ -434,6 +434,17 @@ pub async fn send_binary_rpc_via_ws( Ok(resp_body) } +pub async fn subscribe( + ws: &mut WebSocketStream>, + subscription_method: &str, + params: serde_json::Value, +) -> Result { + let subscription_confirmation = send_text_rpc_via_ws(ws, subscription_method, params).await?; + subscription_confirmation["result"] + .as_i64() + .ok_or(anyhow::Error::msg("Subscription did not return a numeric ID")) +} + /// Tries to read from the provided ws stream. To prevent deadlock, waits for a second at most. pub async fn receive_rpc_via_ws( ws: &mut WebSocketStream>, diff --git a/crates/starknet-devnet/tests/test_abort_blocks.rs b/crates/starknet-devnet/tests/test_abort_blocks.rs index 8c9eae968..a5fe03a50 100644 --- a/crates/starknet-devnet/tests/test_abort_blocks.rs +++ b/crates/starknet-devnet/tests/test_abort_blocks.rs @@ -15,25 +15,6 @@ mod abort_blocks_tests { static DUMMY_ADDRESS: u128 = 1; static DUMMY_AMOUNT: u128 = 1; - async fn abort_blocks(devnet: &BackgroundDevnet, starting_block_id: &BlockId) -> Vec { - let mut aborted_blocks = devnet - .send_custom_rpc( - "devnet_abortBlocks", - json!({ - "starting_block_id" : starting_block_id - }), - ) - .await - .unwrap(); - - let aborted_blocks = aborted_blocks["aborted"].take().as_array().unwrap().clone(); - - aborted_blocks - .into_iter() - .map(|block_hash| serde_json::from_value(block_hash).unwrap()) - .collect() - } - async fn abort_blocks_error( devnet: &BackgroundDevnet, starting_block_id: &BlockId, @@ -42,9 +23,7 @@ mod abort_blocks_tests { let aborted_blocks_error = devnet .send_custom_rpc( "devnet_abortBlocks", - json!({ - "starting_block_id" : starting_block_id - }), + json!({ "starting_block_id" : starting_block_id }), ) .await .unwrap_err(); @@ -74,7 +53,7 @@ mod abort_blocks_tests { let genesis_block_hash = devnet.get_latest_block_with_tx_hashes().await.unwrap().block_hash; let new_block_hash = devnet.create_block().await.unwrap(); - let aborted_blocks = abort_blocks(&devnet, &BlockId::Hash(new_block_hash)).await; + let aborted_blocks = devnet.abort_blocks(&BlockId::Hash(new_block_hash)).await.unwrap(); assert_eq!(aborted_blocks, vec![new_block_hash]); // Check if the genesis block still has ACCEPTED_ON_L2 status @@ -109,7 +88,7 @@ mod abort_blocks_tests { let first_block_hash = devnet.create_block().await.unwrap(); let second_block_hash = devnet.create_block().await.unwrap(); - let aborted_blocks = abort_blocks(&devnet, &BlockId::Hash(first_block_hash)).await; + let aborted_blocks = devnet.abort_blocks(&BlockId::Hash(first_block_hash)).await.unwrap(); assert_eq!(json!(aborted_blocks), json!([second_block_hash, first_block_hash])); assert_block_rejected(&devnet, &first_block_hash).await; @@ -127,7 +106,8 @@ mod abort_blocks_tests { let latest_block = devnet.get_latest_block_with_tx_hashes().await.unwrap(); - let aborted_blocks = abort_blocks(&devnet, &BlockId::Hash(latest_block.block_hash)).await; + let aborted_blocks = + devnet.abort_blocks(&BlockId::Hash(latest_block.block_hash)).await.unwrap(); assert_eq!(aborted_blocks, vec![latest_block.block_hash]); assert_tx_reverted(&mint_hash, &devnet.json_rpc_client, &["Block aborted manually"]).await; @@ -141,7 +121,7 @@ mod abort_blocks_tests { .expect("Could not start Devnet"); let new_block_hash = devnet.create_block().await.unwrap(); - let aborted_blocks = abort_blocks(&devnet, &BlockId::Hash(new_block_hash)).await; + let aborted_blocks = devnet.abort_blocks(&BlockId::Hash(new_block_hash)).await.unwrap(); assert_eq!(aborted_blocks, vec![new_block_hash]); assert_block_rejected(&devnet, &new_block_hash).await; @@ -170,14 +150,16 @@ mod abort_blocks_tests { devnet.mint(DUMMY_ADDRESS, DUMMY_AMOUNT).await; let second_block = devnet.get_latest_block_with_tx_hashes().await.unwrap(); - let aborted_blocks = abort_blocks(&devnet, &BlockId::Hash(second_block.block_hash)).await; + let aborted_blocks = + devnet.abort_blocks(&BlockId::Hash(second_block.block_hash)).await.unwrap(); assert_eq!(aborted_blocks, vec![second_block.block_hash]); let balance = devnet.get_balance_latest(&Felt::from(DUMMY_ADDRESS), FeeUnit::WEI).await.unwrap(); assert_eq!(balance.to_string(), DUMMY_AMOUNT.to_string()); - let aborted_blocks = abort_blocks(&devnet, &BlockId::Hash(first_block.block_hash)).await; + let aborted_blocks = + devnet.abort_blocks(&BlockId::Hash(first_block.block_hash)).await.unwrap(); assert_eq!(aborted_blocks, vec![first_block.block_hash]); let balance = @@ -217,7 +199,7 @@ mod abort_blocks_tests { let first_block_hash = devnet.create_block().await.unwrap(); let second_block_hash = devnet.create_block().await.unwrap(); - let aborted_blocks = abort_blocks(&devnet, &BlockId::Hash(first_block_hash)).await; + let aborted_blocks = devnet.abort_blocks(&BlockId::Hash(first_block_hash)).await.unwrap(); assert_eq!(aborted_blocks, vec![second_block_hash, first_block_hash]); abort_blocks_error(&devnet, &BlockId::Hash(first_block_hash), "Block is already aborted") @@ -237,7 +219,8 @@ mod abort_blocks_tests { let fork_block_hash = fork_devnet.create_block().await.unwrap(); - let aborted_blocks = abort_blocks(&fork_devnet, &BlockId::Hash(fork_block_hash)).await; + let aborted_blocks = + fork_devnet.abort_blocks(&BlockId::Hash(fork_block_hash)).await.unwrap(); assert_eq!(aborted_blocks, vec![fork_block_hash]); abort_blocks_error( @@ -259,7 +242,7 @@ mod abort_blocks_tests { devnet.create_block().await.unwrap(); } for _ in 0..3 { - abort_blocks(&devnet, &BlockId::Tag(BlockTag::Latest)).await; + devnet.abort_blocks(&BlockId::Tag(BlockTag::Latest)).await.unwrap(); } abort_blocks_error( &devnet, @@ -288,7 +271,7 @@ mod abort_blocks_tests { .unwrap(); assert_eq!(pending_balance, (2 * DUMMY_AMOUNT).into()); - abort_blocks(&devnet, &BlockId::Tag(BlockTag::Pending)).await; + devnet.abort_blocks(&BlockId::Tag(BlockTag::Pending)).await.unwrap(); let latest_balance = devnet.get_balance_latest(&Felt::from(DUMMY_ADDRESS), FeeUnit::WEI).await.unwrap(); assert_eq!(latest_balance, DUMMY_AMOUNT.into()); diff --git a/crates/starknet-devnet/tests/test_get_class.rs b/crates/starknet-devnet/tests/test_get_class.rs index 95818e63b..24fe0fcbd 100644 --- a/crates/starknet-devnet/tests/test_get_class.rs +++ b/crates/starknet-devnet/tests/test_get_class.rs @@ -240,13 +240,7 @@ mod get_class_tests { let abortable_block = devnet.get_latest_block_with_tx_hashes().await.unwrap(); - devnet - .send_custom_rpc( - "devnet_abortBlocks", - serde_json::json!({ "starting_block_id": BlockId::Hash(abortable_block.block_hash) }), - ) - .await - .unwrap(); + devnet.abort_blocks(&BlockId::Hash(abortable_block.block_hash)).await.unwrap(); // Getting class at the following block IDs should NOT be successful after abortion; these // blocks exist, but their states don't contain the class. diff --git a/crates/starknet-devnet/tests/test_subscription_to_blocks.rs b/crates/starknet-devnet/tests/test_subscription_to_blocks.rs index 2ffcfe74e..4c677ab66 100644 --- a/crates/starknet-devnet/tests/test_subscription_to_blocks.rs +++ b/crates/starknet-devnet/tests/test_subscription_to_blocks.rs @@ -338,13 +338,7 @@ mod block_subscription_support { let (mut ws, _) = connect_async(devnet.ws_url()).await.unwrap(); let new_block_hash = devnet.create_block().await.unwrap(); - devnet - .send_custom_rpc( - "devnet_abortBlocks", - json!({ "starting_block_id": BlockId::Hash(new_block_hash) }), - ) - .await - .unwrap(); + devnet.abort_blocks(&BlockId::Hash(new_block_hash)).await.unwrap(); let subscription_resp = send_text_rpc_via_ws( &mut ws, diff --git a/crates/starknet-devnet/tests/test_subscription_to_reorg.rs b/crates/starknet-devnet/tests/test_subscription_to_reorg.rs new file mode 100644 index 000000000..0735fac6d --- /dev/null +++ b/crates/starknet-devnet/tests/test_subscription_to_reorg.rs @@ -0,0 +1,127 @@ +#![cfg(test)] +pub mod common; + +mod reorg_subscription_support { + use std::collections::{HashMap, HashSet}; + + use serde_json::json; + use starknet_rs_core::types::BlockId; + use tokio_tungstenite::connect_async; + + use crate::common::background_devnet::BackgroundDevnet; + use crate::common::utils::{ + assert_no_notifications, receive_rpc_via_ws, subscribe, unsubscribe, + }; + + #[tokio::test] + async fn reorg_notification_for_all_subscriptions_except_pending_tx() { + let devnet_args = ["--state-archive-capacity", "full"]; + let devnet = BackgroundDevnet::spawn_with_additional_args(&devnet_args).await.unwrap(); + + // create blocks for later abortion + let starting_block_hash = devnet.create_block().await.unwrap(); + let ending_block_hash = devnet.create_block().await.unwrap(); + + let mut notifiable_subscribers = HashMap::new(); + for (subscription_method, subscription_params) in [ + ("starknet_subscribeNewHeads", json!({})), + ("starknet_subscribeTransactionStatus", json!({ "transaction_hash": "0x1" })), + ("starknet_subscribeEvents", json!({})), + ] { + let (mut ws, _) = connect_async(devnet.ws_url()).await.unwrap(); + let subscription_id = + subscribe(&mut ws, subscription_method, subscription_params).await.unwrap(); + notifiable_subscribers.insert(subscription_id, ws); + } + + let (mut unnotifiable_ws, _) = connect_async(devnet.ws_url()).await.unwrap(); + subscribe(&mut unnotifiable_ws, "starknet_subscribePendingTransactions", json!({})) + .await + .unwrap(); + + // assert that block-, tx_status- and event-subscribers got notified; unsubscribe them + devnet.abort_blocks(&BlockId::Hash(starting_block_hash)).await.unwrap(); + for (subscription_id, ws) in notifiable_subscribers.iter_mut() { + let notification = receive_rpc_via_ws(ws).await.unwrap(); + assert_eq!( + notification, + json!({ + "jsonrpc": "2.0", + "method": "starknet_subscriptionReorg", + "params": { + "result": { + "starting_block_hash": starting_block_hash, + "starting_block_number": 1, + "ending_block_hash": ending_block_hash, + "ending_block_number": 2, + }, + "subscription_id": subscription_id, + } + }) + ); + unsubscribe(ws, *subscription_id).await.unwrap(); + } + + // now that all sockets are unsubscribed, abort a new block and assert no notifications + let additional_block_hash = devnet.create_block().await.unwrap(); + devnet.abort_blocks(&BlockId::Hash(additional_block_hash)).await.unwrap(); + for (_, mut ws) in notifiable_subscribers { + assert_no_notifications(&mut ws).await; + } + + assert_no_notifications(&mut unnotifiable_ws).await; + } + + #[tokio::test] + async fn socket_with_n_subscriptions_should_get_n_reorg_notifications() { + let devnet_args = ["--state-archive-capacity", "full"]; + let devnet = BackgroundDevnet::spawn_with_additional_args(&devnet_args).await.unwrap(); + + let created_block_hash = devnet.create_block().await.unwrap(); + + // Create one socket with n subscriptions. + let (mut ws, _) = connect_async(devnet.ws_url()).await.unwrap(); + let mut subscription_ids = vec![]; + for subscription_method in ["starknet_subscribeNewHeads", "starknet_subscribeEvents"] { + let subscription_id = subscribe(&mut ws, subscription_method, json!({})).await.unwrap(); + subscription_ids.push(subscription_id); + } + + // Trigger reorg. + devnet.abort_blocks(&BlockId::Hash(created_block_hash)).await.unwrap(); + + // Assert n reorg notifications received. The notifications only differ in subscription_id. + let mut notification_ids = HashSet::new(); + for _ in subscription_ids.iter() { + let mut notification = receive_rpc_via_ws(&mut ws).await.unwrap(); + + // Reorg notifications may be received in any order. To assert one reorg subscription + // was received per subscription_id, we extract the IDs from notifications, store them + // in a set, and later assert equality with the set of expected subscription IDs. + let notification_id = + notification["params"]["subscription_id"].take().as_i64().unwrap(); + notification_ids.insert(notification_id); + + assert_eq!( + notification, + json!({ + "jsonrpc": "2.0", + "method": "starknet_subscriptionReorg", + "params": { + "result": { + "starting_block_hash": created_block_hash, + "starting_block_number": 1, + "ending_block_hash": created_block_hash, + "ending_block_number": 1, + }, + "subscription_id": null, + } + }) + ); + } + + assert_eq!(notification_ids, HashSet::from_iter(subscription_ids)); + + assert_no_notifications(&mut ws).await; + } +} diff --git a/crates/starknet-devnet/tests/test_websocket.rs b/crates/starknet-devnet/tests/test_websocket.rs index 7a5c943df..116ac128d 100644 --- a/crates/starknet-devnet/tests/test_websocket.rs +++ b/crates/starknet-devnet/tests/test_websocket.rs @@ -8,7 +8,9 @@ mod websocket_support { use tokio_tungstenite::connect_async; use crate::common::background_devnet::BackgroundDevnet; - use crate::common::utils::{send_binary_rpc_via_ws, send_text_rpc_via_ws}; + use crate::common::utils::{ + assert_no_notifications, send_binary_rpc_via_ws, send_text_rpc_via_ws, subscribe, + }; #[tokio::test] /// Testing for all non-ws methods would be longsome, so we just test for one devnet_ and one @@ -126,4 +128,18 @@ mod websocket_support { let resp = send_text_rpc_via_ws(&mut ws, "devnet_mint", json!({})).await.unwrap(); assert_eq!(resp["error"]["message"], "missing field `address`"); } + + #[tokio::test] + async fn restarting_should_forget_all_websocket_subscriptions() { + let devnet = BackgroundDevnet::spawn().await.unwrap(); + let (mut ws, _) = connect_async(devnet.ws_url()).await.unwrap(); + + devnet.create_block().await.unwrap(); + + subscribe(&mut ws, "starknet_subscribeNewHeads", json!({})).await.unwrap(); + + devnet.restart().await; + + assert_no_notifications(&mut ws).await; + } } diff --git a/website/docs/blocks.md b/website/docs/blocks.md index e4f301182..12c991772 100644 --- a/website/docs/blocks.md +++ b/website/docs/blocks.md @@ -97,6 +97,10 @@ Aborted blocks can only be queried by block hash. Devnet does not support the ab - already aborted blocks - Devnet's genesis block +### Reorg + +On block abortion, a reorg subscription notification will be sent to all websocket subscribers requiring so according to [JSON-RPC websocket API specification](https://github.com/starkware-libs/starknet-specs/blob/v0.8.0-rc1/api/starknet_ws_api.json#L256). The `starting_block` of the orphaned chain is the successor of the new latest block and the `ending_block` of the orphaned chain is the block that was latest before aborting. One reorg notification is sent per subscription, not per websocket, meaning that if a websocket has n subscriptions, it will receive n reorg notifications, each with its own subscription ID. + ### Request and response To abort, send one of the following: diff --git a/website/docs/dump-load-restart.md b/website/docs/dump-load-restart.md index 157cce749..45748990b 100644 --- a/website/docs/dump-load-restart.md +++ b/website/docs/dump-load-restart.md @@ -103,7 +103,7 @@ If you dumped a Devnet utilizing one class for account predeployment (e.g. `--ac ## Restarting -Devnet can be restarted by making a `POST /restart` request (no body required) or `JSON-RPC` request with method name `devnet_restart`. All of the deployed contracts (including predeployed), blocks and storage updates will be restarted to the original state, without the transactions and requests that may have been loaded from a dump file on startup. +Devnet can be restarted by making a `POST /restart` request (no body required) or `JSON-RPC` request with method name `devnet_restart`. All of the deployed contracts (including predeployed), blocks and storage updates will be restarted to the original state, without the transactions and requests that may have been loaded from a dump file on startup. Websocket subscriptions will also be forgotten. ### Restarting and L1-L2 messaging