Skip to content

Commit

Permalink
add a bit more config validation
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw committed Aug 21, 2024
1 parent 6f58866 commit 3a0e0bb
Showing 1 changed file with 64 additions and 37 deletions.
101 changes: 64 additions & 37 deletions gateway/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ pub struct DevConfig {
pub bind_loopback: bool,
}

struct ValidatedMetricsConfig {
sp_poll_interval: Duration,
oximeter_collection_interval: Duration,
/// Capacity for the channel of samples from poller tasks to the Oximeter
/// producer.
max_buffered_sample_chunks: usize,
dev_config: Option<DevConfig>,
}

/// Manages SP pollers, making sure that every SP has a poller task.
struct PollerManager {
log: slog::Logger,
Expand Down Expand Up @@ -128,7 +137,6 @@ struct ServerManager {
log: slog::Logger,
addrs: watch::Receiver<Vec<SocketAddrV6>>,
registry: ProducerRegistry,
cfg: MetricsConfig,
}

#[derive(Debug)]
Expand Down Expand Up @@ -171,6 +179,7 @@ impl Metrics {
apictx: Arc<ServerContext>,
) -> anyhow::Result<Self> {
let &MgsArguments { id, rack_id, ref addresses } = args;
let cfg = cfg.validate()?;

// Create a channel for the SP poller tasks to send samples to the
// Oximeter producer endpoint.
Expand All @@ -183,9 +192,8 @@ impl Metrics {
// is what we want, as we would prefer a full buffer to result in
// clobbering the oldest measurements, rather than leaving the newest
// ones on the floor.
let max_buffered_sample_chunks = cfg.sample_channel_capacity();
let (sample_tx, sample_rx) =
broadcast::channel(max_buffered_sample_chunks);
broadcast::channel(cfg.max_buffered_sample_chunks);

// Using a channel for this is, admittedly, a bit of an end-run around
// the `OnceLock` on the `ServerContext` that *also* stores the rack ID,
Expand All @@ -204,20 +212,18 @@ impl Metrics {
};
let pollers = {
let log = log.new(slog::o!("component" => "sensor-poller"));
let poll_interval =
Duration::from_millis(cfg.sp_poll_interval_ms as u64);
slog::info!(
&log,
"SP sensor metrics configured";
"poll_interval" => ?poll_interval,
"max_buffered_sample_chunks" => max_buffered_sample_chunks,
"poll_interval" => ?cfg.sp_poll_interval,
"max_buffered_sample_chunks" => cfg.max_buffered_sample_chunks,
);

tokio::spawn(
PollerManager {
sample_tx,
apictx,
poll_interval,
poll_interval: cfg.sp_poll_interval,
tasks: tokio::task::JoinSet::new(),
log,
mgs_id: id,
Expand All @@ -236,7 +242,7 @@ impl Metrics {
.context("failed to register metrics producer")?;

tokio::spawn(
ServerManager { log, addrs: addrs_rx, registry, cfg }.run(),
ServerManager { log, addrs: addrs_rx, registry }.run(cfg),
)
};
Ok(Self { addrs_tx, rack_id_tx, server, pollers })
Expand Down Expand Up @@ -279,34 +285,55 @@ impl Drop for Metrics {
}

impl MetricsConfig {
fn oximeter_collection_interval(&self) -> Duration {
Duration::from_secs(self.oximeter_collection_interval_secs as u64)
}
fn validate(self) -> anyhow::Result<ValidatedMetricsConfig> {
anyhow::ensure!(
self.oximeter_collection_interval_secs > 0,
"`metrics.oximeter_collection_interval_secs` probably shouldn't \
be 0 seconds",
);
let oximeter_collection_interval =
Duration::from_secs(self.oximeter_collection_interval_secs as u64);

/// Returns the number of sample chunks from individual SPs to buffer.
fn sample_channel_capacity(&self) -> usize {
// Roughly how many times will we poll SPs for each metrics collection
// interval?
let polls_per_metrics_interval = {
let collection_interval_ms: usize = self
.oximeter_collection_interval()
.as_millis()
.try_into()
.expect("your oximeter collection interval is way too big...");
collection_interval_ms / self.sp_poll_interval_ms
anyhow::ensure!(
self.sp_poll_interval_ms > 0,
"`metrics.sp_poll_interval_ms` probably shouldn't be 0 ms",
);
let sp_poll_interval =
Duration::from_secs(self.sp_poll_interval_ms as u64);

let max_buffered_sample_chunks = {
// Roughly how many times will we poll SPs for each metrics collection
// interval?
let polls_per_metrics_interval = {
let collection_interval_ms: usize = oximeter_collection_interval
.as_millis()
.try_into()
.with_context(|| format!(
"configured Oximeter collection interval ({:?}) is way too big...",
oximeter_collection_interval,
))?;
collection_interval_ms / self.sp_poll_interval_ms
};

// How many sample collection intervals do we want to allow to elapse before
// we start putting stuff on the floor?
//
// Let's say 16. Chosen totally arbitrarily but seems reasonable-ish.
let sloppiness = 16;
let capacity =
NORMAL_NUMBER_OF_SPS * polls_per_metrics_interval * sloppiness;
// Finally, the buffer capacity will probably be allocated in a power of two
// anyway, so let's make sure our thing is a power of two so we don't waste
// the allocation we're gonna get anyway.
capacity.next_power_of_two()
};

// How many sample collection intervals do we want to allow to elapse before
// we start putting stuff on the floor?
//
// Let's say 16. Chosen totally arbitrarily but seems reasonable-ish.
let sloppiness = 16;
let capacity =
NORMAL_NUMBER_OF_SPS * polls_per_metrics_interval * sloppiness;
// Finally, the buffer capacity will probably be allocated in a power of two
// anyway, so let's make sure our thing is a power of two so we don't waste
// the allocation we're gonna get anyway.
capacity.next_power_of_two()
Ok(ValidatedMetricsConfig {
oximeter_collection_interval,
sp_poll_interval,
max_buffered_sample_chunks,
dev_config: self.dev,
})
}
}

Expand Down Expand Up @@ -1003,9 +1030,9 @@ fn stringify_byte_string(bytes: &[u8]) -> String {
}

impl ServerManager {
async fn run(mut self) -> anyhow::Result<()> {
async fn run(mut self, cfg: ValidatedMetricsConfig) -> anyhow::Result<()> {
let (registration_address, bind_loopback) =
if let Some(ref dev) = self.cfg.dev {
if let Some(ref dev) = cfg.dev_config {
slog::warn!(
&self.log,
"using development metrics configuration overrides!";
Expand All @@ -1016,8 +1043,8 @@ impl ServerManager {
} else {
(None, false)
};
let interval = self.cfg.oximeter_collection_interval();
let id = self.registry.producer_id();
let interval = cfg.oximeter_collection_interval;

let mut current_server: Option<oximeter_producer::Server> = None;
loop {
Expand Down

0 comments on commit 3a0e0bb

Please sign in to comment.