diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs index db9e2cba52..d45865b4a7 100644 --- a/dev-tools/omdb/src/bin/omdb/nexus.rs +++ b/dev-tools/omdb/src/bin/omdb/nexus.rs @@ -36,6 +36,7 @@ use nexus_types::internal_api::background::LookupRegionPortStatus; use nexus_types::internal_api::background::RegionReplacementDriverStatus; use nexus_types::internal_api::background::RegionSnapshotReplacementGarbageCollectStatus; use nexus_types::internal_api::background::RegionSnapshotReplacementStartStatus; +use nexus_types::internal_api::background::RegionSnapshotReplacementStepStatus; use nexus_types::inventory::BaseboardId; use omicron_uuid_kinds::CollectionUuid; use omicron_uuid_kinds::DemoSagaUuid; @@ -1509,6 +1510,46 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) { println!(" > {line}"); } + println!(" errors: {}", status.errors.len()); + for line in &status.errors { + println!(" > {line}"); + } + } + } + } else if name == "region_snapshot_replacement_step" { + match serde_json::from_value::( + details.clone(), + ) { + Err(error) => eprintln!( + "warning: failed to interpret task details: {:?}: {:?}", + error, details + ), + + Ok(status) => { + println!( + " total step records created ok: {}", + status.step_records_created_ok.len(), + ); + for line in &status.step_records_created_ok { + println!(" > {line}"); + } + + println!( + " total step garbage collect saga invoked ok: {}", + status.step_garbage_collect_invoked_ok.len(), + ); + for line in &status.step_garbage_collect_invoked_ok { + println!(" > {line}"); + } + + println!( + " total step saga invoked ok: {}", + status.step_invoked_ok.len(), + ); + for line in &status.step_invoked_ok { + println!(" > {line}"); + } + println!(" errors: {}", status.errors.len()); for line in &status.errors { println!(" > {line}"); diff --git a/dev-tools/omdb/tests/env.out b/dev-tools/omdb/tests/env.out index ec407cd123..2774a5d734 100644 --- a/dev-tools/omdb/tests/env.out +++ b/dev-tools/omdb/tests/env.out @@ -135,6 +135,11 @@ task: "region_snapshot_replacement_start" detect if region snapshots need replacement and begin the process +task: "region_snapshot_replacement_step" + detect what volumes were affected by a region snapshot replacement, and run + the step saga for them + + task: "saga_recovery" recovers sagas assigned to this Nexus @@ -292,6 +297,11 @@ task: "region_snapshot_replacement_start" detect if region snapshots need replacement and begin the process +task: "region_snapshot_replacement_step" + detect what volumes were affected by a region snapshot replacement, and run + the step saga for them + + task: "saga_recovery" recovers sagas assigned to this Nexus @@ -436,6 +446,11 @@ task: "region_snapshot_replacement_start" detect if region snapshots need replacement and begin the process +task: "region_snapshot_replacement_step" + detect what volumes were affected by a region snapshot replacement, and run + the step saga for them + + task: "saga_recovery" recovers sagas assigned to this Nexus diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out index e939bfa864..757b4e8888 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -351,6 +351,11 @@ task: "region_snapshot_replacement_start" detect if region snapshots need replacement and begin the process +task: "region_snapshot_replacement_step" + detect what volumes were affected by a region snapshot replacement, and run + the step saga for them + + task: 
"saga_recovery" recovers sagas assigned to this Nexus @@ -606,6 +611,16 @@ task: "region_snapshot_replacement_start" total start saga invoked ok: 0 errors: 0 +task: "region_snapshot_replacement_step" + configured period: every s + currently executing: no + last completed activation: , triggered by a periodic timer firing + started at (s ago) and ran for ms + total step records created ok: 0 + total step garbage collect saga invoked ok: 0 + total step saga invoked ok: 0 + errors: 0 + task: "saga_recovery" configured period: every 10m currently executing: no @@ -1014,6 +1029,16 @@ task: "region_snapshot_replacement_start" total start saga invoked ok: 0 errors: 0 +task: "region_snapshot_replacement_step" + configured period: every s + currently executing: no + last completed activation: , triggered by a periodic timer firing + started at (s ago) and ran for ms + total step records created ok: 0 + total step garbage collect saga invoked ok: 0 + total step saga invoked ok: 0 + errors: 0 + task: "saga_recovery" configured period: every 10m currently executing: no diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index f6e60bb558..b3d189691c 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -396,6 +396,8 @@ pub struct BackgroundTaskConfig { /// configuration for region snapshot replacement garbage collection pub region_snapshot_replacement_garbage_collection: RegionSnapshotReplacementGarbageCollectionConfig, + /// configuration for region snapshot replacement step task + pub region_snapshot_replacement_step: RegionSnapshotReplacementStepConfig, } #[serde_as] @@ -648,6 +650,14 @@ pub struct RegionSnapshotReplacementGarbageCollectionConfig { pub period_secs: Duration, } +#[serde_as] +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct RegionSnapshotReplacementStepConfig { + /// period (in seconds) for periodic activations of this background task + #[serde_as(as = "DurationSeconds")] + pub period_secs: Duration, +} + /// Configuration for a nexus server #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct PackageConfig { @@ -897,6 +907,7 @@ mod test { lookup_region_port.period_secs = 60 region_snapshot_replacement_start.period_secs = 30 region_snapshot_replacement_garbage_collection.period_secs = 30 + region_snapshot_replacement_step.period_secs = 30 [default_region_allocation_strategy] type = "random" seed = 0 @@ -1067,6 +1078,10 @@ mod test { RegionSnapshotReplacementGarbageCollectionConfig { period_secs: Duration::from_secs(30), }, + region_snapshot_replacement_step: + RegionSnapshotReplacementStepConfig { + period_secs: Duration::from_secs(30), + }, }, default_region_allocation_strategy: crate::nexus_config::RegionAllocationStrategy::Random { @@ -1145,6 +1160,7 @@ mod test { lookup_region_port.period_secs = 60 region_snapshot_replacement_start.period_secs = 30 region_snapshot_replacement_garbage_collection.period_secs = 30 + region_snapshot_replacement_step.period_secs = 30 [default_region_allocation_strategy] type = "random" "##, diff --git a/nexus/examples/config-second.toml b/nexus/examples/config-second.toml index c87e1255b5..e63b155fc6 100644 --- a/nexus/examples/config-second.toml +++ b/nexus/examples/config-second.toml @@ -141,6 +141,7 @@ saga_recovery.period_secs = 600 lookup_region_port.period_secs = 60 region_snapshot_replacement_start.period_secs = 30 region_snapshot_replacement_garbage_collection.period_secs = 30 +region_snapshot_replacement_step.period_secs = 30 
[default_region_allocation_strategy] # allocate region on 3 random distinct zpools, on 3 random distinct sleds. diff --git a/nexus/examples/config.toml b/nexus/examples/config.toml index f844adccbe..bca3f7f2c4 100644 --- a/nexus/examples/config.toml +++ b/nexus/examples/config.toml @@ -127,6 +127,7 @@ saga_recovery.period_secs = 600 lookup_region_port.period_secs = 60 region_snapshot_replacement_start.period_secs = 30 region_snapshot_replacement_garbage_collection.period_secs = 30 +region_snapshot_replacement_step.period_secs = 30 [default_region_allocation_strategy] # allocate region on 3 random distinct zpools, on 3 random distinct sleds. diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index 37c276fa07..ae4309d8f9 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -110,6 +110,7 @@ use super::tasks::region_replacement; use super::tasks::region_replacement_driver; use super::tasks::region_snapshot_replacement_garbage_collect::*; use super::tasks::region_snapshot_replacement_start::*; +use super::tasks::region_snapshot_replacement_step::*; use super::tasks::saga_recovery; use super::tasks::service_firewall_rules; use super::tasks::sync_service_zone_nat::ServiceZoneNatTracker; @@ -165,6 +166,7 @@ pub struct BackgroundTasks { pub task_lookup_region_port: Activator, pub task_region_snapshot_replacement_start: Activator, pub task_region_snapshot_replacement_garbage_collection: Activator, + pub task_region_snapshot_replacement_step: Activator, // Handles to activate background tasks that do not get used by Nexus // at-large. These background tasks are implementation details as far as @@ -249,6 +251,7 @@ impl BackgroundTasksInitializer { task_region_snapshot_replacement_start: Activator::new(), task_region_snapshot_replacement_garbage_collection: Activator::new( ), + task_region_snapshot_replacement_step: Activator::new(), task_internal_dns_propagation: Activator::new(), task_external_dns_propagation: Activator::new(), @@ -312,6 +315,7 @@ impl BackgroundTasksInitializer { task_lookup_region_port, task_region_snapshot_replacement_start, task_region_snapshot_replacement_garbage_collection, + task_region_snapshot_replacement_step, // Add new background tasks here. Be sure to use this binding in a // call to `Driver::register()` below. That's what actually wires // up the Activator to the corresponding background task. 
@@ -761,7 +765,7 @@ impl BackgroundTasksInitializer { .region_snapshot_replacement_garbage_collection .period_secs, task_impl: Box::new(RegionSnapshotReplacementGarbageCollect::new( - datastore, + datastore.clone(), sagas.clone(), )), opctx: opctx.child(BTreeMap::new()), @@ -769,6 +773,21 @@ impl BackgroundTasksInitializer { activator: task_region_snapshot_replacement_garbage_collection, }); + driver.register(TaskDefinition { + name: "region_snapshot_replacement_step", + description: + "detect what volumes were affected by a region snapshot \ + replacement, and run the step saga for them", + period: config.region_snapshot_replacement_step.period_secs, + task_impl: Box::new(RegionSnapshotReplacementFindAffected::new( + datastore, + sagas.clone(), + )), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_region_snapshot_replacement_step, + }); + driver } } diff --git a/nexus/src/app/background/tasks/mod.rs b/nexus/src/app/background/tasks/mod.rs index 7ba68d0b80..6089ba8d65 100644 --- a/nexus/src/app/background/tasks/mod.rs +++ b/nexus/src/app/background/tasks/mod.rs @@ -27,6 +27,7 @@ pub mod region_replacement; pub mod region_replacement_driver; pub mod region_snapshot_replacement_garbage_collect; pub mod region_snapshot_replacement_start; +pub mod region_snapshot_replacement_step; pub mod saga_recovery; pub mod service_firewall_rules; pub mod sync_service_zone_nat; diff --git a/nexus/src/app/background/tasks/region_snapshot_replacement_step.rs b/nexus/src/app/background/tasks/region_snapshot_replacement_step.rs new file mode 100644 index 0000000000..d78e304b75 --- /dev/null +++ b/nexus/src/app/background/tasks/region_snapshot_replacement_step.rs @@ -0,0 +1,775 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Background task for detecting volumes affected by a region snapshot +//! replacement, creating records for those, and triggering the "step" saga for +//! them. +//! +//! After the region snapshot replacement start saga finishes, the snapshot's +//! volume is no longer in a degraded state: the requested read-only region was +//! cloned to a new region, and the reference was replaced in the construction +//! request. Any disk that is now created using the snapshot as a source will +//! work without issues. +//! +//! The problem now is volumes that still reference the replaced read-only +//! region, and any Upstairs constructed from a VCR that references that region. +//! This task's responsibility is to find all volumes that reference the +//! replaced read-only region, create a record for them, and trigger the region +//! snapshot replacement step saga. This is a much less involved process than +//! region replacement: no continuous monitoring and driving is required. See +//! the "region snapshot replacement step" saga's docstring for more +//! information. 
+ +use crate::app::authn; +use crate::app::background::BackgroundTask; +use crate::app::saga::StartSaga; +use crate::app::sagas; +use crate::app::sagas::region_snapshot_replacement_step::*; +use crate::app::sagas::region_snapshot_replacement_step_garbage_collect::*; +use crate::app::sagas::NexusSaga; +use futures::future::BoxFuture; +use futures::FutureExt; +use nexus_db_model::RegionSnapshotReplacementStep; +use nexus_db_queries::context::OpContext; +use nexus_db_queries::db::DataStore; +use nexus_types::identity::Asset; +use nexus_types::internal_api::background::RegionSnapshotReplacementStepStatus; +use serde_json::json; +use std::sync::Arc; + +pub struct RegionSnapshotReplacementFindAffected { + datastore: Arc, + sagas: Arc, +} + +impl RegionSnapshotReplacementFindAffected { + pub fn new(datastore: Arc, sagas: Arc) -> Self { + RegionSnapshotReplacementFindAffected { datastore, sagas } + } + + async fn send_start_request( + &self, + opctx: &OpContext, + request: RegionSnapshotReplacementStep, + ) -> Result<(), omicron_common::api::external::Error> { + let params = sagas::region_snapshot_replacement_step::Params { + serialized_authn: authn::saga::Serialized::for_opctx(opctx), + request, + }; + + let saga_dag = SagaRegionSnapshotReplacementStep::prepare(¶ms)?; + self.sagas.saga_start(saga_dag).await + } + + async fn send_garbage_collect_request( + &self, + opctx: &OpContext, + request: RegionSnapshotReplacementStep, + ) -> Result<(), omicron_common::api::external::Error> { + let Some(old_snapshot_volume_id) = request.old_snapshot_volume_id + else { + // This state is illegal! + let s = format!( + "request {} old snapshot volume id is None!", + request.id, + ); + + return Err(omicron_common::api::external::Error::internal_error( + &s, + )); + }; + + let params = + sagas::region_snapshot_replacement_step_garbage_collect::Params { + serialized_authn: authn::saga::Serialized::for_opctx(opctx), + old_snapshot_volume_id, + request, + }; + + let saga_dag = + SagaRegionSnapshotReplacementStepGarbageCollect::prepare(¶ms)?; + self.sagas.saga_start(saga_dag).await + } + + async fn clean_up_region_snapshot_replacement_step_volumes( + &self, + opctx: &OpContext, + status: &mut RegionSnapshotReplacementStepStatus, + ) { + let log = &opctx.log; + + let requests = match self + .datastore + .region_snapshot_replacement_steps_requiring_garbage_collection( + opctx, + ) + .await + { + Ok(requests) => requests, + + Err(e) => { + let s = format!("querying for steps to collect failed! 
{e}"); + error!(&log, "{s}"); + status.errors.push(s); + return; + } + }; + + for request in requests { + let request_id = request.id; + + let result = + self.send_garbage_collect_request(opctx, request.clone()).await; + + match result { + Ok(()) => { + let s = format!( + "region snapshot replacement step garbage \ + collect request ok for {request_id}" + ); + + info!( + &log, + "{s}"; + "request.volume_id" => %request.volume_id, + "request.old_snapshot_volume_id" => ?request.old_snapshot_volume_id, + ); + status.step_garbage_collect_invoked_ok.push(s); + } + + Err(e) => { + let s = format!( + "sending region snapshot replacement step garbage \ + collect request failed: {e}", + ); + error!( + &log, + "{s}"; + "request.volume_id" => %request.volume_id, + "request.old_snapshot_volume_id" => ?request.old_snapshot_volume_id, + ); + status.errors.push(s); + } + } + } + } + + // Any request in state Running means that the target replacement has + // occurred already, meaning the region snapshot being replaced is not + // present as a target in the snapshot's volume construction request + // anymore. Any future usage of that snapshot (as a source for a disk or + // otherwise) will get a volume construction request that references the + // replacement read-only region. + // + // "step" records are created here for each volume found that still + // references the replaced region snapshot, most likely having been created + // by copying the snapshot's volume construction request before the target + // replacement occurred. These volumes also need to have target replacement + // performed, and this is captured in this "step" record. + async fn create_step_records_for_affected_volumes( + &self, + opctx: &OpContext, + status: &mut RegionSnapshotReplacementStepStatus, + ) { + let log = &opctx.log; + + // Find all region snapshot replacement requests in state "Running" + let requests = match self + .datastore + .get_running_region_snapshot_replacements(opctx) + .await + { + Ok(requests) => requests, + + Err(e) => { + let s = format!( + "get_running_region_snapshot_replacements failed: {e}", + ); + + error!(&log, "{s}"); + status.errors.push(s); + return; + } + }; + + for request in requests { + // Find all volumes that reference the replaced snapshot + let region_snapshot = match self + .datastore + .region_snapshot_get( + request.old_dataset_id, + request.old_region_id, + request.old_snapshot_id, + ) + .await + { + Ok(Some(region_snapshot)) => region_snapshot, + + Ok(None) => { + let s = format!( + "region snapshot {} {} {} not found!", + request.old_dataset_id, + request.old_region_id, + request.old_snapshot_id, + ); + error!(&log, "{s}"); + status.errors.push(s); + + continue; + } + + Err(e) => { + let s = format!( + "error querying for region snapshot {} {} {}: {e}", + request.old_dataset_id, + request.old_region_id, + request.old_snapshot_id, + ); + error!(&log, "{s}"); + status.errors.push(s); + + continue; + } + }; + + let snapshot_addr = match region_snapshot.snapshot_addr.parse() { + Ok(addr) => addr, + + Err(e) => { + let s = format!( + "region snapshot addr {} could not be parsed: {e}", + region_snapshot.snapshot_addr, + ); + error!(&log, "{s}"); + status.errors.push(s); + + continue; + } + }; + + let volumes = match self + .datastore + .find_volumes_referencing_socket_addr(&opctx, snapshot_addr) + .await + { + Ok(volumes) => volumes, + + Err(e) => { + let s = format!("error finding referenced volumes: {e}"); + error!( + log, + "{s}"; + "request id" => ?request.id, + ); + 
status.errors.push(s); + + continue; + } + }; + + for volume in volumes { + // Any volume referencing the old socket addr needs to be + // replaced. Create a "step" record for this. + // + // Note: this function returns a conflict error if there already + // exists a step record referencing this volume ID because a + // volume repair record is also created using that volume ID, + // and only one of those can exist for a given volume at a time. + // + // Also note: this function returns a conflict error if another + // step record references this volume id in the "old snapshot + // volume id" column - this is ok! Region snapshot replacement + // step records are created for some volume id, and a null old + // snapshot volume id: + // + // volume_id: references snapshot_addr + // old_snapshot_volume_id: null + // + // The region snapshot replacement step saga will create a + // volume to stash the reference to snapshot_addr, and then call + // `volume_replace_snapshot`. This will swap snapshot_addr + // reference into the old snapshot volume for later deletion: + // + // volume_id: does _not_ reference snapshot_addr anymore + // old_snapshot_volume_id: now references snapshot_addr + // + // If `find_volumes_referencing_socket_addr` is executed before + // that volume is deleted, it will return the old snapshot + // volume id above, and then this for loop tries to make a + // region snapshot replacement step record for it! + // + // Allowing a region snapshot replacement step record to be + // created in this case would mean that (depending on when the + // functions execute), an indefinite amount of work would be + // created, continually "moving" the snapshot_addr from + // temporary volume to temporary volume. + + match self + .datastore + .create_region_snapshot_replacement_step( + opctx, + request.id, + volume.id(), + ) + .await + { + Ok(step_request_id) => { + let s = format!("created {step_request_id}"); + info!( + log, + "{s}"; + "request id" => ?request.id, + "volume id" => ?volume.id(), + ); + status.step_records_created_ok.push(s); + } + + Err(e) => { + let s = format!("error creating step request: {e}"); + error!( + log, + "{s}"; + "request id" => ?request.id, + "volume id" => ?volume.id(), + ); + status.errors.push(s); + } + } + } + } + } + + async fn invoke_step_saga_for_affected_volumes( + &self, + opctx: &OpContext, + status: &mut RegionSnapshotReplacementStepStatus, + ) { + let log = &opctx.log; + + // Once all region snapshot replacement step records have been created, + // trigger sagas as appropriate. 
+ + let step_requests = match self + .datastore + .get_requested_region_snapshot_replacement_steps(opctx) + .await + { + Ok(step_requests) => step_requests, + + Err(e) => { + let s = format!( + "query for requested region snapshot replacement step \ + requests failed: {e}" + ); + error!(&log, "{s}"); + status.errors.push(s); + + return; + } + }; + + for request in step_requests { + let request_id = request.id; + + match self.send_start_request(opctx, request.clone()).await { + Ok(()) => { + let s = format!( + "region snapshot replacement step saga invoked ok for \ + {request_id}" + ); + + info!( + &log, + "{s}"; + "request.request_id" => %request.request_id, + "request.volume_id" => %request.volume_id, + ); + status.step_invoked_ok.push(s); + } + + Err(e) => { + let s = format!( + "invoking region snapshot replacement step saga for \ + {request_id} failed: {e}" + ); + + error!( + &log, + "{s}"; + "request.request_id" => %request.request_id, + "request.volume_id" => %request.volume_id, + ); + status.errors.push(s); + } + }; + } + } +} + +impl BackgroundTask for RegionSnapshotReplacementFindAffected { + fn activate<'a>( + &'a mut self, + opctx: &'a OpContext, + ) -> BoxFuture<'a, serde_json::Value> { + async move { + let log = &opctx.log; + info!( + &log, + "region snapshot replacement find affected volumes task started" + ); + + let mut status = RegionSnapshotReplacementStepStatus::default(); + + // Importantly, clean old steps up before finding affected volumes! + // Otherwise, will continue to find the snapshot in volumes to + // delete, and will continue to see conflicts in next function. + self.clean_up_region_snapshot_replacement_step_volumes( + opctx, + &mut status, + ) + .await; + + self.create_step_records_for_affected_volumes(opctx, &mut status) + .await; + + self.invoke_step_saga_for_affected_volumes(opctx, &mut status) + .await; + + info!( + &log, + "region snapshot replacement find affected volumes task done" + ); + + json!(status) + } + .boxed() + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::app::background::init::test::NoopStartSaga; + use nexus_db_model::RegionSnapshot; + use nexus_db_model::RegionSnapshotReplacement; + use nexus_db_model::RegionSnapshotReplacementStep; + use nexus_db_model::RegionSnapshotReplacementStepState; + use nexus_db_model::Volume; + use nexus_test_utils_macros::nexus_test; + use sled_agent_client::types::CrucibleOpts; + use sled_agent_client::types::VolumeConstructionRequest; + use uuid::Uuid; + + type ControlPlaneTestContext = + nexus_test_utils::ControlPlaneTestContext; + + async fn add_fake_volume_for_snapshot_addr( + datastore: &DataStore, + snapshot_addr: String, + ) -> Uuid { + let new_volume_id = Uuid::new_v4(); + + let volume_construction_request = VolumeConstructionRequest::Volume { + id: new_volume_id, + block_size: 0, + sub_volumes: vec![], + read_only_parent: Some(Box::new( + VolumeConstructionRequest::Region { + block_size: 0, + blocks_per_extent: 0, + extent_count: 0, + gen: 0, + opts: CrucibleOpts { + id: Uuid::new_v4(), + target: vec![snapshot_addr], + lossy: false, + flush_timeout: None, + key: None, + cert_pem: None, + key_pem: None, + root_cert_pem: None, + control: None, + read_only: true, + }, + }, + )), + }; + + let volume_data = + serde_json::to_string(&volume_construction_request).unwrap(); + + let volume = Volume::new(new_volume_id, volume_data); + + datastore.volume_create(volume).await.unwrap(); + + new_volume_id + } + + #[nexus_test(server = crate::Server)] + async fn 
test_region_snapshot_replacement_step_task( + cptestctx: &ControlPlaneTestContext, + ) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.clone(), + datastore.clone(), + ); + + let starter = Arc::new(NoopStartSaga::new()); + let mut task = RegionSnapshotReplacementFindAffected::new( + datastore.clone(), + starter.clone(), + ); + + // Noop test + let result: RegionSnapshotReplacementStepStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + assert_eq!(result, RegionSnapshotReplacementStepStatus::default()); + assert_eq!(starter.count_reset(), 0); + + // Add a region snapshot replacement request for a fake region snapshot. + + let dataset_id = Uuid::new_v4(); + let region_id = Uuid::new_v4(); + let snapshot_id = Uuid::new_v4(); + let snapshot_addr = String::from("[fd00:1122:3344::101]:9876"); + + let fake_region_snapshot = RegionSnapshot::new( + dataset_id, + region_id, + snapshot_id, + snapshot_addr.clone(), + ); + + datastore.region_snapshot_create(fake_region_snapshot).await.unwrap(); + + let request = + RegionSnapshotReplacement::new(dataset_id, region_id, snapshot_id); + + let request_id = request.id; + + datastore + .insert_region_snapshot_replacement_request_with_volume_id( + &opctx, + request, + Uuid::new_v4(), + ) + .await + .unwrap(); + + // Transition that to Allocating -> ReplacementDone -> DeletingOldVolume + // -> Running + + let operating_saga_id = Uuid::new_v4(); + + datastore + .set_region_snapshot_replacement_allocating( + &opctx, + request_id, + operating_saga_id, + ) + .await + .unwrap(); + + let new_region_id = Uuid::new_v4(); + let old_snapshot_volume_id = Uuid::new_v4(); + + datastore + .set_region_snapshot_replacement_replacement_done( + &opctx, + request_id, + operating_saga_id, + new_region_id, + old_snapshot_volume_id, + ) + .await + .unwrap(); + + datastore + .set_region_snapshot_replacement_deleting_old_volume( + &opctx, + request_id, + operating_saga_id, + ) + .await + .unwrap(); + + datastore + .set_region_snapshot_replacement_running( + &opctx, + request_id, + operating_saga_id, + ) + .await + .unwrap(); + + // Add some fake volumes that reference the region snapshot being + // replaced + + let new_volume_1_id = add_fake_volume_for_snapshot_addr( + &datastore, + snapshot_addr.clone(), + ) + .await; + let new_volume_2_id = add_fake_volume_for_snapshot_addr( + &datastore, + snapshot_addr.clone(), + ) + .await; + + // Add some fake volumes that do not + + let other_volume_1_id = add_fake_volume_for_snapshot_addr( + &datastore, + String::from("[fd00:1122:3344::101]:1000"), + ) + .await; + + let other_volume_2_id = add_fake_volume_for_snapshot_addr( + &datastore, + String::from("[fd12:5544:3344::912]:3901"), + ) + .await; + + // Activate the task - it should pick the running request up and try to + // run the region snapshot replacement step saga for the volumes + + let result: RegionSnapshotReplacementStepStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + + let requested_region_snapshot_replacement_steps = datastore + .get_requested_region_snapshot_replacement_steps(&opctx) + .await + .unwrap(); + + assert_eq!(requested_region_snapshot_replacement_steps.len(), 2); + + for step in &requested_region_snapshot_replacement_steps { + let s: String = format!("created {}", step.id); + assert!(result.step_records_created_ok.contains(&s)); + + let s: String = format!( + "region snapshot replacement step saga invoked ok for {}", + 
step.id + ); + assert!(result.step_invoked_ok.contains(&s)); + + if step.volume_id == new_volume_1_id + || step.volume_id == new_volume_2_id + { + // ok! + } else if step.volume_id == other_volume_1_id + || step.volume_id == other_volume_2_id + { + // error! + assert!(false); + } else { + // error! + assert!(false); + } + } + + // No garbage collection would be invoked yet, as the step records are + // not in state Complete + assert!(result.step_garbage_collect_invoked_ok.is_empty()); + + assert_eq!(result.errors.len(), 0); + + assert_eq!(starter.count_reset(), 2); + } + + #[nexus_test(server = crate::Server)] + async fn test_region_snapshot_replacement_step_task_gc( + cptestctx: &ControlPlaneTestContext, + ) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.clone(), + datastore.clone(), + ); + + let starter = Arc::new(NoopStartSaga::new()); + let mut task = RegionSnapshotReplacementFindAffected::new( + datastore.clone(), + starter.clone(), + ); + + // Noop test + let result: RegionSnapshotReplacementStepStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + assert_eq!(result, RegionSnapshotReplacementStepStatus::default()); + assert_eq!(starter.count_reset(), 0); + + // Now, add some Complete records and make sure the garbage collection + // saga is invoked. + + datastore + .insert_region_snapshot_replacement_step(&opctx, { + let mut record = RegionSnapshotReplacementStep::new( + Uuid::new_v4(), + Uuid::new_v4(), + ); + + record.replacement_state = + RegionSnapshotReplacementStepState::Complete; + record.old_snapshot_volume_id = Some(Uuid::new_v4()); + + record + }) + .await + .unwrap(); + + datastore + .insert_region_snapshot_replacement_step(&opctx, { + let mut record = RegionSnapshotReplacementStep::new( + Uuid::new_v4(), + Uuid::new_v4(), + ); + + record.replacement_state = + RegionSnapshotReplacementStepState::Complete; + record.old_snapshot_volume_id = Some(Uuid::new_v4()); + + record + }) + .await + .unwrap(); + + // Activate the task - it should pick the complete steps up and try to + // run the region snapshot replacement step garbage collect saga + + let result: RegionSnapshotReplacementStepStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + + let region_snapshot_replacement_steps_requiring_gc = datastore + .region_snapshot_replacement_steps_requiring_garbage_collection( + &opctx, + ) + .await + .unwrap(); + + assert_eq!(region_snapshot_replacement_steps_requiring_gc.len(), 2); + + eprintln!("{:?}", result); + + for step in ®ion_snapshot_replacement_steps_requiring_gc { + let s: String = format!( + "region snapshot replacement step garbage collect request ok \ + for {}", + step.id + ); + assert!(result.step_garbage_collect_invoked_ok.contains(&s)); + } + + assert!(result.step_records_created_ok.is_empty()); + + assert!(result.step_invoked_ok.is_empty()); + + assert_eq!(result.errors.len(), 0); + + assert_eq!(starter.count_reset(), 2); + } +} diff --git a/nexus/src/app/sagas/mod.rs b/nexus/src/app/sagas/mod.rs index 926b983460..bd3ae62996 100644 --- a/nexus/src/app/sagas/mod.rs +++ b/nexus/src/app/sagas/mod.rs @@ -41,6 +41,8 @@ pub mod region_replacement_finish; pub mod region_replacement_start; pub mod region_snapshot_replacement_garbage_collect; pub mod region_snapshot_replacement_start; +pub mod region_snapshot_replacement_step; +pub mod region_snapshot_replacement_step_garbage_collect; pub mod snapshot_create; pub mod 
snapshot_delete; pub mod test_saga; @@ -198,6 +200,12 @@ fn make_action_registry() -> ActionRegistry { ::register_actions( &mut registry, ); + ::register_actions( + &mut registry, + ); + ::register_actions( + &mut registry, + ); #[cfg(test)] ::register_actions(&mut registry); diff --git a/nexus/src/app/sagas/region_snapshot_replacement_step.rs b/nexus/src/app/sagas/region_snapshot_replacement_step.rs new file mode 100644 index 0000000000..600bb155bf --- /dev/null +++ b/nexus/src/app/sagas/region_snapshot_replacement_step.rs @@ -0,0 +1,603 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Region snapshot replacement is distinct from region replacement: replacing +//! parts of a volume's read-only parent (and all the layers under it) is easier +//! because this does _not_ incur a live repair or reconciliation. Each part of +//! a read-only region set contains the same data that will never be modified. +//! +//! A region snapshot replacement request starts off in the "Requested" state, +//! just like a region replacement request. A background task will search for +//! region snapshot replacement requests in this state and trigger the "region +//! snapshot replacement start" saga. This will allocate a new region to replace +//! the requested one, and modify the snapshot VCR accordingly. If any disks are +//! then created using that snapshot as a source, they will have the replacement +//! and will not need a replace request. +//! +//! However, any past use of that snapshot as a source means that the Volume +//! created from that will have a copy of the unmodified snapshot Volume as a +//! read-only parent. Any construction of the Volume will be referencing the +//! replaced region snapshot (which could be gone if it is expunged). It is this +//! saga's responsibility to update all Volumes that reference the region +//! snapshot being replaced, and send a replacement request to any Upstairs that +//! were constructed. +//! +//! Some difficulty comes from the requirement to notify existing Upstairs that +//! reference the replaced read-only part, but even this is not as difficult as +//! region replacement: Nexus does not have to continually monitor and drive +//! either live repair or reconciliation, just ensure that the read-only +//! replacement occurs. Read-only replacements should be basically +//! instantaneous. +//! +//! A replace request only needs to be done once per Upstairs that has the old +//! reference. This is done as a "region snapshot replacement step", and once +//! all those are done, the region snapshot replacement request can be +//! "completed". +//! +//! Region snapshot replacement steps need to be written into the database and +//! have an associated state and operating saga id for the same reason that +//! region snapshot replacement requests do: multiple background tasks will +//! invoke multiple sagas, and there needs to be some exclusive access. +//! +//! See the documentation for the "region snapshot replacement step garbage +//! collect" saga for the next step in the process. 
+ +use super::{ + ActionRegistry, NexusActionContext, NexusSaga, SagaInitError, + ACTION_GENERATE_ID, +}; +use crate::app::db::datastore::ExistingTarget; +use crate::app::db::datastore::ReplacementTarget; +use crate::app::db::datastore::VolumeToDelete; +use crate::app::db::datastore::VolumeWithTarget; +use crate::app::db::lookup::LookupPath; +use crate::app::sagas::declare_saga_actions; +use crate::app::{authn, authz, db}; +use nexus_db_model::VmmState; +use nexus_types::identity::Resource; +use omicron_common::api::external::Error; +use propolis_client::types::ReplaceResult; +use serde::Deserialize; +use serde::Serialize; +use sled_agent_client::types::CrucibleOpts; +use sled_agent_client::types::VolumeConstructionRequest; +use std::net::SocketAddrV6; +use steno::ActionError; +use steno::Node; +use uuid::Uuid; + +// region snapshot replacement step saga: input parameters + +#[derive(Debug, Deserialize, Serialize)] +pub(crate) struct Params { + pub serialized_authn: authn::saga::Serialized, + pub request: db::model::RegionSnapshotReplacementStep, +} + +// region snapshot replacement step saga: actions + +declare_saga_actions! { + region_snapshot_replacement_step; + SET_SAGA_ID -> "unused_1" { + + rsrss_set_saga_id + - rsrss_set_saga_id_undo + } + CREATE_REPLACE_PARAMS -> "replace_params" { + + rsrss_create_replace_params + } + CREATE_FAKE_VOLUME -> "unused_2" { + + rssrs_create_fake_volume + - rssrs_create_fake_volume_undo + } + REPLACE_SNAPSHOT_IN_VOLUME -> "unused_3" { + + rsrss_replace_snapshot_in_volume + - rsrss_replace_snapshot_in_volume_undo + } + NOTIFY_UPSTAIRS -> "unused_4" { + + rsrss_notify_upstairs + } + UPDATE_REQUEST_RECORD -> "unused_5" { + + rsrss_update_request_record + } +} + +// region snapshot replacement step saga: definition + +#[derive(Debug)] +pub(crate) struct SagaRegionSnapshotReplacementStep; +impl NexusSaga for SagaRegionSnapshotReplacementStep { + const NAME: &'static str = "region-snapshot-replacement-step"; + type Params = Params; + + fn register_actions(registry: &mut ActionRegistry) { + region_snapshot_replacement_step_register_actions(registry); + } + + fn make_saga_dag( + _params: &Self::Params, + mut builder: steno::DagBuilder, + ) -> Result { + builder.append(Node::action( + "saga_id", + "GenerateSagaId", + ACTION_GENERATE_ID.as_ref(), + )); + + builder.append(Node::action( + "new_volume_id", + "GenerateNewVolumeId", + ACTION_GENERATE_ID.as_ref(), + )); + + builder.append(set_saga_id_action()); + builder.append(create_replace_params_action()); + builder.append(create_fake_volume_action()); + builder.append(replace_snapshot_in_volume_action()); + builder.append(notify_upstairs_action()); + builder.append(update_request_record_action()); + + Ok(builder.build()?) + } +} + +// region snapshot replacement step saga: action implementations + +async fn rsrss_set_saga_id( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + + let saga_id = sagactx.lookup::("saga_id")?; + + // Change the request record here to an intermediate "running" state to + // block out other sagas that will be triggered for the same request. 
+ + osagactx + .datastore() + .set_region_snapshot_replacement_step_running( + &opctx, + params.request.id, + saga_id, + ) + .await + .map_err(ActionError::action_failed)?; + + Ok(()) +} + +async fn rsrss_set_saga_id_undo( + sagactx: NexusActionContext, +) -> Result<(), anyhow::Error> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + + let saga_id = sagactx.lookup::("saga_id")?; + + osagactx + .datastore() + .undo_set_region_snapshot_replacement_step_running( + &opctx, + params.request.id, + saga_id, + ) + .await?; + + Ok(()) +} + +#[derive(Debug, Serialize, Deserialize)] +struct ReplaceParams { + old_snapshot_address: SocketAddrV6, + new_region_address: SocketAddrV6, +} + +async fn rsrss_create_replace_params( + sagactx: NexusActionContext, +) -> Result { + let log = sagactx.user_data().log(); + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + + // look up region snapshot replace request by id + + let region_snapshot_replace_request = osagactx + .datastore() + .get_region_snapshot_replacement_request_by_id( + &opctx, + params.request.request_id, + ) + .await + .map_err(ActionError::action_failed)?; + + let region_snapshot = osagactx + .datastore() + .region_snapshot_get( + region_snapshot_replace_request.old_dataset_id, + region_snapshot_replace_request.old_region_id, + region_snapshot_replace_request.old_snapshot_id, + ) + .await + .map_err(ActionError::action_failed)?; + + let Some(region_snapshot) = region_snapshot else { + return Err(ActionError::action_failed(format!( + "region snapshot {} {} {} deleted!", + region_snapshot_replace_request.old_dataset_id, + region_snapshot_replace_request.old_region_id, + region_snapshot_replace_request.old_snapshot_id, + ))); + }; + + let old_snapshot_address: SocketAddrV6 = + match region_snapshot.snapshot_addr.parse() { + Ok(addr) => addr, + + Err(e) => { + return Err(ActionError::action_failed(format!( + "parsing {} as SocketAddrV6 failed: {e}", + region_snapshot.snapshot_addr, + ))); + } + }; + + let Some(new_region_id) = region_snapshot_replace_request.new_region_id + else { + return Err(ActionError::action_failed(format!( + "request {} does not have a new_region_id!", + region_snapshot_replace_request.id, + ))); + }; + + let new_region_address = osagactx + .nexus() + .region_addr(&log, new_region_id) + .await + .map_err(ActionError::action_failed)?; + + Ok(ReplaceParams { old_snapshot_address, new_region_address }) +} + +async fn rssrs_create_fake_volume( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + + let new_volume_id = sagactx.lookup::("new_volume_id")?; + + // Create a fake volume record for the old snapshot target. This will be + // deleted after region snapshot replacement step saga has finished, and the + // region replacement snapshot gc step has run. It can be completely blank + // here, it will be replaced by `volume_replace_snapshot`. 
+ + let volume_construction_request = VolumeConstructionRequest::Volume { + id: new_volume_id, + block_size: 0, + sub_volumes: vec![VolumeConstructionRequest::Region { + block_size: 0, + blocks_per_extent: 0, + extent_count: 0, + gen: 0, + opts: CrucibleOpts { + id: new_volume_id, + target: vec![], + lossy: false, + flush_timeout: None, + key: None, + cert_pem: None, + key_pem: None, + root_cert_pem: None, + control: None, + read_only: true, + }, + }], + read_only_parent: None, + }; + + let volume_data = serde_json::to_string(&volume_construction_request) + .map_err(|e| { + ActionError::action_failed(Error::internal_error(&e.to_string())) + })?; + + let volume = db::model::Volume::new(new_volume_id, volume_data); + + osagactx + .datastore() + .volume_create(volume) + .await + .map_err(ActionError::action_failed)?; + + Ok(()) +} + +async fn rssrs_create_fake_volume_undo( + sagactx: NexusActionContext, +) -> Result<(), anyhow::Error> { + let osagactx = sagactx.user_data(); + + // Delete the fake volume. + + let new_volume_id = sagactx.lookup::("new_volume_id")?; + osagactx.datastore().volume_hard_delete(new_volume_id).await?; + + Ok(()) +} + +async fn rsrss_replace_snapshot_in_volume( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + + let replace_params = sagactx.lookup::("replace_params")?; + + let new_volume_id = sagactx.lookup::("new_volume_id")?; + + // `volume_replace_snapshot` will swap the old snapshot for the new region. + // No repair or reconcilation needs to occur after this. + osagactx + .datastore() + .volume_replace_snapshot( + VolumeWithTarget(params.request.volume_id), + ExistingTarget(replace_params.old_snapshot_address), + ReplacementTarget(replace_params.new_region_address), + VolumeToDelete(new_volume_id), + ) + .await + .map_err(ActionError::action_failed)?; + + Ok(()) +} + +async fn rsrss_replace_snapshot_in_volume_undo( + sagactx: NexusActionContext, +) -> Result<(), anyhow::Error> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + + let replace_params = sagactx.lookup::("replace_params")?; + + let new_volume_id = sagactx.lookup::("new_volume_id")?; + + osagactx + .datastore() + .volume_replace_snapshot( + VolumeWithTarget(params.request.volume_id), + ExistingTarget(replace_params.new_region_address), + ReplacementTarget(replace_params.old_snapshot_address), + VolumeToDelete(new_volume_id), + ) + .await?; + + Ok(()) +} + +async fn rsrss_notify_upstairs( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + let log = sagactx.user_data().log(); + + // Make an effort to notify a Propolis if one was booted for this volume. + // This is best effort: if there is a failure, this saga will unwind and be + // triggered again for the same request. If there is no Propolis booted for + // this volume, then there's nothing to be done: any future Propolis will + // receive the updated Volume. + // + // Unlike for region replacement, there's no step required here if there + // isn't an active Propolis: any Upstairs created after the snapshot_addr + // is replaced will reference the cloned data. + + let Some(disk) = osagactx + .datastore() + .disk_for_volume_id(params.request.volume_id) + .await + .map_err(ActionError::action_failed)? 
+ else { + return Ok(()); + }; + + let Some(instance_id) = disk.runtime().attach_instance_id else { + return Ok(()); + }; + + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + + let (.., authz_instance) = LookupPath::new(&opctx, &osagactx.datastore()) + .instance_id(instance_id) + .lookup_for(authz::Action::Read) + .await + .map_err(ActionError::action_failed)?; + + let instance_and_vmm = osagactx + .datastore() + .instance_fetch_with_vmm(&opctx, &authz_instance) + .await + .map_err(ActionError::action_failed)?; + + let Some(vmm) = instance_and_vmm.vmm() else { + return Ok(()); + }; + + let state = vmm.runtime.state; + + info!( + log, + "volume associated with disk attached to instance with vmm in \ + state {state}"; + "request id" => %params.request.id, + "volume id" => %params.request.volume_id, + "disk id" => ?disk.id(), + "instance id" => ?instance_id, + "vmm id" => ?vmm.id, + ); + + match &state { + VmmState::Running | VmmState::Rebooting => { + // Propolis server is ok to receive the volume replacement request. + } + + VmmState::Starting + | VmmState::Stopping + | VmmState::Stopped + | VmmState::Migrating + | VmmState::Failed + | VmmState::Destroyed + | VmmState::SagaUnwound => { + // Propolis server is not ok to receive volume replacement requests + // - unwind so that this saga can run again. + return Err(ActionError::action_failed(format!( + "vmm {} propolis not in a state to receive request", + vmm.id, + ))); + } + } + + let new_volume_vcr = match osagactx + .datastore() + .volume_get(params.request.volume_id) + .await + .map_err(ActionError::action_failed)? + { + Some(volume) => volume.data().to_string(), + + None => { + return Err(ActionError::action_failed(Error::internal_error( + "new volume is gone!", + ))); + } + }; + + let instance_lookup = + LookupPath::new(&opctx, &osagactx.datastore()).instance_id(instance_id); + + let (vmm, client) = osagactx + .nexus() + .propolis_client_for_instance( + &opctx, + &instance_lookup, + authz::Action::Modify, + ) + .await + .map_err(ActionError::action_failed)?; + + info!( + log, + "sending replacement request for disk volume to propolis"; + "request id" => %params.request.id, + "volume id" => %params.request.volume_id, + "disk id" => ?disk.id(), + "instance id" => ?instance_id, + "vmm id" => ?vmm.id, + ); + + let result = client + .instance_issue_crucible_vcr_request() + .id(disk.id()) + .body(propolis_client::types::InstanceVcrReplace { + name: disk.name().to_string(), + vcr_json: new_volume_vcr, + }) + .send() + .await + .map_err(|e| match e { + propolis_client::Error::ErrorResponse(rv) => { + ActionError::action_failed(rv.message.clone()) + } + + _ => ActionError::action_failed(format!( + "unexpected failure during \ + `instance_issue_crucible_vcr_request`: {e}", + )), + })?; + + let replace_result = result.into_inner(); + + info!( + log, + "saw replace result {replace_result:?}"; + "request id" => %params.request.id, + "volume id" => %params.request.volume_id, + "disk id" => ?disk.id(), + "instance id" => ?instance_id, + "vmm id" => ?vmm.id, + ); + + match &replace_result { + ReplaceResult::Started => { + // This saga's call just started the replacement + } + + ReplaceResult::StartedAlready => { + // A previous run of this saga (or saga node) started the + // replacement + } + + ReplaceResult::CompletedAlready => { + // It's done! We see this if the same propolis that received the + // original replace request started and finished the replacement. 
+ } + + ReplaceResult::VcrMatches => { + // This propolis booted with the updated VCR + } + + ReplaceResult::Missing => { + // The volume does not contain the region to be replaced. This is an + // error! + return Err(ActionError::action_failed(String::from( + "saw ReplaceResult::Missing", + ))); + } + } + + Ok(()) +} + +async fn rsrss_update_request_record( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let params = sagactx.saga_params::()?; + let osagactx = sagactx.user_data(); + let datastore = osagactx.datastore(); + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + + let saga_id = sagactx.lookup::("saga_id")?; + let new_volume_id = sagactx.lookup::("new_volume_id")?; + + // Update the request record to 'Completed' and clear the operating saga id. + // There is no undo step for this, it should succeed idempotently. + datastore + .set_region_snapshot_replacement_step_complete( + &opctx, + params.request.id, + saga_id, + new_volume_id, + ) + .await + .map_err(ActionError::action_failed)?; + + Ok(()) +} diff --git a/nexus/src/app/sagas/region_snapshot_replacement_step_garbage_collect.rs b/nexus/src/app/sagas/region_snapshot_replacement_step_garbage_collect.rs new file mode 100644 index 0000000000..93335b6125 --- /dev/null +++ b/nexus/src/app/sagas/region_snapshot_replacement_step_garbage_collect.rs @@ -0,0 +1,233 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Delete the volume that stashes the target replaced during a region snapshot +//! replacement step saga. After that's done, change the region snapshot +//! replacement step's state to "VolumeDeleted". + +use super::{ActionRegistry, NexusActionContext, NexusSaga, SagaInitError}; +use crate::app::sagas::declare_saga_actions; +use crate::app::sagas::volume_delete; +use crate::app::{authn, db}; +use serde::Deserialize; +use serde::Serialize; +use steno::ActionError; +use steno::Node; +use uuid::Uuid; + +// region snapshot replacement step garbage collect saga: input parameters + +#[derive(Debug, Deserialize, Serialize)] +pub(crate) struct Params { + pub serialized_authn: authn::saga::Serialized, + /// The fake volume created for the snapshot that was replaced + // Note: this is only required in the params to build the volume-delete sub + // saga + pub old_snapshot_volume_id: Uuid, + pub request: db::model::RegionSnapshotReplacementStep, +} + +// region snapshot replacement step garbage collect saga: actions + +declare_saga_actions! 
{
+    region_snapshot_replacement_step_garbage_collect;
+    UPDATE_REQUEST_RECORD -> "unused_1" {
+        + srsgs_update_request_record
+    }
+}
+
+// region snapshot replacement step garbage collect saga: definition
+
+#[derive(Debug)]
+pub(crate) struct SagaRegionSnapshotReplacementStepGarbageCollect;
+impl NexusSaga for SagaRegionSnapshotReplacementStepGarbageCollect {
+    const NAME: &'static str =
+        "region-snapshot-replacement-step-garbage-collect";
+    type Params = Params;
+
+    fn register_actions(registry: &mut ActionRegistry) {
+        region_snapshot_replacement_step_garbage_collect_register_actions(
+            registry,
+        );
+    }
+
+    fn make_saga_dag(
+        params: &Self::Params,
+        mut builder: steno::DagBuilder,
+    ) -> Result<steno::Dag, SagaInitError> {
+        let subsaga_params = volume_delete::Params {
+            serialized_authn: params.serialized_authn.clone(),
+            volume_id: params.old_snapshot_volume_id,
+        };
+
+        let subsaga_dag = {
+            let subsaga_builder = steno::DagBuilder::new(steno::SagaName::new(
+                volume_delete::SagaVolumeDelete::NAME,
+            ));
+            volume_delete::SagaVolumeDelete::make_saga_dag(
+                &subsaga_params,
+                subsaga_builder,
+            )?
+        };
+
+        builder.append(Node::constant(
+            "params_for_volume_delete_subsaga",
+            serde_json::to_value(&subsaga_params).map_err(|e| {
+                SagaInitError::SerializeError(
+                    "params_for_volume_delete_subsaga".to_string(),
+                    e,
+                )
+            })?,
+        ));
+
+        builder.append(Node::subsaga(
+            "volume_delete_subsaga_no_result",
+            subsaga_dag,
+            "params_for_volume_delete_subsaga",
+        ));
+
+        builder.append(update_request_record_action());
+
+        Ok(builder.build()?)
+    }
+}
+
+// region snapshot replacement step garbage collect saga: action implementations
+
+async fn srsgs_update_request_record(
+    sagactx: NexusActionContext,
+) -> Result<(), ActionError> {
+    let params = sagactx.saga_params::<Params>()?;
+    let osagactx = sagactx.user_data();
+    let datastore = osagactx.datastore();
+    let opctx = crate::context::op_context_for_saga_action(
+        &sagactx,
+        &params.serialized_authn,
+    );
+
+    // Now that the region snapshot step volume has been deleted, update the
+    // replacement request record to 'VolumeDeleted'. There is no undo step for
+    // this, it should succeed idempotently.
+ + datastore + .set_region_snapshot_replacement_step_volume_deleted( + &opctx, + params.request.id, + ) + .await + .map_err(ActionError::action_failed)?; + + Ok(()) +} + +#[cfg(test)] +pub(crate) mod test { + use crate::app::sagas::region_snapshot_replacement_step_garbage_collect::*; + use nexus_db_model::RegionSnapshotReplacementStep; + use nexus_db_model::RegionSnapshotReplacementStepState; + use nexus_db_model::Volume; + use nexus_db_queries::authn::saga::Serialized; + use nexus_db_queries::context::OpContext; + use nexus_test_utils_macros::nexus_test; + use sled_agent_client::types::CrucibleOpts; + use sled_agent_client::types::VolumeConstructionRequest; + use uuid::Uuid; + + type ControlPlaneTestContext = + nexus_test_utils::ControlPlaneTestContext; + + #[nexus_test(server = crate::Server)] + async fn test_region_snapshot_replacement_step_garbage_collect_saga( + cptestctx: &ControlPlaneTestContext, + ) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.clone(), + datastore.clone(), + ); + + // Manually insert required records + let old_snapshot_volume_id = Uuid::new_v4(); + + let volume_construction_request = VolumeConstructionRequest::Volume { + id: old_snapshot_volume_id, + block_size: 0, + sub_volumes: vec![VolumeConstructionRequest::Region { + block_size: 0, + blocks_per_extent: 0, + extent_count: 0, + gen: 0, + opts: CrucibleOpts { + id: old_snapshot_volume_id, + target: vec![ + // XXX if you put something here, you'll need a + // synthetic dataset record + ], + lossy: false, + flush_timeout: None, + key: None, + cert_pem: None, + key_pem: None, + root_cert_pem: None, + control: None, + read_only: false, + }, + }], + read_only_parent: None, + }; + + let volume_data = + serde_json::to_string(&volume_construction_request).unwrap(); + + datastore + .volume_create(Volume::new(old_snapshot_volume_id, volume_data)) + .await + .unwrap(); + + let mut request = + RegionSnapshotReplacementStep::new(Uuid::new_v4(), Uuid::new_v4()); + request.replacement_state = + RegionSnapshotReplacementStepState::Complete; + request.old_snapshot_volume_id = Some(old_snapshot_volume_id); + + datastore + .insert_region_snapshot_replacement_step(&opctx, request.clone()) + .await + .unwrap(); + + // Run the saga + let params = Params { + serialized_authn: Serialized::for_opctx(&opctx), + old_snapshot_volume_id, + request: request.clone(), + }; + + let _output = nexus + .sagas + .saga_execute::( + params, + ) + .await + .unwrap(); + + // Validate the state transition + let result = datastore + .get_region_snapshot_replacement_step_by_id(&opctx, request.id) + .await + .unwrap(); + + assert_eq!( + result.replacement_state, + RegionSnapshotReplacementStepState::VolumeDeleted + ); + + // Validate the Volume was deleted + assert!(datastore + .volume_get(old_snapshot_volume_id) + .await + .unwrap() + .is_none()); + } +} diff --git a/nexus/tests/config.test.toml b/nexus/tests/config.test.toml index d9cbb5eb34..6859e992ca 100644 --- a/nexus/tests/config.test.toml +++ b/nexus/tests/config.test.toml @@ -139,6 +139,7 @@ instance_updater.disable = true instance_updater.period_secs = 60 region_snapshot_replacement_start.period_secs = 30 region_snapshot_replacement_garbage_collection.period_secs = 30 +region_snapshot_replacement_step.period_secs = 30 [default_region_allocation_strategy] # we only have one sled in the test environment, so we need to use the diff --git a/nexus/types/src/internal_api/background.rs 
b/nexus/types/src/internal_api/background.rs
index 8e4b6b3013..e5fd35d1e3 100644
--- a/nexus/types/src/internal_api/background.rs
+++ b/nexus/types/src/internal_api/background.rs
@@ -36,3 +36,13 @@ pub struct RegionSnapshotReplacementGarbageCollectStatus {
     pub garbage_collect_requested: Vec<String>,
     pub errors: Vec<String>,
 }
+
+/// The status of a `region_snapshot_replacement_step` background task
+/// activation
+#[derive(Serialize, Deserialize, Default, Debug, PartialEq, Eq)]
+pub struct RegionSnapshotReplacementStepStatus {
+    pub step_records_created_ok: Vec<String>,
+    pub step_garbage_collect_invoked_ok: Vec<String>,
+    pub step_invoked_ok: Vec<String>,
+    pub errors: Vec<String>,
+}
diff --git a/smf/nexus/multi-sled/config-partial.toml b/smf/nexus/multi-sled/config-partial.toml
index 2e3a8fe578..f0f40d282e 100644
--- a/smf/nexus/multi-sled/config-partial.toml
+++ b/smf/nexus/multi-sled/config-partial.toml
@@ -67,6 +67,7 @@ lookup_region_port.period_secs = 60
 instance_updater.period_secs = 30
 region_snapshot_replacement_start.period_secs = 30
 region_snapshot_replacement_garbage_collection.period_secs = 30
+region_snapshot_replacement_step.period_secs = 30
 
 [default_region_allocation_strategy]
 # by default, allocate across 3 distinct sleds
diff --git a/smf/nexus/single-sled/config-partial.toml b/smf/nexus/single-sled/config-partial.toml
index dbd61e953d..23340b3c36 100644
--- a/smf/nexus/single-sled/config-partial.toml
+++ b/smf/nexus/single-sled/config-partial.toml
@@ -67,6 +67,7 @@ lookup_region_port.period_secs = 60
 instance_updater.period_secs = 30
 region_snapshot_replacement_start.period_secs = 30
 region_snapshot_replacement_garbage_collection.period_secs = 30
+region_snapshot_replacement_step.period_secs = 30
 
 [default_region_allocation_strategy]
 # by default, allocate without requirement for distinct sleds.
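
A minimal sketch (not part of the patch) of how the pieces above fit together: the background task returns `json!(status)` from `activate()`, and omdb deserializes those details back into `RegionSnapshotReplacementStepStatus` before printing each category. The struct below mirrors the one added to nexus/types in this change; the standalone `main`, the serde/serde_json dependencies, and the example message strings are assumptions made only for this sketch.

    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize, Default, Debug, PartialEq, Eq)]
    struct RegionSnapshotReplacementStepStatus {
        step_records_created_ok: Vec<String>,
        step_garbage_collect_invoked_ok: Vec<String>,
        step_invoked_ok: Vec<String>,
        errors: Vec<String>,
    }

    fn main() {
        // The task accumulates per-category messages during one activation...
        let mut status = RegionSnapshotReplacementStepStatus::default();
        status.step_records_created_ok.push("created <step request id>".to_string());
        status.step_invoked_ok.push(
            "region snapshot replacement step saga invoked ok for <step request id>"
                .to_string(),
        );

        // ...and returns them as its activation details (the `json!(status)`
        // at the end of `activate()`).
        let details = serde_json::json!(status);

        // omdb deserializes the details back into the typed status before
        // printing the counts and the individual lines.
        let parsed: RegionSnapshotReplacementStepStatus =
            serde_json::from_value(details).expect("status should deserialize");
        assert_eq!(parsed.step_records_created_ok.len(), 1);
        assert_eq!(parsed.step_invoked_ok.len(), 1);
        assert!(parsed.errors.is_empty());
    }

Keeping the status as plain vectors of human-readable messages is what lets `print_task_details` simply print `.len()` for each category and then echo each entry, as in the omdb changes at the top of this diff.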