Skip to content

Commit

Permalink
queue update sagas for terminated migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw committed Jul 1, 2024
1 parent b89ca7a commit 3adbf54
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 38 deletions.
3 changes: 3 additions & 0 deletions nexus/db-model/src/migration_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ impl MigrationState {
pub const IN_PROGRESS: MigrationState =
MigrationState(nexus::MigrationState::InProgress);

/// The migration states in which a migration is finished: it has either
/// completed or failed.
pub const TERMINAL_STATES: &'static [MigrationState] =
&[Self::COMPLETED, Self::FAILED];

/// Returns `true` if this migration state means that the migration is no
/// longer in progress (it has either succeeded or failed).
#[must_use]
Expand Down
54 changes: 43 additions & 11 deletions nexus/db-queries/src/db/datastore/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,34 +299,66 @@ impl DataStore {

/// List all instances with active VMMs in the `Destroyed` state that don't
/// have currently-running instance-updater sagas.
///
/// This is used by the `instance_updater` background task to ensure that
/// update sagas are scheduled for these instances.
// NOTE(review): this span appears to be a rendered diff with the removed and
// added lines interleaved (two return types, two `select`/`load_async` pairs,
// and two tails). Presumably the `ListResultVec<Instance>` variant is the
// surviving one -- confirm against the real file before relying on it.
pub async fn find_instances_with_destroyed_active_vmms(
&self,
opctx: &OpContext,
) -> ListResultVec<InstanceAndActiveVmm> {
) -> ListResultVec<Instance> {
use db::model::VmmState;
use db::schema::instance::dsl;
use db::schema::vmm::dsl as vmm_dsl;
Ok(vmm_dsl::vmm
// Start from VMM records that have `time_deleted` set and are in the
// `Destroyed` state...
.filter(vmm_dsl::time_deleted.is_not_null())
.filter(vmm_dsl::state.eq(VmmState::Destroyed))
// ...and join each one to the instance that names it as its active
// Propolis, provided that instance is not deleted and has no `updater_id`
// set.
.inner_join(
dsl::instance.on(dsl::active_propolis_id
.eq(vmm_dsl::id.nullable())
.and(dsl::time_deleted.is_null())
.and(dsl::updater_id.is_null())),
)
// Variant selecting both the instance and its VMM record.
.select((Instance::as_select(), Vmm::as_select()))
.load_async::<(Instance, Vmm)>(
// Variant selecting only the instance record.
.select(Instance::as_select())
.load_async::<Instance>(
&*self.pool_connection_authorized(opctx).await?,
)
.await
// Tail for the `(Instance, Vmm)` variant: convert the Diesel error, then
// wrap each row in an `InstanceAndActiveVmm` whose `vmm` is always `Some`.
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?
.into_iter()
.map(|(instance, vmm)| InstanceAndActiveVmm {
instance,
vmm: Some(vmm),
})
.collect())
// Tail for the `Instance`-only variant: convert the Diesel error and return
// the loaded rows as-is.
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?)
}

/// List all instances with active migrations that have terminated (either
/// completed or failed) and don't have currently-running instance-updater
/// sagas.
///
/// An instance is returned when all of the following hold:
///
/// - the instance record is not deleted (`time_deleted` is null),
/// - the instance has no `updater_id` set,
/// - the instance's `migration_id` refers to a migration record whose
///   target-side *or* source-side state is one of
///   `MigrationState::TERMINAL_STATES`.
///
/// This is used by the `instance_updater` background task to ensure that
/// update sagas are scheduled for these instances.
///
/// # Errors
///
/// Returns an error if a pooled database connection cannot be obtained for
/// `opctx`, or if the query fails (the Diesel error is converted with
/// `public_error_from_diesel` using `ErrorHandler::Server`).
pub async fn find_instances_with_terminated_active_migrations(
    &self,
    opctx: &OpContext,
) -> ListResultVec<Instance> {
    use db::model::MigrationState;
    use db::schema::instance::dsl;
    use db::schema::migration::dsl as migration_dsl;

    // The `map_err` result already has the function's return type, so it is
    // returned directly rather than being unwrapped with `?` and re-wrapped
    // in `Ok`.
    dsl::instance
        .filter(dsl::time_deleted.is_null())
        .filter(dsl::migration_id.is_not_null())
        .filter(dsl::updater_id.is_null())
        .inner_join(
            migration_dsl::migration.on(dsl::migration_id
                .eq(migration_dsl::id.nullable())
                // A migration counts as terminated as soon as *either* side
                // has reported a terminal state.
                .and(
                    migration_dsl::target_state
                        .eq_any(MigrationState::TERMINAL_STATES)
                        .or(migration_dsl::source_state
                            .eq_any(MigrationState::TERMINAL_STATES)),
                )),
        )
        .select(Instance::as_select())
        .load_async::<Instance>(
            &*self.pool_connection_authorized(opctx).await?,
        )
        .await
        .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}

/// Fetches information about an Instance that the caller has previously
Expand Down
95 changes: 68 additions & 27 deletions nexus/src/app/background/tasks/instance_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! Background task for detecting instances in need of update sagas.
//!
//! TODO this is currently a placeholder for a future PR
use crate::app::background::BackgroundTask;
use crate::app::sagas::instance_update;
use crate::app::sagas::SagaRequest;
use anyhow::Context;
use futures::future::BoxFuture;
use futures::FutureExt;
use nexus_db_model::Instance;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db::datastore::InstanceAndActiveVmm;
use nexus_db_queries::db::lookup::LookupPath;
use nexus_db_queries::db::DataStore;
use nexus_db_queries::{authn, authz};
use nexus_types::identity::Resource;
use omicron_common::api::external::ListResultVec;
use serde_json::json;
use std::future::Future;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;

Expand All @@ -40,28 +40,65 @@ impl InstanceUpdater {
opctx: &OpContext,
stats: &mut ActivationStats,
) -> Result<(), anyhow::Error> {
let log = &opctx.log;

slog::debug!(
&log,
"looking for instances with destroyed active VMMs..."
);

let destroyed_active_vmms = self
.datastore
.find_instances_with_destroyed_active_vmms(opctx)
.await
.context("failed to find instances with destroyed active VMMs")?;
async fn find_instances(
what: &'static str,
log: &slog::Logger,
last_err: &mut Result<(), anyhow::Error>,
query: impl Future<Output = ListResultVec<Instance>>,
) -> Vec<Instance> {
slog::debug!(&log, "looking for instances with {what}...");
match query.await {
Ok(list) => {
slog::info!(
&log,
"listed instances with {what}";
"count" => list.len(),
);
list
}
Err(error) => {
slog::error!(
&log,
"failed to list instances with {what}";
"error" => %error,
);
*last_err = Err(error).with_context(|| {
format!("failed to find instances with {what}",)
});
Vec::new()
}
}
}

slog::info!(
&log,
"listed instances with destroyed active VMMs";
"count" => destroyed_active_vmms.len(),
);
let mut last_err = Ok(());

// NOTE(eliza): Strictly speaking, these don't need to be two separate
// queries; they could probably be `OR`ed together in SQL instead. I just
// thought it was nice to be able to record the number of instances found
// separately for each state.
let destroyed_active_vmms = find_instances(
"destroyed active VMMs",
&opctx.log,
&mut last_err,
self.datastore.find_instances_with_destroyed_active_vmms(opctx),
)
.await;
stats.destroyed_active_vmms = destroyed_active_vmms.len();

for InstanceAndActiveVmm { instance, .. } in destroyed_active_vmms {
let terminated_active_migrations = find_instances(
"terminated active migrations",
&opctx.log,
&mut last_err,
self.datastore
.find_instances_with_terminated_active_migrations(opctx),
)
.await;
stats.terminated_active_migrations = terminated_active_migrations.len();

for instance in destroyed_active_vmms
.iter()
.chain(terminated_active_migrations.iter())
{
let serialized_authn = authn::saga::Serialized::for_opctx(opctx);
let (.., authz_instance) = LookupPath::new(&opctx, &self.datastore)
.instance_id(instance.id())
Expand All @@ -77,17 +114,18 @@ impl InstanceUpdater {
.send(saga)
.await
.context("SagaRequest receiver missing")?;
stats.sagas_started += 1;
stats.update_sagas_queued += 1;
}

Ok(())
last_err
}
}

/// Counters gathered during one activation of the instance updater; they are
/// emitted in the activation's log message and in its JSON status.
// NOTE(review): `sagas_started` and `update_sagas_queued` are incremented at
// the same point; this looks like the before/after of a rename shown in a
// diff -- confirm that only one of them exists in the real file.
#[derive(Default)]
struct ActivationStats {
/// Number of instances returned by the "destroyed active VMMs" query.
destroyed_active_vmms: usize,
/// Incremented once for each saga request successfully sent on the saga
/// request channel.
sagas_started: usize,
/// Number of instances returned by the "terminated active migrations"
/// query.
terminated_active_migrations: usize,
/// Incremented once for each saga request successfully sent on the saga
/// request channel.
update_sagas_queued: usize,
}

impl BackgroundTask for InstanceUpdater {
Expand All @@ -103,7 +141,8 @@ impl BackgroundTask for InstanceUpdater {
&opctx.log,
"instance updater activation completed";
"destroyed_active_vmms" => stats.destroyed_active_vmms,
"sagas_started" => stats.sagas_started,
"terminated_active_migrations" => stats.terminated_active_migrations,
"update_sagas_queued" => stats.update_sagas_queued,
);
None
}
Expand All @@ -113,14 +152,16 @@ impl BackgroundTask for InstanceUpdater {
"instance updater activation failed!";
"error" => %error,
"destroyed_active_vmms" => stats.destroyed_active_vmms,
"sagas_started" => stats.sagas_started,
"terminated_active_migrations" => stats.terminated_active_migrations,
"update_sagas_queued" => stats.update_sagas_queued,
);
Some(error.to_string())
}
};
json!({
"destroyed_active_vmms": stats.destroyed_active_vmms,
"sagas_started": stats.sagas_started,
"terminated_active_migrations": stats.terminated_active_migrations,
"update_sagas_queued": stats.update_sagas_queued,
"error": error,
})
}
Expand Down

0 comments on commit 3adbf54

Please sign in to comment.