diff --git a/nexus/db-queries/src/db/datastore/deployment.rs b/nexus/db-queries/src/db/datastore/deployment.rs index 617413f172..5a020b6feb 100644 --- a/nexus/db-queries/src/db/datastore/deployment.rs +++ b/nexus/db-queries/src/db/datastore/deployment.rs @@ -14,6 +14,7 @@ use crate::db::pagination::paginated; use crate::db::pagination::Paginator; use crate::db::DbConnection; use crate::db::TransactionError; +use crate::transaction_retry::OptionalError; use anyhow::Context; use async_bb8_diesel::AsyncConnection; use async_bb8_diesel::AsyncRunQueryDsl; @@ -664,7 +665,7 @@ impl DataStore { // Ensure that blueprint we're about to delete is not the // current target. let current_target = - self.blueprint_current_target_only(&conn).await?; + self.blueprint_current_target_only(&conn, false).await?; if current_target.target_id == blueprint_id { return Err(TransactionError::CustomError( Error::conflict(format!( @@ -787,6 +788,117 @@ impl DataStore { Ok(()) } + /// Ensure all external networking IPs and service vNICs described by + /// `blueprint` are allocated (for in-service zones) or deallocated + /// (otherwise), conditional on `blueprint` being the current target + /// blueprint. + /// + /// This method may be safely executed from the blueprint executor RPW; the + /// condition on the current target blueprint ensures a Nexus attempting to + /// realize an out of date blueprint can't overwrite changes made by a Nexus + /// that realized the current target. + pub async fn blueprint_ensure_external_networking_resources( + &self, + opctx: &OpContext, + blueprint: &Blueprint, + ) -> Result<(), Error> { + self.blueprint_ensure_external_networking_resources_impl( + opctx, + blueprint, + #[cfg(test)] + None, + #[cfg(test)] + None, + ) + .await + } + + // The third and fourth arguments to this test only exist when run under + // test, and allows the calling test to control the general timing of the + // transaction executed by this method: + // + // 1. Check that `blueprint` is the current target blueprint + // 2. Set `target_check_done` is set to true (the test can wait on this) + // 3. Run remainder of transaction to allocate/deallocate resources + // 4. Wait until `return_on_completion` is set to true + // 5. Return + // + // If either of these arguments are `None`, steps 2 or 4 will be skipped. + async fn blueprint_ensure_external_networking_resources_impl( + &self, + opctx: &OpContext, + blueprint: &Blueprint, + #[cfg(test)] target_check_done: Option< + std::sync::Arc, + >, + #[cfg(test)] return_on_completion: Option< + std::sync::Arc, + >, + ) -> Result<(), Error> { + let err = OptionalError::new(); + let conn = self.pool_connection_authorized(opctx).await?; + + self.transaction_retry_wrapper( + "blueprint_ensure_external_networking_resources", + ) + .transaction(&conn, |conn| { + let err = err.clone(); + #[cfg(test)] + let target_check_done = target_check_done.clone(); + #[cfg(test)] + let return_on_completion = return_on_completion.clone(); + + async move { + // Bail out if `blueprint` isn't the current target. + let current_target = self + .blueprint_current_target_only(&conn, true) + .await + .map_err(|e| err.bail(e))?; + if current_target.target_id != blueprint.id { + return Err(err.bail(Error::invalid_request(format!( + "blueprint {} is not the current target blueprint ({})", + blueprint.id, current_target.target_id + )))); + } + + // See the comment on this method; this lets us notify our test + // caller that we've performed our target blueprint check. + #[cfg(test)] + { + use std::sync::atomic::Ordering; + if let Some(gate) = target_check_done { + gate.store(true, Ordering::SeqCst); + } + } + + // TODO actual work + + // See the comment on this method; this lets us wait until our + // test caller is ready for us to return. + #[cfg(test)] + { + use std::sync::atomic::Ordering; + use std::time::Duration; + if let Some(gate) = return_on_completion { + while !gate.load(Ordering::SeqCst) { + tokio::time::sleep(Duration::from_millis(50)).await; + } + } + } + + Ok(()) + } + }) + .await + .map_err(|e| { + if let Some(err) = err.take() { + err + } else { + public_error_from_diesel(e, ErrorHandler::Server) + } + }) + } + /// Set the current target blueprint /// /// In order to become the target blueprint, `target`'s parent blueprint @@ -930,7 +1042,7 @@ impl DataStore { opctx.authorize(authz::Action::Read, &authz::BLUEPRINT_CONFIG).await?; let conn = self.pool_connection_authorized(opctx).await?; - let target = self.blueprint_current_target_only(&conn).await?; + let target = self.blueprint_current_target_only(&conn, false).await?; // The blueprint for the current target cannot be deleted while it is // the current target, but it's possible someone else (a) made a new @@ -951,7 +1063,7 @@ impl DataStore { ) -> Result { opctx.authorize(authz::Action::Read, &authz::BLUEPRINT_CONFIG).await?; let conn = self.pool_connection_authorized(opctx).await?; - self.blueprint_current_target_only(&conn).await + self.blueprint_current_target_only(&conn, false).await } // Helper to fetch the current blueprint target (without fetching the entire @@ -961,13 +1073,23 @@ impl DataStore { async fn blueprint_current_target_only( &self, conn: &async_bb8_diesel::Connection, + select_for_update: bool, ) -> Result { use db::schema::bp_target::dsl; - let current_target = dsl::bp_target - .order_by(dsl::version.desc()) - .first_async::(conn) - .await + let query_result = if select_for_update { + dsl::bp_target + .order_by(dsl::version.desc()) + .for_update() + .first_async::(conn) + .await + } else { + dsl::bp_target + .order_by(dsl::version.desc()) + .first_async::(conn) + .await + }; + let current_target = query_result .optional() .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; @@ -1361,6 +1483,8 @@ mod tests { use omicron_common::address::Ipv6Subnet; use omicron_common::disk::DiskIdentity; use omicron_test_utils::dev; + use omicron_test_utils::dev::poll::wait_for_condition; + use omicron_test_utils::dev::poll::CondCheckError; use omicron_uuid_kinds::PhysicalDiskUuid; use omicron_uuid_kinds::SledUuid; use omicron_uuid_kinds::ZpoolUuid; @@ -1371,6 +1495,10 @@ mod tests { use slog::Logger; use std::mem; use std::net::Ipv6Addr; + use std::sync::atomic::AtomicBool; + use std::sync::atomic::Ordering; + use std::sync::Arc; + use std::time::Duration; static EMPTY_PLANNING_INPUT: Lazy = Lazy::new(|| PlanningInputBuilder::empty_input()); @@ -2061,6 +2189,174 @@ mod tests { logctx.cleanup_successful(); } + #[tokio::test] + async fn test_ensure_external_networking_bails_on_bad_target() { + // Setup + let logctx = dev::test_setup_log( + "test_ensure_external_networking_bails_on_bad_target", + ); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + // Create an initial blueprint and a child. + let blueprint1 = BlueprintBuilder::build_empty_with_sleds( + std::iter::empty(), + "test1", + ); + let blueprint2 = BlueprintBuilder::new_based_on( + &logctx.log, + &blueprint1, + &EMPTY_PLANNING_INPUT, + "test2", + ) + .expect("failed to create builder") + .build(); + + // Insert both into the blueprint table. + datastore.blueprint_insert(&opctx, &blueprint1).await.unwrap(); + datastore.blueprint_insert(&opctx, &blueprint2).await.unwrap(); + + let bp1_target = BlueprintTarget { + target_id: blueprint1.id, + enabled: true, + time_made_target: now_db_precision(), + }; + let bp2_target = BlueprintTarget { + target_id: blueprint2.id, + enabled: true, + time_made_target: now_db_precision(), + }; + + // Set bp1_target as the current target. + datastore + .blueprint_target_set_current(&opctx, bp1_target) + .await + .unwrap(); + + // Attempting to ensure the (empty) resources for bp1 should succeed. + datastore + .blueprint_ensure_external_networking_resources(&opctx, &blueprint1) + .await + .expect("ensured networking resources for empty blueprint 1"); + + // Attempting to ensure the (empty) resources for bp2 should fail, + // because it isn't the target blueprint. + let err = datastore + .blueprint_ensure_external_networking_resources(&opctx, &blueprint2) + .await + .expect_err("failed because blueprint 2 isn't the target"); + assert!( + err.to_string().contains("is not the current target blueprint"), + "unexpected error: {err}" + ); + + // Create flags to control method execution. + let target_check_done = Arc::new(AtomicBool::new(false)); + let return_on_completion = Arc::new(AtomicBool::new(false)); + + // Spawn a task to execute our method. + let mut ensure_resources_task = tokio::spawn({ + let datastore = datastore.clone(); + let opctx = + OpContext::for_tests(logctx.log.clone(), datastore.clone()); + let target_check_done = target_check_done.clone(); + let return_on_completion = return_on_completion.clone(); + async move { + datastore + .blueprint_ensure_external_networking_resources_impl( + &opctx, + &blueprint1, + Some(target_check_done), + Some(return_on_completion), + ) + .await + } + }); + + // Wait until `task` has proceeded past the point at which it's checked + // the target blueprint. + wait_for_condition( + || async { + if target_check_done.load(Ordering::SeqCst) { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &Duration::from_millis(50), + &Duration::from_secs(10), + ) + .await + .expect("`target_check_done` not set to true"); + + // Now spawn another task that attempts to update the current target to + // blueprint2. + let mut update_target_task = tokio::spawn({ + let datastore = datastore.clone(); + let opctx = + OpContext::for_tests(logctx.log.clone(), datastore.clone()); + async move { + datastore.blueprint_target_set_current(&opctx, bp2_target).await + } + }); + + // Neither of our spawned tasks should be able to make progress: + // `ensure_resources_task` is waiting for us to set + // `return_on_completion` to true, and `update_target_task` should be + // queued by Cockroach, because + // `blueprint_ensure_external_networking_resources` should have + // performed a `SELECT ... FOR UPDATE` on the current target, forcing + // the query that wants to change it to wait until the transaction + // completes. + // + // We'll somewhat haphazardly test this by trying to wait for either + // task to finish, and succeeding on a timeout of a few seconds. This + // will spuriously succeed if both are executing on a very overloaded + // system, but hopefully will fail often enough if we've gotten this + // wrong. + tokio::select! { + result = &mut ensure_resources_task => { + panic!( + "unexpected completion of \ + `blueprint_ensure_external_networking_resources`: \ + {result:?}", + ); + } + result = &mut update_target_task => { + panic!( + "unexpected completion of \ + `blueprint_target_set_current`: {result:?}", + ); + } + _ = tokio::time::sleep(Duration::from_secs(5)) => (), + } + + // Release `ensure_resources_task` to finish. + return_on_completion.store(true, Ordering::SeqCst); + + tokio::time::timeout(Duration::from_secs(10), ensure_resources_task) + .await + .expect( + "time out waiting for \ + `blueprint_ensure_external_networking_resources`", + ) + .expect("panic in `blueprint_ensure_external_networking_resources") + .expect("ensured networking resources for empty blueprint 2"); + + // Our task to set bp2 as the target should now also complete. + tokio::time::timeout(Duration::from_secs(10), update_target_task) + .await + .expect( + "time out waiting for `blueprint_target_set_current`", + ) + .expect("panic in `blueprint_target_set_current") + .expect("updated target to blueprint 2"); + + // Clean up. + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + fn assert_all_zones_in_service(blueprint: &Blueprint) { let not_in_service = blueprint .all_omicron_zones(BlueprintZoneFilter::All)