clean up saga chaining code
hawkw committed Jul 10, 2024
1 parent f2c8b18 commit c824e49
Showing 1 changed file with 82 additions and 34 deletions.
116 changes: 82 additions & 34 deletions nexus/src/app/sagas/instance_update/mod.rs
@@ -15,6 +15,7 @@ use crate::app::db::model::InstanceState;
 use crate::app::db::model::MigrationState;
 use crate::app::db::model::VmmState;
 use crate::app::sagas::declare_saga_actions;
+use anyhow::Context;
 use chrono::Utc;
 use nexus_db_queries::{authn, authz};
 use nexus_types::identity::Resource;
@@ -253,9 +254,12 @@ declare_saga_actions! {
         + siu_unassign_oximeter_producer
     }
 
-    // Release the lock and write back the new instance record.
-    UPDATE_AND_UNLOCK_INSTANCE -> "unlocked" {
-        + siu_update_and_unlock_instance
+    // Write back the new instance record, releasing the instance updater lock,
+    // and re-fetch the VMM and migration states. If they have changed in a way
+    // that requires an additional update saga, attempt to execute an additional
+    // update saga immediately.
+    COMMIT_INSTANCE_UPDATES -> "commit_instance_updates" {
+        + siu_commit_instance_updates
     }
 
 }
@@ -308,8 +312,8 @@ impl NexusSaga for SagaDoActualInstanceUpdate {
         }
 
         // Once we've finished mutating everything owned by the instance, we can
-        // write ck the updated state and release the instance lock.
-        builder.append(update_and_unlock_instance_action());
+        // write back the updated state and release the instance lock.
+        builder.append(commit_instance_updates_action());
 
         // If either VMM linked to this instance has been destroyed, append
         // subsagas to clean up the VMMs resources and mark them as deleted.
@@ -570,7 +574,7 @@ pub(super) async fn siu_unassign_oximeter_producer(
     .map_err(ActionError::action_failed)
 }
 
-async fn siu_update_and_unlock_instance(
+async fn siu_commit_instance_updates(
     sagactx: NexusActionContext,
 ) -> Result<(), ActionError> {
     let RealParams { serialized_authn, authz_instance, ref update, .. } =
@@ -582,46 +586,90 @@ async fn siu_update_and_unlock_instance(
         Some(&update.new_runtime),
     )
     .await?;
+    let instance_id = authz_instance.id();
 
+    // Check if the VMM or migration state has changed while the update saga was
+    // running and whether an additional update saga is now required. If one is
+    // required, try to start it.
+    //
+    // TODO(eliza): it would be nice if we didn't release the lock, determine
+    // the needed updates, and then start a new start-instance-update saga that
+    // re-locks the instance --- instead, perhaps we could keep the lock, and
+    // try to start a new "actual" instance update saga that inherits our lock.
+    // This way, we could also avoid computing updates required twice.
+    // But, I'm a bit sketched out by the implications of not committing update
+    // and dropping the lock in the same operation. This deserves more thought...
+    if let Err(error) =
+        chain_update_saga(&sagactx, authz_instance, serialized_authn).await
+    {
+        let osagactx = sagactx.user_data();
+        // If starting the new update saga failed, DO NOT unwind this saga and
+        // undo all the work we've done successfully! Instead, just kick the
+        // instance-updater background task to try and start a new saga
+        // eventually, and log a warning.
+        warn!(
+            osagactx.log(),
+            "instance update: failed to start successor saga!";
+            "instance_id" => %instance_id,
+            "error" => %error,
+        );
+        osagactx.nexus().background_tasks.task_instance_updater.activate();
+    }
+
+    Ok(())
+}
+
+async fn chain_update_saga(
+    sagactx: &NexusActionContext,
+    authz_instance: authz::Instance,
+    serialized_authn: authn::saga::Serialized,
+) -> Result<(), anyhow::Error> {
     let opctx =
-        crate::context::op_context_for_saga_action(&sagactx, &serialized_authn);
+        crate::context::op_context_for_saga_action(sagactx, &serialized_authn);
     let osagactx = sagactx.user_data();
+    let instance_id = authz_instance.id();
 
-    // fetch the state from the database again to see if we should immediately
+    // Fetch the state from the database again to see if we should immediately
     // run a new saga.
-    // TODO(eliza): go back and make the unlock-instance query return the
-    // current state, instead...
-    let new_state = match osagactx
+    let new_state = osagactx
         .datastore()
         .instance_fetch_all(&opctx, &authz_instance)
         .await
-    {
-        Ok(s) => s,
-        Err(e) => {
-            warn!(osagactx.log(), "instance update: failed to fetch state on saga completion";
-                "instance_id" => %authz_instance.id(),
-                "error" => %e);
-            // if we can't refetch here, don't unwind all the work we did do.
-            // the instance-updater background task will take care of it.
-            return Ok(());
-        }
-    };
+        .context("failed to fetch latest snapshot for instance")?;
 
-    if UpdatesRequired::for_snapshot(osagactx.log(), &new_state).is_some() {
-        if let Err(e) = osagactx
+    if let Some(update) =
+        UpdatesRequired::for_snapshot(osagactx.log(), &new_state)
+    {
+        debug!(
+            osagactx.log(),
+            "instance update: additional updates required, preparing a \
+             successor update saga...";
+            "instance_id" => %instance_id,
+            "update.new_runtime_state" => ?update.new_runtime,
+            "update.network_config_update" => ?update.network_config,
+            "update.destroy_active_vmm" => ?update.destroy_active_vmm,
+            "update.destroy_target_vmm" => ?update.destroy_target_vmm,
+            "update.deprovision" => update.deprovision,
+        );
+        let saga_dag = SagaInstanceUpdate::prepare(&Params {
+            serialized_authn,
+            authz_instance,
+        })
+        .context("failed to build new update saga DAG")?;
+        let saga = osagactx
             .nexus()
             .sagas
-            .saga_execute::<SagaInstanceUpdate>(Params {
-                // everyone in the friend group just venmo-ing the same
-                // serialized_authn back and forth forever.
-                serialized_authn,
-                authz_instance,
-            })
+            .saga_prepare(saga_dag)
             .await
-        {
-            // again, if this fails, don't unwind all the good work we already did.
-            warn!(osagactx.log(), "instant update: subsequent saga execution failed"; "error" => %e);
-        }
+            .context("failed to prepare new update saga")?;
+        saga.start().await.context("failed to start successor update saga")?;
+        // N.B. that we don't wait for the successor update saga to *complete*
+        // here. We just want to make sure it starts.
+        info!(
+            osagactx.log(),
+            "instance update: successor update saga started!";
+            "instance_id" => %instance_id,
+        );
     }
 
     Ok(())
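For readers skimming the diff, here is a minimal, self-contained sketch of the control flow this commit settles on: commit the updates and release the lock first, then try to chain a successor saga, and treat a chaining failure as non-fatal. Every name below (InstanceSnapshot, fetch_latest_snapshot, and so on) is a hypothetical stand-in, not the real omicron or steno API:

#[derive(Debug)]
struct InstanceSnapshot {
    active_vmm_destroyed: bool,
}

#[derive(Debug)]
struct UpdatesRequired {
    destroy_active_vmm: bool,
}

impl UpdatesRequired {
    // Decide whether a freshly fetched snapshot needs another update saga.
    fn for_snapshot(snapshot: &InstanceSnapshot) -> Option<Self> {
        snapshot
            .active_vmm_destroyed
            .then(|| UpdatesRequired { destroy_active_vmm: true })
    }
}

// Stand-in for `datastore().instance_fetch_all(...)`.
fn fetch_latest_snapshot() -> Result<InstanceSnapshot, String> {
    Ok(InstanceSnapshot { active_vmm_destroyed: true })
}

// Stand-in for `SagaInstanceUpdate::prepare`, `saga_prepare`, and
// `saga.start()`: build and start the successor, but do not wait for it
// to complete.
fn start_successor_saga(update: &UpdatesRequired) -> Result<(), String> {
    println!("successor saga started: {update:?}");
    Ok(())
}

// Stand-in for `background_tasks.task_instance_updater.activate()`.
fn activate_instance_updater_task() {
    println!("kicked the instance-updater background task");
}

// Mirrors `chain_update_saga`: refetch state, and if more work appeared
// while this saga was running, start a successor saga immediately.
fn chain_update_saga() -> Result<(), String> {
    let snapshot = fetch_latest_snapshot()?;
    if let Some(update) = UpdatesRequired::for_snapshot(&snapshot) {
        start_successor_saga(&update)?;
    }
    Ok(())
}

// Mirrors `siu_commit_instance_updates`: commit and unlock first, then try
// to chain. A chaining failure must NOT unwind the saga and undo work that
// already succeeded; fall back to the background task instead.
fn commit_instance_updates() {
    // ... write back the new runtime state and release the updater lock ...
    if let Err(error) = chain_update_saga() {
        eprintln!("failed to start successor saga: {error}");
        activate_instance_updater_task();
    }
}

fn main() {
    commit_instance_updates();
}

The key design point, called out in the "N.B." comment in the diff, is that the successor saga is started but never awaited, so the current saga completes regardless of what the successor goes on to do.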

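The refactor in `chain_update_saga` also replaces a `match` over the fetch result (which logged the error and swallowed it) with `anyhow::Context`, so the error is annotated and propagated with `?`, and the single caller decides the failure policy. A rough standalone illustration of that pattern, where `fetch_state` is a made-up stand-in for the datastore query:

use anyhow::Context;

// Made-up stand-in for a fallible datastore query.
fn fetch_state(id: u32) -> Result<String, std::io::Error> {
    if id == 0 {
        Err(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "no such instance",
        ))
    } else {
        Ok(format!("state-of-{id}"))
    }
}

// Before: `match` the result, log, and `return Ok(())` on error.
// After: attach a message with `.context(...)` and propagate with `?`;
// the caller chooses how to react.
fn refetch(id: u32) -> anyhow::Result<String> {
    let state = fetch_state(id)
        .context("failed to fetch latest snapshot for instance")?;
    Ok(state)
}

fn main() {
    match refetch(0) {
        Ok(state) => println!("fetched: {state}"),
        // `{:#}` prints the context message plus the underlying cause.
        Err(error) => eprintln!("error: {error:#}"),
    }
}

This is what the new `use anyhow::Context;` import at the top of the diff is for.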