Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii): token balances subscription #2831

Merged
merged 5 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions crates/torii/core/src/executor/erc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
use super::{ApplyBalanceDiffQuery, Executor};
use crate::constants::{IPFS_CLIENT_MAX_RETRY, SQL_FELT_DELIMITER, TOKEN_BALANCE_TABLE};
use crate::executor::LOG_TARGET;
use crate::simple_broker::SimpleBroker;
use crate::sql::utils::{felt_to_sql_string, sql_string_to_u256, u256_to_sql_string, I256};
use crate::types::ContractType;
use crate::types::{ContractType, TokenBalance};
use crate::utils::fetch_content_from_ipfs;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -159,18 +160,21 @@
}

// write the new balance to the database
sqlx::query(&format!(
let token_balance: TokenBalance = sqlx::query_as(&format!(

Check warning on line 163 in crates/torii/core/src/executor/erc.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor/erc.rs#L163

Added line #L163 was not covered by tests
"INSERT OR REPLACE INTO {TOKEN_BALANCE_TABLE} (id, contract_address, account_address, \
token_id, balance) VALUES (?, ?, ?, ?, ?)",
token_id, balance) VALUES (?, ?, ?, ?, ?) RETURNING *",

Check warning on line 165 in crates/torii/core/src/executor/erc.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor/erc.rs#L165

Added line #L165 was not covered by tests
))
.bind(id)
.bind(contract_address)
.bind(account_address)
.bind(token_id)
.bind(u256_to_sql_string(&balance))
.execute(&mut **tx)
.fetch_one(&mut **tx)

Check warning on line 172 in crates/torii/core/src/executor/erc.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor/erc.rs#L172

Added line #L172 was not covered by tests
.await?;

debug!(target: LOG_TARGET, token_balance = ?token_balance, "Applied balance diff");
SimpleBroker::publish(token_balance);

Check warning on line 177 in crates/torii/core/src/executor/erc.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor/erc.rs#L175-L177

Added lines #L175 - L177 were not covered by tests
Comment on lines +163 to +177
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo sensei! Watch out for concurrency with "INSERT OR REPLACE".
In high-concurrency environments, multiple updates could lead to race conditions where one subscriber's write might overwrite another's changes. Consider using transactions at a higher level or row-level locking.

- INSERT OR REPLACE INTO {TOKEN_BALANCE_TABLE} (...)
+ /* Evaluate if you need a more robust concurrency pattern, e.g. versioning or row locking */

Committable suggestion skipped: line range outside the PR's diff.

Ok(())
}

