Skip to content

Commit

Permalink
Send oximeter all existing producer assignments on registration
Browse files Browse the repository at this point in the history
- Fixes #4498
- When `oximeter` registers with `nexus`, we send it any producer
  assignments so that it can continue collecting after a restart. The
  list of assignments is paginated from the database, but we previously
  only sent the first page. This ensures we send all records, and adds a
  regression test.
  • Loading branch information
bnaecker committed Nov 15, 2023
1 parent f513182 commit df14cd6
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 22 deletions.
54 changes: 32 additions & 22 deletions nexus/src/app/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,32 +87,43 @@ impl super::Nexus {
"address" => oximeter_info.address,
);

// Regardless, notify the collector of any assigned metric producers. This should be empty
// if this Oximeter collector is registering for the first time, but may not be if the
// service is re-registering after failure.
let pagparams = DataPageParams {
marker: None,
direction: PaginationOrder::Ascending,
limit: std::num::NonZeroU32::new(100).unwrap(),
};
let producers = self
.db_datastore
.producers_list_by_oximeter_id(
oximeter_info.collector_id,
&pagparams,
)
.await?;
if !producers.is_empty() {
// Regardless, notify the collector of any assigned metric producers.
//
// This should be empty if this Oximeter collector is registering for
// the first time, but may not be if the service is re-registering after
// failure.
let client = self.build_oximeter_client(
&oximeter_info.collector_id,
oximeter_info.address,
);
let mut last_producer_id = None;
loop {
let pagparams = DataPageParams {
marker: last_producer_id.as_ref(),
direction: PaginationOrder::Ascending,
limit: std::num::NonZeroU32::new(100).unwrap(),
};
let producers = self
.db_datastore
.producers_list_by_oximeter_id(
oximeter_info.collector_id,
&pagparams,
)
.await?;
if producers.is_empty() {
return Ok(());
}
debug!(
self.log,
"registered oximeter collector that is already assigned producers, re-assigning them to the collector";
"re-assigning existing metric producers to a collector";
"n_producers" => producers.len(),
"collector_id" => ?oximeter_info.collector_id,
);
let client = self.build_oximeter_client(
&oximeter_info.collector_id,
oximeter_info.address,
);
// Be sure to continue paginating from the last producer.
//
// Safety: We check just above if the list is empty, so there is a
// last element.
last_producer_id.replace(producers.last().unwrap().id());
for producer in producers.into_iter() {
let producer_info = oximeter_client::types::ProducerEndpoint {
id: producer.id(),
Expand All @@ -132,7 +143,6 @@ impl super::Nexus {
.map_err(Error::from)?;
}
}
Ok(())
}

/// Register as a metric producer with the oximeter metric collection server.
Expand Down
90 changes: 90 additions & 0 deletions nexus/tests/integration_tests/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@

//! Integration tests for oximeter collectors and producers.
use dropshot::Method;
use http::StatusCode;
use nexus_test_interface::NexusServer;
use nexus_test_utils_macros::nexus_test;
use omicron_common::api::internal::nexus::ProducerEndpoint;
use omicron_test_utils::dev::poll::{wait_for_condition, CondCheckError};
use oximeter_db::DbWrite;
use std::collections::BTreeSet;
use std::net;
use std::net::Ipv6Addr;
use std::net::SocketAddr;
use std::time::Duration;
use uuid::Uuid;

Expand Down Expand Up @@ -332,3 +338,87 @@ async fn test_oximeter_reregistration() {
);
context.teardown().await;
}

// A regression test for https://github.com/oxidecomputer/omicron/issues/4498
#[tokio::test]
async fn test_oximeter_collector_reregistration_gets_all_assignments() {
let mut context = nexus_test_utils::test_setup::<omicron_nexus::Server>(
"test_oximeter_collector_reregistration_gets_all_assignments",
)
.await;
let oximeter_id = nexus_test_utils::OXIMETER_UUID.parse().unwrap();

// Create a bunch of producer records.
//
// Note that the actual count is arbitrary, but it should be larger than the
// internal pagination limit used in `Nexus::upsert_oximeter_collector()`,
// which is currently 100.
const N_PRODUCERS: usize = 150;
let mut ids = BTreeSet::new();
for _ in 0..N_PRODUCERS {
let id = Uuid::new_v4();
ids.insert(id);
let info = ProducerEndpoint {
id,
address: SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 12345),
base_route: String::from("/collect"),
interval: Duration::from_secs(1),
};
context
.internal_client
.make_request(
Method::POST,
"/metrics/producers",
Some(&info),
StatusCode::NO_CONTENT,
)
.await
.expect("failed to register test producer");
}

// Check that `oximeter` has these registered.
let producers =
context.oximeter.list_producers(None, N_PRODUCERS * 2).await;
let actual_ids: BTreeSet<_> =
producers.iter().map(|info| info.id).collect();

// There is an additional producer that's created as part of the normal test
// setup, so we'll check that all of the new producers exist, and that
// there's exactly 1 additional one.
assert!(
ids.is_subset(&actual_ids),
"oximeter did not get the right set of producers"
);
assert_eq!(
ids.len(),
actual_ids.len() - 1,
"oximeter did not get the right set of producers"
);

// Drop and restart oximeter, which should result in the exact same set of
// producers again.
drop(context.oximeter);
context.oximeter = nexus_test_utils::start_oximeter(
context.logctx.log.new(o!("component" => "oximeter")),
context.server.get_http_server_internal_address().await,
context.clickhouse.port(),
oximeter_id,
)
.await
.expect("failed to restart oximeter");

let producers =
context.oximeter.list_producers(None, N_PRODUCERS * 2).await;
let actual_ids: BTreeSet<_> =
producers.iter().map(|info| info.id).collect();
assert!(
ids.is_subset(&actual_ids),
"oximeter did not get the right set of producers after re-registering"
);
assert_eq!(
ids.len(),
actual_ids.len() - 1,
"oximeter did not get the right set of producers after re-registering"
);
context.teardown().await;
}

0 comments on commit df14cd6

Please sign in to comment.