diff --git a/nexus/db-queries/src/db/sec_store.rs b/nexus/db-queries/src/db/sec_store.rs index f8fd4ab86d..72de02ff54 100644 --- a/nexus/db-queries/src/db/sec_store.rs +++ b/nexus/db-queries/src/db/sec_store.rs @@ -70,8 +70,6 @@ impl steno::SecStore for CockroachDbSecStore { // This is an internal service query to CockroachDB. backoff::retry_policy_internal_service(), || { - // An interesting question is how to handle errors. - // // In general, there are some kinds of database errors that are // temporary/server errors (e.g. network failures), and some // that are permanent/client errors (e.g. conflict during @@ -85,10 +83,9 @@ impl steno::SecStore for CockroachDbSecStore { // errors that likely require operator intervention.) // // At a higher level, callers should plan for the fact that - // record_event could potentially loop forever. See - // https://github.com/oxidecomputer/omicron/issues/5406 and the - // note in `nexus/src/app/saga.rs`'s `execute_saga` for more - // details. + // record_event (and, so, saga execution) could potentially loop + // indefinitely while the datastore (or other dependent + // services) are down. self.datastore .saga_create_event(&our_event) .map_err(backoff::BackoffError::transient) diff --git a/nexus/src/app/disk.rs b/nexus/src/app/disk.rs index 78ae002dd3..adb0010e9a 100644 --- a/nexus/src/app/disk.rs +++ b/nexus/src/app/disk.rs @@ -203,7 +203,8 @@ impl super::Nexus { create_params: params.clone(), }; let saga_outputs = self - .execute_saga::(saga_params) + .sagas + .saga_execute::(saga_params) .await?; let disk_created = saga_outputs .lookup_node_output::("created_disk") @@ -342,7 +343,8 @@ impl super::Nexus { disk_id: authz_disk.id(), volume_id: db_disk.volume_id, }; - self.execute_saga::(saga_params) + self.sagas + .saga_execute::(saga_params) .await?; Ok(()) } @@ -585,10 +587,9 @@ impl super::Nexus { snapshot_name: finalize_params.snapshot_name.clone(), }; - self.execute_saga::( - saga_params, - ) - .await?; + self.sagas + .saga_execute::(saga_params) + .await?; Ok(()) } diff --git a/nexus/src/app/image.rs b/nexus/src/app/image.rs index 03c9c9d6a4..a3fa722d36 100644 --- a/nexus/src/app/image.rs +++ b/nexus/src/app/image.rs @@ -270,7 +270,8 @@ impl super::Nexus { image_param, }; - self.execute_saga::(saga_params) + self.sagas + .saga_execute::(saga_params) .await?; Ok(()) diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs index 6b4d87063a..d15eb2c18a 100644 --- a/nexus/src/app/instance.rs +++ b/nexus/src/app/instance.rs @@ -379,7 +379,8 @@ impl super::Nexus { }; let saga_outputs = self - .execute_saga::( + .sagas + .saga_execute::( saga_params, ) .await?; @@ -462,10 +463,11 @@ impl super::Nexus { instance, boundary_switches, }; - self.execute_saga::( - saga_params, - ) - .await?; + self.sagas + .saga_execute::( + saga_params, + ) + .await?; Ok(()) } @@ -510,10 +512,11 @@ impl super::Nexus { src_vmm: vmm.clone(), migrate_params: params, }; - self.execute_saga::( - saga_params, - ) - .await?; + self.sagas + .saga_execute::( + saga_params, + ) + .await?; // TODO correctness TODO robustness TODO design // Should we lookup the instance again here? @@ -757,10 +760,11 @@ impl super::Nexus { db_instance: instance.clone(), }; - self.execute_saga::( - saga_params, - ) - .await?; + self.sagas + .saga_execute::( + saga_params, + ) + .await?; self.db_datastore.instance_fetch_with_vmm(opctx, &authz_instance).await } @@ -1938,7 +1942,8 @@ impl super::Nexus { }; let saga_outputs = self - .execute_saga::( + .sagas + .saga_execute::( saga_params, ) .await?; @@ -1967,7 +1972,8 @@ impl super::Nexus { }; let saga_outputs = self - .execute_saga::( + .sagas + .saga_execute::( saga_params, ) .await?; diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index 510b6b5b16..707a807d3d 100644 --- a/nexus/src/app/mod.rs +++ b/nexus/src/app/mod.rs @@ -5,6 +5,7 @@ //! Nexus, the service that operates much of the control plane in an Oxide fleet use self::external_endpoints::NexusCertResolver; +use self::saga::SagaExecutor; use crate::app::oximeter::LazyTimeseriesClient; use crate::app::sagas::SagaRequest; use crate::populate::populate_start; @@ -132,7 +133,7 @@ pub struct Nexus { authz: Arc, /// saga execution coordinator - sec_client: Arc, + sagas: SagaExecutor, /// Task representing completion of recovered Sagas recovery_task: std::sync::Mutex>, @@ -238,6 +239,7 @@ impl Nexus { Arc::clone(&db_datastore), log.new(o!("component" => "SecStore")), )) as Arc; + let sec_client = Arc::new(steno::sec( log.new(o!( "component" => "SEC", @@ -246,6 +248,11 @@ impl Nexus { sec_store, )); + let sagas = SagaExecutor::new( + Arc::clone(&sec_client), + log.new(o!("component" => "SagaExecutor")), + ); + let client_state = dpd_client::ClientState { tag: String::from("nexus"), log: log.new(o!( @@ -425,7 +432,7 @@ impl Nexus { log: log.new(o!()), db_datastore: Arc::clone(&db_datastore), authz: Arc::clone(&authz), - sec_client: Arc::clone(&sec_client), + sagas, recovery_task: std::sync::Mutex::new(None), external_server: std::sync::Mutex::new(None), techport_external_server: std::sync::Mutex::new(None), @@ -467,6 +474,7 @@ impl Nexus { // TODO-cleanup all the extra Arcs here seems wrong let nexus = Arc::new(nexus); + nexus.sagas.set_nexus(nexus.clone()); let opctx = OpContext::for_background( log.new(o!("component" => "SagaRecoverer")), Arc::clone(&authz), @@ -480,7 +488,6 @@ impl Nexus { Arc::new(Arc::new(SagaContext::new( Arc::clone(&nexus), saga_logger, - Arc::clone(&authz), ))), db_datastore, Arc::clone(&sec_client), @@ -554,6 +561,10 @@ impl Nexus { &self.tunables } + pub fn authz(&self) -> &Arc { + &self.authz + } + pub(crate) async fn wait_for_populate(&self) -> Result<(), anyhow::Error> { let mut my_rx = self.populate_status.clone(); loop { @@ -934,7 +945,8 @@ impl Nexus { let nexus = self.clone(); tokio::spawn(async move { let saga_result = nexus - .execute_saga::( + .sagas + .saga_execute::( params, ) .await; @@ -958,7 +970,8 @@ impl Nexus { let nexus = self.clone(); tokio::spawn(async move { let saga_result = nexus - .execute_saga::( + .sagas + .saga_execute::( params, ) .await; @@ -982,7 +995,8 @@ impl Nexus { let nexus = self.clone(); tokio::spawn(async move { let saga_result = nexus - .execute_saga::( + .sagas + .saga_execute::( params, ) .await; diff --git a/nexus/src/app/project.rs b/nexus/src/app/project.rs index 2e852ba2d3..8f31e74728 100644 --- a/nexus/src/app/project.rs +++ b/nexus/src/app/project.rs @@ -59,7 +59,8 @@ impl super::Nexus { authz_silo, }; let saga_outputs = self - .execute_saga::( + .sagas + .saga_execute::( saga_params, ) .await?; diff --git a/nexus/src/app/rack.rs b/nexus/src/app/rack.rs index 780cb85f3f..ee3818f40c 100644 --- a/nexus/src/app/rack.rs +++ b/nexus/src/app/rack.rs @@ -360,9 +360,9 @@ impl super::Nexus { // TODO // configure rack networking / boundary services here - // Currently calling some of the apis directly, but should we be using sagas - // going forward via self.run_saga()? Note that self.create_runnable_saga and - // self.execute_saga are currently not available within this scope. + // Currently calling some of the apis directly, but should we be using + // sagas going forward via self.sagas.saga_execute()? Note that + // this may not be available within this scope. info!(log, "Recording Rack Network Configuration"); let address_lot_name = Name::from_str(INFRA_LOT).map_err(|e| { Error::internal_error(&format!( diff --git a/nexus/src/app/saga.rs b/nexus/src/app/saga.rs index 8a717839f0..70118fb620 100644 --- a/nexus/src/app/saga.rs +++ b/nexus/src/app/saga.rs @@ -2,12 +2,57 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -//! Saga management and execution +//! Nexus-level saga management and execution +//! +//! Steno provides its own interfaces for managing sagas. The interface here is +//! a thin wrapper aimed at the mini framework we've built at the Nexus level +//! that makes it easier to define and manage sagas in a uniform way. +//! +//! The basic lifecycle at the Nexus level is: +//! +//! ```text +//! input: saga type (impls [`NexusSaga`]), +//! saga parameters (specific to the saga's type) +//! | +//! | [`create_saga_dag()`] +//! v +//! SagaDag +//! | +//! | [`SagaExecutor::saga_prepare()`] +//! v +//! RunnableSaga +//! | +//! | [`RunnableSaga::start()`] +//! v +//! RunningSaga +//! | +//! | [`RunningSaga::wait_until_stopped()`] +//! v +//! StoppedSaga +//! ``` +//! +//! At the end, you can use [`StoppedSaga::into_omicron_result()`] to get at the +//! success output of the saga or convert any saga failure along the way to an +//! Omicron [`Error`]. +//! +//! This interface allows a few different use cases: +//! +//! * A common case is that some code in Nexus wants to do all of this: create +//! the saga DAG, run it, wait for it to finish, and get the result. +//! [`SagaExecutor::saga_execute()`] does all this using these lower-level +//! interfaces. +//! * An expected use case is that some code in Nexus wants to kick off a saga +//! but not wait for it to finish. In this case, they can just stop after +//! calling [`RunnableSaga::start()`]. The saga will continue running; they +//! just won't be able to directly wait for it to finish or get the result. +//! * Tests can use any of the lower-level pieces to examine intermediate state +//! or inject errors. use super::sagas::NexusSaga; use super::sagas::SagaInitError; use super::sagas::ACTION_REGISTRY; use crate::saga_interface::SagaContext; +use crate::Nexus; use anyhow::Context; use futures::future::BoxFuture; use futures::StreamExt; @@ -20,6 +65,7 @@ use omicron_common::api::external::LookupResult; use omicron_common::api::external::ResourceType; use omicron_common::bail_unless; use std::sync::Arc; +use std::sync::OnceLock; use steno::DagBuilder; use steno::SagaDag; use steno::SagaId; @@ -28,23 +74,8 @@ use steno::SagaResult; use steno::SagaResultOk; use uuid::Uuid; -/// Encapsulates a saga to be run before we actually start running it -/// -/// At this point, we've built the DAG, loaded it into the SEC, etc. but haven't -/// started it running. This is a useful point to inject errors, inspect the -/// DAG, etc. -pub(crate) struct RunnableSaga { - id: SagaId, - fut: BoxFuture<'static, SagaResult>, -} - -impl RunnableSaga { - #[cfg(test)] - pub(crate) fn id(&self) -> SagaId { - self.id - } -} - +/// Given a particular kind of Nexus saga (the type parameter `N`) and +/// parameters for that saga, construct a [`SagaDag`] for it pub(crate) fn create_saga_dag( params: N::Params, ) -> Result { @@ -56,70 +87,95 @@ pub(crate) fn create_saga_dag( Ok(SagaDag::new(dag, params)) } -impl super::Nexus { - pub(crate) async fn sagas_list( - &self, - opctx: &OpContext, - pagparams: &DataPageParams<'_, Uuid>, - ) -> ListResult { - // The endpoint we're serving only supports `ScanById`, which only - // supports an ascending scan. - bail_unless!( - pagparams.direction == dropshot::PaginationOrder::Ascending - ); - opctx.authorize(authz::Action::Read, &authz::FLEET).await?; - let marker = pagparams.marker.map(|s| SagaId::from(*s)); - let saga_list = self - .sec_client - .saga_list(marker, pagparams.limit) - .await - .into_iter() - .map(nexus_types::internal_api::views::Saga::from) - .map(Ok); - Ok(futures::stream::iter(saga_list).boxed()) - } +/// Handle to a self-contained subsystem for kicking off sagas +/// +/// See the module-level documentation for details. +pub(crate) struct SagaExecutor { + sec_client: Arc, + log: slog::Logger, + nexus: OnceLock>, +} - pub(crate) async fn saga_get( - &self, - opctx: &OpContext, - id: Uuid, - ) -> LookupResult { - opctx.authorize(authz::Action::Read, &authz::FLEET).await?; - self.sec_client - .saga_get(SagaId::from(id)) - .await - .map(nexus_types::internal_api::views::Saga::from) - .map(Ok) - .map_err(|_: ()| { - Error::not_found_by_id(ResourceType::SagaDbg, &id) - })? +impl SagaExecutor { + pub(crate) fn new( + sec_client: Arc, + log: slog::Logger, + ) -> SagaExecutor { + SagaExecutor { sec_client, log, nexus: OnceLock::new() } } - pub(crate) async fn create_runnable_saga( - self: &Arc, - dag: SagaDag, - ) -> Result { - // Construct the context necessary to execute this saga. - let saga_id = SagaId(Uuid::new_v4()); + // This is a little gross. We want to hang the SagaExecutor off of Nexus, + // but we also need to refer to Nexus, which thus can't exist when + // SagaExecutor is constructed. So we have the caller hand it to us after + // initialization. + // + // This isn't as statically verifiable as we'd normally like. But it's only + // one call site, it does fail cleanly if someone tries to use + // `SagaExecutor` before this has been set, and the result is much cleaner + // for all the other users of `SagaExecutor`. + // + // # Panics + // + // This function should be called exactly once in the lifetime of any + // `SagaExecutor` object. If it gets called more than once, concurrently or + // not, it panics. + pub(crate) fn set_nexus(&self, nexus: Arc) { + self.nexus.set(nexus).unwrap_or_else(|_| { + panic!("multiple initialization of SagaExecutor") + }) + } - self.create_runnable_saga_with_id(dag, saga_id).await + fn nexus(&self) -> Result<&Arc, Error> { + self.nexus + .get() + .ok_or_else(|| Error::unavail("saga are not available yet")) } - pub(crate) async fn create_runnable_saga_with_id( - self: &Arc, + // Low-level interface + // + // The low-level interface for running sagas starts with `saga_prepare()` + // and then uses the `RunnableSaga`, `RunningSaga`, and `StoppedSaga` types + // to drive execution forward. + + /// Given a DAG (which has generally been specifically created for a + /// particular saga and includes the saga's parameters), prepare to start + /// running the saga. This does not actually start the saga running. + /// + /// ## Async cancellation + /// + /// The Future returned by this function is basically not cancellation-safe, + /// in that if this Future is cancelled, one of a few things might be true: + /// + /// * Nothing has happened; it's as though this function was never called. + /// * The saga has been created, but not started. If this happens, the saga + /// will likely start running the next time saga recovery happens (e.g., + /// the next time Nexus starts up) and then run to completion. + /// + /// It's not clear what the caller would _want_ if they cancelled this + /// future, but whatever it is, clearly it's not guaranteed to be true. + /// You're better off avoiding cancellation. Fortunately, we currently + /// execute sagas either from API calls and background tasks, neither of + /// which can be cancelled. **This function should not be used in a + /// `tokio::select!` with a `timeout` or the like.** + pub(crate) async fn saga_prepare( + &self, dag: SagaDag, - saga_id: SagaId, ) -> Result { + // Construct the context necessary to execute this saga. + let nexus = self.nexus()?; + let saga_id = SagaId(Uuid::new_v4()); let saga_logger = self.log.new(o!( "saga_name" => dag.saga_name().to_string(), "saga_id" => saga_id.to_string() )); let saga_context = Arc::new(Arc::new(SagaContext::new( - self.clone(), - saga_logger, - Arc::clone(&self.authz), + nexus.clone(), + saga_logger.clone(), ))); - let future = self + + // Tell Steno about it. This does not start it running yet. + info!(saga_logger, "preparing saga"); + let saga_completion_future = self .sec_client .saga_create( saga_id, @@ -135,22 +191,156 @@ impl super::Nexus { // Steno. Error::internal_error(&format!("{:#}", error)) })?; - Ok(RunnableSaga { id: saga_id, fut: future }) + Ok(RunnableSaga { + id: saga_id, + saga_completion_future, + log: saga_logger, + sec_client: self.sec_client.clone(), + }) } - pub(crate) async fn run_saga( + // Convenience functions + + /// Create a new saga (of type `N` with parameters `params`), start it + /// running, wait for it to finish, and report the result + /// + /// Note that this can take a long time and may not complete while parts of + /// the system are not functioning. Care should be taken when waiting on + /// this in a latency-sensitive context. + /// + /// + /// ## Async cancellation + /// + /// This function isn't really cancel-safe, in that if the Future returned + /// by this function is cancelled, one of three things may be true: + /// + /// * Nothing has happened; it's as though this function was never called. + /// * The saga has been created, but not started. If this happens, the saga + /// will likely start running the next time saga recovery happens (e.g., + /// the next time Nexus starts up) and then run to completion. + /// * The saga has already been started and will eventually run to + /// completion (even though this Future has been cancelled). + /// + /// It's not clear what the caller would _want_ if they cancelled this + /// future, but whatever it is, clearly it's not guaranteed to be true. + /// You're better off avoiding cancellation. Fortunately, we currently + /// execute sagas either from API calls and background tasks, neither of + /// which can be cancelled. **This function should not be used in a + /// `tokio::select!` with a `timeout` or the like.** + /// + /// Say you _do_ want to kick off a saga and wait only a little while before + /// it completes. In that case, you can use the lower-level interface to + /// first create the saga (a process which still should not be cancelled, + /// but would generally be quick) and then wait for it to finish. The + /// waiting part is cancellable. + /// + /// Note that none of this affects _crash safety_. In terms of a crash: the + /// crash will either happen before the saga has been created (in which + /// case it's as though we didn't even call this function) or after (in + /// which case the saga will run to completion). + pub(crate) async fn saga_execute( &self, - runnable_saga: RunnableSaga, + params: N::Params, ) -> Result { - let log = &self.log; + // Construct the DAG specific to this saga. + let dag = create_saga_dag::(params)?; + let runnable_saga = self.saga_prepare(dag).await?; + let running_saga = runnable_saga.start().await?; + let stopped_saga = running_saga.wait_until_stopped().await; + stopped_saga.into_omicron_result() + } +} + +/// Encapsulates a saga to be run before we actually start running it +/// +/// At this point, we've built the DAG, loaded it into the SEC, etc. but haven't +/// started it running. This is a useful point to inject errors, inspect the +/// DAG, etc. +pub(crate) struct RunnableSaga { + id: SagaId, + saga_completion_future: BoxFuture<'static, SagaResult>, + log: slog::Logger, + sec_client: Arc, +} + +impl RunnableSaga { + #[cfg(test)] + pub(crate) fn id(&self) -> SagaId { + self.id + } + + /// Start this saga running. + /// + /// Once this completes, even if you drop the returned `RunningSaga`, the + /// saga will still run to completion. + pub(crate) async fn start(self) -> Result { + info!(self.log, "starting saga"); self.sec_client - .saga_start(runnable_saga.id) + .saga_start(self.id) .await .context("starting saga") .map_err(|error| Error::internal_error(&format!("{:#}", error)))?; - let result = runnable_saga.fut.await; - result.kind.map_err(|saga_error| { + Ok(RunningSaga { + id: self.id, + saga_completion_future: self.saga_completion_future, + log: self.log, + }) + } + + /// Start the saga running and wait for it to complete. + /// + /// This is a shorthand for `start().await?.wait_until_stopped().await`. + // There is no reason this needs to be limited to tests, but it's only used + // by the tests today. + #[cfg(test)] + pub(crate) async fn run_to_completion(self) -> Result { + Ok(self.start().await?.wait_until_stopped().await) + } +} + +/// Describes a saga that's started running +pub(crate) struct RunningSaga { + id: SagaId, + saga_completion_future: BoxFuture<'static, SagaResult>, + log: slog::Logger, +} + +impl RunningSaga { + /// Waits until this saga stops executing + /// + /// Normally, the saga will have finished successfully or failed and unwound + /// completely. If unwinding fails, it will be _stuck_ instead. + pub(crate) async fn wait_until_stopped(self) -> StoppedSaga { + let result = self.saga_completion_future.await; + info!(self.log, "saga finished"; "saga_result" => ?result); + StoppedSaga { id: self.id, result, log: self.log } + } +} + +/// Describes a saga that's finished +pub(crate) struct StoppedSaga { + id: SagaId, + result: SagaResult, + log: slog::Logger, +} + +impl StoppedSaga { + /// Fetches the raw Steno result for the saga's execution + /// + /// This is a test-only routine meant for use in tests that need to examine + /// the details of a saga's final state (e.g., examining the exact point at + /// which it failed). Non-test callers should use `into_omicron_result` + /// instead. + #[cfg(test)] + pub(crate) fn into_raw_result(self) -> SagaResult { + self.result + } + + /// Interprets the result of saga execution as a `Result` whose error type + /// is `Error`. + pub(crate) fn into_omicron_result(self) -> Result { + self.result.kind.map_err(|saga_error| { let mut error = saga_error .error_source .convert::() @@ -165,8 +355,12 @@ impl super::Nexus { undo_node, undo_error )); - error!(log, "saga stuck"; - "saga_id" => runnable_saga.id.to_string(), + // TODO this log message does not belong here because if the + // caller isn't checking this then we won't log it. We should + // probably make Steno log this since there may be no place in + // Nexus that's waiting for a given saga to finish. + error!(self.log, "saga stuck"; + "saga_id" => self.id.to_string(), "error" => #%error, ); } @@ -174,57 +368,55 @@ impl super::Nexus { error }) } +} - /// Starts the supplied `runnable_saga` and, if that succeeded, awaits its - /// completion and returns the raw `SagaResult`. - /// - /// This is a test-only routine meant for use in tests that need to examine - /// the details of a saga's final state (e.g., examining the exact point at - /// which it failed). Non-test callers should use `run_saga` instead (it - /// logs messages on error conditions and has a standard mechanism for - /// converting saga errors to generic Omicron errors). - #[cfg(test)] - pub(crate) async fn run_saga_raw_result( +impl super::Nexus { + /// Lists sagas currently managed by this Nexus instance + pub(crate) async fn sagas_list( &self, - runnable_saga: RunnableSaga, - ) -> Result { - self.sec_client - .saga_start(runnable_saga.id) + opctx: &OpContext, + pagparams: &DataPageParams<'_, Uuid>, + ) -> ListResult { + // The endpoint we're serving only supports `ScanById`, which only + // supports an ascending scan. + bail_unless!( + pagparams.direction == dropshot::PaginationOrder::Ascending + ); + opctx.authorize(authz::Action::Read, &authz::FLEET).await?; + let marker = pagparams.marker.map(|s| SagaId::from(*s)); + let saga_list = self + .sagas + .sec_client + .saga_list(marker, pagparams.limit) .await - .context("starting saga") - .map_err(|error| Error::internal_error(&format!("{:#}", error)))?; - - Ok(runnable_saga.fut.await) + .into_iter() + .map(nexus_types::internal_api::views::Saga::from) + .map(Ok); + Ok(futures::stream::iter(saga_list).boxed()) } - pub fn sec(&self) -> &steno::SecClient { - &self.sec_client + /// Fetch information about a saga currently managed by this Nexus instance + pub(crate) async fn saga_get( + &self, + opctx: &OpContext, + id: Uuid, + ) -> LookupResult { + opctx.authorize(authz::Action::Read, &authz::FLEET).await?; + self.sagas + .sec_client + .saga_get(SagaId::from(id)) + .await + .map(nexus_types::internal_api::views::Saga::from) + .map(Ok) + .map_err(|_: ()| { + Error::not_found_by_id(ResourceType::SagaDbg, &id) + })? } - /// Given a saga type and parameters, create a new saga and execute it. - pub(crate) async fn execute_saga( - self: &Arc, - params: N::Params, - ) -> Result { - // Construct the DAG specific to this saga. - let dag = create_saga_dag::(params)?; - - // Register the saga with the saga executor. - let runnable_saga = self.create_runnable_saga(dag).await?; - - // Actually run the saga to completion. - // - // XXX: This may loop forever in case `SecStore::record_event` fails. - // Ideally, `run_saga` wouldn't both start the saga and wait for it to - // be finished -- instead, it would start off the saga, and then return - // a notification channel that the caller could use to decide: - // - // - either to .await until completion - // - or to stop waiting after a certain period, while still letting the - // saga run in the background. - // - // For more, see https://github.com/oxidecomputer/omicron/issues/5406 - // and the note in `sec_store.rs`'s `record_event`. - self.run_saga(runnable_saga).await + /// For testing only: provides direct access to the underlying SecClient so + /// that tests can inject errors + #[cfg(test)] + pub(crate) fn sec(&self) -> &steno::SecClient { + &self.sagas.sec_client } } diff --git a/nexus/src/app/sagas/disk_create.rs b/nexus/src/app/sagas/disk_create.rs index ff0cc63d00..204d004938 100644 --- a/nexus/src/app/sagas/disk_create.rs +++ b/nexus/src/app/sagas/disk_create.rs @@ -883,15 +883,11 @@ pub(crate) mod test { let project_id = create_project(&client, PROJECT_NAME).await.identity.id; - // Build the saga DAG with the provided test parameters + // Build the saga DAG with the provided test parameters and run it. let opctx = test_opctx(cptestctx); let params = new_test_params(&opctx, project_id); - let dag = create_saga_dag::(params).unwrap(); - let runnable_saga = nexus.create_runnable_saga(dag).await.unwrap(); - - // Actually run the saga - let output = nexus.run_saga(runnable_saga).await.unwrap(); - + let output = + nexus.sagas.saga_execute::(params).await.unwrap(); let disk = output .lookup_node_output::( "created_disk", diff --git a/nexus/src/app/sagas/disk_delete.rs b/nexus/src/app/sagas/disk_delete.rs index 24cf331a34..aab361eced 100644 --- a/nexus/src/app/sagas/disk_delete.rs +++ b/nexus/src/app/sagas/disk_delete.rs @@ -236,7 +236,7 @@ pub(crate) mod test { let project_id = create_project(client, PROJECT_NAME).await.identity.id; let disk = create_disk(&cptestctx).await; - // Build the saga DAG with the provided test parameters + // Build the saga DAG with the provided test parameters and run it. let opctx = test_opctx(&cptestctx); let params = Params { serialized_authn: Serialized::for_opctx(&opctx), @@ -244,11 +244,7 @@ pub(crate) mod test { disk_id: disk.id(), volume_id: disk.volume_id, }; - let dag = create_saga_dag::(params).unwrap(); - let runnable_saga = nexus.create_runnable_saga(dag).await.unwrap(); - - // Actually run the saga - nexus.run_saga(runnable_saga).await.unwrap(); + nexus.sagas.saga_execute::(params).await.unwrap(); } #[nexus_test(server = crate::Server)] diff --git a/nexus/src/app/sagas/instance_create.rs b/nexus/src/app/sagas/instance_create.rs index ffbd5ff2f5..4f0ec7c0c6 100644 --- a/nexus/src/app/sagas/instance_create.rs +++ b/nexus/src/app/sagas/instance_create.rs @@ -1140,15 +1140,12 @@ pub mod test { let nexus = &cptestctx.server.server_context().nexus; let project_id = create_org_project_and_disk(&client).await; - // Build the saga DAG with the provided test parameters + // Build the saga DAG with the provided test parameters and run it let opctx = test_helpers::test_opctx(&cptestctx); let params = new_test_params(&opctx, project_id); - let dag = create_saga_dag::(params).unwrap(); - let runnable_saga = nexus.create_runnable_saga(dag).await.unwrap(); - - // Actually run the saga nexus - .run_saga(runnable_saga) + .sagas + .saga_execute::(params) .await .expect("Saga should have succeeded"); } diff --git a/nexus/src/app/sagas/instance_delete.rs b/nexus/src/app/sagas/instance_delete.rs index b6fedc175d..2168657ef4 100644 --- a/nexus/src/app/sagas/instance_delete.rs +++ b/nexus/src/app/sagas/instance_delete.rs @@ -268,22 +268,17 @@ mod test { let nexus = &cptestctx.server.server_context().nexus; create_org_project_and_disk(&client).await; - // Build the saga DAG with the provided test parameters - let dag = create_saga_dag::( - new_test_params( - &cptestctx, - create_instance(&cptestctx, new_instance_create_params()) - .await - .id(), - ) - .await, + // Build the saga DAG with the provided test parameters and run it. + let params = new_test_params( + &cptestctx, + create_instance(&cptestctx, new_instance_create_params()) + .await + .id(), ) - .unwrap(); - let runnable_saga = nexus.create_runnable_saga(dag).await.unwrap(); - - // Actually run the saga + .await; nexus - .run_saga(runnable_saga) + .sagas + .saga_execute::(params) .await .expect("Saga should have succeeded"); } diff --git a/nexus/src/app/sagas/instance_ip_attach.rs b/nexus/src/app/sagas/instance_ip_attach.rs index c4e209dccd..b18ac3109f 100644 --- a/nexus/src/app/sagas/instance_ip_attach.rs +++ b/nexus/src/app/sagas/instance_ip_attach.rs @@ -427,10 +427,11 @@ pub(crate) mod test { for use_float in [false, true] { let params = new_test_params(&opctx, datastore, use_float).await; - - let dag = create_saga_dag::(params).unwrap(); - let saga = nexus.create_runnable_saga(dag).await.unwrap(); - nexus.run_saga(saga).await.expect("Attach saga should succeed"); + nexus + .sagas + .saga_execute::(params) + .await + .expect("Attach saga should succeed"); } // Sled agent has a record of the new external IPs. diff --git a/nexus/src/app/sagas/instance_ip_detach.rs b/nexus/src/app/sagas/instance_ip_detach.rs index 474cfb18a6..a5b51ce375 100644 --- a/nexus/src/app/sagas/instance_ip_detach.rs +++ b/nexus/src/app/sagas/instance_ip_detach.rs @@ -402,10 +402,11 @@ pub(crate) mod test { for use_float in [false, true] { let params = new_test_params(&opctx, datastore, use_float).await; - - let dag = create_saga_dag::(params).unwrap(); - let saga = nexus.create_runnable_saga(dag).await.unwrap(); - nexus.run_saga(saga).await.expect("Detach saga should succeed"); + nexus + .sagas + .saga_execute::(params) + .await + .expect("Detach saga should succeed"); } // Sled agent has removed its records of the external IPs. diff --git a/nexus/src/app/sagas/instance_migrate.rs b/nexus/src/app/sagas/instance_migrate.rs index 3546642bbb..491b916c9d 100644 --- a/nexus/src/app/sagas/instance_migrate.rs +++ b/nexus/src/app/sagas/instance_migrate.rs @@ -574,7 +574,7 @@ async fn sim_instance_migrate( #[cfg(test)] mod tests { - use crate::app::{saga::create_saga_dag, sagas::test_helpers}; + use crate::app::sagas::test_helpers; use camino::Utf8Path; use dropshot::test_util::ClientTestContext; use nexus_test_interface::NexusServer; @@ -707,9 +707,11 @@ mod tests { }, }; - let dag = create_saga_dag::(params).unwrap(); - let saga = nexus.create_runnable_saga(dag).await.unwrap(); - nexus.run_saga(saga).await.expect("Migration saga should succeed"); + nexus + .sagas + .saga_execute::(params) + .await + .expect("Migration saga should succeed"); // Merely running the migration saga (without simulating any completion // steps in the simulated agents) should not change where the instance diff --git a/nexus/src/app/sagas/instance_start.rs b/nexus/src/app/sagas/instance_start.rs index 9730025099..adde040a77 100644 --- a/nexus/src/app/sagas/instance_start.rs +++ b/nexus/src/app/sagas/instance_start.rs @@ -756,9 +756,11 @@ mod test { db_instance, }; - let dag = create_saga_dag::(params).unwrap(); - let saga = nexus.create_runnable_saga(dag).await.unwrap(); - nexus.run_saga(saga).await.expect("Start saga should succeed"); + nexus + .sagas + .saga_execute::(params) + .await + .expect("Start saga should succeed"); test_helpers::instance_simulate(cptestctx, &instance_id).await; let vmm_state = test_helpers::instance_fetch(cptestctx, instance_id) @@ -918,11 +920,13 @@ mod test { ))) .await; - let saga = nexus.create_runnable_saga(dag).await.unwrap(); - let saga_error = nexus - .run_saga_raw_result(saga) + let runnable_saga = nexus.sagas.saga_prepare(dag).await.unwrap(); + let saga_result = runnable_saga + .run_to_completion() .await .expect("saga execution should have started") + .into_raw_result(); + let saga_error = saga_result .kind .expect_err("saga should fail due to injected error"); diff --git a/nexus/src/app/sagas/project_create.rs b/nexus/src/app/sagas/project_create.rs index 6893590519..33f150ec32 100644 --- a/nexus/src/app/sagas/project_create.rs +++ b/nexus/src/app/sagas/project_create.rs @@ -154,7 +154,7 @@ async fn spc_create_vpc_params( #[cfg(test)] mod test { use crate::{ - app::saga::create_saga_dag, app::sagas::project_create::Params, + app::sagas::project_create::Params, app::sagas::project_create::SagaProjectCreate, external_api::params, }; use async_bb8_diesel::{AsyncRunQueryDsl, AsyncSimpleConnection}; @@ -263,15 +263,11 @@ mod test { // Before running the test, confirm we have no records of any projects. verify_clean_slate(nexus.datastore()).await; - // Build the saga DAG with the provided test parameters + // Build the saga DAG with the provided test parameters and run it. let opctx = test_opctx(&cptestctx); let authz_silo = opctx.authn.silo_required().unwrap(); let params = new_test_params(&opctx, authz_silo); - let dag = create_saga_dag::(params).unwrap(); - let runnable_saga = nexus.create_runnable_saga(dag).await.unwrap(); - - // Actually run the saga - nexus.run_saga(runnable_saga).await.unwrap(); + nexus.sagas.saga_execute::(params).await.unwrap(); } #[nexus_test(server = crate::Server)] diff --git a/nexus/src/app/sagas/region_replacement_finish.rs b/nexus/src/app/sagas/region_replacement_finish.rs index c917ac2edd..f200156ce6 100644 --- a/nexus/src/app/sagas/region_replacement_finish.rs +++ b/nexus/src/app/sagas/region_replacement_finish.rs @@ -206,7 +206,6 @@ async fn srrf_update_request_record( #[cfg(test)] pub(crate) mod test { use crate::{ - app::saga::create_saga_dag, app::sagas::region_replacement_finish::Params, app::sagas::region_replacement_finish::SagaRegionReplacementFinish, }; @@ -315,17 +314,16 @@ pub(crate) mod test { .unwrap(); // Run the region replacement finish saga - let dag = create_saga_dag::(Params { + let params = Params { serialized_authn: Serialized::for_opctx(&opctx), region_volume_id: old_region_volume_id, request: request.clone(), - }) - .unwrap(); - - let runnable_saga = nexus.create_runnable_saga(dag).await.unwrap(); - - // Actually run the saga - let _output = nexus.run_saga(runnable_saga).await.unwrap(); + }; + let _output = nexus + .sagas + .saga_execute::(params) + .await + .unwrap(); // Validate the state transition let result = datastore diff --git a/nexus/src/app/sagas/region_replacement_start.rs b/nexus/src/app/sagas/region_replacement_start.rs index c983716b4f..b71944a460 100644 --- a/nexus/src/app/sagas/region_replacement_start.rs +++ b/nexus/src/app/sagas/region_replacement_start.rs @@ -864,19 +864,18 @@ pub(crate) mod test { .unwrap(); // Run the region replacement start saga - let dag = create_saga_dag::(Params { + let params = Params { serialized_authn: Serialized::for_opctx(&opctx), request: request.clone(), allocation_strategy: RegionAllocationStrategy::Random { seed: None, }, - }) - .unwrap(); - - let runnable_saga = nexus.create_runnable_saga(dag).await.unwrap(); - - // Actually run the saga - let output = nexus.run_saga(runnable_saga).await.unwrap(); + }; + let output = nexus + .sagas + .saga_execute::(params) + .await + .unwrap(); // Validate the state transition let result = datastore diff --git a/nexus/src/app/sagas/snapshot_create.rs b/nexus/src/app/sagas/snapshot_create.rs index 41e1793fab..a16ec6932e 100644 --- a/nexus/src/app/sagas/snapshot_create.rs +++ b/nexus/src/app/sagas/snapshot_create.rs @@ -1908,11 +1908,12 @@ mod test { None, // not attached to an instance true, // use the pantry ); - let dag = create_saga_dag::(params).unwrap(); - let runnable_saga = nexus.create_runnable_saga(dag).await.unwrap(); - // Actually run the saga - let output = nexus.run_saga(runnable_saga).await.unwrap(); + let output = nexus + .sagas + .saga_execute::(params) + .await + .unwrap(); let snapshot = output .lookup_node_output::( @@ -2237,7 +2238,7 @@ mod test { ); let dag = create_saga_dag::(params).unwrap(); - let runnable_saga = nexus.create_runnable_saga(dag).await.unwrap(); + let runnable_saga = nexus.sagas.saga_prepare(dag).await.unwrap(); // Before running the saga, attach the disk to an instance! let _instance_and_vmm = setup_test_instance( @@ -2252,7 +2253,11 @@ mod test { .await; // Actually run the saga - let output = nexus.run_saga(runnable_saga).await; + let output = runnable_saga + .run_to_completion() + .await + .unwrap() + .into_omicron_result(); // Expect to see 409 match output { @@ -2295,9 +2300,8 @@ mod test { true, // use the pantry ); - let dag = create_saga_dag::(params).unwrap(); - let runnable_saga = nexus.create_runnable_saga(dag).await.unwrap(); - let output = nexus.run_saga(runnable_saga).await; + let output = + nexus.sagas.saga_execute::(params).await; // Expect 200 assert!(output.is_ok()); @@ -2349,7 +2353,7 @@ mod test { ); let dag = create_saga_dag::(params).unwrap(); - let runnable_saga = nexus.create_runnable_saga(dag).await.unwrap(); + let runnable_saga = nexus.sagas.saga_prepare(dag).await.unwrap(); // Before running the saga, detach the disk! let (.., authz_disk, db_disk) = @@ -2370,7 +2374,11 @@ mod test { .expect("failed to detach disk")); // Actually run the saga. This should fail. - let output = nexus.run_saga(runnable_saga).await; + let output = runnable_saga + .run_to_completion() + .await + .unwrap() + .into_omicron_result(); assert!(output.is_err()); @@ -2397,9 +2405,8 @@ mod test { false, // use the pantry ); - let dag = create_saga_dag::(params).unwrap(); - let runnable_saga = nexus.create_runnable_saga(dag).await.unwrap(); - let output = nexus.run_saga(runnable_saga).await; + let output = + nexus.sagas.saga_execute::(params).await; // Expect 200 assert!(output.is_ok()); diff --git a/nexus/src/app/sagas/test_helpers.rs b/nexus/src/app/sagas/test_helpers.rs index 684b84dd07..a5d9d0a843 100644 --- a/nexus/src/app/sagas/test_helpers.rs +++ b/nexus/src/app/sagas/test_helpers.rs @@ -261,7 +261,7 @@ pub(crate) async fn actions_succeed_idempotently( nexus: &Arc, dag: SagaDag, ) { - let runnable_saga = nexus.create_runnable_saga(dag.clone()).await.unwrap(); + let runnable_saga = nexus.sagas.saga_prepare(dag.clone()).await.unwrap(); for node in dag.get_nodes() { nexus .sec() @@ -277,7 +277,12 @@ pub(crate) async fn actions_succeed_idempotently( .unwrap(); } - nexus.run_saga(runnable_saga).await.expect("Saga should have succeeded"); + runnable_saga + .run_to_completion() + .await + .expect("Saga should have started") + .into_omicron_result() + .expect("Saga should have succeeded"); } /// Tests that a saga `S` functions properly when any of its nodes fails and @@ -346,7 +351,7 @@ pub(crate) async fn action_failure_can_unwind<'a, S, B, A>( ); let runnable_saga = - nexus.create_runnable_saga(dag.clone()).await.unwrap(); + nexus.sagas.saga_prepare(dag.clone()).await.unwrap(); nexus .sec() @@ -354,12 +359,14 @@ pub(crate) async fn action_failure_can_unwind<'a, S, B, A>( .await .unwrap(); - let saga_error = nexus - .run_saga_raw_result(runnable_saga) + let saga_result = runnable_saga + .run_to_completion() .await .expect("saga should have started successfully") - .kind - .expect_err("saga execution should have failed"); + .into_raw_result(); + + let saga_error = + saga_result.kind.expect_err("saga execution should have failed"); assert_eq!(saga_error.error_node_name, *node.name()); @@ -447,7 +454,7 @@ pub(crate) async fn action_failure_can_unwind_idempotently<'a, S, B, A>( ); let runnable_saga = - nexus.create_runnable_saga(dag.clone()).await.unwrap(); + nexus.sagas.saga_prepare(dag.clone()).await.unwrap(); nexus .sec() @@ -468,10 +475,11 @@ pub(crate) async fn action_failure_can_unwind_idempotently<'a, S, B, A>( .await .unwrap(); - let saga_error = nexus - .run_saga_raw_result(runnable_saga) + let saga_error = runnable_saga + .run_to_completion() .await .expect("saga should have started successfully") + .into_raw_result() .kind .expect_err("saga execution should have failed"); diff --git a/nexus/src/app/sagas/test_saga.rs b/nexus/src/app/sagas/test_saga.rs index 9ccdc4aebc..c872cca67f 100644 --- a/nexus/src/app/sagas/test_saga.rs +++ b/nexus/src/app/sagas/test_saga.rs @@ -78,7 +78,7 @@ async fn test_saga_stuck(cptestctx: &ControlPlaneTestContext) { let nexus = &cptestctx.server.server_context().nexus; let params = Params {}; let dag = create_saga_dag::(params).unwrap(); - let runnable_saga = nexus.create_runnable_saga(dag.clone()).await.unwrap(); + let runnable_saga = nexus.sagas.saga_prepare(dag.clone()).await.unwrap(); let saga_id = runnable_saga.id(); // Inject an error into the second node's action and the first node's undo @@ -87,9 +87,11 @@ async fn test_saga_stuck(cptestctx: &ControlPlaneTestContext) { let n2 = dag.get_index("n2").unwrap(); nexus.sec().saga_inject_error(saga_id, n2).await.unwrap(); nexus.sec().saga_inject_error_undo(saga_id, n1).await.unwrap(); - let result = nexus - .run_saga(runnable_saga) + let result = runnable_saga + .run_to_completion() .await + .expect("expected saga to start") + .into_omicron_result() .expect_err("expected saga to finish stuck"); match result { diff --git a/nexus/src/app/sagas/vpc_create.rs b/nexus/src/app/sagas/vpc_create.rs index 2b6615ad40..a34b25ceb7 100644 --- a/nexus/src/app/sagas/vpc_create.rs +++ b/nexus/src/app/sagas/vpc_create.rs @@ -485,8 +485,8 @@ async fn svc_notify_sleds( #[cfg(test)] pub(crate) mod test { use crate::{ - app::saga::create_saga_dag, app::sagas::vpc_create::Params, - app::sagas::vpc_create::SagaVpcCreate, external_api::params, + app::sagas::vpc_create::Params, app::sagas::vpc_create::SagaVpcCreate, + external_api::params, }; use async_bb8_diesel::AsyncRunQueryDsl; use diesel::{ @@ -782,11 +782,7 @@ pub(crate) mod test { ) .await; let params = new_test_params(&opctx, authz_project); - let dag = create_saga_dag::(params).unwrap(); - let runnable_saga = nexus.create_runnable_saga(dag).await.unwrap(); - - // Actually run the saga - nexus.run_saga(runnable_saga).await.unwrap(); + nexus.sagas.saga_execute::(params).await.unwrap(); } #[nexus_test(server = crate::Server)] diff --git a/nexus/src/app/snapshot.rs b/nexus/src/app/snapshot.rs index c28d180d3c..2b3f59fbe3 100644 --- a/nexus/src/app/snapshot.rs +++ b/nexus/src/app/snapshot.rs @@ -125,7 +125,8 @@ impl super::Nexus { }; let saga_outputs = self - .execute_saga::( + .sagas + .saga_execute::( saga_params, ) .await?; @@ -165,10 +166,11 @@ impl super::Nexus { snapshot: db_snapshot, }; - self.execute_saga::( - saga_params, - ) - .await?; + self.sagas + .saga_execute::( + saga_params, + ) + .await?; Ok(()) } diff --git a/nexus/src/app/volume.rs b/nexus/src/app/volume.rs index d4353e52d9..b4aa1dee7f 100644 --- a/nexus/src/app/volume.rs +++ b/nexus/src/app/volume.rs @@ -34,10 +34,11 @@ impl super::Nexus { volume_id, }; - self.execute_saga::( - saga_params, - ) - .await?; + self.sagas + .saga_execute::( + saga_params, + ) + .await?; Ok(()) } diff --git a/nexus/src/app/vpc.rs b/nexus/src/app/vpc.rs index 09d402a940..b3605945d3 100644 --- a/nexus/src/app/vpc.rs +++ b/nexus/src/app/vpc.rs @@ -81,7 +81,8 @@ impl super::Nexus { }; let saga_outputs = self - .execute_saga::(saga_params) + .sagas + .saga_execute::(saga_params) .await?; let (_, db_vpc) = saga_outputs diff --git a/nexus/src/saga_interface.rs b/nexus/src/saga_interface.rs index e76ea893e7..5a828ff0ec 100644 --- a/nexus/src/saga_interface.rs +++ b/nexus/src/saga_interface.rs @@ -16,7 +16,6 @@ use std::sync::Arc; pub(crate) struct SagaContext { nexus: Arc, log: Logger, - authz: Arc, } impl fmt::Debug for SagaContext { @@ -26,12 +25,8 @@ impl fmt::Debug for SagaContext { } impl SagaContext { - pub(crate) fn new( - nexus: Arc, - log: Logger, - authz: Arc, - ) -> SagaContext { - SagaContext { authz, nexus, log } + pub(crate) fn new(nexus: Arc, log: Logger) -> SagaContext { + SagaContext { nexus, log } } pub(crate) fn log(&self) -> &Logger { @@ -39,7 +34,7 @@ impl SagaContext { } pub(crate) fn authz(&self) -> &Arc { - &self.authz + &self.nexus.authz() } pub(crate) fn nexus(&self) -> &Arc {