-
Notifications
You must be signed in to change notification settings - Fork 182
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(torii): token balances subscription #2831
Conversation
WalkthroughOhayo, sensei! This pull request introduces a comprehensive token balance subscription system for the Torii gRPC server. The changes span multiple files, focusing on enhancing token balance management through an event-driven architecture. The implementation enables clients to subscribe to token balance updates, with new methods in the gRPC service, a dedicated token balance manager, and modifications to the executor to publish balance changes. Changes
Possibly related issues
Possibly related PRs
Suggested reviewers
Ohayo and happy coding, sensei! 🍵🥷 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (5)
crates/torii/grpc/src/server/subscriptions/token_balance.rs (3)
52-58
: Ohayo sensei! Sending an initial empty response might need explicit confirmation.
While this ensures the client receives a subscription ID immediately, you may wish to confirm with client code that they can handle an empty balance gracefully.
139-146
: Ohayo sensei! Carefully validate address filters to prevent silent failures.
This logic will skip updates for addresses not present in sub.contract_addresses or sub.account_addresses. Consider adding callbacks or logs on skipped addresses to aid in debugging.
178-192
: Ohayo sensei! Good concurrency handling, but watch for dropped broker messages.
The poll method streams token balances from the SimpleBroker to balance_sender. If balance_sender is ever overwhelmed or not polled promptly, messages can be dropped. Monitor whether an unbounded channel suits your throughput.crates/torii/grpc/src/server/mod.rs (2)
1635-1651
: Ohayo sensei! Great job on the subscribe_token_balances method.
The code properly translates the gRPC request into addresses and sets up the subscription. Just be cautious with large lists of addresses, as it could impact performance on insertion or lookups in your manager.
1653-1670
: Ohayo sensei! updatable subscriptions look well thought out.
Remember to handle cases where the user attempts to update a subscription that does not exist or is closed. Possibly return a clearer error response if not found.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
crates/torii/core/src/executor/erc.rs
(2 hunks)crates/torii/grpc/proto/world.proto
(2 hunks)crates/torii/grpc/src/server/mod.rs
(10 hunks)crates/torii/grpc/src/server/subscriptions/mod.rs
(1 hunks)crates/torii/grpc/src/server/subscriptions/token_balance.rs
(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- crates/torii/grpc/src/server/subscriptions/mod.rs
🔇 Additional comments (7)
crates/torii/grpc/src/server/subscriptions/token_balance.rs (2)
164-167
: Ohayo sensei! Confirm channel closure conditions.
When sub.sender.send(...) fails, the subscriber is removed. Ensure no unintentional edge cases occur if a temporary network issue or other ephemeral condition leads to a channel error.
✅ Verification successful
Let me gather more context about how these subscriptions are managed.
Let me check one more thing about the subscription cleanup mechanism.
Ohayo sensei! The channel closure handling is working as intended!
The current implementation is correct for these reasons:
- The
closed_stream
mechanism is consistently implemented across all subscription types (events, models, token balance, etc.) - When a channel send fails, the subscriber is properly removed via
subs.remove_subscriber(id).await
- This is expected behavior for gRPC streaming, where client disconnections should lead to resource cleanup
- The channel errors here represent permanent conditions (client disconnected or stream closed) rather than temporary network issues
- Retry mechanisms in the codebase are implemented where needed (like in sync stages) but aren't applicable for subscription streams where the client has disconnected
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Searching the codebase to see how sub.sender is used elsewhere
rg -A 5 -B 5 'sub\.sender'
Length of output: 6402
Script:
#!/bin/bash
# Let's look for subscription management code
ast-grep --pattern 'struct $NAME {
$$$
sender: $TYPE,
$$$
}'
# Also check how these subscriptions are created/initialized
rg -A 5 "Subscription.*new"
Length of output: 4475
Script:
#!/bin/bash
# Let's see how subscriptions are cleaned up
rg -A 10 "closed_stream"
# And check for any retry mechanisms
rg "retry|reconnect" --type rust
Length of output: 16615
48-49
: Ohayo sensei! Consider ensuring ID uniqueness for subscribers.
Using a random u64 value as a subscription ID might result in collisions, which can lead to unexpected behavior or overwritten subscriptions in rare cases.
Here's a script to scan for references to the subscription_id and confirm no collisions:
crates/torii/core/src/executor/erc.rs (1)
18-20
: Ohayo sensei! Excellent event-driven approach with SimpleBroker.
This addition allows for consistent publication and subscription to real-time token balance updates. Just be mindful of potential performance impacts if the throughput is large.
crates/torii/grpc/src/server/mod.rs (2)
37-37
: Ohayo sensei! Adding TokenBalanceManager is a neat integration.
This new reference helps unify subscription logic in a single point. Ensure the manager is only created once and not repeatedly, to avoid duplication.
158-159
: Ohayo sensei! Token balance subscription service launched.
Spawning this subscription Service ensures updates are always processed. Confirm robust error-handling in case the task fails unexpectedly.
✅ Verification successful
Ohayo sensei! Your token balance subscription service is well-protected!
The implementation includes robust error handling:
- Errors during balance updates are properly logged:
error!(target = LOG_TARGET, error = %e, "Processing balance update.")
- Failed message deliveries are gracefully handled by removing closed streams
- Parse errors are properly propagated using
Result<(), Error>
- The service continues running even if individual updates fail
- Task failure is handled by Tokio's task system with automatic cleanup
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Searching references to token_balance::Service for potential error handling logic
rg 'token_balance::Service'
Length of output: 171
Script:
#!/bin/bash
# Let's check the implementation of token_balance::Service to see error handling
ast-grep --pattern 'impl Service {
$$$
}'
# Also check the new method implementation
ast-grep --pattern 'fn new($$$) -> $$ {
$$$
}'
# Let's also check for any error handling patterns in the module
rg -A 5 'error|Error|unwrap|expect|panic' crates/torii/grpc/src/server/subscriptions/token_balance.rs
Length of output: 35857
crates/torii/grpc/proto/world.proto (2)
37-42
: Ohayo sensei! New RPC methods for token balances look consistent.
The introduction of SubscribeTokenBalances and UpdateTokenBalancesSubscription effectively complements the rest of the subscription architecture.
59-76
: Ohayo sensei! These dedicated message types are well-structured.
clear fields for subscription_id, contract_addresses, and account_addresses. This fosters clarity and maintainability.
let token_balance: TokenBalance = sqlx::query_as(&format!( | ||
"INSERT OR REPLACE INTO {TOKEN_BALANCE_TABLE} (id, contract_address, account_address, \ | ||
token_id, balance) VALUES (?, ?, ?, ?, ?)", | ||
token_id, balance) VALUES (?, ?, ?, ?, ?) RETURNING *", | ||
)) | ||
.bind(id) | ||
.bind(contract_address) | ||
.bind(account_address) | ||
.bind(token_id) | ||
.bind(u256_to_sql_string(&balance)) | ||
.execute(&mut **tx) | ||
.fetch_one(&mut **tx) | ||
.await?; | ||
|
||
debug!(target: LOG_TARGET, token_balance = ?token_balance, "Applied balance diff"); | ||
SimpleBroker::publish(token_balance); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo sensei! Watch out for concurrency with "INSERT OR REPLACE".
In high-concurrency environments, multiple updates could lead to race conditions where one subscriber's write might overwrite another's changes. Consider using transactions at a higher level or row-level locking.
- INSERT OR REPLACE INTO {TOKEN_BALANCE_TABLE} (...)
+ /* Evaluate if you need a more robust concurrency pattern, e.g. versioning or row locking */
Committable suggestion skipped: line range outside the PR's diff.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2831 +/- ##
==========================================
- Coverage 55.92% 55.72% -0.21%
==========================================
Files 439 440 +1
Lines 56156 56404 +248
==========================================
+ Hits 31408 31431 +23
- Misses 24748 24973 +225 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
crates/torii/grpc/src/server/mod.rs (2)
1068-1075
: Consider adding input validation for addresses, sensei!While the implementation is clean and follows the codebase patterns, it might be good to validate that at least one of contract_addresses or account_addresses is non-empty to avoid unnecessary subscriptions.
async fn subscribe_token_balances( &self, contract_addresses: Vec<Felt>, account_addresses: Vec<Felt>, ) -> Result<Receiver<Result<proto::world::SubscribeTokenBalancesResponse, tonic::Status>>, Error> { + if contract_addresses.is_empty() && account_addresses.is_empty() { + return Err(Error::from(QueryError::MissingParam("At least one contract or account address required".into()))); + } self.token_balance_manager.add_subscriber(contract_addresses, account_addresses).await }
1665-1687
: The update implementation is consistent with other managers, sensei!The update_token_balances_subscription method follows the same pattern as other subscription updates in the codebase. Consider adding the same input validation as suggested for the subscribe method.
async fn update_token_balances_subscription( &self, request: Request<UpdateTokenBalancesSubscriptionRequest>, ) -> ServiceResult<()> { let UpdateTokenBalancesSubscriptionRequest { subscription_id, contract_addresses, account_addresses, } = request.into_inner(); + if contract_addresses.is_empty() && account_addresses.is_empty() { + return Err(Status::invalid_argument("At least one contract or account address required")); + } let contract_addresses = contract_addresses .iter() .map(|address| Felt::from_bytes_be_slice(address)) .collect::<Vec<_>>(); let account_addresses = account_addresses .iter() .map(|address| Felt::from_bytes_be_slice(address)) .collect::<Vec<_>>(); self.token_balance_manager .update_subscriber(subscription_id, contract_addresses, account_addresses) .await; Ok(Response::new(())) }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
crates/torii/grpc/src/server/mod.rs
(10 hunks)
🔇 Additional comments (2)
crates/torii/grpc/src/server/mod.rs (2)
128-128
: Ohayo! The TokenBalanceManager integration looks good, sensei!
The TokenBalanceManager is properly integrated into the DojoWorld struct following the established pattern of other managers. The initialization and service spawning are implemented consistently.
Also applies to: 144-144, 163-165, 176-176
1529-1530
: Stream type definition looks perfect, sensei!
The SubscribeTokenBalancesResponseStream type is properly defined and follows the same pattern as other stream types in the codebase. The implementation includes the necessary Send trait for async operations.
Also applies to: 1540-1540
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (3)
crates/torii/grpc/src/server/subscriptions/token_balance.rs (3)
25-35
: Consider enhancing documentation for empty set behavior, sensei!While the comments mention that empty sets mean "all" items, this important behavior should be more prominently documented. Consider adding a dedicated doc comment section explaining this filtering behavior.
#[derive(Debug)] +/// Represents a subscriber to token balance updates with filtering capabilities. +/// +/// # Filtering Behavior +/// - Empty `contract_addresses`: Matches ALL contracts +/// - Empty `account_addresses`: Matches ALL accounts +/// - Non-empty sets: Only matches addresses present in the respective sets pub struct TokenBalanceSubscriber {
42-67
: Consider adding collision handling for subscription IDs, sensei!The random ID generation could theoretically result in collisions. Consider implementing a retry mechanism or using a guaranteed unique ID generation approach.
- let subscription_id = rand::thread_rng().gen::<u64>(); + let subscription_id = loop { + let id = rand::thread_rng().gen::<u64>(); + if !self.subscribers.read().await.contains_key(&id) { + break id; + } + };
179-193
: Consider implementing error recovery strategy, sensei!The Future implementation only logs errors when sending balance updates. Consider implementing a retry mechanism or error recovery strategy to handle temporary failures.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let this = self.get_mut(); while let Poll::Ready(Some(balance)) = this.simple_broker.poll_next_unpin(cx) { - if let Err(e) = this.balance_sender.send(balance) { - error!(target = LOG_TARGET, error = %e, "Sending balance update to processor."); + let mut retry_count = 0; + while let Err(e) = this.balance_sender.send(balance.clone()) { + if retry_count >= 3 { + error!(target = LOG_TARGET, error = %e, "Failed to send balance update after retries."); + break; + } + retry_count += 1; + std::thread::sleep(std::time::Duration::from_millis(100)); } }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
crates/torii/grpc/src/server/subscriptions/token_balance.rs
(1 hunks)
🔇 Additional comments (1)
crates/torii/grpc/src/server/subscriptions/token_balance.rs (1)
1-24
: Ohayo! The imports and constants look well-organized, sensei!
The selection of dependencies shows good consideration for async operations, thread safety, and error handling.
pub async fn update_subscriber( | ||
&self, | ||
id: u64, | ||
contract_addresses: Vec<Felt>, | ||
account_addresses: Vec<Felt>, | ||
) { | ||
let sender = { | ||
let subscribers = self.subscribers.read().await; | ||
if let Some(subscriber) = subscribers.get(&id) { | ||
subscriber.sender.clone() | ||
} else { | ||
return; // Subscriber not found, exit early | ||
} | ||
}; | ||
|
||
self.subscribers.write().await.insert( | ||
id, | ||
TokenBalanceSubscriber { | ||
contract_addresses: contract_addresses.into_iter().collect(), | ||
account_addresses: account_addresses.into_iter().collect(), | ||
sender, | ||
}, | ||
); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Optimize sender handling in update_subscriber, sensei!
The current implementation acquires two locks and unnecessarily clones the sender. Consider refactoring to use a single write lock.
pub async fn update_subscriber(
&self,
id: u64,
contract_addresses: Vec<Felt>,
account_addresses: Vec<Felt>,
) {
- let sender = {
- let subscribers = self.subscribers.read().await;
- if let Some(subscriber) = subscribers.get(&id) {
- subscriber.sender.clone()
- } else {
- return; // Subscriber not found, exit early
- }
- };
-
- self.subscribers.write().await.insert(
- id,
- TokenBalanceSubscriber {
- contract_addresses: contract_addresses.into_iter().collect(),
- account_addresses: account_addresses.into_iter().collect(),
- sender,
- },
- );
+ let mut subscribers = self.subscribers.write().await;
+ if let Some(subscriber) = subscribers.get_mut(&id) {
+ subscriber.contract_addresses = contract_addresses.into_iter().collect();
+ subscriber.account_addresses = account_addresses.into_iter().collect();
+ }
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
pub async fn update_subscriber( | |
&self, | |
id: u64, | |
contract_addresses: Vec<Felt>, | |
account_addresses: Vec<Felt>, | |
) { | |
let sender = { | |
let subscribers = self.subscribers.read().await; | |
if let Some(subscriber) = subscribers.get(&id) { | |
subscriber.sender.clone() | |
} else { | |
return; // Subscriber not found, exit early | |
} | |
}; | |
self.subscribers.write().await.insert( | |
id, | |
TokenBalanceSubscriber { | |
contract_addresses: contract_addresses.into_iter().collect(), | |
account_addresses: account_addresses.into_iter().collect(), | |
sender, | |
}, | |
); | |
} | |
pub async fn update_subscriber( | |
&self, | |
id: u64, | |
contract_addresses: Vec<Felt>, | |
account_addresses: Vec<Felt>, | |
) { | |
let mut subscribers = self.subscribers.write().await; | |
if let Some(subscriber) = subscribers.get_mut(&id) { | |
subscriber.contract_addresses = contract_addresses.into_iter().collect(); | |
subscriber.account_addresses = account_addresses.into_iter().collect(); | |
} | |
} |
async fn process_balance_update( | ||
subs: &Arc<TokenBalanceManager>, | ||
balance: &TokenBalance, | ||
) -> Result<(), Error> { | ||
let mut closed_stream = Vec::new(); | ||
|
||
for (idx, sub) in subs.subscribers.read().await.iter() { | ||
let contract_address = | ||
Felt::from_str(&balance.contract_address).map_err(ParseError::FromStr)?; | ||
let account_address = | ||
Felt::from_str(&balance.account_address).map_err(ParseError::FromStr)?; | ||
|
||
// Skip if contract address filter doesn't match | ||
if !sub.contract_addresses.is_empty() | ||
&& !sub.contract_addresses.contains(&contract_address) | ||
{ | ||
continue; | ||
} | ||
|
||
// Skip if account address filter doesn't match | ||
if !sub.account_addresses.is_empty() | ||
&& !sub.account_addresses.contains(&account_address) | ||
{ | ||
continue; | ||
} | ||
|
||
let resp = SubscribeTokenBalancesResponse { | ||
subscription_id: *idx, | ||
balance: Some(proto::types::TokenBalance { | ||
contract_address: balance.contract_address.clone(), | ||
account_address: balance.account_address.clone(), | ||
token_id: balance.token_id.clone(), | ||
balance: balance.balance.clone(), | ||
}), | ||
}; | ||
|
||
if sub.sender.send(Ok(resp)).await.is_err() { | ||
closed_stream.push(*idx); | ||
} | ||
} | ||
|
||
for id in closed_stream { | ||
trace!(target = LOG_TARGET, id = %id, "Closing balance stream."); | ||
subs.remove_subscriber(id).await | ||
} | ||
|
||
Ok(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Optimize balance update processing performance, sensei!
The current implementation parses addresses from strings for each subscriber. Consider parsing once before the loop.
async fn process_balance_update(
subs: &Arc<TokenBalanceManager>,
balance: &TokenBalance,
) -> Result<(), Error> {
+ let contract_address = Felt::from_str(&balance.contract_address)
+ .map_err(ParseError::FromStr)?;
+ let account_address = Felt::from_str(&balance.account_address)
+ .map_err(ParseError::FromStr)?;
+
let mut closed_stream = Vec::new();
for (idx, sub) in subs.subscribers.read().await.iter() {
- let contract_address =
- Felt::from_str(&balance.contract_address).map_err(ParseError::FromStr)?;
- let account_address =
- Felt::from_str(&balance.account_address).map_err(ParseError::FromStr)?;
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async fn process_balance_update( | |
subs: &Arc<TokenBalanceManager>, | |
balance: &TokenBalance, | |
) -> Result<(), Error> { | |
let mut closed_stream = Vec::new(); | |
for (idx, sub) in subs.subscribers.read().await.iter() { | |
let contract_address = | |
Felt::from_str(&balance.contract_address).map_err(ParseError::FromStr)?; | |
let account_address = | |
Felt::from_str(&balance.account_address).map_err(ParseError::FromStr)?; | |
// Skip if contract address filter doesn't match | |
if !sub.contract_addresses.is_empty() | |
&& !sub.contract_addresses.contains(&contract_address) | |
{ | |
continue; | |
} | |
// Skip if account address filter doesn't match | |
if !sub.account_addresses.is_empty() | |
&& !sub.account_addresses.contains(&account_address) | |
{ | |
continue; | |
} | |
let resp = SubscribeTokenBalancesResponse { | |
subscription_id: *idx, | |
balance: Some(proto::types::TokenBalance { | |
contract_address: balance.contract_address.clone(), | |
account_address: balance.account_address.clone(), | |
token_id: balance.token_id.clone(), | |
balance: balance.balance.clone(), | |
}), | |
}; | |
if sub.sender.send(Ok(resp)).await.is_err() { | |
closed_stream.push(*idx); | |
} | |
} | |
for id in closed_stream { | |
trace!(target = LOG_TARGET, id = %id, "Closing balance stream."); | |
subs.remove_subscriber(id).await | |
} | |
Ok(()) | |
} | |
async fn process_balance_update( | |
subs: &Arc<TokenBalanceManager>, | |
balance: &TokenBalance, | |
) -> Result<(), Error> { | |
let contract_address = Felt::from_str(&balance.contract_address) | |
.map_err(ParseError::FromStr)?; | |
let account_address = Felt::from_str(&balance.account_address) | |
.map_err(ParseError::FromStr)?; | |
let mut closed_stream = Vec::new(); | |
for (idx, sub) in subs.subscribers.read().await.iter() { | |
// Skip if contract address filter doesn't match | |
if !sub.contract_addresses.is_empty() | |
&& !sub.contract_addresses.contains(&contract_address) | |
{ | |
continue; | |
} | |
// Skip if account address filter doesn't match | |
if !sub.account_addresses.is_empty() | |
&& !sub.account_addresses.contains(&account_address) | |
{ | |
continue; | |
} | |
let resp = SubscribeTokenBalancesResponse { | |
subscription_id: *idx, | |
balance: Some(proto::types::TokenBalance { | |
contract_address: balance.contract_address.clone(), | |
account_address: balance.account_address.clone(), | |
token_id: balance.token_id.clone(), | |
balance: balance.balance.clone(), | |
}), | |
}; | |
if sub.sender.send(Ok(resp)).await.is_err() { | |
closed_stream.push(*idx); | |
} | |
} | |
for id in closed_stream { | |
trace!(target = LOG_TARGET, id = %id, "Closing balance stream."); | |
subs.remove_subscriber(id).await | |
} | |
Ok(()) | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
crates/torii/client/src/client/mod.rs (1)
213-226
: Consider adding error handling for empty balance updates.Ohayo, sensei! The implementation looks good, but there's one potential improvement. The method should handle cases where the balance update stream might return empty or invalid responses.
Consider adding error handling:
pub async fn on_token_balance_updated( &self, contract_addresses: Vec<Felt>, account_addresses: Vec<Felt>, ) -> Result<TokenBalanceStreaming, Error> { let mut grpc_client = self.inner.write().await; + if contract_addresses.is_empty() && account_addresses.is_empty() { + log::warn!("Subscribing to all token balance updates - this might be resource-intensive"); + } let stream = grpc_client .subscribe_token_balances(contract_addresses, account_addresses) .await?; Ok(stream) }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
crates/torii/client/src/client/mod.rs
(2 hunks)crates/torii/grpc/src/client.rs
(2 hunks)
🔇 Additional comments (5)
crates/torii/client/src/client/mod.rs (2)
13-13
: LGTM! Clean import addition.
The import of TokenBalanceStreaming
aligns well with the new token balance subscription functionality.
228-238
: LGTM! Clean implementation of subscription updates.
The update method follows the same pattern as other subscription update methods in the codebase, maintaining consistency.
crates/torii/grpc/src/client.rs (3)
19-26
: LGTM! Clean import organization.
The imports are well-organized and include all necessary types for token balance functionality.
329-352
: LGTM! Clean implementation of subscription updates.
The update method follows the established patterns and properly handles Felt conversions.
355-370
: LGTM! Clean stream implementation.
The token balance streaming implementation follows the same pattern as other streaming types in the codebase, maintaining consistency.
/// Subscribe to token balances. | ||
pub async fn subscribe_token_balances( | ||
&mut self, | ||
contract_addresses: Vec<Felt>, | ||
account_addresses: Vec<Felt>, | ||
) -> Result<TokenBalanceStreaming, Error> { | ||
let request = RetrieveTokenBalancesRequest { | ||
contract_addresses: contract_addresses | ||
.into_iter() | ||
.map(|c| c.to_bytes_be().to_vec()) | ||
.collect(), | ||
account_addresses: account_addresses | ||
.into_iter() | ||
.map(|a| a.to_bytes_be().to_vec()) | ||
.collect(), | ||
}; | ||
let stream = self | ||
.inner | ||
.subscribe_token_balances(request) | ||
.await | ||
.map_err(Error::Grpc) | ||
.map(|res| res.into_inner())?; | ||
Ok(TokenBalanceStreaming(stream.map_ok(Box::new(|res| { | ||
(res.subscription_id, res.balance.unwrap().try_into().expect("must able to serialize")) | ||
})))) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider adding validation for the unwrap operation.
Ohayo, sensei! While the implementation is generally good, the unwrap operation on line 325 could be made more robust.
Consider adding proper error handling:
Ok(TokenBalanceStreaming(stream.map_ok(Box::new(|res| {
- (res.subscription_id, res.balance.unwrap().try_into().expect("must able to serialize"))
+ (
+ res.subscription_id,
+ res.balance
+ .ok_or_else(|| Error::Schema(SchemaError::MissingExpectedData("balance".to_string())))
+ .and_then(|b| b.try_into())
+ .expect("Failed to deserialize balance")
+ )
}))))
Committable suggestion skipped: line range outside the PR's diff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
crates/torii/client/src/client/mod.rs (2)
215-228
: Ohayo! The subscription method implementation looks solid, sensei!The implementation follows the established pattern and handles the gRPC client lock appropriately. Consider enhancing the documentation with an example usage to make it even more developer-friendly.
Add a usage example to the documentation:
/// Subscribes to token balances updates. /// If no contract addresses are provided, it will subscribe to updates for all contract /// addresses. If no account addresses are provided, it will subscribe to updates for all /// account addresses. + /// + /// # Example + /// ```rust + /// let stream = client + /// .on_token_balance_updated( + /// vec![contract_address], + /// vec![account_address] + /// ) + /// .await?; + /// ```
230-246
: Documentation needs enhancement, sensei!While the implementation is solid, the method would benefit from more detailed documentation explaining:
- The purpose and behavior of the subscription_id parameter
- What happens when empty vectors are provided
- Any potential error conditions
Add comprehensive documentation:
/// Update the token balances subscription + /// + /// Updates an existing token balance subscription identified by the subscription_id. + /// Similar to on_token_balance_updated, empty contract_addresses or account_addresses + /// vectors will subscribe to all updates for that category. + /// + /// # Arguments + /// * `subscription_id` - The ID of the existing subscription to update + /// * `contract_addresses` - New list of contract addresses to monitor + /// * `account_addresses` - New list of account addresses to monitor + /// + /// # Errors + /// Returns an error if the subscription_id is invalid or if the gRPC call fails
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
crates/torii/client/src/client/mod.rs
(2 hunks)
🔇 Additional comments (2)
crates/torii/client/src/client/mod.rs (2)
13-15
: Ohayo! Import changes look good, sensei!
The TokenBalanceStreaming type is appropriately grouped with other streaming types from the same module.
214-246
: Verify integration with gRPC server, sensei!
Let's ensure the subscription management is properly integrated with the gRPC server implementation.
✅ Verification successful
Ohayo! The token balance subscription implementation is properly integrated, sensei!
The verification shows that the implementation is correctly integrated with the gRPC server:
- The proto definition in
world.proto
properly defines theSubscribeTokenBalances
andUpdateTokenBalancesSubscription
RPCs - The server implementation in
crates/torii/grpc/src/server/mod.rs
correctly implements these RPCs - The subscription management is handled in
crates/torii/grpc/src/server/subscriptions/token_balance.rs
- The client implementation in
crates/torii/client/src/client/mod.rs
properly connects to these endpoints
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the corresponding gRPC server implementation
# Check for the corresponding gRPC service definition
rg -l "service.*\{.*TokenBalance.*\}" -g "*.proto"
# Check for the server-side implementation of these methods
rg -l "impl.*World.*for.*Server" -g "*.rs"
Length of output: 97
Script:
#!/bin/bash
# Let's try a broader search for proto definitions and implementations
# Search for proto files
fd -e proto -x cat {}
# Search for token balance related implementations in Rust files
rg -l "token_balance" -g "*.rs"
# Search specifically for subscription-related code
rg "subscribe.*token.*balance" -g "*.rs" -i
Length of output: 31414
#2819
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
Documentation
Chores