diff --git a/nexus/db-model/src/oximeter_info.rs b/nexus/db-model/src/oximeter_info.rs index 39bde98ea8..5579425a63 100644 --- a/nexus/db-model/src/oximeter_info.rs +++ b/nexus/db-model/src/oximeter_info.rs @@ -9,7 +9,7 @@ use nexus_types::internal_api; use uuid::Uuid; /// A record representing a registered `oximeter` collector. -#[derive(Queryable, Insertable, Debug, Clone, Copy)] +#[derive(Queryable, Insertable, Debug, Clone, Copy, PartialEq, Eq)] #[diesel(table_name = oximeter)] pub struct OximeterInfo { /// The ID for this oximeter instance. @@ -18,6 +18,17 @@ pub struct OximeterInfo { pub time_created: DateTime<Utc>, /// When this resource was last modified. pub time_modified: DateTime<Utc>, + /// When this resource was expunged. + // + // We typically refer to _zones_ as expunged; this isn't quite the same + // thing since this is the record of a running Oximeter instance. Some time + // after an Oximeter zone has been expunged (usually not very long!), the + // blueprint_executor RPW will mark the Oximeter instance that was running + // in that zone as expunged, setting this field to a non-None value, which + // will cause it to no longer be chosen as a potential collector for + // producers (and will result in any producers it had been assigned being + // reassigned to some other collector). + pub time_expunged: Option<DateTime<Utc>>, /// The address on which this `oximeter` instance listens for requests. pub ip: ipnetwork::IpNetwork, /// The port on which this `oximeter` instance listens for requests. @@ -31,6 +42,7 @@ impl OximeterInfo { id: info.collector_id, time_created: now, time_modified: now, + time_expunged: None, ip: info.address.ip().into(), port: info.address.port().into(), } diff --git a/nexus/db-model/src/schema.rs b/nexus/db-model/src/schema.rs index 071c7e9229..8f137a7bbf 100644 --- a/nexus/db-model/src/schema.rs +++ b/nexus/db-model/src/schema.rs @@ -794,6 +794,7 @@ table! 
{ id -> Uuid, time_created -> Timestamptz, time_modified -> Timestamptz, + time_expunged -> Nullable, ip -> Inet, port -> Int4, } diff --git a/nexus/db-model/src/schema_versions.rs b/nexus/db-model/src/schema_versions.rs index 77d419efbf..bb5fc294d4 100644 --- a/nexus/db-model/src/schema_versions.rs +++ b/nexus/db-model/src/schema_versions.rs @@ -17,7 +17,7 @@ use std::collections::BTreeMap; /// /// This must be updated when you change the database schema. Refer to /// schema/crdb/README.adoc in the root of this repository for details. -pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(97, 0, 0); +pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(98, 0, 0); /// List of all past database schema versions, in *reverse* order /// @@ -29,6 +29,7 @@ static KNOWN_VERSIONS: Lazy> = Lazy::new(|| { // | leaving the first copy as an example for the next person. // v // KnownVersion::new(next_int, "unique-dirname-with-the-sql-files"), + KnownVersion::new(98, "oximeter-add-time-expunged"), KnownVersion::new(97, "lookup-region-snapshot-by-region-id"), KnownVersion::new(96, "inv-dataset"), KnownVersion::new(95, "turn-boot-on-fault-into-auto-restart"), diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs index 5b1163dc8b..6eec9500dc 100644 --- a/nexus/db-queries/src/db/datastore/mod.rs +++ b/nexus/db-queries/src/db/datastore/mod.rs @@ -115,6 +115,7 @@ pub use dns::DnsVersionUpdateBuilder; pub use instance::{InstanceAndActiveVmm, InstanceGestalt}; pub use inventory::DataStoreInventoryTest; use nexus_db_model::AllSchemaVersions; +pub use oximeter::CollectorReassignment; pub use rack::RackInit; pub use rack::SledUnderlayAllocationResult; pub use region::RegionAllocationFor; diff --git a/nexus/db-queries/src/db/datastore/oximeter.rs b/nexus/db-queries/src/db/datastore/oximeter.rs index 1aa3435cb6..0c4b5077f2 100644 --- a/nexus/db-queries/src/db/datastore/oximeter.rs +++ b/nexus/db-queries/src/db/datastore/oximeter.rs @@ 
-15,18 +15,35 @@ use crate::db::model::OximeterInfo; use crate::db::model::ProducerEndpoint; use crate::db::pagination::paginated; use crate::db::pagination::Paginator; +use crate::db::queries; use async_bb8_diesel::AsyncRunQueryDsl; use chrono::DateTime; use chrono::Utc; use diesel::prelude::*; +use diesel::result::DatabaseErrorKind; +use diesel::result::Error as DieselError; +use diesel::sql_types; +use nexus_db_model::ProducerKindEnum; use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Error; use omicron_common::api::external::ListResultVec; use omicron_common::api::external::ResourceType; use uuid::Uuid; +/// Type returned when reassigning producers from an Oximeter collector. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CollectorReassignment { + /// Success: `n` producers were reassigned to other collector(s). + Complete(usize), + /// Reassignment could not complete because there are no other collectors + /// available. + NoCollectorsAvailable, +} + impl DataStore { /// Lookup an oximeter instance by its ID. + /// + /// Fails if the instance has been expunged. pub async fn oximeter_lookup( &self, opctx: &OpContext, @@ -34,6 +51,7 @@ impl DataStore { ) -> Result<OximeterInfo, Error> { use db::schema::oximeter::dsl; dsl::oximeter + .filter(dsl::time_expunged.is_null()) .find(*id) .first_async(&*self.pool_connection_authorized(opctx).await?) .await @@ -48,10 +66,19 @@ impl DataStore { ) -> Result<(), Error> { use db::schema::oximeter::dsl; - // If we get a conflict on the Oximeter ID, this means that collector instance was - // previously registered, and it's re-registering due to something like a service restart. - // In this case, we update the time modified and the service address, rather than - // propagating a constraint violation to the caller. + // If we get a conflict on the Oximeter ID, this means that collector + // instance was previously registered, and it's re-registering due to + // something like a service restart. 
In this case, we update the time + // modified and the service address, rather than propagating a + // constraint violation to the caller. + // + // TODO-completeness - We should return an error if `info.id()` maps to + // an existing row that has been expunged. We don't expect that to + // happen in practice (it would mean an expunged Oximeter zone has come + // back to life and reregistered itself). If it does happen, as written + // we'll update time_modified/ip/port but leave time_expunged set to + // whatever it was (which will leave the Oximeter in the "expunged" + // state). diesel::insert_into(dsl::oximeter) .values(*info) .on_conflict(dsl::id) @@ -75,7 +102,33 @@ impl DataStore { Ok(()) } + /// Mark an Oximeter instance as expunged + /// + /// This method is idempotent and has no effect if called with the ID for an + /// already-expunged Oximeter. + pub async fn oximeter_expunge( + &self, + opctx: &OpContext, + id: Uuid, + ) -> Result<(), Error> { + use db::schema::oximeter::dsl; + + let now = Utc::now(); + + diesel::update(dsl::oximeter) + .filter(dsl::time_expunged.is_null()) + .filter(dsl::id.eq(id)) + .set(dsl::time_expunged.eq(now)) + .execute_async(&*self.pool_connection_authorized(opctx).await?) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; + + Ok(()) + } + /// List the oximeter collector instances + /// + /// Omits expunged instances. pub async fn oximeter_list( &self, opctx: &OpContext, @@ -83,6 +136,7 @@ impl DataStore { ) -> ListResultVec<OximeterInfo> { use db::schema::oximeter::dsl; paginated(dsl::oximeter, dsl::id, page_params) + .filter(dsl::time_expunged.is_null()) .load_async::<OximeterInfo>( &*self.pool_connection_authorized(opctx).await?, ) @@ -90,17 +144,77 @@ impl DataStore { .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } + /// Reassign all metric producers currently assigned to Oximeter `id` + /// + /// The new Oximeter instance for each producer will be randomly selected + /// from all available Oximeters. 
On success, returns the number of metric + /// producers reassigned. If no Oximeter instances are available (e.g., all + /// expunged), returns `Ok(NoCollectorsAvailable)` instead of an error. + pub async fn oximeter_reassign_all_producers( + &self, + opctx: &OpContext, + id: Uuid, + ) -> Result<CollectorReassignment, Error> { + match queries::oximeter::reassign_producers_query(id) + .execute_async(&*self.pool_connection_authorized(opctx).await?) + .await + { + Ok(n) => Ok(CollectorReassignment::Complete(n)), + Err(DieselError::DatabaseError( + DatabaseErrorKind::NotNullViolation, + _, + )) => Ok(CollectorReassignment::NoCollectorsAvailable), + Err(e) => Err(public_error_from_diesel(e, ErrorHandler::Server)), + } + } + /// Create a record for a new producer endpoint pub async fn producer_endpoint_create( &self, opctx: &OpContext, producer: &ProducerEndpoint, ) -> Result<(), Error> { + // Our caller has already chosen an Oximeter instance for this producer, + // but we don't want to allow it to use a nonexistent or expunged + // Oximeter. This query turns into a `SELECT all_the_fields_of_producer + // WHERE producer.oximeter_id is legal` in a diesel-compatible way. I'm + // not aware of a helper method to generate "all the fields of + // `producer`", so instead we have a big tuple of its fields that must + // stay in sync with the `table!` definition and field ordering for the + // `metric_producer` table. The compiler will catch any mistakes + // _except_ incorrect orderings where the types still line up (e.g., + // swapping two Uuid columns), which is not ideal but is hopefully good + // enough. 
+ let producer_subquery = { + use db::schema::oximeter::dsl; + + dsl::oximeter + .select(( + producer.id().into_sql::(), + producer + .time_created() + .into_sql::(), + producer + .time_modified() + .into_sql::(), + producer.kind.into_sql::(), + producer.ip.into_sql::(), + producer.port.into_sql::(), + producer.interval.into_sql::(), + producer.oximeter_id.into_sql::(), + )) + .filter( + dsl::id + .eq(producer.oximeter_id) + .and(dsl::time_expunged.is_null()), + ) + }; + use db::schema::metric_producer::dsl; // TODO: see https://github.com/oxidecomputer/omicron/issues/323 - diesel::insert_into(dsl::metric_producer) - .values(producer.clone()) + let n = diesel::insert_into(dsl::metric_producer) + .values(producer_subquery) .on_conflict(dsl::id) .do_update() .set(( @@ -121,7 +235,22 @@ impl DataStore { ), ) })?; - Ok(()) + + // We expect `n` to basically always be 1 (1 row was inserted or + // updated). It can be 0 if `producer.oximeter_id` doesn't exist or has + // been expunged. It can never be 2 or greater because + // `producer_subquery` filters on finding an exact row for its Oximeter + // instance's ID. + match n { + 0 => Err(Error::not_found_by_id( + ResourceType::Oximeter, + &producer.oximeter_id, + )), + 1 => Ok(()), + _ => Err(Error::internal_error(&format!( + "multiple rows inserted ({n}) in `producer_endpoint_create`" + ))), + } } /// Delete a record for a producer endpoint, by its ID. 
@@ -226,6 +355,7 @@ mod tests { use db::datastore::pub_test_utils::datastore_test; use nexus_test_utils::db::test_setup_database; use nexus_types::internal_api::params; + use omicron_common::api::external::LookupType; use omicron_common::api::internal::nexus; use omicron_test_utils::dev; use std::time::Duration; @@ -272,6 +402,406 @@ mod tests { expired_batched } + #[tokio::test] + async fn test_oximeter_expunge() { + // Setup + let logctx = dev::test_setup_log("test_oximeter_expunge"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = + datastore_test(&logctx, &db, Uuid::new_v4()).await; + + // Insert a few Oximeter collectors. + let mut collector_ids = + (0..4).map(|_| Uuid::new_v4()).collect::<Vec<_>>(); + + // Sort the IDs for easier comparisons later. + collector_ids.sort(); + + for &collector_id in &collector_ids { + let info = OximeterInfo::new(&params::OximeterInfo { + collector_id, + address: "[::1]:0".parse().unwrap(), // unused + }); + datastore + .oximeter_create(&opctx, &info) + .await + .expect("inserted collector"); + } + + // Ensure all our collectors exist and aren't expunged. + let mut all_collectors = datastore + .oximeter_list(&opctx, &DataPageParams::max_page()) + .await + .expect("listed collectors"); + all_collectors.sort_by_key(|info| info.id); + assert_eq!(all_collectors.len(), collector_ids.len()); + for (info, &expected_id) in all_collectors.iter().zip(&collector_ids) { + assert_eq!(info.id, expected_id); + assert!(info.time_expunged.is_none()); + } + + // Expunge the first two of them. + datastore + .oximeter_expunge(&opctx, collector_ids[0]) + .await + .expect("expunged collector"); + datastore + .oximeter_expunge(&opctx, collector_ids[1]) + .await + .expect("expunged collector"); + + // Ensure those two were expunged. 
+ let mut all_collectors = datastore + .oximeter_list(&opctx, &DataPageParams::max_page()) + .await + .expect("listed collectors"); + all_collectors.sort_by_key(|info| info.id); + assert_eq!(all_collectors.len(), collector_ids.len() - 2); + for (info, &expected_id) in + all_collectors.iter().zip(&collector_ids[2..]) + { + assert_eq!(info.id, expected_id); + assert!(info.time_expunged.is_none()); + } + + // Deletion is idempotent. To test, we'll read the expunged rows + // directly, expunge them again, and confirm the row contents haven't + // changed. + let find_oximeter_ignoring_expunged = |id| { + let datastore = &datastore; + let opctx = &opctx; + async move { + let conn = datastore + .pool_connection_authorized(opctx) + .await + .expect("acquired connection"); + use db::schema::oximeter::dsl; + let info: OximeterInfo = dsl::oximeter + .find(id) + .first_async(&*conn) + .await + .expect("found Oximeter by ID"); + info + } + }; + let expunged0a = + find_oximeter_ignoring_expunged(collector_ids[0]).await; + let expunged1a = + find_oximeter_ignoring_expunged(collector_ids[1]).await; + assert!(expunged0a.time_expunged.is_some()); + assert!(expunged1a.time_expunged.is_some()); + + datastore + .oximeter_expunge(&opctx, collector_ids[0]) + .await + .expect("expunged collector"); + datastore + .oximeter_expunge(&opctx, collector_ids[1]) + .await + .expect("expunged collector"); + + let expunged0b = + find_oximeter_ignoring_expunged(collector_ids[0]).await; + let expunged1b = + find_oximeter_ignoring_expunged(collector_ids[1]).await; + assert_eq!(expunged0a, expunged0b); + assert_eq!(expunged1a, expunged1b); + + // Cleanup + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_producer_endpoint_create_rejects_expunged_oximeters() { + // Setup + let logctx = dev::test_setup_log( + "test_producer_endpoint_create_rejects_expunged_oximeters", + ); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = + 
datastore_test(&logctx, &db, Uuid::new_v4()).await; + + // Insert a few Oximeter collectors. + let collector_ids = (0..4).map(|_| Uuid::new_v4()).collect::<Vec<_>>(); + for &collector_id in &collector_ids { + let info = OximeterInfo::new(&params::OximeterInfo { + collector_id, + address: "[::1]:0".parse().unwrap(), // unused + }); + datastore + .oximeter_create(&opctx, &info) + .await + .expect("inserted collector"); + } + + // We can insert metric producers for each collector. + for &collector_id in &collector_ids { + let producer = ProducerEndpoint::new( + &nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }, + collector_id, + ); + datastore + .producer_endpoint_create(&opctx, &producer) + .await + .expect("created producer"); + } + + // Expunge the first collector. + datastore + .oximeter_expunge(&opctx, collector_ids[0]) + .await + .expect("expunged collector"); + + // Attempting to insert a producer assigned to the first collector + // should fail, now that it's expunged. + let err = { + let producer = ProducerEndpoint::new( + &nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }, + collector_ids[0], + ); + datastore + .producer_endpoint_create(&opctx, &producer) + .await + .expect_err("producer creation fails") + }; + assert_eq!( + err, + Error::ObjectNotFound { + type_name: ResourceType::Oximeter, + lookup_type: LookupType::ById(collector_ids[0]) + } + ); + + // We can still insert metric producers for the other collectors... + for &collector_id in &collector_ids[1..] 
{ + let mut producer = ProducerEndpoint::new( + &nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }, + collector_id, + ); + datastore + .producer_endpoint_create(&opctx, &producer) + .await + .expect("created producer"); + + // ... and we can update them. + producer.port = 100.into(); + datastore + .producer_endpoint_create(&opctx, &producer) + .await + .expect("created producer"); + } + + // Cleanup + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_oximeter_reassigns_randomly() { + // Setup + let logctx = dev::test_setup_log("test_oximeter_reassigns_randomly"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = + datastore_test(&logctx, &db, Uuid::new_v4()).await; + + // Insert a few Oximeter collectors. + let collector_ids = (0..4).map(|_| Uuid::new_v4()).collect::<Vec<_>>(); + for &collector_id in &collector_ids { + let info = OximeterInfo::new(&params::OximeterInfo { + collector_id, + address: "[::1]:0".parse().unwrap(), // unused + }); + datastore + .oximeter_create(&opctx, &info) + .await + .expect("inserted collector"); + } + + // Insert 250 metric producers assigned to each collector. + for &collector_id in &collector_ids { + for _ in 0..250 { + let producer = ProducerEndpoint::new( + &nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }, + collector_id, + ); + datastore + .producer_endpoint_create(&opctx, &producer) + .await + .expect("created producer"); + } + } + + // Expunge one collector. + datastore + .oximeter_expunge(&opctx, collector_ids[0]) + .await + .expect("expunged Oximeter"); + + // Reassign producers that belonged to that collector. 
+ let num_reassigned = datastore + .oximeter_reassign_all_producers(&opctx, collector_ids[0]) + .await + .expect("reassigned producers"); + assert_eq!(num_reassigned, CollectorReassignment::Complete(250)); + + // Check the distribution of producers for each of the remaining + // collectors. We don't know the exact count, so we'll check that: + // + // * Each of the three remaining collectors gained at least one (the + // probability that any of the three collectors gained zero is low + // enough that most calculators give up and call it 0) + // * All 1000 producers are assigned to one of the three collectors + // + // to guard against "the reassignment query gave all 250 to exactly one + // of the remaining collectors", which is an easy failure mode for this + // kind of SQL query, where the query engine only evaluates the + // randomness once instead of once for each producer. + let mut producer_counts = [0; 4]; + for i in 0..4 { + producer_counts[i] = datastore + .producers_list_by_oximeter_id( + &opctx, + collector_ids[i], + &DataPageParams::max_page(), + ) + .await + .expect("listed producers") + .len(); + } + assert_eq!(producer_counts[0], 0); // all reassigned + assert!(producer_counts[1] > 250); // gained at least one + assert!(producer_counts[2] > 250); // gained at least one + assert!(producer_counts[3] > 250); // gained at least one + assert_eq!(producer_counts[1..].iter().sum::<usize>(), 1000); + + // Cleanup + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_oximeter_reassign_fails_if_no_collectors() { + // Setup + let logctx = dev::test_setup_log( + "test_oximeter_reassign_fails_if_no_collectors", + ); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = + datastore_test(&logctx, &db, Uuid::new_v4()).await; + + // Insert a few Oximeter collectors. 
+ let collector_ids = (0..4).map(|_| Uuid::new_v4()).collect::<Vec<_>>(); + for &collector_id in &collector_ids { + let info = OximeterInfo::new(&params::OximeterInfo { + collector_id, + address: "[::1]:0".parse().unwrap(), // unused + }); + datastore + .oximeter_create(&opctx, &info) + .await + .expect("inserted collector"); + } + + // Insert 10 metric producers assigned to each collector. + for &collector_id in &collector_ids { + for _ in 0..10 { + let producer = ProducerEndpoint::new( + &nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }, + collector_id, + ); + datastore + .producer_endpoint_create(&opctx, &producer) + .await + .expect("created producer"); + } + } + + // Expunge all four collectors. + for &collector_id in &collector_ids { + datastore + .oximeter_expunge(&opctx, collector_id) + .await + .expect("expunged Oximeter"); + } + + // Try to reassign producers that belonged to each collector; this + // should report `NoCollectorsAvailable`, since all are expunged. + for &collector_id in &collector_ids { + let num_reassigned = datastore + .oximeter_reassign_all_producers(&opctx, collector_id) + .await + .expect("reassigned producers"); + assert_eq!( + num_reassigned, + CollectorReassignment::NoCollectorsAvailable + ); + } + + // Now insert a new collector. + let new_collector_id = Uuid::new_v4(); + datastore + .oximeter_create( + &opctx, + &OximeterInfo::new(&params::OximeterInfo { + collector_id: new_collector_id, + address: "[::1]:0".parse().unwrap(), // unused + }), + ) + .await + .expect("inserted collector"); + + // Reassigning the original four collectors should now all succeed. 
+ for &collector_id in &collector_ids { + let num_reassigned = datastore + .oximeter_reassign_all_producers(&opctx, collector_id) + .await + .expect("reassigned producers"); + assert_eq!(num_reassigned, CollectorReassignment::Complete(10)); + } + + // All 40 producers should be assigned to our new collector. + let nproducers = datastore + .producers_list_by_oximeter_id( + &opctx, + new_collector_id, + &DataPageParams::max_page(), + ) + .await + .expect("listed producers") + .len(); + assert_eq!(nproducers, 40); + + // Cleanup + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + #[tokio::test] async fn test_producers_list_expired() { // Setup diff --git a/nexus/db-queries/src/db/queries/mod.rs b/nexus/db-queries/src/db/queries/mod.rs index f88b8fab6d..02800e3a3c 100644 --- a/nexus/db-queries/src/db/queries/mod.rs +++ b/nexus/db-queries/src/db/queries/mod.rs @@ -11,6 +11,7 @@ pub mod ip_pool; #[macro_use] mod next_item; pub mod network_interface; +pub mod oximeter; pub mod region_allocation; pub mod virtual_provisioning_collection_update; pub mod volume; diff --git a/nexus/db-queries/src/db/queries/oximeter.rs b/nexus/db-queries/src/db/queries/oximeter.rs new file mode 100644 index 0000000000..40f7a2b493 --- /dev/null +++ b/nexus/db-queries/src/db/queries/oximeter.rs @@ -0,0 +1,112 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Implementation of queries for Oximeter collectors and producers. + +use crate::db::raw_query_builder::{QueryBuilder, TypedSqlQuery}; +use diesel::sql_types; +use uuid::Uuid; + +/// For a given Oximeter instance (which is presumably no longer running), +/// reassign any producers assigned to it to a different Oximeter. Each +/// assignment is randomly chosen from among the non-expunged Oximeter instances +/// recorded in the `oximeter` table. 
+pub fn reassign_producers_query(oximeter_id: Uuid) -> TypedSqlQuery<()> { + let builder = QueryBuilder::new(); + + // Find all non-expunged Oximeter instances. + let builder = builder.sql( + "\ + WITH available_oximeters AS ( \ + SELECT ARRAY( \ + SELECT id FROM oximeter WHERE time_expunged IS NULL + ) AS ids \ + ), ", + ); + + // Create a mapping of producer ID <-> new, random, non-expunged Oximeter ID + // for every producer assigned to `oximeter_id`. If the `ids` array from the + // previous expression is empty, every `new_id` column in this expression + // will be NULL. We'll catch that in the update below. + let builder = builder + .sql( + "\ + new_assignments AS ( \ + SELECT + metric_producer.id AS producer_id, + ids[1 + floor(random() * array_length(ids, 1)::float)::int] + AS new_id + FROM metric_producer + LEFT JOIN available_oximeters ON true + WHERE oximeter_id = ", + ) + .param() + .sql(")") + .bind::<sql_types::Uuid, _>(oximeter_id); + + // Actually perform the update. If the `new_id` column from the previous + // step is `NULL` (because there aren't any non-expunged Oximeter + // instances), this will fail the `NOT NULL` constraint on the oximeter_id + // column. + let builder = builder + .sql( + "\ + UPDATE metric_producer SET oximeter_id = ( \ + SELECT new_id FROM new_assignments \ + WHERE new_assignments.producer_id = metric_producer.id \ + ) WHERE oximeter_id = ", + ) + .param() + .bind::<sql_types::Uuid, _>(oximeter_id); + + builder.query() +} + +#[cfg(test)] +mod test { + use super::*; + use crate::db::explain::ExplainableAsync; + use crate::db::raw_query_builder::expectorate_query_contents; + use nexus_test_utils::db::test_setup_database; + use omicron_test_utils::dev; + use uuid::Uuid; + + // This test is a bit of a "change detector", but it's here to help with + // debugging too. If you change this query, it can be useful to see exactly + // how the output SQL has been altered. 
+ #[tokio::test] + async fn expectorate_query() { + let oximeter_id = Uuid::nil(); + + let query = reassign_producers_query(oximeter_id); + + expectorate_query_contents( + &query, + "tests/output/oximeter_reassign_producers.sql", + ) + .await; + } + + // Explain the SQL query to ensure that it creates a valid SQL string. + #[tokio::test] + async fn explainable() { + let logctx = dev::test_setup_log("explainable"); + let log = logctx.log.new(o!()); + let mut db = test_setup_database(&log).await; + let cfg = crate::db::Config { url: db.pg_config().clone() }; + let pool = crate::db::Pool::new_single_host(&logctx.log, &cfg); + let conn = pool.claim().await.unwrap(); + + let oximeter_id = Uuid::nil(); + + let query = reassign_producers_query(oximeter_id); + let _ = query + .explain_async(&conn) + .await + .expect("Failed to explain query - is it valid SQL?"); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } +} diff --git a/nexus/db-queries/tests/output/oximeter_reassign_producers.sql b/nexus/db-queries/tests/output/oximeter_reassign_producers.sql new file mode 100644 index 0000000000..4ef88a800a --- /dev/null +++ b/nexus/db-queries/tests/output/oximeter_reassign_producers.sql @@ -0,0 +1,20 @@ +WITH + available_oximeters + AS (SELECT ARRAY (SELECT id FROM oximeter WHERE time_expunged IS NULL) AS ids), + new_assignments + AS ( + SELECT + metric_producer.id AS producer_id, + ids[1 + floor(random() * array_length(ids, 1)::FLOAT8)::INT8] AS new_id + FROM + metric_producer LEFT JOIN available_oximeters ON true + WHERE + oximeter_id = $1 + ) +UPDATE + metric_producer +SET + oximeter_id + = (SELECT new_id FROM new_assignments WHERE new_assignments.producer_id = metric_producer.id) +WHERE + oximeter_id = $2 diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql index 9089c0032a..e26626818d 100644 --- a/schema/crdb/dbinit.sql +++ b/schema/crdb/dbinit.sql @@ -1369,9 +1369,20 @@ CREATE TABLE IF NOT EXISTS omicron.public.oximeter ( time_created TIMESTAMPTZ 
NOT NULL, time_modified TIMESTAMPTZ NOT NULL, ip INET NOT NULL, - port INT4 CHECK (port BETWEEN 0 AND 65535) NOT NULL + port INT4 CHECK (port BETWEEN 0 AND 65535) NOT NULL, + time_expunged TIMESTAMPTZ ); +/* + * The query Nexus runs to choose an Oximeter instance for new metric producers + * involves listing the non-expunged instances sorted by ID, which would require + * a full table scan without this index. + */ +CREATE UNIQUE INDEX IF NOT EXISTS list_non_expunged_oximeter ON omicron.public.oximeter ( + id +) WHERE + time_expunged IS NULL; + /* * The kind of metric producer each record corresponds to. */ @@ -4288,7 +4299,7 @@ INSERT INTO omicron.public.db_metadata ( version, target_version ) VALUES - (TRUE, NOW(), NOW(), '97.0.0', NULL) + (TRUE, NOW(), NOW(), '98.0.0', NULL) ON CONFLICT DO NOTHING; COMMIT; diff --git a/schema/crdb/oximeter-add-time-expunged/up1.sql b/schema/crdb/oximeter-add-time-expunged/up1.sql new file mode 100644 index 0000000000..f6915d5765 --- /dev/null +++ b/schema/crdb/oximeter-add-time-expunged/up1.sql @@ -0,0 +1 @@ +ALTER TABLE omicron.public.oximeter ADD COLUMN IF NOT EXISTS time_expunged TIMESTAMPTZ; diff --git a/schema/crdb/oximeter-add-time-expunged/up2.sql b/schema/crdb/oximeter-add-time-expunged/up2.sql new file mode 100644 index 0000000000..710fe99ffe --- /dev/null +++ b/schema/crdb/oximeter-add-time-expunged/up2.sql @@ -0,0 +1,4 @@ +CREATE UNIQUE INDEX IF NOT EXISTS list_non_expunged_oximeter ON omicron.public.oximeter ( + id +) WHERE + time_expunged IS NULL;