Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reorg subscription #653

Merged
merged 16 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 120 additions & 16 deletions crates/starknet-devnet-server/src/api/json_rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use starknet_core::starknet::starknet_config::{DumpOn, StarknetConfig};
use starknet_core::{CasmContractClass, StarknetBlock};
use starknet_rs_core::types::{BlockId, BlockTag, ContractClass as CodegenContractClass, Felt};
use starknet_types::messaging::{MessageToL1, MessageToL2};
use starknet_types::rpc::block::{Block, PendingBlock};
use starknet_types::rpc::block::{Block, PendingBlock, ReorgData};
use starknet_types::rpc::estimate_message_fee::{
EstimateMessageFeeRequestWrapper, FeeEstimateWrapper,
};
Expand Down Expand Up @@ -213,7 +213,7 @@ impl JsonRpcHandler {
/// The latest and pending block are always defined, so to avoid having to deal with Err/None in
/// places where this method is called, it is defined to return an empty accepted block,
/// even though that case should never happen.
async fn get_block(&self, tag: BlockTag) -> StarknetBlock {
async fn get_block_by_tag(&self, tag: BlockTag) -> StarknetBlock {
let starknet = self.api.starknet.lock().await;
match starknet.get_block(&BlockId::Tag(tag)) {
Ok(block) => block.clone(),
Expand All @@ -225,7 +225,7 @@ impl JsonRpcHandler {
&self,
old_pending_block: StarknetBlock,
) -> Result<(), error::ApiError> {
let new_pending_block = self.get_block(BlockTag::Pending).await;
let new_pending_block = self.get_block_by_tag(BlockTag::Pending).await;
let old_pending_txs = old_pending_block.get_transactions();
let new_pending_txs = new_pending_block.get_transactions();

Expand Down Expand Up @@ -337,21 +337,62 @@ impl JsonRpcHandler {
/// Notify subscribers of what they are subscribed to.
async fn broadcast_changes(
&self,
old_latest_block: StarknetBlock,
old_latest_block: Option<StarknetBlock>,
old_pending_block: Option<StarknetBlock>,
) -> Result<(), error::ApiError> {
let new_latest_block = self.get_block(BlockTag::Latest).await;
let old_latest_block = if let Some(block) = old_latest_block {
block
} else {
return Ok(());
};

if let Some(old_pending_block) = old_pending_block {
self.broadcast_pending_tx_changes(old_pending_block).await?;
}

if new_latest_block.block_number() > old_latest_block.block_number() {
self.broadcast_latest_changes(new_latest_block).await?;
} else {
// TODO - possible only if an immutable request came or one of the following happened:
// blocks aborted, devnet restarted, devnet loaded. Or should loading cause websockets
// to be restarted too, thus not requiring notification?
let new_latest_block = self.get_block_by_tag(BlockTag::Latest).await;

match new_latest_block.block_number().cmp(&old_latest_block.block_number()) {
std::cmp::Ordering::Less => {
self.broadcast_reorg(old_latest_block, new_latest_block).await?
}
std::cmp::Ordering::Equal => { /* no changes required */ }
std::cmp::Ordering::Greater => self.broadcast_latest_changes(new_latest_block).await?,
}

Ok(())
}

async fn broadcast_reorg(
&self,
old_latest_block: StarknetBlock,
new_latest_block: StarknetBlock,
) -> Result<(), error::ApiError> {
// Since it is impossible to determine the hash of the former successor of new_latest_block
// directly, we iterate from old_latest_block all the way to the aborted successor of
// new_latest_block.
let new_latest_hash = new_latest_block.block_hash();
let mut orphan_starting_block_hash = old_latest_block.block_hash();
let starknet = self.api.starknet.lock().await;
loop {
let orphan_block = starknet.get_block(&BlockId::Hash(orphan_starting_block_hash))?;
let parent_hash = orphan_block.parent_hash();
if parent_hash == new_latest_hash {
break;
}
orphan_starting_block_hash = parent_hash;
}

let notification = SubscriptionNotification::Reorg(ReorgData {
starting_block_hash: orphan_starting_block_hash,
starting_block_number: new_latest_block.block_number().unchecked_next(),
ending_block_hash: old_latest_block.block_hash(),
ending_block_number: old_latest_block.block_number(),
});

let sockets = self.api.sockets.lock().await;
for (_, socket_context) in sockets.iter() {
socket_context.notify_subscribers(&notification).await;
}

Ok(())
Expand All @@ -366,13 +407,19 @@ impl JsonRpcHandler {
trace!(target: "JsonRpcHandler::execute", "executing starknet request");

// for later comparison and subscription notifications
let old_latest_block = self.get_block(BlockTag::Latest).await;
let old_pending_block = if self.starknet_config.uses_pending_block() {
Some(self.get_block(BlockTag::Pending).await)
let old_latest_block = if request.requires_notifying() {
Some(self.get_block_by_tag(BlockTag::Latest).await)
} else {
None
};

let old_pending_block =
if request.requires_notifying() && self.starknet_config.uses_pending_block() {
Some(self.get_block_by_tag(BlockTag::Pending).await)
} else {
None
};

// true if origin should be tried after request fails; relevant in forking mode
let mut forwardable = true;

Expand Down Expand Up @@ -530,8 +577,6 @@ impl JsonRpcHandler {
}
}

// TODO if request.modifies_state() { ... } - also in the beginning of this method to avoid
// unnecessary lock acquiring
if let Err(e) = self.broadcast_changes(old_latest_block, old_pending_block).await {
return ResponseResult::Error(e.api_error_to_rpc_error());
}
Expand Down Expand Up @@ -757,6 +802,65 @@ pub enum JsonRpcRequest {
DevnetConfig,
}

impl JsonRpcRequest {
pub fn requires_notifying(&self) -> bool {
#![warn(clippy::wildcard_enum_match_arm)]
match self {
JsonRpcRequest::SpecVersion => false,
JsonRpcRequest::BlockWithTransactionHashes(_) => false,
JsonRpcRequest::BlockWithFullTransactions(_) => false,
JsonRpcRequest::BlockWithReceipts(_) => false,
JsonRpcRequest::StateUpdate(_) => false,
JsonRpcRequest::StorageAt(_) => false,
JsonRpcRequest::TransactionByHash(_) => false,
JsonRpcRequest::TransactionByBlockAndIndex(_) => false,
JsonRpcRequest::TransactionReceiptByTransactionHash(_) => false,
JsonRpcRequest::TransactionStatusByHash(_) => false,
JsonRpcRequest::MessagesStatusByL1Hash(_) => false,
JsonRpcRequest::ClassByHash(_) => false,
JsonRpcRequest::CompiledCasmByClassHash(_) => false,
JsonRpcRequest::ClassHashAtContractAddress(_) => false,
JsonRpcRequest::ClassAtContractAddress(_) => false,
JsonRpcRequest::BlockTransactionCount(_) => false,
JsonRpcRequest::Call(_) => false,
JsonRpcRequest::EstimateFee(_) => false,
JsonRpcRequest::BlockNumber => false,
JsonRpcRequest::BlockHashAndNumber => false,
JsonRpcRequest::ChainId => false,
JsonRpcRequest::Syncing => false,
JsonRpcRequest::Events(_) => false,
JsonRpcRequest::ContractNonce(_) => false,
JsonRpcRequest::AddDeclareTransaction(_) => true,
JsonRpcRequest::AddDeployAccountTransaction(_) => true,
JsonRpcRequest::AddInvokeTransaction(_) => true,
JsonRpcRequest::EstimateMessageFee(_) => false,
JsonRpcRequest::SimulateTransactions(_) => false,
JsonRpcRequest::TraceTransaction(_) => false,
JsonRpcRequest::BlockTransactionTraces(_) => false,
JsonRpcRequest::ImpersonateAccount(_) => false,
JsonRpcRequest::StopImpersonateAccount(_) => false,
JsonRpcRequest::AutoImpersonate => false,
JsonRpcRequest::StopAutoImpersonate => false,
JsonRpcRequest::Dump(_) => false,
JsonRpcRequest::Load(_) => false,
JsonRpcRequest::PostmanLoadL1MessagingContract(_) => false,
JsonRpcRequest::PostmanFlush(_) => true,
JsonRpcRequest::PostmanSendMessageToL2(_) => true,
JsonRpcRequest::PostmanConsumeMessageFromL2(_) => false,
JsonRpcRequest::CreateBlock => true,
JsonRpcRequest::AbortBlocks(_) => true,
JsonRpcRequest::SetGasPrice(_) => false,
JsonRpcRequest::Restart(_) => false,
JsonRpcRequest::SetTime(_) => true,
JsonRpcRequest::IncreaseTime(_) => true,
JsonRpcRequest::PredeployedAccounts(_) => false,
JsonRpcRequest::AccountBalance(_) => false,
JsonRpcRequest::Mint(_) => true,
JsonRpcRequest::DevnetConfig => false,
}
}
}

#[derive(Deserialize, AllVariantsSerdeRenames, VariantName)]
#[cfg_attr(test, derive(Debug))]
#[serde(tag = "method", content = "params")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ impl JsonRpcHandler {
let restart_params = data.unwrap_or_default();
self.api.starknet.lock().await.restart(restart_params.restart_l1_to_l2_messaging)?;

self.api.sockets.lock().await.clear();
tracing::info!("Websocket memory cleared. No subscribers.");

Ok(super::JsonRpcResponse::Empty)
}

Expand Down
81 changes: 40 additions & 41 deletions crates/starknet-devnet-server/src/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use starknet_rs_core::types::{BlockTag, Felt};
use starknet_types::contract_address::ContractAddress;
use starknet_types::emitted_event::EmittedEvent;
use starknet_types::felt::TransactionHash;
use starknet_types::rpc::block::BlockHeader;
use starknet_types::rpc::block::{BlockHeader, ReorgData};
use starknet_types::rpc::transactions::{TransactionStatus, TransactionWithHash};
use tokio::sync::Mutex;

Expand Down Expand Up @@ -58,48 +58,45 @@ impl Subscription {
}

pub fn matches(&self, notification: &SubscriptionNotification) -> bool {
match self {
Subscription::NewHeads => {
if let SubscriptionNotification::NewHeads(_) = notification {
return true;
}
}
Subscription::TransactionStatus { tag, transaction_hash: subscription_hash } => {
if let SubscriptionNotification::TransactionStatus(notification) = notification {
return tag == &notification.origin_tag
&& subscription_hash == &notification.transaction_hash;
}
}
Subscription::PendingTransactionsFull { address_filter, .. } => {
if let SubscriptionNotification::PendingTransaction(
PendingTransactionNotification::Full(tx),
) = notification
{
return match tx.get_sender_address() {
Some(address) => address_filter.passes(&address),
None => true,
};
}
}
Subscription::PendingTransactionsHash { address_filter } => {
if let SubscriptionNotification::PendingTransaction(
PendingTransactionNotification::Hash(hash_wrapper),
) = notification
{
return match hash_wrapper.sender_address {
Some(address) => address_filter.passes(&address),
None => true,
};
}
}
Subscription::Events { address, keys_filter } => {
if let SubscriptionNotification::Event(event) = notification {
return check_if_filter_applies_for_event(address, keys_filter, &event.into());
}
match (self, notification) {
(Subscription::NewHeads, SubscriptionNotification::NewHeads(_)) => true,
(
Subscription::TransactionStatus { tag, transaction_hash: subscription_hash },
SubscriptionNotification::TransactionStatus(notification),
) => {
tag == &notification.origin_tag
&& subscription_hash == &notification.transaction_hash
}
(
Subscription::PendingTransactionsFull { address_filter },
SubscriptionNotification::PendingTransaction(PendingTransactionNotification::Full(
tx,
)),
) => match tx.get_sender_address() {
Some(address) => address_filter.passes(&address),
None => true,
},
(
Subscription::PendingTransactionsHash { address_filter },
SubscriptionNotification::PendingTransaction(PendingTransactionNotification::Hash(
hash_wrapper,
)),
) => match hash_wrapper.sender_address {
Some(address) => address_filter.passes(&address),
None => true,
},
(
Subscription::Events { address, keys_filter },
SubscriptionNotification::Event(event),
) => check_if_filter_applies_for_event(address, keys_filter, &event.into()),
(
Subscription::NewHeads
| Subscription::TransactionStatus { .. }
| Subscription::Events { .. },
SubscriptionNotification::Reorg(_),
) => true, // any subscription other than pending tx requires reorg notification
_ => false,
}

false
}
}

Expand Down Expand Up @@ -148,6 +145,7 @@ pub enum SubscriptionNotification {
TransactionStatus(NewTransactionStatus),
PendingTransaction(PendingTransactionNotification),
Event(EmittedEvent),
Reorg(ReorgData),
}

impl SubscriptionNotification {
Expand All @@ -161,6 +159,7 @@ impl SubscriptionNotification {
"starknet_subscriptionPendingTransactions"
}
SubscriptionNotification::Event(_) => "starknet_subscriptionEvents",
SubscriptionNotification::Reorg(_) => "starknet_subscriptionReorg",
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions crates/starknet-devnet-types/src/rpc/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,17 @@ pub struct ResourcePrice {
pub price_in_fri: Felt,
pub price_in_wei: Felt,
}

#[derive(Debug, Clone, Serialize)]
#[serde(deny_unknown_fields)]
/// Data about reorganized blocks, starting and ending block number and hash
pub struct ReorgData {
/// Hash of the first known block of the orphaned chain
pub starting_block_hash: BlockHash,
/// Number of the first known block of the orphaned chain
pub starting_block_number: BlockNumber,
/// The last known block of the orphaned chain
pub ending_block_hash: BlockHash,
/// Number of the last known block of the orphaned chain
pub ending_block_number: BlockNumber,
}
24 changes: 24 additions & 0 deletions crates/starknet-devnet/tests/common/background_devnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,30 @@ impl BackgroundDevnet {
}
}

pub async fn abort_blocks(
&self,
starting_block_id: &BlockId,
) -> Result<Vec<Felt>, anyhow::Error> {
let mut aborted_blocks = self
.send_custom_rpc(
"devnet_abortBlocks",
json!({ "starting_block_id" : starting_block_id }),
)
.await
.map_err(|e| anyhow::Error::msg(e.to_string()))?;

let aborted_blocks = aborted_blocks["aborted"]
.take()
.as_array()
.ok_or(anyhow::Error::msg("Invalid abort response"))?
.clone();

Ok(aborted_blocks
.into_iter()
.map(|block_hash| serde_json::from_value(block_hash).unwrap())
.collect())
}

pub async fn get_config(&self) -> serde_json::Value {
self.send_custom_rpc("devnet_getConfig", json!({})).await.unwrap()
}
Expand Down
11 changes: 11 additions & 0 deletions crates/starknet-devnet/tests/common/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,17 @@ pub async fn send_binary_rpc_via_ws(
Ok(resp_body)
}

pub async fn subscribe(
ws: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
subscription_method: &str,
params: serde_json::Value,
) -> Result<i64, anyhow::Error> {
let subscription_confirmation = send_text_rpc_via_ws(ws, subscription_method, params).await?;
subscription_confirmation["result"]
.as_i64()
.ok_or(anyhow::Error::msg("Subscription did not return a numeric ID"))
}

/// Tries to read from the provided ws stream. To prevent deadlock, waits for a second at most.
pub async fn receive_rpc_via_ws(
ws: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
Expand Down
Loading