move saga recovery to a background task #6063

Merged: 9 commits, Jul 16, 2024
Changes from 1 commit
14 changes: 7 additions & 7 deletions nexus/saga-recovery/src/lib.rs
@@ -2,7 +2,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

-//! Saga recovery bookkeeping
+//! # Saga recovery bookkeeping
//!
//! If you're reading this, you first want to read the big block comment in the
//! saga recovery background task. It explains important background about what
@@ -52,19 +52,19 @@
//! |
//! 2. list in-progress sagas |
//! |
-//! 3. collect list of sagas ------> use RestState::update_started_sagas()
-//! started by Nexus |
+//! 3. collect list of sagas ------> use
+//! started by Nexus | `RestState::update_started_sagas()`
//! |
-//! 4. make a plan ----------------> use Plan::new()
+//! 4. make a plan ----------------> use `Plan::new()`
//! | This is where all the decisions
//! | about saga recovery get made.
//! |
-//! 5. follow the plan -------------> use Plan::sagas_needing_recovery()
+//! 5. follow the plan -------------> use `Plan::sagas_needing_recovery()`
//! |
//! fetch details from db |
//! load sagas into Steno |
-//! | use ExecutionBuilder::new() to report
-//! | what's going on
+//! | use `ExecutionBuilder::new()` to
+//! | report what's going on
//! |
//! 6. update `RestState` and -----> use `RestState::update_after_pass()`
//! `Report` | and `Report::update_after_pass()`
Expand Down
14 changes: 7 additions & 7 deletions nexus/saga-recovery/src/recovery.rs
@@ -144,7 +144,7 @@ fn read_all_from_channel<T>(
/// Describes what should happen during a particular recovery pass
///
/// This is constructed by the saga recovery background task via
-/// [`Plan::new()`]. That function uses `PlanBuilder`
+/// [`Plan::new()`].
///
/// This structure is also much more detailed than it needs to be to support
/// better observability and testing.
@@ -318,7 +318,7 @@ struct PlanBuilder<'a> {

impl<'a> PlanBuilder<'a> {
/// Begin building a `Plan`
-pub fn new(log: &'a slog::Logger) -> PlanBuilder {
+fn new(log: &'a slog::Logger) -> PlanBuilder {
PlanBuilder {
log,
needs_recovery: BTreeMap::new(),
@@ -329,7 +329,7 @@ impl<'a> PlanBuilder<'a> {
}

/// Turn this into a `Plan`
-pub fn build(self) -> Plan {
+fn build(self) -> Plan {
Plan {
needs_recovery: self.needs_recovery,
skipped_running: self.skipped_running,
@@ -341,7 +341,7 @@ impl<'a> PlanBuilder<'a> {
/// Record that this saga appears to be done, based on it being missing from
/// two different database queries for in-progress sagas with no intervening
/// indication that a saga with this id was started in the meantime
-pub fn saga_infer_done(&mut self, saga_id: SagaId) {
+fn saga_infer_done(&mut self, saga_id: SagaId) {
info!(
self.log,
"found saga that appears to be done \
@@ -356,7 +356,7 @@ impl<'a> PlanBuilder<'a> {

/// Record that no action is needed for this saga in this recovery pass
/// because it appears to already be running
-pub fn saga_recovery_not_needed(&mut self, saga_id: SagaId) {
+fn saga_recovery_not_needed(&mut self, saga_id: SagaId) {
debug!(
self.log,
"found saga that can be ignored (already running)";
@@ -376,7 +376,7 @@ impl<'a> PlanBuilder<'a> {
/// solution is to only consider sagas done that are missing for two
/// consecutive database queries with no intervening report that a saga with
/// that id has just started.
-pub fn saga_maybe_done(&mut self, saga_id: SagaId) {
+fn saga_maybe_done(&mut self, saga_id: SagaId) {
debug!(
self.log,
"found saga that may be done (will be sure on the next pass)";
@@ -390,7 +390,7 @@ impl<'a> PlanBuilder<'a> {

/// Record that this saga needs to be recovered, based on it being "in
/// progress" according to the database but not yet resumed in this process
-pub fn saga_recovery_needed(
+fn saga_recovery_needed(
&mut self,
saga_id: SagaId,
saga: nexus_db_model::Saga,
Expand Down
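Reader's note: the doc comments in the hunks above describe a rule where a saga is only considered done after it is missing from two consecutive database queries, with no intervening report that it was started. The following is a minimal sketch of that rule using only the standard library. Every name in it (`ToySagaId`, `ToyPlan`, `ToyState`, `note_started`, `plan`, `apply`, `set`) is hypothetical and is not the crate's API. It also assumes that every saga selected for recovery is resumed successfully, which the real code does not assume.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for the real `SagaId` type.
type ToySagaId = u32;

/// Toy version of a plan: the decisions made during one recovery pass.
#[derive(Debug, Default, PartialEq)]
struct ToyPlan {
    needs_recovery: BTreeSet<ToySagaId>,
    skipped_running: BTreeSet<ToySagaId>,
    maybe_done: BTreeSet<ToySagaId>,
    inferred_done: BTreeSet<ToySagaId>,
}

/// Toy version of the state carried from one pass to the next.
#[derive(Debug, Default)]
struct ToyState {
    /// Sagas this process believes it is running.
    running: BTreeSet<ToySagaId>,
    /// Running sagas that were missing from the previous query.
    maybe_done: BTreeSet<ToySagaId>,
}

impl ToyState {
    /// Record that this process just started a saga. This is the
    /// "intervening indication" that cancels a pending maybe-done mark.
    fn note_started(&mut self, id: ToySagaId) {
        self.running.insert(id);
        self.maybe_done.remove(&id);
    }

    /// Decide what to do given the ids the database reports as in progress.
    fn plan(&self, in_db: &BTreeSet<ToySagaId>) -> ToyPlan {
        let mut plan = ToyPlan::default();
        for &id in in_db {
            if self.running.contains(&id) {
                plan.skipped_running.insert(id);
            } else {
                plan.needs_recovery.insert(id);
            }
        }
        // Running here but absent from the database query.
        for &id in self.running.difference(in_db) {
            if self.maybe_done.contains(&id) {
                // Missing twice in a row: infer that it finished.
                plan.inferred_done.insert(id);
            } else {
                // Missing once: wait for the next pass to be sure.
                plan.maybe_done.insert(id);
            }
        }
        plan
    }

    /// Fold the outcome of a pass back into the state.
    /// Assumption: every saga needing recovery was resumed successfully.
    fn apply(&mut self, plan: &ToyPlan) {
        for id in &plan.inferred_done {
            self.running.remove(id);
        }
        self.running.extend(plan.needs_recovery.iter().copied());
        self.maybe_done = plan.maybe_done.clone();
    }
}

/// Convenience constructor for small id sets.
fn set(ids: &[ToySagaId]) -> BTreeSet<ToySagaId> {
    ids.iter().copied().collect()
}
```

In this sketch, a saga that reappears in the database between two passes drops out of `maybe_done` automatically, because `apply` replaces the set with only the ids that were missing in the most recent pass.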
35 changes: 17 additions & 18 deletions nexus/src/app/background/tasks/saga_recovery.rs
@@ -2,18 +2,6 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

-// XXX-dap at the end, verify:
-// - counters (maybe plumb these into Oximeter?)
-// - task status reported by omdb
-// - log entries
-// - test coverage
-// XXX-dap write up summary for PR, including the option of doing recovery in
-// Steno with read methods on SecStore? also could have added purge option to
-// SEC and then relied on Steno to know if a thing was alive. (can't do it
-// without a purge that's invoked by recovery)
-// XXX-dap write a stress test that furiously performs saga recovery a lot
-// of times and ensures that each saga is recovered exactly once

//! Saga recovery
//!
//! ## Review of distributed sagas
@@ -342,12 +330,7 @@ impl<N: MakeSagaContext> SagaRecovery<N> {
// and there shouldn't be too many sagas outstanding, and Nexus has
// already crashed so they've experienced a bit of latency already.
for (saga_id, saga) in plan.sagas_needing_recovery() {
-// XXX-dap used to be self.nexus.log. Is this okay?
-let saga_log = bgtask_log.new(o!(
-"saga_name" => saga.name.clone(),
-"saga_id" => saga_id.to_string(),
-));
-
+let saga_log = self.maker.make_saga_log(*saga_id, &saga.name);
builder.saga_recovery_start(*saga_id, saga_log.clone());
match self.recover_one_saga(bgtask_log, &saga_log, saga).await {
Ok(completion_future) => {
@@ -467,6 +450,8 @@ pub trait MakeSagaContext: Send + Sync {
&self,
log: slog::Logger,
) -> Arc<<Self::SagaType as steno::SagaType>::ExecContextType>;

+fn make_saga_log(&self, id: SagaId, name: &str) -> slog::Logger;
}

impl MakeSagaContext for Arc<Nexus> {
@@ -478,6 +463,13 @@ impl MakeSagaContext for Arc<Nexus> {
// for our type. Hence we need two Arcs.
Arc::new(Arc::new(SagaContext::new(self.clone(), log)))
}

+fn make_saga_log(&self, id: SagaId, name: &str) -> slog::Logger {
+self.log.new(o!(
+"saga_name" => name.to_owned(),
+"saga_id" => id.to_string(),
+))
+}
}

#[cfg(test)]
@@ -572,6 +564,13 @@ mod test {
fn make_saga_context(&self, _log: slog::Logger) -> Arc<TestContext> {
self.clone()
}

+fn make_saga_log(&self, id: SagaId, name: &str) -> slog::Logger {
+self.log.new(o!(
+"saga_name" => name.to_owned(),
+"saga_id" => id.to_string(),
+))
+}
}

static ACTION_N1: Lazy<Arc<dyn Action<TestOp>>> =
Expand Down
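Reader's note: the hunks above move construction of the per-saga logger out of the recovery loop, where it was derived from `bgtask_log`, and behind a new `make_saga_log` method on the `MakeSagaContext` trait, so each implementor derives it from its own logger. The sketch below illustrates that pattern with the standard library only. `ToyLogger`, `MakeToySagaLog`, `ToyNexus`, and `recover_all` are hypothetical names, `ToyLogger` merely records key/value pairs and is not `slog::Logger`, and saga ids are modeled as `u64` for simplicity.

```rust
/// Hypothetical stand-in for a structured logger: it only records the
/// key/value pairs that have been attached to it.
#[derive(Clone, Debug, Default, PartialEq)]
struct ToyLogger {
    kv: Vec<(String, String)>,
}

impl ToyLogger {
    /// Return a child logger that carries the parent's pairs plus `extra`.
    fn child(&self, extra: &[(&str, String)]) -> ToyLogger {
        let mut kv = self.kv.clone();
        kv.extend(extra.iter().map(|(k, v)| (k.to_string(), v.clone())));
        ToyLogger { kv }
    }
}

/// Toy analogue of the trait method added in the diff: the implementor,
/// not the caller, decides which parent logger a saga's logger comes from.
trait MakeToySagaLog {
    fn make_saga_log(&self, id: u64, name: &str) -> ToyLogger;
}

/// Hypothetical implementor that owns a top-level logger.
struct ToyNexus {
    log: ToyLogger,
}

impl MakeToySagaLog for ToyNexus {
    fn make_saga_log(&self, id: u64, name: &str) -> ToyLogger {
        self.log.child(&[
            ("saga_name", name.to_owned()),
            ("saga_id", id.to_string()),
        ])
    }
}

/// Toy recovery loop: it asks the maker for each saga's logger and never
/// touches a background-task logger of its own.
fn recover_all<M: MakeToySagaLog>(maker: &M, sagas: &[(u64, &str)]) -> Vec<ToyLogger> {
    sagas
        .iter()
        .map(|(id, name)| maker.make_saga_log(*id, name))
        .collect()
}
```

One consequence of this shape is that a test implementor can supply its own logger without the recovery loop changing, which matches how the diff adds the same method to the test context.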