From 54bda13797c52d9c92e646ec3cf9f4520188a8a5 Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Thu, 26 Oct 2023 21:16:58 +0000 Subject: [PATCH] Report statistics about oximeter collections - Some oximeter collector crate reorg and cleanup - Add target representing an oximeter collector service itself - Add metrics representing the number of successful collections, and number of failed collections broken out by a few reasons --- Cargo.lock | 2 + oximeter/collector/Cargo.toml | 2 + oximeter/collector/src/agent.rs | 612 ++++++++++++++++++ oximeter/collector/src/http_entrypoints.rs | 133 ++++ oximeter/collector/src/lib.rs | 714 +-------------------- oximeter/collector/src/self_stats.rs | 154 +++++ 6 files changed, 929 insertions(+), 688 deletions(-) create mode 100644 oximeter/collector/src/agent.rs create mode 100644 oximeter/collector/src/http_entrypoints.rs create mode 100644 oximeter/collector/src/self_stats.rs diff --git a/Cargo.lock b/Cargo.lock index 5c51c23a10..055db6079a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5804,6 +5804,7 @@ version = "0.1.0" dependencies = [ "anyhow", "camino", + "chrono", "clap 4.4.3", "dropshot", "expectorate", @@ -5828,6 +5829,7 @@ dependencies = [ "slog-async", "slog-dtrace", "slog-term", + "strum", "subprocess", "thiserror", "tokio", diff --git a/oximeter/collector/Cargo.toml b/oximeter/collector/Cargo.toml index ad0ae8e330..43c90ddf02 100644 --- a/oximeter/collector/Cargo.toml +++ b/oximeter/collector/Cargo.toml @@ -8,6 +8,7 @@ license = "MPL-2.0" [dependencies] anyhow.workspace = true camino.workspace = true +chrono.workspace = true clap.workspace = true dropshot.workspace = true futures.workspace = true @@ -26,6 +27,7 @@ slog.workspace = true slog-async.workspace = true slog-dtrace.workspace = true slog-term.workspace = true +strum.workspace = true thiserror.workspace = true tokio.workspace = true toml.workspace = true diff --git a/oximeter/collector/src/agent.rs b/oximeter/collector/src/agent.rs new file mode 100644 index 0000000000..c49a12196e --- /dev/null +++ b/oximeter/collector/src/agent.rs @@ -0,0 +1,612 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! The oximeter agent handles collection tasks for each producer. + +// Copyright 2023 Oxide Computer Company + +use crate::self_stats; +use crate::DbConfig; +use crate::Error; +use crate::ProducerEndpoint; +use anyhow::anyhow; +use internal_dns::resolver::Resolver; +use internal_dns::ServiceName; +use omicron_common::address::CLICKHOUSE_PORT; +use oximeter::types::ProducerResults; +use oximeter::types::ProducerResultsItem; +use oximeter_db::Client; +use oximeter_db::DbWrite; +use slog::debug; +use slog::error; +use slog::info; +use slog::o; +use slog::trace; +use slog::warn; +use slog::Logger; +use std::collections::btree_map::Entry; +use std::collections::BTreeMap; +use std::net::SocketAddr; +use std::net::SocketAddrV6; +use std::ops::Bound; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::mpsc; +use tokio::sync::oneshot; +use tokio::sync::Mutex; +use tokio::task::JoinHandle; +use tokio::time::interval; +use uuid::Uuid; + +type CollectionToken = oneshot::Sender<()>; + +// Messages for controlling a collection task +#[derive(Debug)] +enum CollectionMessage { + // Explicit request that the task collect data from its producer + // + // Also sends a oneshot that is signalled once the task scrapes + // data from the Producer, and places it in the Clickhouse server. + Collect(CollectionToken), + // Request that the task update its interval and the socket address on which it collects data + // from its producer. + Update(ProducerEndpoint), + // Request that the task exit + Shutdown, +} + +async fn perform_collection( + log: &Logger, + self_target: &mut self_stats::CollectionTaskStats, + client: &reqwest::Client, + producer: &ProducerEndpoint, + outbox: &mpsc::Sender<(Option, ProducerResults)>, + token: Option, +) { + debug!(log, "collecting from producer"); + let res = client + .get(format!( + "http://{}{}", + producer.address, + producer.collection_route() + )) + .send() + .await; + match res { + Ok(res) => { + if res.status().is_success() { + match res.json::().await { + Ok(results) => { + debug!( + log, + "collected results from producer"; + "n_results" => results.len() + ); + self_target.collections.datum.increment(); + outbox.send((token, results)).await.unwrap(); + } + Err(e) => { + warn!( + log, + "failed to collect results from producer"; + "error" => ?e, + ); + self_target + .failures_for_reason( + self_stats::FailureReason::Deserialization, + ) + .datum + .increment() + } + } + } else { + warn!( + log, + "failed to receive metric results from producer"; + "status_code" => res.status().as_u16(), + ); + self_target + .failures_for_reason( + self_stats::FailureReason::Deserialization, + ) + .datum + .increment() + } + } + Err(e) => { + warn!( + log, + "failed to send collection request to producer"; + "error" => ?e + ); + } + } +} + +// Background task used to collect metrics from one producer on an interval. +// +// This function is started by the `OximeterAgent`, when a producer is registered. The task loops +// endlessly, and collects metrics from the assigned producer on a timeout. The assigned agent can +// also send a `CollectionMessage`, for example to update the collection interval. This is not +// currently used, but will likely be exposed via control plane interfaces in the future. +async fn collection_task( + log: Logger, + collector: self_stats::OximeterCollector, + mut producer: ProducerEndpoint, + mut inbox: mpsc::Receiver, + outbox: mpsc::Sender<(Option, ProducerResults)>, +) { + let client = reqwest::Client::new(); + let mut collection_timer = interval(producer.interval); + collection_timer.tick().await; // completes immediately + debug!( + log, + "starting oximeter collection task"; + "interval" => ?producer.interval, + ); + + // Set up the collection of self statistics about this collection task. + let mut stats = self_stats::CollectionTaskStats::new(collector, &producer); + let mut self_collection_timer = interval(self_stats::COLLECTION_INTERVAL); + self_collection_timer.tick().await; + + loop { + tokio::select! { + message = inbox.recv() => { + match message { + None => { + debug!(log, "collection task inbox closed, shutting down"); + return; + } + Some(CollectionMessage::Shutdown) => { + debug!(log, "collection task received shutdown request"); + return; + }, + Some(CollectionMessage::Collect(token)) => { + debug!(log, "collection task received explicit request to collect"); + perform_collection(&log, &mut stats, &client, &producer, &outbox, Some(token)).await; + }, + Some(CollectionMessage::Update(new_info)) => { + producer = new_info; + debug!( + log, + "collection task received request to update its producer information"; + "interval" => ?producer.interval, + "address" => producer.address, + ); + collection_timer = interval(producer.interval); + collection_timer.tick().await; // completes immediately + } + } + } + _ = self_collection_timer.tick() => { + debug!(log, "reporting oximeter self-collection statistics"); + outbox.send((None, stats.sample())).await.unwrap(); + } + _ = collection_timer.tick() => { + perform_collection(&log, &mut stats, &client, &producer, &outbox, None).await; + } + } + } +} + +// Struct representing a task for collecting metric data from a single producer +#[derive(Debug)] +struct CollectionTask { + // Channel used to send messages from the agent to the actual task. The task owns the other + // side. + pub inbox: mpsc::Sender, + // Handle to the actual tokio task running the collection loop. + #[allow(dead_code)] + pub task: JoinHandle<()>, +} + +// A task run by `oximeter` in standalone mode, which simply prints results as +// they're received. +async fn results_printer( + log: Logger, + mut rx: mpsc::Receiver<(Option, ProducerResults)>, +) { + loop { + match rx.recv().await { + Some((_, results)) => { + for res in results.into_iter() { + match res { + ProducerResultsItem::Ok(samples) => { + for sample in samples.into_iter() { + info!( + log, + ""; + "sample" => ?sample, + ); + } + } + ProducerResultsItem::Err(e) => { + error!( + log, + "received error from a producer"; + "err" => ?e, + ); + } + } + } + } + None => { + debug!(log, "result queue closed, exiting"); + return; + } + } + } +} + +// Aggregation point for all results, from all collection tasks. +async fn results_sink( + log: Logger, + client: Client, + batch_size: usize, + batch_interval: Duration, + mut rx: mpsc::Receiver<(Option, ProducerResults)>, +) { + let mut timer = interval(batch_interval); + timer.tick().await; // completes immediately + let mut batch = Vec::with_capacity(batch_size); + loop { + let mut collection_token = None; + let insert = tokio::select! { + _ = timer.tick() => { + if batch.is_empty() { + trace!(log, "batch interval expired, but no samples to insert"); + false + } else { + true + } + } + results = rx.recv() => { + match results { + Some((token, results)) => { + let flattened_results = { + let mut flattened = Vec::with_capacity(results.len()); + for inner_batch in results.into_iter() { + match inner_batch { + ProducerResultsItem::Ok(samples) => flattened.extend(samples.into_iter()), + ProducerResultsItem::Err(e) => { + debug!( + log, + "received error (not samples) from a producer: {}", + e.to_string() + ); + } + } + } + flattened + }; + batch.extend(flattened_results); + + collection_token = token; + if collection_token.is_some() { + true + } else { + batch.len() >= batch_size + } + } + None => { + warn!(log, "result queue closed, exiting"); + return; + } + } + } + }; + + if insert { + debug!(log, "inserting {} samples into database", batch.len()); + match client.insert_samples(&batch).await { + Ok(()) => trace!(log, "successfully inserted samples"), + Err(e) => { + warn!( + log, + "failed to insert some results into metric DB: {}", + e.to_string() + ); + } + } + // TODO-correctness The `insert_samples` call above may fail. The method itself needs + // better handling of partially-inserted results in that case, but we may need to retry + // or otherwise handle an error here as well. + batch.clear(); + } + + if let Some(token) = collection_token { + let _ = token.send(()); + } + } +} + +/// The internal agent the oximeter server uses to collect metrics from producers. +#[derive(Debug)] +pub struct OximeterAgent { + /// The collector ID for this agent + pub id: Uuid, + log: Logger, + // Oximeter target used by this agent to produce metrics about itself. + collection_target: self_stats::OximeterCollector, + // Handle to the TX-side of a channel for collecting results from the collection tasks + result_sender: mpsc::Sender<(Option, ProducerResults)>, + // The actual tokio tasks running the collection on a timer. + collection_tasks: + Arc>>, +} + +impl OximeterAgent { + /// Construct a new agent with the given ID and logger. + pub async fn with_id( + id: Uuid, + address: SocketAddrV6, + db_config: DbConfig, + resolver: &Resolver, + log: &Logger, + ) -> Result { + let (result_sender, result_receiver) = mpsc::channel(8); + let log = log.new(o!( + "component" => "oximeter-agent", + "collector_id" => id.to_string(), + )); + let insertion_log = log.new(o!("component" => "results-sink")); + + // Construct the ClickHouse client first, propagate an error if we can't reach the + // database. + let db_address = if let Some(address) = db_config.address { + address + } else { + SocketAddr::new( + resolver.lookup_ip(ServiceName::Clickhouse).await?, + CLICKHOUSE_PORT, + ) + }; + + // Determine the version of the database. + // + // There are three cases + // + // - The database exists and is at the expected version. Continue in + // this case. + // + // - The database exists and is at a lower-than-expected version. We + // fail back to the caller here, which will retry indefinitely until the + // DB has been updated. + // + // - The DB doesn't exist at all. This reports a version number of 0. We + // need to create the DB here, at the latest version. This is used in + // fresh installations and tests. + let client = Client::new(db_address, &log); + match client.check_db_is_at_expected_version().await { + Ok(_) => {} + Err(oximeter_db::Error::DatabaseVersionMismatch { + found: 0, + .. + }) => { + debug!(log, "oximeter database does not exist, creating"); + let replicated = client.is_oximeter_cluster().await?; + client + .initialize_db_with_version( + replicated, + oximeter_db::OXIMETER_VERSION, + ) + .await?; + } + Err(e) => return Err(Error::from(e)), + } + + // Set up tracking of statistics about ourselves. + let collection_target = self_stats::OximeterCollector { + collector_id: id, + collector_ip: (*address.ip()).into(), + collector_port: address.port(), + }; + + // Spawn the task for aggregating and inserting all metrics + tokio::spawn(async move { + results_sink( + insertion_log, + client, + db_config.batch_size, + Duration::from_secs(db_config.batch_interval), + result_receiver, + ) + .await + }); + Ok(Self { + id, + log, + collection_target, + result_sender, + collection_tasks: Arc::new(Mutex::new(BTreeMap::new())), + }) + } + + /// Construct a new standalone `oximeter` collector. + pub async fn new_standalone( + id: Uuid, + address: SocketAddrV6, + db_config: Option, + log: &Logger, + ) -> Result { + let (result_sender, result_receiver) = mpsc::channel(8); + let log = log.new(o!( + "component" => "oximeter-standalone", + "collector_id" => id.to_string(), + )); + + // If we have configuration for ClickHouse, we'll spawn the results + // sink task as usual. If not, we'll spawn a dummy task that simply + // prints the results as they're received. + let insertion_log = log.new(o!("component" => "results-sink")); + if let Some(db_config) = db_config { + let Some(address) = db_config.address else { + return Err(Error::Standalone(anyhow!( + "Must provide explicit IP address in standalone mode" + ))); + }; + let client = Client::new(address, &log); + let replicated = client.is_oximeter_cluster().await?; + if !replicated { + client.init_single_node_db().await?; + } else { + client.init_replicated_db().await?; + } + + // Spawn the task for aggregating and inserting all metrics + tokio::spawn(async move { + results_sink( + insertion_log, + client, + db_config.batch_size, + Duration::from_secs(db_config.batch_interval), + result_receiver, + ) + .await + }); + } else { + tokio::spawn(results_printer(insertion_log, result_receiver)); + } + + // Set up tracking of statistics about ourselves. + let collection_target = self_stats::OximeterCollector { + collector_id: id, + collector_ip: (*address.ip()).into(), + collector_port: address.port(), + }; + + // Construct the ClickHouse client first, propagate an error if we can't reach the + // database. + Ok(Self { + id, + log, + collection_target, + result_sender, + collection_tasks: Arc::new(Mutex::new(BTreeMap::new())), + }) + } + + /// Register a new producer with this oximeter instance. + pub async fn register_producer( + &self, + info: ProducerEndpoint, + ) -> Result<(), Error> { + let id = info.id; + match self.collection_tasks.lock().await.entry(id) { + Entry::Vacant(value) => { + debug!( + self.log, + "registered new metric producer"; + "producer_id" => id.to_string(), + "address" => info.address, + ); + + // Build channel to control the task and receive results. + let (tx, rx) = mpsc::channel(4); + let q = self.result_sender.clone(); + let log = self.log.new(o!("component" => "collection-task", "producer_id" => id.to_string())); + let info_clone = info.clone(); + let target = self.collection_target; + let task = tokio::spawn(async move { + collection_task(log, target, info_clone, rx, q).await; + }); + value.insert((info, CollectionTask { inbox: tx, task })); + } + Entry::Occupied(mut value) => { + debug!( + self.log, + "received request to register existing metric \ + producer, updating collection information"; + "producer_id" => id.to_string(), + "interval" => ?info.interval, + "address" => info.address, + ); + value.get_mut().0 = info.clone(); + value + .get() + .1 + .inbox + .send(CollectionMessage::Update(info)) + .await + .unwrap(); + } + } + Ok(()) + } + + /// Forces a collection from all producers. + /// + /// Returns once all those values have been inserted into Clickhouse, + /// or an error occurs trying to perform the collection. + pub async fn force_collection(&self) { + let mut collection_oneshots = vec![]; + let collection_tasks = self.collection_tasks.lock().await; + for (_id, (_endpoint, task)) in collection_tasks.iter() { + let (tx, rx) = oneshot::channel(); + // Scrape from each producer, into oximeter... + task.inbox.send(CollectionMessage::Collect(tx)).await.unwrap(); + // ... and keep track of the token that indicates once the metric + // has made it into Clickhouse. + collection_oneshots.push(rx); + } + drop(collection_tasks); + + // Only return once all producers finish processing the token we + // provided. + // + // NOTE: This can either mean that the collection completed + // successfully, or an error occurred in the collection pathway. + futures::future::join_all(collection_oneshots).await; + } + + /// List existing producers. + pub async fn list_producers( + &self, + start_id: Option, + limit: usize, + ) -> Vec { + let start = if let Some(id) = start_id { + Bound::Excluded(id) + } else { + Bound::Unbounded + }; + self.collection_tasks + .lock() + .await + .range((start, Bound::Unbounded)) + .take(limit) + .map(|(_id, (info, _t))| info.clone()) + .collect() + } + + /// Delete a producer by ID, stopping its collection task. + pub async fn delete_producer(&self, id: Uuid) -> Result<(), Error> { + let (_info, task) = self + .collection_tasks + .lock() + .await + .remove(&id) + .ok_or_else(|| Error::NoSuchProducer(id))?; + debug!( + self.log, + "removed collection task from set"; + "producer_id" => %id, + ); + match task.inbox.send(CollectionMessage::Shutdown).await { + Ok(_) => debug!( + self.log, + "shut down collection task"; + "producer_id" => %id, + ), + Err(e) => error!( + self.log, + "failed to shut down collection task"; + "producer_id" => %id, + "error" => ?e, + ), + } + Ok(()) + } +} diff --git a/oximeter/collector/src/http_entrypoints.rs b/oximeter/collector/src/http_entrypoints.rs new file mode 100644 index 0000000000..493083a40d --- /dev/null +++ b/oximeter/collector/src/http_entrypoints.rs @@ -0,0 +1,133 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Oximeter collector server HTTP API + +// Copyright 2023 Oxide Computer Company + +use crate::OximeterAgent; +use dropshot::endpoint; +use dropshot::ApiDescription; +use dropshot::EmptyScanParams; +use dropshot::HttpError; +use dropshot::HttpResponseDeleted; +use dropshot::HttpResponseOk; +use dropshot::HttpResponseUpdatedNoContent; +use dropshot::PaginationParams; +use dropshot::Query; +use dropshot::RequestContext; +use dropshot::ResultsPage; +use dropshot::TypedBody; +use dropshot::WhichPage; +use omicron_common::api::internal::nexus::ProducerEndpoint; +use schemars::JsonSchema; +use serde::Deserialize; +use serde::Serialize; +use std::sync::Arc; +use uuid::Uuid; + +// Build the HTTP API internal to the control plane +pub fn oximeter_api() -> ApiDescription> { + let mut api = ApiDescription::new(); + api.register(producers_post) + .expect("Could not register producers_post API handler"); + api.register(producers_list) + .expect("Could not register producers_list API handler"); + api.register(producer_delete) + .expect("Could not register producers_delete API handler"); + api.register(collector_info) + .expect("Could not register collector_info API handler"); + api +} + +// Handle a request from Nexus to register a new producer with this collector. +#[endpoint { + method = POST, + path = "/producers", +}] +async fn producers_post( + request_context: RequestContext>, + body: TypedBody, +) -> Result { + let agent = request_context.context(); + let producer_info = body.into_inner(); + agent + .register_producer(producer_info) + .await + .map_err(HttpError::from) + .map(|_| HttpResponseUpdatedNoContent()) +} + +// Parameters for paginating the list of producers. +#[derive(Clone, Copy, Debug, Deserialize, JsonSchema, Serialize)] +struct ProducerPage { + id: Uuid, +} + +// List all producers +#[endpoint { + method = GET, + path = "/producers", +}] +async fn producers_list( + request_context: RequestContext>, + query: Query>, +) -> Result>, HttpError> { + let agent = request_context.context(); + let pagination = query.into_inner(); + let limit = request_context.page_limit(&pagination)?.get() as usize; + let start = match &pagination.page { + WhichPage::First(..) => None, + WhichPage::Next(ProducerPage { id }) => Some(*id), + }; + let producers = agent.list_producers(start, limit).await; + ResultsPage::new( + producers, + &EmptyScanParams {}, + |info: &ProducerEndpoint, _| ProducerPage { id: info.id }, + ) + .map(HttpResponseOk) +} + +#[derive(Clone, Copy, Debug, Deserialize, JsonSchema, Serialize)] +struct ProducerIdPathParams { + producer_id: Uuid, +} + +// Delete a producer by ID. +#[endpoint { + method = DELETE, + path = "/producers/{producer_id}", +}] +async fn producer_delete( + request_context: RequestContext>, + path: dropshot::Path, +) -> Result { + let agent = request_context.context(); + let producer_id = path.into_inner().producer_id; + agent + .delete_producer(producer_id) + .await + .map_err(HttpError::from) + .map(|_| HttpResponseDeleted()) +} + +#[derive(Clone, Copy, Debug, Deserialize, JsonSchema, Serialize)] +pub struct CollectorInfo { + /// The collector's UUID. + pub id: Uuid, +} + +// Return identifying information about this collector +#[endpoint { + method = GET, + path = "/info", +}] +async fn collector_info( + request_context: RequestContext>, +) -> Result, HttpError> { + let agent = request_context.context(); + let info = CollectorInfo { id: agent.id }; + Ok(HttpResponseOk(info)) +} diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index ac5517205f..f3c793d5c2 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -6,64 +6,41 @@ // Copyright 2023 Oxide Computer Company -use anyhow::anyhow; -use anyhow::Context; -use dropshot::endpoint; -use dropshot::ApiDescription; use dropshot::ConfigDropshot; use dropshot::ConfigLogging; -use dropshot::EmptyScanParams; use dropshot::HttpError; -use dropshot::HttpResponseDeleted; -use dropshot::HttpResponseOk; -use dropshot::HttpResponseUpdatedNoContent; use dropshot::HttpServer; use dropshot::HttpServerStarter; -use dropshot::PaginationParams; -use dropshot::Query; -use dropshot::RequestContext; -use dropshot::ResultsPage; -use dropshot::TypedBody; -use dropshot::WhichPage; use internal_dns::resolver::ResolveError; use internal_dns::resolver::Resolver; use internal_dns::ServiceName; -use omicron_common::address::CLICKHOUSE_PORT; use omicron_common::address::NEXUS_INTERNAL_PORT; use omicron_common::api::internal::nexus::ProducerEndpoint; use omicron_common::backoff; use omicron_common::FileKv; -use oximeter::types::ProducerResults; -use oximeter::types::ProducerResultsItem; -use oximeter_db::Client; -use oximeter_db::DbWrite; use serde::Deserialize; use serde::Serialize; use slog::debug; use slog::error; use slog::info; use slog::o; -use slog::trace; use slog::warn; use slog::Drain; use slog::Logger; -use std::collections::btree_map::Entry; -use std::collections::BTreeMap; use std::net::SocketAddr; use std::net::SocketAddrV6; -use std::ops::Bound; use std::path::Path; use std::sync::Arc; -use std::time::Duration; use thiserror::Error; -use tokio::sync::mpsc; -use tokio::sync::oneshot; -use tokio::sync::Mutex; -use tokio::task::JoinHandle; -use tokio::time::interval; use uuid::Uuid; +mod agent; +mod http_entrypoints; +mod self_stats; mod standalone; + +pub use agent::OximeterAgent; +pub use http_entrypoints::oximeter_api; pub use standalone::standalone_nexus_api; pub use standalone::Server as StandaloneNexus; @@ -101,289 +78,6 @@ impl From for HttpError { } } -/// A simple representation of a producer, used mostly for standalone mode. -/// -/// These are usually specified as a structured string, formatted like: -/// `"@
"`. -#[derive(Copy, Clone, Debug)] -pub struct ProducerInfo { - /// The ID of the producer. - pub id: Uuid, - /// The address on which the producer listens. - pub address: SocketAddr, -} - -impl std::str::FromStr for ProducerInfo { - type Err = anyhow::Error; - fn from_str(s: &str) -> Result { - let (id, addr) = s - .split_once('@') - .context("Producer info should written as @
")?; - let id = id.parse().context("Invalid UUID")?; - let address = addr.parse().context("Invalid address")?; - Ok(Self { id, address }) - } -} - -type CollectionToken = oneshot::Sender<()>; - -// Messages for controlling a collection task -#[derive(Debug)] -enum CollectionMessage { - // Explicit request that the task collect data from its producer - // - // Also sends a oneshot that is signalled once the task scrapes - // data from the Producer, and places it in the Clickhouse server. - Collect(CollectionToken), - // Request that the task update its interval and the socket address on which it collects data - // from its producer. - Update(ProducerEndpoint), - // Request that the task exit - Shutdown, -} - -async fn perform_collection( - log: &Logger, - client: &reqwest::Client, - producer: &ProducerEndpoint, - outbox: &mpsc::Sender<(Option, ProducerResults)>, - token: Option, -) { - debug!(log, "collecting from producer"); - let res = client - .get(format!( - "http://{}{}", - producer.address, - producer.collection_route() - )) - .send() - .await; - match res { - Ok(res) => { - if res.status().is_success() { - match res.json::().await { - Ok(results) => { - debug!( - log, - "collected {} total results", - results.len(); - ); - outbox.send((token, results)).await.unwrap(); - } - Err(e) => { - warn!( - log, - "failed to collect results from producer: {}", - e.to_string(); - ); - } - } - } else { - warn!( - log, - "failed to receive metric results from producer"; - "status_code" => res.status().as_u16(), - ); - } - } - Err(e) => { - warn!( - log, - "failed to send collection request to producer: {}", - e.to_string(); - ); - } - } -} - -// Background task used to collect metrics from one producer on an interval. -// -// This function is started by the `OximeterAgent`, when a producer is registered. The task loops -// endlessly, and collects metrics from the assigned producer on a timeout. The assigned agent can -// also send a `CollectionMessage`, for example to update the collection interval. This is not -// currently used, but will likely be exposed via control plane interfaces in the future. -async fn collection_task( - log: Logger, - mut producer: ProducerEndpoint, - mut inbox: mpsc::Receiver, - outbox: mpsc::Sender<(Option, ProducerResults)>, -) { - let client = reqwest::Client::new(); - let mut collection_timer = interval(producer.interval); - collection_timer.tick().await; // completes immediately - debug!( - log, - "starting oximeter collection task"; - "interval" => ?producer.interval, - ); - - loop { - tokio::select! { - message = inbox.recv() => { - match message { - None => { - debug!(log, "collection task inbox closed, shutting down"); - return; - } - Some(CollectionMessage::Shutdown) => { - debug!(log, "collection task received shutdown request"); - return; - }, - Some(CollectionMessage::Collect(token)) => { - debug!(log, "collection task received explicit request to collect"); - perform_collection(&log, &client, &producer, &outbox, Some(token)).await; - }, - Some(CollectionMessage::Update(new_info)) => { - producer = new_info; - debug!( - log, - "collection task received request to update its producer information"; - "interval" => ?producer.interval, - "address" => producer.address, - ); - collection_timer = interval(producer.interval); - collection_timer.tick().await; // completes immediately - } - } - } - _ = collection_timer.tick() => { - perform_collection(&log, &client, &producer, &outbox, None).await; - } - } - } -} - -// Struct representing a task for collecting metric data from a single producer -#[derive(Debug)] -struct CollectionTask { - // Channel used to send messages from the agent to the actual task. The task owns the other - // side. - pub inbox: mpsc::Sender, - // Handle to the actual tokio task running the collection loop. - #[allow(dead_code)] - pub task: JoinHandle<()>, -} - -// A task run by `oximeter` in standalone mode, which simply prints results as -// they're received. -async fn results_printer( - log: Logger, - mut rx: mpsc::Receiver<(Option, ProducerResults)>, -) { - loop { - match rx.recv().await { - Some((_, results)) => { - for res in results.into_iter() { - match res { - ProducerResultsItem::Ok(samples) => { - for sample in samples.into_iter() { - info!( - log, - ""; - "sample" => ?sample, - ); - } - } - ProducerResultsItem::Err(e) => { - error!( - log, - "received error from a producer"; - "err" => ?e, - ); - } - } - } - } - None => { - debug!(log, "result queue closed, exiting"); - return; - } - } - } -} - -// Aggregation point for all results, from all collection tasks. -async fn results_sink( - log: Logger, - client: Client, - batch_size: usize, - batch_interval: Duration, - mut rx: mpsc::Receiver<(Option, ProducerResults)>, -) { - let mut timer = interval(batch_interval); - timer.tick().await; // completes immediately - let mut batch = Vec::with_capacity(batch_size); - loop { - let mut collection_token = None; - let insert = tokio::select! { - _ = timer.tick() => { - if batch.is_empty() { - trace!(log, "batch interval expired, but no samples to insert"); - false - } else { - true - } - } - results = rx.recv() => { - match results { - Some((token, results)) => { - let flattened_results = { - let mut flattened = Vec::with_capacity(results.len()); - for inner_batch in results.into_iter() { - match inner_batch { - ProducerResultsItem::Ok(samples) => flattened.extend(samples.into_iter()), - ProducerResultsItem::Err(e) => { - debug!( - log, - "received error (not samples) from a producer: {}", - e.to_string() - ); - } - } - } - flattened - }; - batch.extend(flattened_results); - - collection_token = token; - if collection_token.is_some() { - true - } else { - batch.len() >= batch_size - } - } - None => { - warn!(log, "result queue closed, exiting"); - return; - } - } - } - }; - - if insert { - debug!(log, "inserting {} samples into database", batch.len()); - match client.insert_samples(&batch).await { - Ok(()) => trace!(log, "successfully inserted samples"), - Err(e) => { - warn!( - log, - "failed to insert some results into metric DB: {}", - e.to_string() - ); - } - } - // TODO-correctness The `insert_samples` call above may fail. The method itself needs - // better handling of partially-inserted results in that case, but we may need to retry - // or otherwise handle an error here as well. - batch.clear(); - } - - if let Some(token) = collection_token { - let _ = token.send(()); - } - } -} - /// Configuration for interacting with the metric database. #[derive(Debug, Clone, Copy, Deserialize, Serialize)] pub struct DbConfig { @@ -402,7 +96,12 @@ pub struct DbConfig { } impl DbConfig { + /// Default number of samples to wait for before inserting a batch into + /// ClickHouse. pub const DEFAULT_BATCH_SIZE: usize = 1000; + + /// Default number of seconds to wait before inserting a batch into + /// ClickHouse. pub const DEFAULT_BATCH_INTERVAL: u64 = 5; // Construct config with an address, using the defaults for other fields @@ -415,274 +114,6 @@ impl DbConfig { } } -/// The internal agent the oximeter server uses to collect metrics from producers. -#[derive(Debug)] -pub struct OximeterAgent { - /// The collector ID for this agent - pub id: Uuid, - log: Logger, - // Handle to the TX-side of a channel for collecting results from the collection tasks - result_sender: mpsc::Sender<(Option, ProducerResults)>, - // The actual tokio tasks running the collection on a timer. - collection_tasks: - Arc>>, -} - -impl OximeterAgent { - /// Construct a new agent with the given ID and logger. - pub async fn with_id( - id: Uuid, - db_config: DbConfig, - resolver: &Resolver, - log: &Logger, - ) -> Result { - let (result_sender, result_receiver) = mpsc::channel(8); - let log = log.new(o!( - "component" => "oximeter-agent", - "collector_id" => id.to_string(), - )); - let insertion_log = log.new(o!("component" => "results-sink")); - - // Construct the ClickHouse client first, propagate an error if we can't reach the - // database. - let db_address = if let Some(address) = db_config.address { - address - } else { - SocketAddr::new( - resolver.lookup_ip(ServiceName::Clickhouse).await?, - CLICKHOUSE_PORT, - ) - }; - - // Determine the version of the database. - // - // There are three cases - // - // - The database exists and is at the expected version. Continue in - // this case. - // - // - The database exists and is at a lower-than-expected version. We - // fail back to the caller here, which will retry indefinitely until the - // DB has been updated. - // - // - The DB doesn't exist at all. This reports a version number of 0. We - // need to create the DB here, at the latest version. This is used in - // fresh installations and tests. - let client = Client::new(db_address, &log); - match client.check_db_is_at_expected_version().await { - Ok(_) => {} - Err(oximeter_db::Error::DatabaseVersionMismatch { - found: 0, - .. - }) => { - debug!(log, "oximeter database does not exist, creating"); - let replicated = client.is_oximeter_cluster().await?; - client - .initialize_db_with_version( - replicated, - oximeter_db::OXIMETER_VERSION, - ) - .await?; - } - Err(e) => return Err(Error::from(e)), - } - - // Spawn the task for aggregating and inserting all metrics - tokio::spawn(async move { - results_sink( - insertion_log, - client, - db_config.batch_size, - Duration::from_secs(db_config.batch_interval), - result_receiver, - ) - .await - }); - Ok(Self { - id, - log, - result_sender, - collection_tasks: Arc::new(Mutex::new(BTreeMap::new())), - }) - } - - /// Construct a new standalone `oximeter` collector. - pub async fn new_standalone( - id: Uuid, - db_config: Option, - log: &Logger, - ) -> Result { - let (result_sender, result_receiver) = mpsc::channel(8); - let log = log.new(o!( - "component" => "oximeter-standalone", - "collector_id" => id.to_string(), - )); - - // If we have configuration for ClickHouse, we'll spawn the results - // sink task as usual. If not, we'll spawn a dummy task that simply - // prints the results as they're received. - let insertion_log = log.new(o!("component" => "results-sink")); - if let Some(db_config) = db_config { - let Some(address) = db_config.address else { - return Err(Error::Standalone(anyhow!( - "Must provide explicit IP address in standalone mode" - ))); - }; - let client = Client::new(address, &log); - let replicated = client.is_oximeter_cluster().await?; - if !replicated { - client.init_single_node_db().await?; - } else { - client.init_replicated_db().await?; - } - - // Spawn the task for aggregating and inserting all metrics - tokio::spawn(async move { - results_sink( - insertion_log, - client, - db_config.batch_size, - Duration::from_secs(db_config.batch_interval), - result_receiver, - ) - .await - }); - } else { - tokio::spawn(results_printer(insertion_log, result_receiver)); - } - - // Construct the ClickHouse client first, propagate an error if we can't reach the - // database. - Ok(Self { - id, - log, - result_sender, - collection_tasks: Arc::new(Mutex::new(BTreeMap::new())), - }) - } - - /// Register a new producer with this oximeter instance. - pub async fn register_producer( - &self, - info: ProducerEndpoint, - ) -> Result<(), Error> { - let id = info.id; - match self.collection_tasks.lock().await.entry(id) { - Entry::Vacant(value) => { - debug!( - self.log, - "registered new metric producer"; - "producer_id" => id.to_string(), - "address" => info.address, - ); - - // Build channel to control the task and receive results. - let (tx, rx) = mpsc::channel(4); - let q = self.result_sender.clone(); - let log = self.log.new(o!("component" => "collection-task", "producer_id" => id.to_string())); - let info_clone = info.clone(); - let task = tokio::spawn(async move { - collection_task(log, info_clone, rx, q).await; - }); - value.insert((info, CollectionTask { inbox: tx, task })); - } - Entry::Occupied(mut value) => { - debug!( - self.log, - "received request to register existing metric \ - producer, updating collection information"; - "producer_id" => id.to_string(), - "interval" => ?info.interval, - "address" => info.address, - ); - value.get_mut().0 = info.clone(); - value - .get() - .1 - .inbox - .send(CollectionMessage::Update(info)) - .await - .unwrap(); - } - } - Ok(()) - } - - /// Forces a collection from all producers. - /// - /// Returns once all those values have been inserted into Clickhouse, - /// or an error occurs trying to perform the collection. - pub async fn force_collection(&self) { - let mut collection_oneshots = vec![]; - let collection_tasks = self.collection_tasks.lock().await; - for (_id, (_endpoint, task)) in collection_tasks.iter() { - let (tx, rx) = oneshot::channel(); - // Scrape from each producer, into oximeter... - task.inbox.send(CollectionMessage::Collect(tx)).await.unwrap(); - // ... and keep track of the token that indicates once the metric - // has made it into Clickhouse. - collection_oneshots.push(rx); - } - drop(collection_tasks); - - // Only return once all producers finish processing the token we - // provided. - // - // NOTE: This can either mean that the collection completed - // successfully, or an error occurred in the collection pathway. - futures::future::join_all(collection_oneshots).await; - } - - /// List existing producers. - pub async fn list_producers( - &self, - start_id: Option, - limit: usize, - ) -> Vec { - let start = if let Some(id) = start_id { - Bound::Excluded(id) - } else { - Bound::Unbounded - }; - self.collection_tasks - .lock() - .await - .range((start, Bound::Unbounded)) - .take(limit) - .map(|(_id, (info, _t))| info.clone()) - .collect() - } - - /// Delete a producer by ID, stopping its collection task. - pub async fn delete_producer(&self, id: Uuid) -> Result<(), Error> { - let (_info, task) = self - .collection_tasks - .lock() - .await - .remove(&id) - .ok_or_else(|| Error::NoSuchProducer(id))?; - debug!( - self.log, - "removed collection task from set"; - "producer_id" => %id, - ); - match task.inbox.send(CollectionMessage::Shutdown).await { - Ok(_) => debug!( - self.log, - "shut down collection task"; - "producer_id" => %id, - ), - Err(e) => error!( - self.log, - "failed to shut down collection task"; - "producer_id" => %id, - "error" => ?e, - ), - } - Ok(()) - } -} - /// Configuration used to initialize an oximeter server #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Config { @@ -768,8 +199,14 @@ impl Oximeter { let make_agent = || async { debug!(log, "creating ClickHouse client"); Ok(Arc::new( - OximeterAgent::with_id(args.id, config.db, &resolver, &log) - .await?, + OximeterAgent::with_id( + args.id, + args.address, + config.db, + &resolver, + &log, + ) + .await?, )) }; let log_client_failure = |error, delay| { @@ -858,7 +295,13 @@ impl Oximeter { ) -> Result { let db_config = clickhouse.map(DbConfig::with_address); let agent = Arc::new( - OximeterAgent::new_standalone(args.id, db_config, &log).await?, + OximeterAgent::new_standalone( + args.id, + args.address, + db_config, + &log, + ) + .await?, ); let dropshot_log = log.new(o!("component" => "dropshot")); @@ -941,108 +384,3 @@ impl Oximeter { self.agent.delete_producer(id).await } } - -// Build the HTTP API internal to the control plane -pub fn oximeter_api() -> ApiDescription> { - let mut api = ApiDescription::new(); - api.register(producers_post) - .expect("Could not register producers_post API handler"); - api.register(producers_list) - .expect("Could not register producers_list API handler"); - api.register(producer_delete) - .expect("Could not register producers_delete API handler"); - api.register(collector_info) - .expect("Could not register collector_info API handler"); - api -} - -// Handle a request from Nexus to register a new producer with this collector. -#[endpoint { - method = POST, - path = "/producers", -}] -async fn producers_post( - request_context: RequestContext>, - body: TypedBody, -) -> Result { - let agent = request_context.context(); - let producer_info = body.into_inner(); - agent - .register_producer(producer_info) - .await - .map_err(HttpError::from) - .map(|_| HttpResponseUpdatedNoContent()) -} - -// Parameters for paginating the list of producers. -#[derive(Clone, Copy, Debug, Deserialize, schemars::JsonSchema, Serialize)] -struct ProducerPage { - id: Uuid, -} - -// List all producers -#[endpoint { - method = GET, - path = "/producers", -}] -async fn producers_list( - request_context: RequestContext>, - query: Query>, -) -> Result>, HttpError> { - let agent = request_context.context(); - let pagination = query.into_inner(); - let limit = request_context.page_limit(&pagination)?.get() as usize; - let start = match &pagination.page { - WhichPage::First(..) => None, - WhichPage::Next(ProducerPage { id }) => Some(*id), - }; - let producers = agent.list_producers(start, limit).await; - ResultsPage::new( - producers, - &EmptyScanParams {}, - |info: &ProducerEndpoint, _| ProducerPage { id: info.id }, - ) - .map(HttpResponseOk) -} - -#[derive(Clone, Copy, Debug, Deserialize, schemars::JsonSchema, Serialize)] -struct ProducerIdPathParams { - producer_id: Uuid, -} - -// Delete a producer by ID. -#[endpoint { - method = DELETE, - path = "/producers/{producer_id}", -}] -async fn producer_delete( - request_context: RequestContext>, - path: dropshot::Path, -) -> Result { - let agent = request_context.context(); - let producer_id = path.into_inner().producer_id; - agent - .delete_producer(producer_id) - .await - .map_err(HttpError::from) - .map(|_| HttpResponseDeleted()) -} - -#[derive(Clone, Copy, Debug, Deserialize, schemars::JsonSchema, Serialize)] -pub struct CollectorInfo { - /// The collector's UUID. - pub id: Uuid, -} - -// Return identifying information about this collector -#[endpoint { - method = GET, - path = "/info", -}] -async fn collector_info( - request_context: RequestContext>, -) -> Result, HttpError> { - let agent = request_context.context(); - let info = CollectorInfo { id: agent.id }; - Ok(HttpResponseOk(info)) -} diff --git a/oximeter/collector/src/self_stats.rs b/oximeter/collector/src/self_stats.rs new file mode 100644 index 0000000000..a20e1b7954 --- /dev/null +++ b/oximeter/collector/src/self_stats.rs @@ -0,0 +1,154 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Metrics oximeter reports about itself + +// Copyright 2023 Oxide Computer Company + +use crate::ProducerEndpoint; +use oximeter::types::Cumulative; +use oximeter::types::ProducerResultsItem; +use oximeter::Metric; +use oximeter::MetricsError; +use oximeter::Sample; +use oximeter::Target; +use std::collections::BTreeMap; +use std::net::IpAddr; +use std::time::Duration; +use uuid::Uuid; + +/// The interval on which we report self statistics +pub const COLLECTION_INTERVAL: Duration = Duration::from_secs(60); + +/// A target representing a single oximeter collector. +#[derive(Clone, Copy, Debug, Target)] +pub struct OximeterCollector { + /// The collector's ID. + pub collector_id: Uuid, + /// The collector server's IP address. + pub collector_ip: IpAddr, + /// The collector server's port. + pub collector_port: u16, +} + +/// The number of successful collections from a single producer. +#[derive(Debug, Metric)] +pub struct Collections { + /// The producer's ID. + pub producer_id: Uuid, + /// The producer's IP address. + pub producer_ip: IpAddr, + /// The producer's port. + pub producer_port: u16, + /// The base route in the producer server used to collect metrics. + /// + /// The full route is `{base_route}/{producer_id}`. + pub base_route: String, + pub datum: Cumulative, +} + +/// Small enum to help understand why oximeter failed to collect from a +/// producer. +#[derive( + Clone, + Copy, + Debug, + Eq, + Hash, + Ord, + PartialEq, + PartialOrd, + strum::EnumIter, + strum::Display, +)] +#[non_exhaustive] +#[strum(serialize_all = "snake_case")] +pub enum FailureReason { + /// The producer could not be reached. + Unreachable, + /// Error during deserialization. + Deserialization, + /// Some unknown reason. + Other, +} + +/// The number of failed collections from a single producer. +#[derive(Debug, Metric)] +pub struct FailedCollections { + /// The producer's ID. + pub producer_id: Uuid, + /// The producer's IP address. + pub producer_ip: IpAddr, + /// The producer's port. + pub producer_port: u16, + /// The base route in the producer server used to collect metrics. + /// + /// The full route is `{base_route}/{producer_id}`. + pub base_route: String, + /// The reason we could not collect. + // + // NOTE: This should always be generated through a `FailureReason`. + pub reason: String, + pub datum: Cumulative, +} + +/// Oximeter collection statistics maintained by each collection task. +#[derive(Debug)] +pub struct CollectionTaskStats { + pub collector: OximeterCollector, + pub collections: Collections, + pub failed_collections: BTreeMap, +} + +impl CollectionTaskStats { + pub fn new( + collector: OximeterCollector, + producer: &ProducerEndpoint, + ) -> Self { + Self { + collector, + collections: Collections { + producer_id: producer.id, + producer_ip: producer.address.ip(), + producer_port: producer.address.port(), + base_route: producer.base_route.clone(), + datum: Cumulative::new(0), + }, + failed_collections: BTreeMap::new(), + } + } + + pub fn failures_for_reason( + &mut self, + reason: FailureReason, + ) -> &mut FailedCollections { + self.failed_collections.entry(reason).or_insert_with(|| { + FailedCollections { + producer_id: self.collections.producer_id, + producer_ip: self.collections.producer_ip, + producer_port: self.collections.producer_port, + base_route: self.collections.base_route.clone(), + reason: reason.to_string(), + datum: Cumulative::new(0), + } + }) + } + + pub fn sample(&self) -> Vec { + fn to_item(res: Result) -> ProducerResultsItem { + match res { + Ok(s) => ProducerResultsItem::Ok(vec![s]), + Err(s) => ProducerResultsItem::Err(s), + } + } + let mut samples = Vec::with_capacity(1 + self.failed_collections.len()); + samples.push(to_item(Sample::new(&self.collector, &self.collections))); + samples.extend( + self.failed_collections + .values() + .map(|metric| to_item(Sample::new(&self.collector, metric))), + ); + samples + } +}