-
Notifications
You must be signed in to change notification settings - Fork 182
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add tps metric to contracts table #2468
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2468 +/- ##
==========================================
- Coverage 68.96% 68.80% -0.17%
==========================================
Files 372 377 +5
Lines 48568 48883 +315
==========================================
+ Hits 33496 33635 +139
- Misses 15072 15248 +176 ☔ View full report in Codecov by Sentry. |
WalkthroughOhayo, sensei! This pull request introduces several enhancements across multiple files related to blockchain management and gRPC functionalities. Key changes include the addition of a new enum variant and struct for handling blockchain head updates in the Changes
Possibly related PRs
Suggested reviewers
📜 Recent review detailsConfiguration used: .coderabbit.yaml 📒 Files selected for processing (1)
🧰 Additional context used🔇 Additional comments (2)
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
🧹 Outside diff range and nitpick comments (7)
crates/torii/migrations/20240923155431_last_block_timestamp.sql (1)
1-2
: Ohayo, SQL sensei! Your migration looks solid, but let's level up!The addition of the
last_block_timestamp
column is a great move for TPS calculations. Here are some thoughts to make it even more sugoi:
- Consider adding a NOT NULL constraint with a DEFAULT value. This ensures data integrity for existing and new rows.
- If you need precise timing, a TIMESTAMP type might be more appropriate than INTEGER.
- Depending on your query patterns, an index on this column could boost performance if it's used in WHERE clauses or joins.
What do you think, sensei? Want to power up this migration even further?
Here's a potential power-up for your consideration:
-- Add last_block_timestamp column for TPS calculation ALTER TABLE contracts ADD COLUMN last_block_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP; CREATE INDEX idx_contracts_last_block_timestamp ON contracts(last_block_timestamp);This version uses TIMESTAMP, adds a NOT NULL constraint with a default value, and creates an index for potential performance benefits.
crates/torii/core/src/types.rs (1)
88-94
: Ohayo, sensei! The newIndexerUpdate
struct looks good!The addition of the
IndexerUpdate
struct aligns well with the PR objective of adding a TPS metric to the contracts table. The fields chosen are appropriate for tracking indexer updates.A few suggestions to consider:
- If this struct will be used in database operations or API responses, you might want to derive
Serialize
,Deserialize
, andFromRow
traits.- Consider adding documentation comments to explain the purpose of each field, especially
tps
andhead
.Would you like me to provide an example of how to add these traits and documentation, sensei?
crates/torii/grpc/proto/world.proto (1)
49-55
: Ohayo, sensei! This message is packed with goodies!The
SubscribeIndexerResponse
message is well-structured and provides comprehensive indexer status information. Great job including thetps
(transactions per second) field - it's a valuable addition that wasn't explicitly mentioned in the PR objectives.Consider adding a brief comment for each field to explain their purpose, especially for
head
andtps
, as their meanings might not be immediately clear to all developers.crates/torii/grpc/src/types/mod.rs (2)
19-25
: Ohayo, sensei! TheIndexerUpdate
struct looks great!The new
IndexerUpdate
struct is well-designed and contains all the necessary fields for representing indexer updates. The choice of types for each field is appropriate, and the derived traits provide useful functionality.Consider adding a brief doc comment to explain the purpose of this struct and possibly describe each field. For example:
/// Represents an update from an indexer. /// /// # Fields /// * `head` - The current head block number. /// * `tps` - Transactions per second. /// * `last_block_timestamp` - Timestamp of the last processed block. /// * `contract_address` - The address of the contract being indexed. #[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)] pub struct IndexerUpdate { // ... existing fields ... }This will help other developers understand the purpose and usage of this struct at a glance.
27-36
: Ohayo again, sensei! TheFrom
implementation looks solid!The implementation for converting
proto::world::SubscribeIndexerResponse
toIndexerUpdate
is straightforward and correct. It properly maps all fields and handles the conversion ofcontract_address
from bytes to aFelt
.Consider adding error handling for the
contract_address
conversion. While it's likely that the input will always be valid, it's generally a good practice to handle potential errors. You could useTryFrom
instead ofFrom
to allow for error handling. Here's an example:impl TryFrom<proto::world::SubscribeIndexerResponse> for IndexerUpdate { type Error = &'static str; fn try_from(value: proto::world::SubscribeIndexerResponse) -> Result<Self, Self::Error> { Ok(Self { head: value.head, tps: value.tps, last_block_timestamp: value.last_block_timestamp, contract_address: Felt::from_bytes_be_slice(&value.contract_address) .map_err(|_| "Invalid contract address")?, }) } }This approach allows for more robust error handling and makes it clear that the conversion might fail.
crates/torii/grpc/src/server/subscriptions/indexer.rs (1)
44-46
: Clarify Comment Regarding Unlock Issue with BrowsersOhayo sensei! The comment
// NOTE: unlock issue with firefox/safari
lacks context, making it difficult for others to understand its significance.Please expand the comment to explain the specific issue with Firefox and Safari browsers and how sending an initial empty response resolves it. For example:
// NOTE: Sending an initial response to prevent connection hang in Firefox and Safari. // These browsers expect an immediate response to establish the stream properly.This clarification aids in maintaining the code by providing necessary background.
crates/torii/core/src/sql.rs (1)
89-89
: Ohayo, sensei! Is theasync
keyword necessary here?The
set_head
function is marked asasync
, but there are noawait
points within the function. If asynchronous behavior isn't required, consider removingasync
to avoid unnecessary overhead.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (11)
- crates/torii/core/src/engine.rs (7 hunks)
- crates/torii/core/src/executor.rs (5 hunks)
- crates/torii/core/src/sql.rs (2 hunks)
- crates/torii/core/src/types.rs (1 hunks)
- crates/torii/grpc/proto/world.proto (2 hunks)
- crates/torii/grpc/src/client.rs (4 hunks)
- crates/torii/grpc/src/server/mod.rs (9 hunks)
- crates/torii/grpc/src/server/subscriptions/indexer.rs (1 hunks)
- crates/torii/grpc/src/server/subscriptions/mod.rs (1 hunks)
- crates/torii/grpc/src/types/mod.rs (1 hunks)
- crates/torii/migrations/20240923155431_last_block_timestamp.sql (1 hunks)
🔇 Additional comments (34)
crates/torii/grpc/src/server/subscriptions/mod.rs (1)
5-5
: Ohayo, sensei! The new module looks good!The addition of the
indexer
module is clean and consistent with the existing style. It's likely related to the tps metric mentioned in the PR objective.Could you enlighten this humble reviewer about the contents of the new
indexer
module, sensei? It would be most helpful to understand how it relates to the tps metric for the contracts table.✅ Verification successful
Ohayo, sensei! The
indexer
module has been successfully verified and aligns perfectly with the PR objectives. No issues found.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check the contents of the new indexer module cat crates/torii/grpc/src/server/subscriptions/indexer.rsLength of output: 3966
crates/torii/grpc/proto/world.proto (4)
10-11
: Ohayo, sensei! New RPC method looks sugoi!The addition of the
SubscribeIndexer
method is a great improvement. It allows clients to receive real-time updates about the indexer, which is super useful for monitoring purposes.
44-47
: Sugoi message definition, sensei!The
SubscribeIndexerRequest
message looks well-defined. Usingbytes
for thecontract_address
is a smart choice, as it provides flexibility for different address formats.
58-63
: Konnichiwa again, sensei! Nice consistency in renaming.The renaming of
MetadataRequest
andMetadataResponse
toWorldMetadataRequest
andWorldMetadataResponse
respectively is consistent with the earlier RPC method change. This improves the overall clarity of the protocol.
14-14
: Konnichiwa, sensei! Nice rename, but let's double-check something.The update to
WorldMetadata
method signature is a great improvement for clarity. It now clearly indicates that it's specific to World metadata.Could you please verify that the
MetadataRequest
andMetadataResponse
messages have been renamed toWorldMetadataRequest
andWorldMetadataResponse
respectively? Here's a script to help:✅ Verification successful
ohayo, sensei! It looks like the previous search didn't recognize
.proto
files. Let's try again by specifically targeting.proto
files:
ohayo, sensei! The renaming of
MetadataRequest
andMetadataResponse
toWorldMetadataRequest
andWorldMetadataResponse
incrates/torii/grpc/proto/world.proto
has been successfully verified.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the renaming of metadata messages # Test: Search for old and new message names echo "Checking for old message names:" rg --type proto 'message MetadataRequest|message MetadataResponse' echo "Checking for new message names:" rg --type proto 'message WorldMetadataRequest|message WorldMetadataResponse'Length of output: 353
Script:
#!/bin/bash # Description: Verify the renaming of metadata messages by searching .proto files echo "Checking for old message names in .proto files:" rg -g '*.proto' 'message MetadataRequest|message MetadataResponse' echo "Checking for new message names in .proto files:" rg -g '*.proto' 'message WorldMetadataRequest|message WorldMetadataResponse'Length of output: 492
crates/torii/grpc/src/client.rs (5)
11-15
: Well-placed imports for indexer subscription functionalityOhayo, sensei! The new imports including
SubscribeIndexerRequest
andSubscribeIndexerResponse
are correctly added, ensuring the necessary types are available for the indexer subscription features.
18-20
: Correct inclusion ofIndexerUpdate
in type importsOhayo, sensei! The addition of
IndexerUpdate
to the imports is appropriate and aligns with the new functionality introduced for indexer updates.
74-74
: Updated request type inmetadata
methodOhayo, sensei! The change to use
WorldMetadataRequest
instead ofMetadataRequest
in the.world_metadata
call is correct and reflects the updated request structure.
113-127
: Newsubscribe_indexer
method is well-implementedOhayo, sensei! The
subscribe_indexer
method is properly defined to subscribe to indexer updates. The usage ofcontract_address.to_bytes_be().to_vec()
correctly converts the address for the request, and the error handling is consistent with the rest of the client implementation.
304-321
: Accurate implementation ofIndexerUpdateStreaming
Ohayo, sensei! The
IndexerUpdateStreaming
struct and itsStream
trait implementation are correctly defined. This ensures seamless streaming of indexer updates. Thepoll_next
method properly delegates toself.0.poll_next_unpin(cx)
.crates/torii/core/src/executor.rs (4)
34-34
: Ohayo! AddedSetHead
variant toBrokerMessage
Sensei, the addition of the
SetHead(IndexerUpdate)
variant toBrokerMessage
enhances the messaging capabilities.
49-56
: Ohayo! NewSetHeadQuery
struct looks goodSensei, the introduction of
SetHeadQuery
struct is well-defined and appropriately used.
59-59
: Ohayo! ExtendedQueryType
withSetHead
variantSensei, adding
SetHead(SetHeadQuery)
toQueryType
is a good extension for handling new query types.
329-329
: Ohayo! NewSetHead
variant handled correctly insend_broker_message
Sensei, great job adding the
SetHead
variant handling insend_broker_message
. All message variants are properly covered.crates/torii/core/src/engine.rs (7)
392-392
: Ohayo sensei! Initialization ofworld_txns_count
is appropriateThe variable
world_txns_count
is correctly initialized to zero for counting world transactions.
441-441
: Ohayo sensei! Correctly incrementingworld_txns_count
The
world_txns_count
variable is appropriately incremented after processing a world transaction.
458-460
: Ohayo sensei! Verifyset_head
parameters after processing pending transactionsAgain,
self.db.set_head
is called withdata.block_number - 1
. Please confirm that this is consistent with your intended logic.
480-480
: Ohayo sensei! Assignment oftransactions_count
Assigning
transactions_count
withdata.transactions.len()
correctly captures the number of transactions processed.
513-521
: Ohayo sensei! Verifyset_head
parameters after processing rangeWhen calling
self.db.set_head
, ensure that all parameters—data.latest_block_number
,last_block_timestamp
,transactions_count as u64
,self.world.address
—are correctly passed and that their values are appropriate.Run the following script to compare parameters in
set_head
calls:#!/bin/bash # Description: List all calls to `set_head` and their parameters for comparison. # Expected: Parameters should be consistent and in the correct order. rg --type rust --no-heading -A 5 'set_head\('
413-420
: Ohayo sensei! Verify correctness ofset_head
parameters during error handlingIn the error handling block,
self.db.set_head
is called withdata.block_number - 1
. Please verify that decrementing the block number is appropriate in this context.Run the following script to check similar usages:
✅ Verification successful
Ohayo sensei!
Action Required: Corrected Script for Verifying
set_head
Parameter DecrementPlease run the following script to accurately identify calls to
set_head
wheredata.block_number
is decremented by 1:
Ohayo sensei!
Verification Successful:
set_head
Parameter Decrement is Appropriate
- The decrementing of
data.block_number
by 1 in theset_head
call within the error handling block is consistent with standard error handling practices to maintain system integrity.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Find calls to `set_head` where the block number is decremented. # Expected: Ensure that decrementing is consistent with error handling practices. rg --type rust --no-heading -A 5 'set_head\(\s*.*block_number\s*-\s*1'Length of output: 2293
Script:
#!/bin/bash # Description: Find calls to `set_head` where the block number is decremented. rg --type rust --no-heading -A 5 'set_head\(\s*data\.block_number\s*-\s*1'Length of output: 502
161-161
: Ohayo sensei! Ensure default parameters are appropriate when initializingset_head
Passing zeros for
timestamp
andworld_txns_count
inself.db.set_head
may require verification to ensure they do not cause unintended side effects.Run the following script to verify the initialization parameters:
crates/torii/grpc/src/server/mod.rs (12)
20-21
: Adding necessary importsOhayo, sensei! The required imports for
RetrieveEntitiesRequest
,RetrieveEntitiesResponse
,RetrieveEventsRequest
,RetrieveEventsResponse
,SubscribeModelsRequest
, andSubscribeModelsResponse
have been correctly added.
32-32
: ImportingIndexerManager
Ohayo, sensei! The addition of
use subscriptions::indexer::IndexerManager;
is spot on.
51-51
: Updated imports for new functionalitiesOhayo, sensei! The imports
SubscribeIndexerRequest
,SubscribeIndexerResponse
,WorldMetadataRequest
, andWorldMetadataResponse
have been correctly added to accommodate the new methods.
89-89
: Addingindexer_manager
toDojoWorld
structOhayo, sensei! Including
indexer_manager: Arc<IndexerManager>
in theDojoWorld
struct is correctly implemented.
104-104
: Initializingindexer_manager
innew
methodOhayo, sensei! The
indexer_manager
is properly initialized usingArc::new(IndexerManager::default())
.
121-122
: Spawningindexer_manager
serviceOhayo, sensei! The
indexer_manager
service is correctly spawned withtokio::task::spawn
.
131-131
: Includingindexer_manager
inDojoWorld
instanceOhayo, sensei! The
indexer_manager
is properly included in the returnedDojoWorld
instance.
137-137
: Renamingmetadata
method toworld
Ohayo, sensei! Renaming the
metadata
method toworld
enhances clarity and aligns with its functionality.
694-701
: Implementing newsubscribe_indexer
methodOhayo, sensei! The new asynchronous method
subscribe_indexer
is correctly implemented to handle indexer subscriptions.
1027-1028
: DefiningSubscribeIndexerResponseStream
type aliasOhayo, sensei! The type alias
SubscribeIndexerResponseStream
has been properly defined.
1036-1036
: ImplementingSubscribeIndexerStream
inWorld
traitOhayo, sensei! The
SubscribeIndexerStream
type has been correctly implemented in theWorld
trait forDojoWorld
.
1040-1049
: Updatingworld_metadata
method implementationOhayo, sensei! The
world_metadata
method has been updated to invokeself.world()
, reflecting the method rename and ensuring correct functionality.crates/torii/core/src/sql.rs (1)
18-18
: Ohayo, sensei! The import ofSetHeadQuery
looks good.
for (idx, sub) in subs.subscribers.read().await.iter() { | ||
if sub.contract_address != update.contract_address { | ||
continue; | ||
} | ||
|
||
let resp = SubscribeIndexerResponse { | ||
head: update.head, | ||
tps: update.tps, | ||
last_block_timestamp: update.last_block_timestamp, | ||
contract_address: update.contract_address.to_bytes_be().to_vec(), | ||
}; | ||
|
||
if sub.sender.send(Ok(resp)).await.is_err() { | ||
closed_stream.push(*idx); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Optimize Subscriber Lookup for Improved Performance
Ohayo sensei! Iterating over all subscribers to match the contract_address
can become inefficient as the subscriber count grows.
To enhance performance, consider indexing subscribers by contract_address
. Modify the subscribers
structure as follows:
-pub struct IndexerManager {
- subscribers: RwLock<HashMap<usize, IndexerSubscriber>>,
+pub struct IndexerManager {
+ subscribers: RwLock<HashMap<Felt, HashMap<usize, IndexerSubscriber>>>,
}
Update the add_subscriber
and publish_updates
methods to accommodate this new structure. This change allows for quicker access to relevant subscribers based on the contract_address
.
Committable suggestion was skipped due to low confidence.
QueryType::SetHead(set_head) => { | ||
let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>( | ||
"SELECT last_block_timestamp FROM contracts WHERE id = ?", | ||
) | ||
.bind(format!("{:#x}", set_head.contract_address)) | ||
.fetch_one(&mut **tx) | ||
.await? | ||
.try_into() | ||
.map_err(|_| anyhow::anyhow!("Last block timestamp doesn't fit in u64"))?; | ||
|
||
let tps: u64 = if set_head.last_block_timestamp - previous_block_timestamp != 0 { | ||
set_head.txns_count / (set_head.last_block_timestamp - previous_block_timestamp) | ||
} else { | ||
set_head.txns_count | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo! Potential underflow and division by zero in TPS calculation
Sensei, in the calculation of tps
, subtracting previous_block_timestamp
from set_head.last_block_timestamp
without checking for underflow may lead to incorrect results or panic in debug mode. If set_head.last_block_timestamp
is less than previous_block_timestamp
, the subtraction will underflow. Consider using checked_sub
to safely handle this scenario.
Apply this diff to prevent underflow:
+let time_diff = set_head.last_block_timestamp.checked_sub(previous_block_timestamp).unwrap_or(0);
+let tps: u64 = if time_diff != 0 {
+ set_head.txns_count / time_diff
+} else {
+ set_head.txns_count
+};
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
QueryType::SetHead(set_head) => { | |
let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>( | |
"SELECT last_block_timestamp FROM contracts WHERE id = ?", | |
) | |
.bind(format!("{:#x}", set_head.contract_address)) | |
.fetch_one(&mut **tx) | |
.await? | |
.try_into() | |
.map_err(|_| anyhow::anyhow!("Last block timestamp doesn't fit in u64"))?; | |
let tps: u64 = if set_head.last_block_timestamp - previous_block_timestamp != 0 { | |
set_head.txns_count / (set_head.last_block_timestamp - previous_block_timestamp) | |
} else { | |
set_head.txns_count | |
}; | |
QueryType::SetHead(set_head) => { | |
let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>( | |
"SELECT last_block_timestamp FROM contracts WHERE id = ?", | |
) | |
.bind(format!("{:#x}", set_head.contract_address)) | |
.fetch_one(&mut **tx) | |
.await? | |
.try_into() | |
.map_err(|_| anyhow::anyhow!("Last block timestamp doesn't fit in u64"))?; | |
let time_diff = set_head.last_block_timestamp.checked_sub(previous_block_timestamp).unwrap_or(0); | |
let tps: u64 = if time_diff != 0 { | |
set_head.txns_count / time_diff | |
} else { | |
set_head.txns_count | |
}; |
crates/torii/grpc/src/server/mod.rs
Outdated
async fn subscribe_indexer( | ||
&self, | ||
request: Request<SubscribeIndexerRequest>, | ||
) -> ServiceResult<Self::SubscribeIndexerStream> { | ||
let SubscribeIndexerRequest { contract_address } = request.into_inner(); | ||
let rx = self | ||
.subscribe_indexer(Felt::from_bytes_be_slice(contract_address.as_slice())) | ||
.await | ||
.map_err(|e| Status::internal(e.to_string()))?; | ||
Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeIndexerStream)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle potential errors during contract_address
conversion
Ohayo, sensei! In the subscribe_indexer
method, please ensure that any potential errors from Felt::from_bytes_be_slice
are properly handled. If the conversion can fail, consider returning an appropriate gRPC Status
error to inform the client.
Apply this diff to handle the potential error:
async fn subscribe_indexer(
&self,
request: Request<SubscribeIndexerRequest>,
) -> ServiceResult<Self::SubscribeIndexerStream> {
let SubscribeIndexerRequest { contract_address } = request.into_inner();
+ let contract_felt = Felt::from_bytes_be_slice(contract_address.as_slice())
+ .map_err(|_| Status::invalid_argument("Invalid contract address"))?;
let rx = self
- .subscribe_indexer(Felt::from_bytes_be_slice(contract_address.as_slice()))
+ .subscribe_indexer(contract_felt)
.await
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeIndexerStream))
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async fn subscribe_indexer( | |
&self, | |
request: Request<SubscribeIndexerRequest>, | |
) -> ServiceResult<Self::SubscribeIndexerStream> { | |
let SubscribeIndexerRequest { contract_address } = request.into_inner(); | |
let rx = self | |
.subscribe_indexer(Felt::from_bytes_be_slice(contract_address.as_slice())) | |
.await | |
.map_err(|e| Status::internal(e.to_string()))?; | |
Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeIndexerStream)) | |
async fn subscribe_indexer( | |
&self, | |
request: Request<SubscribeIndexerRequest>, | |
) -> ServiceResult<Self::SubscribeIndexerStream> { | |
let SubscribeIndexerRequest { contract_address } = request.into_inner(); | |
let contract_felt = Felt::from_bytes_be_slice(contract_address.as_slice()) | |
.map_err(|_| Status::invalid_argument("Invalid contract address"))?; | |
let rx = self | |
.subscribe_indexer(contract_felt) | |
.await | |
.map_err(|e| Status::internal(e.to_string()))?; | |
Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeIndexerStream)) | |
} |
"UPDATE contracts SET head = ?, last_block_timestamp = ? WHERE id = ?".to_string(), | ||
vec![head_arg, last_block_timestamp_arg, id], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo, sensei! Should world_txns_count
and contract_address
be included in the SQL update?
Currently, the SQL statement only updates head
and last_block_timestamp
in the contracts
table. If world_txns_count
and contract_address
need to be persisted in the database, consider including them in the update query.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (2)
crates/torii/migrations/20240923155431_tps.sql (2)
1-2
: Ohayo, sensei! The new column looks good, but let's think about existing data.The addition of the
last_block_timestamp
column is a solid move for TPS calculation. However, we might want to consider how this affects existing rows in thecontracts
table.Consider adding a DEFAULT value or updating existing rows to ensure consistency across all records. For example:
ALTER TABLE contracts ADD COLUMN last_block_timestamp INTEGER DEFAULT 0; -- And then update existing rows if needed UPDATE contracts SET last_block_timestamp = /* some appropriate value */ WHERE last_block_timestamp IS NULL;
3-3
: Ohayo again, sensei! The TPS column is a nice addition, but let's make it crystal clear.Adding the
tps
column is a great idea for tracking performance metrics. However, we can improve on this a bit.Consider these enhancements:
- Add a comment explaining the purpose of the
tps
column, similar to thelast_block_timestamp
column.- Specify a DEFAULT value to handle existing and new rows consistently.
- Consider using a more precise data type if decimal precision is needed for TPS.
Here's an example of how you might implement these suggestions:
-- Add TPS (Transactions Per Second) column for performance tracking ALTER TABLE contracts ADD COLUMN tps INTEGER DEFAULT 0; -- Or if decimal precision is needed: -- ALTER TABLE contracts ADD COLUMN tps NUMERIC(10, 2) DEFAULT 0.0;
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (4)
- crates/torii/core/src/executor.rs (5 hunks)
- crates/torii/core/src/types.rs (1 hunks)
- crates/torii/grpc/src/server/subscriptions/indexer.rs (1 hunks)
- crates/torii/migrations/20240923155431_tps.sql (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- crates/torii/core/src/types.rs
- crates/torii/grpc/src/server/subscriptions/indexer.rs
🔇 Additional comments (5)
crates/torii/migrations/20240923155431_tps.sql (1)
1-3
: Ohayo once more, sensei! Let's zoom out and look at the big picture.These changes are laying the groundwork for some cool TPS tracking functionality. Nice work! However, let's consider a few things to ensure smooth sailing:
- Application Code: Make sure any related application code is updated to use these new columns effectively.
- Data Migration: Consider how to handle existing data. You might want to backfill these columns with appropriate values.
- Performance Impact: Keep an eye on how these additional columns affect query performance, especially if the
contracts
table is large or frequently accessed.- Documentation: Update any relevant documentation to reflect these new columns and their purpose.
To ensure we're not missing any important details, let's run a quick check on the existing schema:
This will help us understand the current state of the schema and ensure we're not duplicating any existing functionality.
crates/torii/core/src/executor.rs (4)
17-18
: Ohayo! LGTM for the import changes, sensei!The import statements have been updated correctly to include
Contract as ContractUpdated
. This change aligns with the new functionality being added.
34-34
: Ohayo! New BrokerMessage variant looks good, sensei!The addition of
SetHead(ContractUpdated)
to theBrokerMessage
enum is spot-on. It's a necessary change to support the new blockchain head update functionality.
49-56
: Ohayo! New SetHeadQuery struct is well-crafted, sensei!The
SetHeadQuery
struct is a solid addition. It contains all the necessary fields for setting the blockchain head, with appropriate types for each field. Nice work on structuring this data!
59-59
: Ohayo! QueryType enum update is on point, sensei!The addition of
SetHead(SetHeadQuery)
to theQueryType
enum is a perfect complement to the newSetHeadQuery
struct. This change enables the system to handle the new query type for setting the blockchain head.
QueryType::SetHead(set_head) => { | ||
let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>( | ||
"SELECT last_block_timestamp FROM contracts WHERE id = ?", | ||
) | ||
.bind(format!("{:#x}", set_head.contract_address)) | ||
.fetch_one(&mut **tx) | ||
.await? | ||
.try_into() | ||
.map_err(|_| anyhow::anyhow!("Last block timestamp doesn't fit in u64"))?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo! Watch out for that sneaky i64 to u64 conversion, sensei!
The conversion from i64
to u64
for previous_block_timestamp
could fail if the value is negative. Consider using a safe conversion method or handling potential errors. Here's a suggestion:
let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>(
"SELECT last_block_timestamp FROM contracts WHERE id = ?"
)
.bind(format!("{:#x}", set_head.contract_address))
.fetch_one(&mut **tx)
.await?
.try_into()
.map_err(|_| anyhow::anyhow!("Last block timestamp is negative or doesn't fit in u64"))?;
This approach will provide a more informative error message if the conversion fails.
let tps: u64 = if set_head.last_block_timestamp - previous_block_timestamp != 0 { | ||
set_head.txns_count / (set_head.last_block_timestamp - previous_block_timestamp) | ||
} else { | ||
set_head.txns_count | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Ohayo! Let's make that TPS calculation more robust, sensei!
The current TPS calculation might not handle edge cases well. Consider using a more precise calculation method that avoids potential issues with integer division. Here's a suggestion:
let time_diff = set_head.last_block_timestamp.saturating_sub(previous_block_timestamp);
let tps = if time_diff > 0 {
(set_head.txns_count as f64 / time_diff as f64).round() as u64
} else {
0 // or another appropriate default value
};
This approach uses floating-point division for more precise results and handles the case where time_diff
is zero or when set_head.last_block_timestamp
is less than previous_block_timestamp
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
crates/torii/grpc/src/server/mod.rs (2)
694-701
: LGTM with suggestion: Newsubscribe_indexer
method added.Ohayo, sensei! The new
subscribe_indexer
method looks good and matches the description in the summary. However, we should improve the error handling when converting thecontract_address
.Consider adding explicit error handling for the
Felt::from_bytes_be_slice
conversion. Here's a suggested improvement:async fn subscribe_indexer( &self, contract_address: Felt, ) -> Result<Receiver<Result<proto::world::SubscribeIndexerResponse, tonic::Status>>, Error> { - self.indexer_manager.add_subscriber(&self.pool, contract_address).await + self.indexer_manager + .add_subscriber(&self.pool, contract_address) + .await + .map_err(|e| Error::Custom(format!("Failed to add indexer subscriber: {}", e))) }This change will provide more context if the subscription fails.
1027-1059
: LGTM with suggestion: New indexer subscription functionality added.Ohayo once more, sensei! The new type alias and implementation of
subscribe_indexer
for theWorld
trait look good. They're consistent with the new indexer functionality we're adding. However, we should improve the error handling when converting thecontract_address
.Consider adding explicit error handling for the
Felt::from_bytes_be_slice
conversion. Here's a suggested improvement:async fn subscribe_indexer( &self, request: Request<SubscribeIndexerRequest>, ) -> ServiceResult<Self::SubscribeIndexerStream> { let SubscribeIndexerRequest { contract_address } = request.into_inner(); + let contract_felt = Felt::from_bytes_be_slice(&contract_address) + .map_err(|e| Status::invalid_argument(&format!("Invalid contract address: {}", e)))?; let rx = self - .subscribe_indexer(Felt::from_bytes_be_slice(&contract_address)) + .subscribe_indexer(contract_felt) .await .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeIndexerStream)) }This change will provide more specific error information if the contract address is invalid.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (6)
- crates/torii/core/src/executor.rs (5 hunks)
- crates/torii/core/src/types.rs (1 hunks)
- crates/torii/grpc/proto/world.proto (2 hunks)
- crates/torii/grpc/src/server/mod.rs (9 hunks)
- crates/torii/grpc/src/server/subscriptions/indexer.rs (1 hunks)
- crates/torii/grpc/src/types/mod.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- crates/torii/core/src/executor.rs
- crates/torii/core/src/types.rs
- crates/torii/grpc/proto/world.proto
- crates/torii/grpc/src/types/mod.rs
🔇 Additional comments (12)
crates/torii/grpc/src/server/subscriptions/indexer.rs (7)
1-22
: LGTM! Imports and constant look good.Ohayo sensei! The imports cover all the necessary dependencies for the implemented functionality. The
LOG_TARGET
constant is well-defined for logging purposes.
24-30
: LGTM! IndexerSubscriber struct is well-defined.Ohayo sensei! The
IndexerSubscriber
struct is nicely designed with acontract_address
of typeFelt
and asender
for responses. This structure should effectively manage individual subscriptions.
77-87
: LGTM! Service struct is well-designed.Ohayo sensei! The
Service
struct is nicely defined with anArc<IndexerManager>
for shared ownership and a boxed stream for flexibility. Thenew
method provides a clean initialization process.
1-140
: Overall, a well-implemented subscription system with room for optimization.Ohayo sensei! The implementation of the indexer subscription system is solid and makes good use of async Rust features. The code is well-structured and handles concurrency appropriately.
There are a few areas for potential optimization:
- Subscriber ID generation could be made deterministic.
- Subscriber lookup could be optimized for better performance with large numbers of subscribers.
- Concurrency limits could be implemented in the
Future
implementation to prevent potential resource exhaustion.These optimizations would further enhance the robustness and scalability of the system. Great work overall!
32-74
:⚠️ Potential issueConsider using a deterministic ID generation method.
Ohayo sensei! The
IndexerManager
struct and its implementation look good overall. However, there's a potential issue with the subscriber ID generation:let id = rand::thread_rng().gen::<usize>();This random ID generation could lead to collisions, especially as the number of subscribers grows. Consider using a deterministic method, such as an atomic counter:
use std::sync::atomic::{AtomicUsize, Ordering}; static SUBSCRIBER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1); // In add_subscriber method let id = SUBSCRIBER_ID_COUNTER.fetch_add(1, Ordering::Relaxed);This approach ensures unique IDs and avoids potential collisions.
89-120
: 🛠️ Refactor suggestionConsider optimizing subscriber lookup.
Ohayo sensei! The
publish_updates
method looks good overall, but there's room for performance improvement:for (idx, sub) in subs.subscribers.read().await.iter() { if sub.contract_address != Felt::ZERO && sub.contract_address != contract_address { continue; } // ... rest of the code }This approach iterates over all subscribers, which could become inefficient as the number of subscribers grows. Consider indexing subscribers by
contract_address
to allow for quicker lookups:pub struct IndexerManager { subscribers: RwLock<HashMap<Felt, HashMap<usize, IndexerSubscriber>>>, }You'll need to update the
add_subscriber
andpublish_updates
methods accordingly, but this change could significantly improve performance for large numbers of subscribers.
123-140
:⚠️ Potential issueConsider implementing a concurrency limit.
Ohayo sensei! The
Future
implementation forService
looks good, but there's a potential issue:while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) { let subs = Arc::clone(&pin.subs_manager); tokio::spawn(async move { // ... task code }); }Spawning a new task for every event without limit could exhaust system resources if events arrive frequently. Consider implementing a concurrency limit or processing events within a fixed-size thread pool. You could use
tokio::spawn
with a semaphore ortokio::sync::mpsc
channels to control task spawning.crates/torii/grpc/src/server/mod.rs (5)
20-21
: LGTM! New imports added.Ohayo, sensei! The new imports for
RetrieveEntitiesRequest
andRetrieveEntitiesResponse
look good. They're likely related to new entity retrieval functionality.
32-32
: LGTM! New import for IndexerManager.Ohayo again, sensei! The new import for
IndexerManager
looks good. It seems we're adding some indexer management functionality to our codebase.
51-51
: LGTM! New imports for indexer subscription.Ohayo once more, sensei! The new imports for
SubscribeIndexerRequest
andSubscribeIndexerResponse
are in line with the indexer functionality we're adding. Everything looks good here.
Line range hint
89-131
: LGTM! IndexerManager added to DojoWorld.Ohayo, sensei! The addition of
indexer_manager
to theDojoWorld
struct and its initialization look good. The new task spawned for the indexer service follows the existing pattern for other managers. These changes are consistent with the new indexer functionality we're implementing.
137-137
: LGTM! Method renamed frommetadata
toworld
.Ohayo, sensei! The renaming of the method from
metadata
toworld
looks good and aligns with the summary provided. However, we should ensure that all calls to this method are updated throughout the codebase.Let's verify the usage of this method across the codebase:
✅ Verification successful
Ohayo, sensei! The method rename from
metadata
toworld
incrates/torii/grpc/src/server/mod.rs
has been successfully verified. All remainingmetadata
instances are unrelated and do not affect this specific change.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for any remaining usage of the old method name 'metadata' # Search for 'metadata' method calls echo "Searching for remaining 'metadata' method calls:" rg '\bmetadata\s*\(' --type rust # Search for 'world' method calls to confirm the update echo "Searching for updated 'world' method calls:" rg '\bworld\s*\(' --type rustLength of output: 1976
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (1)
crates/torii/grpc/src/server/subscriptions/indexer.rs (1)
24-30
: Ohayo, sensei! Consider enhancing the comment forcontract_address
.The current comment for
contract_address
could be more descriptive. Consider updating it to provide more context about its purpose and usage.Here's a suggested improvement:
- /// Contract address that the subscriber is interested in + /// The contract address that the subscriber is monitoring for updates. + /// This can be a specific contract or Felt::ZERO to subscribe to all contracts. contract_address: Felt,
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (1)
- crates/torii/grpc/src/server/subscriptions/indexer.rs (1 hunks)
🧰 Additional context used
🔇 Additional comments (2)
crates/torii/grpc/src/server/subscriptions/indexer.rs (2)
1-22
: Ohayo, sensei! The imports and constant look good!The imports cover all necessary libraries for the implemented functionality, and the
LOG_TARGET
constant follows a consistent naming convention.
1-144
: Ohayo, sensei! Overall, a solid implementation with room for refinement!The subscription system for indexer updates is well-structured and utilizes appropriate concurrent programming techniques. However, there are a few areas that could be improved:
- Enhance the uniqueness of subscriber IDs to prevent potential collisions.
- Strengthen SQL query construction to prevent potential SQL injection vulnerabilities.
- Implement non-blocking or timeout mechanisms in the
publish_updates
method to avoid potential bottlenecks.- Add concurrency control in the
poll
method to prevent resource exhaustion from unbounded task spawning.Addressing these points will further improve the robustness and efficiency of the implementation.
contract_address: Felt, | ||
) -> Result<Receiver<Result<proto::world::SubscribeIndexerResponse, tonic::Status>>, Error> | ||
{ | ||
let id = rand::thread_rng().gen::<usize>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo, sensei! Be wary of potential ID collisions!
Using rand::thread_rng().gen::<usize>()
for subscriber IDs may lead to collisions, especially with a large number of subscribers.
Consider using an atomic counter or a UUID for guaranteed unique IDs:
use std::sync::atomic::{AtomicUsize, Ordering};
static SUBSCRIBER_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
// In add_subscriber method
let id = SUBSCRIBER_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
let mut statement = | ||
"SELECT head, tps, last_block_timestamp, contract_address FROM contracts".to_string(); | ||
|
||
let contracts: Vec<ContractUpdated> = if contract_address != Felt::ZERO { | ||
statement += " WHERE id = ?"; | ||
|
||
sqlx::query_as(&statement) | ||
.bind(format!("{:#x}", contract_address)) | ||
.fetch_all(pool) | ||
.await? | ||
} else { | ||
sqlx::query_as(&statement).fetch_all(pool).await? | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo, sensei! Watch out for potential SQL injection!
The SQL query construction could be vulnerable to SQL injection if contract_address
is not properly sanitized.
Consider using parameterized queries consistently:
let contracts: Vec<ContractUpdated> = if contract_address != Felt::ZERO {
sqlx::query_as("SELECT head, tps, last_block_timestamp, contract_address FROM contracts WHERE id = ?")
.bind(format!("{:#x}", contract_address))
.fetch_all(pool)
.await?
} else {
sqlx::query_as("SELECT head, tps, last_block_timestamp, contract_address FROM contracts")
.fetch_all(pool)
.await?
};
async fn publish_updates( | ||
subs: Arc<IndexerManager>, | ||
update: &ContractUpdated, | ||
) -> Result<(), Error> { | ||
let mut closed_stream = Vec::new(); | ||
let contract_address = | ||
Felt::from_str(&update.contract_address).map_err(ParseError::FromStr)?; | ||
|
||
for (idx, sub) in subs.subscribers.read().await.iter() { | ||
if sub.contract_address != Felt::ZERO && sub.contract_address != contract_address { | ||
continue; | ||
} | ||
|
||
let resp = SubscribeIndexerResponse { | ||
head: update.head, | ||
tps: update.tps, | ||
last_block_timestamp: update.last_block_timestamp, | ||
contract_address: contract_address.to_bytes_be().to_vec(), | ||
}; | ||
|
||
if sub.sender.send(Ok(resp)).await.is_err() { | ||
closed_stream.push(*idx); | ||
} | ||
} | ||
|
||
for id in closed_stream { | ||
trace!(target = LOG_TARGET, id = %id, "Closing indexer updates stream."); | ||
subs.remove_subscriber(id).await | ||
} | ||
|
||
Ok(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo, sensei! Be mindful of potential blocking in publish_updates
!
The send
operation on the subscriber's channel could potentially block if the channel is full, affecting the overall performance of the system.
Consider using a non-blocking send operation or implementing a timeout mechanism:
use tokio::time::timeout;
use std::time::Duration;
// In publish_updates method
let send_future = sub.sender.send(Ok(resp));
if timeout(Duration::from_millis(100), send_future).await.is_err() {
closed_stream.push(*idx);
}
while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) { | ||
let subs = Arc::clone(&pin.subs_manager); | ||
tokio::spawn(async move { | ||
if let Err(e) = Service::publish_updates(subs, &event).await { | ||
error!(target = LOG_TARGET, error = %e, "Publishing indexer update."); | ||
} | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo, sensei! Be cautious of unbounded task spawning!
Spawning a new task for every event without limit could potentially exhaust system resources if events are frequent.
Consider implementing a concurrency limit or processing events within a fixed-size thread pool:
use tokio::sync::Semaphore;
// In Service struct
semaphore: Arc<Semaphore>,
// In new method
semaphore: Arc::new(Semaphore::new(10)), // Adjust the number based on your needs
// In poll method
let permit = match pin.semaphore.clone().try_acquire_owned() {
Ok(permit) => permit,
Err(_) => continue, // Skip this event if we've reached the concurrency limit
};
tokio::spawn(async move {
let _permit = permit; // Keep the permit until the task is done
if let Err(e) = Service::publish_updates(subs, &event).await {
error!(target = LOG_TARGET, error = %e, "Publishing indexer update.");
}
});
Summary by CodeRabbit
Release Notes
New Features
SubscribeIndexer
, allowing clients to receive updates related to the indexer.set_head
method.Improvements
Database Changes
last_block_timestamp
andtps
to thecontracts
table for enhanced transaction metrics.