diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index a8c863298e..98948f2405 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -385,6 +385,8 @@ pub struct BackgroundTaskConfig { pub v2p_mapping_propagation: V2PMappingPropagationConfig, /// configuration for abandoned VMM reaper task pub abandoned_vmm_reaper: AbandonedVmmReaperConfig, + /// configuration for saga recovery task + pub saga_recovery: SagaRecoveryConfig, } #[serde_as] @@ -566,6 +568,14 @@ pub struct AbandonedVmmReaperConfig { pub period_secs: Duration, } +#[serde_as] +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct SagaRecoveryConfig { + /// period (in seconds) for periodic activations of this background task + #[serde_as(as = "DurationSeconds<u64>")] + pub period_secs: Duration, +} + #[serde_as] #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] pub struct RegionReplacementDriverConfig { @@ -816,6 +826,7 @@ mod test { service_firewall_propagation.period_secs = 300 v2p_mapping_propagation.period_secs = 30 abandoned_vmm_reaper.period_secs = 60 + saga_recovery.period_secs = 60 [default_region_allocation_strategy] type = "random" seed = 0 @@ -962,7 +973,10 @@ mod test { }, abandoned_vmm_reaper: AbandonedVmmReaperConfig { period_secs: Duration::from_secs(60), - } + }, + saga_recovery: SagaRecoveryConfig { + period_secs: Duration::from_secs(60), + }, }, default_region_allocation_strategy: crate::nexus_config::RegionAllocationStrategy::Random { @@ -1035,6 +1049,7 @@ mod test { service_firewall_propagation.period_secs = 300 v2p_mapping_propagation.period_secs = 30 abandoned_vmm_reaper.period_secs = 60 + saga_recovery.period_secs = 60 [default_region_allocation_strategy] type = "random" "##, diff --git a/nexus/db-queries/src/db/mod.rs b/nexus/db-queries/src/db/mod.rs index c8c8860901..5884e149aa 100644 --- a/nexus/db-queries/src/db/mod.rs +++ b/nexus/db-queries/src/db/mod.rs @@ -50,7 +50,7 @@ pub use config::Config; pub use datastore::DataStore; pub use on_conflict_ext::IncompleteOnConflictExt; pub use pool::{DbConnection, Pool}; -pub use saga_recovery::{recover, CompletionTask, RecoveryTask}; +pub use saga_recovery::{recover, CompletionTask}; pub use saga_types::SecId; pub use sec_store::CockroachDbSecStore; diff --git a/nexus/db-queries/src/db/saga_recovery.rs b/nexus/db-queries/src/db/saga_recovery.rs index e85011f60f..851d228ddc 100644 --- a/nexus/db-queries/src/db/saga_recovery.rs +++ b/nexus/db-queries/src/db/saga_recovery.rs @@ -18,33 +18,11 @@ use futures::{future::BoxFuture, TryFutureExt}; use omicron_common::api::external::Error; use omicron_common::api::external::LookupType; use omicron_common::api::external::ResourceType; -use omicron_common::backoff::retry_notify; -use omicron_common::backoff::retry_policy_internal_service; -use omicron_common::backoff::BackoffError; use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -/// Result type of a [`RecoveryTask`]. -pub type RecoveryResult = Result<CompletionTask, Error>; - -/// A future which completes once sagas have been loaded and resumed. -/// Note that this does not necessarily mean the sagas have completed -/// execution. -/// -/// Returns a Result of either: -/// - A [`CompletionTask`] to track the completion of the resumed sagas, or -/// - An [`Error`] encountered when attempting to load and resume sagas.
-pub struct RecoveryTask(BoxFuture<'static, RecoveryResult>); - -impl Future for RecoveryTask { - type Output = RecoveryResult; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - Pin::new(&mut self.get_mut().0).poll(cx) - } -} - /// Result type from a [`CompletionTask`]. pub type CompletionResult = Result<(), Error>; @@ -58,117 +36,93 @@ impl Future for CompletionTask { } } -/// Starts an asynchronous task to recover sagas (as after a crash or restart) +/// Kick off saga recovery (as after a crash or restart) /// -/// More specifically, this task queries the database to list all uncompleted -/// sagas that are assigned to SEC `sec_id` and for each one: +/// More specifically, this function queries the database to list all +/// uncompleted sagas that are assigned to SEC `sec_id` and for each one: /// /// * loads the saga DAG and log from `datastore` /// * uses [`steno::SecClient::saga_resume`] to prepare to resume execution of /// the saga using the persistent saga log /// * resumes execution of each saga /// -/// The returned [`RecoveryTask`] completes once all sagas have been loaded -/// and resumed, and itself returns a [`CompletionTask`] which completes -/// when those resumed sagas have finished. +/// The function completes once all sagas have been loaded and resumed. It +/// returns a [`CompletionTask`] that completes when those resumed sagas +/// have finished. -pub fn recover<T>( - opctx: OpContext, +pub async fn recover<T>( + opctx: &OpContext, sec_id: db::SecId, - uctx: Arc<T::ExecContextType>, - datastore: Arc<db::DataStore>, - sec_client: Arc<steno::SecClient>, + make_context: &(dyn Fn(&slog::Logger) -> Arc<T::ExecContextType> + + Send + + Sync), + datastore: &db::DataStore, + sec_client: &steno::SecClient, registry: Arc<steno::ActionRegistry<T>>, -) -> RecoveryTask +) -> Result<CompletionTask, Error> where T: steno::SagaType, { - let join_handle = tokio::spawn(async move { - info!(&opctx.log, "start saga recovery"); - - // We perform the initial list of sagas using a standard retry policy. - // We treat all errors as transient because there's nothing we can do - // about any of them except try forever. As a result, we never expect - // an error from the overall operation. - // TODO-monitoring we definitely want a way to raise a big red flag if - // saga recovery is not completing. - // TODO-robustness It would be better to retry the individual database - // operations within this operation than retrying the overall operation. - // As this is written today, if the listing requires a bunch of pages - // and the operation fails partway through, we'll re-fetch all the pages - // we successfully fetched before. If the database is overloaded and - // only N% of requests are completing, the probability of this operation - // succeeding decreases considerably as the number of separate queries - // (pages) goes up. We'd be much more likely to finish the overall - // operation if we didn't throw away the results we did get each time. - let found_sagas = retry_notify( - retry_policy_internal_service(), - || async { - list_unfinished_sagas(&opctx, &datastore, &sec_id) - .await - .map_err(BackoffError::transient) - }, - |error, duration| { - warn!( - &opctx.log, - "failed to list sagas (will retry after {:?}): {:#}", - duration, - error - ) - }, + // XXX-dap who else was calling this function and is it okay that we do this + // work "synchronously" now? + info!(&opctx.log, "start saga recovery"); + + // We do not retry any database operations here because we're being invoked + // by a background task that will be re-activated some time later and pick + // up anything we missed.
+ // TODO-monitoring we definitely want a way to raise a big red flag if + // saga recovery is not completing. + let found_sagas = list_unfinished_sagas(&opctx, datastore, &sec_id).await?; + + info!(&opctx.log, "listed sagas ({} total)", found_sagas.len()); + + // Load and resume all sagas serially. Too much parallelism here could + // overload the database. Parallelism wouldn't buy us much anyway: these + // operations should generally be quick, there shouldn't be too many sagas + // outstanding, and Nexus has already crashed, so the sagas have already + // experienced some latency. + let mut completion_futures = Vec::with_capacity(found_sagas.len()); + for saga in found_sagas { + // TODO-debugging want visibility into sagas that we cannot recover for + // whatever reason + let saga_id: steno::SagaId = saga.id.into(); + let saga_name = saga.name.clone(); + let saga_logger = opctx.log.new(o!( + "saga_name" => saga_name, + "saga_id" => saga_id.to_string() + )); + info!(&saga_logger, "recovering saga: start"); + match recover_saga( + &opctx, + &saga_logger, + make_context, + datastore, + sec_client, + Arc::clone(&registry), + saga, + ) .await - .unwrap(); - - info!(&opctx.log, "listed sagas ({} total)", found_sagas.len()); - - let recovery_futures = found_sagas.into_iter().map(|saga| async { - // TODO-robustness We should put this into a retry loop. We may - // also want to take any failed sagas and put them at the end of the - // queue. It shouldn't really matter, in that the transient - // failures here are likely to affect recovery of all sagas. - // However, it's conceivable we misclassify a permanent failure as a - // transient failure, or that a transient failure is more likely to - // affect some sagas than others (e.g, data on a different node, or - // it has a larger log that requires more queries). To avoid one - // bad saga ruining the rest, we should try to recover the rest - // before we go back to one that's failed. - // TODO-debugging want visibility into "abandoned" sagas - let saga_id: steno::SagaId = saga.id.into(); - recover_saga( - &opctx, - Arc::clone(&uctx), - &datastore, - &sec_client, - Arc::clone(&registry), - saga, - ) - .map_err(|error| { - warn!( - &opctx.log, - "failed to recover saga {}: {:#}", saga_id, error + { + Ok(completion_future) => { + info!(&saga_logger, "recovered saga"); + completion_futures.push(completion_future); + } + Err(error) => { + // It's essential that we not bail out early just because we hit + // an error here. We want to recover all the sagas that we can. + error!( + &saga_logger, + "failed to recover saga"; + slog_error_chain::InlineErrorChain::new(&error), ); - error - }) - .await - }); - - let mut completion_futures = Vec::with_capacity(recovery_futures.len()); - // Loads and resumes all sagas in serial. - for recovery_future in recovery_futures { - let saga_complete_future = recovery_future.await?; - completion_futures.push(saga_complete_future); + } } - // Returns a future that awaits the completion of all resumed sagas. - Ok(CompletionTask(Box::pin(async move { - futures::future::try_join_all(completion_futures).await?; - Ok(()) - }))) - }); - - RecoveryTask(Box::pin(async move { - // Unwraps join-related errors. - join_handle.await.unwrap() - })) + } + + // Returns a future that awaits the completion of all resumed sagas.
+ Ok(CompletionTask(Box::pin(async move { + futures::future::try_join_all(completion_futures).await?; + Ok(()) + }))) } /// Queries the database to return a list of uncompleted sagas assigned to SEC @@ -233,7 +187,10 @@ async fn list_unfinished_sagas( /// and does not need to be polled. async fn recover_saga<'a, T>( opctx: &'a OpContext, - uctx: Arc<T::ExecContextType>, + saga_logger: &slog::Logger, + make_context: &(dyn Fn(&slog::Logger) -> Arc<T::ExecContextType> + + Send + + Sync), datastore: &'a db::DataStore, sec_client: &'a steno::SecClient, registry: Arc<steno::ActionRegistry<T>>, @@ -246,27 +203,11 @@ where T: steno::SagaType, { let saga_id: steno::SagaId = saga.id.into(); - let saga_name = saga.name.clone(); - trace!(opctx.log, "recovering saga: start"; - "saga_id" => saga_id.to_string(), - "saga_name" => saga_name.clone(), - ); - let log_events = load_saga_log(&opctx, datastore, &saga).await?; - trace!( - opctx.log, - "recovering saga: loaded log"; - "saga_id" => ?saga_id, - "saga_name" => saga_name.clone() - ); + trace!(&saga_logger, "recovering saga: loaded log"); + let uctx = make_context(&saga_logger); let saga_completion = sec_client - .saga_resume( - saga_id, - Arc::clone(&uctx), - saga.saga_dag, - registry, - log_events, - ) + .saga_resume(saga_id, uctx, saga.saga_dag, registry, log_events) .await .map_err(|error| { // TODO-robustness We want to differentiate between retryable and @@ -276,6 +217,7 @@ where error )) })?; + trace!(&saga_logger, "recovering saga: starting the saga"); sec_client.saga_start(saga_id).await.map_err(|error| { Error::internal_error(&format!("failed to start saga: {:#}", error)) })?; @@ -519,11 +461,11 @@ mod test { // Recover the saga, observing that it re-runs operations and completes. let sec_client = Arc::new(sec_client); recover( - opctx, + &opctx, sec_id, - uctx.clone(), - db_datastore, - sec_client.clone(), + &|_| uctx.clone(), + &db_datastore, + &sec_client, registry_create(), ) .await // Await the loading and resuming of the sagas @@ -584,11 +526,11 @@ mod test { // Recover the saga, observing that it does not replay the nodes. let sec_client = Arc::new(sec_client); recover( - opctx, + &opctx, sec_id, - uctx.clone(), - db_datastore, - sec_client.clone(), + &|_| uctx.clone(), + &db_datastore, + &sec_client, registry_create(), ) .await diff --git a/nexus/examples/config.toml b/nexus/examples/config.toml index 407f5479d5..06c726a602 100644 --- a/nexus/examples/config.toml +++ b/nexus/examples/config.toml @@ -120,6 +120,7 @@ instance_watcher.period_secs = 30 service_firewall_propagation.period_secs = 300 v2p_mapping_propagation.period_secs = 30 abandoned_vmm_reaper.period_secs = 60 +saga_recovery.period_secs = 600 [default_region_allocation_strategy] # allocate region on 3 random distinct zpools, on 3 random distinct sleds.
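Reviewer note (illustrative, not part of the diff): the API change above is easiest to see from a call site. The sketch below is a hypothetical caller (`recover_and_wait` does not exist in this change) written against the new signature, mirroring how the updated tests invoke `recover()`: loading and resuming now happen inline in the async fn, and the returned `CompletionTask` is awaited separately only if the caller cares when the resumed sagas finish.

```rust
// Hypothetical wrapper, shown only to illustrate the new `recover()` shape.
// Assumes the nexus-db-queries, steno, slog, and omicron-common crates as
// dependencies; it is generic over the saga type, just like `recover()`.
use std::sync::Arc;

use nexus_db_queries::context::OpContext;
use nexus_db_queries::db;
use omicron_common::api::external::Error;

async fn recover_and_wait<T: steno::SagaType>(
    opctx: &OpContext,
    sec_id: db::SecId,
    make_context: &(dyn Fn(&slog::Logger) -> Arc<T::ExecContextType>
          + Send
          + Sync),
    datastore: &db::DataStore,
    sec_client: &steno::SecClient,
    registry: Arc<steno::ActionRegistry<T>>,
) -> Result<(), Error> {
    // Load and resume every unfinished saga assigned to `sec_id`.  This now
    // runs to completion right here rather than inside a spawned RecoveryTask.
    let completion = db::recover(
        opctx, sec_id, make_context, datastore, sec_client, registry,
    )
    .await?;

    // The CompletionTask resolves once the resumed sagas themselves finish.
    // A caller that only needs recovery to have started can drop it instead.
    completion.await
}
```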
diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index 3c66d3242b..a5573c77cc 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -105,6 +105,7 @@ use super::tasks::phantom_disks; use super::tasks::physical_disk_adoption; use super::tasks::region_replacement; use super::tasks::region_replacement_driver; +use super::tasks::saga_recovery; use super::tasks::service_firewall_rules; use super::tasks::sync_service_zone_nat::ServiceZoneNatTracker; use super::tasks::sync_switch_configuration::SwitchPortSettingsManager; @@ -114,6 +115,8 @@ use super::Activator; use super::Driver; use crate::app::oximeter::PRODUCER_LEASE_DURATION; use crate::app::saga::StartSaga; +use crate::app::sagas::ActionRegistry; +use crate::Nexus; use nexus_config::BackgroundTaskConfig; use nexus_config::DnsTasksConfig; use nexus_db_model::DnsGroup; @@ -152,6 +155,7 @@ pub struct BackgroundTasks { pub task_service_firewall_propagation: Activator, pub task_abandoned_vmm_reaper: Activator, pub task_vpc_route_manager: Activator, + pub task_saga_recovery: Activator, // Handles to activate background tasks that do not get used by Nexus // at-large. These background tasks are implementation details as far as @@ -229,6 +233,7 @@ impl BackgroundTasksInitializer { task_service_firewall_propagation: Activator::new(), task_abandoned_vmm_reaper: Activator::new(), task_vpc_route_manager: Activator::new(), + task_saga_recovery: Activator::new(), task_internal_dns_propagation: Activator::new(), task_external_dns_propagation: Activator::new(), @@ -243,22 +248,20 @@ impl BackgroundTasksInitializer { /// /// This function will wire up the `Activator`s in `background_tasks` to the /// corresponding tasks once they've been started. - #[allow(clippy::too_many_arguments)] pub fn start( self, background_tasks: &'_ BackgroundTasks, - opctx: OpContext, - datastore: Arc<DataStore>, - config: BackgroundTaskConfig, - rack_id: Uuid, - nexus_id: Uuid, - resolver: internal_dns::resolver::Resolver, - sagas: Arc<dyn StartSaga>, - producer_registry: ProducerRegistry, + args: BackgroundTasksData, ) -> Driver { let mut driver = self.driver; - let opctx = &opctx; - let producer_registry = &producer_registry; + let opctx = &args.opctx; + let datastore = args.datastore; + let config = args.config; + let rack_id = args.rack_id; + let nexus_id = args.nexus_id; + let resolver = args.resolver; + let sagas = args.saga_starter; + let producer_registry = &args.producer_registry; // This "let" construction helps catch mistakes where someone forgets to // wire up an activator to its corresponding background task. @@ -288,6 +291,7 @@ impl BackgroundTasksInitializer { task_service_firewall_propagation, task_abandoned_vmm_reaper, task_vpc_route_manager, + task_saga_recovery, // Add new background tasks here. Be sure to use this binding in a // call to `Driver::register()` below. That's what actually wires // up the Activator to the corresponding background task.
@@ -640,17 +644,70 @@ impl BackgroundTasksInitializer { by their instances", period: config.abandoned_vmm_reaper.period_secs, task_impl: Box::new(abandoned_vmm_reaper::AbandonedVmmReaper::new( - datastore, + datastore.clone(), )), opctx: opctx.child(BTreeMap::new()), watchers: vec![], activator: task_abandoned_vmm_reaper, }); + // Background task: saga recovery + { + let task_impl = Box::new(saga_recovery::SagaRecovery::new( + datastore, + args.nexus_id, + args.saga_recovery_opctx, + args.saga_recovery_nexus, + args.saga_recovery_sec, + args.saga_recovery_registry, + )); + + driver.register(TaskDefinition { + name: "saga_recovery", + description: "recovers sagas assigned to this Nexus", + period: config.saga_recovery.period_secs, + task_impl, + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_saga_recovery, + }); + } + driver } } +pub struct BackgroundTasksData { + /// root `OpContext` used for background tasks + pub opctx: OpContext, + /// handle to `DataStore`, provided directly to many background tasks + pub datastore: Arc<DataStore>, + /// background task configuration + pub config: BackgroundTaskConfig, + /// rack identifier + pub rack_id: Uuid, + /// nexus identifier + pub nexus_id: Uuid, + /// internal DNS resolver, used when tasks need to contact other + /// internal services + pub resolver: internal_dns::resolver::Resolver, + /// handle to saga subsystem for starting sagas + pub saga_starter: Arc<dyn StartSaga>, + /// Oximeter producer registry (for metrics) + pub producer_registry: ProducerRegistry, + + /// `OpContext` used for carrying out saga recovery + /// + /// This may have fewer privileges than `opctx`. + pub saga_recovery_opctx: OpContext, + /// handle to `Nexus`, used to construct `SagaContext`s for recovered sagas + pub saga_recovery_nexus: Arc<Nexus>, + /// handle to Steno SEC client, used to recover sagas + pub saga_recovery_sec: Arc<steno::SecClient>, + /// Steno (saga) action registry + pub saga_recovery_registry: Arc<ActionRegistry>, +} + /// Starts the three DNS-propagation-related background tasks for either /// internal or external DNS (depending on the arguments) #[allow(clippy::too_many_arguments)] diff --git a/nexus/src/app/background/mod.rs b/nexus/src/app/background/mod.rs index 1bd7a323c3..160bcb39af 100644 --- a/nexus/src/app/background/mod.rs +++ b/nexus/src/app/background/mod.rs @@ -136,6 +136,7 @@ mod tasks; pub use driver::Activator; pub use driver::Driver; pub use init::BackgroundTasks; +pub use init::BackgroundTasksData; pub use init::BackgroundTasksInitializer; use futures::future::BoxFuture; diff --git a/nexus/src/app/background/tasks/mod.rs b/nexus/src/app/background/tasks/mod.rs index cb2ab46c2a..08b6f990ed 100644 --- a/nexus/src/app/background/tasks/mod.rs +++ b/nexus/src/app/background/tasks/mod.rs @@ -22,6 +22,7 @@ pub mod phantom_disks; pub mod physical_disk_adoption; pub mod region_replacement; pub mod region_replacement_driver; +pub mod saga_recovery; pub mod service_firewall_rules; pub mod sync_service_zone_nat; pub mod sync_switch_configuration; diff --git a/nexus/src/app/background/tasks/saga_recovery.rs b/nexus/src/app/background/tasks/saga_recovery.rs new file mode 100644 index 0000000000..784f0bbfcb --- /dev/null +++ b/nexus/src/app/background/tasks/saga_recovery.rs @@ -0,0 +1,112 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//!
Background task for recovering sagas assigned to this Nexus + +use crate::app::background::BackgroundTask; +use crate::app::sagas::ActionRegistry; +use crate::saga_interface::SagaContext; +use crate::Nexus; +use futures::future::BoxFuture; +use futures::FutureExt; +use nexus_db_queries::context::OpContext; +use nexus_db_queries::db::DataStore; +use serde_json::json; +use std::sync::Arc; +use uuid::Uuid; + +/// Background task that recovers sagas assigned to this Nexus +/// +/// Normally, this task only does anything of note once, when Nexus starts up. +/// However, it runs periodically and can be activated explicitly for the rare +/// case when a saga has been re-assigned to this Nexus (e.g., because some +/// other Nexus has been expunged). +pub struct SagaRecovery { + datastore: Arc<DataStore>, + /// Unique identifier for this Saga Execution Coordinator + /// + /// This always matches the Nexus id. + sec_id: nexus_db_queries::db::SecId, + /// OpContext used for saga recovery + saga_recovery_opctx: OpContext, + nexus: Arc<Nexus>, + sec: Arc<steno::SecClient>, + registry: Arc<ActionRegistry>, +} + +impl SagaRecovery { + pub fn new( + datastore: Arc<DataStore>, + sec_id: Uuid, + saga_recovery_opctx: OpContext, + nexus: Arc<Nexus>, + sec: Arc<steno::SecClient>, + registry: Arc<ActionRegistry>, + ) -> SagaRecovery { + SagaRecovery { + datastore, + sec_id: nexus_db_queries::db::SecId(sec_id), + saga_recovery_opctx, + nexus, + sec, + registry, + } + } +} + +impl BackgroundTask for SagaRecovery { + fn activate<'a>( + &'a mut self, + opctx: &'a OpContext, + ) -> BoxFuture<'a, serde_json::Value> { + async { + // XXX-dap We need to modify `recover_saga()` to handle the case + // that we've already recovered this saga or we're already running + // it. This should include cases where: + // - we recovered in a previous run + // - we already started it (and maybe finished and are trying to + // write out saga log entries reflecting that) + // - including the case where it actually finishes in between when + // we listed it and when we tried to recover it -- this is + // important since the SEC might not remember it any more! + // - we want tests for all of this, which we might be able to + // simulate by manually messing with the same SecClient + // XXX-dap it'd be nice if this function returned: + // - a list of sagas that it successfully recovered + // - a list of sagas whose recovery failed (for a reason other than + // "we were already working on it") + // Then this background task's status could report: + // - the N most recently-recovered sagas + // - up to M sagas we failed to recover, and why + // - how many we recovered in the last go-around + // XXX-dap review logging around all of this + + nexus_db_queries::db::recover( + &self.saga_recovery_opctx, + self.sec_id, + &|saga_logger| { + // The extra `Arc` is a little ridiculous. The problem is + // that Steno expects (in `sec_client.saga_resume()`) that + // the user-defined context will be wrapped in an `Arc`. + // But we already use `Arc<SagaContext>` for our type. + // Hence we need two Arcs.
+ Arc::new(Arc::new(SagaContext::new( + self.nexus.clone(), + saga_logger.clone(), + ))) + }, + &self.datastore, + &self.sec, + self.registry.clone(), + ) + .await; + + // XXX-dap + serde_json::Value::Null + } + .boxed() + } +} + +// XXX-dap TODO-coverage diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index cee62f1107..3b494298ca 100644 --- a/nexus/src/app/mod.rs +++ b/nexus/src/app/mod.rs @@ -10,7 +10,6 @@ use crate::app::oximeter::LazyTimeseriesClient; use crate::populate::populate_start; use crate::populate::PopulateArgs; use crate::populate::PopulateStatus; -use crate::saga_interface::SagaContext; use crate::DropshotServer; use ::oximeter::types::ProducerRegistry; use anyhow::anyhow; @@ -91,6 +90,7 @@ pub(crate) mod sagas; pub(crate) use nexus_db_queries::db::queries::disk::MAX_DISKS_PER_INSTANCE; +use crate::app::background::BackgroundTasksData; use nexus_db_model::AllSchemaVersions; pub(crate) use nexus_db_model::MAX_NICS_PER_INSTANCE; @@ -135,9 +135,6 @@ pub struct Nexus { /// saga execution coordinator sagas: Arc<SagaExecutor>, - /// Task representing completion of recovered Sagas - recovery_task: std::sync::Mutex<Option<db::RecoveryTask>>, - /// External dropshot servers external_server: std::sync::Mutex<Option<DropshotServer>>, @@ -420,7 +417,6 @@ impl Nexus { db_datastore: Arc::clone(&db_datastore), authz: Arc::clone(&authz), sagas, - recovery_task: std::sync::Mutex::new(None), external_server: std::sync::Mutex::new(None), techport_external_server: std::sync::Mutex::new(None), internal_server: std::sync::Mutex::new(None), @@ -462,26 +458,12 @@ impl Nexus { // TODO-cleanup all the extra Arcs here seems wrong let nexus = Arc::new(nexus); nexus.sagas.set_nexus(nexus.clone()); - let opctx = OpContext::for_background( + let saga_recovery_opctx = OpContext::for_background( log.new(o!("component" => "SagaRecoverer")), Arc::clone(&authz), authn::Context::internal_saga_recovery(), Arc::clone(&db_datastore) as Arc, ); - let saga_logger = nexus.log.new(o!("saga_type" => "recovery")); - let recovery_task = db::recover( - opctx, - my_sec_id, - Arc::new(Arc::new(SagaContext::new( - Arc::clone(&nexus), - saga_logger, - ))), - Arc::clone(&db_datastore), - Arc::clone(&sec_client), - sagas::ACTION_REGISTRY.clone(), - ); - - *nexus.recovery_task.lock().unwrap() = Some(recovery_task); // Wait to start background tasks until after the populate step // finishes.
Among other things, the populate step installs role @@ -508,14 +490,21 @@ impl Nexus { let driver = background_tasks_initializer.start( &task_nexus.background_tasks, - background_ctx, - db_datastore, - task_config.pkg.background_tasks, - rack_id, - task_config.deployment.id, - resolver, - task_nexus.sagas.clone(), - task_registry, + BackgroundTasksData { + opctx: background_ctx, + datastore: db_datastore, + config: task_config.pkg.background_tasks, + rack_id, + nexus_id: task_config.deployment.id, + resolver, + saga_starter: task_nexus.sagas.clone(), + producer_registry: task_registry, + + saga_recovery_opctx, + saga_recovery_nexus: task_nexus.clone(), + saga_recovery_sec: sec_client.clone(), + saga_recovery_registry: sagas::ACTION_REGISTRY.clone(), + }, ); if let Err(_) = task_nexus.background_tasks_driver.set(driver) { diff --git a/nexus/tests/config.test.toml b/nexus/tests/config.test.toml index f90a035de6..f01f54dfb6 100644 --- a/nexus/tests/config.test.toml +++ b/nexus/tests/config.test.toml @@ -119,6 +119,7 @@ instance_watcher.period_secs = 30 service_firewall_propagation.period_secs = 300 v2p_mapping_propagation.period_secs = 30 abandoned_vmm_reaper.period_secs = 60 +saga_recovery.period_secs = 600 [default_region_allocation_strategy] # we only have one sled in the test environment, so we need to use the diff --git a/smf/nexus/multi-sled/config-partial.toml b/smf/nexus/multi-sled/config-partial.toml index e63eb411c3..0f20fc70a3 100644 --- a/smf/nexus/multi-sled/config-partial.toml +++ b/smf/nexus/multi-sled/config-partial.toml @@ -61,6 +61,7 @@ service_firewall_propagation.period_secs = 300 v2p_mapping_propagation.period_secs = 30 instance_watcher.period_secs = 30 abandoned_vmm_reaper.period_secs = 60 +saga_recovery.period_secs = 600 [default_region_allocation_strategy] # by default, allocate across 3 distinct sleds diff --git a/smf/nexus/single-sled/config-partial.toml b/smf/nexus/single-sled/config-partial.toml index ced1da17b3..6f0b9fc22b 100644 --- a/smf/nexus/single-sled/config-partial.toml +++ b/smf/nexus/single-sled/config-partial.toml @@ -61,6 +61,7 @@ service_firewall_propagation.period_secs = 300 v2p_mapping_propagation.period_secs = 30 instance_watcher.period_secs = 30 abandoned_vmm_reaper.period_secs = 60 +saga_recovery.period_secs = 600 [default_region_allocation_strategy] # by default, allocate without requirement for distinct sleds.
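Closing note (illustrative, not part of the diff): the `saga_recovery.period_secs` keys added to the TOML files above are deserialized through the new `SagaRecoveryConfig` via `serde_with`. A minimal standalone sketch of that plumbing, using stand-in struct names rather than the real `nexus-config` types, and assuming the serde, serde_with (with its macros feature), and toml crates as dependencies:

```rust
// Stand-in types mirroring SagaRecoveryConfig; not part of this change.
use serde::Deserialize;
use serde_with::{serde_as, DurationSeconds};
use std::time::Duration;

#[serde_as]
#[derive(Debug, Deserialize, Eq, PartialEq)]
struct SagaRecoveryConfigSketch {
    /// period (in seconds) for periodic activations of the background task
    #[serde_as(as = "DurationSeconds<u64>")]
    period_secs: Duration,
}

#[derive(Debug, Deserialize)]
struct PartialConfigSketch {
    saga_recovery: SagaRecoveryConfigSketch,
}

fn main() {
    // Same key the config files above add; 600 seconds is the 10-minute
    // period used in the example and SMF configs.
    let parsed: PartialConfigSketch =
        toml::from_str("saga_recovery.period_secs = 600").unwrap();
    assert_eq!(parsed.saga_recovery.period_secs, Duration::from_secs(600));
}
```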