From b30980907c8e3e4b275e28f2a5bddde3510f7f0e Mon Sep 17 00:00:00 2001 From: David Pacheco Date: Fri, 2 Aug 2024 11:58:52 -0700 Subject: [PATCH 1/9] abandoned attempt to do this with limits --- nexus/db-queries/src/db/datastore/saga.rs | 117 ++++++++++++++++++ nexus/reconfigurator/execution/src/lib.rs | 38 +++++- nexus/reconfigurator/execution/src/sagas.rs | 59 +++++++++ .../background/tasks/blueprint_execution.rs | 2 + 4 files changed, 212 insertions(+), 4 deletions(-) create mode 100644 nexus/reconfigurator/execution/src/sagas.rs diff --git a/nexus/db-queries/src/db/datastore/saga.rs b/nexus/db-queries/src/db/datastore/saga.rs index 939929e665..bf274f2456 100644 --- a/nexus/db-queries/src/db/datastore/saga.rs +++ b/nexus/db-queries/src/db/datastore/saga.rs @@ -13,14 +13,34 @@ use crate::db::model::Generation; use crate::db::pagination::paginated; use crate::db::pagination::paginated_multicolumn; use crate::db::pagination::Paginator; +use crate::db::pool::DbConnection; use crate::db::update_and_check::UpdateAndCheck; use crate::db::update_and_check::UpdateStatus; use async_bb8_diesel::AsyncRunQueryDsl; use diesel::prelude::*; +use nexus_auth::authz; use nexus_auth::context::OpContext; use omicron_common::api::external::Error; use omicron_common::api::external::LookupType; use omicron_common::api::external::ResourceType; +use std::ops::Add; + +/// Reports the result of `sagas_reassign_sec_batched()`. +/// +/// Callers need to know two things: +/// +/// 1. Whether any sagas may have been re-assigned +/// (because they'll want to kick off saga recovery for them) +/// 2. Whether any errors were encountered +#[derive(Debug)] +pub(crate) enum Reassigned { + /// We successfully re-assigned all sagas that needed it + All { count: u32 }, + /// We encountered an error and cannot tell how many sagas were re-assigned. + /// It was at least this many. (The count can be zero, but it's still + /// possible that some were re-assigned.) 
+ AtLeast { count: u32, error: Error }, +} impl DataStore { pub async fn saga_create( @@ -207,6 +227,93 @@ impl DataStore { Ok(events) } + + pub async fn sagas_reassign_sec_all_batched( + &self, + opctx: &OpContext, + nexus_zone_ids: &[db::saga_types::SecId], + new_sec_id: db::saga_types::SecId, + ) -> Reassigned { + let now = chrono::Utc::now(); + let conn = match self.pool_connection_authorized(opctx).await { + Ok(c) => c, + Err(error) => return Reassigned::AtLeast { count: 0, error }, + }; + + let mut count = 0; + let limit: u32 = SQL_BATCH_SIZE.get(); + loop { + debug!(&opctx.log, "sagas_reassign_sec_batched"; + "count_so_far" => count); + match self + .sagas_reassign_sec( + &conn, + opctx, + nexus_zone_ids, + new_sec_id, + limit, + now, + ) + .await + { + Ok(c) => { + count += c; + if c < limit { + break; + } + } + Err(error) => { + return Reassigned::AtLeast { count, error }; + } + } + } + + Reassigned::All { count } + } + + // XXX-dap TODO-doc + async fn sagas_reassign_sec( + &self, + conn: &async_bb8_diesel::Connection, + opctx: &OpContext, + nexus_zone_ids: &[db::saga_types::SecId], + new_sec_id: db::saga_types::SecId, + limit: u32, + time: chrono::DateTime, + ) -> Result { + use db::schema::saga::dsl; + + opctx.authorize(authz::Action::Modify, &authz::FLEET).await?; + + diesel::update( + dsl::saga + .filter(dsl::current_sec.is_not_null()) + .filter(dsl::current_sec.eq_any( + nexus_zone_ids.into_iter().cloned().collect::>(), + )) + .filter(dsl::saga_state.ne(db::saga_types::SagaCachedState( + steno::SagaCachedState::Done, + ))) + .limit(i64::from(limit)), + ) + .set(( + dsl::current_sec.eq(Some(new_sec_id)), + dsl::adopt_generation.eq(dsl::adopt_generation.add(1)), + dsl::adopt_time.eq(time), + )) + .execute_async(conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + .and_then(|c_usize| { + u32::try_from(c_usize).map_err(|_| { + Error::internal_error(&format!( + "database reported unexpected count of \ + records updated (did not fit into u32): {}", + c_usize, + )) + }) + }) + } } #[cfg(test)] @@ -510,3 +617,13 @@ mod test { } } } + +// XXX-dap TODO-coverage want test that inserts: +// - some sagas assigned to this SEC that are done +// - more than BATCH_SIZE sagas assigned to this SEC that are unwinding +// - more than BATCH_SIZE sagas assigned to this SEC that are running +// - some sagas assigned to another SEC that are running +// +// then two tests: +// 1. run the one-batch thing and make sure we only got at most the batch size +// 2. run the whole thing and make sure that we got exactly the right ones diff --git a/nexus/reconfigurator/execution/src/lib.rs b/nexus/reconfigurator/execution/src/lib.rs index e3d2019230..d41e35f0a4 100644 --- a/nexus/reconfigurator/execution/src/lib.rs +++ b/nexus/reconfigurator/execution/src/lib.rs @@ -24,6 +24,7 @@ use slog::info; use slog_error_chain::InlineErrorChain; use std::collections::BTreeMap; use std::net::SocketAddrV6; +use uuid::Uuid; mod cockroachdb; mod datasets; @@ -32,6 +33,7 @@ mod external_networking; mod omicron_physical_disks; mod omicron_zones; mod overridables; +mod sagas; mod sled_state; pub use dns::blueprint_external_dns_config; @@ -80,6 +82,7 @@ pub async fn realize_blueprint( resolver: &Resolver, blueprint: &Blueprint, nexus_label: S, + nexus_id: Uuid, // XXX-dap type, and replace nexus_label too? 
) -> Result<(), Vec> where String: From, @@ -90,6 +93,7 @@ where resolver, blueprint, nexus_label, + nexus_id, &Default::default(), ) .await @@ -101,6 +105,7 @@ pub async fn realize_blueprint_with_overrides( resolver: &Resolver, blueprint: &Blueprint, nexus_label: S, + nexus_id: Uuid, // XXX-dap type, and replace nexus_label too? overrides: &Overridables, ) -> Result<(), Vec> where @@ -233,14 +238,39 @@ where omicron_physical_disks::decommission_expunged_disks(&opctx, datastore) .await?; + // From this point on, we'll assume that any errors that we encounter do + // *not* require stopping execution. We'll just accumulate them and return + // them all at the end. + // + // TODO We should probably do this with more of the errors above, too. + let mut errors = Vec::new(); + + // For any expunged Nexus zones, re-assign in-progress sagas to some other + // Nexus. If this fails for some reason, it doesn't affect anything else. + let sec_id = nexus_db_model::SecId(nexus_id); + if let Err(error) = sagas::reassign_sagas_from_expunged( + &opctx, datastore, blueprint, sec_id, + ) + .await + .context("failed to re-assign sagas") + { + errors.push(error); + } + // This is likely to error if any cluster upgrades are in progress (which // can take some time), so it should remain at the end so that other parts // of the blueprint can progress normally. - cockroachdb::ensure_settings(&opctx, datastore, blueprint) - .await - .map_err(|err| vec![err])?; + if let Err(error) = + cockroachdb::ensure_settings(&opctx, datastore, blueprint).await + { + errors.push(error); + } - Ok(()) + if errors.is_empty() { + Ok(()) + } else { + Err(errors) + } } #[cfg(test)] diff --git a/nexus/reconfigurator/execution/src/sagas.rs b/nexus/reconfigurator/execution/src/sagas.rs new file mode 100644 index 0000000000..1c15413d40 --- /dev/null +++ b/nexus/reconfigurator/execution/src/sagas.rs @@ -0,0 +1,59 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Re-assign sagas from expunged Nexus zones + +use nexus_db_model::SecId; +use nexus_db_queries::context::OpContext; +use nexus_db_queries::db::DataStore; +use nexus_types::deployment::Blueprint; +use nexus_types::deployment::BlueprintZoneFilter; +use omicron_common::api::external::Error; +use slog::info; +use std::num::NonZeroU32; + +/// Reports what happened when we tried to re-assign sagas +/// +/// Callers need to know two things: +/// +/// 1. Whether any sagas may have been re-assigned +/// (because they'll want to kick off saga recovery for them) +/// 2. Whether any errors were encountered +#[derive(Debug)] +pub(crate) enum Reassigned { + /// We successfully re-assigned all sagas that needed it + All { count: u32 }, + /// We encountered an error and cannot tell how many sagas were re-assigned. + /// It was at least this many. (The count can be zero, but it's still + /// possible that some were re-assigned.) + AtLeast { count: u32, error: Error }, +} + +/// For each expunged Nexus zone, re-assign sagas owned by that Nexus to the +/// specified nexus (`nexus_id`). +// TODO-dap activate recovery background task +pub(crate) async fn reassign_sagas_from_expunged( + opctx: &OpContext, + datastore: &DataStore, + blueprint: &Blueprint, + nexus_id: SecId, +) -> Reassigned { + let log = &opctx.log; + + // Identify any Nexus zones that have been expunged and need to have sagas + // re-assigned. 
+ let nexus_zones_ids: Vec<_> = blueprint + .all_omicron_zones(BlueprintZoneFilter::Expunged) + .filter_map(|(_, z)| z.zone_type.is_nexus().then(|| z.id)) + .collect(); + + debug!(log, "re-assign sagas: found Nexus instances"; + "nexus_zones_ids" => ?nexus_zones_ids); + + + let result = datastore.sagas_reassign_sec_batched(opctx, &nexus_zone_ids, nexus_id); + info!(log, "re-assign sagas"; + "nexus_zones_ids" => ?nexus_zones_ids); + +} diff --git a/nexus/src/app/background/tasks/blueprint_execution.rs b/nexus/src/app/background/tasks/blueprint_execution.rs index 460d74360d..fc8673839f 100644 --- a/nexus/src/app/background/tasks/blueprint_execution.rs +++ b/nexus/src/app/background/tasks/blueprint_execution.rs @@ -76,12 +76,14 @@ impl BlueprintExecutor { }); } + let nexus_id = todo!(); // XXX-dap let result = nexus_reconfigurator_execution::realize_blueprint( opctx, &self.datastore, &self.resolver, blueprint, &self.nexus_label, + nexus_id, ) .await; From 2692ba8f3da9f8f1f8f7b8d7f35cd46523b1411b Mon Sep 17 00:00:00 2001 From: David Pacheco Date: Fri, 2 Aug 2024 12:17:25 -0700 Subject: [PATCH 2/9] WIP: lower parts --- nexus/db-queries/src/db/datastore/saga.rs | 94 +++---------------- .../execution/src/cockroachdb.rs | 3 +- nexus/reconfigurator/execution/src/dns.rs | 10 +- nexus/reconfigurator/execution/src/lib.rs | 39 ++++---- nexus/reconfigurator/execution/src/sagas.rs | 62 ++++++------ nexus/src/app/background/init.rs | 3 +- .../background/tasks/blueprint_execution.rs | 36 +++++-- 7 files changed, 97 insertions(+), 150 deletions(-) diff --git a/nexus/db-queries/src/db/datastore/saga.rs b/nexus/db-queries/src/db/datastore/saga.rs index bf274f2456..1927bd3783 100644 --- a/nexus/db-queries/src/db/datastore/saga.rs +++ b/nexus/db-queries/src/db/datastore/saga.rs @@ -13,7 +13,6 @@ use crate::db::model::Generation; use crate::db::pagination::paginated; use crate::db::pagination::paginated_multicolumn; use crate::db::pagination::Paginator; -use crate::db::pool::DbConnection; use crate::db::update_and_check::UpdateAndCheck; use crate::db::update_and_check::UpdateStatus; use async_bb8_diesel::AsyncRunQueryDsl; @@ -25,23 +24,6 @@ use omicron_common::api::external::LookupType; use omicron_common::api::external::ResourceType; use std::ops::Add; -/// Reports the result of `sagas_reassign_sec_batched()`. -/// -/// Callers need to know two things: -/// -/// 1. Whether any sagas may have been re-assigned -/// (because they'll want to kick off saga recovery for them) -/// 2. Whether any errors were encountered -#[derive(Debug)] -pub(crate) enum Reassigned { - /// We successfully re-assigned all sagas that needed it - All { count: u32 }, - /// We encountered an error and cannot tell how many sagas were re-assigned. - /// It was at least this many. (The count can be zero, but it's still - /// possible that some were re-assigned.) 
- AtLeast { count: u32, error: Error }, -} - impl DataStore { pub async fn saga_create( &self, @@ -228,63 +210,23 @@ impl DataStore { Ok(events) } - pub async fn sagas_reassign_sec_all_batched( - &self, - opctx: &OpContext, - nexus_zone_ids: &[db::saga_types::SecId], - new_sec_id: db::saga_types::SecId, - ) -> Reassigned { - let now = chrono::Utc::now(); - let conn = match self.pool_connection_authorized(opctx).await { - Ok(c) => c, - Err(error) => return Reassigned::AtLeast { count: 0, error }, - }; - - let mut count = 0; - let limit: u32 = SQL_BATCH_SIZE.get(); - loop { - debug!(&opctx.log, "sagas_reassign_sec_batched"; - "count_so_far" => count); - match self - .sagas_reassign_sec( - &conn, - opctx, - nexus_zone_ids, - new_sec_id, - limit, - now, - ) - .await - { - Ok(c) => { - count += c; - if c < limit { - break; - } - } - Err(error) => { - return Reassigned::AtLeast { count, error }; - } - } - } - - Reassigned::All { count } - } - // XXX-dap TODO-doc - async fn sagas_reassign_sec( + pub async fn sagas_reassign_sec( &self, - conn: &async_bb8_diesel::Connection, opctx: &OpContext, nexus_zone_ids: &[db::saga_types::SecId], new_sec_id: db::saga_types::SecId, - limit: u32, - time: chrono::DateTime, - ) -> Result { - use db::schema::saga::dsl; - + ) -> Result { opctx.authorize(authz::Action::Modify, &authz::FLEET).await?; + let now = chrono::Utc::now(); + let conn = self.pool_connection_authorized(opctx).await?; + + // It would be more robust to do this in batches. However, Diesel does + // not appear to support the UPDATE ... LIMIT syntax using the normal + // builder. In practice, it's extremely unlikely we'd have so many + // in-progress sagas that this would be a problem. + use db::schema::saga::dsl; diesel::update( dsl::saga .filter(dsl::current_sec.is_not_null()) @@ -293,26 +235,16 @@ impl DataStore { )) .filter(dsl::saga_state.ne(db::saga_types::SagaCachedState( steno::SagaCachedState::Done, - ))) - .limit(i64::from(limit)), + ))), ) .set(( dsl::current_sec.eq(Some(new_sec_id)), dsl::adopt_generation.eq(dsl::adopt_generation.add(1)), - dsl::adopt_time.eq(time), + dsl::adopt_time.eq(now), )) - .execute_async(conn) + .execute_async(&*conn) .await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) - .and_then(|c_usize| { - u32::try_from(c_usize).map_err(|_| { - Error::internal_error(&format!( - "database reported unexpected count of \ - records updated (did not fit into u32): {}", - c_usize, - )) - }) - }) } } diff --git a/nexus/reconfigurator/execution/src/cockroachdb.rs b/nexus/reconfigurator/execution/src/cockroachdb.rs index 498944598d..277f5f91c4 100644 --- a/nexus/reconfigurator/execution/src/cockroachdb.rs +++ b/nexus/reconfigurator/execution/src/cockroachdb.rs @@ -39,6 +39,7 @@ mod test { use nexus_test_utils_macros::nexus_test; use nexus_types::deployment::CockroachDbClusterVersion; use std::sync::Arc; + use uuid::Uuid; type ControlPlaneTestContext = nexus_test_utils::ControlPlaneTestContext; @@ -101,7 +102,7 @@ mod test { datastore, resolver, &blueprint, - "test-suite", + Uuid::new_v4(), &overrides, ) .await diff --git a/nexus/reconfigurator/execution/src/dns.rs b/nexus/reconfigurator/execution/src/dns.rs index 3504d41e4d..8d840f918f 100644 --- a/nexus/reconfigurator/execution/src/dns.rs +++ b/nexus/reconfigurator/execution/src/dns.rs @@ -1194,7 +1194,7 @@ mod test { datastore, resolver, &blueprint, - "test-suite", + Uuid::new_v4(), &overrides, ) .await @@ -1332,7 +1332,7 @@ mod test { datastore, resolver, &blueprint2, - "test-suite", + Uuid::new_v4(), 
&overrides, ) .await @@ -1406,7 +1406,7 @@ mod test { datastore, resolver, &blueprint2, - "test-suite", + Uuid::new_v4(), &overrides, ) .await @@ -1442,7 +1442,7 @@ mod test { datastore, resolver, &blueprint2, - "test-suite", + Uuid::new_v4(), &overrides, ) .await @@ -1536,7 +1536,7 @@ mod test { datastore, resolver, &blueprint, - "test-suite", + Uuid::new_v4(), &overrides, ) .await diff --git a/nexus/reconfigurator/execution/src/lib.rs b/nexus/reconfigurator/execution/src/lib.rs index d41e35f0a4..ffd54cc3ab 100644 --- a/nexus/reconfigurator/execution/src/lib.rs +++ b/nexus/reconfigurator/execution/src/lib.rs @@ -76,41 +76,32 @@ impl From for Sled { /// /// The assumption is that callers are running this periodically or in a loop to /// deal with transient errors or changes in the underlying system state. -pub async fn realize_blueprint( +pub async fn realize_blueprint( opctx: &OpContext, datastore: &DataStore, resolver: &Resolver, blueprint: &Blueprint, - nexus_label: S, - nexus_id: Uuid, // XXX-dap type, and replace nexus_label too? -) -> Result<(), Vec> -where - String: From, -{ + nexus_id: Uuid, +) -> Result> { realize_blueprint_with_overrides( opctx, datastore, resolver, blueprint, - nexus_label, nexus_id, &Default::default(), ) .await } -pub async fn realize_blueprint_with_overrides( +pub async fn realize_blueprint_with_overrides( opctx: &OpContext, datastore: &DataStore, resolver: &Resolver, blueprint: &Blueprint, - nexus_label: S, - nexus_id: Uuid, // XXX-dap type, and replace nexus_label too? + nexus_id: Uuid, overrides: &Overridables, -) -> Result<(), Vec> -where - String: From, -{ +) -> Result> { let opctx = opctx.child(BTreeMap::from([( "comment".to_string(), blueprint.comment.clone(), @@ -205,7 +196,7 @@ where dns::deploy_dns( &opctx, datastore, - String::from(nexus_label), + nexus_id.to_string(), blueprint, &sleds_by_id, overrides, @@ -248,14 +239,18 @@ where // For any expunged Nexus zones, re-assign in-progress sagas to some other // Nexus. If this fails for some reason, it doesn't affect anything else. let sec_id = nexus_db_model::SecId(nexus_id); - if let Err(error) = sagas::reassign_sagas_from_expunged( + let reassigned = sagas::reassign_sagas_from_expunged( &opctx, datastore, blueprint, sec_id, ) .await - .context("failed to re-assign sagas") - { - errors.push(error); - } + .context("failed to re-assign sagas"); + let needs_saga_recovery = match reassigned { + Ok(needs_recovery) => needs_recovery, + Err(error) => { + errors.push(error); + false + } + }; // This is likely to error if any cluster upgrades are in progress (which // can take some time), so it should remain at the end so that other parts @@ -267,7 +262,7 @@ where } if errors.is_empty() { - Ok(()) + Ok(needs_saga_recovery) } else { Err(errors) } diff --git a/nexus/reconfigurator/execution/src/sagas.rs b/nexus/reconfigurator/execution/src/sagas.rs index 1c15413d40..58d9b95c0a 100644 --- a/nexus/reconfigurator/execution/src/sagas.rs +++ b/nexus/reconfigurator/execution/src/sagas.rs @@ -10,50 +10,52 @@ use nexus_db_queries::db::DataStore; use nexus_types::deployment::Blueprint; use nexus_types::deployment::BlueprintZoneFilter; use omicron_common::api::external::Error; -use slog::info; -use std::num::NonZeroU32; - -/// Reports what happened when we tried to re-assign sagas -/// -/// Callers need to know two things: -/// -/// 1. Whether any sagas may have been re-assigned -/// (because they'll want to kick off saga recovery for them) -/// 2. 
Whether any errors were encountered -#[derive(Debug)] -pub(crate) enum Reassigned { - /// We successfully re-assigned all sagas that needed it - All { count: u32 }, - /// We encountered an error and cannot tell how many sagas were re-assigned. - /// It was at least this many. (The count can be zero, but it's still - /// possible that some were re-assigned.) - AtLeast { count: u32, error: Error }, -} +use omicron_uuid_kinds::GenericUuid; +use slog::{debug, info, warn}; /// For each expunged Nexus zone, re-assign sagas owned by that Nexus to the /// specified nexus (`nexus_id`). -// TODO-dap activate recovery background task pub(crate) async fn reassign_sagas_from_expunged( opctx: &OpContext, datastore: &DataStore, blueprint: &Blueprint, nexus_id: SecId, -) -> Reassigned { +) -> Result { let log = &opctx.log; // Identify any Nexus zones that have been expunged and need to have sagas // re-assigned. - let nexus_zones_ids: Vec<_> = blueprint + let nexus_zone_ids: Vec<_> = blueprint .all_omicron_zones(BlueprintZoneFilter::Expunged) - .filter_map(|(_, z)| z.zone_type.is_nexus().then(|| z.id)) + .filter_map(|(_, z)| { + z.zone_type + .is_nexus() + .then(|| nexus_db_model::SecId(z.id.into_untyped_uuid())) + }) .collect(); debug!(log, "re-assign sagas: found Nexus instances"; - "nexus_zones_ids" => ?nexus_zones_ids); - - - let result = datastore.sagas_reassign_sec_batched(opctx, &nexus_zone_ids, nexus_id); - info!(log, "re-assign sagas"; - "nexus_zones_ids" => ?nexus_zones_ids); - + "nexus_zone_ids" => ?nexus_zone_ids); + + let result = + datastore.sagas_reassign_sec(opctx, &nexus_zone_ids, nexus_id).await; + + match result { + Ok(count) => { + info!(log, "re-assigned sagas"; + "nexus_zone_ids" => ?nexus_zone_ids, + "count" => count, + ); + + Ok(count != 0) + } + Err(error) => { + warn!(log, "failed to re-assign sagas"; + "nexus_zone_ids" => ?nexus_zone_ids, + &error, + ); + + Err(error) + } + } } diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index 2f1c4cd738..bc36d89a14 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -435,7 +435,8 @@ impl BackgroundTasksInitializer { datastore.clone(), resolver.clone(), rx_blueprint.clone(), - nexus_id.to_string(), + nexus_id, + task_saga_recovery.clone(), ); let rx_blueprint_exec = blueprint_executor.watcher(); driver.register(TaskDefinition { diff --git a/nexus/src/app/background/tasks/blueprint_execution.rs b/nexus/src/app/background/tasks/blueprint_execution.rs index fc8673839f..2572cf25a1 100644 --- a/nexus/src/app/background/tasks/blueprint_execution.rs +++ b/nexus/src/app/background/tasks/blueprint_execution.rs @@ -4,7 +4,7 @@ //! Background task for realizing a plan blueprint -use crate::app::background::BackgroundTask; +use crate::app::background::{Activator, BackgroundTask}; use futures::future::BoxFuture; use futures::FutureExt; use internal_dns::resolver::Resolver; @@ -14,6 +14,7 @@ use nexus_types::deployment::{Blueprint, BlueprintTarget}; use serde_json::json; use std::sync::Arc; use tokio::sync::watch; +use uuid::Uuid; /// Background task that takes a [`Blueprint`] and realizes the change to /// the state of the system based on the `Blueprint`. 
@@ -21,8 +22,9 @@ pub struct BlueprintExecutor { datastore: Arc, resolver: Resolver, rx_blueprint: watch::Receiver>>, - nexus_label: String, + nexus_id: Uuid, tx: watch::Sender, + saga_recovery: Activator, } impl BlueprintExecutor { @@ -32,10 +34,18 @@ impl BlueprintExecutor { rx_blueprint: watch::Receiver< Option>, >, - nexus_label: String, + nexus_id: Uuid, + saga_recovery: Activator, ) -> BlueprintExecutor { let (tx, _) = watch::channel(0); - BlueprintExecutor { datastore, resolver, rx_blueprint, nexus_label, tx } + BlueprintExecutor { + datastore, + resolver, + rx_blueprint, + nexus_id, + tx, + saga_recovery, + } } pub fn watcher(&self) -> watch::Receiver { @@ -76,23 +86,28 @@ impl BlueprintExecutor { }); } - let nexus_id = todo!(); // XXX-dap let result = nexus_reconfigurator_execution::realize_blueprint( opctx, &self.datastore, &self.resolver, blueprint, - &self.nexus_label, - nexus_id, + self.nexus_id, ) .await; // Trigger anybody waiting for this to finish. self.tx.send_modify(|count| *count = *count + 1); + // If executing the blueprint requires activating the saga recovery + // background task, do that now. + info!(&opctx.log, "activating saga recovery task"); + if let Ok(true) = result { + self.saga_recovery.activate(); + } + // Return the result as a `serde_json::Value` match result { - Ok(()) => json!({}), + Ok(_) => json!({}), Err(errors) => { let errors: Vec<_> = errors.into_iter().map(|e| format!("{:#}", e)).collect(); @@ -117,7 +132,7 @@ impl BackgroundTask for BlueprintExecutor { #[cfg(test)] mod test { use super::BlueprintExecutor; - use crate::app::background::BackgroundTask; + use crate::app::background::{Activator, BackgroundTask}; use httptest::matchers::{all_of, request}; use httptest::responders::status_code; use httptest::Expectation; @@ -242,7 +257,8 @@ mod test { datastore.clone(), resolver.clone(), blueprint_rx, - String::from("test-suite"), + Uuid::new_v4(), + Activator::new(), ); // Now we're ready. From 90d84096fce3018bcf657f9618cf57c876ac6eaa Mon Sep 17 00:00:00 2001 From: David Pacheco Date: Tue, 13 Aug 2024 17:34:14 -0700 Subject: [PATCH 3/9] transferring sagas does not actually work --- nexus/db-queries/src/db/datastore/saga.rs | 36 ++++++++--------------- nexus/db-queries/src/db/sec_store.rs | 9 ++---- 2 files changed, 14 insertions(+), 31 deletions(-) diff --git a/nexus/db-queries/src/db/datastore/saga.rs b/nexus/db-queries/src/db/datastore/saga.rs index 1927bd3783..7a38129254 100644 --- a/nexus/db-queries/src/db/datastore/saga.rs +++ b/nexus/db-queries/src/db/datastore/saga.rs @@ -9,7 +9,6 @@ use super::SQL_BATCH_SIZE; use crate::db; use crate::db::error::public_error_from_diesel; use crate::db::error::ErrorHandler; -use crate::db::model::Generation; use crate::db::pagination::paginated; use crate::db::pagination::paginated_multicolumn; use crate::db::pagination::Paginator; @@ -82,21 +81,15 @@ impl DataStore { /// now, we're implementing saga adoption only in cases where the original /// SEC/Nexus has been expunged.) /// - /// However, in the future, it may be possible for multiple SECs to try and - /// update the same saga, and overwrite each other's state. For example, - /// one SEC might try and update the state to Running while the other one - /// updates it to Done. That case would have to be carefully considered and - /// tested here, probably using the (currently unused) - /// `current_adopt_generation` field to enable optimistic concurrency. 
- /// - /// To reiterate, we are *not* considering the case where several SECs try - /// to update the same saga. That will be a future enhancement. + /// It's conceivable that multiple SECs do try to udpate the same saga + /// concurrently. That would be a bug. This is noticed and prevented by + /// making this query conditional on current_sec and failing with a conflict + /// if the current SEC has changed. pub async fn saga_update_state( &self, saga_id: steno::SagaId, new_state: steno::SagaCachedState, current_sec: db::saga_types::SecId, - current_adopt_generation: Generation, ) -> Result<(), Error> { use db::schema::saga::dsl; @@ -104,7 +97,6 @@ impl DataStore { let result = diesel::update(dsl::saga) .filter(dsl::id.eq(saga_id)) .filter(dsl::current_sec.eq(current_sec)) - .filter(dsl::adopt_generation.eq(current_adopt_generation)) .set(dsl::saga_state.eq(db::saga_types::SagaCachedState(new_state))) .check_if_exists::(saga_id) .execute_and_check(&*self.pool_connection_unauthorized().await?) @@ -121,20 +113,19 @@ impl DataStore { match result.status { UpdateStatus::Updated => Ok(()), - UpdateStatus::NotUpdatedButExists => Err(Error::invalid_request( - format!( - "failed to update saga {:?} with state {:?}: preconditions not met: \ - expected current_sec = {:?}, adopt_generation = {:?}, \ - but found current_sec = {:?}, adopt_generation = {:?}, state = {:?}", + UpdateStatus::NotUpdatedButExists => { + Err(Error::invalid_request(format!( + "failed to update saga {:?} with state {:?}:\ + preconditions not met: \ + expected current_sec = {:?}, \ + but found current_sec = {:?}, state = {:?}", saga_id, new_state, current_sec, - current_adopt_generation, result.found.current_sec, - result.found.adopt_generation, result.found.saga_state, - ) - )), + ))) + } } } @@ -479,7 +470,6 @@ mod test { node_cx.saga_id, steno::SagaCachedState::Running, node_cx.sec_id, - db::model::Generation::new(), ) .await .expect("updating state to Running again"); @@ -490,7 +480,6 @@ mod test { node_cx.saga_id, steno::SagaCachedState::Done, node_cx.sec_id, - db::model::Generation::new(), ) .await .expect("updating state to Done"); @@ -502,7 +491,6 @@ mod test { node_cx.saga_id, steno::SagaCachedState::Done, node_cx.sec_id, - db::model::Generation::new(), ) .await .expect("updating state to Done again"); diff --git a/nexus/db-queries/src/db/sec_store.rs b/nexus/db-queries/src/db/sec_store.rs index 0dcc3aa717..920ff3aee1 100644 --- a/nexus/db-queries/src/db/sec_store.rs +++ b/nexus/db-queries/src/db/sec_store.rs @@ -4,7 +4,7 @@ //! 
Implementation of [`steno::SecStore`] backed by Omicron's database -use crate::db::{self, model::Generation}; +use crate::db; use anyhow::Context; use async_trait::async_trait; use dropshot::HttpError; @@ -102,12 +102,7 @@ impl steno::SecStore for CockroachDbSecStore { &log, || { self.datastore - .saga_update_state( - id, - update, - self.sec_id, - Generation::new(), - ) + .saga_update_state(id, update, self.sec_id) .map_err(backoff::BackoffError::transient) }, "updating saga state", From 00649e617ea68f391532fd9671e662dcbdadd8e4 Mon Sep 17 00:00:00 2001 From: David Pacheco Date: Wed, 14 Aug 2024 10:45:43 -0700 Subject: [PATCH 4/9] add docs, test at datastore level --- nexus/db-queries/src/db/datastore/saga.rs | 184 ++++++++++++++++++++-- 1 file changed, 173 insertions(+), 11 deletions(-) diff --git a/nexus/db-queries/src/db/datastore/saga.rs b/nexus/db-queries/src/db/datastore/saga.rs index 7a38129254..53b505c0e5 100644 --- a/nexus/db-queries/src/db/datastore/saga.rs +++ b/nexus/db-queries/src/db/datastore/saga.rs @@ -201,7 +201,22 @@ impl DataStore { Ok(events) } - // XXX-dap TODO-doc + /// Updates all sagas that are currently assigned to any of the Nexus + /// instances in `nexus_zone_ids`, assigning them to `new_sec_id` instead. + /// + /// This change causes the Nexus instance `new_sec_id` to discover these + /// sagas and resume executing them the next time it performs saga recovery + /// (which is normally on startup and periodically). Generally, + /// `new_sec_id` is the _current_ Nexus instance and the caller should + /// activate the saga recovery background task after calling this function + /// to immediately resume the newly-assigned sagas. + /// + /// **Warning:** This operation is only safe if the Nexus instances + /// `nexus_zone_ids` are not currently running. If those Nexus instances + /// are still running, then two (or more) Nexus instances may wind up + /// running the same saga concurrently. This would likely violate implicit + /// assumptions made by various saga actions, leading to hard-to-debug + /// errors and state corruption. pub async fn sagas_reassign_sec( &self, opctx: &OpContext, @@ -243,10 +258,15 @@ impl DataStore { mod test { use super::*; use crate::db::datastore::test_utils::datastore_test; + use async_bb8_diesel::AsyncConnection; + use async_bb8_diesel::AsyncSimpleConnection; + use db::queries::ALLOW_FULL_TABLE_SCAN_SQL; use nexus_db_model::{SagaNodeEvent, SecId}; use nexus_test_utils::db::test_setup_database; + use omicron_common::api::external::Generation; use omicron_test_utils::dev; use rand::seq::SliceRandom; + use std::collections::BTreeSet; use uuid::Uuid; // Tests pagination in listing sagas that are candidates for recovery @@ -536,14 +556,156 @@ mod test { SagaNodeEvent::new(event, self.sec_id) } } -} -// XXX-dap TODO-coverage want test that inserts: -// - some sagas assigned to this SEC that are done -// - more than BATCH_SIZE sagas assigned to this SEC that are unwinding -// - more than BATCH_SIZE sagas assigned to this SEC that are running -// - some sagas assigned to another SEC that are running -// -// then two tests: -// 1. run the one-batch thing and make sure we only got at most the batch size -// 2. 
run the whole thing and make sure that we got exactly the right ones + #[tokio::test] + async fn test_saga_reassignment() { + // Test setup + let logctx = dev::test_setup_log("test_saga_reassignment"); + let mut db = test_setup_database(&logctx.log).await; + let (_, datastore) = datastore_test(&logctx, &db).await; + let opctx = OpContext::for_tests(logctx.log.clone(), datastore.clone()); + + // Populate the database with a few different sagas: + // + // - assigned to SEC A: done, running, and unwinding + // - assigned to SEC B: done, running, and unwinding + // - assigned to SEC C: done, running, and unwinding + // - assigned to SEC D: done, running, and unwinding + // + // Then we'll reassign SECs B's and C's sagas to SEC A and check exactly + // which sagas were changed by this. This exercises: + // - that we don't touch A's sagas (the one we're assigning *to*) + // - that we do touch both B's and C's sagas (the ones we're assigning + // *from*) + // - that we don't touch D's sagas (some other SEC) + // - that we don't touch any "done" sagas + // - that we do touch both running and unwinding sagas + let mut sagas_to_insert = Vec::new(); + let sec_a = SecId(Uuid::new_v4()); + let sec_b = SecId(Uuid::new_v4()); + let sec_c = SecId(Uuid::new_v4()); + let sec_d = SecId(Uuid::new_v4()); + + for sec_id in [sec_a, sec_b, sec_c, sec_d] { + for state in [ + steno::SagaCachedState::Running, + steno::SagaCachedState::Unwinding, + steno::SagaCachedState::Done, + ] { + let params = steno::SagaCreateParams { + id: steno::SagaId(Uuid::new_v4()), + name: steno::SagaName::new("tewst saga"), + dag: serde_json::value::Value::Null, + state, + }; + + sagas_to_insert + .push(db::model::saga_types::Saga::new(sec_id, params)); + } + } + println!("sagas to insert: {:?}", sagas_to_insert); + + // These two sets are complements, but we write out the conditions to + // double-check that we've got it right. + let sagas_affected: BTreeSet<_> = sagas_to_insert + .iter() + .filter_map(|saga| { + ((saga.creator == sec_b || saga.creator == sec_c) + && (saga.saga_state.0 == steno::SagaCachedState::Running + || saga.saga_state.0 + == steno::SagaCachedState::Unwinding)) + .then(|| saga.id) + }) + .collect(); + let sagas_unaffected: BTreeSet<_> = sagas_to_insert + .iter() + .filter_map(|saga| { + (saga.creator == sec_a + || saga.creator == sec_d + || saga.saga_state.0 == steno::SagaCachedState::Done) + .then(|| saga.id) + }) + .collect(); + println!("sagas affected: {:?}", sagas_affected); + println!("sagas UNaffected: {:?}", sagas_unaffected); + assert_eq!(sagas_affected.intersection(&sagas_unaffected).count(), 0); + assert_eq!( + sagas_affected.len() + sagas_unaffected.len(), + sagas_to_insert.len() + ); + + // Insert the sagas. + let count = { + use db::schema::saga::dsl; + let conn = datastore.pool_connection_for_tests().await.unwrap(); + diesel::insert_into(dsl::saga) + .values(sagas_to_insert) + .execute_async(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + .expect("successful insertion") + }; + assert_eq!(count, sagas_affected.len() + sagas_unaffected.len()); + + // Reassign uncompleted sagas from SECs B and C to SEC A. + let nreassigned = datastore + .sagas_reassign_sec(&opctx, &[sec_b, sec_c], sec_a) + .await + .expect("failed to re-assign sagas"); + + // Fetch all the sagas and check their states. 
+ let all_sagas: Vec<_> = datastore + .pool_connection_for_tests() + .await + .unwrap() + .transaction_async(|conn| async move { + use db::schema::saga::dsl; + conn.batch_execute_async(ALLOW_FULL_TABLE_SCAN_SQL).await?; + dsl::saga + .select(nexus_db_model::Saga::as_select()) + .load_async(&conn) + .await + }) + .await + .unwrap(); + + for saga in all_sagas { + println!("checking saga: {:?}", saga); + let current_sec = saga.current_sec.unwrap(); + if sagas_affected.contains(&saga.id) { + assert!(saga.creator == sec_b || saga.creator == sec_c); + assert_eq!(current_sec, sec_a); + assert_eq!(*saga.adopt_generation, Generation::from(2)); + assert!( + saga.saga_state.0 == steno::SagaCachedState::Running + || saga.saga_state.0 + == steno::SagaCachedState::Unwinding + ); + } else if sagas_unaffected.contains(&saga.id) { + assert_eq!(current_sec, saga.creator); + assert_eq!(*saga.adopt_generation, Generation::from(1)); + // Its SEC and state could be anything since we've deliberately + // included sagas with various states and SECs that should not + // be affected by the reassignment. + } else { + println!( + "ignoring saga that was not created by this test: {:?}", + saga + ); + } + } + + assert_eq!(nreassigned, sagas_affected.len()); + + // If we do it again, we should make no changes. + let nreassigned = datastore + .sagas_reassign_sec(&opctx, &[sec_b, sec_c], sec_a) + .await + .expect("failed to re-assign sagas"); + assert_eq!(nreassigned, 0); + + // Test cleanup + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } +} From 6a31f2ad9d75216694ed0c3a131465cadc54e170 Mon Sep 17 00:00:00 2001 From: David Pacheco Date: Wed, 14 Aug 2024 13:42:10 -0700 Subject: [PATCH 5/9] first cut at end-to-end test --- Cargo.lock | 1 + .../db-queries/src/db/datastore/deployment.rs | 4 + nexus/reconfigurator/execution/src/sagas.rs | 6 + .../background/tasks/blueprint_execution.rs | 329 +++++++++++++++++- nexus/test-utils/Cargo.toml | 1 + nexus/test-utils/src/lib.rs | 9 + nexus/tests/integration_tests/demo_saga.rs | 9 +- 7 files changed, 339 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 74a7405e57..c5befde253 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5178,6 +5178,7 @@ dependencies = [ "hyper 0.14.30", "illumos-utils", "internal-dns", + "nexus-client", "nexus-config", "nexus-db-queries", "nexus-sled-agent-shared", diff --git a/nexus/db-queries/src/db/datastore/deployment.rs b/nexus/db-queries/src/db/datastore/deployment.rs index d413f9507a..1e05bec6c1 100644 --- a/nexus/db-queries/src/db/datastore/deployment.rs +++ b/nexus/db-queries/src/db/datastore/deployment.rs @@ -53,6 +53,7 @@ use nexus_types::deployment::CockroachDbPreserveDowngrade; use nexus_types::external_api::views::SledState; use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Error; +use omicron_common::api::external::InternalContext; use omicron_common::api::external::ListResultVec; use omicron_common::api::external::LookupType; use omicron_common::api::external::ResourceType; @@ -863,6 +864,7 @@ impl DataStore { SelectFlavor::ForUpdate, ) .await + .internal_context("getting current target") .map_err(|e| err.bail(e))?; if current_target.target_id != blueprint.id { return Err(err.bail(Error::invalid_request(format!( @@ -897,6 +899,7 @@ impl DataStore { .map(|(_sled_id, zone)| zone), ) .await + .internal_context("ensure_zone_external_networking_deallocated") .map_err(|e| err.bail(e))?; self.ensure_zone_external_networking_allocated_on_connection( &conn, @@ -908,6 +911,7 @@ impl 
DataStore { .map(|(_sled_id, zone)| zone), ) .await + .internal_context("ensure_zone_external_networking_allocated") .map_err(|e| err.bail(e))?; // See the comment on this method; this lets us wait until our diff --git a/nexus/reconfigurator/execution/src/sagas.rs b/nexus/reconfigurator/execution/src/sagas.rs index 58d9b95c0a..1bd7b2f1d0 100644 --- a/nexus/reconfigurator/execution/src/sagas.rs +++ b/nexus/reconfigurator/execution/src/sagas.rs @@ -59,3 +59,9 @@ pub(crate) async fn reassign_sagas_from_expunged( } } } + +// We do not have tests at this layer since it's so thin. The datastore +// operation (which is the main thing above) is tested separately. There's an +// integration test in Nexus that tests not only that we re-assigned any +// in-progress sagas but that the recovery process picked them up and completed +// them. diff --git a/nexus/src/app/background/tasks/blueprint_execution.rs b/nexus/src/app/background/tasks/blueprint_execution.rs index b430270ec9..7e95b01e84 100644 --- a/nexus/src/app/background/tasks/blueprint_execution.rs +++ b/nexus/src/app/background/tasks/blueprint_execution.rs @@ -107,7 +107,9 @@ impl BlueprintExecutor { // Return the result as a `serde_json::Value` match result { - Ok(_) => json!({}), + Ok(_) => json!({ + "target_id": blueprint.id.to_string(), + }), Err(errors) => { let errors: Vec<_> = errors.into_iter().map(|e| format!("{:#}", e)).collect(); @@ -133,35 +135,53 @@ impl BackgroundTask for BlueprintExecutor { mod test { use super::BlueprintExecutor; use crate::app::background::{Activator, BackgroundTask}; + use crate::app::saga::create_saga_dag; + use crate::app::sagas::demo; + use anyhow::Context; + use futures::TryStreamExt; use httptest::matchers::{all_of, request}; use httptest::responders::status_code; use httptest::Expectation; + use nexus_client::types::{BlueprintTargetSet, SagaState}; use nexus_db_model::{ ByteCount, SledBaseboard, SledSystemHardware, SledUpdate, Zpool, }; use nexus_db_queries::authn; use nexus_db_queries::context::OpContext; use nexus_db_queries::db::DataStore; + use nexus_reconfigurator_planning::blueprint_builder::BlueprintBuilder; use nexus_sled_agent_shared::inventory::OmicronZoneDataset; use nexus_test_utils_macros::nexus_test; - use nexus_types::deployment::BlueprintZoneFilter; use nexus_types::deployment::{ blueprint_zone_type, Blueprint, BlueprintPhysicalDisksConfig, BlueprintTarget, BlueprintZoneConfig, BlueprintZoneDisposition, BlueprintZoneType, BlueprintZonesConfig, CockroachDbPreserveDowngrade, + OmicronZoneExternalFloatingIp, SledDetails, SledDisk, SledResources, + }; + use nexus_types::deployment::{BlueprintZoneFilter, PlanningInputBuilder}; + use nexus_types::external_api::views::{ + PhysicalDiskPolicy, PhysicalDiskState, SledPolicy, SledProvisionPolicy, + SledState, }; - use nexus_types::external_api::views::SledState; - use omicron_common::api::external::Generation; + use omicron_common::address::{IpRange, Ipv4Range, Ipv6Subnet}; + use omicron_common::api::external::{Generation, MacAddr, Vni}; + use omicron_common::api::internal::shared::{ + NetworkInterface, NetworkInterfaceKind, + }; + use omicron_common::disk::DiskIdentity; use omicron_common::zpool_name::ZpoolName; - use omicron_uuid_kinds::GenericUuid; - use omicron_uuid_kinds::OmicronZoneUuid; - use omicron_uuid_kinds::SledUuid; + use omicron_test_utils::dev::poll::{wait_for_condition, CondCheckError}; use omicron_uuid_kinds::ZpoolUuid; + use omicron_uuid_kinds::{DemoSagaUuid, GenericUuid}; + use omicron_uuid_kinds::{ExternalIpUuid, SledUuid}; + use 
omicron_uuid_kinds::{OmicronZoneUuid, PhysicalDiskUuid}; + use oxnet::IpNet; use serde::Deserialize; use serde_json::json; - use std::collections::BTreeMap; - use std::net::SocketAddr; + use std::collections::{BTreeMap, BTreeSet}; + use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV6}; use std::sync::Arc; + use std::time::Duration; use tokio::sync::watch; use uuid::Uuid; @@ -174,6 +194,7 @@ mod test { blueprint_zones: BTreeMap, blueprint_disks: BTreeMap, dns_version: Generation, + enabled: bool, ) -> (BlueprintTarget, Blueprint) { let id = Uuid::new_v4(); // Assume all sleds are active. @@ -193,7 +214,7 @@ mod test { let target = BlueprintTarget { target_id: id, - enabled: true, + enabled, time_made_target: chrono::Utc::now(), }; let blueprint = Blueprint { @@ -299,13 +320,15 @@ mod test { BTreeMap::new(), BTreeMap::new(), generation, + true, ) .await, ); + let blueprint_id = blueprint.1.id; blueprint_tx.send(Some(blueprint)).unwrap(); let value = task.activate(&opctx).await; println!("activating with no zones: {:?}", value); - assert_eq!(value, json!({})); + assert_eq!(value, json!({"target_id": blueprint_id})); // Create a non-empty blueprint describing two servers and verify that // the task correctly winds up making requests to both of them and @@ -352,8 +375,10 @@ mod test { ]), BTreeMap::new(), generation, + true, ) .await; + let blueprint_id = blueprint.1.id; // Insert records for the zpools backing the datasets in these zones. for (sled_id, config) in @@ -393,7 +418,7 @@ mod test { // Activate the task to trigger zone configuration on the sled-agents let value = task.activate(&opctx).await; println!("activating two sled agents: {:?}", value); - assert_eq!(value, json!({})); + assert_eq!(value, json!({"target_id": blueprint_id})); s1.verify_and_clear(); s2.verify_and_clear(); @@ -449,4 +474,284 @@ mod test { s1.verify_and_clear(); s2.verify_and_clear(); } + + // Tests that sagas assigned to expunged Nexus zones get re-assigned, + // recovered, and completed. + #[nexus_test(server = crate::Server)] + async fn test_saga_reassignment(cptestctx: &ControlPlaneTestContext) { + let nexus_client = cptestctx.nexus_internal_client().await; + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let logctx = &cptestctx.logctx; + let opctx = OpContext::for_tests(logctx.log.clone(), datastore.clone()); + + // First, create a blueprint whose execution will assign sagas to this + // Nexus. 
+ let (new_nexus_zone_id, blueprint) = { + let sled_id = SledUuid::new_v4(); + let rack_id = Uuid::new_v4(); + let update = SledUpdate::new( + sled_id.into_untyped_uuid(), + SocketAddrV6::new(Ipv6Addr::LOCALHOST, 12345, 0, 0), + SledBaseboard { + serial_number: "one".into(), + part_number: "test".into(), + revision: 1, + }, + SledSystemHardware { + is_scrimlet: false, + usable_hardware_threads: 4, + usable_physical_ram: ByteCount(1000.into()), + reservoir_size: ByteCount(999.into()), + }, + rack_id, + nexus_db_model::Generation::new(), + ); + datastore + .sled_upsert(update) + .await + .expect("Failed to insert sled to db"); + + let new_nexus_zone_id = OmicronZoneUuid::new_v4(); + let pool_id = ZpoolUuid::new_v4(); + let zones = BTreeMap::from([( + sled_id, + BlueprintZonesConfig { + generation: Generation::new(), + zones: vec![BlueprintZoneConfig { + disposition: BlueprintZoneDisposition::Expunged, + id: new_nexus_zone_id, + underlay_address: "::1".parse().unwrap(), + filesystem_pool: Some(ZpoolName::new_external(pool_id)), + zone_type: BlueprintZoneType::Nexus( + blueprint_zone_type::Nexus { + internal_address: "[::1]:0".parse().unwrap(), + external_ip: OmicronZoneExternalFloatingIp { + id: ExternalIpUuid::new_v4(), + ip: Ipv4Addr::LOCALHOST.into(), + }, + nic: NetworkInterface { + id: Uuid::new_v4(), + kind: NetworkInterfaceKind::Service { + id: new_nexus_zone_id + .into_untyped_uuid(), + }, + name: "test-nic".parse().unwrap(), + ip: Ipv4Addr::LOCALHOST.into(), + mac: MacAddr::random_system(), + subnet: IpNet::new( + Ipv4Addr::LOCALHOST.into(), + 8, + ) + .unwrap(), + vni: Vni::SERVICES_VNI, + primary: true, + slot: 0, + transit_ips: Default::default(), + }, + external_tls: false, + external_dns_servers: Vec::new(), + }, + ), + }], + }, + )]); + let (_, blueprint) = create_blueprint( + &datastore, + &opctx, + zones, + BTreeMap::new(), + Generation::new(), + false, + ) + .await; + (new_nexus_zone_id, blueprint) + }; + + // Create an entry in the database for a new "demo" saga owned by that + // expunged Nexus. There will be no log entries, so it's as though that + // Nexus has just created the saga when it was expunged. + let new_saga_id = steno::SagaId(Uuid::new_v4()); + println!("new saga id {}", new_saga_id); + let other_nexus_sec_id = + nexus_db_model::SecId(new_nexus_zone_id.into_untyped_uuid()); + let demo_saga_id = DemoSagaUuid::from_untyped_uuid(Uuid::new_v4()); + let saga_params = demo::Params { id: demo_saga_id }; + let dag = serde_json::to_value( + create_saga_dag::(saga_params) + .expect("create demo saga DAG"), + ) + .expect("serialize demo saga DAG"); + let params = steno::SagaCreateParams { + id: new_saga_id, + name: steno::SagaName::new("test saga"), + dag, + state: steno::SagaCachedState::Running, + }; + let new_saga = nexus_db_model::Saga::new(other_nexus_sec_id, params); + datastore.saga_create(&new_saga).await.expect("created saga"); + + // Set the blueprint that we created as the target and enable execution. + println!("setting blueprint target {}", blueprint.id); + nexus_client + .blueprint_target_set_enabled(&BlueprintTargetSet { + target_id: blueprint.id, + enabled: true, + }) + .await + .expect("set blueprint target"); + + // Helper to take the response from bgtask_list() and pull out the + // status for a particular background task. 
+ fn last_task_status_map<'a>( + tasks: &'a BTreeMap, + name: &'a str, + ) -> Result< + &'a serde_json::Map, + CondCheckError<()>, + > { + let task = tasks + .get(name) + .with_context(|| format!("missing task {:?}", name)) + .unwrap(); + let nexus_client::types::LastResult::Completed(r) = &task.last + else { + println!( + "waiting for task {:?} to complete at least once", + name + ); + return Err(CondCheckError::NotYet); + }; + let details = &r.details; + Ok(details + .as_object() + .with_context(|| { + format!("task {}: last status was not a map", name) + }) + .unwrap()) + } + + fn status_target_id( + status: &serde_json::Map, + ) -> Result> { + Ok(status + .get("target_id") + .ok_or_else(|| { + println!("waiting for task to report a target_id"); + CondCheckError::NotYet + })? + .as_str() + .expect("target_id was not a string") + .parse() + .expect("target_id was not a uuid")) + } + + // Wait a bit for: + // + // - the new target blueprint to be loaded + // - the new target blueprint to be executed + // - the saga to be transferred + // + // We could just wait for the end result but debugging will be easier if + // we can report where things got stuck. + wait_for_condition( + || async { + let task_status = nexus_client + .bgtask_list() + .await + .expect("list background tasks"); + let tasks = task_status + .into_inner() + .into_iter() + .collect::>(); + + // Wait for the loader to report our new blueprint as the + // current target. + let loader_status = + last_task_status_map(&tasks, "blueprint_loader")?; + println!("waiting: found loader status: {:?}", loader_status); + let target_id = status_target_id(&loader_status)?; + if target_id != blueprint.id { + println!( + "waiting: taking lap (loader target id is not ours)" + ); + return Err(CondCheckError::NotYet); + } + + // Wait for the execution task to report having tried to execute + // our blueprint. For our purposes, it's not critical that it + // succeeded. It might fail for reasons not having to do with + // what we're testing. + let execution_status = + last_task_status_map(&tasks, "blueprint_executor")?; + println!( + "waiting: found execution status: {:?}", + execution_status + ); + let target_id = status_target_id(&execution_status)?; + if target_id != blueprint.id { + println!( + "waiting: taking lap (execution target id is not ours)" + ); + return Err(CondCheckError::NotYet); + } + + // For debugging, print out the saga recovery task's status. + // It's not worth trying to parse it. And it might not yet have + // even carried out recovery. + println!( + "waiting: found saga recovery status: {:?}", + last_task_status_map(&tasks, "saga_recovery"), + ); + + // Wait too for our demo saga to show up in the saga list. That + // means recovery has completed. + let sagas = nexus_client + .saga_list_stream(None, None) + .try_collect::>() + .await + .expect("listed sagas"); + if sagas.iter().any(|s| s.id == new_saga_id.0) { + Ok(()) + } else { + println!("waiting: taking lap (saga not yet found)"); + Err(CondCheckError::<()>::NotYet) + } + }, + &Duration::from_millis(50), + &Duration::from_secs(60), + ) + .await + .expect("execution completion"); + + // Now complete the demo saga. + nexus_client + .saga_demo_complete(&demo_saga_id) + .await + .expect("demo saga complete"); + + // And wait for it to actually finish. 
+ wait_for_condition( + || async { + let saga = nexus_client + .saga_list_stream(None, None) + .try_collect::>() + .await + .expect("listed sagas") + .into_iter() + .find(|s| s.id == new_saga_id.0) + .expect("demo saga in list of sagas"); + println!("checking saga status: {:?}", saga); + if saga.state == SagaState::Succeeded { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &Duration::from_millis(50), + &Duration::from_secs(10), + ) + .await + .expect("saga completion"); + } } diff --git a/nexus/test-utils/Cargo.toml b/nexus/test-utils/Cargo.toml index a883bc83c5..250f15e3d7 100644 --- a/nexus/test-utils/Cargo.toml +++ b/nexus/test-utils/Cargo.toml @@ -25,6 +25,7 @@ http.workspace = true hyper.workspace = true illumos-utils.workspace = true internal-dns.workspace = true +nexus-client.workspace = true nexus-config.workspace = true nexus-db-queries.workspace = true nexus-sled-agent-shared.workspace = true diff --git a/nexus/test-utils/src/lib.rs b/nexus/test-utils/src/lib.rs index 7c190974a1..22f1fb27c2 100644 --- a/nexus/test-utils/src/lib.rs +++ b/nexus/test-utils/src/lib.rs @@ -142,6 +142,15 @@ impl ControlPlaneTestContext { format!("*.sys.{}", self.external_dns_zone_name) } + pub async fn nexus_internal_client(&self) -> nexus_client::Client { + let log = &self.logctx.log; + let nexus_internal_url = format!( + "http://{}", + self.server.get_http_server_internal_address().await + ); + nexus_client::Client::new(&nexus_internal_url, log.clone()) + } + pub async fn teardown(mut self) { self.server.close().await; self.database.cleanup().await.unwrap(); diff --git a/nexus/tests/integration_tests/demo_saga.rs b/nexus/tests/integration_tests/demo_saga.rs index 888fa35965..aa9b63159c 100644 --- a/nexus/tests/integration_tests/demo_saga.rs +++ b/nexus/tests/integration_tests/demo_saga.rs @@ -7,7 +7,6 @@ use futures::TryStreamExt; use nexus_client::types::Saga; use nexus_client::types::SagaState; -use nexus_test_interface::NexusServer; use nexus_test_utils_macros::nexus_test; use omicron_test_utils::dev::poll::wait_for_condition; use omicron_test_utils::dev::poll::CondCheckError; @@ -20,13 +19,7 @@ type ControlPlaneTestContext = // saga's state matches what we expect along the way. #[nexus_test] async fn test_demo_saga(cptestctx: &ControlPlaneTestContext) { - let log = &cptestctx.logctx.log; - let nexus_internal_url = format!( - "http://{}", - cptestctx.server.get_http_server_internal_address().await - ); - let nexus_client = - nexus_client::Client::new(&nexus_internal_url, log.clone()); + let nexus_client = cptestctx.nexus_internal_client().await; let sagas_before = list_sagas(&nexus_client).await; eprintln!("found sagas (before): {:?}", sagas_before); From befc6c6183ec5fdbc69ad50e867461b68f44a399 Mon Sep 17 00:00:00 2001 From: David Pacheco Date: Wed, 14 Aug 2024 16:15:50 -0700 Subject: [PATCH 6/9] Revert "first cut at end-to-end test" This reverts commit 6a31f2ad9d75216694ed0c3a131465cadc54e170. 
--- Cargo.lock | 1 - .../db-queries/src/db/datastore/deployment.rs | 4 - nexus/reconfigurator/execution/src/sagas.rs | 6 - .../background/tasks/blueprint_execution.rs | 329 +----------------- nexus/test-utils/Cargo.toml | 1 - nexus/test-utils/src/lib.rs | 9 - nexus/tests/integration_tests/demo_saga.rs | 9 +- 7 files changed, 20 insertions(+), 339 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c5befde253..74a7405e57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5178,7 +5178,6 @@ dependencies = [ "hyper 0.14.30", "illumos-utils", "internal-dns", - "nexus-client", "nexus-config", "nexus-db-queries", "nexus-sled-agent-shared", diff --git a/nexus/db-queries/src/db/datastore/deployment.rs b/nexus/db-queries/src/db/datastore/deployment.rs index 1e05bec6c1..d413f9507a 100644 --- a/nexus/db-queries/src/db/datastore/deployment.rs +++ b/nexus/db-queries/src/db/datastore/deployment.rs @@ -53,7 +53,6 @@ use nexus_types::deployment::CockroachDbPreserveDowngrade; use nexus_types::external_api::views::SledState; use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Error; -use omicron_common::api::external::InternalContext; use omicron_common::api::external::ListResultVec; use omicron_common::api::external::LookupType; use omicron_common::api::external::ResourceType; @@ -864,7 +863,6 @@ impl DataStore { SelectFlavor::ForUpdate, ) .await - .internal_context("getting current target") .map_err(|e| err.bail(e))?; if current_target.target_id != blueprint.id { return Err(err.bail(Error::invalid_request(format!( @@ -899,7 +897,6 @@ impl DataStore { .map(|(_sled_id, zone)| zone), ) .await - .internal_context("ensure_zone_external_networking_deallocated") .map_err(|e| err.bail(e))?; self.ensure_zone_external_networking_allocated_on_connection( &conn, @@ -911,7 +908,6 @@ impl DataStore { .map(|(_sled_id, zone)| zone), ) .await - .internal_context("ensure_zone_external_networking_allocated") .map_err(|e| err.bail(e))?; // See the comment on this method; this lets us wait until our diff --git a/nexus/reconfigurator/execution/src/sagas.rs b/nexus/reconfigurator/execution/src/sagas.rs index 1bd7b2f1d0..58d9b95c0a 100644 --- a/nexus/reconfigurator/execution/src/sagas.rs +++ b/nexus/reconfigurator/execution/src/sagas.rs @@ -59,9 +59,3 @@ pub(crate) async fn reassign_sagas_from_expunged( } } } - -// We do not have tests at this layer since it's so thin. The datastore -// operation (which is the main thing above) is tested separately. There's an -// integration test in Nexus that tests not only that we re-assigned any -// in-progress sagas but that the recovery process picked them up and completed -// them. 
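For readers following the series, the datastore-level contract that patches 2-4 establish (and that the datastore test added in patch 4 exercises) can be condensed into a short sketch. This is illustrative only and not part of any patch: the free function `reassign_example` is invented for this sketch, while `SecId`, `Saga::new`, `saga_create`, and `sagas_reassign_sec` are the types and methods shown earlier in the series.

async fn reassign_example(
    datastore: &nexus_db_queries::db::DataStore,
    opctx: &nexus_db_queries::context::OpContext,
) -> Result<(), omicron_common::api::external::Error> {
    use nexus_db_model::SecId;
    use uuid::Uuid;

    // Stand-ins: `expunged` is the SEC id of a Nexus zone that a blueprint
    // has expunged; `survivor` is the SEC id of the Nexus running this code.
    let expunged = SecId(Uuid::new_v4());
    let survivor = SecId(Uuid::new_v4());

    // Create a saga record owned by the expunged SEC, with no log entries,
    // just as saga recovery would find it after that Nexus went away.
    let params = steno::SagaCreateParams {
        id: steno::SagaId(Uuid::new_v4()),
        name: steno::SagaName::new("example saga"),
        dag: serde_json::Value::Null,
        state: steno::SagaCachedState::Running,
    };
    let saga = nexus_db_model::Saga::new(expunged, params);
    datastore.saga_create(&saga).await?;

    // Re-assign every unfinished saga owned by `expunged` to `survivor`.
    // The returned count tells the caller whether the saga recovery
    // background task needs to be activated.
    let nreassigned =
        datastore.sagas_reassign_sec(opctx, &[expunged], survivor).await?;
    assert_eq!(nreassigned, 1);
    Ok(())
}
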
diff --git a/nexus/src/app/background/tasks/blueprint_execution.rs b/nexus/src/app/background/tasks/blueprint_execution.rs index 7e95b01e84..b430270ec9 100644 --- a/nexus/src/app/background/tasks/blueprint_execution.rs +++ b/nexus/src/app/background/tasks/blueprint_execution.rs @@ -107,9 +107,7 @@ impl BlueprintExecutor { // Return the result as a `serde_json::Value` match result { - Ok(_) => json!({ - "target_id": blueprint.id.to_string(), - }), + Ok(_) => json!({}), Err(errors) => { let errors: Vec<_> = errors.into_iter().map(|e| format!("{:#}", e)).collect(); @@ -135,53 +133,35 @@ impl BackgroundTask for BlueprintExecutor { mod test { use super::BlueprintExecutor; use crate::app::background::{Activator, BackgroundTask}; - use crate::app::saga::create_saga_dag; - use crate::app::sagas::demo; - use anyhow::Context; - use futures::TryStreamExt; use httptest::matchers::{all_of, request}; use httptest::responders::status_code; use httptest::Expectation; - use nexus_client::types::{BlueprintTargetSet, SagaState}; use nexus_db_model::{ ByteCount, SledBaseboard, SledSystemHardware, SledUpdate, Zpool, }; use nexus_db_queries::authn; use nexus_db_queries::context::OpContext; use nexus_db_queries::db::DataStore; - use nexus_reconfigurator_planning::blueprint_builder::BlueprintBuilder; use nexus_sled_agent_shared::inventory::OmicronZoneDataset; use nexus_test_utils_macros::nexus_test; + use nexus_types::deployment::BlueprintZoneFilter; use nexus_types::deployment::{ blueprint_zone_type, Blueprint, BlueprintPhysicalDisksConfig, BlueprintTarget, BlueprintZoneConfig, BlueprintZoneDisposition, BlueprintZoneType, BlueprintZonesConfig, CockroachDbPreserveDowngrade, - OmicronZoneExternalFloatingIp, SledDetails, SledDisk, SledResources, - }; - use nexus_types::deployment::{BlueprintZoneFilter, PlanningInputBuilder}; - use nexus_types::external_api::views::{ - PhysicalDiskPolicy, PhysicalDiskState, SledPolicy, SledProvisionPolicy, - SledState, }; - use omicron_common::address::{IpRange, Ipv4Range, Ipv6Subnet}; - use omicron_common::api::external::{Generation, MacAddr, Vni}; - use omicron_common::api::internal::shared::{ - NetworkInterface, NetworkInterfaceKind, - }; - use omicron_common::disk::DiskIdentity; + use nexus_types::external_api::views::SledState; + use omicron_common::api::external::Generation; use omicron_common::zpool_name::ZpoolName; - use omicron_test_utils::dev::poll::{wait_for_condition, CondCheckError}; + use omicron_uuid_kinds::GenericUuid; + use omicron_uuid_kinds::OmicronZoneUuid; + use omicron_uuid_kinds::SledUuid; use omicron_uuid_kinds::ZpoolUuid; - use omicron_uuid_kinds::{DemoSagaUuid, GenericUuid}; - use omicron_uuid_kinds::{ExternalIpUuid, SledUuid}; - use omicron_uuid_kinds::{OmicronZoneUuid, PhysicalDiskUuid}; - use oxnet::IpNet; use serde::Deserialize; use serde_json::json; - use std::collections::{BTreeMap, BTreeSet}; - use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV6}; + use std::collections::BTreeMap; + use std::net::SocketAddr; use std::sync::Arc; - use std::time::Duration; use tokio::sync::watch; use uuid::Uuid; @@ -194,7 +174,6 @@ mod test { blueprint_zones: BTreeMap, blueprint_disks: BTreeMap, dns_version: Generation, - enabled: bool, ) -> (BlueprintTarget, Blueprint) { let id = Uuid::new_v4(); // Assume all sleds are active. 
@@ -214,7 +193,7 @@ mod test {

         let target = BlueprintTarget {
             target_id: id,
-            enabled,
+            enabled: true,
             time_made_target: chrono::Utc::now(),
         };
         let blueprint = Blueprint {
@@ -320,15 +299,13 @@ mod test {
                 BTreeMap::new(),
                 BTreeMap::new(),
                 generation,
-                true,
             )
             .await,
         );
-        let blueprint_id = blueprint.1.id;
         blueprint_tx.send(Some(blueprint)).unwrap();
         let value = task.activate(&opctx).await;
         println!("activating with no zones: {:?}", value);
-        assert_eq!(value, json!({"target_id": blueprint_id}));
+        assert_eq!(value, json!({}));

         // Create a non-empty blueprint describing two servers and verify that
         // the task correctly winds up making requests to both of them and
@@ -375,10 +352,8 @@ mod test {
             ]),
             BTreeMap::new(),
             generation,
-            true,
         )
         .await;
-        let blueprint_id = blueprint.1.id;

         // Insert records for the zpools backing the datasets in these zones.
         for (sled_id, config) in
@@ -418,7 +393,7 @@ mod test {
         // Activate the task to trigger zone configuration on the sled-agents
         let value = task.activate(&opctx).await;
         println!("activating two sled agents: {:?}", value);
-        assert_eq!(value, json!({"target_id": blueprint_id}));
+        assert_eq!(value, json!({}));

         s1.verify_and_clear();
         s2.verify_and_clear();
@@ -474,284 +449,4 @@ mod test {
         s1.verify_and_clear();
         s2.verify_and_clear();
     }
-
-    // Tests that sagas assigned to expunged Nexus zones get re-assigned,
-    // recovered, and completed.
-    #[nexus_test(server = crate::Server)]
-    async fn test_saga_reassignment(cptestctx: &ControlPlaneTestContext) {
-        let nexus_client = cptestctx.nexus_internal_client().await;
-        let nexus = &cptestctx.server.server_context().nexus;
-        let datastore = nexus.datastore();
-        let logctx = &cptestctx.logctx;
-        let opctx = OpContext::for_tests(logctx.log.clone(), datastore.clone());
-
-        // First, create a blueprint whose execution will assign sagas to this
-        // Nexus.
-        let (new_nexus_zone_id, blueprint) = {
-            let sled_id = SledUuid::new_v4();
-            let rack_id = Uuid::new_v4();
-            let update = SledUpdate::new(
-                sled_id.into_untyped_uuid(),
-                SocketAddrV6::new(Ipv6Addr::LOCALHOST, 12345, 0, 0),
-                SledBaseboard {
-                    serial_number: "one".into(),
-                    part_number: "test".into(),
-                    revision: 1,
-                },
-                SledSystemHardware {
-                    is_scrimlet: false,
-                    usable_hardware_threads: 4,
-                    usable_physical_ram: ByteCount(1000.into()),
-                    reservoir_size: ByteCount(999.into()),
-                },
-                rack_id,
-                nexus_db_model::Generation::new(),
-            );
-            datastore
-                .sled_upsert(update)
-                .await
-                .expect("Failed to insert sled to db");
-
-            let new_nexus_zone_id = OmicronZoneUuid::new_v4();
-            let pool_id = ZpoolUuid::new_v4();
-            let zones = BTreeMap::from([(
-                sled_id,
-                BlueprintZonesConfig {
-                    generation: Generation::new(),
-                    zones: vec![BlueprintZoneConfig {
-                        disposition: BlueprintZoneDisposition::Expunged,
-                        id: new_nexus_zone_id,
-                        underlay_address: "::1".parse().unwrap(),
-                        filesystem_pool: Some(ZpoolName::new_external(pool_id)),
-                        zone_type: BlueprintZoneType::Nexus(
-                            blueprint_zone_type::Nexus {
-                                internal_address: "[::1]:0".parse().unwrap(),
-                                external_ip: OmicronZoneExternalFloatingIp {
-                                    id: ExternalIpUuid::new_v4(),
-                                    ip: Ipv4Addr::LOCALHOST.into(),
-                                },
-                                nic: NetworkInterface {
-                                    id: Uuid::new_v4(),
-                                    kind: NetworkInterfaceKind::Service {
-                                        id: new_nexus_zone_id
-                                            .into_untyped_uuid(),
-                                    },
-                                    name: "test-nic".parse().unwrap(),
-                                    ip: Ipv4Addr::LOCALHOST.into(),
-                                    mac: MacAddr::random_system(),
-                                    subnet: IpNet::new(
-                                        Ipv4Addr::LOCALHOST.into(),
-                                        8,
-                                    )
-                                    .unwrap(),
-                                    vni: Vni::SERVICES_VNI,
-                                    primary: true,
-                                    slot: 0,
-                                    transit_ips: Default::default(),
-                                },
-                                external_tls: false,
-                                external_dns_servers: Vec::new(),
-                            },
-                        ),
-                    }],
-                },
-            )]);
-            let (_, blueprint) = create_blueprint(
-                &datastore,
-                &opctx,
-                zones,
-                BTreeMap::new(),
-                Generation::new(),
-                false,
-            )
-            .await;
-            (new_nexus_zone_id, blueprint)
-        };
-
-        // Create an entry in the database for a new "demo" saga owned by that
-        // expunged Nexus. There will be no log entries, so it's as though that
-        // Nexus has just created the saga when it was expunged.
-        let new_saga_id = steno::SagaId(Uuid::new_v4());
-        println!("new saga id {}", new_saga_id);
-        let other_nexus_sec_id =
-            nexus_db_model::SecId(new_nexus_zone_id.into_untyped_uuid());
-        let demo_saga_id = DemoSagaUuid::from_untyped_uuid(Uuid::new_v4());
-        let saga_params = demo::Params { id: demo_saga_id };
-        let dag = serde_json::to_value(
-            create_saga_dag::(saga_params)
-                .expect("create demo saga DAG"),
-        )
-        .expect("serialize demo saga DAG");
-        let params = steno::SagaCreateParams {
-            id: new_saga_id,
-            name: steno::SagaName::new("test saga"),
-            dag,
-            state: steno::SagaCachedState::Running,
-        };
-        let new_saga = nexus_db_model::Saga::new(other_nexus_sec_id, params);
-        datastore.saga_create(&new_saga).await.expect("created saga");
-
-        // Set the blueprint that we created as the target and enable execution.
-        println!("setting blueprint target {}", blueprint.id);
-        nexus_client
-            .blueprint_target_set_enabled(&BlueprintTargetSet {
-                target_id: blueprint.id,
-                enabled: true,
-            })
-            .await
-            .expect("set blueprint target");
-
-        // Helper to take the response from bgtask_list() and pull out the
-        // status for a particular background task.
-        fn last_task_status_map<'a>(
-            tasks: &'a BTreeMap,
-            name: &'a str,
-        ) -> Result<
-            &'a serde_json::Map,
-            CondCheckError<()>,
-        > {
-            let task = tasks
-                .get(name)
-                .with_context(|| format!("missing task {:?}", name))
-                .unwrap();
-            let nexus_client::types::LastResult::Completed(r) = &task.last
-            else {
-                println!(
-                    "waiting for task {:?} to complete at least once",
-                    name
-                );
-                return Err(CondCheckError::NotYet);
-            };
-            let details = &r.details;
-            Ok(details
-                .as_object()
-                .with_context(|| {
-                    format!("task {}: last status was not a map", name)
-                })
-                .unwrap())
-        }
-
-        fn status_target_id(
-            status: &serde_json::Map,
-        ) -> Result> {
-            Ok(status
-                .get("target_id")
-                .ok_or_else(|| {
-                    println!("waiting for task to report a target_id");
-                    CondCheckError::NotYet
-                })?
-                .as_str()
-                .expect("target_id was not a string")
-                .parse()
-                .expect("target_id was not a uuid"))
-        }
-
-        // Wait a bit for:
-        //
-        // - the new target blueprint to be loaded
-        // - the new target blueprint to be executed
-        // - the saga to be transferred
-        //
-        // We could just wait for the end result but debugging will be easier if
-        // we can report where things got stuck.
-        wait_for_condition(
-            || async {
-                let task_status = nexus_client
-                    .bgtask_list()
-                    .await
-                    .expect("list background tasks");
-                let tasks = task_status
-                    .into_inner()
-                    .into_iter()
-                    .collect::>();
-
-                // Wait for the loader to report our new blueprint as the
-                // current target.
-                let loader_status =
-                    last_task_status_map(&tasks, "blueprint_loader")?;
-                println!("waiting: found loader status: {:?}", loader_status);
-                let target_id = status_target_id(&loader_status)?;
-                if target_id != blueprint.id {
-                    println!(
-                        "waiting: taking lap (loader target id is not ours)"
-                    );
-                    return Err(CondCheckError::NotYet);
-                }
-
-                // Wait for the execution task to report having tried to execute
-                // our blueprint. For our purposes, it's not critical that it
-                // succeeded. It might fail for reasons not having to do with
-                // what we're testing.
-                let execution_status =
-                    last_task_status_map(&tasks, "blueprint_executor")?;
-                println!(
-                    "waiting: found execution status: {:?}",
-                    execution_status
-                );
-                let target_id = status_target_id(&execution_status)?;
-                if target_id != blueprint.id {
-                    println!(
-                        "waiting: taking lap (execution target id is not ours)"
-                    );
-                    return Err(CondCheckError::NotYet);
-                }
-
-                // For debugging, print out the saga recovery task's status.
-                // It's not worth trying to parse it. And it might not yet have
-                // even carried out recovery.
-                println!(
-                    "waiting: found saga recovery status: {:?}",
-                    last_task_status_map(&tasks, "saga_recovery"),
-                );
-
-                // Wait too for our demo saga to show up in the saga list. That
-                // means recovery has completed.
-                let sagas = nexus_client
-                    .saga_list_stream(None, None)
-                    .try_collect::>()
-                    .await
-                    .expect("listed sagas");
-                if sagas.iter().any(|s| s.id == new_saga_id.0) {
-                    Ok(())
-                } else {
-                    println!("waiting: taking lap (saga not yet found)");
-                    Err(CondCheckError::<()>::NotYet)
-                }
-            },
-            &Duration::from_millis(50),
-            &Duration::from_secs(60),
-        )
-        .await
-        .expect("execution completion");
-
-        // Now complete the demo saga.
-        nexus_client
-            .saga_demo_complete(&demo_saga_id)
-            .await
-            .expect("demo saga complete");
-
-        // And wait for it to actually finish.
-        wait_for_condition(
-            || async {
-                let saga = nexus_client
-                    .saga_list_stream(None, None)
-                    .try_collect::>()
-                    .await
-                    .expect("listed sagas")
-                    .into_iter()
-                    .find(|s| s.id == new_saga_id.0)
-                    .expect("demo saga in list of sagas");
-                println!("checking saga status: {:?}", saga);
-                if saga.state == SagaState::Succeeded {
-                    Ok(())
-                } else {
-                    Err(CondCheckError::<()>::NotYet)
-                }
-            },
-            &Duration::from_millis(50),
-            &Duration::from_secs(10),
-        )
-        .await
-        .expect("saga completion");
-    }
 }
diff --git a/nexus/test-utils/Cargo.toml b/nexus/test-utils/Cargo.toml
index 250f15e3d7..a883bc83c5 100644
--- a/nexus/test-utils/Cargo.toml
+++ b/nexus/test-utils/Cargo.toml
@@ -25,7 +25,6 @@ http.workspace = true
 hyper.workspace = true
 illumos-utils.workspace = true
 internal-dns.workspace = true
-nexus-client.workspace = true
 nexus-config.workspace = true
 nexus-db-queries.workspace = true
 nexus-sled-agent-shared.workspace = true
diff --git a/nexus/test-utils/src/lib.rs b/nexus/test-utils/src/lib.rs
index 22f1fb27c2..7c190974a1 100644
--- a/nexus/test-utils/src/lib.rs
+++ b/nexus/test-utils/src/lib.rs
@@ -142,15 +142,6 @@ impl ControlPlaneTestContext {
         format!("*.sys.{}", self.external_dns_zone_name)
     }

-    pub async fn nexus_internal_client(&self) -> nexus_client::Client {
-        let log = &self.logctx.log;
-        let nexus_internal_url = format!(
-            "http://{}",
-            self.server.get_http_server_internal_address().await
-        );
-        nexus_client::Client::new(&nexus_internal_url, log.clone())
-    }
-
     pub async fn teardown(mut self) {
         self.server.close().await;
         self.database.cleanup().await.unwrap();
diff --git a/nexus/tests/integration_tests/demo_saga.rs b/nexus/tests/integration_tests/demo_saga.rs
index aa9b63159c..888fa35965 100644
--- a/nexus/tests/integration_tests/demo_saga.rs
+++ b/nexus/tests/integration_tests/demo_saga.rs
@@ -7,6 +7,7 @@
 use futures::TryStreamExt;
 use nexus_client::types::Saga;
 use nexus_client::types::SagaState;
+use nexus_test_interface::NexusServer;
 use nexus_test_utils_macros::nexus_test;
 use omicron_test_utils::dev::poll::wait_for_condition;
 use omicron_test_utils::dev::poll::CondCheckError;
@@ -19,7 +20,13 @@ type ControlPlaneTestContext =
 // saga's state matches what we expect along the way.
 #[nexus_test]
 async fn test_demo_saga(cptestctx: &ControlPlaneTestContext) {
-    let nexus_client = cptestctx.nexus_internal_client().await;
+    let log = &cptestctx.logctx.log;
+    let nexus_internal_url = format!(
+        "http://{}",
+        cptestctx.server.get_http_server_internal_address().await
+    );
+    let nexus_client =
+        nexus_client::Client::new(&nexus_internal_url, log.clone());
     let sagas_before = list_sagas(&nexus_client).await;
     eprintln!("found sagas (before): {:?}", sagas_before);

From c14ad8c9893e6335113f10dcf5331ca01d927343 Mon Sep 17 00:00:00 2001
From: David Pacheco
Date: Wed, 14 Aug 2024 16:55:13 -0700
Subject: [PATCH 7/9] add a note for our future selves

---
 nexus/reconfigurator/execution/src/sagas.rs | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/nexus/reconfigurator/execution/src/sagas.rs b/nexus/reconfigurator/execution/src/sagas.rs
index 58d9b95c0a..458328ef00 100644
--- a/nexus/reconfigurator/execution/src/sagas.rs
+++ b/nexus/reconfigurator/execution/src/sagas.rs
@@ -25,6 +25,16 @@ pub(crate) async fn reassign_sagas_from_expunged(

     // Identify any Nexus zones that have been expunged and need to have sagas
     // re-assigned.
+    //
+    // TODO: Currently, we take any expunged Nexus instances and attempt to
+    // assign all their sagas to ourselves. Per RFD 289, we can only re-assign
+    // sagas between two instances of Nexus that are at the same version. Right
+    // now this can't happen so there's nothing to do here to ensure that
+    // constraint. However, once we support allowing the control plane to be
+    // online _during_ an upgrade, there may be multiple different Nexus
+    // instances running at the same time. At that point, we will need to make
+    // sure that we only ever try to assign ourselves sagas from other Nexus
+    // instances that we know are running the same version as ourselves.
     let nexus_zone_ids: Vec<_> = blueprint
         .all_omicron_zones(BlueprintZoneFilter::Expunged)
         .filter_map(|(_, z)| {

From 87e57183e7c702a5dc3eea42b3ba4d24a029cf52 Mon Sep 17 00:00:00 2001
From: David Pacheco
Date: Fri, 16 Aug 2024 15:06:29 -0700
Subject: [PATCH 8/9] clean up Nexus/SEC distinction a bit

---
 nexus/db-queries/src/db/datastore/saga.rs | 32 +++++++++++------------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/nexus/db-queries/src/db/datastore/saga.rs b/nexus/db-queries/src/db/datastore/saga.rs
index 53b505c0e5..2413a3f6de 100644
--- a/nexus/db-queries/src/db/datastore/saga.rs
+++ b/nexus/db-queries/src/db/datastore/saga.rs
@@ -201,26 +201,26 @@ impl DataStore {
         Ok(events)
     }

-    /// Updates all sagas that are currently assigned to any of the Nexus
-    /// instances in `nexus_zone_ids`, assigning them to `new_sec_id` instead.
+    /// Updates all sagas that are currently assigned to any of the SEC ids in
+    /// `sec_ids`, assigning them to `new_sec_id` instead.
     ///
-    /// This change causes the Nexus instance `new_sec_id` to discover these
-    /// sagas and resume executing them the next time it performs saga recovery
-    /// (which is normally on startup and periodically). Generally,
-    /// `new_sec_id` is the _current_ Nexus instance and the caller should
-    /// activate the saga recovery background task after calling this function
-    /// to immediately resume the newly-assigned sagas.
+    /// Generally, an SEC id corresponds to a Nexus id. This change causes the
+    /// Nexus instance `new_sec_id` to discover these sagas and resume executing
+    /// them the next time it performs saga recovery (which is normally on
+    /// startup and periodically). Generally, `new_sec_id` is the _current_
+    /// Nexus instance and the caller should activate the saga recovery
+    /// background task after calling this function to immediately resume the
+    /// newly-assigned sagas.
     ///
-    /// **Warning:** This operation is only safe if the Nexus instances
-    /// `nexus_zone_ids` are not currently running. If those Nexus instances
-    /// are still running, then two (or more) Nexus instances may wind up
-    /// running the same saga concurrently. This would likely violate implicit
-    /// assumptions made by various saga actions, leading to hard-to-debug
-    /// errors and state corruption.
+    /// **Warning:** This operation is only safe if the other SECs `sec_ids` are
+    /// not currently running. If those SECs are still running, then two (or
+    /// more) SECs may wind up running the same saga concurrently. This would
+    /// likely violate implicit assumptions made by various saga actions,
+    /// leading to hard-to-debug errors and state corruption.
     pub async fn sagas_reassign_sec(
         &self,
         opctx: &OpContext,
-        nexus_zone_ids: &[db::saga_types::SecId],
+        sec_ids: &[db::saga_types::SecId],
         new_sec_id: db::saga_types::SecId,
     ) -> Result {
         opctx.authorize(authz::Action::Modify, &authz::FLEET).await?;
@@ -237,7 +237,7 @@ impl DataStore {
             dsl::saga
                 .filter(dsl::current_sec.is_not_null())
                 .filter(dsl::current_sec.eq_any(
-                    nexus_zone_ids.into_iter().cloned().collect::>(),
+                    sec_ids.into_iter().cloned().collect::>(),
                 ))
                 .filter(dsl::saga_state.ne(db::saga_types::SagaCachedState(
                     steno::SagaCachedState::Done,

From a5210caf773da375e2261d89623b2de5f181efa3 Mon Sep 17 00:00:00 2001
From: David Pacheco
Date: Fri, 16 Aug 2024 15:10:52 -0700
Subject: [PATCH 9/9] rustfmt

---
 nexus/db-queries/src/db/datastore/saga.rs | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/nexus/db-queries/src/db/datastore/saga.rs b/nexus/db-queries/src/db/datastore/saga.rs
index 2413a3f6de..0b626804e1 100644
--- a/nexus/db-queries/src/db/datastore/saga.rs
+++ b/nexus/db-queries/src/db/datastore/saga.rs
@@ -236,9 +236,11 @@ impl DataStore {
         diesel::update(
             dsl::saga
                 .filter(dsl::current_sec.is_not_null())
-                .filter(dsl::current_sec.eq_any(
-                    sec_ids.into_iter().cloned().collect::>(),
-                ))
+                .filter(
+                    dsl::current_sec.eq_any(
+                        sec_ids.into_iter().cloned().collect::>(),
+                    ),
+                )
                 .filter(dsl::saga_state.ne(db::saga_types::SagaCachedState(
                     steno::SagaCachedState::Done,
                 )))
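For readers less familiar with Diesel, the update being re-wrapped above corresponds roughly to the SQL sketched below. This is a hand-written approximation, not generated query text: the SET clause is inferred from the doc comment in PATCH 8 (only the filters appear in these hunks), and the literal 'done' encoding of `SagaCachedState::Done` is an assumption.

    // Approximate SQL for the Diesel update above (illustrative only; bind
    // parameters and the exact saga_state representation may differ).
    const APPROX_REASSIGN_SQL: &str = r#"
        UPDATE saga
        SET current_sec = $1            -- new_sec_id
        WHERE current_sec IS NOT NULL
          AND current_sec IN (/* sec_ids */)
          AND saga_state != 'done'
    "#;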