diff --git a/nexus/src/app/background/tasks/saga_recovery/mod.rs b/nexus/src/app/background/tasks/saga_recovery/mod.rs index c2da95a216..a608dd1dc6 100644 --- a/nexus/src/app/background/tasks/saga_recovery/mod.rs +++ b/nexus/src/app/background/tasks/saga_recovery/mod.rs @@ -137,102 +137,11 @@ mod test { use omicron_common::api::external::Error; use omicron_test_utils::dev::test_setup_log; use std::collections::BTreeMap; + use std::collections::BTreeSet; use steno::SagaId; use tokio::sync::mpsc; use uuid::Uuid; - // Test the following structures used together: - // - // - RestState - // - Plan - // - ExecutionSummary - // - Report - // - // The first three of these are used in a loop together and their exposed - // functionality is largely in terms of each other. That makes them a - // little harder to test in isolation, but we can test them pretty - // well together. - #[tokio::test] - async fn test_basic() { - let logctx = test_setup_log("saga_recovery_basic"); - let log = &logctx.log; - - // Start with a blank slate. - let mut report = status::Report::new(); - let mut rest_state = recovery::RestState::new(); - let initial_rest_state = rest_state.clone(); - - // Now, go through a no-op recovery. - let (plan, summary, last_pass_result) = do_recovery_pass( - log, - &mut rest_state, - Vec::new(), - Vec::new(), - &|_| Ok(()), - ); - assert_eq!(last_pass_result.nfound, 0); - assert_eq!(last_pass_result.nskipped, 0); - assert_eq!(last_pass_result.nremoved, 0); - assert_eq!(rest_state, initial_rest_state); - report.update_after_pass(&plan, summary); - - // Great. 
Now go through a somewhat general case of recovery: - // - start two sagas normally (i.e., as though they had been started - // elsewhere in Nexus) - // - one that also appears in the database - // (this is the case where Nexus listed sagas after this new saga - // started) - // - one that does not appear in the database - // (this is the case where Nexus listed sagas before this new saga - // started) - // - at the same time, create two sagas that need to be recovered: - // - one where recovery succeeds - // - one where recovery fails - let sagas_started = make_saga_ids(2); - let sagas_to_recover = make_saga_ids(2); - let db_sagas = { - let mut db_sagas = sagas_to_recover.clone(); - db_sagas.push(sagas_started[0]); - db_sagas - }; - let ndb_sagas = db_sagas.len(); - let (plan, summary, last_pass_result) = do_recovery_pass( - log, - &mut rest_state, - sagas_started, - db_sagas, - &|s| { - if s == sagas_to_recover[1] { - Ok(()) - } else { - Err(Error::internal_error("test error")) - } - }, - ); - assert_eq!(ndb_sagas, last_pass_result.nfound); - assert_eq!(1, last_pass_result.nskipped); - assert_eq!(0, last_pass_result.nremoved); - assert_eq!(1, last_pass_result.nrecovered); - assert_eq!(1, last_pass_result.nfailed); - assert_eq!( - vec![sagas_to_recover[1]], - summary.sagas_recovered_successfully().collect::>() - ); - report.update_after_pass(&plan, summary); - assert_eq!(report.recent_recoveries.ring.len(), 1); - assert_eq!( - report.recent_recoveries.ring[0].saga_id, - sagas_to_recover[1] - ); - assert_eq!(report.recent_failures.ring.len(), 1); - assert_eq!(report.recent_failures.ring[0].saga_id, sagas_to_recover[0]); - - // XXX-dap working on this test -- see comment in the other file for - // some cases to cover, and also think through it from first principles. - // e.g., another pass should exercise cases where an ambiguous saga - // actually does finish and one where it doesn't? 
- } - const FAKE_SEC_ID: &str = "03082281-fb2e-4bfd-bce3-997c89a0db2d"; pub fn make_fake_saga(saga_id: SagaId) -> nexus_db_model::Saga { let sec_id = @@ -259,76 +168,462 @@ mod test { rv } - /// Simulates one recovery pass - fn do_recovery_pass( - log: &slog::Logger, - rest_state: &mut recovery::RestState, - new_sagas_started: Vec, - sagas_found: Vec, - recovery_result: &dyn Fn(SagaId) -> Result<(), Error>, - ) -> (recovery::Plan, recovery::ExecutionSummary, status::LastPassSuccess) - { - // Simulate processing messages that the `new_sagas_started` sagas just - // started. - let (tx, mut rx) = mpsc::unbounded_channel(); - for saga_id in new_sagas_started { - tx.send(saga_id).unwrap(); + /// Simple simulator for saga recovery state + /// + /// This type keeps track of simulated database state and enough actual + /// in-memory state to make it easy to test various scenarios. + struct Simulator { + log: slog::Logger, + rest_state: recovery::RestState, + started_sagas: Vec, + db_list: BTreeMap, + snapshot_db_list: Option>, + injected_recovery_errors: BTreeSet, + } + + impl Simulator { + pub fn new(log: slog::Logger) -> Simulator { + Simulator { + log, + rest_state: recovery::RestState::new(), + started_sagas: Vec::new(), + db_list: BTreeMap::new(), + snapshot_db_list: None, + injected_recovery_errors: BTreeSet::new(), + } } - rest_state.update_started_sagas(log, &mut rx); - - // Generate fake database records for the sagas that the caller wants to - // pretend show up as running in the database. - let expected_nfound = sagas_found.len(); - let fake_sagas_found: BTreeMap<_, _> = sagas_found - .into_iter() - .map(|saga_id| { - let saga = make_fake_saga(saga_id); - (saga_id, saga) - }) - .collect(); - - // Start the recovery pass by planning what to do. - let plan = recovery::Plan::new(log, &rest_state, fake_sagas_found); - - // Simulate execution using the callback to determine whether recovery - // for each saga succeeds or not. 
- // - // There are a lot of ways we could interleave execution here. But in - // practice, the implementation we care about does these all serially. - // So that's what we test here. - let mut summary_builder = recovery::ExecutionSummaryBuilder::new(); - let mut nok = 0; - let mut nerrors = 0; - for (saga_id, saga) in plan.sagas_needing_recovery() { - let saga_log = log.new(o!( - "saga_name" => saga.name.clone(), - "saga_id" => saga_id.to_string(), - )); - - summary_builder.saga_recovery_start(*saga_id, saga_log); - match recovery_result(*saga_id) { - Ok(()) => { + + /// Pretend that a particular saga was running in a previous Nexus + /// lifetime (and so needs to be recovered). + pub fn sim_previously_running_saga(&mut self) -> SagaId { + let saga_id = SagaId(Uuid::new_v4()); + self.db_list.insert(saga_id, make_fake_saga(saga_id)); + saga_id + } + + /// Pretend that Nexus started a new saga (e.g., in response to an API + /// request) + pub fn sim_normal_saga_start(&mut self) -> SagaId { + let saga_id = SagaId(Uuid::new_v4()); + self.db_list.insert(saga_id, make_fake_saga(saga_id)); + self.started_sagas.push(saga_id); + saga_id + } + + /// Pretend that Nexus finished running the given saga + pub fn sim_normal_saga_done(&mut self, saga_id: SagaId) { + assert!( + self.db_list.remove(&saga_id).is_some(), + "simulated saga finished, but it wasn't running" + ); + } + + /// Configure simulation so that recovery for the specified saga will + /// succeed or fail, depending on `fail`. This will affect all recovery + /// passes until the function is called again with a different value. + /// + /// If this function is not called for a saga, the default behavior is + /// that recovery succeeds. 
+ pub fn sim_config_recovery_result( + &mut self, + saga_id: SagaId, + fail: bool, + ) { + if fail { + self.injected_recovery_errors.insert(saga_id); + } else { + self.injected_recovery_errors.remove(&saga_id); + } + } + + /// Snapshot the simulated database state and use that state for the + /// next recovery pass. + /// + /// As an example, this can be used to exercise both sides of the race + /// between Nexus starting a saga and listing in-progress sagas. If you + /// want to test "listing in-progress" happens first, use this function + /// to snapshot the database state, then start a saga, and then do a + /// recovery pass. That recovery pass will act on the snapshotted + /// database state. + /// + /// After the next recovery pass, the snapshotted state will be removed. + /// The _next_ recovery pass will use the latest database state unless + /// this function is called again. + pub fn snapshot_db(&mut self) { + assert!( + self.snapshot_db_list.is_none(), + "multiple snapshots created between recovery passes" + ); + self.snapshot_db_list = Some(self.db_list.clone()); + } + + /// Simulate a saga recovery pass + pub fn sim_recovery_pass( + &mut self, + ) -> (recovery::Plan, recovery::ExecutionSummary, status::LastPassSuccess) + { + let log = &self.log; + + // Simulate processing messages that the `started_sagas` sagas + // just started. + let (tx, mut rx) = mpsc::unbounded_channel(); + for saga_id in self.started_sagas.drain(..) { + tx.send(saga_id).unwrap(); + } + self.rest_state.update_started_sagas(log, &mut rx); + + // Start the recovery pass by planning what to do. + let db_sagas = self + .snapshot_db_list + .take() + .unwrap_or_else(|| self.db_list.clone()); + let plan = recovery::Plan::new(log, &self.rest_state, db_sagas); + + // Simulate execution using the injected errors to determine whether + // recovery for each saga succeeds or not. + // + // There are a lot of ways we could interleave execution here. 
But + // in practice, the implementation we care about does these all + // serially. So that's what we test here. + let mut summary_builder = recovery::ExecutionSummaryBuilder::new(); + let mut nok = 0; + let mut nerrors = 0; + for (saga_id, saga) in plan.sagas_needing_recovery() { + let saga_log = log.new(o!( + "saga_name" => saga.name.clone(), + "saga_id" => saga_id.to_string(), + )); + + summary_builder.saga_recovery_start(*saga_id, saga_log); + if self.injected_recovery_errors.contains(saga_id) { + nerrors += 1; + summary_builder.saga_recovery_failure( + *saga_id, + &Error::internal_error("test error"), + ); + } else { nok += 1; summary_builder.saga_recovery_success( *saga_id, futures::future::ready(Ok(())).boxed(), ); } - Err(error) => { - nerrors += 1; - summary_builder.saga_recovery_failure(*saga_id, &error); - } } + + let summary = summary_builder.build(); + let last_pass = status::LastPassSuccess::new(&plan, &summary); + assert_eq!(last_pass.nrecovered, nok); + assert_eq!(last_pass.nfailed, nerrors); + + self.rest_state.update_after_pass(&plan, &summary); + + // We can't tell from the information we have how many were skipped, + // removed, or ambiguous. The caller verifies that. + (plan, summary, last_pass) } + } - let summary = summary_builder.build(); - let last_pass = status::LastPassSuccess::new(&plan, &summary); - assert_eq!(last_pass.nfound, expected_nfound); - assert_eq!(last_pass.nrecovered, nok); - assert_eq!(last_pass.nfailed, nerrors); + // End-to-end test of the saga recovery bookkeeping -- everything *except* + // loading the sagas from the database and restoring them in Steno. + // + // Tests the following structures used together: + // + // - RestState + // - Plan + // - ExecutionSummary + // + // These are hard to test in isolation since they're intended to be used + // together in a loop (and so don't export public interfaces for mucking + // with internals). 
+ #[tokio::test] + async fn test_basic() { + let logctx = test_setup_log("saga_recovery_basic"); + let log = &logctx.log; + + // Start with a blank slate. + let mut sim = Simulator::new(log.clone()); + let initial_rest_state = sim.rest_state.clone(); + let mut report = status::Report::new(); + + // + // Now, go through a no-op recovery. + // + let (plan, summary, last_pass_result) = sim.sim_recovery_pass(); + assert_eq!(last_pass_result.nfound, 0); + assert_eq!(last_pass_result.nskipped, 0); + assert_eq!(last_pass_result.nremoved, 0); + assert_eq!(sim.rest_state, initial_rest_state); + report.update_after_pass(&plan, summary); + + // + // Now, go through a somewhat general case of recovery. + // + // First, add a couple of sagas that just showed up in the database. + // This covers the case of sagas that were either from a previous Nexus + // lifetime or re-assigned from some other Nexus that has been expunged. + // We create two so we can exercise success and failure cases for + // recovery. + // + let saga_recover_ok = sim.sim_previously_running_saga(); + let saga_recover_fail = sim.sim_previously_running_saga(); + sim.sim_config_recovery_result(saga_recover_fail, true); + + // Simulate Nexus starting a couple of sagas in the usual way. These + // will appear in the database as well as in our set of sagas started. + let saga_started_normally_1 = sim.sim_normal_saga_start(); + let saga_started_normally_2 = sim.sim_normal_saga_start(); + + // Start a saga and then finish it immediately. This is a tricky case + // because the recovery pass will see that it started, but not see it in + // the database, and it won't be able to tell if it finished or just + // started. + let saga_started_and_finished = sim.sim_normal_saga_start(); + sim.sim_normal_saga_done(saga_started_and_finished); + + // Take a snapshot. Subsequent changes will not affect the database + // state that's used for the next recovery pass. 
We'll use this to + // simulate Nexus having started a saga immediately after the database + // listing that's used for a recovery pass. + sim.snapshot_db(); + let saga_started_after_listing = sim.sim_normal_saga_start(); + + // We're finally ready to carry out a simulation pass and verify what + // happened with each of these sagas. + let (plan, summary, last_pass_success) = sim.sim_recovery_pass(); + // In the end, there should have been four sagas found in the database: + // all of the above except for the one that finished. + assert_eq!(4, last_pass_success.nfound); + // Two of these needed to be recovered (because they had been previously + // running). One succeeded. + assert_eq!(1, last_pass_success.nrecovered); + assert_eq!(1, summary.succeeded.len()); + assert_eq!(saga_recover_ok, summary.succeeded[0].saga_id); + + assert_eq!(1, last_pass_success.nfailed); + assert_eq!(1, summary.failed.len()); + assert_eq!(saga_recover_fail, summary.failed[0].saga_id); + // Two sagas should have been found in the database that corresponded to + // sagas that had been started normally and did not need to be + // recovered. They would have been skipped. + assert_eq!(2, last_pass_success.nskipped); + assert_eq!(2, plan.nskipped()); + // No sagas were removed yet -- we can't do that with only one pass. + assert_eq!(0, last_pass_success.nremoved); + assert_eq!(0, plan.ninferred_done()); + // From what the pass could tell, two sagas might be done: the one that + // actually finished and the one that started after the database + // listing. + let mut maybe_done = plan.sagas_maybe_done().collect::>(); + maybe_done.sort(); + let mut expected_maybe_done = + vec![saga_started_and_finished, saga_started_after_listing]; + expected_maybe_done.sort(); + assert_eq!(maybe_done, expected_maybe_done); + report.update_after_pass(&plan, summary); + + // + // Change nothing and run another pass. + // This pass allows the system to determine that some sagas are now + // done. 
+ // + let (plan, summary, last_pass_success) = sim.sim_recovery_pass(); + // There are now five sagas in-progress in the database: the same four + // as above, plus the one that was started after the snapshot. + assert_eq!(5, last_pass_success.nfound); + // One of these needs to be recovered because it failed last time. It + // fails again this time. + assert_eq!(0, last_pass_success.nrecovered); + assert_eq!(0, summary.succeeded.len()); + assert_eq!(1, last_pass_success.nfailed); + assert_eq!(1, summary.failed.len()); + assert_eq!(saga_recover_fail, summary.failed[0].saga_id); + // This time, four sagas should have been found in the database that + // correspond to sagas that were started normally and did not need to be + // recovered: the two from last time, plus the one that was recovered, + // plus the one that was started after the previous snapshot. These + // would have been skipped. + assert_eq!(4, last_pass_success.nskipped); + assert_eq!(4, plan.nskipped()); + // This time, the saga that was actually finished should have been + // removed. We could tell this time. + assert_eq!(1, last_pass_success.nremoved); + assert_eq!( + vec![saga_started_and_finished], + plan.sagas_inferred_done().collect::>() + ); + // This time, there are no sagas that might be done. The one we thought + // might have been done last time is now clearly running because it + // appears in this database listing. + assert_eq!(0, plan.sagas_maybe_done().count()); + report.update_after_pass(&plan, summary); + + // + // Again, change nothing and run another pass. This should be a steady + // state: if we keep running passes from here, nothing should change. + // + let (plan, summary, last_pass_success) = sim.sim_recovery_pass(); + // Same as above. 
+ assert_eq!(5, last_pass_success.nfound); + assert_eq!(0, last_pass_success.nrecovered); + assert_eq!(0, summary.succeeded.len()); + assert_eq!(1, last_pass_success.nfailed); + assert_eq!(1, summary.failed.len()); + assert_eq!(saga_recover_fail, summary.failed[0].saga_id); + assert_eq!(4, last_pass_success.nskipped); + assert_eq!(4, plan.nskipped()); + assert_eq!(0, plan.sagas_maybe_done().count()); + // Here's the only thing that differs from last time. We removed a saga + // before (once we could tell it was done), so this time there's + // nothing to remove. + assert_eq!(0, last_pass_success.nremoved); + assert_eq!(0, plan.sagas_inferred_done().count()); + report.update_after_pass(&plan, summary); - // We can't tell from the information we have how many were skipped, - // removed, or ambiguous. The caller verifies that. - (plan, summary, last_pass) + // + // Once more and make sure nothing changes. + // + let previous_rest_state = sim.rest_state.clone(); + let previous_last_pass_success = last_pass_success.clone(); + let (plan, summary, last_pass_success) = sim.sim_recovery_pass(); + assert_eq!(previous_rest_state, sim.rest_state); + assert_eq!(previous_last_pass_success, last_pass_success); + report.update_after_pass(&plan, summary); + + // + // This time, fix that saga whose recovery has been failing. + // + sim.sim_config_recovery_result(saga_recover_fail, false); + let (plan, summary, last_pass_success) = sim.sim_recovery_pass(); + // Same as above. + assert_eq!(5, last_pass_success.nfound); + assert_eq!(4, last_pass_success.nskipped); + assert_eq!(4, plan.nskipped()); + assert_eq!(0, last_pass_success.nremoved); + assert_eq!(0, plan.sagas_inferred_done().count()); + assert_eq!(0, plan.sagas_maybe_done().count()); + // Here's what's different from before. 
+ assert_eq!(1, last_pass_success.nrecovered); + assert_eq!(1, summary.succeeded.len()); + assert_eq!(saga_recover_fail, summary.succeeded[0].saga_id); + assert_eq!(0, last_pass_success.nfailed); + assert_eq!(0, summary.failed.len()); + report.update_after_pass(&plan, summary); + + // + // After the next pass, we should have one more saga that seems to be + // running. + // + let (plan, summary, last_pass_success) = sim.sim_recovery_pass(); + // Same as above. + assert_eq!(5, last_pass_success.nfound); + assert_eq!(0, last_pass_success.nremoved); + assert_eq!(0, plan.sagas_inferred_done().count()); + assert_eq!(0, plan.sagas_maybe_done().count()); + assert_eq!(0, last_pass_success.nfailed); + assert_eq!(0, summary.failed.len()); + // Here's what's different from before. + assert_eq!(5, last_pass_success.nskipped); + assert_eq!(5, plan.nskipped()); + assert_eq!(0, last_pass_success.nrecovered); + assert_eq!(0, summary.succeeded.len()); + report.update_after_pass(&plan, summary); + + // + // With another pass, nothing should differ. + // + let previous_rest_state = sim.rest_state.clone(); + let previous_last_pass_success = last_pass_success.clone(); + let (plan, summary, last_pass_success) = sim.sim_recovery_pass(); + assert_eq!(previous_rest_state, sim.rest_state); + assert_eq!(previous_last_pass_success, last_pass_success); + report.update_after_pass(&plan, summary); + + // + // Now let's complete a couple of different sagas. + // It'll take two passes for the system to be sure they're done. 
+ // + sim.sim_normal_saga_done(saga_started_normally_1); + sim.sim_normal_saga_done(saga_started_after_listing); + sim.sim_normal_saga_done(saga_recover_fail); + let (plan, summary, last_pass_success) = sim.sim_recovery_pass(); + assert_eq!(2, last_pass_success.nfound); + assert_eq!(0, last_pass_success.nremoved); + assert_eq!(0, plan.sagas_inferred_done().count()); + assert_eq!(3, plan.sagas_maybe_done().count()); + assert_eq!(0, last_pass_success.nfailed); + assert_eq!(0, summary.failed.len()); + assert_eq!(2, last_pass_success.nskipped); + assert_eq!(2, plan.nskipped()); + assert_eq!(0, last_pass_success.nrecovered); + assert_eq!(0, summary.succeeded.len()); + report.update_after_pass(&plan, summary); + + // + // With another pass, we can remove those three that finished. + // + let (plan, summary, last_pass_success) = sim.sim_recovery_pass(); + assert_eq!(2, last_pass_success.nfound); + assert_eq!(3, last_pass_success.nremoved); + assert_eq!(3, plan.sagas_inferred_done().count()); + assert_eq!(0, plan.sagas_maybe_done().count()); + assert_eq!(0, last_pass_success.nfailed); + assert_eq!(0, summary.failed.len()); + assert_eq!(2, last_pass_success.nskipped); + assert_eq!(2, plan.nskipped()); + assert_eq!(0, last_pass_success.nrecovered); + assert_eq!(0, summary.succeeded.len()); + report.update_after_pass(&plan, summary); + + // + // Finish the last two sagas. 
+ // + sim.sim_normal_saga_done(saga_started_normally_2); + sim.sim_normal_saga_done(saga_recover_ok); + let (plan, summary, last_pass_success) = sim.sim_recovery_pass(); + assert_eq!(0, last_pass_success.nfound); + assert_eq!(0, last_pass_success.nremoved); + assert_eq!(0, plan.sagas_inferred_done().count()); + assert_eq!(2, plan.sagas_maybe_done().count()); + assert_eq!(0, last_pass_success.nfailed); + assert_eq!(0, summary.failed.len()); + assert_eq!(0, last_pass_success.nskipped); + assert_eq!(0, plan.nskipped()); + assert_eq!(0, last_pass_success.nrecovered); + assert_eq!(0, summary.succeeded.len()); + report.update_after_pass(&plan, summary); + + // + // With another pass, remove those last two. + // + let (plan, summary, last_pass_success) = sim.sim_recovery_pass(); + assert_eq!(0, last_pass_success.nfound); + assert_eq!(2, last_pass_success.nremoved); + assert_eq!(2, plan.sagas_inferred_done().count()); + assert_eq!(0, plan.sagas_maybe_done().count()); + assert_eq!(0, last_pass_success.nfailed); + assert_eq!(0, summary.failed.len()); + assert_eq!(0, last_pass_success.nskipped); + assert_eq!(0, plan.nskipped()); + assert_eq!(0, last_pass_success.nrecovered); + assert_eq!(0, summary.succeeded.len()); + report.update_after_pass(&plan, summary); + + // At this point, the rest state should match our initial rest state. + // This is an extra check to make sure we're not leaking memory related + // to old sagas. 
+ assert_eq!(sim.rest_state, initial_rest_state); + + // + // At this point, we've exercised: + // - recovering a saga that we didn't start + // (basic "recovery" path after a crash, plus re-assignment path) + // - retrying a saga whose recovery failed (multiple times) + // - *not* trying to recover: + // - a newly-started saga + // - a saga that was recovered before + // - not hanging on forever to sagas that have finished + // - the edge case built into our implementation where we learned that a + // saga was started before it appeared in the database + // } } diff --git a/nexus/src/app/background/tasks/saga_recovery/recovery.rs b/nexus/src/app/background/tasks/saga_recovery/recovery.rs index a15a33e27d..19a31a4b57 100644 --- a/nexus/src/app/background/tasks/saga_recovery/recovery.rs +++ b/nexus/src/app/background/tasks/saga_recovery/recovery.rs @@ -23,7 +23,7 @@ use tokio::sync::mpsc; #[derive(Debug, Clone, Eq, PartialEq)] pub struct RestState { sagas_started: BTreeMap, - remove_next: Vec, + remove_next: BTreeSet, } #[derive(Debug, Clone, Eq, PartialEq)] @@ -41,7 +41,10 @@ enum SagaStartSource { impl RestState { pub fn new() -> RestState { - RestState { sagas_started: BTreeMap::new(), remove_next: Vec::new() } + RestState { + sagas_started: BTreeMap::new(), + remove_next: BTreeSet::new(), + } } /// Read messages from the channel (signaling sagas that have started @@ -102,6 +105,11 @@ impl RestState { self.remove_next = plan.sagas_maybe_done().collect(); } + + #[cfg(test)] + pub fn sagas_started(&self) -> Vec { + self.sagas_started.keys().copied().collect() + } } /// Read all message that are currently available on the given channel (without @@ -221,11 +229,15 @@ impl Plan { for running_saga_id in sagas_started.keys() { match running_sagas_found.remove(running_saga_id) { None => { - // The saga is in the ignore set, but not the database list - // of running sagas. It's possible that the saga has simply - // finished. 
And if the saga is definitely not running any - // more, then we can remove it from the ignore set. This is - // important to keep that set from growing without bound. + // If this saga is in `previously_maybe_done`, then we + // processed it above already. We know it's done. + // + // Otherwise, the saga is in the ignore set, but not the + // database list of running sagas. It's possible that the + // saga has simply finished. And if the saga is definitely + // not running any more, then we can remove it from the + // ignore set. This is important to keep that set from + // growing without bound. // // But it's also possible that the saga started immediately // after the database query's snapshot, in which case we @@ -236,7 +248,9 @@ impl Plan { // must have finished. Rather than do that now, we'll just // keep track of this list and take care of it in the next // activation. - builder.saga_maybe_done(*running_saga_id) + if !previously_maybe_done.contains(running_saga_id) { + builder.saga_maybe_done(*running_saga_id) + } } Some(_found_saga) => { @@ -524,14 +538,6 @@ impl ExecutionSummaryBuilder { #[cfg(test)] mod test { - // XXX-dap test plan: - // RestState: - // - update_started_sagas(): - // - normal update (x2) - // - empty channel - // - disconnected channel - // - update_after_pass(): - // - come back to this // XXX-dap write a stress test that furiously performs saga recovery a lot // of times and ensures that each saga is recovered exactly once use super::*; diff --git a/nexus/src/app/background/tasks/saga_recovery/status.rs b/nexus/src/app/background/tasks/saga_recovery/status.rs index e576bf30eb..43d5cabf57 100644 --- a/nexus/src/app/background/tasks/saga_recovery/status.rs +++ b/nexus/src/app/background/tasks/saga_recovery/status.rs @@ -110,8 +110,7 @@ impl LastPassSuccess { #[derive(Clone, Debug, Eq, PartialEq, Serialize)] pub struct DebuggingHistory { size: usize, - // XXX-dap should not be pub - pub ring: VecDeque, + ring: VecDeque, } impl 
DebuggingHistory {