Skip to content

Commit

Permalink
[nexus] add instance_resurrection background task
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw committed Sep 1, 2024
1 parent a89df2f commit 377d372
Show file tree
Hide file tree
Showing 4 changed files with 298 additions and 6 deletions.
56 changes: 52 additions & 4 deletions nexus/db-queries/src/db/datastore/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use crate::db::identity::Resource;
use crate::db::lookup::LookupPath;
use crate::db::model::Generation;
use crate::db::model::Instance;
use crate::db::model::InstanceAutoRestart;
use crate::db::model::InstanceRuntimeState;
use crate::db::model::InstanceState;
use crate::db::model::Migration;
use crate::db::model::MigrationState;
use crate::db::model::Name;
Expand Down Expand Up @@ -435,6 +437,53 @@ impl DataStore {
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}

/// List all instances in the [`Failed`](InstanceState::Failed) with an
/// auto-restart policy that permits them to be automatically restarted by
/// the control plane.
///
/// This is used by the `instance_resurrection` RPW to ensure that that any
/// such instances are restarted.
///
/// This query is paginated by the instance's UUID, using the provided
/// [`DataPageParams`].
pub async fn find_resurrectable_instances(
&self,
opctx: &OpContext,
pagparams: &DataPageParams<'_, Uuid>,
) -> ListResultVec<Instance> {
use db::schema::instance::dsl;

paginated(dsl::instance, dsl::id, pagparams)
// Only attempt to resurrect Failed instances.
.filter(dsl::state.eq(InstanceState::Failed))
// The instance's auto-restart policy must allow the control plane
// to restart it automatically.
//
// N.B. that this may become more complex in the future if we grow
// additional auto-restart policies that require additional logic
// (such as restart limits...)
.filter(
dsl::auto_restart_policy.eq(InstanceAutoRestart::AllFailures),
)
// Deleted instances may not be resurrected.
.filter(dsl::time_deleted.is_null())
// If the instance is currently in the process of being updated,
// let's not mess with it for now and try to restart it on another
// pass.
.filter(dsl::updater_id.is_null())
// TODO(eliza): perhaps we ought to check for the presence of an
// active VMM here? If there is one, that would indicate that the
// instance hasn't been moved to `Failed` correctly. But, we would
// also need to handle the case where the active VMM is
// SagaUnwound...
.select(Instance::as_select())
.load_async::<Instance>(
&*self.pool_connection_authorized(opctx).await?,
)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}

/// Fetches information about an Instance that the caller has previously
/// fetched
///
Expand Down Expand Up @@ -871,12 +920,11 @@ impl DataStore {
// instance must be "stopped" or "failed" in order to delete it. The
// delete operation sets "time_deleted" (just like with other objects)
// and also sets the state to "destroyed".
use db::model::InstanceState as DbInstanceState;
use db::schema::{disk, instance};

let stopped = DbInstanceState::NoVmm;
let failed = DbInstanceState::Failed;
let destroyed = DbInstanceState::Destroyed;
let stopped = InstanceState::NoVmm;
let failed = InstanceState::Failed;
let destroyed = InstanceState::Destroyed;
let ok_to_delete_instance_states = vec![stopped, failed];

let detached_label = api::external::DiskState::Detached.label();
Expand Down
240 changes: 240 additions & 0 deletions nexus/src/app/background/tasks/instance_resurrection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! Background task for automatically restarting failed instances.
use crate::app::background::BackgroundTask;
use crate::app::saga::StartSaga;
use crate::app::sagas::instance_start;
use crate::app::sagas::NexusSaga;
use anyhow::Context;
use futures::future::BoxFuture;
use nexus_db_queries::authn;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db::pagination::Paginator;
use nexus_db_queries::db::DataStore;
use nexus_types::identity::Resource;
use omicron_common::api::external::Error;
use std::num::NonZeroU32;
use std::sync::Arc;
use tokio::task::JoinSet;

pub struct InstanceResurrection {
datastore: Arc<DataStore>,
sagas: Arc<dyn StartSaga>,
}

#[derive(Default)]
struct ActivationStats {
instances_found: usize,
instances_restarted: usize,
already_resurrected: usize,
sagas_started: usize,
saga_start_errors: usize,
saga_errors: usize,
}

const BATCH_SIZE: NonZeroU32 = unsafe {
// Safety: last time I checked, 100 was greater than zero.
NonZeroU32::new_unchecked(100)
};

impl BackgroundTask for InstanceResurrection {
fn activate<'a>(
&'a mut self,
opctx: &'a OpContext,
) -> BoxFuture<'a, serde_json::Value> {
Box::pin(async move {
let mut stats = ActivationStats::default();
let error = match self.actually_activate(opctx, &mut stats).await {
Ok(_) => {
if stats.instances_restarted > 0 {
info!(
&opctx.log,
"instance resurrection completed";
"instances_found" => stats.instances_found,
"instances_restarted" => stats.instances_restarted,
"already_resurrected" => stats.already_resurrected,
"sagas_started" => stats.sagas_started,
);
}
None
}
Err(error) => {
error!(
&opctx.log,
"instance resurrection failed!";
"last_error" => %error,
"instances_found" => stats.instances_found,
"instances_restarted" => stats.instances_restarted,
"already_resurrected" => stats.already_resurrected,
"sagas_started" => stats.sagas_started,
"saga_start_errors" => stats.saga_start_errors,
"saga_errors" => stats.saga_errors,
);
Some(error.to_string())
}
};
serde_json::json!({
"instances_found": stats.instances_found,
"instances_restarted": stats.instances_restarted,
"already_resurrected": stats.already_resurrected,
"sagas_started": stats.sagas_started,
"saga_start_errors": stats.saga_start_errors,
"saga_errors": stats.saga_errors,
"last_error": error,
})
})
}
}

