From 88b8fcfb6e0dbadac2d56f1ab525ce21d204508d Mon Sep 17 00:00:00 2001 From: David Pacheco Date: Thu, 27 Jun 2024 16:46:39 -0700 Subject: [PATCH] edits --- nexus/src/app/mod.rs | 7 ++++-- nexus/src/app/saga.rs | 44 +++++++++++++++++++++++++++---------- nexus/src/saga_interface.rs | 2 +- 3 files changed, 39 insertions(+), 14 deletions(-) diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index 74ec4759bd..707a807d3d 100644 --- a/nexus/src/app/mod.rs +++ b/nexus/src/app/mod.rs @@ -130,8 +130,7 @@ pub struct Nexus { db_datastore: Arc, /// handle to global authz information - // XXX-dap - pub(super) authz: Arc, + authz: Arc, /// saga execution coordinator sagas: SagaExecutor, @@ -562,6 +561,10 @@ impl Nexus { &self.tunables } + pub fn authz(&self) -> &Arc { + &self.authz + } + pub(crate) async fn wait_for_populate(&self) -> Result<(), anyhow::Error> { let mut my_rx = self.populate_status.clone(); loop { diff --git a/nexus/src/app/saga.rs b/nexus/src/app/saga.rs index f200b9d30b..da94e2417e 100644 --- a/nexus/src/app/saga.rs +++ b/nexus/src/app/saga.rs @@ -10,8 +10,8 @@ //! //! The basic lifecycle at the Nexus level is: //! -//! input: saga type (impls [`NexusSaga`]) -//! parameters (specific to the saga's type) +//! input: saga type (impls [`NexusSaga`]), +//! saga parameters (specific to the saga's type) //! | //! | [`create_saga_dag()`] //! v @@ -43,7 +43,8 @@ //! but not wait for it to finish. In this case, they can just stop after //! calling [`RunnableSaga::start()`]. The saga will continue running; they //! just won't be able to directly wait for it to finish or get the result. -//! * Tests can use any of the lower-level pieces to examine intermediate state. +//! * Tests can use any of the lower-level pieces to examine intermediate state +//! or inject errors. use super::sagas::NexusSaga; use super::sagas::SagaInitError; @@ -89,14 +90,14 @@ pub(crate) fn create_saga_dag( /// Note that Steno provides its own interface for kicking off sagas. This one /// is a thin wrapper around it. This one exists to layer Nexus-specific /// behavior on top of Steno's (e.g., error conversion). -pub struct SagaExecutor { +pub(crate) struct SagaExecutor { sec_client: Arc, log: slog::Logger, nexus: OnceLock>, } impl SagaExecutor { - pub fn new( + pub(crate) fn new( sec_client: Arc, log: slog::Logger, ) -> SagaExecutor { @@ -112,7 +113,7 @@ impl SagaExecutor { // one call site, it does fail cleanly if someone tries to use // `SagaExecutor` before this has been set, and the result is much cleaner // for all the other users of `SagaExecutor`. - pub fn set_nexus(&self, nexus: Arc) { + pub(crate) fn set_nexus(&self, nexus: Arc) { self.nexus.set(nexus).unwrap_or_else(|_| { panic!("concurrent initialization of SagaExecutor") }) @@ -133,6 +134,23 @@ impl SagaExecutor { /// Given a DAG (which has generally been specifically created for a /// particular saga and includes the saga's parameters), prepare to start /// running the saga. This does not actually start the saga running. + /// + /// ## Async cancellation + /// + /// The Future returned by this function is basically not cancellation-safe, + /// in that if this Future is cancelled, one of a few things might be true: + /// + /// * Nothing has happened; it's as though this function was never called. + /// * The saga has been created, but not started. If this happens, the saga + /// will likely start running the next time saga recovery happens (e.g., + /// the next time Nexus starts up) and then run to completion. + /// + /// It's not clear what the caller would _want_ if they cancelled this + /// future, but whatever it is, clearly it's not guaranteed to be true. + /// You're better off avoiding cancellation. Fortunately, we currently + /// execute sagas either from API calls and background tasks, neither of + /// which can be cancelled. **This function should not be used in a + /// `tokio::select!` with a `timeout` or the like.** pub(crate) async fn saga_prepare( &self, dag: SagaDag, @@ -214,7 +232,7 @@ impl SagaExecutor { /// crash will either happen before the saga has been created (in which /// case it's as though we didn't even call this function) or after (in /// which case the saga will run to completion). - pub async fn saga_execute( + pub(crate) async fn saga_execute( &self, params: N::Params, ) -> Result { @@ -245,6 +263,7 @@ impl RunnableSaga { self.id } + /// Start this saga running. pub(crate) async fn start(self) -> Result { info!(self.log, "starting saga"); self.sec_client @@ -269,6 +288,10 @@ pub(crate) struct RunningSaga { } impl RunningSaga { + /// Waits until this saga stops executing + /// + /// Normally, the saga will have finished successfully or failed and unwound + /// completely. If unwinding fails, it will be _stuck_ instead. pub(crate) async fn wait_until_stopped(self) -> StoppedSaga { let result = self.saga_completion_future.await; info!(self.log, "saga finished"; "saga_result" => ?result); @@ -295,6 +318,8 @@ impl StoppedSaga { self.result } + /// Interprets the result of saga execution as a `Result` whose error type + /// is `Error`. pub(crate) fn into_omicron_result(self) -> Result { self.result.kind.map_err(|saga_error| { let mut error = saga_error @@ -369,13 +394,10 @@ impl super::Nexus { })? } - // XXX-dap this PR fixes - // https://github.com/oxidecomputer/omicron/issues/5406 too - /// For testing only: provides direct access to the underlying SecClient so /// that tests can inject errors #[cfg(test)] - pub fn sec(&self) -> &steno::SecClient { + pub(crate) fn sec(&self) -> &steno::SecClient { &self.sagas.sec_client } } diff --git a/nexus/src/saga_interface.rs b/nexus/src/saga_interface.rs index 61d6386985..5a828ff0ec 100644 --- a/nexus/src/saga_interface.rs +++ b/nexus/src/saga_interface.rs @@ -34,7 +34,7 @@ impl SagaContext { } pub(crate) fn authz(&self) -> &Arc { - &self.nexus.authz + &self.nexus.authz() } pub(crate) fn nexus(&self) -> &Arc {