diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs index 193a205a96..136f1b6f62 100644 --- a/dev-tools/omdb/src/bin/omdb/nexus.rs +++ b/dev-tools/omdb/src/bin/omdb/nexus.rs @@ -1129,6 +1129,14 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) { println!(" > {line}"); } + println!( + " region replacement requests set to completed ok: {}", + status.requests_completed_ok.len() + ); + for line in &status.requests_completed_ok { + println!(" > {line}"); + } + println!(" errors: {}", status.errors.len()); for line in &status.errors { println!(" > {line}"); diff --git a/nexus/db-model/src/region_replacement.rs b/nexus/db-model/src/region_replacement.rs index 995c55001c..57ead4b68e 100644 --- a/nexus/db-model/src/region_replacement.rs +++ b/nexus/db-model/src/region_replacement.rs @@ -116,6 +116,13 @@ impl std::str::FromStr for RegionReplacementState { /// "finish" notification is seen by the region replacement drive background /// task. This check is done before invoking the region replacement drive saga. /// +/// If the volume whose region is being replaced is soft-deleted or +/// hard-deleted, then the replacement request will be transitioned along the +/// states to Complete while avoiding operations that are meant to operate on +/// that volume. If the volume is soft-deleted or hard-deleted while the +/// replacement request is in the "Requested" state, the replacement request +/// will transition straight to Complete, and no operations will be performed. +/// /// See also: RegionReplacementStep records #[derive( Queryable, diff --git a/nexus/db-queries/src/db/datastore/region_replacement.rs b/nexus/db-queries/src/db/datastore/region_replacement.rs index 62598d710e..b9b5a0827f 100644 --- a/nexus/db-queries/src/db/datastore/region_replacement.rs +++ b/nexus/db-queries/src/db/datastore/region_replacement.rs @@ -657,7 +657,7 @@ impl DataStore { /// Transition a RegionReplacement record from Completing to Complete, /// clearing the operating saga id. Also removes the `volume_repair` record - /// that is taking a "lock" on the Volume. + /// that is taking a lock on the Volume. pub async fn set_region_replacement_complete( &self, opctx: &OpContext, @@ -723,6 +723,75 @@ impl DataStore { }) } + /// Transition a RegionReplacement record from Requested to Complete, which + /// occurs when the associated volume is soft or hard deleted. Also removes + /// the `volume_repair` record that is taking a lock on the Volume. + pub async fn set_region_replacement_complete_from_requested( + &self, + opctx: &OpContext, + request: RegionReplacement, + ) -> Result<(), Error> { + type TxnError = TransactionError; + + assert_eq!( + request.replacement_state, + RegionReplacementState::Requested, + ); + + self.pool_connection_authorized(opctx) + .await? 
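+ // The volume_repair lock removal and the state change are performed in one transaction: if the request turns out to have left the Requested state (and is not already Complete), the conflict error below rolls the transaction back and the volume_repair record is kept.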
+ .transaction_async(|conn| async move { + Self::volume_repair_delete_query( + request.volume_id, + request.id, + ) + .execute_async(&conn) + .await?; + + use db::schema::region_replacement::dsl; + + let result = diesel::update(dsl::region_replacement) + .filter(dsl::id.eq(request.id)) + .filter( + dsl::replacement_state.eq(RegionReplacementState::Requested), + ) + .filter(dsl::operating_saga_id.is_null()) + .set(( + dsl::replacement_state.eq(RegionReplacementState::Complete), + )) + .check_if_exists::(request.id) + .execute_and_check(&conn) + .await?; + + match result.status { + UpdateStatus::Updated => Ok(()), + + UpdateStatus::NotUpdatedButExists => { + let record = result.found; + + if record.replacement_state == RegionReplacementState::Complete { + Ok(()) + } else { + Err(TxnError::CustomError(Error::conflict(format!( + "region replacement {} set to {:?} (operating saga id {:?})", + request.id, + record.replacement_state, + record.operating_saga_id, + )))) + } + } + } + }) + .await + .map_err(|e| match e { + TxnError::CustomError(error) => error, + + TxnError::Database(error) => { + public_error_from_diesel(error, ErrorHandler::Server) + } + }) + } + /// Nexus has been notified by an Upstairs (or has otherwised determined) /// that a region replacement is done, so update the record. Filter on the /// following: diff --git a/nexus/db-queries/src/db/datastore/volume.rs b/nexus/db-queries/src/db/datastore/volume.rs index c643b86d24..b000325954 100644 --- a/nexus/db-queries/src/db/datastore/volume.rs +++ b/nexus/db-queries/src/db/datastore/volume.rs @@ -1572,6 +1572,14 @@ impl DataStore { .optional() .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } + + /// Return true if a volume was soft-deleted or hard-deleted + pub async fn volume_deleted(&self, volume_id: Uuid) -> Result { + match self.volume_get(volume_id).await? { + Some(v) => Ok(v.time_deleted.is_some()), + None => Ok(true), + } + } } #[derive(Default, Clone, Debug, Serialize, Deserialize)] diff --git a/nexus/src/app/background/tasks/region_replacement.rs b/nexus/src/app/background/tasks/region_replacement.rs index b1c717c77b..d5013db134 100644 --- a/nexus/src/app/background/tasks/region_replacement.rs +++ b/nexus/src/app/background/tasks/region_replacement.rs @@ -170,8 +170,79 @@ impl BackgroundTask for RegionReplacementDetector { }; for request in requests { + // If the replacement request is in the `requested` state and + // the request's volume was soft-deleted or hard-deleted, avoid + // sending the start request and instead transition the request + // to completed + + let volume_deleted = match self + .datastore + .volume_deleted(request.volume_id) + .await + { + Ok(volume_deleted) => volume_deleted, + + Err(e) => { + let s = format!( + "error checking if volume id {} was deleted: {e}", + request.volume_id, + ); + error!(&log, "{s}"); + + status.errors.push(s); + continue; + } + }; + let request_id = request.id; + if volume_deleted { + // Volume was soft or hard deleted, so proceed with clean + // up, which if this is in state Requested there won't be + // any additional associated state, so transition the record + // to Completed. 
+ + info!( + &log, + "request {} volume {} was soft or hard deleted!", + request_id, + request.volume_id, + ); + + let result = self + .datastore + .set_region_replacement_complete_from_requested( + opctx, request, + ) + .await; + + match result { + Ok(()) => { + let s = format!( + "request {} transitioned from requested to \ + complete", + request_id, + ); + + info!(&log, "{s}"); + status.requests_completed_ok.push(s); + } + + Err(e) => { + let s = format!( + "error transitioning {} from requested to \ + complete: {e}", + request_id, + ); + + error!(&log, "{s}"); + status.errors.push(s); + } + } + + continue; + } + let result = self .send_start_request( authn::saga::Serialized::for_opctx(opctx), diff --git a/nexus/src/app/sagas/region_replacement_drive.rs b/nexus/src/app/sagas/region_replacement_drive.rs index d95591848e..18c675c449 100644 --- a/nexus/src/app/sagas/region_replacement_drive.rs +++ b/nexus/src/app/sagas/region_replacement_drive.rs @@ -309,6 +309,32 @@ async fn srrd_drive_region_replacement_check( ¶ms.serialized_authn, ); + // It doesn't make sense to perform any of this saga if the volume was soft + // or hard deleted: for example, this happens if the higher level resource + // like the disk was deleted. Volume deletion potentially results in the + // clean-up of Crucible resources, so it wouldn't even be valid to attempt + // to drive forward any type of live repair or reconciliation. + // + // Setting Done here will cause this saga to transition the replacement + // request to ReplacementDone. + + let volume_deleted = osagactx + .datastore() + .volume_deleted(params.request.volume_id) + .await + .map_err(ActionError::action_failed)?; + + if volume_deleted { + info!( + log, + "volume was soft or hard deleted!"; + "region replacement id" => %params.request.id, + "volume id" => %params.request.volume_id, + ); + + return Ok(DriveCheck::Done); + } + let last_request_step = osagactx .datastore() .current_region_replacement_request_step(&opctx, params.request.id) diff --git a/nexus/test-utils/src/background.rs b/nexus/test-utils/src/background.rs index 58792e547d..5a7851f103 100644 --- a/nexus/test-utils/src/background.rs +++ b/nexus/test-utils/src/background.rs @@ -11,6 +11,7 @@ use nexus_client::types::CurrentStatus; use nexus_client::types::CurrentStatusRunning; use nexus_client::types::LastResult; use nexus_client::types::LastResultCompleted; +use nexus_types::internal_api::background::*; use omicron_test_utils::dev::poll::{wait_for_condition, CondCheckError}; use std::time::Duration; @@ -84,3 +85,198 @@ pub async fn activate_background_task( last_task_poll } + +/// Run the region_replacement background task, returning how many actions +/// were taken +pub async fn run_region_replacement( + internal_client: &ClientTestContext, +) -> usize { + let last_background_task = + activate_background_task(&internal_client, "region_replacement").await; + + let LastResult::Completed(last_result_completed) = + last_background_task.last + else { + panic!(); + }; + + let status = serde_json::from_value::( + last_result_completed.details, + ) + .unwrap(); + + assert!(status.errors.is_empty()); + + status.requests_created_ok.len() + + status.start_invoked_ok.len() + + status.requests_completed_ok.len() +} + +/// Run the region_replacement_driver background task, returning how many actions +/// were taken +pub async fn run_region_replacement_driver( + internal_client: &ClientTestContext, +) -> usize { + let last_background_task = + activate_background_task(&internal_client, 
"region_replacement_driver") + .await; + + let LastResult::Completed(last_result_completed) = + last_background_task.last + else { + panic!(); + }; + + let status = serde_json::from_value::( + last_result_completed.details, + ) + .unwrap(); + + assert!(status.errors.is_empty()); + + status.drive_invoked_ok.len() + status.finish_invoked_ok.len() +} + +/// Run the region_snapshot_replacement_start background task, returning how many +/// actions were taken +pub async fn run_region_snapshot_replacement_start( + internal_client: &ClientTestContext, +) -> usize { + let last_background_task = activate_background_task( + &internal_client, + "region_snapshot_replacement_start", + ) + .await; + + let LastResult::Completed(last_result_completed) = + last_background_task.last + else { + panic!(); + }; + + let status = + serde_json::from_value::( + last_result_completed.details, + ) + .unwrap(); + + assert!(status.errors.is_empty()); + + status.requests_created_ok.len() + status.start_invoked_ok.len() +} + +/// Run the region_snapshot_replacement_garbage_collection background task, +/// returning how many actions were taken +pub async fn run_region_snapshot_replacement_garbage_collection( + internal_client: &ClientTestContext, +) -> usize { + let last_background_task = activate_background_task( + &internal_client, + "region_snapshot_replacement_garbage_collection", + ) + .await; + + let LastResult::Completed(last_result_completed) = + last_background_task.last + else { + panic!(); + }; + + let status = serde_json::from_value::< + RegionSnapshotReplacementGarbageCollectStatus, + >(last_result_completed.details) + .unwrap(); + + assert!(status.errors.is_empty()); + + status.garbage_collect_requested.len() +} + +/// Run the region_snapshot_replacement_step background task, returning how many +/// actions were taken +pub async fn run_region_snapshot_replacement_step( + internal_client: &ClientTestContext, +) -> usize { + let last_background_task = activate_background_task( + &internal_client, + "region_snapshot_replacement_step", + ) + .await; + + let LastResult::Completed(last_result_completed) = + last_background_task.last + else { + panic!(); + }; + + let status = serde_json::from_value::( + last_result_completed.details, + ) + .unwrap(); + + eprintln!("{:?}", &status.errors); + + assert!(status.errors.is_empty()); + + status.step_records_created_ok.len() + + status.step_garbage_collect_invoked_ok.len() + + status.step_invoked_ok.len() +} + +/// Run the region_snapshot_replacement_finish background task, returning how many +/// actions were taken +pub async fn run_region_snapshot_replacement_finish( + internal_client: &ClientTestContext, +) -> usize { + let last_background_task = activate_background_task( + &internal_client, + "region_snapshot_replacement_finish", + ) + .await; + + let LastResult::Completed(last_result_completed) = + last_background_task.last + else { + panic!(); + }; + + let status = + serde_json::from_value::( + last_result_completed.details, + ) + .unwrap(); + + assert!(status.errors.is_empty()); + + status.records_set_to_done.len() +} + +/// Run all replacement related background tasks until they aren't doing +/// anything anymore. 
+pub async fn run_replacement_tasks_to_completion( + internal_client: &ClientTestContext, +) { + wait_for_condition( + || async { + let actions_taken = + // region replacement related + run_region_replacement(internal_client).await + + run_region_replacement_driver(internal_client).await + + // region snapshot replacement related + run_region_snapshot_replacement_start(internal_client).await + + run_region_snapshot_replacement_garbage_collection(internal_client).await + + run_region_snapshot_replacement_step(internal_client).await + + run_region_snapshot_replacement_finish(internal_client).await; + + if actions_taken > 0 { + Err(CondCheckError::<()>::NotYet) + } else { + Ok(()) + } + }, + &Duration::from_secs(1), + &Duration::from_secs(10), + ) + .await + .unwrap(); +} diff --git a/nexus/tests/integration_tests/crucible_replacements.rs b/nexus/tests/integration_tests/crucible_replacements.rs new file mode 100644 index 0000000000..16ce892528 --- /dev/null +++ b/nexus/tests/integration_tests/crucible_replacements.rs @@ -0,0 +1,577 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Tests related to region and region snapshot replacement + +use dropshot::test_util::ClientTestContext; +use nexus_db_model::PhysicalDiskPolicy; +use nexus_db_model::RegionReplacementState; +use nexus_db_queries::context::OpContext; +use nexus_db_queries::db::lookup::LookupPath; +use nexus_test_utils::background::*; +use nexus_test_utils::http_testing::AuthnMode; +use nexus_test_utils::http_testing::NexusRequest; +use nexus_test_utils::resource_helpers::create_default_ip_pool; +use nexus_test_utils::resource_helpers::create_disk; +use nexus_test_utils::resource_helpers::create_project; +use nexus_test_utils_macros::nexus_test; +use omicron_uuid_kinds::GenericUuid; +use uuid::Uuid; + +type ControlPlaneTestContext = + nexus_test_utils::ControlPlaneTestContext; +type DiskTestBuilder<'a> = nexus_test_utils::resource_helpers::DiskTestBuilder< + 'a, + omicron_nexus::Server, +>; + +const PROJECT_NAME: &str = "now-this-is-pod-racing"; + +fn get_disk_url(disk_name: &str) -> String { + format!("/v1/disks/{disk_name}?project={}", PROJECT_NAME) +} + +async fn create_project_and_pool(client: &ClientTestContext) -> Uuid { + create_default_ip_pool(client).await; + let project = create_project(client, PROJECT_NAME).await; + project.identity.id +} + +/// Assert that the first part of region replacement does not create a freed +/// crucible region (that would be picked up by a volume delete saga) +#[nexus_test] +async fn test_region_replacement_does_not_create_freed_region( + cptestctx: &ControlPlaneTestContext, +) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + + // Create four zpools, each with one dataset. This is required for region + // and region snapshot replacement to have somewhere to move the data. 
+ let sled_id = cptestctx.first_sled(); + let disk_test = DiskTestBuilder::new(&cptestctx) + .on_specific_sled(sled_id) + .with_zpool_count(4) + .build() + .await; + + // Create a disk + let client = &cptestctx.external_client; + let _project_id = create_project_and_pool(client).await; + + let disk = create_disk(&client, PROJECT_NAME, "disk").await; + + // Before expunging the physical disk, save the DB model + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk.identity.id) + .fetch() + .await + .unwrap(); + + assert_eq!(db_disk.id(), disk.identity.id); + + // Next, expunge a physical disk that contains a region + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let (dataset, _) = &disk_allocated_regions[0]; + let zpool = disk_test + .zpools() + .find(|x| *x.id.as_untyped_uuid() == dataset.pool_id) + .expect("Expected at least one zpool"); + + let (_, db_zpool) = LookupPath::new(&opctx, datastore) + .zpool_id(zpool.id.into_untyped_uuid()) + .fetch() + .await + .unwrap(); + + datastore + .physical_disk_update_policy( + &opctx, + db_zpool.physical_disk_id, + PhysicalDiskPolicy::Expunged, + ) + .await + .unwrap(); + + // Now, run the first part of region replacement: this will move the deleted + // region into a temporary volume. + + let internal_client = &cptestctx.internal_client; + + let _ = + activate_background_task(&internal_client, "region_replacement").await; + + // Assert there are no freed crucible regions that result from that + assert!(datastore.find_deleted_volume_regions().await.unwrap().is_empty()); +} + +/// Assert that a region replacement request in state "Requested" can have its +/// volume deleted and still transition to Complete +#[nexus_test] +async fn test_delete_volume_region_replacement_state_requested( + cptestctx: &ControlPlaneTestContext, +) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + let client = &cptestctx.external_client; + let internal_client = &cptestctx.internal_client; + + // Create four zpools, each with one dataset. This is required for region + // and region snapshot replacement to have somewhere to move the data. 
+ let sled_id = cptestctx.first_sled(); + let disk_test = DiskTestBuilder::new(&cptestctx) + .on_specific_sled(sled_id) + .with_zpool_count(4) + .build() + .await; + + // Create a disk + let _project_id = create_project_and_pool(client).await; + + let disk = create_disk(&client, PROJECT_NAME, "disk").await; + + // Manually create the region replacement request + + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk.identity.id) + .fetch() + .await + .unwrap(); + + assert_eq!(db_disk.id(), disk.identity.id); + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let (_, region) = &disk_allocated_regions[0]; + + let replacement_request_id = datastore + .create_region_replacement_request_for_region(&opctx, ®ion) + .await + .unwrap(); + + // Assert the request is in state Requested + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Requested, + ); + + // Delete the disk + + let disk_url = get_disk_url("disk"); + NexusRequest::object_delete(client, &disk_url) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .expect("failed to delete disk"); + + // Make sure that all the background tasks can run to completion. + + run_replacement_tasks_to_completion(&internal_client).await; + + // Assert the request is in state Complete + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Complete, + ); + + // Assert there are no more Crucible resources + + assert!(disk_test.crucible_resources_deleted().await); +} + +/// Assert that a region replacement request in state "Running" can have its +/// volume deleted and still transition to Complete +#[nexus_test] +async fn test_delete_volume_region_replacement_state_running( + cptestctx: &ControlPlaneTestContext, +) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + let client = &cptestctx.external_client; + let internal_client = &cptestctx.internal_client; + + // Create four zpools, each with one dataset. This is required for region + // and region snapshot replacement to have somewhere to move the data. 
+ let sled_id = cptestctx.first_sled(); + let disk_test = DiskTestBuilder::new(&cptestctx) + .on_specific_sled(sled_id) + .with_zpool_count(4) + .build() + .await; + + // Create a disk + let _project_id = create_project_and_pool(client).await; + + let disk = create_disk(&client, PROJECT_NAME, "disk").await; + + // Manually create the region replacement request + + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk.identity.id) + .fetch() + .await + .unwrap(); + + assert_eq!(db_disk.id(), disk.identity.id); + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let (_, region) = &disk_allocated_regions[0]; + + let replacement_request_id = datastore + .create_region_replacement_request_for_region(&opctx, ®ion) + .await + .unwrap(); + + // Assert the request is in state Requested + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Requested, + ); + + // Run the "region replacement" task to transition the request to Running. + + run_region_replacement(&internal_client).await; + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Running, + ); + + // Delete the disk + + let disk_url = get_disk_url("disk"); + NexusRequest::object_delete(client, &disk_url) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .expect("failed to delete disk"); + + // Make sure that all the background tasks can run to completion. + + run_replacement_tasks_to_completion(&internal_client).await; + + // Assert the request is in state Complete + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Complete, + ); + + // Assert there are no more Crucible resources + + assert!(disk_test.crucible_resources_deleted().await); +} + +/// Assert that a region replacement request in state "Running" that has +/// additionally had its volume attached to a Pantry can have its volume deleted +/// and still transition to Complete +#[nexus_test] +async fn test_delete_volume_region_replacement_state_running_on_pantry( + cptestctx: &ControlPlaneTestContext, +) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + let client = &cptestctx.external_client; + let internal_client = &cptestctx.internal_client; + + // Create four zpools, each with one dataset. This is required for region + // and region snapshot replacement to have somewhere to move the data. 
+ let sled_id = cptestctx.first_sled(); + let disk_test = DiskTestBuilder::new(&cptestctx) + .on_specific_sled(sled_id) + .with_zpool_count(4) + .build() + .await; + + // Create a disk + let _project_id = create_project_and_pool(client).await; + + let disk = create_disk(&client, PROJECT_NAME, "disk").await; + + // Manually create the region replacement request + + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk.identity.id) + .fetch() + .await + .unwrap(); + + assert_eq!(db_disk.id(), disk.identity.id); + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let (_, region) = &disk_allocated_regions[0]; + + let replacement_request_id = datastore + .create_region_replacement_request_for_region(&opctx, ®ion) + .await + .unwrap(); + + // Assert the request is in state Requested + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Requested, + ); + + // Run the "region replacement" task to transition the request to Running. + + run_region_replacement(&internal_client).await; + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Running, + ); + + // Run the "region replacement driver" task to attach the associated volume + // to the simulated pantry + + run_region_replacement_driver(&internal_client).await; + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Running, + ); + + let most_recent_step = datastore + .current_region_replacement_request_step(&opctx, region_replacement.id) + .await + .unwrap() + .unwrap(); + + assert!(most_recent_step.pantry_address().is_some()); + + // Delete the disk + + let disk_url = get_disk_url("disk"); + NexusRequest::object_delete(client, &disk_url) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .expect("failed to delete disk"); + + // Make sure that all the background tasks can run to completion. + + run_replacement_tasks_to_completion(&internal_client).await; + + // Assert the request is in state Complete + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Complete, + ); + + // Assert there are no more Crucible resources + + assert!(disk_test.crucible_resources_deleted().await); +} + +/// Assert that a region replacement request in state "ReplacementDone" can have +/// its volume deleted and still transition to Complete +#[nexus_test] +async fn test_delete_volume_region_replacement_state_replacement_done( + cptestctx: &ControlPlaneTestContext, +) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + let client = &cptestctx.external_client; + let internal_client = &cptestctx.internal_client; + + // Create four zpools, each with one dataset. This is required for region + // and region snapshot replacement to have somewhere to move the data. 
+ let sled_id = cptestctx.first_sled(); + let disk_test = DiskTestBuilder::new(&cptestctx) + .on_specific_sled(sled_id) + .with_zpool_count(4) + .build() + .await; + + // Create a disk + let _project_id = create_project_and_pool(client).await; + + let disk = create_disk(&client, PROJECT_NAME, "disk").await; + + // Manually create the region replacement request + + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk.identity.id) + .fetch() + .await + .unwrap(); + + assert_eq!(db_disk.id(), disk.identity.id); + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let (_, region) = &disk_allocated_regions[0]; + + let replacement_request_id = datastore + .create_region_replacement_request_for_region(&opctx, ®ion) + .await + .unwrap(); + + // Assert the request is in state Requested + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Requested, + ); + + // Run the "region replacement" task to transition the request to Running. + + run_region_replacement(&internal_client).await; + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Running, + ); + + // Run the "region replacement driver" task to attach the associated volume + // to the simulated pantry + + run_region_replacement_driver(&internal_client).await; + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Running, + ); + + let most_recent_step = datastore + .current_region_replacement_request_step(&opctx, region_replacement.id) + .await + .unwrap() + .unwrap(); + + assert!(most_recent_step.pantry_address().is_some()); + + // Manually activate the background attachment + + let pantry = &cptestctx.sled_agent.pantry_server.as_ref().unwrap().pantry; + pantry + .activate_background_attachment( + region_replacement.volume_id.to_string(), + ) + .await + .unwrap(); + + // Run the "region replacement driver" task again, this time it should + // transition the request to ReplacementDone + + run_region_replacement_driver(&internal_client).await; + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::ReplacementDone, + ); + + // Delete the disk + + let disk_url = get_disk_url("disk"); + NexusRequest::object_delete(client, &disk_url) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .expect("failed to delete disk"); + + // Make sure that all the background tasks can run to completion. 
+ + run_replacement_tasks_to_completion(&internal_client).await; + + // Assert the request is in state Complete + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Complete, + ); + + // Assert there are no more Crucible resources + + assert!(disk_test.crucible_resources_deleted().await); +} diff --git a/nexus/tests/integration_tests/mod.rs b/nexus/tests/integration_tests/mod.rs index fdf14dbd07..f18accb132 100644 --- a/nexus/tests/integration_tests/mod.rs +++ b/nexus/tests/integration_tests/mod.rs @@ -11,6 +11,7 @@ mod basic; mod certificates; mod commands; mod console_api; +mod crucible_replacements; mod demo_saga; mod device_auth; mod disks; diff --git a/nexus/types/src/internal_api/background.rs b/nexus/types/src/internal_api/background.rs index dbd8ad1d8e..92e78107bf 100644 --- a/nexus/types/src/internal_api/background.rs +++ b/nexus/types/src/internal_api/background.rs @@ -12,6 +12,7 @@ use uuid::Uuid; pub struct RegionReplacementStatus { pub requests_created_ok: Vec, pub start_invoked_ok: Vec, + pub requests_completed_ok: Vec, pub errors: Vec, } diff --git a/sled-agent/src/sim/http_entrypoints_pantry.rs b/sled-agent/src/sim/http_entrypoints_pantry.rs index a93cb6fca9..148a6c882a 100644 --- a/sled-agent/src/sim/http_entrypoints_pantry.rs +++ b/sled-agent/src/sim/http_entrypoints_pantry.rs @@ -23,13 +23,19 @@ pub fn api() -> CruciblePantryApiDescription { fn register_endpoints( api: &mut CruciblePantryApiDescription, ) -> Result<(), ApiDescriptionRegisterError> { + api.register(pantry_status)?; + api.register(volume_status)?; api.register(attach)?; + api.register(attach_activate_background)?; + // api.register(replace)?; api.register(is_job_finished)?; api.register(job_result_ok)?; api.register(import_from_url)?; api.register(snapshot)?; api.register(bulk_write)?; + // api.register(bulk_read)?; api.register(scrub)?; + // api.register(validate)?; api.register(detach)?; Ok(()) @@ -46,11 +52,64 @@ pub fn api() -> CruciblePantryApiDescription { // pantry here, to avoid skew. However, this was wholesale copied from the // crucible repo! +#[derive(Serialize, JsonSchema)] +pub struct PantryStatus { + /// Which volumes does this Pantry know about? Note this may include volumes + /// that are no longer active, and haven't been garbage collected yet. + pub volumes: Vec, + + /// How many job handles? + pub num_job_handles: usize, +} + +/// Get the Pantry's status +#[endpoint { + method = GET, + path = "/crucible/pantry/0", +}] +async fn pantry_status( + rc: RequestContext>, +) -> Result, HttpError> { + let pantry = rc.context(); + + let status = pantry.status().await?; + + Ok(HttpResponseOk(status)) +} + #[derive(Deserialize, JsonSchema)] struct VolumePath { pub id: String, } +#[derive(Clone, Deserialize, Serialize, JsonSchema)] +pub struct VolumeStatus { + /// Is the Volume currently active? + pub active: bool, + + /// Has the Pantry ever seen this Volume active? + pub seen_active: bool, + + /// How many job handles are there for this Volume? 
+ pub num_job_handles: usize, +} + +/// Get a current Volume's status +#[endpoint { + method = GET, + path = "/crucible/pantry/0/volume/{id}", +}] +async fn volume_status( + rc: RequestContext>, + path: TypedPath, +) -> Result, HttpError> { + let path = path.into_inner(); + let pantry = rc.context(); + + let status = pantry.volume_status(path.id.clone()).await?; + Ok(HttpResponseOk(status)) +} + #[derive(Deserialize, JsonSchema)] struct AttachRequest { pub volume_construction_request: VolumeConstructionRequest, @@ -84,6 +143,38 @@ async fn attach( Ok(HttpResponseOk(AttachResult { id: path.id })) } +#[derive(Deserialize, JsonSchema)] +struct AttachBackgroundRequest { + pub volume_construction_request: VolumeConstructionRequest, + pub job_id: String, +} + +/// Construct a volume from a VolumeConstructionRequest, storing the result in +/// the Pantry. Activate in a separate job so as not to block the request. +#[endpoint { + method = POST, + path = "/crucible/pantry/0/volume/{id}/background", +}] +async fn attach_activate_background( + rc: RequestContext>, + path: TypedPath, + body: TypedBody, +) -> Result { + let path = path.into_inner(); + let body = body.into_inner(); + let pantry = rc.context(); + + pantry + .attach_activate_background( + path.id.clone(), + body.job_id, + body.volume_construction_request, + ) + .await?; + + Ok(HttpResponseUpdatedNoContent()) +} + #[derive(Deserialize, JsonSchema)] struct JobPath { pub id: String, diff --git a/sled-agent/src/sim/storage.rs b/sled-agent/src/sim/storage.rs index 5bbafa2ac3..589ba87700 100644 --- a/sled-agent/src/sim/storage.rs +++ b/sled-agent/src/sim/storage.rs @@ -9,6 +9,8 @@ //! through Nexus' external API. use crate::sim::http_entrypoints_pantry::ExpectedDigest; +use crate::sim::http_entrypoints_pantry::PantryStatus; +use crate::sim::http_entrypoints_pantry::VolumeStatus; use crate::sim::SledAgent; use anyhow::{self, bail, Result}; use chrono::prelude::*; @@ -1152,10 +1154,17 @@ impl Storage { } } +pub struct PantryVolume { + vcr: VolumeConstructionRequest, // Please rewind! + status: VolumeStatus, + activate_job: Option, +} + /// Simulated crucible pantry pub struct Pantry { pub id: OmicronZoneUuid, - vcrs: Mutex>, // Please rewind! 
+ /// Map Volume UUID to PantryVolume struct + volumes: Mutex>, sled_agent: Arc, jobs: Mutex>, } @@ -1164,19 +1173,26 @@ impl Pantry { pub fn new(sled_agent: Arc) -> Self { Self { id: OmicronZoneUuid::new_v4(), - vcrs: Mutex::new(HashMap::default()), + volumes: Mutex::new(HashMap::default()), sled_agent, jobs: Mutex::new(HashSet::default()), } } + pub async fn status(&self) -> Result { + Ok(PantryStatus { + volumes: self.volumes.lock().await.keys().cloned().collect(), + num_job_handles: self.jobs.lock().await.len(), + }) + } + pub async fn entry( &self, volume_id: String, ) -> Result { - let vcrs = self.vcrs.lock().await; - match vcrs.get(&volume_id) { - Some(entry) => Ok(entry.clone()), + let volumes = self.volumes.lock().await; + match volumes.get(&volume_id) { + Some(entry) => Ok(entry.vcr.clone()), None => Err(HttpError::for_not_found(None, volume_id)), } @@ -1187,11 +1203,100 @@ impl Pantry { volume_id: String, volume_construction_request: VolumeConstructionRequest, ) -> Result<()> { - let mut vcrs = self.vcrs.lock().await; - vcrs.insert(volume_id, volume_construction_request); + let mut volumes = self.volumes.lock().await; + + volumes.insert( + volume_id, + PantryVolume { + vcr: volume_construction_request, + status: VolumeStatus { + active: true, + seen_active: true, + num_job_handles: 0, + }, + activate_job: None, + }, + ); + Ok(()) } + pub async fn attach_activate_background( + &self, + volume_id: String, + activate_job_id: String, + volume_construction_request: VolumeConstructionRequest, + ) -> Result<(), HttpError> { + let mut volumes = self.volumes.lock().await; + let mut jobs = self.jobs.lock().await; + + volumes.insert( + volume_id, + PantryVolume { + vcr: volume_construction_request, + status: VolumeStatus { + active: false, + seen_active: false, + num_job_handles: 1, + }, + activate_job: Some(activate_job_id.clone()), + }, + ); + + jobs.insert(activate_job_id); + + Ok(()) + } + + pub async fn activate_background_attachment( + &self, + volume_id: String, + ) -> Result { + let activate_job = { + let volumes = self.volumes.lock().await; + volumes.get(&volume_id).unwrap().activate_job.clone().unwrap() + }; + + let mut status = self.volume_status(volume_id.clone()).await?; + + status.active = true; + status.seen_active = true; + + self.update_volume_status(volume_id, status).await?; + + Ok(activate_job) + } + + pub async fn volume_status( + &self, + volume_id: String, + ) -> Result { + let volumes = self.volumes.lock().await; + + match volumes.get(&volume_id) { + Some(pantry_volume) => Ok(pantry_volume.status.clone()), + + None => Err(HttpError::for_not_found(None, volume_id)), + } + } + + pub async fn update_volume_status( + &self, + volume_id: String, + status: VolumeStatus, + ) -> Result<(), HttpError> { + let mut volumes = self.volumes.lock().await; + + match volumes.get_mut(&volume_id) { + Some(pantry_volume) => { + pantry_volume.status = status; + Ok(()) + } + + None => Err(HttpError::for_not_found(None, volume_id)), + } + } + pub async fn is_job_finished( &self, job_id: String, @@ -1240,11 +1345,11 @@ impl Pantry { // the simulated instance ensure, then call // [`instance_issue_disk_snapshot_request`] as the snapshot logic is the // same. 
- let vcrs = self.vcrs.lock().await; - let volume_construction_request = vcrs.get(&volume_id).unwrap(); + let volumes = self.volumes.lock().await; + let volume_construction_request = &volumes.get(&volume_id).unwrap().vcr; self.sled_agent - .map_disk_ids_to_region_ids(&volume_construction_request) + .map_disk_ids_to_region_ids(volume_construction_request) .await?; self.sled_agent @@ -1329,8 +1434,8 @@ impl Pantry { } pub async fn detach(&self, volume_id: String) -> Result<()> { - let mut vcrs = self.vcrs.lock().await; - vcrs.remove(&volume_id); + let mut volumes = self.volumes.lock().await; + volumes.remove(&volume_id); Ok(()) } }
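The net effect of this change, reduced to a self-contained sketch: once the volume backing a replacement request has been soft- or hard-deleted, the request is walked to Complete without performing any volume operations. The state names below mirror RegionReplacementState variants from the diff, but the step function and main are illustrative stand-ins only (not the real Nexus background-task or saga code), and the intermediate saga-operating states are omitted.

// Simplified model of the deleted-volume handling added in this change.
// The step function is an illustrative stand-in for what the background
// tasks and sagas do on each activation for a request whose volume is gone.

#[derive(Clone, Copy, Debug, PartialEq)]
enum State {
    Requested,
    Running,
    ReplacementDone,
    Completing,
    Complete,
}

fn step_with_deleted_volume(state: State) -> State {
    match state {
        // No saga has run yet: release the volume_repair lock and jump
        // straight to Complete without invoking the start saga.
        State::Requested => State::Complete,
        // The drive saga notices the deleted volume, skips live repair /
        // reconciliation, and reports the replacement as done.
        State::Running => State::ReplacementDone,
        // The finish path still runs to clean up and release the lock.
        State::ReplacementDone => State::Completing,
        State::Completing => State::Complete,
        State::Complete => State::Complete,
    }
}

fn main() {
    // Repeated activations settle every request at Complete, which is what
    // run_replacement_tasks_to_completion waits for in the tests above.
    for start in [State::Requested, State::Running, State::ReplacementDone] {
        let mut state = start;
        while state != State::Complete {
            state = step_with_deleted_volume(state);
        }
        assert_eq!(state, State::Complete);
    }
}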