diff --git a/nexus/db-queries/src/db/datastore/region.rs b/nexus/db-queries/src/db/datastore/region.rs
index 8e59462aa3..67bd37cf69 100644
--- a/nexus/db-queries/src/db/datastore/region.rs
+++ b/nexus/db-queries/src/db/datastore/region.rs
@@ -548,6 +548,42 @@ impl DataStore {
         Ok(records)
     }
+
+    /// Find regions not on expunged disks that match a volume id
+    pub async fn find_non_expunged_regions(
+        &self,
+        opctx: &OpContext,
+        volume_id: Uuid,
+    ) -> LookupResult<Vec<Region>> {
+        let conn = self.pool_connection_authorized(opctx).await?;
+
+        use db::schema::dataset::dsl as dataset_dsl;
+        use db::schema::physical_disk::dsl as physical_disk_dsl;
+        use db::schema::region::dsl as region_dsl;
+        use db::schema::zpool::dsl as zpool_dsl;
+
+        region_dsl::region
+            .filter(region_dsl::dataset_id.eq_any(
+                dataset_dsl::dataset
+                    .filter(dataset_dsl::time_deleted.is_null())
+                    .filter(dataset_dsl::pool_id.eq_any(
+                        zpool_dsl::zpool
+                            .filter(zpool_dsl::time_deleted.is_null())
+                            .filter(zpool_dsl::physical_disk_id.eq_any(
+                                physical_disk_dsl::physical_disk
+                                    .filter(physical_disk_dsl::disk_policy.eq(PhysicalDiskPolicy::InService))
+                                    .select(physical_disk_dsl::id)
+                            ))
+                            .select(zpool_dsl::id)
+                    ))
+                    .select(dataset_dsl::id)
+            ))
+            .filter(region_dsl::volume_id.eq(volume_id))
+            .select(Region::as_select())
+            .load_async(&*conn)
+            .await
+            .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
+    }
 }
 
 #[cfg(test)]
diff --git a/nexus/db-queries/src/db/datastore/region_snapshot.rs b/nexus/db-queries/src/db/datastore/region_snapshot.rs
index 0129869f4f..f7a34fdb52 100644
--- a/nexus/db-queries/src/db/datastore/region_snapshot.rs
+++ b/nexus/db-queries/src/db/datastore/region_snapshot.rs
@@ -120,4 +120,40 @@ impl DataStore {
         .await
         .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
     }
+
+    /// Find region snapshots not on expunged disks that match a snapshot id
+    pub async fn find_non_expunged_region_snapshots(
+        &self,
+        opctx: &OpContext,
+        snapshot_id: Uuid,
+    ) -> LookupResult<Vec<RegionSnapshot>> {
+        let conn = self.pool_connection_authorized(opctx).await?;
+
+        use db::schema::dataset::dsl as dataset_dsl;
+        use db::schema::physical_disk::dsl as physical_disk_dsl;
+        use db::schema::region_snapshot::dsl as region_snapshot_dsl;
+        use db::schema::zpool::dsl as zpool_dsl;
+
+        region_snapshot_dsl::region_snapshot
+            .filter(region_snapshot_dsl::dataset_id.eq_any(
+                dataset_dsl::dataset
+                    .filter(dataset_dsl::time_deleted.is_null())
+                    .filter(dataset_dsl::pool_id.eq_any(
+                        zpool_dsl::zpool
+                            .filter(zpool_dsl::time_deleted.is_null())
+                            .filter(zpool_dsl::physical_disk_id.eq_any(
+                                physical_disk_dsl::physical_disk
+                                    .filter(physical_disk_dsl::disk_policy.eq(PhysicalDiskPolicy::InService))
+                                    .select(physical_disk_dsl::id)
+                            ))
+                            .select(zpool_dsl::id)
+                    ))
+                    .select(dataset_dsl::id)
+            ))
+            .filter(region_snapshot_dsl::snapshot_id.eq(snapshot_id))
+            .select(RegionSnapshot::as_select())
+            .load_async(&*conn)
+            .await
+            .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
+    }
 }
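Both new datastore methods express the same join chain: region (or region snapshot) → dataset → zpool → physical disk, keeping only rows whose physical disk policy is still `InService` and whose dataset/zpool are not soft-deleted. The sketch below is a minimal, std-only model of that filter using simplified stand-in structs rather than the real Diesel-backed Nexus models; it is only meant to make the intent of the nested `eq_any` subqueries easy to check in isolation.

```rust
// Simplified stand-ins for the datastore models; not the real Nexus types.
#[derive(Clone, Copy, PartialEq)]
enum DiskPolicy {
    InService,
    Expunged,
}

struct PhysicalDisk { id: u32, policy: DiskPolicy }
struct Zpool { id: u32, physical_disk_id: u32, time_deleted: bool }
struct Dataset { id: u32, pool_id: u32, time_deleted: bool }
struct Region { id: u32, dataset_id: u32, volume_id: u32 }

/// Keep only the regions of `volume_id` whose dataset -> zpool -> physical
/// disk chain is still live and in service, mirroring the nested `eq_any`
/// subqueries in `find_non_expunged_regions`.
fn non_expunged_regions<'a>(
    regions: &'a [Region],
    datasets: &'a [Dataset],
    zpools: &'a [Zpool],
    disks: &'a [PhysicalDisk],
    volume_id: u32,
) -> Vec<&'a Region> {
    regions
        .iter()
        .filter(|r| r.volume_id == volume_id)
        .filter(|r| {
            datasets.iter().any(|d| {
                d.id == r.dataset_id
                    && !d.time_deleted
                    && zpools.iter().any(|z| {
                        z.id == d.pool_id
                            && !z.time_deleted
                            && disks.iter().any(|p| {
                                p.id == z.physical_disk_id
                                    && p.policy == DiskPolicy::InService
                            })
                    })
            })
        })
        .collect()
}

fn main() {
    let disks = [
        PhysicalDisk { id: 1, policy: DiskPolicy::InService },
        PhysicalDisk { id: 2, policy: DiskPolicy::Expunged },
    ];
    let zpools = [
        Zpool { id: 10, physical_disk_id: 1, time_deleted: false },
        Zpool { id: 20, physical_disk_id: 2, time_deleted: false },
    ];
    let datasets = [
        Dataset { id: 100, pool_id: 10, time_deleted: false },
        Dataset { id: 200, pool_id: 20, time_deleted: false },
    ];
    let regions = [
        Region { id: 1000, dataset_id: 100, volume_id: 7 },
        Region { id: 2000, dataset_id: 200, volume_id: 7 },
    ];

    let live = non_expunged_regions(&regions, &datasets, &zpools, &disks, 7);
    // Only the region backed by the in-service disk survives the filter.
    assert_eq!(live.len(), 1);
    assert_eq!(live[0].id, 1000);
}
```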
diff --git a/nexus/src/app/sagas/region_snapshot_replacement_start.rs b/nexus/src/app/sagas/region_snapshot_replacement_start.rs
index bb5fd60209..d5e2cd9bd5 100644
--- a/nexus/src/app/sagas/region_snapshot_replacement_start.rs
+++ b/nexus/src/app/sagas/region_snapshot_replacement_start.rs
@@ -65,6 +65,7 @@ use crate::app::{authn, db};
 use nexus_types::identity::Asset;
 use nexus_types::identity::Resource;
 use omicron_common::api::external::Error;
+use omicron_uuid_kinds::DatasetUuid;
 use serde::Deserialize;
 use serde::Serialize;
 use sled_agent_client::types::CrucibleOpts;
@@ -91,6 +92,9 @@ declare_saga_actions! {
         + rsrss_set_saga_id
         - rsrss_set_saga_id_undo
     }
+    GET_CLONE_SOURCE -> "clone_source" {
+        + rsrss_get_clone_source
+    }
     GET_ALLOC_REGION_PARAMS -> "alloc_region_params" {
         + rsrss_get_alloc_region_params
     }
@@ -194,6 +198,7 @@ impl NexusSaga for SagaRegionSnapshotReplacementStart {
         ));
 
         builder.append(set_saga_id_action());
+        builder.append(get_clone_source_action());
         builder.append(get_alloc_region_params_action());
         builder.append(alloc_new_region_action());
         builder.append(find_new_region_action());
@@ -265,6 +270,145 @@ async fn rsrss_set_saga_id_undo(
     Ok(())
 }
 
+#[derive(Debug, Deserialize, Serialize)]
+enum CloneSource {
+    RegionSnapshot { dataset_id: DatasetUuid, region_id: Uuid },
+    Region { region_id: Uuid },
+}
+
+async fn rsrss_get_clone_source(
+    sagactx: NexusActionContext,
+) -> Result<CloneSource, ActionError> {
+    let params = sagactx.saga_params::<Params>()?;
+    let osagactx = sagactx.user_data();
+    let log = osagactx.log();
+
+    // If the downstairs we're cloning from is on an expunged dataset, the
+    // clone will never succeed. Find either a region snapshot or a read-only
+    // region that is associated with the request snapshot and that has not
+    // been expunged.
+    //
+    // Importantly, determine the clone source before the new region
+    // allocation step in this saga, otherwise the query that searches for
+    // read-only region candidates will match the newly allocated region
+    // (which is not created yet!).
+
+    let request_dataset_on_in_service_physical_disk = osagactx
+        .datastore()
+        .dataset_physical_disk_in_service(params.request.old_dataset_id.into())
+        .await
+        .map_err(ActionError::action_failed)?;
+
+    let clone_source = if request_dataset_on_in_service_physical_disk {
+        // If the request region snapshot's dataset has not been expunged,
+        // it can be used
+        CloneSource::RegionSnapshot {
+            dataset_id: params.request.old_dataset_id.into(),
+            region_id: params.request.old_region_id,
+        }
+    } else {
+        info!(
+            log,
+            "request region snapshot dataset expunged, finding another";
+            "snapshot_id" => %params.request.old_snapshot_id,
+            "dataset_id" => %params.request.old_dataset_id,
+        );
+
+        // Select another region snapshot that's part of this snapshot - they
+        // will all have identical contents.
+
+        let opctx = crate::context::op_context_for_saga_action(
+            &sagactx,
+            &params.serialized_authn,
+        );
+
+        let mut non_expunged_region_snapshots = osagactx
+            .datastore()
+            .find_non_expunged_region_snapshots(
+                &opctx,
+                params.request.old_snapshot_id,
+            )
+            .await
+            .map_err(ActionError::action_failed)?;
+
+        match non_expunged_region_snapshots.pop() {
+            Some(candidate) => {
+                info!(
+                    log,
+                    "found another non-expunged region snapshot";
+                    "snapshot_id" => %params.request.old_snapshot_id,
+                    "dataset_id" => %candidate.dataset_id,
+                    "region_id" => %candidate.region_id,
+                );
+
+                CloneSource::RegionSnapshot {
+                    dataset_id: candidate.dataset_id.into(),
+                    region_id: candidate.region_id,
+                }
+            }
+
+            None => {
+                // If a candidate region snapshot was not found, look for the
+                // snapshot's read-only regions.
+
+                info!(
+                    log,
+                    "no region snapshot clone source candidates";
+                    "snapshot_id" => %params.request.old_snapshot_id,
+                );
+
+                // Look up the existing snapshot
+                let maybe_db_snapshot = osagactx
+                    .datastore()
+                    .snapshot_get(&opctx, params.request.old_snapshot_id)
+                    .await
+                    .map_err(ActionError::action_failed)?;
+
+                let Some(db_snapshot) = maybe_db_snapshot else {
+                    return Err(ActionError::action_failed(
+                        Error::internal_error(&format!(
+                            "snapshot {} was hard deleted!",
+                            params.request.old_snapshot_id
+                        )),
+                    ));
+                };
+
+                let mut non_expunged_read_only_regions = osagactx
+                    .datastore()
+                    .find_non_expunged_regions(&opctx, db_snapshot.volume_id)
+                    .await
+                    .map_err(ActionError::action_failed)?;
+
+                match non_expunged_read_only_regions.pop() {
+                    Some(candidate) => {
+                        info!(
+                            log,
+                            "found region clone source candidate";
+                            "snapshot_id" => %params.request.old_snapshot_id,
+                            "dataset_id" => %candidate.dataset_id(),
+                            "region_id" => %candidate.id(),
+                        );
+
+                        CloneSource::Region { region_id: candidate.id() }
+                    }
+
+                    None => {
+                        // If all targets of a Volume::Region are on expunged
+                        // datasets, then the user's data is gone, and this code
+                        // will fail to select a clone source.
+
+                        return Err(ActionError::action_failed(format!(
+                            "no clone source candidate for {}!",
+                            params.request.old_snapshot_id,
+                        )));
+                    }
+                }
+            }
+        }
+    };
+
+    Ok(clone_source)
+}
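The selection order above is the heart of the new action: use the request's own region snapshot while its disk is in service, then any other non-expunged copy of the same snapshot, then a non-expunged read-only region, and fail only when nothing is left. The sketch below is a std-only model of that priority and of the repair-address derivation that the ensure step performs later. The types are simplified stand-ins (strings instead of typed IDs), and the port offset value is a placeholder rather than the real `crucible_common::REPAIR_PORT_OFFSET`.

```rust
use std::net::SocketAddrV6;

// Simplified stand-in for the saga's CloneSource; the real enum carries
// typed dataset/region IDs rather than strings.
#[derive(Debug, PartialEq)]
enum CloneSource {
    RegionSnapshot { dataset_id: String, region_id: String },
    Region { region_id: String },
}

/// Mirror the selection order in rsrss_get_clone_source.
fn pick_clone_source(
    request_disk_in_service: bool,
    request_source: (String, String),
    other_region_snapshot: Option<(String, String)>,
    read_only_region: Option<String>,
) -> Result<CloneSource, String> {
    if request_disk_in_service {
        let (dataset_id, region_id) = request_source;
        return Ok(CloneSource::RegionSnapshot { dataset_id, region_id });
    }
    if let Some((dataset_id, region_id)) = other_region_snapshot {
        return Ok(CloneSource::RegionSnapshot { dataset_id, region_id });
    }
    if let Some(region_id) = read_only_region {
        return Ok(CloneSource::Region { region_id });
    }
    Err(String::from("no clone source candidate!"))
}

/// The ensure step resolves the chosen source to a repair address by bumping
/// the downstairs port by a fixed offset. The offset here is a placeholder,
/// not the real crucible_common::REPAIR_PORT_OFFSET value.
fn repair_addr(mut downstairs: SocketAddrV6, offset: u16) -> SocketAddrV6 {
    downstairs.set_port(downstairs.port() + offset);
    downstairs
}

fn main() {
    // The request's disk is expunged and no other snapshot copy survives, so
    // a read-only region is chosen.
    let source = pick_clone_source(
        false,
        ("dataset-0".into(), "region-0".into()),
        None,
        Some("region-ro-1".into()),
    )
    .unwrap();
    assert_eq!(source, CloneSource::Region { region_id: "region-ro-1".into() });

    let addr: SocketAddrV6 = "[fd00:1122:3344::1]:19000".parse().unwrap();
    assert_eq!(repair_addr(addr, 4000).port(), 23000);
}
```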
+
 #[derive(Debug, Deserialize, Serialize)]
 struct AllocRegionParams {
     block_size: u64,
@@ -445,46 +589,67 @@ async fn rsrss_new_region_ensure(
         "new_dataset_and_region",
     )?;
 
-    let region_snapshot = osagactx
-        .datastore()
-        .region_snapshot_get(
-            params.request.old_dataset_id.into(),
-            params.request.old_region_id,
-            params.request.old_snapshot_id,
-        )
-        .await
-        .map_err(ActionError::action_failed)?;
+    let clone_source = sagactx.lookup::<CloneSource>("clone_source")?;
+
+    let mut source_repair_addr: SocketAddrV6 = match clone_source {
+        CloneSource::RegionSnapshot { dataset_id, region_id } => {
+            let region_snapshot = osagactx
+                .datastore()
+                .region_snapshot_get(
+                    dataset_id,
+                    region_id,
+                    params.request.old_snapshot_id,
+                )
+                .await
+                .map_err(ActionError::action_failed)?;
 
-    let Some(region_snapshot) = region_snapshot else {
-        return Err(ActionError::action_failed(format!(
-            "region snapshot {} {} {} deleted!",
-            params.request.old_dataset_id,
-            params.request.old_region_id,
-            params.request.old_snapshot_id,
-        )));
-    };
+            let Some(region_snapshot) = region_snapshot else {
+                return Err(ActionError::action_failed(format!(
+                    "region snapshot {} {} {} deleted!",
+                    dataset_id, region_id, params.request.old_snapshot_id,
+                )));
+            };
 
-    let (new_dataset, new_region) = new_dataset_and_region;
+            match region_snapshot.snapshot_addr.parse() {
+                Ok(addr) => addr,
 
-    // Currently, the repair port is set using a fixed offset above the
-    // downstairs port. Once this goes away, Nexus will require a way to query
-    // for the repair port!
+                Err(e) => {
+                    return Err(ActionError::action_failed(format!(
+                        "error parsing region_snapshot.snapshot_addr: {e}"
+                    )));
+                }
+            }
+        }
 
-    let mut source_repair_addr: SocketAddrV6 =
-        match region_snapshot.snapshot_addr.parse() {
-            Ok(addr) => addr,
+        CloneSource::Region { region_id } => {
+            let maybe_addr = osagactx
+                .datastore()
+                .region_addr(region_id)
+                .await
+                .map_err(ActionError::action_failed)?;
 
-            Err(e) => {
-                return Err(ActionError::action_failed(format!(
-                    "error parsing region_snapshot.snapshot_addr: {e}"
-                )));
+            match maybe_addr {
+                Some(addr) => addr,
+
+                None => {
+                    return Err(ActionError::action_failed(format!(
+                        "region clone source {region_id} has no port!"
+                    )));
+                }
             }
-        };
+        }
+    };
+
+    // Currently, the repair port is set using a fixed offset above the
+    // downstairs port. Once this goes away, Nexus will require a way to query
+    // for the repair port!
 
     source_repair_addr.set_port(
         source_repair_addr.port() + crucible_common::REPAIR_PORT_OFFSET,
     );
 
+    let (new_dataset, new_region) = new_dataset_and_region;
+
     let ensured_region = osagactx
         .nexus()
         .ensure_region_in_dataset(
diff --git a/nexus/tests/integration_tests/crucible_replacements.rs b/nexus/tests/integration_tests/crucible_replacements.rs
index c301372d5f..30754eca3c 100644
--- a/nexus/tests/integration_tests/crucible_replacements.rs
+++ b/nexus/tests/integration_tests/crucible_replacements.rs
@@ -694,7 +694,7 @@ async fn test_racing_replacements_for_soft_deleted_disk_volume(
     // expunged.
 
     let expunged_regions = datastore
-        .find_regions_on_expunged_physical_disks(&opctx)
+        .find_read_write_regions_on_expunged_physical_disks(&opctx)
         .await
         .unwrap();
 
@@ -879,11 +879,36 @@ async fn test_racing_replacements_for_soft_deleted_disk_volume(
         .await;
 
     // Assert the region snapshot was deleted.
-    assert!(datastore
-        .region_snapshot_get(dataset.id(), region.id(), snapshot.identity.id)
-        .await
-        .unwrap()
-        .is_none());
+    wait_for_condition(
+        || {
+            let dataset_id = dataset.id();
+            let region_id = region.id();
+            let snapshot_id = snapshot.identity.id;
+
+            async move {
+                let region_snapshot = datastore
+                    .region_snapshot_get(dataset_id, region_id, snapshot_id)
+                    .await
+                    .unwrap();
+
+                match region_snapshot {
+                    Some(_) => {
+                        // Region snapshot not garbage collected yet
+                        Err(CondCheckError::<()>::NotYet)
+                    }
+
+                    None => {
+                        // Region snapshot garbage collected ok
+                        Ok(())
+                    }
+                }
+            }
+        },
+        &std::time::Duration::from_millis(500),
+        &std::time::Duration::from_secs(60),
+    )
+    .await
+    .expect("region snapshot garbage collected");
 
     // Assert that the disk's volume is still only soft-deleted, because the two
     // other associated region snapshots still exist.
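The test change above replaces a one-shot assertion with polling, since region snapshot garbage collection now races with the test. Below is a simplified, synchronous stand-in for the test-util `wait_for_condition` helper used here (the real helper is async and distinguishes "not yet" from permanent errors); it only models the retry loop with the same 500 ms interval and 60 s deadline.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Poll `check` every `interval` until it returns Some(value) or the deadline
/// passes. A minimal sketch, not the real omicron test-utils API.
fn poll_until<T>(
    mut check: impl FnMut() -> Option<T>,
    interval: Duration,
    timeout: Duration,
) -> Result<T, &'static str> {
    let deadline = Instant::now() + timeout;
    loop {
        if let Some(value) = check() {
            return Ok(value);
        }
        if Instant::now() >= deadline {
            return Err("condition not met before timeout");
        }
        sleep(interval);
    }
}

fn main() {
    // Stand-in for "has the region snapshot been garbage collected yet?":
    // here the condition flips to true after a few polls.
    let mut remaining_polls = 3;
    let result = poll_until(
        || {
            if remaining_polls == 0 {
                Some(())
            } else {
                remaining_polls -= 1;
                None
            }
        },
        Duration::from_millis(500),
        Duration::from_secs(60),
    );
    assert!(result.is_ok());
}
```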
@@ -957,12 +982,19 @@ async fn test_racing_replacements_for_soft_deleted_disk_volume(
                     // The saga transitioned the request ok
                     Ok(())
                 } else if state == RegionReplacementState::Driving {
-                    // The saga is still running
+                    // The drive saga is still running
+                    Err(CondCheckError::<()>::NotYet)
+                } else if state == RegionReplacementState::Running {
+                    // The drive saga hasn't started yet
                     Err(CondCheckError::<()>::NotYet)
                 } else if state == RegionReplacementState::Completing {
                     // The saga transitioned the request ok, and it's now being
                     // finished by the region replacement finish saga
                     Ok(())
+                } else if state == RegionReplacementState::Complete {
+                    // The saga transitioned the request ok, and it was finished
+                    // by the region replacement finish saga
+                    Ok(())
                 } else {
                     // Any other state is not expected
                     panic!("unexpected state {state:?}!");
                 }
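The hunk above widens the set of states the test tolerates while polling: `Driving` and `Running` mean "keep waiting", while `Completing` and `Complete` count as success. The sketch below encodes just that classification with a simplified stand-in enum; the real `RegionReplacementState` type in Nexus has additional variants not listed here.

```rust
// Simplified stand-in listing only the states this test's polling loop
// distinguishes; not the real RegionReplacementState type.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ReplacementState {
    Running,
    Driving,
    Completing,
    Complete,
}

#[derive(Debug, PartialEq)]
enum Poll {
    Done,
    NotYet,
}

/// In-flight states mean "poll again"; terminal states end the wait.
fn classify(state: ReplacementState) -> Poll {
    match state {
        ReplacementState::Running | ReplacementState::Driving => Poll::NotYet,
        ReplacementState::Completing | ReplacementState::Complete => Poll::Done,
    }
}

fn main() {
    assert_eq!(classify(ReplacementState::Running), Poll::NotYet);
    assert_eq!(classify(ReplacementState::Complete), Poll::Done);
}
```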
@@ -1705,3 +1737,338 @@ async fn test_delete_volume_region_snapshot_replacement_step(
 
     test_harness.assert_no_crucible_resources_leaked().await;
 }
+
+/// Tests that replacement can occur until completion
+#[nexus_test]
+async fn test_replacement_sanity(cptestctx: &ControlPlaneTestContext) {
+    let nexus = &cptestctx.server.server_context().nexus;
+    let datastore = nexus.datastore();
+    let opctx =
+        OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone());
+
+    // Create four zpools, each with one dataset. This is required for region
+    // and region snapshot replacement to have somewhere to move the data.
+    let sled_id = cptestctx.first_sled();
+
+    let disk_test = DiskTestBuilder::new(&cptestctx)
+        .on_specific_sled(sled_id)
+        .with_zpool_count(4)
+        .build()
+        .await;
+
+    // Create a disk and a snapshot and a disk from that snapshot
+    let client = &cptestctx.external_client;
+    let _project_id = create_project_and_pool(client).await;
+
+    let disk = create_disk(&client, PROJECT_NAME, "disk").await;
+    let snapshot = create_snapshot(&client, PROJECT_NAME, "disk", "snap").await;
+    let _disk_from_snapshot = create_disk_from_snapshot(
+        &client,
+        PROJECT_NAME,
+        "disk-from-snap",
+        snapshot.identity.id,
+    )
+    .await;
+
+    // Before expunging the physical disk, save the DB model
+    let (.., db_disk) = LookupPath::new(&opctx, &datastore)
+        .disk_id(disk.identity.id)
+        .fetch()
+        .await
+        .unwrap();
+
+    assert_eq!(db_disk.id(), disk.identity.id);
+
+    // Next, expunge a physical disk that contains a region
+
+    let disk_allocated_regions =
+        datastore.get_allocated_regions(db_disk.volume_id).await.unwrap();
+    let (dataset, _) = &disk_allocated_regions[0];
+
+    let zpool = disk_test
+        .zpools()
+        .find(|x| *x.id.as_untyped_uuid() == dataset.pool_id)
+        .expect("Expected at least one zpool");
+
+    let (_, db_zpool) = LookupPath::new(&opctx, datastore)
+        .zpool_id(zpool.id.into_untyped_uuid())
+        .fetch()
+        .await
+        .unwrap();
+
+    datastore
+        .physical_disk_update_policy(
+            &opctx,
+            db_zpool.physical_disk_id.into(),
+            PhysicalDiskPolicy::Expunged,
+        )
+        .await
+        .unwrap();
+
+    // Any volumes sent to the Pantry for reconciliation should return active
+    // for this test
+
+    cptestctx
+        .sled_agent
+        .pantry_server
+        .as_ref()
+        .unwrap()
+        .pantry
+        .set_auto_activate_volumes()
+        .await;
+
+    // Now, run all replacement tasks to completion
+    let internal_client = &cptestctx.internal_client;
+    run_replacement_tasks_to_completion(&internal_client).await;
+}
+
+/// Tests that multiple replacements can occur until completion
+#[nexus_test]
+async fn test_region_replacement_triple_sanity(
+    cptestctx: &ControlPlaneTestContext,
+) {
+    let nexus = &cptestctx.server.server_context().nexus;
+    let datastore = nexus.datastore();
+    let opctx =
+        OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone());
+
+    // Create six zpools, each with one dataset. This is required for region
+    // and region snapshot replacement to have somewhere to move the data, and
+    // for this test we're doing three expungements.
+    let sled_id = cptestctx.first_sled();
+
+    let disk_test = DiskTestBuilder::new(&cptestctx)
+        .on_specific_sled(sled_id)
+        .with_zpool_count(6)
+        .build()
+        .await;
+
+    // Any volumes sent to the Pantry for reconciliation should return active
+    // for this test
+
+    cptestctx
+        .sled_agent
+        .pantry_server
+        .as_ref()
+        .unwrap()
+        .pantry
+        .set_auto_activate_volumes()
+        .await;
+
+    // Create a disk and a snapshot and a disk from that snapshot
+    let client = &cptestctx.external_client;
+    let _project_id = create_project_and_pool(client).await;
+
+    let disk = create_disk(&client, PROJECT_NAME, "disk").await;
+    let snapshot = create_snapshot(&client, PROJECT_NAME, "disk", "snap").await;
+    let _disk_from_snapshot = create_disk_from_snapshot(
+        &client,
+        PROJECT_NAME,
+        "disk-from-snap",
+        snapshot.identity.id,
+    )
+    .await;
+
+    // Before expunging any physical disk, save some DB models
+    let (.., db_disk) = LookupPath::new(&opctx, &datastore)
+        .disk_id(disk.identity.id)
+        .fetch()
+        .await
+        .unwrap();
+
+    let (.., db_snapshot) = LookupPath::new(&opctx, &datastore)
+        .snapshot_id(snapshot.identity.id)
+        .fetch()
+        .await
+        .unwrap();
+
+    let internal_client = &cptestctx.internal_client;
+
+    let disk_allocated_regions =
+        datastore.get_allocated_regions(db_disk.volume_id).await.unwrap();
+    let snapshot_allocated_regions =
+        datastore.get_allocated_regions(db_snapshot.volume_id).await.unwrap();
+
+    assert_eq!(disk_allocated_regions.len(), 3);
+    assert_eq!(snapshot_allocated_regions.len(), 0);
+
+    for i in disk_allocated_regions {
+        let (dataset, _) = &i;
+
+        let zpool = disk_test
+            .zpools()
+            .find(|x| *x.id.as_untyped_uuid() == dataset.pool_id)
+            .expect("Expected at least one zpool");
+
+        let (_, db_zpool) = LookupPath::new(&opctx, datastore)
+            .zpool_id(zpool.id.into_untyped_uuid())
+            .fetch()
+            .await
+            .unwrap();
+
+        datastore
+            .physical_disk_update_policy(
+                &opctx,
+                db_zpool.physical_disk_id.into(),
+                PhysicalDiskPolicy::Expunged,
+            )
+            .await
+            .unwrap();
+
+        // Now, run all replacement tasks to completion
+        run_replacement_tasks_to_completion(&internal_client).await;
+    }
+
+    let disk_allocated_regions =
+        datastore.get_allocated_regions(db_disk.volume_id).await.unwrap();
+    let snapshot_allocated_regions =
+        datastore.get_allocated_regions(db_snapshot.volume_id).await.unwrap();
+
+    assert_eq!(disk_allocated_regions.len(), 3);
+    assert!(disk_allocated_regions.iter().all(|(_, r)| !r.read_only()));
+
+    // Assert region snapshots replaced with three read-only regions
+    assert_eq!(snapshot_allocated_regions.len(), 3);
+    assert!(snapshot_allocated_regions.iter().all(|(_, r)| r.read_only()));
+}
+
+/// Tests that multiple replacements can occur until completion, after expunging
+/// two physical disks before any replacements occur (aka we can lose two
+/// physical disks and still recover)
+#[nexus_test]
+async fn test_region_replacement_triple_sanity_2(
+    cptestctx: &ControlPlaneTestContext,
+) {
+    let nexus = &cptestctx.server.server_context().nexus;
+    let datastore = nexus.datastore();
+    let opctx =
+        OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone());
+
+    // Create six zpools, each with one dataset. This is required for region
+    // and region snapshot replacement to have somewhere to move the data, and
+    // for this test we're doing three expungements.
+    let sled_id = cptestctx.first_sled();
+
+    let disk_test = DiskTestBuilder::new(&cptestctx)
+        .on_specific_sled(sled_id)
+        .with_zpool_count(6)
+        .build()
+        .await;
+
+    // Any volumes sent to the Pantry for reconciliation should return active
+    // for this test
+
+    cptestctx
+        .sled_agent
+        .pantry_server
+        .as_ref()
+        .unwrap()
+        .pantry
+        .set_auto_activate_volumes()
+        .await;
+
+    // Create a disk and a snapshot and a disk from that snapshot
+    let client = &cptestctx.external_client;
+    let _project_id = create_project_and_pool(client).await;
+
+    let disk = create_disk(&client, PROJECT_NAME, "disk").await;
+    let snapshot = create_snapshot(&client, PROJECT_NAME, "disk", "snap").await;
+    let _disk_from_snapshot = create_disk_from_snapshot(
+        &client,
+        PROJECT_NAME,
+        "disk-from-snap",
+        snapshot.identity.id,
+    )
+    .await;
+
+    // Before expunging any physical disk, save some DB models
+    let (.., db_disk) = LookupPath::new(&opctx, &datastore)
+        .disk_id(disk.identity.id)
+        .fetch()
+        .await
+        .unwrap();
+
+    let (.., db_snapshot) = LookupPath::new(&opctx, &datastore)
+        .snapshot_id(snapshot.identity.id)
+        .fetch()
+        .await
+        .unwrap();
+
+    let internal_client = &cptestctx.internal_client;
+
+    let disk_allocated_regions =
+        datastore.get_allocated_regions(db_disk.volume_id).await.unwrap();
+    let snapshot_allocated_regions =
+        datastore.get_allocated_regions(db_snapshot.volume_id).await.unwrap();
+
+    assert_eq!(disk_allocated_regions.len(), 3);
+    assert_eq!(snapshot_allocated_regions.len(), 0);
+
+    // Expunge two physical disks before any replacements occur
+    for i in [0, 1] {
+        let (dataset, _) = &disk_allocated_regions[i];
+
+        let zpool = disk_test
+            .zpools()
+            .find(|x| *x.id.as_untyped_uuid() == dataset.pool_id)
+            .expect("Expected at least one zpool");
+
+        let (_, db_zpool) = LookupPath::new(&opctx, datastore)
+            .zpool_id(zpool.id.into_untyped_uuid())
+            .fetch()
+            .await
+            .unwrap();
+
+        datastore
+            .physical_disk_update_policy(
+                &opctx,
+                db_zpool.physical_disk_id.into(),
+                PhysicalDiskPolicy::Expunged,
+            )
+            .await
+            .unwrap();
+    }
+
+    // Now, run all replacement tasks to completion
+    run_replacement_tasks_to_completion(&internal_client).await;
+
+    // Expunge the last physical disk
+    {
+        let (dataset, _) = &disk_allocated_regions[2];
+
+        let zpool = disk_test
+            .zpools()
+            .find(|x| *x.id.as_untyped_uuid() == dataset.pool_id)
+            .expect("Expected at least one zpool");
+
+        let (_, db_zpool) = LookupPath::new(&opctx, datastore)
+            .zpool_id(zpool.id.into_untyped_uuid())
+            .fetch()
+            .await
+            .unwrap();
+
+        datastore
+            .physical_disk_update_policy(
+                &opctx,
+                db_zpool.physical_disk_id.into(),
+                PhysicalDiskPolicy::Expunged,
+            )
+            .await
+            .unwrap();
+    }
+
+    // Now, run all replacement tasks to completion
+    run_replacement_tasks_to_completion(&internal_client).await;
+
+    let disk_allocated_regions =
+        datastore.get_allocated_regions(db_disk.volume_id).await.unwrap();
+    let snapshot_allocated_regions =
+        datastore.get_allocated_regions(db_snapshot.volume_id).await.unwrap();
+
+    assert_eq!(disk_allocated_regions.len(), 3);
+    assert!(disk_allocated_regions.iter().all(|(_, r)| !r.read_only()));
+
+    // Assert region snapshots replaced with three read-only regions
+    assert_eq!(snapshot_allocated_regions.len(), 3);
+    assert!(snapshot_allocated_regions.iter().all(|(_, r)| r.read_only()));
+}
diff --git a/sled-agent/src/sim/storage.rs b/sled-agent/src/sim/storage.rs
index 8fd648096a..2299ba9db9 100644
--- a/sled-agent/src/sim/storage.rs
+++ b/sled-agent/src/sim/storage.rs
@@ -117,6 +117,8 @@ impl CrucibleDataInner {
             bail!("region creation error!");
         }
 
+        let read_only = params.source.is_some();
+
         let region = Region {
             id: params.id,
             block_size: params.block_size,
@@ -129,8 +131,8 @@ impl CrucibleDataInner {
             cert_pem: None,
             key_pem: None,
             root_pem: None,
-            source: None,
-            read_only: params.source.is_some(),
+            source: params.source,
+            read_only,
         };
 
         let old = self.regions.insert(id, region.clone());
@@ -1364,29 +1366,41 @@ pub struct PantryVolume {
     activate_job: Option<String>,
 }
 
+pub struct PantryInner {
+    /// Map Volume UUID to PantryVolume struct
+    volumes: HashMap<String, PantryVolume>,
+
+    jobs: HashSet<String>,
+
+    /// Auto activate volumes attached in the background
+    auto_activate_volumes: bool,
+}
+
 /// Simulated crucible pantry
 pub struct Pantry {
     pub id: OmicronZoneUuid,
-    /// Map Volume UUID to PantryVolume struct
-    volumes: Mutex<HashMap<String, PantryVolume>>,
     sled_agent: Arc<SledAgent>,
-    jobs: Mutex<HashSet<String>>,
+    inner: Mutex<PantryInner>,
 }
 
 impl Pantry {
     pub fn new(sled_agent: Arc<SledAgent>) -> Self {
         Self {
             id: OmicronZoneUuid::new_v4(),
-            volumes: Mutex::new(HashMap::default()),
            sled_agent,
-            jobs: Mutex::new(HashSet::default()),
+            inner: Mutex::new(PantryInner {
+                volumes: HashMap::default(),
+                jobs: HashSet::default(),
+                auto_activate_volumes: false,
+            }),
         }
     }
 
     pub async fn status(&self) -> Result<PantryStatus, HttpError> {
+        let inner = self.inner.lock().await;
+
         Ok(PantryStatus {
-            volumes: self.volumes.lock().await.keys().cloned().collect(),
-            num_job_handles: self.jobs.lock().await.len(),
+            volumes: inner.volumes.keys().cloned().collect(),
+            num_job_handles: inner.jobs.len(),
         })
     }
 
@@ -1394,8 +1408,9 @@ impl Pantry {
         &self,
         volume_id: String,
     ) -> Result<VolumeConstructionRequest, HttpError> {
-        let volumes = self.volumes.lock().await;
-        match volumes.get(&volume_id) {
+        let inner = self.inner.lock().await;
+
+        match inner.volumes.get(&volume_id) {
             Some(entry) => Ok(entry.vcr.clone()),
 
             None => Err(HttpError::for_not_found(None, volume_id)),
@@ -1407,9 +1422,9 @@
         volume_id: String,
         volume_construction_request: VolumeConstructionRequest,
     ) -> Result<()> {
-        let mut volumes = self.volumes.lock().await;
+        let mut inner = self.inner.lock().await;
 
-        volumes.insert(
+        inner.volumes.insert(
             volume_id,
             PantryVolume {
                 vcr: volume_construction_request,
@@ -1425,29 +1440,34 @@
         Ok(())
     }
 
+    pub async fn set_auto_activate_volumes(&self) {
+        self.inner.lock().await.auto_activate_volumes = true;
+    }
+
     pub async fn attach_activate_background(
         &self,
         volume_id: String,
         activate_job_id: String,
         volume_construction_request: VolumeConstructionRequest,
     ) -> Result<(), HttpError> {
-        let mut volumes = self.volumes.lock().await;
-        let mut jobs = self.jobs.lock().await;
+        let mut inner = self.inner.lock().await;
+
+        let auto_activate_volumes = inner.auto_activate_volumes;
 
-        volumes.insert(
+        inner.volumes.insert(
            volume_id,
            PantryVolume {
                vcr: volume_construction_request,
                status: VolumeStatus {
-                    active: false,
-                    seen_active: false,
+                    active: auto_activate_volumes,
+                    seen_active: auto_activate_volumes,
                     num_job_handles: 1,
                 },
                 activate_job: Some(activate_job_id.clone()),
             },
         );
 
-        jobs.insert(activate_job_id);
+        inner.jobs.insert(activate_job_id);
 
         Ok(())
     }
@@ -1457,8 +1477,8 @@
         volume_id: String,
     ) -> Result<String, HttpError> {
         let activate_job = {
-            let volumes = self.volumes.lock().await;
-            volumes.get(&volume_id).unwrap().activate_job.clone().unwrap()
+            let inner = self.inner.lock().await;
+            inner.volumes.get(&volume_id).unwrap().activate_job.clone().unwrap()
         };
 
         let mut status = self.volume_status(volume_id.clone()).await?;
@@ -1475,9 +1495,9 @@
         &self,
         volume_id: String,
     ) -> Result<VolumeStatus, HttpError> {
-        let volumes = self.volumes.lock().await;
+        let inner = self.inner.lock().await;
 
-        match volumes.get(&volume_id) {
+        match inner.volumes.get(&volume_id) {
             Some(pantry_volume) => Ok(pantry_volume.status.clone()),
 
             None => Err(HttpError::for_not_found(None, volume_id)),
@@ -1489,9 +1509,9 @@
         volume_id: String,
         status: VolumeStatus,
     ) -> Result<(), HttpError> {
-        let mut volumes = self.volumes.lock().await;
+        let mut inner = self.inner.lock().await;
 
-        match volumes.get_mut(&volume_id) {
+        match inner.volumes.get_mut(&volume_id) {
             Some(pantry_volume) => {
                 pantry_volume.status = status;
                 Ok(())
@@ -1505,8 +1525,8 @@
         &self,
         job_id: String,
     ) -> Result<bool, HttpError> {
-        let jobs = self.jobs.lock().await;
-        if !jobs.contains(&job_id) {
+        let inner = self.inner.lock().await;
+        if !inner.jobs.contains(&job_id) {
             return Err(HttpError::for_not_found(None, job_id));
         }
         Ok(true)
@@ -1516,11 +1536,11 @@
         &self,
         job_id: String,
     ) -> Result, HttpError> {
-        let mut jobs = self.jobs.lock().await;
-        if !jobs.contains(&job_id) {
+        let mut inner = self.inner.lock().await;
+        if !inner.jobs.contains(&job_id) {
             return Err(HttpError::for_not_found(None, job_id));
         }
-        jobs.remove(&job_id);
+        inner.jobs.remove(&job_id);
 
         Ok(Ok(true))
     }
@@ -1533,9 +1553,9 @@
         self.entry(volume_id).await?;
 
         // Make up job
-        let mut jobs = self.jobs.lock().await;
+        let mut inner = self.inner.lock().await;
         let job_id = Uuid::new_v4().to_string();
-        jobs.insert(job_id.clone());
+        inner.jobs.insert(job_id.clone());
 
         Ok(job_id)
     }
@@ -1549,8 +1569,9 @@
         // the simulated instance ensure, then call
         // [`instance_issue_disk_snapshot_request`] as the snapshot logic is the
         // same.
-        let volumes = self.volumes.lock().await;
-        let volume_construction_request = &volumes.get(&volume_id).unwrap().vcr;
+        let inner = self.inner.lock().await;
+        let volume_construction_request =
+            &inner.volumes.get(&volume_id).unwrap().vcr;
 
         self.sled_agent
             .map_disk_ids_to_region_ids(volume_construction_request)
@@ -1630,16 +1651,16 @@
         self.entry(volume_id).await?;
 
         // Make up job
-        let mut jobs = self.jobs.lock().await;
+        let mut inner = self.inner.lock().await;
         let job_id = Uuid::new_v4().to_string();
-        jobs.insert(job_id.clone());
+        inner.jobs.insert(job_id.clone());
 
         Ok(job_id)
     }
 
     pub async fn detach(&self, volume_id: String) -> Result<()> {
-        let mut volumes = self.volumes.lock().await;
-        volumes.remove(&volume_id);
+        let mut inner = self.inner.lock().await;
+        inner.volumes.remove(&volume_id);
         Ok(())
     }
 }
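The sled-agent change above does two things: it consolidates the simulated pantry's separately locked `volumes` and `jobs` maps into a single `Mutex<PantryInner>`, and it adds an `auto_activate_volumes` flag so that volumes attached in the background start out active in tests. The sketch below models both ideas with std-only stand-ins; the real code uses an async Mutex and stores full volume construction requests rather than these minimal entries.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Mutex;

// Simplified stand-ins for the simulated pantry's state.
struct VolumeEntry {
    active: bool,
    seen_active: bool,
}

#[derive(Default)]
struct PantryInner {
    volumes: HashMap<String, VolumeEntry>,
    jobs: HashSet<String>,
    auto_activate_volumes: bool,
}

#[derive(Default)]
struct Pantry {
    // One lock for all mutable state, instead of a Mutex per field, so a
    // single critical section can update volumes and jobs together.
    inner: Mutex<PantryInner>,
}

impl Pantry {
    fn set_auto_activate_volumes(&self) {
        self.inner.lock().unwrap().auto_activate_volumes = true;
    }

    fn attach_activate_background(&self, volume_id: &str, job_id: &str) {
        let mut inner = self.inner.lock().unwrap();
        // When auto-activation is enabled, background attachments start out
        // already active, so tests don't have to drive activation manually.
        let auto = inner.auto_activate_volumes;
        inner.volumes.insert(
            volume_id.to_string(),
            VolumeEntry { active: auto, seen_active: auto },
        );
        inner.jobs.insert(job_id.to_string());
    }
}

fn main() {
    let pantry = Pantry::default();
    pantry.attach_activate_background("vol-1", "job-1");
    pantry.set_auto_activate_volumes();
    pantry.attach_activate_background("vol-2", "job-2");

    let inner = pantry.inner.lock().unwrap();
    assert!(!inner.volumes["vol-1"].active);
    assert!(inner.volumes["vol-2"].active && inner.volumes["vol-2"].seen_active);
    assert_eq!(inner.jobs.len(), 2);
}
```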