Skip to content

Commit

Permalink
clean things up a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw committed May 29, 2024
1 parent 0ac6532 commit ba55078
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 117 deletions.
134 changes: 17 additions & 117 deletions nexus/src/app/sagas/instance_update/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@ use uuid::Uuid;

mod destroyed;

/// Parameters to the start instance update saga.
#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct Params {
/// Authentication context to use to fetch the instance's current state from
/// the database.
pub serialized_authn: authn::saga::Serialized,

pub authz_instance: authz::Instance,
}
// The public interface to this saga is actually a smaller saga that starts the
// "real" update saga, which inherits the lock from the start saga. This is
// because the decision of which subsaga(s) to run depends on the state of the
// instance record read from the database *once the lock has been acquired*,
// and the saga DAG for the "real" instance update saga may be constructed only
// after the instance state has been fetched. However, since the the instance
// state must be read inside the lock, that *also* needs to happen in a saga,
// so that the lock is always dropped when unwinding. Thus, we have a second,
// smaller saga which starts our real saga, and then the real saga, which
// decides what DAG to build based on the instance fetched by the start saga.
//
// Don't worry, this won't be on the test.
mod start;
pub(crate) use self::start::{Params, SagaInstanceUpdate};

/// Parameters to the "real" instance update saga.
#[derive(Debug, Deserialize, Serialize)]
Expand All @@ -43,21 +48,6 @@ const INSTANCE_LOCK_ID: &str = "saga_instance_lock_id";
declare_saga_actions! {
instance_update;

// Acquire the instance updater" lock with this saga's ID if no other saga
// is currently updating the instance.
LOCK_INSTANCE -> "saga_instance_lock_gen" {
+ siu_lock_instance
- siu_lock_instance_undo
}

// Fetch the instance and VMM's state, and start the "real" instance update saga.
// N.B. that this must be performed as a separate action from
// `LOCK_INSTANCE`, so that if the lookup fails, we will still unwind the
// `LOCK_INSTANCE` action and release the lock.
FETCH_STATE_AND_START_REAL_SAGA -> "state" {
+ siu_fetch_state_and_start_real_saga
}

// Become the instance updater
BECOME_UPDATER -> "generation" {
+ siu_become_updater
Expand All @@ -70,36 +60,9 @@ declare_saga_actions! {
}

// instance update saga: definition
struct SagaDoActualInstanceUpdate;

#[derive(Debug)]
pub(crate) struct SagaInstanceUpdate;
impl NexusSaga for SagaInstanceUpdate {
const NAME: &'static str = "start-instance-update";
type Params = Params;

fn register_actions(registry: &mut ActionRegistry) {
instance_update_register_actions(registry);
}

fn make_saga_dag(
_params: &Self::Params,
mut builder: DagBuilder,
) -> Result<steno::Dag, super::SagaInitError> {
builder.append(Node::action(
INSTANCE_LOCK_ID,
"GenerateInstanceLockId",
ACTION_GENERATE_ID.as_ref(),
));
builder.append(lock_instance_action());
builder.append(fetch_state_and_start_real_saga_action());

Ok(builder.build()?)
}
}

struct SagaRealInstanceUpdate;

impl NexusSaga for SagaRealInstanceUpdate {
impl NexusSaga for SagaDoActualInstanceUpdate {
const NAME: &'static str = "instance-update";
type Params = RealParams;

Expand Down Expand Up @@ -165,58 +128,6 @@ impl NexusSaga for SagaRealInstanceUpdate {
}
}

// instance update saga: action implementations

async fn siu_lock_instance(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let osagactx = sagactx.user_data();
let Params { ref serialized_authn, ref authz_instance, .. } =
sagactx.saga_params::<Params>()?;
let lock_id = sagactx.lookup::<Uuid>(INSTANCE_LOCK_ID)?;
let opctx =
crate::context::op_context_for_saga_action(&sagactx, serialized_authn);
slog::info!(
osagactx.log(),
"instance update: attempting to lock instance";
"instance_id" => %authz_instance.id(),
"saga_id" => %lock_id,
);
osagactx
.datastore()
.instance_updater_lock(&opctx, authz_instance, &lock_id)
.await
.map_err(ActionError::action_failed)
.map(|_| ())
}

async fn siu_fetch_state_and_start_real_saga(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let osagactx = sagactx.user_data();
let Params { serialized_authn, authz_instance, .. } =
sagactx.saga_params::<Params>()?;
let opctx =
crate::context::op_context_for_saga_action(&sagactx, &serialized_authn);

let state = osagactx
.datastore()
.instance_fetch_with_vmms(&opctx, &authz_instance)
.await
.map_err(ActionError::action_failed)?;
osagactx
.nexus()
.execute_saga::<SagaRealInstanceUpdate>(RealParams {
serialized_authn,
authz_instance,
state,
})
.await
.map_err(ActionError::action_failed)?;

Ok(())
}

async fn siu_become_updater(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
Expand Down Expand Up @@ -244,7 +155,7 @@ async fn siu_become_updater(

slog::info!(
osagactx.log(),
"instance update: became instance updater";
"Now, I am become Updater, the destroyer of VMMs.";
"instance_id" => %authz_instance.id(),
"saga_id" => %lock_id,
"parent_id" => ?state.instance.runtime_state.updater_id,
Expand All @@ -271,17 +182,6 @@ async fn siu_unlock_instance(
unlock_instance_inner(serialized_authn, authz_instance, &sagactx).await
}

// N.B. that this has to be a separate function just because the undo action
// must return `anyhow::Error` rather than `ActionError`.
async fn siu_lock_instance_undo(
sagactx: NexusActionContext,
) -> Result<(), anyhow::Error> {
let Params { ref serialized_authn, ref authz_instance, .. } =
sagactx.saga_params::<Params>()?;
unlock_instance_inner(serialized_authn, authz_instance, &sagactx).await?;
Ok(())
}

async fn unlock_instance_inner(
serialized_authn: &authn::saga::Serialized,
authz_instance: &authz::Instance,
Expand Down
140 changes: 140 additions & 0 deletions nexus/src/app/sagas/instance_update/start.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

// instance update start saga

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use super::{
ActionRegistry, NexusActionContext, NexusSaga, SagaInitError,
ACTION_GENERATE_ID, INSTANCE_LOCK_ID,
};
use crate::app::sagas::declare_saga_actions;
use nexus_db_queries::{authn, authz};
use serde::{Deserialize, Serialize};
use steno::{ActionError, DagBuilder, Node};
use uuid::Uuid;

/// Parameters to the start instance update saga.
#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct Params {
/// Authentication context to use to fetch the instance's current state from
/// the database.
pub serialized_authn: authn::saga::Serialized,

pub authz_instance: authz::Instance,
}

// instance update saga: actions

declare_saga_actions! {
instance_update;

// Acquire the instance updater" lock with this saga's ID if no other saga
// is currently updating the instance.
LOCK_INSTANCE -> "saga_instance_lock_gen" {
+ siu_lock_instance
- siu_lock_instance_undo
}

// Fetch the instance and VMM's state, and start the "real" instance update saga.
// N.B. that this must be performed as a separate action from
// `LOCK_INSTANCE`, so that if the lookup fails, we will still unwind the
// `LOCK_INSTANCE` action and release the lock.
FETCH_STATE_AND_START_REAL_SAGA -> "state" {
+ siu_fetch_state_and_start_real_saga
}
}

// instance update saga: definition

#[derive(Debug)]
pub(crate) struct SagaInstanceUpdate;
impl NexusSaga for SagaInstanceUpdate {
const NAME: &'static str = "start-instance-update";
type Params = Params;

fn register_actions(registry: &mut ActionRegistry) {
instance_update_register_actions(registry);
}

fn make_saga_dag(
_params: &Self::Params,
mut builder: DagBuilder,
) -> Result<steno::Dag, SagaInitError> {
builder.append(Node::action(
INSTANCE_LOCK_ID,
"GenerateInstanceLockId",
ACTION_GENERATE_ID.as_ref(),
));
builder.append(lock_instance_action());
builder.append(fetch_state_and_start_real_saga_action());

Ok(builder.build()?)
}
}

// start instance update saga: action implementations

async fn siu_lock_instance(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let osagactx = sagactx.user_data();
let Params { ref serialized_authn, ref authz_instance, .. } =
sagactx.saga_params::<Params>()?;
let lock_id = sagactx.lookup::<Uuid>(INSTANCE_LOCK_ID)?;
let opctx =
crate::context::op_context_for_saga_action(&sagactx, serialized_authn);
slog::info!(
osagactx.log(),
"instance update: attempting to lock instance";
"instance_id" => %authz_instance.id(),
"saga_id" => %lock_id,
);
osagactx
.datastore()
.instance_updater_lock(&opctx, authz_instance, &lock_id)
.await
.map_err(ActionError::action_failed)
.map(|_| ())
}

async fn siu_lock_instance_undo(
sagactx: NexusActionContext,
) -> Result<(), anyhow::Error> {
let Params { ref serialized_authn, ref authz_instance, .. } =
sagactx.saga_params::<Params>()?;
super::unlock_instance_inner(serialized_authn, authz_instance, &sagactx)
.await?;
Ok(())
}

async fn siu_fetch_state_and_start_real_saga(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let osagactx = sagactx.user_data();
let Params { serialized_authn, authz_instance, .. } =
sagactx.saga_params::<Params>()?;
let opctx =
crate::context::op_context_for_saga_action(&sagactx, &serialized_authn);

let state = osagactx
.datastore()
.instance_fetch_with_vmms(&opctx, &authz_instance)
.await
.map_err(ActionError::action_failed)?;
osagactx
.nexus()
.execute_saga::<super::SagaDoActualInstanceUpdate>(super::RealParams {
serialized_authn,
authz_instance,
state,
})
.await
.map_err(ActionError::action_failed)?;

Ok(())
}

0 comments on commit ba55078

Please sign in to comment.