Skip to content

Commit

Permalink
it compiles
Browse files Browse the repository at this point in the history
  • Loading branch information
davepacheco committed Jul 9, 2024
1 parent 8017bb7 commit 1a78465
Showing 1 changed file with 51 additions and 31 deletions.
82 changes: 51 additions & 31 deletions nexus/src/app/background/tasks/saga_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ impl SagaRecovery {
}
}

fn activate_finish(&mut self, last_pass: LastPass) {
fn activate_finish(&mut self, last_pass: LastPass) -> serde_json::Value {
self.last_pass = last_pass;

serde_json::to_value(SagaRecoveryTaskStatus {
Expand All @@ -289,10 +289,11 @@ impl SagaRecovery {
}

async fn recovery_execute(
&self,
bgtask_log: &slog::Logger,
plan: &SagaRecoveryPlan,
) -> SagaExecutionDone {
let builder = SagaExecutionDoneBuilder::new(bgtask_log);
let mut builder = SagaExecutionDoneBuilder::new(bgtask_log);

for (saga_id, saga) in &plan.needs_recovery {
let saga_log = self.nexus.log.new(o!(
Expand All @@ -303,13 +304,13 @@ impl SagaRecovery {
builder.saga_recovery_start(*saga_id, saga_log.clone());
match self.recover_one_saga(bgtask_log, &saga_log, saga).await {
Ok(completion_future) => {
builder.saga_recovery_success(saga_id, completion_future);
builder.saga_recovery_success(*saga_id, completion_future);
}
Err(error) => {
// It's essential that we not bail out early just because we
// hit an error here. We want to recover all the sagas that
// we can.
builder.saga_recovery_failure(saga_id, &error);
builder.saga_recovery_failure(*saga_id, &error);
}
}
}
Expand Down Expand Up @@ -390,9 +391,12 @@ impl BackgroundTask for SagaRecovery {

// Fetch the list of not-yet-finished sagas that are assigned to
// this Nexus instance.
let result =
list_sagas_in_progress(&self.saga_recovery_opctx, &self.sec_id)
.await;
let result = list_sagas_in_progress(
&self.saga_recovery_opctx,
datastore,
self.sec_id,
)
.await;

// Process any newly-created sagas, adding them to our set of sagas
// to ignore during recovery. We never want to try to recover a
Expand Down Expand Up @@ -428,8 +432,8 @@ impl BackgroundTask for SagaRecovery {
&mut self.sagas_to_ignore,
db_sagas,
);
let last_pass =
LastPass::Success(self.recovery_execute(log, &plan).await);
let execution = self.recovery_execute(log, &plan).await;
let last_pass = LastPass::Success(execution.to_last_pass_result());
self.activate_finish(last_pass)
}
.boxed()
Expand All @@ -443,14 +447,14 @@ struct SagaRecoveryPlan {
maybe_done: BTreeSet<steno::SagaId>,
}

impl SagaRecoveryPlan {
impl<'a> SagaRecoveryPlan {
pub fn new(
log: &slog::Logger,
previously_maybe_done: &[steno::SagaId],
sagas_started: &mut BTreeSet<steno::SagaId>,
running_sagas_found: BTreeMap<steno::SagaId, nexus_db_model::Saga>,
mut running_sagas_found: BTreeMap<steno::SagaId, nexus_db_model::Saga>,
) -> SagaRecoveryPlan {
let builder = SagaRecoveryPlanBuilder::new();
let mut builder = SagaRecoveryPlanBuilder::new(log);

// First of all, remove finished sagas from our "ignore" set.
//
Expand All @@ -473,7 +477,7 @@ impl SagaRecoveryPlan {
// created up to the beginning of the database query. Since we now have
// the list of sagas that were not-finished in the database, we can
// compare these two sets.
for running_saga_id in sagas_started {
for running_saga_id in sagas_started.iter() {
match running_sagas_found.remove(running_saga_id) {
None => {
// The saga is in the ignore set, but not the database list
Expand Down Expand Up @@ -522,7 +526,7 @@ impl SagaRecoveryPlan {
// parallelize this since these operations should generally be quick,
// and there shouldn't be too many sagas outstanding, and Nexus has
// already crashed so they've experienced a bit of latency already.
for (saga_id, saga) in running_sagas_found {
for (saga_id, saga) in running_sagas_found.into_iter() {
builder.saga_recovery_needed(saga_id, saga);
}

Expand All @@ -539,7 +543,7 @@ struct SagaRecoveryPlanBuilder<'a> {
}

impl<'a> SagaRecoveryPlanBuilder<'a> {
pub fn new(log: &'a slog::Logger) {
pub fn new(log: &'a slog::Logger) -> SagaRecoveryPlanBuilder {
SagaRecoveryPlanBuilder {
log,
needs_recovery: BTreeMap::new(),
Expand All @@ -549,7 +553,7 @@ impl<'a> SagaRecoveryPlanBuilder<'a> {
}
}

pub fn build(self) {
pub fn build(self) -> SagaRecoveryPlan {
SagaRecoveryPlan {
needs_recovery: self.needs_recovery,
skipped_running: self.skipped_running,
Expand All @@ -570,7 +574,7 @@ impl<'a> SagaRecoveryPlanBuilder<'a> {
) {
// XXX-dap log entry
// XXX-dap panic if already present
self.skipped_running.insert(saga_id)
self.skipped_running.insert(saga_id);
}

pub fn saga_recovery_maybe_done(&mut self, saga_id: steno::SagaId) {
Expand All @@ -579,10 +583,14 @@ impl<'a> SagaRecoveryPlanBuilder<'a> {
self.maybe_done.insert(saga_id);
}

pub fn saga_recovery_needed(&mut self, saga_id: steno::SagaId) {
pub fn saga_recovery_needed(
&mut self,
saga_id: steno::SagaId,
saga: nexus_db_model::Saga,
) {
// XXX-dap log entry
// XXX-dap panic if already present
self.needs_recovery.insert(saga_id);
self.needs_recovery.insert(saga_id, saga);
}
}

Expand All @@ -593,20 +601,29 @@ struct SagaExecutionDone {
completion_futures: Vec<BoxFuture<'static, Result<(), Error>>>,
}

impl SagaExecutionDone {
/// Produces the `LastPassSuccess` value for this execution result.
///
/// This is currently an unimplemented stub: the body is `todo!()`, so any
/// call panics.
pub fn to_last_pass_result(&self) -> LastPassSuccess {
// XXX-dap
todo!();
}
}

struct SagaExecutionDoneBuilder<'a> {
log: &'a slog::Logger,
in_progress: BTreeMap<steno::SagaId, slog::Logger>,
succeeded: Vec<RecoverySuccess>,
failed: Vec<RecoveryFailed>,
failed: Vec<RecoveryFailure>,
completion_futures: Vec<BoxFuture<'static, Result<(), Error>>>,
}

impl<'a> SagaExecutionDoneBuilder<'a> {
/// Creates an empty builder that holds `log`.
///
/// No sagas are in progress, and the success, failure and completion-future
/// lists all start out empty.
pub fn new(log: &'a slog::Logger) -> SagaExecutionDoneBuilder<'a> {
    SagaExecutionDoneBuilder {
        log,
        // Must be a map (saga id -> logger) to match the field's type.
        in_progress: BTreeMap::new(),
        succeeded: Vec::new(),
        failed: Vec::new(),
        completion_futures: Vec::new(),
    }
}

Expand All @@ -616,7 +633,11 @@ impl<'a> SagaExecutionDoneBuilder<'a> {
"attempted to build execution result while some recoveries are \
still in progress"
);
SagaExecutionDone { succeeded: self.succeeded, failed: self.failed }
SagaExecutionDone {
succeeded: self.succeeded,
failed: self.failed,
completion_futures: self.completion_futures,
}
}

pub fn saga_recovery_start(
Expand All @@ -634,7 +655,7 @@ impl<'a> SagaExecutionDoneBuilder<'a> {
completion_future: BoxFuture<'static, Result<(), Error>>,
) {
// XXX-dap log entry
self.in_progress.remove(saga_id);
self.in_progress.remove(&saga_id);
self.succeeded.push(RecoverySuccess { time: Utc::now(), saga_id });
self.completion_futures.push(completion_future);
}
Expand All @@ -645,7 +666,7 @@ impl<'a> SagaExecutionDoneBuilder<'a> {
error: &Error,
) {
// XXX-dap log entry
self.in_progress.remove(saga_id);
self.in_progress.remove(&saga_id);
self.failed.push(RecoveryFailure {
time: Utc::now(),
saga_id,
Expand All @@ -658,35 +679,34 @@ impl<'a> SagaExecutionDoneBuilder<'a> {

async fn list_sagas_in_progress(
opctx: &OpContext,
datastore: &DataStore,
sec_id: db::SecId,
) -> Result<BTreeMap<steno::SagaId, nexus_db_model::saga_types::Saga>, Error> {
let log = &opctx.log;
debug!(log, "listing candidate sagas for recovery");
let result = datastore
.saga_list_recovery_candidates_batched(
&self.saga_recovery_opctx,
&self.sec_id,
)
.saga_list_recovery_candidates_batched(&opctx, &sec_id)
.await
.internal_context("listing in-progress sagas for saga recovery")
.map(|list| {
list.into_iter()
.map(|saga| (saga.id.into(), saga))
.collect::<BTreeMap<steno::SagaId, nexus_db_model::Saga>>()
});
match result {
match &result {
Ok(list) => {
info!(log, "listed in-progress sagas"; "count" => list.len());
}
Err(error) => {
warn!(log, "failed to list in-progress sagas"; error);
}
};
result
}

async fn update_sagas_started(
fn update_sagas_started(
set: &mut BTreeSet<steno::SagaId>,
rx: &mut mpsc::UnboundedReceiver<T>,
rx: &mut mpsc::UnboundedReceiver<steno::SagaId>,
) {
let (new_sagas, disconnected) = read_all_from_channel(rx);
// XXX-dap warn on disconnected
Expand Down

0 comments on commit 1a78465

Please sign in to comment.