Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pick a non-expunged clone source #7283

Merged
merged 5 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions nexus/db-queries/src/db/datastore/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,42 @@ impl DataStore {

Ok(records)
}

/// Find regions not on expunged disks that match a volume id
pub async fn find_non_expunged_regions(
leftwo marked this conversation as resolved.
Show resolved Hide resolved
&self,
opctx: &OpContext,
volume_id: Uuid,
) -> LookupResult<Vec<Region>> {
let conn = self.pool_connection_authorized(opctx).await?;

use db::schema::dataset::dsl as dataset_dsl;
use db::schema::physical_disk::dsl as physical_disk_dsl;
use db::schema::region::dsl as region_dsl;
use db::schema::zpool::dsl as zpool_dsl;

region_dsl::region
.filter(region_dsl::dataset_id.eq_any(
dataset_dsl::dataset
.filter(dataset_dsl::time_deleted.is_null())
.filter(dataset_dsl::pool_id.eq_any(
zpool_dsl::zpool
.filter(zpool_dsl::time_deleted.is_null())
.filter(zpool_dsl::physical_disk_id.eq_any(
physical_disk_dsl::physical_disk
.filter(physical_disk_dsl::disk_policy.eq(PhysicalDiskPolicy::InService))
.select(physical_disk_dsl::id)
))
.select(zpool_dsl::id)
))
.select(dataset_dsl::id)
))
.filter(region_dsl::volume_id.eq(volume_id))
.select(Region::as_select())
.load_async(&*conn)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}
}

#[cfg(test)]
Expand Down
36 changes: 36 additions & 0 deletions nexus/db-queries/src/db/datastore/region_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,40 @@ impl DataStore {
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}

/// Find region snapshots not on expunged disks that match a snapshot id
pub async fn find_non_expunged_region_snapshots(
leftwo marked this conversation as resolved.
Show resolved Hide resolved
&self,
opctx: &OpContext,
snapshot_id: Uuid,
) -> LookupResult<Vec<RegionSnapshot>> {
let conn = self.pool_connection_authorized(opctx).await?;

use db::schema::dataset::dsl as dataset_dsl;
use db::schema::physical_disk::dsl as physical_disk_dsl;
use db::schema::region_snapshot::dsl as region_snapshot_dsl;
use db::schema::zpool::dsl as zpool_dsl;

region_snapshot_dsl::region_snapshot
.filter(region_snapshot_dsl::dataset_id.eq_any(
dataset_dsl::dataset
.filter(dataset_dsl::time_deleted.is_null())
.filter(dataset_dsl::pool_id.eq_any(
zpool_dsl::zpool
.filter(zpool_dsl::time_deleted.is_null())
.filter(zpool_dsl::physical_disk_id.eq_any(
physical_disk_dsl::physical_disk
.filter(physical_disk_dsl::disk_policy.eq(PhysicalDiskPolicy::InService))
.select(physical_disk_dsl::id)
))
.select(zpool_dsl::id)
))
.select(dataset_dsl::id)
))
.filter(region_snapshot_dsl::snapshot_id.eq(snapshot_id))
.select(RegionSnapshot::as_select())
.load_async(&*conn)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}
}
223 changes: 194 additions & 29 deletions nexus/src/app/sagas/region_snapshot_replacement_start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ use crate::app::{authn, db};
use nexus_types::identity::Asset;
use nexus_types::identity::Resource;
use omicron_common::api::external::Error;
use omicron_uuid_kinds::DatasetUuid;
use serde::Deserialize;
use serde::Serialize;
use sled_agent_client::types::CrucibleOpts;
Expand All @@ -91,6 +92,9 @@ declare_saga_actions! {
+ rsrss_set_saga_id
- rsrss_set_saga_id_undo
}
GET_CLONE_SOURCE -> "clone_source" {
+ rsrss_get_clone_source
}
GET_ALLOC_REGION_PARAMS -> "alloc_region_params" {
+ rsrss_get_alloc_region_params
}
Expand Down Expand Up @@ -194,6 +198,7 @@ impl NexusSaga for SagaRegionSnapshotReplacementStart {
));

