From b30980907c8e3e4b275e28f2a5bddde3510f7f0e Mon Sep 17 00:00:00 2001 From: David Pacheco Date: Fri, 2 Aug 2024 11:58:52 -0700 Subject: [PATCH] abandoned attempt to do this with limits --- nexus/db-queries/src/db/datastore/saga.rs | 117 ++++++++++++++++++ nexus/reconfigurator/execution/src/lib.rs | 38 +++++- nexus/reconfigurator/execution/src/sagas.rs | 59 +++++++++ .../background/tasks/blueprint_execution.rs | 2 + 4 files changed, 212 insertions(+), 4 deletions(-) create mode 100644 nexus/reconfigurator/execution/src/sagas.rs diff --git a/nexus/db-queries/src/db/datastore/saga.rs b/nexus/db-queries/src/db/datastore/saga.rs index 939929e665..bf274f2456 100644 --- a/nexus/db-queries/src/db/datastore/saga.rs +++ b/nexus/db-queries/src/db/datastore/saga.rs @@ -13,14 +13,34 @@ use crate::db::model::Generation; use crate::db::pagination::paginated; use crate::db::pagination::paginated_multicolumn; use crate::db::pagination::Paginator; +use crate::db::pool::DbConnection; use crate::db::update_and_check::UpdateAndCheck; use crate::db::update_and_check::UpdateStatus; use async_bb8_diesel::AsyncRunQueryDsl; use diesel::prelude::*; +use nexus_auth::authz; use nexus_auth::context::OpContext; use omicron_common::api::external::Error; use omicron_common::api::external::LookupType; use omicron_common::api::external::ResourceType; +use std::ops::Add; + +/// Reports the result of `sagas_reassign_sec_batched()`. +/// +/// Callers need to know two things: +/// +/// 1. Whether any sagas may have been re-assigned +/// (because they'll want to kick off saga recovery for them) +/// 2. Whether any errors were encountered +#[derive(Debug)] +pub(crate) enum Reassigned { + /// We successfully re-assigned all sagas that needed it + All { count: u32 }, + /// We encountered an error and cannot tell how many sagas were re-assigned. + /// It was at least this many. (The count can be zero, but it's still + /// possible that some were re-assigned.) + AtLeast { count: u32, error: Error }, +} impl DataStore { pub async fn saga_create( @@ -207,6 +227,93 @@ impl DataStore { Ok(events) } + + pub async fn sagas_reassign_sec_all_batched( + &self, + opctx: &OpContext, + nexus_zone_ids: &[db::saga_types::SecId], + new_sec_id: db::saga_types::SecId, + ) -> Reassigned { + let now = chrono::Utc::now(); + let conn = match self.pool_connection_authorized(opctx).await { + Ok(c) => c, + Err(error) => return Reassigned::AtLeast { count: 0, error }, + }; + + let mut count = 0; + let limit: u32 = SQL_BATCH_SIZE.get(); + loop { + debug!(&opctx.log, "sagas_reassign_sec_batched"; + "count_so_far" => count); + match self + .sagas_reassign_sec( + &conn, + opctx, + nexus_zone_ids, + new_sec_id, + limit, + now, + ) + .await + { + Ok(c) => { + count += c; + if c < limit { + break; + } + } + Err(error) => { + return Reassigned::AtLeast { count, error }; + } + } + } + + Reassigned::All { count } + } + + // XXX-dap TODO-doc + async fn sagas_reassign_sec( + &self, + conn: &async_bb8_diesel::Connection, + opctx: &OpContext, + nexus_zone_ids: &[db::saga_types::SecId], + new_sec_id: db::saga_types::SecId, + limit: u32, + time: chrono::DateTime, + ) -> Result { + use db::schema::saga::dsl; + + opctx.authorize(authz::Action::Modify, &authz::FLEET).await?; + + diesel::update( + dsl::saga + .filter(dsl::current_sec.is_not_null()) + .filter(dsl::current_sec.eq_any( + nexus_zone_ids.into_iter().cloned().collect::>(), + )) + .filter(dsl::saga_state.ne(db::saga_types::SagaCachedState( + steno::SagaCachedState::Done, + ))) + .limit(i64::from(limit)), + ) + .set(( + dsl::current_sec.eq(Some(new_sec_id)), + dsl::adopt_generation.eq(dsl::adopt_generation.add(1)), + dsl::adopt_time.eq(time), + )) + .execute_async(conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + .and_then(|c_usize| { + u32::try_from(c_usize).map_err(|_| { + Error::internal_error(&format!( + "database reported unexpected count of \ + records updated (did not fit into u32): {}", + c_usize, + )) + }) + }) + } } #[cfg(test)] @@ -510,3 +617,13 @@ mod test { } } } + +// XXX-dap TODO-coverage want test that inserts: +// - some sagas assigned to this SEC that are done +// - more than BATCH_SIZE sagas assigned to this SEC that are unwinding +// - more than BATCH_SIZE sagas assigned to this SEC that are running +// - some sagas assigned to another SEC that are running +// +// then two tests: +// 1. run the one-batch thing and make sure we only got at most the batch size +// 2. run the whole thing and make sure that we got exactly the right ones diff --git a/nexus/reconfigurator/execution/src/lib.rs b/nexus/reconfigurator/execution/src/lib.rs index e3d2019230..d41e35f0a4 100644 --- a/nexus/reconfigurator/execution/src/lib.rs +++ b/nexus/reconfigurator/execution/src/lib.rs @@ -24,6 +24,7 @@ use slog::info; use slog_error_chain::InlineErrorChain; use std::collections::BTreeMap; use std::net::SocketAddrV6; +use uuid::Uuid; mod cockroachdb; mod datasets; @@ -32,6 +33,7 @@ mod external_networking; mod omicron_physical_disks; mod omicron_zones; mod overridables; +mod sagas; mod sled_state; pub use dns::blueprint_external_dns_config; @@ -80,6 +82,7 @@ pub async fn realize_blueprint( resolver: &Resolver, blueprint: &Blueprint, nexus_label: S, + nexus_id: Uuid, // XXX-dap type, and replace nexus_label too? ) -> Result<(), Vec> where String: From, @@ -90,6 +93,7 @@ where resolver, blueprint, nexus_label, + nexus_id, &Default::default(), ) .await @@ -101,6 +105,7 @@ pub async fn realize_blueprint_with_overrides( resolver: &Resolver, blueprint: &Blueprint, nexus_label: S, + nexus_id: Uuid, // XXX-dap type, and replace nexus_label too? overrides: &Overridables, ) -> Result<(), Vec> where @@ -233,14 +238,39 @@ where omicron_physical_disks::decommission_expunged_disks(&opctx, datastore) .await?; + // From this point on, we'll assume that any errors that we encounter do + // *not* require stopping execution. We'll just accumulate them and return + // them all at the end. + // + // TODO We should probably do this with more of the errors above, too. + let mut errors = Vec::new(); + + // For any expunged Nexus zones, re-assign in-progress sagas to some other + // Nexus. If this fails for some reason, it doesn't affect anything else. + let sec_id = nexus_db_model::SecId(nexus_id); + if let Err(error) = sagas::reassign_sagas_from_expunged( + &opctx, datastore, blueprint, sec_id, + ) + .await + .context("failed to re-assign sagas") + { + errors.push(error); + } + // This is likely to error if any cluster upgrades are in progress (which // can take some time), so it should remain at the end so that other parts // of the blueprint can progress normally. - cockroachdb::ensure_settings(&opctx, datastore, blueprint) - .await - .map_err(|err| vec![err])?; + if let Err(error) = + cockroachdb::ensure_settings(&opctx, datastore, blueprint).await + { + errors.push(error); + } - Ok(()) + if errors.is_empty() { + Ok(()) + } else { + Err(errors) + } } #[cfg(test)] diff --git a/nexus/reconfigurator/execution/src/sagas.rs b/nexus/reconfigurator/execution/src/sagas.rs new file mode 100644 index 0000000000..1c15413d40 --- /dev/null +++ b/nexus/reconfigurator/execution/src/sagas.rs @@ -0,0 +1,59 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Re-assign sagas from expunged Nexus zones + +use nexus_db_model::SecId; +use nexus_db_queries::context::OpContext; +use nexus_db_queries::db::DataStore; +use nexus_types::deployment::Blueprint; +use nexus_types::deployment::BlueprintZoneFilter; +use omicron_common::api::external::Error; +use slog::info; +use std::num::NonZeroU32; + +/// Reports what happened when we tried to re-assign sagas +/// +/// Callers need to know two things: +/// +/// 1. Whether any sagas may have been re-assigned +/// (because they'll want to kick off saga recovery for them) +/// 2. Whether any errors were encountered +#[derive(Debug)] +pub(crate) enum Reassigned { + /// We successfully re-assigned all sagas that needed it + All { count: u32 }, + /// We encountered an error and cannot tell how many sagas were re-assigned. + /// It was at least this many. (The count can be zero, but it's still + /// possible that some were re-assigned.) + AtLeast { count: u32, error: Error }, +} + +/// For each expunged Nexus zone, re-assign sagas owned by that Nexus to the +/// specified nexus (`nexus_id`). +// TODO-dap activate recovery background task +pub(crate) async fn reassign_sagas_from_expunged( + opctx: &OpContext, + datastore: &DataStore, + blueprint: &Blueprint, + nexus_id: SecId, +) -> Reassigned { + let log = &opctx.log; + + // Identify any Nexus zones that have been expunged and need to have sagas + // re-assigned. + let nexus_zones_ids: Vec<_> = blueprint + .all_omicron_zones(BlueprintZoneFilter::Expunged) + .filter_map(|(_, z)| z.zone_type.is_nexus().then(|| z.id)) + .collect(); + + debug!(log, "re-assign sagas: found Nexus instances"; + "nexus_zones_ids" => ?nexus_zones_ids); + + + let result = datastore.sagas_reassign_sec_batched(opctx, &nexus_zone_ids, nexus_id); + info!(log, "re-assign sagas"; + "nexus_zones_ids" => ?nexus_zones_ids); + +} diff --git a/nexus/src/app/background/tasks/blueprint_execution.rs b/nexus/src/app/background/tasks/blueprint_execution.rs index 460d74360d..fc8673839f 100644 --- a/nexus/src/app/background/tasks/blueprint_execution.rs +++ b/nexus/src/app/background/tasks/blueprint_execution.rs @@ -76,12 +76,14 @@ impl BlueprintExecutor { }); } + let nexus_id = todo!(); // XXX-dap let result = nexus_reconfigurator_execution::realize_blueprint( opctx, &self.datastore, &self.resolver, blueprint, &self.nexus_label, + nexus_id, ) .await;