Skip to content

Commit

Permalink
fix(torii/core): rollback transaction when engine retries
Browse files Browse the repository at this point in the history
commit-id:16f58fa9
  • Loading branch information
lambda-0x committed Nov 14, 2024
1 parent ae4eabb commit d4daef3
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 0 deletions.
2 changes: 2 additions & 0 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
Err(e) => {
error!(target: LOG_TARGET, error = %e, "Processing fetched data.");
erroring_out = true;
// incase of error rollback the transaction
self.db.rollback().await?;
sleep(backoff_delay).await;
if backoff_delay < max_backoff_delay {
backoff_delay *= 2;
Expand Down
39 changes: 39 additions & 0 deletions crates/torii/core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ pub enum QueryType {
// similar to execute but doesn't create a new transaction
Flush,
Execute,
// rollback's the current transaction and starts a new one
Rollback,
Other,
}

Expand Down Expand Up @@ -208,6 +210,19 @@ impl QueryMessage {
rx,
)
}

pub fn rollback_recv() -> (Self, oneshot::Receiver<Result<()>>) {
let (tx, rx) = oneshot::channel();
(
Self {
statement: "".to_string(),
arguments: vec![],
query_type: QueryType::Rollback,
tx: Some(tx),
},
rx,
)
}
}

impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
Expand Down Expand Up @@ -733,6 +748,20 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
// defer executing these queries since they depend on TokenRegister queries
self.deferred_query_messages.push(query_message);
}
QueryType::Rollback => {
debug!(target: LOG_TARGET, "Rolling back the transaction.");
// rollback's the current transaction and starts a new one
let res = self.rollback().await;
debug!(target: LOG_TARGET, "Rolled back the transaction.");

if let Some(sender) = query_message.tx {
sender
.send(res)
.map_err(|_| anyhow::anyhow!("Failed to send rollback result"))?;
} else {
res?;
}
}
QueryType::Other => {
query.execute(&mut **tx).await.with_context(|| {
format!(
Expand Down Expand Up @@ -785,6 +814,16 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {

Ok(())
}

async fn rollback(&mut self) -> Result<()> {
let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?);
transaction.rollback().await?;

// NOTE: clear doesn't reset the capacity
self.publish_queue.clear();
self.deferred_query_messages.clear();
Ok(())
}
}

fn send_broker_message(message: BrokerMessage) {
Expand Down
6 changes: 6 additions & 0 deletions crates/torii/core/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1311,4 +1311,10 @@ impl Sql {
self.executor.send(flush)?;
recv.await?
}

pub async fn rollback(&self) -> Result<()> {
let (rollback, recv) = QueryMessage::rollback_recv();
self.executor.send(rollback)?;
recv.await?
}
}

0 comments on commit d4daef3

Please sign in to comment.