Skip to content

Commit

Permalink
post-rebase update for #5964
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw committed Jul 3, 2024
1 parent ea2f3ab commit d56175e
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 95 deletions.
34 changes: 30 additions & 4 deletions dev-tools/omdb/src/bin/omdb/nexus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1095,8 +1095,17 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) {
/// number of instances found with terminated active migrations
terminated_active_migrations: usize,

/// number of update sagas queued.
update_sagas_queued: usize,
/// number of update sagas started.
sagas_started: usize,

/// number of sagas completed successfully
sagas_completed: usize,

/// number of sagas which failed
sagas_failed: usize,

/// number of sagas which could not be started
saga_start_failures: usize,

/// the last error that occurred during execution.
error: Option<String>,
Expand All @@ -1109,7 +1118,10 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) {
Ok(UpdaterStatus {
destroyed_active_vmms,
terminated_active_migrations,
update_sagas_queued,
sagas_started,
sagas_completed,
sagas_failed,
saga_start_failures,
error,
}) => {
if let Some(error) = error {
Expand All @@ -1129,7 +1141,21 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) {
" instances with terminated active migrations: {}",
terminated_active_migrations,
);
println!(" update sagas queued: {update_sagas_queued}");
println!(" update sagas started: {sagas_started}");
println!(
" update sagas completed successfully: {}",
sagas_completed,
);

let total_failed = sagas_failed + saga_start_failures;
if total_failed > 0 {
println!(" unsuccessful update sagas: {total_failed}");
println!(
" sagas which could not be started: {}",
saga_start_failures
);
println!(" sagas failed: {sagas_failed}");
}
}
};
} else {
Expand Down
4 changes: 2 additions & 2 deletions nexus/src/app/background/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,9 +589,9 @@ impl BackgroundTasksInitializer {
{
let watcher = instance_watcher::InstanceWatcher::new(
datastore.clone(),
sagas.clone(),
producer_registry,
instance_watcher::WatcherIdentity { nexus_id, rack_id },
saga_request.clone(),
);
driver.register(TaskDefinition {
name: "instance_watcher",
Expand All @@ -609,7 +609,7 @@ impl BackgroundTasksInitializer {
{
let updater = instance_updater::InstanceUpdater::new(
datastore.clone(),
saga_request.clone(),
sagas.clone(),
);
driver.register( TaskDefinition {
name: "instance_updater",
Expand Down
153 changes: 121 additions & 32 deletions nexus/src/app/background/tasks/instance_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
//! Background task for detecting instances in need of update sagas.
use crate::app::background::BackgroundTask;
use crate::app::saga::StartSaga;
use crate::app::sagas::instance_update;
use crate::app::sagas::SagaRequest;
use crate::app::sagas::NexusSaga;
use anyhow::Context;
use futures::future::BoxFuture;
use futures::FutureExt;
Expand All @@ -20,19 +21,16 @@ use omicron_common::api::external::ListResultVec;
use serde_json::json;
use std::future::Future;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::task::JoinSet;

/// Background task for detecting instances in need of update sagas.
///
/// Implements [`BackgroundTask`]; each activation looks for instances with
/// destroyed active VMMs or terminated active migrations and runs
/// instance-update sagas for them.
pub struct InstanceUpdater {
/// Datastore handle, used for the per-instance authz lookup
/// (`LookupPath::new(.., &self.datastore)`).
datastore: Arc<DataStore>,
/// Channel on which `SagaRequest::InstanceUpdate` requests are sent.
saga_req: Sender<SagaRequest>,
/// Saga starter; cloned into each spawned task, which calls `saga_start`
/// on it with the prepared instance-update saga.
sagas: Arc<dyn StartSaga>,
}

impl InstanceUpdater {
pub fn new(
datastore: Arc<DataStore>,
saga_req: Sender<SagaRequest>,
) -> Self {
InstanceUpdater { datastore, saga_req }
pub fn new(datastore: Arc<DataStore>, sagas: Arc<dyn StartSaga>) -> Self {
InstanceUpdater { datastore, sagas }
}

async fn activate2(
Expand Down Expand Up @@ -71,6 +69,7 @@ impl InstanceUpdater {
}

let mut last_err = Ok(());
let mut sagas = JoinSet::new();

// NOTE(eliza): These don't, strictly speaking, need to be two separate
// queries, they probably could instead be `OR`ed together in SQL. I
Expand All @@ -84,6 +83,14 @@ impl InstanceUpdater {
)
.await;
stats.destroyed_active_vmms = destroyed_active_vmms.len();
self.start_sagas(
&opctx,
stats,
&mut last_err,
&mut sagas,
destroyed_active_vmms,
)
.await;

let terminated_active_migrations = find_instances(
"terminated active migrations",
Expand All @@ -94,38 +101,101 @@ impl InstanceUpdater {
)
.await;
stats.terminated_active_migrations = terminated_active_migrations.len();
self.start_sagas(
&opctx,
stats,
&mut last_err,
&mut sagas,
terminated_active_migrations,
)
.await;

for instance in destroyed_active_vmms
.iter()
.chain(terminated_active_migrations.iter())
{
let serialized_authn = authn::saga::Serialized::for_opctx(opctx);
let (.., authz_instance) = LookupPath::new(&opctx, &self.datastore)
.instance_id(instance.id())
.lookup_for(authz::Action::Modify)
.await?;
let saga = SagaRequest::InstanceUpdate {
params: instance_update::Params {
serialized_authn,
authz_instance,
},
};
self.saga_req
.send(saga)
.await
.context("SagaRequest receiver missing")?;
stats.update_sagas_queued += 1;
// Now, wait for the sagas to complete.
while let Some(saga_result) = sagas.join_next().await {
match saga_result {
Err(err) => {
debug_assert!(
false,
"since nexus is compiled with `panic=\"abort\"`, and \
we never cancel the tasks on the `JoinSet`, a \
`JoinError` should never be observed!",
);
stats.sagas_failed += 1;
last_err = Err(err.into());
}
Ok(Err(err)) => {
warn!(opctx.log, "update saga failed!"; "error" => %err);
stats.sagas_failed += 1;
last_err = Err(err.into());
}
Ok(Ok(())) => stats.sagas_completed += 1,
}
}

last_err
}

async fn start_sagas(
&self,
opctx: &OpContext,
stats: &mut ActivationStats,
last_err: &mut Result<(), anyhow::Error>,
sagas: &mut JoinSet<Result<(), anyhow::Error>>,
instances: impl IntoIterator<Item = Instance>,
) {
let serialized_authn = authn::saga::Serialized::for_opctx(opctx);
for instance in instances {
let instance_id = instance.id();
let saga = async {
let (.., authz_instance) =
LookupPath::new(&opctx, &self.datastore)
.instance_id(instance_id)
.lookup_for(authz::Action::Modify)
.await?;
instance_update::SagaInstanceUpdate::prepare(
&instance_update::Params {
serialized_authn: serialized_authn.clone(),
authz_instance,
},
)
.with_context(|| {
format!("failed to prepare instance-update saga for {instance_id}")
})
}
.await;
match saga {
Ok(saga) => {
let start_saga = self.sagas.clone();
sagas.spawn(async move {
start_saga.saga_start(saga).await.with_context(|| {
format!("update saga for {instance_id} failed")
})
});
stats.sagas_started += 1;
}
Err(err) => {
warn!(
opctx.log,
"failed to start instance-update saga!";
"instance_id" => %instance_id,
"error" => %err,
);
stats.saga_start_failures += 1;
*last_err = Err(err);
}
}
}
}
}

/// Counters collected while the instance updater runs; they are written to
/// the log and to the JSON status value the task reports.
///
/// All counters start at zero via `Default`.
#[derive(Default)]
struct ActivationStats {
/// Number of instances found with destroyed active VMMs (the length of the
/// `destroyed_active_vmms` query result).
destroyed_active_vmms: usize,
/// Number of instances found with terminated active migrations.
terminated_active_migrations: usize,
/// Number of update sagas queued; incremented after each `SagaRequest` is
/// sent successfully.
update_sagas_queued: usize,
/// Number of update sagas started; incremented each time a saga task is
/// spawned on the `JoinSet`.
sagas_started: usize,
/// Number of saga tasks that were joined with a successful result.
sagas_completed: usize,
/// Number of saga tasks that were joined with an error (either the saga
/// returned an error or joining the task itself failed).
sagas_failed: usize,
/// Number of sagas which could not be started because the instance lookup
/// or saga preparation failed.
saga_start_failures: usize,
}

impl BackgroundTask for InstanceUpdater {
Expand All @@ -142,7 +212,20 @@ impl BackgroundTask for InstanceUpdater {
"instance updater activation completed";
"destroyed_active_vmms" => stats.destroyed_active_vmms,
"terminated_active_migrations" => stats.terminated_active_migrations,
"update_sagas_queued" => stats.update_sagas_queued,
"update_sagas_started" => stats.sagas_started,
"update_sagas_completed" => stats.sagas_completed,
);
debug_assert_eq!(
stats.sagas_failed,
0,
"if the task completed successfully, then no sagas \
should have failed",
);
debug_assert_eq!(
stats.saga_start_failures,
0,
"if the task completed successfully, all sagas \
should have started successfully"
);
None
}
Expand All @@ -153,15 +236,21 @@ impl BackgroundTask for InstanceUpdater {
"error" => %error,
"destroyed_active_vmms" => stats.destroyed_active_vmms,
"terminated_active_migrations" => stats.terminated_active_migrations,
"update_sagas_queued" => stats.update_sagas_queued,
"update_sagas_started" => stats.sagas_started,
"update_sagas_completed" => stats.sagas_completed,
"update_sagas_failed" => stats.sagas_failed,
"update_saga_start_failures" => stats.saga_start_failures,
);
Some(error.to_string())
}
};
json!({
"destroyed_active_vmms": stats.destroyed_active_vmms,
"terminated_active_migrations": stats.terminated_active_migrations,
"update_sagas_queued": stats.update_sagas_queued,
"sagas_started": stats.sagas_started,
"sagas_completed": stats.sagas_completed,
"sagas_failed": stats.sagas_failed,
"saga_start_failures": stats.saga_start_failures,
"error": error,
})
}
Expand Down
Loading

0 comments on commit d56175e

Please sign in to comment.