Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

better abstract Nexus saga execution subsystem #5968

Merged
merged 5 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions nexus/db-queries/src/db/sec_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ impl steno::SecStore for CockroachDbSecStore {
// This is an internal service query to CockroachDB.
backoff::retry_policy_internal_service(),
|| {
// An interesting question is how to handle errors.
//
// In general, there are some kinds of database errors that are
// temporary/server errors (e.g. network failures), and some
// that are permanent/client errors (e.g. conflict during
Expand All @@ -85,10 +83,9 @@ impl steno::SecStore for CockroachDbSecStore {
// errors that likely require operator intervention.)
//
// At a higher level, callers should plan for the fact that
// record_event could potentially loop forever. See
// https://github.com/oxidecomputer/omicron/issues/5406 and the
// note in `nexus/src/app/saga.rs`'s `execute_saga` for more
// details.
// record_event (and, so, saga execution) could potentially loop
// indefinitely while the datastore (or other dependent
// services) are down.
self.datastore
.saga_create_event(&our_event)
.map_err(backoff::BackoffError::transient)
Expand Down
13 changes: 7 additions & 6 deletions nexus/src/app/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ impl super::Nexus {
create_params: params.clone(),
};
let saga_outputs = self
.execute_saga::<sagas::disk_create::SagaDiskCreate>(saga_params)
.sagas
.saga_execute::<sagas::disk_create::SagaDiskCreate>(saga_params)
.await?;
let disk_created = saga_outputs
.lookup_node_output::<db::model::Disk>("created_disk")
Expand Down Expand Up @@ -342,7 +343,8 @@ impl super::Nexus {
disk_id: authz_disk.id(),
volume_id: db_disk.volume_id,
};
self.execute_saga::<sagas::disk_delete::SagaDiskDelete>(saga_params)
self.sagas
.saga_execute::<sagas::disk_delete::SagaDiskDelete>(saga_params)
.await?;
Ok(())
}
Expand Down Expand Up @@ -585,10 +587,9 @@ impl super::Nexus {
snapshot_name: finalize_params.snapshot_name.clone(),
};

self.execute_saga::<sagas::finalize_disk::SagaFinalizeDisk>(
saga_params,
)
.await?;
self.sagas
.saga_execute::<sagas::finalize_disk::SagaFinalizeDisk>(saga_params)
.await?;

Ok(())
}
Expand Down
3 changes: 2 additions & 1 deletion nexus/src/app/image.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ impl super::Nexus {
image_param,
};

self.execute_saga::<sagas::image_delete::SagaImageDelete>(saga_params)
self.sagas
.saga_execute::<sagas::image_delete::SagaImageDelete>(saga_params)
.await?;

Ok(())
Expand Down
36 changes: 21 additions & 15 deletions nexus/src/app/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,8 @@ impl super::Nexus {
};

let saga_outputs = self
.execute_saga::<sagas::instance_create::SagaInstanceCreate>(
.sagas
.saga_execute::<sagas::instance_create::SagaInstanceCreate>(
saga_params,
)
.await?;
Expand Down Expand Up @@ -462,10 +463,11 @@ impl super::Nexus {
instance,
boundary_switches,
};
self.execute_saga::<sagas::instance_delete::SagaInstanceDelete>(
saga_params,
)
.await?;
self.sagas
.saga_execute::<sagas::instance_delete::SagaInstanceDelete>(
saga_params,
)
.await?;
Ok(())
}

Expand Down Expand Up @@ -510,10 +512,11 @@ impl super::Nexus {
src_vmm: vmm.clone(),
migrate_params: params,
};
self.execute_saga::<sagas::instance_migrate::SagaInstanceMigrate>(
saga_params,
)
.await?;
self.sagas
.saga_execute::<sagas::instance_migrate::SagaInstanceMigrate>(
saga_params,
)
.await?;

// TODO correctness TODO robustness TODO design
// Should we lookup the instance again here?
Expand Down Expand Up @@ -757,10 +760,11 @@ impl super::Nexus {
db_instance: instance.clone(),
};

self.execute_saga::<sagas::instance_start::SagaInstanceStart>(
saga_params,
)
.await?;
self.sagas
.saga_execute::<sagas::instance_start::SagaInstanceStart>(
saga_params,
)
.await?;

self.db_datastore.instance_fetch_with_vmm(opctx, &authz_instance).await
}
Expand Down Expand Up @@ -1938,7 +1942,8 @@ impl super::Nexus {
};

