From 3a0e0bb13b061d06e0eda9a65e91278d0c8583d2 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 21 Aug 2024 14:54:29 -0700 Subject: [PATCH] add a bit more config validation --- gateway/src/metrics.rs | 101 ++++++++++++++++++++++++++--------------- 1 file changed, 64 insertions(+), 37 deletions(-) diff --git a/gateway/src/metrics.rs b/gateway/src/metrics.rs index c4b097263b4..a0a86e2c676 100644 --- a/gateway/src/metrics.rs +++ b/gateway/src/metrics.rs @@ -83,6 +83,15 @@ pub struct DevConfig { pub bind_loopback: bool, } +struct ValidatedMetricsConfig { + sp_poll_interval: Duration, + oximeter_collection_interval: Duration, + /// Capacity for the channel of samples from poller tasks to the Oximeter + /// producer. + max_buffered_sample_chunks: usize, + dev_config: Option, +} + /// Manages SP pollers, making sure that every SP has a poller task. struct PollerManager { log: slog::Logger, @@ -128,7 +137,6 @@ struct ServerManager { log: slog::Logger, addrs: watch::Receiver>, registry: ProducerRegistry, - cfg: MetricsConfig, } #[derive(Debug)] @@ -171,6 +179,7 @@ impl Metrics { apictx: Arc, ) -> anyhow::Result { let &MgsArguments { id, rack_id, ref addresses } = args; + let cfg = cfg.validate()?; // Create a channel for the SP poller tasks to send samples to the // Oximeter producer endpoint. @@ -183,9 +192,8 @@ impl Metrics { // is what we want, as we would prefer a full buffer to result in // clobbering the oldest measurements, rather than leaving the newest // ones on the floor. - let max_buffered_sample_chunks = cfg.sample_channel_capacity(); let (sample_tx, sample_rx) = - broadcast::channel(max_buffered_sample_chunks); + broadcast::channel(cfg.max_buffered_sample_chunks); // Using a channel for this is, admittedly, a bit of an end-run around // the `OnceLock` on the `ServerContext` that *also* stores the rack ID, @@ -204,20 +212,18 @@ impl Metrics { }; let pollers = { let log = log.new(slog::o!("component" => "sensor-poller")); - let poll_interval = - Duration::from_millis(cfg.sp_poll_interval_ms as u64); slog::info!( &log, "SP sensor metrics configured"; - "poll_interval" => ?poll_interval, - "max_buffered_sample_chunks" => max_buffered_sample_chunks, + "poll_interval" => ?cfg.sp_poll_interval, + "max_buffered_sample_chunks" => cfg.max_buffered_sample_chunks, ); tokio::spawn( PollerManager { sample_tx, apictx, - poll_interval, + poll_interval: cfg.sp_poll_interval, tasks: tokio::task::JoinSet::new(), log, mgs_id: id, @@ -236,7 +242,7 @@ impl Metrics { .context("failed to register metrics producer")?; tokio::spawn( - ServerManager { log, addrs: addrs_rx, registry, cfg }.run(), + ServerManager { log, addrs: addrs_rx, registry }.run(cfg), ) }; Ok(Self { addrs_tx, rack_id_tx, server, pollers }) @@ -279,34 +285,55 @@ impl Drop for Metrics { } impl MetricsConfig { - fn oximeter_collection_interval(&self) -> Duration { - Duration::from_secs(self.oximeter_collection_interval_secs as u64) - } + fn validate(self) -> anyhow::Result { + anyhow::ensure!( + self.oximeter_collection_interval_secs > 0, + "`metrics.oximeter_collection_interval_secs` probably shouldn't \ + be 0 seconds", + ); + let oximeter_collection_interval = + Duration::from_secs(self.oximeter_collection_interval_secs as u64); - /// Returns the number of sample chunks from individual SPs to buffer. - fn sample_channel_capacity(&self) -> usize { - // Roughly how many times will we poll SPs for each metrics collection - // interval? - let polls_per_metrics_interval = { - let collection_interval_ms: usize = self - .oximeter_collection_interval() - .as_millis() - .try_into() - .expect("your oximeter collection interval is way too big..."); - collection_interval_ms / self.sp_poll_interval_ms + anyhow::ensure!( + self.sp_poll_interval_ms > 0, + "`metrics.sp_poll_interval_ms` probably shouldn't be 0 ms", + ); + let sp_poll_interval = + Duration::from_secs(self.sp_poll_interval_ms as u64); + + let max_buffered_sample_chunks = { + // Roughly how many times will we poll SPs for each metrics collection + // interval? + let polls_per_metrics_interval = { + let collection_interval_ms: usize = oximeter_collection_interval + .as_millis() + .try_into() + .with_context(|| format!( + "configured Oximeter collection interval ({:?}) is way too big...", + oximeter_collection_interval, + ))?; + collection_interval_ms / self.sp_poll_interval_ms + }; + + // How many sample collection intervals do we want to allow to elapse before + // we start putting stuff on the floor? + // + // Let's say 16. Chosen totally arbitrarily but seems reasonable-ish. + let sloppiness = 16; + let capacity = + NORMAL_NUMBER_OF_SPS * polls_per_metrics_interval * sloppiness; + // Finally, the buffer capacity will probably be allocated in a power of two + // anyway, so let's make sure our thing is a power of two so we don't waste + // the allocation we're gonna get anyway. + capacity.next_power_of_two() }; - // How many sample collection intervals do we want to allow to elapse before - // we start putting stuff on the floor? - // - // Let's say 16. Chosen totally arbitrarily but seems reasonable-ish. - let sloppiness = 16; - let capacity = - NORMAL_NUMBER_OF_SPS * polls_per_metrics_interval * sloppiness; - // Finally, the buffer capacity will probably be allocated in a power of two - // anyway, so let's make sure our thing is a power of two so we don't waste - // the allocation we're gonna get anyway. - capacity.next_power_of_two() + Ok(ValidatedMetricsConfig { + oximeter_collection_interval, + sp_poll_interval, + max_buffered_sample_chunks, + dev_config: self.dev, + }) } } @@ -1003,9 +1030,9 @@ fn stringify_byte_string(bytes: &[u8]) -> String { } impl ServerManager { - async fn run(mut self) -> anyhow::Result<()> { + async fn run(mut self, cfg: ValidatedMetricsConfig) -> anyhow::Result<()> { let (registration_address, bind_loopback) = - if let Some(ref dev) = self.cfg.dev { + if let Some(ref dev) = cfg.dev_config { slog::warn!( &self.log, "using development metrics configuration overrides!"; @@ -1016,8 +1043,8 @@ impl ServerManager { } else { (None, false) }; - let interval = self.cfg.oximeter_collection_interval(); let id = self.registry.producer_id(); + let interval = cfg.oximeter_collection_interval; let mut current_server: Option = None; loop {