builder.append(set_saga_id_action());
builder.append(get_clone_source_action());
builder.append(get_alloc_region_params_action());
builder.append(alloc_new_region_action());
builder.append(find_new_region_action());
Expand Down Expand Up @@ -265,6 +270,145 @@ async fn rsrss_set_saga_id_undo(
Ok(())
}

/// Where to clone the replacement downstairs from: either another region
/// snapshot belonging to the same snapshot, or a read-only region from the
/// snapshot's volume. Serialized into the saga's state, so variant and field
/// names must remain stable.
#[derive(Debug, Deserialize, Serialize)]
enum CloneSource {
    /// Clone from an existing region snapshot, identified by its dataset id
    /// and region id pair.
    RegionSnapshot { dataset_id: DatasetUuid, region_id: Uuid },
    /// Clone from a read-only region (presumably one of the snapshot
    /// volume's targets — see `rsrss_get_clone_source`).
    Region { region_id: Uuid },
}

async fn rsrss_get_clone_source(
sagactx: NexusActionContext,
) -> Result<CloneSource, ActionError> {
let params = sagactx.saga_params::<Params>()?;
let osagactx = sagactx.user_data();
let log = osagactx.log();

// If the downstairs we're cloning from is on an expunged dataset, the clone
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just update this comment to what we discussed over chat

// will never succeed. Find either a region snapshot or a read-only region
// that is associated with the request snapshot that has not been expunged.
//
// Importantly, determine the clone source before new region alloc step in
// this saga, otherwise the query that searches for read-only region
// candidates will match the newly allocated region (that is not created
// yet!).

let request_dataset_on_in_service_physical_disk = osagactx
.datastore()
.dataset_physical_disk_in_service(params.request.old_dataset_id.into())
.await
.map_err(ActionError::action_failed)?;

let clone_source = if request_dataset_on_in_service_physical_disk {
// If the request region snapshot's dataset has not been expunged,
// it can be used
CloneSource::RegionSnapshot {
dataset_id: params.request.old_dataset_id.into(),
region_id: params.request.old_region_id,
}
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, if I'm following correclty, we land here with an expected source (request.old_dataset_id, request.old_region_id`) But we need to verify that it's not expunged yet, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, though this function changed a bit as a result of our call :)

info!(
log,
"request region snapshot dataset expunged, finding another";
"snapshot_id" => %params.request.old_snapshot_id,
"dataset_id" => %params.request.old_dataset_id,
);

// Select another region snapshot that's part of this snapshot - they
// will all have identical contents.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand where we are correctly here:

We have a snapshot with a failed downstairs we want to replace.
We have chosen another downstairs target to clone from, but we then discovered that source was on an expunged disk?
So now we are going to try and get that third (final) downstairs as a source?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly yeah!


let opctx = crate::context::op_context_for_saga_action(
&sagactx,
&params.serialized_authn,
);

let mut non_expunged_region_snapshots = osagactx
.datastore()
.find_non_expunged_region_snapshots(
&opctx,
params.request.old_snapshot_id,
)
.await
.map_err(ActionError::action_failed)?;

match non_expunged_region_snapshots.pop() {
Some(candidate) => {
info!(
log,
"found another non-expunged region snapshot";
"snapshot_id" => %params.request.old_snapshot_id,
"dataset_id" => %candidate.dataset_id,
"region_id" => %candidate.region_id,
);

CloneSource::RegionSnapshot {
dataset_id: candidate.dataset_id.into(),
region_id: candidate.region_id,
}
}

None => {
// If a candidate region snapshot was not found, look for the
// snapshot's read-only regions.

info!(
log,
"no region snapshot clone source candidates";
"snapshot_id" => %params.request.old_snapshot_id,
);

// Look up the existing snapshot
let maybe_db_snapshot = osagactx
.datastore()
.snapshot_get(&opctx, params.request.old_snapshot_id)
.await
.map_err(ActionError::action_failed)?;

let Some(db_snapshot) = maybe_db_snapshot else {
return Err(ActionError::action_failed(
Error::internal_error(&format!(
"snapshot {} was hard deleted!",
params.request.old_snapshot_id
)),
));
};

let mut non_expunged_read_only_regions = osagactx
.datastore()
.find_non_expunged_regions(&opctx, db_snapshot.volume_id)
.await
.map_err(ActionError::action_failed)?;

match non_expunged_read_only_regions.pop() {
Some(candidate) => {
info!(
log,
"found region clone source candidate";
"snapshot_id" => %params.request.old_snapshot_id,
"dataset_id" => %candidate.dataset_id(),
"region_id" => %candidate.id(),
);

CloneSource::Region { region_id: candidate.id() }
}

None => {
// If all targets of a Volume::Region are on expunged
// datasets, then the user's data is gone, and this code
// will fail to select a clone source.

return Err(ActionError::action_failed(format!(
"no clone source candidate for {}!",
params.request.old_snapshot_id,
)));
}
}
}
}
};

