From 22f6fb17997c187076871967965c439844ddd044 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 24 May 2024 13:37:31 -0700 Subject: [PATCH] rewrite most of the saga --- nexus/db-queries/src/db/datastore/instance.rs | 3 - nexus/src/app/background/instance_updater.rs | 11 +- nexus/src/app/instance.rs | 16 +- nexus/src/app/mod.rs | 2 +- .../app/sagas/instance_update/destroyed.rs | 102 +++++--- nexus/src/app/sagas/instance_update/mod.rs | 235 +++++++++++------- 6 files changed, 225 insertions(+), 144 deletions(-) diff --git a/nexus/db-queries/src/db/datastore/instance.rs b/nexus/db-queries/src/db/datastore/instance.rs index ddfacb608af..1d8e1dbfea6 100644 --- a/nexus/db-queries/src/db/datastore/instance.rs +++ b/nexus/db-queries/src/db/datastore/instance.rs @@ -478,7 +478,6 @@ impl DataStore { opctx: &OpContext, authz_instance: &authz::Instance, saga_lock_id: &Uuid, - updater_gen: Generation, ) -> Result { use db::schema::instance::dsl; @@ -491,8 +490,6 @@ impl DataStore { // - the provided updater ID matches that of the saga that has // currently locked this instance. .filter(dsl::updater_id.eq(Some(*saga_lock_id))) - // - the generation is the same as the captured generation. - .filter(dsl::updater_gen.eq(updater_gen)) .set(( dsl::updater_gen.eq(dsl::updater_gen + 1), dsl::updater_id.eq(None::), diff --git a/nexus/src/app/background/instance_updater.rs b/nexus/src/app/background/instance_updater.rs index ebdb4bb79e1..f43aef91315 100644 --- a/nexus/src/app/background/instance_updater.rs +++ b/nexus/src/app/background/instance_updater.rs @@ -59,16 +59,7 @@ impl InstanceUpdater { stats.destroyed_active_vmms = destroyed_active_vmms.len(); for InstanceAndActiveVmm { instance, vmm } in destroyed_active_vmms { - let saga = SagaRequest::InstanceUpdate { - params: sagas::instance_update::Params { - serialized_authn: authn::saga::Serialized::for_opctx(opctx), - state: InstanceAndVmms { - instance, - active_vmm: vmm, - target_vmm: None, - }, - }, - }; + let saga = SagaRequest::InstanceUpdate { params: todo!() }; self.saga_req .send(saga) .await diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs index 4ff51ff5462..027661d1f6f 100644 --- a/nexus/src/app/instance.rs +++ b/nexus/src/app/instance.rs @@ -1338,12 +1338,7 @@ impl super::Nexus { if let Some(state) = state { let update_result = self .db_datastore - .instance_and_vmm_update_runtime( - instance_id, - &state.instance_state.into(), - &state.propolis_id, - &state.vmm_state.into(), - ) + .vmm_update_runtime(&state.propolis_id, &state.vmm_state.into()) .await; slog::debug!(&self.log, @@ -1352,7 +1347,8 @@ impl super::Nexus { "propolis_id" => %state.propolis_id, "result" => ?update_result); - update_result + // TODO(eliza): probably just change the retval to `bool` later... + update_result.map(|vmm_updated| (false, vmm_updated)) } else { Ok((false, false)) } @@ -2002,7 +1998,11 @@ pub(crate) async fn notify_instance_updated( .await?; let updated = datastore - .vmm_update_runtime(&propolis_id, &new_runtime_state.vmm_state) + .vmm_update_runtime( + &propolis_id, + // TODO(eliza): probably should take this by value... + &new_runtime_state.vmm_state.clone().into(), + ) .await?; // // Update OPTE and Dendrite if the instance's active sled assignment diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index 91abccc52c6..7d7a03151c6 100644 --- a/nexus/src/app/mod.rs +++ b/nexus/src/app/mod.rs @@ -931,7 +931,7 @@ impl Nexus { &self.log, o!( "saga" => "instance_update", - "instance_id" => params.state.instance.id().to_string(), + "instance_id" => params.authz_instance.id().to_string(), ), ); tokio::spawn(async move { diff --git a/nexus/src/app/sagas/instance_update/destroyed.rs b/nexus/src/app/sagas/instance_update/destroyed.rs index 77bed8be436..b4cb12c57af 100644 --- a/nexus/src/app/sagas/instance_update/destroyed.rs +++ b/nexus/src/app/sagas/instance_update/destroyed.rs @@ -5,31 +5,21 @@ use super::ActionRegistry; use super::NexusActionContext; use super::NexusSaga; +use super::Params; +use super::STATE; use crate::app::sagas::declare_saga_actions; use crate::app::sagas::ActionError; -use db::lookup::LookupPath; use nexus_db_model::Generation; +use nexus_db_model::Instance; use nexus_db_model::InstanceRuntimeState; +use nexus_db_model::Vmm; +use nexus_db_queries::db::datastore::InstanceAndVmms; use nexus_db_queries::db::identity::Resource; -use nexus_db_queries::{authn, authz, db}; use omicron_common::api::external; use omicron_common::api::external::Error; -use omicron_common::api::external::ResourceType; -use serde::{Deserialize, Serialize}; +use omicron_common::api::external::InstanceState; use slog::info; -/// Parameters to the instance update VMM destroyed sub-saga. -#[derive(Debug, Deserialize, Serialize)] -pub(crate) struct Params { - /// Authentication context to use to fetch the instance's current state from - /// the database. - pub serialized_authn: authn::saga::Serialized, - - pub instance: db::model::Instance, - - pub vmm: db::model::Vmm, -} - // instance update VMM destroyed subsaga: actions // This subsaga is responsible for handling an instance update where the @@ -102,11 +92,29 @@ impl NexusSaga for SagaVmmDestroyed { } } +fn get_destroyed_vmm( + sagactx: &NexusActionContext, +) -> Result, ActionError> { + let state = sagactx.lookup::(STATE)?; + match state.active_vmm { + Some(vmm) if vmm.runtime.state.state() == &InstanceState::Destroyed => { + Ok(Some((state.instance, vmm))) + } + _ => Ok(None), + } +} + async fn siud_release_sled_resources( sagactx: NexusActionContext, ) -> Result<(), ActionError> { + let Some((_, vmm)) = get_destroyed_vmm(&sagactx)? else { + // if the update we are handling is not an active VMM destroyed update, + // bail --- there's nothing to do here. + return Ok(()); + }; + let osagactx = sagactx.user_data(); - let Params { ref serialized_authn, ref vmm, ref instance, .. } = + let Params { ref serialized_authn, ref authz_instance } = sagactx.saga_params::()?; let opctx = @@ -114,8 +122,8 @@ async fn siud_release_sled_resources( info!( osagactx.log(), - "instance update (VMM destroyed): deallocating sled resource reservation"; - "instance_id" => %instance.id(), + "instance update (active VMM destroyed): deallocating sled resource reservation"; + "instance_id" => %authz_instance.id(), "propolis_id" => %vmm.id, "instance_update" => %"VMM destroyed", ); @@ -137,8 +145,14 @@ async fn siud_release_sled_resources( async fn siud_release_virtual_provisioning( sagactx: NexusActionContext, ) -> Result<(), ActionError> { + let Some((instance, vmm)) = get_destroyed_vmm(&sagactx)? else { + // if the update we are handling is not an active VMM destroyed update, + // bail --- there's nothing to do here. + return Ok(()); + }; + let osagactx = sagactx.user_data(); - let Params { ref serialized_authn, ref instance, ref vmm, .. } = + let Params { ref serialized_authn, ref authz_instance } = sagactx.saga_params::()?; let opctx = @@ -147,7 +161,7 @@ async fn siud_release_virtual_provisioning( info!( osagactx.log(), "instance update (VMM destroyed): deallocating virtual provisioning resources"; - "instance_id" => %instance.id(), + "instance_id" => %authz_instance.id(), "propolis_id" => %vmm.id, "instance_update" => %"VMM destroyed", ); @@ -156,7 +170,7 @@ async fn siud_release_virtual_provisioning( .datastore() .virtual_provisioning_collection_delete_instance( &opctx, - instance.id(), + authz_instance.id(), instance.project_id, i64::from(instance.ncpus.0 .0), instance.memory, @@ -176,8 +190,13 @@ async fn siud_release_virtual_provisioning( async fn siud_unassign_oximeter_producer( sagactx: NexusActionContext, ) -> Result<(), ActionError> { + let Some((_, vmm)) = get_destroyed_vmm(&sagactx)? else { + // if the update we are handling is not an active VMM destroyed update, + // bail --- there's nothing to do here. + return Ok(()); + }; let osagactx = sagactx.user_data(); - let Params { ref instance, ref serialized_authn, .. } = + let Params { ref serialized_authn, ref authz_instance, .. } = sagactx.saga_params::()?; let opctx = @@ -187,7 +206,7 @@ async fn siud_unassign_oximeter_producer( osagactx.datastore(), osagactx.log(), &opctx, - &instance.id(), + &authz_instance.id(), ) .await .map_err(ActionError::action_failed) @@ -196,10 +215,12 @@ async fn siud_unassign_oximeter_producer( async fn siud_delete_v2p_mappings( sagactx: NexusActionContext, ) -> Result<(), ActionError> { + let Some((instance, vmm)) = get_destroyed_vmm(&sagactx)? else { + // if the update we are handling is not an active VMM destroyed update, + // bail --- there's nothing to do here. + return Ok(()); + }; let osagactx = sagactx.user_data(); - let Params { ref instance, ref vmm, .. } = - sagactx.saga_params::()?; - info!( osagactx.log(), "instance update (VMM destroyed): deleting V2P mappings"; @@ -216,8 +237,13 @@ async fn siud_delete_v2p_mappings( async fn siud_delete_nat_entries( sagactx: NexusActionContext, ) -> Result<(), ActionError> { + let Some((_, vmm)) = get_destroyed_vmm(&sagactx)? else { + // if the update we are handling is not an active VMM destroyed update, + // bail --- there's nothing to do here. + return Ok(()); + }; let osagactx = sagactx.user_data(); - let Params { ref serialized_authn, ref vmm, ref instance, .. } = + let Params { ref serialized_authn, ref authz_instance, .. } = sagactx.saga_params::()?; let opctx = @@ -226,16 +252,11 @@ async fn siud_delete_nat_entries( info!( osagactx.log(), "instance update (VMM destroyed): deleting NAT entries"; - "instance_id" => %instance.id(), + "instance_id" => %authz_instance.id(), "propolis_id" => %vmm.id, "instance_update" => %"VMM destroyed", ); - let (.., authz_instance) = LookupPath::new(&opctx, &osagactx.datastore()) - .instance_id(instance.id()) - .lookup_for(authz::Action::Modify) - .await - .map_err(ActionError::action_failed)?; osagactx .nexus() .instance_delete_dpd_config(&opctx, &authz_instance) @@ -247,8 +268,12 @@ async fn siud_delete_nat_entries( async fn siud_update_instance( sagactx: NexusActionContext, ) -> Result<(), ActionError> { + let Some((instance, vmm)) = get_destroyed_vmm(&sagactx)? else { + // if the update we are handling is not an active VMM destroyed update, + // bail --- there's nothing to do here. + return Ok(()); + }; let osagactx = sagactx.user_data(); - let Params { instance, vmm, .. } = sagactx.saga_params::()?; let new_runtime = InstanceRuntimeState { propolis_id: None, nexus_state: external::InstanceState::Stopped.into(), @@ -276,8 +301,13 @@ async fn siud_update_instance( async fn siud_mark_vmm_deleted( sagactx: NexusActionContext, ) -> Result<(), ActionError> { + let Some((instance, vmm)) = get_destroyed_vmm(&sagactx)? else { + // if the update we are handling is not an active VMM destroyed update, + // bail --- there's nothing to do here. + return Ok(()); + }; let osagactx = sagactx.user_data(); - let Params { ref serialized_authn, ref vmm, ref instance, .. } = + let Params { ref serialized_authn, .. } = sagactx.saga_params::()?; let opctx = diff --git a/nexus/src/app/sagas/instance_update/mod.rs b/nexus/src/app/sagas/instance_update/mod.rs index fd85509a2ee..c711d3a826b 100644 --- a/nexus/src/app/sagas/instance_update/mod.rs +++ b/nexus/src/app/sagas/instance_update/mod.rs @@ -25,26 +25,32 @@ pub(crate) struct Params { /// the database. pub serialized_authn: authn::saga::Serialized, - pub state: InstanceAndVmms, + pub authz_instance: authz::Instance, } const INSTANCE_LOCK_ID: &str = "saga_instance_lock_id"; -const INSTANCE_LOCK_GEN: &str = "saga_instance_lock_gen"; +const STATE: &str = "state"; // instance update saga: actions declare_saga_actions! { instance_update; - // Read the target Instance from CRDB and join with its active VMM and - // migration target VMM records if they exist, and then acquire the - // "instance updater" lock with this saga's ID if no other saga is currently - // updating the instance. + // Acquire the instance updater" lock with this saga's ID if no other saga + // is currently updating the instance. LOCK_INSTANCE -> "saga_instance_lock_gen" { + siu_lock_instance - siu_lock_instance_undo } + // Fetch the instance and VMM's state. + // N.B. that this must be performed as a separate action from + // `LOCK_INSTANCE`, so that if the lookup fails, we will still unwind the + // `LOCK_INSTANCE` action and release the lock. + FETCH_STATE -> "state" { + + siu_fetch_state + } + UNLOCK_INSTANCE -> "no_result7" { + siu_unlock_instance } @@ -72,52 +78,38 @@ impl NexusSaga for SagaInstanceUpdate { ACTION_GENERATE_ID.as_ref(), )); builder.append(lock_instance_action()); + builder.append(fetch_state_action()); // determine which subsaga to execute based on the state of the instance // and the VMMs associated with it. - match params.state { - // VMM destroyed subsaga - InstanceAndVmms { - ref instance, active_vmm: Some(ref vmm), .. - } if vmm.runtime.state.state() == &InstanceState::Destroyed => { - const DESTROYED_SUBSAGA_PARAMS: &str = - "params_for_vmm_destroyed_subsaga"; - let subsaga_params = destroyed::Params { - serialized_authn: params.serialized_authn.clone(), - instance: instance.clone(), - vmm: vmm.clone(), - }; - let subsaga_dag = { - let subsaga_builder = DagBuilder::new(SagaName::new( - destroyed::SagaVmmDestroyed::NAME, - )); - destroyed::SagaVmmDestroyed::make_saga_dag( - &subsaga_params, - subsaga_builder, - )? - }; - - builder.append(Node::constant( - DESTROYED_SUBSAGA_PARAMS, - serde_json::to_value(&subsaga_params).map_err(|e| { - SagaInitError::SerializeError( - DESTROYED_SUBSAGA_PARAMS.to_string(), - e, - ) - })?, - )); - - builder.append(Node::subsaga( - "vmm_destroyed_subsaga_no_result", - subsaga_dag, - DESTROYED_SUBSAGA_PARAMS, - )); - } - _ => { - // TODO(eliza): other subsagas - } + const DESTROYED_SUBSAGA_PARAMS: &str = + "params_for_vmm_destroyed_subsaga"; + let subsaga_dag = { + let subsaga_builder = DagBuilder::new(SagaName::new( + destroyed::SagaVmmDestroyed::NAME, + )); + destroyed::SagaVmmDestroyed::make_saga_dag( + ¶ms, + subsaga_builder, + )? }; + builder.append(Node::constant( + DESTROYED_SUBSAGA_PARAMS, + serde_json::to_value(¶ms).map_err(|e| { + SagaInitError::SerializeError( + DESTROYED_SUBSAGA_PARAMS.to_string(), + e, + ) + })?, + )); + + builder.append(Node::subsaga( + "vmm_destroyed_subsaga_no_result", + subsaga_dag, + DESTROYED_SUBSAGA_PARAMS, + )); + builder.append(unlock_instance_action()); Ok(builder.build()?) @@ -130,82 +122,153 @@ async fn siu_lock_instance( sagactx: NexusActionContext, ) -> Result { let osagactx = sagactx.user_data(); - let Params { ref serialized_authn, ref state, .. } = + let Params { ref serialized_authn, ref authz_instance, .. } = sagactx.saga_params::()?; let lock_id = sagactx.lookup::(INSTANCE_LOCK_ID)?; let opctx = crate::context::op_context_for_saga_action(&sagactx, serialized_authn); let datastore = osagactx.datastore(); + let log = osagactx.log(); + let instance_id = authz_instance.id(); + slog::info!( + log, + "instance update: attempting to lock instance"; + "instance_id" => %instance_id, + "saga_id" => %lock_id, + ); - let (.., authz_instance) = LookupPath::new(&opctx, datastore) - .instance_id(state.instance.id()) - .lookup_for(authz::Action::Modify) - .await - .map_err(ActionError::action_failed)?; + loop { + let instance = datastore + .instance_refetch(&opctx, &authz_instance) + .await + .map_err(ActionError::action_failed)?; + // Look at the current lock state of the instance and determine whether + // we can lock it. + match instance.runtime_state.updater_id { + Some(ref id) if id == &lock_id => { + slog::info!( + log, + "instance update: instance already locked by this saga"; + "instance_id" => %instance_id, + "saga_id" => %lock_id, + ); + return Ok(instance.runtime_state.updater_gen); + } + Some(ref id) => { + slog::info!( + log, + "instance update: instance locked by another saga"; + "instance_id" => %instance_id, + "saga_id" => %lock_id, + "locked_by" => %lock_id, + ); + return Err(ActionError::action_failed(serde_json::json!({ + "error": "instance locked by another saga", + "saga_id": lock_id, + "locked_by": id, + }))); + } + None => {} + }; + let gen = instance.runtime_state.updater_gen; + slog::debug!( + log, + "instance update: trying to acquire updater lock..."; + "instance_id" => %instance_id, + "saga_id" => %lock_id, + "updater_gen" => ?gen, + ); + let lock = datastore + .instance_updater_try_lock(&opctx, &authz_instance, gen, &lock_id) + .await + .map_err(ActionError::action_failed)?; + match lock { + Some(lock_gen) => { + slog::info!( + log, + "instance update: acquired updater lock"; + "instance_id" => %instance_id, + "saga_id" => %lock_id, + "updater_gen" => ?gen, + ); + return Ok(lock_gen); + } + None => { + slog::debug!( + log, + "instance update: generation has advanced, retrying..."; + "instance_id" => %instance_id, + "saga_id" => %lock_id, + "updater_gen" => ?gen, + ); + } + } + } +} - // try to acquire the instance updater lock - datastore - .instance_updater_try_lock( - &opctx, - &authz_instance, - state.instance.runtime_state.updater_gen, - &lock_id, - ) +async fn siu_fetch_state( + sagactx: NexusActionContext, +) -> Result { + let osagactx = sagactx.user_data(); + let Params { ref serialized_authn, ref authz_instance, .. } = + sagactx.saga_params::()?; + let opctx = + crate::context::op_context_for_saga_action(&sagactx, serialized_authn); + + osagactx + .datastore() + .instance_fetch_with_vmms(&opctx, authz_instance) .await - .map_err(ActionError::action_failed)? - .ok_or_else(|| { - ActionError::action_failed( - serde_json::json!({"error": "can't get ye lock"}), - ) - }) + .map_err(ActionError::action_failed) } async fn siu_unlock_instance( sagactx: NexusActionContext, ) -> Result<(), ActionError> { let osagactx = sagactx.user_data(); - let Params { ref serialized_authn, ref state, .. } = + let Params { ref serialized_authn, ref authz_instance, .. } = sagactx.saga_params::()?; let lock_id = sagactx.lookup::(INSTANCE_LOCK_ID)?; - let gen = sagactx.lookup::(INSTANCE_LOCK_GEN)?; let opctx = crate::context::op_context_for_saga_action(&sagactx, serialized_authn); let datastore = osagactx.datastore(); - let (.., authz_instance) = LookupPath::new(&opctx, datastore) - .instance_id(state.instance.id()) - .lookup_for(authz::Action::Modify) - .await - .map_err(ActionError::action_failed)?; + slog::info!( + osagactx.log(), + "instance update: unlocking instance"; + "instance_id" => %authz_instance.id(), + "saga_id" => %lock_id, + ); datastore - .instance_updater_unlock(&opctx, &authz_instance, &lock_id, gen) + .instance_updater_unlock(&opctx, authz_instance, &lock_id) .await .map_err(ActionError::action_failed)?; + Ok(()) } -// this is different from "lock instance" lol +// N.B. that this has to be a separate function just because the undo action +// must return `anyhow::Error` rather than `ActionError`. async fn siu_lock_instance_undo( sagactx: NexusActionContext, ) -> Result<(), anyhow::Error> { let osagactx = sagactx.user_data(); - let Params { ref serialized_authn, ref state, .. } = + let Params { ref serialized_authn, ref authz_instance, .. } = sagactx.saga_params::()?; let lock_id = sagactx.lookup::(INSTANCE_LOCK_ID)?; let opctx = crate::context::op_context_for_saga_action(&sagactx, serialized_authn); let datastore = osagactx.datastore(); - let (.., authz_instance) = LookupPath::new(&opctx, datastore) - .instance_id(state.instance.id()) - .lookup_for(authz::Action::Modify) - .await - .map_err(ActionError::action_failed)?; + slog::info!( + osagactx.log(), + "instance update: unlocking instance on unwind"; + "instance_id" => %authz_instance.id(), + "saga_id" => %lock_id, + ); + + datastore.instance_updater_unlock(&opctx, authz_instance, &lock_id).await?; - let updater_gen = state.instance.runtime_state.updater_gen.next().into(); - datastore - .instance_updater_unlock(&opctx, &authz_instance, &lock_id, updater_gen) - .await?; Ok(()) }