diff --git a/Cargo.lock b/Cargo.lock index 7a553227..ae2fd8eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -39,8 +39,8 @@ dependencies = [ "encoding_rs", "flate2", "futures-core", - "h2", - "http", + "h2 0.3.26", + "http 0.2.12", "httparse", "httpdate", "itoa", @@ -75,7 +75,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d22475596539443685426b6bdadb926ad0ecaefdfc5fb05e5e3441f15463c511" dependencies = [ "bytestring", - "http", + "http 0.2.12", "regex", "serde", "tracing", @@ -359,9 +359,9 @@ dependencies = [ "bitflags 1.3.2", "bytes", "futures-util", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", "itoa", "matchit", "memchr", @@ -385,8 +385,8 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "mime", "rustversion", "tower-layer", @@ -1110,7 +1110,26 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", + "indexmap 2.2.6", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "816ec7294445779408f36fe57bc5b7fc1cf59664059096c65f905c1c61f58069" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.1.0", "indexmap 2.2.6", "slab", "tokio", @@ -1148,6 +1167,9 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +dependencies = [ + "ahash", +] [[package]] name = "heapless" @@ -1192,6 +1214,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -1199,7 +1232,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" +dependencies = [ + "bytes", + "futures-core", + "http 1.1.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -1225,9 +1281,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1239,13 +1295,34 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.4", + "http 1.1.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-timeout" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.28", "pin-project-lite", "tokio", "tokio-io-timeout", @@ -1258,10 +1335,46 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper", + "hyper 0.14.28", + "native-tls", + "tokio", + "tokio-native-tls", +] + +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.3.1", + "hyper-util", "native-tls", "tokio", "tokio-native-tls", + "tower-service", +] + +[[package]] +name = "hyper-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.3.1", + "pin-project-lite", + "socket2", + "tokio", + "tower", + "tower-service", + "tracing", ] [[package]] @@ -1301,7 +1414,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d961790ad3dae0ea7ec1a4ab2c32de1a0bf5f3d7d7fddb0a0f2059b96ecafe0a" dependencies = [ "base64 0.13.1", - "http", + "http 0.2.12", "reqwest", "serde", "serde_json", @@ -1481,6 +1594,7 @@ dependencies = [ "futures", "getrandom", "infinispan", + "metrics", "moka", "paste", "postcard", @@ -1511,6 +1625,8 @@ dependencies = [ "lazy_static", "limitador", "log", + "metrics", + "metrics-exporter-prometheus", "notify", "openssl", "opentelemetry", @@ -1518,7 +1634,6 @@ dependencies = [ "opentelemetry-stdout", "opentelemetry_sdk", "paperclip", - "prometheus", "prost", "prost-types", "serde", @@ -1606,6 +1721,52 @@ version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" +[[package]] +name = "metrics" +version = "0.22.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2be3cbd384d4e955b231c895ce10685e3d8260c5ccffae898c96c723b0772835" +dependencies = [ + "ahash", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-prometheus" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d58e362dc7206e9456ddbcdbd53c71ba441020e62104703075a69151e38d85f" +dependencies = [ + "base64 0.22.0", + "http-body-util", + "hyper 1.3.1", + "hyper-tls 0.6.0", + "hyper-util", + "indexmap 2.2.6", + "ipnet", + "metrics", + "metrics-util", + "quanta", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "metrics-util" +version = "0.16.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b07a5eb561b8cbc16be2d216faf7757f9baf3bfb94dbb0fae3df8387a5bb47f" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.14.3", + "metrics", + "num_cpus", + "quanta", + "sketches-ddsketch", +] + [[package]] name = "mime" version = "0.3.17" @@ -1864,7 +2025,7 @@ checksum = "1a016b8d9495c639af2145ac22387dcb88e44118e45320d9238fbf4e7889abcb" dependencies = [ "async-trait", "futures-core", - "http", + "http 0.2.12", "opentelemetry", "opentelemetry-proto", "opentelemetry-semantic-conventions", @@ -2007,7 +2168,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce6e25ce2c5362c8d48dc89e0f9ca076d507f7c1eabd04f0d593cdf5addff90c" dependencies = [ "heck 0.4.1", - "http", + "http 0.2.12", "lazy_static", "mime", "proc-macro-error", @@ -2129,6 +2290,12 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "portable-atomic" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0" + [[package]] name = "postcard" version = "1.0.8" @@ -2196,21 +2363,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "prometheus" -version = "0.13.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c" -dependencies = [ - "cfg-if", - "fnv", - "lazy_static", - "memchr", - "parking_lot", - "protobuf", - "thiserror", -] - [[package]] name = "prost" version = "0.12.4" @@ -2264,12 +2416,6 @@ dependencies = [ "prost", ] -[[package]] -name = "protobuf" -version = "2.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" - [[package]] name = "quanta" version = "0.12.3" @@ -2464,11 +2610,11 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", - "http", - "http-body", - "hyper", - "hyper-tls", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", + "hyper-tls 0.5.0", "ipnet", "js-sys", "log", @@ -2764,6 +2910,12 @@ dependencies = [ "libc", ] +[[package]] +name = "sketches-ddsketch" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85636c14b73d81f541e525f585c0a2109e6744e1565b5c1668e31c70c10ed65c" + [[package]] name = "slab" version = "0.4.9" @@ -3094,10 +3246,10 @@ dependencies = [ "axum", "base64 0.21.7", "bytes", - "h2", - "http", - "http-body", - "hyper", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-timeout", "percent-encoding", "pin-project", diff --git a/limitador-server/Cargo.toml b/limitador-server/Cargo.toml index 2214d216..67ae9909 100644 --- a/limitador-server/Cargo.toml +++ b/limitador-server/Cargo.toml @@ -13,7 +13,7 @@ readme = "README.md" edition = "2021" [features] -infinispan = [ "limitador/infinispan_storage" ] +infinispan = ["limitador/infinispan_storage"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -46,7 +46,8 @@ lazy_static = "1.4.0" clap = "4.3" sysinfo = "0.30.10" openssl = { version = "0.10.57", features = ["vendored"] } -prometheus = "0.13.3" +metrics = "0.22.3" +metrics-exporter-prometheus = "0.14.0" [build-dependencies] diff --git a/limitador-server/src/envoy_rls/server.rs b/limitador-server/src/envoy_rls/server.rs index d58a6199..3511b202 100644 --- a/limitador-server/src/envoy_rls/server.rs +++ b/limitador-server/src/envoy_rls/server.rs @@ -276,6 +276,7 @@ mod tests { use crate::envoy_rls::server::envoy::extensions::common::ratelimit::v3::rate_limit_descriptor::Entry; use crate::envoy_rls::server::envoy::extensions::common::ratelimit::v3::RateLimitDescriptor; + use crate::prometheus_metrics::tests::TEST_PROMETHEUS_HANDLE; use crate::Configuration; use super::*; @@ -311,7 +312,10 @@ mod tests { let rate_limiter = MyRateLimiter::new( Arc::new(Limiter::Blocking(limiter)), RateLimitHeaders::DraftVersion03, - Arc::new(PrometheusMetrics::default()), + Arc::new(PrometheusMetrics::new_with_handle( + false, + TEST_PROMETHEUS_HANDLE.clone(), + )), ); let req = RateLimitRequest { @@ -370,7 +374,10 @@ mod tests { let rate_limiter = MyRateLimiter::new( Arc::new(Limiter::new(Configuration::default()).await.unwrap()), RateLimitHeaders::DraftVersion03, - Arc::new(PrometheusMetrics::default()), + Arc::new(PrometheusMetrics::new_with_handle( + false, + TEST_PROMETHEUS_HANDLE.clone(), + )), ); let req = RateLimitRequest { @@ -401,7 +408,10 @@ mod tests { let rate_limiter = MyRateLimiter::new( Arc::new(Limiter::new(Configuration::default()).await.unwrap()), RateLimitHeaders::DraftVersion03, - Arc::new(PrometheusMetrics::default()), + Arc::new(PrometheusMetrics::new_with_handle( + false, + TEST_PROMETHEUS_HANDLE.clone(), + )), ); let req = RateLimitRequest { @@ -444,7 +454,10 @@ mod tests { let rate_limiter = MyRateLimiter::new( Arc::new(Limiter::Blocking(limiter)), RateLimitHeaders::DraftVersion03, - Arc::new(PrometheusMetrics::default()), + Arc::new(PrometheusMetrics::new_with_handle( + false, + TEST_PROMETHEUS_HANDLE.clone(), + )), ); let req = RateLimitRequest { @@ -503,7 +516,10 @@ mod tests { let rate_limiter = MyRateLimiter::new( Arc::new(Limiter::Blocking(limiter)), RateLimitHeaders::DraftVersion03, - Arc::new(PrometheusMetrics::default()), + Arc::new(PrometheusMetrics::new_with_handle( + false, + TEST_PROMETHEUS_HANDLE.clone(), + )), ); let req = RateLimitRequest { @@ -569,7 +585,10 @@ mod tests { let rate_limiter = MyRateLimiter::new( Arc::new(Limiter::Blocking(limiter)), RateLimitHeaders::DraftVersion03, - Arc::new(PrometheusMetrics::default()), + Arc::new(PrometheusMetrics::new_with_handle( + false, + TEST_PROMETHEUS_HANDLE.clone(), + )), ); let req = RateLimitRequest { diff --git a/limitador-server/src/http_api/server.rs b/limitador-server/src/http_api/server.rs index d5063d3e..24bf26af 100644 --- a/limitador-server/src/http_api/server.rs +++ b/limitador-server/src/http_api/server.rs @@ -232,6 +232,7 @@ pub async fn run_http_server( #[cfg(test)] mod tests { use super::*; + use crate::prometheus_metrics::tests::TEST_PROMETHEUS_HANDLE; use crate::Configuration; use actix_web::{test, web}; use limitador::limit::Limit as LimitadorLimit; @@ -258,7 +259,9 @@ mod tests { async fn test_metrics() { let rate_limiter: Arc = Arc::new(Limiter::new(Configuration::default()).await.unwrap()); - let prometheus_metrics: Arc = Arc::new(PrometheusMetrics::default()); + let prometheus_metrics: Arc = Arc::new( + PrometheusMetrics::new_with_handle(false, TEST_PROMETHEUS_HANDLE.clone()), + ); let data = web::Data::new(RateLimitData::new(rate_limiter, prometheus_metrics)); let app = test::init_service( App::new() @@ -283,7 +286,9 @@ mod tests { let limit = create_test_limit(&limiter, namespace, 10).await; let rate_limiter: Arc = Arc::new(limiter); - let prometheus_metrics: Arc = Arc::new(PrometheusMetrics::default()); + let prometheus_metrics: Arc = Arc::new( + PrometheusMetrics::new_with_handle(false, TEST_PROMETHEUS_HANDLE.clone()), + ); let data = web::Data::new(RateLimitData::new(rate_limiter, prometheus_metrics)); let app = test::init_service( App::new() @@ -310,7 +315,9 @@ mod tests { let namespace = "test_namespace"; let _limit = create_test_limit(&limiter, namespace, 1).await; let rate_limiter: Arc = Arc::new(limiter); - let prometheus_metrics: Arc = Arc::new(PrometheusMetrics::default()); + let prometheus_metrics: Arc = Arc::new( + PrometheusMetrics::new_with_handle(false, TEST_PROMETHEUS_HANDLE.clone()), + ); let data = web::Data::new(RateLimitData::new(rate_limiter, prometheus_metrics)); let app = test::init_service( App::new() @@ -355,7 +362,9 @@ mod tests { let _limit = create_test_limit(&limiter, namespace, 1).await; let rate_limiter: Arc = Arc::new(limiter); - let prometheus_metrics: Arc = Arc::new(PrometheusMetrics::default()); + let prometheus_metrics: Arc = Arc::new( + PrometheusMetrics::new_with_handle(false, TEST_PROMETHEUS_HANDLE.clone()), + ); let data = web::Data::new(RateLimitData::new(rate_limiter, prometheus_metrics)); let app = test::init_service( App::new() diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs index a1989eb2..569d8202 100644 --- a/limitador-server/src/main.rs +++ b/limitador-server/src/main.rs @@ -14,6 +14,7 @@ use crate::config::{ use crate::envoy_rls::server::{run_envoy_rls_server, RateLimitHeaders}; use crate::http_api::server::run_http_server; use crate::metrics::MetricsLayer; +use ::metrics::histogram; use clap::{value_parser, Arg, ArgAction, Command}; use const_format::formatcp; use limitador::counter::Counter; @@ -256,7 +257,7 @@ fn find_first_negative_limit(limits: &[Limit]) -> Option { #[actix_rt::main] async fn main() -> Result<(), Box> { - let (config, prometheus_metrics) = { + let config = { let (config, version) = create_config(); println!("{LIMITADOR_HEADER} {version}"); let level = config.log_level.unwrap_or_else(|| { @@ -270,14 +271,9 @@ async fn main() -> Result<(), Box> { tracing_subscriber::fmt::layer() }; - let limit_name_in_metrics = config.limit_name_in_labels; - let prometheus_metrics = - Arc::new(PrometheusMetrics::new_with_options(limit_name_in_metrics)); - let metrics = prometheus_metrics.clone(); - let metrics_layer = MetricsLayer::new().gather( "should_rate_limit", - move |timings| metrics.counter_access(Duration::from(timings)), + |timings| histogram!("counter_latency").record(Duration::from(timings).as_secs_f64()), vec!["datastore"], ); @@ -311,9 +307,13 @@ async fn main() -> Result<(), Box> { info!("Version: {}", version); info!("Using config: {:?}", config); - (config, prometheus_metrics) + config }; + let prometheus_metrics = Arc::new(PrometheusMetrics::new_with_options( + config.limit_name_in_labels, + )); + let limit_file = config.limits_file.clone(); let envoy_rls_address = config.rlp_address(); let http_api_address = config.http_address(); diff --git a/limitador-server/src/prometheus_metrics.rs b/limitador-server/src/prometheus_metrics.rs index 3fbe807a..67dd60f5 100644 --- a/limitador-server/src/prometheus_metrics.rs +++ b/limitador-server/src/prometheus_metrics.rs @@ -1,17 +1,14 @@ +use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge}; +use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; +use std::sync::Arc; + use limitador::limit::Namespace; -use prometheus::{ - Encoder, Histogram, HistogramOpts, IntCounterVec, IntGauge, Opts, Registry, TextEncoder, -}; -use std::time::Duration; const NAMESPACE_LABEL: &str = "limitador_namespace"; const LIMIT_NAME_LABEL: &str = "limit_name"; pub struct PrometheusMetrics { - registry: Registry, - authorized_calls: IntCounterVec, - limited_calls: IntCounterVec, - counter_latency: Histogram, + prometheus_handle: Arc, use_limit_name_label: bool, } @@ -34,118 +31,83 @@ impl PrometheusMetrics { Self::new_with_options(true) } + pub fn new_with_options(use_limit_name_label: bool) -> Self { + Self::new_with_handle(use_limit_name_label, Arc::new(Self::init_handle())) + } + + pub(crate) fn new_with_handle( + use_limit_name_label: bool, + prometheus_handle: Arc, + ) -> Self { + describe_histogram!( + "counter_latency", + "Latency to the underlying counter datastore" + ); + describe_counter!("authorized_calls", "Authorized calls"); + describe_counter!("limited_calls", "Limited calls"); + describe_gauge!("limitador_up", "Limitador is running"); + gauge!("limitador_up").set(1); + Self { + use_limit_name_label, + prometheus_handle, + } + } + + // Creates and installs the prometheus exporter as global recorder + // Only one recorder can be registered for the lifetime of the application + fn init_handle() -> PrometheusHandle { + let prom_builder = PrometheusBuilder::new(); + prom_builder + .install_recorder() + .expect("failed to create prometheus metrics exporter") + } + pub fn incr_authorized_calls(&self, namespace: &Namespace) { - self.authorized_calls - .with_label_values(&[namespace.as_ref()]) - .inc(); + counter!("authorized_calls", NAMESPACE_LABEL => namespace.as_ref().to_string()).increment(1) } pub fn incr_limited_calls<'a, LN>(&self, namespace: &Namespace, limit_name: LN) where LN: Into>, { - let mut labels = vec![namespace.as_ref()]; + let mut labels = vec![(NAMESPACE_LABEL, namespace.as_ref().to_string())]; if self.use_limit_name_label { // If we have configured the metric to accept 2 labels we need to // set values for them. - labels.push(limit_name.into().unwrap_or("")); + labels.push(( + LIMIT_NAME_LABEL, + limit_name.into().unwrap_or("").to_string(), + )); } - - self.limited_calls.with_label_values(&labels).inc(); - } - - pub fn counter_access(&self, duration: Duration) { - self.counter_latency.observe(duration.as_secs_f64()); + counter!("limited_calls", &labels).increment(1) } pub fn gather_metrics(&self) -> String { - let mut buffer = Vec::new(); - - TextEncoder::new() - .encode(&self.registry.gather(), &mut buffer) - .unwrap(); - - String::from_utf8(buffer).unwrap() - } - - pub fn new_with_options(use_limit_name_label: bool) -> Self { - let authorized_calls_counter = Self::authorized_calls_counter(); - let limited_calls_counter = Self::limited_calls_counter(use_limit_name_label); - let limitador_up_gauge = Self::limitador_up_gauge(); - let counter_latency = Self::counter_latency(); - - let registry = Registry::new(); - - registry - .register(Box::new(authorized_calls_counter.clone())) - .unwrap(); - - registry - .register(Box::new(limited_calls_counter.clone())) - .unwrap(); - - registry - .register(Box::new(limitador_up_gauge.clone())) - .unwrap(); - - registry - .register(Box::new(counter_latency.clone())) - .unwrap(); - - limitador_up_gauge.set(1); - - Self { - registry, - authorized_calls: authorized_calls_counter, - limited_calls: limited_calls_counter, - counter_latency, - use_limit_name_label, - } - } - - fn authorized_calls_counter() -> IntCounterVec { - IntCounterVec::new( - Opts::new("authorized_calls", "Authorized calls"), - &[NAMESPACE_LABEL], - ) - .unwrap() - } - - fn limited_calls_counter(use_limit_name_label: bool) -> IntCounterVec { - let mut labels = vec![NAMESPACE_LABEL]; - - if use_limit_name_label { - labels.push(LIMIT_NAME_LABEL); - } - - IntCounterVec::new(Opts::new("limited_calls", "Limited calls"), &labels).unwrap() - } - - fn limitador_up_gauge() -> IntGauge { - IntGauge::new("limitador_up", "Limitador is running").unwrap() - } - - fn counter_latency() -> Histogram { - Histogram::with_opts(HistogramOpts::new( - "counter_latency", - "Latency to the underlying counter datastore", - )) - .unwrap() + self.prometheus_handle.render() } } #[cfg(test)] -mod tests { +pub mod tests { use super::*; + use lazy_static::lazy_static; + use metrics_exporter_prometheus::PrometheusHandle; + + // Setting recorder once for all test cases + lazy_static! { + pub static ref TEST_PROMETHEUS_HANDLE: Arc = + Arc::new(PrometheusMetrics::init_handle()); + } #[test] fn shows_authorized_calls_by_namespace() { - let prometheus_metrics = PrometheusMetrics::new(); + let prometheus_metrics = + PrometheusMetrics::new_with_handle(false, TEST_PROMETHEUS_HANDLE.clone()); let namespaces_with_auth_counts = [ - ("some_namespace".into(), 2), - ("another_namespace".into(), 3), + ("auth_calls_by_namespace".into(), 2), + ("auth_calls_by_namespace_two".into(), 3), ]; namespaces_with_auth_counts @@ -171,11 +133,12 @@ mod tests { #[test] fn shows_limited_calls_by_namespace() { - let prometheus_metrics = PrometheusMetrics::new(); + let prometheus_metrics = + PrometheusMetrics::new_with_handle(false, TEST_PROMETHEUS_HANDLE.clone()); let namespaces_with_limited_counts = [ - ("some_namespace".into(), 2), - ("another_namespace".into(), 3), + ("limited_calls_by_namespace".into(), 2), + ("limited_calls_by_namespace_two".into(), 3), ]; namespaces_with_limited_counts @@ -201,11 +164,12 @@ mod tests { #[test] fn can_show_limited_calls_by_limit_name() { - let prometheus_metrics = PrometheusMetrics::new_with_counters_by_limit_name(); + let prometheus_metrics = + PrometheusMetrics::new_with_handle(true, TEST_PROMETHEUS_HANDLE.clone()); let limits_with_counts = [ - ("some_namespace".into(), "Some limit", 2), - ("some_namespace".into(), "Another limit", 3), + ("limited_calls_by_limit_name".into(), "Some limit", 2), + ("limited_calls_by_limit_name".into(), "Another limit", 3), ]; limits_with_counts @@ -234,8 +198,9 @@ mod tests { #[test] fn incr_limited_calls_uses_empty_string_when_no_name() { - let prometheus_metrics = PrometheusMetrics::new_with_counters_by_limit_name(); - let namespace = "some namespace".into(); + let prometheus_metrics = + PrometheusMetrics::new_with_handle(true, TEST_PROMETHEUS_HANDLE.clone()); + let namespace = "limited_calls_empty_name".into(); prometheus_metrics.incr_limited_calls(&namespace, None); let metrics_output = prometheus_metrics.gather_metrics(); @@ -252,7 +217,9 @@ mod tests { #[test] fn shows_limitador_up_set_to_1() { - let metrics_output = PrometheusMetrics::new().gather_metrics(); + let metrics_output = + PrometheusMetrics::new_with_handle(true, TEST_PROMETHEUS_HANDLE.clone()) + .gather_metrics(); assert!(metrics_output.contains("limitador_up 1")) } @@ -276,10 +243,10 @@ mod tests { limit_name: &str, ) -> String { format!( - "{}{{limit_name=\"{}\",limitador_namespace=\"{}\"}} {}", + "{}{{limitador_namespace=\"{}\",limit_name=\"{}\"}} {}", metric_name, - limit_name, namespace.as_ref(), + limit_name, count, ) } diff --git a/limitador/Cargo.toml b/limitador/Cargo.toml index 7f791fe8..0e300804 100644 --- a/limitador/Cargo.toml +++ b/limitador/Cargo.toml @@ -33,6 +33,7 @@ futures = "0.3" async-trait = "0.1" cfg-if = "1" tracing = "0.1.40" +metrics = "0.22.3" # Optional dependencies rocksdb = { version = "0.22", optional = true, features = ["multi-threaded-cf"] } diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs index 76547974..eba2bed3 100644 --- a/limitador/src/storage/redis/counters_cache.rs +++ b/limitador/src/storage/redis/counters_cache.rs @@ -9,6 +9,7 @@ use dashmap::DashMap; use moka::sync::Cache; use std::collections::HashMap; use std::future::Future; +use std::ops::Not; use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; use std::sync::Arc; use std::time::{Duration, SystemTime}; @@ -16,6 +17,7 @@ use tokio::select; use tokio::sync::Notify; use tokio::sync::Semaphore; +#[derive(Debug)] pub struct CachedCounterValue { value: AtomicExpiringValue, initial_value: AtomicI64, @@ -41,7 +43,7 @@ impl CachedCounterValue { temp_value, now + Duration::from_secs(counter.seconds()), ), - initial_value: AtomicI64::new(temp_value), + initial_value: AtomicI64::new(0), expiry: AtomicExpiryTime::from_now(Duration::from_secs(counter.seconds())), from_authority: AtomicBool::new(false), } @@ -65,6 +67,7 @@ impl CachedCounterValue { .update(delta, counter.seconds(), SystemTime::now()); if value == delta { // new window, invalidate initial value + // which happens _after_ the self.value was reset, see `pending_writes` self.initial_value.store(0, Ordering::SeqCst); } value @@ -77,9 +80,11 @@ impl CachedCounterValue { value } else { let writes = value - start; - if writes > 0 { + if writes >= 0 { writes } else { + // self.value expired, is now less than the writes of the previous window + // which have not yet been reset... it'll be 0, so treat it as such. value } }; @@ -90,7 +95,7 @@ impl CachedCounterValue { Ok(_) => Ok(offset), Err(newer) => { if newer == 0 { - // We got expired in the meantime, this fresh value can wait the next iteration + // We got reset because of expiry, this fresh value can wait the next iteration Ok(0) } else { // Concurrent call to this method? @@ -124,7 +129,7 @@ impl CachedCounterValue { } pub fn requires_fast_flush(&self, within: &Duration) -> bool { - self.from_authority.load(Ordering::Acquire) || &self.value.ttl() <= within + self.from_authority.load(Ordering::Acquire).not() || &self.value.ttl() <= within } } @@ -397,20 +402,102 @@ mod tests { use crate::limit::Limit; use std::collections::HashMap; + mod cached_counter_value { + use crate::storage::redis::counters_cache::tests::test_counter; + use crate::storage::redis::counters_cache::CachedCounterValue; + use std::ops::Not; + use std::time::{Duration, SystemTime}; + + #[test] + fn records_pending_writes() { + let counter = test_counter(10, None); + let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); + assert_eq!(value.pending_writes(), Ok(0)); + value.delta(&counter, 5); + assert_eq!(value.pending_writes(), Ok(5)); + } + + #[test] + fn consumes_pending_writes() { + let counter = test_counter(10, None); + let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); + value.delta(&counter, 5); + assert_eq!(value.pending_writes(), Ok(5)); + assert_eq!(value.pending_writes(), Ok(0)); + } + + #[test] + fn no_pending_writes() { + let counter = test_counter(10, None); + let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); + value.delta(&counter, 5); + assert!(value.no_pending_writes().not()); + assert!(value.pending_writes().is_ok()); + assert!(value.no_pending_writes()); + } + + #[test] + fn setting_from_auth_resets_pending_writes() { + let counter = test_counter(10, None); + let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); + value.delta(&counter, 5); + assert!(value.no_pending_writes().not()); + value.set_from_authority(&counter, 6, Duration::from_secs(1)); + assert!(value.no_pending_writes()); + assert_eq!(value.pending_writes(), Ok(0)); + } + + #[test] + fn from_authority_no_need_to_flush() { + let counter = test_counter(10, None); + let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(10)); + assert!(value.requires_fast_flush(&Duration::from_secs(30)).not()); + } + + #[test] + fn from_authority_needs_to_flush_within_ttl() { + let counter = test_counter(10, None); + let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); + assert!(value.requires_fast_flush(&Duration::from_secs(90))); + } + + #[test] + fn fake_needs_to_flush_within_ttl() { + let counter = test_counter(10, None); + let value = CachedCounterValue::load_from_authority_asap(&counter, 0); + assert!(value.requires_fast_flush(&Duration::from_secs(30))); + } + + #[test] + fn expiry_of_cached_entry() { + let counter = test_counter(10, None); + let cache_entry_ttl = Duration::from_secs(1); + let value = CachedCounterValue::from_authority(&counter, 0, cache_entry_ttl); + let now = SystemTime::now(); + assert!(value.expired_at(now).not()); + assert!(value.expired_at(now + cache_entry_ttl)); + } + + #[test] + fn delegates_to_underlying_value() { + let hits = 4; + + let counter = test_counter(10, None); + let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); + value.delta(&counter, hits); + assert!(value.to_next_window() > Duration::from_millis(59999)); + assert_eq!(value.hits(&counter), hits); + let remaining = counter.max_value() - hits; + assert_eq!(value.remaining(&counter), remaining); + assert!(value.is_limited(&counter, 1).not()); + assert!(value.is_limited(&counter, remaining).not()); + assert!(value.is_limited(&counter, remaining + 1)); + } + } + #[test] fn get_existing_counter() { - let mut values = HashMap::new(); - values.insert("app_id".to_string(), "1".to_string()); - let counter = Counter::new( - Limit::new( - "test_namespace", - 10, - 60, - vec!["req.method == 'POST'"], - vec!["app_id"], - ), - values, - ); + let counter = test_counter(10, None); let cache = CountersCacheBuilder::new().build(Duration::default()); cache.insert( @@ -426,18 +513,7 @@ mod tests { #[test] fn get_non_existing_counter() { - let mut values = HashMap::new(); - values.insert("app_id".to_string(), "1".to_string()); - let counter = Counter::new( - Limit::new( - "test_namespace", - 10, - 60, - vec!["req.method == 'POST'"], - vec!["app_id"], - ), - values, - ); + let counter = test_counter(10, None); let cache = CountersCacheBuilder::new().build(Duration::default()); @@ -448,18 +524,7 @@ mod tests { fn insert_saves_the_given_value_when_is_some() { let max_val = 10; let current_value = max_val / 2; - let mut values = HashMap::new(); - values.insert("app_id".to_string(), "1".to_string()); - let counter = Counter::new( - Limit::new( - "test_namespace", - max_val, - 60, - vec!["req.method == 'POST'"], - vec!["app_id"], - ), - values, - ); + let counter = test_counter(max_val, None); let cache = CountersCacheBuilder::new().build(Duration::default()); cache.insert( @@ -479,18 +544,7 @@ mod tests { #[test] fn insert_saves_zero_when_redis_val_is_none() { let max_val = 10; - let mut values = HashMap::new(); - values.insert("app_id".to_string(), "1".to_string()); - let counter = Counter::new( - Limit::new( - "test_namespace", - max_val, - 60, - vec!["req.method == 'POST'"], - vec!["app_id"], - ), - values, - ); + let counter = test_counter(max_val, None); let cache = CountersCacheBuilder::new().build(Duration::default()); cache.insert( @@ -508,18 +562,7 @@ mod tests { async fn increase_by() { let current_val = 10; let increase_by = 8; - let mut values = HashMap::new(); - values.insert("app_id".to_string(), "1".to_string()); - let counter = Counter::new( - Limit::new( - "test_namespace", - current_val, - 60, - vec!["req.method == 'POST'"], - vec!["app_id"], - ), - values, - ); + let counter = test_counter(current_val, None); let cache = CountersCacheBuilder::new().build(Duration::default()); cache.insert( @@ -536,4 +579,22 @@ mod tests { (current_val + increase_by) ); } + + fn test_counter(max_val: i64, other_values: Option>) -> Counter { + let mut values = HashMap::new(); + values.insert("app_id".to_string(), "1".to_string()); + if let Some(overrides) = other_values { + values.extend(overrides); + } + Counter::new( + Limit::new( + "test_namespace", + max_val, + 60, + vec!["req.method == 'POST'"], + vec!["app_id"], + ), + values, + ) + } } diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index d5e8b0d1..3ca9feb1 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -12,6 +12,7 @@ use crate::storage::redis::{ }; use crate::storage::{AsyncCounterStorage, Authorization, StorageErr}; use async_trait::async_trait; +use metrics::gauge; use redis::aio::{ConnectionLike, ConnectionManager}; use redis::{ConnectionInfo, RedisError}; use std::collections::{HashMap, HashSet}; @@ -219,8 +220,10 @@ fn flip_partitioned(storage: &AtomicBool, partition: bool) -> bool { .is_ok(); if we_flipped { if partition { + gauge!("datastore_partitioned").set(1); error!("Partition to Redis detected!") } else { + gauge!("datastore_partitioned").set(0); warn!("Partition to Redis resolved!"); } } @@ -298,8 +301,8 @@ async fn update_counters( return Ok(res); } - for (counter, delta) in counters_and_deltas { - let delta = delta.pending_writes().expect("State machine is wrong!"); + for (counter, value) in counters_and_deltas { + let delta = value.pending_writes().expect("State machine is wrong!"); if delta > 0 { script_invocation.key(key_for_counter(&counter)); script_invocation.key(key_for_counters_of_limit(counter.limit())); @@ -418,14 +421,13 @@ mod tests { Default::default(), ); - counters_and_deltas.insert( - counter.clone(), - Arc::new(CachedCounterValue::from_authority( - &counter, - 1, - Duration::from_secs(60), - )), - ); + let arc = Arc::new(CachedCounterValue::from_authority( + &counter, + 1, + Duration::from_secs(60), + )); + arc.delta(&counter, 1); + counters_and_deltas.insert(counter.clone(), arc); let mock_response = Value::Bulk(vec![Value::Int(10), Value::Int(60)]); @@ -437,7 +439,7 @@ mod tests { .arg(key_for_counters_of_limit(counter.limit())) .arg(60) .arg(1), - Ok(mock_response.clone()), + Ok(mock_response), )]); let result = update_counters(&mut mock_client, counters_and_deltas).await; @@ -476,7 +478,7 @@ mod tests { .arg(key_for_counters_of_limit(counter.limit())) .arg(60) .arg(2), - Ok(mock_response.clone()), + Ok(mock_response), )]); let cache = CountersCacheBuilder::new().build(Duration::from_millis(1)); @@ -484,11 +486,7 @@ mod tests { .batcher() .add( counter.clone(), - Arc::new(CachedCounterValue::from_authority( - &counter, - 2, - Duration::from_secs(60), - )), + Arc::new(CachedCounterValue::load_from_authority_asap(&counter, 2)), ) .await; cache.insert(