Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
Swatinem committed Feb 26, 2024
1 parent 7856b21 commit bcb4aea
Showing 1 changed file with 67 additions and 58 deletions.
125 changes: 67 additions & 58 deletions crates/symbolicator-service/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,26 @@ pub struct MetricsWrapper {
local_aggregator: LocalAggregators,
}

impl MetricsWrapper {
/// Invokes the provided callback with a mutable reference to a thread-local [`LocalAggregator`].
fn with_local_aggregator(&self, f: impl FnOnce(&mut LocalAggregator)) {
let mut local_aggregator = self
.local_aggregator
.get_or(Default::default)
.lock()
.unwrap();
f(&mut local_aggregator)
}
}

impl Deref for MetricsWrapper {
type Target = StatsdClient;

fn deref(&self) -> &Self::Target {
&self.statsd_client
}
}

impl fmt::Debug for MetricsWrapper {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MetricsWrapper")
Expand All @@ -61,7 +81,7 @@ const DISTRIBUTION_BATCH_SIZE: usize = 20;
/// Creates [`LocalAggregators`] and starts a thread that will periodically
/// send aggregated metrics upstream to the `sink`.
fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> LocalAggregators {
let local_aggregators: LocalAggregators = Default::default();
let local_aggregators = LocalAggregators::default();

let aggregators = Arc::clone(&local_aggregators);
let prefix = if prefix.is_empty() {
Expand All @@ -71,54 +91,18 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L
};

let thread_fn = move || {
// to avoid reallocation, just reserve some space.
// the size is rather arbitrary, but should be large enough for formatted metrics.
let mut formatted_metric = String::with_capacity(256);
let mut suffix = String::with_capacity(128);

loop {
thread::sleep(Duration::from_secs(5));

let mut total_counters = AggregatedCounters::default();
let mut total_distributions = AggregatedDistributions::default();

for local_aggregator in aggregators.iter() {
let (local_counters, local_distributions) = {
let mut local_aggregator = local_aggregator.lock().unwrap();
(
std::mem::take(&mut local_aggregator.aggregated_counters),
std::mem::take(&mut local_aggregator.aggregated_distributions),
)
};

// aggregate all the "counter like" metrics
if total_counters.is_empty() {
total_counters = local_counters;
} else {
for (key, value) in local_counters {
let ty = key.ty;
let aggregated_value = total_counters.entry(key).or_default();
if ty == "|c" {
*aggregated_value += value;
} else if ty == "|g" {
// FIXME: when aggregating multiple thread-locals,
// we don’t really know which one is the "latest".
// But it also does not really matter that much?
*aggregated_value = value;
}
}
}

// aggregate all the "distribution like" metrics
if total_distributions.is_empty() {
total_distributions = local_distributions;
} else {
for (key, value) in local_distributions {
let aggregated_value = total_distributions.entry(key).or_default();
aggregated_value.extend(value);
}
}
}
let (total_counters, total_distributions) = aggregate_all(&aggregators);

// send all the aggregated "counter like" metrics
for (AggregationKey { ty, name, tags }, value) in total_counters.drain() {
for (AggregationKey { ty, name, tags }, value) in total_counters {
let _ = write!(
&mut formatted_metric,
"{prefix}{name}:{value}{ty}{formatted_global_tags}"
Expand All @@ -141,7 +125,7 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L
// send all the aggregated "distribution like" metrics
// we do this in a batched manner, as we do not actually *aggregate* them,
// but still send each value individually.
for (AggregationKey { ty, name, tags }, value) in total_distributions.drain() {
for (AggregationKey { ty, name, tags }, value) in total_distributions {
suffix.push_str(&formatted_global_tags);

if let Some(tags) = tags {
Expand Down Expand Up @@ -181,24 +165,49 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L
local_aggregators
}

impl MetricsWrapper {
/// Invokes the provided callback with a mutable reference to a thread-local [`LocalAggregator`].
fn with_local_aggregator(&self, f: impl FnOnce(&mut LocalAggregator)) {
let mut local_aggregator = self
.local_aggregator
.get_or(Default::default)
.lock()
.unwrap();
f(&mut local_aggregator)
}
}
fn aggregate_all(aggregators: &LocalAggregators) -> (AggregatedCounters, AggregatedDistributions) {
let mut total_counters = AggregatedCounters::default();
let mut total_distributions = AggregatedDistributions::default();

impl Deref for MetricsWrapper {
type Target = StatsdClient;
for local_aggregator in aggregators.iter() {
let (local_counters, local_distributions) = {
let mut local_aggregator = local_aggregator.lock().unwrap();
(
std::mem::take(&mut local_aggregator.aggregated_counters),
std::mem::take(&mut local_aggregator.aggregated_distributions),
)
};

fn deref(&self) -> &Self::Target {
&self.statsd_client
// aggregate all the "counter like" metrics
if total_counters.is_empty() {
total_counters = local_counters;
} else {
for (key, value) in local_counters {
let ty = key.ty;
let aggregated_value = total_counters.entry(key).or_default();
if ty == "|c" {
*aggregated_value += value;
} else if ty == "|g" {
// FIXME: when aggregating multiple thread-locals,
// we don’t really know which one is the "latest".
// But it also does not really matter that much?
*aggregated_value = value;
}
}
}

// aggregate all the "distribution like" metrics
if total_distributions.is_empty() {
total_distributions = local_distributions;
} else {
for (key, value) in local_distributions {
let aggregated_value = total_distributions.entry(key).or_default();
aggregated_value.extend(value);
}
}
}

(total_counters, total_distributions)
}

/// The key by which we group/aggregate metrics.
Expand Down

0 comments on commit bcb4aea

Please sign in to comment.