From 5e1251afd76bddc1fbe4a7f10cdcc7dc1ff1ff69 Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Thu, 19 Sep 2024 06:57:10 +0000 Subject: [PATCH 1/3] Return richer enum types from datastore functions Create new enum types and return those to give more information to callers: - `create_region_snapshot_replacement_step` and `insert_region_snapshot_replacement_step` now return `InsertRegionSnapshotReplacementStepResult` - `volume_replace_region` and `volume_replace_snapshot` now return `VolumeReplaceResult` Notably, `VolumeReplaceResult::ExistingVolumeDeleted` replaces the previous error type `TargetVolumeDeleted`, and is not an error, allowing the caller to take action if the existing volume was deleted. This commit was peeled off work in progress to address #6353. --- nexus/db-model/src/region_replacement.rs | 2 +- nexus/db-queries/src/db/datastore/mod.rs | 2 + .../datastore/region_snapshot_replacement.rs | 133 +++++++++++++----- nexus/db-queries/src/db/datastore/volume.rs | 75 +++++----- .../region_snapshot_replacement_finish.rs | 16 ++- .../tasks/region_snapshot_replacement_step.rs | 44 ++++-- .../src/app/sagas/region_replacement_start.rs | 41 +++++- .../region_snapshot_replacement_start.rs | 39 ++++- .../sagas/region_snapshot_replacement_step.rs | 39 ++++- ...apshot_replacement_step_garbage_collect.rs | 8 +- 10 files changed, 309 insertions(+), 90 deletions(-) diff --git a/nexus/db-model/src/region_replacement.rs b/nexus/db-model/src/region_replacement.rs index 51570cf7f7..995c55001c 100644 --- a/nexus/db-model/src/region_replacement.rs +++ b/nexus/db-model/src/region_replacement.rs @@ -84,7 +84,7 @@ impl std::str::FromStr for RegionReplacementState { /// | | /// v | /// | -/// Completed --- +/// Complete --- /// ``` /// /// which are captured in the RegionReplacementState enum. 
Annotated on the diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs index 6eec9500dc..fbbaddcefc 100644 --- a/nexus/db-queries/src/db/datastore/mod.rs +++ b/nexus/db-queries/src/db/datastore/mod.rs @@ -120,6 +120,7 @@ pub use rack::RackInit; pub use rack::SledUnderlayAllocationResult; pub use region::RegionAllocationFor; pub use region::RegionAllocationParameters; +pub use region_snapshot_replacement::InsertRegionSnapshotReplacementStepResult; pub use silo::Discoverability; pub use sled::SledTransition; pub use sled::TransitionError; @@ -132,6 +133,7 @@ pub use volume::CrucibleTargets; pub use volume::ExistingTarget; pub use volume::ReplacementTarget; pub use volume::VolumeCheckoutReason; +pub use volume::VolumeReplaceResult; pub use volume::VolumeReplacementParams; pub use volume::VolumeToDelete; pub use volume::VolumeWithTarget; diff --git a/nexus/db-queries/src/db/datastore/region_snapshot_replacement.rs b/nexus/db-queries/src/db/datastore/region_snapshot_replacement.rs index d6a43edbfe..234ee783e4 100644 --- a/nexus/db-queries/src/db/datastore/region_snapshot_replacement.rs +++ b/nexus/db-queries/src/db/datastore/region_snapshot_replacement.rs @@ -23,13 +23,23 @@ use crate::db::pagination::Paginator; use crate::db::update_and_check::UpdateAndCheck; use crate::db::update_and_check::UpdateStatus; use crate::db::TransactionError; -use crate::transaction_retry::OptionalError; use async_bb8_diesel::AsyncConnection; use async_bb8_diesel::AsyncRunQueryDsl; use diesel::prelude::*; use omicron_common::api::external::Error; use uuid::Uuid; +#[must_use] +#[derive(Debug, PartialEq)] +pub enum InsertRegionSnapshotReplacementStepResult { + /// A new region snapshot replacement step was inserted. + Inserted { step_id: Uuid }, + + /// A region snapshot replacement step exists already that references this + /// volume id, so no new record is inserted. 
+ AlreadyHandled { existing_step_id: Uuid }, +} + impl DataStore { /// Create and insert a region snapshot replacement request for a /// RegionSnapshot, returning the ID of the request. @@ -614,28 +624,23 @@ impl DataStore { opctx: &OpContext, request_id: Uuid, volume_id: Uuid, - ) -> Result { + ) -> Result { let request = RegionSnapshotReplacementStep::new(request_id, volume_id); - let request_id = request.id; - - self.insert_region_snapshot_replacement_step(opctx, request).await?; - Ok(request_id) + self.insert_region_snapshot_replacement_step(opctx, request).await } pub async fn insert_region_snapshot_replacement_step( &self, opctx: &OpContext, request: RegionSnapshotReplacementStep, - ) -> Result<(), Error> { + ) -> Result { let conn = self.pool_connection_authorized(opctx).await?; - let err = OptionalError::new(); self.transaction_retry_wrapper( "insert_region_snapshot_replacement_step", ) .transaction(&conn, |conn| { - let err = err.clone(); let request = request.clone(); async move { @@ -656,11 +661,24 @@ impl DataStore { .optional()?; if let Some(found_record) = maybe_record { - return Err(err.bail(Error::conflict(format!( - "{:?} already referenced in old snapshot volume for \ - request {:?}", - request.volume_id, found_record.id, - )))); + return Ok(InsertRegionSnapshotReplacementStepResult::AlreadyHandled { + existing_step_id: found_record.id, + }); + } + + // Skip inserting this record if we found an existing region + // snapshot replacement step for it + + let maybe_record = dsl::region_snapshot_replacement_step + .filter(dsl::volume_id.eq(request.volume_id)) + .get_result_async::(&conn) + .await + .optional()?; + + if let Some(found_record) = maybe_record { + return Ok(InsertRegionSnapshotReplacementStepResult::AlreadyHandled { + existing_step_id: found_record.id, + }); } // The region snapshot replacement step saga could invoke a @@ -675,22 +693,18 @@ impl DataStore { .execute_async(&conn) .await?; + let request_id = request.id; + 
diesel::insert_into(dsl::region_snapshot_replacement_step) .values(request) .execute_async(&conn) .await?; - Ok(()) + Ok(InsertRegionSnapshotReplacementStepResult::Inserted { step_id: request_id }) } }) .await - .map_err(|e| { - if let Some(err) = err.take() { - return err; - } - - public_error_from_diesel(e, ErrorHandler::Server) - }) + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } pub async fn get_region_snapshot_replacement_step_by_id( @@ -1173,10 +1187,15 @@ mod test { Uuid::new_v4(), // volume id ); - datastore + let result = datastore .insert_region_snapshot_replacement_step(&opctx, step) .await .unwrap(); + + assert!(matches!( + result, + InsertRegionSnapshotReplacementStepResult::Inserted { .. } + )); } assert_eq!( @@ -1207,10 +1226,15 @@ mod test { step.replacement_state = RegionSnapshotReplacementStepState::Running; - datastore + let result = datastore .insert_region_snapshot_replacement_step(&opctx, step) .await .unwrap(); + + assert!(matches!( + result, + InsertRegionSnapshotReplacementStepResult::Inserted { .. } + )); } assert_eq!( @@ -1242,10 +1266,15 @@ mod test { step.replacement_state = RegionSnapshotReplacementStepState::VolumeDeleted; - datastore + let result = datastore .insert_region_snapshot_replacement_step(&opctx, step) .await .unwrap(); + + assert!(matches!( + result, + InsertRegionSnapshotReplacementStepResult::Inserted { .. } + )); } assert_eq!( @@ -1288,11 +1317,16 @@ mod test { RegionSnapshotReplacementStep::new(Uuid::new_v4(), volume_id); let first_request_id = step.id; - datastore + let result = datastore .insert_region_snapshot_replacement_step(&opctx, step) .await .unwrap(); + assert!(matches!( + result, + InsertRegionSnapshotReplacementStepResult::Inserted { .. 
} + )); + let step = RegionSnapshotReplacementStep::new(Uuid::new_v4(), volume_id); @@ -1333,11 +1367,16 @@ mod test { .await .unwrap(); - datastore + let result = datastore .insert_region_snapshot_replacement_step(&opctx, step.clone()) .await .unwrap(); + assert!(matches!( + result, + InsertRegionSnapshotReplacementStepResult::Inserted { .. } + )); + // Ensure that transitioning the first step to volume deleted still // works. @@ -1388,19 +1427,31 @@ mod test { let mut step = RegionSnapshotReplacementStep::new(request_id, Uuid::new_v4()); step.replacement_state = RegionSnapshotReplacementStepState::Complete; - datastore + + let result = datastore .insert_region_snapshot_replacement_step(&opctx, step) .await .unwrap(); + assert!(matches!( + result, + InsertRegionSnapshotReplacementStepResult::Inserted { .. } + )); + let mut step = RegionSnapshotReplacementStep::new(request_id, Uuid::new_v4()); step.replacement_state = RegionSnapshotReplacementStepState::Complete; - datastore + + let result = datastore .insert_region_snapshot_replacement_step(&opctx, step) .await .unwrap(); + assert!(matches!( + result, + InsertRegionSnapshotReplacementStepResult::Inserted { .. } + )); + assert_eq!( 2, datastore @@ -1435,19 +1486,35 @@ mod test { RegionSnapshotReplacementStep::new(request_id, volume_id); step.replacement_state = RegionSnapshotReplacementStepState::Complete; step.old_snapshot_volume_id = Some(old_snapshot_volume_id); - datastore + + let first_step_id = step.id; + + let result = datastore .insert_region_snapshot_replacement_step(&opctx, step) .await .unwrap(); + assert!(matches!( + result, + InsertRegionSnapshotReplacementStepResult::Inserted { .. 
} + )); + let step = RegionSnapshotReplacementStep::new( request_id, old_snapshot_volume_id, ); - datastore + + let result = datastore .insert_region_snapshot_replacement_step(&opctx, step) .await - .unwrap_err(); + .unwrap(); + + assert_eq!( + result, + InsertRegionSnapshotReplacementStepResult::AlreadyHandled { + existing_step_id: first_step_id + } + ); db.cleanup().await.unwrap(); logctx.cleanup_successful(); @@ -1468,6 +1535,7 @@ mod test { let volume_id = Uuid::new_v4(); let request = RegionReplacement::new(Uuid::new_v4(), volume_id); + datastore .insert_region_replacement_request(&opctx, request) .await @@ -1475,6 +1543,7 @@ mod test { let request = RegionSnapshotReplacementStep::new(Uuid::new_v4(), volume_id); + datastore .insert_region_snapshot_replacement_step(&opctx, request) .await diff --git a/nexus/db-queries/src/db/datastore/volume.rs b/nexus/db-queries/src/db/datastore/volume.rs index f5c1f121e4..2e9fc5e45e 100644 --- a/nexus/db-queries/src/db/datastore/volume.rs +++ b/nexus/db-queries/src/db/datastore/volume.rs @@ -1806,13 +1806,28 @@ pub struct ReplacementTarget(pub SocketAddrV6); #[derive(Debug, Clone, Copy)] pub struct VolumeToDelete(pub Uuid); +// The result type returned from both `volume_replace_region` and +// `volume_replace_snapshot` +#[must_use] +#[derive(Debug, Serialize, Deserialize, PartialEq)] +pub enum VolumeReplaceResult { + // based on the VCRs, seems like the replacement already happened + AlreadyHappened, + + // this call performed the replacement + Done, + + // the "existing" volume was deleted + ExistingVolumeDeleted, +} + impl DataStore { /// Replace a read-write region in a Volume with a new region. 
pub async fn volume_replace_region( &self, existing: VolumeReplacementParams, replacement: VolumeReplacementParams, - ) -> Result<(), Error> { + ) -> Result { // In a single transaction: // // - set the existing region's volume id to the replacement's volume id @@ -1911,9 +1926,6 @@ impl DataStore { #[error("Serde error during Volume region replacement: {0}")] SerdeError(#[from] serde_json::Error), - #[error("Target Volume deleted")] - TargetVolumeDeleted, - #[error("Region replacement error: {0}")] RegionReplacementError(#[from] anyhow::Error), } @@ -1947,9 +1959,11 @@ impl DataStore { let old_volume = if let Some(old_volume) = maybe_old_volume { old_volume } else { - // Existing volume was deleted, so return an error. We - // can't perform the region replacement now! - return Err(err.bail(VolumeReplaceRegionError::TargetVolumeDeleted)); + // Existing volume was deleted, so return here. We can't + // perform the region replacement now, and this will + // short-circuit the rest of the process. 
+ + return Ok(VolumeReplaceResult::ExistingVolumeDeleted); }; let old_vcr: VolumeConstructionRequest = @@ -1976,7 +1990,7 @@ impl DataStore { if !old_region_in_vcr && new_region_in_vcr { // It does seem like the replacement happened - return Ok(()); + return Ok(VolumeReplaceResult::AlreadyHappened); } use db::schema::region::dsl as region_dsl; @@ -2061,7 +2075,7 @@ impl DataStore { }) })?; - Ok(()) + Ok(VolumeReplaceResult::Done) } }) .await @@ -2074,10 +2088,6 @@ impl DataStore { Error::internal_error(&err.to_string()) } - VolumeReplaceRegionError::TargetVolumeDeleted => { - Error::internal_error(&err.to_string()) - } - VolumeReplaceRegionError::RegionReplacementError(_) => { Error::internal_error(&err.to_string()) } @@ -2110,7 +2120,7 @@ impl DataStore { existing: ExistingTarget, replacement: ReplacementTarget, volume_to_delete_id: VolumeToDelete, - ) -> Result<(), Error> { + ) -> Result { #[derive(Debug, thiserror::Error)] enum VolumeReplaceSnapshotError { #[error("Error from Volume snapshot replacement: {0}")] @@ -2119,9 +2129,6 @@ impl DataStore { #[error("Serde error during Volume snapshot replacement: {0}")] SerdeError(#[from] serde_json::Error), - #[error("Target Volume deleted")] - TargetVolumeDeleted, - #[error("Snapshot replacement error: {0}")] SnapshotReplacementError(#[from] anyhow::Error), @@ -2163,11 +2170,11 @@ impl DataStore { let old_volume = if let Some(old_volume) = maybe_old_volume { old_volume } else { - // Existing volume was deleted, so return an error. We - // can't perform the snapshot replacement now! - return Err(err.bail( - VolumeReplaceSnapshotError::TargetVolumeDeleted - )); + // Existing volume was deleted, so return here. We can't + // perform the region snapshot replacement now, and this + // will short-circuit the rest of the process. 
+ + return Ok(VolumeReplaceResult::ExistingVolumeDeleted); }; let old_vcr: VolumeConstructionRequest = @@ -2201,7 +2208,7 @@ impl DataStore { if !old_target_in_vcr && new_target_in_vcr { // It does seem like the replacement happened - return Ok(()); + return Ok(VolumeReplaceResult::AlreadyHappened); } // Update the existing volume's construction request to @@ -2312,7 +2319,7 @@ impl DataStore { )); } - Ok(()) + Ok(VolumeReplaceResult::Done) } }) .await @@ -2325,10 +2332,6 @@ impl DataStore { Error::internal_error(&err.to_string()) } - VolumeReplaceSnapshotError::TargetVolumeDeleted => { - Error::internal_error(&err.to_string()) - } - VolumeReplaceSnapshotError::SnapshotReplacementError(_) => { Error::internal_error(&err.to_string()) } @@ -2862,7 +2865,7 @@ mod tests { let target = region_and_volume_ids[0]; let replacement = region_and_volume_ids[3]; - db_datastore + let volume_replace_region_result = db_datastore .volume_replace_region( /* target */ db::datastore::VolumeReplacementParams { @@ -2884,6 +2887,8 @@ mod tests { .await .unwrap(); + assert_eq!(volume_replace_region_result, VolumeReplaceResult::Done); + let vcr: VolumeConstructionRequest = serde_json::from_str( db_datastore.volume_get(volume_id).await.unwrap().unwrap().data(), ) @@ -2922,7 +2927,7 @@ mod tests { ); // Now undo the replacement. Note volume ID is not swapped. 
- db_datastore + let volume_replace_region_result = db_datastore .volume_replace_region( /* target */ db::datastore::VolumeReplacementParams { @@ -2944,6 +2949,8 @@ mod tests { .await .unwrap(); + assert_eq!(volume_replace_region_result, VolumeReplaceResult::Done); + let vcr: VolumeConstructionRequest = serde_json::from_str( db_datastore.volume_get(volume_id).await.unwrap().unwrap().data(), ) @@ -3074,7 +3081,7 @@ mod tests { // Do the replacement - db_datastore + let volume_replace_snapshot_result = db_datastore .volume_replace_snapshot( VolumeWithTarget(volume_id), ExistingTarget("[fd00:1122:3344:104::1]:400".parse().unwrap()), @@ -3086,6 +3093,8 @@ mod tests { .await .unwrap(); + assert_eq!(volume_replace_snapshot_result, VolumeReplaceResult::Done,); + // Ensure the shape of the resulting VCRs let vcr: VolumeConstructionRequest = serde_json::from_str( @@ -3190,7 +3199,7 @@ mod tests { // Now undo the replacement. Note volume ID is not swapped. - db_datastore + let volume_replace_snapshot_result = db_datastore .volume_replace_snapshot( VolumeWithTarget(volume_id), ExistingTarget("[fd55:1122:3344:101::1]:111".parse().unwrap()), @@ -3202,6 +3211,8 @@ mod tests { .await .unwrap(); + assert_eq!(volume_replace_snapshot_result, VolumeReplaceResult::Done,); + let vcr: VolumeConstructionRequest = serde_json::from_str( db_datastore.volume_get(volume_id).await.unwrap().unwrap().data(), ) diff --git a/nexus/src/app/background/tasks/region_snapshot_replacement_finish.rs b/nexus/src/app/background/tasks/region_snapshot_replacement_finish.rs index 134995d848..c83511c2a8 100644 --- a/nexus/src/app/background/tasks/region_snapshot_replacement_finish.rs +++ b/nexus/src/app/background/tasks/region_snapshot_replacement_finish.rs @@ -167,6 +167,7 @@ mod test { use nexus_db_model::RegionSnapshotReplacement; use nexus_db_model::RegionSnapshotReplacementStep; use nexus_db_model::RegionSnapshotReplacementStepState; + use 
nexus_db_queries::db::datastore::InsertRegionSnapshotReplacementStepResult; use nexus_test_utils_macros::nexus_test; use uuid::Uuid; @@ -277,15 +278,26 @@ mod test { step_2.operating_saga_id = Some(operating_saga_id); let step_2_id = step_2.id; - datastore + let result = datastore .insert_region_snapshot_replacement_step(&opctx, step_1) .await .unwrap(); - datastore + + assert!(matches!( + result, + InsertRegionSnapshotReplacementStepResult::Inserted { .. } + )); + + let result = datastore .insert_region_snapshot_replacement_step(&opctx, step_2) .await .unwrap(); + assert!(matches!( + result, + InsertRegionSnapshotReplacementStepResult::Inserted { .. } + )); + // Activate the task, it should do nothing yet let result: RegionSnapshotReplacementFinishStatus = diff --git a/nexus/src/app/background/tasks/region_snapshot_replacement_step.rs b/nexus/src/app/background/tasks/region_snapshot_replacement_step.rs index cd13a56642..9123196098 100644 --- a/nexus/src/app/background/tasks/region_snapshot_replacement_step.rs +++ b/nexus/src/app/background/tasks/region_snapshot_replacement_step.rs @@ -32,6 +32,7 @@ use futures::future::BoxFuture; use futures::FutureExt; use nexus_db_model::RegionSnapshotReplacementStep; use nexus_db_queries::context::OpContext; +use nexus_db_queries::db::datastore::InsertRegionSnapshotReplacementStepResult; use nexus_db_queries::db::DataStore; use nexus_types::identity::Asset; use nexus_types::internal_api::background::RegionSnapshotReplacementStepStatus; @@ -314,15 +315,26 @@ impl RegionSnapshotReplacementFindAffected { ) .await { - Ok(step_request_id) => { - let s = format!("created {step_request_id}"); - info!( - log, - "{s}"; - "request id" => ?request.id, - "volume id" => ?volume.id(), - ); - status.step_records_created_ok.push(s); + Ok(insertion_result) => match insertion_result { + InsertRegionSnapshotReplacementStepResult::Inserted { step_id } => { + let s = format!("created {step_id}"); + info!( + log, + "{s}"; + "request id" => 
?request.id, + "volume id" => ?volume.id(), + ); + status.step_records_created_ok.push(s); + } + + InsertRegionSnapshotReplacementStepResult::AlreadyHandled { .. } => { + info!( + log, + "step already exists for volume id"; + "request id" => ?request.id, + "volume id" => ?volume.id(), + ); + } } Err(e) => { @@ -695,7 +707,7 @@ mod test { // Now, add some Complete records and make sure the garbage collection // saga is invoked. - datastore + let result = datastore .insert_region_snapshot_replacement_step(&opctx, { let mut record = RegionSnapshotReplacementStep::new( Uuid::new_v4(), @@ -711,7 +723,12 @@ mod test { .await .unwrap(); - datastore + assert!(matches!( + result, + InsertRegionSnapshotReplacementStepResult::Inserted { .. } + )); + + let result = datastore .insert_region_snapshot_replacement_step(&opctx, { let mut record = RegionSnapshotReplacementStep::new( Uuid::new_v4(), @@ -727,6 +744,11 @@ mod test { .await .unwrap(); + assert!(matches!( + result, + InsertRegionSnapshotReplacementStepResult::Inserted { .. } + )); + // Activate the task - it should pick the complete steps up and try to // run the region snapshot replacement step garbage collect saga diff --git a/nexus/src/app/sagas/region_replacement_start.rs b/nexus/src/app/sagas/region_replacement_start.rs index 1bc1491468..5b51ee77a6 100644 --- a/nexus/src/app/sagas/region_replacement_start.rs +++ b/nexus/src/app/sagas/region_replacement_start.rs @@ -49,6 +49,7 @@ use super::{ ActionRegistry, NexusActionContext, NexusSaga, SagaInitError, ACTION_GENERATE_ID, }; +use crate::app::db::datastore::VolumeReplaceResult; use crate::app::sagas::common_storage::find_only_new_region; use crate::app::sagas::declare_saga_actions; use crate::app::RegionAllocationStrategy; @@ -535,7 +536,7 @@ async fn srrs_replace_region_in_volume( // `volume_replace_region` will swap the old region for the new region, // assigning the old region to the new volume id for later (attempted) // deletion. 
After this is done, repair or reconciliation needs to occur. - osagactx + let volume_replace_region_result = osagactx .datastore() .volume_replace_region( /* target */ @@ -554,7 +555,30 @@ async fn srrs_replace_region_in_volume( .await .map_err(ActionError::action_failed)?; - Ok(()) + info!(log, "replacement returned {:?}", volume_replace_region_result); + + match volume_replace_region_result { + VolumeReplaceResult::AlreadyHappened | VolumeReplaceResult::Done => { + // The replacement was done either by this run of this saga node, or + // a previous one (and this is a rerun). This can only be returned + // if the transaction occurred on the non-deleted volume so proceed + // with the rest of the saga (to properly clean up allocated + // resources). + + Ok(()) + } + + VolumeReplaceResult::ExistingVolumeDeleted => { + // Unwind the saga here to clean up the resources allocated during + // this saga. The associated background task will transition this + // request's state to Completed. + + Err(ActionError::action_failed(format!( + "existing volume {} deleted", + old_volume_id + ))) + } + } } async fn srrs_replace_region_in_volume_undo( @@ -610,7 +634,12 @@ async fn srrs_replace_region_in_volume_undo( // Note: volume ID is not swapped! The fake volume hasn't been created yet, // and we have to target the original volume id. - osagactx + // + // It's ok if this function returns ExistingVolumeDeleted here: we don't + // want to throw an error and cause the saga to be stuck unwinding, as this + // would hold the lock on the replacement request. 
+ + let volume_replace_region_result = osagactx .datastore() .volume_replace_region( /* target */ @@ -628,6 +657,12 @@ async fn srrs_replace_region_in_volume_undo( ) .await?; + info!( + log, + "undo: volume_replace_region returned {:?}", + volume_replace_region_result, + ); + Ok(()) } diff --git a/nexus/src/app/sagas/region_snapshot_replacement_start.rs b/nexus/src/app/sagas/region_snapshot_replacement_start.rs index 941899d862..6d56732388 100644 --- a/nexus/src/app/sagas/region_snapshot_replacement_start.rs +++ b/nexus/src/app/sagas/region_snapshot_replacement_start.rs @@ -55,6 +55,7 @@ use crate::app::db::datastore::ExistingTarget; use crate::app::db::datastore::RegionAllocationFor; use crate::app::db::datastore::RegionAllocationParameters; use crate::app::db::datastore::ReplacementTarget; +use crate::app::db::datastore::VolumeReplaceResult; use crate::app::db::datastore::VolumeToDelete; use crate::app::db::datastore::VolumeWithTarget; use crate::app::db::lookup::LookupPath; @@ -650,7 +651,7 @@ async fn rsrss_replace_snapshot_in_volume( // `volume_replace_snapshot` will swap the old snapshot for the new region. // No repair or reconcilation needs to occur after this. - osagactx + let volume_replace_snapshot_result = osagactx .datastore() .volume_replace_snapshot( VolumeWithTarget(replacement_params.old_volume_id), @@ -661,7 +662,27 @@ async fn rsrss_replace_snapshot_in_volume( .await .map_err(ActionError::action_failed)?; - Ok(()) + match volume_replace_snapshot_result { + VolumeReplaceResult::AlreadyHappened | VolumeReplaceResult::Done => { + // The replacement was done either by this run of this saga node, or + // a previous one (and this is a rerun). This can only be returned + // if the transaction occurred on the non-deleted volume so proceed + // with the rest of the saga. + + Ok(()) + } + + VolumeReplaceResult::ExistingVolumeDeleted => { + // Unwind the saga here to clean up the resources allocated during + // this saga. 
The associated background task will transition this + // request's state to Completed. + + Err(ActionError::action_failed(format!( + "existing volume {} deleted", + replacement_params.old_volume_id + ))) + } + } } async fn rsrss_replace_snapshot_in_volume_undo( @@ -687,7 +708,13 @@ async fn rsrss_replace_snapshot_in_volume_undo( replacement_params.old_volume_id, ); - osagactx + // Note only the ExistingTarget and ReplacementTarget arguments are swapped + // here! + // + // It's ok if this function returns ExistingVolumeDeleted here: we don't + // want to throw an error and cause the saga to be stuck unwinding, as this + // would hold the lock on the replacement request. + let volume_replace_snapshot_result = osagactx .datastore() .volume_replace_snapshot( VolumeWithTarget(replacement_params.old_volume_id), @@ -697,6 +724,12 @@ async fn rsrss_replace_snapshot_in_volume_undo( ) .await?; + info!( + log, + "undo: volume_replace_snapshot returned {:?}", + volume_replace_snapshot_result, + ); + Ok(()) } diff --git a/nexus/src/app/sagas/region_snapshot_replacement_step.rs b/nexus/src/app/sagas/region_snapshot_replacement_step.rs index 507904736a..bf2b91a616 100644 --- a/nexus/src/app/sagas/region_snapshot_replacement_step.rs +++ b/nexus/src/app/sagas/region_snapshot_replacement_step.rs @@ -49,6 +49,7 @@ use super::{ }; use crate::app::db::datastore::ExistingTarget; use crate::app::db::datastore::ReplacementTarget; +use crate::app::db::datastore::VolumeReplaceResult; use crate::app::db::datastore::VolumeToDelete; use crate::app::db::datastore::VolumeWithTarget; use crate::app::db::lookup::LookupPath; @@ -90,7 +91,7 @@ declare_saga_actions! 
{ + rssrs_create_fake_volume - rssrs_create_fake_volume_undo } - REPLACE_SNAPSHOT_IN_VOLUME -> "unused_3" { + REPLACE_SNAPSHOT_IN_VOLUME -> "volume_replace_snapshot_result" { + rsrss_replace_snapshot_in_volume - rsrss_replace_snapshot_in_volume_undo } @@ -340,9 +341,10 @@ async fn rssrs_create_fake_volume_undo( async fn rsrss_replace_snapshot_in_volume( sagactx: NexusActionContext, -) -> Result<(), ActionError> { +) -> Result { let osagactx = sagactx.user_data(); let params = sagactx.saga_params::()?; + let log = sagactx.user_data().log(); let replace_params = sagactx.lookup::("replace_params")?; @@ -350,7 +352,8 @@ async fn rsrss_replace_snapshot_in_volume( // `volume_replace_snapshot` will swap the old snapshot for the new region. // No repair or reconcilation needs to occur after this. - osagactx + + let volume_replace_snapshot_result = osagactx .datastore() .volume_replace_snapshot( VolumeWithTarget(params.request.volume_id), @@ -361,7 +364,23 @@ async fn rsrss_replace_snapshot_in_volume( .await .map_err(ActionError::action_failed)?; - Ok(()) + info!( + &log, + "volume_replace_snapshot returned {:?}", volume_replace_snapshot_result, + ); + + match volume_replace_snapshot_result { + VolumeReplaceResult::AlreadyHappened | VolumeReplaceResult::Done => { + // This transaction occurred on the non-deleted volume, so proceed + // with the saga. + } + + VolumeReplaceResult::ExistingVolumeDeleted => { + // Proceed with the saga but skip the notification step. 
+ } + } + + Ok(volume_replace_snapshot_result) } async fn rsrss_replace_snapshot_in_volume_undo( @@ -369,12 +388,16 @@ async fn rsrss_replace_snapshot_in_volume_undo( ) -> Result<(), anyhow::Error> { let osagactx = sagactx.user_data(); let params = sagactx.saga_params::()?; + let log = sagactx.user_data().log(); let replace_params = sagactx.lookup::("replace_params")?; let new_volume_id = sagactx.lookup::("new_volume_id")?; - osagactx + // It's ok if this function returned ExistingVolumeDeleted, don't cause the + // saga to get stuck unwinding! + + let volume_replace_snapshot_result = osagactx .datastore() .volume_replace_snapshot( VolumeWithTarget(params.request.volume_id), @@ -384,6 +407,12 @@ async fn rsrss_replace_snapshot_in_volume_undo( ) .await?; + info!( + &log, + "undo: volume_replace_snapshot returned {:?}", + volume_replace_snapshot_result, + ); + Ok(()) } diff --git a/nexus/src/app/sagas/region_snapshot_replacement_step_garbage_collect.rs b/nexus/src/app/sagas/region_snapshot_replacement_step_garbage_collect.rs index 93335b6125..2d0317b1f4 100644 --- a/nexus/src/app/sagas/region_snapshot_replacement_step_garbage_collect.rs +++ b/nexus/src/app/sagas/region_snapshot_replacement_step_garbage_collect.rs @@ -129,6 +129,7 @@ pub(crate) mod test { use nexus_db_model::Volume; use nexus_db_queries::authn::saga::Serialized; use nexus_db_queries::context::OpContext; + use nexus_db_queries::db::datastore::InsertRegionSnapshotReplacementStepResult; use nexus_test_utils_macros::nexus_test; use sled_agent_client::types::CrucibleOpts; use sled_agent_client::types::VolumeConstructionRequest; @@ -192,11 +193,16 @@ pub(crate) mod test { RegionSnapshotReplacementStepState::Complete; request.old_snapshot_volume_id = Some(old_snapshot_volume_id); - datastore + let result = datastore .insert_region_snapshot_replacement_step(&opctx, request.clone()) .await .unwrap(); + assert!(matches!( + result, + InsertRegionSnapshotReplacementStepResult::Inserted { .. 
} + )); + // Run the saga let params = Params { serialized_authn: Serialized::for_opctx(&opctx), From 2f6a559b319f48a6e623cb74f4a26215dc935260 Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Thu, 19 Sep 2024 07:52:19 +0000 Subject: [PATCH 2/3] fixup unique_region_snapshot_replacement_step_per_volume --- .../datastore/region_snapshot_replacement.rs | 55 +++++++++++++++---- 1 file changed, 43 insertions(+), 12 deletions(-) diff --git a/nexus/db-queries/src/db/datastore/region_snapshot_replacement.rs b/nexus/db-queries/src/db/datastore/region_snapshot_replacement.rs index 234ee783e4..ff9e27829b 100644 --- a/nexus/db-queries/src/db/datastore/region_snapshot_replacement.rs +++ b/nexus/db-queries/src/db/datastore/region_snapshot_replacement.rs @@ -667,7 +667,7 @@ impl DataStore { } // Skip inserting this record if we found an existing region - // snapshot replacement step for it + // snapshot replacement step for it in a non-complete state. let maybe_record = dsl::region_snapshot_replacement_step .filter(dsl::volume_id.eq(request.volume_id)) @@ -676,9 +676,22 @@ impl DataStore { .optional()?; if let Some(found_record) = maybe_record { - return Ok(InsertRegionSnapshotReplacementStepResult::AlreadyHandled { - existing_step_id: found_record.id, - }); + match found_record.replacement_state { + RegionSnapshotReplacementStepState::Complete | + RegionSnapshotReplacementStepState::VolumeDeleted => { + // Ok, we can insert another record with a matching + // volume ID because the volume_repair record would + // have been deleted during the transition to + // Complete. 
+ } + + RegionSnapshotReplacementStepState::Requested | + RegionSnapshotReplacementStepState::Running => { + return Ok(InsertRegionSnapshotReplacementStepResult::AlreadyHandled { + existing_step_id: found_record.id, + }); + } + } } // The region snapshot replacement step saga could invoke a @@ -1330,10 +1343,18 @@ mod test { let step = RegionSnapshotReplacementStep::new(Uuid::new_v4(), volume_id); - datastore + let result = datastore .insert_region_snapshot_replacement_step(&opctx, step.clone()) .await - .unwrap_err(); + .unwrap(); + + let InsertRegionSnapshotReplacementStepResult::AlreadyHandled { + existing_step_id, + } = result + else { + panic!("wrong return type"); + }; + assert_eq!(existing_step_id, first_request_id); // Ensure that transitioning the first step to running doesn't change // things. @@ -1349,10 +1370,18 @@ mod test { .await .unwrap(); - datastore + let result = datastore .insert_region_snapshot_replacement_step(&opctx, step.clone()) .await - .unwrap_err(); + .unwrap(); + + let InsertRegionSnapshotReplacementStepResult::AlreadyHandled { + existing_step_id, + } = result + else { + panic!("wrong return type"); + }; + assert_eq!(existing_step_id, first_request_id); // Ensure that transitioning the first step to complete means another // can be added. @@ -1372,10 +1401,12 @@ mod test { .await .unwrap(); - assert!(matches!( - result, - InsertRegionSnapshotReplacementStepResult::Inserted { .. } - )); + let InsertRegionSnapshotReplacementStepResult::Inserted { step_id } = + result + else { + panic!("wrong return type"); + }; + assert_eq!(step_id, step.id); // Ensure that transitioning the first step to volume deleted still // works. 
From d2135f21b6398a678679121cd9fe2c9230f76fbe Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Sat, 21 Sep 2024 00:08:36 +0000 Subject: [PATCH 3/3] responding to review comments --- nexus/db-queries/src/db/datastore/mod.rs | 3 +- .../datastore/region_snapshot_replacement.rs | 77 ++++++------------- .../region_snapshot_replacement_finish.rs | 6 +- .../tasks/region_snapshot_replacement_step.rs | 10 +-- .../src/app/sagas/region_replacement_start.rs | 6 +- .../sagas/region_snapshot_replacement_step.rs | 2 +- ...apshot_replacement_step_garbage_collect.rs | 4 +- 7 files changed, 40 insertions(+), 68 deletions(-) diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs index fbbaddcefc..ec317c184f 100644 --- a/nexus/db-queries/src/db/datastore/mod.rs +++ b/nexus/db-queries/src/db/datastore/mod.rs @@ -84,7 +84,7 @@ mod rack; mod region; mod region_replacement; mod region_snapshot; -mod region_snapshot_replacement; +pub mod region_snapshot_replacement; mod role; mod saga; mod silo; @@ -120,7 +120,6 @@ pub use rack::RackInit; pub use rack::SledUnderlayAllocationResult; pub use region::RegionAllocationFor; pub use region::RegionAllocationParameters; -pub use region_snapshot_replacement::InsertRegionSnapshotReplacementStepResult; pub use silo::Discoverability; pub use sled::SledTransition; pub use sled::TransitionError; diff --git a/nexus/db-queries/src/db/datastore/region_snapshot_replacement.rs b/nexus/db-queries/src/db/datastore/region_snapshot_replacement.rs index ff9e27829b..25751e3920 100644 --- a/nexus/db-queries/src/db/datastore/region_snapshot_replacement.rs +++ b/nexus/db-queries/src/db/datastore/region_snapshot_replacement.rs @@ -30,8 +30,8 @@ use omicron_common::api::external::Error; use uuid::Uuid; #[must_use] -#[derive(Debug, PartialEq)] -pub enum InsertRegionSnapshotReplacementStepResult { +#[derive(Debug, PartialEq, Eq)] +pub enum InsertStepResult { /// A new region snapshot replacement step was inserted. 
Inserted { step_id: Uuid }, @@ -624,7 +624,7 @@ impl DataStore { opctx: &OpContext, request_id: Uuid, volume_id: Uuid, - ) -> Result { + ) -> Result { let request = RegionSnapshotReplacementStep::new(request_id, volume_id); self.insert_region_snapshot_replacement_step(opctx, request).await @@ -634,7 +634,7 @@ impl DataStore { &self, opctx: &OpContext, request: RegionSnapshotReplacementStep, - ) -> Result { + ) -> Result { let conn = self.pool_connection_authorized(opctx).await?; self.transaction_retry_wrapper( @@ -661,7 +661,7 @@ impl DataStore { .optional()?; if let Some(found_record) = maybe_record { - return Ok(InsertRegionSnapshotReplacementStepResult::AlreadyHandled { + return Ok(InsertStepResult::AlreadyHandled { existing_step_id: found_record.id, }); } @@ -677,17 +677,17 @@ impl DataStore { if let Some(found_record) = maybe_record { match found_record.replacement_state { - RegionSnapshotReplacementStepState::Complete | - RegionSnapshotReplacementStepState::VolumeDeleted => { + RegionSnapshotReplacementStepState::Complete + | RegionSnapshotReplacementStepState::VolumeDeleted => { // Ok, we can insert another record with a matching // volume ID because the volume_repair record would // have been deleted during the transition to // Complete. } - RegionSnapshotReplacementStepState::Requested | - RegionSnapshotReplacementStepState::Running => { - return Ok(InsertRegionSnapshotReplacementStepResult::AlreadyHandled { + RegionSnapshotReplacementStepState::Requested + | RegionSnapshotReplacementStepState::Running => { + return Ok(InsertStepResult::AlreadyHandled { existing_step_id: found_record.id, }); } @@ -713,7 +713,7 @@ impl DataStore { .execute_async(&conn) .await?; - Ok(InsertRegionSnapshotReplacementStepResult::Inserted { step_id: request_id }) + Ok(InsertStepResult::Inserted { step_id: request_id }) } }) .await @@ -1205,10 +1205,7 @@ mod test { .await .unwrap(); - assert!(matches!( - result, - InsertRegionSnapshotReplacementStepResult::Inserted { .. 
} - )); + assert!(matches!(result, InsertStepResult::Inserted { .. })); } assert_eq!( @@ -1244,10 +1241,7 @@ mod test { .await .unwrap(); - assert!(matches!( - result, - InsertRegionSnapshotReplacementStepResult::Inserted { .. } - )); + assert!(matches!(result, InsertStepResult::Inserted { .. })); } assert_eq!( @@ -1284,10 +1278,7 @@ mod test { .await .unwrap(); - assert!(matches!( - result, - InsertRegionSnapshotReplacementStepResult::Inserted { .. } - )); + assert!(matches!(result, InsertStepResult::Inserted { .. })); } assert_eq!( @@ -1335,10 +1326,7 @@ mod test { .await .unwrap(); - assert!(matches!( - result, - InsertRegionSnapshotReplacementStepResult::Inserted { .. } - )); + assert!(matches!(result, InsertStepResult::Inserted { .. })); let step = RegionSnapshotReplacementStep::new(Uuid::new_v4(), volume_id); @@ -1348,11 +1336,9 @@ mod test { .await .unwrap(); - let InsertRegionSnapshotReplacementStepResult::AlreadyHandled { - existing_step_id, - } = result + let InsertStepResult::AlreadyHandled { existing_step_id } = result else { - panic!("wrong return type"); + panic!("wrong return type: {result:?}"); }; assert_eq!(existing_step_id, first_request_id); @@ -1375,11 +1361,9 @@ mod test { .await .unwrap(); - let InsertRegionSnapshotReplacementStepResult::AlreadyHandled { - existing_step_id, - } = result + let InsertStepResult::AlreadyHandled { existing_step_id } = result else { - panic!("wrong return type"); + panic!("wrong return type: {result:?}"); }; assert_eq!(existing_step_id, first_request_id); @@ -1401,10 +1385,8 @@ mod test { .await .unwrap(); - let InsertRegionSnapshotReplacementStepResult::Inserted { step_id } = - result - else { - panic!("wrong return type"); + let InsertStepResult::Inserted { step_id } = result else { + panic!("wrong return type: {result:?}"); }; assert_eq!(step_id, step.id); @@ -1464,10 +1446,7 @@ mod test { .await .unwrap(); - assert!(matches!( - result, - InsertRegionSnapshotReplacementStepResult::Inserted { .. 
} - )); + assert!(matches!(result, InsertStepResult::Inserted { .. })); let mut step = RegionSnapshotReplacementStep::new(request_id, Uuid::new_v4()); @@ -1478,10 +1457,7 @@ mod test { .await .unwrap(); - assert!(matches!( - result, - InsertRegionSnapshotReplacementStepResult::Inserted { .. } - )); + assert!(matches!(result, InsertStepResult::Inserted { .. })); assert_eq!( 2, @@ -1525,10 +1501,7 @@ mod test { .await .unwrap(); - assert!(matches!( - result, - InsertRegionSnapshotReplacementStepResult::Inserted { .. } - )); + assert!(matches!(result, InsertStepResult::Inserted { .. })); let step = RegionSnapshotReplacementStep::new( request_id, @@ -1542,7 +1515,7 @@ mod test { assert_eq!( result, - InsertRegionSnapshotReplacementStepResult::AlreadyHandled { + InsertStepResult::AlreadyHandled { existing_step_id: first_step_id } ); diff --git a/nexus/src/app/background/tasks/region_snapshot_replacement_finish.rs b/nexus/src/app/background/tasks/region_snapshot_replacement_finish.rs index c83511c2a8..caa2fa7bed 100644 --- a/nexus/src/app/background/tasks/region_snapshot_replacement_finish.rs +++ b/nexus/src/app/background/tasks/region_snapshot_replacement_finish.rs @@ -167,7 +167,7 @@ mod test { use nexus_db_model::RegionSnapshotReplacement; use nexus_db_model::RegionSnapshotReplacementStep; use nexus_db_model::RegionSnapshotReplacementStepState; - use nexus_db_queries::db::datastore::InsertRegionSnapshotReplacementStepResult; + use nexus_db_queries::db::datastore::region_snapshot_replacement; use nexus_test_utils_macros::nexus_test; use uuid::Uuid; @@ -285,7 +285,7 @@ mod test { assert!(matches!( result, - InsertRegionSnapshotReplacementStepResult::Inserted { .. } + region_snapshot_replacement::InsertStepResult::Inserted { .. } )); let result = datastore @@ -295,7 +295,7 @@ mod test { assert!(matches!( result, - InsertRegionSnapshotReplacementStepResult::Inserted { .. } + region_snapshot_replacement::InsertStepResult::Inserted { .. 
} )); // Activate the task, it should do nothing yet diff --git a/nexus/src/app/background/tasks/region_snapshot_replacement_step.rs b/nexus/src/app/background/tasks/region_snapshot_replacement_step.rs index 9123196098..3dbb306754 100644 --- a/nexus/src/app/background/tasks/region_snapshot_replacement_step.rs +++ b/nexus/src/app/background/tasks/region_snapshot_replacement_step.rs @@ -32,7 +32,7 @@ use futures::future::BoxFuture; use futures::FutureExt; use nexus_db_model::RegionSnapshotReplacementStep; use nexus_db_queries::context::OpContext; -use nexus_db_queries::db::datastore::InsertRegionSnapshotReplacementStepResult; +use nexus_db_queries::db::datastore::region_snapshot_replacement; use nexus_db_queries::db::DataStore; use nexus_types::identity::Asset; use nexus_types::internal_api::background::RegionSnapshotReplacementStepStatus; @@ -316,7 +316,7 @@ impl RegionSnapshotReplacementFindAffected { .await { Ok(insertion_result) => match insertion_result { - InsertRegionSnapshotReplacementStepResult::Inserted { step_id } => { + region_snapshot_replacement::InsertStepResult::Inserted { step_id } => { let s = format!("created {step_id}"); info!( log, @@ -327,7 +327,7 @@ impl RegionSnapshotReplacementFindAffected { status.step_records_created_ok.push(s); } - InsertRegionSnapshotReplacementStepResult::AlreadyHandled { .. } => { + region_snapshot_replacement::InsertStepResult::AlreadyHandled { .. } => { info!( log, "step already exists for volume id"; @@ -725,7 +725,7 @@ mod test { assert!(matches!( result, - InsertRegionSnapshotReplacementStepResult::Inserted { .. } + region_snapshot_replacement::InsertStepResult::Inserted { .. } )); let result = datastore @@ -746,7 +746,7 @@ mod test { assert!(matches!( result, - InsertRegionSnapshotReplacementStepResult::Inserted { .. } + region_snapshot_replacement::InsertStepResult::Inserted { .. 
} )); // Activate the task - it should pick the complete steps up and try to diff --git a/nexus/src/app/sagas/region_replacement_start.rs b/nexus/src/app/sagas/region_replacement_start.rs index 5b51ee77a6..a71a7498ac 100644 --- a/nexus/src/app/sagas/region_replacement_start.rs +++ b/nexus/src/app/sagas/region_replacement_start.rs @@ -555,7 +555,7 @@ async fn srrs_replace_region_in_volume( .await .map_err(ActionError::action_failed)?; - info!(log, "replacement returned {:?}", volume_replace_region_result); + debug!(log, "replacement returned {:?}", volume_replace_region_result); match volume_replace_region_result { VolumeReplaceResult::AlreadyHappened | VolumeReplaceResult::Done => { @@ -573,10 +573,10 @@ async fn srrs_replace_region_in_volume( // this saga. The associated background task will transition this // request's state to Completed. - Err(ActionError::action_failed(format!( + Err(ActionError::action_failed(Error::conflict(format!( "existing volume {} deleted", old_volume_id - ))) + )))) } } } diff --git a/nexus/src/app/sagas/region_snapshot_replacement_step.rs b/nexus/src/app/sagas/region_snapshot_replacement_step.rs index bf2b91a616..66d9426cdd 100644 --- a/nexus/src/app/sagas/region_snapshot_replacement_step.rs +++ b/nexus/src/app/sagas/region_snapshot_replacement_step.rs @@ -364,7 +364,7 @@ async fn rsrss_replace_snapshot_in_volume( .await .map_err(ActionError::action_failed)?; - info!( + debug!( &log, "volume_replace_snapshot returned {:?}", volume_replace_snapshot_result, ); diff --git a/nexus/src/app/sagas/region_snapshot_replacement_step_garbage_collect.rs b/nexus/src/app/sagas/region_snapshot_replacement_step_garbage_collect.rs index 2d0317b1f4..dedfdb213e 100644 --- a/nexus/src/app/sagas/region_snapshot_replacement_step_garbage_collect.rs +++ b/nexus/src/app/sagas/region_snapshot_replacement_step_garbage_collect.rs @@ -129,7 +129,7 @@ pub(crate) mod test { use nexus_db_model::Volume; use nexus_db_queries::authn::saga::Serialized; use 
nexus_db_queries::context::OpContext; - use nexus_db_queries::db::datastore::InsertRegionSnapshotReplacementStepResult; + use nexus_db_queries::db::datastore::region_snapshot_replacement; use nexus_test_utils_macros::nexus_test; use sled_agent_client::types::CrucibleOpts; use sled_agent_client::types::VolumeConstructionRequest; @@ -200,7 +200,7 @@ pub(crate) mod test { assert!(matches!( result, - InsertRegionSnapshotReplacementStepResult::Inserted { .. } + region_snapshot_replacement::InsertStepResult::Inserted { .. } )); // Run the saga