From 1a78465aac2b41a8e74ea4f76a6c533e7128b9e2 Mon Sep 17 00:00:00 2001 From: David Pacheco Date: Tue, 9 Jul 2024 10:59:13 -0700 Subject: [PATCH] it compiles --- .../src/app/background/tasks/saga_recovery.rs | 82 ++++++++++++------- 1 file changed, 51 insertions(+), 31 deletions(-) diff --git a/nexus/src/app/background/tasks/saga_recovery.rs b/nexus/src/app/background/tasks/saga_recovery.rs index b23c604180..9c78680201 100644 --- a/nexus/src/app/background/tasks/saga_recovery.rs +++ b/nexus/src/app/background/tasks/saga_recovery.rs @@ -277,7 +277,7 @@ impl SagaRecovery { } } - fn activate_finish(&mut self, last_pass: LastPass) { + fn activate_finish(&mut self, last_pass: LastPass) -> serde_json::Value { self.last_pass = last_pass; serde_json::to_value(SagaRecoveryTaskStatus { @@ -289,10 +289,11 @@ impl SagaRecovery { } async fn recovery_execute( + &self, bgtask_log: &slog::Logger, plan: &SagaRecoveryPlan, ) -> SagaExecutionDone { - let builder = SagaExecutionDoneBuilder::new(bgtask_log); + let mut builder = SagaExecutionDoneBuilder::new(bgtask_log); for (saga_id, saga) in &plan.needs_recovery { let saga_log = self.nexus.log.new(o!( @@ -303,13 +304,13 @@ impl SagaRecovery { builder.saga_recovery_start(*saga_id, saga_log.clone()); match self.recover_one_saga(bgtask_log, &saga_log, saga).await { Ok(completion_future) => { - builder.saga_recovery_success(saga_id, completion_future); + builder.saga_recovery_success(*saga_id, completion_future); } Err(error) => { // It's essential that we not bail out early just because we // hit an error here. We want to recover all the sagas that // we can. - builder.saga_recovery_failure(saga_id, &error); + builder.saga_recovery_failure(*saga_id, &error); } } } @@ -390,9 +391,12 @@ impl BackgroundTask for SagaRecovery { // Fetch the list of not-yet-finished sagas that are assigned to // this Nexus instance. - let result = - list_sagas_in_progress(&self.saga_recovery_opctx, &self.sec_id) - .await; + let result = list_sagas_in_progress( + &self.saga_recovery_opctx, + datastore, + self.sec_id, + ) + .await; // Process any newly-created sagas, adding them to our set of sagas // to ignore during recovery. We never want to try to recover a @@ -428,8 +432,8 @@ impl BackgroundTask for SagaRecovery { &mut self.sagas_to_ignore, db_sagas, ); - let last_pass = - LastPass::Success(self.recovery_execute(log, &plan).await); + let execution = self.recovery_execute(log, &plan).await; + let last_pass = LastPass::Success(execution.to_last_pass_result()); self.activate_finish(last_pass) } .boxed() @@ -443,14 +447,14 @@ struct SagaRecoveryPlan { maybe_done: BTreeSet, } -impl SagaRecoveryPlan { +impl<'a> SagaRecoveryPlan { pub fn new( log: &slog::Logger, previously_maybe_done: &[steno::SagaId], sagas_started: &mut BTreeSet, - running_sagas_found: BTreeMap, + mut running_sagas_found: BTreeMap, ) -> SagaRecoveryPlan { - let builder = SagaRecoveryPlanBuilder::new(); + let mut builder = SagaRecoveryPlanBuilder::new(log); // First of all, remove finished sagas from our "ignore" set. // @@ -473,7 +477,7 @@ impl SagaRecoveryPlan { // created up to the beginning of the database query. Since we now have // the list of sagas that were not-finished in the database, we can // compare these two sets. - for running_saga_id in sagas_started { + for running_saga_id in sagas_started.iter() { match running_sagas_found.remove(running_saga_id) { None => { // The saga is in the ignore set, but not the database list @@ -522,7 +526,7 @@ impl SagaRecoveryPlan { // parallelize this since these operations should generally be quick, // and there shouldn't be too many sagas outstanding, and Nexus has // already crashed so they've experienced a bit of latency already. - for (saga_id, saga) in running_sagas_found { + for (saga_id, saga) in running_sagas_found.into_iter() { builder.saga_recovery_needed(saga_id, saga); } @@ -539,7 +543,7 @@ struct SagaRecoveryPlanBuilder<'a> { } impl<'a> SagaRecoveryPlanBuilder<'a> { - pub fn new(log: &'a slog::Logger) { + pub fn new(log: &'a slog::Logger) -> SagaRecoveryPlanBuilder { SagaRecoveryPlanBuilder { log, needs_recovery: BTreeMap::new(), @@ -549,7 +553,7 @@ impl<'a> SagaRecoveryPlanBuilder<'a> { } } - pub fn build(self) { + pub fn build(self) -> SagaRecoveryPlan { SagaRecoveryPlan { needs_recovery: self.needs_recovery, skipped_running: self.skipped_running, @@ -570,7 +574,7 @@ impl<'a> SagaRecoveryPlanBuilder<'a> { ) { // XXX-dap log entry // XXX-dap panic if already present - self.skipped_running.insert(saga_id) + self.skipped_running.insert(saga_id); } pub fn saga_recovery_maybe_done(&mut self, saga_id: steno::SagaId) { @@ -579,10 +583,14 @@ impl<'a> SagaRecoveryPlanBuilder<'a> { self.maybe_done.insert(saga_id); } - pub fn saga_recovery_needed(&mut self, saga_id: steno::SagaId) { + pub fn saga_recovery_needed( + &mut self, + saga_id: steno::SagaId, + saga: nexus_db_model::Saga, + ) { // XXX-dap log entry // XXX-dap panic if already present - self.needs_recovery.insert(saga_id); + self.needs_recovery.insert(saga_id, saga); } } @@ -593,20 +601,29 @@ struct SagaExecutionDone { completion_futures: Vec>>, } +impl SagaExecutionDone { + pub fn to_last_pass_result(&self) -> LastPassSuccess { + // XXX-dap + todo!(); + } +} + struct SagaExecutionDoneBuilder<'a> { log: &'a slog::Logger, in_progress: BTreeMap, succeeded: Vec, - failed: Vec, + failed: Vec, + completion_futures: Vec>>, } impl<'a> SagaExecutionDoneBuilder<'a> { pub fn new(log: &'a slog::Logger) -> SagaExecutionDoneBuilder<'a> { SagaExecutionDoneBuilder { log, - in_progress: BTreeSet::new(), + in_progress: BTreeMap::new(), succeeded: Vec::new(), failed: Vec::new(), + completion_futures: Vec::new(), } } @@ -616,7 +633,11 @@ impl<'a> SagaExecutionDoneBuilder<'a> { "attempted to build execution result while some recoveries are \ still in progress" ); - SagaExecutionDone { succeeded: self.succeeded, failed: self.failed } + SagaExecutionDone { + succeeded: self.succeeded, + failed: self.failed, + completion_futures: self.completion_futures, + } } pub fn saga_recovery_start( @@ -634,7 +655,7 @@ impl<'a> SagaExecutionDoneBuilder<'a> { completion_future: BoxFuture<'static, Result<(), Error>>, ) { // XXX-dap log entry - self.in_progress.remove(saga_id); + self.in_progress.remove(&saga_id); self.succeeded.push(RecoverySuccess { time: Utc::now(), saga_id }); self.completion_futures.push(completion_future); } @@ -645,7 +666,7 @@ impl<'a> SagaExecutionDoneBuilder<'a> { error: &Error, ) { // XXX-dap log entry - self.in_progress.remove(saga_id); + self.in_progress.remove(&saga_id); self.failed.push(RecoveryFailure { time: Utc::now(), saga_id, @@ -658,15 +679,13 @@ impl<'a> SagaExecutionDoneBuilder<'a> { async fn list_sagas_in_progress( opctx: &OpContext, + datastore: &DataStore, sec_id: db::SecId, ) -> Result, Error> { let log = &opctx.log; debug!(log, "listing candidate sagas for recovery"); let result = datastore - .saga_list_recovery_candidates_batched( - &self.saga_recovery_opctx, - &self.sec_id, - ) + .saga_list_recovery_candidates_batched(&opctx, &sec_id) .await .internal_context("listing in-progress sagas for saga recovery") .map(|list| { @@ -674,7 +693,7 @@ async fn list_sagas_in_progress( .map(|saga| (saga.id.into(), saga)) .collect::>() }); - match result { + match &result { Ok(list) => { info!(log, "listed in-progress sagas"; "count" => list.len()); } @@ -682,11 +701,12 @@ async fn list_sagas_in_progress( warn!(log, "failed to list in-progress sagas"; error); } }; + result } -async fn update_sagas_started( +fn update_sagas_started( set: &mut BTreeSet, - rx: &mut mpsc::UnboundedReceiver, + rx: &mut mpsc::UnboundedReceiver, ) { let (new_sagas, disconnected) = read_all_from_channel(rx); // XXX-dap warn on disconnected