Skip to content

Commit

Permalink
pull common code into get_replace_params fn
Browse files Browse the repository at this point in the history
  • Loading branch information
jmpesp committed Aug 13, 2024
1 parent a2bda34 commit 94bc3cb
Showing 1 changed file with 45 additions and 77 deletions.
122 changes: 45 additions & 77 deletions nexus/src/app/sagas/region_snapshot_replacement_start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ use crate::app::sagas::common_storage::find_only_new_region;
use crate::app::sagas::declare_saga_actions;
use crate::app::RegionAllocationStrategy;
use crate::app::{authn, db};
use anyhow::bail;
use nexus_types::identity::Asset;
use nexus_types::identity::Resource;
use omicron_common::api::external::Error;
Expand Down Expand Up @@ -551,10 +550,17 @@ async fn rsrss_create_fake_volume_undo(
Ok(())
}

async fn rsrss_replace_snapshot_in_volume(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let log = sagactx.user_data().log();
#[derive(Debug)]
struct ReplaceParams {
old_volume_id: Uuid,
old_snapshot_address: SocketAddrV6,
new_region_address: SocketAddrV6,
new_volume_id: Uuid,
}

async fn get_replace_params(
sagactx: &NexusActionContext,
) -> Result<ReplaceParams, ActionError> {
let osagactx = sagactx.user_data();
let params = sagactx.saga_params::<Params>()?;

Expand Down Expand Up @@ -612,27 +618,43 @@ async fn rsrss_replace_snapshot_in_volume(
0,
);

// If this node is rerun, the forward action will have overwritten
// db_region's volume id, so get the cached copy.
let old_volume_id = sagactx.lookup::<Uuid>("old_snapshot_volume_id")?;

// Return the replacement parameters for the forward action case - the undo
// will swap the existing and replacement target
Ok(ReplaceParams {
old_volume_id,
old_snapshot_address,
new_region_address,
new_volume_id,
})
}

async fn rsrss_replace_snapshot_in_volume(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let log = sagactx.user_data().log();
let osagactx = sagactx.user_data();

let replacement_params = get_replace_params(&sagactx).await?;

info!(
log,
"replacing {} with {} in volume {}",
old_snapshot_address,
new_region_address,
old_volume_id,
replacement_params.old_snapshot_address,
replacement_params.new_region_address,
replacement_params.old_volume_id,
);

// `volume_replace_snapshot` will swap the old snapshot for the new region.
// No repair or reconcilation needs to occur after this.
osagactx
.datastore()
.volume_replace_snapshot(
VolumeWithTarget(old_volume_id),
ExistingTarget(old_snapshot_address),
ReplacementTarget(new_region_address),
VolumeToDelete(new_volume_id),
VolumeWithTarget(replacement_params.old_volume_id),
ExistingTarget(replacement_params.old_snapshot_address),
ReplacementTarget(replacement_params.new_region_address),
VolumeToDelete(replacement_params.new_volume_id),
)
.await
.map_err(ActionError::action_failed)?;
Expand All @@ -644,82 +666,28 @@ async fn rsrss_replace_snapshot_in_volume_undo(
sagactx: NexusActionContext,
) -> Result<(), anyhow::Error> {
// Undo the forward action's volume_replace_snapshot call by swapping the
// target_addr fields in the parameters.
// existing target and replacement target parameters.

let log = sagactx.user_data().log();
let osagactx = sagactx.user_data();
let params = sagactx.saga_params::<Params>()?;

let new_volume_id = sagactx.lookup::<Uuid>("new_volume_id")?;

let region_snapshot = osagactx
.datastore()
.region_snapshot_get(
params.request.old_dataset_id,
params.request.old_region_id,
params.request.old_snapshot_id,
)
.await
.map_err(ActionError::action_failed)?;

let Some(region_snapshot) = region_snapshot else {
bail!(
"region snapshot {} {} {} deleted!",
params.request.old_dataset_id,
params.request.old_region_id,
params.request.old_snapshot_id,
);
};

let old_snapshot_address: SocketAddrV6 =
match region_snapshot.snapshot_addr.parse() {
Ok(addr) => addr,

Err(e) => {
bail!(
"parsing {} as SocketAddrV6 failed: {e}",
region_snapshot.snapshot_addr,
);
}
};

let (new_dataset, ensured_region) = sagactx.lookup::<(
db::model::Dataset,
crucible_agent_client::types::Region,
)>(
"ensured_dataset_and_region",
)?;

let Some(new_dataset_address) = new_dataset.address() else {
bail!("dataset {} does not have an address!", new_dataset.id());
};

let new_region_address = SocketAddrV6::new(
*new_dataset_address.ip(),
ensured_region.port_number,
0,
0,
);

// The forward action will have overwritten db_region's volume id, so get
// the cached copy.
let old_volume_id = sagactx.lookup::<Uuid>("old_snapshot_volume_id")?;
let replacement_params = get_replace_params(&sagactx).await?;

info!(
log,
"undo: replacing {} with {} in volume {}",
old_snapshot_address,
new_region_address,
old_volume_id,
replacement_params.old_snapshot_address,
replacement_params.new_region_address,
replacement_params.old_volume_id,
);

osagactx
.datastore()
.volume_replace_snapshot(
VolumeWithTarget(old_volume_id),
ExistingTarget(new_region_address),
ReplacementTarget(old_snapshot_address),
VolumeToDelete(new_volume_id),
VolumeWithTarget(replacement_params.old_volume_id),
ExistingTarget(replacement_params.new_region_address),
ReplacementTarget(replacement_params.old_snapshot_address),
VolumeToDelete(replacement_params.new_volume_id),
)
.await?;

Expand Down

0 comments on commit 94bc3cb

Please sign in to comment.