From a0199f5b7962d3a770c6af31db93b305efb1e703 Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Tue, 10 Dec 2024 16:50:37 +0000 Subject: [PATCH] Pick a non-expunged clone source When performing region snapshot replacement, the associated start saga chose the request's region snapshot as the clone source, but if that region snapshot was backed by an expunged dataset then it may be gone. This commit adds logic to choose another clone source, either another region snapshot from the same snapshot, or one of the read-only regions for that snapshot. Basic sanity tests were added for ensuring that region replacements and region snapshot replacements resulting from expungement can occur. It was an oversight not to originally include these! Rn order to support these new sanity tests, the simulated pantry has to fake activating volumes in the background. This commit also refactors the simulated Pantry to have one Mutex around an "inner" struct instead of many Mutexes. Fixes #7209 --- nexus/db-queries/src/db/datastore/region.rs | 36 ++ .../src/db/datastore/region_snapshot.rs | 36 ++ .../region_snapshot_replacement_start.rs | 223 ++++++++-- .../crucible_replacements.rs | 381 +++++++++++++++++- sled-agent/src/sim/storage.rs | 97 +++-- 5 files changed, 699 insertions(+), 74 deletions(-) diff --git a/nexus/db-queries/src/db/datastore/region.rs b/nexus/db-queries/src/db/datastore/region.rs index 8e59462aa3..67bd37cf69 100644 --- a/nexus/db-queries/src/db/datastore/region.rs +++ b/nexus/db-queries/src/db/datastore/region.rs @@ -548,6 +548,42 @@ impl DataStore { Ok(records) } + + /// Find regions not on expunged disks that match a volume id + pub async fn find_non_expunged_regions( + &self, + opctx: &OpContext, + volume_id: Uuid, + ) -> LookupResult> { + let conn = self.pool_connection_authorized(opctx).await?; + + use db::schema::dataset::dsl as dataset_dsl; + use db::schema::physical_disk::dsl as physical_disk_dsl; + use db::schema::region::dsl as region_dsl; + use db::schema::zpool::dsl as zpool_dsl; + + region_dsl::region + .filter(region_dsl::dataset_id.eq_any( + dataset_dsl::dataset + .filter(dataset_dsl::time_deleted.is_null()) + .filter(dataset_dsl::pool_id.eq_any( + zpool_dsl::zpool + .filter(zpool_dsl::time_deleted.is_null()) + .filter(zpool_dsl::physical_disk_id.eq_any( + physical_disk_dsl::physical_disk + .filter(physical_disk_dsl::disk_policy.eq(PhysicalDiskPolicy::InService)) + .select(physical_disk_dsl::id) + )) + .select(zpool_dsl::id) + )) + .select(dataset_dsl::id) + )) + .filter(region_dsl::volume_id.eq(volume_id)) + .select(Region::as_select()) + .load_async(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } } #[cfg(test)] diff --git a/nexus/db-queries/src/db/datastore/region_snapshot.rs b/nexus/db-queries/src/db/datastore/region_snapshot.rs index 0129869f4f..f7a34fdb52 100644 --- a/nexus/db-queries/src/db/datastore/region_snapshot.rs +++ b/nexus/db-queries/src/db/datastore/region_snapshot.rs @@ -120,4 +120,40 @@ impl DataStore { .await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } + + /// Find region snapshots not on expunged disks that match a snapshot id + pub async fn find_non_expunged_region_snapshots( + &self, + opctx: &OpContext, + snapshot_id: Uuid, + ) -> LookupResult> { + let conn = self.pool_connection_authorized(opctx).await?; + + use db::schema::dataset::dsl as dataset_dsl; + use db::schema::physical_disk::dsl as physical_disk_dsl; + use db::schema::region_snapshot::dsl as region_snapshot_dsl; + use db::schema::zpool::dsl as zpool_dsl; + + region_snapshot_dsl::region_snapshot + .filter(region_snapshot_dsl::dataset_id.eq_any( + dataset_dsl::dataset + .filter(dataset_dsl::time_deleted.is_null()) + .filter(dataset_dsl::pool_id.eq_any( + zpool_dsl::zpool + .filter(zpool_dsl::time_deleted.is_null()) + .filter(zpool_dsl::physical_disk_id.eq_any( + physical_disk_dsl::physical_disk + .filter(physical_disk_dsl::disk_policy.eq(PhysicalDiskPolicy::InService)) + .select(physical_disk_dsl::id) + )) + .select(zpool_dsl::id) + )) + .select(dataset_dsl::id) + )) + .filter(region_snapshot_dsl::snapshot_id.eq(snapshot_id)) + .select(RegionSnapshot::as_select()) + .load_async(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } } diff --git a/nexus/src/app/sagas/region_snapshot_replacement_start.rs b/nexus/src/app/sagas/region_snapshot_replacement_start.rs index bb5fd60209..d5e2cd9bd5 100644 --- a/nexus/src/app/sagas/region_snapshot_replacement_start.rs +++ b/nexus/src/app/sagas/region_snapshot_replacement_start.rs @@ -65,6 +65,7 @@ use crate::app::{authn, db}; use nexus_types::identity::Asset; use nexus_types::identity::Resource; use omicron_common::api::external::Error; +use omicron_uuid_kinds::DatasetUuid; use serde::Deserialize; use serde::Serialize; use sled_agent_client::types::CrucibleOpts; @@ -91,6 +92,9 @@ declare_saga_actions! { + rsrss_set_saga_id - rsrss_set_saga_id_undo } + GET_CLONE_SOURCE -> "clone_source" { + + rsrss_get_clone_source + } GET_ALLOC_REGION_PARAMS -> "alloc_region_params" { + rsrss_get_alloc_region_params } @@ -194,6 +198,7 @@ impl NexusSaga for SagaRegionSnapshotReplacementStart { )); builder.append(set_saga_id_action()); + builder.append(get_clone_source_action()); builder.append(get_alloc_region_params_action()); builder.append(alloc_new_region_action()); builder.append(find_new_region_action()); @@ -265,6 +270,145 @@ async fn rsrss_set_saga_id_undo( Ok(()) } +#[derive(Debug, Deserialize, Serialize)] +enum CloneSource { + RegionSnapshot { dataset_id: DatasetUuid, region_id: Uuid }, + Region { region_id: Uuid }, +} + +async fn rsrss_get_clone_source( + sagactx: NexusActionContext, +) -> Result { + let params = sagactx.saga_params::()?; + let osagactx = sagactx.user_data(); + let log = osagactx.log(); + + // If the downstairs we're cloning from is on an expunged dataset, the clone + // will never succeed. Find either a region snapshot or a read-only region + // that is associated with the request snapshot that has not been expunged. + // + // Importantly, determine the clone source before new region alloc step in + // this saga, otherwise the query that searches for read-only region + // candidates will match the newly allocated region (that is not created + // yet!). + + let request_dataset_on_in_service_physical_disk = osagactx + .datastore() + .dataset_physical_disk_in_service(params.request.old_dataset_id.into()) + .await + .map_err(ActionError::action_failed)?; + + let clone_source = if request_dataset_on_in_service_physical_disk { + // If the request region snapshot's dataset has not been expunged, + // it can be used + CloneSource::RegionSnapshot { + dataset_id: params.request.old_dataset_id.into(), + region_id: params.request.old_region_id, + } + } else { + info!( + log, + "request region snapshot dataset expunged, finding another"; + "snapshot_id" => %params.request.old_snapshot_id, + "dataset_id" => %params.request.old_dataset_id, + ); + + // Select another region snapshot that's part of this snapshot - they + // will all have identical contents. + + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + + let mut non_expunged_region_snapshots = osagactx + .datastore() + .find_non_expunged_region_snapshots( + &opctx, + params.request.old_snapshot_id, + ) + .await + .map_err(ActionError::action_failed)?; + + match non_expunged_region_snapshots.pop() { + Some(candidate) => { + info!( + log, + "found another non-expunged region snapshot"; + "snapshot_id" => %params.request.old_snapshot_id, + "dataset_id" => %candidate.dataset_id, + "region_id" => %candidate.region_id, + ); + + CloneSource::RegionSnapshot { + dataset_id: candidate.dataset_id.into(), + region_id: candidate.region_id, + } + } + + None => { + // If a candidate region snapshot was not found, look for the + // snapshot's read-only regions. + + info!( + log, + "no region snapshot clone source candidates"; + "snapshot_id" => %params.request.old_snapshot_id, + ); + + // Look up the existing snapshot + let maybe_db_snapshot = osagactx + .datastore() + .snapshot_get(&opctx, params.request.old_snapshot_id) + .await + .map_err(ActionError::action_failed)?; + + let Some(db_snapshot) = maybe_db_snapshot else { + return Err(ActionError::action_failed( + Error::internal_error(&format!( + "snapshot {} was hard deleted!", + params.request.old_snapshot_id + )), + )); + }; + + let mut non_expunged_read_only_regions = osagactx + .datastore() + .find_non_expunged_regions(&opctx, db_snapshot.volume_id) + .await + .map_err(ActionError::action_failed)?; + + match non_expunged_read_only_regions.pop() { + Some(candidate) => { + info!( + log, + "found region clone source candidate"; + "snapshot_id" => %params.request.old_snapshot_id, + "dataset_id" => %candidate.dataset_id(), + "region_id" => %candidate.id(), + ); + + CloneSource::Region { region_id: candidate.id() } + } + + None => { + // If all targets of a Volume::Region are on expunged + // datasets, then the user's data is gone, and this code + // will fail to select a clone source. + + return Err(ActionError::action_failed(format!( + "no clone source candidate for {}!", + params.request.old_snapshot_id, + ))); + } + } + } + } + }; + + Ok(clone_source) +} + #[derive(Debug, Deserialize, Serialize)] struct AllocRegionParams { block_size: u64, @@ -445,46 +589,67 @@ async fn rsrss_new_region_ensure( "new_dataset_and_region", )?; - let region_snapshot = osagactx - .datastore() - .region_snapshot_get( - params.request.old_dataset_id.into(), - params.request.old_region_id, - params.request.old_snapshot_id, - ) - .await - .map_err(ActionError::action_failed)?; + let clone_source = sagactx.lookup::("clone_source")?; + + let mut source_repair_addr: SocketAddrV6 = match clone_source { + CloneSource::RegionSnapshot { dataset_id, region_id } => { + let region_snapshot = osagactx + .datastore() + .region_snapshot_get( + dataset_id, + region_id, + params.request.old_snapshot_id, + ) + .await + .map_err(ActionError::action_failed)?; - let Some(region_snapshot) = region_snapshot else { - return Err(ActionError::action_failed(format!( - "region snapshot {} {} {} deleted!", - params.request.old_dataset_id, - params.request.old_region_id, - params.request.old_snapshot_id, - ))); - }; + let Some(region_snapshot) = region_snapshot else { + return Err(ActionError::action_failed(format!( + "region snapshot {} {} {} deleted!", + dataset_id, region_id, params.request.old_snapshot_id, + ))); + }; - let (new_dataset, new_region) = new_dataset_and_region; + match region_snapshot.snapshot_addr.parse() { + Ok(addr) => addr, - // Currently, the repair port is set using a fixed offset above the - // downstairs port. Once this goes away, Nexus will require a way to query - // for the repair port! + Err(e) => { + return Err(ActionError::action_failed(format!( + "error parsing region_snapshot.snapshot_addr: {e}" + ))); + } + } + } - let mut source_repair_addr: SocketAddrV6 = - match region_snapshot.snapshot_addr.parse() { - Ok(addr) => addr, + CloneSource::Region { region_id } => { + let maybe_addr = osagactx + .datastore() + .region_addr(region_id) + .await + .map_err(ActionError::action_failed)?; - Err(e) => { - return Err(ActionError::action_failed(format!( - "error parsing region_snapshot.snapshot_addr: {e}" - ))); + match maybe_addr { + Some(addr) => addr, + + None => { + return Err(ActionError::action_failed(format!( + "region clone source {region_id} has no port!" + ))); + } } - }; + } + }; + + // Currently, the repair port is set using a fixed offset above the + // downstairs port. Once this goes away, Nexus will require a way to query + // for the repair port! source_repair_addr.set_port( source_repair_addr.port() + crucible_common::REPAIR_PORT_OFFSET, ); + let (new_dataset, new_region) = new_dataset_and_region; + let ensured_region = osagactx .nexus() .ensure_region_in_dataset( diff --git a/nexus/tests/integration_tests/crucible_replacements.rs b/nexus/tests/integration_tests/crucible_replacements.rs index c301372d5f..30754eca3c 100644 --- a/nexus/tests/integration_tests/crucible_replacements.rs +++ b/nexus/tests/integration_tests/crucible_replacements.rs @@ -694,7 +694,7 @@ async fn test_racing_replacements_for_soft_deleted_disk_volume( // expunged. let expunged_regions = datastore - .find_regions_on_expunged_physical_disks(&opctx) + .find_read_write_regions_on_expunged_physical_disks(&opctx) .await .unwrap(); @@ -879,11 +879,36 @@ async fn test_racing_replacements_for_soft_deleted_disk_volume( .await; // Assert the region snapshot was deleted. - assert!(datastore - .region_snapshot_get(dataset.id(), region.id(), snapshot.identity.id) - .await - .unwrap() - .is_none()); + wait_for_condition( + || { + let dataset_id = dataset.id(); + let region_id = region.id(); + let snapshot_id = snapshot.identity.id; + + async move { + let region_snapshot = datastore + .region_snapshot_get(dataset_id, region_id, snapshot_id) + .await + .unwrap(); + + match region_snapshot { + Some(_) => { + // Region snapshot not garbage collected yet + Err(CondCheckError::<()>::NotYet) + } + + None => { + // Region snapshot garbage collected ok + Ok(()) + } + } + } + }, + &std::time::Duration::from_millis(500), + &std::time::Duration::from_secs(60), + ) + .await + .expect("region snapshot garbage collected"); // Assert that the disk's volume is still only soft-deleted, because the two // other associated region snapshots still exist. @@ -957,12 +982,19 @@ async fn test_racing_replacements_for_soft_deleted_disk_volume( // The saga transitioned the request ok Ok(()) } else if state == RegionReplacementState::Driving { - // The saga is still running + // The drive saga is still running + Err(CondCheckError::<()>::NotYet) + } else if state == RegionReplacementState::Running { + // The drive saga hasn't started yet Err(CondCheckError::<()>::NotYet) } else if state == RegionReplacementState::Completing { // The saga transitioned the request ok, and it's now being // finished by the region replacement finish saga Ok(()) + } else if state == RegionReplacementState::Complete { + // The saga transitioned the request ok, and it was finished + // by the region replacement finish saga + Ok(()) } else { // Any other state is not expected panic!("unexpected state {state:?}!"); @@ -1705,3 +1737,338 @@ async fn test_delete_volume_region_snapshot_replacement_step( test_harness.assert_no_crucible_resources_leaked().await; } + +/// Tests that replacement can occur until completion +#[nexus_test] +async fn test_replacement_sanity(cptestctx: &ControlPlaneTestContext) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + + // Create four zpools, each with one dataset. This is required for region + // and region snapshot replacement to have somewhere to move the data. + let sled_id = cptestctx.first_sled(); + + let disk_test = DiskTestBuilder::new(&cptestctx) + .on_specific_sled(sled_id) + .with_zpool_count(4) + .build() + .await; + + // Create a disk and a snapshot and a disk from that snapshot + let client = &cptestctx.external_client; + let _project_id = create_project_and_pool(client).await; + + let disk = create_disk(&client, PROJECT_NAME, "disk").await; + let snapshot = create_snapshot(&client, PROJECT_NAME, "disk", "snap").await; + let _disk_from_snapshot = create_disk_from_snapshot( + &client, + PROJECT_NAME, + "disk-from-snap", + snapshot.identity.id, + ) + .await; + + // Before expunging the physical disk, save the DB model + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk.identity.id) + .fetch() + .await + .unwrap(); + + assert_eq!(db_disk.id(), disk.identity.id); + + // Next, expunge a physical disk that contains a region + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let (dataset, _) = &disk_allocated_regions[0]; + + let zpool = disk_test + .zpools() + .find(|x| *x.id.as_untyped_uuid() == dataset.pool_id) + .expect("Expected at least one zpool"); + + let (_, db_zpool) = LookupPath::new(&opctx, datastore) + .zpool_id(zpool.id.into_untyped_uuid()) + .fetch() + .await + .unwrap(); + + datastore + .physical_disk_update_policy( + &opctx, + db_zpool.physical_disk_id.into(), + PhysicalDiskPolicy::Expunged, + ) + .await + .unwrap(); + + // Any volumes sent to the Pantry for reconciliation should return active + // for this test + + cptestctx + .sled_agent + .pantry_server + .as_ref() + .unwrap() + .pantry + .set_auto_activate_volumes() + .await; + + // Now, run all replacement tasks to completion + let internal_client = &cptestctx.internal_client; + run_replacement_tasks_to_completion(&internal_client).await; +} + +/// Tests that multiple replacements can occur until completion +#[nexus_test] +async fn test_region_replacement_triple_sanity( + cptestctx: &ControlPlaneTestContext, +) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + + // Create five zpools, each with one dataset. This is required for region + // and region snapshot replacement to have somewhere to move the data, and + // for this test we're doing two expungements. + let sled_id = cptestctx.first_sled(); + + let disk_test = DiskTestBuilder::new(&cptestctx) + .on_specific_sled(sled_id) + .with_zpool_count(6) + .build() + .await; + + // Any volumes sent to the Pantry for reconciliation should return active + // for this test + + cptestctx + .sled_agent + .pantry_server + .as_ref() + .unwrap() + .pantry + .set_auto_activate_volumes() + .await; + + // Create a disk and a snapshot and a disk from that snapshot + let client = &cptestctx.external_client; + let _project_id = create_project_and_pool(client).await; + + let disk = create_disk(&client, PROJECT_NAME, "disk").await; + let snapshot = create_snapshot(&client, PROJECT_NAME, "disk", "snap").await; + let _disk_from_snapshot = create_disk_from_snapshot( + &client, + PROJECT_NAME, + "disk-from-snap", + snapshot.identity.id, + ) + .await; + + // Before expunging any physical disk, save some DB models + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk.identity.id) + .fetch() + .await + .unwrap(); + + let (.., db_snapshot) = LookupPath::new(&opctx, &datastore) + .snapshot_id(snapshot.identity.id) + .fetch() + .await + .unwrap(); + + let internal_client = &cptestctx.internal_client; + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let snapshot_allocated_regions = + datastore.get_allocated_regions(db_snapshot.volume_id).await.unwrap(); + + assert_eq!(disk_allocated_regions.len(), 3); + assert_eq!(snapshot_allocated_regions.len(), 0); + + for i in disk_allocated_regions { + let (dataset, _) = &i; + + let zpool = disk_test + .zpools() + .find(|x| *x.id.as_untyped_uuid() == dataset.pool_id) + .expect("Expected at least one zpool"); + + let (_, db_zpool) = LookupPath::new(&opctx, datastore) + .zpool_id(zpool.id.into_untyped_uuid()) + .fetch() + .await + .unwrap(); + + datastore + .physical_disk_update_policy( + &opctx, + db_zpool.physical_disk_id.into(), + PhysicalDiskPolicy::Expunged, + ) + .await + .unwrap(); + + // Now, run all replacement tasks to completion + run_replacement_tasks_to_completion(&internal_client).await; + } + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let snapshot_allocated_regions = + datastore.get_allocated_regions(db_snapshot.volume_id).await.unwrap(); + + assert_eq!(disk_allocated_regions.len(), 3); + assert!(disk_allocated_regions.iter().all(|(_, r)| !r.read_only())); + + // Assert region snapshots replaced with three read-only regions + assert_eq!(snapshot_allocated_regions.len(), 3); + assert!(snapshot_allocated_regions.iter().all(|(_, r)| r.read_only())); +} + +/// Tests that multiple replacements can occur until completion, after expunging +/// two physical disks before any replacements occur (aka we can lose two +/// physical disks and still recover) +#[nexus_test] +async fn test_region_replacement_triple_sanity_2( + cptestctx: &ControlPlaneTestContext, +) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + + // Create five zpools, each with one dataset. This is required for region + // and region snapshot replacement to have somewhere to move the data, and + // for this test we're doing two expungements. + let sled_id = cptestctx.first_sled(); + + let disk_test = DiskTestBuilder::new(&cptestctx) + .on_specific_sled(sled_id) + .with_zpool_count(6) + .build() + .await; + + // Any volumes sent to the Pantry for reconciliation should return active + // for this test + + cptestctx + .sled_agent + .pantry_server + .as_ref() + .unwrap() + .pantry + .set_auto_activate_volumes() + .await; + + // Create a disk and a snapshot and a disk from that snapshot + let client = &cptestctx.external_client; + let _project_id = create_project_and_pool(client).await; + + let disk = create_disk(&client, PROJECT_NAME, "disk").await; + let snapshot = create_snapshot(&client, PROJECT_NAME, "disk", "snap").await; + let _disk_from_snapshot = create_disk_from_snapshot( + &client, + PROJECT_NAME, + "disk-from-snap", + snapshot.identity.id, + ) + .await; + + // Before expunging any physical disk, save some DB models + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk.identity.id) + .fetch() + .await + .unwrap(); + + let (.., db_snapshot) = LookupPath::new(&opctx, &datastore) + .snapshot_id(snapshot.identity.id) + .fetch() + .await + .unwrap(); + + let internal_client = &cptestctx.internal_client; + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let snapshot_allocated_regions = + datastore.get_allocated_regions(db_snapshot.volume_id).await.unwrap(); + + assert_eq!(disk_allocated_regions.len(), 3); + assert_eq!(snapshot_allocated_regions.len(), 0); + + // Expunge two physical disks before any replacements occur + for i in [0, 1] { + let (dataset, _) = &disk_allocated_regions[i]; + + let zpool = disk_test + .zpools() + .find(|x| *x.id.as_untyped_uuid() == dataset.pool_id) + .expect("Expected at least one zpool"); + + let (_, db_zpool) = LookupPath::new(&opctx, datastore) + .zpool_id(zpool.id.into_untyped_uuid()) + .fetch() + .await + .unwrap(); + + datastore + .physical_disk_update_policy( + &opctx, + db_zpool.physical_disk_id.into(), + PhysicalDiskPolicy::Expunged, + ) + .await + .unwrap(); + } + + // Now, run all replacement tasks to completion + run_replacement_tasks_to_completion(&internal_client).await; + + // Expunge the last physical disk + { + let (dataset, _) = &disk_allocated_regions[2]; + + let zpool = disk_test + .zpools() + .find(|x| *x.id.as_untyped_uuid() == dataset.pool_id) + .expect("Expected at least one zpool"); + + let (_, db_zpool) = LookupPath::new(&opctx, datastore) + .zpool_id(zpool.id.into_untyped_uuid()) + .fetch() + .await + .unwrap(); + + datastore + .physical_disk_update_policy( + &opctx, + db_zpool.physical_disk_id.into(), + PhysicalDiskPolicy::Expunged, + ) + .await + .unwrap(); + } + + // Now, run all replacement tasks to completion + run_replacement_tasks_to_completion(&internal_client).await; + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let snapshot_allocated_regions = + datastore.get_allocated_regions(db_snapshot.volume_id).await.unwrap(); + + assert_eq!(disk_allocated_regions.len(), 3); + assert!(disk_allocated_regions.iter().all(|(_, r)| !r.read_only())); + + // Assert region snapshots replaced with three read-only regions + assert_eq!(snapshot_allocated_regions.len(), 3); + assert!(snapshot_allocated_regions.iter().all(|(_, r)| r.read_only())); +} diff --git a/sled-agent/src/sim/storage.rs b/sled-agent/src/sim/storage.rs index 8fd648096a..2299ba9db9 100644 --- a/sled-agent/src/sim/storage.rs +++ b/sled-agent/src/sim/storage.rs @@ -117,6 +117,8 @@ impl CrucibleDataInner { bail!("region creation error!"); } + let read_only = params.source.is_some(); + let region = Region { id: params.id, block_size: params.block_size, @@ -129,8 +131,8 @@ impl CrucibleDataInner { cert_pem: None, key_pem: None, root_pem: None, - source: None, - read_only: params.source.is_some(), + source: params.source, + read_only, }; let old = self.regions.insert(id, region.clone()); @@ -1364,29 +1366,41 @@ pub struct PantryVolume { activate_job: Option, } +pub struct PantryInner { + /// Map Volume UUID to PantryVolume struct + volumes: HashMap, + + jobs: HashSet, + + /// Auto activate volumes attached in the background + auto_activate_volumes: bool, +} + /// Simulated crucible pantry pub struct Pantry { pub id: OmicronZoneUuid, - /// Map Volume UUID to PantryVolume struct - volumes: Mutex>, sled_agent: Arc, - jobs: Mutex>, + inner: Mutex, } impl Pantry { pub fn new(sled_agent: Arc) -> Self { Self { id: OmicronZoneUuid::new_v4(), - volumes: Mutex::new(HashMap::default()), sled_agent, - jobs: Mutex::new(HashSet::default()), + inner: Mutex::new(PantryInner { + volumes: HashMap::default(), + jobs: HashSet::default(), + auto_activate_volumes: false, + }), } } pub async fn status(&self) -> Result { + let inner = self.inner.lock().await; Ok(PantryStatus { - volumes: self.volumes.lock().await.keys().cloned().collect(), - num_job_handles: self.jobs.lock().await.len(), + volumes: inner.volumes.keys().cloned().collect(), + num_job_handles: inner.jobs.len(), }) } @@ -1394,8 +1408,9 @@ impl Pantry { &self, volume_id: String, ) -> Result { - let volumes = self.volumes.lock().await; - match volumes.get(&volume_id) { + let inner = self.inner.lock().await; + + match inner.volumes.get(&volume_id) { Some(entry) => Ok(entry.vcr.clone()), None => Err(HttpError::for_not_found(None, volume_id)), @@ -1407,9 +1422,9 @@ impl Pantry { volume_id: String, volume_construction_request: VolumeConstructionRequest, ) -> Result<()> { - let mut volumes = self.volumes.lock().await; + let mut inner = self.inner.lock().await; - volumes.insert( + inner.volumes.insert( volume_id, PantryVolume { vcr: volume_construction_request, @@ -1425,29 +1440,34 @@ impl Pantry { Ok(()) } + pub async fn set_auto_activate_volumes(&self) { + self.inner.lock().await.auto_activate_volumes = true; + } + pub async fn attach_activate_background( &self, volume_id: String, activate_job_id: String, volume_construction_request: VolumeConstructionRequest, ) -> Result<(), HttpError> { - let mut volumes = self.volumes.lock().await; - let mut jobs = self.jobs.lock().await; + let mut inner = self.inner.lock().await; + + let auto_activate_volumes = inner.auto_activate_volumes; - volumes.insert( + inner.volumes.insert( volume_id, PantryVolume { vcr: volume_construction_request, status: VolumeStatus { - active: false, - seen_active: false, + active: auto_activate_volumes, + seen_active: auto_activate_volumes, num_job_handles: 1, }, activate_job: Some(activate_job_id.clone()), }, ); - jobs.insert(activate_job_id); + inner.jobs.insert(activate_job_id); Ok(()) } @@ -1457,8 +1477,8 @@ impl Pantry { volume_id: String, ) -> Result { let activate_job = { - let volumes = self.volumes.lock().await; - volumes.get(&volume_id).unwrap().activate_job.clone().unwrap() + let inner = self.inner.lock().await; + inner.volumes.get(&volume_id).unwrap().activate_job.clone().unwrap() }; let mut status = self.volume_status(volume_id.clone()).await?; @@ -1475,9 +1495,9 @@ impl Pantry { &self, volume_id: String, ) -> Result { - let volumes = self.volumes.lock().await; + let inner = self.inner.lock().await; - match volumes.get(&volume_id) { + match inner.volumes.get(&volume_id) { Some(pantry_volume) => Ok(pantry_volume.status.clone()), None => Err(HttpError::for_not_found(None, volume_id)), @@ -1489,9 +1509,9 @@ impl Pantry { volume_id: String, status: VolumeStatus, ) -> Result<(), HttpError> { - let mut volumes = self.volumes.lock().await; + let mut inner = self.inner.lock().await; - match volumes.get_mut(&volume_id) { + match inner.volumes.get_mut(&volume_id) { Some(pantry_volume) => { pantry_volume.status = status; Ok(()) @@ -1505,8 +1525,8 @@ impl Pantry { &self, job_id: String, ) -> Result { - let jobs = self.jobs.lock().await; - if !jobs.contains(&job_id) { + let inner = self.inner.lock().await; + if !inner.jobs.contains(&job_id) { return Err(HttpError::for_not_found(None, job_id)); } Ok(true) @@ -1516,11 +1536,11 @@ impl Pantry { &self, job_id: String, ) -> Result, HttpError> { - let mut jobs = self.jobs.lock().await; - if !jobs.contains(&job_id) { + let mut inner = self.inner.lock().await; + if !inner.jobs.contains(&job_id) { return Err(HttpError::for_not_found(None, job_id)); } - jobs.remove(&job_id); + inner.jobs.remove(&job_id); Ok(Ok(true)) } @@ -1533,9 +1553,9 @@ impl Pantry { self.entry(volume_id).await?; // Make up job - let mut jobs = self.jobs.lock().await; + let mut inner = self.inner.lock().await; let job_id = Uuid::new_v4().to_string(); - jobs.insert(job_id.clone()); + inner.jobs.insert(job_id.clone()); Ok(job_id) } @@ -1549,8 +1569,9 @@ impl Pantry { // the simulated instance ensure, then call // [`instance_issue_disk_snapshot_request`] as the snapshot logic is the // same. - let volumes = self.volumes.lock().await; - let volume_construction_request = &volumes.get(&volume_id).unwrap().vcr; + let inner = self.inner.lock().await; + let volume_construction_request = + &inner.volumes.get(&volume_id).unwrap().vcr; self.sled_agent .map_disk_ids_to_region_ids(volume_construction_request) @@ -1630,16 +1651,16 @@ impl Pantry { self.entry(volume_id).await?; // Make up job - let mut jobs = self.jobs.lock().await; + let mut inner = self.inner.lock().await; let job_id = Uuid::new_v4().to_string(); - jobs.insert(job_id.clone()); + inner.jobs.insert(job_id.clone()); Ok(job_id) } pub async fn detach(&self, volume_id: String) -> Result<()> { - let mut volumes = self.volumes.lock().await; - volumes.remove(&volume_id); + let mut inner = self.inner.lock().await; + inner.volumes.remove(&volume_id); Ok(()) } }