From 396f1838f55811afdce7964285bbae84c4c6a946 Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Mon, 19 Feb 2024 15:04:54 +0100 Subject: [PATCH] Add a `ThreadLocal` metrics aggregator Thus far, this only works for `count` metrics, but we can extend it in the future as well. --- Cargo.lock | 3 + crates/symbolicator-js/src/metrics.rs | 125 +++++++----- crates/symbolicator-service/Cargo.toml | 3 + crates/symbolicator-service/src/metrics.rs | 219 ++++++++++++++++++--- 4 files changed, 274 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 02d07091c..dcdc0f491 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4492,6 +4492,7 @@ dependencies = [ "aws-types", "cadence", "chrono", + "crossbeam-utils", "filetime", "flate2", "futures", @@ -4505,6 +4506,7 @@ dependencies = [ "once_cell", "rand", "reqwest", + "rustc-hash", "sentry", "serde", "serde_json", @@ -4516,6 +4518,7 @@ dependencies = [ "symbolicator-test", "tempfile", "thiserror", + "thread_local", "tokio", "tokio-util", "tracing", diff --git a/crates/symbolicator-js/src/metrics.rs b/crates/symbolicator-js/src/metrics.rs index 11f8da3dd..85c011504 100644 --- a/crates/symbolicator-js/src/metrics.rs +++ b/crates/symbolicator-js/src/metrics.rs @@ -27,7 +27,7 @@ //! Should be `0`, as we should find/use files from within bundles or as individual artifacts. use symbolic::debuginfo::sourcebundle::SourceFileType; -use symbolicator_service::metric; +use symbolicator_service::{metric, metrics}; use crate::interface::ResolvedWith; @@ -117,6 +117,18 @@ impl JsMetrics { } pub fn submit_metrics(&self, artifact_bundles: u64) { + metrics::with_client(|client| { + client.with_local_aggregator(|aggregator| { + self.submit_metrics_inner(aggregator, artifact_bundles); + }); + }) + } + + fn submit_metrics_inner( + &self, + aggregator: &mut metrics::LocalAggregator, + artifact_bundles: u64, + ) { metric!(time_raw("js.needed_files") = self.needed_files); metric!(time_raw("js.api_requests") = self.api_requests); metric!(time_raw("js.queried_bundles") = self.queried_bundles); @@ -129,74 +141,85 @@ impl JsMetrics { // have a per-event avg, etc. // Sources: - metric!( - counter("js.found_via_bundle_debugid") += self.found_source_via_debugid, - "type" => "source", + aggregator.emit_count( + "js.found_via_bundle_debugid", + self.found_source_via_debugid, + &[("type", "source")], ); - metric!( - counter("js.found_via_bundle_url") += self.found_source_via_release, - "type" => "source", - "lookup" => "release", + aggregator.emit_count( + "js.found_via_bundle_url", + self.found_source_via_release, + &[("type", "source"), ("lookup", "release")], ); - metric!( - counter("js.found_via_bundle_url") += self.found_source_via_release_old, - "type" => "source", - "lookup" => "release-old", + aggregator.emit_count( + "js.found_via_bundle_url", + self.found_source_via_release_old, + &[("type", "source"), ("lookup", "release-old")], ); - metric!( - counter("js.found_via_scraping") += self.found_source_via_scraping, - "type" => "source", + aggregator.emit_count( + "js.found_via_scraping", + self.found_source_via_scraping, + &[("type", "source")], ); - metric!( - counter("js.file_not_found") += self.source_not_found, - "type" => "source", + aggregator.emit_count( + "js.file_not_found", + self.source_not_found, + &[("type", "source")], ); // SourceMaps: - metric!( - counter("js.found_via_bundle_debugid") += self.found_sourcemap_via_debugid, - "type" => "sourcemap", + aggregator.emit_count( + "js.found_via_bundle_debugid", + self.found_sourcemap_via_debugid, + &[("type", "sourcemap")], ); - metric!( - counter("js.found_via_bundle_url") += self.found_sourcemap_via_release, - "type" => "sourcemap", - "lookup" => "release", + aggregator.emit_count( + "js.found_via_bundle_url", + self.found_sourcemap_via_release, + &[("type", "sourcemap"), ("lookup", "release")], ); - metric!( - counter("js.found_via_bundle_url") += self.found_sourcemap_via_release_old, - "type" => "sourcemap", - "lookup" => "release-old", + aggregator.emit_count( + "js.found_via_bundle_url", + self.found_sourcemap_via_release_old, + &[("type", "sourcemap"), ("lookup", "release-old")], ); - metric!( - counter("js.found_via_scraping") += self.found_sourcemap_via_scraping, - "type" => "sourcemap", + aggregator.emit_count( + "js.found_via_scraping", + self.found_sourcemap_via_scraping, + &[("type", "sourcemap")], ); - metric!( - counter("js.file_not_found") += self.sourcemap_not_found, - "type" => "sourcemap", + aggregator.emit_count( + "js.file_not_found", + self.sourcemap_not_found, + &[("type", "sourcemap")], ); - metric!(counter("js.sourcemap_not_needed") += self.sourcemap_not_needed); + aggregator.emit_count("js.sourcemap_not_needed", self.sourcemap_not_needed, &[]); // Lookup Method: - metric!( - counter("js.bundle_lookup") += self.found_bundle_via_bundleindex, - "method" => "bundleindex" + aggregator.emit_count( + "js.bundle_lookup", + self.found_bundle_via_bundleindex, + &[("method", "bundleindex")], ); - metric!( - counter("js.bundle_lookup") += self.found_bundle_via_debugid, - "method" => "debugid" + aggregator.emit_count( + "js.bundle_lookup", + self.found_bundle_via_debugid, + &[("method", "debugid")], ); - metric!( - counter("js.bundle_lookup") += self.found_bundle_via_index, - "method" => "index" + aggregator.emit_count( + "js.bundle_lookup", + self.found_bundle_via_index, + &[("method", "index")], ); - metric!( - counter("js.bundle_lookup") += self.found_bundle_via_release, - "method" => "release" + aggregator.emit_count( + "js.bundle_lookup", + self.found_bundle_via_release, + &[("method", "release")], ); - metric!( - counter("js.bundle_lookup") += self.found_bundle_via_release_old, - "method" => "release-old" + aggregator.emit_count( + "js.bundle_lookup", + self.found_bundle_via_release_old, + &[("method", "release-old")], ); } } diff --git a/crates/symbolicator-service/Cargo.toml b/crates/symbolicator-service/Cargo.toml index d2b24c759..16513bb96 100644 --- a/crates/symbolicator-service/Cargo.toml +++ b/crates/symbolicator-service/Cargo.toml @@ -16,6 +16,7 @@ aws-sdk-s3 = "1.4.0" aws-types = "1.0.1" cadence = "1.0.0" chrono = { version = "0.4.19", features = ["serde"] } +crossbeam-utils = "0.8.19" filetime = "0.2.16" flate2 = "1.0.23" futures = "0.3.12" @@ -29,6 +30,7 @@ moka = { version = "0.12.1", features = ["future", "sync"] } once_cell = "1.17.1" rand = "0.8.5" reqwest = { version = "0.11.0", features = ["gzip", "brotli", "deflate", "json", "stream", "trust-dns"] } +rustc-hash = "1.1.0" sentry = { version = "0.32.1", features = ["tracing"] } serde = { version = "1.0.137", features = ["derive", "rc"] } serde_json = "1.0.81" @@ -38,6 +40,7 @@ symbolic = { version = "12.7.1", features = ["cfi", "common-serde", "debuginfo", symbolicator-sources = { path = "../symbolicator-sources" } tempfile = "3.2.0" thiserror = "1.0.31" +thread_local = "1.1.7" tokio = { version = "1.24.2", features = ["rt", "macros", "fs"] } tokio-util = { version = "0.7.1", features = ["io"] } tracing = "0.1.34" diff --git a/crates/symbolicator-service/src/metrics.rs b/crates/symbolicator-service/src/metrics.rs index 70e651db5..66e92e173 100644 --- a/crates/symbolicator-service/src/metrics.rs +++ b/crates/symbolicator-service/src/metrics.rs @@ -1,17 +1,176 @@ //! Provides access to the metrics sytem. use std::collections::BTreeMap; +use std::fmt::Write; use std::net::ToSocketAddrs; -use std::sync::OnceLock; +use std::ops::Deref; +use std::sync::{Arc, Mutex, OnceLock}; +use std::time::Duration; +use std::{fmt, thread}; -use cadence::{BufferedUdpMetricSink, QueuingMetricSink, StatsdClient}; +use cadence::{BufferedUdpMetricSink, MetricSink, QueuingMetricSink, StatsdClient}; +use crossbeam_utils::CachePadded; +use rustc_hash::FxHashMap; +use thread_local::ThreadLocal; -static METRICS_CLIENT: OnceLock = OnceLock::new(); +static METRICS_CLIENT: OnceLock = OnceLock::new(); /// The metrics prelude that is necessary to use the client. pub mod prelude { pub use cadence::prelude::*; } +type LocalAggregators = Arc>>>; + +#[derive(Debug, Clone)] +struct Sink(Arc); + +impl MetricSink for Sink { + fn emit(&self, metric: &str) -> std::io::Result { + self.0.emit(metric) + } + fn flush(&self) -> std::io::Result<()> { + self.0.flush() + } +} + +pub struct MetricsWrapper { + /// The raw `cadence` client. + statsd_client: StatsdClient, + + /// A thread local aggregator for `count` metrics + local_aggregator: LocalAggregators, +} + +impl fmt::Debug for MetricsWrapper { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MetricsWrapper") + .field("statsd_client", &self.statsd_client) + .field( + "local_aggregator", + &format_args!("LocalAggregator {{ .. }}"), + ) + .finish() + } +} + +fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> LocalAggregators { + let local_aggregator: LocalAggregators = Default::default(); + + let aggregators = Arc::clone(&local_aggregator); + let prefix = if prefix.is_empty() { + String::new() + } else { + format!("{}.", prefix.trim_end_matches('.')) + }; + + let thread_fn = move || { + let mut formatted_metric = String::with_capacity(256); + loop { + thread::sleep(Duration::from_secs(5)); + + let mut total_aggregations = Aggregations::default(); + + for local_aggregator in aggregators.iter() { + let local_aggregations = { + let mut local_aggregator = local_aggregator.lock().unwrap(); + std::mem::take(&mut local_aggregator.aggregations) + }; + + if total_aggregations.is_empty() { + total_aggregations = local_aggregations; + } else { + for (key, value) in local_aggregations { + let aggregated_value = total_aggregations.entry(key).or_default(); + *aggregated_value += value + } + } + } + + for (AggregationKey { name, tags }, value) in total_aggregations.drain() { + let _ = write!( + &mut formatted_metric, + "{}{name}:{value}|c{}", + prefix, formatted_global_tags + ); + + if !tags.is_empty() { + if formatted_global_tags.is_empty() { + formatted_metric.push_str("|#"); + } else { + formatted_metric.push(','); + } + formatted_metric.push_str(&tags); + } + + let _ = sink.emit(&formatted_metric); + + formatted_metric.clear(); + } + } + }; + + thread::Builder::new() + .name("metrics-aggregator".into()) + .spawn(thread_fn) + .unwrap(); + + local_aggregator +} + +impl MetricsWrapper { + pub fn with_local_aggregator(&self, f: impl FnOnce(&mut LocalAggregator)) { + let mut local_aggregator = self + .local_aggregator + .get_or(Default::default) + .lock() + .unwrap(); + f(&mut local_aggregator) + } +} + +impl Deref for MetricsWrapper { + type Target = StatsdClient; + + fn deref(&self) -> &Self::Target { + &self.statsd_client + } +} + +#[derive(Eq, Ord, PartialEq, PartialOrd, Hash)] +struct AggregationKey { + name: Box, + tags: Box, +} + +type Aggregations = FxHashMap; + +#[derive(Default)] +pub struct LocalAggregator { + buf: String, + aggregations: Aggregations, +} + +impl LocalAggregator { + pub fn emit_count(&mut self, name: &str, value: i64, tags: &[(&str, &str)]) { + self.buf.reserve(256); + for (key, value) in tags { + if !self.buf.is_empty() { + self.buf.push(','); + } + let _ = write!(&mut self.buf, "{key}:{value}"); + } + + let key = AggregationKey { + name: name.into(), + tags: self.buf.as_str().into(), + }; + self.buf.clear(); + + let aggregation = self.aggregations.entry(key).or_default(); + *aggregation += value; + } +} + /// Tell the metrics system to report to statsd. pub fn configure_statsd(prefix: &str, host: A, tags: BTreeMap) { let addrs: Vec<_> = host.to_socket_addrs().unwrap().collect(); @@ -22,13 +181,31 @@ pub fn configure_statsd(prefix: &str, host: A, tags: BTreeMap< socket.set_nonblocking(true).unwrap(); let udp_sink = BufferedUdpMetricSink::from(&addrs[..], socket).unwrap(); let queuing_sink = QueuingMetricSink::from(udp_sink); - let mut builder = StatsdClient::builder(prefix, queuing_sink); + let sink = Sink(Arc::new(queuing_sink)); + + let mut builder = StatsdClient::builder(prefix, sink.clone()); + + let mut formatted_global_tags = String::new(); for (key, value) in tags { + if formatted_global_tags.is_empty() { + formatted_global_tags.push_str("|#"); + } else { + formatted_global_tags.push(','); + } + let _ = write!(&mut formatted_global_tags, "{key}:{value}"); + builder = builder.with_tag(key, value) } - let client = builder.build(); + let statsd_client = builder.build(); + + let local_aggregator = make_aggregator(prefix, formatted_global_tags, sink); - METRICS_CLIENT.set(client).unwrap(); + let wrapper = MetricsWrapper { + statsd_client, + local_aggregator, + }; + + METRICS_CLIENT.set(wrapper).unwrap(); } /// Invoke a callback with the current [`StatsdClient`]. @@ -38,7 +215,7 @@ pub fn configure_statsd(prefix: &str, host: A, tags: BTreeMap< #[inline(always)] pub fn with_client(f: F) where - F: FnOnce(&StatsdClient), + F: FnOnce(&MetricsWrapper), { if let Some(client) = METRICS_CLIENT.get() { f(client) @@ -50,28 +227,20 @@ where macro_rules! metric { // counters (counter($id:expr) += $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{ - use $crate::metrics::prelude::*; - $crate::metrics::with_client(|client| { - client - .count_with_tags($id, $value) - $(.with_tag($k, $v))* - .send(); - }); - }}; - (counter($id:expr) -= $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{ - use $crate::metrics::prelude::*; $crate::metrics::with_client(|client| { - client - .count_with_tags($id, -$value) - $(.with_tag($k, $v))* - .send(); + client.with_local_aggregator(|local| { + let tags: &[(&str, &str)] = &[ + $(($k, $v)),* + ]; + local.emit_count($id, $value, tags); + }); }); }}; // gauges (gauge($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{ - use $crate::metrics::prelude::*; $crate::metrics::with_client(|client| { + use $crate::metrics::prelude::*; client .gauge_with_tags($id, $value) $(.with_tag($k, $v))* @@ -81,8 +250,8 @@ macro_rules! metric { // timers (timer($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{ - use $crate::metrics::prelude::*; $crate::metrics::with_client(|client| { + use $crate::metrics::prelude::*; client .time_with_tags($id, $value) $(.with_tag($k, $v))* @@ -92,8 +261,8 @@ macro_rules! metric { // we use statsd timers to send things such as filesizes as well. (time_raw($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{ - use $crate::metrics::prelude::*; $crate::metrics::with_client(|client| { + use $crate::metrics::prelude::*; client .time_with_tags($id, $value) $(.with_tag($k, $v))* @@ -103,8 +272,8 @@ macro_rules! metric { // histograms (histogram($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{ - use $crate::metrics::prelude::*; $crate::metrics::with_client(|client| { + use $crate::metrics::prelude::*; client .histogram_with_tags($id, $value) $(.with_tag($k, $v))*