diff --git a/nexus/src/app/background/driver.rs b/nexus/src/app/background/driver.rs index 8ea4459a8a..c93729a335 100644 --- a/nexus/src/app/background/driver.rs +++ b/nexus/src/app/background/driver.rs @@ -436,7 +436,6 @@ mod test { use super::Driver; use crate::app::background::driver::TaskDefinition; use crate::app::background::Activator; - use crate::app::sagas::SagaRequest; use assert_matches::assert_matches; use chrono::Utc; use futures::future::BoxFuture; @@ -448,7 +447,6 @@ mod test { use std::time::Instant; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TryRecvError; - use tokio::sync::mpsc::Sender; use tokio::sync::watch; type ControlPlaneTestContext = @@ -808,84 +806,4 @@ mod test { // such a task that would allow us to reliably distinguish between these // two without also spending a lot of wall-clock time on this test. } - - /// Simple BackgroundTask impl that sends a test-only SagaRequest - struct SagaRequestTask { - saga_request: Sender, - } - - impl SagaRequestTask { - fn new(saga_request: Sender) -> SagaRequestTask { - SagaRequestTask { saga_request } - } - } - - impl BackgroundTask for SagaRequestTask { - fn activate<'a>( - &'a mut self, - _: &'a OpContext, - ) -> BoxFuture<'a, serde_json::Value> { - async { - let _ = self.saga_request.send(SagaRequest::TestOnly).await; - serde_json::Value::Null - } - .boxed() - } - } - - #[nexus_test(server = crate::Server)] - async fn test_saga_request_flow(cptestctx: &ControlPlaneTestContext) { - let nexus = &cptestctx.server.server_context().nexus; - let datastore = nexus.datastore(); - let opctx = OpContext::for_tests( - cptestctx.logctx.log.clone(), - datastore.clone(), - ); - - let (saga_request, mut saga_request_recv) = SagaRequest::channel(); - let t1 = SagaRequestTask::new(saga_request); - - let mut driver = Driver::new(); - let (_dep_tx1, dep_rx1) = watch::channel(0); - let act1 = Activator::new(); - - let h1 = driver.register(TaskDefinition { - name: "t1", - description: "test saga request flow task", - period: Duration::from_secs(300), // should not fire in this test - task_impl: Box::new(t1), - opctx: opctx.child(std::collections::BTreeMap::new()), - watchers: vec![Box::new(dep_rx1.clone())], - activator: &act1, - }); - - assert!(matches!( - saga_request_recv.try_recv(), - Err(mpsc::error::TryRecvError::Empty), - )); - - driver.activate(&h1); - - // wait 1 second for the saga request to arrive - tokio::select! { - _ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => { - assert!(false); - } - - saga_request = saga_request_recv.recv() => { - match saga_request { - None => { - assert!(false); - } - - Some(saga_request) => { - assert!(matches!( - saga_request, - SagaRequest::TestOnly, - )); - } - } - } - } - } } diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index b56f518a24..3c66d3242b 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -113,7 +113,7 @@ use super::tasks::vpc_routes; use super::Activator; use super::Driver; use crate::app::oximeter::PRODUCER_LEASE_DURATION; -use crate::app::sagas::SagaRequest; +use crate::app::saga::StartSaga; use nexus_config::BackgroundTaskConfig; use nexus_config::DnsTasksConfig; use nexus_db_model::DnsGroup; @@ -122,7 +122,6 @@ use nexus_db_queries::db::DataStore; use oximeter::types::ProducerRegistry; use std::collections::BTreeMap; use std::sync::Arc; -use tokio::sync::mpsc::Sender; use tokio::sync::watch; use uuid::Uuid; @@ -254,7 +253,7 @@ impl BackgroundTasksInitializer { rack_id: Uuid, nexus_id: Uuid, resolver: internal_dns::resolver::Resolver, - saga_request: Sender, + sagas: Arc, producer_registry: ProducerRegistry, ) -> Driver { let mut driver = self.driver; @@ -548,7 +547,7 @@ impl BackgroundTasksInitializer { { let detector = region_replacement::RegionReplacementDetector::new( datastore.clone(), - saga_request.clone(), + sagas.clone(), ); driver.register(TaskDefinition { @@ -569,7 +568,7 @@ impl BackgroundTasksInitializer { let detector = region_replacement_driver::RegionReplacementDriver::new( datastore.clone(), - saga_request.clone(), + sagas.clone(), ); driver.register(TaskDefinition { @@ -731,7 +730,9 @@ fn init_dns( #[cfg(test)] pub mod test { + use crate::app::saga::StartSaga; use dropshot::HandlerTaskMode; + use futures::FutureExt; use nexus_db_model::DnsGroup; use nexus_db_queries::context::OpContext; use nexus_db_queries::db::datastore::DnsVersionUpdateBuilder; @@ -740,9 +741,39 @@ pub mod test { use nexus_types::internal_api::params as nexus_params; use omicron_test_utils::dev::poll; use std::net::SocketAddr; + use std::sync::atomic::AtomicU64; + use std::sync::atomic::Ordering; use std::time::Duration; use tempfile::TempDir; + /// Used by various tests of tasks that kick off sagas + pub(crate) struct NoopStartSaga { + count: AtomicU64, + } + + impl NoopStartSaga { + pub(crate) fn new() -> Self { + Self { count: AtomicU64::new(0) } + } + + pub(crate) fn count_reset(&self) -> u64 { + self.count.swap(0, Ordering::SeqCst) + } + } + + impl StartSaga for NoopStartSaga { + fn saga_start( + &self, + _: steno::SagaDag, + ) -> futures::prelude::future::BoxFuture< + '_, + Result<(), omicron_common::api::external::Error>, + > { + let _ = self.count.fetch_add(1, Ordering::SeqCst); + async { Ok(()) }.boxed() + } + } + type ControlPlaneTestContext = nexus_test_utils::ControlPlaneTestContext; diff --git a/nexus/src/app/background/tasks/region_replacement.rs b/nexus/src/app/background/tasks/region_replacement.rs index 9e14c294ba..f852f21734 100644 --- a/nexus/src/app/background/tasks/region_replacement.rs +++ b/nexus/src/app/background/tasks/region_replacement.rs @@ -12,7 +12,10 @@ use crate::app::authn; use crate::app::background::BackgroundTask; +use crate::app::saga::StartSaga; use crate::app::sagas; +use crate::app::sagas::region_replacement_start::SagaRegionReplacementStart; +use crate::app::sagas::NexusSaga; use crate::app::RegionAllocationStrategy; use futures::future::BoxFuture; use futures::FutureExt; @@ -23,39 +26,31 @@ use omicron_uuid_kinds::GenericUuid; use omicron_uuid_kinds::TypedUuid; use serde_json::json; use std::sync::Arc; -use tokio::sync::mpsc::error::SendError; -use tokio::sync::mpsc::Sender; pub struct RegionReplacementDetector { datastore: Arc, - saga_request: Sender, + sagas: Arc, } impl RegionReplacementDetector { - pub fn new( - datastore: Arc, - saga_request: Sender, - ) -> Self { - RegionReplacementDetector { datastore, saga_request } + pub fn new(datastore: Arc, sagas: Arc) -> Self { + RegionReplacementDetector { datastore, sagas } } async fn send_start_request( &self, serialized_authn: authn::saga::Serialized, request: RegionReplacement, - ) -> Result<(), SendError> { - let saga_request = sagas::SagaRequest::RegionReplacementStart { - params: sagas::region_replacement_start::Params { - serialized_authn, - request, - allocation_strategy: - RegionAllocationStrategy::RandomWithDistinctSleds { - seed: None, - }, - }, + ) -> Result<(), omicron_common::api::external::Error> { + let params = sagas::region_replacement_start::Params { + serialized_authn, + request, + allocation_strategy: + RegionAllocationStrategy::RandomWithDistinctSleds { seed: None }, }; - self.saga_request.send(saga_request).await + let saga_dag = SagaRegionReplacementStart::prepare(¶ms)?; + self.sagas.saga_start(saga_dag).await } } @@ -201,9 +196,9 @@ impl BackgroundTask for RegionReplacementDetector { #[cfg(test)] mod test { use super::*; + use crate::app::background::init::test::NoopStartSaga; use nexus_db_model::RegionReplacement; use nexus_test_utils_macros::nexus_test; - use tokio::sync::mpsc; use uuid::Uuid; type ControlPlaneTestContext = @@ -220,9 +215,9 @@ mod test { datastore.clone(), ); - let (saga_request_tx, mut saga_request_rx) = mpsc::channel(1); + let starter = Arc::new(NoopStartSaga::new()); let mut task = - RegionReplacementDetector::new(datastore.clone(), saga_request_tx); + RegionReplacementDetector::new(datastore.clone(), starter.clone()); // Noop test let result = task.activate(&opctx).await; @@ -253,6 +248,6 @@ mod test { }) ); - saga_request_rx.try_recv().unwrap(); + assert_eq!(starter.count_reset(), 1); } } diff --git a/nexus/src/app/background/tasks/region_replacement_driver.rs b/nexus/src/app/background/tasks/region_replacement_driver.rs index 06155ffa24..9dd8b6055f 100644 --- a/nexus/src/app/background/tasks/region_replacement_driver.rs +++ b/nexus/src/app/background/tasks/region_replacement_driver.rs @@ -20,7 +20,11 @@ use crate::app::authn; use crate::app::background::BackgroundTask; +use crate::app::saga::StartSaga; use crate::app::sagas; +use crate::app::sagas::region_replacement_drive::SagaRegionReplacementDrive; +use crate::app::sagas::region_replacement_finish::SagaRegionReplacementFinish; +use crate::app::sagas::NexusSaga; use futures::future::BoxFuture; use futures::FutureExt; use nexus_db_queries::context::OpContext; @@ -28,19 +32,15 @@ use nexus_db_queries::db::DataStore; use nexus_types::internal_api::background::RegionReplacementDriverStatus; use serde_json::json; use std::sync::Arc; -use tokio::sync::mpsc::Sender; pub struct RegionReplacementDriver { datastore: Arc, - saga_request: Sender, + sagas: Arc, } impl RegionReplacementDriver { - pub fn new( - datastore: Arc, - saga_request: Sender, - ) -> Self { - RegionReplacementDriver { datastore, saga_request } + pub fn new(datastore: Arc, sagas: Arc) -> Self { + RegionReplacementDriver { datastore, sagas } } /// Drive running region replacements forward @@ -119,36 +119,32 @@ impl RegionReplacementDriver { // (or determine if it is complete). let request_id = request.id; - - let result = self - .saga_request - .send(sagas::SagaRequest::RegionReplacementDrive { - params: sagas::region_replacement_drive::Params { - serialized_authn: - authn::saga::Serialized::for_opctx(opctx), - request, - }, - }) - .await; - + let params = sagas::region_replacement_drive::Params { + serialized_authn: authn::saga::Serialized::for_opctx(opctx), + request, + }; + let result = async { + let saga_dag = + SagaRegionReplacementDrive::prepare(¶ms)?; + self.sagas.saga_start(saga_dag).await + } + .await; match result { - Ok(()) => { - let s = format!("{request_id}: drive invoked ok"); - + Ok(_) => { + let s = format!("{request_id}: drive saga started ok"); info!(&log, "{s}"); status.drive_invoked_ok.push(s); } - Err(e) => { let s = format!( - "sending region replacement drive request for \ + "starting region replacement drive saga for \ {request_id} failed: {e}", ); error!(&log, "{s}"); status.errors.push(s); } - }; + } } } } @@ -193,22 +189,19 @@ impl RegionReplacementDriver { }; let request_id = request.id; - - let result = - self.saga_request - .send(sagas::SagaRequest::RegionReplacementFinish { - params: sagas::region_replacement_finish::Params { - serialized_authn: - authn::saga::Serialized::for_opctx(opctx), - region_volume_id: old_region_volume_id, - request, - }, - }) - .await; - + let params = sagas::region_replacement_finish::Params { + serialized_authn: authn::saga::Serialized::for_opctx(opctx), + region_volume_id: old_region_volume_id, + request, + }; + let result = async { + let saga_dag = SagaRegionReplacementFinish::prepare(¶ms)?; + self.sagas.saga_start(saga_dag).await + } + .await; match result { - Ok(()) => { - let s = format!("{request_id}: finish invoked ok"); + Ok(_) => { + let s = format!("{request_id}: finish saga started ok"); info!(&log, "{s}"); status.finish_invoked_ok.push(s); @@ -216,14 +209,14 @@ impl RegionReplacementDriver { Err(e) => { let s = format!( - "sending region replacement finish request for \ + "starting region replacement finish saga for \ {request_id} failed: {e}" ); error!(&log, "{s}"); status.errors.push(s); } - }; + } } } } @@ -253,6 +246,7 @@ impl BackgroundTask for RegionReplacementDriver { #[cfg(test)] mod test { use super::*; + use crate::app::background::init::test::NoopStartSaga; use async_bb8_diesel::AsyncRunQueryDsl; use chrono::Utc; use nexus_db_model::Region; @@ -268,7 +262,6 @@ mod test { use omicron_uuid_kinds::UpstairsKind; use omicron_uuid_kinds::UpstairsRepairKind; use omicron_uuid_kinds::UpstairsSessionKind; - use tokio::sync::mpsc; use uuid::Uuid; type ControlPlaneTestContext = @@ -285,9 +278,9 @@ mod test { datastore.clone(), ); - let (saga_request_tx, mut saga_request_rx) = mpsc::channel(1); + let starter = Arc::new(NoopStartSaga::new()); let mut task = - RegionReplacementDriver::new(datastore.clone(), saga_request_tx); + RegionReplacementDriver::new(datastore.clone(), starter.clone()); // Noop test let result = task.activate(&opctx).await; @@ -320,17 +313,12 @@ mod test { assert_eq!( result.drive_invoked_ok, - vec![format!("{request_id}: drive invoked ok")] + vec![format!("{request_id}: drive saga started ok")] ); assert!(result.finish_invoked_ok.is_empty()); assert!(result.errors.is_empty()); - let request = saga_request_rx.try_recv().unwrap(); - - assert!(matches!( - request, - sagas::SagaRequest::RegionReplacementDrive { .. } - )); + assert_eq!(starter.count_reset(), 1); } #[nexus_test(server = crate::Server)] @@ -344,9 +332,9 @@ mod test { datastore.clone(), ); - let (saga_request_tx, mut saga_request_rx) = mpsc::channel(1); + let starter = Arc::new(NoopStartSaga::new()); let mut task = - RegionReplacementDriver::new(datastore.clone(), saga_request_tx); + RegionReplacementDriver::new(datastore.clone(), starter.clone()); // Noop test let result = task.activate(&opctx).await; @@ -421,16 +409,11 @@ mod test { assert!(result.drive_invoked_ok.is_empty()); assert_eq!( result.finish_invoked_ok, - vec![format!("{request_id}: finish invoked ok")] + vec![format!("{request_id}: finish saga started ok")] ); assert!(result.errors.is_empty()); - let request = saga_request_rx.try_recv().unwrap(); - - assert!(matches!( - request, - sagas::SagaRequest::RegionReplacementFinish { .. } - )); + assert_eq!(starter.count_reset(), 1); } #[nexus_test(server = crate::Server)] @@ -444,9 +427,9 @@ mod test { datastore.clone(), ); - let (saga_request_tx, mut saga_request_rx) = mpsc::channel(1); + let starter = Arc::new(NoopStartSaga::new()); let mut task = - RegionReplacementDriver::new(datastore.clone(), saga_request_tx); + RegionReplacementDriver::new(datastore.clone(), starter.clone()); // Noop test let result = task.activate(&opctx).await; @@ -519,17 +502,12 @@ mod test { assert_eq!( result.drive_invoked_ok, - vec![format!("{request_id}: drive invoked ok")] + vec![format!("{request_id}: drive saga started ok")] ); assert!(result.finish_invoked_ok.is_empty()); assert!(result.errors.is_empty()); - let saga_request = saga_request_rx.try_recv().unwrap(); - - assert!(matches!( - saga_request, - sagas::SagaRequest::RegionReplacementDrive { .. } - )); + assert_eq!(starter.count_reset(), 1); // Now, pretend that an Upstairs sent a notification that it // successfully finished a repair @@ -580,12 +558,7 @@ mod test { ); } - let saga_request = saga_request_rx.try_recv().unwrap(); - - assert!(matches!( - saga_request, - sagas::SagaRequest::RegionReplacementFinish { .. } - )); + assert_eq!(starter.count_reset(), 1); } #[nexus_test(server = crate::Server)] @@ -599,9 +572,9 @@ mod test { datastore.clone(), ); - let (saga_request_tx, mut saga_request_rx) = mpsc::channel(1); + let starter = Arc::new(NoopStartSaga::new()); let mut task = - RegionReplacementDriver::new(datastore.clone(), saga_request_tx); + RegionReplacementDriver::new(datastore.clone(), starter.clone()); // Noop test let result = task.activate(&opctx).await; @@ -673,17 +646,12 @@ mod test { assert_eq!( result.drive_invoked_ok, - vec![format!("{request_id}: drive invoked ok")] + vec![format!("{request_id}: drive saga started ok")] ); assert!(result.finish_invoked_ok.is_empty()); assert!(result.errors.is_empty()); - let saga_request = saga_request_rx.try_recv().unwrap(); - - assert!(matches!( - saga_request, - sagas::SagaRequest::RegionReplacementDrive { .. } - )); + assert_eq!(starter.count_reset(), 1); // Now, pretend that an Upstairs sent a notification that it failed to // finish a repair @@ -721,16 +689,11 @@ mod test { assert_eq!( result.drive_invoked_ok, - vec![format!("{request_id}: drive invoked ok")] + vec![format!("{request_id}: drive saga started ok")] ); assert!(result.finish_invoked_ok.is_empty()); assert!(result.errors.is_empty()); - let saga_request = saga_request_rx.try_recv().unwrap(); - - assert!(matches!( - saga_request, - sagas::SagaRequest::RegionReplacementDrive { .. } - )); + assert_eq!(starter.count_reset(), 1); } } diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index 4b8d148d7b..cee62f1107 100644 --- a/nexus/src/app/mod.rs +++ b/nexus/src/app/mod.rs @@ -7,7 +7,6 @@ use self::external_endpoints::NexusCertResolver; use self::saga::SagaExecutor; use crate::app::oximeter::LazyTimeseriesClient; -use crate::app::sagas::SagaRequest; use crate::populate::populate_start; use crate::populate::PopulateArgs; use crate::populate::PopulateStatus; @@ -134,7 +133,7 @@ pub struct Nexus { authz: Arc, /// saga execution coordinator - sagas: SagaExecutor, + sagas: Arc, /// Task representing completion of recovered Sagas recovery_task: std::sync::Mutex>, @@ -249,10 +248,10 @@ impl Nexus { sec_store, )); - let sagas = SagaExecutor::new( + let sagas = Arc::new(SagaExecutor::new( Arc::clone(&sec_client), log.new(o!("component" => "SagaExecutor")), - ); + )); let client_state = dpd_client::ClientState { tag: String::from("nexus"), @@ -402,8 +401,6 @@ impl Nexus { Arc::clone(&db_datastore) as Arc, ); - let (saga_request, mut saga_request_recv) = SagaRequest::channel(); - let (background_tasks_initializer, background_tasks) = background::BackgroundTasksInitializer::new(); @@ -517,41 +514,15 @@ impl Nexus { rack_id, task_config.deployment.id, resolver, - saga_request, + task_nexus.sagas.clone(), task_registry, ); if let Err(_) = task_nexus.background_tasks_driver.set(driver) { - panic!( - "concurrent initialization of \ - background_tasks_driver?!" - ) + panic!("multiple initialization of background_tasks_driver"); } }); - // Spawn a task to receive SagaRequests from RPWs, and execute them - { - let nexus = nexus.clone(); - tokio::spawn(async move { - loop { - match saga_request_recv.recv().await { - None => { - // If this channel is closed, then RPWs will not be - // able to request that sagas be run. This will - // likely only occur when Nexus itself is shutting - // down, so emit an error and exit the task. - error!(&nexus.log, "saga request channel closed!"); - break; - } - - Some(saga_request) => { - nexus.handle_saga_request(saga_request).await; - } - } - } - }); - } - Ok(nexus) } @@ -938,95 +909,6 @@ impl Nexus { &self.internal_resolver } - /// Reliable persistent workflows can request that sagas be executed by - /// sending a SagaRequest to a supplied channel. Execute those here. - pub(crate) async fn handle_saga_request( - self: &Arc, - saga_request: SagaRequest, - ) { - match saga_request { - #[cfg(test)] - SagaRequest::TestOnly => { - unimplemented!(); - } - - SagaRequest::RegionReplacementStart { params } => { - let nexus = self.clone(); - tokio::spawn(async move { - let saga_result = nexus - .sagas - .saga_execute::( - params, - ) - .await; - - match saga_result { - Ok(_) => { - info!( - nexus.log, - "region replacement start saga completed ok" - ); - } - - Err(e) => { - warn!(nexus.log, "region replacement start saga returned an error: {e}"); - } - } - }); - } - - SagaRequest::RegionReplacementDrive { params } => { - let nexus = self.clone(); - tokio::spawn(async move { - let saga_result = nexus - .sagas - .saga_execute::( - params, - ) - .await; - - match saga_result { - Ok(_) => { - info!( - nexus.log, - "region replacement drive saga completed ok" - ); - } - - Err(e) => { - warn!(nexus.log, "region replacement drive saga returned an error: {e}"); - } - } - }); - } - - SagaRequest::RegionReplacementFinish { params } => { - let nexus = self.clone(); - tokio::spawn(async move { - let saga_result = nexus - .sagas - .saga_execute::( - params, - ) - .await; - - match saga_result { - Ok(_) => { - info!( - nexus.log, - "region replacement finish saga completed ok" - ); - } - - Err(e) => { - warn!(nexus.log, "region replacement finish saga returned an error: {e}"); - } - } - }); - } - } - } - pub(crate) async fn dpd_clients( &self, ) -> Result, String> { diff --git a/nexus/src/app/saga.rs b/nexus/src/app/saga.rs index baa9513d7b..ed4ccf44fd 100644 --- a/nexus/src/app/saga.rs +++ b/nexus/src/app/saga.rs @@ -49,12 +49,12 @@ //! or inject errors. use super::sagas::NexusSaga; -use super::sagas::SagaInitError; use super::sagas::ACTION_REGISTRY; use crate::saga_interface::SagaContext; use crate::Nexus; use anyhow::Context; use futures::future::BoxFuture; +use futures::FutureExt; use futures::StreamExt; use nexus_db_queries::authz; use nexus_db_queries::context::OpContext; @@ -66,10 +66,8 @@ use omicron_common::api::external::ResourceType; use omicron_common::bail_unless; use std::sync::Arc; use std::sync::OnceLock; -use steno::DagBuilder; use steno::SagaDag; use steno::SagaId; -use steno::SagaName; use steno::SagaResult; use steno::SagaResultOk; use uuid::Uuid; @@ -79,12 +77,31 @@ use uuid::Uuid; pub(crate) fn create_saga_dag( params: N::Params, ) -> Result { - let builder = DagBuilder::new(SagaName::new(N::NAME)); - let dag = N::make_saga_dag(¶ms, builder)?; - let params = serde_json::to_value(¶ms).map_err(|e| { - SagaInitError::SerializeError(String::from("saga params"), e) - })?; - Ok(SagaDag::new(dag, params)) + N::prepare(¶ms) +} + +/// Interface for kicking off sagas +/// +/// See [`SagaExecutor`] for the implementation within Nexus. Some tests use +/// alternate implementations that don't actually run the sagas. +pub(crate) trait StartSaga: Send + Sync { + /// Create a new saga (of type `N` with parameters `params`), start it + /// running, but do not wait for it to finish. + fn saga_start(&self, dag: SagaDag) -> BoxFuture<'_, Result<(), Error>>; +} + +impl StartSaga for SagaExecutor { + fn saga_start(&self, dag: SagaDag) -> BoxFuture<'_, Result<(), Error>> { + async move { + let runnable_saga = self.saga_prepare(dag).await?; + // start() returns a future that can be used to wait for the saga to + // complete. We don't need that here. (Cancelling this has no + // effect on the running saga.) + let _ = runnable_saga.start().await?; + Ok(()) + } + .boxed() + } } /// Handle to a self-contained subsystem for kicking off sagas @@ -242,7 +259,6 @@ impl SagaExecutor { &self, params: N::Params, ) -> Result { - // Construct the DAG specific to this saga. let dag = create_saga_dag::(params)?; let runnable_saga = self.saga_prepare(dag).await?; let running_saga = runnable_saga.start().await?; diff --git a/nexus/src/app/sagas/mod.rs b/nexus/src/app/sagas/mod.rs index 1604d6013d..d278fb5600 100644 --- a/nexus/src/app/sagas/mod.rs +++ b/nexus/src/app/sagas/mod.rs @@ -15,9 +15,11 @@ use std::sync::Arc; use steno::new_action_noop_undo; use steno::ActionContext; use steno::ActionError; +use steno::DagBuilder; +use steno::SagaDag; +use steno::SagaName; use steno::SagaType; use thiserror::Error; -use tokio::sync::mpsc; use uuid::Uuid; pub mod disk_create; @@ -70,6 +72,17 @@ pub(crate) trait NexusSaga { params: &Self::Params, builder: steno::DagBuilder, ) -> Result; + + fn prepare( + params: &Self::Params, + ) -> Result { + let builder = DagBuilder::new(SagaName::new(Self::NAME)); + let dag = Self::make_saga_dag(¶ms, builder)?; + let params = serde_json::to_value(¶ms).map_err(|e| { + SagaInitError::SerializeError(format!("saga params: {params:?}"), e) + })?; + Ok(SagaDag::new(dag, params)) + } } #[derive(Debug, Error)] @@ -318,35 +331,3 @@ pub(crate) use __action_name; pub(crate) use __emit_action; pub(crate) use __stringify_ident; pub(crate) use declare_saga_actions; - -/// Reliable persistent workflows can request that sagas be run as part of their -/// activation by sending a SagaRequest through a supplied channel to Nexus. -pub enum SagaRequest { - #[cfg(test)] - TestOnly, - - RegionReplacementStart { - params: region_replacement_start::Params, - }, - - RegionReplacementDrive { - params: region_replacement_drive::Params, - }, - - RegionReplacementFinish { - params: region_replacement_finish::Params, - }, -} - -impl SagaRequest { - pub fn channel() -> (mpsc::Sender, mpsc::Receiver) - { - // Limit the maximum number of saga requests that background tasks can - // queue for Nexus to run. - // - // Note this value was chosen arbitrarily! - const MAX_QUEUED_SAGA_REQUESTS: usize = 128; - - mpsc::channel(MAX_QUEUED_SAGA_REQUESTS) - } -}