From dce41b7e3792f86624ccd479c8018355d750bda3 Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Wed, 21 Feb 2024 10:18:31 +0100 Subject: [PATCH] Aggregate timers and histograms --- .../symbolicator-service/src/download/mod.rs | 8 +- crates/symbolicator-service/src/metrics.rs | 175 +++++++++++++++--- 2 files changed, 149 insertions(+), 34 deletions(-) diff --git a/crates/symbolicator-service/src/download/mod.rs b/crates/symbolicator-service/src/download/mod.rs index 7d1059f80..306da99af 100644 --- a/crates/symbolicator-service/src/download/mod.rs +++ b/crates/symbolicator-service/src/download/mod.rs @@ -615,9 +615,9 @@ impl Drop for MeasureSourceDownloadGuard<'_> { }; let duration = self.creation_time.elapsed(); - let metric_name = format!("{}.duration", self.task_name); metric!( - timer(&metric_name) = duration, + timer("download_duration") = duration, + "task_name" => self.task_name, "status" => status, "source" => self.source_name, ); @@ -631,9 +631,9 @@ impl Drop for MeasureSourceDownloadGuard<'_> { .checked_div(duration.as_millis()) .and_then(|t| t.try_into().ok()) .unwrap_or(bytes_transferred); - let throughput_name = format!("{}.throughput", self.task_name); metric!( - histogram(&throughput_name) = throughput, + histogram("download_throughput") = throughput, + "task_name" => self.task_name, "status" => status, "source" => self.source_name, ); diff --git a/crates/symbolicator-service/src/metrics.rs b/crates/symbolicator-service/src/metrics.rs index f09e2d1bb..ec5e5f3c3 100644 --- a/crates/symbolicator-service/src/metrics.rs +++ b/crates/symbolicator-service/src/metrics.rs @@ -54,6 +54,10 @@ impl fmt::Debug for MetricsWrapper { } } +/// We are not (yet) aggregating distributions, but keeping every value. +/// To not overwhelm downstream services, we send them in batches instead of all at once. +const DISTRIBUTION_BATCH_SIZE: usize = 20; + /// Creates [`LocalAggregators`] and starts a thread that will periodically /// send aggregated metrics upstream to the `sink`. fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> LocalAggregators { @@ -68,17 +72,23 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L let thread_fn = move || { let mut formatted_metric = String::with_capacity(256); + let mut suffix = String::with_capacity(128); loop { thread::sleep(Duration::from_secs(5)); let mut total_counters = AggregatedCounters::default(); - - for local_counters in aggregators.iter() { - let local_counters = { - let mut local_aggregator = local_counters.lock().unwrap(); - std::mem::take(&mut local_aggregator.aggregated_counters) + let mut total_distributions = AggregatedDistributions::default(); + + for local_aggregator in aggregators.iter() { + let (local_counters, local_distributions) = { + let mut local_aggregator = local_aggregator.lock().unwrap(); + ( + std::mem::take(&mut local_aggregator.aggregated_counters), + std::mem::take(&mut local_aggregator.aggregated_distributions), + ) }; + // aggregate all the "counter like" metrics if total_counters.is_empty() { total_counters = local_counters; } else { @@ -88,14 +98,26 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L if ty == "|c" { *aggregated_value += value; } else if ty == "|g" { - // FIXME: when aggregating multiple thread-locals, we don’t really - // know which one is the "latest". But it also does not really matter. + // FIXME: when aggregating multiple thread-locals, + // we don’t really know which one is the "latest". + // But it also does not really matter that much? *aggregated_value = value; } } } + + // aggregate all the "distribution like" metrics + if total_distributions.is_empty() { + total_distributions = local_distributions; + } else { + for (key, value) in local_distributions { + let aggregated_value = total_distributions.entry(key).or_default(); + aggregated_value.extend(value); + } + } } + // send all the aggregated "counter like" metrics for (AggregationKey { ty, name, tags }, value) in total_counters.drain() { let _ = write!( &mut formatted_metric, @@ -115,6 +137,39 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L formatted_metric.clear(); } + + // send all the aggregated "distribution like" metrics + // we do this in a batched manner, as we do not actually *aggregate* them, + // but still send each value individually. + for (AggregationKey { ty, name, tags }, value) in total_distributions.drain() { + suffix.push_str(&formatted_global_tags); + + if let Some(tags) = tags { + if formatted_global_tags.is_empty() { + suffix.push_str("|#"); + } else { + suffix.push(','); + } + suffix.push_str(&tags); + } + + for batch in value.chunks(DISTRIBUTION_BATCH_SIZE) { + formatted_metric.push_str(&prefix); + formatted_metric.push_str(name); + + for value in batch { + let _ = write!(&mut formatted_metric, ":{value}"); + } + + formatted_metric.push_str(ty); + formatted_metric.push_str(&suffix); + + let _ = sink.emit(&formatted_metric); + formatted_metric.clear(); + } + + suffix.clear(); + } } }; @@ -146,6 +201,7 @@ impl Deref for MetricsWrapper { } } +/// The key by which we group/aggregate metrics. #[derive(Eq, Ord, PartialEq, PartialOrd, Hash)] struct AggregationKey { /// The metric type, pre-formatted as a statsd suffix such as `|c`. @@ -157,8 +213,37 @@ struct AggregationKey { } type AggregatedCounters = FxHashMap; -type AggregatedHistograms = FxHashMap>; +type AggregatedDistributions = FxHashMap>; + +pub trait IntoDistributionValue { + fn into_value(self) -> f64; +} + +impl IntoDistributionValue for Duration { + fn into_value(self) -> f64 { + self.as_secs_f64() / 1_000. + } +} +impl IntoDistributionValue for usize { + fn into_value(self) -> f64 { + self as f64 + } +} + +impl IntoDistributionValue for u64 { + fn into_value(self) -> f64 { + self as f64 + } +} + +impl IntoDistributionValue for i32 { + fn into_value(self) -> f64 { + self as f64 + } +} + +/// The `thread_local` aggregator which pre-aggregates metrics per-thread. #[derive(Default)] pub struct LocalAggregator { /// A mutable scratch-buffer that is reused to format tags into it. @@ -166,8 +251,7 @@ pub struct LocalAggregator { /// A map of all the `counter` and `gauge` metrics we have aggregated thus far. aggregated_counters: AggregatedCounters, /// A map of all the `timer` and `histogram` metrics we have aggregated thus far. - #[allow(unused)] // TODO - aggregated_histograms: AggregatedHistograms, + aggregated_distributions: AggregatedDistributions, } impl LocalAggregator { @@ -220,6 +304,37 @@ impl LocalAggregator { let aggregation = self.aggregated_counters.entry(key).or_default(); *aggregation = value as i64; } + + /// Emit a `timer` metric, for which every value is accumulated + pub fn emit_timer(&mut self, name: &'static str, value: f64, tags: &[(&'static str, &str)]) { + let tags = self.format_tags(tags); + self.emit_distribution_inner("|ms", name, value, tags) + } + + /// Emit a `histogram` metric, for which every value is accumulated + pub fn emit_histogram( + &mut self, + name: &'static str, + value: f64, + tags: &[(&'static str, &str)], + ) { + let tags = self.format_tags(tags); + self.emit_distribution_inner("|h", name, value, tags) + } + + /// Emit a distribution metric, which is aggregated by appending to a list of values. + fn emit_distribution_inner( + &mut self, + ty: &'static str, + name: &'static str, + value: f64, + tags: Option>, + ) { + let key = AggregationKey { ty, name, tags }; + + let aggregation = self.aggregated_distributions.entry(key).or_default(); + aggregation.push(value); + } } /// Tell the metrics system to report to statsd. @@ -280,7 +395,7 @@ macro_rules! metric { // counters (counter($id:expr) += $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{ $crate::metrics::with_client(|_client, local| { - let tags: &[(&str, &str)] = &[ + let tags: &[(&'static str, &str)] = &[ $(($k, $v)),* ]; local.emit_count($id, $value, tags); @@ -290,7 +405,7 @@ macro_rules! metric { // gauges (gauge($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{ $crate::metrics::with_client(|_client, local| { - let tags: &[(&str, &str)] = &[ + let tags: &[(&'static str, &str)] = &[ $(($k, $v)),* ]; local.emit_gauge($id, $value, tags); @@ -299,34 +414,34 @@ macro_rules! metric { // timers (timer($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{ - $crate::metrics::with_client(|client, _local| { - use $crate::metrics::prelude::*; - client - .time_with_tags($id, $value) - $(.with_tag($k, $v))* - .send(); + $crate::metrics::with_client(|_client, local| { + let tags: &[(&'static str, &str)] = &[ + $(($k, $v)),* + ]; + use $crate::metrics::IntoDistributionValue; + local.emit_timer($id, ($value).into_value(), tags); }); }}; // we use statsd timers to send things such as filesizes as well. (time_raw($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{ - $crate::metrics::with_client(|client, _local| { - use $crate::metrics::prelude::*; - client - .time_with_tags($id, $value) - $(.with_tag($k, $v))* - .send(); + $crate::metrics::with_client(|_client, local| { + let tags: &[(&'static str, &str)] = &[ + $(($k, $v)),* + ]; + use $crate::metrics::IntoDistributionValue; + local.emit_timer($id, ($value).into_value(), tags); }); }}; // histograms (histogram($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{ - $crate::metrics::with_client(|client, _local| { - use $crate::metrics::prelude::*; - client - .histogram_with_tags($id, $value) - $(.with_tag($k, $v))* - .send(); + $crate::metrics::with_client(|_client, local| { + let tags: &[(&'static str, &str)] = &[ + $(($k, $v)),* + ]; + use $crate::metrics::IntoDistributionValue; + local.emit_histogram($id, ($value).into_value(), tags); }); }}; }