Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add tps metric to contracts table #2468

Merged
merged 16 commits into from
Oct 8, 2024
28 changes: 24 additions & 4 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
// use the start block provided by user if head is 0
let (head, _, _) = self.db.head().await?;
if head == 0 {
self.db.set_head(self.config.start_block)?;
self.db.set_head(self.config.start_block, 0, 0, self.world.address).await?;
} else if self.config.start_block != 0 {
warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead.");
}
Expand Down Expand Up @@ -389,6 +389,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {

let timestamp = data.pending_block.timestamp;

let mut world_txns_count = 0;
for t in data.pending_block.transactions {
let transaction_hash = t.transaction.transaction_hash();
if let Some(tx) = last_pending_block_tx_cursor {
Expand All @@ -409,7 +410,14 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
// provider. So we can fail silently and try
// again in the next iteration.
warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Retrieving pending transaction receipt.");
self.db.set_head(data.block_number - 1)?;
self.db
.set_head(
data.block_number - 1,
timestamp,
world_txns_count,
self.world.address,
)
.await?;
if let Some(tx) = last_pending_block_tx {
self.db.set_last_pending_block_tx(Some(tx))?;
}
Expand All @@ -430,6 +438,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
}
}
Ok(true) => {
world_txns_count += 1;
last_pending_block_world_tx = Some(*transaction_hash);
last_pending_block_tx = Some(*transaction_hash);
info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending world transaction.");
Expand All @@ -446,7 +455,9 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {

// Set the head to the last processed pending transaction
// Head block number should still be latest block number
self.db.set_head(data.block_number - 1)?;
self.db
.set_head(data.block_number - 1, timestamp, world_txns_count, self.world.address)
.await?;

if let Some(tx) = last_pending_block_tx {
self.db.set_last_pending_block_tx(Some(tx))?;
Expand All @@ -466,6 +477,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
pub async fn process_range(&mut self, data: FetchRangeResult) -> Result<EngineHead> {
// Process all transactions
let mut last_block = 0;
let transactions_count = data.transactions.len();
for ((block_number, transaction_hash), events) in data.transactions {
debug!("Processing transaction hash: {:#x}", transaction_hash);
// Process transaction
Expand Down Expand Up @@ -498,7 +510,15 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
// Process parallelized events
self.process_tasks().await?;

self.db.set_head(data.latest_block_number)?;
let last_block_timestamp = self.get_block_timestamp(data.latest_block_number).await?;
self.db
.set_head(
data.latest_block_number,
last_block_timestamp,
transactions_count as u64,
self.world.address,
)
.await?;
self.db.set_last_pending_block_world_tx(None)?;
self.db.set_last_pending_block_tx(None)?;

Expand Down
40 changes: 39 additions & 1 deletion crates/torii/core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tracing::{debug, error};
use crate::simple_broker::SimpleBroker;
use crate::types::{
Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated,
Model as ModelRegistered,
IndexerUpdate, Model as ModelRegistered,
};

pub(crate) const LOG_TARGET: &str = "torii_core::executor";
Expand All @@ -31,6 +31,7 @@ pub enum Argument {

#[derive(Debug, Clone)]
pub enum BrokerMessage {
SetHead(IndexerUpdate),
ModelRegistered(ModelRegistered),
EntityUpdated(EntityUpdated),
EventMessageUpdated(EventMessageUpdated),
Expand All @@ -45,8 +46,17 @@ pub struct DeleteEntityQuery {
pub ty: Ty,
}

/// Payload for a `QueryType::SetHead` message: the new head block plus the
/// extra data the executor needs to compute TPS and publish an indexer update.
#[derive(Debug, Clone)]
pub struct SetHeadQuery {
    /// Latest processed block number to store as the contract's head.
    pub head: u64,
    /// Timestamp of that block; the executor uses it as the end of the TPS
    /// window (presumably seconds — TODO confirm against block timestamps).
    pub last_block_timestamp: u64,
    /// Number of world transactions processed since the previous head update;
    /// divided by the elapsed time to derive TPS.
    pub txns_count: u64,
    /// Address identifying the row in the `contracts` table to read/update.
    pub contract_address: Felt,
}

#[derive(Debug, Clone)]
pub enum QueryType {
SetHead(SetHeadQuery),
SetEntity(Ty),
DeleteEntity(DeleteEntityQuery),
EventMessage(Ty),
Expand Down Expand Up @@ -178,6 +188,33 @@ impl<'c> Executor<'c> {
let tx = &mut self.transaction;

match query_type {
QueryType::SetHead(set_head) => {
let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>(
"SELECT last_block_timestamp FROM contracts WHERE id = ?",
)
.bind(format!("{:#x}", set_head.contract_address))
.fetch_one(&mut **tx)
.await?
.try_into()
.map_err(|_| anyhow::anyhow!("Last block timestamp doesn't fit in u64"))?;
Comment on lines +191 to +199
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo! Watch out for that sneaky i64 to u64 conversion, sensei!

The conversion from i64 to u64 for previous_block_timestamp could fail if the value is negative. Consider using a safe conversion method or handling potential errors. Here's a suggestion:

let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>(
    "SELECT last_block_timestamp FROM contracts WHERE id = ?"
)
.bind(format!("{:#x}", set_head.contract_address))
.fetch_one(&mut **tx)
.await?
.try_into()
.map_err(|_| anyhow::anyhow!("Last block timestamp is negative or doesn't fit in u64"))?;

This approach will provide a more informative error message if the conversion fails.


let tps: u64 = if set_head.last_block_timestamp - previous_block_timestamp != 0 {
set_head.txns_count / (set_head.last_block_timestamp - previous_block_timestamp)
} else {
set_head.txns_count
};
Comment on lines +191 to +205
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo! Potential underflow and division by zero in TPS calculation

Sensei, in the calculation of tps, subtracting previous_block_timestamp from set_head.last_block_timestamp without checking for underflow may lead to incorrect results or panic in debug mode. If set_head.last_block_timestamp is less than previous_block_timestamp, the subtraction will underflow. Consider using checked_sub to safely handle this scenario.

Apply this diff to prevent underflow:

+let time_diff = set_head.last_block_timestamp.checked_sub(previous_block_timestamp).unwrap_or(0);
+let tps: u64 = if time_diff != 0 {
+    set_head.txns_count / time_diff
+} else {
+    set_head.txns_count
+};
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
QueryType::SetHead(set_head) => {
let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>(
"SELECT last_block_timestamp FROM contracts WHERE id = ?",
)
.bind(format!("{:#x}", set_head.contract_address))
.fetch_one(&mut **tx)
.await?
.try_into()
.map_err(|_| anyhow::anyhow!("Last block timestamp doesn't fit in u64"))?;
let tps: u64 = if set_head.last_block_timestamp - previous_block_timestamp != 0 {
set_head.txns_count / (set_head.last_block_timestamp - previous_block_timestamp)
} else {
set_head.txns_count
};
QueryType::SetHead(set_head) => {
let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>(
"SELECT last_block_timestamp FROM contracts WHERE id = ?",
)
.bind(format!("{:#x}", set_head.contract_address))
.fetch_one(&mut **tx)
.await?
.try_into()
.map_err(|_| anyhow::anyhow!("Last block timestamp doesn't fit in u64"))?;
let time_diff = set_head.last_block_timestamp.checked_sub(previous_block_timestamp).unwrap_or(0);
let tps: u64 = if time_diff != 0 {
set_head.txns_count / time_diff
} else {
set_head.txns_count
};

Comment on lines +201 to +205
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo! Let's make that TPS calculation more robust, sensei!

The current TPS calculation might not handle edge cases well. Consider using a more precise calculation method that avoids potential issues with integer division. Here's a suggestion:

let time_diff = set_head.last_block_timestamp.saturating_sub(previous_block_timestamp);
let tps = if time_diff > 0 {
    (set_head.txns_count as f64 / time_diff as f64).round() as u64
} else {
    0 // or another appropriate default value
};

This approach uses floating-point division for more precise results and handles the case where time_diff is zero or when set_head.last_block_timestamp is less than previous_block_timestamp.


query.execute(&mut **tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
})?;

self.publish_queue.push(BrokerMessage::SetHead(IndexerUpdate {
head: set_head.head,
tps,
last_block_timestamp: set_head.last_block_timestamp,
contract_address: set_head.contract_address,
}));
}
QueryType::SetEntity(entity) => {
let row = query.fetch_one(&mut **tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
Expand Down Expand Up @@ -289,6 +326,7 @@ impl<'c> Executor<'c> {

fn send_broker_message(message: BrokerMessage) {
match message {
BrokerMessage::SetHead(update) => SimpleBroker::publish(update),
BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model),
BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity),
BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event),
Expand Down
33 changes: 24 additions & 9 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use starknet_crypto::poseidon_hash_many;
use tokio::sync::mpsc::UnboundedSender;

use crate::cache::{Model, ModelCache};
use crate::executor::{Argument, DeleteEntityQuery, QueryMessage, QueryType};
use crate::executor::{Argument, DeleteEntityQuery, QueryMessage, QueryType, SetHeadQuery};
use crate::utils::utc_dt_string_from_timestamp;

type IsEventMessage = bool;
Expand Down Expand Up @@ -86,17 +86,32 @@ impl Sql {
))
}

pub fn set_head(&mut self, head: u64) -> Result<()> {
let head = Argument::Int(
pub async fn set_head(
&mut self,
head: u64,
last_block_timestamp: u64,
world_txns_count: u64,
contract_address: Felt,
) -> Result<()> {
let head_arg = Argument::Int(
head.try_into().map_err(|_| anyhow!("Head value {} doesn't fit in i64", head))?,
);
let last_block_timestamp_arg =
Argument::Int(last_block_timestamp.try_into().map_err(|_| {
anyhow!("Last block timestamp value {} doesn't fit in i64", last_block_timestamp)
})?);
let id = Argument::FieldElement(self.world_address);
self.executor
.send(QueryMessage::other(
"UPDATE contracts SET head = ? WHERE id = ?".to_string(),
vec![head, id],
))
.map_err(|e| anyhow!("Failed to send set_head message: {}", e))?;

self.executor.send(QueryMessage::new(
"UPDATE contracts SET head = ?, last_block_timestamp = ? WHERE id = ?".to_string(),
vec![head_arg, last_block_timestamp_arg, id],
Comment on lines +106 to +107
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! Should world_txns_count and contract_address be included in the SQL update?

Currently, the SQL statement only updates head and last_block_timestamp in the contracts table. If world_txns_count and contract_address need to be persisted in the database, consider including them in the update query.

QueryType::SetHead(SetHeadQuery {
head,
last_block_timestamp,
txns_count: world_txns_count,
contract_address,
}),
))?;

Ok(())
}
Expand Down
8 changes: 8 additions & 0 deletions crates/torii/core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,11 @@ pub struct Event {
pub executed_at: DateTime<Utc>,
pub created_at: DateTime<Utc>,
}

/// Snapshot of indexer progress for one contract, broadcast to subscribers
/// after each head update.
#[derive(Debug, Clone)]
pub struct IndexerUpdate {
    /// Latest indexed block number (the stored head).
    pub head: u64,
    /// Transactions per second computed over the window since the previous
    /// head update.
    pub tps: u64,
    /// Timestamp of the latest indexed block.
    pub last_block_timestamp: u64,
    /// Address of the contract this update refers to.
    pub contract_address: Felt,
}
21 changes: 18 additions & 3 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ import "google/protobuf/empty.proto";

// The World service provides information about the world.
service World {
// Subscribes to updates about the indexer. Like the head block number, tps, etc.
rpc SubscribeIndexer (SubscribeIndexerRequest) returns (stream SubscribeIndexerResponse);

// Retrieves metadata about the World including all the registered components and systems.
rpc WorldMetadata (MetadataRequest) returns (MetadataResponse);
rpc WorldMetadata (WorldMetadataRequest) returns (WorldMetadataResponse);

// Subscribes to models updates.
rpc SubscribeModels (SubscribeModelsRequest) returns (stream SubscribeModelsResponse);
Expand Down Expand Up @@ -38,14 +41,26 @@ service World {
rpc SubscribeEvents (SubscribeEventsRequest) returns (stream SubscribeEventsResponse);
}

// A request to subscribe to indexer updates.
message SubscribeIndexerRequest {
    // Raw byte encoding of the contract address whose indexing progress to
    // follow (presumably big-endian felt bytes — confirm against the client).
    bytes contract_address = 1;
}

// A response containing indexer updates.
message SubscribeIndexerResponse {
    // Latest indexed block number (head) for the contract.
    uint64 head = 1;
    // Transactions per second since the previous head update.
    uint64 tps = 2;
    // Timestamp of the latest indexed block.
    uint64 last_block_timestamp = 3;
    // Raw byte encoding of the contract address this update refers to.
    bytes contract_address = 4;
}

// A request to retrieve metadata of the world.
// (Empty: the world is identified by the server the client connected to.)
message WorldMetadataRequest {
}

// The metadata response contains addresses and class hashes for the world.
message WorldMetadataResponse {
    types.WorldMetadata metadata = 1;
}

Expand Down
49 changes: 43 additions & 6 deletions crates/torii/grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ use starknet::core::types::{Felt, FromStrError, StateDiff, StateUpdate};
use tonic::transport::Endpoint;

use crate::proto::world::{
world_client, MetadataRequest, RetrieveEntitiesRequest, RetrieveEntitiesResponse,
RetrieveEventsRequest, RetrieveEventsResponse, SubscribeEntitiesRequest,
SubscribeEntityResponse, SubscribeEventsRequest, SubscribeEventsResponse,
SubscribeModelsRequest, SubscribeModelsResponse, UpdateEntitiesSubscriptionRequest,
world_client, RetrieveEntitiesRequest, RetrieveEntitiesResponse, RetrieveEventsRequest,
RetrieveEventsResponse, SubscribeEntitiesRequest, SubscribeEntityResponse,
SubscribeEventsRequest, SubscribeEventsResponse, SubscribeIndexerRequest,
SubscribeIndexerResponse, SubscribeModelsRequest, SubscribeModelsResponse,
UpdateEntitiesSubscriptionRequest, WorldMetadataRequest,
};
use crate::types::schema::{Entity, SchemaError};
use crate::types::{EntityKeysClause, Event, EventQuery, KeysClause, ModelKeysClause, Query};
use crate::types::{
EntityKeysClause, Event, EventQuery, IndexerUpdate, KeysClause, ModelKeysClause, Query,
};

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand Down Expand Up @@ -68,7 +71,7 @@ impl WorldClient {
/// Retrieve the metadata of the World.
pub async fn metadata(&mut self) -> Result<dojo_types::WorldMetadata, Error> {
self.inner
.world_metadata(MetadataRequest {})
.world_metadata(WorldMetadataRequest {})
.await
.map_err(Error::Grpc)
.and_then(|res| {
Expand Down Expand Up @@ -107,6 +110,22 @@ impl WorldClient {
self.inner.retrieve_events(request).await.map_err(Error::Grpc).map(|res| res.into_inner())
}

/// Subscribe to indexer updates.
pub async fn subscribe_indexer(
&mut self,
contract_address: Felt,
) -> Result<IndexerUpdateStreaming, Error> {
let request =
SubscribeIndexerRequest { contract_address: contract_address.to_bytes_be().to_vec() };
let stream = self
.inner
.subscribe_indexer(request)
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())?;
Ok(IndexerUpdateStreaming(stream.map_ok(Box::new(|res| res.into()))))
}

/// Subscribe to entities updates of a World.
pub async fn subscribe_entities(
&mut self,
Expand Down Expand Up @@ -282,6 +301,24 @@ impl Stream for EventUpdateStreaming {
}
}

/// Tonic stream of raw `SubscribeIndexerResponse` messages, each mapped into
/// a client-side `IndexerUpdate`.
type IndexerMappedStream = MapOk<
    tonic::Streaming<SubscribeIndexerResponse>,
    Box<dyn Fn(SubscribeIndexerResponse) -> IndexerUpdate + Send>,
>;

/// Streaming handle returned by `WorldClient::subscribe_indexer`; yields one
/// `IndexerUpdate` per server-side indexer event.
#[derive(Debug)]
pub struct IndexerUpdateStreaming(IndexerMappedStream);

impl Stream for IndexerUpdateStreaming {
    type Item = <IndexerMappedStream as Stream>::Item;
    // Pure delegation: polling forwards to the wrapped mapped stream.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        self.0.poll_next_unpin(cx)
    }
}

fn empty_state_update() -> StateUpdate {
StateUpdate {
block_hash: Felt::ZERO,
Expand Down
Loading
Loading