-
Notifications
You must be signed in to change notification settings - Fork 188
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
opt(torii): parallalize event processing #2385
Conversation
WalkthroughOhayo, sensei! The changes involve adding the Changes
Sequence Diagram(s)sequenceDiagram
participant Main as Main Function
participant Provider as JsonRpcClient
participant Engine as Engine Struct
participant Processors as Processors<P>
Main->>Provider: Arc::clone(provider)
Main->>Engine: new(Arc::clone(provider))
Main->>Processors: new(Arc::clone(provider))
Engine->>Processors: process_event()
Recent review detailsConfiguration used: .coderabbit.yaml Files ignored due to path filters (3)
Files selected for processing (10)
Files skipped from review due to trivial changes (1)
Files skipped from review as they are similar to previous changes (7)
Additional comments not posted (8)
TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
Outside diff range, codebase verification and nitpick comments (1)
crates/torii/core/src/engine.rs (1)
Line range hint
363-500
: Refactor processing of pending transactions to enhance clarity and maintainability.The method for processing pending transactions uses a complex logic involving multiple nested loops and conditionals. This could be refactored to improve readability and maintainability. Additionally, the use of
BTreeMap
for ordering events by entity ID is a thoughtful choice that ensures events are processed in a predictable order.Consider breaking down this method into smaller, more focused functions or methods. This can help isolate functionality and make the code easier to understand and maintain.
// TODO: we might not able able to properly handle this error case | ||
// since we are trying to do things in parallel so we don't exactly | ||
// know which task failed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider improving error handling in parallel tasks.
The TODO comment highlights a potential issue with error handling in the context of parallel processing. It's crucial to identify which specific task failed when errors occur. This could be addressed by improving the logging of errors or by redesigning the task handling to better isolate and report errors.
Consider implementing more granular error handling or logging mechanisms to trace errors back to their origin tasks more effectively.
crates/torii/core/src/engine.rs
Outdated
let modulo = 8u64; | ||
let other = 0u64; | ||
let mut map = BTreeMap::<u8, Vec<(EmittedEvent, u64, u64)>>::new(); | ||
|
||
for ((block_number, _), events) in data.transactions { | ||
let block_timestamp = data.blocks[&block_number]; | ||
for event in events { | ||
let event_name = event_type_from_felt(event.keys[0]); | ||
let entity_id = match event_name { | ||
EventType::StoreSetRecord => { | ||
let keys_start = NUM_KEYS_INDEX + 1; | ||
let keys_end: usize = keys_start | ||
+ event.data[NUM_KEYS_INDEX].to_usize().context("invalid usize")?; | ||
|
||
let keys = event.data[keys_start..keys_end].to_vec(); | ||
let entity_id = poseidon_hash_many(&keys); | ||
entity_id.to_raw()[3] % modulo + 1 | ||
} | ||
EventType::StoreDeleteRecord => { | ||
let entity_id = event.data[ENTITY_ID_INDEX]; | ||
entity_id.to_raw()[3] % modulo + 1 | ||
} | ||
EventType::StoreUpdateMember => { | ||
let entity_id = event.data[ENTITY_ID_INDEX]; | ||
entity_id.to_raw()[3] % modulo + 1 | ||
} | ||
EventType::StoreUpdateRecord => { | ||
let entity_id = event.data[ENTITY_ID_INDEX]; | ||
entity_id.to_raw()[3] % modulo + 1 | ||
} | ||
EventType::Other => other, | ||
}; | ||
|
||
map.entry(entity_id as u8).or_default().push(( | ||
event, | ||
block_number, | ||
block_timestamp, | ||
)); | ||
} | ||
} | ||
|
||
let mut tasks = Vec::new(); | ||
|
||
// Process block | ||
if block_number > last_block { | ||
if let Some(ref block_tx) = self.block_tx { | ||
block_tx.send(block_number).await?; | ||
// loop over the collected events | ||
for (id, events) in map.into_iter() { | ||
let mut db = self.db.clone(); | ||
let processors = Arc::clone(&self.processors); | ||
let world = | ||
WorldContractReader::new(self.world.address, self.provider.as_ref().clone()); | ||
|
||
let task: JoinHandle<Result<()>> = task::spawn(async move { | ||
for (event_idx, (event, block_number, block_timestamp)) in events.iter().enumerate() | ||
{ | ||
if db.query_queue.queue.len() >= QUERY_QUEUE_BATCH_SIZE { | ||
db.execute().await?; | ||
} | ||
|
||
let transaction_hash = event.transaction_hash; | ||
let event_id = format!( | ||
"{:#064x}:{:#x}:{:#04x}", | ||
block_number, transaction_hash, event_idx | ||
); | ||
let event = Event { | ||
from_address: event.from_address, | ||
keys: event.keys.clone(), | ||
data: event.data.clone(), | ||
}; | ||
|
||
process_event( | ||
*block_number, | ||
*block_timestamp, | ||
&event_id, | ||
&event, | ||
transaction_hash, | ||
&mut db, | ||
Arc::clone(&processors), | ||
&world, | ||
) | ||
.await? | ||
} | ||
db.execute().await?; | ||
Ok(()) | ||
}); | ||
|
||
self.process_block(block_number, data.blocks[&block_number]).await?; | ||
last_block = block_number; | ||
if id as u64 == other { | ||
task.await??; | ||
} else { | ||
tasks.push(task); | ||
} | ||
} | ||
|
||
for task in tasks { | ||
task.await??; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optimization of event processing in range data.
The method for processing range data efficiently organizes events into a BTreeMap
based on entity IDs, which is a good strategy for ensuring that events are processed in a structured manner. However, the complex nesting and extensive use of asynchronous tasks within loops could be streamlined for better performance and readability.
Refactor this method to reduce complexity and improve the efficiency of task management. Consider using more of Rust's functional programming features to handle data transformations and concurrency more cleanly.
// if timeout doesn't work | ||
// fetch to get entity | ||
// if not available, insert into queue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider handling timeouts and entity fetching more robustly.
The comments added in lines 207-209 suggest areas for potential future enhancements, specifically around handling timeouts and entity fetching. It's great to see proactive comments that guide future development and maintenance efforts. However, it would be beneficial to start creating more concrete plans or tickets for these enhancements to ensure they are not overlooked. Consider using issue trackers or TODOs with specific tags that can be easily searched and prioritized.
crates/torii/core/src/sql.rs
Outdated
if let Ty::Tuple(t) = &o.ty { | ||
t.is_empty() | ||
} else { | ||
false | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactor the conditional logic within the Enum
handling.
The conditional logic in lines 777-781 for handling empty tuples in Enum
types is a bit cumbersome and could be simplified. Using a method or a helper function to determine if an Enum
option is empty could improve readability and maintainability. Here's a suggested refactor:
- if e.options.iter().all(|o| if let Ty::Tuple(t) = &o.ty { t.is_empty() } else { false }) {
+ if e.options.iter().all(|o| o.ty.is_empty_tuple()) {
You would need to implement is_empty_tuple
in an appropriate place in your type definitions. This change would make the code cleaner and easier to understand at a glance.
Committable suggestion was skipped due to low confidence.
a737cfe
to
8a10542
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Outside diff range, codebase verification and nitpick comments (1)
crates/torii/core/src/engine.rs (1)
Line range hint
363-500
: Refactor processing of pending data to enhance clarity and maintainability.The method
process_pending
is quite lengthy and handles multiple responsibilities, which could impact the maintainability and readability of the code. Consider breaking down this method into smaller, more focused methods, each handling a specific part of the processing logic.For instance, handling of events and transactions could be separated into distinct methods, and the logic for updating the database state could be encapsulated in another method. This would not only improve readability but also make the code easier to test and maintain.
#[allow(clippy::too_many_arguments)] | ||
async fn process_event<P: Provider + Send + Sync + std::fmt::Debug>( | ||
block_number: u64, | ||
block_timestamp: u64, | ||
event_id: &str, | ||
event: &Event, | ||
transaction_hash: Felt, | ||
db: &mut Sql, | ||
processors: Arc<Processors<P>>, | ||
world: &WorldContractReader<P>, | ||
) -> Result<()> { | ||
db.store_event(event_id, event, transaction_hash, block_timestamp); | ||
let event_key = event.keys[0]; | ||
|
||
let Some(processor) = processors.event.get(&event_key) else { | ||
// if we dont have a processor for this event, we try the catch all processor | ||
if processors.catch_all_event.validate(event) { | ||
if let Err(e) = processors | ||
.catch_all_event | ||
.process(world, db, block_number, block_timestamp, event_id, event) | ||
.await | ||
{ | ||
error!(target: LOG_TARGET, error = %e, "Processing catch all event processor."); | ||
} | ||
} else { | ||
let unprocessed_event = UnprocessedEvent { | ||
keys: event.keys.iter().map(|k| format!("{:#x}", k)).collect(), | ||
data: event.data.iter().map(|d| format!("{:#x}", d)).collect(), | ||
}; | ||
|
||
// if processor.validate(event) { | ||
if let Err(e) = processor | ||
.process(&self.world, &mut self.db, block_number, block_timestamp, event_id, event) | ||
.await | ||
{ | ||
error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event."); | ||
trace!( | ||
target: LOG_TARGET, | ||
keys = ?unprocessed_event.keys, | ||
data = ?unprocessed_event.data, | ||
"Unprocessed event.", | ||
); | ||
} | ||
// } | ||
|
||
Ok(()) | ||
return Ok(()); | ||
}; | ||
|
||
// if processor.validate(event) { | ||
if let Err(e) = | ||
processor.process(world, db, block_number, block_timestamp, event_id, event).await | ||
{ | ||
error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event."); | ||
} | ||
// } | ||
|
||
Ok(()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Enhance error handling and logging in process_event
.
The process_event
function could benefit from improved error handling. Currently, errors in event processing are logged but not rethrown, potentially swallowing exceptions that should be visible to callers. Consider modifying the error handling strategy to rethrow exceptions after logging, ensuring that errors are properly propagated and can be handled at a higher level.
- error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event.");
+ let err = error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event.");
+ return Err(err);
Committable suggestion was skipped due to low confidence.
8a10542
to
e4c88f6
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2385 +/- ##
==========================================
- Coverage 68.30% 68.28% -0.02%
==========================================
Files 357 357
Lines 47181 47257 +76
==========================================
+ Hits 32225 32270 +45
- Misses 14956 14987 +31 ☔ View full report in Codecov by Sentry. |
commit-id:b4b488fe
e4c88f6
to
8f096a4
Compare
@@ -193,6 +204,9 @@ impl Sql { | |||
?, ?, ?) ON CONFLICT(id) DO UPDATE SET \ | |||
updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \ | |||
event_id=EXCLUDED.event_id RETURNING *"; | |||
// if timeout doesn't work | |||
// fetch to get entity | |||
// if not available, insert into queue | |||
let mut entity_updated: EntityUpdated = sqlx::query_as(insert_entities) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason as to why we push our broker message to the queue? Because the insert_entities query is executed without the queue. Are we going at some point insrt it into queue in some cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i have updated it to use queue in the next commit on stack
@@ -33,14 +33,25 @@ pub const FELT_DELIMITER: &str = "/"; | |||
#[path = "sql_test.rs"] | |||
mod test; | |||
|
|||
#[derive(Debug, Clone)] | |||
#[derive(Debug)] | |||
pub struct Sql { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also update the set_event_message and other functions. Like set_model_member. Or dont they need changes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for now we only run store_set
, store_del
, store_update_member
and store_update_record
in parallel
supersede by #2423 |
Stack: