Skip to content

Commit

Permalink
moar debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
davepacheco committed Jul 10, 2024
1 parent cd41c9e commit 7aec9da
Showing 1 changed file with 39 additions and 8 deletions.
47 changes: 39 additions & 8 deletions nexus/src/app/background/tasks/saga_recovery/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use super::status::LastPassSuccess;
use super::status::RecoveryFailure;
use super::status::RecoverySuccess;
use chrono::Utc;
use chrono::{DateTime, Utc};
use futures::future::BoxFuture;
#[cfg(test)]
use futures::FutureExt;
Expand All @@ -21,18 +21,28 @@ use tokio::sync::mpsc;
/// Describes state related to saga recovery that needs to be maintained across
/// multiple passes
pub struct RestState {
// XXX-dap could be BTreeMap of some object saying created/resumed and when
sagas_started: BTreeSet<steno::SagaId>,
sagas_started: BTreeMap<steno::SagaId, SagaStartInfo>,
sagas_started_rx: mpsc::UnboundedReceiver<steno::SagaId>,
remove_next: Vec<steno::SagaId>,
}

#[allow(dead_code)]
struct SagaStartInfo {
time_observed: DateTime<Utc>,
source: SagaStartSource,
}

enum SagaStartSource {
StartChannel,
Recovered,
}

impl RestState {
pub fn new(
sagas_started_rx: mpsc::UnboundedReceiver<steno::SagaId>,
) -> RestState {
RestState {
sagas_started: BTreeSet::new(),
sagas_started: BTreeMap::new(),
sagas_started_rx,
remove_next: Vec::new(),
}
Expand All @@ -50,9 +60,19 @@ impl RestState {
);
}

let time_observed = Utc::now();
for saga_id in new_sagas {
info!(log, "observed saga start"; "saga_id" => ?saga_id);
assert!(self.sagas_started.insert(saga_id));
assert!(self
.sagas_started
.insert(
saga_id,
SagaStartInfo {
time_observed,
source: SagaStartSource::StartChannel,
}
)
.is_none());
}
}

Expand All @@ -62,12 +82,23 @@ impl RestState {
plan: &Plan,
execution: &ExecutionSummary,
) {
let time_observed = Utc::now();

for saga_id in plan.sagas_inferred_done() {
assert!(self.sagas_started.remove(&saga_id));
assert!(self.sagas_started.remove(&saga_id).is_some());
}

for saga_id in execution.sagas_recovered_successfully() {
assert!(self.sagas_started.insert(saga_id));
assert!(self
.sagas_started
.insert(
saga_id,
SagaStartInfo {
time_observed,
source: SagaStartSource::Recovered,
}
)
.is_none());
}

self.remove_next = plan.sagas_maybe_done().collect();
Expand Down Expand Up @@ -189,7 +220,7 @@ impl<'a> Plan {
// created up to the beginning of the database query. Since we now have
// the list of sagas that were not-finished in the database, we can
// compare these two sets.
for running_saga_id in sagas_started.iter() {
for running_saga_id in sagas_started.keys() {
match running_sagas_found.remove(running_saga_id) {
None => {
// The saga is in the ignore set, but not the database list
Expand Down

0 comments on commit 7aec9da

Please sign in to comment.