diff --git a/nexus/auth/src/context.rs b/nexus/auth/src/context.rs index 0aac0900c5..161ce6493b 100644 --- a/nexus/auth/src/context.rs +++ b/nexus/auth/src/context.rs @@ -236,6 +236,25 @@ impl OpContext { } } + /// Creates a new `OpContext` just like the given one, but with a different + /// identity. + /// + /// This is only intended for tests. + pub fn child_with_authn(&self, authn: authn::Context) -> OpContext { + let created_instant = Instant::now(); + let created_walltime = SystemTime::now(); + + OpContext { + log: self.log.clone(), + authn: Arc::new(authn), + authz: self.authz.clone(), + created_instant, + created_walltime, + metadata: self.metadata.clone(), + kind: self.kind, + } + } + /// Check whether the actor performing this request is authorized for /// `action` on `resource`. pub async fn authorize( diff --git a/nexus/db-queries/src/db/test_utils/unpluggable_sec_store.rs b/nexus/db-queries/src/db/test_utils/unpluggable_sec_store.rs index 70914f1e1c..b2092e7ef8 100644 --- a/nexus/db-queries/src/db/test_utils/unpluggable_sec_store.rs +++ b/nexus/db-queries/src/db/test_utils/unpluggable_sec_store.rs @@ -3,7 +3,6 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. //! Test-only helper function for detaching storage. -// XXX-dap move out of db::DataStore? use crate::db; use async_trait::async_trait; diff --git a/nexus/src/app/background/driver.rs b/nexus/src/app/background/driver.rs index c93729a335..be09ccb21f 100644 --- a/nexus/src/app/background/driver.rs +++ b/nexus/src/app/background/driver.rs @@ -382,6 +382,9 @@ impl TaskExec { // Do it! let details = self.imp.activate(&self.opctx).await; + let details_str = serde_json::to_string(&details).unwrap_or_else(|e| { + format!("<failed to serialize task status: {}>", e) + }); let elapsed = start_instant.elapsed(); @@ -407,6 +410,7 @@ impl TaskExec { "activation complete"; "elapsed" => ?elapsed, "iteration" => iteration, + "status" => details_str, ); } } diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index 17a5f723ec..8cd1a44634 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -656,7 +656,7 @@ impl BackgroundTasksInitializer { { let task_impl = Box::new(saga_recovery::SagaRecovery::new( datastore, - args.nexus_id, + nexus_db_model::SecId(args.nexus_id), args.saga_recovery_opctx, args.saga_recovery_nexus, args.saga_recovery_sec, diff --git a/nexus/src/app/background/tasks/saga_recovery/mod.rs b/nexus/src/app/background/tasks/saga_recovery/mod.rs index a608dd1dc6..410e4d85fd 100644 --- a/nexus/src/app/background/tasks/saga_recovery/mod.rs +++ b/nexus/src/app/background/tasks/saga_recovery/mod.rs @@ -133,7 +133,6 @@ mod task; #[cfg(test)] mod test { use super::*; - use futures::FutureExt; use omicron_common::api::external::Error; use omicron_test_utils::dev::test_setup_log; use std::collections::BTreeMap; @@ -303,10 +302,7 @@ ); } else { nok += 1; - summary_builder.saga_recovery_success( - *saga_id, - futures::future::ready(Ok(())).boxed(), - ); + summary_builder.saga_recovery_success(*saga_id); } } diff --git a/nexus/src/app/background/tasks/saga_recovery/recovery.rs b/nexus/src/app/background/tasks/saga_recovery/recovery.rs index 19a31a4b57..4df69de90c 100644 --- a/nexus/src/app/background/tasks/saga_recovery/recovery.rs +++ b/nexus/src/app/background/tasks/saga_recovery/recovery.rs @@ -8,9 +8,6 @@ use super::status::RecoveryFailure; use super::status::RecoverySuccess; use chrono::{DateTime, Utc}; -use futures::future::BoxFuture; -#[cfg(test)] -use futures::FutureExt; use 
omicron_common::api::external::Error; use slog_error_chain::InlineErrorChain; use std::collections::BTreeMap; @@ -105,11 +102,6 @@ impl RestState { self.remove_next = plan.sagas_maybe_done().collect(); } - - #[cfg(test)] - pub fn sagas_started(&self) -> Vec<SagaId> { - self.sagas_started.keys().copied().collect() - } } /// Read all messages that are currently available on the given channel (without @@ -423,10 +415,6 @@ pub struct ExecutionSummary { pub succeeded: Vec<RecoverySuccess>, /// list of sagas that failed to be recovered pub failed: Vec<RecoveryFailure>, - /// list of Futures to enable the consumer to wait for all recovered sagas - /// to complete - #[cfg(test)] - completion_futures: Vec<BoxFuture<'static, Result<(), Error>>>, } impl ExecutionSummary { @@ -440,25 +428,12 @@ pub fn into_results(self) -> (Vec<RecoverySuccess>, Vec<RecoveryFailure>) { (self.succeeded, self.failed) } - - #[cfg(test)] - pub fn wait_for_recovered_sagas_to_finish( - self, - ) -> BoxFuture<'static, Result<(), Error>> { - async { - futures::future::try_join_all(self.completion_futures).await?; - Ok(()) - } - .boxed() - } } pub struct ExecutionSummaryBuilder { in_progress: BTreeMap<SagaId, slog::Logger>, succeeded: Vec<RecoverySuccess>, failed: Vec<RecoveryFailure>, - #[cfg(test)] - completion_futures: Vec<BoxFuture<'static, Result<(), Error>>>, } impl ExecutionSummaryBuilder { @@ -467,8 +442,6 @@ in_progress: BTreeMap::new(), succeeded: Vec::new(), failed: Vec::new(), - #[cfg(test)] - completion_futures: Vec::new(), } } @@ -478,12 +451,7 @@ "attempted to build execution result while some recoveries are \ still in progress" ); - ExecutionSummary { - succeeded: self.succeeded, - failed: self.failed, - #[cfg(test)] - completion_futures: self.completion_futures, - } + ExecutionSummary { succeeded: self.succeeded, failed: self.failed } } /// Record that we've started recovering this saga @@ -497,28 +465,13 @@ } /// Record that we've successfully recovered this saga - pub fn saga_recovery_success( - &mut self, - saga_id: SagaId, - // XXX-dap Right now, the only test code we have calls this function - // separately. If that remains the case, we can put this argument under - // cfg[test] and have the caller in task.rs not provide it and have - // recover_one not even provide it... but then in that case, there's no - // point in any of this machinery. - // - // On the other hand if we the higher-level tests end up using this, - // we'll need to figure out what to do with this since right now this - // argument is unused in non-test code. - completion_future: BoxFuture<'static, Result<(), Error>>, - ) { + pub fn saga_recovery_success(&mut self, saga_id: SagaId) { let saga_logger = self .in_progress .remove(&saga_id) .expect("recovered saga should have previously started"); info!(saga_logger, "recovered saga"); self.succeeded.push(RecoverySuccess { time: Utc::now(), saga_id }); - #[cfg(test)] - self.completion_futures.push(completion_future); } /// Record that we failed to recover this saga @@ -545,8 +498,6 @@ mod test { use crate::app::background::tasks::saga_recovery::test::make_fake_saga; use crate::app::background::tasks::saga_recovery::test::make_saga_ids; use omicron_test_utils::dev::test_setup_log; - use std::sync::Arc; - use tokio::sync::Notify; #[test] fn test_read_all_from_channel() { @@ -698,8 +649,6 @@ assert_eq!(0, last_pass.nskipped); assert_eq!(0, last_pass.nremoved); - assert!(summary.wait_for_recovered_sagas_to_finish().await.is_ok()); - // Test a non-trivial ExecutionSummary. 
let BasicPlanTestCase { plan, @@ -719,41 +668,20 @@ mod test { } // "Finish" recovery, in yet a different order (for the same reason as - // above). - // - // We want to test the success and failure cases. We also want to test - // wait_for_recovered_sagas_to_finish(), so we'll provide futures whose - // completion we control precisely. + // above). We want to test the success and failure cases. // // Act like: // - recovery for the last saga failed - // - recovery for the first saga completes successfully, but the saga - // itself takes some time to finish - // - recovery for the other sagas completes successfully and the sagas - // themselves complete immediately + // - recovery for the other sagas completes successfully to_recover.rotate_left(2); - let notify = Arc::new(Notify::new()); for (i, saga_id) in to_recover.iter().enumerate() { - if i == 0 { - let n = notify.clone(); - summary_builder.saga_recovery_success( - *saga_id, - async move { - n.notified().await; - Ok(()) - } - .boxed(), - ); - } else if i == to_recover.len() - 1 { + if i == to_recover.len() - 1 { summary_builder.saga_recovery_failure( *saga_id, &Error::internal_error("test error"), ); } else { - summary_builder.saga_recovery_success( - *saga_id, - futures::future::ready(Ok(())).boxed(), - ); + summary_builder.saga_recovery_success(*saga_id); } } @@ -769,16 +697,6 @@ mod test { assert_eq!(to_skip.len(), last_pass.nskipped); assert_eq!(to_mark_done.len(), last_pass.nremoved); - // The recovered sagas' completion futures should not yet be done. - let mut fut = summary.wait_for_recovered_sagas_to_finish(); - let is_ready = (&mut fut).now_or_never(); - assert!(is_ready.is_none()); - - // Simulate the last recovered sagas completing, then try again to wait - // for completion. This should complete. 
- notify.notify_waiters(); - let _ = fut.await; - logctx.cleanup_successful(); } } diff --git a/nexus/src/app/background/tasks/saga_recovery/status.rs b/nexus/src/app/background/tasks/saga_recovery/status.rs index 43d5cabf57..3148a2693e 100644 --- a/nexus/src/app/background/tasks/saga_recovery/status.rs +++ b/nexus/src/app/background/tasks/saga_recovery/status.rs @@ -7,7 +7,7 @@ use super::recovery; use chrono::{DateTime, Utc}; use omicron_common::api::external::Error; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use slog_error_chain::InlineErrorChain; use std::collections::VecDeque; use steno::SagaId; @@ -20,11 +20,11 @@ const N_SUCCESS_SAGA_HISTORY: usize = 128; /// Maximum number of recent failures to keep track of for debugging const N_FAILED_SAGA_HISTORY: usize = 128; -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] pub struct Report { pub recent_recoveries: DebuggingHistory<RecoverySuccess>, pub recent_failures: DebuggingHistory<RecoveryFailure>, - last_pass: LastPass, + pub last_pass: LastPass, } impl Report { @@ -62,27 +62,27 @@ } } -#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] pub struct RecoverySuccess { pub time: DateTime<Utc>, pub saga_id: SagaId, } -#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] pub struct RecoveryFailure { pub time: DateTime<Utc>, pub saga_id: SagaId, pub message: String, } -#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] pub enum LastPass { NeverStarted, Failed { message: String }, Success(LastPassSuccess), } -#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] pub struct LastPassSuccess { pub nfound: usize, pub nrecovered: usize, @@ -107,7 +107,7 @@ impl LastPassSuccess { } } -#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] pub struct DebuggingHistory<T> { size: usize, ring: VecDeque<T>, } diff --git a/nexus/src/app/background/tasks/saga_recovery/task.rs b/nexus/src/app/background/tasks/saga_recovery/task.rs index 1c0400f535..bfc174959b 100644 --- a/nexus/src/app/background/tasks/saga_recovery/task.rs +++ b/nexus/src/app/background/tasks/saga_recovery/task.rs @@ -9,6 +9,14 @@ // XXX-dap TODO-coverage everything here // XXX-dap omdb support // XXX-dap TODO-doc everything here +// XXX-dap sync with "main" +// XXX-dap move guts to separate crate? +// XXX-dap move debug thing to a trait? +// XXX-dap write up summary for PR, including the option of doing recovery in +// Steno with read methods on SecStore? also could have added purge option to +// SEC and then relied on Steno to know if a thing was alive. (can't do it +// without a purge that's invoked by recovery) +// XXX-dap when we consider a saga done, we should ask steno in order to confirm //! Saga recovery //! 
@@ -135,7 +143,7 @@ use super::recovery; use super::status; use crate::app::background::BackgroundTask; -use crate::app::sagas::ActionRegistry; +use crate::app::sagas::NexusSagaType; use crate::saga_interface::SagaContext; use crate::Nexus; use futures::future::BoxFuture; @@ -149,7 +157,24 @@ use std::collections::BTreeMap; use std::sync::Arc; use steno::SagaId; use tokio::sync::mpsc; -use uuid::Uuid; + +// XXX-dap TODO-doc +pub trait MakeSagaContext: Send + Sync { + type SagaType: steno::SagaType; + + fn make_saga_context( + &self, + log: slog::Logger, + ) -> Arc<<Self::SagaType as steno::SagaType>::ExecContextType>; +} + +impl MakeSagaContext for Arc<Nexus> { + type SagaType = NexusSagaType; + fn make_saga_context(&self, log: slog::Logger) -> Arc<Arc<SagaContext>> { + // XXX-dap dig up the comment about the double Arc + Arc::new(Arc::new(SagaContext::new(self.clone(), log))) + } +} /// Background task that recovers sagas assigned to this Nexus /// @@ -158,7 +183,7 @@ use uuid::Uuid; /// case when a saga has been re-assigned to this Nexus (e.g., because some /// other Nexus has been expunged) and to handle retries for sagas whose /// previous recovery failed. -pub struct SagaRecovery { +pub struct SagaRecovery<N: MakeSagaContext> { datastore: Arc<db::DataStore>, /// Unique identifier for this Saga Execution Coordinator /// @@ -166,9 +191,9 @@ sec_id: db::SecId, /// OpContext used for saga recovery saga_recovery_opctx: OpContext, - nexus: Arc<Nexus>, + maker: N, sec_client: Arc<steno::SecClient>, - registry: Arc<ActionRegistry>, + registry: Arc<steno::ActionRegistry<N::SagaType>>, sagas_started_rx: mpsc::UnboundedReceiver<SagaId>, /// recovery state persisted between passes @@ -178,21 +203,21 @@ status: status::Report, } -impl SagaRecovery { +impl<N: MakeSagaContext> SagaRecovery<N> { pub fn new( datastore: Arc<db::DataStore>, - sec_id: Uuid, + sec_id: db::SecId, saga_recovery_opctx: OpContext, - nexus: Arc<Nexus>, + maker: N, sec: Arc<steno::SecClient>, - registry: Arc<ActionRegistry>, + registry: Arc<steno::ActionRegistry<N::SagaType>>, sagas_started_rx: mpsc::UnboundedReceiver<SagaId>, - ) -> SagaRecovery { + ) -> SagaRecovery<N> { SagaRecovery { datastore, - sec_id: db::SecId(sec_id), + sec_id, saga_recovery_opctx, - nexus, + maker, sec_client: sec, registry, sagas_started_rx, @@ -201,15 +226,74 @@ } } + // XXX-dap TODO-doc like activate, but return information we can use to see + // what happened as well as completion future + async fn activate_internal( + &mut self, + opctx: &OpContext, + ) -> Option<(BoxFuture<'static, Result<(), Error>>, status::LastPassSuccess)> + { + let log = &opctx.log; + let datastore = &self.datastore; + + // Fetch the list of not-yet-finished sagas that are assigned to + // this Nexus instance. + let result = list_sagas_in_progress( + &self.saga_recovery_opctx, + datastore, + self.sec_id, + ) + .await; + + // Process any newly-created sagas, adding them to our set of sagas + // to ignore during recovery. We never want to try to recover a + // saga that was created within this Nexus's lifetime. + // + // We do this even if the previous step failed in order to avoid + // letting the channel queue build up. In practice, it shouldn't + // really matter. + // + // But given that we're doing this, it's critical that we do it + // *after* having fetched the candidate sagas from the database. + // It's okay if one of these newly-created sagas doesn't show up in + // the candidate list (because it hadn't actually started at the + // point where we fetched the candidate list). 
The reverse is not + // okay: if we did this step before fetching candidates, and a saga + // was immediately created and showed up in our candidate list, we'd + // erroneously conclude that it needed to be recovered when in fact + // it was already running. + self.rest_state.update_started_sagas(log, &mut self.sagas_started_rx); + + match result { + Ok(db_sagas) => { + let plan = recovery::Plan::new(log, &self.rest_state, db_sagas); + let (execution, future) = + self.recovery_execute(log, &plan).await; + self.rest_state.update_after_pass(&plan, &execution); + let last_pass_success = + status::LastPassSuccess::new(&plan, &execution); + self.status.update_after_pass(&plan, execution); + Some((future, last_pass_success)) + } + Err(error) => { + self.status.update_after_failure(&error); + None + } + } + } + async fn recovery_execute( &self, bgtask_log: &slog::Logger, plan: &recovery::Plan, - ) -> recovery::ExecutionSummary { + ) -> (recovery::ExecutionSummary, BoxFuture<'static, Result<(), Error>>) + { let mut builder = recovery::ExecutionSummaryBuilder::new(); + let mut completion_futures = Vec::new(); for (saga_id, saga) in plan.sagas_needing_recovery() { - let saga_log = self.nexus.log.new(o!( + // XXX-dap used to be self.nexus.log. Is this okay? + let saga_log = bgtask_log.new(o!( "saga_name" => saga.name.clone(), "saga_id" => saga_id.to_string(), )); @@ -217,7 +301,8 @@ builder.saga_recovery_start(*saga_id, saga_log.clone()); match self.recover_one_saga(bgtask_log, &saga_log, saga).await { Ok(completion_future) => { - builder.saga_recovery_success(*saga_id, completion_future); + builder.saga_recovery_success(*saga_id); + completion_futures.push(completion_future); } Err(error) => { // It's essential that we not bail out early just because we @@ -228,7 +313,12 @@ } } - builder.build() + let future = async { + futures::future::try_join_all(completion_futures).await?; + Ok(()) + } + .boxed(); + (builder.build(), future) } async fn recover_one_saga( @@ -249,14 +339,7 @@ "saga_id" => %saga_id, ); - // The extra `Arc` is a little ridiculous. The problem is that Steno - // expects (in `sec_client.saga_resume()`) that the user-defined context - // will be wrapped in an `Arc`. But we already use `Arc<SagaContext>` - // for our type. Hence we need two Arcs. - let saga_context = Arc::new(Arc::new(SagaContext::new( - self.nexus.clone(), - saga_logger.clone(), - ))); + let saga_context = self.maker.make_saga_context(saga_logger.clone()); let saga_completion = self .sec_client .saga_resume( @@ -293,57 +376,15 @@ } } -impl BackgroundTask for SagaRecovery { +impl<N: MakeSagaContext> BackgroundTask for SagaRecovery<N> { fn activate<'a>( &'a mut self, opctx: &'a OpContext, ) -> BoxFuture<'a, serde_json::Value> { async { - let log = &opctx.log; - let datastore = &self.datastore; - - // Fetch the list of not-yet-finished sagas that are assigned to - // this Nexus instance. - let result = list_sagas_in_progress( - &self.saga_recovery_opctx, - datastore, - self.sec_id, - ) - .await; - - // Process any newly-created sagas, adding them to our set of sagas - // to ignore during recovery. We never want to try to recover a - // saga that was created within this Nexus's lifetime. - // - // We do this even if the previous step failed in order to avoid - // letting the channel queue build up. In practice, it shouldn't - // really matter. - // - // But given that we're doing this, it's critical that we do it - // *after* having fetched the candidate sagas from the database. 
- It's okay if one of these newly-created sagas doesn't show up in - // the candidate list (because it hadn't actually started at the - // point where we fetched the candidate list). The reverse is not - // okay: if we did this step before fetching candidates, and a saga - // was immediately created and showed up in our candidate list, we'd - // erroneously conclude that it needed to be recovered when in fact - // it was already running. - self.rest_state - .update_started_sagas(log, &mut self.sagas_started_rx); - - match result { - Ok(db_sagas) => { - let plan = - recovery::Plan::new(log, &self.rest_state, db_sagas); - let execution = self.recovery_execute(log, &plan).await; - self.rest_state.update_after_pass(&plan, &execution); - self.status.update_after_pass(&plan, execution); - } - Err(error) => { - self.status.update_after_failure(&error); - } - }; - + // We don't need the future that's returned by this function. + // That's only used by the test suite. + let _ = self.activate_internal(opctx).await; serde_json::to_value(&self.status).unwrap() } .boxed() @@ -377,3 +418,438 @@ } async fn list_sagas_in_progress( }; result } + +#[cfg(test)] +mod test { + use super::*; + use nexus_auth::authn; + use nexus_db_queries::context::OpContext; + use nexus_db_queries::db::test_utils::UnpluggableCockroachDbSecStore; + use nexus_test_utils::{ + db::test_setup_database, resource_helpers::create_project, + }; + use nexus_test_utils_macros::nexus_test; + use nexus_types::internal_api::views::LastResult; + use omicron_test_utils::dev::{ + self, + poll::{wait_for_condition, CondCheckError}, + }; + use once_cell::sync::Lazy; + use pretty_assertions::assert_eq; + use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; + use steno::{ + new_action_noop_undo, Action, ActionContext, ActionError, + ActionRegistry, DagBuilder, Node, SagaDag, SagaId, SagaName, + SagaResult, SagaType, SecClient, + }; + use uuid::Uuid; + type ControlPlaneTestContext = + nexus_test_utils::ControlPlaneTestContext<crate::Server>; + + // Returns a Cockroach DB, as well as a "datastore" interface (which is the + // one more frequently used by Nexus). + // + // The caller is responsible for calling "cleanup().await" on the returned + // CockroachInstance - we would normally wrap this in a drop method, but it + // is async. + async fn new_db( + log: &slog::Logger, + ) -> (dev::db::CockroachInstance, Arc<db::DataStore>) { + let db = test_setup_database(&log).await; + let cfg = nexus_db_queries::db::Config { url: db.pg_config().clone() }; + let pool = Arc::new(db::Pool::new(log, &cfg)); + let db_datastore = Arc::new( + db::DataStore::new(&log, Arc::clone(&pool), None).await.unwrap(), + ); + (db, db_datastore) + } + + // The following is our "saga-under-test". It's a simple two-node operation + // that tracks how many times it has been called, and provides a mechanism + // for detaching storage to simulate power failure (and meaningfully + // recover). + + #[derive(Debug)] + struct TestContext { + log: slog::Logger, + + // Storage, and instructions on whether or not to detach it + // when executing the first saga action. + storage: Arc<UnpluggableCockroachDbSecStore>, + do_unplug: AtomicBool, + + // Tracks how many times each node has been reached. + n1_count: AtomicU32, + n2_count: AtomicU32, + } + + impl TestContext { + fn new( + log: &slog::Logger, + storage: Arc<UnpluggableCockroachDbSecStore>, + ) -> Self { + TestContext { + log: log.clone(), + storage, + do_unplug: AtomicBool::new(false), + + // Counters of how many times the nodes have been invoked. 
+ n1_count: AtomicU32::new(0), + n2_count: AtomicU32::new(0), + } + } + } + + #[derive(Debug)] + struct TestOp; + impl SagaType for TestOp { + type ExecContextType = TestContext; + } + + impl MakeSagaContext for Arc<TestContext> { + type SagaType = TestOp; + fn make_saga_context(&self, _log: slog::Logger) -> Arc<TestContext> { + self.clone() + } + } + + static ACTION_N1: Lazy<Arc<dyn Action<TestOp>>> = + Lazy::new(|| new_action_noop_undo("n1_action", node_one)); + static ACTION_N2: Lazy<Arc<dyn Action<TestOp>>> = + Lazy::new(|| new_action_noop_undo("n2_action", node_two)); + + fn registry_create() -> Arc<ActionRegistry<TestOp>> { + let mut registry = ActionRegistry::new(); + registry.register(Arc::clone(&ACTION_N1)); + registry.register(Arc::clone(&ACTION_N2)); + Arc::new(registry) + } + + fn saga_object_create() -> Arc<SagaDag> { + let mut builder = DagBuilder::new(SagaName::new("test-saga")); + builder.append(Node::action("n1_out", "NodeOne", ACTION_N1.as_ref())); + builder.append(Node::action("n2_out", "NodeTwo", ACTION_N2.as_ref())); + let dag = builder.build().unwrap(); + Arc::new(SagaDag::new(dag, serde_json::Value::Null)) + } + + async fn node_one(ctx: ActionContext<TestOp>) -> Result<u32, ActionError> { + let uctx = ctx.user_data(); + uctx.n1_count.fetch_add(1, Ordering::SeqCst); + info!(&uctx.log, "ACTION: node_one"); + // If "do_unplug" is true, we detach storage. + // + // This prevents the SEC from successfully recording that + // this node completed, and acts like a crash. + if uctx.do_unplug.load(Ordering::SeqCst) { + info!(&uctx.log, "Unplugged storage"); + uctx.storage.set_unplug(true); + } + Ok(1) + } + + async fn node_two(ctx: ActionContext<TestOp>) -> Result<u32, ActionError> { + let uctx = ctx.user_data(); + uctx.n2_count.fetch_add(1, Ordering::SeqCst); + info!(&uctx.log, "ACTION: node_two"); + Ok(2) + } + + // Helper function for setting up storage, SEC, and a test context object. + fn create_storage_sec_and_context( + log: &slog::Logger, + db_datastore: Arc<db::DataStore>, + sec_id: db::SecId, + ) -> (Arc<UnpluggableCockroachDbSecStore>, SecClient, Arc<TestContext>) + { + let storage = Arc::new(UnpluggableCockroachDbSecStore::new( + sec_id, + db_datastore, + log.new(o!("component" => "SecStore")), + )); + let sec_client = + steno::sec(log.new(o!("component" => "SEC")), storage.clone()); + let uctx = Arc::new(TestContext::new(&log, storage.clone())); + (storage, sec_client, uctx) + } + + // Helper function to run a basic saga that we can use to see which nodes + // ran and how many times. + async fn run_test_saga( + uctx: &Arc<TestContext>, + sec_client: &SecClient, + ) -> (SagaId, SagaResult) { + let saga_id = SagaId(Uuid::new_v4()); + let future = sec_client + .saga_create( + saga_id, + uctx.clone(), + saga_object_create(), + registry_create(), + ) + .await + .unwrap(); + sec_client.saga_start(saga_id).await.unwrap(); + (saga_id, future.await) + } + + // Tests the basic case: recovery of a saga that appears (from its log) to + // be still running, and which is not already running. In Nexus, + // this corresponds to the basic case where a saga was created in a previous + // Nexus lifetime and the current process knows nothing about it. 
+ #[tokio::test] + async fn test_failure_during_saga_can_be_recovered() { + // Test setup + let logctx = + dev::test_setup_log("test_failure_during_saga_can_be_recovered"); + let log = logctx.log.new(o!()); + let (mut db, db_datastore) = new_db(&log).await; + let sec_id = db::SecId(uuid::Uuid::new_v4()); + let (storage, sec_client, uctx) = + create_storage_sec_and_context(&log, db_datastore.clone(), sec_id); + let sec_log = log.new(o!("component" => "SEC")); + let opctx = OpContext::for_tests( + log, + Arc::clone(&db_datastore) as Arc<dyn nexus_auth::storage::Storage>, + ); + let saga_recovery_opctx = + opctx.child_with_authn(authn::Context::internal_saga_recovery()); + + // In order to recover a partially-created saga, we need a partial log. + // To create one, we'll run the saga normally, but configure it to + // unplug the datastore partway through so that the later log entries + // don't get written. Note that the unplugged datastore completes + // operations successfully so that the saga will appear to complete + // successfully. + uctx.do_unplug.store(true, Ordering::SeqCst); + let (_, result) = run_test_saga(&uctx, &sec_client).await; + let output = result.kind.unwrap(); + assert_eq!(output.lookup_node_output::<u32>("n1_out").unwrap(), 1); + assert_eq!(output.lookup_node_output::<u32>("n2_out").unwrap(), 2); + assert_eq!(uctx.n1_count.load(Ordering::SeqCst), 1); + assert_eq!(uctx.n2_count.load(Ordering::SeqCst), 1); + + // Simulate a crash by terminating the SEC and creating a new one using + // the same storage system. + // + // Update uctx to prevent the storage system from detaching again. + sec_client.shutdown().await; + let sec_client = steno::sec(sec_log, storage.clone()); + uctx.storage.set_unplug(false); + uctx.do_unplug.store(false, Ordering::SeqCst); + + // Use our background task to recover the saga. Observe that it re-runs + // operations and completes. + let sec_client = Arc::new(sec_client); + let (_, sagas_started_rx) = tokio::sync::mpsc::unbounded_channel(); + let mut task = SagaRecovery::new( + db_datastore.clone(), + sec_id, + saga_recovery_opctx, + uctx.clone(), + sec_client.clone(), + registry_create(), + sagas_started_rx, + ); + + let Some((completion_future, last_pass_success)) = + task.activate_internal(&opctx).await + else { + panic!("saga recovery failed"); + }; + + assert_eq!(last_pass_success.nrecovered, 1); + assert_eq!(last_pass_success.nfailed, 0); + assert_eq!(last_pass_success.nskipped, 0); + + // Wait for the recovered saga to complete and make sure it re-ran the + // operations that we expected it to. + completion_future + .await + .expect("recovered saga to complete successfully"); + assert_eq!(uctx.n1_count.load(Ordering::SeqCst), 2); + assert_eq!(uctx.n2_count.load(Ordering::SeqCst), 2); + + // Test cleanup + drop(task); + let sec_client = Arc::try_unwrap(sec_client).unwrap(); + sec_client.shutdown().await; + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + // Tests that a saga that has finished (as reflected in the database state) + // does not get recovered. 
+ #[tokio::test] + async fn test_successful_saga_does_not_replay_during_recovery() { + // Test setup + let logctx = dev::test_setup_log( + "test_successful_saga_does_not_replay_during_recovery", + ); + let log = logctx.log.new(o!()); + let (mut db, db_datastore) = new_db(&log).await; + let sec_id = db::SecId(uuid::Uuid::new_v4()); + let (storage, sec_client, uctx) = + create_storage_sec_and_context(&log, db_datastore.clone(), sec_id); + let sec_log = log.new(o!("component" => "SEC")); + let opctx = OpContext::for_tests( + log, + Arc::clone(&db_datastore) as Arc<dyn nexus_auth::storage::Storage>, + ); + let saga_recovery_opctx = + opctx.child_with_authn(authn::Context::internal_saga_recovery()); + + // Create and start a saga, which we expect to complete successfully. + let (_, result) = run_test_saga(&uctx, &sec_client).await; + let output = result.kind.unwrap(); + assert_eq!(output.lookup_node_output::<u32>("n1_out").unwrap(), 1); + assert_eq!(output.lookup_node_output::<u32>("n2_out").unwrap(), 2); + assert_eq!(uctx.n1_count.load(Ordering::SeqCst), 1); + assert_eq!(uctx.n2_count.load(Ordering::SeqCst), 1); + + // Simulate a crash by terminating the SEC and creating a new one using + // the same storage system. + sec_client.shutdown().await; + let sec_client = steno::sec(sec_log, storage.clone()); + + // Go through recovery. We should not find or recover this saga. + let sec_client = Arc::new(sec_client); + let (_, sagas_started_rx) = tokio::sync::mpsc::unbounded_channel(); + let mut task = SagaRecovery::new( + db_datastore.clone(), + sec_id, + saga_recovery_opctx, + uctx.clone(), + sec_client.clone(), + registry_create(), + sagas_started_rx, + ); + + let Some((_, last_pass_success)) = task.activate_internal(&opctx).await + else { + panic!("saga recovery failed"); + }; + + assert_eq!(last_pass_success.nrecovered, 0); + assert_eq!(last_pass_success.nfailed, 0); + assert_eq!(last_pass_success.nskipped, 0); + + // The nodes should not have been replayed. + assert_eq!(uctx.n1_count.load(Ordering::SeqCst), 1); + assert_eq!(uctx.n2_count.load(Ordering::SeqCst), 1); + + // Test cleanup + drop(task); + let sec_client = Arc::try_unwrap(sec_client).unwrap(); + sec_client.shutdown().await; + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + // Verify the plumbing that exists between regular saga creation and saga + // recovery. + #[nexus_test(server = crate::Server)] + async fn test_nexus_recovery(cptestctx: &ControlPlaneTestContext) { + let nexus = &cptestctx.server.server_context().nexus; + + // This is tricky to do. We're trying to make sure the plumbing is + // hooked up so that when a saga is created, the saga recovery task + // learns about it. The purpose of that plumbing is to ensure that we + // don't try to recover a saga that's already running. It'd be ideal to + // test that directly, but we can't easily control execution well enough + // to ensure that the background task runs while the saga is still + // running. However, even if we miss it (i.e., the background task only + // runs after the saga completes successfully), there's a side effect we + // can look for: the task should report the completed saga as "maybe + // done". On the next activation, it should report that it's removed a + // saga from its internal state (because it saw that it was done). + + // Wait for the task to run once. 
+ let driver = nexus.background_tasks_driver.get().unwrap(); + let task_name = driver + .tasks() + .find(|task_name| task_name.as_str() == "saga_recovery") + .expect("expected background task called \"saga_recovery\""); + let first_completed = wait_for_condition( + || async { + let status = driver.task_status(task_name); + let LastResult::Completed(completed) = status.last else { + return Err(CondCheckError::<()>::NotYet); + }; + Ok(completed) + }, + &std::time::Duration::from_millis(250), + &std::time::Duration::from_secs(15), + ) + .await + .unwrap(); + + // Make sure that it didn't find anything to do. + let status_raw = first_completed.details; + let status: status::Report = + serde_json::from_value(status_raw).unwrap(); + let status::LastPass::Success(last_pass_success) = status.last_pass + else { + panic!("wrong last pass variant"); + }; + assert_eq!(last_pass_success.nfound, 0); + assert_eq!(last_pass_success.nrecovered, 0); + assert_eq!(last_pass_success.nfailed, 0); + assert_eq!(last_pass_success.nskipped, 0); + + // Now kick off a saga -- any saga will do. We don't even care if it + // works or not. In practice, it will have finished by the time this + // call completes. + let _ = create_project(&cptestctx.external_client, "test").await; + + // Activate the background task. Wait for one pass. + nexus.background_tasks.task_saga_recovery.activate(); + let _ = wait_for_condition( + || async { + let status = driver.task_status(task_name); + let LastResult::Completed(completed) = status.last else { + panic!("task had completed before; how has it not now?"); + }; + if completed.iteration <= first_completed.iteration { + return Err(CondCheckError::<()>::NotYet); + } + Ok(completed) + }, + &std::time::Duration::from_millis(250), + &std::time::Duration::from_secs(15), + ) + .await + .unwrap(); + + // Activate it again. This should be enough for it to report having + // removed a saga from its state. + nexus.background_tasks.task_saga_recovery.activate(); + let last_pass_success = wait_for_condition( + || async { + let status = driver.task_status(task_name); + let LastResult::Completed(completed) = status.last else { + panic!("task had completed before; how has it not now?"); + }; + + let status: status::Report = + serde_json::from_value(completed.details).unwrap(); + let status::LastPass::Success(last_pass_success) = + status.last_pass + else { + panic!("wrong last pass variant"); + }; + if last_pass_success.nremoved > 0 { + return Ok(last_pass_success); + } + + Err(CondCheckError::<()>::NotYet) + }, + &std::time::Duration::from_millis(250), + &std::time::Duration::from_secs(15), + ) + .await + .unwrap(); + + assert!(last_pass_success.nremoved > 0); + } +} diff --git a/nexus/src/app/sagas/mod.rs b/nexus/src/app/sagas/mod.rs index d278fb5600..17f43b4950 100644 --- a/nexus/src/app/sagas/mod.rs +++ b/nexus/src/app/sagas/mod.rs @@ -50,7 +50,7 @@ pub mod common_storage; mod test_helpers; #[derive(Debug)] -pub(crate) struct NexusSagaType; +pub struct NexusSagaType; impl steno::SagaType for NexusSagaType { type ExecContextType = Arc<SagaContext>; } diff --git a/nexus/src/saga_interface.rs b/nexus/src/saga_interface.rs index 5a828ff0ec..aef7044408 100644 --- a/nexus/src/saga_interface.rs +++ b/nexus/src/saga_interface.rs @@ -13,7 +13,7 @@ use std::sync::Arc; // TODO-design Should this be the same thing as ServerContext? It's // very analogous, but maybe there's utility in having separate views for the // HTTP server and sagas. 
-pub(crate) struct SagaContext { +pub struct SagaContext { nexus: Arc<Nexus>, log: Logger, }
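The `Deserialize` derives added to the status types above are what let a consumer turn the JSON reported by the background task back into a typed `status::Report` (the `test_nexus_recovery` test does exactly this with `completed.details`, and the XXX-dap notes flag omdb support as follow-up work). A minimal sketch of that usage, assuming it lives somewhere with visibility into the `saga_recovery::status` module; the helper name `summarize_report` is illustrative and not part of this change:

fn summarize_report(
    details: serde_json::Value,
) -> Result<String, serde_json::Error> {
    // Reverse of the `serde_json::to_value(&self.status)` call in `activate()`.
    let report: status::Report = serde_json::from_value(details)?;
    Ok(match report.last_pass {
        status::LastPass::NeverStarted => String::from("no pass completed yet"),
        status::LastPass::Failed { message } => {
            format!("last pass failed: {}", message)
        }
        status::LastPass::Success(s) => format!(
            "last pass: found {}, recovered {}, failed {}, skipped {}, removed {}",
            s.nfound, s.nrecovered, s.nfailed, s.nskipped, s.nremoved
        ),
    })
}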