Skip to content

Commit

Permalink
Wire prometheus metrics in AsyncRedis counter store
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed Dec 21, 2023
1 parent 0641a34 commit 6f92ad6
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 15 deletions.
3 changes: 2 additions & 1 deletion limitador/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,9 +596,10 @@ impl AsyncRateLimiter {
});
}

let access = self.prometheus_metrics.counter_accesses();
let check_result = self
.storage
.check_and_update(&mut counters, delta, load_counters)
.check_and_update(&mut counters, delta, load_counters, access)
.await?;

let counters = if load_counters {
Expand Down
60 changes: 60 additions & 0 deletions limitador/src/prometheus_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ impl PrometheusMetrics {
self.counter_latency.observe(duration.as_secs_f64());
}

#[must_use]
pub fn counter_accesses(&self) -> CounterAccess {
CounterAccess {
metrics: self,
duration: Duration::ZERO,
}
}

pub fn gather_metrics(&self) -> String {
let mut buffer = Vec::new();

Expand Down Expand Up @@ -163,6 +171,25 @@ impl PrometheusMetrics {
}
}

pub struct CounterAccess<'a> {
metrics: &'a PrometheusMetrics,
duration: Duration,
}

impl CounterAccess<'_> {
pub fn observe(&mut self, duration: Duration) {
self.duration += duration;
}
}

