From 5678b3a5ee660348b6937fd204f0e4b2f93b1e05 Mon Sep 17 00:00:00 2001
From: John Gallagher
Date: Wed, 28 Aug 2024 11:47:28 -0400
Subject: [PATCH 1/6] [nexus] Add DataStore methods to delete and reassign
 Oximeter collectors

---
 nexus/db-model/src/oximeter_info.rs           |   5 +-
 nexus/db-model/src/schema.rs                  |   1 +
 nexus/db-model/src/schema_versions.rs         |   3 +-
 nexus/db-queries/src/db/datastore/mod.rs      |   1 +
 nexus/db-queries/src/db/datastore/oximeter.rs | 360 ++++++++++++++++++
 nexus/db-queries/src/db/queries/mod.rs        |   1 +
 nexus/db-queries/src/db/queries/oximeter.rs   | 111 ++++++
 .../output/oximeter_reassign_producers.sql    |  19 +
 schema/crdb/dbinit.sql                        |  10 +-
 schema/crdb/oximeter-add-time-deleted/up1.sql |   1 +
 schema/crdb/oximeter-add-time-deleted/up2.sql |   4 +
 11 files changed, 512 insertions(+), 4 deletions(-)
 create mode 100644 nexus/db-queries/src/db/queries/oximeter.rs
 create mode 100644 nexus/db-queries/tests/output/oximeter_reassign_producers.sql
 create mode 100644 schema/crdb/oximeter-add-time-deleted/up1.sql
 create mode 100644 schema/crdb/oximeter-add-time-deleted/up2.sql

diff --git a/nexus/db-model/src/oximeter_info.rs b/nexus/db-model/src/oximeter_info.rs
index 39bde98ea8..01570d64c9 100644
--- a/nexus/db-model/src/oximeter_info.rs
+++ b/nexus/db-model/src/oximeter_info.rs
@@ -9,7 +9,7 @@ use nexus_types::internal_api;
 use uuid::Uuid;
 
 /// A record representing a registered `oximeter` collector.
