Skip to content

Commit

Permalink
it's sagas all the way down
Browse files Browse the repository at this point in the history
sickos dot png
  • Loading branch information
hawkw committed May 29, 2024
1 parent aa6c268 commit 2dc69d1
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 112 deletions.
48 changes: 48 additions & 0 deletions nexus/db-queries/src/db/datastore/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,54 @@ impl DataStore {
Ok(locked)
}

pub async fn instance_updater_inherit_lock(
&self,
opctx: &OpContext,
instance: &Instance,
child_lock_id: &Uuid,
) -> Result<(), Error> {
use db::schema::instance::dsl;

let current_gen = instance.runtime_state.updater_gen;
let new_gen = Generation(current_gen.0.next());
let instance_id = instance.id();
let parent_id = instance.runtime_state.updater_id.ok_or_else(|| {
Error::internal_error(
"instance must already be locked in order to inherit the lock",
)
})?;

diesel::update(dsl::instance)
.filter(dsl::time_deleted.is_null())
.filter(dsl::id.eq(instance_id))
.filter(dsl::updater_gen.eq(current_gen))
.filter(dsl::updater_id.eq(parent_id))
.set((
dsl::updater_gen.eq(new_gen),
dsl::updater_id.eq(Some(*child_lock_id)),
))
.check_if_exists::<Instance>(instance_id)
.execute_and_check(&*self.pool_connection_authorized(opctx).await?)
.await
.map_err(|e| {
public_error_from_diesel(
e,
ErrorHandler::NotFoundByLookup(
ResourceType::Instance,
LookupType::ById(instance_id),
),
)
})
.and_then(|r| {
match r.status {
UpdateStatus::Updated => Ok(()),
UpdateStatus::NotUpdatedButExists => Err(Error::internal_error(
"cannot inherit lock (another saga probably already has)",
)),
}
})
}

/// Release the instance-updater lock acquired by
/// [`DataStore::instance_updater_try_lock`].
pub async fn instance_updater_unlock(
Expand Down
91 changes: 42 additions & 49 deletions nexus/src/app/sagas/instance_update/destroyed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,25 @@
use super::ActionRegistry;
use super::NexusActionContext;
use super::NexusSaga;
use super::Params;
use super::STATE;
use crate::app::sagas::declare_saga_actions;
use crate::app::sagas::ActionError;
use nexus_db_model::Generation;
use nexus_db_model::Instance;
use nexus_db_model::InstanceRuntimeState;
use nexus_db_model::Vmm;
use nexus_db_queries::authn;
use nexus_db_queries::authz;
use nexus_db_queries::db::datastore::InstanceAndVmms;
use nexus_db_queries::db::identity::Resource;
use omicron_common::api::external;
use omicron_common::api::external::Error;
use omicron_common::api::external::InstanceState;
use serde::{Deserialize, Serialize};
use slog::info;
use uuid::Uuid;

// instance update VMM destroyed subsaga: actions
// instance update (active VMM destroyed) subsaga: actions

// This subsaga is responsible for handling an instance update where the
// instance's active VMM has entered the `Destroyed` state. This requires
Expand Down Expand Up @@ -63,6 +66,21 @@ declare_saga_actions! {
}
}

/// Parameters to the instance update (active VMM destroyed) sub-saga.
#[derive(Debug, Deserialize, Serialize)]
pub(super) struct Params {
/// Authentication context to use to fetch the instance's current state from
/// the database.
pub(super) serialized_authn: authn::saga::Serialized,

pub(super) authz_instance: authz::Instance,

/// The UUID of the VMM that was destroyed.
pub(super) vmm_id: Uuid,

pub(super) instance: Instance,
}

