Skip to content

Commit

Permalink
earlier change was overzealous
Browse files Browse the repository at this point in the history
  • Loading branch information
davepacheco committed Jul 10, 2024
1 parent 7aec9da commit 84a4ae3
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 14 deletions.
20 changes: 8 additions & 12 deletions nexus/src/app/background/tasks/saga_recovery/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use tokio::sync::mpsc;
/// multiple passes
pub struct RestState {
sagas_started: BTreeMap<steno::SagaId, SagaStartInfo>,
sagas_started_rx: mpsc::UnboundedReceiver<steno::SagaId>,
remove_next: Vec<steno::SagaId>,
}

Expand All @@ -38,21 +37,18 @@ enum SagaStartSource {
}

impl RestState {
pub fn new(
sagas_started_rx: mpsc::UnboundedReceiver<steno::SagaId>,
) -> RestState {
RestState {
sagas_started: BTreeMap::new(),
sagas_started_rx,
remove_next: Vec::new(),
}
pub fn new() -> RestState {
RestState { sagas_started: BTreeMap::new(), remove_next: Vec::new() }
}

/// Read messages from the channel (signaling sagas that have started
/// running) and update our set of sagas that we believe to be running.
pub fn update_started_sagas(&mut self, log: &slog::Logger) {
let (new_sagas, disconnected) =
read_all_from_channel(&mut self.sagas_started_rx);
pub fn update_started_sagas(
&mut self,
log: &slog::Logger,
sagas_started_rx: &mut mpsc::UnboundedReceiver<steno::SagaId>,
) {
let (new_sagas, disconnected) = read_all_from_channel(sagas_started_rx);
if disconnected {
warn!(
log,
Expand Down
7 changes: 5 additions & 2 deletions nexus/src/app/background/tasks/saga_recovery/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ pub struct SagaRecovery {
nexus: Arc<Nexus>,
sec_client: Arc<steno::SecClient>,
registry: Arc<ActionRegistry>,
sagas_started_rx: mpsc::UnboundedReceiver<steno::SagaId>,

/// recovery state persisted between passes
rest_state: recovery::RestState,
Expand All @@ -193,7 +194,8 @@ impl SagaRecovery {
nexus,
sec_client: sec,
registry,
rest_state: recovery::RestState::new(sagas_started_rx),
sagas_started_rx,
rest_state: recovery::RestState::new(),
status: status::Report::new(),
}
}
Expand Down Expand Up @@ -325,7 +327,8 @@ impl BackgroundTask for SagaRecovery {
// was immediately created and showed up in our candidate list, we'd
// erroneously conclude that it needed to be recovered when in fact
// it was already running.
self.rest_state.update_started_sagas(log);
self.rest_state
.update_started_sagas(log, &mut self.sagas_started_rx);

match result {
Ok(db_sagas) => {
Expand Down

0 comments on commit 84a4ae3

Please sign in to comment.