From 1c23ff9f86c211a6e726c20908dadb13799bbcfe Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Fri, 15 Sep 2023 00:21:59 +0000 Subject: [PATCH] Adds functionality to run oximeter standalone - Adds a "standalone" mode for the `oximeter-collector` crate, including the binary and main inner types. This runs in a slightly different mode, in which the ClickHouse database itself isn't strictly required. In this case, a task to simply print the results will be spawned in place of the normal results-sink task which inserts records into the database. - Creates a tiny fake Nexus server, which includes only the API needed to register collectors and producers. This is started automatically when running `oximeter standalone`, and used to assign producers / collectors as the real Nexus does, but without a database. The assignments are only in memory. - Adds internal `oximeter` API for listing / deleting a producer for each oximeter collector, and an `omdb` subcommand which exercises the listing. --- Cargo.lock | 12 + common/src/api/internal/nexus.rs | 2 +- omdb/Cargo.toml | 2 + omdb/src/bin/omdb/main.rs | 4 + omdb/src/bin/omdb/oximeter.rs | 93 ++++ omdb/tests/usage_errors.out | 14 +- openapi/oximeter.json | 130 +++++ oximeter-client/Cargo.toml | 1 + oximeter/collector/Cargo.toml | 7 + oximeter/collector/src/bin/oximeter.rs | 103 +++- oximeter/collector/src/lib.rs | 473 ++++++++++++++++-- oximeter/collector/src/standalone.rs | 263 ++++++++++ .../tests/output/cmd-oximeter-noargs-stderr | 8 +- oximeter/producer/Cargo.toml | 4 + oximeter/producer/examples/producer.rs | 45 +- oximeter/producer/src/lib.rs | 142 ++++-- 16 files changed, 1186 insertions(+), 117 deletions(-) create mode 100644 omdb/src/bin/omdb/oximeter.rs create mode 100644 oximeter/collector/src/standalone.rs diff --git a/Cargo.lock b/Cargo.lock index 8b4915bbc61..5040caae2fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5055,6 +5055,7 @@ dependencies = [ "diesel", "dropshot", "expectorate", + "futures", "humantime", "nexus-client 0.1.0", "nexus-db-model", @@ -5066,6 +5067,7 @@ dependencies = [ "omicron-nexus", "omicron-rpaths", "omicron-test-utils", + "oximeter-client", "pq-sys", "regex", "serde", @@ -5527,6 +5529,7 @@ name = "oximeter-client" version = "0.1.0" dependencies = [ "chrono", + "futures", "omicron-common 0.1.0", "progenitor", "reqwest", @@ -5539,23 +5542,30 @@ dependencies = [ name = "oximeter-collector" version = "0.1.0" dependencies = [ + "anyhow", "clap 4.4.3", "dropshot", "expectorate", "futures", "internal-dns 0.1.0", "nexus-client 0.1.0", + "nexus-types", "omicron-common 0.1.0", "omicron-test-utils", "openapi-lint", "openapiv3", "oximeter 0.1.0", + "oximeter-client", "oximeter-db", + "rand 0.8.5", "reqwest", + "schemars", "serde", "serde_json", "slog", + "slog-async", "slog-dtrace", + "slog-term", "subprocess", "thiserror", "tokio", @@ -5627,7 +5637,9 @@ dependencies = [ name = "oximeter-producer" version = "0.1.0" dependencies = [ + "anyhow", "chrono", + "clap 4.4.3", "dropshot", "nexus-client 0.1.0", "omicron-common 0.1.0", diff --git a/common/src/api/internal/nexus.rs b/common/src/api/internal/nexus.rs index 018869ce14f..983976bbb7b 100644 --- a/common/src/api/internal/nexus.rs +++ b/common/src/api/internal/nexus.rs @@ -67,7 +67,7 @@ pub struct InstanceRuntimeState { /// Information announced by a metric server, used so that clients can contact it and collect /// available metric data from it. -#[derive(Debug, Clone, JsonSchema, Serialize, Deserialize)] +#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize, PartialEq)] pub struct ProducerEndpoint { pub id: Uuid, pub address: SocketAddr, diff --git a/omdb/Cargo.toml b/omdb/Cargo.toml index 4b3ecd6e0e8..86f2195143e 100644 --- a/omdb/Cargo.toml +++ b/omdb/Cargo.toml @@ -15,11 +15,13 @@ clap.workspace = true diesel.workspace = true dropshot.workspace = true humantime.workspace = true +futures.workspace = true nexus-client.workspace = true nexus-db-model.workspace = true nexus-db-queries.workspace = true nexus-types.workspace = true omicron-common.workspace = true +oximeter-client.workspace = true # See omicron-rpaths for more about the "pq-sys" dependency. pq-sys = "*" serde.workspace = true diff --git a/omdb/src/bin/omdb/main.rs b/omdb/src/bin/omdb/main.rs index 861df47b51d..c5fa8238a71 100644 --- a/omdb/src/bin/omdb/main.rs +++ b/omdb/src/bin/omdb/main.rs @@ -25,6 +25,7 @@ use clap::Subcommand; mod db; mod nexus; +mod oximeter; #[tokio::main] async fn main() -> Result<(), anyhow::Error> { @@ -37,6 +38,7 @@ async fn main() -> Result<(), anyhow::Error> { match args.command { OmdbCommands::Nexus(nexus) => nexus.run_cmd(&log).await, OmdbCommands::Db(db) => db.run_cmd(&log).await, + OmdbCommands::Oximeter(oximeter) => oximeter.run_cmd(&log).await, } } @@ -67,6 +69,8 @@ enum OmdbCommands { Db(db::DbArgs), /// Debug a specific Nexus instance Nexus(nexus::NexusArgs), + /// Query oximeter collector state + Oximeter(oximeter::OximeterArgs), } fn parse_dropshot_log_level( diff --git a/omdb/src/bin/omdb/oximeter.rs b/omdb/src/bin/omdb/oximeter.rs new file mode 100644 index 00000000000..85f5de87589 --- /dev/null +++ b/omdb/src/bin/omdb/oximeter.rs @@ -0,0 +1,93 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! omdb commands that query oximeter + +use anyhow::Context; +use clap::Args; +use clap::Subcommand; +use futures::TryStreamExt; +use oximeter_client::types::ProducerEndpoint; +use oximeter_client::Client; +use slog::Logger; +use std::net::SocketAddr; +use std::time::Duration; +use tabled::Table; +use tabled::Tabled; +use uuid::Uuid; + +#[derive(Debug, Args)] +pub struct OximeterArgs { + /// URL of the oximeter collector to query + #[arg(long, env("OMDB_OXIMETER_URL"))] + oximeter_url: String, + + #[command(subcommand)] + command: OximeterCommands, +} + +/// Subcommands that query oximeter collector state +#[derive(Debug, Subcommand)] +enum OximeterCommands { + /// List the producers the collector is assigned to poll + ListProducers, +} + +impl OximeterArgs { + fn client(&self, log: &Logger) -> Client { + Client::new( + &self.oximeter_url, + log.new(slog::o!("component" => "oximeter-client")), + ) + } + + pub async fn run_cmd(&self, log: &Logger) -> anyhow::Result<()> { + let client = self.client(log); + match self.command { + OximeterCommands::ListProducers => { + self.list_producers(client).await + } + } + } + + async fn list_producers(&self, client: Client) -> anyhow::Result<()> { + let info = client + .collector_info() + .await + .context("failed to fetch collector info")?; + let producers: Vec = client + .producers_list_stream(None) + .map_ok(Producer::from) + .try_collect() + .await + .context("failed to list producers")?; + let table = Table::new(producers) + .with(tabled::settings::Style::empty()) + .with(tabled::settings::Padding::new(0, 1, 0, 0)) + .to_string(); + println!("Collector ID: {}\n", info.id); + println!("{table}"); + Ok(()) + } +} + +#[derive(Tabled)] +struct Producer { + id: Uuid, + address: SocketAddr, + base_route: String, + interval: String, +} + +impl From for Producer { + fn from(p: ProducerEndpoint) -> Self { + let interval = Duration::new(p.interval.secs, p.interval.nanos); + Self { + id: p.id, + address: p.address.parse().unwrap(), + base_route: p.base_route, + interval: humantime::format_duration(interval).to_string(), + } + } +} diff --git a/omdb/tests/usage_errors.out b/omdb/tests/usage_errors.out index 860c0efe214..4846868b743 100644 --- a/omdb/tests/usage_errors.out +++ b/omdb/tests/usage_errors.out @@ -9,9 +9,10 @@ Omicron debugger (unstable) Usage: omdb [OPTIONS] Commands: - db Query the control plane database (CockroachDB) - nexus Debug a specific Nexus instance - help Print this message or the help of the given subcommand(s) + db Query the control plane database (CockroachDB) + nexus Debug a specific Nexus instance + oximeter Query oximeter collector state + help Print this message or the help of the given subcommand(s) Options: --log-level log level filter [env: LOG_LEVEL=] [default: warn] @@ -29,9 +30,10 @@ using internal APIs. This is a prototype. The commands and output are unstable Usage: omdb [OPTIONS] Commands: - db Query the control plane database (CockroachDB) - nexus Debug a specific Nexus instance - help Print this message or the help of the given subcommand(s) + db Query the control plane database (CockroachDB) + nexus Debug a specific Nexus instance + oximeter Query oximeter collector state + help Print this message or the help of the given subcommand(s) Options: --log-level diff --git a/openapi/oximeter.json b/openapi/oximeter.json index 6781b778923..ebc7957c2ee 100644 --- a/openapi/oximeter.json +++ b/openapi/oximeter.json @@ -10,7 +10,76 @@ "version": "0.0.1" }, "paths": { + "/info": { + "get": { + "operationId": "collector_info", + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/CollectorInfo" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/producers": { + "get": { + "operationId": "producers_list", + "parameters": [ + { + "in": "query", + "name": "limit", + "description": "Maximum number of items returned by a single call", + "schema": { + "nullable": true, + "type": "integer", + "format": "uint32", + "minimum": 1 + } + }, + { + "in": "query", + "name": "page_token", + "description": "Token returned by previous call to retrieve the subsequent page", + "schema": { + "nullable": true, + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProducerEndpointResultsPage" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + }, + "x-dropshot-pagination": { + "required": [] + } + }, "post": { "operationId": "producers_post", "requestBody": { @@ -35,6 +104,33 @@ } } } + }, + "/producers/{producer_id}": { + "delete": { + "operationId": "producer_delete", + "parameters": [ + { + "in": "path", + "name": "producer_id", + "required": true, + "schema": { + "type": "string", + "format": "uuid" + } + } + ], + "responses": { + "204": { + "description": "successful deletion" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } } }, "components": { @@ -51,6 +147,19 @@ } }, "schemas": { + "CollectorInfo": { + "type": "object", + "properties": { + "id": { + "description": "The collector's UUID.", + "type": "string", + "format": "uuid" + } + }, + "required": [ + "id" + ] + }, "Duration": { "type": "object", "properties": { @@ -113,6 +222,27 @@ "id", "interval" ] + }, + "ProducerEndpointResultsPage": { + "description": "A single page of results", + "type": "object", + "properties": { + "items": { + "description": "list of items on this page of results", + "type": "array", + "items": { + "$ref": "#/components/schemas/ProducerEndpoint" + } + }, + "next_page": { + "nullable": true, + "description": "token used to fetch the next page of results (if any)", + "type": "string" + } + }, + "required": [ + "items" + ] } } } diff --git a/oximeter-client/Cargo.toml b/oximeter-client/Cargo.toml index e4e68464d7a..041e31c8c97 100644 --- a/oximeter-client/Cargo.toml +++ b/oximeter-client/Cargo.toml @@ -6,6 +6,7 @@ license = "MPL-2.0" [dependencies] chrono.workspace = true +futures.workspace = true omicron-common.workspace = true progenitor.workspace = true reqwest = { workspace = true, features = ["json", "rustls-tls", "stream"] } diff --git a/oximeter/collector/Cargo.toml b/oximeter/collector/Cargo.toml index 1137651aa04..8423ac0787f 100644 --- a/oximeter/collector/Cargo.toml +++ b/oximeter/collector/Cargo.toml @@ -6,18 +6,25 @@ description = "The oximeter metric collection server" license = "MPL-2.0" [dependencies] +anyhow.workspace = true clap.workspace = true dropshot.workspace = true futures.workspace = true internal-dns.workspace = true nexus-client.workspace = true +nexus-types.workspace = true omicron-common.workspace = true oximeter.workspace = true +oximeter-client.workspace = true oximeter-db.workspace = true +rand.workspace = true reqwest = { workspace = true, features = [ "json" ] } +schemars.workspace = true serde.workspace = true slog.workspace = true +slog-async.workspace = true slog-dtrace.workspace = true +slog-term.workspace = true thiserror.workspace = true tokio.workspace = true toml.workspace = true diff --git a/oximeter/collector/src/bin/oximeter.rs b/oximeter/collector/src/bin/oximeter.rs index bf54cf33fa0..a616a34025c 100644 --- a/oximeter/collector/src/bin/oximeter.rs +++ b/oximeter/collector/src/bin/oximeter.rs @@ -3,12 +3,21 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. //! Main entry point to run an `oximeter` server in the control plane. -// Copyright 2021 Oxide Computer Company + +// Copyright 2023 Oxide Computer Company use clap::Parser; use omicron_common::cmd::fatal; use omicron_common::cmd::CmdError; -use oximeter_collector::{oximeter_api, Config, Oximeter, OximeterArguments}; +use oximeter_collector::oximeter_api; +use oximeter_collector::standalone_nexus_api; +use oximeter_collector::Config; +use oximeter_collector::Oximeter; +use oximeter_collector::OximeterArguments; +use oximeter_collector::StandaloneNexus; +use slog::Level; +use std::net::Ipv6Addr; +use std::net::SocketAddr; use std::net::SocketAddrV6; use std::path::PathBuf; use uuid::Uuid; @@ -23,6 +32,16 @@ pub fn run_openapi() -> Result<(), String> { .map_err(|e| e.to_string()) } +pub fn run_standalone_openapi() -> Result<(), String> { + standalone_nexus_api() + .openapi("Oxide Nexus API", "0.0.1") + .description("API for interacting with Nexus") + .contact_url("https://oxide.computer") + .contact_email("api@oxide.computer") + .write(&mut std::io::stdout()) + .map_err(|e| e.to_string()) +} + /// Run an oximeter metric collection server in the Oxide Control Plane. #[derive(Parser)] #[clap(name = "oximeter", about = "See README.adoc for more information")] @@ -36,12 +55,71 @@ enum Args { #[clap(name = "CONFIG_FILE", action)] config_file: PathBuf, + /// The UUID for this instance of the `oximeter` collector. #[clap(short, long, action)] id: Uuid, + /// The socket address at which `oximeter`'s HTTP server runs. #[clap(short, long, action)] address: SocketAddrV6, }, + + /// Run `oximeter` in standalone mode for development. + /// + /// In this mode, `oximeter` can be used to test the collection of metrics + /// from producers, without requiring all the normal machinery of the + /// control plane. The collector is run as usual, but additionally starts an + /// API server to stand-in for Nexus. The registrations of the producers and + /// collectors occurs through the normal code path, but uses this standalone + /// Nexus instead of the real thing. + Standalone { + /// The ID for the collector. + /// + /// Default is to generate a new, random UUID. + #[arg(long, default_value_t = Uuid::new_v4())] + id: Uuid, + + /// Address at which `oximeter` itself listens. + /// + /// This address can be used to register new producers, after the + /// program has already started. + #[arg( + long, + default_value_t = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 12223, 0, 0) + )] + address: SocketAddrV6, + + /// The address for the fake Nexus server used to register. + /// + /// This program starts a fake version of Nexus, which is used only to + /// register the producers and collectors. This allows them to operate + /// as they usually would, registering each other with Nexus so that an + /// assignment between them can be made. + #[arg( + long, + default_value_t = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 12221, 0, 0) + )] + nexus: SocketAddrV6, + + /// The address for ClickHouse. + /// + /// If not provided, `oximeter` will not attempt to insert records into + /// the database at all. In this mode, the program will print the + /// collected samples, instead of inserting them into the database. + #[arg(long)] + clickhouse: Option, + + /// The log-level. + #[arg(long, default_value_t = Level::Info, value_parser = parse_log_level)] + log_level: Level, + }, + + /// Print the fake Nexus's standalone API. + StandaloneOpenapi, +} + +fn parse_log_level(s: &str) -> Result { + s.parse().map_err(|_| "Invalid log level".to_string()) } #[tokio::main] @@ -65,5 +143,26 @@ async fn do_run() -> Result<(), CmdError> { .await .map_err(|e| CmdError::Failure(e.to_string())) } + Args::Standalone { id, address, nexus, clickhouse, log_level } => { + // Start the standalone Nexus server, for registration of both the + // collector and producers. + let nexus_server = StandaloneNexus::new(nexus.into(), log_level) + .map_err(|e| CmdError::Failure(e.to_string()))?; + let args = OximeterArguments { id, address }; + Oximeter::new_standalone( + nexus_server.log(), + &args, + nexus_server.local_addr(), + clickhouse, + ) + .await + .unwrap() + .serve_forever() + .await + .map_err(|e| CmdError::Failure(e.to_string())) + } + Args::StandaloneOpenapi => { + run_standalone_openapi().map_err(CmdError::Failure) + } } } diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index bf75b567eae..6674d65ecdc 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -4,35 +4,71 @@ //! Implementation of the `oximeter` metric collection server. -// Copyright 2021 Oxide Computer Company - -use dropshot::{ - endpoint, ApiDescription, ConfigDropshot, ConfigLogging, HttpError, - HttpResponseUpdatedNoContent, HttpServer, HttpServerStarter, - RequestContext, TypedBody, -}; -use internal_dns::resolver::{ResolveError, Resolver}; +// Copyright 2023 Oxide Computer Company + +use anyhow::anyhow; +use anyhow::Context; +use dropshot::endpoint; +use dropshot::ApiDescription; +use dropshot::ConfigDropshot; +use dropshot::ConfigLogging; +use dropshot::EmptyScanParams; +use dropshot::HttpError; +use dropshot::HttpResponseDeleted; +use dropshot::HttpResponseOk; +use dropshot::HttpResponseUpdatedNoContent; +use dropshot::HttpServer; +use dropshot::HttpServerStarter; +use dropshot::PaginationParams; +use dropshot::Query; +use dropshot::RequestContext; +use dropshot::ResultsPage; +use dropshot::TypedBody; +use dropshot::WhichPage; +use internal_dns::resolver::ResolveError; +use internal_dns::resolver::Resolver; use internal_dns::ServiceName; -use omicron_common::address::{CLICKHOUSE_PORT, NEXUS_INTERNAL_PORT}; +use omicron_common::address::CLICKHOUSE_PORT; +use omicron_common::address::NEXUS_INTERNAL_PORT; use omicron_common::api::internal::nexus::ProducerEndpoint; -use omicron_common::{backoff, FileKv}; -use oximeter::types::{ProducerResults, ProducerResultsItem}; -use oximeter_db::{Client, DbWrite}; -use serde::{Deserialize, Serialize}; -use slog::{debug, error, info, o, trace, warn, Drain, Logger}; -use std::collections::{btree_map::Entry, BTreeMap}; -use std::net::{SocketAddr, SocketAddrV6}; +use omicron_common::backoff; +use omicron_common::FileKv; +use oximeter::types::ProducerResults; +use oximeter::types::ProducerResultsItem; +use oximeter_db::Client; +use oximeter_db::DbWrite; +use serde::Deserialize; +use serde::Serialize; +use slog::debug; +use slog::error; +use slog::info; +use slog::o; +use slog::trace; +use slog::warn; +use slog::Drain; +use slog::Logger; +use std::collections::btree_map::Entry; +use std::collections::BTreeMap; +use std::net::SocketAddr; +use std::net::SocketAddrV6; +use std::ops::Bound; use std::path::Path; use std::sync::Arc; use std::time::Duration; use thiserror::Error; -use tokio::{ - sync::mpsc, sync::oneshot, sync::Mutex, task::JoinHandle, time::interval, -}; +use tokio::sync::mpsc; +use tokio::sync::oneshot; +use tokio::sync::Mutex; +use tokio::task::JoinHandle; +use tokio::time::interval; use uuid::Uuid; +mod standalone; +pub use standalone::standalone_nexus_api; +pub use standalone::Server as StandaloneNexus; + /// Errors collecting metric data -#[derive(Debug, Clone, Error)] +#[derive(Debug, Error)] pub enum Error { #[error("Error running Oximeter collector server: {0}")] Server(String), @@ -45,6 +81,48 @@ pub enum Error { #[error(transparent)] ResolveError(#[from] ResolveError), + + #[error("No producer is registered with ID")] + NoSuchProducer(Uuid), + + #[error("Error running standalone")] + Standalone(#[from] anyhow::Error), +} + +impl From for HttpError { + fn from(e: Error) -> Self { + match e { + Error::NoSuchProducer(id) => HttpError::for_not_found( + None, + format!("No such producer: {id}"), + ), + _ => HttpError::for_internal_error(e.to_string()), + } + } +} + +/// A simple representation of a producer, used mostly for standalone mode. +/// +/// These are usually specified as a structured string, formatted like: +/// `"@
"`. +#[derive(Copy, Clone, Debug)] +pub struct ProducerInfo { + /// The ID of the producer. + pub id: Uuid, + /// The address on which the producer listens. + pub address: SocketAddr, +} + +impl std::str::FromStr for ProducerInfo { + type Err = anyhow::Error; + fn from_str(s: &str) -> Result { + let (id, addr) = s + .split_once('@') + .context("Producer info should written as @
")?; + let id = id.parse().context("Invalid UUID")?; + let address = addr.parse().context("Invalid address")?; + Ok(Self { id, address }) + } } type CollectionToken = oneshot::Sender<()>; @@ -61,7 +139,6 @@ enum CollectionMessage { // from its producer. Update(ProducerEndpoint), // Request that the task exit - #[allow(dead_code)] Shutdown, } @@ -72,7 +149,7 @@ async fn perform_collection( outbox: &mpsc::Sender<(Option, ProducerResults)>, token: Option, ) { - info!(log, "collecting from producer"); + debug!(log, "collecting from producer"); let res = client .get(format!( "http://{}{}", @@ -187,6 +264,44 @@ struct CollectionTask { pub task: JoinHandle<()>, } +// A task run by `oximeter` in standalone mode, which simply prints results as +// they're received. +async fn results_printer( + log: Logger, + mut rx: mpsc::Receiver<(Option, ProducerResults)>, +) { + loop { + match rx.recv().await { + Some((_, results)) => { + for res in results.into_iter() { + match res { + ProducerResultsItem::Ok(samples) => { + for sample in samples.into_iter() { + info!( + log, + ""; + "sample" => ?sample, + ); + } + } + ProducerResultsItem::Err(e) => { + error!( + log, + "received error from a producer"; + "err" => ?e, + ); + } + } + } + } + None => { + debug!(log, "result queue closed, exiting"); + return; + } + } + } +} + // Aggregation point for all results, from all collection tasks. async fn results_sink( log: Logger, @@ -286,6 +401,20 @@ pub struct DbConfig { pub batch_interval: u64, } +impl DbConfig { + pub const DEFAULT_BATCH_SIZE: usize = 1000; + pub const DEFAULT_BATCH_INTERVAL: u64 = 5; + + // Construct config with an address, using the defaults for other fields + fn with_address(address: SocketAddr) -> Self { + Self { + address: Some(address), + batch_size: Self::DEFAULT_BATCH_SIZE, + batch_interval: Self::DEFAULT_BATCH_INTERVAL, + } + } +} + /// The internal agent the oximeter server uses to collect metrics from producers. #[derive(Debug)] pub struct OximeterAgent { @@ -295,7 +424,8 @@ pub struct OximeterAgent { // Handle to the TX-side of a channel for collecting results from the collection tasks result_sender: mpsc::Sender<(Option, ProducerResults)>, // The actual tokio tasks running the collection on a timer. - collection_tasks: Arc>>, + collection_tasks: + Arc>>, } impl OximeterAgent { @@ -307,7 +437,10 @@ impl OximeterAgent { log: &Logger, ) -> Result { let (result_sender, result_receiver) = mpsc::channel(8); - let log = log.new(o!("component" => "oximeter-agent", "collector_id" => id.to_string())); + let log = log.new(o!( + "component" => "oximeter-agent", + "collector_id" => id.to_string(), + )); let insertion_log = log.new(o!("component" => "results-sink")); // Construct the ClickHouse client first, propagate an error if we can't reach the @@ -347,6 +480,61 @@ impl OximeterAgent { }) } + /// Construct a new standalone `oximeter` collector. + pub async fn new_standalone( + id: Uuid, + db_config: Option, + log: &Logger, + ) -> Result { + let (result_sender, result_receiver) = mpsc::channel(8); + let log = log.new(o!( + "component" => "oximeter-standalone", + "collector_id" => id.to_string(), + )); + + // If we have configuration for ClickHouse, we'll spawn the results + // sink task as usual. If not, we'll spawn a dummy task that simply + // prints the results as they're received. + let insertion_log = log.new(o!("component" => "results-sink")); + if let Some(db_config) = db_config { + let Some(address) = db_config.address else { + return Err(Error::Standalone(anyhow!( + "Must provide explicit IP address in standalone mode" + ))); + }; + let client = Client::new(address, &log); + let replicated = client.is_oximeter_cluster().await?; + if !replicated { + client.init_single_node_db().await?; + } else { + client.init_replicated_db().await?; + } + + // Spawn the task for aggregating and inserting all metrics + tokio::spawn(async move { + results_sink( + insertion_log, + client, + db_config.batch_size, + Duration::from_secs(db_config.batch_interval), + result_receiver, + ) + .await + }); + } else { + tokio::spawn(results_printer(insertion_log, result_receiver)); + } + + // Construct the ClickHouse client first, propagate an error if we can't reach the + // database. + Ok(Self { + id, + log, + result_sender, + collection_tasks: Arc::new(Mutex::new(BTreeMap::new())), + }) + } + /// Register a new producer with this oximeter instance. pub async fn register_producer( &self, @@ -355,30 +543,36 @@ impl OximeterAgent { let id = info.id; match self.collection_tasks.lock().await.entry(id) { Entry::Vacant(value) => { - info!(self.log, "registered new metric producer"; - "producer_id" => id.to_string(), - "address" => info.address, + debug!( + self.log, + "registered new metric producer"; + "producer_id" => id.to_string(), + "address" => info.address, ); // Build channel to control the task and receive results. let (tx, rx) = mpsc::channel(4); let q = self.result_sender.clone(); let log = self.log.new(o!("component" => "collection-task", "producer_id" => id.to_string())); + let info_clone = info.clone(); let task = tokio::spawn(async move { - collection_task(log, info, rx, q).await; + collection_task(log, info_clone, rx, q).await; }); - value.insert(CollectionTask { inbox: tx, task }); + value.insert((info, CollectionTask { inbox: tx, task })); } - Entry::Occupied(value) => { - info!( + Entry::Occupied(mut value) => { + debug!( self.log, - "received request to register existing metric producer, updating collection information"; + "received request to register existing metric \ + producer, updating collection information"; "producer_id" => id.to_string(), "interval" => ?info.interval, "address" => info.address, ); + value.get_mut().0 = info.clone(); value .get() + .1 .inbox .send(CollectionMessage::Update(info)) .await @@ -395,10 +589,10 @@ impl OximeterAgent { pub async fn force_collection(&self) { let mut collection_oneshots = vec![]; let collection_tasks = self.collection_tasks.lock().await; - for task in collection_tasks.iter() { + for (_id, (_endpoint, task)) in collection_tasks.iter() { let (tx, rx) = oneshot::channel(); // Scrape from each producer, into oximeter... - task.1.inbox.send(CollectionMessage::Collect(tx)).await.unwrap(); + task.inbox.send(CollectionMessage::Collect(tx)).await.unwrap(); // ... and keep track of the token that indicates once the metric // has made it into Clickhouse. collection_oneshots.push(rx); @@ -412,6 +606,55 @@ impl OximeterAgent { // successfully, or an error occurred in the collection pathway. futures::future::join_all(collection_oneshots).await; } + + /// List existing producers. + pub async fn list_producers( + &self, + start_id: Option, + limit: usize, + ) -> Vec { + let start = if let Some(id) = start_id { + Bound::Excluded(id) + } else { + Bound::Unbounded + }; + self.collection_tasks + .lock() + .await + .range((start, Bound::Unbounded)) + .take(limit) + .map(|(_id, (info, _t))| info.clone()) + .collect() + } + + /// Delete a producer by ID, stopping its collection task. + pub async fn delete_producer(&self, id: Uuid) -> Result<(), Error> { + let (_info, task) = self + .collection_tasks + .lock() + .await + .remove(&id) + .ok_or_else(|| Error::NoSuchProducer(id))?; + debug!( + self.log, + "removed collection task from set"; + "producer_id" => %id, + ); + match task.inbox.send(CollectionMessage::Shutdown).await { + Ok(_) => debug!( + self.log, + "shut down collection task"; + "producer_id" => %id, + ), + Err(e) => error!( + self.log, + "failed to shut down collection task"; + "producer_id" => %id, + "error" => ?e, + ), + } + Ok(()) + } } /// Configuration used to initialize an oximeter server @@ -440,6 +683,7 @@ impl Config { } } +/// Arguments for running the `oximeter` collector. pub struct OximeterArguments { pub id: Uuid, pub address: SocketAddrV6, @@ -447,7 +691,7 @@ pub struct OximeterArguments { /// A server used to collect metrics from components in the control plane. pub struct Oximeter { - _agent: Arc, + agent: Arc, server: HttpServer>, } @@ -572,7 +816,67 @@ impl Oximeter { .expect("Expected an infinite retry loop contacting Nexus"); info!(log, "oximeter registered with nexus"; "id" => ?agent.id); - Ok(Self { _agent: agent, server }) + Ok(Self { agent, server }) + } + + /// Create a new `oximeter` collector running in standalone mode. + pub async fn new_standalone( + log: &Logger, + args: &OximeterArguments, + nexus: SocketAddr, + clickhouse: Option, + ) -> Result { + let db_config = clickhouse.map(DbConfig::with_address); + let agent = Arc::new( + OximeterAgent::new_standalone(args.id, db_config, &log).await?, + ); + + let dropshot_log = log.new(o!("component" => "dropshot")); + let server = HttpServerStarter::new( + &ConfigDropshot { + bind_address: SocketAddr::V6(args.address), + ..Default::default() + }, + oximeter_api(), + Arc::clone(&agent), + &dropshot_log, + ) + .map_err(|e| Error::Server(e.to_string()))? + .start(); + info!(log, "started oximeter standalone server"); + + // Notify the standalone nexus. + let client = reqwest::Client::new(); + let notify_nexus = || async { + debug!(log, "contacting nexus"); + client + .post(format!("http://{}/metrics/collectors", nexus)) + .json(&nexus_client::types::OximeterInfo { + address: server.local_addr().to_string(), + collector_id: agent.id, + }) + .send() + .await + .map_err(|e| backoff::BackoffError::transient(e.to_string()))? + .error_for_status() + .map_err(|e| backoff::BackoffError::transient(e.to_string())) + }; + let log_notification_failure = |error, delay| { + warn!( + log, + "failed to contact nexus, will retry in {:?}", delay; + "error" => ?error + ); + }; + backoff::retry_notify( + backoff::retry_policy_internal_service(), + notify_nexus, + log_notification_failure, + ) + .await + .expect("Expected an infinite retry loop contacting Nexus"); + + Ok(Self { agent, server }) } /// Serve requests forever, consuming the server. @@ -592,6 +896,20 @@ impl Oximeter { pub async fn force_collect(&self) { self.server.app_private().force_collection().await } + + /// List producers. + pub async fn list_producers( + &self, + start: Option, + limit: usize, + ) -> Vec { + self.agent.list_producers(start, limit).await + } + + /// Delete a producer by ID, stopping its collection task. + pub async fn delete_producer(&self, id: Uuid) -> Result<(), Error> { + self.agent.delete_producer(id).await + } } // Build the HTTP API internal to the control plane @@ -599,6 +917,12 @@ pub fn oximeter_api() -> ApiDescription> { let mut api = ApiDescription::new(); api.register(producers_post) .expect("Could not register producers_post API handler"); + api.register(producers_list) + .expect("Could not register producers_list API handler"); + api.register(producer_delete) + .expect("Could not register producers_delete API handler"); + api.register(collector_info) + .expect("Could not register collector_info API handler"); api } @@ -616,6 +940,79 @@ async fn producers_post( agent .register_producer(producer_info) .await - .map_err(|e| HttpError::for_internal_error(e.to_string()))?; - Ok(HttpResponseUpdatedNoContent()) + .map_err(HttpError::from) + .map(|_| HttpResponseUpdatedNoContent()) +} + +// Parameters for paginating the list of producers. +#[derive(Clone, Copy, Debug, Deserialize, schemars::JsonSchema, Serialize)] +struct ProducerPage { + id: Uuid, +} + +// List all producers +#[endpoint { + method = GET, + path = "/producers", +}] +async fn producers_list( + request_context: RequestContext>, + query: Query>, +) -> Result>, HttpError> { + let agent = request_context.context(); + let pagination = query.into_inner(); + let limit = request_context.page_limit(&pagination)?.get() as usize; + let start = match &pagination.page { + WhichPage::First(..) => None, + WhichPage::Next(ProducerPage { id }) => Some(*id), + }; + let producers = agent.list_producers(start, limit).await; + ResultsPage::new( + producers, + &EmptyScanParams {}, + |info: &ProducerEndpoint, _| ProducerPage { id: info.id }, + ) + .map(HttpResponseOk) +} + +#[derive(Clone, Copy, Debug, Deserialize, schemars::JsonSchema, Serialize)] +struct ProducerIdPathParams { + producer_id: Uuid, +} + +// Delete a producer by ID. +#[endpoint { + method = DELETE, + path = "/producers/{producer_id}", +}] +async fn producer_delete( + request_context: RequestContext>, + path: dropshot::Path, +) -> Result { + let agent = request_context.context(); + let producer_id = path.into_inner().producer_id; + agent + .delete_producer(producer_id) + .await + .map_err(HttpError::from) + .map(|_| HttpResponseDeleted()) +} + +#[derive(Clone, Copy, Debug, Deserialize, schemars::JsonSchema, Serialize)] +pub struct CollectorInfo { + /// The collector's UUID. + pub id: Uuid, +} + +// Return identifying information about this collector +#[endpoint { + method = GET, + path = "/info", +}] +async fn collector_info( + request_context: RequestContext>, +) -> Result, HttpError> { + let agent = request_context.context(); + let info = CollectorInfo { id: agent.id }; + Ok(HttpResponseOk(info)) } diff --git a/oximeter/collector/src/standalone.rs b/oximeter/collector/src/standalone.rs new file mode 100644 index 00000000000..826a5f46630 --- /dev/null +++ b/oximeter/collector/src/standalone.rs @@ -0,0 +1,263 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Implementation of a standalone fake Nexus, simply for registering producers +//! and collectors with one another. + +// Copyright 2023 Oxide Computer Company + +use crate::Error; +use dropshot::endpoint; +use dropshot::ApiDescription; +use dropshot::ConfigDropshot; +use dropshot::HttpError; +use dropshot::HttpResponseUpdatedNoContent; +use dropshot::HttpServer; +use dropshot::HttpServerStarter; +use dropshot::RequestContext; +use dropshot::TypedBody; +use nexus_types::internal_api::params::OximeterInfo; +use omicron_common::api::internal::nexus::ProducerEndpoint; +use omicron_common::FileKv; +use oximeter_client::Client; +use rand::seq::IteratorRandom; +use slog::debug; +use slog::error; +use slog::info; +use slog::o; +use slog::Drain; +use slog::Level; +use slog::Logger; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; +use tokio::sync::Mutex; +use uuid::Uuid; + +// An assignment of a producer to an oximeter collector. +#[derive(Debug)] +struct ProducerAssignment { + producer: ProducerEndpoint, + collector_id: Uuid, +} + +#[derive(Debug)] +struct Inner { + // Map of producers by ID to their information and assigned oximeter + // collector. + producers: HashMap, + // Map of available oximeter collectors. + collectors: HashMap, +} + +impl Inner { + fn random_collector(&self) -> Option<(Uuid, OximeterInfo)> { + self.collectors + .iter() + .choose(&mut rand::thread_rng()) + .map(|(id, info)| (*id, *info)) + } +} + +// A stripped-down Nexus server, with only the APIs for registering metric +// producers and collectors. +#[derive(Debug)] +pub struct StandaloneNexus { + pub log: Logger, + inner: Mutex, +} + +impl StandaloneNexus { + fn new(log: Logger) -> Self { + Self { + log, + inner: Mutex::new(Inner { + producers: HashMap::new(), + collectors: HashMap::new(), + }), + } + } + + async fn register_producer( + &self, + info: &ProducerEndpoint, + ) -> Result<(), HttpError> { + let mut inner = self.inner.lock().await; + let assignment = match inner.producers.get_mut(&info.id) { + None => { + // There is no record for this producer. + // + // Select a random collector, and assign it to the producer. + // We'll return the assignment from this match block. + let Some((collector_id, collector_info)) = + inner.random_collector() + else { + return Err(HttpError::for_unavail( + None, + String::from("No collectors available"), + )); + }; + let client = Client::new( + format!("http://{}", collector_info.address).as_str(), + self.log.clone(), + ); + client.producers_post(&info.into()).await.map_err(|e| { + HttpError::for_internal_error(e.to_string()) + })?; + let assignment = + ProducerAssignment { producer: info.clone(), collector_id }; + assignment + } + Some(existing_assignment) => { + // We have a record, first check if it matches the assignment we + // have. + if &existing_assignment.producer == info { + return Ok(()); + } + + // This appears to be a re-registration, e.g., the producer + // changed its IP address. Re-register it with the collector to + // which it's already assigned. + let collector_id = existing_assignment.collector_id; + let collector_info = + inner.collectors.get(&collector_id).unwrap(); + let client = Client::new( + format!("http://{}", collector_info.address).as_str(), + self.log.clone(), + ); + client.producers_post(&info.into()).await.map_err(|e| { + HttpError::for_internal_error(e.to_string()) + })?; + ProducerAssignment { producer: info.clone(), collector_id } + } + }; + inner.producers.insert(info.id, assignment); + Ok(()) + } + + async fn register_collector( + &self, + info: OximeterInfo, + ) -> Result<(), HttpError> { + // If this is being registered again, send all its assignments again. + let mut inner = self.inner.lock().await; + if inner.collectors.insert(info.collector_id, info).is_some() { + let client = Client::new( + format!("http://{}", info.address).as_str(), + self.log.clone(), + ); + for producer_info in + inner.producers.values().filter_map(|assignment| { + if assignment.collector_id == info.collector_id { + Some(&assignment.producer) + } else { + None + } + }) + { + client.producers_post(&producer_info.into()).await.map_err( + |e| HttpError::for_internal_error(e.to_string()), + )?; + } + } + Ok(()) + } +} + +// Build the HTTP API of the fake Nexus for registration. +pub fn standalone_nexus_api() -> ApiDescription> { + let mut api = ApiDescription::new(); + api.register(cpapi_producers_post) + .expect("Could not register cpapi_producers_post API handler"); + api.register(cpapi_collectors_post) + .expect("Could not register cpapi_collectors_post API handler"); + api +} + +/// Accept a registration from a new metric producer +#[endpoint { + method = POST, + path = "/metrics/producers", + }] +async fn cpapi_producers_post( + request_context: RequestContext>, + producer_info: TypedBody, +) -> Result { + let context = request_context.context(); + let producer_info = producer_info.into_inner(); + context + .register_producer(&producer_info) + .await + .map(|_| HttpResponseUpdatedNoContent()) + .map_err(|e| HttpError::for_internal_error(e.to_string())) +} + +/// Accept a notification of a new oximeter collection server. +#[endpoint { + method = POST, + path = "/metrics/collectors", + }] +async fn cpapi_collectors_post( + request_context: RequestContext>, + oximeter_info: TypedBody, +) -> Result { + let context = request_context.context(); + let oximeter_info = oximeter_info.into_inner(); + context + .register_collector(oximeter_info) + .await + .map(|_| HttpResponseUpdatedNoContent()) + .map_err(|e| HttpError::for_internal_error(e.to_string())) +} + +/// A standalone Nexus server, with APIs only for registering metric collectors +/// and producers. +pub struct Server { + server: HttpServer>, +} + +impl Server { + /// Create a new server listening on the provided address. + pub fn new(address: SocketAddr, log_level: Level) -> Result { + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + let drain = slog::LevelFilter::new(drain, log_level).fuse(); + let (drain, registration) = slog_dtrace::with_drain(drain); + let log = slog::Logger::root(drain.fuse(), o!(FileKv)); + if let slog_dtrace::ProbeRegistration::Failed(e) = registration { + let msg = format!("failed to register DTrace probes: {}", e); + error!(log, "{}", msg); + return Err(Error::Server(msg)); + } else { + debug!(log, "registered DTrace probes"); + } + + let nexus = Arc::new(StandaloneNexus::new( + log.new(slog::o!("component" => "nexus-standalone")), + )); + let server = HttpServerStarter::new( + &ConfigDropshot { bind_address: address, ..Default::default() }, + standalone_nexus_api(), + Arc::clone(&nexus), + &log, + ) + .map_err(|e| Error::Server(e.to_string()))? + .start(); + info!( + log, + "created standalone nexus server for metric collections"; + "address" => %address, + ); + Ok(Self { server }) + } + + pub fn log(&self) -> &Logger { + &self.server.app_private().log + } + + pub fn local_addr(&self) -> SocketAddr { + self.server.local_addr() + } +} diff --git a/oximeter/collector/tests/output/cmd-oximeter-noargs-stderr b/oximeter/collector/tests/output/cmd-oximeter-noargs-stderr index 7b736fe8a12..3f0fd4726d1 100644 --- a/oximeter/collector/tests/output/cmd-oximeter-noargs-stderr +++ b/oximeter/collector/tests/output/cmd-oximeter-noargs-stderr @@ -3,9 +3,11 @@ See README.adoc for more information Usage: oximeter Commands: - openapi Print the external OpenAPI Spec document and exit - run Start an Oximeter server - help Print this message or the help of the given subcommand(s) + openapi Print the external OpenAPI Spec document and exit + run Start an Oximeter server + standalone Run `oximeter` in standalone mode for development + standalone-openapi Print the fake Nexus's standalone API + help Print this message or the help of the given subcommand(s) Options: -h, --help Print help diff --git a/oximeter/producer/Cargo.toml b/oximeter/producer/Cargo.toml index e511294e529..ba560eabf56 100644 --- a/oximeter/producer/Cargo.toml +++ b/oximeter/producer/Cargo.toml @@ -19,3 +19,7 @@ slog-dtrace.workspace = true tokio.workspace = true thiserror.workspace = true uuid.workspace = true + +[dev-dependencies] +anyhow.workspace = true +clap.workspace = true diff --git a/oximeter/producer/examples/producer.rs b/oximeter/producer/examples/producer.rs index 9ff30032ca1..dd9722c80a9 100644 --- a/oximeter/producer/examples/producer.rs +++ b/oximeter/producer/examples/producer.rs @@ -6,14 +6,17 @@ // Copyright 2023 Oxide Computer Company +use anyhow::Context; use chrono::DateTime; use chrono::Utc; +use clap::Parser; use dropshot::ConfigDropshot; use dropshot::ConfigLogging; use dropshot::ConfigLoggingLevel; use dropshot::HandlerTaskMode; use omicron_common::api::internal::nexus::ProducerEndpoint; use oximeter::types::Cumulative; +use oximeter::types::ProducerRegistry; use oximeter::types::Sample; use oximeter::Metric; use oximeter::MetricsError; @@ -22,9 +25,22 @@ use oximeter::Target; use oximeter_producer::Config; use oximeter_producer::LogConfig; use oximeter_producer::Server; +use std::net::SocketAddr; use std::time::Duration; use uuid::Uuid; +/// Run an example oximeter metric producer. +#[derive(Parser)] +struct Args { + /// The address to use for the producer server. + #[arg(long, default_value = "[::1]:0")] + address: SocketAddr, + + /// The address of nexus at which to register. + #[arg(long, default_value = "[::1]:12221")] + nexus: SocketAddr, +} + /// Example target describing a virtual machine. #[derive(Debug, Clone, Target)] pub struct VirtualMachine { @@ -93,30 +109,29 @@ impl Producer for CpuBusyProducer { } #[tokio::main] -async fn main() { - let address = "[::1]:0".parse().unwrap(); +async fn main() -> anyhow::Result<()> { + let args = Args::parse(); let dropshot = ConfigDropshot { - bind_address: address, + bind_address: args.address, request_body_max_bytes: 2048, default_handler_task_mode: HandlerTaskMode::Detached, }; let log = LogConfig::Config(ConfigLogging::StderrTerminal { level: ConfigLoggingLevel::Debug, }); + let registry = ProducerRegistry::new(); + let producer = CpuBusyProducer::new(4); + registry.register_producer(producer).unwrap(); let server_info = ProducerEndpoint { - id: Uuid::new_v4(), - address, + id: registry.producer_id(), + address: args.address, base_route: "/collect".to_string(), interval: Duration::from_secs(10), }; - let config = Config { - server_info, - registration_address: "[::1]:12221".parse().unwrap(), - dropshot, - log, - }; - let server = Server::start(&config).await.unwrap(); - let producer = CpuBusyProducer::new(4); - server.registry().register_producer(producer).unwrap(); - server.serve_forever().await.unwrap(); + let config = + Config { server_info, registration_address: args.nexus, dropshot, log }; + let server = Server::with_registry(registry, &config) + .await + .context("failed to create producer")?; + server.serve_forever().await.context("server failed") } diff --git a/oximeter/producer/src/lib.rs b/oximeter/producer/src/lib.rs index 01910af8e89..2354f9c2176 100644 --- a/oximeter/producer/src/lib.rs +++ b/oximeter/producer/src/lib.rs @@ -40,6 +40,9 @@ pub enum Error { #[error("Error registering as metric producer: {0}")] RegistrationError(String), + + #[error("Producer registry and config UUIDs do not match")] + UuidMismatch, } /// Either configuration for building a logger, or an actual logger already @@ -82,14 +85,59 @@ impl Server { /// Start a new metric server, registering it with the chosen endpoint, and listening for /// requests on the associated address and route. pub async fn start(config: &Config) -> Result { - // Clone mutably, as we may update the address after the server starts, see below. - let mut config = config.clone(); + Self::with_registry( + ProducerRegistry::with_id(config.server_info.id), + &config, + ) + .await + } + + /// Create a new metric producer server, with an existing registry. + pub async fn with_registry( + registry: ProducerRegistry, + config: &Config, + ) -> Result { + Self::new_impl( + registry, + config.server_info.clone(), + &config.registration_address, + &config.dropshot, + &config.log, + ) + .await + } + + /// Serve requests for metrics. + pub async fn serve_forever(self) -> Result<(), Error> { + self.server.await.map_err(Error::Server) + } + + /// Close the server + pub async fn close(self) -> Result<(), Error> { + self.server.close().await.map_err(Error::Server) + } + + /// Return the [`ProducerRegistry`] managed by this server. + /// + /// The registry is thread-safe and clonable, so the returned reference can be used throughout + /// an application to register types implementing the [`Producer`](oximeter::traits::Producer) + /// trait. The samples generated by the registered producers will be included in response to a + /// request on the collection endpoint. + pub fn registry(&self) -> &ProducerRegistry { + &self.registry + } + + /// Return the server's local listening address + pub fn address(&self) -> std::net::SocketAddr { + self.server.local_addr() + } + fn build_logger(log: &LogConfig) -> Result { // Build a logger, either using the configuration or actual logger // provided. First build the base logger from the configuration or a // clone of the provided logger, and then add the DTrace and Dropshot // loggers on top of it. - let base_logger = match config.log { + let base_logger = match log { LogConfig::Config(conf) => conf .to_logger("metric-server") .map_err(|msg| Error::Server(msg.to_string()))?, @@ -104,74 +152,64 @@ impl Server { } else { debug!(log, "registered DTrace probes"); } - let dropshot_log = log.new(o!("component" => "dropshot")); + Ok(log) + } - // Build the producer registry and server that uses it as its context. - let registry = ProducerRegistry::with_id(config.server_info.id); - let server = HttpServerStarter::new( - &config.dropshot, + fn build_dropshot_server( + log: &Logger, + registry: &ProducerRegistry, + dropshot: &ConfigDropshot, + ) -> Result, Error> { + let dropshot_log = log.new(o!("component" => "dropshot")); + HttpServerStarter::new( + dropshot, metric_server_api(), registry.clone(), &dropshot_log, ) - .map_err(|e| Error::Server(e.to_string()))? - .start(); - - // Client code may decide to assign a specific address and/or port, or to listen on any - // available address and port, assigned by the OS. For example, `[::1]:0` would assign any - // port on localhost. If needed, update the address in the `ProducerEndpoint` with the - // actual address the server has bound. - // - // TODO-robustness: Is there a better way to do this? We'd like to support users picking an - // exact address or using whatever's available. The latter is useful during tests or other - // situations in which we don't know which ports are available. - if config.server_info.address != server.local_addr() { - assert_eq!(config.server_info.address.port(), 0); + .map_err(|e| Error::Server(e.to_string())) + .map(HttpServerStarter::start) + } + + // Create a new server registering with Nexus. + async fn new_impl( + registry: ProducerRegistry, + mut server_info: ProducerEndpoint, + registration_address: &SocketAddr, + dropshot: &ConfigDropshot, + log: &LogConfig, + ) -> Result { + if registry.producer_id() != server_info.id { + return Err(Error::UuidMismatch); + } + let log = Self::build_logger(log)?; + let server = Self::build_dropshot_server(&log, ®istry, dropshot)?; + + // Update the producer endpoint address with the actual server's + // address, to handle cases where client listens on any available + // address. + if server_info.address != server.local_addr() { + assert_eq!(server_info.address.port(), 0); debug!( log, "Requested any available port, Dropshot server has been bound to {}", server.local_addr(), ); - config.server_info.address = server.local_addr(); + server_info.address = server.local_addr(); } debug!(log, "registering metric server as a producer"); - register(config.registration_address, &log, &config.server_info) - .await?; + register(*registration_address, &log, &server_info).await?; info!( log, - "starting oximeter metric server"; - "route" => config.server_info.collection_route(), + "starting oximeter metric producer server"; + "route" => server_info.collection_route(), "producer_id" => ?registry.producer_id(), - "address" => config.server_info.address, + "address" => server.local_addr(), + "interval" => ?server_info.interval, ); Ok(Self { registry, server }) } - - /// Serve requests for metrics. - pub async fn serve_forever(self) -> Result<(), Error> { - self.server.await.map_err(Error::Server) - } - - /// Close the server - pub async fn close(self) -> Result<(), Error> { - self.server.close().await.map_err(Error::Server) - } - - /// Return the [`ProducerRegistry`] managed by this server. - /// - /// The registry is thread-safe and clonable, so the returned reference can be used throughout - /// an application to register types implementing the [`Producer`](oximeter::traits::Producer) - /// trait. The samples generated by the registered producers will be included in response to a - /// request on the collection endpoint. - pub fn registry(&self) -> &ProducerRegistry { - &self.registry - } - - /// Return the server's local listening address - pub fn address(&self) -> std::net::SocketAddr { - self.server.local_addr() - } } // Register API endpoints of the `Server`.