Skip to content

Commit

Permalink
refactor notify_instance_updated a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw committed Jul 4, 2024
1 parent 1d64aa8 commit 0f986e1
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 70 deletions.
43 changes: 21 additions & 22 deletions nexus/src/app/background/tasks/instance_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,39 +152,38 @@ impl InstanceWatcher {
let new_runtime_state: SledInstanceState = state.into();
check.outcome =
CheckOutcome::Success(new_runtime_state.vmm_state.state.into());
slog::debug!(
debug!(
opctx.log,
"updating instance state";
"state" => ?new_runtime_state.vmm_state.state,
);
check.result = crate::app::instance::notify_instance_updated(
match crate::app::instance::notify_instance_updated(
&datastore,
sagas.as_ref(),
&opctx,
InstanceUuid::from_untyped_uuid(target.instance_id),
new_runtime_state,
&new_runtime_state,
)
.await
.map_err(|e| {
slog::warn!(
opctx.log,
"error updating instance";
"error" => ?e,
);
match e {
Error::ObjectNotFound { .. } => {
Incomplete::InstanceNotFound
{
Err(e) => {
warn!(opctx.log, "error updating instance"; "error" => %e);
check.result = match e {
Error::ObjectNotFound { .. } => {
Err(Incomplete::InstanceNotFound)
}
_ => Err(Incomplete::UpdateFailed),
};
}
Ok(Some(saga)) => {
check.update_saga_queued = true;
if let Err(e) = sagas.saga_start(saga).await {
warn!(opctx.log, "update saga failed"; "error" => ?e);
check.result = Err(Incomplete::UpdateFailed);
}
_ => Incomplete::UpdateFailed,
}
})
.map(|updated| {
slog::debug!(
opctx.log, "update successful";
"vmm_updated" => ?updated,
);
check.update_saga_queued = updated;
});
Ok(None) => {}
};

check
}
}
Expand Down
81 changes: 37 additions & 44 deletions nexus/src/app/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1404,53 +1404,44 @@ impl super::Nexus {
pub(crate) async fn notify_instance_updated(
self: &Arc<Self>,
opctx: &OpContext,
instance_id: &InstanceUuid,
instance_id: InstanceUuid,
new_runtime_state: &nexus::SledInstanceState,
) -> Result<(), Error> {
let migrations = new_runtime_state.migrations();
let propolis_id = new_runtime_state.propolis_id;
info!(opctx.log, "received new VMM runtime state from sled agent";
"instance_id" => %instance_id,
"propolis_id" => %propolis_id,
"vmm_state" => ?new_runtime_state.vmm_state,
"migration_state" => ?migrations,
);
let saga = notify_instance_updated(
&self.db_datastore,
opctx,
instance_id,
new_runtime_state,
)
.await?;

let (vmm_updated, migration_updated) = self
.db_datastore
.vmm_and_migration_update_runtime(
propolis_id,
// TODO(eliza): probably should take this by value...
&new_runtime_state.vmm_state.clone().into(),
migrations,
)
.await?;
let updated = vmm_updated || migration_updated.unwrap_or(false);
if updated {
// We don't need to wait for the instance update saga to run to
// completion to return OK to the sled-agent --- all it needs to care
// about is that the VMM/migration state in the database was updated.
// Even if we fail to successfully start an update saga, the
// instance-updater background task will eventually see that the
// instance is in a state which requires an update saga, and ensure that
// one is eventually executed.
//
// Therefore, just spawn the update saga in a new task, and return.
if let Some(saga) = saga {
info!(opctx.log, "starting update saga for {instance_id}";
"instance_id" => %instance_id,
"propolis_id" => %propolis_id,
"vmm_state" => ?new_runtime_state.vmm_state,
"migration_state" => ?migrations,
"migration_state" => ?new_runtime_state.migrations(),
);
let (.., authz_instance) =
LookupPath::new(&opctx, &self.db_datastore)
.instance_id(instance_id.into_untyped_uuid())
.lookup_for(authz::Action::Modify)
.await?;
let saga_params = sagas::instance_update::Params {
serialized_authn: authn::saga::Serialized::for_opctx(opctx),
authz_instance,
};
let sagas = self.sagas.clone();
let log = opctx.log.clone();
tokio::spawn(async move {
sagas
.saga_execute::<sagas::instance_update::SagaInstanceUpdate>(
saga_params,
)
.await
if let Err(error) = sagas.saga_start(saga).await {
warn!(&log, "update saga for {instance_id} failed!";
"instance_id" => %instance_id,
"error" => %error,
);
}
});
}

Ok(())
}

Expand Down Expand Up @@ -1890,14 +1881,13 @@ impl super::Nexus {
}

/// Invoked by a sled agent to publish an updated runtime state for an
/// Instance.
/// Instance, returning an update saga for that instance.
pub(crate) async fn notify_instance_updated(
datastore: &DataStore,
sagas: &dyn StartSaga,
opctx: &OpContext,
instance_id: InstanceUuid,
new_runtime_state: nexus::SledInstanceState,
) -> Result<bool, Error> {
new_runtime_state: &nexus::SledInstanceState,
) -> Result<Option<steno::SagaDag>, Error> {
use sagas::instance_update;

let migrations = new_runtime_state.migrations();
Expand All @@ -1917,8 +1907,11 @@ pub(crate) async fn notify_instance_updated(
migrations,
)
.await?;
let updated = vmm_updated || migration_updated.unwrap_or(false);

// If the instance or VMM records in the database have changed as a result
// of this update, prepare an `instance-update` saga to ensure that the
// changes are reflected by the instance record.
let updated = vmm_updated || migration_updated.unwrap_or(false);
if updated {
let (.., authz_instance) = LookupPath::new(&opctx, datastore)
.instance_id(instance_id.into_untyped_uuid())
Expand All @@ -1930,10 +1923,10 @@ pub(crate) async fn notify_instance_updated(
authz_instance,
},
)?;
sagas.saga_start(saga).await?;
Ok(Some(saga))
} else {
Ok(None)
}

Ok(updated)
}

#[cfg(test)]
Expand Down
9 changes: 6 additions & 3 deletions nexus/src/app/sagas/instance_migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,7 @@ async fn sim_instance_migrate(

#[cfg(test)]
mod tests {
use crate::app::db::datastore::InstanceAndActiveVmm;
use crate::app::sagas::test_helpers;
use camino::Utf8Path;
use dropshot::test_util::ClientTestContext;
Expand All @@ -622,6 +623,8 @@ mod tests {
ByteCount, IdentityMetadataCreateParams, InstanceCpuCount,
};
use omicron_sled_agent::sim::Server;
use omicron_test_utils::dev::poll;
use std::time::Duration;

use super::*;

Expand Down Expand Up @@ -859,9 +862,9 @@ mod tests {
cptestctx,
instance_id,
)
.await.instance().clone();
if new_state.runtime().nexus_state == nexus_db_model::InstanceState::Vmm {
Err(poll::CondCheckError::<nexus_db_model::Instance>::NotYet)
.await;
if new_state.instance().runtime().nexus_state == nexus_db_model::InstanceState::Vmm {
Err(poll::CondCheckError::<InstanceAndActiveVmm>::NotYet)
} else {
Ok(new_state)
}
Expand Down
2 changes: 1 addition & 1 deletion nexus/src/internal_api/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ async fn cpapi_instances_put(
nexus
.notify_instance_updated(
&opctx,
&InstanceUuid::from_untyped_uuid(path.instance_id),
InstanceUuid::from_untyped_uuid(path.instance_id),
&new_state,
)
.await?;
Expand Down

0 comments on commit 0f986e1

Please sign in to comment.