-#[derive(Queryable, Insertable, Debug, Clone, Copy)]
+#[derive(Queryable, Insertable, Debug, Clone, Copy, PartialEq, Eq)]
 #[diesel(table_name = oximeter)]
 pub struct OximeterInfo {
     /// The ID for this oximeter instance.
@@ -18,6 +18,8 @@ pub struct OximeterInfo {
     pub time_created: DateTime<Utc>,
     /// When this resource was last modified.
     pub time_modified: DateTime<Utc>,
+    /// When this resource was deleted.
+    pub time_deleted: Option<DateTime<Utc>>,
     /// The address on which this `oximeter` instance listens for requests.
     pub ip: ipnetwork::IpNetwork,
     /// The port on which this `oximeter` instance listens for requests.
@@ -31,6 +33,7 @@ impl OximeterInfo {
             id: info.collector_id,
             time_created: now,
             time_modified: now,
+            time_deleted: None,
             ip: info.address.ip().into(),
             port: info.address.port().into(),
         }
diff --git a/nexus/db-model/src/schema.rs b/nexus/db-model/src/schema.rs
index 5d9b3da78f..4669178cd3 100644
--- a/nexus/db-model/src/schema.rs
+++ b/nexus/db-model/src/schema.rs
@@ -794,6 +794,7 @@ table! {
         id -> Uuid,
         time_created -> Timestamptz,
         time_modified -> Timestamptz,
+        time_deleted -> Nullable<Timestamptz>,
         ip -> Inet,
         port -> Int4,
     }
diff --git a/nexus/db-model/src/schema_versions.rs b/nexus/db-model/src/schema_versions.rs
index 54718bbddb..4c60000507 100644
--- a/nexus/db-model/src/schema_versions.rs
+++ b/nexus/db-model/src/schema_versions.rs
@@ -17,7 +17,7 @@ use std::collections::BTreeMap;
 ///
 /// This must be updated when you change the database schema. Refer to
 /// schema/crdb/README.adoc in the root of this repository for details.
-pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(94, 0, 0);
+pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(95, 0, 0);
 
 /// List of all past database schema versions, in *reverse* order
 ///
@@ -29,6 +29,7 @@ static KNOWN_VERSIONS: Lazy<Vec<KnownVersion>> = Lazy::new(|| {
         // |  leaving the first copy as an example for the next person.
// v // KnownVersion::new(next_int, "unique-dirname-with-the-sql-files"), + KnownVersion::new(95, "oximeter-add-time-deleted"), KnownVersion::new(94, "put-back-creating-vmm-state"), KnownVersion::new(93, "dataset-kinds-zone-and-debug"), KnownVersion::new(92, "lldp-link-config-nullable"), diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs index 5b1163dc8b..6eec9500dc 100644 --- a/nexus/db-queries/src/db/datastore/mod.rs +++ b/nexus/db-queries/src/db/datastore/mod.rs @@ -115,6 +115,7 @@ pub use dns::DnsVersionUpdateBuilder; pub use instance::{InstanceAndActiveVmm, InstanceGestalt}; pub use inventory::DataStoreInventoryTest; use nexus_db_model::AllSchemaVersions; +pub use oximeter::CollectorReassignment; pub use rack::RackInit; pub use rack::SledUnderlayAllocationResult; pub use region::RegionAllocationFor; diff --git a/nexus/db-queries/src/db/datastore/oximeter.rs b/nexus/db-queries/src/db/datastore/oximeter.rs index 1aa3435cb6..6ab045ef45 100644 --- a/nexus/db-queries/src/db/datastore/oximeter.rs +++ b/nexus/db-queries/src/db/datastore/oximeter.rs @@ -15,16 +15,30 @@ use crate::db::model::OximeterInfo; use crate::db::model::ProducerEndpoint; use crate::db::pagination::paginated; use crate::db::pagination::Paginator; +use crate::db::queries; use async_bb8_diesel::AsyncRunQueryDsl; use chrono::DateTime; use chrono::Utc; use diesel::prelude::*; +use diesel::result::DatabaseErrorKind; +use diesel::result::Error as DieselError; use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Error; use omicron_common::api::external::ListResultVec; use omicron_common::api::external::ResourceType; use uuid::Uuid; +/// Type returned when reassigning producers from a (presumably defunct) +/// Oximeter collector. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CollectorReassignment { + /// Success: `n` producers were reassigned to other collector(s). + Complete(usize), + /// Reassignment could not complete because there are no other collectors + /// available. + NoCollectorsAvailable, +} + impl DataStore { /// Lookup an oximeter instance by its ID. pub async fn oximeter_lookup( @@ -34,6 +48,7 @@ impl DataStore { ) -> Result { use db::schema::oximeter::dsl; dsl::oximeter + .filter(dsl::time_deleted.is_null()) .find(*id) .first_async(&*self.pool_connection_authorized(opctx).await?) .await @@ -75,6 +90,30 @@ impl DataStore { Ok(()) } + /// Mark an Oximeter instance as deleted + /// + /// This method is idempotent and has no effect if called with the ID for an + /// already-deleted Oximeter. + pub async fn oximeter_delete( + &self, + opctx: &OpContext, + id: Uuid, + ) -> Result<(), Error> { + use db::schema::oximeter::dsl; + + let now = Utc::now(); + + diesel::update(dsl::oximeter) + .filter(dsl::time_deleted.is_null()) + .filter(dsl::id.eq(id)) + .set(dsl::time_deleted.eq(now)) + .execute_async(&*self.pool_connection_authorized(opctx).await?) 
+ .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; + + Ok(()) + } + /// List the oximeter collector instances pub async fn oximeter_list( &self, @@ -83,6 +122,7 @@ impl DataStore { ) -> ListResultVec { use db::schema::oximeter::dsl; paginated(dsl::oximeter, dsl::id, page_params) + .filter(dsl::time_deleted.is_null()) .load_async::( &*self.pool_connection_authorized(opctx).await?, ) @@ -90,6 +130,30 @@ impl DataStore { .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } + /// Reassign all metric producers currently assigned to Oximeter `id` + /// + /// The new Oximeter instance for each producer will be randomly selected + /// from all available Oximeters. On success, returns the number of metric + /// producers reassigned. Fails if there are no available Oximeter instances + /// (e.g., all Oximeter instances have been deleted). + pub async fn oximeter_reassign_all_producers( + &self, + opctx: &OpContext, + id: Uuid, + ) -> Result { + match queries::oximeter::reassign_producers_query(id) + .execute_async(&*self.pool_connection_authorized(opctx).await?) + .await + { + Ok(n) => Ok(CollectorReassignment::Complete(n)), + Err(DieselError::DatabaseError( + DatabaseErrorKind::NotNullViolation, + _, + )) => Ok(CollectorReassignment::NoCollectorsAvailable), + Err(e) => Err(public_error_from_diesel(e, ErrorHandler::Server)), + } + } + /// Create a record for a new producer endpoint pub async fn producer_endpoint_create( &self, @@ -272,6 +336,302 @@ mod tests { expired_batched } + #[tokio::test] + async fn test_oximeter_delete() { + // Setup + let logctx = dev::test_setup_log("test_oximeter_delete"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = + datastore_test(&logctx, &db, Uuid::new_v4()).await; + + // Insert a few Oximeter collectors. + let mut collector_ids = + (0..4).map(|_| Uuid::new_v4()).collect::>(); + + // Sort the IDs for easier comparisons later. + collector_ids.sort(); + + for &collector_id in &collector_ids { + let info = OximeterInfo::new(¶ms::OximeterInfo { + collector_id, + address: "[::1]:0".parse().unwrap(), // unused + }); + datastore + .oximeter_create(&opctx, &info) + .await + .expect("inserted collector"); + } + + // Ensure all our collectors exist and aren't deleted. + let mut all_collectors = datastore + .oximeter_list(&opctx, &DataPageParams::max_page()) + .await + .expect("listed collectors"); + all_collectors.sort_by_key(|info| info.id); + assert_eq!(all_collectors.len(), collector_ids.len()); + for (info, &expected_id) in all_collectors.iter().zip(&collector_ids) { + assert_eq!(info.id, expected_id); + assert!(info.time_deleted.is_none()); + } + + // Delete the first two of them. + datastore + .oximeter_delete(&opctx, collector_ids[0]) + .await + .expect("deleted collector"); + datastore + .oximeter_delete(&opctx, collector_ids[1]) + .await + .expect("deleted collector"); + + // Ensure those two were deleted. + let mut all_collectors = datastore + .oximeter_list(&opctx, &DataPageParams::max_page()) + .await + .expect("listed collectors"); + all_collectors.sort_by_key(|info| info.id); + assert_eq!(all_collectors.len(), collector_ids.len() - 2); + for (info, &expected_id) in + all_collectors.iter().zip(&collector_ids[2..]) + { + assert_eq!(info.id, expected_id); + assert!(info.time_deleted.is_none()); + } + + // Deletion is idempotent. To test, we'll read the deleted rows + // directly, delete them again, and confirm the row contents haven't + // changed. 
+ let find_oximeter_ignoring_deleted = |id| { + let datastore = &datastore; + let opctx = &opctx; + async move { + let conn = datastore + .pool_connection_authorized(opctx) + .await + .expect("acquired connection"); + use db::schema::oximeter::dsl; + let info: OximeterInfo = dsl::oximeter + .find(id) + .first_async(&*conn) + .await + .expect("found Oximeter by ID"); + info + } + }; + let deleted0a = find_oximeter_ignoring_deleted(collector_ids[0]).await; + let deleted1a = find_oximeter_ignoring_deleted(collector_ids[1]).await; + assert!(deleted0a.time_deleted.is_some()); + assert!(deleted1a.time_deleted.is_some()); + + datastore + .oximeter_delete(&opctx, collector_ids[0]) + .await + .expect("deleted collector"); + datastore + .oximeter_delete(&opctx, collector_ids[1]) + .await + .expect("deleted collector"); + + let deleted0b = find_oximeter_ignoring_deleted(collector_ids[0]).await; + let deleted1b = find_oximeter_ignoring_deleted(collector_ids[1]).await; + assert_eq!(deleted0a, deleted0b); + assert_eq!(deleted1a, deleted1b); + + // Cleanup + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_oximeter_reassigns_randomly() { + // Setup + let logctx = dev::test_setup_log("test_oximeter_reassigns_randomly"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = + datastore_test(&logctx, &db, Uuid::new_v4()).await; + + // Insert a few Oximeter collectors. + let collector_ids = (0..4).map(|_| Uuid::new_v4()).collect::>(); + for &collector_id in &collector_ids { + let info = OximeterInfo::new(¶ms::OximeterInfo { + collector_id, + address: "[::1]:0".parse().unwrap(), // unused + }); + datastore + .oximeter_create(&opctx, &info) + .await + .expect("inserted collector"); + } + + // Insert 250 metric producers assigned to each collector. + for &collector_id in &collector_ids { + for _ in 0..250 { + let producer = ProducerEndpoint::new( + &nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }, + collector_id, + ); + datastore + .producer_endpoint_create(&opctx, &producer) + .await + .expect("created producer"); + } + } + + // Delete one collector. + datastore + .oximeter_delete(&opctx, collector_ids[0]) + .await + .expect("deleted Oximeter"); + + // Reassign producers that belonged to that collector. + let num_reassigned = datastore + .oximeter_reassign_all_producers(&opctx, collector_ids[0]) + .await + .expect("reassigned producers"); + assert_eq!(num_reassigned, CollectorReassignment::Complete(250)); + + // Check the distribution of producers for each of the remaining + // collectors. We don't know the exact count, so we'll check that: + // + // * Each of the three remaining collectors gained at least one (the + // probability that any of the three collectors gained zero is low + // enough that most calculators give up and call it 0) + // * All 1000 producers are assigned to one of the three collectors + // + // to guard against "the reassignment query gave all 250 to exactly one + // of the remaining collectors", which is an easy failure mode for this + // kind of SQL query, where the query engine only evaluates the + // randomness once instead of once for each producer. 
+ let mut producer_counts = [0; 4]; + for i in 0..4 { + producer_counts[i] = datastore + .producers_list_by_oximeter_id( + &opctx, + collector_ids[i], + &DataPageParams::max_page(), + ) + .await + .expect("listed producers") + .len(); + } + assert_eq!(producer_counts[0], 0); // all reassigned + assert!(producer_counts[1] > 250); // gained at least one + assert!(producer_counts[2] > 250); // gained at least one + assert!(producer_counts[3] > 250); // gained at least one + assert_eq!(producer_counts[1..].iter().sum::(), 1000); + + // Cleanup + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_oximeter_reassign_fails_if_no_collectors() { + // Setup + let logctx = dev::test_setup_log( + "test_oximeter_reassign_fails_if_no_collectors", + ); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = + datastore_test(&logctx, &db, Uuid::new_v4()).await; + + // Insert a few Oximeter collectors. + let collector_ids = (0..4).map(|_| Uuid::new_v4()).collect::>(); + for &collector_id in &collector_ids { + let info = OximeterInfo::new(¶ms::OximeterInfo { + collector_id, + address: "[::1]:0".parse().unwrap(), // unused + }); + datastore + .oximeter_create(&opctx, &info) + .await + .expect("inserted collector"); + } + + // Insert 10 metric producers assigned to each collector. + for &collector_id in &collector_ids { + for _ in 0..10 { + let producer = ProducerEndpoint::new( + &nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }, + collector_id, + ); + datastore + .producer_endpoint_create(&opctx, &producer) + .await + .expect("created producer"); + } + } + + // Delete all four collectors. + for &collector_id in &collector_ids { + datastore + .oximeter_delete(&opctx, collector_id) + .await + .expect("deleted Oximeter"); + } + + // Try to reassign producers that belonged to each collector; this + // should fail, as all collectors have been deleted. + for &collector_id in &collector_ids { + let num_reassigned = datastore + .oximeter_reassign_all_producers(&opctx, collector_id) + .await + .expect("reassigned producers"); + assert_eq!( + num_reassigned, + CollectorReassignment::NoCollectorsAvailable + ); + } + + // Now insert a new collector. + let new_collector_id = Uuid::new_v4(); + datastore + .oximeter_create( + &opctx, + &OximeterInfo::new(¶ms::OximeterInfo { + collector_id: new_collector_id, + address: "[::1]:0".parse().unwrap(), // unused + }), + ) + .await + .expect("inserted collector"); + + // Reassigning the original four collectors should now all succeed. + for &collector_id in &collector_ids { + let num_reassigned = datastore + .oximeter_reassign_all_producers(&opctx, collector_id) + .await + .expect("reassigned producers"); + assert_eq!(num_reassigned, CollectorReassignment::Complete(10)); + } + + // All 40 producers should be assigned to our new collector. 
+        let nproducers = datastore
+            .producers_list_by_oximeter_id(
+                &opctx,
+                new_collector_id,
+                &DataPageParams::max_page(),
+            )
+            .await
+            .expect("listed producers")
+            .len();
+        assert_eq!(nproducers, 40);
+
+        // Cleanup
+        db.cleanup().await.unwrap();
+        logctx.cleanup_successful();
+    }
+
     #[tokio::test]
     async fn test_producers_list_expired() {
         // Setup
diff --git a/nexus/db-queries/src/db/queries/mod.rs b/nexus/db-queries/src/db/queries/mod.rs
index f88b8fab6d..02800e3a3c 100644
--- a/nexus/db-queries/src/db/queries/mod.rs
+++ b/nexus/db-queries/src/db/queries/mod.rs
@@ -11,6 +11,7 @@ pub mod ip_pool;
 #[macro_use]
 mod next_item;
 pub mod network_interface;
+pub mod oximeter;
 pub mod region_allocation;
 pub mod virtual_provisioning_collection_update;
 pub mod volume;
diff --git a/nexus/db-queries/src/db/queries/oximeter.rs b/nexus/db-queries/src/db/queries/oximeter.rs
new file mode 100644
index 0000000000..f9d59a4484
--- /dev/null
+++ b/nexus/db-queries/src/db/queries/oximeter.rs
@@ -0,0 +1,111 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! Implementation of queries for Oximeter collectors and producers.
+
+use crate::db::raw_query_builder::{QueryBuilder, TypedSqlQuery};
+use diesel::sql_types;
+use uuid::Uuid;
+
+/// For a given Oximeter instance (which is presumably no longer running),
+/// reassign any collectors assigned to it to a different Oximeter. Each
+/// assignment is randomly chosen from among the non-deleted Oximeter instances
+/// recorded in the `oximeter` table.
+pub fn reassign_producers_query(oximeter_id: Uuid) -> TypedSqlQuery<()> {
+    let builder = QueryBuilder::new();
+
+    // Find all non-deleted Oximeter instances.
+    let builder = builder.sql(
+        "\
+        WITH available_oximeters AS ( \
+            SELECT ARRAY( \
+                SELECT id FROM oximeter WHERE time_deleted IS NULL
+            ) AS ids \
+        ), ",
+    );
+
+    // Create a mapping of producer ID <-> new, random, non-deleted Oximeter ID
+    // for every producer assigned to `oximeter_id`. If the `ids` array from the
+    // previous expression is empty, every `new_id` column in this expression
+    // will be NULL. We'll catch that in the update below.
+    let builder = builder
+        .sql(
+            "\
+            new_assignments AS ( \
+                SELECT
+                    metric_producer.id AS producer_id,
+                    ids[1 + floor(random() * array_length(ids, 1)::float)::int]
+                        AS new_id
+                FROM metric_producer
+                LEFT JOIN available_oximeters ON true
+                WHERE oximeter_id = ",
+        )
+        .param()
+        .sql(")")
+        .bind::<sql_types::Uuid, _>(oximeter_id);
+
+    // Actually perform the update. If the `new_id` column from the previous
+    // step is `NULL` (because there aren't any non-deleted Oximeter instances),
+    // this will fail the `NOT NULL` constraint on the oximeter_id column.
+    let builder = builder
+        .sql(
+            "\
+            UPDATE metric_producer SET oximeter_id = ( \
+                SELECT new_id FROM new_assignments \
+                WHERE new_assignments.producer_id = metric_producer.id \
+            ) WHERE oximeter_id = ",
+        )
+        .param()
+        .bind::<sql_types::Uuid, _>(oximeter_id);
+
+    builder.query()
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::db::explain::ExplainableAsync;
+    use crate::db::raw_query_builder::expectorate_query_contents;
+    use nexus_test_utils::db::test_setup_database;
+    use omicron_test_utils::dev;
+    use uuid::Uuid;
+
+    // This test is a bit of a "change detector", but it's here to help with
+    // debugging too. If you change this query, it can be useful to see exactly
+    // how the output SQL has been altered.
+    #[tokio::test]
+    async fn expectorate_query() {
+        let oximeter_id = Uuid::nil();
+
+        let query = reassign_producers_query(oximeter_id);
+
+        expectorate_query_contents(
+            &query,
+            "tests/output/oximeter_reassign_producers.sql",
+        )
+        .await;
+    }
+
+    // Explain the SQL query to ensure that it creates a valid SQL string.
+    #[tokio::test]
+    async fn explainable() {
+        let logctx = dev::test_setup_log("explainable");
+        let log = logctx.log.new(o!());
+        let mut db = test_setup_database(&log).await;
+        let cfg = crate::db::Config { url: db.pg_config().clone() };
+        let pool = crate::db::Pool::new_single_host(&logctx.log, &cfg);
+        let conn = pool.claim().await.unwrap();
+
+        let oximeter_id = Uuid::nil();
+
+        let query = reassign_producers_query(oximeter_id);
+        let _ = query
+            .explain_async(&conn)
+            .await
+            .expect("Failed to explain query - is it valid SQL?");
+
+        db.cleanup().await.unwrap();
+        logctx.cleanup_successful();
+    }
+}
diff --git a/nexus/db-queries/tests/output/oximeter_reassign_producers.sql b/nexus/db-queries/tests/output/oximeter_reassign_producers.sql
new file mode 100644
index 0000000000..131575722f
--- /dev/null
+++ b/nexus/db-queries/tests/output/oximeter_reassign_producers.sql
@@ -0,0 +1,19 @@
+WITH
+  available_oximeters AS (SELECT ARRAY (SELECT id FROM oximeter WHERE time_deleted IS NULL) AS ids),
+  new_assignments
+    AS (
+      SELECT
+        metric_producer.id AS producer_id,
+        ids[1 + floor(random() * array_length(ids, 1)::FLOAT8)::INT8] AS new_id
+      FROM
+        metric_producer LEFT JOIN available_oximeters ON true
+      WHERE
+        oximeter_id = $1
+    )
+UPDATE
+  metric_producer
+SET
+  oximeter_id
+    = (SELECT new_id FROM new_assignments WHERE new_assignments.producer_id = metric_producer.id)
+WHERE
+  oximeter_id = $2
diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql
index 9906a94ac6..8528ec2921 100644
--- a/schema/crdb/dbinit.sql
+++ b/schema/crdb/dbinit.sql
@@ -1339,9 +1339,15 @@ CREATE TABLE IF NOT EXISTS omicron.public.oximeter (
     time_created TIMESTAMPTZ NOT NULL,
     time_modified TIMESTAMPTZ NOT NULL,
     ip INET NOT NULL,
-    port INT4 CHECK (port BETWEEN 0 AND 65535) NOT NULL
+    port INT4 CHECK (port BETWEEN 0 AND 65535) NOT NULL,
+    time_deleted TIMESTAMPTZ
 );
+
+CREATE UNIQUE INDEX IF NOT EXISTS lookup_non_deleted_oximeter ON omicron.public.oximeter (
+    id
+) WHERE
+    time_deleted IS NULL;
+
 /*
  * The kind of metric producer each record corresponds to.
*/ @@ -4233,7 +4239,7 @@ INSERT INTO omicron.public.db_metadata ( version, target_version ) VALUES - (TRUE, NOW(), NOW(), '94.0.0', NULL) + (TRUE, NOW(), NOW(), '95.0.0', NULL) ON CONFLICT DO NOTHING; COMMIT; diff --git a/schema/crdb/oximeter-add-time-deleted/up1.sql b/schema/crdb/oximeter-add-time-deleted/up1.sql new file mode 100644 index 0000000000..e722a0d78c --- /dev/null +++ b/schema/crdb/oximeter-add-time-deleted/up1.sql @@ -0,0 +1 @@ +ALTER TABLE omicron.public.oximeter ADD COLUMN IF NOT EXISTS time_deleted TIMESTAMPTZ; diff --git a/schema/crdb/oximeter-add-time-deleted/up2.sql b/schema/crdb/oximeter-add-time-deleted/up2.sql new file mode 100644 index 0000000000..3cb4d764b5 --- /dev/null +++ b/schema/crdb/oximeter-add-time-deleted/up2.sql @@ -0,0 +1,4 @@ +CREATE UNIQUE INDEX IF NOT EXISTS lookup_non_deleted_oximeter ON omicron.public.oximeter ( + id +) WHERE + time_deleted IS NULL; From 2bddf0531d895d1ef509ae37535d274bfc78967f Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Wed, 4 Sep 2024 15:15:48 -0400 Subject: [PATCH 2/6] Reject producer creation requests with bad oximeter IDs --- nexus/db-queries/src/db/datastore/oximeter.rs | 176 +++++++++++++++++- 1 file changed, 169 insertions(+), 7 deletions(-) diff --git a/nexus/db-queries/src/db/datastore/oximeter.rs b/nexus/db-queries/src/db/datastore/oximeter.rs index 6ab045ef45..ed9cce2f56 100644 --- a/nexus/db-queries/src/db/datastore/oximeter.rs +++ b/nexus/db-queries/src/db/datastore/oximeter.rs @@ -22,6 +22,8 @@ use chrono::Utc; use diesel::prelude::*; use diesel::result::DatabaseErrorKind; use diesel::result::Error as DieselError; +use diesel::sql_types; +use nexus_db_model::ProducerKindEnum; use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Error; use omicron_common::api::external::ListResultVec; @@ -63,10 +65,18 @@ impl DataStore { ) -> Result<(), Error> { use db::schema::oximeter::dsl; - // If we get a conflict on the Oximeter ID, this means that collector instance was - // previously registered, and it's re-registering due to something like a service restart. - // In this case, we update the time modified and the service address, rather than - // propagating a constraint violation to the caller. + // If we get a conflict on the Oximeter ID, this means that collector + // instance was previously registered, and it's re-registering due to + // something like a service restart. In this case, we update the time + // modified and the service address, rather than propagating a + // constraint violation to the caller. + // + // TODO(john) - Should we ignore time_deleted or explicitly set it to + // NULL? We _shouldn't_ be called by an Oximeter that's been deleted, + // and if we are that indicates a bug somewhere else in the system + // (probably in reconfigurator or its related tasks). For now, we refuse + // to resurrect a deleted Oximeter: if we're called with a deleted + // instance, we'll leave it deleted. diesel::insert_into(dsl::oximeter) .values(*info) .on_conflict(dsl::id) @@ -160,11 +170,47 @@ impl DataStore { opctx: &OpContext, producer: &ProducerEndpoint, ) -> Result<(), Error> { + // Our caller has already chosen an Oximeter instance for this producer, + // but we don't want to allow it to use a nonexistent or deleted + // Oximeter. This query turns into a `SELECT all_the_fields_of_producer + // WHERE producer.oximeter_id is legal` in a diesel-compatible way. 
I'm + // not aware of a helper method to generate "all the fields of + // `producer`", so instead we have a big tuple of its fields that must + // stay in sync with the `table!` definition and field ordering for the + // `metric_producer` table. The compiler will catch any mistakes + // _except_ incorrect orderings where the types still line up (e.g., + // swapping two Uuid columns), which is not ideal but is hopefully good + // enough. + let producer_subquery = { + use db::schema::oximeter::dsl; + + dsl::oximeter + .select(( + producer.id().into_sql::(), + producer + .time_created() + .into_sql::(), + producer + .time_modified() + .into_sql::(), + producer.kind.into_sql::(), + producer.ip.into_sql::(), + producer.port.into_sql::(), + producer.interval.into_sql::(), + producer.oximeter_id.into_sql::(), + )) + .filter( + dsl::id + .eq(producer.oximeter_id) + .and(dsl::time_deleted.is_null()), + ) + }; + use db::schema::metric_producer::dsl; // TODO: see https://github.com/oxidecomputer/omicron/issues/323 - diesel::insert_into(dsl::metric_producer) - .values(producer.clone()) + let n = diesel::insert_into(dsl::metric_producer) + .values(producer_subquery) .on_conflict(dsl::id) .do_update() .set(( @@ -185,7 +231,22 @@ impl DataStore { ), ) })?; - Ok(()) + + // We expect `n` to basically always be 1 (1 row was inserted or + // updated). It can be 0 if `producer.oximeter_id` doesn't exist or has + // been deleted. It can never be 2 or greater because + // `producer_subquery` filters on finding an exact row for its Oximeter + // instance's ID. + match n { + 0 => Err(Error::not_found_by_id( + ResourceType::Oximeter, + &producer.oximeter_id, + )), + 1 => Ok(()), + _ => Err(Error::internal_error(&format!( + "multiple rows inserted ({n}) in `producer_endpoint_create`" + ))), + } } /// Delete a record for a producer endpoint, by its ID. @@ -290,6 +351,7 @@ mod tests { use db::datastore::pub_test_utils::datastore_test; use nexus_test_utils::db::test_setup_database; use nexus_types::internal_api::params; + use omicron_common::api::external::LookupType; use omicron_common::api::internal::nexus; use omicron_test_utils::dev; use std::time::Duration; @@ -442,6 +504,106 @@ mod tests { logctx.cleanup_successful(); } + #[tokio::test] + async fn test_producer_endpoint_create_rejects_deleted_oximeters() { + // Setup + let logctx = dev::test_setup_log( + "test_producer_endpoint_create_rejects_deleted_oximeters", + ); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = + datastore_test(&logctx, &db, Uuid::new_v4()).await; + + // Insert a few Oximeter collectors. + let collector_ids = (0..4).map(|_| Uuid::new_v4()).collect::>(); + for &collector_id in &collector_ids { + let info = OximeterInfo::new(¶ms::OximeterInfo { + collector_id, + address: "[::1]:0".parse().unwrap(), // unused + }); + datastore + .oximeter_create(&opctx, &info) + .await + .expect("inserted collector"); + } + + // We can insert metric producers for each collector. + for &collector_id in &collector_ids { + let producer = ProducerEndpoint::new( + &nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }, + collector_id, + ); + datastore + .producer_endpoint_create(&opctx, &producer) + .await + .expect("created producer"); + } + + // Delete the first collector. 
+ datastore + .oximeter_delete(&opctx, collector_ids[0]) + .await + .expect("deleted collector"); + + // Attempting to insert a producer assigned to the first collector + // should fail, now that it's deleted. + let err = { + let producer = ProducerEndpoint::new( + &nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }, + collector_ids[0], + ); + datastore + .producer_endpoint_create(&opctx, &producer) + .await + .expect_err("producer creation fails") + }; + assert_eq!( + err, + Error::ObjectNotFound { + type_name: ResourceType::Oximeter, + lookup_type: LookupType::ById(collector_ids[0]) + } + ); + + // We can still insert metric producers for the other collectors... + for &collector_id in &collector_ids[1..] { + let mut producer = ProducerEndpoint::new( + &nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }, + collector_id, + ); + datastore + .producer_endpoint_create(&opctx, &producer) + .await + .expect("created producer"); + + // ... and we can update them. + producer.port = 100.into(); + datastore + .producer_endpoint_create(&opctx, &producer) + .await + .expect("created producer"); + } + + // Cleanup + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + #[tokio::test] async fn test_oximeter_reassigns_randomly() { // Setup From 74f69ae713071ad037a4d523da0ee1f47e712043 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Fri, 6 Sep 2024 11:52:55 -0400 Subject: [PATCH 3/6] review feedback --- nexus/db-queries/src/db/datastore/oximeter.rs | 12 ++++++------ schema/crdb/dbinit.sql | 7 ++++++- schema/crdb/oximeter-add-time-deleted/up2.sql | 2 +- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/nexus/db-queries/src/db/datastore/oximeter.rs b/nexus/db-queries/src/db/datastore/oximeter.rs index ed9cce2f56..8896163908 100644 --- a/nexus/db-queries/src/db/datastore/oximeter.rs +++ b/nexus/db-queries/src/db/datastore/oximeter.rs @@ -71,12 +71,12 @@ impl DataStore { // modified and the service address, rather than propagating a // constraint violation to the caller. // - // TODO(john) - Should we ignore time_deleted or explicitly set it to - // NULL? We _shouldn't_ be called by an Oximeter that's been deleted, - // and if we are that indicates a bug somewhere else in the system - // (probably in reconfigurator or its related tasks). For now, we refuse - // to resurrect a deleted Oximeter: if we're called with a deleted - // instance, we'll leave it deleted. + // TODO-completeness - We should return an error if `info.id()` maps to + // an existing row that has been deleted. We don't expect that to happen + // in practice (it would mean an expunged Oximeter zone has come back to + // life and reregistered itself). If it does happen, as written we'll + // update time_modified/ip/port but leave time_deleted set to whatever + // it was (which will leave the Oximeter in the "deleted" state). 
diesel::insert_into(dsl::oximeter) .values(*info) .on_conflict(dsl::id) diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql index 8528ec2921..93ac7b0cb7 100644 --- a/schema/crdb/dbinit.sql +++ b/schema/crdb/dbinit.sql @@ -1343,7 +1343,12 @@ CREATE TABLE IF NOT EXISTS omicron.public.oximeter ( time_deleted TIMESTAMPTZ ); -CREATE UNIQUE INDEX IF NOT EXISTS lookup_non_deleted_oximeter ON omicron.public.oximeter ( +/* + * The query Nexus runs to choose an Oximeter instance for new metric producers + * involves listing the non-deleted instances sorted by ID, which would require + * a full table scan without this index. + */ +CREATE UNIQUE INDEX IF NOT EXISTS list_non_deleted_oximeter ON omicron.public.oximeter ( id ) WHERE time_deleted IS NULL; diff --git a/schema/crdb/oximeter-add-time-deleted/up2.sql b/schema/crdb/oximeter-add-time-deleted/up2.sql index 3cb4d764b5..ce2315b74d 100644 --- a/schema/crdb/oximeter-add-time-deleted/up2.sql +++ b/schema/crdb/oximeter-add-time-deleted/up2.sql @@ -1,4 +1,4 @@ -CREATE UNIQUE INDEX IF NOT EXISTS lookup_non_deleted_oximeter ON omicron.public.oximeter ( +CREATE UNIQUE INDEX IF NOT EXISTS list_non_deleted_oximeter ON omicron.public.oximeter ( id ) WHERE time_deleted IS NULL; From 1bdb6704146266a3a1a5f5df5f9e056e11d43809 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Fri, 13 Sep 2024 15:35:46 -0400 Subject: [PATCH 4/6] time_deleted -> time_expunged --- gateway-test-utils/configs/config.test.toml | 12 +- nexus/db-model/src/oximeter_info.rs | 15 ++- nexus/db-model/src/schema.rs | 2 +- nexus/db-model/src/schema_versions.rs | 2 +- nexus/db-queries/src/db/datastore/oximeter.rs | 107 +++++++++--------- nexus/db-queries/src/db/queries/oximeter.rs | 13 ++- .../output/oximeter_reassign_producers.sql | 3 +- schema/crdb/dbinit.sql | 8 +- schema/crdb/oximeter-add-time-deleted/up2.sql | 4 - .../up1.sql | 2 +- .../crdb/oximeter-add-time-expunged/up2.sql | 4 + 11 files changed, 96 insertions(+), 76 deletions(-) delete mode 100644 schema/crdb/oximeter-add-time-deleted/up2.sql rename schema/crdb/{oximeter-add-time-deleted => oximeter-add-time-expunged}/up1.sql (72%) create mode 100644 schema/crdb/oximeter-add-time-expunged/up2.sql diff --git a/gateway-test-utils/configs/config.test.toml b/gateway-test-utils/configs/config.test.toml index 4e3e9c6e6e..b3401ddf09 100644 --- a/gateway-test-utils/configs/config.test.toml +++ b/gateway-test-utils/configs/config.test.toml @@ -23,11 +23,15 @@ udp_listen_port = 0 # as our contact to the ignition controller)? local_ignition_controller_interface = "fake-switch0" -# When sending UDP RPC packets to an SP, how many total attempts do we make -# before giving up? -rpc_max_attempts = 3 +# When sending UDP RPC packets to an SP (other than to reset it), how many total +# attempts do we make before giving up? +rpc_max_attempts_general = 3 -# sleep time between UDP RPC resends (up to `rpc_max_attempts`) +# When sending UDP RPC packets to an SP (to reset it), how many total attempts +# do we make before giving up? +rpc_max_attempts_reset = 10 + +# sleep time between UDP RPC resends (up to `rpc_max_attempts_*`) rpc_per_attempt_timeout_millis = 1000 [switch.location] diff --git a/nexus/db-model/src/oximeter_info.rs b/nexus/db-model/src/oximeter_info.rs index 01570d64c9..5579425a63 100644 --- a/nexus/db-model/src/oximeter_info.rs +++ b/nexus/db-model/src/oximeter_info.rs @@ -18,8 +18,17 @@ pub struct OximeterInfo { pub time_created: DateTime, /// When this resource was last modified. 
pub time_modified: DateTime, - /// When this resource was deleted. - pub time_deleted: Option>, + /// When this resource was expunged. + // + // We typically refer to _zones_ as expunged; this isn't quite the same + // thing since this is the record of a running Oximeter instance. Some time + // after an Oximeter zone has been expunged (usually not very long!), the + // blueprint_executor RPW will mark the Oximeter instance that was running + // in that zone as expunged, setting this field to a non-None value, which + // will cause it to no longer be chosen as a potential collector for + // producers (and will result in any producers it had been assigned being + // reassigned to some other collector). + pub time_expunged: Option>, /// The address on which this `oximeter` instance listens for requests. pub ip: ipnetwork::IpNetwork, /// The port on which this `oximeter` instance listens for requests. @@ -33,7 +42,7 @@ impl OximeterInfo { id: info.collector_id, time_created: now, time_modified: now, - time_deleted: None, + time_expunged: None, ip: info.address.ip().into(), port: info.address.port().into(), } diff --git a/nexus/db-model/src/schema.rs b/nexus/db-model/src/schema.rs index 4669178cd3..de9ca6f7db 100644 --- a/nexus/db-model/src/schema.rs +++ b/nexus/db-model/src/schema.rs @@ -794,7 +794,7 @@ table! { id -> Uuid, time_created -> Timestamptz, time_modified -> Timestamptz, - time_deleted -> Nullable, + time_expunged -> Nullable, ip -> Inet, port -> Int4, } diff --git a/nexus/db-model/src/schema_versions.rs b/nexus/db-model/src/schema_versions.rs index 4c60000507..e74268bfe7 100644 --- a/nexus/db-model/src/schema_versions.rs +++ b/nexus/db-model/src/schema_versions.rs @@ -29,7 +29,7 @@ static KNOWN_VERSIONS: Lazy> = Lazy::new(|| { // | leaving the first copy as an example for the next person. // v // KnownVersion::new(next_int, "unique-dirname-with-the-sql-files"), - KnownVersion::new(95, "oximeter-add-time-deleted"), + KnownVersion::new(95, "oximeter-add-time-expunged"), KnownVersion::new(94, "put-back-creating-vmm-state"), KnownVersion::new(93, "dataset-kinds-zone-and-debug"), KnownVersion::new(92, "lldp-link-config-nullable"), diff --git a/nexus/db-queries/src/db/datastore/oximeter.rs b/nexus/db-queries/src/db/datastore/oximeter.rs index 8896163908..4cce1f9c10 100644 --- a/nexus/db-queries/src/db/datastore/oximeter.rs +++ b/nexus/db-queries/src/db/datastore/oximeter.rs @@ -50,7 +50,7 @@ impl DataStore { ) -> Result { use db::schema::oximeter::dsl; dsl::oximeter - .filter(dsl::time_deleted.is_null()) + .filter(dsl::time_expunged.is_null()) .find(*id) .first_async(&*self.pool_connection_authorized(opctx).await?) .await @@ -72,11 +72,12 @@ impl DataStore { // constraint violation to the caller. // // TODO-completeness - We should return an error if `info.id()` maps to - // an existing row that has been deleted. We don't expect that to happen - // in practice (it would mean an expunged Oximeter zone has come back to - // life and reregistered itself). If it does happen, as written we'll - // update time_modified/ip/port but leave time_deleted set to whatever - // it was (which will leave the Oximeter in the "deleted" state). + // an existing row that has been expunged. We don't expect that to + // happen in practice (it would mean an expunged Oximeter zone has come + // back to life and reregistered itself). 
If it does happen, as written + // we'll update time_modified/ip/port but leave time_expunged set to + // whatever it was (which will leave the Oximeter in the "expunged" + // state). diesel::insert_into(dsl::oximeter) .values(*info) .on_conflict(dsl::id) @@ -100,11 +101,11 @@ impl DataStore { Ok(()) } - /// Mark an Oximeter instance as deleted + /// Mark an Oximeter instance as expunged /// /// This method is idempotent and has no effect if called with the ID for an - /// already-deleted Oximeter. - pub async fn oximeter_delete( + /// already-expunged Oximeter. + pub async fn oximeter_expunge( &self, opctx: &OpContext, id: Uuid, @@ -114,9 +115,9 @@ impl DataStore { let now = Utc::now(); diesel::update(dsl::oximeter) - .filter(dsl::time_deleted.is_null()) + .filter(dsl::time_expunged.is_null()) .filter(dsl::id.eq(id)) - .set(dsl::time_deleted.eq(now)) + .set(dsl::time_expunged.eq(now)) .execute_async(&*self.pool_connection_authorized(opctx).await?) .await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; @@ -132,7 +133,7 @@ impl DataStore { ) -> ListResultVec { use db::schema::oximeter::dsl; paginated(dsl::oximeter, dsl::id, page_params) - .filter(dsl::time_deleted.is_null()) + .filter(dsl::time_expunged.is_null()) .load_async::( &*self.pool_connection_authorized(opctx).await?, ) @@ -145,7 +146,7 @@ impl DataStore { /// The new Oximeter instance for each producer will be randomly selected /// from all available Oximeters. On success, returns the number of metric /// producers reassigned. Fails if there are no available Oximeter instances - /// (e.g., all Oximeter instances have been deleted). + /// (e.g., all Oximeter instances have been expunged). pub async fn oximeter_reassign_all_producers( &self, opctx: &OpContext, @@ -171,7 +172,7 @@ impl DataStore { producer: &ProducerEndpoint, ) -> Result<(), Error> { // Our caller has already chosen an Oximeter instance for this producer, - // but we don't want to allow it to use a nonexistent or deleted + // but we don't want to allow it to use a nonexistent or expunged // Oximeter. This query turns into a `SELECT all_the_fields_of_producer // WHERE producer.oximeter_id is legal` in a diesel-compatible way. I'm // not aware of a helper method to generate "all the fields of @@ -202,7 +203,7 @@ impl DataStore { .filter( dsl::id .eq(producer.oximeter_id) - .and(dsl::time_deleted.is_null()), + .and(dsl::time_expunged.is_null()), ) }; @@ -234,7 +235,7 @@ impl DataStore { // We expect `n` to basically always be 1 (1 row was inserted or // updated). It can be 0 if `producer.oximeter_id` doesn't exist or has - // been deleted. It can never be 2 or greater because + // been expunged. It can never be 2 or greater because // `producer_subquery` filters on finding an exact row for its Oximeter // instance's ID. match n { @@ -399,9 +400,9 @@ mod tests { } #[tokio::test] - async fn test_oximeter_delete() { + async fn test_oximeter_expunge() { // Setup - let logctx = dev::test_setup_log("test_oximeter_delete"); + let logctx = dev::test_setup_log("test_oximeter_expunge"); let mut db = test_setup_database(&logctx.log).await; let (opctx, datastore) = datastore_test(&logctx, &db, Uuid::new_v4()).await; @@ -424,7 +425,7 @@ mod tests { .expect("inserted collector"); } - // Ensure all our collectors exist and aren't deleted. + // Ensure all our collectors exist and aren't expunged. 
let mut all_collectors = datastore .oximeter_list(&opctx, &DataPageParams::max_page()) .await @@ -433,20 +434,20 @@ mod tests { assert_eq!(all_collectors.len(), collector_ids.len()); for (info, &expected_id) in all_collectors.iter().zip(&collector_ids) { assert_eq!(info.id, expected_id); - assert!(info.time_deleted.is_none()); + assert!(info.time_expunged.is_none()); } // Delete the first two of them. datastore - .oximeter_delete(&opctx, collector_ids[0]) + .oximeter_expunge(&opctx, collector_ids[0]) .await - .expect("deleted collector"); + .expect("expunged collector"); datastore - .oximeter_delete(&opctx, collector_ids[1]) + .oximeter_expunge(&opctx, collector_ids[1]) .await - .expect("deleted collector"); + .expect("expunged collector"); - // Ensure those two were deleted. + // Ensure those two were expunged. let mut all_collectors = datastore .oximeter_list(&opctx, &DataPageParams::max_page()) .await @@ -457,13 +458,13 @@ mod tests { all_collectors.iter().zip(&collector_ids[2..]) { assert_eq!(info.id, expected_id); - assert!(info.time_deleted.is_none()); + assert!(info.time_expunged.is_none()); } - // Deletion is idempotent. To test, we'll read the deleted rows - // directly, delete them again, and confirm the row contents haven't + // Deletion is idempotent. To test, we'll read the expunged rows + // directly, expunge them again, and confirm the row contents haven't // changed. - let find_oximeter_ignoring_deleted = |id| { + let find_oximeter_ignoring_expunged = |id| { let datastore = &datastore; let opctx = &opctx; async move { @@ -480,24 +481,28 @@ mod tests { info } }; - let deleted0a = find_oximeter_ignoring_deleted(collector_ids[0]).await; - let deleted1a = find_oximeter_ignoring_deleted(collector_ids[1]).await; - assert!(deleted0a.time_deleted.is_some()); - assert!(deleted1a.time_deleted.is_some()); + let expunged0a = + find_oximeter_ignoring_expunged(collector_ids[0]).await; + let expunged1a = + find_oximeter_ignoring_expunged(collector_ids[1]).await; + assert!(expunged0a.time_expunged.is_some()); + assert!(expunged1a.time_expunged.is_some()); datastore - .oximeter_delete(&opctx, collector_ids[0]) + .oximeter_expunge(&opctx, collector_ids[0]) .await - .expect("deleted collector"); + .expect("expunged collector"); datastore - .oximeter_delete(&opctx, collector_ids[1]) + .oximeter_expunge(&opctx, collector_ids[1]) .await - .expect("deleted collector"); + .expect("expunged collector"); - let deleted0b = find_oximeter_ignoring_deleted(collector_ids[0]).await; - let deleted1b = find_oximeter_ignoring_deleted(collector_ids[1]).await; - assert_eq!(deleted0a, deleted0b); - assert_eq!(deleted1a, deleted1b); + let expunged0b = + find_oximeter_ignoring_expunged(collector_ids[0]).await; + let expunged1b = + find_oximeter_ignoring_expunged(collector_ids[1]).await; + assert_eq!(expunged0a, expunged0b); + assert_eq!(expunged1a, expunged1b); // Cleanup db.cleanup().await.unwrap(); @@ -505,10 +510,10 @@ mod tests { } #[tokio::test] - async fn test_producer_endpoint_create_rejects_deleted_oximeters() { + async fn test_producer_endpoint_create_rejects_expunged_oximeters() { // Setup let logctx = dev::test_setup_log( - "test_producer_endpoint_create_rejects_deleted_oximeters", + "test_producer_endpoint_create_rejects_expunged_oximeters", ); let mut db = test_setup_database(&logctx.log).await; let (opctx, datastore) = @@ -546,12 +551,12 @@ mod tests { // Delete the first collector. 
datastore - .oximeter_delete(&opctx, collector_ids[0]) + .oximeter_expunge(&opctx, collector_ids[0]) .await - .expect("deleted collector"); + .expect("expunged collector"); // Attempting to insert a producer assigned to the first collector - // should fail, now that it's deleted. + // should fail, now that it's expunged. let err = { let producer = ProducerEndpoint::new( &nexus::ProducerEndpoint { @@ -646,9 +651,9 @@ mod tests { // Delete one collector. datastore - .oximeter_delete(&opctx, collector_ids[0]) + .oximeter_expunge(&opctx, collector_ids[0]) .await - .expect("deleted Oximeter"); + .expect("expunged Oximeter"); // Reassign producers that belonged to that collector. let num_reassigned = datastore @@ -737,13 +742,13 @@ mod tests { // Delete all four collectors. for &collector_id in &collector_ids { datastore - .oximeter_delete(&opctx, collector_id) + .oximeter_expunge(&opctx, collector_id) .await - .expect("deleted Oximeter"); + .expect("expunged Oximeter"); } // Try to reassign producers that belonged to each collector; this - // should fail, as all collectors have been deleted. + // should fail, as all collectors have been expunged. for &collector_id in &collector_ids { let num_reassigned = datastore .oximeter_reassign_all_producers(&opctx, collector_id) diff --git a/nexus/db-queries/src/db/queries/oximeter.rs b/nexus/db-queries/src/db/queries/oximeter.rs index f9d59a4484..fb51ee5362 100644 --- a/nexus/db-queries/src/db/queries/oximeter.rs +++ b/nexus/db-queries/src/db/queries/oximeter.rs @@ -10,22 +10,22 @@ use uuid::Uuid; /// For a given Oximeter instance (which is presumably no longer running), /// reassign any collectors assigned to it to a different Oximeter. Each -/// assignment is randomly chosen from among the non-deleted Oximeter instances +/// assignment is randomly chosen from among the non-expunged Oximeter instances /// recorded in the `oximeter` table. pub fn reassign_producers_query(oximeter_id: Uuid) -> TypedSqlQuery<()> { let builder = QueryBuilder::new(); - // Find all non-deleted Oximeter instances. + // Find all non-expunged Oximeter instances. let builder = builder.sql( "\ WITH available_oximeters AS ( \ SELECT ARRAY( \ - SELECT id FROM oximeter WHERE time_deleted IS NULL + SELECT id FROM oximeter WHERE time_expunged IS NULL ) AS ids \ ), ", ); - // Create a mapping of producer ID <-> new, random, non-deleted Oximeter ID + // Create a mapping of producer ID <-> new, random, non-expunged Oximeter ID // for every producer assigned to `oximeter_id`. If the `ids` array from the // previous expression is empty, every `new_id` column in this expression // will be NULL. We'll catch that in the update below. @@ -46,8 +46,9 @@ pub fn reassign_producers_query(oximeter_id: Uuid) -> TypedSqlQuery<()> { .bind::(oximeter_id); // Actually perform the update. If the `new_id` column from the previous - // step is `NULL` (because there aren't any non-deleted Oximeter instances), - // this will fail the `NOT NULL` constraint on the oximeter_id column. + // step is `NULL` (because there aren't any non-expunged Oximeter + // instances), this will fail the `NOT NULL` constraint on the oximeter_id + // column. 
let builder = builder .sql( "\ diff --git a/nexus/db-queries/tests/output/oximeter_reassign_producers.sql b/nexus/db-queries/tests/output/oximeter_reassign_producers.sql index 131575722f..4ef88a800a 100644 --- a/nexus/db-queries/tests/output/oximeter_reassign_producers.sql +++ b/nexus/db-queries/tests/output/oximeter_reassign_producers.sql @@ -1,5 +1,6 @@ WITH - available_oximeters AS (SELECT ARRAY (SELECT id FROM oximeter WHERE time_deleted IS NULL) AS ids), + available_oximeters + AS (SELECT ARRAY (SELECT id FROM oximeter WHERE time_expunged IS NULL) AS ids), new_assignments AS ( SELECT diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql index 93ac7b0cb7..3d8fefe099 100644 --- a/schema/crdb/dbinit.sql +++ b/schema/crdb/dbinit.sql @@ -1340,18 +1340,18 @@ CREATE TABLE IF NOT EXISTS omicron.public.oximeter ( time_modified TIMESTAMPTZ NOT NULL, ip INET NOT NULL, port INT4 CHECK (port BETWEEN 0 AND 65535) NOT NULL, - time_deleted TIMESTAMPTZ + time_expunged TIMESTAMPTZ ); /* * The query Nexus runs to choose an Oximeter instance for new metric producers - * involves listing the non-deleted instances sorted by ID, which would require + * involves listing the non-expunged instances sorted by ID, which would require * a full table scan without this index. */ -CREATE UNIQUE INDEX IF NOT EXISTS list_non_deleted_oximeter ON omicron.public.oximeter ( +CREATE UNIQUE INDEX IF NOT EXISTS list_non_expunged_oximeter ON omicron.public.oximeter ( id ) WHERE - time_deleted IS NULL; + time_expunged IS NULL; /* * The kind of metric producer each record corresponds to. diff --git a/schema/crdb/oximeter-add-time-deleted/up2.sql b/schema/crdb/oximeter-add-time-deleted/up2.sql deleted file mode 100644 index ce2315b74d..0000000000 --- a/schema/crdb/oximeter-add-time-deleted/up2.sql +++ /dev/null @@ -1,4 +0,0 @@ -CREATE UNIQUE INDEX IF NOT EXISTS list_non_deleted_oximeter ON omicron.public.oximeter ( - id -) WHERE - time_deleted IS NULL; diff --git a/schema/crdb/oximeter-add-time-deleted/up1.sql b/schema/crdb/oximeter-add-time-expunged/up1.sql similarity index 72% rename from schema/crdb/oximeter-add-time-deleted/up1.sql rename to schema/crdb/oximeter-add-time-expunged/up1.sql index e722a0d78c..f6915d5765 100644 --- a/schema/crdb/oximeter-add-time-deleted/up1.sql +++ b/schema/crdb/oximeter-add-time-expunged/up1.sql @@ -1 +1 @@ -ALTER TABLE omicron.public.oximeter ADD COLUMN IF NOT EXISTS time_deleted TIMESTAMPTZ; +ALTER TABLE omicron.public.oximeter ADD COLUMN IF NOT EXISTS time_expunged TIMESTAMPTZ; diff --git a/schema/crdb/oximeter-add-time-expunged/up2.sql b/schema/crdb/oximeter-add-time-expunged/up2.sql new file mode 100644 index 0000000000..710fe99ffe --- /dev/null +++ b/schema/crdb/oximeter-add-time-expunged/up2.sql @@ -0,0 +1,4 @@ +CREATE UNIQUE INDEX IF NOT EXISTS list_non_expunged_oximeter ON omicron.public.oximeter ( + id +) WHERE + time_expunged IS NULL; From e8aaf3b43ec1e682d2afb3ef06828b845c97969f Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Fri, 13 Sep 2024 15:50:16 -0400 Subject: [PATCH 5/6] comment cleanup --- nexus/db-queries/src/db/datastore/oximeter.rs | 7 +++++-- nexus/db-queries/src/db/queries/oximeter.rs | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/nexus/db-queries/src/db/datastore/oximeter.rs b/nexus/db-queries/src/db/datastore/oximeter.rs index 4cce1f9c10..0c4b5077f2 100644 --- a/nexus/db-queries/src/db/datastore/oximeter.rs +++ b/nexus/db-queries/src/db/datastore/oximeter.rs @@ -30,8 +30,7 @@ use omicron_common::api::external::ListResultVec; use 
omicron_common::api::external::ResourceType; use uuid::Uuid; -/// Type returned when reassigning producers from a (presumably defunct) -/// Oximeter collector. +/// Type returned when reassigning producers from an Oximeter collector. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum CollectorReassignment { /// Success: `n` producers were reassigned to other collector(s). @@ -43,6 +42,8 @@ pub enum CollectorReassignment { impl DataStore { /// Lookup an oximeter instance by its ID. + /// + /// Fails if the instance has been expunged. pub async fn oximeter_lookup( &self, opctx: &OpContext, @@ -126,6 +127,8 @@ impl DataStore { } /// List the oximeter collector instances + /// + /// Omits expunged instances. pub async fn oximeter_list( &self, opctx: &OpContext, diff --git a/nexus/db-queries/src/db/queries/oximeter.rs b/nexus/db-queries/src/db/queries/oximeter.rs index fb51ee5362..40f7a2b493 100644 --- a/nexus/db-queries/src/db/queries/oximeter.rs +++ b/nexus/db-queries/src/db/queries/oximeter.rs @@ -9,7 +9,7 @@ use diesel::sql_types; use uuid::Uuid; /// For a given Oximeter instance (which is presumably no longer running), -/// reassign any collectors assigned to it to a different Oximeter. Each +/// reassign any producers assigned to it to a different Oximeter. Each /// assignment is randomly chosen from among the non-expunged Oximeter instances /// recorded in the `oximeter` table. pub fn reassign_producers_query(oximeter_id: Uuid) -> TypedSqlQuery<()> { From e5c04d705d4c452368ecff823094c0a4a19ecfac Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Fri, 13 Sep 2024 15:57:26 -0400 Subject: [PATCH 6/6] remove unrelated changes --- gateway-test-utils/configs/config.test.toml | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/gateway-test-utils/configs/config.test.toml b/gateway-test-utils/configs/config.test.toml index b3401ddf09..4e3e9c6e6e 100644 --- a/gateway-test-utils/configs/config.test.toml +++ b/gateway-test-utils/configs/config.test.toml @@ -23,15 +23,11 @@ udp_listen_port = 0 # as our contact to the ignition controller)? local_ignition_controller_interface = "fake-switch0" -# When sending UDP RPC packets to an SP (other than to reset it), how many total -# attempts do we make before giving up? -rpc_max_attempts_general = 3 +# When sending UDP RPC packets to an SP, how many total attempts do we make +# before giving up? +rpc_max_attempts = 3 -# When sending UDP RPC packets to an SP (to reset it), how many total attempts -# do we make before giving up? -rpc_max_attempts_reset = 10 - -# sleep time between UDP RPC resends (up to `rpc_max_attempts_*`) +# sleep time between UDP RPC resends (up to `rpc_max_attempts`) rpc_per_attempt_timeout_millis = 1000 [switch.location]