From 94bc3cbb395d7639e7cc697f4f39163ba09e86fc Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Tue, 13 Aug 2024 21:26:08 +0000 Subject: [PATCH] pull common code into get_replace_params fn --- .../region_snapshot_replacement_start.rs | 122 +++++++----------- 1 file changed, 45 insertions(+), 77 deletions(-) diff --git a/nexus/src/app/sagas/region_snapshot_replacement_start.rs b/nexus/src/app/sagas/region_snapshot_replacement_start.rs index 84661005b3..1a020193d6 100644 --- a/nexus/src/app/sagas/region_snapshot_replacement_start.rs +++ b/nexus/src/app/sagas/region_snapshot_replacement_start.rs @@ -62,7 +62,6 @@ use crate::app::sagas::common_storage::find_only_new_region; use crate::app::sagas::declare_saga_actions; use crate::app::RegionAllocationStrategy; use crate::app::{authn, db}; -use anyhow::bail; use nexus_types::identity::Asset; use nexus_types::identity::Resource; use omicron_common::api::external::Error; @@ -551,10 +550,17 @@ async fn rsrss_create_fake_volume_undo( Ok(()) } -async fn rsrss_replace_snapshot_in_volume( - sagactx: NexusActionContext, -) -> Result<(), ActionError> { - let log = sagactx.user_data().log(); +#[derive(Debug)] +struct ReplaceParams { + old_volume_id: Uuid, + old_snapshot_address: SocketAddrV6, + new_region_address: SocketAddrV6, + new_volume_id: Uuid, +} + +async fn get_replace_params( + sagactx: &NexusActionContext, +) -> Result { let osagactx = sagactx.user_data(); let params = sagactx.saga_params::()?; @@ -612,16 +618,32 @@ async fn rsrss_replace_snapshot_in_volume( 0, ); - // If this node is rerun, the forward action will have overwritten - // db_region's volume id, so get the cached copy. let old_volume_id = sagactx.lookup::("old_snapshot_volume_id")?; + // Return the replacement parameters for the forward action case - the undo + // will swap the existing and replacement target + Ok(ReplaceParams { + old_volume_id, + old_snapshot_address, + new_region_address, + new_volume_id, + }) +} + +async fn rsrss_replace_snapshot_in_volume( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let log = sagactx.user_data().log(); + let osagactx = sagactx.user_data(); + + let replacement_params = get_replace_params(&sagactx).await?; + info!( log, "replacing {} with {} in volume {}", - old_snapshot_address, - new_region_address, - old_volume_id, + replacement_params.old_snapshot_address, + replacement_params.new_region_address, + replacement_params.old_volume_id, ); // `volume_replace_snapshot` will swap the old snapshot for the new region. @@ -629,10 +651,10 @@ async fn rsrss_replace_snapshot_in_volume( osagactx .datastore() .volume_replace_snapshot( - VolumeWithTarget(old_volume_id), - ExistingTarget(old_snapshot_address), - ReplacementTarget(new_region_address), - VolumeToDelete(new_volume_id), + VolumeWithTarget(replacement_params.old_volume_id), + ExistingTarget(replacement_params.old_snapshot_address), + ReplacementTarget(replacement_params.new_region_address), + VolumeToDelete(replacement_params.new_volume_id), ) .await .map_err(ActionError::action_failed)?; @@ -644,82 +666,28 @@ async fn rsrss_replace_snapshot_in_volume_undo( sagactx: NexusActionContext, ) -> Result<(), anyhow::Error> { // Undo the forward action's volume_replace_snapshot call by swapping the - // target_addr fields in the parameters. + // existing target and replacement target parameters. let log = sagactx.user_data().log(); let osagactx = sagactx.user_data(); - let params = sagactx.saga_params::()?; - let new_volume_id = sagactx.lookup::("new_volume_id")?; - - let region_snapshot = osagactx - .datastore() - .region_snapshot_get( - params.request.old_dataset_id, - params.request.old_region_id, - params.request.old_snapshot_id, - ) - .await - .map_err(ActionError::action_failed)?; - - let Some(region_snapshot) = region_snapshot else { - bail!( - "region snapshot {} {} {} deleted!", - params.request.old_dataset_id, - params.request.old_region_id, - params.request.old_snapshot_id, - ); - }; - - let old_snapshot_address: SocketAddrV6 = - match region_snapshot.snapshot_addr.parse() { - Ok(addr) => addr, - - Err(e) => { - bail!( - "parsing {} as SocketAddrV6 failed: {e}", - region_snapshot.snapshot_addr, - ); - } - }; - - let (new_dataset, ensured_region) = sagactx.lookup::<( - db::model::Dataset, - crucible_agent_client::types::Region, - )>( - "ensured_dataset_and_region", - )?; - - let Some(new_dataset_address) = new_dataset.address() else { - bail!("dataset {} does not have an address!", new_dataset.id()); - }; - - let new_region_address = SocketAddrV6::new( - *new_dataset_address.ip(), - ensured_region.port_number, - 0, - 0, - ); - - // The forward action will have overwritten db_region's volume id, so get - // the cached copy. - let old_volume_id = sagactx.lookup::("old_snapshot_volume_id")?; + let replacement_params = get_replace_params(&sagactx).await?; info!( log, "undo: replacing {} with {} in volume {}", - old_snapshot_address, - new_region_address, - old_volume_id, + replacement_params.old_snapshot_address, + replacement_params.new_region_address, + replacement_params.old_volume_id, ); osagactx .datastore() .volume_replace_snapshot( - VolumeWithTarget(old_volume_id), - ExistingTarget(new_region_address), - ReplacementTarget(old_snapshot_address), - VolumeToDelete(new_volume_id), + VolumeWithTarget(replacement_params.old_volume_id), + ExistingTarget(replacement_params.new_region_address), + ReplacementTarget(replacement_params.old_snapshot_address), + VolumeToDelete(replacement_params.new_volume_id), ) .await?;