Skip to content

Commit

Permalink
Merge pull request #28549 from maddyblue/end-txn-guarantees
Browse files Browse the repository at this point in the history
adapter: add guarantees for AdapterClient::end_transaction
  • Loading branch information
maddyblue authored Jul 26, 2024
2 parents 6bab7fc + 2f6c863 commit 7ca67d6
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 11 deletions.
21 changes: 14 additions & 7 deletions src/adapter/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,18 +624,25 @@ impl SessionClient {
result
}

/// Ends a transaction.
/// Ends a transaction. Even if an error is returned, guarantees that the transaction in the
/// session and Coordinator has cleared its state.
#[instrument(level = "debug")]
pub async fn end_transaction(
&mut self,
action: EndTransactionAction,
) -> Result<ExecuteResponse, AdapterError> {
self.send(|tx, session| Command::Commit {
action,
session,
tx,
})
.await
let res = self
.send(|tx, session| Command::Commit {
action,
session,
tx,
})
.await;
// Commit isn't guaranteed to set the session's state to anything specific, so clear it
// here. It's safe to ignore the returned `TransactionStatus` because that doesn't contain
// any data that the Coordinator must act on for correctness.
let _ = self.session().clear_transaction();
res
}

/// Fails a transaction.
Expand Down
4 changes: 4 additions & 0 deletions src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ pub enum Command {
outer_ctx_extra: Option<ExecuteContextExtra>,
},

/// Attempts to commit or abort the session's transaction. Guarantees that the Coordinator's
/// transaction state has been cleared, even if the commit or abort fails. (A failure can
/// happen, for example, if the session's role id has been dropped which will prevent
/// sequence_end_transaction from running.)
Commit {
action: EndTransactionAction,
session: Session,
Expand Down
14 changes: 10 additions & 4 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use futures::FutureExt;
use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source};
use mz_catalog::SYSTEM_CONN_ID;
use mz_ore::instrument;
use mz_ore::task;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{instrument, soft_panic_or_log};
use mz_repr::role_id::RoleId;
use mz_repr::{ScalarType, Timestamp};
use mz_sql::ast::{
Expand Down Expand Up @@ -215,8 +215,12 @@ impl Coordinator {
}
};

let conn_id = ctx.session().conn_id().clone();
self.sequence_plan(ctx, plan, ResolvedIds(BTreeSet::new()))
.await;
// Part of the Command::Commit contract is that the Coordinator guarantees that
// it has cleared its transaction state for the connection.
self.clear_connection(&conn_id).await;
}

Command::CatalogSnapshot { tx } => {
Expand Down Expand Up @@ -749,9 +753,11 @@ impl Coordinator {
// (Terminating the connection is maybe what we would prefer to do, but is not
// currently a thing we can do from the coordinator: calling handle_terminate
// cleans up Coordinator state for the session but doesn't inform the
// AdapterClient that the session should terminate.) Once the bug here
// (https://github.com/MaterializeInc/materialize/issues/28400) is fixed, this
// could be changed to an assert.
// AdapterClient that the session should terminate.)
soft_panic_or_log!(
"session {} attempted to get ddl lock while already owning it",
ctx.session().conn_id()
);
ctx.retire(Err(AdapterError::Internal(format!(
"session attempted to get ddl lock while already owning it"
))));
Expand Down
6 changes: 6 additions & 0 deletions src/adapter/src/coord/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@ impl Coordinator {
&mut self,
session: &mut Session,
) -> TransactionStatus<mz_repr::Timestamp> {
// This function is *usually* called when transactions end, but it can fail to be called in
// some cases (for example if the session's role id was dropped, then we return early and
// don't go through the normal sequence_end_transaction path). The `Command::Commit` handler
// and `AdapterClient::end_transaction` protect against this by each executing their parts
// of this function. Thus, if this function changes, ensure that the changes are propogated
// to either of those components.
self.clear_connection(session.conn_id()).await;
session.clear_transaction()
}
Expand Down

0 comments on commit 7ca67d6

Please sign in to comment.