Skip to content

Commit

Permalink
feat(torii-grpc): erc tokens and balances (#2698)
Browse files Browse the repository at this point in the history
* sqlx types for erc

* add to proto

* retriefe tokens

* retrieve token balances

* finish up fetching for token balances

* fmt

* wildcard conditions

* fmt

* fmt

* add toek nbalances api to client

* chore
  • Loading branch information
Larkooo authored Nov 19, 2024
1 parent c59f186 commit 0aa330d
Show file tree
Hide file tree
Showing 7 changed files with 316 additions and 11 deletions.
27 changes: 25 additions & 2 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use tokio::sync::RwLock as AsyncRwLock;
use torii_grpc::client::{EntityUpdateStreaming, EventUpdateStreaming, IndexerUpdateStreaming};
use torii_grpc::proto::world::{RetrieveEntitiesResponse, RetrieveEventsResponse};
use torii_grpc::proto::world::{
RetrieveEntitiesResponse, RetrieveEventsResponse, RetrieveTokenBalancesResponse,
RetrieveTokensResponse,
};
use torii_grpc::types::schema::Entity;
use torii_grpc::types::{EntityKeysClause, Event, EventQuery, Query};
use torii_grpc::types::{EntityKeysClause, Event, EventQuery, Query, Token, TokenBalance};
use torii_relay::client::EventLoop;
use torii_relay::types::Message;

Expand Down Expand Up @@ -85,6 +88,26 @@ impl Client {
self.metadata.read()
}

/// Retrieves tokens matching contract addresses.
pub async fn tokens(&self, contract_addresses: Vec<Felt>) -> Result<Vec<Token>, Error> {
let mut grpc_client = self.inner.write().await;
let RetrieveTokensResponse { tokens } =
grpc_client.retrieve_tokens(contract_addresses).await?;
Ok(tokens.into_iter().map(TryInto::try_into).collect::<Result<Vec<Token>, _>>()?)
}

/// Retrieves token balances for account addresses and contract addresses.
pub async fn token_balances(
&self,
account_addresses: Vec<Felt>,
contract_addresses: Vec<Felt>,
) -> Result<Vec<TokenBalance>, Error> {
let mut grpc_client = self.inner.write().await;
let RetrieveTokenBalancesResponse { balances } =
grpc_client.retrieve_token_balances(account_addresses, contract_addresses).await?;
Ok(balances.into_iter().map(TryInto::try_into).collect::<Result<Vec<TokenBalance>, _>>()?)
}

/// Retrieves entities matching query parameter.
///
/// The query param includes an optional clause for filtering. Without clause, it fetches ALL
Expand Down
22 changes: 22 additions & 0 deletions crates/torii/core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,28 @@ pub struct Event {
pub executed_at: DateTime<Utc>,
pub created_at: DateTime<Utc>,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Token {
pub id: String,
pub contract_address: String,
pub name: String,
pub symbol: String,
pub decimals: u8,
pub metadata: String,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct TokenBalance {
pub id: String,
pub balance: String,
pub account_address: String,
pub contract_address: String,
pub token_id: String,
}

#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)]
pub struct Contract {
pub address: Felt,
Expand Down
15 changes: 15 additions & 0 deletions crates/torii/grpc/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,19 @@ enum ComparisonOperator {
GTE = 3;
LT = 4;
LTE = 5;
}

message Token {
string contract_address = 2;
string name = 3;
string symbol = 4;
uint32 decimals = 5;
string metadata = 6;
}

message TokenBalance {
string balance = 1;
string account_address = 2;
string contract_address = 3;
string token_id = 4;
}
30 changes: 30 additions & 0 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,36 @@ service World {

// Subscribe to events
rpc SubscribeEvents (SubscribeEventsRequest) returns (stream SubscribeEventsResponse);

// Retrieve tokens
rpc RetrieveTokens (RetrieveTokensRequest) returns (RetrieveTokensResponse);

// Retrieve token balances
rpc RetrieveTokenBalances (RetrieveTokenBalancesRequest) returns (RetrieveTokenBalancesResponse);
}

// A request to retrieve tokens
message RetrieveTokensRequest {
// The list of contract addresses to retrieve tokens for
repeated bytes contract_addresses = 1;
}

// A response containing tokens
message RetrieveTokensResponse {
repeated types.Token tokens = 1;
}

// A request to retrieve token balances
message RetrieveTokenBalancesRequest {
// The account addresses to retrieve balances for
repeated bytes account_addresses = 1;
// The list of token contract addresses to retrieve balances for
repeated bytes contract_addresses = 2;
}

// A response containing token balances
message RetrieveTokenBalancesResponse {
repeated types.TokenBalance balances = 1;
}

// A request to subscribe to indexer updates.
Expand Down
49 changes: 44 additions & 5 deletions crates/torii/grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ use tonic::transport::Endpoint;

use crate::proto::world::{
world_client, RetrieveEntitiesRequest, RetrieveEntitiesResponse, RetrieveEventMessagesRequest,
RetrieveEventsRequest, RetrieveEventsResponse, SubscribeEntitiesRequest,
SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsRequest,
SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse,
SubscribeModelsRequest, SubscribeModelsResponse, UpdateEntitiesSubscriptionRequest,
UpdateEventMessagesSubscriptionRequest, WorldMetadataRequest,
RetrieveEventsRequest, RetrieveEventsResponse, RetrieveTokenBalancesRequest,
RetrieveTokenBalancesResponse, RetrieveTokensRequest, RetrieveTokensResponse,
SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventMessagesRequest,
SubscribeEventsRequest, SubscribeEventsResponse, SubscribeIndexerRequest,
SubscribeIndexerResponse, SubscribeModelsRequest, SubscribeModelsResponse,
UpdateEntitiesSubscriptionRequest, UpdateEventMessagesSubscriptionRequest,
WorldMetadataRequest,
};
use crate::types::schema::{Entity, SchemaError};
use crate::types::{EntityKeysClause, Event, EventQuery, IndexerUpdate, ModelKeysClause, Query};
Expand Down Expand Up @@ -90,6 +92,43 @@ impl WorldClient {
.and_then(|metadata| metadata.try_into().map_err(Error::ParseStr))
}

pub async fn retrieve_tokens(
&mut self,
contract_addresses: Vec<Felt>,
) -> Result<RetrieveTokensResponse, Error> {
self.inner
.retrieve_tokens(RetrieveTokensRequest {
contract_addresses: contract_addresses
.into_iter()
.map(|c| c.to_bytes_be().to_vec())
.collect(),
})
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())
}

pub async fn retrieve_token_balances(
&mut self,
account_addresses: Vec<Felt>,
contract_addresses: Vec<Felt>,
) -> Result<RetrieveTokenBalancesResponse, Error> {
self.inner
.retrieve_token_balances(RetrieveTokenBalancesRequest {
account_addresses: account_addresses
.into_iter()
.map(|a| a.to_bytes_be().to_vec())
.collect(),
contract_addresses: contract_addresses
.into_iter()
.map(|c| c.to_bytes_be().to_vec())
.collect(),
})
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())
}

pub async fn retrieve_entities(
&mut self,
query: Query,
Expand Down
140 changes: 136 additions & 4 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use torii_core::error::{Error, ParseError, QueryError};
use torii_core::model::{build_sql_query, map_row_to_ty};
use torii_core::sql::cache::ModelCache;
use torii_core::sql::utils::sql_string_to_felts;
use torii_core::types::{Token, TokenBalance};
use tower_http::cors::{AllowOrigin, CorsLayer};

use self::subscriptions::entity::EntityManager;
Expand All @@ -53,10 +54,11 @@ use crate::proto::types::member_value::ValueType;
use crate::proto::types::LogicalOperator;
use crate::proto::world::world_server::WorldServer;
use crate::proto::world::{
RetrieveEntitiesStreamingResponse, RetrieveEventMessagesRequest, SubscribeEntitiesRequest,
SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsResponse,
SubscribeIndexerRequest, SubscribeIndexerResponse, UpdateEventMessagesSubscriptionRequest,
WorldMetadataRequest, WorldMetadataResponse,
RetrieveEntitiesStreamingResponse, RetrieveEventMessagesRequest, RetrieveTokenBalancesRequest,
RetrieveTokenBalancesResponse, RetrieveTokensRequest, RetrieveTokensResponse,
SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventMessagesRequest,
SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse,
UpdateEventMessagesSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse,
};
use crate::proto::{self};
use crate::types::schema::SchemaError;
Expand Down Expand Up @@ -87,6 +89,29 @@ impl From<SchemaError> for Error {
}
}

impl From<Token> for proto::types::Token {
fn from(value: Token) -> Self {
Self {
contract_address: value.contract_address,
name: value.name,
symbol: value.symbol,
decimals: value.decimals as u32,
metadata: value.metadata,
}
}
}

impl From<TokenBalance> for proto::types::TokenBalance {
fn from(value: TokenBalance) -> Self {
Self {
balance: value.balance,
account_address: value.account_address,
contract_address: value.contract_address,
token_id: value.token_id,
}
}
}

#[derive(Debug, Clone)]
pub struct DojoWorld {
pool: Pool<Sqlite>,
Expand Down Expand Up @@ -789,6 +814,74 @@ impl DojoWorld {
})
}

async fn retrieve_tokens(
&self,
contract_addresses: Vec<Felt>,
) -> Result<RetrieveTokensResponse, Status> {
let query = if contract_addresses.is_empty() {
"SELECT * FROM tokens".to_string()
} else {
format!(
"SELECT * FROM tokens WHERE contract_address IN ({})",
contract_addresses
.iter()
.map(|address| format!("{:#x}", address))
.collect::<Vec<_>>()
.join(", ")
)
};

let tokens: Vec<Token> = sqlx::query_as(&query)
.fetch_all(&self.pool)
.await
.map_err(|e| Status::internal(e.to_string()))?;

let tokens = tokens.iter().map(|token| token.clone().into()).collect();
Ok(RetrieveTokensResponse { tokens })
}

async fn retrieve_token_balances(
&self,
account_addresses: Vec<Felt>,
contract_addresses: Vec<Felt>,
) -> Result<RetrieveTokenBalancesResponse, Status> {
let mut query = "SELECT * FROM token_balances".to_string();

let mut conditions = Vec::new();
if !account_addresses.is_empty() {
conditions.push(format!(
"account_address IN ({})",
account_addresses
.iter()
.map(|address| format!("{:#x}", address))
.collect::<Vec<_>>()
.join(", ")
));
}
if !contract_addresses.is_empty() {
conditions.push(format!(
"contract_address IN ({})",
contract_addresses
.iter()
.map(|address| format!("{:#x}", address))
.collect::<Vec<_>>()
.join(", ")
));
}

if !conditions.is_empty() {
query += &format!(" WHERE {}", conditions.join(" AND "));
}

let balances: Vec<TokenBalance> = sqlx::query_as(&query)
.fetch_all(&self.pool)
.await
.map_err(|e| Status::internal(e.to_string()))?;

let balances = balances.iter().map(|balance| balance.clone().into()).collect();
Ok(RetrieveTokenBalancesResponse { balances })
}

async fn subscribe_indexer(
&self,
contract_address: Felt,
Expand Down Expand Up @@ -1165,6 +1258,45 @@ impl proto::world::world_server::World for DojoWorld {
Ok(Response::new(WorldMetadataResponse { metadata }))
}

async fn retrieve_tokens(
&self,
request: Request<RetrieveTokensRequest>,
) -> Result<Response<RetrieveTokensResponse>, Status> {
let RetrieveTokensRequest { contract_addresses } = request.into_inner();
let contract_addresses = contract_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();

let tokens = self
.retrieve_tokens(contract_addresses)
.await
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(tokens))
}

async fn retrieve_token_balances(
&self,
request: Request<RetrieveTokenBalancesRequest>,
) -> Result<Response<RetrieveTokenBalancesResponse>, Status> {
let RetrieveTokenBalancesRequest { account_addresses, contract_addresses } =
request.into_inner();
let account_addresses = account_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();
let contract_addresses = contract_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();

let balances = self
.retrieve_token_balances(account_addresses, contract_addresses)
.await
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(balances))
}

async fn subscribe_indexer(
&self,
request: Request<SubscribeIndexerRequest>,
Expand Down
Loading

0 comments on commit 0aa330d

Please sign in to comment.