Skip to content

Commit

Permalink
Refresh oximeter producer list more aggressively (#6926)
Browse files Browse the repository at this point in the history
- Remove the `oximeter` producer HTTP endpoint for registering individual
producers.
- Dramatically reduce the interval on which the `oximeter` collector refreshes
its list of producers. This is now the only way the collector learns of
producers. The interval is also much smaller in tests to ensure pretty
snappy registrations.
- Remove the calls for registering producers from both Nexus and the
`oximeter` standalone mock Nexus.
- Have `oximeter` collector start polling producers immediately, rather
than waiting for the first polling interval to expire.
- Closes #6916, #6895, and #6901
  • Loading branch information
bnaecker authored Oct 25, 2024
1 parent 78ee981 commit 62916d4
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 117 deletions.
18 changes: 3 additions & 15 deletions nexus/src/app/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use omicron_common::api::internal::nexus::{self, ProducerEndpoint};
use oximeter_client::Client as OximeterClient;
use oximeter_db::query::Timestamp;
use oximeter_db::Measurement;
use std::convert::TryInto;
use std::net::SocketAddr;
use std::num::NonZeroU32;
use std::time::Duration;
Expand Down Expand Up @@ -62,6 +61,9 @@ impl super::Nexus {
}

/// Assign a newly-registered metric producer to an oximeter collector server.
///
/// Note that we don't send the registration to the collector, the collector
/// polls for its list of producers periodically.
pub(crate) async fn assign_producer(
&self,
opctx: &OpContext,
Expand All @@ -71,20 +73,6 @@ impl super::Nexus {
.db_datastore
.producer_endpoint_upsert_and_assign(opctx, &producer_info)
.await?;

let address = SocketAddr::from((
collector_info.ip.ip(),
collector_info.port.try_into().unwrap(),
));
let collector =
build_oximeter_client(&self.log, &collector_info.id, address);

collector
.producers_post(&oximeter_client::types::ProducerEndpoint::from(
&producer_info,
))
.await
.map_err(Error::from)?;
info!(
self.log,
"assigned collector to new producer";
Expand Down
6 changes: 5 additions & 1 deletion nexus/test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1470,7 +1470,11 @@ pub async fn start_oximeter(
let config = oximeter_collector::Config {
nexus_address: Some(nexus_address),
db,
refresh_interval: oximeter_collector::default_refresh_interval(),
// The collector only learns about producers when it refreshes its list
// from Nexus. This interval is quite short, and much smaller than the
// one we use in production. That's important for test latency, but not
// strictly required for correctness.
refresh_interval: Duration::from_secs(2),
log: ConfigLogging::StderrTerminal { level: ConfigLoggingLevel::Error },
};
let args = oximeter_collector::OximeterArguments {
Expand Down
10 changes: 7 additions & 3 deletions nexus/tests/integration_tests/disks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

//! Tests basic disk support in the API
use crate::integration_tests::metrics::wait_for_producer;

use super::instances::instance_wait_for_state;
use super::metrics::{get_latest_silo_metric, query_for_metrics};
use chrono::Utc;
Expand Down Expand Up @@ -34,14 +36,14 @@ use nexus_test_utils::SLED_AGENT_UUID;
use nexus_test_utils_macros::nexus_test;
use nexus_types::external_api::params;
use nexus_types::silo::DEFAULT_SILO_ID;
use omicron_common::api::external::ByteCount;
use omicron_common::api::external::Disk;
use omicron_common::api::external::DiskState;
use omicron_common::api::external::IdentityMetadataCreateParams;
use omicron_common::api::external::Instance;
use omicron_common::api::external::InstanceState;
use omicron_common::api::external::Name;
use omicron_common::api::external::NameOrId;
use omicron_common::api::external::{ByteCount, SimpleIdentity as _};
use omicron_nexus::app::{MAX_DISK_SIZE_BYTES, MIN_DISK_SIZE_BYTES};
use omicron_nexus::Nexus;
use omicron_nexus::TestInterfaces as _;
Expand Down Expand Up @@ -1802,7 +1804,6 @@ async fn test_disk_metrics(cptestctx: &ControlPlaneTestContext) {
DiskTest::new(&cptestctx).await;
let project_id = create_project_and_pool(client).await;
let disk = create_disk(&client, PROJECT_NAME, DISK_NAME).await;
oximeter.force_collect().await;

// When grabbing a metric, we look for data points going back to the
// start of this test all the way up to the current time.
Expand All @@ -1826,6 +1827,7 @@ async fn test_disk_metrics(cptestctx: &ControlPlaneTestContext) {
.await;
assert!(measurements.items.is_empty());

oximeter.force_collect().await;
assert_eq!(
get_latest_silo_metric(
cptestctx,
Expand All @@ -1838,6 +1840,7 @@ async fn test_disk_metrics(cptestctx: &ControlPlaneTestContext) {

// Create an instance, attach the disk to it.
create_instance_with_disk(client).await;
wait_for_producer(&cptestctx.oximeter, disk.id()).await;
oximeter.force_collect().await;

for metric in &ALL_METRICS {
Expand Down Expand Up @@ -1870,8 +1873,9 @@ async fn test_disk_metrics_paginated(cptestctx: &ControlPlaneTestContext) {
let client = &cptestctx.external_client;
DiskTest::new(&cptestctx).await;
create_project_and_pool(client).await;
create_disk(&client, PROJECT_NAME, DISK_NAME).await;
let disk = create_disk(&client, PROJECT_NAME, DISK_NAME).await;
create_instance_with_disk(client).await;
wait_for_producer(&cptestctx.oximeter, disk.id()).await;

let oximeter = &cptestctx.oximeter;
oximeter.force_collect().await;
Expand Down
25 changes: 0 additions & 25 deletions openapi/oximeter.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,31 +81,6 @@
"x-dropshot-pagination": {
"required": []
}
},
"post": {
"summary": "Handle a request from Nexus to register a new producer with this collector.",
"operationId": "producers_post",
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ProducerEndpoint"
}
}
},
"required": true
},
"responses": {
"204": {
"description": "resource updated"
},
"4XX": {
"$ref": "#/components/responses/Error"
},
"5XX": {
"$ref": "#/components/responses/Error"
}
}
}
},
"/producers/{producer_id}": {
Expand Down
13 changes: 1 addition & 12 deletions oximeter/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
use chrono::{DateTime, Utc};
use dropshot::{
EmptyScanParams, HttpError, HttpResponseDeleted, HttpResponseOk,
HttpResponseUpdatedNoContent, PaginationParams, Query, RequestContext,
ResultsPage, TypedBody,
PaginationParams, Query, RequestContext, ResultsPage,
};
use omicron_common::api::internal::nexus::ProducerEndpoint;
use schemars::JsonSchema;
Expand All @@ -17,16 +16,6 @@ use uuid::Uuid;
pub trait OximeterApi {
type Context;

/// Handle a request from Nexus to register a new producer with this collector.
#[endpoint {
method = POST,
path = "/producers",
}]
async fn producers_post(
request_context: RequestContext<Self::Context>,
body: TypedBody<ProducerEndpoint>,
) -> Result<HttpResponseUpdatedNoContent, HttpError>;

/// List all producers.
#[endpoint {
method = GET,
Expand Down
4 changes: 1 addition & 3 deletions oximeter/collector/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ async fn collection_task(
let mut log = orig_log.new(o!("address" => producer.address));
let client = reqwest::Client::new();
let mut collection_timer = interval(producer.interval);
collection_timer.tick().await; // completes immediately
debug!(
log,
"starting oximeter collection task";
Expand Down Expand Up @@ -885,8 +884,7 @@ mod tests {
// Period these tests wait using `tokio::time::advance()` before checking
// their test conditions.
const TEST_WAIT_PERIOD: Duration = Duration::from_millis(
COLLECTION_INTERVAL.as_millis() as u64 * N_COLLECTIONS
+ COLLECTION_INTERVAL.as_millis() as u64 / 2,
COLLECTION_INTERVAL.as_millis() as u64 * N_COLLECTIONS,
);

// Test that we count successful collections from a target correctly.
Expand Down
17 changes: 1 addition & 16 deletions oximeter/collector/src/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,18 @@

//! Oximeter collector server HTTP API
// Copyright 2023 Oxide Computer Company
// Copyright 2024 Oxide Computer Company

use crate::OximeterAgent;
use dropshot::ApiDescription;
use dropshot::EmptyScanParams;
use dropshot::HttpError;
use dropshot::HttpResponseDeleted;
use dropshot::HttpResponseOk;
use dropshot::HttpResponseUpdatedNoContent;
use dropshot::PaginationParams;
use dropshot::Query;
use dropshot::RequestContext;
use dropshot::ResultsPage;
use dropshot::TypedBody;
use dropshot::WhichPage;
use omicron_common::api::internal::nexus::ProducerEndpoint;
use oximeter_api::*;
Expand All @@ -34,19 +32,6 @@ enum OximeterApiImpl {}
impl OximeterApi for OximeterApiImpl {
type Context = Arc<OximeterAgent>;

async fn producers_post(
request_context: RequestContext<Self::Context>,
body: TypedBody<ProducerEndpoint>,
) -> Result<HttpResponseUpdatedNoContent, HttpError> {
let agent = request_context.context();
let producer_info = body.into_inner();
agent
.register_producer(producer_info)
.await
.map_err(HttpError::from)
.map(|_| HttpResponseUpdatedNoContent())
}

async fn producers_list(
request_context: RequestContext<Arc<OximeterAgent>>,
query: Query<PaginationParams<EmptyScanParams, ProducerPage>>,
Expand Down
2 changes: 1 addition & 1 deletion oximeter/collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl DbConfig {

/// Default interval on which we refresh our list of producers from Nexus.
pub const fn default_refresh_interval() -> Duration {
Duration::from_secs(60 * 10)
Duration::from_secs(15)
}

/// Configuration used to initialize an oximeter server
Expand Down
47 changes: 6 additions & 41 deletions oximeter/collector/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use nexus_types::internal_api::params::OximeterInfo;
use omicron_common::api::internal::nexus::ProducerEndpoint;
use omicron_common::api::internal::nexus::ProducerRegistrationResponse;
use omicron_common::FileKv;
use oximeter_client::Client;
use rand::seq::IteratorRandom;
use slog::debug;
use slog::error;
Expand Down Expand Up @@ -104,21 +103,14 @@ impl StandaloneNexus {
//
// Select a random collector, and assign it to the producer.
// We'll return the assignment from this match block.
let Some((collector_id, collector_info)) =
let Some((collector_id, _collector_info)) =
inner.random_collector()
else {
return Err(HttpError::for_unavail(
None,
String::from("No collectors available"),
));
};
let client = Client::new(
format!("http://{}", collector_info.address).as_str(),
self.log.clone(),
);
client.producers_post(&info.into()).await.map_err(|e| {
HttpError::for_internal_error(e.to_string())
})?;
let assignment =
ProducerAssignment { producer: info.clone(), collector_id };
assignment
Expand All @@ -131,18 +123,9 @@ impl StandaloneNexus {
}

// This appears to be a re-registration, e.g., the producer
// changed its IP address. Re-register it with the collector to
// which it's already assigned.
// changed its IP address. The collector will learn of this when
// it next fetches its list.
let collector_id = existing_assignment.collector_id;
let collector_info =
inner.collectors.get(&collector_id).unwrap();
let client = Client::new(
format!("http://{}", collector_info.address).as_str(),
self.log.clone(),
);
client.producers_post(&info.into()).await.map_err(|e| {
HttpError::for_internal_error(e.to_string())
})?;
ProducerAssignment { producer: info.clone(), collector_id }
}
};
Expand All @@ -154,27 +137,9 @@ impl StandaloneNexus {
&self,
info: OximeterInfo,
) -> Result<(), HttpError> {
// If this is being registered again, send all its assignments again.
let mut inner = self.inner.lock().await;
if inner.collectors.insert(info.collector_id, info).is_some() {
let client = Client::new(
format!("http://{}", info.address).as_str(),
self.log.clone(),
);
for producer_info in
inner.producers.values().filter_map(|assignment| {
if assignment.collector_id == info.collector_id {
Some(&assignment.producer)
} else {
None
}
})
{
client.producers_post(&producer_info.into()).await.map_err(
|e| HttpError::for_internal_error(e.to_string()),
)?;
}
}
// No-op if this is being re-registered. It will fetch its list of
// producers again if needed.
self.inner.lock().await.collectors.insert(info.collector_id, info);
Ok(())
}
}
Expand Down

0 comments on commit 62916d4

Please sign in to comment.