Commit

WIP
davepacheco committed Jul 3, 2024
1 parent 991c195 commit 5e44f02
Showing 12 changed files with 308 additions and 187 deletions.
17 changes: 16 additions & 1 deletion nexus-config/src/nexus_config.rs
@@ -385,6 +385,8 @@ pub struct BackgroundTaskConfig {
pub v2p_mapping_propagation: V2PMappingPropagationConfig,
/// configuration for abandoned VMM reaper task
pub abandoned_vmm_reaper: AbandonedVmmReaperConfig,
/// configuration for saga recovery task
pub saga_recovery: SagaRecoveryConfig,
}

#[serde_as]
@@ -566,6 +568,14 @@ pub struct AbandonedVmmReaperConfig {
pub period_secs: Duration,
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct SagaRecoveryConfig {
/// period (in seconds) for periodic activations of this background task
#[serde_as(as = "DurationSeconds<u64>")]
pub period_secs: Duration,
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct RegionReplacementDriverConfig {
@@ -816,6 +826,7 @@ mod test {
service_firewall_propagation.period_secs = 300
v2p_mapping_propagation.period_secs = 30
abandoned_vmm_reaper.period_secs = 60
saga_recovery.period_secs = 60
[default_region_allocation_strategy]
type = "random"
seed = 0
@@ -962,7 +973,10 @@
},
abandoned_vmm_reaper: AbandonedVmmReaperConfig {
period_secs: Duration::from_secs(60),
}
},
saga_recovery: SagaRecoveryConfig {
period_secs: Duration::from_secs(60),
},
},
default_region_allocation_strategy:
crate::nexus_config::RegionAllocationStrategy::Random {
@@ -1035,6 +1049,7 @@
service_firewall_propagation.period_secs = 300
v2p_mapping_propagation.period_secs = 30
abandoned_vmm_reaper.period_secs = 60
saga_recovery.period_secs = 60
[default_region_allocation_strategy]
type = "random"
"##,
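
As a quick illustration of the new config knob: the self-contained sketch below (the `BackgroundTasks` wrapper and `main` are invented here, not nexus-config code) shows how `saga_recovery.period_secs = 60` deserializes into a `Duration` via `serde_with::DurationSeconds`, mirroring the `SagaRecoveryConfig` struct added above.

// Hypothetical standalone example; mirrors the new SagaRecoveryConfig field.
use serde::Deserialize;
use serde_with::{serde_as, DurationSeconds};
use std::time::Duration;

#[serde_as]
#[derive(Debug, Deserialize, PartialEq)]
struct SagaRecoveryConfig {
    /// period (in seconds) for periodic activations of this background task
    #[serde_as(as = "DurationSeconds<u64>")]
    period_secs: Duration,
}

#[derive(Debug, Deserialize)]
struct BackgroundTasks {
    saga_recovery: SagaRecoveryConfig,
}

fn main() {
    // Dotted-key TOML, as used in the Nexus config files touched by this commit.
    let parsed: BackgroundTasks =
        toml::from_str("saga_recovery.period_secs = 60").expect("valid TOML");
    assert_eq!(parsed.saga_recovery.period_secs, Duration::from_secs(60));
}
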
2 changes: 1 addition & 1 deletion nexus/db-queries/src/db/mod.rs
@@ -50,7 +50,7 @@ pub use config::Config;
pub use datastore::DataStore;
pub use on_conflict_ext::IncompleteOnConflictExt;
pub use pool::{DbConnection, Pool};
pub use saga_recovery::{recover, CompletionTask, RecoveryTask};
pub use saga_recovery::{recover, CompletionTask};
pub use saga_types::SecId;
pub use sec_store::CockroachDbSecStore;

232 changes: 87 additions & 145 deletions nexus/db-queries/src/db/saga_recovery.rs
@@ -18,33 +18,11 @@ use futures::{future::BoxFuture, TryFutureExt};
use omicron_common::api::external::Error;
use omicron_common::api::external::LookupType;
use omicron_common::api::external::ResourceType;
use omicron_common::backoff::retry_notify;
use omicron_common::backoff::retry_policy_internal_service;
use omicron_common::backoff::BackoffError;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

/// Result type of a [`RecoveryTask`].
pub type RecoveryResult = Result<CompletionTask, Error>;

/// A future which completes once sagas have been loaded and resumed.
/// Note that this does not necessarily mean the sagas have completed
/// execution.
///
/// Returns a Result of either:
/// - A [`CompletionTask`] to track the completion of the resumed sagas, or
/// - An [`Error`] encountered when attempting to load and resume sagas.
pub struct RecoveryTask(BoxFuture<'static, RecoveryResult>);

impl Future for RecoveryTask {
type Output = RecoveryResult;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.get_mut().0).poll(cx)
}
}

/// Result type from a [`CompletionTask`].
pub type CompletionResult = Result<(), Error>;

@@ -58,117 +36,93 @@ impl Future for CompletionTask {
}
}

/// Starts an asynchronous task to recover sagas (as after a crash or restart)
/// Kick off saga recovery (as after a crash or restart)
///
/// More specifically, this task queries the database to list all uncompleted
/// sagas that are assigned to SEC `sec_id` and for each one:
/// More specifically, this function queries the database to list all
/// uncompleted sagas that are assigned to SEC `sec_id` and for each one:
///
/// * loads the saga DAG and log from `datastore`
/// * uses [`steno::SecClient::saga_resume`] to prepare to resume execution of
/// the saga using the persistent saga log
/// * resumes execution of each saga
///
/// The returned [`RecoveryTask`] completes once all sagas have been loaded
/// and resumed, and itself returns a [`CompletionTask`] which completes
/// when those resumed sagas have finished.
pub fn recover<T>(
opctx: OpContext,
/// The function completes once all sagas have been loaded and resumed, and it
/// returns a [`CompletionTask`] that completes when those resumed sagas have
/// finished.
pub async fn recover<T>(
opctx: &OpContext,
sec_id: db::SecId,
uctx: Arc<T::ExecContextType>,
datastore: Arc<db::DataStore>,
sec_client: Arc<steno::SecClient>,
make_context: &(dyn Fn(&slog::Logger) -> Arc<T::ExecContextType>
+ Send
+ Sync),
datastore: &db::DataStore,
sec_client: &steno::SecClient,
registry: Arc<steno::ActionRegistry<T>>,
) -> RecoveryTask
) -> Result<CompletionTask, Error>
where
T: steno::SagaType,
{
let join_handle = tokio::spawn(async move {
info!(&opctx.log, "start saga recovery");

// We perform the initial list of sagas using a standard retry policy.
// We treat all errors as transient because there's nothing we can do
// about any of them except try forever. As a result, we never expect
// an error from the overall operation.
// TODO-monitoring we definitely want a way to raise a big red flag if
// saga recovery is not completing.
// TODO-robustness It would be better to retry the individual database
// operations within this operation than retrying the overall operation.
// As this is written today, if the listing requires a bunch of pages
// and the operation fails partway through, we'll re-fetch all the pages
// we successfully fetched before. If the database is overloaded and
// only N% of requests are completing, the probability of this operation
// succeeding decreases considerably as the number of separate queries
// (pages) goes up. We'd be much more likely to finish the overall
// operation if we didn't throw away the results we did get each time.
let found_sagas = retry_notify(
retry_policy_internal_service(),
|| async {
list_unfinished_sagas(&opctx, &datastore, &sec_id)
.await
.map_err(BackoffError::transient)
},
|error, duration| {
warn!(
&opctx.log,
"failed to list sagas (will retry after {:?}): {:#}",
duration,
error
)
},
// XXX-dap who else was calling this function and is it okay that we do this
// work "synchronously" now?
info!(&opctx.log, "start saga recovery");

// We do not retry any database operations here because we're being invoked
// by a background task that will be re-activated some time later and pick
// up anything we missed.
// TODO-monitoring we definitely want a way to raise a big red flag if
// saga recovery is not completing.
let found_sagas = list_unfinished_sagas(&opctx, datastore, &sec_id).await?;

info!(&opctx.log, "listed sagas ({} total)", found_sagas.len());

// Load and resume all sagas in serial. Too much parallelism here could
// overload the database. It wouldn't buy us much anyway to parallelize
// this since these operations should generally be quick, and there
// shouldn't be too many sagas outstanding, and Nexus has already crashed so
// they've experienced a bit of latency already.
let mut completion_futures = Vec::with_capacity(found_sagas.len());
for saga in found_sagas {
// TODO-debugging want visibility into sagas that we cannot recover for
// whatever reason
let saga_id: steno::SagaId = saga.id.into();
let saga_name = saga.name.clone();
let saga_logger = opctx.log.new(o!(
"saga_name" => saga_name,
"saga_id" => saga_id.to_string()
));
info!(&saga_logger, "recovering saga: start");
match recover_saga(
&opctx,
&saga_logger,
make_context,
datastore,
sec_client,
Arc::clone(&registry),
saga,
)
.await
.unwrap();

info!(&opctx.log, "listed sagas ({} total)", found_sagas.len());

let recovery_futures = found_sagas.into_iter().map(|saga| async {
// TODO-robustness We should put this into a retry loop. We may
// also want to take any failed sagas and put them at the end of the
// queue. It shouldn't really matter, in that the transient
// failures here are likely to affect recovery of all sagas.
// However, it's conceivable we misclassify a permanent failure as a
// transient failure, or that a transient failure is more likely to
// affect some sagas than others (e.g, data on a different node, or
// it has a larger log that requires more queries). To avoid one
// bad saga ruining the rest, we should try to recover the rest
// before we go back to one that's failed.
// TODO-debugging want visibility into "abandoned" sagas
let saga_id: steno::SagaId = saga.id.into();
recover_saga(
&opctx,
Arc::clone(&uctx),
&datastore,
&sec_client,
Arc::clone(&registry),
saga,
)
.map_err(|error| {
warn!(
&opctx.log,
"failed to recover saga {}: {:#}", saga_id, error
{
Ok(completion_future) => {
info!(&saga_logger, "recovered saga");
completion_futures.push(completion_future);
}
Err(error) => {
// It's essential that we not bail out early just because we hit
// an error here. We want to recover all the sagas that we can.
error!(
&saga_logger,
"failed to recover saga";
slog_error_chain::InlineErrorChain::new(&error),
);
error
})
.await
});

let mut completion_futures = Vec::with_capacity(recovery_futures.len());
// Loads and resumes all sagas in serial.
for recovery_future in recovery_futures {
let saga_complete_future = recovery_future.await?;
completion_futures.push(saga_complete_future);
}
}
// Returns a future that awaits the completion of all resumed sagas.
Ok(CompletionTask(Box::pin(async move {
futures::future::try_join_all(completion_futures).await?;
Ok(())
})))
});

RecoveryTask(Box::pin(async move {
// Unwraps join-related errors.
join_handle.await.unwrap()
}))
}

// Returns a future that awaits the completion of all resumed sagas.
Ok(CompletionTask(Box::pin(async move {
futures::future::try_join_all(completion_futures).await?;
Ok(())
})))
}
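
For orientation on the new signature: `recover` is now an ordinary async function rather than one that spawns a `RecoveryTask`. A hedged sketch of how a caller (e.g. a periodic background task inside this module's crate) might drive it; the `activate` helper below is hypothetical, not part of this commit.

// Hypothetical caller: runs recovery inline and optionally awaits the resumed
// sagas. On error we just log; the next periodic activation retries.
async fn activate<T: steno::SagaType>(
    opctx: &OpContext,
    sec_id: db::SecId,
    make_context: &(dyn Fn(&slog::Logger) -> Arc<T::ExecContextType> + Send + Sync),
    datastore: &db::DataStore,
    sec_client: &steno::SecClient,
    registry: Arc<steno::ActionRegistry<T>>,
) {
    match recover(opctx, sec_id, make_context, datastore, sec_client, registry).await {
        Ok(completion_task) => {
            // `CompletionTask` is a plain future; awaiting it waits for the
            // resumed sagas themselves to finish. A caller may also drop it
            // and let the SEC drive the sagas to completion.
            if let Err(error) = completion_task.await {
                warn!(&opctx.log, "a recovered saga failed: {:#}", error);
            }
        }
        Err(error) => {
            warn!(&opctx.log, "saga recovery failed: {:#}", error);
        }
    }
}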

/// Queries the database to return a list of uncompleted sagas assigned to SEC
@@ -233,7 +187,10 @@ async fn list_unfinished_sagas(
/// and does not need to be polled.
async fn recover_saga<'a, T>(
opctx: &'a OpContext,
uctx: Arc<T::ExecContextType>,
saga_logger: &slog::Logger,
make_context: &(dyn Fn(&slog::Logger) -> Arc<T::ExecContextType>
+ Send
+ Sync),
datastore: &'a db::DataStore,
sec_client: &'a steno::SecClient,
registry: Arc<steno::ActionRegistry<T>>,
@@ -246,27 +203,11 @@ where
T: steno::SagaType,
{
let saga_id: steno::SagaId = saga.id.into();
let saga_name = saga.name.clone();
trace!(opctx.log, "recovering saga: start";
"saga_id" => saga_id.to_string(),
"saga_name" => saga_name.clone(),
);

let log_events = load_saga_log(&opctx, datastore, &saga).await?;
trace!(
opctx.log,
"recovering saga: loaded log";
"saga_id" => ?saga_id,
"saga_name" => saga_name.clone()
);
trace!(&saga_logger, "recovering saga: loaded log");
let uctx = make_context(&saga_logger);
let saga_completion = sec_client
.saga_resume(
saga_id,
Arc::clone(&uctx),
saga.saga_dag,
registry,
log_events,
)
.saga_resume(saga_id, uctx, saga.saga_dag, registry, log_events)
.await
.map_err(|error| {
// TODO-robustness We want to differentiate between retryable and
Expand All @@ -276,6 +217,7 @@ where
error
))
})?;
trace!(&saga_logger, "recovering saga: starting the saga");
sec_client.saga_start(saga_id).await.map_err(|error| {
Error::internal_error(&format!("failed to start saga: {:#}", error))
})?;
@@ -519,11 +461,11 @@ mod test {
// Recover the saga, observing that it re-runs operations and completes.
let sec_client = Arc::new(sec_client);
recover(
opctx,
&opctx,
sec_id,
uctx.clone(),
db_datastore,
sec_client.clone(),
&|_| uctx.clone(),
&db_datastore,
&sec_client,
registry_create(),
)
.await // Await the loading and resuming of the sagas
@@ -584,11 +526,11 @@ mod test {
// Recover the saga, observing that it does not replay the nodes.
let sec_client = Arc::new(sec_client);
recover(
opctx,
&opctx,
sec_id,
uctx.clone(),
db_datastore,
sec_client.clone(),
&|_| uctx.clone(),
&db_datastore,
&sec_client,
registry_create(),
)
.await
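
One note on the new `make_context` hook: it lets recovery build a fresh execution context per saga, so each context can carry the saga-scoped logger (the tests above simply ignore it with `&|_| uctx.clone()`). The `SagaContext` type and helper below are invented for illustration; the real `T::ExecContextType` is not shown in this diff.

// Hypothetical per-saga context builder; only the logger handling is the point.
struct SagaContext {
    log: slog::Logger,
    datastore: Arc<db::DataStore>,
}

fn make_context_fn(
    datastore: Arc<db::DataStore>,
) -> impl Fn(&slog::Logger) -> Arc<SagaContext> + Send + Sync {
    move |saga_logger: &slog::Logger| {
        Arc::new(SagaContext {
            // Each recovered saga gets a logger already tagged with saga_id/saga_name.
            log: saga_logger.clone(),
            datastore: datastore.clone(),
        })
    }
}

// A caller would pass `&make_context_fn(datastore.clone())` as the
// `make_context` argument to `recover()` (coercing to `&dyn Fn(..)`).
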
1 change: 1 addition & 0 deletions nexus/examples/config.toml
@@ -120,6 +120,7 @@ instance_watcher.period_secs = 30
service_firewall_propagation.period_secs = 300
v2p_mapping_propagation.period_secs = 30
abandoned_vmm_reaper.period_secs = 60
saga_recovery.period_secs = 600

[default_region_allocation_strategy]
# allocate region on 3 random distinct zpools, on 3 random distinct sleds.