From 21602a89e452da543d04f7b09281a2574914c1bc Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Mon, 18 Sep 2023 20:20:35 +0000 Subject: [PATCH] WIP --- oximeter/collector/src/bin/oximeter.rs | 24 +++--- oximeter/collector/src/lib.rs | 29 ++++--- oximeter/collector/src/standalone.rs | 108 ++++++++++++------------- oximeter/producer/examples/producer.rs | 28 +++---- oximeter/producer/src/lib.rs | 32 -------- 5 files changed, 96 insertions(+), 125 deletions(-) diff --git a/oximeter/collector/src/bin/oximeter.rs b/oximeter/collector/src/bin/oximeter.rs index 221af51c7b7..bc7fe94579f 100644 --- a/oximeter/collector/src/bin/oximeter.rs +++ b/oximeter/collector/src/bin/oximeter.rs @@ -12,15 +12,15 @@ use omicron_common::cmd::CmdError; use oximeter_collector::oximeter_api; use oximeter_collector::standalone_nexus_api; use oximeter_collector::Config; +use oximeter_collector::Error; use oximeter_collector::Oximeter; use oximeter_collector::OximeterArguments; +use oximeter_collector::StandaloneNexus; use std::net::Ipv6Addr; use std::net::SocketAddr; use std::net::SocketAddrV6; use std::path::PathBuf; use uuid::Uuid; -use oximeter_collector::StandaloneNexus; -use oximeter_collector::Error; pub fn run_openapi() -> Result<(), String> { oximeter_api() @@ -136,19 +136,21 @@ async fn do_run() -> Result<(), CmdError> { Args::Standalone { id, address, nexus, clickhouse } => { // Start the standalone Nexus server, for registration of both the // collector and producers. - let nexus_server = run_standalone_nexus(nexus).await.map_err(|e| CmdError::Failure(e.to_string()))?; + let nexus_server = run_standalone_nexus(nexus) + .await + .map_err(|e| CmdError::Failure(e.to_string()))?; let args = OximeterArguments { id, address }; Oximeter::new_standalone( nexus_server.log(), &args, nexus_server.local_addr(), - clickhouse + clickhouse, ) - .await - .unwrap() - .serve_forever() - .await - .map_err(|e| CmdError::Failure(e.to_string())) + .await + .unwrap() + .serve_forever() + .await + .map_err(|e| CmdError::Failure(e.to_string())) } Args::StandaloneOpenapi => { run_standalone_openapi().map_err(CmdError::Failure) @@ -156,6 +158,8 @@ async fn do_run() -> Result<(), CmdError> { } } -async fn run_standalone_nexus(addr: SocketAddrV6) -> Result { +async fn run_standalone_nexus( + addr: SocketAddrV6, +) -> Result { StandaloneNexus::new(addr.into()) } diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index b9db29f35a0..6674d65ecdc 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -6,26 +6,25 @@ // Copyright 2023 Oxide Computer Company -use std::ops::Bound; use anyhow::anyhow; use anyhow::Context; use dropshot::endpoint; use dropshot::ApiDescription; use dropshot::ConfigDropshot; use dropshot::ConfigLogging; +use dropshot::EmptyScanParams; use dropshot::HttpError; use dropshot::HttpResponseDeleted; use dropshot::HttpResponseOk; use dropshot::HttpResponseUpdatedNoContent; use dropshot::HttpServer; use dropshot::HttpServerStarter; -use dropshot::RequestContext; -use dropshot::TypedBody; use dropshot::PaginationParams; -use dropshot::ResultsPage; use dropshot::Query; +use dropshot::RequestContext; +use dropshot::ResultsPage; +use dropshot::TypedBody; use dropshot::WhichPage; -use dropshot::EmptyScanParams; use internal_dns::resolver::ResolveError; use internal_dns::resolver::Resolver; use internal_dns::ServiceName; @@ -52,6 +51,7 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::net::SocketAddr; use std::net::SocketAddrV6; +use std::ops::Bound; use std::path::Path; use std::sync::Arc; use std::time::Duration; @@ -64,8 +64,8 @@ use tokio::time::interval; use uuid::Uuid; mod standalone; -pub use standalone::Server as StandaloneNexus; pub use standalone::standalone_nexus_api; +pub use standalone::Server as StandaloneNexus; /// Errors collecting metric data #[derive(Debug, Error)] @@ -608,7 +608,11 @@ impl OximeterAgent { } /// List existing producers. - pub async fn list_producers(&self, start_id: Option, limit: usize) -> Vec { + pub async fn list_producers( + &self, + start_id: Option, + limit: usize, + ) -> Vec { let start = if let Some(id) = start_id { Bound::Excluded(id) } else { @@ -894,7 +898,11 @@ impl Oximeter { } /// List producers. - pub async fn list_producers(&self, start: Option, limit: usize) -> Vec { + pub async fn list_producers( + &self, + start: Option, + limit: usize, + ) -> Vec { self.agent.list_producers(start, limit).await } @@ -949,7 +957,7 @@ struct ProducerPage { }] async fn producers_list( request_context: RequestContext>, - query: Query> + query: Query>, ) -> Result>, HttpError> { let agent = request_context.context(); let pagination = query.into_inner(); @@ -963,7 +971,8 @@ async fn producers_list( producers, &EmptyScanParams {}, |info: &ProducerEndpoint, _| ProducerPage { id: info.id }, - ).map(HttpResponseOk) + ) + .map(HttpResponseOk) } #[derive(Clone, Copy, Debug, Deserialize, schemars::JsonSchema, Serialize)] diff --git a/oximeter/collector/src/standalone.rs b/oximeter/collector/src/standalone.rs index ad841a005b1..ef429358888 100644 --- a/oximeter/collector/src/standalone.rs +++ b/oximeter/collector/src/standalone.rs @@ -7,31 +7,31 @@ // Copyright 2023 Oxide Computer Company +use crate::Error; use dropshot::endpoint; -use tokio::sync::Mutex; -use std::sync::Arc; -use std::collections::HashMap; use dropshot::ApiDescription; -use dropshot::RequestContext; -use dropshot::TypedBody; +use dropshot::ConfigDropshot; use dropshot::HttpError; use dropshot::HttpResponseUpdatedNoContent; -use omicron_common::api::internal::nexus::ProducerEndpoint; +use dropshot::HttpServer; +use dropshot::HttpServerStarter; +use dropshot::RequestContext; +use dropshot::TypedBody; use nexus_types::internal_api::params::OximeterInfo; +use omicron_common::api::internal::nexus::ProducerEndpoint; +use omicron_common::FileKv; use oximeter_client::Client; use rand::seq::IteratorRandom; -use uuid::Uuid; -use slog::Logger; -use slog::Drain; -use slog::o; use slog::debug; use slog::error; -use crate::Error; -use dropshot::HttpServer; +use slog::o; +use slog::Drain; +use slog::Logger; +use std::collections::HashMap; use std::net::SocketAddr; -use dropshot::HttpServerStarter; -use dropshot::ConfigDropshot; -use omicron_common::FileKv; +use std::sync::Arc; +use tokio::sync::Mutex; +use uuid::Uuid; #[derive(Debug)] struct ProducerAssignment { @@ -47,8 +47,7 @@ struct Inner { impl Inner { fn random_collector(&self) -> Option<(Uuid, OximeterInfo)> { - self - .collectors + self.collectors .iter() .choose(&mut rand::thread_rng()) .map(|(id, info)| (*id, info.clone())) @@ -68,11 +67,14 @@ impl StandaloneNexus { inner: Mutex::new(Inner { producers: HashMap::new(), collectors: HashMap::new(), - }) + }), } } - async fn register_producer(&self, info: &ProducerEndpoint) -> Result<(), HttpError> { + async fn register_producer( + &self, + info: &ProducerEndpoint, + ) -> Result<(), HttpError> { let mut inner = self.inner.lock().await; let assignment = match inner.producers.get_mut(&info.id) { None => { @@ -80,21 +82,23 @@ impl StandaloneNexus { // // Select a random collector, and assign it to the producer. // We'll return the assignment from this match block. - let Some((collector_id, collector_info)) = inner.random_collector() else { - return Err(HttpError::for_unavail(None, String::from("No collectors available"))); + let Some((collector_id, collector_info)) = + inner.random_collector() + else { + return Err(HttpError::for_unavail( + None, + String::from("No collectors available"), + )); }; let client = Client::new( format!("http://{}", collector_info.address).as_str(), self.log.clone(), ); - client - .producers_post(&info.into()) - .await - .map_err(|e| HttpError::for_internal_error(e.to_string()))?; - let assignment = ProducerAssignment { - producer: info.clone(), - collector_id, - }; + client.producers_post(&info.into()).await.map_err(|e| { + HttpError::for_internal_error(e.to_string()) + })?; + let assignment = + ProducerAssignment { producer: info.clone(), collector_id }; assignment } Some(existing_assignment) => { @@ -108,26 +112,26 @@ impl StandaloneNexus { // changed its IP address. Re-register it with the collector to // which it's already assigned. let collector_id = existing_assignment.collector_id; - let collector_info = inner.collectors.get(&collector_id).unwrap(); + let collector_info = + inner.collectors.get(&collector_id).unwrap(); let client = Client::new( format!("http://{}", collector_info.address).as_str(), self.log.clone(), ); - client - .producers_post(&info.into()) - .await - .map_err(|e| HttpError::for_internal_error(e.to_string()))?; - ProducerAssignment { - producer: info.clone(), - collector_id, - } + client.producers_post(&info.into()).await.map_err(|e| { + HttpError::for_internal_error(e.to_string()) + })?; + ProducerAssignment { producer: info.clone(), collector_id } } }; inner.producers.insert(info.id, assignment); Ok(()) } - async fn register_collector(&self, info: OximeterInfo) -> Result<(), HttpError> { + async fn register_collector( + &self, + info: OximeterInfo, + ) -> Result<(), HttpError> { // If this is being registered again, send all its assignments again. let mut inner = self.inner.lock().await; if inner.collectors.insert(info.collector_id, info.clone()).is_some() { @@ -135,10 +139,8 @@ impl StandaloneNexus { format!("http://{}", info.address).as_str(), self.log.clone(), ); - for producer_info in inner - .producers - .values() - .filter_map(|assignment| { + for producer_info in + inner.producers.values().filter_map(|assignment| { if assignment.collector_id == info.collector_id { Some(&assignment.producer) } else { @@ -146,10 +148,9 @@ impl StandaloneNexus { } }) { - client - .producers_post(&producer_info.into()) - .await - .map_err(|e| HttpError::for_internal_error(e.to_string()))?; + client.producers_post(&producer_info.into()).await.map_err( + |e| HttpError::for_internal_error(e.to_string()), + )?; } } Ok(()) @@ -177,7 +178,8 @@ async fn cpapi_producers_post( ) -> Result { let context = request_context.context(); let producer_info = producer_info.into_inner(); - context.register_producer(&producer_info) + context + .register_producer(&producer_info) .await .map(|_| HttpResponseUpdatedNoContent()) .map_err(|e| HttpError::for_internal_error(e.to_string())) @@ -194,7 +196,8 @@ async fn cpapi_collectors_post( ) -> Result { let context = request_context.context(); let oximeter_info = oximeter_info.into_inner(); - context.register_collector(oximeter_info) + context + .register_collector(oximeter_info) .await .map(|_| HttpResponseUpdatedNoContent()) .map_err(|e| HttpError::for_internal_error(e.to_string())) @@ -220,13 +223,10 @@ impl Server { } let nexus = Arc::new(StandaloneNexus::new( - log.new(slog::o!("component" => "nexus-standalone")) + log.new(slog::o!("component" => "nexus-standalone")), )); let server = HttpServerStarter::new( - &ConfigDropshot { - bind_address: address, - ..Default::default() - }, + &ConfigDropshot { bind_address: address, ..Default::default() }, standalone_nexus_api(), Arc::clone(&nexus), &log, diff --git a/oximeter/producer/examples/producer.rs b/oximeter/producer/examples/producer.rs index 9ab313d2508..b3fc2f88eb7 100644 --- a/oximeter/producer/examples/producer.rs +++ b/oximeter/producer/examples/producer.rs @@ -32,16 +32,12 @@ use uuid::Uuid; /// Run an example oximeter metric producer. #[derive(Parser)] struct Args { - /// If true, run in standalone mode, without registering with Nexus. - #[arg(long)] - standalone: bool, - /// The address to use for the producer server. #[arg(long, default_value = "[::1]:0")] address: SocketAddr, /// The address of nexus at which to register. - #[arg(long, conflicts_with = "standalone", default_value = "[::1]:12221")] + #[arg(long, default_value = "[::1]:12221")] nexus: SocketAddr, } @@ -132,20 +128,14 @@ async fn main() -> anyhow::Result<()> { base_route: "/collect".to_string(), interval: Duration::from_secs(10), }; - let server = if args.standalone { - Server::new_standalone(registry, server_info, dropshot, log) - .await - .context("failed to create standalone producer")? - } else { - let config = Config { - server_info, - registration_address: args.nexus, - dropshot, - log, - }; - Server::with_registry(registry, &config) - .await - .context("failed to create producer")? + let config = Config { + server_info, + registration_address: args.nexus, + dropshot, + log, }; + let server = Server::with_registry(registry, &config) + .await + .context("failed to create producer")?; server.serve_forever().await.context("server failed") } diff --git a/oximeter/producer/src/lib.rs b/oximeter/producer/src/lib.rs index 36ed3b79f3b..2354f9c2176 100644 --- a/oximeter/producer/src/lib.rs +++ b/oximeter/producer/src/lib.rs @@ -107,38 +107,6 @@ impl Server { .await } - /// Create a standalone producer server, without registering with Nexus. - /// - /// This is intended for development situations. Normally, producers must - /// register with Nexus, which assigns them to an `oximeter` collector - /// instance, and records the assignment durably in the control plane - /// database. However, one may want to run the producer server during - /// development without requiring all the control plane machinery. - /// - /// This can be used to spawn a server intended to use with `oximeter` - /// running in standalone mode. - pub async fn new_standalone( - registry: ProducerRegistry, - server_info: ProducerEndpoint, - dropshot: ConfigDropshot, - log: LogConfig, - ) -> Result { - if registry.producer_id() != server_info.id { - return Err(Error::UuidMismatch); - } - let log = Self::build_logger(&log)?; - let server = Self::build_dropshot_server(&log, ®istry, &dropshot)?; - info!( - log, - "starting oximeter standalone metric producer server"; - "route" => server_info.collection_route(), - "producer_id" => ?registry.producer_id(), - "address" => server.local_addr(), - "interval" => ?server_info.interval, - ); - Ok(Self { registry, server }) - } - /// Serve requests for metrics. pub async fn serve_forever(self) -> Result<(), Error> { self.server.await.map_err(Error::Server)