diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs
index aa3b7151f7f..872740687f3 100644
--- a/dev-tools/omdb/src/bin/omdb/nexus.rs
+++ b/dev-tools/omdb/src/bin/omdb/nexus.rs
@@ -1095,8 +1095,17 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) {
             /// number of instances found with terminated active migrations
             terminated_active_migrations: usize,
 
-            /// number of update sagas queued.
-            update_sagas_queued: usize,
+            /// number of update sagas started.
+            sagas_started: usize,
+
+            /// number of sagas completed successfully
+            sagas_completed: usize,
+
+            /// number of sagas which failed
+            sagas_failed: usize,
+
+            /// number of sagas which could not be started
+            saga_start_failures: usize,
 
             /// the last error that occurred during execution.
             error: Option<String>,
@@ -1109,7 +1118,10 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) {
             Ok(UpdaterStatus {
                 destroyed_active_vmms,
                 terminated_active_migrations,
-                update_sagas_queued,
+                sagas_started,
+                sagas_completed,
+                sagas_failed,
+                saga_start_failures,
                 error,
             }) => {
                 if let Some(error) = error {
@@ -1129,7 +1141,21 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) {
                     "    instances with terminated active migrations: {}",
                     terminated_active_migrations,
                 );
-                println!("    update sagas queued: {update_sagas_queued}");
+                println!("    update sagas started: {sagas_started}");
+                println!(
+                    "    update sagas completed successfully: {}",
+                    sagas_completed,
+                );
+
+                let total_failed = sagas_failed + saga_start_failures;
+                if total_failed > 0 {
+                    println!("    unsuccessful update sagas: {total_failed}");
+                    println!(
+                        "        sagas which could not be started: {}",
+                        saga_start_failures
+                    );
+                    println!("        sagas failed: {sagas_failed}");
+                }
             }
         };
     } else {
diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs
index 30c5be6ba1b..9b6ae1ca447 100644
--- a/nexus/src/app/background/init.rs
+++ b/nexus/src/app/background/init.rs
@@ -589,9 +589,9 @@ impl BackgroundTasksInitializer {
         {
             let watcher = instance_watcher::InstanceWatcher::new(
                 datastore.clone(),
+                sagas.clone(),
                 producer_registry,
                 instance_watcher::WatcherIdentity { nexus_id, rack_id },
-                saga_request.clone(),
             );
             driver.register(TaskDefinition {
                 name: "instance_watcher",
@@ -609,7 +609,7 @@ impl BackgroundTasksInitializer {
         {
             let updater = instance_updater::InstanceUpdater::new(
                 datastore.clone(),
-                saga_request.clone(),
+                sagas.clone(),
             );
             driver.register(
                 TaskDefinition {
                     name: "instance_updater",
diff --git a/nexus/src/app/background/tasks/instance_updater.rs b/nexus/src/app/background/tasks/instance_updater.rs
index cced8e82036..4aa31c07023 100644
--- a/nexus/src/app/background/tasks/instance_updater.rs
+++ b/nexus/src/app/background/tasks/instance_updater.rs
@@ -5,8 +5,9 @@
 //! Background task for detecting instances in need of update sagas.
 
 use crate::app::background::BackgroundTask;
+use crate::app::saga::StartSaga;
 use crate::app::sagas::instance_update;
-use crate::app::sagas::SagaRequest;
+use crate::app::sagas::NexusSaga;
 use anyhow::Context;
 use futures::future::BoxFuture;
 use futures::FutureExt;
@@ -20,19 +21,16 @@
 use omicron_common::api::external::ListResultVec;
 use serde_json::json;
 use std::future::Future;
 use std::sync::Arc;
-use tokio::sync::mpsc::Sender;
+use tokio::task::JoinSet;
 
 pub struct InstanceUpdater {
     datastore: Arc<DataStore>,
-    saga_req: Sender<SagaRequest>,
+    sagas: Arc<dyn StartSaga>,
 }
 
 impl InstanceUpdater {
-    pub fn new(
-        datastore: Arc<DataStore>,
-        saga_req: Sender<SagaRequest>,
-    ) -> Self {
-        InstanceUpdater { datastore, saga_req }
+    pub fn new(datastore: Arc<DataStore>, sagas: Arc<dyn StartSaga>) -> Self {
+        InstanceUpdater { datastore, sagas }
     }
 
     async fn activate2(
@@ -71,6 +69,7 @@ impl InstanceUpdater {
         }
 
         let mut last_err = Ok(());
+        let mut sagas = JoinSet::new();
 
         // NOTE(eliza): These don't, strictly speaking, need to be two separate
         // queries, they probably could instead be `OR`ed together in SQL. I
@@ -84,6 +83,14 @@ impl InstanceUpdater {
         )
         .await;
         stats.destroyed_active_vmms = destroyed_active_vmms.len();
+        self.start_sagas(
+            &opctx,
+            stats,
+            &mut last_err,
+            &mut sagas,
+            destroyed_active_vmms,
+        )
+        .await;
 
         let terminated_active_migrations = find_instances(
             "terminated active migrations",
@@ -94,38 +101,101 @@ impl InstanceUpdater {
         )
         .await;
         stats.terminated_active_migrations = terminated_active_migrations.len();
+        self.start_sagas(
+            &opctx,
+            stats,
+            &mut last_err,
+            &mut sagas,
+            terminated_active_migrations,
+        )
+        .await;
 
-        for instance in destroyed_active_vmms
-            .iter()
-            .chain(terminated_active_migrations.iter())
-        {
-            let serialized_authn = authn::saga::Serialized::for_opctx(opctx);
-            let (.., authz_instance) = LookupPath::new(&opctx, &self.datastore)
-                .instance_id(instance.id())
-                .lookup_for(authz::Action::Modify)
-                .await?;
-            let saga = SagaRequest::InstanceUpdate {
-                params: instance_update::Params {
-                    serialized_authn,
-                    authz_instance,
-                },
-            };
-            self.saga_req
-                .send(saga)
-                .await
-                .context("SagaRequest receiver missing")?;
-            stats.update_sagas_queued += 1;
+        // Now, wait for the sagas to complete.
+        while let Some(saga_result) = sagas.join_next().await {
+            match saga_result {
+                Err(err) => {
+                    debug_assert!(
+                        false,
+                        "since nexus is compiled with `panic=\"abort\"`, and \
+                         we never cancel the tasks on the `JoinSet`, a \
+                         `JoinError` should never be observed!",
+                    );
+                    stats.sagas_failed += 1;
+                    last_err = Err(err.into());
+                }
+                Ok(Err(err)) => {
+                    warn!(opctx.log, "update saga failed!"; "error" => %err);
+                    stats.sagas_failed += 1;
+                    last_err = Err(err.into());
+                }
+                Ok(Ok(())) => stats.sagas_completed += 1,
+            }
         }
 
         last_err
     }
+
+    async fn start_sagas(
+        &self,
+        opctx: &OpContext,
+        stats: &mut ActivationStats,
+        last_err: &mut Result<(), anyhow::Error>,
+        sagas: &mut JoinSet<Result<(), anyhow::Error>>,
+        instances: impl IntoIterator<Item = Instance>,
+    ) {
+        let serialized_authn = authn::saga::Serialized::for_opctx(opctx);
+        for instance in instances {
+            let instance_id = instance.id();
+            let saga = async {
+                let (.., authz_instance) =
+                    LookupPath::new(&opctx, &self.datastore)
+                        .instance_id(instance_id)
+                        .lookup_for(authz::Action::Modify)
+                        .await?;
+                instance_update::SagaInstanceUpdate::prepare(
+                    &instance_update::Params {
+                        serialized_authn: serialized_authn.clone(),
+                        authz_instance,
+                    },
+                )
+                .with_context(|| {
+                    format!("failed to prepare instance-update saga for {instance_id}")
+                })
+            }
+            .await;
+            match saga {
+                Ok(saga) => {
+                    let start_saga = self.sagas.clone();
+                    sagas.spawn(async move {
+                        start_saga.saga_start(saga).await.with_context(|| {
+                            format!("update saga for {instance_id} failed")
+                        })
+                    });
+                    stats.sagas_started += 1;
+                }
+                Err(err) => {
+                    warn!(
+                        opctx.log,
+                        "failed to start instance-update saga!";
+                        "instance_id" => %instance_id,
+                        "error" => %err,
+                    );
+                    stats.saga_start_failures += 1;
+                    *last_err = Err(err);
+                }
+            }
+        }
+    }
 }
 
 #[derive(Default)]
 struct ActivationStats {
     destroyed_active_vmms: usize,
     terminated_active_migrations: usize,
-    update_sagas_queued: usize,
+    sagas_started: usize,
+    sagas_completed: usize,
+    sagas_failed: usize,
+    saga_start_failures: usize,
 }
 
 impl BackgroundTask for InstanceUpdater {
@@ -142,7 +212,20 @@ impl BackgroundTask for InstanceUpdater {
                     "instance updater activation completed";
                     "destroyed_active_vmms" => stats.destroyed_active_vmms,
                     "terminated_active_migrations" => stats.terminated_active_migrations,
-                    "update_sagas_queued" => stats.update_sagas_queued,
+                    "update_sagas_started" => stats.sagas_started,
+                    "update_sagas_completed" => stats.sagas_completed,
+                );
+                debug_assert_eq!(
+                    stats.sagas_failed,
+                    0,
+                    "if the task completed successfully, then no sagas \
+                     should have failed",
+                );
+                debug_assert_eq!(
+                    stats.saga_start_failures,
+                    0,
+                    "if the task completed successfully, all sagas \
+                     should have started successfully"
                 );
                 None
             }
@@ -153,7 +236,10 @@ impl BackgroundTask for InstanceUpdater {
                     "error" => %error,
                     "destroyed_active_vmms" => stats.destroyed_active_vmms,
                     "terminated_active_migrations" => stats.terminated_active_migrations,
-                    "update_sagas_queued" => stats.update_sagas_queued,
+                    "update_sagas_started" => stats.sagas_started,
+                    "update_sagas_completed" => stats.sagas_completed,
+                    "update_sagas_failed" => stats.sagas_failed,
+                    "update_saga_start_failures" => stats.saga_start_failures,
                 );
                 Some(error.to_string())
             }
@@ -161,7 +247,10 @@ impl BackgroundTask for InstanceUpdater {
         json!({
             "destroyed_active_vmms": stats.destroyed_active_vmms,
             "terminated_active_migrations": stats.terminated_active_migrations,
-            "update_sagas_queued": stats.update_sagas_queued,
+            "sagas_started": stats.sagas_started,
+            "sagas_completed": stats.sagas_completed,
"sagas_failed": stats.sagas_failed, + "saga_start_failures": stats.saga_start_failures, "error": error, }) } diff --git a/nexus/src/app/background/tasks/instance_watcher.rs b/nexus/src/app/background/tasks/instance_watcher.rs index d77e51dd1f9..cd4ec1f3d4f 100644 --- a/nexus/src/app/background/tasks/instance_watcher.rs +++ b/nexus/src/app/background/tasks/instance_watcher.rs @@ -4,9 +4,8 @@ //! Background task for pulling instance state from sled-agents. -use crate::app::background::Activator; use crate::app::background::BackgroundTask; -use crate::app::sagas; +use crate::app::saga::StartSaga; use futures::{future::BoxFuture, FutureExt}; use http::StatusCode; use nexus_db_model::Instance; @@ -32,15 +31,14 @@ use std::net::IpAddr; use std::num::NonZeroU32; use std::sync::Arc; use std::sync::Mutex; -use tokio::sync::mpsc::Sender; use uuid::Uuid; /// Background task that periodically checks instance states. pub(crate) struct InstanceWatcher { datastore: Arc, + sagas: Arc, metrics: Arc>, id: WatcherIdentity, - v2p_manager: Activator, } const MAX_SLED_AGENTS: NonZeroU32 = unsafe { @@ -51,15 +49,15 @@ const MAX_SLED_AGENTS: NonZeroU32 = unsafe { impl InstanceWatcher { pub(crate) fn new( datastore: Arc, + sagas: Arc, producer_registry: &ProducerRegistry, id: WatcherIdentity, - v2p_manager: Activator, ) -> Self { let metrics = Arc::new(Mutex::new(metrics::Metrics::default())); producer_registry .register_producer(metrics::Producer(metrics.clone())) .unwrap(); - Self { datastore, resolver, metrics, id, v2p_manager } + Self { datastore, sagas, metrics, id } } fn check_instance( @@ -69,6 +67,7 @@ impl InstanceWatcher { target: VirtualMachine, ) -> impl Future + Send + 'static { let datastore = self.datastore.clone(); + let sagas = self.sagas.clone(); let opctx = opctx.child( std::iter::once(( @@ -78,7 +77,6 @@ impl InstanceWatcher { .collect(), ); let client = client.clone(); - let v2p_manager = self.v2p_manager.clone(); async move { slog::trace!(opctx.log, "checking on instance..."); @@ -159,35 +157,34 @@ impl InstanceWatcher { "updating instance state"; "state" => ?new_runtime_state.vmm_state.state, ); - check.result = - crate::app::instance::notify_instance_updated_background( - &datastore, - &opctx, - &saga_req, - InstanceUuid::from_untyped_uuid(target.instance_id), - new_runtime_state, - ) - .await - .map_err(|e| { - slog::warn!( - opctx.log, - "error updating instance"; - "error" => ?e, - ); - match e { - Error::ObjectNotFound { .. } => { - Incomplete::InstanceNotFound - } - _ => Incomplete::UpdateFailed, + check.result = crate::app::instance::notify_instance_updated( + &datastore, + sagas.as_ref(), + &opctx, + InstanceUuid::from_untyped_uuid(target.instance_id), + new_runtime_state, + ) + .await + .map_err(|e| { + slog::warn!( + opctx.log, + "error updating instance"; + "error" => ?e, + ); + match e { + Error::ObjectNotFound { .. 
+                            Incomplete::InstanceNotFound
                         }
-                    })
-                    .map(|updated| {
-                        slog::debug!(
-                            opctx.log, "update successful";
-                            "vmm_updated" => ?updated,
-                        );
-                        check.update_saga_queued = updated;
-                    });
+                        _ => Incomplete::UpdateFailed,
+                    }
+                })
+                .map(|updated| {
+                    slog::debug!(
+                        opctx.log, "update successful";
+                        "vmm_updated" => ?updated,
+                    );
+                    check.update_saga_queued = updated;
+                });
                 check
             }
         }
diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs
index efbdcdc15df..fabb569082c 100644
--- a/nexus/src/app/instance.rs
+++ b/nexus/src/app/instance.rs
@@ -12,7 +12,9 @@ use super::MAX_NICS_PER_INSTANCE;
 use super::MAX_SSH_KEYS_PER_INSTANCE;
 use super::MAX_VCPU_PER_INSTANCE;
 use super::MIN_MEMORY_BYTES_PER_INSTANCE;
+use crate::app::saga::StartSaga;
 use crate::app::sagas;
+use crate::app::sagas::NexusSaga;
 use crate::cidata::InstanceCiData;
 use crate::external_api::params;
 use cancel_safe_futures::prelude::*;
@@ -1884,14 +1886,17 @@ impl super::Nexus {
     }
 }
 
-/// `Nexus::notify_instance_updated` (~~Taylor~~ background task's version)
-pub(crate) async fn notify_instance_updated_background(
+/// Invoked by a sled agent to publish an updated runtime state for an
+/// Instance.
+pub(crate) async fn notify_instance_updated(
     datastore: &DataStore,
+    sagas: &dyn StartSaga,
     opctx: &OpContext,
-    saga_request: &tokio::sync::mpsc::Sender<sagas::SagaRequest>,
     instance_id: InstanceUuid,
     new_runtime_state: nexus::SledInstanceState,
 ) -> Result<bool, Error> {
+    use sagas::instance_update;
+
     let migrations = new_runtime_state.migrations();
     let propolis_id = new_runtime_state.propolis_id;
     info!(opctx.log, "received new VMM runtime state from sled agent";
@@ -1916,24 +1921,13 @@ pub(crate) async fn notify_instance_updated_background(
             .instance_id(instance_id.into_untyped_uuid())
             .lookup_for(authz::Action::Modify)
             .await?;
-        let params = sagas::instance_update::Params {
-            serialized_authn: authn::saga::Serialized::for_opctx(opctx),
-            authz_instance,
-        };
-        info!(opctx.log, "queueing update saga for {instance_id}";
-            "instance_id" => %instance_id,
-            "propolis_id" => %propolis_id,
-            "vmm_state" => ?new_runtime_state.vmm_state,
-            "migration_state" => ?migrations,
-        );
-        saga_request
-            .send(sagas::SagaRequest::InstanceUpdate { params })
-            .await
-            .map_err(|_| {
-                Error::internal_error(
-                    "background saga executor is gone! this is not supposed to happen"
-                )
-            })?;
+        let saga = instance_update::SagaInstanceUpdate::prepare(
+            &instance_update::Params {
+                serialized_authn: authn::saga::Serialized::for_opctx(opctx),
+                authz_instance,
+            },
+        )?;
+        sagas.saga_start(saga).await?;
     }
 
     Ok(updated)
diff --git a/nexus/src/app/instance_network.rs b/nexus/src/app/instance_network.rs
index 27edcacabd7..8cd0a34fbf2 100644
--- a/nexus/src/app/instance_network.rs
+++ b/nexus/src/app/instance_network.rs
@@ -4,7 +4,6 @@
 
 //! Routines that manage instance-related networking state.
 
-use crate::app::background;
 use crate::app::switch_port;
 use ipnetwork::IpNetwork;
 use nexus_db_model::ExternalIp;
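
Note on the `StartSaga` seam: the core move in this patch is replacing the fire-and-forget `SagaRequest` mpsc channel with an injected `Arc<dyn StartSaga>` whose `saga_start` resolves only when the saga finishes, which is what lets callers count completions and failures. Below is a minimal sketch of that shape, with simplified stand-in types; it is not the real trait (that lives in `nexus/src/app/saga.rs` and works in terms of steno's saga DAGs and Nexus's `Error` type).

```rust
use std::sync::Arc;

// Stand-in for a prepared saga DAG (the real one comes from steno).
struct SagaDag(&'static str);

#[async_trait::async_trait]
trait StartSaga: Send + Sync {
    /// Runs a prepared saga; resolves when the saga completes.
    async fn saga_start(&self, dag: SagaDag) -> Result<(), String>;
}

/// Stand-in executor: production would hand the DAG to the saga engine,
/// while tests can substitute a mock that records the DAGs it was given.
struct Executor;

#[async_trait::async_trait]
impl StartSaga for Executor {
    async fn saga_start(&self, dag: SagaDag) -> Result<(), String> {
        println!("running saga: {}", dag.0);
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // Background tasks hold the trait object, as InstanceUpdater and
    // InstanceWatcher now do, so the executor is swappable in tests.
    let sagas: Arc<dyn StartSaga> = Arc::new(Executor);
    sagas.saga_start(SagaDag("instance-update")).await
}
```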
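Note on the `JoinSet` accounting in `activate2`: the task fans out fallible work, then drains the set, tallying started/completed/failed counts and retaining the last error for the activation report. Here is a self-contained sketch of just that pattern, assuming only `tokio` and `anyhow`; the `run_saga` stand-in and `Stats` type are illustrative, not from Nexus.

```rust
use anyhow::Context;
use tokio::task::JoinSet;

#[derive(Debug, Default)]
struct Stats {
    started: usize,
    completed: usize,
    failed: usize,
}

// Stand-in for starting one update saga; fails for odd IDs.
async fn run_saga(id: u64) -> anyhow::Result<()> {
    if id % 2 == 1 {
        anyhow::bail!("saga for instance {id} failed");
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    let mut sagas = JoinSet::new();
    let mut stats = Stats::default();
    let mut last_err: anyhow::Result<()> = Ok(());

    for id in 0..4u64 {
        sagas.spawn(async move {
            run_saga(id).await.with_context(|| format!("saga {id}"))
        });
        stats.started += 1;
    }

    // Drain the JoinSet: Ok(Ok(_)) is success, Ok(Err(_)) is a saga
    // failure, and Err(_) is a JoinError (the task panicked or was
    // cancelled, which the updater task treats as a bug).
    while let Some(result) = sagas.join_next().await {
        match result {
            Ok(Ok(())) => stats.completed += 1,
            Ok(Err(err)) => {
                stats.failed += 1;
                last_err = Err(err);
            }
            Err(join_err) => {
                stats.failed += 1;
                last_err = Err(join_err.into());
            }
        }
    }
    println!("{stats:?}, last error: {:?}", last_err.err());
}
```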