#[derive(Debug)]
pub(crate) struct SagaVmmDestroyed;
impl NexusSaga for SagaVmmDestroyed {
Expand Down Expand Up @@ -104,14 +122,8 @@ fn get_destroyed_vmm(
async fn siud_release_sled_resources(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let Some((_, vmm)) = get_destroyed_vmm(&sagactx)? else {
// if the update we are handling is not an active VMM destroyed update,
// bail --- there's nothing to do here.
return Ok(());
};

let osagactx = sagactx.user_data();
let Params { ref serialized_authn, ref authz_instance } =
let Params { ref serialized_authn, ref authz_instance, vmm_id, .. } =
sagactx.saga_params::<Params>()?;

let opctx =
Expand All @@ -121,13 +133,13 @@ async fn siud_release_sled_resources(
osagactx.log(),
"instance update (active VMM destroyed): deallocating sled resource reservation";
"instance_id" => %authz_instance.id(),
"propolis_id" => %vmm.id,
"propolis_id" => %vmm_id,
"instance_update" => %"VMM destroyed",
);

osagactx
.datastore()
.sled_reservation_delete(&opctx, vmm.id)
.sled_reservation_delete(&opctx, vmm_id)
.await
.or_else(|err| {
// Necessary for idempotency
Expand All @@ -149,7 +161,7 @@ async fn siud_release_virtual_provisioning(
};

let osagactx = sagactx.user_data();
let Params { ref serialized_authn, ref authz_instance } =
let Params { ref serialized_authn, ref authz_instance, vmm_id, .. } =
sagactx.saga_params::<Params>()?;

let opctx =
Expand All @@ -159,7 +171,7 @@ async fn siud_release_virtual_provisioning(
osagactx.log(),
"instance update (VMM destroyed): deallocating virtual provisioning resources";
"instance_id" => %authz_instance.id(),
"propolis_id" => %vmm.id,
"propolis_id" => %vmm_id,
"instance_update" => %"VMM destroyed",
);

Expand Down Expand Up @@ -187,11 +199,6 @@ async fn siud_release_virtual_provisioning(
async fn siud_unassign_oximeter_producer(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let Some((_, _)) = get_destroyed_vmm(&sagactx)? else {
// if the update we are handling is not an active VMM destroyed update,
// bail --- there's nothing to do here.
return Ok(());
};
let osagactx = sagactx.user_data();
let Params { ref serialized_authn, ref authz_instance, .. } =
sagactx.saga_params::<Params>()?;
Expand All @@ -212,17 +219,15 @@ async fn siud_unassign_oximeter_producer(
async fn siud_delete_v2p_mappings(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let Some((instance, vmm)) = get_destroyed_vmm(&sagactx)? else {
// if the update we are handling is not an active VMM destroyed update,
// bail --- there's nothing to do here.
return Ok(());
};
let Params { ref authz_instance, vmm_id, .. } =
sagactx.saga_params::<Params>()?;

let osagactx = sagactx.user_data();
info!(
osagactx.log(),
"instance update (VMM destroyed): deleting V2P mappings";
"instance_id" => %instance.id(),
"propolis_id" => %vmm.id,
"instance_id" => %authz_instance.id(),
"propolis_id" => %vmm_id,
"instance_update" => %"VMM destroyed",
);

Expand All @@ -234,13 +239,8 @@ async fn siud_delete_v2p_mappings(
async fn siud_delete_nat_entries(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let Some((_, vmm)) = get_destroyed_vmm(&sagactx)? else {
// if the update we are handling is not an active VMM destroyed update,
// bail --- there's nothing to do here.
return Ok(());
};
let osagactx = sagactx.user_data();
let Params { ref serialized_authn, ref authz_instance, .. } =
let Params { ref serialized_authn, ref authz_instance, vmm_id, .. } =
sagactx.saga_params::<Params>()?;

let opctx =
Expand All @@ -250,7 +250,7 @@ async fn siud_delete_nat_entries(
osagactx.log(),
"instance update (VMM destroyed): deleting NAT entries";
"instance_id" => %authz_instance.id(),
"propolis_id" => %vmm.id,
"propolis_id" => %vmm_id,
"instance_update" => %"VMM destroyed",
);

Expand All @@ -265,11 +265,9 @@ async fn siud_delete_nat_entries(
async fn siud_update_instance(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let Some((instance, vmm)) = get_destroyed_vmm(&sagactx)? else {
// if the update we are handling is not an active VMM destroyed update,
// bail --- there's nothing to do here.
return Ok(());
};
let Params { ref authz_instance, ref vmm_id, instance, .. } =
sagactx.saga_params::<Params>()?;

let osagactx = sagactx.user_data();
let new_runtime = InstanceRuntimeState {
propolis_id: None,
Expand All @@ -281,30 +279,25 @@ async fn siud_update_instance(
info!(
osagactx.log(),
"instance update (VMM destroyed): updating runtime state";
"instance_id" => %instance.id(),
"propolis_id" => %vmm.id,
"instance_id" => %authz_instance.id(),
"propolis_id" => %vmm_id,
"new_runtime_state" => ?new_runtime,
"instance_update" => %"VMM destroyed",
);

// It's okay for this to fail, it just means that the active VMM ID has changed.
let _ = osagactx
.datastore()
.instance_update_runtime(&instance.id(), &new_runtime)
.instance_update_runtime(&authz_instance.id(), &new_runtime)
.await;
Ok(())
}

async fn siud_mark_vmm_deleted(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let Some((instance, vmm)) = get_destroyed_vmm(&sagactx)? else {
// if the update we are handling is not an active VMM destroyed update,
// bail --- there's nothing to do here.
return Ok(());
};
let osagactx = sagactx.user_data();
let Params { ref serialized_authn, .. } =
let Params { ref authz_instance, ref vmm_id, ref serialized_authn, .. } =
sagactx.saga_params::<Params>()?;

let opctx =
Expand All @@ -313,14 +306,14 @@ async fn siud_mark_vmm_deleted(
info!(
osagactx.log(),
"instance update (VMM destroyed): marking VMM record deleted";
"instance_id" => %instance.id(),
"propolis_id" => %vmm.id,
"instance_id" => %authz_instance.id(),
"propolis_id" => %vmm_id,
"instance_update" => %"VMM destroyed",
);

osagactx
.datastore()
.vmm_mark_deleted(&opctx, &vmm.id)
.vmm_mark_deleted(&opctx, vmm_id)
.await
.map(|_| ())
.map_err(ActionError::action_failed)
Expand Down
Loading

0 comments on commit 2dc69d1

Please sign in to comment.