Skip to content

Commit

Permalink
rewrite most of the saga
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw committed May 24, 2024
1 parent 003322f commit 22f6fb1
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 144 deletions.
3 changes: 0 additions & 3 deletions nexus/db-queries/src/db/datastore/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,6 @@ impl DataStore {
opctx: &OpContext,
authz_instance: &authz::Instance,
saga_lock_id: &Uuid,
updater_gen: Generation,
) -> Result<bool, Error> {
use db::schema::instance::dsl;

Expand All @@ -491,8 +490,6 @@ impl DataStore {
// - the provided updater ID matches that of the saga that has
// currently locked this instance.
.filter(dsl::updater_id.eq(Some(*saga_lock_id)))
// - the generation is the same as the captured generation.
.filter(dsl::updater_gen.eq(updater_gen))
.set((
dsl::updater_gen.eq(dsl::updater_gen + 1),
dsl::updater_id.eq(None::<Uuid>),
Expand Down
11 changes: 1 addition & 10 deletions nexus/src/app/background/instance_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,7 @@ impl InstanceUpdater {
stats.destroyed_active_vmms = destroyed_active_vmms.len();

for InstanceAndActiveVmm { instance, vmm } in destroyed_active_vmms {
let saga = SagaRequest::InstanceUpdate {
params: sagas::instance_update::Params {
serialized_authn: authn::saga::Serialized::for_opctx(opctx),
state: InstanceAndVmms {
instance,
active_vmm: vmm,
target_vmm: None,
},
},
};
let saga = SagaRequest::InstanceUpdate { params: todo!() };
self.saga_req
.send(saga)
.await
Expand Down
16 changes: 8 additions & 8 deletions nexus/src/app/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1338,12 +1338,7 @@ impl super::Nexus {
if let Some(state) = state {
let update_result = self
.db_datastore
.instance_and_vmm_update_runtime(
instance_id,
&state.instance_state.into(),
&state.propolis_id,
&state.vmm_state.into(),
)
.vmm_update_runtime(&state.propolis_id, &state.vmm_state.into())
.await;

slog::debug!(&self.log,
Expand All @@ -1352,7 +1347,8 @@ impl super::Nexus {
"propolis_id" => %state.propolis_id,
"result" => ?update_result);

update_result
// TODO(eliza): probably just change the retval to `bool` later...
update_result.map(|vmm_updated| (false, vmm_updated))
} else {
Ok((false, false))
}
Expand Down Expand Up @@ -2002,7 +1998,11 @@ pub(crate) async fn notify_instance_updated(
.await?;

let updated = datastore
.vmm_update_runtime(&propolis_id, &new_runtime_state.vmm_state)
.vmm_update_runtime(
&propolis_id,
// TODO(eliza): probably should take this by value...
&new_runtime_state.vmm_state.clone().into(),
)
.await?;

// // Update OPTE and Dendrite if the instance's active sled assignment
Expand Down
2 changes: 1 addition & 1 deletion nexus/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ impl Nexus {
&self.log,
o!(
"saga" => "instance_update",
"instance_id" => params.state.instance.id().to_string(),
"instance_id" => params.authz_instance.id().to_string(),
),
);
tokio::spawn(async move {
Expand Down
102 changes: 66 additions & 36 deletions nexus/src/app/sagas/instance_update/destroyed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,21 @@
use super::ActionRegistry;
use super::NexusActionContext;
use super::NexusSaga;
use super::Params;
use super::STATE;
use crate::app::sagas::declare_saga_actions;
use crate::app::sagas::ActionError;
use db::lookup::LookupPath;
use nexus_db_model::Generation;
use nexus_db_model::Instance;
use nexus_db_model::InstanceRuntimeState;
use nexus_db_model::Vmm;
use nexus_db_queries::db::datastore::InstanceAndVmms;
use nexus_db_queries::db::identity::Resource;
use nexus_db_queries::{authn, authz, db};
use omicron_common::api::external;
use omicron_common::api::external::Error;
use omicron_common::api::external::ResourceType;
use serde::{Deserialize, Serialize};
use omicron_common::api::external::InstanceState;
use slog::info;

/// Parameters to the instance update VMM destroyed sub-saga.
#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct Params {
/// Authentication context to use to fetch the instance's current state from
/// the database.
pub serialized_authn: authn::saga::Serialized,

pub instance: db::model::Instance,

pub vmm: db::model::Vmm,
}

// instance update VMM destroyed subsaga: actions

// This subsaga is responsible for handling an instance update where the
Expand Down Expand Up @@ -102,20 +92,38 @@ impl NexusSaga for SagaVmmDestroyed {
}
}

fn get_destroyed_vmm(
sagactx: &NexusActionContext,
) -> Result<Option<(Instance, Vmm)>, ActionError> {
let state = sagactx.lookup::<InstanceAndVmms>(STATE)?;
match state.active_vmm {
Some(vmm) if vmm.runtime.state.state() == &InstanceState::Destroyed => {
Ok(Some((state.instance, vmm)))
}
_ => Ok(None),
}
}

async fn siud_release_sled_resources(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let Some((_, vmm)) = get_destroyed_vmm(&sagactx)? else {
// if the update we are handling is not an active VMM destroyed update,
// bail --- there's nothing to do here.
return Ok(());
};

let osagactx = sagactx.user_data();
let Params { ref serialized_authn, ref vmm, ref instance, .. } =
let Params { ref serialized_authn, ref authz_instance } =
sagactx.saga_params::<Params>()?;

let opctx =
crate::context::op_context_for_saga_action(&sagactx, serialized_authn);

info!(
osagactx.log(),
"instance update (VMM destroyed): deallocating sled resource reservation";
"instance_id" => %instance.id(),
"instance update (active VMM destroyed): deallocating sled resource reservation";
"instance_id" => %authz_instance.id(),
"propolis_id" => %vmm.id,
"instance_update" => %"VMM destroyed",
);
Expand All @@ -137,8 +145,14 @@ async fn siud_release_sled_resources(
async fn siud_release_virtual_provisioning(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let Some((instance, vmm)) = get_destroyed_vmm(&sagactx)? else {
// if the update we are handling is not an active VMM destroyed update,
// bail --- there's nothing to do here.
return Ok(());
};

let osagactx = sagactx.user_data();
let Params { ref serialized_authn, ref instance, ref vmm, .. } =
let Params { ref serialized_authn, ref authz_instance } =
sagactx.saga_params::<Params>()?;

let opctx =
Expand All @@ -147,7 +161,7 @@ async fn siud_release_virtual_provisioning(
info!(
osagactx.log(),
"instance update (VMM destroyed): deallocating virtual provisioning resources";
"instance_id" => %instance.id(),
"instance_id" => %authz_instance.id(),
"propolis_id" => %vmm.id,
"instance_update" => %"VMM destroyed",
);
Expand All @@ -156,7 +170,7 @@ async fn siud_release_virtual_provisioning(
.datastore()
.virtual_provisioning_collection_delete_instance(
&opctx,
instance.id(),
authz_instance.id(),
instance.project_id,
i64::from(instance.ncpus.0 .0),
instance.memory,
Expand All @@ -176,8 +190,13 @@ async fn siud_release_virtual_provisioning(
async fn siud_unassign_oximeter_producer(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let Some((_, vmm)) = get_destroyed_vmm(&sagactx)? else {
// if the update we are handling is not an active VMM destroyed update,
// bail --- there's nothing to do here.
return Ok(());
};
let osagactx = sagactx.user_data();
let Params { ref instance, ref serialized_authn, .. } =
let Params { ref serialized_authn, ref authz_instance, .. } =
sagactx.saga_params::<Params>()?;

let opctx =
Expand All @@ -187,7 +206,7 @@ async fn siud_unassign_oximeter_producer(
osagactx.datastore(),
osagactx.log(),
&opctx,
&instance.id(),
&authz_instance.id(),
)
.await
.map_err(ActionError::action_failed)
Expand All @@ -196,10 +215,12 @@ async fn siud_unassign_oximeter_producer(
async fn siud_delete_v2p_mappings(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let Some((instance, vmm)) = get_destroyed_vmm(&sagactx)? else {
// if the update we are handling is not an active VMM destroyed update,
// bail --- there's nothing to do here.
return Ok(());
};
let osagactx = sagactx.user_data();
let Params { ref instance, ref vmm, .. } =
sagactx.saga_params::<Params>()?;

info!(
osagactx.log(),
"instance update (VMM destroyed): deleting V2P mappings";
Expand All @@ -216,8 +237,13 @@ async fn siud_delete_v2p_mappings(
async fn siud_delete_nat_entries(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let Some((_, vmm)) = get_destroyed_vmm(&sagactx)? else {
// if the update we are handling is not an active VMM destroyed update,
// bail --- there's nothing to do here.
return Ok(());
};
let osagactx = sagactx.user_data();
let Params { ref serialized_authn, ref vmm, ref instance, .. } =
let Params { ref serialized_authn, ref authz_instance, .. } =
sagactx.saga_params::<Params>()?;

let opctx =
Expand All @@ -226,16 +252,11 @@ async fn siud_delete_nat_entries(
info!(
osagactx.log(),
"instance update (VMM destroyed): deleting NAT entries";
"instance_id" => %instance.id(),
"instance_id" => %authz_instance.id(),
"propolis_id" => %vmm.id,
"instance_update" => %"VMM destroyed",
);

let (.., authz_instance) = LookupPath::new(&opctx, &osagactx.datastore())
.instance_id(instance.id())
.lookup_for(authz::Action::Modify)
.await
.map_err(ActionError::action_failed)?;
osagactx
.nexus()
.instance_delete_dpd_config(&opctx, &authz_instance)
Expand All @@ -247,8 +268,12 @@ async fn siud_delete_nat_entries(
async fn siud_update_instance(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let Some((instance, vmm)) = get_destroyed_vmm(&sagactx)? else {
// if the update we are handling is not an active VMM destroyed update,
// bail --- there's nothing to do here.
return Ok(());
};
let osagactx = sagactx.user_data();
let Params { instance, vmm, .. } = sagactx.saga_params::<Params>()?;
let new_runtime = InstanceRuntimeState {
propolis_id: None,
nexus_state: external::InstanceState::Stopped.into(),
Expand Down Expand Up @@ -276,8 +301,13 @@ async fn siud_update_instance(
async fn siud_mark_vmm_deleted(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let Some((instance, vmm)) = get_destroyed_vmm(&sagactx)? else {
// if the update we are handling is not an active VMM destroyed update,
// bail --- there's nothing to do here.
return Ok(());
};
let osagactx = sagactx.user_data();
let Params { ref serialized_authn, ref vmm, ref instance, .. } =
let Params { ref serialized_authn, .. } =
sagactx.saga_params::<Params>()?;

let opctx =
Expand Down
Loading

0 comments on commit 22f6fb1

Please sign in to comment.