Skip to content

Commit

Permalink
Don't send producers to oximeter when it registers.
Browse files Browse the repository at this point in the history
The oximeter collector already requests all its producers whenever it
restarts (and periodically to stay up-to-date). Let that mechanism do
the work, rather than making many small requests from Nexus to the
collector when it registers.
  • Loading branch information
bnaecker committed Mar 26, 2024
1 parent f44da6f commit 633eb28
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 73 deletions.
62 changes: 1 addition & 61 deletions nexus/src/app/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ use internal_dns::resolver::{ResolveError, Resolver};
use internal_dns::ServiceName;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db;
use nexus_db_queries::db::identity::Asset;
use omicron_common::address::CLICKHOUSE_PORT;
use omicron_common::api::external::Error;
use omicron_common::api::external::PaginationOrder;
use omicron_common::api::external::{DataPageParams, ListResultVec};
use omicron_common::api::internal::nexus::{self, ProducerEndpoint};
use omicron_common::backoff;
Expand Down Expand Up @@ -88,65 +86,7 @@ impl super::Nexus {
"collector_id" => ?oximeter_info.collector_id,
"address" => oximeter_info.address,
);

// Regardless, notify the collector of any assigned metric producers.
//
// This should be empty if this Oximeter collector is registering for
// the first time, but may not be if the service is re-registering after
// failure.
let client = self.build_oximeter_client(
&oximeter_info.collector_id,
oximeter_info.address,
);
let mut last_producer_id = None;
loop {
let pagparams = DataPageParams {
marker: last_producer_id.as_ref(),
direction: PaginationOrder::Ascending,
limit: std::num::NonZeroU32::new(100).unwrap(),
};
let producers = self
.db_datastore
.producers_list_by_oximeter_id(
opctx,
oximeter_info.collector_id,
&pagparams,
)
.await?;
if producers.is_empty() {
return Ok(());
}
debug!(
self.log,
"re-assigning existing metric producers to a collector";
"n_producers" => producers.len(),
"collector_id" => ?oximeter_info.collector_id,
);
// Be sure to continue paginating from the last producer.
//
// Safety: We check just above if the list is empty, so there is a
// last element.
last_producer_id.replace(producers.last().unwrap().id());
for producer in producers.into_iter() {
let producer_info = oximeter_client::types::ProducerEndpoint {
id: producer.id(),
kind: nexus::ProducerKind::from(producer.kind).into(),
address: SocketAddr::new(
producer.ip.ip(),
producer.port.try_into().unwrap(),
)
.to_string(),
base_route: producer.base_route,
interval: oximeter_client::types::Duration::from(
Duration::from_secs_f64(producer.interval),
),
};
client
.producers_post(&producer_info)
.await
.map_err(Error::from)?;
}
}
Ok(())
}

/// List the producers assigned to an oximeter collector.
Expand Down
27 changes: 15 additions & 12 deletions oximeter/collector/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ pub struct OximeterAgent {
Arc<Mutex<BTreeMap<Uuid, (ProducerEndpoint, CollectionTask)>>>,
// The interval on which we refresh our list of producers from Nexus
refresh_interval: Duration,
// Handle to the task used to periodically refresh the list of producers.
refresh_task: Arc<StdMutex<Option<tokio::task::JoinHandle<()>>>>,
/// The last time we've refreshed our list of producers from Nexus.
pub last_refresh_time: Arc<StdMutex<Option<DateTime<Utc>>>>,
}
Expand Down Expand Up @@ -458,23 +460,24 @@ impl OximeterAgent {
result_sender,
collection_tasks: Arc::new(Mutex::new(BTreeMap::new())),
refresh_interval,
refresh_task: Arc::new(StdMutex::new(None)),
last_refresh_time: Arc::new(StdMutex::new(None)),
};

// And spawn our task for periodically updating our list of producers
// from Nexus.
//
// This is part of a coordination mechansim between Nexus, the
// producers, and us, to ensure that we have a reasonably up-to-date
// list of producers. Producers are required to re-register periodically
// with Nexus as a deadman -- if they fail to do so, Nexus will remove
// them. We fetch the list every so often to make sure that gets to us
// too.
tokio::spawn(refresh_producer_list(self_.clone(), resolver.clone()));

Ok(self_)
}

/// Ensure the backgrouund task that polls Nexus periodically for our list of
/// assigned producers is running.
pub(crate) fn ensure_producer_refresh_task(&self, resolver: Resolver) {
let mut task = self.refresh_task.lock().unwrap();
if task.is_none() {
let refresh_task =
tokio::spawn(refresh_producer_list(self.clone(), resolver));
*task = Some(refresh_task);
}
}

/// Construct a new standalone `oximeter` collector.
///
/// In this mode, `oximeter` can be used to test the collection of metrics
Expand Down Expand Up @@ -548,6 +551,7 @@ impl OximeterAgent {
result_sender,
collection_tasks: Arc::new(Mutex::new(BTreeMap::new())),
refresh_interval,
refresh_task: Arc::new(StdMutex::new(None)),
last_refresh_time,
})
}
Expand Down Expand Up @@ -742,7 +746,6 @@ impl OximeterAgent {
// A task which periodically updates our list of producers from Nexus.
async fn refresh_producer_list(agent: OximeterAgent, resolver: Resolver) {
let mut interval = tokio::time::interval(agent.refresh_interval);
interval.tick().await; // Completes immediately.
let page_size = Some(NonZeroU32::new(100).unwrap());
loop {
interval.tick().await;
Expand Down
4 changes: 4 additions & 0 deletions oximeter/collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ impl Oximeter {
.await
.expect("Expected an infinite retry loop contacting Nexus");

// Now that we've successfully registered, we'll start periodically
// polling for our list of producers from Nexus.
agent.ensure_producer_refresh_task(resolver);

info!(log, "oximeter registered with nexus"; "id" => ?agent.id);
Ok(Self { agent, server })
}
Expand Down

0 comments on commit 633eb28

Please sign in to comment.