From 028168abb6b8ba60022ce4a056a3be68704087bb Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Thu, 21 Dec 2023 13:45:06 -0500 Subject: [PATCH] Wire prometheus metrics in AsyncRedis counter store --- limitador/src/lib.rs | 3 +- limitador/src/prometheus_metrics.rs | 60 +++++++++++++++++++ .../storage/infinispan/infinispan_storage.rs | 4 +- limitador/src/storage/mod.rs | 9 ++- limitador/src/storage/redis/redis_async.rs | 31 +++++++--- limitador/src/storage/redis/redis_cached.rs | 4 +- 6 files changed, 96 insertions(+), 15 deletions(-) diff --git a/limitador/src/lib.rs b/limitador/src/lib.rs index 838ab035..b3c298b9 100644 --- a/limitador/src/lib.rs +++ b/limitador/src/lib.rs @@ -596,9 +596,10 @@ impl AsyncRateLimiter { }); } + let access = self.prometheus_metrics.counter_accesses(); let check_result = self .storage - .check_and_update(&mut counters, delta, load_counters) + .check_and_update(&mut counters, delta, load_counters, access) .await?; let counters = if load_counters { diff --git a/limitador/src/prometheus_metrics.rs b/limitador/src/prometheus_metrics.rs index b7cc09f4..4b2e0f51 100644 --- a/limitador/src/prometheus_metrics.rs +++ b/limitador/src/prometheus_metrics.rs @@ -83,6 +83,14 @@ impl PrometheusMetrics { self.counter_latency.observe(duration.as_secs_f64()); } + #[must_use] + pub fn counter_accesses(&self) -> CounterAccess { + CounterAccess { + metrics: self, + duration: Duration::ZERO, + } + } + pub fn gather_metrics(&self) -> String { let mut buffer = Vec::new(); @@ -163,6 +171,25 @@ impl PrometheusMetrics { } } +pub struct CounterAccess<'a> { + metrics: &'a PrometheusMetrics, + duration: Duration, +} + +impl CounterAccess<'_> { + pub fn observe(&mut self, duration: Duration) { + self.duration += duration; + } +} + +impl<'a> Drop for CounterAccess<'a> { + fn drop(&mut self) { + if self.duration > Duration::ZERO { + self.metrics.counter_access(self.duration); + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -297,6 +324,39 @@ mod tests { ) } + #[test] + fn collects_latencies() { + let metrics = PrometheusMetrics::new(); + assert_eq!(metrics.counter_latency.get_sample_count(), 0); + { + let _access = metrics.counter_accesses(); + } + assert_eq!(metrics.counter_latency.get_sample_count(), 0); + { + let mut access = metrics.counter_accesses(); + access.observe(Duration::from_millis(12)); + } + assert_eq!(metrics.counter_latency.get_sample_count(), 1); + assert_eq!( + metrics.counter_latency.get_sample_sum(), + Duration::from_millis(12).as_secs_f64() + ); + { + let mut access = metrics.counter_accesses(); + access.observe(Duration::from_millis(5)); + assert_eq!(metrics.counter_latency.get_sample_count(), 1); + assert_eq!( + metrics.counter_latency.get_sample_sum(), + Duration::from_millis(12).as_secs_f64() + ); + } + assert_eq!(metrics.counter_latency.get_sample_count(), 2); + assert_eq!( + metrics.counter_latency.get_sample_sum(), + Duration::from_millis(17).as_secs_f64() + ); + } + fn formatted_counter_with_namespace_and_limit( metric_name: &str, count: i32, diff --git a/limitador/src/storage/infinispan/infinispan_storage.rs b/limitador/src/storage/infinispan/infinispan_storage.rs index ac9d8349..af790634 100644 --- a/limitador/src/storage/infinispan/infinispan_storage.rs +++ b/limitador/src/storage/infinispan/infinispan_storage.rs @@ -1,5 +1,6 @@ use crate::counter::Counter; use crate::limit::Limit; +use crate::prometheus_metrics::CounterAccess; use crate::storage::infinispan::counters::{Consistency, CounterOpts}; use crate::storage::infinispan::response::response_to_string; use crate::storage::infinispan::{ @@ -68,11 +69,12 @@ impl AsyncCounterStorage for InfinispanStorage { } #[tracing::instrument(skip_all)] - async fn check_and_update( + async fn check_and_update<'a>( &self, counters: &mut Vec, delta: i64, load_counters: bool, + _counter_access: CounterAccess<'a>, ) -> Result { let mut counter_keys = Vec::with_capacity(counters.len()); diff --git a/limitador/src/storage/mod.rs b/limitador/src/storage/mod.rs index fec8b013..c6cb1c92 100644 --- a/limitador/src/storage/mod.rs +++ b/limitador/src/storage/mod.rs @@ -1,5 +1,6 @@ use crate::counter::Counter; use crate::limit::{Limit, Namespace}; +use crate::prometheus_metrics::CounterAccess; use crate::InMemoryStorage; use async_trait::async_trait; use std::collections::{HashMap, HashSet}; @@ -237,14 +238,15 @@ impl AsyncStorage { self.counters.update_counter(counter, delta).await } - pub async fn check_and_update( + pub async fn check_and_update<'a>( &self, counters: &mut Vec, delta: i64, load_counters: bool, + counter_access: CounterAccess<'a>, ) -> Result { self.counters - .check_and_update(counters, delta, load_counters) + .check_and_update(counters, delta, load_counters, counter_access) .await } @@ -281,11 +283,12 @@ pub trait CounterStorage: Sync + Send { pub trait AsyncCounterStorage: Sync + Send { async fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result; async fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr>; - async fn check_and_update( + async fn check_and_update<'a>( &self, counters: &mut Vec, delta: i64, load_counters: bool, + counter_access: CounterAccess<'a>, ) -> Result; async fn get_counters(&self, limits: HashSet) -> Result, StorageErr>; async fn delete_counters(&self, limits: HashSet) -> Result<(), StorageErr>; diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs index 92589c9f..bb94e933 100644 --- a/limitador/src/storage/redis/redis_async.rs +++ b/limitador/src/storage/redis/redis_async.rs @@ -4,6 +4,7 @@ use self::redis::aio::ConnectionManager; use self::redis::ConnectionInfo; use crate::counter::Counter; use crate::limit::Limit; +use crate::prometheus_metrics::CounterAccess; use crate::storage::keys::*; use crate::storage::redis::is_limited; use crate::storage::redis::scripts::{SCRIPT_UPDATE_COUNTER, VALUES_AND_TTLS}; @@ -12,7 +13,7 @@ use async_trait::async_trait; use redis::{AsyncCommands, RedisError}; use std::collections::HashSet; use std::str::FromStr; -use std::time::Duration; +use std::time::{Duration, Instant}; use tracing::{trace_span, Instrument}; // Note: this implementation does not guarantee exact limits. Ensuring that we @@ -70,11 +71,12 @@ impl AsyncCounterStorage for AsyncRedisStorage { } #[tracing::instrument(skip_all)] - async fn check_and_update( + async fn check_and_update<'a>( &self, counters: &mut Vec, delta: i64, load_counters: bool, + mut counter_access: CounterAccess<'a>, ) -> Result { let mut con = self.conn_manager.clone(); let counter_keys: Vec = counters.iter().map(key_for_counter).collect(); @@ -89,9 +91,14 @@ impl AsyncCounterStorage for AsyncRedisStorage { let script_res: Vec> = { let span = trace_span!("datastore"); - async { script_invocation.invoke_async(&mut con).await } - .instrument(span) - .await? + async { + let start = Instant::now(); + let result = script_invocation.invoke_async(&mut con).await; + counter_access.observe(start.elapsed()); + result + } + .instrument(span) + .await? }; if let Some(res) = is_limited(counters, delta, script_res) { return Ok(res); @@ -100,10 +107,13 @@ impl AsyncCounterStorage for AsyncRedisStorage { let counter_vals: Vec> = { let span = trace_span!("datastore"); async { - redis::cmd("MGET") + let start = Instant::now(); + let result = redis::cmd("MGET") .arg(counter_keys.clone()) .query_async(&mut con) - .await + .await; + counter_access.observe(start.elapsed()); + result } .instrument(span) .await? @@ -124,14 +134,17 @@ impl AsyncCounterStorage for AsyncRedisStorage { let counter = &counters[counter_idx]; let span = trace_span!("datastore"); async { - redis::Script::new(SCRIPT_UPDATE_COUNTER) + let start = Instant::now(); + let result = redis::Script::new(SCRIPT_UPDATE_COUNTER) .key(key) .key(key_for_counters_of_limit(counter.limit())) .arg(counter.max_value()) .arg(counter.seconds()) .arg(delta) .invoke_async::<_, _>(&mut con) - .await + .await; + counter_access.observe(start.elapsed()); + result } .instrument(span) .await? diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 418892f5..2f6e1fc1 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -1,5 +1,6 @@ use crate::counter::Counter; use crate::limit::Limit; +use crate::prometheus_metrics::CounterAccess; use crate::storage::keys::*; use crate::storage::redis::batcher::Batcher; use crate::storage::redis::counters_cache::{CountersCache, CountersCacheBuilder}; @@ -66,11 +67,12 @@ impl AsyncCounterStorage for CachedRedisStorage { // atomically, but that'd be too slow. // This function trades accuracy for speed. #[tracing::instrument(skip_all)] - async fn check_and_update( + async fn check_and_update<'a>( &self, counters: &mut Vec, delta: i64, load_counters: bool, + _counter_access: CounterAccess<'a>, ) -> Result { let mut con = self.redis_conn_manager.clone();