From 552c8de53b34cce642ff336066be80c544c4c9d9 Mon Sep 17 00:00:00 2001 From: Adam Cattermole Date: Tue, 5 Mar 2024 15:58:51 +0000 Subject: [PATCH] wip: metrics layer --- Cargo.lock | 34 ++--- limitador-server/src/main.rs | 23 ++++ limitador-server/src/metrics.rs | 236 ++++++++++++++++++++++++++++++++ 3 files changed, 267 insertions(+), 26 deletions(-) create mode 100644 limitador-server/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 6282425e..6e759665 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -821,46 +821,37 @@ checksum = "7059fff8937831a9ae6f0fe4d658ffabf58f2ca96aa9dec1c889f936f705f216" [[package]] name = "crossbeam-channel" -version = "0.5.8" +version = "0.5.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95" dependencies = [ - "cfg-if", "crossbeam-utils", ] [[package]] name = "crossbeam-deque" -version = "0.8.3" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" dependencies = [ - "cfg-if", "crossbeam-epoch", "crossbeam-utils", ] [[package]] name = "crossbeam-epoch" -version = "0.9.15" +version = "0.9.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae211234986c545741a7dc064309f67ee1e5ad243d0e48335adc0484d960bcc7" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" dependencies = [ - "autocfg", - "cfg-if", "crossbeam-utils", - "memoffset", - "scopeguard", ] [[package]] name = "crossbeam-utils" -version = "0.8.16" +version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" -dependencies = [ - "cfg-if", -] +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" [[package]] name = "crypto-common" @@ -1704,15 +1695,6 @@ version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" -[[package]] -name = "memoffset" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" -dependencies = [ - "autocfg", -] - [[package]] name = "mime" version = "0.3.17" diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs index 5f2fa5e7..b43042ae 100644 --- a/limitador-server/src/main.rs +++ b/limitador-server/src/main.rs @@ -12,6 +12,7 @@ use crate::config::{ }; use crate::envoy_rls::server::{run_envoy_rls_server, RateLimitHeaders}; use crate::http_api::server::run_http_server; +use crate::metrics::MetricsLayer; use clap::{value_parser, Arg, ArgAction, Command}; use const_format::formatcp; use limitador::counter::Counter; @@ -52,6 +53,7 @@ mod envoy_rls; mod http_api; mod config; +mod metrics; const LIMITADOR_VERSION: &str = env!("CARGO_PKG_VERSION"); const LIMITADOR_PROFILE: &str = env!("LIMITADOR_PROFILE"); @@ -300,7 +302,17 @@ async fn main() -> Result<(), Box> { } else { tracing_subscriber::fmt::layer() }; + // let metrics_layer = Builder::default().layer(|| Histogram::new_with_max(1_000_000, 2).unwrap()); + let metrics_layer = MetricsLayer::new().gather( + "should_rate_limit", + |timings| println!("should_rate_limit/datastore 
timings: {:?}", timings),
+        vec!["datastore"],
+    );
+    // .consume(|);
+    // should_rate_limit
+    // datastore
+
+    // let mut subscriber = &tracing_subscriber::registry().with(level).with(fmt_layer);
     if !config.tracing_host.is_empty() {
         let tracer = opentelemetry_otlp::new_pipeline()
             .tracing()
@@ -315,18 +327,25 @@ async fn main() -> Result<(), Box<dyn Error>> {
             ])))
             .install_batch(opentelemetry_sdk::runtime::Tokio)?;
         let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
+        // subscriber.with(telemetry_layer);
         tracing_subscriber::registry()
             .with(level)
             .with(fmt_layer)
             .with(telemetry_layer)
+            // .with(MetricsLayer)
+            .with(metrics_layer)
             .init();
     } else {
         tracing_subscriber::registry()
             .with(level)
             .with(fmt_layer)
+            // .with(MetricsLayer)
+            .with(metrics_layer)
             .init();
     };
+    // subscriber.init();
+
     info!("Version: {}", version);
     info!("Using config: {:?}", config);
     config
@@ -345,6 +364,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
             process::exit(1)
         }
     };
+    // match rate_limiter {
+    //     Limiter::Blocking(limiter) => limiter.,
+    //     Limiter::Async(limiter) => limiter.gather_prometheus_metrics(),
+    // }
 
     info!("limits file path: {}", limit_file);
     if let Err(e) = rate_limiter.load_limits_from_file(&limit_file).await {
diff --git a/limitador-server/src/metrics.rs b/limitador-server/src/metrics.rs
new file mode 100644
index 00000000..7ffa3ee9
--- /dev/null
+++ b/limitador-server/src/metrics.rs
@@ -0,0 +1,236 @@
+use std::collections::HashMap;
+use std::ops;
+use std::time::Instant;
+use tracing::span::{Attributes, Id};
+use tracing::Subscriber;
+use tracing_subscriber::layer::{Context, Layer};
+use tracing_subscriber::registry::LookupSpan;
+
+#[derive(Debug, Clone, Copy)]
+struct Timings {
+    idle: u64,
+    busy: u64,
+    last: Instant,
+}
+
+impl Timings {
+    fn new() -> Self {
+        Self {
+            idle: 0,
+            busy: 0,
+            last: Instant::now(),
+        }
+    }
+}
+
+impl ops::Add for Timings {
+    type Output = Self;
+
+    fn add(self, rhs: Self) -> Self {
+        Self {
+            busy: self.busy + rhs.busy,
+            idle: self.idle + rhs.idle,
+            last: if self.last < rhs.last {
+                self.last
+            } else {
+                rhs.last
+            },
+        }
+    }
+}
+
+impl ops::AddAssign for Timings {
+    fn add_assign(&mut self, rhs: Self) {
+        *self = *self + rhs
+    }
+}
+
+#[derive(Debug)]
+struct SpanState {
+    group_times: HashMap<String, Timings>,
+}
+
+impl SpanState {
+    fn new(group: String) -> Self {
+        let mut hm = HashMap::new();
+        hm.insert(group, Timings::new());
+        Self { group_times: hm }
+    }
+
+    fn increment(&mut self, group: String, timings: Timings) {
+        self.group_times
+            .entry(group)
+            .and_modify(|x| *x += timings)
+            .or_insert(timings);
+    }
+}
+
+pub struct MetricsGroup {
+    consumer: Box<dyn Fn(Timings) + Send + Sync>,
+    records: Vec<String>,
+}
+
+impl MetricsGroup {
+    pub fn new(consumer: fn(Timings), records: Vec<String>) -> Self {
+        Self {
+            consumer: Box::new(consumer),
+            records,
+        }
+    }
+}
+
+pub struct MetricsLayer {
+    groups: HashMap<String, MetricsGroup>,
+}
+
+impl MetricsLayer {
+    pub fn new() -> Self {
+        Self {
+            groups: HashMap::new(),
+        }
+    }
+
+    pub fn gather(mut self, aggregate: &str, consumer: fn(Timings), records: Vec<&str>) -> Self {
+        // TODO: does not handle the case where the aggregate already exists
+        let rec = records.iter().map(|r| r.to_string()).collect();
+        self.groups
+            .entry(aggregate.to_string())
+            .or_insert(MetricsGroup::new(consumer, rec));
+        self
+    }
+}
+
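+// How the Layer implementation below fits together (summary of the hooks):
+// - on_new_span: clone the parent's SpanState into the new span (if any), register
+//   the span as an aggregate if its name matches a gather() group, and attach a
+//   Timings extension only when the span is one of a group's recorded spans.
+// - on_enter / on_exit: accumulate idle time (exit -> enter) and busy time
+//   (enter -> exit) into that Timings extension.
+// - on_close: fold the remaining tail into the timings, add them to the owning
+//   group's total in SpanState and, if this span is the aggregate itself, hand the
+//   accumulated Timings to the group's consumer.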
+impl<S> Layer<S> for MetricsLayer
+where
+    S: Subscriber,
+    S: for<'lookup> LookupSpan<'lookup>,
+{
+    fn on_new_span(&self, _attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
+        let span = ctx.span(id).expect("Span not found, this is a bug");
+        let mut extensions = span.extensions_mut();
+        let name = span.name();
+
+        // option 1
+        // if this is an aggregate span, append a marker
+        // if a marker is present + this is a record span -> start timing
+
+        // option 2
+        // if this is a record span, iterate through the parents
+        // if an aggregate span is present -> start timing
+
+        // if this is an aggregate span
+        // or the parent is an aggregate span
+        // append a new marker
+
+        // should_rate_limit -> datastore
+        // check_and_update -> datastore
+
+        // if there's a parent
+        if let Some(parent) = span.parent() {
+            // if the parent has a SpanState, propagate it to this span
+            if let Some(span_state) = parent.extensions_mut().get_mut::<SpanState>() {
+                extensions.insert(SpanState {
+                    group_times: span_state.group_times.clone(),
+                });
+            }
+        }
+
+        // if we are an aggregate span
+        if self.groups.contains_key(name) {
+            if let Some(span_state) = extensions.get_mut::<SpanState>() {
+                // the SpanState came from the parent and we are not yet present, so add ourselves
+                span_state
+                    .group_times
+                    .entry(name.to_string())
+                    .or_insert(Timings::new());
+            } else {
+                // otherwise create a new SpanState with ourselves
+                extensions.insert(SpanState::new(name.to_string()));
+            }
+        }
+
+        // if timing is already set (which it shouldn't be), don't create it again
+        if extensions.get_mut::<Timings>().is_some() {
+            return;
+        }
+
+        // either we are an aggregate span or nested within one: only start timing
+        // if this span is listed as a record of one of the gathered groups
+        let mut record_timings = false;
+        if let Some(span_state) = extensions.get_mut::<SpanState>() {
+            for group in span_state.group_times.keys() {
+                for record in &self.groups.get(group).unwrap().records {
+                    if name == record {
+                        record_timings = true;
+                    }
+                }
+            }
+            // otherwise this is an intermediate span that should not be recorded
+        }
+        if record_timings {
+            extensions.insert(Timings::new());
+        }
+    }
+
+    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
+        let span = ctx.span(id).expect("Span not found, this is a bug");
+        let mut extensions = span.extensions_mut();
+
+        if let Some(timings) = extensions.get_mut::<Timings>() {
+            let now = Instant::now();
+            timings.idle += (now - timings.last).as_nanos() as u64;
+            timings.last = now;
+        }
+    }
+
+    fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
+        let span = ctx.span(id).expect("Span not found, this is a bug");
+        let mut extensions = span.extensions_mut();
+
+        if let Some(timings) = extensions.get_mut::<Timings>() {
+            let now = Instant::now();
+            timings.busy += (now - timings.last).as_nanos() as u64;
+            timings.last = now;
+        }
+    }
+
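+    // Timing model: `last` holds the instant of the most recent transition. Time
+    // between an exit and the next enter counts as idle, time between an enter and
+    // the next exit counts as busy, and on_close folds the remaining tail
+    // (last exit -> close) into idle before the totals are aggregated.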
+    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
+        let span = ctx.span(&id).expect("Span not found, this is a bug");
+        let mut extensions = span.extensions_mut();
+        let name = span.name();
+
+        // if timings set + record -> add to the group total??? marker/field?
+        // if aggregate + marker/field? -> consume()
+
+        if let Some(timing) = extensions.get_mut::<Timings>() {
+            let mut t = *timing;
+            t.idle += (Instant::now() - t.last).as_nanos() as u64;
+
+            if let Some(span_state) = extensions.get_mut::<SpanState>() {
+                let group_times = span_state.group_times.clone();
+                // add this span's timings to every group that records it
+                'aggregate: for group in group_times.keys() {
+                    for record in &self.groups.get(group).unwrap().records {
+                        if name == record {
+                            span_state.increment(group.to_string(), t);
+                            continue 'aggregate;
+                        }
+                    }
+                }
+                // if we are the aggregate span, consume the accumulated timings
+                if let Some(metrics_group) = self.groups.get(name) {
+                    (metrics_group.consumer)(*span_state.group_times.get(name).unwrap());
+                }
+            }
+        }
+    }
+}
+
+// mylayer.gather("aggregate_on", |timings| pr.vomit(timings), ["datastore"])
+
+// mylayer.gather("aggregate_on", ["datastore"]).consumer("aggregate_on", |timings| pr.vomit(timings))
+
+// write a consumer: a function that does something with the timings
+// Fn
+// fn consumer(timings: Timings) -> () {}
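+//
+// A rough usage sketch (mirrors the wiring added to limitador-server/src/main.rs in
+// this patch; the println! consumer is only illustrative):
+//
+//     let metrics_layer = MetricsLayer::new().gather(
+//         "should_rate_limit",
+//         |timings| println!("should_rate_limit/datastore timings: {:?}", timings),
+//         vec!["datastore"],
+//     );
+//     tracing_subscriber::registry().with(metrics_layer).init();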