Skip to content

Commit

Permalink
clean up SagaId import
Browse files Browse the repository at this point in the history
  • Loading branch information
davepacheco committed Jul 10, 2024
1 parent 84a4ae3 commit 553c276
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 36 deletions.
56 changes: 25 additions & 31 deletions nexus/src/app/background/tasks/saga_recovery/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ use omicron_common::api::external::Error;
use slog_error_chain::InlineErrorChain;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use steno::SagaId;
use tokio::sync::mpsc;

/// Describes state related to saga recovery that needs to be maintained across
/// multiple passes
pub struct RestState {
sagas_started: BTreeMap<steno::SagaId, SagaStartInfo>,
remove_next: Vec<steno::SagaId>,
sagas_started: BTreeMap<SagaId, SagaStartInfo>,
remove_next: Vec<SagaId>,
}

#[allow(dead_code)]
Expand All @@ -46,7 +47,7 @@ impl RestState {
pub fn update_started_sagas(
&mut self,
log: &slog::Logger,
sagas_started_rx: &mut mpsc::UnboundedReceiver<steno::SagaId>,
sagas_started_rx: &mut mpsc::UnboundedReceiver<SagaId>,
) {
let (new_sagas, disconnected) = read_all_from_channel(sagas_started_rx);
if disconnected {
Expand Down Expand Up @@ -141,17 +142,17 @@ fn read_all_from_channel<T>(
/// better observability and testing.
pub struct Plan {
/// sagas that need to be recovered
needs_recovery: BTreeMap<steno::SagaId, nexus_db_model::Saga>,
needs_recovery: BTreeMap<SagaId, nexus_db_model::Saga>,

/// sagas that were found in the database to be in-progress, but that don't
/// need to be recovered because they are either already running or have
/// actually finished
skipped_running: BTreeSet<steno::SagaId>,
skipped_running: BTreeSet<SagaId>,

/// sagas that we infer have finished because they were missing from two
/// consecutive database queries for in-progress sagas (with no intervening
/// message indicating that they had been started)
inferred_done: BTreeSet<steno::SagaId>,
inferred_done: BTreeSet<SagaId>,

/// sagas that may be done, but we can't tell yet. These are sagas where we
/// previously had them running in this process and the database state now
Expand All @@ -160,7 +161,7 @@ pub struct Plan {
/// whether the saga finished or just started. We'll be able to tell during
/// the next pass and if it's done at that point then these sagas will move
/// to `inferred_done`.
maybe_done: BTreeSet<steno::SagaId>,
maybe_done: BTreeSet<SagaId>,
}

impl<'a> Plan {
Expand Down Expand Up @@ -189,7 +190,7 @@ impl<'a> Plan {
pub fn new(
log: &slog::Logger,
rest_state: &RestState,
mut running_sagas_found: BTreeMap<steno::SagaId, nexus_db_model::Saga>,
mut running_sagas_found: BTreeMap<SagaId, nexus_db_model::Saga>,
) -> Plan {
let mut builder = SagaRecoveryPlanBuilder::new(log);
let sagas_started = &rest_state.sagas_started;
Expand Down Expand Up @@ -275,32 +276,29 @@ impl<'a> Plan {
/// Iterate over the sagas that need to be recovered
pub fn sagas_needing_recovery(
&self,
) -> impl Iterator<Item = (&steno::SagaId, &nexus_db_model::Saga)> + '_
{
) -> impl Iterator<Item = (&SagaId, &nexus_db_model::Saga)> + '_ {
self.needs_recovery.iter()
}

/// Iterate over the sagas that were inferred to be done
pub fn sagas_inferred_done(
&self,
) -> impl Iterator<Item = steno::SagaId> + '_ {
pub fn sagas_inferred_done(&self) -> impl Iterator<Item = SagaId> + '_ {
self.inferred_done.iter().copied()
}

/// Iterate over the sagas that should be checked on the next pass to see if
/// they're done
pub fn sagas_maybe_done(&self) -> impl Iterator<Item = steno::SagaId> + '_ {
pub fn sagas_maybe_done(&self) -> impl Iterator<Item = SagaId> + '_ {
self.maybe_done.iter().copied()
}
}

/// Internal helper used to construct `SagaRecoveryPlan`
struct SagaRecoveryPlanBuilder<'a> {
log: &'a slog::Logger,
needs_recovery: BTreeMap<steno::SagaId, nexus_db_model::Saga>,
skipped_running: BTreeSet<steno::SagaId>,
inferred_done: BTreeSet<steno::SagaId>,
maybe_done: BTreeSet<steno::SagaId>,
needs_recovery: BTreeMap<SagaId, nexus_db_model::Saga>,
skipped_running: BTreeSet<SagaId>,
inferred_done: BTreeSet<SagaId>,
maybe_done: BTreeSet<SagaId>,
}

impl<'a> SagaRecoveryPlanBuilder<'a> {
Expand Down Expand Up @@ -328,7 +326,7 @@ impl<'a> SagaRecoveryPlanBuilder<'a> {
/// Record that this saga appears to be done, based on it being missing from
/// two different database queries for in-progress sagas with no intervening
/// indication that a saga with this id was started in the meantime
pub fn saga_infer_done(&mut self, saga_id: steno::SagaId) {
pub fn saga_infer_done(&mut self, saga_id: SagaId) {
info!(
self.log,
"found saga that appears to be done \
Expand All @@ -345,7 +343,7 @@ impl<'a> SagaRecoveryPlanBuilder<'a> {
/// because it appears to already be running
pub fn saga_recovery_not_needed(
&mut self,
saga_id: steno::SagaId,
saga_id: SagaId,
reason: &'static str,
) {
debug!(
Expand All @@ -368,7 +366,7 @@ impl<'a> SagaRecoveryPlanBuilder<'a> {
/// solution is to only consider sagas done that are missing for two
/// consecutive database queries with no intervening report that a saga with
/// that id has just started.
pub fn saga_recovery_maybe_done(&mut self, saga_id: steno::SagaId) {
pub fn saga_recovery_maybe_done(&mut self, saga_id: SagaId) {
debug!(
self.log,
"found saga that may be done (will be sure on the next pass)";
Expand All @@ -384,7 +382,7 @@ impl<'a> SagaRecoveryPlanBuilder<'a> {
/// progress" according to the database but not yet resumed in this process
pub fn saga_recovery_needed(
&mut self,
saga_id: steno::SagaId,
saga_id: SagaId,
saga: nexus_db_model::Saga,
) {
info!(
Expand Down Expand Up @@ -434,7 +432,7 @@ impl<'a> ExecutionSummary<'a> {
/// Iterate over the sagas that were successfully recovered during this pass
pub fn sagas_recovered_successfully(
&self,
) -> impl Iterator<Item = steno::SagaId> + '_ {
) -> impl Iterator<Item = SagaId> + '_ {
self.succeeded.iter().map(|s| s.saga_id)
}

Expand All @@ -456,7 +454,7 @@ impl<'a> ExecutionSummary<'a> {

pub struct ExecutionSummaryBuilder<'a> {
plan: &'a Plan,
in_progress: BTreeMap<steno::SagaId, slog::Logger>,
in_progress: BTreeMap<SagaId, slog::Logger>,
succeeded: Vec<RecoverySuccess>,
failed: Vec<RecoveryFailure>,
#[cfg(test)]
Expand Down Expand Up @@ -493,7 +491,7 @@ impl<'a> ExecutionSummaryBuilder<'a> {
/// Record that we've started recovering this saga
pub fn saga_recovery_start(
&mut self,
saga_id: steno::SagaId,
saga_id: SagaId,
saga_logger: slog::Logger,
) {
info!(&saga_logger, "recovering saga: start");
Expand All @@ -503,7 +501,7 @@ impl<'a> ExecutionSummaryBuilder<'a> {
/// Record that we've successfully recovered this saga
pub fn saga_recovery_success(
&mut self,
saga_id: steno::SagaId,
saga_id: SagaId,
completion_future: BoxFuture<'static, Result<(), Error>>,
) {
let saga_logger = self
Expand All @@ -517,11 +515,7 @@ impl<'a> ExecutionSummaryBuilder<'a> {
}

/// Record that we failed to recover this saga
pub fn saga_recovery_failure(
&mut self,
saga_id: steno::SagaId,
error: &Error,
) {
pub fn saga_recovery_failure(&mut self, saga_id: SagaId, error: &Error) {
let saga_logger = self
.in_progress
.remove(&saga_id)
Expand Down
11 changes: 6 additions & 5 deletions nexus/src/app/background/tasks/saga_recovery/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ use omicron_common::api::external::Error;
use omicron_common::api::external::InternalContext;
use std::collections::BTreeMap;
use std::sync::Arc;
use steno::SagaId;
use tokio::sync::mpsc;
use uuid::Uuid;

Expand All @@ -168,7 +169,7 @@ pub struct SagaRecovery {
nexus: Arc<Nexus>,
sec_client: Arc<steno::SecClient>,
registry: Arc<ActionRegistry>,
sagas_started_rx: mpsc::UnboundedReceiver<steno::SagaId>,
sagas_started_rx: mpsc::UnboundedReceiver<SagaId>,

/// recovery state persisted between passes
rest_state: recovery::RestState,
Expand All @@ -185,7 +186,7 @@ impl SagaRecovery {
nexus: Arc<Nexus>,
sec: Arc<steno::SecClient>,
registry: Arc<ActionRegistry>,
sagas_started_rx: mpsc::UnboundedReceiver<steno::SagaId>,
sagas_started_rx: mpsc::UnboundedReceiver<SagaId>,
) -> SagaRecovery {
SagaRecovery {
datastore,
Expand Down Expand Up @@ -237,7 +238,7 @@ impl SagaRecovery {
saga: &nexus_db_model::Saga,
) -> Result<BoxFuture<'static, Result<(), Error>>, Error> {
let datastore = &self.datastore;
let saga_id: steno::SagaId = saga.id.into();
let saga_id: SagaId = saga.id.into();

let log_events = datastore
.saga_fetch_log_batched(&self.saga_recovery_opctx, saga)
Expand Down Expand Up @@ -354,7 +355,7 @@ async fn list_sagas_in_progress(
opctx: &OpContext,
datastore: &DataStore,
sec_id: db::SecId,
) -> Result<BTreeMap<steno::SagaId, nexus_db_model::saga_types::Saga>, Error> {
) -> Result<BTreeMap<SagaId, nexus_db_model::saga_types::Saga>, Error> {
let log = &opctx.log;
debug!(log, "listing candidate sagas for recovery");
let result = datastore
Expand All @@ -364,7 +365,7 @@ async fn list_sagas_in_progress(
.map(|list| {
list.into_iter()
.map(|saga| (saga.id.into(), saga))
.collect::<BTreeMap<steno::SagaId, nexus_db_model::Saga>>()
.collect::<BTreeMap<SagaId, nexus_db_model::Saga>>()
});
match &result {
Ok(list) => {
Expand Down

0 comments on commit 553c276

Please sign in to comment.