diff --git a/nexus/src/app/oximeter.rs b/nexus/src/app/oximeter.rs
index 03f833b087..7dfa2fb68b 100644
--- a/nexus/src/app/oximeter.rs
+++ b/nexus/src/app/oximeter.rs
@@ -87,32 +87,43 @@ impl super::Nexus {
             "address" => oximeter_info.address,
         );
 
-        // Regardless, notify the collector of any assigned metric producers. This should be empty
-        // if this Oximeter collector is registering for the first time, but may not be if the
-        // service is re-registering after failure.
-        let pagparams = DataPageParams {
-            marker: None,
-            direction: PaginationOrder::Ascending,
-            limit: std::num::NonZeroU32::new(100).unwrap(),
-        };
-        let producers = self
-            .db_datastore
-            .producers_list_by_oximeter_id(
-                oximeter_info.collector_id,
-                &pagparams,
-            )
-            .await?;
-        if !producers.is_empty() {
+        // Regardless, notify the collector of any assigned metric producers.
+        //
+        // This should be empty if this Oximeter collector is registering for
+        // the first time, but may not be if the service is re-registering after
+        // failure.
+        let client = self.build_oximeter_client(
+            &oximeter_info.collector_id,
+            oximeter_info.address,
+        );
+        let mut last_producer_id = None;
+        loop {
+            let pagparams = DataPageParams {
+                marker: last_producer_id.as_ref(),
+                direction: PaginationOrder::Ascending,
+                limit: std::num::NonZeroU32::new(100).unwrap(),
+            };
+            let producers = self
+                .db_datastore
+                .producers_list_by_oximeter_id(
+                    oximeter_info.collector_id,
+                    &pagparams,
+                )
+                .await?;
+            if producers.is_empty() {
+                return Ok(());
+            }
             debug!(
                 self.log,
-                "registered oximeter collector that is already assigned producers, re-assigning them to the collector";
+                "re-assigning existing metric producers to a collector";
                 "n_producers" => producers.len(),
                 "collector_id" => ?oximeter_info.collector_id,
             );
-            let client = self.build_oximeter_client(
-                &oximeter_info.collector_id,
-                oximeter_info.address,
-            );
+            // Be sure to continue paginating from the last producer.
+            //
+            // Safety: We check just above if the list is empty, so there is a
+            // last element.
+            last_producer_id.replace(producers.last().unwrap().id());
             for producer in producers.into_iter() {
                 let producer_info = oximeter_client::types::ProducerEndpoint {
                     id: producer.id(),
@@ -132,7 +143,6 @@ impl super::Nexus {
                     .map_err(Error::from)?;
             }
         }
-        Ok(())
     }
 
     /// Register as a metric producer with the oximeter metric collection server.
diff --git a/nexus/tests/integration_tests/oximeter.rs b/nexus/tests/integration_tests/oximeter.rs
index 2cda594e18..65aaa18642 100644
--- a/nexus/tests/integration_tests/oximeter.rs
+++ b/nexus/tests/integration_tests/oximeter.rs
@@ -4,11 +4,17 @@
 
 //! Integration tests for oximeter collectors and producers.
 
+use dropshot::Method;
+use http::StatusCode;
 use nexus_test_interface::NexusServer;
 use nexus_test_utils_macros::nexus_test;
+use omicron_common::api::internal::nexus::ProducerEndpoint;
 use omicron_test_utils::dev::poll::{wait_for_condition, CondCheckError};
 use oximeter_db::DbWrite;
+use std::collections::BTreeSet;
 use std::net;
+use std::net::Ipv6Addr;
+use std::net::SocketAddr;
 use std::time::Duration;
 use uuid::Uuid;
 
@@ -332,3 +338,87 @@ async fn test_oximeter_reregistration() {
     );
     context.teardown().await;
 }
+
+// A regression test for https://github.com/oxidecomputer/omicron/issues/4498
+#[tokio::test]
+async fn test_oximeter_collector_reregistration_gets_all_assignments() {
+    let mut context = nexus_test_utils::test_setup::<omicron_nexus::Server>(
+        "test_oximeter_collector_reregistration_gets_all_assignments",
+    )
+    .await;
+    let oximeter_id = nexus_test_utils::OXIMETER_UUID.parse().unwrap();
+
+    // Create a bunch of producer records.
+    //
+    // Note that the actual count is arbitrary, but it should be larger than the
+    // internal pagination limit used in `Nexus::upsert_oximeter_collector()`,
+    // which is currently 100.
+    const N_PRODUCERS: usize = 150;
+    let mut ids = BTreeSet::new();
+    for _ in 0..N_PRODUCERS {
+        let id = Uuid::new_v4();
+        ids.insert(id);
+        let info = ProducerEndpoint {
+            id,
+            address: SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 12345),
+            base_route: String::from("/collect"),
+            interval: Duration::from_secs(1),
+        };
+        context
+            .internal_client
+            .make_request(
+                Method::POST,
+                "/metrics/producers",
+                Some(&info),
+                StatusCode::NO_CONTENT,
+            )
+            .await
+            .expect("failed to register test producer");
+    }
+
+    // Check that `oximeter` has these registered.
+    let producers =
+        context.oximeter.list_producers(None, N_PRODUCERS * 2).await;
+    let actual_ids: BTreeSet<_> =
+        producers.iter().map(|info| info.id).collect();
+
+    // There is an additional producer that's created as part of the normal test
+    // setup, so we'll check that all of the new producers exist, and that
+    // there's exactly 1 additional one.
+    assert!(
+        ids.is_subset(&actual_ids),
+        "oximeter did not get the right set of producers"
+    );
+    assert_eq!(
+        ids.len(),
+        actual_ids.len() - 1,
+        "oximeter did not get the right set of producers"
+    );
+
+    // Drop and restart oximeter, which should result in the exact same set of
+    // producers again.
+    drop(context.oximeter);
+    context.oximeter = nexus_test_utils::start_oximeter(
+        context.logctx.log.new(o!("component" => "oximeter")),
+        context.server.get_http_server_internal_address().await,
+        context.clickhouse.port(),
+        oximeter_id,
+    )
+    .await
+    .expect("failed to restart oximeter");
+
+    let producers =
+        context.oximeter.list_producers(None, N_PRODUCERS * 2).await;
+    let actual_ids: BTreeSet<_> =
+        producers.iter().map(|info| info.id).collect();
+    assert!(
+        ids.is_subset(&actual_ids),
+        "oximeter did not get the right set of producers after re-registering"
+    );
+    assert_eq!(
+        ids.len(),
+        actual_ids.len() - 1,
+        "oximeter did not get the right set of producers after re-registering"
+    );
+    context.teardown().await;
+}