Expand Down
24 changes: 24 additions & 0 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ service World {
// Update entity subscription
rpc UpdateEventMessagesSubscription (UpdateEventMessagesSubscriptionRequest) returns (google.protobuf.Empty);

// Subscribe to token balance updates.
rpc SubscribeTokenBalances (RetrieveTokenBalancesRequest) returns (stream SubscribeTokenBalancesResponse);

// Update token balance subscription
rpc UpdateTokenBalancesSubscription (UpdateTokenBalancesSubscriptionRequest) returns (google.protobuf.Empty);

// Retrieve entities
rpc RetrieveEventMessages (RetrieveEventMessagesRequest) returns (RetrieveEntitiesResponse);

Expand All @@ -50,6 +56,24 @@ service World {
rpc RetrieveTokenBalances (RetrieveTokenBalancesRequest) returns (RetrieveTokenBalancesResponse);
}

// A request to update a token balance subscription
message UpdateTokenBalancesSubscriptionRequest {
// The subscription ID
uint64 subscription_id = 1;
// The list of contract addresses to subscribe to
repeated bytes contract_addresses = 2;
// The list of account addresses to subscribe to
repeated bytes account_addresses = 3;
}

// A response containing token balances
message SubscribeTokenBalancesResponse {
// The subscription ID
uint64 subscription_id = 1;
// The token balance
types.TokenBalance balance = 2;
}

// A request to retrieve tokens
message RetrieveTokensRequest {
// The list of contract addresses to retrieve tokens for
Expand Down
59 changes: 54 additions & 5 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
use starknet::providers::JsonRpcClient;
use subscriptions::event::EventManager;
use subscriptions::indexer::IndexerManager;
use subscriptions::token_balance::TokenBalanceManager;
use tokio::net::TcpListener;
use tokio::sync::mpsc::{channel, Receiver};
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
Expand All @@ -55,11 +56,7 @@
use crate::proto::types::LogicalOperator;
use crate::proto::world::world_server::WorldServer;
use crate::proto::world::{
RetrieveEntitiesStreamingResponse, RetrieveEventMessagesRequest, RetrieveTokenBalancesRequest,
RetrieveTokenBalancesResponse, RetrieveTokensRequest, RetrieveTokensResponse,
SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventMessagesRequest,
SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse,
UpdateEventMessagesSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse,
RetrieveEntitiesStreamingResponse, RetrieveEventMessagesRequest, RetrieveTokenBalancesRequest, RetrieveTokenBalancesResponse, RetrieveTokensRequest, RetrieveTokensResponse, SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse, SubscribeTokenBalancesResponse, UpdateEventMessagesSubscriptionRequest, UpdateTokenBalancesSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse
};
use crate::proto::{self};
use crate::types::schema::SchemaError;
Expand Down Expand Up @@ -123,6 +120,7 @@
event_manager: Arc<EventManager>,
state_diff_manager: Arc<StateDiffManager>,
indexer_manager: Arc<IndexerManager>,
token_balance_manager: Arc<TokenBalanceManager>,
}

impl DojoWorld {
Expand All @@ -138,6 +136,7 @@
let event_manager = Arc::new(EventManager::default());
let state_diff_manager = Arc::new(StateDiffManager::default());
let indexer_manager = Arc::new(IndexerManager::default());
let token_balance_manager = Arc::new(TokenBalanceManager::default());

tokio::task::spawn(subscriptions::model_diff::Service::new_with_block_rcv(
block_rx,
Expand All @@ -156,6 +155,8 @@

tokio::task::spawn(subscriptions::indexer::Service::new(Arc::clone(&indexer_manager)));

tokio::task::spawn(subscriptions::token_balance::Service::new(Arc::clone(&token_balance_manager)));

Self {
pool,
world_address,
Expand All @@ -165,6 +166,7 @@
event_manager,
state_diff_manager,
indexer_manager,
token_balance_manager,
}
}
}
Expand Down Expand Up @@ -1056,6 +1058,14 @@
Ok(RetrieveTokenBalancesResponse { balances })
}

async fn subscribe_token_balances(
&self,
contract_addresses: Vec<Felt>,
account_addresses: Vec<Felt>,
) -> Result<Receiver<Result<proto::world::SubscribeTokenBalancesResponse, tonic::Status>>, Error> {
self.token_balance_manager.add_subscriber(contract_addresses, account_addresses).await
}

Check warning on line 1067 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1061-L1067

Added lines #L1061 - L1067 were not covered by tests

async fn subscribe_indexer(
&self,
contract_address: Felt,
Expand Down Expand Up @@ -1508,6 +1518,8 @@
Pin<Box<dyn Stream<Item = Result<SubscribeIndexerResponse, Status>> + Send>>;
type RetrieveEntitiesStreamingResponseStream =
Pin<Box<dyn Stream<Item = Result<RetrieveEntitiesStreamingResponse, Status>> + Send>>;
type SubscribeTokenBalancesResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeTokenBalancesResponse, Status>> + Send>>;

#[tonic::async_trait]
impl proto::world::world_server::World for DojoWorld {
Expand All @@ -1517,6 +1529,7 @@
type SubscribeEventsStream = SubscribeEventsResponseStream;
type SubscribeIndexerStream = SubscribeIndexerResponseStream;
type RetrieveEntitiesStreamingStream = RetrieveEntitiesStreamingResponseStream;
type SubscribeTokenBalancesStream = SubscribeTokenBalancesResponseStream;

async fn world_metadata(
&self,
Expand Down Expand Up @@ -1619,6 +1632,42 @@
Ok(Response::new(()))
}

async fn subscribe_token_balances(
&self,
request: Request<RetrieveTokenBalancesRequest>,
) -> ServiceResult<Self::SubscribeTokenBalancesStream> {
let RetrieveTokenBalancesRequest { contract_addresses, account_addresses } = request.into_inner();
let contract_addresses = contract_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();
let account_addresses = account_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();

let rx = self.subscribe_token_balances(contract_addresses, account_addresses).await.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeTokenBalancesStream))
}

Check warning on line 1651 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1638-L1651

Added lines #L1638 - L1651 were not covered by tests

async fn update_token_balances_subscription(
&self,
request: Request<UpdateTokenBalancesSubscriptionRequest>,
) -> ServiceResult<()> {
let UpdateTokenBalancesSubscriptionRequest { subscription_id, contract_addresses, account_addresses } = request.into_inner();
let contract_addresses = contract_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();
let account_addresses = account_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();

self.token_balance_manager.update_subscriber(subscription_id, contract_addresses, account_addresses).await;
Ok(Response::new(()))
}

Check warning on line 1669 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1656-L1669

Added lines #L1656 - L1669 were not covered by tests

async fn retrieve_entities(
&self,
request: Request<RetrieveEntitiesRequest>,
Expand Down
1 change: 1 addition & 0 deletions crates/torii/grpc/src/server/subscriptions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod event;
pub mod event_message;
pub mod indexer;
pub mod model_diff;
pub mod token_balance;

pub(crate) fn match_entity_keys(
id: Felt,
Expand Down
Loading
Loading