diff --git a/src/adapter/src/client.rs b/src/adapter/src/client.rs index 0405a7b0a7084..7c8b15ebd9276 100644 --- a/src/adapter/src/client.rs +++ b/src/adapter/src/client.rs @@ -624,18 +624,25 @@ impl SessionClient { result } - /// Ends a transaction. + /// Ends a transaction. Even if an error is returned, guarantees that the transaction in the + /// session and Coordinator has cleared its state. #[instrument(level = "debug")] pub async fn end_transaction( &mut self, action: EndTransactionAction, ) -> Result { - self.send(|tx, session| Command::Commit { - action, - session, - tx, - }) - .await + let res = self + .send(|tx, session| Command::Commit { + action, + session, + tx, + }) + .await; + // Commit isn't guaranteed to set the session's state to anything specific, so clear it + // here. It's safe to ignore the returned `TransactionStatus` because that doesn't contain + // any data that the Coordinator must act on for correctness. + let _ = self.session().clear_transaction(); + res } /// Fails a transaction. diff --git a/src/adapter/src/command.rs b/src/adapter/src/command.rs index 2e4eeb847f968..08f2c10698459 100644 --- a/src/adapter/src/command.rs +++ b/src/adapter/src/command.rs @@ -71,6 +71,10 @@ pub enum Command { outer_ctx_extra: Option, }, + /// Attempts to commit or abort the session's transaction. Guarantees that the Coordinator's + /// transaction state has been cleared, even if the commit or abort fails. (A failure can + /// happen, for example, if the session's role id has been dropped which will prevent + /// sequence_end_transaction from running.) 
Commit { action: EndTransactionAction, session: Session, diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index 74956ec147677..78381a4e5982c 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -22,9 +22,9 @@ use futures::FutureExt; use mz_adapter_types::connection::{ConnectionId, ConnectionIdType}; use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source}; use mz_catalog::SYSTEM_CONN_ID; -use mz_ore::instrument; use mz_ore::task; use mz_ore::tracing::OpenTelemetryContext; +use mz_ore::{instrument, soft_panic_or_log}; use mz_repr::role_id::RoleId; use mz_repr::{ScalarType, Timestamp}; use mz_sql::ast::{ @@ -215,8 +215,12 @@ impl Coordinator { } }; + let conn_id = ctx.session().conn_id().clone(); self.sequence_plan(ctx, plan, ResolvedIds(BTreeSet::new())) .await; + // Part of the Command::Commit contract is that the Coordinator guarantees that + // it has cleared its transaction state for the connection. + self.clear_connection(&conn_id).await; } Command::CatalogSnapshot { tx } => { @@ -749,9 +753,11 @@ impl Coordinator { // (Terminating the connection is maybe what we would prefer to do, but is not // currently a thing we can do from the coordinator: calling handle_terminate // cleans up Coordinator state for the session but doesn't inform the - // AdapterClient that the session should terminate.) Once the bug here - // (https://github.com/MaterializeInc/materialize/issues/28400) is fixed, this - // could be changed to an assert. + // AdapterClient that the session should terminate.) 
+ soft_panic_or_log!( "session {} attempted to get ddl lock while already owning it", ctx.session().conn_id() ); ctx.retire(Err(AdapterError::Internal(format!( "session attempted to get ddl lock while already owning it" )))); diff --git a/src/adapter/src/coord/sql.rs b/src/adapter/src/coord/sql.rs index 0ddeb6edbd887..e0ec8f8f87892 100644 --- a/src/adapter/src/coord/sql.rs +++ b/src/adapter/src/coord/sql.rs @@ -192,6 +192,12 @@ impl Coordinator { &mut self, session: &mut Session, ) -> TransactionStatus { + // This function is *usually* called when transactions end, but it can fail to be called in + // some cases (for example if the session's role id was dropped, then we return early and + // don't go through the normal sequence_end_transaction path). The `Command::Commit` handler + // and `SessionClient::end_transaction` protect against this by each executing their parts + // of this function. Thus, if this function changes, ensure that the changes are propagated + // to either of those components. self.clear_connection(session.conn_id()).await; session.clear_transaction() }