Skip to content

Commit

Permalink
Support reorg subscription (#653)
Browse files Browse the repository at this point in the history
* Make aborting a BackgroundDevnet method

* Fix restarting behavior; check notifiability only if appropriate request

* Improve fn matches
  • Loading branch information
FabijanC authored Nov 28, 2024
1 parent 55fe695 commit 6a07b69
Show file tree
Hide file tree
Showing 13 changed files with 378 additions and 105 deletions.
136 changes: 120 additions & 16 deletions crates/starknet-devnet-server/src/api/json_rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use starknet_core::starknet::starknet_config::{DumpOn, StarknetConfig};
use starknet_core::{CasmContractClass, StarknetBlock};
use starknet_rs_core::types::{BlockId, BlockTag, ContractClass as CodegenContractClass, Felt};
use starknet_types::messaging::{MessageToL1, MessageToL2};
use starknet_types::rpc::block::{Block, PendingBlock};
use starknet_types::rpc::block::{Block, PendingBlock, ReorgData};
use starknet_types::rpc::estimate_message_fee::{
EstimateMessageFeeRequestWrapper, FeeEstimateWrapper,
};
Expand Down Expand Up @@ -213,7 +213,7 @@ impl JsonRpcHandler {
/// The latest and pending block are always defined, so to avoid having to deal with Err/None in
/// places where this method is called, it is defined to return an empty accepted block,
/// even though that case should never happen.
async fn get_block(&self, tag: BlockTag) -> StarknetBlock {
async fn get_block_by_tag(&self, tag: BlockTag) -> StarknetBlock {
let starknet = self.api.starknet.lock().await;
match starknet.get_block(&BlockId::Tag(tag)) {
Ok(block) => block.clone(),
Expand All @@ -225,7 +225,7 @@ impl JsonRpcHandler {
&self,
old_pending_block: StarknetBlock,
) -> Result<(), error::ApiError> {
let new_pending_block = self.get_block(BlockTag::Pending).await;
let new_pending_block = self.get_block_by_tag(BlockTag::Pending).await;
let old_pending_txs = old_pending_block.get_transactions();
let new_pending_txs = new_pending_block.get_transactions();

Expand Down Expand Up @@ -337,21 +337,62 @@ impl JsonRpcHandler {
/// Notify subscribers of what they are subscribed to.
async fn broadcast_changes(
&self,
old_latest_block: StarknetBlock,
old_latest_block: Option<StarknetBlock>,
old_pending_block: Option<StarknetBlock>,
) -> Result<(), error::ApiError> {
let new_latest_block = self.get_block(BlockTag::Latest).await;
let old_latest_block = if let Some(block) = old_latest_block {
block
} else {
return Ok(());
};

if let Some(old_pending_block) = old_pending_block {
self.broadcast_pending_tx_changes(old_pending_block).await?;
}

if new_latest_block.block_number() > old_latest_block.block_number() {
self.broadcast_latest_changes(new_latest_block).await?;
} else {
// TODO - possible only if an immutable request came or one of the following happened:
// blocks aborted, devnet restarted, devnet loaded. Or should loading cause websockets
// to be restarted too, thus not requiring notification?
let new_latest_block = self.get_block_by_tag(BlockTag::Latest).await;

match new_latest_block.block_number().cmp(&old_latest_block.block_number()) {
std::cmp::Ordering::Less => {
self.broadcast_reorg(old_latest_block, new_latest_block).await?
}
std::cmp::Ordering::Equal => { /* no changes required */ }
std::cmp::Ordering::Greater => self.broadcast_latest_changes(new_latest_block).await?,
}

Ok(())
}

async fn broadcast_reorg(
&self,
old_latest_block: StarknetBlock,
new_latest_block: StarknetBlock,
) -> Result<(), error::ApiError> {
// Since it is impossible to determine the hash of the former successor of new_latest_block
// directly, we iterate from old_latest_block all the way to the aborted successor of
// new_latest_block.
let new_latest_hash = new_latest_block.block_hash();
let mut orphan_starting_block_hash = old_latest_block.block_hash();
let starknet = self.api.starknet.lock().await;
loop {
let orphan_block = starknet.get_block(&BlockId::Hash(orphan_starting_block_hash))?;
let parent_hash = orphan_block.parent_hash();
if parent_hash == new_latest_hash {
break;
}
orphan_starting_block_hash = parent_hash;
}

let notification = SubscriptionNotification::Reorg(ReorgData {
starting_block_hash: orphan_starting_block_hash,
starting_block_number: new_latest_block.block_number().unchecked_next(),
ending_block_hash: old_latest_block.block_hash(),
ending_block_number: old_latest_block.block_number(),
});

let sockets = self.api.sockets.lock().await;
for (_, socket_context) in sockets.iter() {
socket_context.notify_subscribers(&notification).await;
}

Ok(())
Expand All @@ -366,13 +407,19 @@ impl JsonRpcHandler {
trace!(target: "JsonRpcHandler::execute", "executing starknet request");

// for later comparison and subscription notifications
let old_latest_block = self.get_block(BlockTag::Latest).await;
let old_pending_block = if self.starknet_config.uses_pending_block() {
Some(self.get_block(BlockTag::Pending).await)
let old_latest_block = if request.requires_notifying() {
Some(self.get_block_by_tag(BlockTag::Latest).await)
} else {
None
};

let old_pending_block =
if request.requires_notifying() && self.starknet_config.uses_pending_block() {
Some(self.get_block_by_tag(BlockTag::Pending).await)
} else {
None
};

// true if origin should be tried after request fails; relevant in forking mode
let mut forwardable = true;

Expand Down Expand Up @@ -530,8 +577,6 @@ impl JsonRpcHandler {
}
}

// TODO if request.modifies_state() { ... } - also in the beginning of this method to avoid
// unnecessary lock acquiring
if let Err(e) = self.broadcast_changes(old_latest_block, old_pending_block).await {
return ResponseResult::Error(e.api_error_to_rpc_error());
}
Expand Down Expand Up @@ -757,6 +802,65 @@ pub enum JsonRpcRequest {
DevnetConfig,
}

impl JsonRpcRequest {
pub fn requires_notifying(&self) -> bool {
#![warn(clippy::wildcard_enum_match_arm)]
match self {
JsonRpcRequest::SpecVersion => false,
JsonRpcRequest::BlockWithTransactionHashes(_) => false,
JsonRpcRequest::BlockWithFullTransactions(_) => false,
JsonRpcRequest::BlockWithReceipts(_) => false,
JsonRpcRequest::StateUpdate(_) => false,
JsonRpcRequest::StorageAt(_) => false,
JsonRpcRequest::TransactionByHash(_) => false,
JsonRpcRequest::TransactionByBlockAndIndex(_) => false,
JsonRpcRequest::TransactionReceiptByTransactionHash(_) => false,
JsonRpcRequest::TransactionStatusByHash(_) => false,
JsonRpcRequest::MessagesStatusByL1Hash(_) => false,
JsonRpcRequest::ClassByHash(_) => false,
JsonRpcRequest::CompiledCasmByClassHash(_) => false,
JsonRpcRequest::ClassHashAtContractAddress(_) => false,
JsonRpcRequest::ClassAtContractAddress(_) => false,
JsonRpcRequest::BlockTransactionCount(_) => false,
JsonRpcRequest::Call(_) => false,
JsonRpcRequest::EstimateFee(_) => false,
JsonRpcRequest::BlockNumber => false,
JsonRpcRequest::BlockHashAndNumber => false,
JsonRpcRequest::ChainId => false,
JsonRpcRequest::Syncing => false,
JsonRpcRequest::Events(_) => false,
JsonRpcRequest::ContractNonce(_) => false,
JsonRpcRequest::AddDeclareTransaction(_) => true,
JsonRpcRequest::AddDeployAccountTransaction(_) => true,
JsonRpcRequest::AddInvokeTransaction(_) => true,
JsonRpcRequest::EstimateMessageFee(_) => false,
JsonRpcRequest::SimulateTransactions(_) => false,
JsonRpcRequest::TraceTransaction(_) => false,
JsonRpcRequest::BlockTransactionTraces(_) => false,
JsonRpcRequest::ImpersonateAccount(_) => false,
JsonRpcRequest::StopImpersonateAccount(_) => false,
JsonRpcRequest::AutoImpersonate => false,
JsonRpcRequest::StopAutoImpersonate => false,
JsonRpcRequest::Dump(_) => false,
JsonRpcRequest::Load(_) => false,
JsonRpcRequest::PostmanLoadL1MessagingContract(_) => false,
JsonRpcRequest::PostmanFlush(_) => true,
JsonRpcRequest::PostmanSendMessageToL2(_) => true,
JsonRpcRequest::PostmanConsumeMessageFromL2(_) => false,
JsonRpcRequest::CreateBlock => true,
JsonRpcRequest::AbortBlocks(_) => true,
JsonRpcRequest::SetGasPrice(_) => false,
JsonRpcRequest::Restart(_) => false,
JsonRpcRequest::SetTime(_) => true,
JsonRpcRequest::IncreaseTime(_) => true,
JsonRpcRequest::PredeployedAccounts(_) => false,
JsonRpcRequest::AccountBalance(_) => false,
JsonRpcRequest::Mint(_) => true,
JsonRpcRequest::DevnetConfig => false,
}
}
}

#[derive(Deserialize, AllVariantsSerdeRenames, VariantName)]
#[cfg_attr(test, derive(Debug))]
#[serde(tag = "method", content = "params")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ impl JsonRpcHandler {
let restart_params = data.unwrap_or_default();
self.api.starknet.lock().await.restart(restart_params.restart_l1_to_l2_messaging)?;

self.api.sockets.lock().await.clear();
tracing::info!("Websocket memory cleared. No subscribers.");

Ok(super::JsonRpcResponse::Empty)
}

Expand Down
81 changes: 40 additions & 41 deletions crates/starknet-devnet-server/src/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use starknet_rs_core::types::{BlockTag, Felt};
use starknet_types::contract_address::ContractAddress;
use starknet_types::emitted_event::EmittedEvent;
use starknet_types::felt::TransactionHash;
use starknet_types::rpc::block::BlockHeader;
use starknet_types::rpc::block::{BlockHeader, ReorgData};
use starknet_types::rpc::transactions::{TransactionStatus, TransactionWithHash};
use tokio::sync::Mutex;

Expand Down Expand Up @@ -58,48 +58,45 @@ impl Subscription {
}

pub fn matches(&self, notification: &SubscriptionNotification) -> bool {
match self {
Subscription::NewHeads => {
if let SubscriptionNotification::NewHeads(_) = notification {
return true;
}
}
Subscription::TransactionStatus { tag, transaction_hash: subscription_hash } => {
if let SubscriptionNotification::TransactionStatus(notification) = notification {
return tag == &notification.origin_tag
&& subscription_hash == &notification.transaction_hash;
}
}
Subscription::PendingTransactionsFull { address_filter, .. } => {
if let SubscriptionNotification::PendingTransaction(
PendingTransactionNotification::Full(tx),
) = notification
{
return match tx.get_sender_address() {
Some(address) => address_filter.passes(&address),
None => true,
};
}
}
Subscription::PendingTransactionsHash { address_filter } => {
if let SubscriptionNotification::PendingTransaction(
PendingTransactionNotification::Hash(hash_wrapper),
) = notification
{
return match hash_wrapper.sender_address {
Some(address) => address_filter.passes(&address),
None => true,
};
}
}
Subscription::Events { address, keys_filter } => {
if let SubscriptionNotification::Event(event) = notification {
return check_if_filter_applies_for_event(address, keys_filter, &event.into());
}
match (self, notification) {
(Subscription::NewHeads, SubscriptionNotification::NewHeads(_)) => true,
(
Subscription::TransactionStatus { tag, transaction_hash: subscription_hash },
SubscriptionNotification::TransactionStatus(notification),
) => {
tag == &notification.origin_tag
&& subscription_hash == &notification.transaction_hash
}
(
Subscription::PendingTransactionsFull { address_filter },
SubscriptionNotification::PendingTransaction(PendingTransactionNotification::Full(
tx,
)),
) => match tx.get_sender_address() {
Some(address) => address_filter.passes(&address),
None => true,
},
(
Subscription::PendingTransactionsHash { address_filter },
SubscriptionNotification::PendingTransaction(PendingTransactionNotification::Hash(
hash_wrapper,
)),
) => match hash_wrapper.sender_address {
Some(address) => address_filter.passes(&address),
None => true,
},
(
Subscription::Events { address, keys_filter },
SubscriptionNotification::Event(event),
) => check_if_filter_applies_for_event(address, keys_filter, &event.into()),
(
Subscription::NewHeads
| Subscription::TransactionStatus { .. }
| Subscription::Events { .. },
SubscriptionNotification::Reorg(_),
) => true, // any subscription other than pending tx requires reorg notification
_ => false,
}

false
}
}

Expand Down Expand Up @@ -148,6 +145,7 @@ pub enum SubscriptionNotification {
TransactionStatus(NewTransactionStatus),
PendingTransaction(PendingTransactionNotification),
Event(EmittedEvent),
Reorg(ReorgData),
}

impl SubscriptionNotification {
Expand All @@ -161,6 +159,7 @@ impl SubscriptionNotification {
"starknet_subscriptionPendingTransactions"
}
SubscriptionNotification::Event(_) => "starknet_subscriptionEvents",
SubscriptionNotification::Reorg(_) => "starknet_subscriptionReorg",
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions crates/starknet-devnet-types/src/rpc/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,17 @@ pub struct ResourcePrice {
pub price_in_fri: Felt,
pub price_in_wei: Felt,
}

#[derive(Debug, Clone, Serialize)]
#[serde(deny_unknown_fields)]
/// Data about reorganized blocks, starting and ending block number and hash
pub struct ReorgData {
/// Hash of the first known block of the orphaned chain
pub starting_block_hash: BlockHash,
/// Number of the first known block of the orphaned chain
pub starting_block_number: BlockNumber,
/// The last known block of the orphaned chain
pub ending_block_hash: BlockHash,
/// Number of the last known block of the orphaned chain
pub ending_block_number: BlockNumber,
}
24 changes: 24 additions & 0 deletions crates/starknet-devnet/tests/common/background_devnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,30 @@ impl BackgroundDevnet {
}
}

pub async fn abort_blocks(
&self,
starting_block_id: &BlockId,
) -> Result<Vec<Felt>, anyhow::Error> {
let mut aborted_blocks = self
.send_custom_rpc(
"devnet_abortBlocks",
json!({ "starting_block_id" : starting_block_id }),
)
.await
.map_err(|e| anyhow::Error::msg(e.to_string()))?;

let aborted_blocks = aborted_blocks["aborted"]
.take()
.as_array()
.ok_or(anyhow::Error::msg("Invalid abort response"))?
.clone();

Ok(aborted_blocks
.into_iter()
.map(|block_hash| serde_json::from_value(block_hash).unwrap())
.collect())
}

pub async fn get_config(&self) -> serde_json::Value {
self.send_custom_rpc("devnet_getConfig", json!({})).await.unwrap()
}
Expand Down
11 changes: 11 additions & 0 deletions crates/starknet-devnet/tests/common/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,17 @@ pub async fn send_binary_rpc_via_ws(
Ok(resp_body)
}

pub async fn subscribe(
ws: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
subscription_method: &str,
params: serde_json::Value,
) -> Result<i64, anyhow::Error> {
let subscription_confirmation = send_text_rpc_via_ws(ws, subscription_method, params).await?;
subscription_confirmation["result"]
.as_i64()
.ok_or(anyhow::Error::msg("Subscription did not return a numeric ID"))
}

/// Tries to read from the provided ws stream. To prevent deadlock, waits for a second at most.
pub async fn receive_rpc_via_ws(
ws: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
Expand Down
Loading

0 comments on commit 6a07b69

Please sign in to comment.