impl InstanceResurrection {
async fn actually_activate(
&mut self,
opctx: &OpContext,
stats: &mut ActivationStats,
) -> anyhow::Result<()> {
let mut tasks = JoinSet::new();

let mut last_err = Ok(());
let mut paginator = Paginator::new(BATCH_SIZE);

while let Some(p) = paginator.next() {
let maybe_batch = self
.datastore
.find_resurrectable_instances(opctx, &p.current_pagparams())
.await;
let batch = match maybe_batch {
Ok(batch) => batch,
Err(error) => {
const ERR_STR: &'static str =
"failed to list instances in need of resurrection";
error!(
opctx.log,
"failed to list instances in need of resurrection";
"error" => &error,
);
last_err = Err(error).context(ERR_STR);
break;
}
};

paginator = p.found_batch(&batch, &|instance| instance.id());

let found = batch.len();
if found == 0 {
debug!(
opctx.log,
"no more instances in need of resurrection";
"total_found" => stats.instances_found,
);
break;
}

let prev_sagas_started = stats.sagas_started;
stats.instances_found += found;

let serialized_authn = authn::saga::Serialized::for_opctx(opctx);
for db_instance in batch {
let instance_id = db_instance.id();
let prepared_saga = instance_start::SagaInstanceStart::prepare(
&instance_start::Params {
db_instance,
serialized_authn: serialized_authn.clone(),
},
);
match prepared_saga {
Ok(saga) => {
let start_saga = self.sagas.clone();
tasks.spawn(async move {
start_saga
.saga_start(saga)
.await
.map_err(|e| (instance_id, e))?;
Ok(instance_id)
});
stats.sagas_started += 1;
}
Err(error) => {
const ERR_STR: &'static str =
"failed to prepare instance-start saga for ";
error!(
opctx.log,
"{ERR_STR}{instance_id}";
"instance_id" => %instance_id,
"error" => %error,
);
last_err = Err(error)
.with_context(|| format!("{ERR_STR}{instance_id}"));
stats.saga_start_errors += 1;
}
};
}

debug!(
opctx.log,
"found resurrectible instances";
"found" => found,
"total_found" => stats.instances_found,
"sagas_started" => stats.sagas_started - prev_sagas_started,
"total_sagas_started" => stats.sagas_started,
);
}

// All sagas started, wait for them to come back...
while let Some(saga_result) = tasks.join_next().await {
match saga_result {
// Start saga completed successfully
Ok(Ok(instance_id)) => {
debug!(
opctx.log,
"welcome back to the realm of the living, {instance_id}!";
"instance_id" => %instance_id,
);
stats.instances_restarted += 1;
}
// The instance was restarted by another saga, that's fine...
Ok(Err((instance_id, Error::Conflict { message })))
if message.external_message()
== instance_start::ALREADY_STARTING_ERROR =>
{
debug!(
opctx.log,
"instance {instance_id} was already resurrected";
"instance_id" => %instance_id,
);
stats.already_resurrected += 1;
}
// Start saga failed
Ok(Err((instance_id, error))) => {
// TODO(eliza): determine if this error indicates that
// someone else has already restarted the instance, which is
// fine...

const ERR_MSG: &'static str = "failed to restart instance";
warn!(opctx.log,
"{ERR_MSG} {instance_id}";
"instance_id" => %instance_id,
"error" => %error,
);
stats.saga_errors += 1;
last_err = Err(error)
.with_context(|| format!("{ERR_MSG} {instance_id}"));
}
Err(e) => {
const JOIN_ERR_MSG: &'static str =
"tasks spawned on the JoinSet should never return a \
JoinError, as nexus is compiled with panic=\"abort\", \
and we never cancel them...";
error!(opctx.log, "{JOIN_ERR_MSG}"; "error" => %e);
if cfg!(debug_assertions) {
unreachable!("{JOIN_ERR_MSG} but, I saw {e}!",)
}
}
}
}

last_err
}
}
1 change: 1 addition & 0 deletions nexus/src/app/background/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod dns_config;
pub mod dns_propagation;
pub mod dns_servers;
pub mod external_endpoints;
pub mod instance_resurrection;
pub mod instance_updater;
pub mod instance_watcher;
pub mod inventory_collection;
Expand Down
7 changes: 5 additions & 2 deletions nexus/src/app/sagas/instance_start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ declare_saga_actions! {
/// changing its generation.
const REGISTERED_VMM_RECORD: &'static str = "ensure_registered";

pub(crate) const ALREADY_STARTING_ERROR: &'static str =
"instance changed state before it could be started";

#[derive(Debug)]
pub(crate) struct SagaInstanceStart;
impl NexusSaga for SagaInstanceStart {
Expand Down Expand Up @@ -282,7 +285,7 @@ async fn sis_move_to_starting(
// must have started the instance already, so unwind.
Some(_) => {
return Err(ActionError::action_failed(Error::conflict(
"instance changed state before it could be started",
ALREADY_STARTING_ERROR,
)));
}

Expand Down Expand Up @@ -312,7 +315,7 @@ async fn sis_move_to_starting(
.map_err(ActionError::action_failed)?
{
return Err(ActionError::action_failed(Error::conflict(
"instance changed state before it could be started",
ALREADY_STARTING_ERROR,
)));
}

Expand Down

0 comments on commit 377d372

Please sign in to comment.