From 377d3726628d8028d18222b35cfb574771488c2c Mon Sep 17 00:00:00 2001
From: Eliza Weisman
Date: Sun, 1 Sep 2024 09:53:07 -0700
Subject: [PATCH] [nexus] add `instance_resurrection` background task

---
 nexus/db-queries/src/db/datastore/instance.rs |  56 +++-
 .../background/tasks/instance_resurrection.rs | 240 ++++++++++++++++++
 nexus/src/app/background/tasks/mod.rs         |   1 +
 nexus/src/app/sagas/instance_start.rs         |   7 +-
 4 files changed, 298 insertions(+), 6 deletions(-)
 create mode 100644 nexus/src/app/background/tasks/instance_resurrection.rs

diff --git a/nexus/db-queries/src/db/datastore/instance.rs b/nexus/db-queries/src/db/datastore/instance.rs
index 34579ad21f3..7ab7b76f92f 100644
--- a/nexus/db-queries/src/db/datastore/instance.rs
+++ b/nexus/db-queries/src/db/datastore/instance.rs
@@ -20,7 +20,9 @@ use crate::db::identity::Resource;
 use crate::db::lookup::LookupPath;
 use crate::db::model::Generation;
 use crate::db::model::Instance;
+use crate::db::model::InstanceAutoRestart;
 use crate::db::model::InstanceRuntimeState;
+use crate::db::model::InstanceState;
 use crate::db::model::Migration;
 use crate::db::model::MigrationState;
 use crate::db::model::Name;
@@ -435,6 +437,53 @@ impl DataStore {
             .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
     }
 
+    /// List all instances in the [`Failed`](InstanceState::Failed) state with
+    /// an auto-restart policy that permits them to be automatically restarted
+    /// by the control plane.
+    ///
+    /// This is used by the `instance_resurrection` RPW to ensure that any
+    /// such instances are restarted.
+    ///
+    /// This query is paginated by the instance's UUID, using the provided
+    /// [`DataPageParams`].
+    pub async fn find_resurrectable_instances(
+        &self,
+        opctx: &OpContext,
+        pagparams: &DataPageParams<'_, Uuid>,
+    ) -> ListResultVec<Instance> {
+        use db::schema::instance::dsl;
+
+        paginated(dsl::instance, dsl::id, pagparams)
+            // Only attempt to resurrect Failed instances.
+            .filter(dsl::state.eq(InstanceState::Failed))
+            // The instance's auto-restart policy must allow the control plane
+            // to restart it automatically.
+            //
+            // N.B. that this may become more complex in the future if we grow
+            // additional auto-restart policies that require additional logic
+            // (such as restart limits...)
+            .filter(
+                dsl::auto_restart_policy.eq(InstanceAutoRestart::AllFailures),
+            )
+            // Deleted instances may not be resurrected.
+            .filter(dsl::time_deleted.is_null())
+            // If the instance is currently in the process of being updated,
+            // let's not mess with it for now and try to restart it on another
+            // pass.
+            .filter(dsl::updater_id.is_null())
+            // TODO(eliza): perhaps we ought to check for the presence of an
+            // active VMM here? If there is one, that would indicate that the
+            // instance hasn't been moved to `Failed` correctly. But, we would
+            // also need to handle the case where the active VMM is
+            // SagaUnwound...
+            .select(Instance::as_select())
+            .load_async::<Instance>(
+                &*self.pool_connection_authorized(opctx).await?,
+            )
+            .await
+            .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
+    }
+
     /// Fetches information about an Instance that the caller has previously
     /// fetched
     ///