impl<'a> Drop for CounterAccess<'a> {
fn drop(&mut self) {
if self.duration > Duration::ZERO {
self.metrics.counter_access(self.duration);
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -297,6 +324,39 @@ mod tests {
)
}

#[test]
fn collects_latencies() {
let metrics = PrometheusMetrics::new();
assert_eq!(metrics.counter_latency.get_sample_count(), 0);
{
let _access = metrics.counter_accesses();
}
assert_eq!(metrics.counter_latency.get_sample_count(), 0);
{
let mut access = metrics.counter_accesses();
access.observe(Duration::from_millis(12));
}
assert_eq!(metrics.counter_latency.get_sample_count(), 1);
assert_eq!(
metrics.counter_latency.get_sample_sum(),
Duration::from_millis(12).as_secs_f64()
);
{
let mut access = metrics.counter_accesses();
access.observe(Duration::from_millis(5));
assert_eq!(metrics.counter_latency.get_sample_count(), 1);
assert_eq!(
metrics.counter_latency.get_sample_sum(),
Duration::from_millis(12).as_secs_f64()
);
}
assert_eq!(metrics.counter_latency.get_sample_count(), 2);
assert_eq!(
metrics.counter_latency.get_sample_sum(),
Duration::from_millis(17).as_secs_f64()
);
}

fn formatted_counter_with_namespace_and_limit(
metric_name: &str,
count: i32,
Expand Down
4 changes: 3 additions & 1 deletion limitador/src/storage/infinispan/infinispan_storage.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::counter::Counter;
use crate::limit::Limit;
use crate::prometheus_metrics::CounterAccess;
use crate::storage::infinispan::counters::{Consistency, CounterOpts};
use crate::storage::infinispan::response::response_to_string;
use crate::storage::infinispan::{
Expand Down Expand Up @@ -68,11 +69,12 @@ impl AsyncCounterStorage for InfinispanStorage {
}

#[tracing::instrument(skip_all)]
async fn check_and_update(
async fn check_and_update<'a>(
&self,
counters: &mut Vec<Counter>,
delta: i64,
load_counters: bool,
_counter_access: CounterAccess<'a>,
) -> Result<Authorization, StorageErr> {
let mut counter_keys = Vec::with_capacity(counters.len());

Expand Down
9 changes: 6 additions & 3 deletions limitador/src/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::counter::Counter;
use crate::limit::{Limit, Namespace};
use crate::prometheus_metrics::CounterAccess;
use crate::InMemoryStorage;
use async_trait::async_trait;
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -237,14 +238,15 @@ impl AsyncStorage {
self.counters.update_counter(counter, delta).await
}

pub async fn check_and_update(
pub async fn check_and_update<'a>(
&self,
counters: &mut Vec<Counter>,
delta: i64,
load_counters: bool,
counter_access: CounterAccess<'a>,
) -> Result<Authorization, StorageErr> {
self.counters
.check_and_update(counters, delta, load_counters)
.check_and_update(counters, delta, load_counters, counter_access)
.await
}

Expand Down Expand Up @@ -281,11 +283,12 @@ pub trait CounterStorage: Sync + Send {
pub trait AsyncCounterStorage: Sync + Send {
async fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result<bool, StorageErr>;
async fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr>;
async fn check_and_update(
async fn check_and_update<'a>(
&self,
counters: &mut Vec<Counter>,
delta: i64,
load_counters: bool,
counter_access: CounterAccess<'a>,
) -> Result<Authorization, StorageErr>;
async fn get_counters(&self, limits: HashSet<Limit>) -> Result<HashSet<Counter>, StorageErr>;
async fn delete_counters(&self, limits: HashSet<Limit>) -> Result<(), StorageErr>;
Expand Down
31 changes: 22 additions & 9 deletions limitador/src/storage/redis/redis_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use self::redis::aio::ConnectionManager;
use self::redis::ConnectionInfo;
use crate::counter::Counter;
use crate::limit::Limit;
use crate::prometheus_metrics::CounterAccess;
use crate::storage::keys::*;
use crate::storage::redis::is_limited;
use crate::storage::redis::scripts::{SCRIPT_UPDATE_COUNTER, VALUES_AND_TTLS};
Expand All @@ -12,7 +13,7 @@ use async_trait::async_trait;
use redis::{AsyncCommands, RedisError};
use std::collections::HashSet;
use std::str::FromStr;
use std::time::Duration;
use std::time::{Duration, Instant};
use tracing::{trace_span, Instrument};

// Note: this implementation does not guarantee exact limits. Ensuring that we
Expand Down Expand Up @@ -70,11 +71,12 @@ impl AsyncCounterStorage for AsyncRedisStorage {
}

#[tracing::instrument(skip_all)]
async fn check_and_update(
async fn check_and_update<'a>(
&self,
counters: &mut Vec<Counter>,
delta: i64,
load_counters: bool,
mut counter_access: CounterAccess<'a>,
) -> Result<Authorization, StorageErr> {
let mut con = self.conn_manager.clone();
let counter_keys: Vec<String> = counters.iter().map(key_for_counter).collect();
Expand All @@ -89,9 +91,14 @@ impl AsyncCounterStorage for AsyncRedisStorage {

let script_res: Vec<Option<i64>> = {
let span = trace_span!("datastore");
async { script_invocation.invoke_async(&mut con).await }
.instrument(span)
.await?
async {
let start = Instant::now();
let result = script_invocation.invoke_async(&mut con).await;
counter_access.observe(start.elapsed());
result
}
.instrument(span)
.await?
};
if let Some(res) = is_limited(counters, delta, script_res) {
return Ok(res);
Expand All @@ -100,10 +107,13 @@ impl AsyncCounterStorage for AsyncRedisStorage {
let counter_vals: Vec<Option<i64>> = {
let span = trace_span!("datastore");
async {
redis::cmd("MGET")
let start = Instant::now();
let result = redis::cmd("MGET")
.arg(counter_keys.clone())
.query_async(&mut con)
.await
.await;
counter_access.observe(start.elapsed());
result
}
.instrument(span)
.await?
Expand All @@ -124,14 +134,17 @@ impl AsyncCounterStorage for AsyncRedisStorage {
let counter = &counters[counter_idx];
let span = trace_span!("datastore");
async {
redis::Script::new(SCRIPT_UPDATE_COUNTER)
let start = Instant::now();
let result = redis::Script::new(SCRIPT_UPDATE_COUNTER)
.key(key)
.key(key_for_counters_of_limit(counter.limit()))
.arg(counter.max_value())
.arg(counter.seconds())
.arg(delta)
.invoke_async::<_, _>(&mut con)
.await
.await;
counter_access.observe(start.elapsed());
result
}
.instrument(span)
.await?
Expand Down
4 changes: 3 additions & 1 deletion limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::counter::Counter;
use crate::limit::Limit;
use crate::prometheus_metrics::CounterAccess;
use crate::storage::keys::*;
use crate::storage::redis::batcher::Batcher;
use crate::storage::redis::counters_cache::{CountersCache, CountersCacheBuilder};
Expand Down Expand Up @@ -66,11 +67,12 @@ impl AsyncCounterStorage for CachedRedisStorage {
// atomically, but that'd be too slow.
// This function trades accuracy for speed.
#[tracing::instrument(skip_all)]
async fn check_and_update(
async fn check_and_update<'a>(
&self,
counters: &mut Vec<Counter>,
delta: i64,
load_counters: bool,
_counter_access: CounterAccess<'a>,
) -> Result<Authorization, StorageErr> {
let mut con = self.redis_conn_manager.clone();

Expand Down

0 comments on commit 6f92ad6

Please sign in to comment.