diff --git a/nexus/db-queries/src/db/datastore/volume.rs b/nexus/db-queries/src/db/datastore/volume.rs index 4e929b9b4b..9e2c2f55e9 100644 --- a/nexus/db-queries/src/db/datastore/volume.rs +++ b/nexus/db-queries/src/db/datastore/volume.rs @@ -1672,6 +1672,50 @@ impl DataStore { } } +/// Check if a region is present in a Volume Construction Request +fn region_in_vcr( + vcr: &VolumeConstructionRequest, + region: &SocketAddrV6, +) -> anyhow::Result { + let mut parts: VecDeque<&VolumeConstructionRequest> = VecDeque::new(); + parts.push_back(vcr); + + let mut region_found = false; + + while let Some(vcr_part) = parts.pop_front() { + match vcr_part { + VolumeConstructionRequest::Volume { sub_volumes, .. } => { + for sub_volume in sub_volumes { + parts.push_back(sub_volume); + } + + // Skip looking at read-only parent, this function only looks + // for R/W regions + } + + VolumeConstructionRequest::Url { .. } => { + // nothing required + } + + VolumeConstructionRequest::Region { opts, .. } => { + for target in &opts.target { + let parsed_target: SocketAddrV6 = target.parse()?; + if parsed_target == *region { + region_found = true; + break; + } + } + } + + VolumeConstructionRequest::File { .. } => { + // nothing required + } + } + } + + Ok(region_found) +} + pub struct VolumeReplacementParams { pub volume_id: Uuid, pub region_id: Uuid, @@ -1796,6 +1840,61 @@ impl DataStore { .transaction(&conn, |conn| { let err = err.clone(); async move { + // Grab the old volume first + let maybe_old_volume = { + volume_dsl::volume + .filter(volume_dsl::id.eq(existing.volume_id)) + .select(Volume::as_select()) + .first_async::(&conn) + .await + .optional() + .map_err(|e| { + err.bail_retryable_or_else(e, |e| { + VolumeReplaceRegionError::Public( + public_error_from_diesel( + e, + ErrorHandler::Server, + ) + ) + }) + })? + }; + + let old_volume = if let Some(old_volume) = maybe_old_volume { + old_volume + } else { + // Existing volume was deleted, so return an error. 
We + // can't perform the region replacement now! + return Err(err.bail(VolumeReplaceRegionError::TargetVolumeDeleted)); + }; + + let old_vcr: VolumeConstructionRequest = + match serde_json::from_str(&old_volume.data()) { + Ok(vcr) => vcr, + Err(e) => { + return Err(err.bail(VolumeReplaceRegionError::SerdeError(e))); + }, + }; + + // Does it look like this replacement already happened? + let old_region_in_vcr = match region_in_vcr(&old_vcr, &existing.region_addr) { + Ok(v) => v, + Err(e) => { + return Err(err.bail(VolumeReplaceRegionError::RegionReplacementError(e))); + }, + }; + let new_region_in_vcr = match region_in_vcr(&old_vcr, &replacement.region_addr) { + Ok(v) => v, + Err(e) => { + return Err(err.bail(VolumeReplaceRegionError::RegionReplacementError(e))); + }, + }; + + if !old_region_in_vcr && new_region_in_vcr { + // It does seem like the replacement happened + return Ok(()); + } + use db::schema::region::dsl as region_dsl; use db::schema::volume::dsl as volume_dsl; @@ -1838,40 +1937,6 @@ impl DataStore { // Update the existing volume's construction request to // replace the existing region's SocketAddrV6 with the // replacement region's - let maybe_old_volume = { - volume_dsl::volume - .filter(volume_dsl::id.eq(existing.volume_id)) - .select(Volume::as_select()) - .first_async::(&conn) - .await - .optional() - .map_err(|e| { - err.bail_retryable_or_else(e, |e| { - VolumeReplaceRegionError::Public( - public_error_from_diesel( - e, - ErrorHandler::Server, - ) - ) - }) - })? - }; - - let old_volume = if let Some(old_volume) = maybe_old_volume { - old_volume - } else { - // existing volume was deleted, so return an error, we - // can't perform the region replacement now! 
- return Err(err.bail(VolumeReplaceRegionError::TargetVolumeDeleted)); - }; - - let old_vcr: VolumeConstructionRequest = - match serde_json::from_str(&old_volume.data()) { - Ok(vcr) => vcr, - Err(e) => { - return Err(err.bail(VolumeReplaceRegionError::SerdeError(e))); - }, - }; // Copy the old volume's VCR, changing out the old region // for the new. diff --git a/nexus/src/app/background/region_replacement.rs b/nexus/src/app/background/region_replacement.rs index fc92f888b9..02ae548d75 100644 --- a/nexus/src/app/background/region_replacement.rs +++ b/nexus/src/app/background/region_replacement.rs @@ -5,32 +5,57 @@ //! Background task for detecting regions that need replacing and beginning that //! process //! -//! TODO this is currently a placeholder for a future PR +//! This task's responsibility is to create region replacement requests when +//! physical disks are expunged, and trigger the region replacement start saga +//! for any requests that are in state "Requested". See the documentation there +//! for more information. 
use super::common::BackgroundTask; -use crate::app::sagas::SagaRequest; +use crate::app::authn; +use crate::app::sagas; +use crate::app::RegionAllocationStrategy; use futures::future::BoxFuture; use futures::FutureExt; +use nexus_db_model::RegionReplacement; use nexus_db_queries::context::OpContext; use nexus_db_queries::db::DataStore; +use omicron_uuid_kinds::GenericUuid; +use omicron_uuid_kinds::TypedUuid; use serde_json::json; use std::sync::Arc; +use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::Sender; pub struct RegionReplacementDetector { - _datastore: Arc, - _saga_request: Sender, + datastore: Arc, + saga_request: Sender, } impl RegionReplacementDetector { pub fn new( datastore: Arc, - saga_request: Sender, + saga_request: Sender, ) -> Self { - RegionReplacementDetector { - _datastore: datastore, - _saga_request: saga_request, - } + RegionReplacementDetector { datastore, saga_request } + } + + async fn send_start_request( + &self, + serialized_authn: authn::saga::Serialized, + request: RegionReplacement, + ) -> Result<(), SendError> { + let saga_request = sagas::SagaRequest::RegionReplacementStart { + params: sagas::region_replacement_start::Params { + serialized_authn, + request, + allocation_strategy: + RegionAllocationStrategy::RandomWithDistinctSleds { + seed: None, + }, + }, + }; + + self.saga_request.send(saga_request).await } } @@ -43,15 +68,192 @@ impl BackgroundTask for RegionReplacementDetector { let log = &opctx.log; warn!(&log, "region replacement task started"); - // TODO + let mut ok = 0; + let mut err = 0; + + // Find regions on expunged physical disks + let regions_to_be_replaced = match self + .datastore + .find_regions_on_expunged_physical_disks(opctx) + .await + { + Ok(regions) => regions, + + Err(e) => { + error!( + &log, + "find_regions_on_expunged_physical_disks failed: \ + {e}" + ); + err += 1; + + return json!({ + "region_replacement_started_ok": ok, + "region_replacement_started_err": err, + }); + } + }; + + // Then 
create replacement requests for those if one doesn't exist + // yet. + for region in regions_to_be_replaced { + let maybe_request = match self + .datastore + .lookup_region_replacement_request_by_old_region_id( + opctx, + TypedUuid::from_untyped_uuid(region.id()), + ) + .await + { + Ok(v) => v, + + Err(e) => { + error!( + &log, + "error looking for existing region \ + replacement requests for {}: {e}", + region.id(), + ); + continue; + } + }; + + if maybe_request.is_none() { + match self + .datastore + .create_region_replacement_request_for_region( + opctx, ®ion, + ) + .await + { + Ok(request_id) => { + info!( + &log, + "added region replacement request \ + {request_id} for {} volume {}", + region.id(), + region.volume_id(), + ); + } + + Err(e) => { + error!( + &log, + "error adding region replacement request for \ + region {} volume id {}: {e}", + region.id(), + region.volume_id(), + ); + continue; + } + } + } + } + + // Next, for each region replacement request in state "Requested", + // run the start saga. 
+ match self.datastore.get_requested_region_replacements(opctx).await + { + Ok(requests) => { + for request in requests { + let result = self + .send_start_request( + authn::saga::Serialized::for_opctx(opctx), + request, + ) + .await; + + match result { + Ok(()) => { + ok += 1; + } + + Err(e) => { + error!( + &log, + "sending region replacement start request \ + failed: {e}", + ); + err += 1; + } + }; + } + } + + Err(e) => { + error!( + &log, + "query for region replacement requests failed: {e}", + ); + } + } warn!(&log, "region replacement task done"); json!({ - "region_replacement_started_ok": 0, - "region_replacement_started_err": 0, + "region_replacement_started_ok": ok, + "region_replacement_started_err": err, }) } .boxed() } } + +#[cfg(test)] +mod test { + use super::*; + use nexus_db_model::RegionReplacement; + use nexus_test_utils_macros::nexus_test; + use tokio::sync::mpsc; + use uuid::Uuid; + + type ControlPlaneTestContext = + nexus_test_utils::ControlPlaneTestContext; + + #[nexus_test(server = crate::Server)] + async fn test_add_region_replacement_causes_start( + cptestctx: &ControlPlaneTestContext, + ) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.clone(), + datastore.clone(), + ); + + let (saga_request_tx, mut saga_request_rx) = mpsc::channel(1); + let mut task = + RegionReplacementDetector::new(datastore.clone(), saga_request_tx); + + // Noop test + let result = task.activate(&opctx).await; + assert_eq!( + result, + json!({ + "region_replacement_started_ok": 0, + "region_replacement_started_err": 0, + }) + ); + + // Add a region replacement request for a fake region + let request = RegionReplacement::new(Uuid::new_v4(), Uuid::new_v4()); + + datastore + .insert_region_replacement_request(&opctx, request) + .await + .unwrap(); + + // Activate the task - it should pick that up and try to run the region + // replacement start saga + let result = 
task.activate(&opctx).await; + assert_eq!( + result, + json!({ + "region_replacement_started_ok": 1, + "region_replacement_started_err": 0, + }) + ); + + saga_request_rx.try_recv().unwrap(); + } +} diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index f9bcc2cf80..8e4a795a95 100644 --- a/nexus/src/app/mod.rs +++ b/nexus/src/app/mod.rs @@ -920,12 +920,39 @@ impl Nexus { /// Reliable persistent workflows can request that sagas be executed by /// sending a SagaRequest to a supplied channel. Execute those here. - pub(crate) async fn handle_saga_request(&self, saga_request: SagaRequest) { + pub(crate) async fn handle_saga_request( + self: &Arc, + saga_request: SagaRequest, + ) { match saga_request { #[cfg(test)] SagaRequest::TestOnly => { unimplemented!(); } + + SagaRequest::RegionReplacementStart { params } => { + let nexus = self.clone(); + tokio::spawn(async move { + let saga_result = nexus + .execute_saga::( + params, + ) + .await; + + match saga_result { + Ok(_) => { + info!( + nexus.log, + "region replacement start saga completed ok" + ); + } + + Err(e) => { + warn!(nexus.log, "region replacement start saga returned an error: {e}"); + } + } + }); + } } } diff --git a/nexus/src/app/sagas/common_storage.rs b/nexus/src/app/sagas/common_storage.rs index 1fe8d76783..44dd72c571 100644 --- a/nexus/src/app/sagas/common_storage.rs +++ b/nexus/src/app/sagas/common_storage.rs @@ -7,6 +7,10 @@ use super::*; use crate::Nexus; +use crucible_agent_client::types::Region; +use crucible_agent_client::types::RegionId; +use crucible_agent_client::Client as CrucibleAgentClient; +use crucible_pantry_client::types::VolumeConstructionRequest; use internal_dns::ServiceName; use nexus_db_queries::authz; use nexus_db_queries::context::OpContext; @@ -60,7 +64,7 @@ pub(crate) async fn call_pantry_attach_for_disk( disk.volume_id, ); - let volume_construction_request: crucible_pantry_client::types::VolumeConstructionRequest = + let volume_construction_request: VolumeConstructionRequest 
= serde_json::from_str(&disk_volume.data()).map_err(|e| { ActionError::action_failed(Error::internal_error(&format!( "failed to deserialize disk {} volume data: {}", @@ -107,3 +111,40 @@ pub(crate) async fn call_pantry_detach_for_disk( Ok(()) } + +/// GET a Region from a Crucible Agent +pub(crate) async fn get_region_from_agent( + agent_address: &SocketAddrV6, + region_id: Uuid, +) -> Result { + let url = format!("http://{}", agent_address); + let client = CrucibleAgentClient::new(&url); + + let result = client.region_get(&RegionId(region_id.to_string())).await; + + match result { + Ok(v) => Ok(v.into_inner()), + + Err(e) => match e { + crucible_agent_client::Error::ErrorResponse(rv) => { + match rv.status() { + http::StatusCode::NOT_FOUND => { + Err(Error::non_resourcetype_not_found(format!( + "{region_id} not found" + ))) + } + + status if status.is_client_error() => { + Err(Error::invalid_request(&rv.message)) + } + + _ => Err(Error::internal_error(&rv.message)), + } + } + + _ => Err(Error::internal_error( + "unexpected failure during `region_get`", + )), + }, + } +} diff --git a/nexus/src/app/sagas/mod.rs b/nexus/src/app/sagas/mod.rs index e725c1f093..ac9a30dd98 100644 --- a/nexus/src/app/sagas/mod.rs +++ b/nexus/src/app/sagas/mod.rs @@ -32,6 +32,7 @@ pub mod instance_ip_detach; pub mod instance_migrate; pub mod instance_start; pub mod project_create; +pub mod region_replacement_start; pub mod snapshot_create; pub mod snapshot_delete; pub mod test_saga; @@ -159,6 +160,9 @@ fn make_action_registry() -> ActionRegistry { ::register_actions( &mut registry, ); + ::register_actions( + &mut registry, + ); #[cfg(test)] ::register_actions(&mut registry); @@ -312,6 +316,10 @@ pub(crate) use declare_saga_actions; pub enum SagaRequest { #[cfg(test)] TestOnly, + + RegionReplacementStart { + params: region_replacement_start::Params, + }, } impl SagaRequest { diff --git a/nexus/src/app/sagas/region_replacement_start.rs b/nexus/src/app/sagas/region_replacement_start.rs new 
file mode 100644 index 0000000000..f546ae645b --- /dev/null +++ b/nexus/src/app/sagas/region_replacement_start.rs @@ -0,0 +1,1261 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Region replacements are required when a physical disk is expunged: Volumes +//! are now pointing to Regions whose Downstairs have gone away, and are in a +//! degraded state. The Upstairs can handle a single Downstairs going away at +//! any time, but the Volume remains degraded until a new Region replaces the +//! one that is gone. +//! +//! It's this saga's responsibility to start that replacement process. This saga +//! handles the following region replacement request state transitions: +//! +//! ```text +//! Requested <-- +//! | +//! | | +//! v | +//! | +//! Allocating -- +//! +//! | +//! v +//! +//! Running +//! ``` +//! +//! The first thing this saga does is set itself as the "operating saga" for the +//! request, and change the state to "Allocating". Then, it performs the following +//! steps: +//! +//! 1. Allocate a new region +//! +//! 2. For the affected Volume, swap the region being replaced with the new region. +//! +//! 3. Create a fake volume that can be later deleted with the region being +//! replaced. +//! +//! 4. Update the region replacement request by clearing the operating saga id +//! and changing the state to "Running". +//! +//! Any unwind will place the state back into Requested. +//! +//! See the documentation for the "region replacement drive" saga for the next +//! steps in the process. 
+ +use super::{ + ActionRegistry, NexusActionContext, NexusSaga, SagaInitError, + ACTION_GENERATE_ID, +}; +use crate::app::sagas::common_storage::get_region_from_agent; +use crate::app::sagas::declare_saga_actions; +use crate::app::RegionAllocationStrategy; +use crate::app::{authn, db}; +use nexus_db_queries::db::datastore::REGION_REDUNDANCY_THRESHOLD; +use omicron_common::api::external::Error; +use serde::Deserialize; +use serde::Serialize; +use sled_agent_client::types::CrucibleOpts; +use sled_agent_client::types::VolumeConstructionRequest; +use slog::Logger; +use std::net::SocketAddrV6; +use steno::ActionError; +use steno::Node; +use uuid::Uuid; + +// region replacement start saga: input parameters + +#[derive(Debug, Deserialize, Serialize)] +pub(crate) struct Params { + pub serialized_authn: authn::saga::Serialized, + pub request: db::model::RegionReplacement, + pub allocation_strategy: RegionAllocationStrategy, +} + +// region replacement start saga: actions + +declare_saga_actions! 
{ + region_replacement_start; + SET_SAGA_ID -> "unused_1" { + + srrs_set_saga_id + - srrs_set_saga_id_undo + } + GET_EXISTING_DATASETS_AND_REGIONS -> "existing_datasets_and_regions" { + + srrs_get_existing_datasets_and_regions + } + ALLOC_NEW_REGION -> "new_datasets_and_regions" { + + srrs_alloc_new_region + - srrs_alloc_new_region_undo + } + FIND_NEW_REGION -> "new_dataset_and_region" { + + srrs_find_new_region + } + NEW_REGION_ENSURE -> "ensured_dataset_and_region" { + + srrs_new_region_ensure + - srrs_new_region_ensure_undo + } + GET_OLD_REGION_ADDRESS -> "old_region_address" { + + srrs_get_old_region_address + } + GET_OLD_REGION_VOLUME_ID -> "old_region_volume_id" { + + srrs_get_old_region_volume_id + } + REPLACE_REGION_IN_VOLUME -> "unused_2" { + + srrs_replace_region_in_volume + - srrs_replace_region_in_volume_undo + } + CREATE_FAKE_VOLUME -> "unused_3" { + + srrs_create_fake_volume + - srrs_create_fake_volume_undo + } + UPDATE_REQUEST_RECORD -> "unused_4" { + + srrs_update_request_record + } +} + +// region replacement start saga: definition + +#[derive(Debug)] +pub(crate) struct SagaRegionReplacementStart; +impl NexusSaga for SagaRegionReplacementStart { + const NAME: &'static str = "region-replacement-start"; + type Params = Params; + + fn register_actions(registry: &mut ActionRegistry) { + region_replacement_start_register_actions(registry); + } + + fn make_saga_dag( + _params: &Self::Params, + mut builder: steno::DagBuilder, + ) -> Result { + builder.append(Node::action( + "saga_id", + "GenerateSagaId", + ACTION_GENERATE_ID.as_ref(), + )); + + builder.append(Node::action( + "new_volume_id", + "GenerateNewVolumeId", + ACTION_GENERATE_ID.as_ref(), + )); + + builder.append(set_saga_id_action()); + builder.append(get_existing_datasets_and_regions_action()); + builder.append(alloc_new_region_action()); + builder.append(find_new_region_action()); + builder.append(new_region_ensure_action()); + builder.append(get_old_region_address_action()); + 
builder.append(get_old_region_volume_id_action()); + builder.append(replace_region_in_volume_action()); + builder.append(create_fake_volume_action()); + builder.append(update_request_record_action()); + + Ok(builder.build()?) + } +} + +// region replacement start saga: action implementations + +async fn srrs_set_saga_id( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + + let saga_id = sagactx.lookup::("saga_id")?; + + // Change the request record here to an intermediate "allocating" state to + // block out other sagas that will be triggered for the same request. This + // avoids Nexus allocating a bunch of replacement regions only to unwind all + // but one. + osagactx + .datastore() + .set_region_replacement_allocating(&opctx, params.request.id, saga_id) + .await + .map_err(ActionError::action_failed)?; + + Ok(()) +} + +async fn srrs_set_saga_id_undo( + sagactx: NexusActionContext, +) -> Result<(), anyhow::Error> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + + let saga_id = sagactx.lookup::("saga_id")?; + + osagactx + .datastore() + .undo_set_region_replacement_allocating( + &opctx, + params.request.id, + saga_id, + ) + .await?; + + Ok(()) +} + +async fn srrs_get_existing_datasets_and_regions( + sagactx: NexusActionContext, +) -> Result, ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + + // Look up the existing region + let db_region = osagactx + .datastore() + .get_region(params.request.old_region_id) + .await + .map_err(ActionError::action_failed)?; + + // Find out the existing datasets and regions that back the volume + let datasets_and_regions = osagactx + .datastore() + 
.get_allocated_regions(db_region.volume_id()) + .await + .map_err(ActionError::action_failed)?; + + Ok(datasets_and_regions) +} + +async fn srrs_alloc_new_region( + sagactx: NexusActionContext, +) -> Result, ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + + // Look up the existing region: we want to duplicate it but at another + // physical disk + let db_region = osagactx + .datastore() + .get_region(params.request.old_region_id) + .await + .map_err(ActionError::action_failed)?; + + // Request an additional region for this volume: THRESHOLD + 1 is required + // in order to have the proper redundancy. It's important _not_ to delete + // the existing region first, as (if it's still there) then the Crucible + // agent could reuse the allocated port and cause trouble. + let datasets_and_regions = osagactx + .datastore() + .arbitrary_region_allocate_direct( + &opctx, + db_region.volume_id(), + db_region.block_size().to_bytes(), + db_region.blocks_per_extent(), + db_region.extent_count(), + ¶ms.allocation_strategy, + // Note: this assumes that previous redundancy is + // REGION_REDUNDANCY_THRESHOLD, and that region replacement will + // only be run for volumes that start at this redundancy level. + REGION_REDUNDANCY_THRESHOLD + 1, + ) + .await + .map_err(ActionError::action_failed)?; + + Ok(datasets_and_regions) +} + +fn find_only_new_region( + log: &Logger, + existing_datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>, + new_datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>, +) -> Option<(db::model::Dataset, db::model::Region)> { + // Only filter on whether or not a Region is in the existing list! Datasets + // can change values (like size_used) if this saga interleaves with other + // saga runs of the same type. 
+ let mut dataset_and_region: Vec<(db::model::Dataset, db::model::Region)> = + new_datasets_and_regions + .into_iter() + .filter(|(_, r)| { + !existing_datasets_and_regions.iter().any(|(_, er)| er == r) + }) + .collect(); + + if dataset_and_region.len() != 1 { + error!( + log, + "find_only_new_region saw dataset_and_region len {}: {:?}", + dataset_and_region.len(), + dataset_and_region, + ); + + None + } else { + dataset_and_region.pop() + } +} + +async fn srrs_alloc_new_region_undo( + sagactx: NexusActionContext, +) -> Result<(), anyhow::Error> { + let osagactx = sagactx.user_data(); + let log = osagactx.log(); + + let maybe_dataset_and_region = find_only_new_region( + log, + sagactx.lookup::>( + "existing_datasets_and_regions", + )?, + sagactx.lookup::>( + "new_datasets_and_regions", + )?, + ); + + // It should be guaranteed that if srrs_alloc_new_region succeeded then it + // would have bumped the region redundancy to 4, and the existing region + // redundancy should be 3, so we should see something here. Guard against + // the case anyway. 
+ if let Some(dataset_and_region) = maybe_dataset_and_region { + let (_, region) = dataset_and_region; + osagactx + .datastore() + .regions_hard_delete(log, vec![region.id()]) + .await?; + } + + Ok(()) +} + +async fn srrs_find_new_region( + sagactx: NexusActionContext, +) -> Result<(db::model::Dataset, db::model::Region), ActionError> { + let osagactx = sagactx.user_data(); + let log = osagactx.log(); + + let maybe_dataset_and_region = find_only_new_region( + log, + sagactx.lookup::>( + "existing_datasets_and_regions", + )?, + sagactx.lookup::>( + "new_datasets_and_regions", + )?, + ); + + let Some(dataset_and_region) = maybe_dataset_and_region else { + return Err(ActionError::action_failed(Error::internal_error( + &format!( + "expected dataset and region, saw {:?}!", + maybe_dataset_and_region, + ), + ))); + }; + + Ok(dataset_and_region) +} + +async fn srrs_new_region_ensure( + sagactx: NexusActionContext, +) -> Result< + (nexus_db_model::Dataset, crucible_agent_client::types::Region), + ActionError, +> { + let osagactx = sagactx.user_data(); + let log = osagactx.log(); + + // With a list of datasets and regions to ensure, other sagas need to have a + // separate no-op forward step for the undo action to ensure that the undo + // step occurs in the case that the ensure partially fails. Here this is not + // required, there's only one dataset and region. 
+ let new_dataset_and_region = sagactx + .lookup::<(db::model::Dataset, db::model::Region)>( + "new_dataset_and_region", + )?; + + let mut ensured_dataset_and_region = osagactx + .nexus() + .ensure_all_datasets_and_regions(&log, vec![new_dataset_and_region]) + .await + .map_err(ActionError::action_failed)?; + + if ensured_dataset_and_region.len() != 1 { + return Err(ActionError::action_failed(Error::internal_error( + &format!( + "expected 1 dataset and region, saw {}!", + ensured_dataset_and_region.len() + ), + ))); + } + + Ok(ensured_dataset_and_region.pop().unwrap()) +} + +async fn srrs_new_region_ensure_undo( + sagactx: NexusActionContext, +) -> Result<(), anyhow::Error> { + let osagactx = sagactx.user_data(); + let log = osagactx.log(); + + warn!(log, "srrs_new_region_ensure_undo: Deleting crucible regions"); + + let new_dataset_and_region = sagactx + .lookup::<(db::model::Dataset, db::model::Region)>( + "new_dataset_and_region", + )?; + + osagactx + .nexus() + .delete_crucible_regions(log, vec![new_dataset_and_region]) + .await?; + + Ok(()) +} + +async fn srrs_get_old_region_volume_id( + sagactx: NexusActionContext, +) -> Result { + // Save the region's original volume ID, because we'll be altering it and + // need the original + + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + + let db_region = osagactx + .datastore() + .get_region(params.request.old_region_id) + .await + .map_err(ActionError::action_failed)?; + + Ok(db_region.volume_id()) +} + +async fn srrs_get_old_region_address( + sagactx: NexusActionContext, +) -> Result { + let log = sagactx.user_data().log(); + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + + // It was a mistake not to record the port of a region in the Region record. + // However, we haven't needed it until now! 
If the Crucible agent is gone + // (which it will be if the disk is expunged), assume that the region in the + // read/write portion of the volume with the same dataset address (there + // should only be one due to the allocation strategy!) is the old region. + + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + + let db_region = osagactx + .datastore() + .get_region(params.request.old_region_id) + .await + .map_err(ActionError::action_failed)?; + + let targets = osagactx + .datastore() + .get_dataset_rw_regions_in_volume( + &opctx, + db_region.dataset_id(), + db_region.volume_id(), + ) + .await + .map_err(ActionError::action_failed)?; + + if targets.len() == 1 { + // If there's a single RW region in the volume that matches this + // region's dataset, then it must match. Return the target + Ok(targets[0]) + } else { + // Otherwise, Nexus cannot know which region to target for replacement. + // Attempt grabbing the id from the corresponding Crucible agent: the + // sled or disk may not be physically gone, or we may be running in a + // test where the allocation strategy does not mandate distinct sleds. + + let db_dataset = osagactx + .datastore() + .dataset_get(db_region.dataset_id()) + .await + .map_err(ActionError::action_failed)?; + + match get_region_from_agent( + &db_dataset.address(), + params.request.old_region_id, + ) + .await + { + Ok(region) => { + // If the Crucible agent is still answering (i.e. if a region + // replacement was requested and the sled is still there, or if + // this is running in a test), then we know the port number for + // the region. + Ok(SocketAddrV6::new( + *db_dataset.address().ip(), + region.port_number, + 0, + 0, + )) + } + + Err(e) => { + error!( + log, + "error contacting crucible agent: {e}"; + "address" => ?db_dataset.address(), + ); + + // Bail out here! 
+ Err(ActionError::action_failed(format!( + "{} regions match dataset {} in volume {}", + targets.len(), + db_region.dataset_id(), + db_region.volume_id(), + ))) + } + } + } +} + +async fn srrs_replace_region_in_volume( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let log = sagactx.user_data().log(); + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + + let db_region = osagactx + .datastore() + .get_region(params.request.old_region_id) + .await + .map_err(ActionError::action_failed)?; + + let new_volume_id = sagactx.lookup::("new_volume_id")?; + let old_region_address = + sagactx.lookup::("old_region_address")?; + + let (new_dataset, ensured_region) = sagactx.lookup::<( + db::model::Dataset, + crucible_agent_client::types::Region, + )>( + "ensured_dataset_and_region", + )?; + + let new_region_address = SocketAddrV6::new( + *new_dataset.address().ip(), + ensured_region.port_number, + 0, + 0, + ); + + // If this node is rerun, the forward action will have overwritten + // db_region's volume id, so get the cached copy. + let old_volume_id = sagactx.lookup::("old_region_volume_id")?; + + info!( + log, + "replacing {} with {} in volume {}", + old_region_address, + new_region_address, + old_volume_id, + ); + + // `volume_replace_region` will swap the old region for the new region, + // assigning the old region to the new volume id for later (attempted) + // deletion. After this is done, repair or reconciliation needs to occur. 
+ osagactx + .datastore() + .volume_replace_region( + /* target */ + db::datastore::VolumeReplacementParams { + volume_id: old_volume_id, + region_id: db_region.id(), + region_addr: old_region_address, + }, + /* replacement */ + db::datastore::VolumeReplacementParams { + volume_id: new_volume_id, + region_id: ensured_region.id.0.parse().unwrap(), + region_addr: new_region_address, + }, + ) + .await + .map_err(ActionError::action_failed)?; + + Ok(()) +} + +async fn srrs_replace_region_in_volume_undo( + sagactx: NexusActionContext, +) -> Result<(), anyhow::Error> { + // Undo the forward action's volume_replace_region call by swapping the + // region id and address fields in the parameters. Note: ROP removal may + // have occurred but this does not affect what volume_replace_region does. + // + // IMPORTANT: it is _not_ valid to undo this step if a repair or + // reconciliation has started! However that _should_ only start if this saga + // is successful due to the last step of this saga changing the request's + // state to "Running". + + let log = sagactx.user_data().log(); + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + + let db_region = osagactx + .datastore() + .get_region(params.request.old_region_id) + .await + .map_err(ActionError::action_failed)?; + + let new_volume_id = sagactx.lookup::("new_volume_id")?; + let old_region_address = + sagactx.lookup::("old_region_address")?; + + let (new_dataset, ensured_region) = sagactx.lookup::<( + db::model::Dataset, + crucible_agent_client::types::Region, + )>( + "ensured_dataset_and_region", + )?; + + let new_region_address = SocketAddrV6::new( + *new_dataset.address().ip(), + ensured_region.port_number, + 0, + 0, + ); + + // The forward action will have overwritten db_region's volume id, so get + // the cached copy. 
+ let old_volume_id = sagactx.lookup::("old_region_volume_id")?; + + info!( + log, + "undo: replacing {} with {} in volume {}", + old_region_address, + new_region_address, + old_volume_id, + ); + + // Note: volume ID is not swapped! The fake volume hasn't been created yet, + // and we have to target the original volume id. + osagactx + .datastore() + .volume_replace_region( + /* target */ + db::datastore::VolumeReplacementParams { + volume_id: old_volume_id, + region_id: ensured_region.id.0.parse().unwrap(), + region_addr: new_region_address, + }, + /* replacement */ + db::datastore::VolumeReplacementParams { + volume_id: new_volume_id, + region_id: db_region.id(), + region_addr: old_region_address, + }, + ) + .await?; + + Ok(()) +} + +async fn srrs_create_fake_volume( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + + let new_volume_id = sagactx.lookup::("new_volume_id")?; + let old_region_address = + sagactx.lookup::("old_region_address")?; + + // Once the new region was swapped in, create a fake volume record for the + // old region record. This will be deleted after region replacement has + // finished. 
+ + let volume_construction_request = VolumeConstructionRequest::Volume { + id: new_volume_id, + block_size: 0, + sub_volumes: vec![VolumeConstructionRequest::Region { + block_size: 0, + blocks_per_extent: 0, + extent_count: 0, + gen: 0, + opts: CrucibleOpts { + id: new_volume_id, + target: vec![old_region_address.to_string()], + lossy: false, + flush_timeout: None, + key: None, + cert_pem: None, + key_pem: None, + root_cert_pem: None, + control: None, + read_only: false, + }, + }], + read_only_parent: None, + }; + + let volume_data = serde_json::to_string(&volume_construction_request) + .map_err(|e| { + ActionError::action_failed(Error::internal_error(&e.to_string())) + })?; + + let volume = db::model::Volume::new(new_volume_id, volume_data); + + osagactx + .datastore() + .volume_create(volume) + .await + .map_err(ActionError::action_failed)?; + + Ok(()) +} + +async fn srrs_create_fake_volume_undo( + sagactx: NexusActionContext, +) -> Result<(), anyhow::Error> { + let osagactx = sagactx.user_data(); + + // Delete the fake volume. + + let new_volume_id = sagactx.lookup::("new_volume_id")?; + osagactx.datastore().volume_hard_delete(new_volume_id).await?; + + Ok(()) +} + +async fn srrs_update_request_record( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let params = sagactx.saga_params::()?; + let osagactx = sagactx.user_data(); + let datastore = osagactx.datastore(); + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + &params.serialized_authn, + ); + + let saga_id = sagactx.lookup::("saga_id")?; + let new_dataset_and_region = sagactx + .lookup::<(db::model::Dataset, db::model::Region)>( + "new_dataset_and_region", + )?; + let new_region_id = new_dataset_and_region.1.id(); + + let old_region_volume_id = sagactx.lookup::("new_volume_id")?; + + // Now that the region has been ensured and the construction request has + // been updated, update the replacement request record to 'Running' and + // clear the operating saga id. 
There is no undo step for this, it should + // succeed idempotently. + datastore + .set_region_replacement_running( + &opctx, + params.request.id, + saga_id, + new_region_id, + old_region_volume_id, + ) + .await + .map_err(ActionError::action_failed)?; + + Ok(()) +} + +#[cfg(test)] +pub(crate) mod test { + use crate::{ + app::db::lookup::LookupPath, app::db::DataStore, + app::saga::create_saga_dag, + app::sagas::region_replacement_start::find_only_new_region, + app::sagas::region_replacement_start::Params, + app::sagas::region_replacement_start::SagaRegionReplacementStart, + app::sagas::test_helpers::test_opctx, app::RegionAllocationStrategy, + }; + use chrono::Utc; + use nexus_db_model::Dataset; + use nexus_db_model::DatasetKind; + use nexus_db_model::Region; + use nexus_db_model::RegionReplacement; + use nexus_db_model::RegionReplacementState; + use nexus_db_model::Volume; + use nexus_db_queries::authn::saga::Serialized; + use nexus_db_queries::context::OpContext; + use nexus_test_utils::resource_helpers::create_disk; + use nexus_test_utils::resource_helpers::create_project; + use nexus_test_utils::resource_helpers::DiskTest; + use nexus_test_utils_macros::nexus_test; + use nexus_types::identity::Asset; + use sled_agent_client::types::VolumeConstructionRequest; + use uuid::Uuid; + + type ControlPlaneTestContext = + nexus_test_utils::ControlPlaneTestContext; + + const DISK_NAME: &str = "my-disk"; + const PROJECT_NAME: &str = "springfield-squidport"; + + #[nexus_test(server = crate::Server)] + async fn test_region_replacement_start_saga( + cptestctx: &ControlPlaneTestContext, + ) { + let mut disk_test = DiskTest::new(cptestctx).await; + disk_test.add_zpool_with_dataset(&cptestctx).await; + + let client = &cptestctx.external_client; + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = test_opctx(cptestctx); + + let _project_id = + create_project(&client, PROJECT_NAME).await.identity.id; + + // Create a disk + let 
client = &cptestctx.external_client; + let disk = create_disk(&client, PROJECT_NAME, DISK_NAME).await; + + // Assert disk has three allocated regions + let disk_id = disk.identity.id; + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk_id) + .fetch() + .await + .unwrap_or_else(|_| panic!("test disk {:?} should exist", disk_id)); + + let allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + assert_eq!(allocated_regions.len(), 3); + + // Replace one of the disk's regions + let region_to_replace: &nexus_db_model::Region = + &allocated_regions[0].1; + + // Manually insert the replacement request + let request = RegionReplacement { + id: Uuid::new_v4(), + request_time: Utc::now(), + old_region_id: region_to_replace.id(), + volume_id: region_to_replace.volume_id(), + old_region_volume_id: None, + new_region_id: None, + replacement_state: RegionReplacementState::Requested, + operating_saga_id: None, + }; + + datastore + .insert_region_replacement_request(&opctx, request.clone()) + .await + .unwrap(); + + // Run the region replacement start saga + let dag = create_saga_dag::(Params { + serialized_authn: Serialized::for_opctx(&opctx), + request: request.clone(), + allocation_strategy: RegionAllocationStrategy::Random { + seed: None, + }, + }) + .unwrap(); + + let runnable_saga = nexus.create_runnable_saga(dag).await.unwrap(); + + // Actually run the saga + let output = nexus.run_saga(runnable_saga).await.unwrap(); + + // Validate the state transition + let result = datastore + .get_region_replacement_request_by_id(&opctx, request.id) + .await + .unwrap(); + assert_eq!(result.replacement_state, RegionReplacementState::Running); + assert!(result.new_region_id.is_some()); + assert!(result.operating_saga_id.is_none()); + + // Validate number of regions for disk didn't change + let allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + assert_eq!(allocated_regions.len(), 3); + + // 
Validate that one of the regions for the disk is the new one + let new_region = + datastore.get_region(result.new_region_id.unwrap()).await.unwrap(); + assert!(allocated_regions + .iter() + .any(|(_, region)| *region == new_region)); + + // Validate the old region has the new volume id + let old_region = + datastore.get_region(region_to_replace.id()).await.unwrap(); + let new_volume_id = + output.lookup_node_output::("new_volume_id").unwrap(); + assert_eq!(old_region.volume_id(), new_volume_id); + } + + #[nexus_test(server = crate::Server)] + async fn test_find_only_new_region(cptestctx: &ControlPlaneTestContext) { + let log = &cptestctx.logctx.log; + + let datasets = vec![ + Dataset::new( + Uuid::new_v4(), + Uuid::new_v4(), + "[fd00:1122:3344:101::1]:12345".parse().unwrap(), + DatasetKind::Crucible, + ), + Dataset::new( + Uuid::new_v4(), + Uuid::new_v4(), + "[fd00:1122:3344:102::1]:12345".parse().unwrap(), + DatasetKind::Crucible, + ), + Dataset::new( + Uuid::new_v4(), + Uuid::new_v4(), + "[fd00:1122:3344:103::1]:12345".parse().unwrap(), + DatasetKind::Crucible, + ), + Dataset::new( + Uuid::new_v4(), + Uuid::new_v4(), + "[fd00:1122:3344:104::1]:12345".parse().unwrap(), + DatasetKind::Crucible, + ), + ]; + + let regions = vec![ + Region::new( + datasets[0].id(), + Uuid::new_v4(), + 512_i64.try_into().unwrap(), + 10, + 10, + ), + Region::new( + datasets[1].id(), + Uuid::new_v4(), + 512_i64.try_into().unwrap(), + 10, + 10, + ), + Region::new( + datasets[2].id(), + Uuid::new_v4(), + 512_i64.try_into().unwrap(), + 10, + 10, + ), + Region::new( + datasets[3].id(), + Uuid::new_v4(), + 512_i64.try_into().unwrap(), + 10, + 10, + ), + ]; + + let existing_datasets_and_regions = vec![ + (datasets[0].clone(), regions[0].clone()), + (datasets[1].clone(), regions[1].clone()), + (datasets[2].clone(), regions[2].clone()), + ]; + + let new_datasets_and_regions = vec![ + (datasets[0].clone(), regions[0].clone()), + (datasets[1].clone(), regions[1].clone()), + (datasets[2].clone(), 
regions[2].clone()), + (datasets[3].clone(), regions[3].clone()), + ]; + + let only_new_region = find_only_new_region( + &log, + existing_datasets_and_regions, + new_datasets_and_regions, + ); + + assert_eq!( + only_new_region, + Some((datasets[3].clone(), regions[3].clone())) + ); + } + + fn new_test_params( + opctx: &OpContext, + request: &RegionReplacement, + ) -> Params { + Params { + serialized_authn: Serialized::for_opctx(opctx), + request: request.clone(), + allocation_strategy: RegionAllocationStrategy::Random { + seed: None, + }, + } + } + + pub(crate) async fn verify_clean_slate( + cptestctx: &ControlPlaneTestContext, + test: &DiskTest, + request: &RegionReplacement, + affected_volume_original: &Volume, + affected_region_original: &Region, + ) { + let datastore = cptestctx.server.server_context().nexus.datastore(); + + crate::app::sagas::test_helpers::assert_no_failed_undo_steps( + &cptestctx.logctx.log, + datastore, + ) + .await; + + assert!(three_region_allocations_exist(&datastore, &test).await); + assert_region_replacement_request_untouched( + cptestctx, &datastore, &request, + ) + .await; + assert_volume_untouched(&datastore, &affected_volume_original).await; + assert_region_untouched(&datastore, &affected_region_original).await; + } + + async fn three_region_allocations_exist( + datastore: &DataStore, + test: &DiskTest, + ) -> bool { + let mut count = 0; + + for zpool in &test.zpools { + for dataset in &zpool.datasets { + if datastore + .regions_total_occupied_size(dataset.id) + .await + .unwrap() + != 0 + { + count += 1; + } + } + } + + count == 3 + } + + async fn assert_region_replacement_request_untouched( + cptestctx: &ControlPlaneTestContext, + datastore: &DataStore, + request: &RegionReplacement, + ) { + let opctx = test_opctx(cptestctx); + let db_request = datastore + .get_region_replacement_request_by_id(&opctx, request.id) + .await + .unwrap(); + + assert_eq!(db_request.new_region_id, None); + assert_eq!( + db_request.replacement_state, + 
RegionReplacementState::Requested + ); + assert_eq!(db_request.operating_saga_id, None); + } + + fn zero_out_gen_number(vcr: &mut VolumeConstructionRequest) { + match vcr { + VolumeConstructionRequest::Volume { + sub_volumes, + read_only_parent, + .. + } => { + for sv in sub_volumes { + zero_out_gen_number(sv); + } + + if let Some(rop) = read_only_parent { + zero_out_gen_number(rop); + } + } + + VolumeConstructionRequest::Region { gen, .. } => { + *gen = 0; + } + + _ => {} + } + } + + async fn assert_volume_untouched( + datastore: &DataStore, + affected_volume_original: &Volume, + ) { + let affected_volume = datastore + .volume_get(affected_volume_original.id()) + .await + .unwrap() + .unwrap(); + + let mut actual: VolumeConstructionRequest = + serde_json::from_str(&affected_volume.data()).unwrap(); + + let mut expected: VolumeConstructionRequest = + serde_json::from_str(&affected_volume_original.data()).unwrap(); + + zero_out_gen_number(&mut actual); + zero_out_gen_number(&mut expected); + + assert_eq!(actual, expected); + } + + async fn assert_region_untouched( + datastore: &DataStore, + affected_region_original: &Region, + ) { + let affected_region = + datastore.get_region(affected_region_original.id()).await.unwrap(); + assert_eq!(&affected_region, affected_region_original); + } + + #[nexus_test(server = crate::Server)] + async fn test_action_failure_can_unwind_idempotently( + cptestctx: &ControlPlaneTestContext, + ) { + let mut disk_test = DiskTest::new(cptestctx).await; + disk_test.add_zpool_with_dataset(&cptestctx).await; + + let log = &cptestctx.logctx.log; + + let client = &cptestctx.external_client; + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + + let _ = create_project(&client, PROJECT_NAME).await.identity.id; + let opctx = test_opctx(&cptestctx); + + // Create a disk, and use the first region as input to the replacement + // start saga + + let client = &cptestctx.external_client; + let disk = 
create_disk(&client, PROJECT_NAME, DISK_NAME).await; + + let disk_id = disk.identity.id; + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk_id) + .fetch() + .await + .unwrap_or_else(|_| panic!("test disk {:?} should exist", disk_id)); + + let allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + assert_eq!(allocated_regions.len(), 3); + + let region_to_replace: &Region = &allocated_regions[0].1; + + let request = RegionReplacement { + id: Uuid::new_v4(), + request_time: Utc::now(), + old_region_id: region_to_replace.id(), + volume_id: region_to_replace.volume_id(), + old_region_volume_id: None, + new_region_id: None, + replacement_state: RegionReplacementState::Requested, + operating_saga_id: None, + }; + + datastore + .insert_region_replacement_request(&opctx, request.clone()) + .await + .unwrap(); + + let affected_volume_original = datastore + .volume_get(region_to_replace.volume_id()) + .await + .unwrap() + .unwrap(); + + crate::app::sagas::test_helpers::action_failure_can_unwind_idempotently::< + SagaRegionReplacementStart, + _, + _ + >( + nexus, + || Box::pin(async { new_test_params(&opctx, &request) }), + || Box::pin(async { + verify_clean_slate( + &cptestctx, + &disk_test, + &request, + &affected_volume_original, + &region_to_replace, + ).await; + }), + log + ).await; + } + + #[nexus_test(server = crate::Server)] + async fn test_actions_succeed_idempotently( + cptestctx: &ControlPlaneTestContext, + ) { + let mut disk_test = DiskTest::new(cptestctx).await; + disk_test.add_zpool_with_dataset(&cptestctx).await; + + let client = &cptestctx.external_client; + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + + let _ = create_project(&client, PROJECT_NAME).await.identity.id; + let opctx = test_opctx(&cptestctx); + + // Create a disk, and use the first region as input to the replacement + // start saga + + let client = &cptestctx.external_client; + let disk = 
create_disk(&client, PROJECT_NAME, DISK_NAME).await; + + let disk_id = disk.identity.id; + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk_id) + .fetch() + .await + .unwrap_or_else(|_| panic!("test disk {:?} should exist", disk_id)); + + let allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + assert_eq!(allocated_regions.len(), 3); + + let region_to_replace: &Region = &allocated_regions[0].1; + + let request = RegionReplacement { + id: Uuid::new_v4(), + request_time: Utc::now(), + old_region_id: region_to_replace.id(), + volume_id: region_to_replace.volume_id(), + old_region_volume_id: None, + new_region_id: None, + replacement_state: RegionReplacementState::Requested, + operating_saga_id: None, + }; + + datastore + .insert_region_replacement_request(&opctx, request.clone()) + .await + .unwrap(); + + // Build the saga DAG with the provided test parameters + let params = new_test_params(&opctx, &request); + let dag = + create_saga_dag::(params).unwrap(); + crate::app::sagas::test_helpers::actions_succeed_idempotently( + nexus, dag, + ) + .await; + } +} diff --git a/nexus/tests/integration_tests/disks.rs b/nexus/tests/integration_tests/disks.rs index 78ebd83973..beba6ba7c0 100644 --- a/nexus/tests/integration_tests/disks.rs +++ b/nexus/tests/integration_tests/disks.rs @@ -28,6 +28,7 @@ use nexus_test_utils::resource_helpers::create_instance_with; use nexus_test_utils::resource_helpers::create_project; use nexus_test_utils::resource_helpers::objects_list_page_authz; use nexus_test_utils::resource_helpers::DiskTest; +use nexus_test_utils::SLED_AGENT_UUID; use nexus_test_utils_macros::nexus_test; use nexus_types::external_api::params; use omicron_common::api::external::ByteCount; @@ -2541,6 +2542,60 @@ async fn test_no_halt_disk_delete_one_region_on_expunged_agent( assert!(datasets_and_regions.is_empty()); } +#[nexus_test] +async fn test_disk_expunge(cptestctx: &ControlPlaneTestContext) { + let nexus = 
&cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + + // Create three 10 GiB zpools, each with one dataset. + let _disk_test = DiskTest::new(&cptestctx).await; + + // Assert default is still 10 GiB + assert_eq!(10, DiskTest::DEFAULT_ZPOOL_SIZE_GIB); + + // Create a disk + let client = &cptestctx.external_client; + let _project_id = create_project_and_pool(client).await; + + let disk = create_disk(&client, PROJECT_NAME, DISK_NAME).await; + + // Assert disk has three allocated regions + let disk_id = disk.identity.id; + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk_id) + .fetch() + .await + .unwrap_or_else(|_| panic!("test disk {:?} should exist", disk_id)); + + let allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + assert_eq!(allocated_regions.len(), REGION_REDUNDANCY_THRESHOLD); + + // Expunge the sled + let int_client = &cptestctx.internal_client; + int_client + .make_request( + Method::POST, + "/sleds/expunge", + Some(params::SledSelector { + sled: SLED_AGENT_UUID.parse().unwrap(), + }), + StatusCode::OK, + ) + .await + .unwrap(); + + // All three regions should be returned + let expunged_regions = datastore + .find_regions_on_expunged_physical_disks(&opctx) + .await + .unwrap(); + + assert_eq!(expunged_regions.len(), 3); +} + async fn disk_get(client: &ClientTestContext, disk_url: &str) -> Disk { NexusRequest::object_get(client, disk_url) .authn_as(AuthnMode::PrivilegedUser)