diff --git a/dev-tools/omdb/src/bin/omdb/oximeter.rs b/dev-tools/omdb/src/bin/omdb/oximeter.rs index cc1efd126f..7dae63e947 100644 --- a/dev-tools/omdb/src/bin/omdb/oximeter.rs +++ b/dev-tools/omdb/src/bin/omdb/oximeter.rs @@ -7,11 +7,15 @@ use crate::helpers::CONNECTION_OPTIONS_HEADING; use crate::Omdb; use anyhow::Context; +use chrono::SecondsFormat; use clap::Args; use clap::Subcommand; use futures::TryStreamExt; use internal_dns_types::names::ServiceName; +use oximeter_client::types::FailedCollection; +use oximeter_client::types::ProducerDetails; use oximeter_client::types::ProducerEndpoint; +use oximeter_client::types::SuccessfulCollection; use oximeter_client::Client; use slog::Logger; use std::net::SocketAddr; @@ -41,6 +45,11 @@ pub struct OximeterArgs { enum OximeterCommands { /// List the producers the collector is assigned to poll. ListProducers, + /// Fetch details about a single assigned producer. + ProducerDetails { + /// The ID of the producer to fetch. + producer_id: Uuid, + }, } impl OximeterArgs { @@ -81,9 +90,26 @@ impl OximeterArgs { OximeterCommands::ListProducers => { self.list_producers(client).await } + OximeterCommands::ProducerDetails { producer_id } => { + self.producer_details(client, producer_id).await + } } } + async fn producer_details( + &self, + client: Client, + producer_id: Uuid, + ) -> anyhow::Result<()> { + let details = client + .producer_details(&producer_id) + .await + .context("failed to fetch producer details")? + .into_inner(); + print_producer_details(details); + Ok(()) + } + async fn list_producers(&self, client: Client) -> anyhow::Result<()> { let info = client .collector_info() @@ -120,11 +146,168 @@ struct Producer { impl From for Producer { fn from(p: ProducerEndpoint) -> Self { - let interval = Duration::new(p.interval.secs, p.interval.nanos); Self { id: p.id, address: p.address.parse().unwrap(), - interval: humantime::format_duration(interval).to_string(), + interval: duration_to_humantime(&p.interval), + } + } +} + +fn duration_to_humantime(d: &oximeter_client::types::Duration) -> String { + let interval = Duration::new(d.secs, d.nanos); + humantime::format_duration(interval).to_string() +} + +const WIDTH: usize = 12; + +fn print_producer_details(details: ProducerDetails) { + println!(); + println!("{:>WIDTH$}: {}", "ID", details.id); + println!("{:>WIDTH$}: {}", "Address", details.address); + println!( + "{:>WIDTH$}: {}", + "Registered", + details.registered.to_rfc3339_opts(SecondsFormat::Millis, true) + ); + println!( + "{:>WIDTH$}: {}", + "Updated", + details.updated.to_rfc3339_opts(SecondsFormat::Millis, true) + ); + println!( + "{:>WIDTH$}: {}", + "Interval", + duration_to_humantime(&details.interval) + ); + println!("{:>WIDTH$}: {}", "Successes", details.n_collections); + println!("{:>WIDTH$}: {}", "Failures", details.n_failures); + println!(); + print_last_success(details.last_success.as_ref()); + println!(); + print_last_failure(details.last_failure.as_ref()); +} + +fn print_last_success(maybe_success: Option<&SuccessfulCollection>) { + print!("{:>WIDTH$}: ", "Last success"); + match maybe_success { + None => println!("None"), + Some(success) => { + println!(); + println!( + "{:>WIDTH$}: {}", + "Started at", + success.started_at.to_rfc3339_opts(SecondsFormat::Millis, true) + ); + println!( + "{:>WIDTH$}: {:?}", + "Queued for", + Duration::new( + success.time_queued.secs, + success.time_queued.nanos + ) + ); + println!( + "{:>WIDTH$}: {:?}", + "Duration", + Duration::new( + success.time_collecting.secs, + success.time_collecting.nanos + ) + ); + println!("{:>WIDTH$}: {}", "Samples", success.n_samples); } } } + +fn print_last_failure(maybe_failure: Option<&FailedCollection>) { + print!("{:>WIDTH$}: ", "Last failure"); + match maybe_failure { + None => println!("None"), + Some(failure) => { + println!(); + println!( + "{:>WIDTH$}: {}", + "Started at", + failure.started_at.to_rfc3339_opts(SecondsFormat::Millis, true) + ); + println!( + "{:>WIDTH$}: {:?}", + "Queued for", + Duration::new( + failure.time_queued.secs, + failure.time_queued.nanos + ) + ); + println!( + "{:>WIDTH$}: {:?}", + "Duration", + Duration::new( + failure.time_collecting.secs, + failure.time_collecting.nanos + ) + ); + println!("{:>WIDTH$}: {}", "Reason", failure.reason); + } + } +} + +#[cfg(test)] +mod tests { + use super::print_producer_details; + use chrono::Utc; + use oximeter_client::types::FailedCollection; + use oximeter_client::types::ProducerDetails; + use oximeter_client::types::SuccessfulCollection; + use std::time::Duration; + use uuid::Uuid; + + #[test] + fn test_print_producer_details_success_only() { + let now = Utc::now(); + let details = ProducerDetails { + id: Uuid::new_v4(), + address: "[::1]:12345".parse().unwrap(), + interval: Duration::from_secs(10).into(), + last_success: Some(SuccessfulCollection { + n_samples: 100, + started_at: now, + time_collecting: Duration::from_millis(100).into(), + time_queued: Duration::from_millis(10).into(), + }), + last_failure: None, + n_collections: 1, + n_failures: 0, + registered: now, + updated: now, + }; + print_producer_details(details); + } + + #[test] + fn test_print_producer_details_with_failure() { + let now = Utc::now(); + let details = ProducerDetails { + id: Uuid::new_v4(), + interval: Duration::from_secs(10).into(), + address: "[::1]:12345".parse().unwrap(), + last_success: Some(SuccessfulCollection { + n_samples: 100, + started_at: now, + time_collecting: Duration::from_millis(100).into(), + time_queued: Duration::from_millis(10).into(), + }), + last_failure: Some(FailedCollection { + started_at: now, + time_collecting: Duration::from_millis(100).into(), + time_queued: Duration::from_millis(10).into(), + reason: String::from("unreachable"), + }), + n_collections: 1, + n_failures: 1, + registered: now, + updated: now, + }; + print_producer_details(details); + } +} diff --git a/dev-tools/omdb/tests/usage_errors.out b/dev-tools/omdb/tests/usage_errors.out index 5e66467403..85fc761289 100644 --- a/dev-tools/omdb/tests/usage_errors.out +++ b/dev-tools/omdb/tests/usage_errors.out @@ -761,8 +761,9 @@ Query oximeter collector state Usage: omdb oximeter [OPTIONS] Commands: - list-producers List the producers the collector is assigned to poll - help Print this message or the help of the given subcommand(s) + list-producers List the producers the collector is assigned to poll + producer-details Fetch details about a single assigned producer + help Print this message or the help of the given subcommand(s) Options: --log-level log level filter [env: LOG_LEVEL=] [default: warn] diff --git a/openapi/oximeter.json b/openapi/oximeter.json index dea3418b8d..b51c56b667 100644 --- a/openapi/oximeter.json +++ b/openapi/oximeter.json @@ -84,6 +84,39 @@ } }, "/producers/{producer_id}": { + "get": { + "summary": "Get details about a producer by ID.", + "operationId": "producer_details", + "parameters": [ + { + "in": "path", + "name": "producer_id", + "required": true, + "schema": { + "type": "string", + "format": "uuid" + } + } + ], + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProducerDetails" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + }, "delete": { "summary": "Delete a producer by ID.", "operationId": "producer_delete", @@ -171,6 +204,114 @@ "request_id" ] }, + "FailedCollection": { + "description": "Details about a previous failed collection.", + "type": "object", + "properties": { + "reason": { + "description": "The reason the collection failed.", + "type": "string" + }, + "started_at": { + "description": "The time at which we started a collection.\n\nNote that this is the time we queued a request to collect for processing by a background task. The `time_queued` can be added to this time to figure out when processing began, and `time_collecting` can be added to that to figure out how long the actual collection process took.", + "type": "string", + "format": "date-time" + }, + "time_collecting": { + "description": "The time it took for the actual collection.", + "allOf": [ + { + "$ref": "#/components/schemas/Duration" + } + ] + }, + "time_queued": { + "description": "The time this request spent queued before being processed.", + "allOf": [ + { + "$ref": "#/components/schemas/Duration" + } + ] + } + }, + "required": [ + "reason", + "started_at", + "time_collecting", + "time_queued" + ] + }, + "ProducerDetails": { + "type": "object", + "properties": { + "address": { + "description": "The current collection address.", + "type": "string" + }, + "id": { + "description": "The producer's ID.", + "type": "string", + "format": "uuid" + }, + "interval": { + "description": "The current collection interval.", + "allOf": [ + { + "$ref": "#/components/schemas/Duration" + } + ] + }, + "last_failure": { + "nullable": true, + "description": "Details about the last failed collection.\n\nThis is None if we've never failed to collect from the producer.", + "allOf": [ + { + "$ref": "#/components/schemas/FailedCollection" + } + ] + }, + "last_success": { + "nullable": true, + "description": "Details about the last successful collection.\n\nThis is None if we've never successfully collected from the producer.", + "allOf": [ + { + "$ref": "#/components/schemas/SuccessfulCollection" + } + ] + }, + "n_collections": { + "description": "The total number of successful collections we've made.", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "n_failures": { + "description": "The total number of failed collections.", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "registered": { + "description": "The time the producer was first registered with us.", + "type": "string", + "format": "date-time" + }, + "updated": { + "description": "The last time the producer's information was updated.", + "type": "string", + "format": "date-time" + } + }, + "required": [ + "address", + "id", + "interval", + "n_collections", + "n_failures", + "registered", + "updated" + ] + }, "ProducerEndpoint": { "description": "Information announced by a metric server, used so that clients can contact it and collect available metric data from it.", "type": "object", @@ -261,6 +402,45 @@ ] } ] + }, + "SuccessfulCollection": { + "description": "Details about a previous successful collection.", + "type": "object", + "properties": { + "n_samples": { + "description": "The number of samples collected.", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "started_at": { + "description": "The time at which we started a collection.\n\nNote that this is the time we queued a request to collect for processing by a background task. The `time_queued` can be added to this time to figure out when processing began, and `time_collecting` can be added to that to figure out how long the actual collection process took.", + "type": "string", + "format": "date-time" + }, + "time_collecting": { + "description": "The time it took for the actual collection.", + "allOf": [ + { + "$ref": "#/components/schemas/Duration" + } + ] + }, + "time_queued": { + "description": "The time this request spent queued before being processed.", + "allOf": [ + { + "$ref": "#/components/schemas/Duration" + } + ] + } + }, + "required": [ + "n_samples", + "started_at", + "time_collecting", + "time_queued" + ] } }, "responses": { diff --git a/oximeter/api/src/lib.rs b/oximeter/api/src/lib.rs index 2231a0cc5d..f47a5ba07e 100644 --- a/oximeter/api/src/lib.rs +++ b/oximeter/api/src/lib.rs @@ -10,6 +10,7 @@ use dropshot::{ use omicron_common::api::internal::nexus::ProducerEndpoint; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use std::{net::SocketAddr, time::Duration}; use uuid::Uuid; #[dropshot::api_description] @@ -26,6 +27,16 @@ pub trait OximeterApi { query: Query>, ) -> Result>, HttpError>; + /// Get details about a producer by ID. + #[endpoint { + method = GET, + path = "/producers/{producer_id}", + }] + async fn producer_details( + request_context: RequestContext, + path: dropshot::Path, + ) -> Result, HttpError>; + /// Delete a producer by ID. #[endpoint { method = DELETE, @@ -64,3 +75,120 @@ pub struct CollectorInfo { /// Last time we refreshed our producer list with Nexus. pub last_refresh: Option>, } + +/// Details about a previous successful collection. +#[derive(Clone, Copy, Debug, Deserialize, JsonSchema, Serialize)] +pub struct SuccessfulCollection { + /// The time at which we started a collection. + /// + /// Note that this is the time we queued a request to collect for processing + /// by a background task. The `time_queued` can be added to this time to + /// figure out when processing began, and `time_collecting` can be added to + /// that to figure out how long the actual collection process took. + pub started_at: DateTime, + + /// The time this request spent queued before being processed. + pub time_queued: Duration, + + /// The time it took for the actual collection. + pub time_collecting: Duration, + + /// The number of samples collected. + pub n_samples: u64, +} + +/// Details about a previous failed collection. +#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] +pub struct FailedCollection { + /// The time at which we started a collection. + /// + /// Note that this is the time we queued a request to collect for processing + /// by a background task. The `time_queued` can be added to this time to + /// figure out when processing began, and `time_collecting` can be added to + /// that to figure out how long the actual collection process took. + pub started_at: DateTime, + + /// The time this request spent queued before being processed. + pub time_queued: Duration, + + /// The time it took for the actual collection. + pub time_collecting: Duration, + + /// The reason the collection failed. + pub reason: String, +} + +#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] +pub struct ProducerDetails { + /// The producer's ID. + pub id: Uuid, + + /// The current collection interval. + pub interval: Duration, + + /// The current collection address. + pub address: SocketAddr, + + /// The time the producer was first registered with us. + pub registered: DateTime, + + /// The last time the producer's information was updated. + pub updated: DateTime, + + /// Details about the last successful collection. + /// + /// This is None if we've never successfully collected from the producer. + pub last_success: Option, + + /// Details about the last failed collection. + /// + /// This is None if we've never failed to collect from the producer. + pub last_failure: Option, + + /// The total number of successful collections we've made. + pub n_collections: u64, + + /// The total number of failed collections. + pub n_failures: u64, +} + +impl ProducerDetails { + pub fn new(info: &ProducerEndpoint) -> Self { + let now = Utc::now(); + Self { + id: info.id, + interval: info.interval, + address: info.address, + registered: now, + updated: now, + last_success: None, + last_failure: None, + n_collections: 0, + n_failures: 0, + } + } + + /// Update with new producer information. + /// + /// # Panics + /// + /// This panics if the new information refers to a different ID. + pub fn update(&mut self, new: &ProducerEndpoint) { + assert_eq!(self.id, new.id); + self.updated = Utc::now(); + self.address = new.address; + self.interval = new.interval; + } + + /// Update when we successfully complete a collection. + pub fn on_success(&mut self, success: SuccessfulCollection) { + self.last_success = Some(success); + self.n_collections += 1; + } + + /// Update when we fail to complete a collection. + pub fn on_failure(&mut self, failure: FailedCollection) { + self.last_failure = Some(failure); + self.n_failures += 1; + } +} diff --git a/oximeter/collector/src/agent.rs b/oximeter/collector/src/agent.rs index 6fa8c01c56..4c4f0f4177 100644 --- a/oximeter/collector/src/agent.rs +++ b/oximeter/collector/src/agent.rs @@ -20,6 +20,9 @@ use omicron_common::backoff; use omicron_common::backoff::BackoffError; use oximeter::types::ProducerResults; use oximeter::types::ProducerResultsItem; +use oximeter_api::FailedCollection; +use oximeter_api::ProducerDetails; +use oximeter_api::SuccessfulCollection; use oximeter_db::Client; use oximeter_db::DbWrite; use qorb::claim::Handle; @@ -40,6 +43,7 @@ use std::ops::Bound; use std::sync::Arc; use std::sync::Mutex as StdMutex; use std::time::Duration; +use std::time::Instant; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; use tokio::sync::oneshot; @@ -97,6 +101,18 @@ enum CollectionMessage { Statistics { reply_tx: oneshot::Sender, }, + // Request details from the collection task about its producer. + Details { + reply_tx: oneshot::Sender, + }, +} + +/// Return type for `perform_collection`. +struct SingleCollectionResult { + /// The result of the collection. + result: Result, + /// The duration the collection took. + duration: Duration, } /// Run a single collection from the producer. @@ -104,14 +120,15 @@ async fn perform_collection( log: Logger, client: reqwest::Client, producer: ProducerEndpoint, -) -> Result { +) -> SingleCollectionResult { + let start = Instant::now(); debug!(log, "collecting from producer"); let res = client .get(format!("http://{}/{}", producer.address, producer.id)) .send() .await; trace!(log, "sent collection request to producer"); - match res { + let result = match res { Ok(res) => { if res.status().is_success() { match res.json::().await { @@ -149,7 +166,8 @@ async fn perform_collection( ); Err(self_stats::FailureReason::Unreachable) } - } + }; + SingleCollectionResult { result, duration: start.elapsed() } } // The type of one collection task run to completion. @@ -158,15 +176,54 @@ async fn perform_collection( // can bump the self-stat counter accordingly. type CollectionResult = Result; -// The type of one response message sent from the collection task. -type CollectionResponse = (Option, CollectionResult); +/// Information about when we start a collection. +struct CollectionStartTimes { + /// UTC timestamp at which the request was started. + started_at: DateTime, + /// Instant right before we queued the response for processing. + queued_at: Instant, +} + +impl CollectionStartTimes { + fn new() -> Self { + Self { started_at: Utc::now(), queued_at: Instant::now() } + } +} + +/// Details about a forced collection. +struct ForcedCollectionRequest { + /// The collection token we signal when the collection is completed. + token: CollectionToken, + /// Start time for this collection. + start: CollectionStartTimes, +} + +impl ForcedCollectionRequest { + fn new(token: CollectionToken) -> Self { + Self { token, start: CollectionStartTimes::new() } + } +} + +/// Details about a completed collection. +struct CollectionResponse { + /// Token for a forced collection request. + token: Option, + /// The actual result of the collection. + result: CollectionResult, + /// Time when the collection started. + started_at: DateTime, + /// Time the request spent queued. + time_queued: Duration, + /// Time we spent processing the request. + time_collecting: Duration, +} /// Task that actually performs collections from the producer. async fn inner_collection_loop( log: Logger, mut producer_info_rx: watch::Receiver, - mut forced_collection_rx: mpsc::Receiver, - mut timer_collection_rx: mpsc::Receiver<()>, + mut forced_collection_rx: mpsc::Receiver, + mut timer_collection_rx: mpsc::Receiver, result_tx: mpsc::Sender, ) { let client = reqwest::Client::builder() @@ -178,29 +235,30 @@ async fn inner_collection_loop( loop { // Wait for notification that we have a collection to perform, from // either the forced- or timer-collection queue. - trace!(log, "top of inner collection loop, waiting for next request",); - let maybe_token = tokio::select! { + trace!(log, "top of inner collection loop, waiting for next request"); + let (maybe_token, start_time) = tokio::select! { maybe_request = forced_collection_rx.recv() => { - let Some(request) = maybe_request else { + let Some(ForcedCollectionRequest { token, start }) = maybe_request else { debug!( log, "forced collection request queue closed, exiting" ); return; }; - Some(request) + (Some(token), start) } maybe_request = timer_collection_rx.recv() => { - if maybe_request.is_none() { + let Some(start) = maybe_request else { debug!( log, "timer collection request queue closed, exiting" ); return; }; - None + (None, start) } }; + let time_queued = start_time.queued_at.elapsed(); // Make a future to represent the actual collection. let mut collection_fut = Box::pin(perform_collection( @@ -212,7 +270,7 @@ async fn inner_collection_loop( // Wait for that collection to complete or fail, or for an update to the // producer's information. In the latter case, recreate the future for // the collection itself with the new producer information. - let collection_result = 'collection: loop { + let SingleCollectionResult { result, duration } = 'collection: loop { tokio::select! { biased; @@ -258,8 +316,16 @@ async fn inner_collection_loop( }; // Now that the collection has completed, send on the results, along - // with any collection token we may have gotten with the request. - match result_tx.send((maybe_token, collection_result)).await { + // with the timing information and any collection token we may have + // gotten with the request. + let response = CollectionResponse { + token: maybe_token, + result, + started_at: start_time.started_at, + time_queued, + time_collecting: duration, + }; + match result_tx.send(response).await { Ok(_) => trace!(log, "forwarded results to main collection loop"), Err(_) => { error!( @@ -298,6 +364,10 @@ async fn collection_loop( let mut self_collection_timer = interval(self_stats::COLLECTION_INTERVAL); self_collection_timer.tick().await; + // Keep track of more details about each collection, so we can expose this + // as debugging information in `oximeter`'s public API. + let mut details = ProducerDetails::new(&producer); + // Spawn a task to run the actual collections. // // This is so that we can possibly interrupt and restart collections that @@ -342,23 +412,24 @@ async fn collection_loop( log, "collection task received explicit request to collect" ); - match forced_collection_tx.try_send(token) { + let request = ForcedCollectionRequest::new(token); + match forced_collection_tx.try_send(request) { Ok(_) => trace!( log, "forwarded explicit request to collection task" ), Err(e) => { match e { - TrySendError::Closed(tok) => { + TrySendError::Closed(ForcedCollectionRequest { token, .. }) => { debug!( log, "collection task forced collection \ queue is closed. Attempting to \ notify caller and exiting.", ); - let _ = tok.send(Err(ForcedCollectionError::Closed)); + let _ = token.send(Err(ForcedCollectionError::Closed)); return; } - TrySendError::Full(tok) => { + TrySendError::Full(ForcedCollectionRequest { token, start }) => { error!( log, "collection task forced collection \ @@ -368,7 +439,7 @@ async fn collection_loop( calling `force_collection()` many \ times" ); - if tok + if token .send(Err(ForcedCollectionError::QueueFull)) .is_err() { @@ -379,6 +450,13 @@ async fn collection_loop( closed" ); } + let failure = FailedCollection { + started_at: start.started_at, + time_queued: Duration::ZERO, + time_collecting: Duration::ZERO, + reason: String::from("forced collection queue full"), + }; + details.on_failure(failure); } } } @@ -421,6 +499,8 @@ async fn collection_loop( "interval" => ?new_info.interval, "address" => new_info.address, ); + details.update(&new_info); + stats.update(&new_info); collection_timer = interval(new_info.interval); collection_timer.tick().await; // completes immediately } @@ -442,10 +522,24 @@ async fn collection_loop( ); reply_tx.send(stats.clone()).expect("failed to send statistics"); } + Some(CollectionMessage::Details { reply_tx }) => { + match reply_tx.send(details.clone()) { + Ok(_) => trace!( + log, + "sent producer details reply to oximeter agent", + ), + Err(e) => error!( + log, + "failed to send producer details reply to \ + oximeter agent"; + "error" => ?e, + ), + } + } } } maybe_result = result_rx.recv() => { - let Some((maybe_token, result)) = maybe_result else { + let Some(response) = maybe_result else { error!( log, "channel for receiving results from collection task \ @@ -453,10 +547,31 @@ async fn collection_loop( ); return; }; + let CollectionResponse { + token, + result, + started_at, + time_queued, + time_collecting + } = response; match result { Ok(results) => { stats.collections.datum.increment(); - if outbox.send((maybe_token, results)).await.is_err() { + let n_samples: u64 = results + .iter() + .map(|each| match each { + ProducerResultsItem::Ok(samples) => samples.len() as u64, + _ => 0, + }) + .sum(); + let success = SuccessfulCollection { + started_at, + time_queued, + time_collecting, + n_samples + }; + details.on_success(success); + if outbox.send((token, results)).await.is_err() { error!( log, "failed to send results to outbox, channel is \ @@ -465,7 +580,16 @@ async fn collection_loop( return; } } - Err(reason) => stats.failures_for_reason(reason).datum.increment(), + Err(reason) => { + let failure = FailedCollection { + started_at, + time_queued, + time_collecting, + reason: reason.to_string(), + }; + details.on_failure(failure); + stats.failures_for_reason(reason).datum.increment(); + } } } _ = self_collection_timer.tick() => { @@ -476,7 +600,7 @@ async fn collection_loop( outbox.send((None, stats.sample())).await.unwrap(); } _ = collection_timer.tick() => { - match timer_collection_tx.try_send(()) { + match timer_collection_tx.try_send(CollectionStartTimes::new()) { Ok(_) => { debug!( log, @@ -492,7 +616,14 @@ async fn collection_loop( ); return; } - Err(TrySendError::Full(_)) => { + Err(TrySendError::Full(start)) => { + let failure = FailedCollection { + started_at: start.started_at, + time_queued: Duration::ZERO, + time_collecting: Duration::ZERO, + reason: String::from("collections in progress"), + }; + details.on_failure(failure); error!( log, "timer-based collection request queue is \ @@ -851,6 +982,37 @@ impl OximeterAgent { }) } + /// Fetch details about a producer, if it exists. + pub async fn producer_details( + &self, + id: Uuid, + ) -> Result { + let tasks = self.collection_tasks.lock().await; + let Some((_info, task)) = tasks.get(&id) else { + return Err(Error::NoSuchProducer { id }); + }; + let (reply_tx, rx) = oneshot::channel(); + task.inbox.try_send(CollectionMessage::Details { reply_tx }).map_err( + |_| { + Error::CollectionError( + id, + String::from( + "Failed to send detail request to collection task", + ), + ) + }, + )?; + drop(tasks); + rx.await.map_err(|_| { + Error::CollectionError( + id, + String::from( + "Failed to receive detail response from collection task", + ), + ) + }) + } + /// Register a new producer with this oximeter instance. pub async fn register_producer( &self, @@ -1089,6 +1251,7 @@ async fn refresh_producer_list_task( let mut interval = tokio::time::interval(agent.refresh_interval); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + info!(agent.log, "starting refresh list task"); loop { interval.tick().await; info!(agent.log, "refreshing list of producers from Nexus"); @@ -1208,6 +1371,7 @@ mod tests { use super::OximeterAgent; use super::ProducerEndpoint; use crate::self_stats::FailureReason; + use chrono::Utc; use dropshot::HttpError; use dropshot::HttpResponseOk; use dropshot::Path; @@ -1562,4 +1726,168 @@ mod tests { ); logctx.cleanup_successful(); } + + #[tokio::test] + async fn verify_producer_details() { + let logctx = test_setup_log("verify_producer_details"); + let log = &logctx.log; + + // Spawn an oximeter collector ... + let collector = OximeterAgent::new_standalone( + Uuid::new_v4(), + SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0), + crate::default_refresh_interval(), + None, + log, + ) + .await + .unwrap(); + + // Spawn the mock server that always reports empty statistics. + let collection_count = Arc::new(AtomicUsize::new(0)); + let server = ServerBuilder::new( + producer_api_mod::api_description::().unwrap(), + collection_count.clone(), + log.new(slog::o!("component" => "dropshot")), + ) + .config(Default::default()) + .start() + .expect("failed to spawn empty dropshot server"); + + // Register the dummy producer. + let endpoint = ProducerEndpoint { + id: Uuid::new_v4(), + kind: ProducerKind::Service, + address: server.local_addr(), + interval: COLLECTION_INTERVAL, + }; + let id = endpoint.id; + let before = Utc::now(); + collector + .register_producer(endpoint) + .await + .expect("failed to register dummy producer"); + + // We don't manipulate time manually here, since this is pretty short + // and we want to assert things about the actual timing in the test + // below. + while collection_count.load(Ordering::SeqCst) < 1 { + tokio::time::sleep(TICK_INTERVAL).await; + } + + // Get details about the producer. + let count = collection_count.load(Ordering::SeqCst) as u64; + let details = collector + .producer_details(id) + .await + .expect("Should be able to get producer details"); + assert_eq!(details.id, id); + assert!(details.registered > before); + assert!(details.updated > before); + assert_eq!(details.registered, details.updated); + assert!( + details.n_collections == count + || details.n_collections == count - 1 + ); + assert_eq!(details.n_failures, 0); + let success = + details.last_success.expect("Should have a successful collection"); + assert!(success.time_queued > Duration::ZERO); + assert!(success.time_collecting > Duration::ZERO); + assert!(success.n_samples == 0); + assert!(details.last_failure.is_none()); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_updated_producer_is_still_collected_from() { + let logctx = + test_setup_log("test_updated_producer_is_still_collected_from"); + let log = &logctx.log; + + // Spawn an oximeter collector ... + let collector = OximeterAgent::new_standalone( + Uuid::new_v4(), + SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0), + crate::default_refresh_interval(), + None, + log, + ) + .await + .unwrap(); + + // Spawn the mock server that always reports empty statistics. + let collection_count = Arc::new(AtomicUsize::new(0)); + let server = ServerBuilder::new( + producer_api_mod::api_description::().unwrap(), + collection_count.clone(), + log.new(slog::o!("component" => "dropshot")), + ) + .config(Default::default()) + .start() + .expect("failed to spawn empty dropshot server"); + + // Register the dummy producer. + let id = Uuid::new_v4(); + let endpoint = ProducerEndpoint { + id, + kind: ProducerKind::Service, + address: server.local_addr(), + interval: COLLECTION_INTERVAL, + }; + collector + .register_producer(endpoint) + .await + .expect("failed to register dummy producer"); + + let details = collector.producer_details(id).await.unwrap(); + println!("{details:#?}"); + + // Ensure we get some collections from it. + tokio::time::pause(); + while collection_count.load(Ordering::SeqCst) < 1 { + tokio::time::advance(TICK_INTERVAL).await; + } + + // Now, drop and recreate the server, and register with the same ID at a + // different address. + let collection_count = Arc::new(AtomicUsize::new(0)); + let server = ServerBuilder::new( + producer_api_mod::api_description::().unwrap(), + collection_count.clone(), + log.new(slog::o!("component" => "dropshot")), + ) + .config(Default::default()) + .start() + .expect("failed to spawn empty dropshot server"); + + // Register the dummy producer. + let endpoint = + ProducerEndpoint { address: server.local_addr(), ..endpoint }; + collector + .register_producer(endpoint) + .await + .expect("failed to register dummy producer a second time"); + + // We should just have one producer. + assert_eq!( + collector.collection_tasks.lock().await.len(), + 1, + "Should only have one producer, it was updated and has the \ + same UUID", + ); + + // We should eventually collect from it again. + let now = Instant::now(); + while now.elapsed() < TEST_WAIT_PERIOD { + tokio::time::advance(TICK_INTERVAL).await; + } + let details = collector.producer_details(id).await.unwrap(); + println!("{details:#?}"); + assert_eq!(details.id, id); + assert_eq!(details.address, server.local_addr()); + assert!(details.n_collections > 0); + assert!(collection_count.load(Ordering::SeqCst) > 0); + logctx.cleanup_successful(); + } } diff --git a/oximeter/collector/src/http_entrypoints.rs b/oximeter/collector/src/http_entrypoints.rs index 1962262453..61777daf2b 100644 --- a/oximeter/collector/src/http_entrypoints.rs +++ b/oximeter/collector/src/http_entrypoints.rs @@ -52,6 +52,19 @@ impl OximeterApi for OximeterApiImpl { .map(HttpResponseOk) } + async fn producer_details( + request_context: RequestContext, + path: dropshot::Path, + ) -> Result, HttpError> { + let agent = request_context.context(); + let producer_id = path.into_inner().producer_id; + agent + .producer_details(producer_id) + .await + .map_err(HttpError::from) + .map(HttpResponseOk) + } + async fn producer_delete( request_context: RequestContext, path: dropshot::Path, diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index cc0ef92c13..3f13eb1382 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -65,11 +65,18 @@ pub enum Error { #[error("Error running standalone")] Standalone(#[from] anyhow::Error), + + #[error("No registered producer with id '{id}'")] + NoSuchProducer { id: Uuid }, } impl From for HttpError { fn from(e: Error) -> Self { - HttpError::for_internal_error(e.to_string()) + if let Error::NoSuchProducer { .. } = e { + HttpError::for_not_found(None, e.to_string()) + } else { + HttpError::for_internal_error(e.to_string()) + } } } diff --git a/oximeter/collector/src/self_stats.rs b/oximeter/collector/src/self_stats.rs index 2ab7b201e5..ff8776c031 100644 --- a/oximeter/collector/src/self_stats.rs +++ b/oximeter/collector/src/self_stats.rs @@ -99,6 +99,32 @@ impl CollectionTaskStats { } } + /// Update this information with a new producer endpoint. + /// + /// # Panics + /// + /// This panics if `new_info` refers to a different ID. + pub fn update(&mut self, new_info: &ProducerEndpoint) { + assert_eq!(self.collections.producer_id, new_info.id); + + // Only reset the counters if the new information is actually different. + let new_ip = new_info.address.ip(); + let new_port = new_info.address.port(); + if self.collections.producer_ip == new_ip + && self.collections.producer_port == new_port + { + return; + } + self.collections.producer_ip = new_ip; + self.collections.producer_port = new_port; + self.collections.datum = Cumulative::new(0); + for each in self.failed_collections.values_mut() { + each.producer_ip = new_ip; + each.producer_port = new_port; + each.datum = Cumulative::new(0); + } + } + pub fn failures_for_reason( &mut self, reason: FailureReason, @@ -135,18 +161,64 @@ impl CollectionTaskStats { #[cfg(test)] mod tests { + use super::CollectionTaskStats; use super::FailureReason; + use super::OximeterCollector; use super::StatusCode; + use omicron_common::api::internal::nexus::ProducerEndpoint; + use omicron_common::api::internal::nexus::ProducerKind; + use std::time::Duration; + use uuid::Uuid; #[test] fn test_failure_reason_serialization() { let data = &[ - (FailureReason::Deserialization, "deserialization"), - (FailureReason::Unreachable, "unreachable"), + (FailureReason::Deserialization, FailureReason::DESERIALIZATION), + (FailureReason::Unreachable, FailureReason::UNREACHABLE), + ( + FailureReason::CollectionsInProgress, + FailureReason::COLLECTIONS_IN_PROGRESS, + ), (FailureReason::Other(StatusCode::INTERNAL_SERVER_ERROR), "500"), ]; for (variant, as_str) in data.iter() { assert_eq!(variant.to_string(), *as_str); } } + + #[test] + fn only_reset_counters_if_info_is_different() { + let info = ProducerEndpoint { + id: Uuid::new_v4(), + kind: ProducerKind::Service, + address: "[::1]:12345".parse().unwrap(), + interval: Duration::from_secs(1), + }; + let collector = OximeterCollector { + collector_id: Uuid::new_v4(), + collector_ip: "::1".parse().unwrap(), + collector_port: 12345, + }; + let mut stats = CollectionTaskStats::new(collector, &info); + stats.collections.datum.increment(); + + stats.update(&info); + assert_eq!( + stats.collections.datum.value(), + 1, + "Should not have reset the counter when updating \ + with the same producer endpoint information" + ); + let info = ProducerEndpoint { + address: "[::1]:11111".parse().unwrap(), + ..info + }; + stats.update(&info); + assert_eq!( + stats.collections.datum.value(), + 0, + "Should have reset the counter when updating \ + with different producer endpoint information" + ); + } }