@@ -871,12 +920,11 @@ impl DataStore {
         // instance must be "stopped" or "failed" in order to delete it. The
         // delete operation sets "time_deleted" (just like with other objects)
         // and also sets the state to "destroyed".
-        use db::model::InstanceState as DbInstanceState;
         use db::schema::{disk, instance};
 
-        let stopped = DbInstanceState::NoVmm;
-        let failed = DbInstanceState::Failed;
-        let destroyed = DbInstanceState::Destroyed;
+        let stopped = InstanceState::NoVmm;
+        let failed = InstanceState::Failed;
+        let destroyed = InstanceState::Destroyed;
         let ok_to_delete_instance_states = vec![stopped, failed];
 
         let detached_label = api::external::DiskState::Detached.label();
diff --git a/nexus/src/app/background/tasks/instance_resurrection.rs b/nexus/src/app/background/tasks/instance_resurrection.rs
new file mode 100644
index 00000000000..c95f44e4201
--- /dev/null
+++ b/nexus/src/app/background/tasks/instance_resurrection.rs
@@ -0,0 +1,240 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! Background task for automatically restarting failed instances.
+
+use crate::app::background::BackgroundTask;
+use crate::app::saga::StartSaga;
+use crate::app::sagas::instance_start;
+use crate::app::sagas::NexusSaga;
+use anyhow::Context;
+use futures::future::BoxFuture;
+use nexus_db_queries::authn;
+use nexus_db_queries::context::OpContext;
+use nexus_db_queries::db::pagination::Paginator;
+use nexus_db_queries::db::DataStore;
+use nexus_types::identity::Resource;
+use omicron_common::api::external::Error;
+use std::num::NonZeroU32;
+use std::sync::Arc;
+use tokio::task::JoinSet;
+
+pub struct InstanceResurrection {
+    datastore: Arc<DataStore>,
+    sagas: Arc<dyn StartSaga>,
+}
+
+#[derive(Default)]
+struct ActivationStats {
+    instances_found: usize,
+    instances_restarted: usize,
+    already_resurrected: usize,
+    sagas_started: usize,
+    saga_start_errors: usize,
+    saga_errors: usize,
+}
+
+const BATCH_SIZE: NonZeroU32 = unsafe {
+    // Safety: last time I checked, 100 was greater than zero.
+    NonZeroU32::new_unchecked(100)
+};
+
+impl BackgroundTask for InstanceResurrection {
+    fn activate<'a>(
+        &'a mut self,
+        opctx: &'a OpContext,
+    ) -> BoxFuture<'a, serde_json::Value> {
+        Box::pin(async move {
+            let mut stats = ActivationStats::default();
+            let error = match self.actually_activate(opctx, &mut stats).await {
+                Ok(_) => {
+                    if stats.instances_restarted > 0 {
+                        info!(
+                            &opctx.log,
+                            "instance resurrection completed";
+                            "instances_found" => stats.instances_found,
+                            "instances_restarted" => stats.instances_restarted,
+                            "already_resurrected" => stats.already_resurrected,
+                            "sagas_started" => stats.sagas_started,
+                        );
+                    }
+                    None
+                }
+                Err(error) => {
+                    error!(
+                        &opctx.log,
+                        "instance resurrection failed!";
+                        "last_error" => %error,
+                        "instances_found" => stats.instances_found,
+                        "instances_restarted" => stats.instances_restarted,
+                        "already_resurrected" => stats.already_resurrected,
+                        "sagas_started" => stats.sagas_started,
+                        "saga_start_errors" => stats.saga_start_errors,
+                        "saga_errors" => stats.saga_errors,
+                    );
+                    Some(error.to_string())
+                }
+            };
+            serde_json::json!({
+                "instances_found": stats.instances_found,
+                "instances_restarted": stats.instances_restarted,
+                "already_resurrected": stats.already_resurrected,
+                "sagas_started": stats.sagas_started,
+                "saga_start_errors": stats.saga_start_errors,
+                "saga_errors": stats.saga_errors,
+                "last_error": error,
+            })
+        })
+    }
+}
+
+impl InstanceResurrection {
+    async fn actually_activate(
+        &mut self,
+        opctx: &OpContext,
+        stats: &mut ActivationStats,
+    ) -> anyhow::Result<()> {
+        let mut tasks = JoinSet::new();
+
+        let mut last_err = Ok(());
+        let mut paginator = Paginator::new(BATCH_SIZE);
+
+        while let Some(p) = paginator.next() {
+            let maybe_batch = self
+                .datastore
+                .find_resurrectable_instances(opctx, &p.current_pagparams())
+                .await;
+            let batch = match maybe_batch {
+                Ok(batch) => batch,
+                Err(error) => {
+                    const ERR_STR: &'static str =
+                        "failed to list instances in need of resurrection";
+                    error!(
+                        opctx.log,
+                        "{ERR_STR}";
+                        "error" => %error,
+                    );
+                    last_err = Err(error).context(ERR_STR);
+                    break;
+                }
+            };
+
+            paginator = p.found_batch(&batch, &|instance| instance.id());
+
+            let found = batch.len();
+            if found == 0 {
+                debug!(
+                    opctx.log,
+                    "no more instances in need of resurrection";
+                    "total_found" => stats.instances_found,
+                );
+                break;
+            }
+
+            let prev_sagas_started = stats.sagas_started;
+            stats.instances_found += found;
+
+            let serialized_authn = authn::saga::Serialized::for_opctx(opctx);
+            for db_instance in batch {
+                let instance_id = db_instance.id();
+                let prepared_saga = instance_start::SagaInstanceStart::prepare(
+                    &instance_start::Params {
+                        db_instance,
+                        serialized_authn: serialized_authn.clone(),
+                    },
+                );
+                match prepared_saga {
+                    Ok(saga) => {
+                        let start_saga = self.sagas.clone();
+                        tasks.spawn(async move {
+                            start_saga
+                                .saga_start(saga)
+                                .await
+                                .map_err(|e| (instance_id, e))?;
+                            Ok(instance_id)
+                        });
+                        stats.sagas_started += 1;
+                    }
+                    Err(error) => {
+                        const ERR_STR: &'static str =
+                            "failed to prepare instance-start saga";
+                        error!(
+                            opctx.log,
+                            "{ERR_STR} for {instance_id}";
+                            "instance_id" => %instance_id,
+                            "error" => %error,
+                        );
+                        last_err = Err(error).with_context(|| {
+                            format!("{ERR_STR} for {instance_id}")
+                        });
+                        stats.saga_start_errors += 1;
+                    }
+                };
+            }
+
+            debug!(
+                opctx.log,
+                "found resurrectable instances";
+                "found" => found,
+                "total_found" => stats.instances_found,
+                "sagas_started" => stats.sagas_started - prev_sagas_started,
"total_sagas_started" => stats.sagas_started, + ); + } + + // All sagas started, wait for them to come back... + while let Some(saga_result) = tasks.join_next().await { + match saga_result { + // Start saga completed successfully + Ok(Ok(instance_id)) => { + debug!( + opctx.log, + "welcome back to the realm of the living, {instance_id}!"; + "instance_id" => %instance_id, + ); + stats.instances_restarted += 1; + } + // The instance was restarted by another saga, that's fine... + Ok(Err((instance_id, Error::Conflict { message }))) + if message.external_message() + == instance_start::ALREADY_STARTING_ERROR => + { + debug!( + opctx.log, + "instance {instance_id} was already resurrected"; + "instance_id" => %instance_id, + ); + stats.already_resurrected += 1; + } + // Start saga failed + Ok(Err((instance_id, error))) => { + // TODO(eliza): determine if this error indicates that + // someone else has already restarted the instance, which is + // fine... + + const ERR_MSG: &'static str = "failed to restart instance"; + warn!(opctx.log, + "{ERR_MSG} {instance_id}"; + "instance_id" => %instance_id, + "error" => %error, + ); + stats.saga_errors += 1; + last_err = Err(error) + .with_context(|| format!("{ERR_MSG} {instance_id}")); + } + Err(e) => { + const JOIN_ERR_MSG: &'static str = + "tasks spawned on the JoinSet should never return a \ + JoinError, as nexus is compiled with panic=\"abort\", \ + and we never cancel them..."; + error!(opctx.log, "{JOIN_ERR_MSG}"; "error" => %e); + if cfg!(debug_assertions) { + unreachable!("{JOIN_ERR_MSG} but, I saw {e}!",) + } + } + } + } + + last_err + } +} diff --git a/nexus/src/app/background/tasks/mod.rs b/nexus/src/app/background/tasks/mod.rs index 6cbba0a07b8..471747a435b 100644 --- a/nexus/src/app/background/tasks/mod.rs +++ b/nexus/src/app/background/tasks/mod.rs @@ -14,6 +14,7 @@ pub mod dns_config; pub mod dns_propagation; pub mod dns_servers; pub mod external_endpoints; +pub mod instance_resurrection; pub mod instance_updater; pub mod instance_watcher; pub mod inventory_collection; diff --git a/nexus/src/app/sagas/instance_start.rs b/nexus/src/app/sagas/instance_start.rs index 3325cd72f44..e710891ea38 100644 --- a/nexus/src/app/sagas/instance_start.rs +++ b/nexus/src/app/sagas/instance_start.rs @@ -99,6 +99,9 @@ declare_saga_actions! { /// changing its generation. const REGISTERED_VMM_RECORD: &'static str = "ensure_registered"; +pub(crate) const ALREADY_STARTING_ERROR: &'static str = + "instance changed state before it could be started"; + #[derive(Debug)] pub(crate) struct SagaInstanceStart; impl NexusSaga for SagaInstanceStart { @@ -282,7 +285,7 @@ async fn sis_move_to_starting( // must have started the instance already, so unwind. Some(_) => { return Err(ActionError::action_failed(Error::conflict( - "instance changed state before it could be started", + ALREADY_STARTING_ERROR, ))); } @@ -312,7 +315,7 @@ async fn sis_move_to_starting( .map_err(ActionError::action_failed)? { return Err(ActionError::action_failed(Error::conflict( - "instance changed state before it could be started", + ALREADY_STARTING_ERROR, ))); }