From 2dc69d1f20f907152f988f554690065b466f411a Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 29 May 2024 14:13:17 -0700 Subject: [PATCH] it's sagas all the way down sickos dot png --- nexus/db-queries/src/db/datastore/instance.rs | 48 ++++ .../app/sagas/instance_update/destroyed.rs | 91 ++++--- nexus/src/app/sagas/instance_update/mod.rs | 225 +++++++++++++----- nexus/src/app/sagas/instance_update/start.rs | 0 4 files changed, 252 insertions(+), 112 deletions(-) create mode 100644 nexus/src/app/sagas/instance_update/start.rs diff --git a/nexus/db-queries/src/db/datastore/instance.rs b/nexus/db-queries/src/db/datastore/instance.rs index 6f513d3123e..2398718bf41 100644 --- a/nexus/db-queries/src/db/datastore/instance.rs +++ b/nexus/db-queries/src/db/datastore/instance.rs @@ -803,6 +803,54 @@ impl DataStore { Ok(locked) } + pub async fn instance_updater_inherit_lock( + &self, + opctx: &OpContext, + instance: &Instance, + child_lock_id: &Uuid, + ) -> Result<(), Error> { + use db::schema::instance::dsl; + + let current_gen = instance.runtime_state.updater_gen; + let new_gen = Generation(current_gen.0.next()); + let instance_id = instance.id(); + let parent_id = instance.runtime_state.updater_id.ok_or_else(|| { + Error::internal_error( + "instance must already be locked in order to inherit the lock", + ) + })?; + + diesel::update(dsl::instance) + .filter(dsl::time_deleted.is_null()) + .filter(dsl::id.eq(instance_id)) + .filter(dsl::updater_gen.eq(current_gen)) + .filter(dsl::updater_id.eq(parent_id)) + .set(( + dsl::updater_gen.eq(new_gen), + dsl::updater_id.eq(Some(*child_lock_id)), + )) + .check_if_exists::(instance_id) + .execute_and_check(&*self.pool_connection_authorized(opctx).await?) + .await + .map_err(|e| { + public_error_from_diesel( + e, + ErrorHandler::NotFoundByLookup( + ResourceType::Instance, + LookupType::ById(instance_id), + ), + ) + }) + .and_then(|r| { + match r.status { + UpdateStatus::Updated => Ok(()), + UpdateStatus::NotUpdatedButExists => Err(Error::internal_error( + "cannot inherit lock (another saga probably already has)", + )), + } + }) + } + /// Release the instance-updater lock acquired by /// [`DataStore::instance_updater_try_lock`]. pub async fn instance_updater_unlock( diff --git a/nexus/src/app/sagas/instance_update/destroyed.rs b/nexus/src/app/sagas/instance_update/destroyed.rs index 70e82ee6b15..d673be00409 100644 --- a/nexus/src/app/sagas/instance_update/destroyed.rs +++ b/nexus/src/app/sagas/instance_update/destroyed.rs @@ -5,7 +5,6 @@ use super::ActionRegistry; use super::NexusActionContext; use super::NexusSaga; -use super::Params; use super::STATE; use crate::app::sagas::declare_saga_actions; use crate::app::sagas::ActionError; @@ -13,14 +12,18 @@ use nexus_db_model::Generation; use nexus_db_model::Instance; use nexus_db_model::InstanceRuntimeState; use nexus_db_model::Vmm; +use nexus_db_queries::authn; +use nexus_db_queries::authz; use nexus_db_queries::db::datastore::InstanceAndVmms; use nexus_db_queries::db::identity::Resource; use omicron_common::api::external; use omicron_common::api::external::Error; use omicron_common::api::external::InstanceState; +use serde::{Deserialize, Serialize}; use slog::info; +use uuid::Uuid; -// instance update VMM destroyed subsaga: actions +// instance update (active VMM destroyed) subsaga: actions // This subsaga is responsible for handling an instance update where the // instance's active VMM has entered the `Destroyed` state. This requires @@ -63,6 +66,21 @@ declare_saga_actions! { } } +/// Parameters to the instance update (active VMM destroyed) sub-saga. +#[derive(Debug, Deserialize, Serialize)] +pub(super) struct Params { + /// Authentication context to use to fetch the instance's current state from + /// the database. + pub(super) serialized_authn: authn::saga::Serialized, + + pub(super) authz_instance: authz::Instance, + + /// The UUID of the VMM that was destroyed. + pub(super) vmm_id: Uuid, + + pub(super) instance: Instance, +} + #[derive(Debug)] pub(crate) struct SagaVmmDestroyed; impl NexusSaga for SagaVmmDestroyed { @@ -104,14 +122,8 @@ fn get_destroyed_vmm( async fn siud_release_sled_resources( sagactx: NexusActionContext, ) -> Result<(), ActionError> { - let Some((_, vmm)) = get_destroyed_vmm(&sagactx)? else { - // if the update we are handling is not an active VMM destroyed update, - // bail --- there's nothing to do here. - return Ok(()); - }; - let osagactx = sagactx.user_data(); - let Params { ref serialized_authn, ref authz_instance } = + let Params { ref serialized_authn, ref authz_instance, vmm_id, .. } = sagactx.saga_params::()?; let opctx = @@ -121,13 +133,13 @@ async fn siud_release_sled_resources( osagactx.log(), "instance update (active VMM destroyed): deallocating sled resource reservation"; "instance_id" => %authz_instance.id(), - "propolis_id" => %vmm.id, + "propolis_id" => %vmm_id, "instance_update" => %"VMM destroyed", ); osagactx .datastore() - .sled_reservation_delete(&opctx, vmm.id) + .sled_reservation_delete(&opctx, vmm_id) .await .or_else(|err| { // Necessary for idempotency @@ -149,7 +161,7 @@ async fn siud_release_virtual_provisioning( }; let osagactx = sagactx.user_data(); - let Params { ref serialized_authn, ref authz_instance } = + let Params { ref serialized_authn, ref authz_instance, vmm_id, .. } = sagactx.saga_params::()?; let opctx = @@ -159,7 +171,7 @@ async fn siud_release_virtual_provisioning( osagactx.log(), "instance update (VMM destroyed): deallocating virtual provisioning resources"; "instance_id" => %authz_instance.id(), - "propolis_id" => %vmm.id, + "propolis_id" => %vmm_id, "instance_update" => %"VMM destroyed", ); @@ -187,11 +199,6 @@ async fn siud_release_virtual_provisioning( async fn siud_unassign_oximeter_producer( sagactx: NexusActionContext, ) -> Result<(), ActionError> { - let Some((_, _)) = get_destroyed_vmm(&sagactx)? else { - // if the update we are handling is not an active VMM destroyed update, - // bail --- there's nothing to do here. - return Ok(()); - }; let osagactx = sagactx.user_data(); let Params { ref serialized_authn, ref authz_instance, .. } = sagactx.saga_params::()?; @@ -212,17 +219,15 @@ async fn siud_unassign_oximeter_producer( async fn siud_delete_v2p_mappings( sagactx: NexusActionContext, ) -> Result<(), ActionError> { - let Some((instance, vmm)) = get_destroyed_vmm(&sagactx)? else { - // if the update we are handling is not an active VMM destroyed update, - // bail --- there's nothing to do here. - return Ok(()); - }; + let Params { ref authz_instance, vmm_id, .. } = + sagactx.saga_params::()?; + let osagactx = sagactx.user_data(); info!( osagactx.log(), "instance update (VMM destroyed): deleting V2P mappings"; - "instance_id" => %instance.id(), - "propolis_id" => %vmm.id, + "instance_id" => %authz_instance.id(), + "propolis_id" => %vmm_id, "instance_update" => %"VMM destroyed", ); @@ -234,13 +239,8 @@ async fn siud_delete_v2p_mappings( async fn siud_delete_nat_entries( sagactx: NexusActionContext, ) -> Result<(), ActionError> { - let Some((_, vmm)) = get_destroyed_vmm(&sagactx)? else { - // if the update we are handling is not an active VMM destroyed update, - // bail --- there's nothing to do here. - return Ok(()); - }; let osagactx = sagactx.user_data(); - let Params { ref serialized_authn, ref authz_instance, .. } = + let Params { ref serialized_authn, ref authz_instance, vmm_id, .. } = sagactx.saga_params::()?; let opctx = @@ -250,7 +250,7 @@ async fn siud_delete_nat_entries( osagactx.log(), "instance update (VMM destroyed): deleting NAT entries"; "instance_id" => %authz_instance.id(), - "propolis_id" => %vmm.id, + "propolis_id" => %vmm_id, "instance_update" => %"VMM destroyed", ); @@ -265,11 +265,9 @@ async fn siud_delete_nat_entries( async fn siud_update_instance( sagactx: NexusActionContext, ) -> Result<(), ActionError> { - let Some((instance, vmm)) = get_destroyed_vmm(&sagactx)? else { - // if the update we are handling is not an active VMM destroyed update, - // bail --- there's nothing to do here. - return Ok(()); - }; + let Params { ref authz_instance, ref vmm_id, instance, .. } = + sagactx.saga_params::()?; + let osagactx = sagactx.user_data(); let new_runtime = InstanceRuntimeState { propolis_id: None, @@ -281,8 +279,8 @@ async fn siud_update_instance( info!( osagactx.log(), "instance update (VMM destroyed): updating runtime state"; - "instance_id" => %instance.id(), - "propolis_id" => %vmm.id, + "instance_id" => %authz_instance.id(), + "propolis_id" => %vmm_id, "new_runtime_state" => ?new_runtime, "instance_update" => %"VMM destroyed", ); @@ -290,7 +288,7 @@ async fn siud_update_instance( // It's okay for this to fail, it just means that the active VMM ID has changed. let _ = osagactx .datastore() - .instance_update_runtime(&instance.id(), &new_runtime) + .instance_update_runtime(&authz_instance.id(), &new_runtime) .await; Ok(()) } @@ -298,13 +296,8 @@ async fn siud_update_instance( async fn siud_mark_vmm_deleted( sagactx: NexusActionContext, ) -> Result<(), ActionError> { - let Some((instance, vmm)) = get_destroyed_vmm(&sagactx)? else { - // if the update we are handling is not an active VMM destroyed update, - // bail --- there's nothing to do here. - return Ok(()); - }; let osagactx = sagactx.user_data(); - let Params { ref serialized_authn, .. } = + let Params { ref authz_instance, ref vmm_id, ref serialized_authn, .. } = sagactx.saga_params::()?; let opctx = @@ -313,14 +306,14 @@ async fn siud_mark_vmm_deleted( info!( osagactx.log(), "instance update (VMM destroyed): marking VMM record deleted"; - "instance_id" => %instance.id(), - "propolis_id" => %vmm.id, + "instance_id" => %authz_instance.id(), + "propolis_id" => %vmm_id, "instance_update" => %"VMM destroyed", ); osagactx .datastore() - .vmm_mark_deleted(&opctx, &vmm.id) + .vmm_mark_deleted(&opctx, vmm_id) .await .map(|_| ()) .map_err(ActionError::action_failed) diff --git a/nexus/src/app/sagas/instance_update/mod.rs b/nexus/src/app/sagas/instance_update/mod.rs index 521869c1ff0..2107c8ca568 100644 --- a/nexus/src/app/sagas/instance_update/mod.rs +++ b/nexus/src/app/sagas/instance_update/mod.rs @@ -7,9 +7,7 @@ use super::{ ACTION_GENERATE_ID, }; use crate::app::db::datastore::InstanceAndVmms; -use crate::app::db::lookup::LookupPath; use crate::app::sagas::declare_saga_actions; -use nexus_db_model::Generation; use nexus_db_queries::{authn, authz}; use nexus_types::identity::Resource; use omicron_common::api::external::InstanceState; @@ -18,7 +16,8 @@ use steno::{ActionError, DagBuilder, Node, SagaName}; use uuid::Uuid; mod destroyed; -/// Parameters to the instance update saga. + +/// Parameters to the start instance update saga. #[derive(Debug, Deserialize, Serialize)] pub(crate) struct Params { /// Authentication context to use to fetch the instance's current state from @@ -28,6 +27,16 @@ pub(crate) struct Params { pub authz_instance: authz::Instance, } +/// Parameters to the "real" instance update saga. +#[derive(Debug, Deserialize, Serialize)] +struct RealParams { + serialized_authn: authn::saga::Serialized, + + authz_instance: authz::Instance, + + state: InstanceAndVmms, +} + const INSTANCE_LOCK_ID: &str = "saga_instance_lock_id"; const STATE: &str = "state"; @@ -43,15 +52,21 @@ declare_saga_actions! { - siu_lock_instance_undo } - // Fetch the instance and VMM's state. + // Fetch the instance and VMM's state, and start the "real" instance update saga. // N.B. that this must be performed as a separate action from // `LOCK_INSTANCE`, so that if the lookup fails, we will still unwind the // `LOCK_INSTANCE` action and release the lock. - FETCH_STATE -> "state" { - + siu_fetch_state + FETCH_STATE_AND_START_REAL_SAGA -> "state" { + + siu_fetch_state_and_start_real_saga } - UNLOCK_INSTANCE -> "no_result7" { + // Become the instance updater + BECOME_UPDATER -> "generation" { + + siu_become_updater + - siu_lock_instance_undo + } + + UNLOCK_INSTANCE -> "unlocked" { + siu_unlock_instance } } @@ -61,7 +76,7 @@ declare_saga_actions! { #[derive(Debug)] pub(crate) struct SagaInstanceUpdate; impl NexusSaga for SagaInstanceUpdate { - const NAME: &'static str = "instance-update"; + const NAME: &'static str = "start-instance-update"; type Params = Params; fn register_actions(registry: &mut ActionRegistry) { @@ -78,40 +93,76 @@ impl NexusSaga for SagaInstanceUpdate { ACTION_GENERATE_ID.as_ref(), )); builder.append(lock_instance_action()); - builder.append(fetch_state_action()); + builder.append(fetch_state_and_start_real_saga_action()); - // determine which subsaga to execute based on the state of the instance - // and the VMMs associated with it. - const DESTROYED_SUBSAGA_PARAMS: &str = - "params_for_vmm_destroyed_subsaga"; - let subsaga_dag = { - let subsaga_builder = DagBuilder::new(SagaName::new( - destroyed::SagaVmmDestroyed::NAME, - )); - destroyed::SagaVmmDestroyed::make_saga_dag( - ¶ms, - subsaga_builder, - )? - }; - - builder.append(Node::constant( - DESTROYED_SUBSAGA_PARAMS, - serde_json::to_value(¶ms).map_err(|e| { - SagaInitError::SerializeError( - DESTROYED_SUBSAGA_PARAMS.to_string(), - e, - ) - })?, - )); + Ok(builder.build()?) + } +} + +struct SagaRealInstanceUpdate; + +impl NexusSaga for SagaRealInstanceUpdate { + const NAME: &'static str = "instance-update"; + type Params = RealParams; - builder.append(Node::subsaga( - "vmm_destroyed_subsaga_no_result", - subsaga_dag, - DESTROYED_SUBSAGA_PARAMS, + fn register_actions(registry: &mut ActionRegistry) { + instance_update_register_actions(registry); + } + + fn make_saga_dag( + params: &Self::Params, + mut builder: DagBuilder, + ) -> Result { + builder.append(Node::action( + INSTANCE_LOCK_ID, + "GenerateInstanceLockId", + ACTION_GENERATE_ID.as_ref(), )); + builder.append(become_updater_action()); - builder.append(unlock_instance_action()); + // determine which subsaga(s) to execute based on the state of the instance + // and the VMMs associated with it. + if let Some(ref active_vmm) = params.state.active_vmm { + // If the active VMM is `Destroyed`, schedule the active VMM + // destroyed subsaga. + if active_vmm.runtime.state.state() == &InstanceState::Destroyed { + const DESTROYED_SUBSAGA_PARAMS: &str = + "params_for_vmm_destroyed_subsaga"; + let subsaga_params = destroyed::Params { + serialized_authn: params.serialized_authn.clone(), + authz_instance: params.authz_instance.clone(), + vmm_id: active_vmm.id, + instance: params.state.instance.clone(), + }; + let subsaga_dag = { + let subsaga_builder = DagBuilder::new(SagaName::new( + destroyed::SagaVmmDestroyed::NAME, + )); + destroyed::SagaVmmDestroyed::make_saga_dag( + &subsaga_params, + subsaga_builder, + )? + }; + + builder.append(Node::constant( + DESTROYED_SUBSAGA_PARAMS, + serde_json::to_value(&subsaga_params).map_err(|e| { + SagaInitError::SerializeError( + DESTROYED_SUBSAGA_PARAMS.to_string(), + e, + ) + })?, + )); + + builder.append(Node::subsaga( + "vmm_destroyed_subsaga_no_result", + subsaga_dag, + DESTROYED_SUBSAGA_PARAMS, + )); + } + } + builder.append(unlock_instance_action()); Ok(builder.build()?) } } @@ -120,19 +171,17 @@ impl NexusSaga for SagaInstanceUpdate { async fn siu_lock_instance( sagactx: NexusActionContext, -) -> Result { +) -> Result<(), ActionError> { let osagactx = sagactx.user_data(); let Params { ref serialized_authn, ref authz_instance, .. } = sagactx.saga_params::()?; let lock_id = sagactx.lookup::(INSTANCE_LOCK_ID)?; let opctx = crate::context::op_context_for_saga_action(&sagactx, serialized_authn); - let datastore = osagactx.datastore(); - let log = osagactx.log(); slog::info!( osagactx.log(), "instance update: attempting to lock instance"; - "instance_id" => %instance.id(), + "instance_id" => %authz_instance.id(), "saga_id" => %lock_id, ); osagactx @@ -143,69 +192,119 @@ async fn siu_lock_instance( .map(|_| ()) } -async fn siu_fetch_state( +async fn siu_fetch_state_and_start_real_saga( sagactx: NexusActionContext, -) -> Result { +) -> Result<(), ActionError> { let osagactx = sagactx.user_data(); - let Params { ref serialized_authn, ref authz_instance, .. } = + let Params { serialized_authn, authz_instance, .. } = sagactx.saga_params::()?; let opctx = - crate::context::op_context_for_saga_action(&sagactx, serialized_authn); + crate::context::op_context_for_saga_action(&sagactx, &serialized_authn); - osagactx + let state = osagactx .datastore() - .instance_fetch_with_vmms(&opctx, authz_instance) + .instance_fetch_with_vmms(&opctx, &authz_instance) .await - .map_err(ActionError::action_failed) + .map_err(ActionError::action_failed)?; + osagactx + .nexus() + .execute_saga::(RealParams { + serialized_authn, + authz_instance, + state, + }) + .await + .map_err(ActionError::action_failed); + + Ok(()) } -async fn siu_unlock_instance( +async fn siu_become_updater( sagactx: NexusActionContext, ) -> Result<(), ActionError> { - let osagactx = sagactx.user_data(); - let Params { ref serialized_authn, ref authz_instance, .. } = - sagactx.saga_params::()?; + let RealParams { + ref serialized_authn, ref authz_instance, ref state, .. + } = sagactx.saga_params::()?; + let lock_id = sagactx.lookup::(INSTANCE_LOCK_ID)?; let opctx = crate::context::op_context_for_saga_action(&sagactx, serialized_authn); - let datastore = osagactx.datastore(); - - slog::info!( + let osagactx = sagactx.user_data(); + slog::debug!( osagactx.log(), - "instance update: unlocking instance"; + "instance update: trying to become instance updater..."; "instance_id" => %authz_instance.id(), "saga_id" => %lock_id, + "parent_id" => ?state.instance.runtime_state.updater_id, ); - datastore - .instance_updater_unlock(&opctx, authz_instance, &lock_id) + osagactx + .datastore() + .instance_updater_inherit_lock(&opctx, &state.instance, &lock_id) .await .map_err(ActionError::action_failed)?; + slog::info!( + osagactx.log(), + "instance update: became instance updater"; + "instance_id" => %authz_instance.id(), + "saga_id" => %lock_id, + "parent_id" => ?state.instance.runtime_state.updater_id, + ); + + Ok(()) +} + +async fn siu_unbecome_updater( + sagactx: NexusActionContext, +) -> Result<(), anyhow::Error> { + let RealParams { ref serialized_authn, ref authz_instance, .. } = + sagactx.saga_params::()?; + unlock_instance_inner(serialized_authn, authz_instance, &sagactx).await?; + Ok(()) } +async fn siu_unlock_instance( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let RealParams { ref serialized_authn, ref authz_instance, .. } = + sagactx.saga_params::()?; + unlock_instance_inner(serialized_authn, authz_instance, &sagactx).await +} + // N.B. that this has to be a separate function just because the undo action // must return `anyhow::Error` rather than `ActionError`. async fn siu_lock_instance_undo( sagactx: NexusActionContext, ) -> Result<(), anyhow::Error> { - let osagactx = sagactx.user_data(); let Params { ref serialized_authn, ref authz_instance, .. } = sagactx.saga_params::()?; + unlock_instance_inner(serialized_authn, authz_instance, &sagactx).await?; + Ok(()) +} + +async fn unlock_instance_inner( + serialized_authn: &authn::saga::Serialized, + authz_instance: &authz::Instance, + sagactx: &NexusActionContext, +) -> Result<(), ActionError> { let lock_id = sagactx.lookup::(INSTANCE_LOCK_ID)?; let opctx = crate::context::op_context_for_saga_action(&sagactx, serialized_authn); - let datastore = osagactx.datastore(); - + let osagactx = sagactx.user_data(); slog::info!( osagactx.log(), - "instance update: unlocking instance on unwind"; + "instance update: unlocking instance"; "instance_id" => %authz_instance.id(), "saga_id" => %lock_id, ); - datastore.instance_updater_unlock(&opctx, authz_instance, &lock_id).await?; + osagactx + .datastore() + .instance_updater_unlock(&opctx, authz_instance, &lock_id) + .await + .map_err(ActionError::action_failed)?; Ok(()) } diff --git a/nexus/src/app/sagas/instance_update/start.rs b/nexus/src/app/sagas/instance_update/start.rs new file mode 100644 index 00000000000..e69de29bb2d