Commit a9fc8ec ("WIP")
Swatinem committed Feb 21, 2024 (1 parent: 964cd09)
Showing 1 changed file with 94 additions and 9 deletions: crates/symbolicator-service/src/metrics.rs
@@ -54,6 +54,10 @@ impl fmt::Debug for MetricsWrapper {
    }
}

/// We are not (yet) aggregating distributions, but rather keeping every value.
/// To avoid overwhelming downstream services, we send the values in batches instead of all at once.
const DISTRIBUTION_BATCH_SIZE: usize = 20;
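// Illustration (not part of the original diff): with `DISTRIBUTION_BATCH_SIZE = 20`,
// a distribution that accumulated 45 values between flushes is sent as three
// statsd packets carrying 20, 20 and 5 values respectively (via `chunks(20)`).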

/// Creates [`LocalAggregators`] and starts a thread that will periodically
/// send aggregated metrics upstream to the `sink`.
fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> LocalAggregators {
@@ -68,17 +72,23 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> LocalAggregators {

    let thread_fn = move || {
        let mut formatted_metric = String::with_capacity(256);
        let mut suffix = String::with_capacity(128);
        loop {
            thread::sleep(Duration::from_secs(5));

            let mut total_counters = AggregatedCounters::default();
            let mut total_distributions = AggregatedDistributions::default();

            for local_counters in aggregators.iter() {
-               let local_counters = {
+               let (local_counters, local_distributions) = {
                    let mut local_aggregator = local_counters.lock().unwrap();
-                   std::mem::take(&mut local_aggregator.aggregated_counters)
+                   (
+                       std::mem::take(&mut local_aggregator.aggregated_counters),
+                       std::mem::take(&mut local_aggregator.aggregated_distributions),
+                   )
                };
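                // (Descriptive note, not in the original diff: `mem::take` swaps the
                // thread-local maps out under the lock and leaves fresh empty ones
                // behind, so the hot path only ever contends on a brief lock.)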

                // aggregate all the "counter like" metrics
                if total_counters.is_empty() {
                    total_counters = local_counters;
                } else {
@@ -88,14 +98,26 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> LocalAggregators {
                        if ty == "|c" {
                            *aggregated_value += value;
                        } else if ty == "|g" {
-                           // FIXME: when aggregating multiple thread-locals, we don’t really
-                           // know which one is the "latest". But it also does not really matter.
+                           // FIXME: when aggregating multiple thread-locals,
+                           // we don’t really know which one is the "latest".
+                           // But it also does not really matter that much?
                            *aggregated_value = value;
                        }
                    }
                }

                // aggregate all the "distribution like" metrics
                if total_distributions.is_empty() {
                    total_distributions = local_distributions;
                } else {
                    for (key, value) in local_distributions {
                        let aggregated_value = total_distributions.entry(key).or_default();
                        aggregated_value.extend(value);
                    }
                }
            }

            // send all the aggregated "counter like" metrics
            for (AggregationKey { ty, name, tags }, value) in total_counters.drain() {
                let _ = write!(
                    &mut formatted_metric,
@@ -115,6 +137,39 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> LocalAggregators {

                formatted_metric.clear();
            }

            // send all the aggregated "distribution like" metrics
            // we do this in a batched manner, as we do not actually *aggregate* them,
            // but still send each value individually.
            for (AggregationKey { ty, name, tags }, value) in total_distributions.drain() {
                suffix.push_str(&formatted_global_tags);

                if let Some(tags) = tags {
                    if formatted_global_tags.is_empty() {
                        suffix.push_str("|#");
                    } else {
                        suffix.push(',');
                    }
                    suffix.push_str(&tags);
                }

                for batch in value.chunks(DISTRIBUTION_BATCH_SIZE) {
                    formatted_metric.push_str(&prefix);
                    formatted_metric.push_str(name);

                    for value in batch {
                        let _ = write!(&mut formatted_metric, ":{value}");
                    }

                    formatted_metric.push_str(ty);
                    formatted_metric.push_str(&suffix);

                    let _ = sink.emit(&formatted_metric);
                    formatted_metric.clear();
                }

                suffix.clear();
            }
        }
    };
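For illustration, assuming a metric prefix of "symbolicator." and pre-formatted global tags of "|#env:prod" (both hypothetical), a timer "request.duration" that accumulated the values [1.0, 2.5] with the tag status:ok would be flushed as a single multi-value statsd line:

    symbolicator.request.duration:1:2.5|ms|#env:prod,status:ok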

@@ -157,7 +212,7 @@ struct AggregationKey {
}

type AggregatedCounters = FxHashMap<AggregationKey, i64>;
-type AggregatedHistograms = FxHashMap<AggregationKey, Vec<f64>>;
+type AggregatedDistributions = FxHashMap<AggregationKey, Vec<f64>>;

#[derive(Default)]
pub struct LocalAggregator {
@@ -166,8 +221,7 @@ pub struct LocalAggregator {
    /// A map of all the `counter` and `gauge` metrics we have aggregated thus far.
    aggregated_counters: AggregatedCounters,
    /// A map of all the `timer` and `histogram` metrics we have aggregated thus far.
-   #[allow(unused)] // TODO
-   aggregated_histograms: AggregatedHistograms,
+   aggregated_distributions: AggregatedDistributions,
}

impl LocalAggregator {
@@ -220,6 +274,37 @@ impl LocalAggregator {
        let aggregation = self.aggregated_counters.entry(key).or_default();
        *aggregation = value as i64;
    }

    /// Emit a `timer` metric, for which every value is accumulated.
    pub fn emit_timer(&mut self, name: &'static str, value: f64, tags: &[(&'static str, &str)]) {
        let tags = self.format_tags(tags);
        self.emit_distribution_inner("|ms", name, value, tags)
    }

    /// Emit a `histogram` metric, for which every value is accumulated.
    pub fn emit_histogram(
        &mut self,
        name: &'static str,
        value: f64,
        tags: &[(&'static str, &str)],
    ) {
        let tags = self.format_tags(tags);
        self.emit_distribution_inner("|h", name, value, tags)
    }

    /// Emit a distribution metric, which is aggregated by appending to a list of values.
    fn emit_distribution_inner(
        &mut self,
        ty: &'static str,
        name: &'static str,
        value: f64,
        tags: Option<Box<str>>,
    ) {
        let key = AggregationKey { ty, name, tags };

        let aggregation = self.aggregated_distributions.entry(key).or_default();
        aggregation.push(value);
    }
}
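A minimal usage sketch of the new distribution API (the metric names, values, and tags are made up; `local` stands for a per-thread `LocalAggregator`):

    local.emit_timer("symbolication.duration", 250.7, &[("platform", "native")]);
    local.emit_histogram("object.size", 4096.0, &[]);
    // Both calls append their value to `aggregated_distributions` under the
    // respective `AggregationKey`; the flusher thread drains and sends them
    // every 5 seconds.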

/// Tell the metrics system to report to statsd.
@@ -280,7 +365,7 @@ macro_rules! metric {
    // counters
    (counter($id:expr) += $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{
        $crate::metrics::with_client(|_client, local| {
-           let tags: &[(&str, &str)] = &[
+           let tags: &[(&'static str, &str)] = &[
                $(($k, $v)),*
            ];
            local.emit_count($id, $value, tags);
@@ -290,7 +375,7 @@ macro_rules! metric {
    // gauges
    (gauge($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{
        $crate::metrics::with_client(|_client, local| {
-           let tags: &[(&str, &str)] = &[
+           let tags: &[(&'static str, &str)] = &[
                $(($k, $v)),*
            ];
            local.emit_gauge($id, $value, tags);
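A usage sketch of the macro arms above (names and tags are illustrative):

    metric!(counter("symbolication.requests") += 1, "platform" => "native");
    metric!(gauge("queue.depth") = 42);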