Ok(clone_source)
}

#[derive(Debug, Deserialize, Serialize)]
struct AllocRegionParams {
block_size: u64,
Expand Down Expand Up @@ -445,46 +589,67 @@ async fn rsrss_new_region_ensure(
"new_dataset_and_region",
)?;

let region_snapshot = osagactx
.datastore()
.region_snapshot_get(
params.request.old_dataset_id.into(),
params.request.old_region_id,
params.request.old_snapshot_id,
)
.await
.map_err(ActionError::action_failed)?;
let clone_source = sagactx.lookup::<CloneSource>("clone_source")?;

let mut source_repair_addr: SocketAddrV6 = match clone_source {
CloneSource::RegionSnapshot { dataset_id, region_id } => {
let region_snapshot = osagactx
.datastore()
.region_snapshot_get(
dataset_id,
region_id,
params.request.old_snapshot_id,
)
.await
.map_err(ActionError::action_failed)?;

let Some(region_snapshot) = region_snapshot else {
return Err(ActionError::action_failed(format!(
"region snapshot {} {} {} deleted!",
params.request.old_dataset_id,
params.request.old_region_id,
params.request.old_snapshot_id,
)));
};
let Some(region_snapshot) = region_snapshot else {
return Err(ActionError::action_failed(format!(
"region snapshot {} {} {} deleted!",
dataset_id, region_id, params.request.old_snapshot_id,
)));
};

let (new_dataset, new_region) = new_dataset_and_region;
match region_snapshot.snapshot_addr.parse() {
Ok(addr) => addr,

// Currently, the repair port is set using a fixed offset above the
// downstairs port. Once this goes away, Nexus will require a way to query
// for the repair port!
Err(e) => {
return Err(ActionError::action_failed(format!(
"error parsing region_snapshot.snapshot_addr: {e}"
)));
}
}
}

let mut source_repair_addr: SocketAddrV6 =
match region_snapshot.snapshot_addr.parse() {
Ok(addr) => addr,
CloneSource::Region { region_id } => {
let maybe_addr = osagactx
.datastore()
.region_addr(region_id)
.await
.map_err(ActionError::action_failed)?;

Err(e) => {
return Err(ActionError::action_failed(format!(
"error parsing region_snapshot.snapshot_addr: {e}"
)));
match maybe_addr {
Some(addr) => addr,

None => {
return Err(ActionError::action_failed(format!(
"region clone source {region_id} has no port!"
)));
}
}
};
}
};

// Currently, the repair port is set using a fixed offset above the
// downstairs port. Once this goes away, Nexus will require a way to query
// for the repair port!

source_repair_addr.set_port(
source_repair_addr.port() + crucible_common::REPAIR_PORT_OFFSET,
);

let (new_dataset, new_region) = new_dataset_and_region;

let ensured_region = osagactx
.nexus()
.ensure_region_in_dataset(
Expand Down
Loading
Loading