Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adapter: add guarantees for AdapterClient::end_transaction #28549

Merged
merged 1 commit into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions src/adapter/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,18 +624,25 @@ impl SessionClient {
result
}

/// Ends a transaction.
/// Ends a transaction. Even if an error is returned, guarantees that the transaction in the
/// session and Coordinator has cleared its state.
#[instrument(level = "debug")]
pub async fn end_transaction(
&mut self,
action: EndTransactionAction,
) -> Result<ExecuteResponse, AdapterError> {
self.send(|tx, session| Command::Commit {
action,
session,
tx,
})
.await
let res = self
.send(|tx, session| Command::Commit {
action,
session,
tx,
})
.await;
// Commit isn't guaranteed to set the session's state to anything specific, so clear it
// here. It's safe to ignore the returned `TransactionStatus` because that doesn't contain
// any data that the Coordinator must act on for correctness.
let _ = self.session().clear_transaction();
res
}

/// Fails a transaction.
Expand Down
4 changes: 4 additions & 0 deletions src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ pub enum Command {
outer_ctx_extra: Option<ExecuteContextExtra>,
},

/// Attempts to commit or abort the session's transaction. Guarantees that the Coordinator's
/// transaction state has been cleared, even if the commit or abort fails. (A failure can
/// happen, for example, if the session's role id has been dropped which will prevent
/// sequence_end_transaction from running.)
Commit {
action: EndTransactionAction,
session: Session,
Expand Down
14 changes: 10 additions & 4 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use futures::FutureExt;
use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source};
use mz_catalog::SYSTEM_CONN_ID;
use mz_ore::instrument;
use mz_ore::task;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{instrument, soft_panic_or_log};
use mz_repr::role_id::RoleId;
use mz_repr::{ScalarType, Timestamp};
use mz_sql::ast::{
Expand Down Expand Up @@ -215,8 +215,12 @@ impl Coordinator {
}
};

let conn_id = ctx.session().conn_id().clone();
self.sequence_plan(ctx, plan, ResolvedIds(BTreeSet::new()))
.await;
// Part of the Command::Commit contract is that the Coordinator guarantees that
// it has cleared its transaction state for the connection.
self.clear_connection(&conn_id).await;
}

Command::CatalogSnapshot { tx } => {
Expand Down Expand Up @@ -749,9 +753,11 @@ impl Coordinator {
// (Terminating the connection is maybe what we would prefer to do, but is not
// currently a thing we can do from the coordinator: calling handle_terminate
// cleans up Coordinator state for the session but doesn't inform the
// AdapterClient that the session should terminate.) Once the bug here
// (https://github.com/MaterializeInc/materialize/issues/28400) is fixed, this
// could be changed to an assert.
// AdapterClient that the session should terminate.)
soft_panic_or_log!(
"session {} attempted to get ddl lock while already owning it",
ctx.session().conn_id()
);
ctx.retire(Err(AdapterError::Internal(format!(
"session attempted to get ddl lock while already owning it"
))));
Expand Down
6 changes: 6 additions & 0 deletions src/adapter/src/coord/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@ impl Coordinator {
&mut self,
session: &mut Session,
) -> TransactionStatus<mz_repr::Timestamp> {
// This function is *usually* called when transactions end, but it can fail to be called in
// some cases (for example if the session's role id was dropped, then we return early and
// don't go through the normal sequence_end_transaction path). The `Command::Commit` handler
// and `AdapterClient::end_transaction` protect against this by each executing their parts
// of this function. Thus, if this function changes, ensure that the changes are propogated
// to either of those components.
self.clear_connection(session.conn_id()).await;
session.clear_transaction()
}
Expand Down