Skip to content

Commit

Permalink
fix flaky test_demo_saga (#6388)
Browse files Browse the repository at this point in the history
  • Loading branch information
davepacheco authored Aug 19, 2024
1 parent 6dd9802 commit a338e8a
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 39 deletions.
4 changes: 4 additions & 0 deletions nexus/src/app/saga.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,10 @@ impl super::Nexus {
// We don't need the handle that runnable_saga.start() returns because
// we're not going to wait for the saga to finish here.
let _ = runnable_saga.start().await?;

let mut demo_sagas = self.demo_sagas()?;
demo_sagas.preregister(demo_saga_id);

Ok(DemoSaga { saga_id, demo_saga_id })
}

Expand Down
154 changes: 115 additions & 39 deletions nexus/src/app/sagas/demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,56 +21,66 @@
use super::NexusActionContext;
use super::{ActionRegistry, NexusSaga, SagaInitError};
use crate::app::sagas::declare_saga_actions;
use anyhow::ensure;
use anyhow::Context;
use omicron_common::api::external::Error;
use omicron_uuid_kinds::DemoSagaUuid;
use serde::Deserialize;
use serde::Serialize;
use slog::info;
use std::collections::BTreeMap;
use std::future::Future;
use std::sync::Arc;
use steno::ActionError;
use tokio::sync::oneshot;
use tokio::sync::Semaphore;

/// Set of demo sagas that have been marked completed
/// Rendezvous point for demo sagas
///
/// Nexus maintains one of these at the top level. Individual demo sagas wait
/// until their id shows up here, then remove it and proceed.
/// This is where:
///
/// - demo sagas wait for a completion message
/// - completion messages are recorded for demo sagas that haven't started
/// waiting yet
///
/// Nexus maintains one of these structures at the top level.
pub struct CompletingDemoSagas {
ids: BTreeMap<DemoSagaUuid, oneshot::Sender<()>>,
sagas: BTreeMap<DemoSagaUuid, Arc<Semaphore>>,
}

impl CompletingDemoSagas {
pub fn new() -> CompletingDemoSagas {
CompletingDemoSagas { ids: BTreeMap::new() }
CompletingDemoSagas { sagas: BTreeMap::new() }
}

pub fn complete(&mut self, id: DemoSagaUuid) -> Result<(), Error> {
self.ids
.remove(&id)
.ok_or_else(|| {
Error::non_resourcetype_not_found(format!(
"demo saga with id {:?}",
id
))
})?
.send(())
.map_err(|_| {
Error::internal_error(
"saga stopped listening (Nexus shutting down?)",
)
})
/// Records that the demo saga `id` exists, so that a completion request
/// for it can be accepted before the saga itself starts waiting.
///
/// This call is idempotent: if an entry for `id` is already present, it is
/// left untouched.
pub fn preregister(&mut self, id: DemoSagaUuid) {
    // `subscribe()` creates the entry on demand, so a saga that is already
    // running may have registered itself before we get here.  Inserting
    // unconditionally (and asserting the slot was empty) would panic in
    // that case and would also replace the semaphore the waiter already
    // holds, leaving it blocked forever.  Keep whatever entry exists.
    self.sagas.entry(id).or_insert_with(|| Arc::new(Semaphore::new(0)));
}

pub fn subscribe(
&mut self,
id: DemoSagaUuid,
) -> Result<oneshot::Receiver<()>, anyhow::Error> {
let (tx, rx) = oneshot::channel();
ensure!(
self.ids.insert(id, tx).is_none(),
"multiple subscriptions for the same demo saga"
);
Ok(rx)
) -> impl Future<Output = Result<(), anyhow::Error>> {
let sem =
self.sagas.entry(id).or_insert_with(|| Arc::new(Semaphore::new(0)));
let sem_clone = sem.clone();
async move {
sem_clone
.acquire()
.await
// We don't need the Semaphore permit once we've acquired it.
.map(|_| ())
.context("acquiring demo saga semaphore")
}
}

pub fn complete(&mut self, id: DemoSagaUuid) -> Result<(), Error> {
let sem = self.sagas.get_mut(&id).ok_or_else(|| {
Error::non_resourcetype_not_found(format!(
"demo saga with demo saga id {:?}",
id
))
})?;
sem.add_permits(1);
Ok(())
}
}

Expand Down Expand Up @@ -115,21 +125,87 @@ async fn demo_wait(sagactx: NexusActionContext) -> Result<(), ActionError> {
.nexus()
.demo_sagas()
.map_err(ActionError::action_failed)?;
demo_sagas.subscribe(demo_id).map_err(|e| {
ActionError::action_failed(Error::internal_error(&format!(
"demo saga subscribe failed: {:#}",
e
)))
})?
demo_sagas.subscribe(demo_id)
};
match rx.await {
Ok(_) => {
info!(log, "demo saga: completing"; "id" => %demo_id);
Ok(())
}
Err(_) => {
info!(log, "demo saga: waiting failed (Nexus shutting down?)";
"id" => %demo_id);
Err(error) => {
warn!(log, "demo saga: waiting failed (Nexus shutting down?)";
"id" => %demo_id,
"error" => #?error,
);
Err(ActionError::action_failed(Error::internal_error(&format!(
"demo saga wait failed: {:#}",
error
))))
}
}
Ok(())
}

#[cfg(test)]
mod test {
    use super::*;
    use assert_matches::assert_matches;

    /// Walks through each ordering of preregister / subscribe / complete
    /// that the rendezvous structure is expected to handle, plus the one
    /// ordering that must be rejected.
    #[tokio::test]
    async fn test_demo_saga_rendezvous() {
        let mut rendezvous = CompletingDemoSagas::new();

        // Scenario 1, the common path:
        //   1. the demo saga is created (preregister)
        //   2. the saga starts and begins waiting (subscribe)
        //   3. the completion request arrives (complete)
        {
            let demo_saga_id = DemoSagaUuid::new_v4();
            println!("demo saga: {demo_saga_id}");
            rendezvous.preregister(demo_saga_id);
            println!("demo saga: {demo_saga_id} preregistered");
            let waiter = rendezvous.subscribe(demo_saga_id);
            println!("demo saga: {demo_saga_id} subscribed");
            assert!(rendezvous.complete(demo_saga_id).is_ok());
            println!("demo saga: {demo_saga_id} marked completed");
            waiter.await.unwrap();
            println!("demo saga: {demo_saga_id} done");
        }

        // Scenario 2, completion beats the saga to the rendezvous:
        //   1. the demo saga is created (preregister)
        //   2. the completion request arrives (complete)
        //   3. the saga starts and begins waiting (subscribe)
        //
        // The wait must still finish without error.
        {
            let demo_saga_id = DemoSagaUuid::new_v4();
            println!("demo saga: {demo_saga_id}");
            rendezvous.preregister(demo_saga_id);
            println!("demo saga: {demo_saga_id} preregistered");
            assert!(rendezvous.complete(demo_saga_id).is_ok());
            println!("demo saga: {demo_saga_id} marked completed");
            let waiter = rendezvous.subscribe(demo_saga_id);
            println!("demo saga: {demo_saga_id} subscribed");
            waiter.await.unwrap();
            println!("demo saga: {demo_saga_id} done");
        }

        // Scenario 3, no preregistration at all (as for a recovered saga).
        // This works only when the completion request shows up after the
        // saga has started waiting.
        {
            let demo_saga_id = DemoSagaUuid::new_v4();
            println!("demo saga: {demo_saga_id}");
            let waiter = rendezvous.subscribe(demo_saga_id);
            println!("demo saga: {demo_saga_id} subscribed");
            assert!(rendezvous.complete(demo_saga_id).is_ok());
            println!("demo saga: {demo_saga_id} marked completed");
            waiter.await.unwrap();
            println!("demo saga: {demo_saga_id} done");
        }

        // Scenario 4, a completion request for an id that was neither
        // preregistered nor subscribed must be rejected as not found.
        {
            let demo_saga_id = DemoSagaUuid::new_v4();
            println!("demo saga: {demo_saga_id}");
            let not_found = rendezvous.complete(demo_saga_id).unwrap_err();
            assert_matches!(not_found, Error::NotFound { .. });
            println!(
                "demo saga: {demo_saga_id} complete error: {:#}",
                not_found
            );
        }
    }
}

0 comments on commit a338e8a

Please sign in to comment.