From 4c83c6de42a0cacd814c29bad902bd53a36da972 Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Sat, 21 Sep 2024 01:05:27 +0000 Subject: [PATCH] Fix for deleted volumes during region replacement Volumes can be deleted at any time, but the tasks and sagas that perform region replacement did not account for this. This commit adds checks in a few places for if a volume is soft-deleted or hard-deleted, and bails out of any affected region replacement accordingly: - If the replacement request is in the Requested state and the volume was seen to be soft-deleted or hard-deleted in the "region replacement" background task, then transition the region replacement request to Complete - If the replacement request is in the Running state, and the volume was seen to be soft-deleted or hard-deleted in the region replacement drive saga, then skip any operations on that volume in that saga and allow that saga to transition the region replacement request to ReplacementDone. Later the rest of the region replacement machinery will transition the request to Complete and clean up resources as appropriate. Testing this required fleshing out the simulated Crucible Pantry with support for the new endpoints that the region replacement drive saga queries. Full parity is left for future work, and the endpoints required were left in but commented out. This commit was peeled off work in progress to address #6353. --- dev-tools/omdb/src/bin/omdb/nexus.rs | 8 + nexus/db-model/src/region_replacement.rs | 7 + .../src/db/datastore/region_replacement.rs | 71 ++- nexus/db-queries/src/db/datastore/volume.rs | 8 + .../background/tasks/region_replacement.rs | 71 +++ .../src/app/sagas/region_replacement_drive.rs | 26 + nexus/test-utils/src/background.rs | 196 ++++++ .../crucible_replacements.rs | 577 ++++++++++++++++++ nexus/tests/integration_tests/mod.rs | 1 + nexus/types/src/internal_api/background.rs | 1 + sled-agent/src/sim/http_entrypoints_pantry.rs | 91 +++ sled-agent/src/sim/storage.rs | 129 +++- 12 files changed, 1173 insertions(+), 13 deletions(-) create mode 100644 nexus/tests/integration_tests/crucible_replacements.rs diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs index 193a205a96..136f1b6f62 100644 --- a/dev-tools/omdb/src/bin/omdb/nexus.rs +++ b/dev-tools/omdb/src/bin/omdb/nexus.rs @@ -1129,6 +1129,14 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) { println!(" > {line}"); } + println!( + " region replacement requests set to completed ok: {}", + status.requests_completed_ok.len() + ); + for line in &status.requests_completed_ok { + println!(" > {line}"); + } + println!(" errors: {}", status.errors.len()); for line in &status.errors { println!(" > {line}"); diff --git a/nexus/db-model/src/region_replacement.rs b/nexus/db-model/src/region_replacement.rs index 995c55001c..57ead4b68e 100644 --- a/nexus/db-model/src/region_replacement.rs +++ b/nexus/db-model/src/region_replacement.rs @@ -116,6 +116,13 @@ impl std::str::FromStr for RegionReplacementState { /// "finish" notification is seen by the region replacement drive background /// task. This check is done before invoking the region replacement drive saga. /// +/// If the volume whose region is being replaced is soft-deleted or +/// hard-deleted, then the replacement request will be transitioned along the +/// states to Complete while avoiding operations that are meant to operate on +/// that volume. 
If the volume is soft-deleted or hard-deleted while the +/// replacement request is in the "Requested" state, the replacement request +/// will transition straight to Complete, and no operations will be performed. +/// /// See also: RegionReplacementStep records #[derive( Queryable, diff --git a/nexus/db-queries/src/db/datastore/region_replacement.rs b/nexus/db-queries/src/db/datastore/region_replacement.rs index 62598d710e..b9b5a0827f 100644 --- a/nexus/db-queries/src/db/datastore/region_replacement.rs +++ b/nexus/db-queries/src/db/datastore/region_replacement.rs @@ -657,7 +657,7 @@ impl DataStore { /// Transition a RegionReplacement record from Completing to Complete, /// clearing the operating saga id. Also removes the `volume_repair` record - /// that is taking a "lock" on the Volume. + /// that is taking a lock on the Volume. pub async fn set_region_replacement_complete( &self, opctx: &OpContext, @@ -723,6 +723,75 @@ impl DataStore { }) } + /// Transition a RegionReplacement record from Requested to Complete, which + /// occurs when the associated volume is soft or hard deleted. Also removes + /// the `volume_repair` record that is taking a lock on the Volume. + pub async fn set_region_replacement_complete_from_requested( + &self, + opctx: &OpContext, + request: RegionReplacement, + ) -> Result<(), Error> { + type TxnError = TransactionError; + + assert_eq!( + request.replacement_state, + RegionReplacementState::Requested, + ); + + self.pool_connection_authorized(opctx) + .await? + .transaction_async(|conn| async move { + Self::volume_repair_delete_query( + request.volume_id, + request.id, + ) + .execute_async(&conn) + .await?; + + use db::schema::region_replacement::dsl; + + let result = diesel::update(dsl::region_replacement) + .filter(dsl::id.eq(request.id)) + .filter( + dsl::replacement_state.eq(RegionReplacementState::Requested), + ) + .filter(dsl::operating_saga_id.is_null()) + .set(( + dsl::replacement_state.eq(RegionReplacementState::Complete), + )) + .check_if_exists::(request.id) + .execute_and_check(&conn) + .await?; + + match result.status { + UpdateStatus::Updated => Ok(()), + + UpdateStatus::NotUpdatedButExists => { + let record = result.found; + + if record.replacement_state == RegionReplacementState::Complete { + Ok(()) + } else { + Err(TxnError::CustomError(Error::conflict(format!( + "region replacement {} set to {:?} (operating saga id {:?})", + request.id, + record.replacement_state, + record.operating_saga_id, + )))) + } + } + } + }) + .await + .map_err(|e| match e { + TxnError::CustomError(error) => error, + + TxnError::Database(error) => { + public_error_from_diesel(error, ErrorHandler::Server) + } + }) + } + /// Nexus has been notified by an Upstairs (or has otherwised determined) /// that a region replacement is done, so update the record. Filter on the /// following: diff --git a/nexus/db-queries/src/db/datastore/volume.rs b/nexus/db-queries/src/db/datastore/volume.rs index c643b86d24..b000325954 100644 --- a/nexus/db-queries/src/db/datastore/volume.rs +++ b/nexus/db-queries/src/db/datastore/volume.rs @@ -1572,6 +1572,14 @@ impl DataStore { .optional() .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } + + /// Return true if a volume was soft-deleted or hard-deleted + pub async fn volume_deleted(&self, volume_id: Uuid) -> Result { + match self.volume_get(volume_id).await? 
{ + Some(v) => Ok(v.time_deleted.is_some()), + None => Ok(true), + } + } } #[derive(Default, Clone, Debug, Serialize, Deserialize)] diff --git a/nexus/src/app/background/tasks/region_replacement.rs b/nexus/src/app/background/tasks/region_replacement.rs index b1c717c77b..d5013db134 100644 --- a/nexus/src/app/background/tasks/region_replacement.rs +++ b/nexus/src/app/background/tasks/region_replacement.rs @@ -170,8 +170,79 @@ impl BackgroundTask for RegionReplacementDetector { }; for request in requests { + // If the replacement request is in the `requested` state and + // the request's volume was soft-deleted or hard-deleted, avoid + // sending the start request and instead transition the request + // to completed + + let volume_deleted = match self + .datastore + .volume_deleted(request.volume_id) + .await + { + Ok(volume_deleted) => volume_deleted, + + Err(e) => { + let s = format!( + "error checking if volume id {} was deleted: {e}", + request.volume_id, + ); + error!(&log, "{s}"); + + status.errors.push(s); + continue; + } + }; + let request_id = request.id; + if volume_deleted { + // Volume was soft or hard deleted, so proceed with clean + // up, which if this is in state Requested there won't be + // any additional associated state, so transition the record + // to Completed. + + info!( + &log, + "request {} volume {} was soft or hard deleted!", + request_id, + request.volume_id, + ); + + let result = self + .datastore + .set_region_replacement_complete_from_requested( + opctx, request, + ) + .await; + + match result { + Ok(()) => { + let s = format!( + "request {} transitioned from requested to \ + complete", + request_id, + ); + + info!(&log, "{s}"); + status.requests_completed_ok.push(s); + } + + Err(e) => { + let s = format!( + "error transitioning {} from requested to \ + complete: {e}", + request_id, + ); + + error!(&log, "{s}"); + status.errors.push(s); + } + } + + continue; + } + let result = self .send_start_request( authn::saga::Serialized::for_opctx(opctx), diff --git a/nexus/src/app/sagas/region_replacement_drive.rs b/nexus/src/app/sagas/region_replacement_drive.rs index d95591848e..18c675c449 100644 --- a/nexus/src/app/sagas/region_replacement_drive.rs +++ b/nexus/src/app/sagas/region_replacement_drive.rs @@ -309,6 +309,32 @@ async fn srrd_drive_region_replacement_check( ¶ms.serialized_authn, ); + // It doesn't make sense to perform any of this saga if the volume was soft + // or hard deleted: for example, this happens if the higher level resource + // like the disk was deleted. Volume deletion potentially results in the + // clean-up of Crucible resources, so it wouldn't even be valid to attempt + // to drive forward any type of live repair or reconciliation. + // + // Setting Done here will cause this saga to transition the replacement + // request to ReplacementDone. 
+ + let volume_deleted = osagactx + .datastore() + .volume_deleted(params.request.volume_id) + .await + .map_err(ActionError::action_failed)?; + + if volume_deleted { + info!( + log, + "volume was soft or hard deleted!"; + "region replacement id" => %params.request.id, + "volume id" => %params.request.volume_id, + ); + + return Ok(DriveCheck::Done); + } + let last_request_step = osagactx .datastore() .current_region_replacement_request_step(&opctx, params.request.id) diff --git a/nexus/test-utils/src/background.rs b/nexus/test-utils/src/background.rs index 58792e547d..5a7851f103 100644 --- a/nexus/test-utils/src/background.rs +++ b/nexus/test-utils/src/background.rs @@ -11,6 +11,7 @@ use nexus_client::types::CurrentStatus; use nexus_client::types::CurrentStatusRunning; use nexus_client::types::LastResult; use nexus_client::types::LastResultCompleted; +use nexus_types::internal_api::background::*; use omicron_test_utils::dev::poll::{wait_for_condition, CondCheckError}; use std::time::Duration; @@ -84,3 +85,198 @@ pub async fn activate_background_task( last_task_poll } + +/// Run the region_replacement background task, returning how many actions +/// were taken +pub async fn run_region_replacement( + internal_client: &ClientTestContext, +) -> usize { + let last_background_task = + activate_background_task(&internal_client, "region_replacement").await; + + let LastResult::Completed(last_result_completed) = + last_background_task.last + else { + panic!(); + }; + + let status = serde_json::from_value::( + last_result_completed.details, + ) + .unwrap(); + + assert!(status.errors.is_empty()); + + status.requests_created_ok.len() + + status.start_invoked_ok.len() + + status.requests_completed_ok.len() +} + +/// Run the region_replacement_driver background task, returning how many actions +/// were taken +pub async fn run_region_replacement_driver( + internal_client: &ClientTestContext, +) -> usize { + let last_background_task = + activate_background_task(&internal_client, "region_replacement_driver") + .await; + + let LastResult::Completed(last_result_completed) = + last_background_task.last + else { + panic!(); + }; + + let status = serde_json::from_value::( + last_result_completed.details, + ) + .unwrap(); + + assert!(status.errors.is_empty()); + + status.drive_invoked_ok.len() + status.finish_invoked_ok.len() +} + +/// Run the region_snapshot_replacement_start background task, returning how many +/// actions were taken +pub async fn run_region_snapshot_replacement_start( + internal_client: &ClientTestContext, +) -> usize { + let last_background_task = activate_background_task( + &internal_client, + "region_snapshot_replacement_start", + ) + .await; + + let LastResult::Completed(last_result_completed) = + last_background_task.last + else { + panic!(); + }; + + let status = + serde_json::from_value::( + last_result_completed.details, + ) + .unwrap(); + + assert!(status.errors.is_empty()); + + status.requests_created_ok.len() + status.start_invoked_ok.len() +} + +/// Run the region_snapshot_replacement_garbage_collection background task, +/// returning how many actions were taken +pub async fn run_region_snapshot_replacement_garbage_collection( + internal_client: &ClientTestContext, +) -> usize { + let last_background_task = activate_background_task( + &internal_client, + "region_snapshot_replacement_garbage_collection", + ) + .await; + + let LastResult::Completed(last_result_completed) = + last_background_task.last + else { + panic!(); + }; + + let status = serde_json::from_value::< + 
RegionSnapshotReplacementGarbageCollectStatus, + >(last_result_completed.details) + .unwrap(); + + assert!(status.errors.is_empty()); + + status.garbage_collect_requested.len() +} + +/// Run the region_snapshot_replacement_step background task, returning how many +/// actions were taken +pub async fn run_region_snapshot_replacement_step( + internal_client: &ClientTestContext, +) -> usize { + let last_background_task = activate_background_task( + &internal_client, + "region_snapshot_replacement_step", + ) + .await; + + let LastResult::Completed(last_result_completed) = + last_background_task.last + else { + panic!(); + }; + + let status = serde_json::from_value::( + last_result_completed.details, + ) + .unwrap(); + + eprintln!("{:?}", &status.errors); + + assert!(status.errors.is_empty()); + + status.step_records_created_ok.len() + + status.step_garbage_collect_invoked_ok.len() + + status.step_invoked_ok.len() +} + +/// Run the region_snapshot_replacement_finish background task, returning how many +/// actions were taken +pub async fn run_region_snapshot_replacement_finish( + internal_client: &ClientTestContext, +) -> usize { + let last_background_task = activate_background_task( + &internal_client, + "region_snapshot_replacement_finish", + ) + .await; + + let LastResult::Completed(last_result_completed) = + last_background_task.last + else { + panic!(); + }; + + let status = + serde_json::from_value::( + last_result_completed.details, + ) + .unwrap(); + + assert!(status.errors.is_empty()); + + status.records_set_to_done.len() +} + +/// Run all replacement related background tasks until they aren't doing +/// anything anymore. +pub async fn run_replacement_tasks_to_completion( + internal_client: &ClientTestContext, +) { + wait_for_condition( + || async { + let actions_taken = + // region replacement related + run_region_replacement(internal_client).await + + run_region_replacement_driver(internal_client).await + + // region snapshot replacement related + run_region_snapshot_replacement_start(internal_client).await + + run_region_snapshot_replacement_garbage_collection(internal_client).await + + run_region_snapshot_replacement_step(internal_client).await + + run_region_snapshot_replacement_finish(internal_client).await; + + if actions_taken > 0 { + Err(CondCheckError::<()>::NotYet) + } else { + Ok(()) + } + }, + &Duration::from_secs(1), + &Duration::from_secs(10), + ) + .await + .unwrap(); +} diff --git a/nexus/tests/integration_tests/crucible_replacements.rs b/nexus/tests/integration_tests/crucible_replacements.rs new file mode 100644 index 0000000000..16ce892528 --- /dev/null +++ b/nexus/tests/integration_tests/crucible_replacements.rs @@ -0,0 +1,577 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! 
Tests related to region and region snapshot replacement + +use dropshot::test_util::ClientTestContext; +use nexus_db_model::PhysicalDiskPolicy; +use nexus_db_model::RegionReplacementState; +use nexus_db_queries::context::OpContext; +use nexus_db_queries::db::lookup::LookupPath; +use nexus_test_utils::background::*; +use nexus_test_utils::http_testing::AuthnMode; +use nexus_test_utils::http_testing::NexusRequest; +use nexus_test_utils::resource_helpers::create_default_ip_pool; +use nexus_test_utils::resource_helpers::create_disk; +use nexus_test_utils::resource_helpers::create_project; +use nexus_test_utils_macros::nexus_test; +use omicron_uuid_kinds::GenericUuid; +use uuid::Uuid; + +type ControlPlaneTestContext = + nexus_test_utils::ControlPlaneTestContext; +type DiskTestBuilder<'a> = nexus_test_utils::resource_helpers::DiskTestBuilder< + 'a, + omicron_nexus::Server, +>; + +const PROJECT_NAME: &str = "now-this-is-pod-racing"; + +fn get_disk_url(disk_name: &str) -> String { + format!("/v1/disks/{disk_name}?project={}", PROJECT_NAME) +} + +async fn create_project_and_pool(client: &ClientTestContext) -> Uuid { + create_default_ip_pool(client).await; + let project = create_project(client, PROJECT_NAME).await; + project.identity.id +} + +/// Assert that the first part of region replacement does not create a freed +/// crucible region (that would be picked up by a volume delete saga) +#[nexus_test] +async fn test_region_replacement_does_not_create_freed_region( + cptestctx: &ControlPlaneTestContext, +) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + + // Create four zpools, each with one dataset. This is required for region + // and region snapshot replacement to have somewhere to move the data. + let sled_id = cptestctx.first_sled(); + let disk_test = DiskTestBuilder::new(&cptestctx) + .on_specific_sled(sled_id) + .with_zpool_count(4) + .build() + .await; + + // Create a disk + let client = &cptestctx.external_client; + let _project_id = create_project_and_pool(client).await; + + let disk = create_disk(&client, PROJECT_NAME, "disk").await; + + // Before expunging the physical disk, save the DB model + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk.identity.id) + .fetch() + .await + .unwrap(); + + assert_eq!(db_disk.id(), disk.identity.id); + + // Next, expunge a physical disk that contains a region + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let (dataset, _) = &disk_allocated_regions[0]; + let zpool = disk_test + .zpools() + .find(|x| *x.id.as_untyped_uuid() == dataset.pool_id) + .expect("Expected at least one zpool"); + + let (_, db_zpool) = LookupPath::new(&opctx, datastore) + .zpool_id(zpool.id.into_untyped_uuid()) + .fetch() + .await + .unwrap(); + + datastore + .physical_disk_update_policy( + &opctx, + db_zpool.physical_disk_id, + PhysicalDiskPolicy::Expunged, + ) + .await + .unwrap(); + + // Now, run the first part of region replacement: this will move the deleted + // region into a temporary volume. 
+ + let internal_client = &cptestctx.internal_client; + + let _ = + activate_background_task(&internal_client, "region_replacement").await; + + // Assert there are no freed crucible regions that result from that + assert!(datastore.find_deleted_volume_regions().await.unwrap().is_empty()); +} + +/// Assert that a region replacement request in state "Requested" can have its +/// volume deleted and still transition to Complete +#[nexus_test] +async fn test_delete_volume_region_replacement_state_requested( + cptestctx: &ControlPlaneTestContext, +) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + let client = &cptestctx.external_client; + let internal_client = &cptestctx.internal_client; + + // Create four zpools, each with one dataset. This is required for region + // and region snapshot replacement to have somewhere to move the data. + let sled_id = cptestctx.first_sled(); + let disk_test = DiskTestBuilder::new(&cptestctx) + .on_specific_sled(sled_id) + .with_zpool_count(4) + .build() + .await; + + // Create a disk + let _project_id = create_project_and_pool(client).await; + + let disk = create_disk(&client, PROJECT_NAME, "disk").await; + + // Manually create the region replacement request + + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk.identity.id) + .fetch() + .await + .unwrap(); + + assert_eq!(db_disk.id(), disk.identity.id); + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let (_, region) = &disk_allocated_regions[0]; + + let replacement_request_id = datastore + .create_region_replacement_request_for_region(&opctx, ®ion) + .await + .unwrap(); + + // Assert the request is in state Requested + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Requested, + ); + + // Delete the disk + + let disk_url = get_disk_url("disk"); + NexusRequest::object_delete(client, &disk_url) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .expect("failed to delete disk"); + + // Make sure that all the background tasks can run to completion. + + run_replacement_tasks_to_completion(&internal_client).await; + + // Assert the request is in state Complete + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Complete, + ); + + // Assert there are no more Crucible resources + + assert!(disk_test.crucible_resources_deleted().await); +} + +/// Assert that a region replacement request in state "Running" can have its +/// volume deleted and still transition to Complete +#[nexus_test] +async fn test_delete_volume_region_replacement_state_running( + cptestctx: &ControlPlaneTestContext, +) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + let client = &cptestctx.external_client; + let internal_client = &cptestctx.internal_client; + + // Create four zpools, each with one dataset. This is required for region + // and region snapshot replacement to have somewhere to move the data. 
+ let sled_id = cptestctx.first_sled(); + let disk_test = DiskTestBuilder::new(&cptestctx) + .on_specific_sled(sled_id) + .with_zpool_count(4) + .build() + .await; + + // Create a disk + let _project_id = create_project_and_pool(client).await; + + let disk = create_disk(&client, PROJECT_NAME, "disk").await; + + // Manually create the region replacement request + + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk.identity.id) + .fetch() + .await + .unwrap(); + + assert_eq!(db_disk.id(), disk.identity.id); + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let (_, region) = &disk_allocated_regions[0]; + + let replacement_request_id = datastore + .create_region_replacement_request_for_region(&opctx, ®ion) + .await + .unwrap(); + + // Assert the request is in state Requested + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Requested, + ); + + // Run the "region replacement" task to transition the request to Running. + + run_region_replacement(&internal_client).await; + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Running, + ); + + // Delete the disk + + let disk_url = get_disk_url("disk"); + NexusRequest::object_delete(client, &disk_url) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .expect("failed to delete disk"); + + // Make sure that all the background tasks can run to completion. + + run_replacement_tasks_to_completion(&internal_client).await; + + // Assert the request is in state Complete + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Complete, + ); + + // Assert there are no more Crucible resources + + assert!(disk_test.crucible_resources_deleted().await); +} + +/// Assert that a region replacement request in state "Running" that has +/// additionally had its volume attached to a Pantry can have its volume deleted +/// and still transition to Complete +#[nexus_test] +async fn test_delete_volume_region_replacement_state_running_on_pantry( + cptestctx: &ControlPlaneTestContext, +) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + let client = &cptestctx.external_client; + let internal_client = &cptestctx.internal_client; + + // Create four zpools, each with one dataset. This is required for region + // and region snapshot replacement to have somewhere to move the data. 
+ let sled_id = cptestctx.first_sled(); + let disk_test = DiskTestBuilder::new(&cptestctx) + .on_specific_sled(sled_id) + .with_zpool_count(4) + .build() + .await; + + // Create a disk + let _project_id = create_project_and_pool(client).await; + + let disk = create_disk(&client, PROJECT_NAME, "disk").await; + + // Manually create the region replacement request + + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk.identity.id) + .fetch() + .await + .unwrap(); + + assert_eq!(db_disk.id(), disk.identity.id); + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let (_, region) = &disk_allocated_regions[0]; + + let replacement_request_id = datastore + .create_region_replacement_request_for_region(&opctx, ®ion) + .await + .unwrap(); + + // Assert the request is in state Requested + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Requested, + ); + + // Run the "region replacement" task to transition the request to Running. + + run_region_replacement(&internal_client).await; + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Running, + ); + + // Run the "region replacement driver" task to attach the associated volume + // to the simulated pantry + + run_region_replacement_driver(&internal_client).await; + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Running, + ); + + let most_recent_step = datastore + .current_region_replacement_request_step(&opctx, region_replacement.id) + .await + .unwrap() + .unwrap(); + + assert!(most_recent_step.pantry_address().is_some()); + + // Delete the disk + + let disk_url = get_disk_url("disk"); + NexusRequest::object_delete(client, &disk_url) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .expect("failed to delete disk"); + + // Make sure that all the background tasks can run to completion. + + run_replacement_tasks_to_completion(&internal_client).await; + + // Assert the request is in state Complete + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Complete, + ); + + // Assert there are no more Crucible resources + + assert!(disk_test.crucible_resources_deleted().await); +} + +/// Assert that a region replacement request in state "ReplacementDone" can have +/// its volume deleted and still transition to Complete +#[nexus_test] +async fn test_delete_volume_region_replacement_state_replacement_done( + cptestctx: &ControlPlaneTestContext, +) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + let client = &cptestctx.external_client; + let internal_client = &cptestctx.internal_client; + + // Create four zpools, each with one dataset. This is required for region + // and region snapshot replacement to have somewhere to move the data. 
+ let sled_id = cptestctx.first_sled(); + let disk_test = DiskTestBuilder::new(&cptestctx) + .on_specific_sled(sled_id) + .with_zpool_count(4) + .build() + .await; + + // Create a disk + let _project_id = create_project_and_pool(client).await; + + let disk = create_disk(&client, PROJECT_NAME, "disk").await; + + // Manually create the region replacement request + + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk.identity.id) + .fetch() + .await + .unwrap(); + + assert_eq!(db_disk.id(), disk.identity.id); + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let (_, region) = &disk_allocated_regions[0]; + + let replacement_request_id = datastore + .create_region_replacement_request_for_region(&opctx, ®ion) + .await + .unwrap(); + + // Assert the request is in state Requested + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Requested, + ); + + // Run the "region replacement" task to transition the request to Running. + + run_region_replacement(&internal_client).await; + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Running, + ); + + // Run the "region replacement driver" task to attach the associated volume + // to the simulated pantry + + run_region_replacement_driver(&internal_client).await; + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Running, + ); + + let most_recent_step = datastore + .current_region_replacement_request_step(&opctx, region_replacement.id) + .await + .unwrap() + .unwrap(); + + assert!(most_recent_step.pantry_address().is_some()); + + // Manually activate the background attachment + + let pantry = &cptestctx.sled_agent.pantry_server.as_ref().unwrap().pantry; + pantry + .activate_background_attachment( + region_replacement.volume_id.to_string(), + ) + .await + .unwrap(); + + // Run the "region replacement driver" task again, this time it should + // transition the request to ReplacementDone + + run_region_replacement_driver(&internal_client).await; + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::ReplacementDone, + ); + + // Delete the disk + + let disk_url = get_disk_url("disk"); + NexusRequest::object_delete(client, &disk_url) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .expect("failed to delete disk"); + + // Make sure that all the background tasks can run to completion. 
+ + run_replacement_tasks_to_completion(&internal_client).await; + + // Assert the request is in state Complete + + let region_replacement = datastore + .get_region_replacement_request_by_id(&opctx, replacement_request_id) + .await + .unwrap(); + + assert_eq!( + region_replacement.replacement_state, + RegionReplacementState::Complete, + ); + + // Assert there are no more Crucible resources + + assert!(disk_test.crucible_resources_deleted().await); +} diff --git a/nexus/tests/integration_tests/mod.rs b/nexus/tests/integration_tests/mod.rs index fdf14dbd07..f18accb132 100644 --- a/nexus/tests/integration_tests/mod.rs +++ b/nexus/tests/integration_tests/mod.rs @@ -11,6 +11,7 @@ mod basic; mod certificates; mod commands; mod console_api; +mod crucible_replacements; mod demo_saga; mod device_auth; mod disks; diff --git a/nexus/types/src/internal_api/background.rs b/nexus/types/src/internal_api/background.rs index dbd8ad1d8e..92e78107bf 100644 --- a/nexus/types/src/internal_api/background.rs +++ b/nexus/types/src/internal_api/background.rs @@ -12,6 +12,7 @@ use uuid::Uuid; pub struct RegionReplacementStatus { pub requests_created_ok: Vec, pub start_invoked_ok: Vec, + pub requests_completed_ok: Vec, pub errors: Vec, } diff --git a/sled-agent/src/sim/http_entrypoints_pantry.rs b/sled-agent/src/sim/http_entrypoints_pantry.rs index a93cb6fca9..148a6c882a 100644 --- a/sled-agent/src/sim/http_entrypoints_pantry.rs +++ b/sled-agent/src/sim/http_entrypoints_pantry.rs @@ -23,13 +23,19 @@ pub fn api() -> CruciblePantryApiDescription { fn register_endpoints( api: &mut CruciblePantryApiDescription, ) -> Result<(), ApiDescriptionRegisterError> { + api.register(pantry_status)?; + api.register(volume_status)?; api.register(attach)?; + api.register(attach_activate_background)?; + // api.register(replace)?; api.register(is_job_finished)?; api.register(job_result_ok)?; api.register(import_from_url)?; api.register(snapshot)?; api.register(bulk_write)?; + // api.register(bulk_read)?; api.register(scrub)?; + // api.register(validate)?; api.register(detach)?; Ok(()) @@ -46,11 +52,64 @@ pub fn api() -> CruciblePantryApiDescription { // pantry here, to avoid skew. However, this was wholesale copied from the // crucible repo! +#[derive(Serialize, JsonSchema)] +pub struct PantryStatus { + /// Which volumes does this Pantry know about? Note this may include volumes + /// that are no longer active, and haven't been garbage collected yet. + pub volumes: Vec, + + /// How many job handles? + pub num_job_handles: usize, +} + +/// Get the Pantry's status +#[endpoint { + method = GET, + path = "/crucible/pantry/0", +}] +async fn pantry_status( + rc: RequestContext>, +) -> Result, HttpError> { + let pantry = rc.context(); + + let status = pantry.status().await?; + + Ok(HttpResponseOk(status)) +} + #[derive(Deserialize, JsonSchema)] struct VolumePath { pub id: String, } +#[derive(Clone, Deserialize, Serialize, JsonSchema)] +pub struct VolumeStatus { + /// Is the Volume currently active? + pub active: bool, + + /// Has the Pantry ever seen this Volume active? + pub seen_active: bool, + + /// How many job handles are there for this Volume? 
+ pub num_job_handles: usize, +} + +/// Get a current Volume's status +#[endpoint { + method = GET, + path = "/crucible/pantry/0/volume/{id}", +}] +async fn volume_status( + rc: RequestContext>, + path: TypedPath, +) -> Result, HttpError> { + let path = path.into_inner(); + let pantry = rc.context(); + + let status = pantry.volume_status(path.id.clone()).await?; + Ok(HttpResponseOk(status)) +} + #[derive(Deserialize, JsonSchema)] struct AttachRequest { pub volume_construction_request: VolumeConstructionRequest, @@ -84,6 +143,38 @@ async fn attach( Ok(HttpResponseOk(AttachResult { id: path.id })) } +#[derive(Deserialize, JsonSchema)] +struct AttachBackgroundRequest { + pub volume_construction_request: VolumeConstructionRequest, + pub job_id: String, +} + +/// Construct a volume from a VolumeConstructionRequest, storing the result in +/// the Pantry. Activate in a separate job so as not to block the request. +#[endpoint { + method = POST, + path = "/crucible/pantry/0/volume/{id}/background", +}] +async fn attach_activate_background( + rc: RequestContext>, + path: TypedPath, + body: TypedBody, +) -> Result { + let path = path.into_inner(); + let body = body.into_inner(); + let pantry = rc.context(); + + pantry + .attach_activate_background( + path.id.clone(), + body.job_id, + body.volume_construction_request, + ) + .await?; + + Ok(HttpResponseUpdatedNoContent()) +} + #[derive(Deserialize, JsonSchema)] struct JobPath { pub id: String, diff --git a/sled-agent/src/sim/storage.rs b/sled-agent/src/sim/storage.rs index 5bbafa2ac3..589ba87700 100644 --- a/sled-agent/src/sim/storage.rs +++ b/sled-agent/src/sim/storage.rs @@ -9,6 +9,8 @@ //! through Nexus' external API. use crate::sim::http_entrypoints_pantry::ExpectedDigest; +use crate::sim::http_entrypoints_pantry::PantryStatus; +use crate::sim::http_entrypoints_pantry::VolumeStatus; use crate::sim::SledAgent; use anyhow::{self, bail, Result}; use chrono::prelude::*; @@ -1152,10 +1154,17 @@ impl Storage { } } +pub struct PantryVolume { + vcr: VolumeConstructionRequest, // Please rewind! + status: VolumeStatus, + activate_job: Option, +} + /// Simulated crucible pantry pub struct Pantry { pub id: OmicronZoneUuid, - vcrs: Mutex>, // Please rewind! 
+ /// Map Volume UUID to PantryVolume struct + volumes: Mutex>, sled_agent: Arc, jobs: Mutex>, } @@ -1164,19 +1173,26 @@ impl Pantry { pub fn new(sled_agent: Arc) -> Self { Self { id: OmicronZoneUuid::new_v4(), - vcrs: Mutex::new(HashMap::default()), + volumes: Mutex::new(HashMap::default()), sled_agent, jobs: Mutex::new(HashSet::default()), } } + pub async fn status(&self) -> Result { + Ok(PantryStatus { + volumes: self.volumes.lock().await.keys().cloned().collect(), + num_job_handles: self.jobs.lock().await.len(), + }) + } + pub async fn entry( &self, volume_id: String, ) -> Result { - let vcrs = self.vcrs.lock().await; - match vcrs.get(&volume_id) { - Some(entry) => Ok(entry.clone()), + let volumes = self.volumes.lock().await; + match volumes.get(&volume_id) { + Some(entry) => Ok(entry.vcr.clone()), None => Err(HttpError::for_not_found(None, volume_id)), } @@ -1187,11 +1203,100 @@ impl Pantry { volume_id: String, volume_construction_request: VolumeConstructionRequest, ) -> Result<()> { - let mut vcrs = self.vcrs.lock().await; - vcrs.insert(volume_id, volume_construction_request); + let mut volumes = self.volumes.lock().await; + + volumes.insert( + volume_id, + PantryVolume { + vcr: volume_construction_request, + status: VolumeStatus { + active: true, + seen_active: true, + num_job_handles: 0, + }, + activate_job: None, + }, + ); + Ok(()) } + pub async fn attach_activate_background( + &self, + volume_id: String, + activate_job_id: String, + volume_construction_request: VolumeConstructionRequest, + ) -> Result<(), HttpError> { + let mut volumes = self.volumes.lock().await; + let mut jobs = self.jobs.lock().await; + + volumes.insert( + volume_id, + PantryVolume { + vcr: volume_construction_request, + status: VolumeStatus { + active: false, + seen_active: false, + num_job_handles: 1, + }, + activate_job: Some(activate_job_id.clone()), + }, + ); + + jobs.insert(activate_job_id); + + Ok(()) + } + + pub async fn activate_background_attachment( + &self, + volume_id: String, + ) -> Result { + let activate_job = { + let volumes = self.volumes.lock().await; + volumes.get(&volume_id).unwrap().activate_job.clone().unwrap() + }; + + let mut status = self.volume_status(volume_id.clone()).await?; + + status.active = true; + status.seen_active = true; + + self.update_volume_status(volume_id, status).await?; + + Ok(activate_job) + } + + pub async fn volume_status( + &self, + volume_id: String, + ) -> Result { + let volumes = self.volumes.lock().await; + + match volumes.get(&volume_id) { + Some(pantry_volume) => Ok(pantry_volume.status.clone()), + + None => Err(HttpError::for_not_found(None, volume_id)), + } + } + + pub async fn update_volume_status( + &self, + volume_id: String, + status: VolumeStatus, + ) -> Result<(), HttpError> { + let mut volumes = self.volumes.lock().await; + + match volumes.get_mut(&volume_id) { + Some(pantry_volume) => { + pantry_volume.status = status; + Ok(()) + } + + None => Err(HttpError::for_not_found(None, volume_id)), + } + } + pub async fn is_job_finished( &self, job_id: String, @@ -1240,11 +1345,11 @@ impl Pantry { // the simulated instance ensure, then call // [`instance_issue_disk_snapshot_request`] as the snapshot logic is the // same. 
- let vcrs = self.vcrs.lock().await; - let volume_construction_request = vcrs.get(&volume_id).unwrap(); + let volumes = self.volumes.lock().await; + let volume_construction_request = &volumes.get(&volume_id).unwrap().vcr; self.sled_agent - .map_disk_ids_to_region_ids(&volume_construction_request) + .map_disk_ids_to_region_ids(volume_construction_request) .await?; self.sled_agent @@ -1329,8 +1434,8 @@ impl Pantry { } pub async fn detach(&self, volume_id: String) -> Result<()> { - let mut vcrs = self.vcrs.lock().await; - vcrs.remove(&volume_id); + let mut volumes = self.volumes.lock().await; + volumes.remove(&volume_id); Ok(()) } }
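
Note on the request state machine: below is a minimal, standalone sketch of the decision logic this patch describes, using illustrative names (ReplacementState, Action, next_action) rather than the actual Nexus types. It only models the two deleted-volume paths from the commit message: a request still in Requested is transitioned straight to Complete by the background task, while a Running request skips volume operations and lets the drive saga mark it ReplacementDone so the existing machinery finishes the clean-up.

    // Standalone sketch; the enum and function names here are illustrative
    // and are not the types used in Nexus.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum ReplacementState {
        Requested,
        Running,
        ReplacementDone,
        Complete,
    }

    #[derive(Debug, PartialEq, Eq)]
    enum Action {
        // Requested + deleted volume: transition straight to Complete and
        // release the volume repair lock (background task path).
        CompleteImmediately,
        // Running + deleted volume: skip volume operations and let the drive
        // saga transition the request to ReplacementDone.
        MarkReplacementDone,
        // Volume still exists, or the request is already past the point where
        // this check applies: proceed with the existing replacement machinery.
        ProceedNormally,
    }

    fn next_action(state: ReplacementState, volume_deleted: bool) -> Action {
        if !volume_deleted {
            return Action::ProceedNormally;
        }

        match state {
            ReplacementState::Requested => Action::CompleteImmediately,
            ReplacementState::Running => Action::MarkReplacementDone,
            // Later states already have their own path to Complete.
            ReplacementState::ReplacementDone | ReplacementState::Complete => {
                Action::ProceedNormally
            }
        }
    }

    fn main() {
        assert_eq!(
            next_action(ReplacementState::Requested, true),
            Action::CompleteImmediately
        );
        assert_eq!(
            next_action(ReplacementState::Running, true),
            Action::MarkReplacementDone
        );
        assert_eq!(
            next_action(ReplacementState::Running, false),
            Action::ProceedNormally
        );
    }

The split mirrors the patch's design choice: a Requested record has no associated saga state or Crucible work yet, so it can be completed directly, whereas a Running record is completed through the drive saga and the rest of the replacement machinery rather than bypassing them.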