let saga_outputs = self
.execute_saga::<sagas::instance_ip_attach::SagaInstanceIpAttach>(
.sagas
.saga_execute::<sagas::instance_ip_attach::SagaInstanceIpAttach>(
saga_params,
)
.await?;
Expand Down Expand Up @@ -1967,7 +1972,8 @@ impl super::Nexus {
};

let saga_outputs = self
.execute_saga::<sagas::instance_ip_detach::SagaInstanceIpDetach>(
.sagas
.saga_execute::<sagas::instance_ip_detach::SagaInstanceIpDetach>(
saga_params,
)
.await?;
Expand Down
26 changes: 20 additions & 6 deletions nexus/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! Nexus, the service that operates much of the control plane in an Oxide fleet

use self::external_endpoints::NexusCertResolver;
use self::saga::SagaExecutor;
use crate::app::oximeter::LazyTimeseriesClient;
use crate::app::sagas::SagaRequest;
use crate::populate::populate_start;
Expand Down Expand Up @@ -132,7 +133,7 @@ pub struct Nexus {
authz: Arc<authz::Authz>,

/// saga execution coordinator
sec_client: Arc<steno::SecClient>,
sagas: SagaExecutor,

/// Task representing completion of recovered Sagas
recovery_task: std::sync::Mutex<Option<db::RecoveryTask>>,
Expand Down Expand Up @@ -238,6 +239,7 @@ impl Nexus {
Arc::clone(&db_datastore),
log.new(o!("component" => "SecStore")),
)) as Arc<dyn steno::SecStore>;

let sec_client = Arc::new(steno::sec(
log.new(o!(
"component" => "SEC",
Expand All @@ -246,6 +248,11 @@ impl Nexus {
sec_store,
));

let sagas = SagaExecutor::new(
Arc::clone(&sec_client),
log.new(o!("component" => "SagaExecutor")),
);

let client_state = dpd_client::ClientState {
tag: String::from("nexus"),
log: log.new(o!(
Expand Down Expand Up @@ -425,7 +432,7 @@ impl Nexus {
log: log.new(o!()),
db_datastore: Arc::clone(&db_datastore),
authz: Arc::clone(&authz),
sec_client: Arc::clone(&sec_client),
sagas,
recovery_task: std::sync::Mutex::new(None),
external_server: std::sync::Mutex::new(None),
techport_external_server: std::sync::Mutex::new(None),
Expand Down Expand Up @@ -467,6 +474,7 @@ impl Nexus {

// TODO-cleanup all the extra Arcs here seems wrong
let nexus = Arc::new(nexus);
nexus.sagas.set_nexus(nexus.clone());
let opctx = OpContext::for_background(
log.new(o!("component" => "SagaRecoverer")),
Arc::clone(&authz),
Expand All @@ -480,7 +488,6 @@ impl Nexus {
Arc::new(Arc::new(SagaContext::new(
Arc::clone(&nexus),
saga_logger,
Arc::clone(&authz),
))),
db_datastore,
Arc::clone(&sec_client),
Expand Down Expand Up @@ -554,6 +561,10 @@ impl Nexus {
&self.tunables
}

pub fn authz(&self) -> &Arc<authz::Authz> {
&self.authz
}

pub(crate) async fn wait_for_populate(&self) -> Result<(), anyhow::Error> {
let mut my_rx = self.populate_status.clone();
loop {
Expand Down Expand Up @@ -934,7 +945,8 @@ impl Nexus {
let nexus = self.clone();
tokio::spawn(async move {
let saga_result = nexus
.execute_saga::<sagas::region_replacement_start::SagaRegionReplacementStart>(
.sagas
.saga_execute::<sagas::region_replacement_start::SagaRegionReplacementStart>(
params,
)
.await;
Expand All @@ -958,7 +970,8 @@ impl Nexus {
let nexus = self.clone();
tokio::spawn(async move {
let saga_result = nexus
.execute_saga::<sagas::region_replacement_drive::SagaRegionReplacementDrive>(
.sagas
.saga_execute::<sagas::region_replacement_drive::SagaRegionReplacementDrive>(
params,
)
.await;
Expand All @@ -982,7 +995,8 @@ impl Nexus {
let nexus = self.clone();
tokio::spawn(async move {
let saga_result = nexus
.execute_saga::<sagas::region_replacement_finish::SagaRegionReplacementFinish>(
.sagas
.saga_execute::<sagas::region_replacement_finish::SagaRegionReplacementFinish>(
params,
)
.await;
Expand Down
3 changes: 2 additions & 1 deletion nexus/src/app/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ impl super::Nexus {
authz_silo,
};
let saga_outputs = self
.execute_saga::<sagas::project_create::SagaProjectCreate>(
.sagas
.saga_execute::<sagas::project_create::SagaProjectCreate>(
saga_params,
)
.await?;
Expand Down
6 changes: 3 additions & 3 deletions nexus/src/app/rack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,9 @@ impl super::Nexus {

// TODO
// configure rack networking / boundary services here
// Currently calling some of the apis directly, but should we be using sagas
// going forward via self.run_saga()? Note that self.create_runnable_saga and
// self.execute_saga are currently not available within this scope.
// Currently calling some of the apis directly, but should we be using
// sagas going forward via self.sagas.saga_execute()? Note that
// this may not be available within this scope.
Comment on lines +363 to +365
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked with @rcgoodfellow (who wrote this comment) and we weren't sure what this comment meant about these facilities not being available here. The higher level point is presumably still valid (that we should consider using a saga here) so I left the comment mostly alone.

info!(log, "Recording Rack Network Configuration");
let address_lot_name = Name::from_str(INFRA_LOT).map_err(|e| {
Error::internal_error(&format!(
Expand Down
Loading
Loading