Pick a non-expunged clone source
When performing region snapshot replacement, the associated start saga
chose the request's region snapshot as the clone source, but if that
region snapshot was backed by an expunged dataset then it may be gone.

This commit adds logic to choose another clone source, either another
region snapshot from the same snapshot, or one of the read-only regions
for that snapshot.
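
In outline, the fallback order is as follows (a minimal sketch with
stand-in types; the real logic lives in rsrss_get_clone_source below,
and these names are illustrative, not the db model types):

```rust
// Sketch of the clone source selection order. Stand-in types only.
#[derive(Debug)]
enum CloneSource {
    RegionSnapshot { dataset_id: u64, region_id: u64 },
    Region { region_id: u64 },
}

fn choose_clone_source(
    request: (u64, u64), // (dataset_id, region_id) from the request
    request_disk_in_service: bool,
    mut other_region_snapshots: Vec<(u64, u64)>,
    mut read_only_regions: Vec<u64>,
) -> Result<CloneSource, String> {
    if request_disk_in_service {
        // 1. Prefer the request's own region snapshot if its dataset
        //    is still on an in-service physical disk.
        let (dataset_id, region_id) = request;
        return Ok(CloneSource::RegionSnapshot { dataset_id, region_id });
    }

    // 2. Any other non-expunged region snapshot of the same snapshot
    //    works: all copies have identical contents.
    if let Some((dataset_id, region_id)) = other_region_snapshots.pop() {
        return Ok(CloneSource::RegionSnapshot { dataset_id, region_id });
    }

    // 3. Last resort: a non-expunged read-only region of the snapshot.
    if let Some(region_id) = read_only_regions.pop() {
        return Ok(CloneSource::Region { region_id });
    }

    // If every copy is on an expunged dataset, the data is gone and
    // the saga must fail.
    Err("no clone source candidate".to_string())
}
```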

Basic sanity tests were added to ensure that region replacements and
region snapshot replacements resulting from expungement can occur. It
was an oversight not to include these originally! In order to support
these new sanity tests, the simulated Pantry has to fake activating
volumes in the background. This commit also refactors the simulated
Pantry to have one Mutex around an "inner" struct instead of many
Mutexes.
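
The Mutex consolidation follows the usual single-lock pattern (an
illustrative sketch; the field names here are hypothetical, not the
simulated Pantry's actual fields):

```rust
use std::sync::Mutex;

// Hypothetical state: the simulated Pantry tracks volumes and jobs,
// but these field types are stand-ins.
#[derive(Default)]
struct Inner {
    volumes: Vec<String>,
    jobs: Vec<String>,
}

// One lock around all state, instead of a separate Mutex per field.
// Updates that touch several fields now happen in a single critical
// section, so readers never observe a half-applied change.
struct Pantry {
    inner: Mutex<Inner>,
}

impl Pantry {
    fn attach_and_start_job(&self, volume: String, job: String) {
        let mut inner = self.inner.lock().unwrap();
        inner.volumes.push(volume);
        inner.jobs.push(job);
    }
}
```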

Fixes #7209
jmpesp committed Dec 19, 2024
1 parent 09b150f commit a0199f5
Showing 5 changed files with 699 additions and 74 deletions.
36 changes: 36 additions & 0 deletions nexus/db-queries/src/db/datastore/region.rs
@@ -548,6 +548,42 @@ impl DataStore {

Ok(records)
}

/// Find regions not on expunged disks that match a volume id
pub async fn find_non_expunged_regions(
&self,
opctx: &OpContext,
volume_id: Uuid,
) -> LookupResult<Vec<Region>> {
let conn = self.pool_connection_authorized(opctx).await?;

use db::schema::dataset::dsl as dataset_dsl;
use db::schema::physical_disk::dsl as physical_disk_dsl;
use db::schema::region::dsl as region_dsl;
use db::schema::zpool::dsl as zpool_dsl;

region_dsl::region
.filter(region_dsl::dataset_id.eq_any(
dataset_dsl::dataset
.filter(dataset_dsl::time_deleted.is_null())
.filter(dataset_dsl::pool_id.eq_any(
zpool_dsl::zpool
.filter(zpool_dsl::time_deleted.is_null())
.filter(zpool_dsl::physical_disk_id.eq_any(
physical_disk_dsl::physical_disk
.filter(physical_disk_dsl::disk_policy.eq(PhysicalDiskPolicy::InService))
.select(physical_disk_dsl::id)
))
.select(zpool_dsl::id)
))
.select(dataset_dsl::id)
))
.filter(region_dsl::volume_id.eq(volume_id))
.select(Region::as_select())
.load_async(&*conn)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}
}

#[cfg(test)]
36 changes: 36 additions & 0 deletions nexus/db-queries/src/db/datastore/region_snapshot.rs
@@ -120,4 +120,40 @@ impl DataStore {
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}

/// Find region snapshots not on expunged disks that match a snapshot id
pub async fn find_non_expunged_region_snapshots(
&self,
opctx: &OpContext,
snapshot_id: Uuid,
) -> LookupResult<Vec<RegionSnapshot>> {
let conn = self.pool_connection_authorized(opctx).await?;

use db::schema::dataset::dsl as dataset_dsl;
use db::schema::physical_disk::dsl as physical_disk_dsl;
use db::schema::region_snapshot::dsl as region_snapshot_dsl;
use db::schema::zpool::dsl as zpool_dsl;

region_snapshot_dsl::region_snapshot
.filter(region_snapshot_dsl::dataset_id.eq_any(
dataset_dsl::dataset
.filter(dataset_dsl::time_deleted.is_null())
.filter(dataset_dsl::pool_id.eq_any(
zpool_dsl::zpool
.filter(zpool_dsl::time_deleted.is_null())
.filter(zpool_dsl::physical_disk_id.eq_any(
physical_disk_dsl::physical_disk
.filter(physical_disk_dsl::disk_policy.eq(PhysicalDiskPolicy::InService))
.select(physical_disk_dsl::id)
))
.select(zpool_dsl::id)
))
.select(dataset_dsl::id)
))
.filter(region_snapshot_dsl::snapshot_id.eq(snapshot_id))
.select(RegionSnapshot::as_select())
.load_async(&*conn)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}
}
223 changes: 194 additions & 29 deletions nexus/src/app/sagas/region_snapshot_replacement_start.rs
@@ -65,6 +65,7 @@ use crate::app::{authn, db};
use nexus_types::identity::Asset;
use nexus_types::identity::Resource;
use omicron_common::api::external::Error;
use omicron_uuid_kinds::DatasetUuid;
use serde::Deserialize;
use serde::Serialize;
use sled_agent_client::types::CrucibleOpts;
@@ -91,6 +92,9 @@ declare_saga_actions! {
+ rsrss_set_saga_id
- rsrss_set_saga_id_undo
}
GET_CLONE_SOURCE -> "clone_source" {
+ rsrss_get_clone_source
}
GET_ALLOC_REGION_PARAMS -> "alloc_region_params" {
+ rsrss_get_alloc_region_params
}
@@ -194,6 +198,7 @@ impl NexusSaga for SagaRegionSnapshotReplacementStart {
));

builder.append(set_saga_id_action());
builder.append(get_clone_source_action());
builder.append(get_alloc_region_params_action());
builder.append(alloc_new_region_action());
builder.append(find_new_region_action());
@@ -265,6 +270,145 @@ async fn rsrss_set_saga_id_undo(
Ok(())
}

#[derive(Debug, Deserialize, Serialize)]
enum CloneSource {
RegionSnapshot { dataset_id: DatasetUuid, region_id: Uuid },
Region { region_id: Uuid },
}

async fn rsrss_get_clone_source(
sagactx: NexusActionContext,
) -> Result<CloneSource, ActionError> {
let params = sagactx.saga_params::<Params>()?;
let osagactx = sagactx.user_data();
let log = osagactx.log();

// If the downstairs we're cloning from is on an expunged dataset, the clone
// will never succeed. Find either a region snapshot or a read-only region
// associated with the request's snapshot that has not been expunged.
//
// Importantly, determine the clone source before the new region allocation
// step in this saga, otherwise the query that searches for read-only region
// candidates will match the newly allocated region (which is not created
// yet!).

let request_dataset_on_in_service_physical_disk = osagactx
.datastore()
.dataset_physical_disk_in_service(params.request.old_dataset_id.into())
.await
.map_err(ActionError::action_failed)?;

let clone_source = if request_dataset_on_in_service_physical_disk {
// If the request region snapshot's dataset has not been expunged,
// it can be used
CloneSource::RegionSnapshot {
dataset_id: params.request.old_dataset_id.into(),
region_id: params.request.old_region_id,
}
} else {
info!(
log,
"request region snapshot dataset expunged, finding another";
"snapshot_id" => %params.request.old_snapshot_id,
"dataset_id" => %params.request.old_dataset_id,
);

// Select another region snapshot that's part of this snapshot - they
// will all have identical contents.

let opctx = crate::context::op_context_for_saga_action(
&sagactx,
&params.serialized_authn,
);

let mut non_expunged_region_snapshots = osagactx
.datastore()
.find_non_expunged_region_snapshots(
&opctx,
params.request.old_snapshot_id,
)
.await
.map_err(ActionError::action_failed)?;

match non_expunged_region_snapshots.pop() {
Some(candidate) => {
info!(
log,
"found another non-expunged region snapshot";
"snapshot_id" => %params.request.old_snapshot_id,
"dataset_id" => %candidate.dataset_id,
"region_id" => %candidate.region_id,
);

CloneSource::RegionSnapshot {
dataset_id: candidate.dataset_id.into(),
region_id: candidate.region_id,
}
}

None => {
// If a candidate region snapshot was not found, look for the
// snapshot's read-only regions.

info!(
log,
"no region snapshot clone source candidates";
"snapshot_id" => %params.request.old_snapshot_id,
);

// Look up the existing snapshot
let maybe_db_snapshot = osagactx
.datastore()
.snapshot_get(&opctx, params.request.old_snapshot_id)
.await
.map_err(ActionError::action_failed)?;

let Some(db_snapshot) = maybe_db_snapshot else {
return Err(ActionError::action_failed(
Error::internal_error(&format!(
"snapshot {} was hard deleted!",
params.request.old_snapshot_id
)),
));
};

let mut non_expunged_read_only_regions = osagactx
.datastore()
.find_non_expunged_regions(&opctx, db_snapshot.volume_id)
.await
.map_err(ActionError::action_failed)?;

match non_expunged_read_only_regions.pop() {
Some(candidate) => {
info!(
log,
"found region clone source candidate";
"snapshot_id" => %params.request.old_snapshot_id,
"dataset_id" => %candidate.dataset_id(),
"region_id" => %candidate.id(),
);

CloneSource::Region { region_id: candidate.id() }
}

None => {
// If all targets of a Volume::Region are on expunged
// datasets, then the user's data is gone, and this code
// will fail to select a clone source.

return Err(ActionError::action_failed(format!(
"no clone source candidate for {}!",
params.request.old_snapshot_id,
)));
}
}
}
}
};

Ok(clone_source)
}

#[derive(Debug, Deserialize, Serialize)]
struct AllocRegionParams {
block_size: u64,
@@ -445,46 +589,67 @@ async fn rsrss_new_region_ensure(
"new_dataset_and_region",
)?;

let region_snapshot = osagactx
.datastore()
.region_snapshot_get(
params.request.old_dataset_id.into(),
params.request.old_region_id,
params.request.old_snapshot_id,
)
.await
.map_err(ActionError::action_failed)?;
let clone_source = sagactx.lookup::<CloneSource>("clone_source")?;

let mut source_repair_addr: SocketAddrV6 = match clone_source {
CloneSource::RegionSnapshot { dataset_id, region_id } => {
let region_snapshot = osagactx
.datastore()
.region_snapshot_get(
dataset_id,
region_id,
params.request.old_snapshot_id,
)
.await
.map_err(ActionError::action_failed)?;

let Some(region_snapshot) = region_snapshot else {
return Err(ActionError::action_failed(format!(
"region snapshot {} {} {} deleted!",
params.request.old_dataset_id,
params.request.old_region_id,
params.request.old_snapshot_id,
)));
};
let Some(region_snapshot) = region_snapshot else {
return Err(ActionError::action_failed(format!(
"region snapshot {} {} {} deleted!",
dataset_id, region_id, params.request.old_snapshot_id,
)));
};

let (new_dataset, new_region) = new_dataset_and_region;
match region_snapshot.snapshot_addr.parse() {
Ok(addr) => addr,

// Currently, the repair port is set using a fixed offset above the
// downstairs port. Once this goes away, Nexus will require a way to query
// for the repair port!
Err(e) => {
return Err(ActionError::action_failed(format!(
"error parsing region_snapshot.snapshot_addr: {e}"
)));
}
}
}

let mut source_repair_addr: SocketAddrV6 =
match region_snapshot.snapshot_addr.parse() {
Ok(addr) => addr,
CloneSource::Region { region_id } => {
let maybe_addr = osagactx
.datastore()
.region_addr(region_id)
.await
.map_err(ActionError::action_failed)?;

Err(e) => {
return Err(ActionError::action_failed(format!(
"error parsing region_snapshot.snapshot_addr: {e}"
)));
match maybe_addr {
Some(addr) => addr,

None => {
return Err(ActionError::action_failed(format!(
"region clone source {region_id} has no port!"
)));
}
}
};
}
};

// Currently, the repair port is set using a fixed offset above the
// downstairs port. Once this goes away, Nexus will require a way to query
// for the repair port!

source_repair_addr.set_port(
source_repair_addr.port() + crucible_common::REPAIR_PORT_OFFSET,
);

let (new_dataset, new_region) = new_dataset_and_region;

let ensured_region = osagactx
.nexus()
.ensure_region_in_dataset(
