From 74a4c614ef9df0a923f7973629d258a7aa5fdd8e Mon Sep 17 00:00:00 2001 From: David Pacheco Date: Tue, 2 Jul 2024 13:18:15 -0700 Subject: [PATCH] fix tests --- nexus/src/app/background/init.rs | 32 + .../background/tasks/region_replacement.rs | 12 +- .../tasks/region_replacement_driver.rs | 941 +++++++++--------- nexus/src/app/mod.rs | 8 - nexus/src/app/saga.rs | 3 - 5 files changed, 492 insertions(+), 504 deletions(-) diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index f12ca9360e..de2136db0c 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -729,7 +729,9 @@ fn init_dns( #[cfg(test)] pub mod test { + use crate::app::saga::SagaStarter; use dropshot::HandlerTaskMode; + use futures::FutureExt; use nexus_db_model::DnsGroup; use nexus_db_queries::context::OpContext; use nexus_db_queries::db::datastore::DnsVersionUpdateBuilder; @@ -738,9 +740,39 @@ pub mod test { use nexus_types::internal_api::params as nexus_params; use omicron_test_utils::dev::poll; use std::net::SocketAddr; + use std::sync::atomic::AtomicU64; + use std::sync::atomic::Ordering; use std::time::Duration; use tempfile::TempDir; + /// Used by various tests of tasks that kick off sagas + pub(crate) struct NoopSagaStarter { + count: AtomicU64, + } + + impl NoopSagaStarter { + pub(crate) fn new() -> Self { + Self { count: AtomicU64::new(0) } + } + + pub(crate) fn count_reset(&self) -> u64 { + self.count.swap(0, Ordering::SeqCst) + } + } + + impl SagaStarter for NoopSagaStarter { + fn saga_start( + &self, + _: steno::SagaDag, + ) -> futures::prelude::future::BoxFuture< + '_, + Result<(), omicron_common::api::external::Error>, + > { + let _ = self.count.fetch_add(1, Ordering::SeqCst); + async { Ok(()) }.boxed() + } + } + type ControlPlaneTestContext = nexus_test_utils::ControlPlaneTestContext; diff --git a/nexus/src/app/background/tasks/region_replacement.rs b/nexus/src/app/background/tasks/region_replacement.rs index 719e622311..11aa303b10 100644 --- a/nexus/src/app/background/tasks/region_replacement.rs +++ b/nexus/src/app/background/tasks/region_replacement.rs @@ -198,6 +198,7 @@ impl BackgroundTask for RegionReplacementDetector { #[cfg(test)] mod test { use super::*; + use crate::app::background::init::test::NoopSagaStarter; use nexus_db_model::RegionReplacement; use nexus_test_utils_macros::nexus_test; use uuid::Uuid; @@ -216,10 +217,9 @@ mod test { datastore.clone(), ); - let mut task = RegionReplacementDetector::new( - datastore.clone(), - nexus.saga_starter(), - ); + let starter = Arc::new(NoopSagaStarter::new()); + let mut task = + RegionReplacementDetector::new(datastore.clone(), starter.clone()); // Noop test let result = task.activate(&opctx).await; @@ -250,8 +250,6 @@ mod test { }) ); - // XXX-dap - // saga_request_rx.try_recv().unwrap(); - todo!(); + assert_eq!(starter.count_reset(), 1); } } diff --git a/nexus/src/app/background/tasks/region_replacement_driver.rs b/nexus/src/app/background/tasks/region_replacement_driver.rs index 6345beb7cc..284344370a 100644 --- a/nexus/src/app/background/tasks/region_replacement_driver.rs +++ b/nexus/src/app/background/tasks/region_replacement_driver.rs @@ -131,7 +131,7 @@ impl RegionReplacementDriver { .await; match result { Ok(_) => { - let s = format!("{request_id}: drive invoked ok"); + let s = format!("{request_id}: drive saga started ok"); info!(&log, "{s}"); status.drive_invoked_ok.push(s); } @@ -243,488 +243,457 @@ impl BackgroundTask for RegionReplacementDriver { } } -// XXX-dap -// #[cfg(test)] -// mod test { -// use super::*; -// use async_bb8_diesel::AsyncRunQueryDsl; -// use chrono::Utc; -// use nexus_db_model::Region; -// use nexus_db_model::RegionReplacement; -// use nexus_db_model::RegionReplacementState; -// use nexus_db_model::UpstairsRepairNotification; -// use nexus_db_model::UpstairsRepairNotificationType; -// use nexus_db_model::UpstairsRepairType; -// use nexus_test_utils_macros::nexus_test; -// use omicron_uuid_kinds::DownstairsRegionKind; -// use omicron_uuid_kinds::GenericUuid; -// use omicron_uuid_kinds::TypedUuid; -// use omicron_uuid_kinds::UpstairsKind; -// use omicron_uuid_kinds::UpstairsRepairKind; -// use omicron_uuid_kinds::UpstairsSessionKind; -// use tokio::sync::mpsc; -// use uuid::Uuid; -// -// type ControlPlaneTestContext = -// nexus_test_utils::ControlPlaneTestContext; -// -// #[nexus_test(server = crate::Server)] -// async fn test_running_region_replacement_causes_drive( -// cptestctx: &ControlPlaneTestContext, -// ) { -// let nexus = &cptestctx.server.server_context().nexus; -// let datastore = nexus.datastore(); -// let opctx = OpContext::for_tests( -// cptestctx.logctx.log.clone(), -// datastore.clone(), -// ); -// -// let (saga_request_tx, mut saga_request_rx) = mpsc::channel(1); -// let mut task = -// RegionReplacementDriver::new(datastore.clone(), saga_request_tx); -// -// // Noop test -// let result = task.activate(&opctx).await; -// assert_eq!(result, json!(RegionReplacementDriverStatus::default())); -// -// // Add a region replacement request for a fake region, and change it to -// // state Running. -// let region_id = Uuid::new_v4(); -// let new_region_id = Uuid::new_v4(); -// let volume_id = Uuid::new_v4(); -// -// let request = { -// let mut request = RegionReplacement::new(region_id, volume_id); -// request.replacement_state = RegionReplacementState::Running; -// request.new_region_id = Some(new_region_id); -// request -// }; -// -// let request_id = request.id; -// -// datastore -// .insert_region_replacement_request(&opctx, request) -// .await -// .unwrap(); -// -// // Activate the task - it should pick that up and try to run the region -// // replacement drive saga -// let result: RegionReplacementDriverStatus = -// serde_json::from_value(task.activate(&opctx).await).unwrap(); -// -// assert_eq!( -// result.drive_invoked_ok, -// vec![format!("{request_id}: drive invoked ok")] -// ); -// assert!(result.finish_invoked_ok.is_empty()); -// assert!(result.errors.is_empty()); -// -// let request = saga_request_rx.try_recv().unwrap(); -// -// assert!(matches!( -// request, -// sagas::SagaRequest::RegionReplacementDrive { .. } -// )); -// } -// -// #[nexus_test(server = crate::Server)] -// async fn test_done_region_replacement_causes_finish( -// cptestctx: &ControlPlaneTestContext, -// ) { -// let nexus = &cptestctx.server.server_context().nexus; -// let datastore = nexus.datastore(); -// let opctx = OpContext::for_tests( -// cptestctx.logctx.log.clone(), -// datastore.clone(), -// ); -// -// let (saga_request_tx, mut saga_request_rx) = mpsc::channel(1); -// let mut task = -// RegionReplacementDriver::new(datastore.clone(), saga_request_tx); -// -// // Noop test -// let result = task.activate(&opctx).await; -// assert_eq!(result, json!(RegionReplacementDriverStatus::default())); -// -// // Insert some region records -// let old_region = { -// let dataset_id = Uuid::new_v4(); -// let volume_id = Uuid::new_v4(); -// Region::new( -// dataset_id, -// volume_id, -// 512_i64.try_into().unwrap(), -// 10, -// 10, -// ) -// }; -// -// let new_region = { -// let dataset_id = Uuid::new_v4(); -// let volume_id = Uuid::new_v4(); -// Region::new( -// dataset_id, -// volume_id, -// 512_i64.try_into().unwrap(), -// 10, -// 10, -// ) -// }; -// -// { -// let conn = datastore.pool_connection_for_tests().await.unwrap(); -// -// use nexus_db_model::schema::region::dsl; -// diesel::insert_into(dsl::region) -// .values(old_region.clone()) -// .execute_async(&*conn) -// .await -// .unwrap(); -// -// diesel::insert_into(dsl::region) -// .values(new_region.clone()) -// .execute_async(&*conn) -// .await -// .unwrap(); -// } -// -// // Add a region replacement request for that region, and change it to -// // state ReplacementDone. Set the new_region_id to the region created -// // above. -// let request = { -// let mut request = -// RegionReplacement::new(old_region.id(), old_region.volume_id()); -// request.replacement_state = RegionReplacementState::ReplacementDone; -// request.new_region_id = Some(new_region.id()); -// request.old_region_volume_id = Some(Uuid::new_v4()); -// request -// }; -// -// let request_id = request.id; -// -// datastore -// .insert_region_replacement_request(&opctx, request) -// .await -// .unwrap(); -// -// // Activate the task - it should pick that up and try to run the region -// // replacement finish saga -// let result: RegionReplacementDriverStatus = -// serde_json::from_value(task.activate(&opctx).await).unwrap(); -// -// assert!(result.drive_invoked_ok.is_empty()); -// assert_eq!( -// result.finish_invoked_ok, -// vec![format!("{request_id}: finish invoked ok")] -// ); -// assert!(result.errors.is_empty()); -// -// let request = saga_request_rx.try_recv().unwrap(); -// -// assert!(matches!( -// request, -// sagas::SagaRequest::RegionReplacementFinish { .. } -// )); -// } -// -// #[nexus_test(server = crate::Server)] -// async fn test_mark_region_replacement_done_after_notification( -// cptestctx: &ControlPlaneTestContext, -// ) { -// let nexus = &cptestctx.server.server_context().nexus; -// let datastore = nexus.datastore(); -// let opctx = OpContext::for_tests( -// cptestctx.logctx.log.clone(), -// datastore.clone(), -// ); -// -// let (saga_request_tx, mut saga_request_rx) = mpsc::channel(1); -// let mut task = -// RegionReplacementDriver::new(datastore.clone(), saga_request_tx); -// -// // Noop test -// let result = task.activate(&opctx).await; -// assert_eq!(result, json!(RegionReplacementDriverStatus::default())); -// -// // Insert some region records -// let old_region = { -// let dataset_id = Uuid::new_v4(); -// let volume_id = Uuid::new_v4(); -// Region::new( -// dataset_id, -// volume_id, -// 512_i64.try_into().unwrap(), -// 10, -// 10, -// ) -// }; -// -// let new_region = { -// let dataset_id = Uuid::new_v4(); -// let volume_id = Uuid::new_v4(); -// Region::new( -// dataset_id, -// volume_id, -// 512_i64.try_into().unwrap(), -// 10, -// 10, -// ) -// }; -// -// { -// let conn = datastore.pool_connection_for_tests().await.unwrap(); -// -// use nexus_db_model::schema::region::dsl; -// diesel::insert_into(dsl::region) -// .values(old_region.clone()) -// .execute_async(&*conn) -// .await -// .unwrap(); -// -// diesel::insert_into(dsl::region) -// .values(new_region.clone()) -// .execute_async(&*conn) -// .await -// .unwrap(); -// } -// -// // Add a region replacement request for that region, and change it to -// // state Running. Set the new_region_id to the region created above. -// let request = { -// let mut request = -// RegionReplacement::new(old_region.id(), old_region.volume_id()); -// request.replacement_state = RegionReplacementState::Running; -// request.new_region_id = Some(new_region.id()); -// request.old_region_volume_id = Some(Uuid::new_v4()); -// request -// }; -// -// let request_id = request.id; -// -// datastore -// .insert_region_replacement_request(&opctx, request.clone()) -// .await -// .unwrap(); -// -// // Activate the task - it should pick that up and try to run the region -// // replacement drive saga -// let result: RegionReplacementDriverStatus = -// serde_json::from_value(task.activate(&opctx).await).unwrap(); -// -// assert_eq!( -// result.drive_invoked_ok, -// vec![format!("{request_id}: drive invoked ok")] -// ); -// assert!(result.finish_invoked_ok.is_empty()); -// assert!(result.errors.is_empty()); -// -// let saga_request = saga_request_rx.try_recv().unwrap(); -// -// assert!(matches!( -// saga_request, -// sagas::SagaRequest::RegionReplacementDrive { .. } -// )); -// -// // Now, pretend that an Upstairs sent a notification that it -// // successfully finished a repair -// -// { -// datastore -// .upstairs_repair_notification( -// &opctx, -// UpstairsRepairNotification::new( -// Utc::now(), // client time -// TypedUuid::::from_untyped_uuid( -// Uuid::new_v4(), -// ), -// UpstairsRepairType::Live, -// TypedUuid::::from_untyped_uuid( -// Uuid::new_v4(), -// ), -// TypedUuid::::from_untyped_uuid( -// Uuid::new_v4(), -// ), -// TypedUuid::::from_untyped_uuid( -// new_region.id(), -// ), // downstairs that was repaired -// "[fd00:1122:3344:101::2]:12345".parse().unwrap(), -// UpstairsRepairNotificationType::Succeeded, -// ), -// ) -// .await -// .unwrap(); -// } -// -// // Activating the task now should -// // 1) switch the state to ReplacementDone -// // 2) start the finish saga -// let result: RegionReplacementDriverStatus = -// serde_json::from_value(task.activate(&opctx).await).unwrap(); -// -// assert_eq!(result.finish_invoked_ok.len(), 1); -// -// { -// let request_in_db = datastore -// .get_region_replacement_request_by_id(&opctx, request.id) -// .await -// .unwrap(); -// assert_eq!( -// request_in_db.replacement_state, -// RegionReplacementState::ReplacementDone -// ); -// } -// -// let saga_request = saga_request_rx.try_recv().unwrap(); -// -// assert!(matches!( -// saga_request, -// sagas::SagaRequest::RegionReplacementFinish { .. } -// )); -// } -// -// #[nexus_test(server = crate::Server)] -// async fn test_no_mark_region_replacement_done_after_failed_notification( -// cptestctx: &ControlPlaneTestContext, -// ) { -// let nexus = &cptestctx.server.server_context().nexus; -// let datastore = nexus.datastore(); -// let opctx = OpContext::for_tests( -// cptestctx.logctx.log.clone(), -// datastore.clone(), -// ); -// -// let (saga_request_tx, mut saga_request_rx) = mpsc::channel(1); -// let mut task = -// RegionReplacementDriver::new(datastore.clone(), saga_request_tx); -// -// // Noop test -// let result = task.activate(&opctx).await; -// assert_eq!(result, json!(RegionReplacementDriverStatus::default())); -// -// // Insert some region records -// let old_region = { -// let dataset_id = Uuid::new_v4(); -// let volume_id = Uuid::new_v4(); -// Region::new( -// dataset_id, -// volume_id, -// 512_i64.try_into().unwrap(), -// 10, -// 10, -// ) -// }; -// -// let new_region = { -// let dataset_id = Uuid::new_v4(); -// let volume_id = Uuid::new_v4(); -// Region::new( -// dataset_id, -// volume_id, -// 512_i64.try_into().unwrap(), -// 10, -// 10, -// ) -// }; -// -// { -// let conn = datastore.pool_connection_for_tests().await.unwrap(); -// -// use nexus_db_model::schema::region::dsl; -// diesel::insert_into(dsl::region) -// .values(old_region.clone()) -// .execute_async(&*conn) -// .await -// .unwrap(); -// -// diesel::insert_into(dsl::region) -// .values(new_region.clone()) -// .execute_async(&*conn) -// .await -// .unwrap(); -// } -// -// // Add a region replacement request for that region, and change it to -// // state Running. Set the new_region_id to the region created above. -// let request = { -// let mut request = -// RegionReplacement::new(old_region.id(), old_region.volume_id()); -// request.replacement_state = RegionReplacementState::Running; -// request.new_region_id = Some(new_region.id()); -// request -// }; -// -// let request_id = request.id; -// -// datastore -// .insert_region_replacement_request(&opctx, request.clone()) -// .await -// .unwrap(); -// -// // Activate the task - it should pick that up and try to run the region -// // replacement drive saga -// let result: RegionReplacementDriverStatus = -// serde_json::from_value(task.activate(&opctx).await).unwrap(); -// -// assert_eq!( -// result.drive_invoked_ok, -// vec![format!("{request_id}: drive invoked ok")] -// ); -// assert!(result.finish_invoked_ok.is_empty()); -// assert!(result.errors.is_empty()); -// -// let saga_request = saga_request_rx.try_recv().unwrap(); -// -// assert!(matches!( -// saga_request, -// sagas::SagaRequest::RegionReplacementDrive { .. } -// )); -// -// // Now, pretend that an Upstairs sent a notification that it failed to -// // finish a repair -// -// { -// datastore -// .upstairs_repair_notification( -// &opctx, -// UpstairsRepairNotification::new( -// Utc::now(), // client time -// TypedUuid::::from_untyped_uuid( -// Uuid::new_v4(), -// ), -// UpstairsRepairType::Live, -// TypedUuid::::from_untyped_uuid( -// Uuid::new_v4(), -// ), -// TypedUuid::::from_untyped_uuid( -// Uuid::new_v4(), -// ), -// TypedUuid::::from_untyped_uuid( -// new_region.id(), -// ), // downstairs that was repaired -// "[fd00:1122:3344:101::2]:12345".parse().unwrap(), -// UpstairsRepairNotificationType::Failed, -// ), -// ) -// .await -// .unwrap(); -// } -// -// // Activating the task now should start the drive saga -// let result: RegionReplacementDriverStatus = -// serde_json::from_value(task.activate(&opctx).await).unwrap(); -// -// assert_eq!( -// result.drive_invoked_ok, -// vec![format!("{request_id}: drive invoked ok")] -// ); -// assert!(result.finish_invoked_ok.is_empty()); -// assert!(result.errors.is_empty()); -// -// let saga_request = saga_request_rx.try_recv().unwrap(); -// -// assert!(matches!( -// saga_request, -// sagas::SagaRequest::RegionReplacementDrive { .. } -// )); -// } -// } +#[cfg(test)] +mod test { + use super::*; + use crate::app::background::init::test::NoopSagaStarter; + use async_bb8_diesel::AsyncRunQueryDsl; + use chrono::Utc; + use nexus_db_model::Region; + use nexus_db_model::RegionReplacement; + use nexus_db_model::RegionReplacementState; + use nexus_db_model::UpstairsRepairNotification; + use nexus_db_model::UpstairsRepairNotificationType; + use nexus_db_model::UpstairsRepairType; + use nexus_test_utils_macros::nexus_test; + use omicron_uuid_kinds::DownstairsRegionKind; + use omicron_uuid_kinds::GenericUuid; + use omicron_uuid_kinds::TypedUuid; + use omicron_uuid_kinds::UpstairsKind; + use omicron_uuid_kinds::UpstairsRepairKind; + use omicron_uuid_kinds::UpstairsSessionKind; + use uuid::Uuid; + + type ControlPlaneTestContext = + nexus_test_utils::ControlPlaneTestContext; + + #[nexus_test(server = crate::Server)] + async fn test_running_region_replacement_causes_drive( + cptestctx: &ControlPlaneTestContext, + ) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.clone(), + datastore.clone(), + ); + + let starter = Arc::new(NoopSagaStarter::new()); + let mut task = + RegionReplacementDriver::new(datastore.clone(), starter.clone()); + + // Noop test + let result = task.activate(&opctx).await; + assert_eq!(result, json!(RegionReplacementDriverStatus::default())); + + // Add a region replacement request for a fake region, and change it to + // state Running. + let region_id = Uuid::new_v4(); + let new_region_id = Uuid::new_v4(); + let volume_id = Uuid::new_v4(); + + let request = { + let mut request = RegionReplacement::new(region_id, volume_id); + request.replacement_state = RegionReplacementState::Running; + request.new_region_id = Some(new_region_id); + request + }; + + let request_id = request.id; + + datastore + .insert_region_replacement_request(&opctx, request) + .await + .unwrap(); + + // Activate the task - it should pick that up and try to run the region + // replacement drive saga + let result: RegionReplacementDriverStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + + assert_eq!( + result.drive_invoked_ok, + vec![format!("{request_id}: drive saga started ok")] + ); + assert!(result.finish_invoked_ok.is_empty()); + assert!(result.errors.is_empty()); + + assert_eq!(starter.count_reset(), 1); + } + + #[nexus_test(server = crate::Server)] + async fn test_done_region_replacement_causes_finish( + cptestctx: &ControlPlaneTestContext, + ) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.clone(), + datastore.clone(), + ); + + let starter = Arc::new(NoopSagaStarter::new()); + let mut task = + RegionReplacementDriver::new(datastore.clone(), starter.clone()); + + // Noop test + let result = task.activate(&opctx).await; + assert_eq!(result, json!(RegionReplacementDriverStatus::default())); + + // Insert some region records + let old_region = { + let dataset_id = Uuid::new_v4(); + let volume_id = Uuid::new_v4(); + Region::new( + dataset_id, + volume_id, + 512_i64.try_into().unwrap(), + 10, + 10, + ) + }; + + let new_region = { + let dataset_id = Uuid::new_v4(); + let volume_id = Uuid::new_v4(); + Region::new( + dataset_id, + volume_id, + 512_i64.try_into().unwrap(), + 10, + 10, + ) + }; + + { + let conn = datastore.pool_connection_for_tests().await.unwrap(); + + use nexus_db_model::schema::region::dsl; + diesel::insert_into(dsl::region) + .values(old_region.clone()) + .execute_async(&*conn) + .await + .unwrap(); + + diesel::insert_into(dsl::region) + .values(new_region.clone()) + .execute_async(&*conn) + .await + .unwrap(); + } + + // Add a region replacement request for that region, and change it to + // state ReplacementDone. Set the new_region_id to the region created + // above. + let request = { + let mut request = + RegionReplacement::new(old_region.id(), old_region.volume_id()); + request.replacement_state = RegionReplacementState::ReplacementDone; + request.new_region_id = Some(new_region.id()); + request.old_region_volume_id = Some(Uuid::new_v4()); + request + }; + + let request_id = request.id; + + datastore + .insert_region_replacement_request(&opctx, request) + .await + .unwrap(); + + // Activate the task - it should pick that up and try to run the region + // replacement finish saga + let result: RegionReplacementDriverStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + + assert!(result.drive_invoked_ok.is_empty()); + assert_eq!( + result.finish_invoked_ok, + vec![format!("{request_id}: finish saga started ok")] + ); + assert!(result.errors.is_empty()); + + assert_eq!(starter.count_reset(), 1); + } + + #[nexus_test(server = crate::Server)] + async fn test_mark_region_replacement_done_after_notification( + cptestctx: &ControlPlaneTestContext, + ) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.clone(), + datastore.clone(), + ); + + let starter = Arc::new(NoopSagaStarter::new()); + let mut task = + RegionReplacementDriver::new(datastore.clone(), starter.clone()); + + // Noop test + let result = task.activate(&opctx).await; + assert_eq!(result, json!(RegionReplacementDriverStatus::default())); + + // Insert some region records + let old_region = { + let dataset_id = Uuid::new_v4(); + let volume_id = Uuid::new_v4(); + Region::new( + dataset_id, + volume_id, + 512_i64.try_into().unwrap(), + 10, + 10, + ) + }; + + let new_region = { + let dataset_id = Uuid::new_v4(); + let volume_id = Uuid::new_v4(); + Region::new( + dataset_id, + volume_id, + 512_i64.try_into().unwrap(), + 10, + 10, + ) + }; + + { + let conn = datastore.pool_connection_for_tests().await.unwrap(); + + use nexus_db_model::schema::region::dsl; + diesel::insert_into(dsl::region) + .values(old_region.clone()) + .execute_async(&*conn) + .await + .unwrap(); + + diesel::insert_into(dsl::region) + .values(new_region.clone()) + .execute_async(&*conn) + .await + .unwrap(); + } + + // Add a region replacement request for that region, and change it to + // state Running. Set the new_region_id to the region created above. + let request = { + let mut request = + RegionReplacement::new(old_region.id(), old_region.volume_id()); + request.replacement_state = RegionReplacementState::Running; + request.new_region_id = Some(new_region.id()); + request.old_region_volume_id = Some(Uuid::new_v4()); + request + }; + + let request_id = request.id; + + datastore + .insert_region_replacement_request(&opctx, request.clone()) + .await + .unwrap(); + + // Activate the task - it should pick that up and try to run the region + // replacement drive saga + let result: RegionReplacementDriverStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + + assert_eq!( + result.drive_invoked_ok, + vec![format!("{request_id}: drive saga started ok")] + ); + assert!(result.finish_invoked_ok.is_empty()); + assert!(result.errors.is_empty()); + + assert_eq!(starter.count_reset(), 1); + + // Now, pretend that an Upstairs sent a notification that it + // successfully finished a repair + + { + datastore + .upstairs_repair_notification( + &opctx, + UpstairsRepairNotification::new( + Utc::now(), // client time + TypedUuid::::from_untyped_uuid( + Uuid::new_v4(), + ), + UpstairsRepairType::Live, + TypedUuid::::from_untyped_uuid( + Uuid::new_v4(), + ), + TypedUuid::::from_untyped_uuid( + Uuid::new_v4(), + ), + TypedUuid::::from_untyped_uuid( + new_region.id(), + ), // downstairs that was repaired + "[fd00:1122:3344:101::2]:12345".parse().unwrap(), + UpstairsRepairNotificationType::Succeeded, + ), + ) + .await + .unwrap(); + } + + // Activating the task now should + // 1) switch the state to ReplacementDone + // 2) start the finish saga + let result: RegionReplacementDriverStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + + assert_eq!(result.finish_invoked_ok.len(), 1); + + { + let request_in_db = datastore + .get_region_replacement_request_by_id(&opctx, request.id) + .await + .unwrap(); + assert_eq!( + request_in_db.replacement_state, + RegionReplacementState::ReplacementDone + ); + } + + assert_eq!(starter.count_reset(), 1); + } + + #[nexus_test(server = crate::Server)] + async fn test_no_mark_region_replacement_done_after_failed_notification( + cptestctx: &ControlPlaneTestContext, + ) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.clone(), + datastore.clone(), + ); + + let starter = Arc::new(NoopSagaStarter::new()); + let mut task = + RegionReplacementDriver::new(datastore.clone(), starter.clone()); + + // Noop test + let result = task.activate(&opctx).await; + assert_eq!(result, json!(RegionReplacementDriverStatus::default())); + + // Insert some region records + let old_region = { + let dataset_id = Uuid::new_v4(); + let volume_id = Uuid::new_v4(); + Region::new( + dataset_id, + volume_id, + 512_i64.try_into().unwrap(), + 10, + 10, + ) + }; + + let new_region = { + let dataset_id = Uuid::new_v4(); + let volume_id = Uuid::new_v4(); + Region::new( + dataset_id, + volume_id, + 512_i64.try_into().unwrap(), + 10, + 10, + ) + }; + + { + let conn = datastore.pool_connection_for_tests().await.unwrap(); + + use nexus_db_model::schema::region::dsl; + diesel::insert_into(dsl::region) + .values(old_region.clone()) + .execute_async(&*conn) + .await + .unwrap(); + + diesel::insert_into(dsl::region) + .values(new_region.clone()) + .execute_async(&*conn) + .await + .unwrap(); + } + + // Add a region replacement request for that region, and change it to + // state Running. Set the new_region_id to the region created above. + let request = { + let mut request = + RegionReplacement::new(old_region.id(), old_region.volume_id()); + request.replacement_state = RegionReplacementState::Running; + request.new_region_id = Some(new_region.id()); + request + }; + + let request_id = request.id; + + datastore + .insert_region_replacement_request(&opctx, request.clone()) + .await + .unwrap(); + + // Activate the task - it should pick that up and try to run the region + // replacement drive saga + let result: RegionReplacementDriverStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + + assert_eq!( + result.drive_invoked_ok, + vec![format!("{request_id}: drive saga started ok")] + ); + assert!(result.finish_invoked_ok.is_empty()); + assert!(result.errors.is_empty()); + + assert_eq!(starter.count_reset(), 1); + + // Now, pretend that an Upstairs sent a notification that it failed to + // finish a repair + + { + datastore + .upstairs_repair_notification( + &opctx, + UpstairsRepairNotification::new( + Utc::now(), // client time + TypedUuid::::from_untyped_uuid( + Uuid::new_v4(), + ), + UpstairsRepairType::Live, + TypedUuid::::from_untyped_uuid( + Uuid::new_v4(), + ), + TypedUuid::::from_untyped_uuid( + Uuid::new_v4(), + ), + TypedUuid::::from_untyped_uuid( + new_region.id(), + ), // downstairs that was repaired + "[fd00:1122:3344:101::2]:12345".parse().unwrap(), + UpstairsRepairNotificationType::Failed, + ), + ) + .await + .unwrap(); + } + + // Activating the task now should start the drive saga + let result: RegionReplacementDriverStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + + assert_eq!( + result.drive_invoked_ok, + vec![format!("{request_id}: drive saga started ok")] + ); + assert!(result.finish_invoked_ok.is_empty()); + assert!(result.errors.is_empty()); + + assert_eq!(starter.count_reset(), 1); + } +} diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index 624e8c5d6d..841f64fa61 100644 --- a/nexus/src/app/mod.rs +++ b/nexus/src/app/mod.rs @@ -37,9 +37,6 @@ use std::sync::Arc; use std::sync::OnceLock; use uuid::Uuid; -#[cfg(test)] -use self::saga::SagaStarter; - // The implementation of Nexus is large, and split into a number of submodules // by resource. mod address_lot; @@ -548,11 +545,6 @@ impl Nexus { &self.authz } - #[cfg(test)] - pub(crate) fn saga_starter(&self) -> Arc { - self.sagas.clone() - } - pub(crate) async fn wait_for_populate(&self) -> Result<(), anyhow::Error> { let mut my_rx = self.populate_status.clone(); loop { diff --git a/nexus/src/app/saga.rs b/nexus/src/app/saga.rs index 1dc45d2931..840c95fc15 100644 --- a/nexus/src/app/saga.rs +++ b/nexus/src/app/saga.rs @@ -49,7 +49,6 @@ //! or inject errors. use super::sagas::NexusSaga; -use super::sagas::SagaInitError; use super::sagas::ACTION_REGISTRY; use crate::saga_interface::SagaContext; use crate::Nexus; @@ -67,10 +66,8 @@ use omicron_common::api::external::ResourceType; use omicron_common::bail_unless; use std::sync::Arc; use std::sync::OnceLock; -use steno::DagBuilder; use steno::SagaDag; use steno::SagaId; -use steno::SagaName; use steno::SagaResult; use steno::SagaResultOk; use uuid::Uuid;