diff --git a/nexus/db-queries/src/db/datastore/saga.rs b/nexus/db-queries/src/db/datastore/saga.rs
index 939929e665..0b626804e1 100644
--- a/nexus/db-queries/src/db/datastore/saga.rs
+++ b/nexus/db-queries/src/db/datastore/saga.rs
@@ -9,7 +9,6 @@ use super::SQL_BATCH_SIZE;
 use crate::db;
 use crate::db::error::public_error_from_diesel;
 use crate::db::error::ErrorHandler;
-use crate::db::model::Generation;
 use crate::db::pagination::paginated;
 use crate::db::pagination::paginated_multicolumn;
 use crate::db::pagination::Paginator;
@@ -17,10 +16,12 @@ use crate::db::update_and_check::UpdateAndCheck;
 use crate::db::update_and_check::UpdateStatus;
 use async_bb8_diesel::AsyncRunQueryDsl;
 use diesel::prelude::*;
+use nexus_auth::authz;
 use nexus_auth::context::OpContext;
 use omicron_common::api::external::Error;
 use omicron_common::api::external::LookupType;
 use omicron_common::api::external::ResourceType;
+use std::ops::Add;
 
 impl DataStore {
     pub async fn saga_create(
@@ -80,21 +81,15 @@ impl DataStore {
     /// now, we're implementing saga adoption only in cases where the original
     /// SEC/Nexus has been expunged.)
     ///
-    /// However, in the future, it may be possible for multiple SECs to try and
-    /// update the same saga, and overwrite each other's state. For example,
-    /// one SEC might try and update the state to Running while the other one
-    /// updates it to Done. That case would have to be carefully considered and
-    /// tested here, probably using the (currently unused)
-    /// `current_adopt_generation` field to enable optimistic concurrency.
-    ///
-    /// To reiterate, we are *not* considering the case where several SECs try
-    /// to update the same saga. That will be a future enhancement.
+    /// It's conceivable that multiple SECs do try to update the same saga
+    /// concurrently. That would be a bug. This is noticed and prevented by
+    /// making this query conditional on current_sec and failing with a conflict
+    /// if the current SEC has changed.
     pub async fn saga_update_state(
         &self,
         saga_id: steno::SagaId,
         new_state: steno::SagaCachedState,
         current_sec: db::saga_types::SecId,
-        current_adopt_generation: Generation,
     ) -> Result<(), Error> {
         use db::schema::saga::dsl;
 
@@ -102,7 +97,6 @@
         let result = diesel::update(dsl::saga)
             .filter(dsl::id.eq(saga_id))
             .filter(dsl::current_sec.eq(current_sec))
-            .filter(dsl::adopt_generation.eq(current_adopt_generation))
             .set(dsl::saga_state.eq(db::saga_types::SagaCachedState(new_state)))
             .check_if_exists::<db::saga_types::Saga>(saga_id)
             .execute_and_check(&*self.pool_connection_unauthorized().await?)
             .await
@@ -119,20 +113,19 @@
         match result {
             UpdateStatus::Updated => Ok(()),
-            UpdateStatus::NotUpdatedButExists => Err(Error::invalid_request(
-                format!(
-                    "failed to update saga {:?} with state {:?}: preconditions not met: \
-                    expected current_sec = {:?}, adopt_generation = {:?}, \
-                    but found current_sec = {:?}, adopt_generation = {:?}, state = {:?}",
+            UpdateStatus::NotUpdatedButExists => {
+                Err(Error::invalid_request(format!(
+                    "failed to update saga {:?} with state {:?}: \
+                    preconditions not met: \
+                    expected current_sec = {:?}, \
+                    but found current_sec = {:?}, state = {:?}",
                     saga_id,
                     new_state,
                     current_sec,
-                    current_adopt_generation,
                     result.found.current_sec,
-                    result.found.adopt_generation,
                     result.found.saga_state,
-                )
-            )),
+                )))
+            }
         }
     }
 
@@ -207,16 +200,75 @@
         Ok(events)
     }
+
+    /// Updates all sagas that are currently assigned to any of the SEC ids in
+    /// `sec_ids`, assigning them to `new_sec_id` instead.
+    ///
+    /// Generally, an SEC id corresponds to a Nexus id. This change causes the
+    /// Nexus instance `new_sec_id` to discover these sagas and resume executing
+    /// them the next time it performs saga recovery (which is normally on
+    /// startup and periodically). Generally, `new_sec_id` is the _current_
+    /// Nexus instance and the caller should activate the saga recovery
+    /// background task after calling this function to immediately resume the
+    /// newly-assigned sagas.
+    ///
+    /// **Warning:** This operation is only safe if the other SECs (`sec_ids`)
+    /// are not currently running. If those SECs are still running, then two (or
+    /// more) SECs may wind up running the same saga concurrently. This would
+    /// likely violate implicit assumptions made by various saga actions,
+    /// leading to hard-to-debug errors and state corruption.
+    pub async fn sagas_reassign_sec(
+        &self,
+        opctx: &OpContext,
+        sec_ids: &[db::saga_types::SecId],
+        new_sec_id: db::saga_types::SecId,
+    ) -> Result<usize, Error> {
+        opctx.authorize(authz::Action::Modify, &authz::FLEET).await?;
+
+        let now = chrono::Utc::now();
+        let conn = self.pool_connection_authorized(opctx).await?;
+
+        // It would be more robust to do this in batches. However, Diesel does
+        // not appear to support the UPDATE ... LIMIT syntax using the normal
+        // builder. In practice, it's extremely unlikely we'd have so many
+        // in-progress sagas that this would be a problem.
+        use db::schema::saga::dsl;
+        diesel::update(
+            dsl::saga
+                .filter(dsl::current_sec.is_not_null())
+                .filter(
+                    dsl::current_sec.eq_any(
+                        sec_ids.into_iter().cloned().collect::<Vec<_>>(),
+                    ),
+                )
+                .filter(dsl::saga_state.ne(db::saga_types::SagaCachedState(
+                    steno::SagaCachedState::Done,
+                ))),
+        )
+        .set((
+            dsl::current_sec.eq(Some(new_sec_id)),
+            dsl::adopt_generation.eq(dsl::adopt_generation.add(1)),
+            dsl::adopt_time.eq(now),
+        ))
+        .execute_async(&*conn)
+        .await
+        .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
+    }
 }
 
 #[cfg(test)]
 mod test {
     use super::*;
     use crate::db::datastore::test_utils::datastore_test;
+    use async_bb8_diesel::AsyncConnection;
+    use async_bb8_diesel::AsyncSimpleConnection;
+    use db::queries::ALLOW_FULL_TABLE_SCAN_SQL;
     use nexus_db_model::{SagaNodeEvent, SecId};
     use nexus_test_utils::db::test_setup_database;
+    use omicron_common::api::external::Generation;
     use omicron_test_utils::dev;
     use rand::seq::SliceRandom;
+    use std::collections::BTreeSet;
     use uuid::Uuid;
 
     // Tests pagination in listing sagas that are candidates for recovery
@@ -440,7 +492,6 @@ mod test {
                 node_cx.saga_id,
                 steno::SagaCachedState::Running,
                 node_cx.sec_id,
-                db::model::Generation::new(),
             )
             .await
             .expect("updating state to Running again");
@@ -451,7 +502,6 @@ mod test {
                 node_cx.saga_id,
                 steno::SagaCachedState::Done,
                 node_cx.sec_id,
-                db::model::Generation::new(),
             )
             .await
             .expect("updating state to Done");
@@ -463,7 +513,6 @@ mod test {
                 node_cx.saga_id,
                 steno::SagaCachedState::Done,
                 node_cx.sec_id,
-                db::model::Generation::new(),
             )
             .await
             .expect("updating state to Done again");
@@ -509,4 +558,156 @@ mod test {
             SagaNodeEvent::new(event, self.sec_id)
         }
     }
+
+    #[tokio::test]
+    async fn test_saga_reassignment() {
+        // Test setup
+        let logctx = dev::test_setup_log("test_saga_reassignment");
+        let mut db = test_setup_database(&logctx.log).await;
+        let (_, datastore) = datastore_test(&logctx, &db).await;
+        let opctx = OpContext::for_tests(logctx.log.clone(), datastore.clone());
+
+        // Populate the database with a few different sagas:
+        //
+        // - assigned to SEC A: done, running, and unwinding
+        // - assigned to SEC B: done, running, and unwinding
+        // - assigned to SEC C: done, running, and unwinding
+        // - assigned to SEC D: done, running, and unwinding
+        //
+        // Then we'll reassign SECs B's and C's sagas to SEC A and check exactly
+        // which sagas were changed by this. This exercises:
+        // - that we don't touch A's sagas (the one we're assigning *to*)
+        // - that we do touch both B's and C's sagas (the ones we're assigning
+        //   *from*)
+        // - that we don't touch D's sagas (some other SEC)
+        // - that we don't touch any "done" sagas
+        // - that we do touch both running and unwinding sagas
+        let mut sagas_to_insert = Vec::new();
+        let sec_a = SecId(Uuid::new_v4());
+        let sec_b = SecId(Uuid::new_v4());
+        let sec_c = SecId(Uuid::new_v4());
+        let sec_d = SecId(Uuid::new_v4());
+
+        for sec_id in [sec_a, sec_b, sec_c, sec_d] {
+            for state in [
+                steno::SagaCachedState::Running,
+                steno::SagaCachedState::Unwinding,
+                steno::SagaCachedState::Done,
+            ] {
+                let params = steno::SagaCreateParams {
+                    id: steno::SagaId(Uuid::new_v4()),
+                    name: steno::SagaName::new("test saga"),
+                    dag: serde_json::value::Value::Null,
+                    state,
+                };
+
+                sagas_to_insert
+                    .push(db::model::saga_types::Saga::new(sec_id, params));
+            }
+        }
+        println!("sagas to insert: {:?}", sagas_to_insert);
+
+        // These two sets are complements, but we write out the conditions to
+        // double-check that we've got it right.
+        let sagas_affected: BTreeSet<_> = sagas_to_insert
+            .iter()
+            .filter_map(|saga| {
+                ((saga.creator == sec_b || saga.creator == sec_c)
+                    && (saga.saga_state.0 == steno::SagaCachedState::Running
+                        || saga.saga_state.0
+                            == steno::SagaCachedState::Unwinding))
+                    .then(|| saga.id)
+            })
+            .collect();
+        let sagas_unaffected: BTreeSet<_> = sagas_to_insert
+            .iter()
+            .filter_map(|saga| {
+                (saga.creator == sec_a
+                    || saga.creator == sec_d
+                    || saga.saga_state.0 == steno::SagaCachedState::Done)
+                    .then(|| saga.id)
+            })
+            .collect();
+        println!("sagas affected: {:?}", sagas_affected);
+        println!("sagas UNaffected: {:?}", sagas_unaffected);
+        assert_eq!(sagas_affected.intersection(&sagas_unaffected).count(), 0);
+        assert_eq!(
+            sagas_affected.len() + sagas_unaffected.len(),
+            sagas_to_insert.len()
+        );
+
+        // Insert the sagas.
+        let count = {
+            use db::schema::saga::dsl;
+            let conn = datastore.pool_connection_for_tests().await.unwrap();
+            diesel::insert_into(dsl::saga)
+                .values(sagas_to_insert)
+                .execute_async(&*conn)
+                .await
+                .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
+                .expect("successful insertion")
+        };
+        assert_eq!(count, sagas_affected.len() + sagas_unaffected.len());
+
+        // Reassign uncompleted sagas from SECs B and C to SEC A.
+        let nreassigned = datastore
+            .sagas_reassign_sec(&opctx, &[sec_b, sec_c], sec_a)
+            .await
+            .expect("failed to re-assign sagas");
+
+        // Fetch all the sagas and check their states.
+        let all_sagas: Vec<_> = datastore
+            .pool_connection_for_tests()
+            .await
+            .unwrap()
+            .transaction_async(|conn| async move {
+                use db::schema::saga::dsl;
+                conn.batch_execute_async(ALLOW_FULL_TABLE_SCAN_SQL).await?;
+                dsl::saga
+                    .select(nexus_db_model::Saga::as_select())
+                    .load_async(&conn)
+                    .await
+            })
+            .await
+            .unwrap();
+
+        for saga in all_sagas {
+            println!("checking saga: {:?}", saga);
+            let current_sec = saga.current_sec.unwrap();
+            if sagas_affected.contains(&saga.id) {
+                assert!(saga.creator == sec_b || saga.creator == sec_c);
+                assert_eq!(current_sec, sec_a);
+                assert_eq!(*saga.adopt_generation, Generation::from(2));
+                assert!(
+                    saga.saga_state.0 == steno::SagaCachedState::Running
+                        || saga.saga_state.0
+                            == steno::SagaCachedState::Unwinding
+                );
+            } else if sagas_unaffected.contains(&saga.id) {
+                assert_eq!(current_sec, saga.creator);
+                assert_eq!(*saga.adopt_generation, Generation::from(1));
+                // Its SEC and state could be anything since we've deliberately
+                // included sagas with various states and SECs that should not
+                // be affected by the reassignment.
+            } else {
+                println!(
+                    "ignoring saga that was not created by this test: {:?}",
+                    saga
+                );
+            }
+        }
+
+        assert_eq!(nreassigned, sagas_affected.len());
+
+        // If we do it again, we should make no changes.
+        let nreassigned = datastore
+            .sagas_reassign_sec(&opctx, &[sec_b, sec_c], sec_a)
+            .await
+            .expect("failed to re-assign sagas");
+        assert_eq!(nreassigned, 0);
+
+        // Test cleanup
+        db.cleanup().await.unwrap();
+        logctx.cleanup_successful();
+    }
 }
diff --git a/nexus/db-queries/src/db/sec_store.rs b/nexus/db-queries/src/db/sec_store.rs
index 0dcc3aa717..920ff3aee1 100644
--- a/nexus/db-queries/src/db/sec_store.rs
+++ b/nexus/db-queries/src/db/sec_store.rs
@@ -4,7 +4,7 @@
 
 //! Implementation of [`steno::SecStore`] backed by Omicron's database
 
-use crate::db::{self, model::Generation};
+use crate::db;
 use anyhow::Context;
 use async_trait::async_trait;
 use dropshot::HttpError;
@@ -102,12 +102,7 @@ impl steno::SecStore for CockroachDbSecStore {
             &log,
             || {
                 self.datastore
-                    .saga_update_state(
-                        id,
-                        update,
-                        self.sec_id,
-                        Generation::new(),
-                    )
+                    .saga_update_state(id, update, self.sec_id)
                     .map_err(backoff::BackoffError::transient)
             },
             "updating saga state",
diff --git a/nexus/reconfigurator/execution/src/cockroachdb.rs b/nexus/reconfigurator/execution/src/cockroachdb.rs
index 498944598d..277f5f91c4 100644
--- a/nexus/reconfigurator/execution/src/cockroachdb.rs
+++ b/nexus/reconfigurator/execution/src/cockroachdb.rs
@@ -39,6 +39,7 @@ mod test {
     use nexus_test_utils_macros::nexus_test;
     use nexus_types::deployment::CockroachDbClusterVersion;
     use std::sync::Arc;
+    use uuid::Uuid;
 
     type ControlPlaneTestContext =
         nexus_test_utils::ControlPlaneTestContext<omicron_nexus::Server>;
@@ -101,7 +102,7 @@ mod test {
             datastore,
             resolver,
             &blueprint,
-            "test-suite",
+            Uuid::new_v4(),
             &overrides,
         )
         .await
diff --git a/nexus/reconfigurator/execution/src/dns.rs b/nexus/reconfigurator/execution/src/dns.rs
index 846d19ead3..4395944b25 100644
--- a/nexus/reconfigurator/execution/src/dns.rs
+++ b/nexus/reconfigurator/execution/src/dns.rs
@@ -1250,7 +1250,7 @@ mod test {
             datastore,
             resolver,
             &blueprint,
-            "test-suite",
+            Uuid::new_v4(),
             &overrides,
         )
         .await
@@ -1390,7 +1390,7 @@ mod test {
             datastore,
             resolver,
             &blueprint2,
-            "test-suite",
+            Uuid::new_v4(),
             &overrides,
         )
         .await
@@ -1464,7 +1464,7 @@ mod test {
             datastore,
             resolver,
             &blueprint2,
-            "test-suite",
+            Uuid::new_v4(),
             &overrides,
         )
         .await
@@ -1500,7 +1500,7 @@ mod test {
             datastore,
             resolver,
             &blueprint2,
-            "test-suite",
+            Uuid::new_v4(),
             &overrides,
         )
         .await
@@ -1594,7 +1594,7 @@ mod test {
             datastore,
             resolver,
             &blueprint,
-            "test-suite",
+            Uuid::new_v4(),
             &overrides,
         )
         .await
diff --git a/nexus/reconfigurator/execution/src/lib.rs b/nexus/reconfigurator/execution/src/lib.rs
index bb525b1b8b..8606187762 100644
--- a/nexus/reconfigurator/execution/src/lib.rs
+++ b/nexus/reconfigurator/execution/src/lib.rs
@@ -24,6 +24,7 @@ use slog::info;
 use slog_error_chain::InlineErrorChain;
 use std::collections::BTreeMap;
 use std::net::SocketAddrV6;
+use uuid::Uuid;
 
 mod cockroachdb;
 mod datasets;
@@ -31,6 +32,7 @@ mod dns;
 mod omicron_physical_disks;
 mod omicron_zones;
 mod overridables;
+mod sagas;
 mod sled_state;
 
 pub use dns::blueprint_external_dns_config;
@@ -73,38 +75,32 @@ impl From<nexus_db_model::Sled> for Sled {
 ///
 /// The assumption is that callers are running this periodically or in a loop to
 /// deal with transient errors or changes in the underlying system state.
-pub async fn realize_blueprint<S>(
+pub async fn realize_blueprint(
     opctx: &OpContext,
     datastore: &DataStore,
     resolver: &Resolver,
     blueprint: &Blueprint,
-    nexus_label: S,
-) -> Result<(), Vec<anyhow::Error>>
-where
-    String: From<S>,
-{
+    nexus_id: Uuid,
+) -> Result<bool, Vec<anyhow::Error>> {
     realize_blueprint_with_overrides(
         opctx,
         datastore,
         resolver,
         blueprint,
-        nexus_label,
+        nexus_id,
        &Default::default(),
     )
     .await
 }
 
-pub async fn realize_blueprint_with_overrides<S>(
+pub async fn realize_blueprint_with_overrides(
     opctx: &OpContext,
     datastore: &DataStore,
     resolver: &Resolver,
     blueprint: &Blueprint,
-    nexus_label: S,
+    nexus_id: Uuid,
     overrides: &Overridables,
-) -> Result<(), Vec<anyhow::Error>>
-where
-    String: From<S>,
-{
+) -> Result<bool, Vec<anyhow::Error>> {
     let opctx = opctx.child(BTreeMap::from([(
         "comment".to_string(),
         blueprint.comment.clone(),
@@ -182,7 +178,7 @@ where
     dns::deploy_dns(
         &opctx,
         datastore,
-        String::from(nexus_label),
+        nexus_id.to_string(),
         blueprint,
         &sleds_by_id,
         overrides,
@@ -215,14 +211,43 @@ where
     omicron_physical_disks::decommission_expunged_disks(&opctx, datastore)
         .await?;
 
+    // From this point on, we'll assume that any errors that we encounter do
+    // *not* require stopping execution. We'll just accumulate them and return
+    // them all at the end.
+    //
+    // TODO We should probably do this with more of the errors above, too.
+    let mut errors = Vec::new();
+
+    // For any expunged Nexus zones, re-assign in-progress sagas to some other
+    // Nexus. If this fails for some reason, it doesn't affect anything else.
+    let sec_id = nexus_db_model::SecId(nexus_id);
+    let reassigned = sagas::reassign_sagas_from_expunged(
+        &opctx, datastore, blueprint, sec_id,
+    )
+    .await
+    .context("failed to re-assign sagas");
+    let needs_saga_recovery = match reassigned {
+        Ok(needs_recovery) => needs_recovery,
+        Err(error) => {
+            errors.push(error);
+            false
+        }
+    };
+
     // This is likely to error if any cluster upgrades are in progress (which
     // can take some time), so it should remain at the end so that other parts
     // of the blueprint can progress normally.
-    cockroachdb::ensure_settings(&opctx, datastore, blueprint)
-        .await
-        .map_err(|err| vec![err])?;
+    if let Err(error) =
+        cockroachdb::ensure_settings(&opctx, datastore, blueprint).await
+    {
+        errors.push(error);
+    }
 
-    Ok(())
+    if errors.is_empty() {
+        Ok(needs_saga_recovery)
+    } else {
+        Err(errors)
+    }
 }
 
 #[cfg(test)]
diff --git a/nexus/reconfigurator/execution/src/sagas.rs b/nexus/reconfigurator/execution/src/sagas.rs
new file mode 100644
index 0000000000..458328ef00
--- /dev/null
+++ b/nexus/reconfigurator/execution/src/sagas.rs
@@ -0,0 +1,71 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! Re-assign sagas from expunged Nexus zones
+
+use nexus_db_model::SecId;
+use nexus_db_queries::context::OpContext;
+use nexus_db_queries::db::DataStore;
+use nexus_types::deployment::Blueprint;
+use nexus_types::deployment::BlueprintZoneFilter;
+use omicron_common::api::external::Error;
+use omicron_uuid_kinds::GenericUuid;
+use slog::{debug, info, warn};
+
+/// For each expunged Nexus zone, re-assign sagas owned by that Nexus to the
+/// specified Nexus (`nexus_id`).
+pub(crate) async fn reassign_sagas_from_expunged(
+    opctx: &OpContext,
+    datastore: &DataStore,
+    blueprint: &Blueprint,
+    nexus_id: SecId,
+) -> Result<bool, Error> {
+    let log = &opctx.log;
+
+    // Identify any Nexus zones that have been expunged and need to have sagas
+    // re-assigned.
+    //
+    // TODO: Currently, we take any expunged Nexus instances and attempt to
+    // assign all their sagas to ourselves. Per RFD 289, we can only re-assign
+    // sagas between two instances of Nexus that are at the same version. Right
+    // now this can't happen so there's nothing to do here to ensure that
+    // constraint. However, once we support allowing the control plane to be
+    // online _during_ an upgrade, there may be multiple different Nexus
+    // instances running at the same time. At that point, we will need to make
+    // sure that we only ever try to assign ourselves sagas from other Nexus
+    // instances that we know are running the same version as ourselves.
+    let nexus_zone_ids: Vec<_> = blueprint
+        .all_omicron_zones(BlueprintZoneFilter::Expunged)
+        .filter_map(|(_, z)| {
+            z.zone_type
+                .is_nexus()
+                .then(|| nexus_db_model::SecId(z.id.into_untyped_uuid()))
+        })
+        .collect();
+
+    debug!(log, "re-assign sagas: found Nexus instances";
+        "nexus_zone_ids" => ?nexus_zone_ids);
+
+    let result =
+        datastore.sagas_reassign_sec(opctx, &nexus_zone_ids, nexus_id).await;
+
+    match result {
+        Ok(count) => {
+            info!(log, "re-assigned sagas";
+                "nexus_zone_ids" => ?nexus_zone_ids,
+                "count" => count,
+            );
+
+            Ok(count != 0)
+        }
+        Err(error) => {
+            warn!(log, "failed to re-assign sagas";
+                "nexus_zone_ids" => ?nexus_zone_ids,
+                &error,
+            );
+
+            Err(error)
+        }
+    }
+}
diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs
index 6bd805a491..37c276fa07 100644
--- a/nexus/src/app/background/init.rs
+++ b/nexus/src/app/background/init.rs
@@ -448,7 +448,8 @@ impl BackgroundTasksInitializer {
             datastore.clone(),
             resolver.clone(),
             rx_blueprint.clone(),
-            nexus_id.to_string(),
+            nexus_id,
+            task_saga_recovery.clone(),
         );
         let rx_blueprint_exec = blueprint_executor.watcher();
         driver.register(TaskDefinition {
diff --git a/nexus/src/app/background/tasks/blueprint_execution.rs b/nexus/src/app/background/tasks/blueprint_execution.rs
index ee780812ae..b430270ec9 100644
--- a/nexus/src/app/background/tasks/blueprint_execution.rs
+++ b/nexus/src/app/background/tasks/blueprint_execution.rs
@@ -4,7 +4,7 @@
 
 //! Background task for realizing a plan blueprint
 
-use crate::app::background::BackgroundTask;
+use crate::app::background::{Activator, BackgroundTask};
 use futures::future::BoxFuture;
 use futures::FutureExt;
 use internal_dns::resolver::Resolver;
@@ -14,6 +14,7 @@ use nexus_types::deployment::{Blueprint, BlueprintTarget};
 use serde_json::json;
 use std::sync::Arc;
 use tokio::sync::watch;
+use uuid::Uuid;
 
 /// Background task that takes a [`Blueprint`] and realizes the change to
 /// the state of the system based on the `Blueprint`.
@@ -21,8 +22,9 @@ pub struct BlueprintExecutor {
     datastore: Arc<DataStore>,
     resolver: Resolver,
     rx_blueprint: watch::Receiver<Option<Arc<(BlueprintTarget, Blueprint)>>>,
-    nexus_label: String,
+    nexus_id: Uuid,
     tx: watch::Sender<usize>,
+    saga_recovery: Activator,
 }
 
 impl BlueprintExecutor {
@@ -32,10 +34,18 @@ impl BlueprintExecutor {
         rx_blueprint: watch::Receiver<
             Option<Arc<(BlueprintTarget, Blueprint)>>,
         >,
-        nexus_label: String,
+        nexus_id: Uuid,
+        saga_recovery: Activator,
     ) -> BlueprintExecutor {
         let (tx, _) = watch::channel(0);
-        BlueprintExecutor { datastore, resolver, rx_blueprint, nexus_label, tx }
+        BlueprintExecutor {
+            datastore,
+            resolver,
+            rx_blueprint,
+            nexus_id,
+            tx,
+            saga_recovery,
+        }
     }
 
     pub fn watcher(&self) -> watch::Receiver<usize> {
@@ -81,16 +91,23 @@ impl BlueprintExecutor {
             &self.datastore,
             &self.resolver,
             blueprint,
-            &self.nexus_label,
+            self.nexus_id,
         )
         .await;
 
         // Trigger anybody waiting for this to finish.
         self.tx.send_modify(|count| *count = *count + 1);
+
+        // If executing the blueprint requires activating the saga recovery
+        // background task, do that now.
+        if let Ok(true) = result {
+            info!(&opctx.log, "activating saga recovery task");
+            self.saga_recovery.activate();
+        }
+
         // Return the result as a `serde_json::Value`
         match result {
-            Ok(()) => json!({}),
+            Ok(_) => json!({}),
             Err(errors) => {
                 let errors: Vec<_> =
                     errors.into_iter().map(|e| format!("{:#}", e)).collect();
@@ -115,7 +132,7 @@ impl BackgroundTask for BlueprintExecutor {
 #[cfg(test)]
 mod test {
     use super::BlueprintExecutor;
-    use crate::app::background::BackgroundTask;
+    use crate::app::background::{Activator, BackgroundTask};
     use httptest::matchers::{all_of, request};
     use httptest::responders::status_code;
     use httptest::Expectation;
@@ -261,7 +278,8 @@ mod test {
             datastore.clone(),
             resolver.clone(),
             blueprint_rx,
-            String::from("test-suite"),
+            Uuid::new_v4(),
+            Activator::new(),
         );
 
         // Now we're ready.
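
For readers skimming the patch, the behavioral contract of the new reassignment path can be summarized independently of Diesel: every saga still owned by one of the old SECs and not yet Done moves to the new SEC with its adoption generation bumped, and running the operation a second time reassigns nothing. The following is a minimal, self-contained Rust sketch of that semantics only; the types, field names, and `reassign` helper here are illustrative assumptions, not the actual datastore code.

// In-memory model of the reassignment rule exercised by test_saga_reassignment.
use std::collections::BTreeMap;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum SagaState {
    Running,
    Unwinding,
    Done,
}

#[derive(Debug)]
struct SagaRow {
    current_sec: &'static str,
    adopt_generation: u64,
    state: SagaState,
}

/// Move every non-Done saga owned by one of `old_secs` to `new_sec`, bumping
/// its adoption generation; return how many sagas were reassigned.
fn reassign(
    sagas: &mut BTreeMap<u64, SagaRow>,
    old_secs: &[&str],
    new_sec: &'static str,
) -> usize {
    let mut count = 0;
    for row in sagas.values_mut() {
        if old_secs.contains(&row.current_sec) && row.state != SagaState::Done {
            row.current_sec = new_sec;
            row.adopt_generation += 1;
            count += 1;
        }
    }
    count
}

fn main() {
    let mut sagas = BTreeMap::from([
        (1, SagaRow { current_sec: "sec-b", adopt_generation: 1, state: SagaState::Running }),
        (2, SagaRow { current_sec: "sec-b", adopt_generation: 1, state: SagaState::Done }),
        (3, SagaRow { current_sec: "sec-c", adopt_generation: 1, state: SagaState::Unwinding }),
        (4, SagaRow { current_sec: "sec-d", adopt_generation: 1, state: SagaState::Running }),
    ]);

    // Only the non-Done sagas owned by B or C move; D's saga and the Done saga stay.
    assert_eq!(reassign(&mut sagas, &["sec-b", "sec-c"], "sec-a"), 2);
    // A second pass is a no-op, mirroring the idempotency check in the test.
    assert_eq!(reassign(&mut sagas, &["sec-b", "sec-c"], "sec-a"), 0);
    println!("{:#?}", sagas);
}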