diff --git a/nexus/src/app/saga.rs b/nexus/src/app/saga.rs index 5bc69946ad..975df7fc3b 100644 --- a/nexus/src/app/saga.rs +++ b/nexus/src/app/saga.rs @@ -469,6 +469,10 @@ impl super::Nexus { // We don't need the handle that runnable_saga.start() returns because // we're not going to wait for the saga to finish here. let _ = runnable_saga.start().await?; + + let mut demo_sagas = self.demo_sagas()?; + demo_sagas.preregister(demo_saga_id); + Ok(DemoSaga { saga_id, demo_saga_id }) } diff --git a/nexus/src/app/sagas/demo.rs b/nexus/src/app/sagas/demo.rs index 4a8eda8b80..d76a48688d 100644 --- a/nexus/src/app/sagas/demo.rs +++ b/nexus/src/app/sagas/demo.rs @@ -21,56 +21,66 @@ use super::NexusActionContext; use super::{ActionRegistry, NexusSaga, SagaInitError}; use crate::app::sagas::declare_saga_actions; -use anyhow::ensure; +use anyhow::Context; use omicron_common::api::external::Error; use omicron_uuid_kinds::DemoSagaUuid; use serde::Deserialize; use serde::Serialize; use slog::info; use std::collections::BTreeMap; +use std::future::Future; +use std::sync::Arc; use steno::ActionError; -use tokio::sync::oneshot; +use tokio::sync::Semaphore; -/// Set of demo sagas that have been marked completed +/// Rendezvous point for demo sagas /// -/// Nexus maintains one of these at the top level. Individual demo sagas wait -/// until their id shows up here, then remove it and proceed. +/// This is where: +/// +/// - demo sagas wait for a completion message +/// - completion messages are recorded for demo sagas that haven't started +/// waiting yet +/// +/// Nexus maintains one of these structures at the top level. pub struct CompletingDemoSagas { - ids: BTreeMap>, + sagas: BTreeMap>, } impl CompletingDemoSagas { pub fn new() -> CompletingDemoSagas { - CompletingDemoSagas { ids: BTreeMap::new() } + CompletingDemoSagas { sagas: BTreeMap::new() } } - pub fn complete(&mut self, id: DemoSagaUuid) -> Result<(), Error> { - self.ids - .remove(&id) - .ok_or_else(|| { - Error::non_resourcetype_not_found(format!( - "demo saga with id {:?}", - id - )) - })? - .send(()) - .map_err(|_| { - Error::internal_error( - "saga stopped listening (Nexus shutting down?)", - ) - }) + pub fn preregister(&mut self, id: DemoSagaUuid) { + assert!(self.sagas.insert(id, Arc::new(Semaphore::new(0))).is_none()); } pub fn subscribe( &mut self, id: DemoSagaUuid, - ) -> Result, anyhow::Error> { - let (tx, rx) = oneshot::channel(); - ensure!( - self.ids.insert(id, tx).is_none(), - "multiple subscriptions for the same demo saga" - ); - Ok(rx) + ) -> impl Future> { + let sem = + self.sagas.entry(id).or_insert_with(|| Arc::new(Semaphore::new(0))); + let sem_clone = sem.clone(); + async move { + sem_clone + .acquire() + .await + // We don't need the Semaphore permit once we've acquired it. + .map(|_| ()) + .context("acquiring demo saga semaphore") + } + } + + pub fn complete(&mut self, id: DemoSagaUuid) -> Result<(), Error> { + let sem = self.sagas.get_mut(&id).ok_or_else(|| { + Error::non_resourcetype_not_found(format!( + "demo saga with demo saga id {:?}", + id + )) + })?; + sem.add_permits(1); + Ok(()) } } @@ -115,21 +125,87 @@ async fn demo_wait(sagactx: NexusActionContext) -> Result<(), ActionError> { .nexus() .demo_sagas() .map_err(ActionError::action_failed)?; - demo_sagas.subscribe(demo_id).map_err(|e| { - ActionError::action_failed(Error::internal_error(&format!( - "demo saga subscribe failed: {:#}", - e - ))) - })? + demo_sagas.subscribe(demo_id) }; match rx.await { Ok(_) => { info!(log, "demo saga: completing"; "id" => %demo_id); + Ok(()) } - Err(_) => { - info!(log, "demo saga: waiting failed (Nexus shutting down?)"; - "id" => %demo_id); + Err(error) => { + warn!(log, "demo saga: waiting failed (Nexus shutting down?)"; + "id" => %demo_id, + "error" => #?error, + ); + Err(ActionError::action_failed(Error::internal_error(&format!( + "demo saga wait failed: {:#}", + error + )))) } } - Ok(()) +} + +#[cfg(test)] +mod test { + use super::*; + use assert_matches::assert_matches; + + #[tokio::test] + async fn test_demo_saga_rendezvous() { + let mut hub = CompletingDemoSagas::new(); + + // The most straightforward sequence is: + // - create (preregister) demo saga + // - demo saga starts and waits for completion (subscribe) + // - complete demo saga + let demo_saga_id = DemoSagaUuid::new_v4(); + println!("demo saga: {demo_saga_id}"); + hub.preregister(demo_saga_id); + println!("demo saga: {demo_saga_id} preregistered"); + let subscribe = hub.subscribe(demo_saga_id); + println!("demo saga: {demo_saga_id} subscribed"); + assert!(hub.complete(demo_saga_id).is_ok()); + println!("demo saga: {demo_saga_id} marked completed"); + subscribe.await.unwrap(); + println!("demo saga: {demo_saga_id} done"); + + // It's also possible that the completion request arrives before the + // saga started waiting. In that case, the sequence is: + // + // - create (preregister) demo saga + // - complete demo saga + // - demo saga starts and waits for completion (subscribe) + // + // This should work, too, with no errors. + let demo_saga_id = DemoSagaUuid::new_v4(); + println!("demo saga: {demo_saga_id}"); + hub.preregister(demo_saga_id); + println!("demo saga: {demo_saga_id} preregistered"); + assert!(hub.complete(demo_saga_id).is_ok()); + println!("demo saga: {demo_saga_id} marked completed"); + let subscribe = hub.subscribe(demo_saga_id); + println!("demo saga: {demo_saga_id} subscribed"); + subscribe.await.unwrap(); + println!("demo saga: {demo_saga_id} done"); + + // It's also possible to have no preregistration at all. This happens + // if the demo saga was recovered. That's fine, too, but then it will + // only work if the completion arrives after the saga starts waiting. + let demo_saga_id = DemoSagaUuid::new_v4(); + println!("demo saga: {demo_saga_id}"); + let subscribe = hub.subscribe(demo_saga_id); + println!("demo saga: {demo_saga_id} subscribed"); + assert!(hub.complete(demo_saga_id).is_ok()); + println!("demo saga: {demo_saga_id} marked completed"); + subscribe.await.unwrap(); + println!("demo saga: {demo_saga_id} done"); + + // If there's no preregistration and we get a completion request, then + // that request should fail. + let demo_saga_id = DemoSagaUuid::new_v4(); + println!("demo saga: {demo_saga_id}"); + let error = hub.complete(demo_saga_id).unwrap_err(); + assert_matches!(error, Error::NotFound { .. }); + println!("demo saga: {demo_saga_id} complete error: {:#}", error); + } }