diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs index 5af75fac8f..6d7152d9f7 100644 --- a/dev-tools/omdb/src/bin/omdb/nexus.rs +++ b/dev-tools/omdb/src/bin/omdb/nexus.rs @@ -34,6 +34,7 @@ use nexus_saga_recovery::LastPass; use nexus_types::deployment::Blueprint; use nexus_types::internal_api::background::LookupRegionPortStatus; use nexus_types::internal_api::background::RegionReplacementDriverStatus; +use nexus_types::internal_api::background::RegionSnapshotReplacementFinishStatus; use nexus_types::internal_api::background::RegionSnapshotReplacementGarbageCollectStatus; use nexus_types::internal_api::background::RegionSnapshotReplacementStartStatus; use nexus_types::internal_api::background::RegionSnapshotReplacementStepStatus; @@ -1612,6 +1613,30 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) { } } } + } else if name == "region_snapshot_replacement_finish" { + match serde_json::from_value::( + details.clone(), + ) { + Err(error) => eprintln!( + "warning: failed to interpret task details: {:?}: {:?}", + error, details + ), + + Ok(status) => { + println!( + " total records transitioned to done: {}", + status.records_set_to_done.len(), + ); + for line in &status.records_set_to_done { + println!(" > {line}"); + } + + println!(" errors: {}", status.errors.len()); + for line in &status.errors { + println!(" > {line}"); + } + } + } } else { println!( "warning: unknown background task: {:?} \ diff --git a/dev-tools/omdb/tests/env.out b/dev-tools/omdb/tests/env.out index 2774a5d734..c57a9c9dce 100644 --- a/dev-tools/omdb/tests/env.out +++ b/dev-tools/omdb/tests/env.out @@ -127,6 +127,10 @@ task: "region_replacement_driver" drive region replacements forward to completion +task: "region_snapshot_replacement_finish" + complete a region snapshot replacement if all the steps are done + + task: "region_snapshot_replacement_garbage_collection" clean up all region snapshot replacement step volumes @@ -289,6 +293,10 @@ task: "region_replacement_driver" drive region replacements forward to completion +task: "region_snapshot_replacement_finish" + complete a region snapshot replacement if all the steps are done + + task: "region_snapshot_replacement_garbage_collection" clean up all region snapshot replacement step volumes @@ -438,6 +446,10 @@ task: "region_replacement_driver" drive region replacements forward to completion +task: "region_snapshot_replacement_finish" + complete a region snapshot replacement if all the steps are done + + task: "region_snapshot_replacement_garbage_collection" clean up all region snapshot replacement step volumes diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out index 757b4e8888..e7720dd12c 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -343,6 +343,10 @@ task: "region_replacement_driver" drive region replacements forward to completion +task: "region_snapshot_replacement_finish" + complete a region snapshot replacement if all the steps are done + + task: "region_snapshot_replacement_garbage_collection" clean up all region snapshot replacement step volumes @@ -594,6 +598,14 @@ task: "region_replacement_driver" number of region replacement finish sagas started ok: 0 number of errors: 0 +task: "region_snapshot_replacement_finish" + configured period: every s + currently executing: no + last completed activation: , triggered by a periodic timer firing + started at (s ago) and ran for ms + total records transitioned to done: 0 + errors: 0 + task: "region_snapshot_replacement_garbage_collection" configured period: every s currently executing: no @@ -1012,6 +1024,14 @@ task: "region_replacement_driver" number of region replacement finish sagas started ok: 0 number of errors: 0 +task: "region_snapshot_replacement_finish" + configured period: every s + currently executing: no + last completed activation: , triggered by a periodic timer firing + started at (s ago) and ran for ms + total records transitioned to done: 0 + errors: 0 + task: "region_snapshot_replacement_garbage_collection" configured period: every s currently executing: no diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index b3d189691c..7f2726cc59 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -398,6 +398,9 @@ pub struct BackgroundTaskConfig { RegionSnapshotReplacementGarbageCollectionConfig, /// configuration for region snapshot replacement step task pub region_snapshot_replacement_step: RegionSnapshotReplacementStepConfig, + /// configuration for region snapshot replacement finisher task + pub region_snapshot_replacement_finish: + RegionSnapshotReplacementFinishConfig, } #[serde_as] @@ -658,6 +661,14 @@ pub struct RegionSnapshotReplacementStepConfig { pub period_secs: Duration, } +#[serde_as] +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct RegionSnapshotReplacementFinishConfig { + /// period (in seconds) for periodic activations of this background task + #[serde_as(as = "DurationSeconds")] + pub period_secs: Duration, +} + /// Configuration for a nexus server #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct PackageConfig { @@ -908,6 +919,7 @@ mod test { region_snapshot_replacement_start.period_secs = 30 region_snapshot_replacement_garbage_collection.period_secs = 30 region_snapshot_replacement_step.period_secs = 30 + region_snapshot_replacement_finish.period_secs = 30 [default_region_allocation_strategy] type = "random" seed = 0 @@ -1082,6 +1094,10 @@ mod test { RegionSnapshotReplacementStepConfig { period_secs: Duration::from_secs(30), }, + region_snapshot_replacement_finish: + RegionSnapshotReplacementFinishConfig { + period_secs: Duration::from_secs(30), + }, }, default_region_allocation_strategy: crate::nexus_config::RegionAllocationStrategy::Random { @@ -1161,6 +1177,7 @@ mod test { region_snapshot_replacement_start.period_secs = 30 region_snapshot_replacement_garbage_collection.period_secs = 30 region_snapshot_replacement_step.period_secs = 30 + region_snapshot_replacement_finish.period_secs = 30 [default_region_allocation_strategy] type = "random" "##, diff --git a/nexus/examples/config-second.toml b/nexus/examples/config-second.toml index e63b155fc6..4c181ef3a2 100644 --- a/nexus/examples/config-second.toml +++ b/nexus/examples/config-second.toml @@ -142,6 +142,7 @@ lookup_region_port.period_secs = 60 region_snapshot_replacement_start.period_secs = 30 region_snapshot_replacement_garbage_collection.period_secs = 30 region_snapshot_replacement_step.period_secs = 30 +region_snapshot_replacement_finish.period_secs = 30 [default_region_allocation_strategy] # allocate region on 3 random distinct zpools, on 3 random distinct sleds. diff --git a/nexus/examples/config.toml b/nexus/examples/config.toml index bca3f7f2c4..d25408e6e3 100644 --- a/nexus/examples/config.toml +++ b/nexus/examples/config.toml @@ -128,6 +128,7 @@ lookup_region_port.period_secs = 60 region_snapshot_replacement_start.period_secs = 30 region_snapshot_replacement_garbage_collection.period_secs = 30 region_snapshot_replacement_step.period_secs = 30 +region_snapshot_replacement_finish.period_secs = 30 [default_region_allocation_strategy] # allocate region on 3 random distinct zpools, on 3 random distinct sleds. diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index ae4309d8f9..fedb74b81b 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -108,6 +108,7 @@ use super::tasks::phantom_disks; use super::tasks::physical_disk_adoption; use super::tasks::region_replacement; use super::tasks::region_replacement_driver; +use super::tasks::region_snapshot_replacement_finish::*; use super::tasks::region_snapshot_replacement_garbage_collect::*; use super::tasks::region_snapshot_replacement_start::*; use super::tasks::region_snapshot_replacement_step::*; @@ -167,6 +168,7 @@ pub struct BackgroundTasks { pub task_region_snapshot_replacement_start: Activator, pub task_region_snapshot_replacement_garbage_collection: Activator, pub task_region_snapshot_replacement_step: Activator, + pub task_region_snapshot_replacement_finish: Activator, // Handles to activate background tasks that do not get used by Nexus // at-large. These background tasks are implementation details as far as @@ -252,6 +254,7 @@ impl BackgroundTasksInitializer { task_region_snapshot_replacement_garbage_collection: Activator::new( ), task_region_snapshot_replacement_step: Activator::new(), + task_region_snapshot_replacement_finish: Activator::new(), task_internal_dns_propagation: Activator::new(), task_external_dns_propagation: Activator::new(), @@ -316,6 +319,7 @@ impl BackgroundTasksInitializer { task_region_snapshot_replacement_start, task_region_snapshot_replacement_garbage_collection, task_region_snapshot_replacement_step, + task_region_snapshot_replacement_finish, // Add new background tasks here. Be sure to use this binding in a // call to `Driver::register()` below. That's what actually wires // up the Activator to the corresponding background task. @@ -780,7 +784,7 @@ impl BackgroundTasksInitializer { replacement, and run the step saga for them", period: config.region_snapshot_replacement_step.period_secs, task_impl: Box::new(RegionSnapshotReplacementFindAffected::new( - datastore, + datastore.clone(), sagas.clone(), )), opctx: opctx.child(BTreeMap::new()), @@ -788,6 +792,20 @@ impl BackgroundTasksInitializer { activator: task_region_snapshot_replacement_step, }); + driver.register(TaskDefinition { + name: "region_snapshot_replacement_finish", + description: + "complete a region snapshot replacement if all the steps are \ + done", + period: config.region_snapshot_replacement_finish.period_secs, + task_impl: Box::new(RegionSnapshotReplacementFinishDetector::new( + datastore, + )), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_region_snapshot_replacement_finish, + }); + driver } } diff --git a/nexus/src/app/background/tasks/mod.rs b/nexus/src/app/background/tasks/mod.rs index 6089ba8d65..6cbba0a07b 100644 --- a/nexus/src/app/background/tasks/mod.rs +++ b/nexus/src/app/background/tasks/mod.rs @@ -25,6 +25,7 @@ pub mod phantom_disks; pub mod physical_disk_adoption; pub mod region_replacement; pub mod region_replacement_driver; +pub mod region_snapshot_replacement_finish; pub mod region_snapshot_replacement_garbage_collect; pub mod region_snapshot_replacement_start; pub mod region_snapshot_replacement_step; diff --git a/nexus/src/app/background/tasks/region_snapshot_replacement_finish.rs b/nexus/src/app/background/tasks/region_snapshot_replacement_finish.rs new file mode 100644 index 0000000000..134995d848 --- /dev/null +++ b/nexus/src/app/background/tasks/region_snapshot_replacement_finish.rs @@ -0,0 +1,332 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Background task for detecting when a region snapshot replacement has all its +//! steps done, and finishing it. +//! +//! Once all related region snapshot replacement steps are done, the region +//! snapshot replacement can be completed. + +use crate::app::background::BackgroundTask; +use futures::future::BoxFuture; +use futures::FutureExt; +use nexus_db_queries::context::OpContext; +use nexus_db_queries::db::DataStore; +use nexus_types::internal_api::background::RegionSnapshotReplacementFinishStatus; +use serde_json::json; +use std::sync::Arc; + +pub struct RegionSnapshotReplacementFinishDetector { + datastore: Arc, +} + +impl RegionSnapshotReplacementFinishDetector { + pub fn new(datastore: Arc) -> Self { + RegionSnapshotReplacementFinishDetector { datastore } + } + + async fn transition_requests_to_done( + &self, + opctx: &OpContext, + status: &mut RegionSnapshotReplacementFinishStatus, + ) { + let log = &opctx.log; + + // Find all region snapshot replacement requests in state "Running" + let requests = match self + .datastore + .get_running_region_snapshot_replacements(opctx) + .await + { + Ok(requests) => requests, + + Err(e) => { + let s = format!( + "query for region snapshot replacement requests \ + failed: {e}", + ); + error!(&log, "{s}"); + status.errors.push(s); + + return; + } + }; + + for request in requests { + // Count associated region snapshot replacement steps that are not + // completed. + let count = match self + .datastore + .in_progress_region_snapshot_replacement_steps( + opctx, request.id, + ) + .await + { + Ok(count) => count, + + Err(e) => { + let s = format!( + "counting incomplete region snapshot replacement \ + steps failed: {e}", + ); + error!(&log, "{s}"); + status.errors.push(s); + + continue; + } + }; + + if count == 0 { + // If the region snapshot has been deleted, then the snapshot + // replacement is done: the reference number went to zero and it + // was deleted, therefore there aren't any volumes left that + // reference it! + match self + .datastore + .region_snapshot_get( + request.old_dataset_id, + request.old_region_id, + request.old_snapshot_id, + ) + .await + { + Ok(Some(_)) => { + info!( + &log, + "region snapshot still exists"; + "request.old_dataset_id" => %request.old_dataset_id, + "request.old_region_id" => %request.old_region_id, + "request.old_snapshot_id" => %request.old_snapshot_id, + ); + continue; + } + + Ok(None) => { + // gone! + } + + Err(e) => { + let s = format!( + "error querying for region snapshot {} {} {}: {e}", + request.old_dataset_id, + request.old_region_id, + request.old_snapshot_id, + ); + error!(&log, "{s}"); + status.errors.push(s); + + continue; + } + }; + + // Transition region snapshot replacement to Complete + match self + .datastore + .set_region_snapshot_replacement_complete(opctx, request.id) + .await + { + Ok(()) => { + let s = format!("set request {} to done", request.id); + info!(&log, "{s}"); + status.records_set_to_done.push(s); + } + + Err(e) => { + let s = format!( + "marking snapshot replacement as done failed: {e}" + ); + error!(&log, "{s}"); + status.errors.push(s); + } + } + } + } + } +} + +impl BackgroundTask for RegionSnapshotReplacementFinishDetector { + fn activate<'a>( + &'a mut self, + opctx: &'a OpContext, + ) -> BoxFuture<'a, serde_json::Value> { + async move { + let mut status = RegionSnapshotReplacementFinishStatus::default(); + + self.transition_requests_to_done(opctx, &mut status).await; + + json!(status) + } + .boxed() + } +} + +#[cfg(test)] +mod test { + use super::*; + use nexus_db_model::RegionSnapshotReplacement; + use nexus_db_model::RegionSnapshotReplacementStep; + use nexus_db_model::RegionSnapshotReplacementStepState; + use nexus_test_utils_macros::nexus_test; + use uuid::Uuid; + + type ControlPlaneTestContext = + nexus_test_utils::ControlPlaneTestContext; + + #[nexus_test(server = crate::Server)] + async fn test_done_region_snapshot_replacement_causes_finish( + cptestctx: &ControlPlaneTestContext, + ) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.clone(), + datastore.clone(), + ); + + let mut task = + RegionSnapshotReplacementFinishDetector::new(datastore.clone()); + + // Noop test + let result: RegionSnapshotReplacementFinishStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + assert_eq!(result, RegionSnapshotReplacementFinishStatus::default()); + + // Add a region snapshot replacement request for a fake region snapshot. + + let dataset_id = Uuid::new_v4(); + let region_id = Uuid::new_v4(); + let snapshot_id = Uuid::new_v4(); + + // Do not add the fake region snapshot to the database, as it should + // have been deleted by the time the request transitions to "Running" + + let request = + RegionSnapshotReplacement::new(dataset_id, region_id, snapshot_id); + + let request_id = request.id; + + datastore + .insert_region_snapshot_replacement_request_with_volume_id( + &opctx, + request, + Uuid::new_v4(), + ) + .await + .unwrap(); + + // Transition that to Allocating -> ReplacementDone -> DeletingOldVolume + // -> Running + + let operating_saga_id = Uuid::new_v4(); + + datastore + .set_region_snapshot_replacement_allocating( + &opctx, + request_id, + operating_saga_id, + ) + .await + .unwrap(); + + let new_region_id = Uuid::new_v4(); + let old_snapshot_volume_id = Uuid::new_v4(); + + datastore + .set_region_snapshot_replacement_replacement_done( + &opctx, + request_id, + operating_saga_id, + new_region_id, + old_snapshot_volume_id, + ) + .await + .unwrap(); + + datastore + .set_region_snapshot_replacement_deleting_old_volume( + &opctx, + request_id, + operating_saga_id, + ) + .await + .unwrap(); + + datastore + .set_region_snapshot_replacement_running( + &opctx, + request_id, + operating_saga_id, + ) + .await + .unwrap(); + + // Insert a few steps, not all finished yet + + let operating_saga_id = Uuid::new_v4(); + + let mut step_1 = + RegionSnapshotReplacementStep::new(request_id, Uuid::new_v4()); + step_1.replacement_state = RegionSnapshotReplacementStepState::Complete; + step_1.operating_saga_id = Some(operating_saga_id); + let step_1_id = step_1.id; + + let mut step_2 = + RegionSnapshotReplacementStep::new(request_id, Uuid::new_v4()); + step_2.replacement_state = RegionSnapshotReplacementStepState::Complete; + step_2.operating_saga_id = Some(operating_saga_id); + let step_2_id = step_2.id; + + datastore + .insert_region_snapshot_replacement_step(&opctx, step_1) + .await + .unwrap(); + datastore + .insert_region_snapshot_replacement_step(&opctx, step_2) + .await + .unwrap(); + + // Activate the task, it should do nothing yet + + let result: RegionSnapshotReplacementFinishStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + assert_eq!(result, RegionSnapshotReplacementFinishStatus::default()); + + // Transition one record to Complete, the task should still do nothing + + datastore + .set_region_snapshot_replacement_step_volume_deleted( + &opctx, step_1_id, + ) + .await + .unwrap(); + + let result: RegionSnapshotReplacementFinishStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + assert_eq!(result, RegionSnapshotReplacementFinishStatus::default()); + + // Transition the other record to Complete + + datastore + .set_region_snapshot_replacement_step_volume_deleted( + &opctx, step_2_id, + ) + .await + .unwrap(); + + // Activate the task - it should pick the request up, change the state, + // and try to run the region snapshot replacement finish saga + let result: RegionSnapshotReplacementFinishStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + + assert_eq!( + result, + RegionSnapshotReplacementFinishStatus { + records_set_to_done: vec![format!( + "set request {request_id} to done" + )], + errors: vec![], + }, + ); + } +} diff --git a/nexus/src/app/background/tasks/region_snapshot_replacement_step.rs b/nexus/src/app/background/tasks/region_snapshot_replacement_step.rs index d78e304b75..cd13a56642 100644 --- a/nexus/src/app/background/tasks/region_snapshot_replacement_step.rs +++ b/nexus/src/app/background/tasks/region_snapshot_replacement_step.rs @@ -413,12 +413,6 @@ impl BackgroundTask for RegionSnapshotReplacementFindAffected { opctx: &'a OpContext, ) -> BoxFuture<'a, serde_json::Value> { async move { - let log = &opctx.log; - info!( - &log, - "region snapshot replacement find affected volumes task started" - ); - let mut status = RegionSnapshotReplacementStepStatus::default(); // Importantly, clean old steps up before finding affected volumes! @@ -436,11 +430,6 @@ impl BackgroundTask for RegionSnapshotReplacementFindAffected { self.invoke_step_saga_for_affected_volumes(opctx, &mut status) .await; - info!( - &log, - "region snapshot replacement find affected volumes task done" - ); - json!(status) } .boxed() diff --git a/nexus/tests/config.test.toml b/nexus/tests/config.test.toml index 6859e992ca..bd338469e0 100644 --- a/nexus/tests/config.test.toml +++ b/nexus/tests/config.test.toml @@ -140,6 +140,7 @@ instance_updater.period_secs = 60 region_snapshot_replacement_start.period_secs = 30 region_snapshot_replacement_garbage_collection.period_secs = 30 region_snapshot_replacement_step.period_secs = 30 +region_snapshot_replacement_finish.period_secs = 30 [default_region_allocation_strategy] # we only have one sled in the test environment, so we need to use the diff --git a/nexus/types/src/internal_api/background.rs b/nexus/types/src/internal_api/background.rs index e5fd35d1e3..6f6e80cb60 100644 --- a/nexus/types/src/internal_api/background.rs +++ b/nexus/types/src/internal_api/background.rs @@ -46,3 +46,10 @@ pub struct RegionSnapshotReplacementStepStatus { pub step_invoked_ok: Vec, pub errors: Vec, } + +/// The status of a `region_snapshot_replacement_finish` background task activation +#[derive(Serialize, Deserialize, Default, Debug, PartialEq, Eq)] +pub struct RegionSnapshotReplacementFinishStatus { + pub records_set_to_done: Vec, + pub errors: Vec, +} diff --git a/smf/nexus/multi-sled/config-partial.toml b/smf/nexus/multi-sled/config-partial.toml index f0f40d282e..30b8676785 100644 --- a/smf/nexus/multi-sled/config-partial.toml +++ b/smf/nexus/multi-sled/config-partial.toml @@ -68,6 +68,7 @@ instance_updater.period_secs = 30 region_snapshot_replacement_start.period_secs = 30 region_snapshot_replacement_garbage_collection.period_secs = 30 region_snapshot_replacement_step.period_secs = 30 +region_snapshot_replacement_finish.period_secs = 30 [default_region_allocation_strategy] # by default, allocate across 3 distinct sleds diff --git a/smf/nexus/single-sled/config-partial.toml b/smf/nexus/single-sled/config-partial.toml index 23340b3c36..1761d41698 100644 --- a/smf/nexus/single-sled/config-partial.toml +++ b/smf/nexus/single-sled/config-partial.toml @@ -68,6 +68,7 @@ instance_updater.period_secs = 30 region_snapshot_replacement_start.period_secs = 30 region_snapshot_replacement_garbage_collection.period_secs = 30 region_snapshot_replacement_step.period_secs = 30 +region_snapshot_replacement_finish.period_secs = 30 [default_region_allocation_strategy] # by default, allocate without requirement for distinct sleds.