From 2f6c863c59a6e406a7c68c1266244bdc932dacd0 Mon Sep 17 00:00:00 2001 From: Maddy Blue Date: Fri, 26 Jul 2024 04:49:19 +0000 Subject: [PATCH] adapter: add guarantees for AdapterClient::end_transaction Both the HTTP and pgwire protocols automatically commit implicit transactions by calling AdapterClient::end_transaction which sends a Command::Commit to the Coordinator. That command produced a Plan::CommitTransaction (equivalent to the user typing `COMMIT`). Since this was a normal plan, the same as if it came from a SQL statement, the RBAC checks run. One of the checks verifies that the user's role still exists (regardless of the statement being executed). It is thus possible for the Plan::CommitTransaction to fail and not sequence its plan (in this case `sequence_end_transaction`). However, both the AdapterClient (and HTTP and pgwire protocols) had assumed that Command::Commit would always clean up the session's transaction state. But since the cleanup action happens in `sequence_end_transaction` which isn't guaranteed to execute (if, say, the user's role was dropped), that assumption is not guaranteed. Change Command::Commit to guarantee that the Coordinator cleans up the txn state, and change AdapterClient::end_transaction to guarantee it cleans up the session's txn state. This leaves all usages of end_transaction as is by changing the implementation to do what the callers assumed it had been doing. No explicit test was added because the parallel DDL tests have reliably been finding this bug, so adding the soft panic should produce a panic if this bug is still present when running that nightly. Alternatives considered: - Add a Command::ClearTransaction command that AdapterClient::end_transaction calls which would cleanup the state. This seemed unneeded and another message sent to the Coordinator that could be avoided by adding the desired behavior to Commit and having the AdapterClient take care of the session's state. - Have callers of AdapterClient::end_transaction terminate the connection. This sounded nice but ended up being tricky because the higher level callers often support generic errors that get serialized and sent off to the user, leaving the connection intact. We'd have to plumb some new logic through that would indicate a termination was needed which didn't seem worth it. - Teach Command::Commit to produce a Plan that would skip the rbac checks. Skipping RBAC checks seemed dangerous. Fixes #28400 --- src/adapter/src/client.rs | 21 ++++++++++++++------- src/adapter/src/command.rs | 4 ++++ src/adapter/src/coord/command_handler.rs | 14 ++++++++++---- src/adapter/src/coord/sql.rs | 6 ++++++ 4 files changed, 34 insertions(+), 11 deletions(-) diff --git a/src/adapter/src/client.rs b/src/adapter/src/client.rs index 0405a7b0a7084..7c8b15ebd9276 100644 --- a/src/adapter/src/client.rs +++ b/src/adapter/src/client.rs @@ -624,18 +624,25 @@ impl SessionClient { result } - /// Ends a transaction. + /// Ends a transaction. Even if an error is returned, guarantees that the transaction in the + /// session and Coordinator has cleared its state. #[instrument(level = "debug")] pub async fn end_transaction( &mut self, action: EndTransactionAction, ) -> Result { - self.send(|tx, session| Command::Commit { - action, - session, - tx, - }) - .await + let res = self + .send(|tx, session| Command::Commit { + action, + session, + tx, + }) + .await; + // Commit isn't guaranteed to set the session's state to anything specific, so clear it + // here. It's safe to ignore the returned `TransactionStatus` because that doesn't contain + // any data that the Coordinator must act on for correctness. + let _ = self.session().clear_transaction(); + res } /// Fails a transaction. diff --git a/src/adapter/src/command.rs b/src/adapter/src/command.rs index 2e4eeb847f968..08f2c10698459 100644 --- a/src/adapter/src/command.rs +++ b/src/adapter/src/command.rs @@ -71,6 +71,10 @@ pub enum Command { outer_ctx_extra: Option, }, + /// Attempts to commit or abort the session's transaction. Guarantees that the Coordinator's + /// transaction state has been cleared, even if the commit or abort fails. (A failure can + /// happen, for example, if the session's role id has been dropped which will prevent + /// sequence_end_transaction from running.) Commit { action: EndTransactionAction, session: Session, diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index 74956ec147677..78381a4e5982c 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -22,9 +22,9 @@ use futures::FutureExt; use mz_adapter_types::connection::{ConnectionId, ConnectionIdType}; use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source}; use mz_catalog::SYSTEM_CONN_ID; -use mz_ore::instrument; use mz_ore::task; use mz_ore::tracing::OpenTelemetryContext; +use mz_ore::{instrument, soft_panic_or_log}; use mz_repr::role_id::RoleId; use mz_repr::{ScalarType, Timestamp}; use mz_sql::ast::{ @@ -215,8 +215,12 @@ impl Coordinator { } }; + let conn_id = ctx.session().conn_id().clone(); self.sequence_plan(ctx, plan, ResolvedIds(BTreeSet::new())) .await; + // Part of the Command::Commit contract is that the Coordinator guarantees that + // it has cleared its transaction state for the connection. + self.clear_connection(&conn_id).await; } Command::CatalogSnapshot { tx } => { @@ -749,9 +753,11 @@ impl Coordinator { // (Terminating the connection is maybe what we would prefer to do, but is not // currently a thing we can do from the coordinator: calling handle_terminate // cleans up Coordinator state for the session but doesn't inform the - // AdapterClient that the session should terminate.) Once the bug here - // (https://github.com/MaterializeInc/materialize/issues/28400) is fixed, this - // could be changed to an assert. + // AdapterClient that the session should terminate.) + soft_panic_or_log!( + "session {} attempted to get ddl lock while already owning it", + ctx.session().conn_id() + ); ctx.retire(Err(AdapterError::Internal(format!( "session attempted to get ddl lock while already owning it" )))); diff --git a/src/adapter/src/coord/sql.rs b/src/adapter/src/coord/sql.rs index 0ddeb6edbd887..e0ec8f8f87892 100644 --- a/src/adapter/src/coord/sql.rs +++ b/src/adapter/src/coord/sql.rs @@ -192,6 +192,12 @@ impl Coordinator { &mut self, session: &mut Session, ) -> TransactionStatus { + // This function is *usually* called when transactions end, but it can fail to be called in + // some cases (for example if the session's role id was dropped, then we return early and + // don't go through the normal sequence_end_transaction path). The `Command::Commit` handler + // and `AdapterClient::end_transaction` protect against this by each executing their parts + // of this function. Thus, if this function changes, ensure that the changes are propogated + // to either of those components. self.clear_connection(session.conn_id()).await; session.clear_transaction() }