From c8b8e4d4d1d9da9ec4ad1823ea8d75207b4a73ff Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Wed, 20 Dec 2023 09:16:53 -0500 Subject: [PATCH] Added spans for data accesses, logged when DEBUG or above --- Cargo.lock | 1 + limitador-server/src/envoy_rls/server.rs | 1 + limitador-server/src/http_api/server.rs | 6 ++++++ limitador-server/src/main.rs | 18 ++++++++++-------- limitador/Cargo.toml | 1 + limitador/src/storage/disk/rocksdb_storage.rs | 7 +++++++ limitador/src/storage/in_memory.rs | 7 +++++++ .../storage/infinispan/infinispan_storage.rs | 6 ++++++ limitador/src/storage/redis/redis_async.rs | 6 ++++++ limitador/src/storage/redis/redis_cached.rs | 6 ++++++ limitador/src/storage/redis/redis_sync.rs | 7 +++++++ 11 files changed, 58 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ca51f972..efb41d25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1532,6 +1532,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tracing", "ttl_cache", ] diff --git a/limitador-server/src/envoy_rls/server.rs b/limitador-server/src/envoy_rls/server.rs index 48413c0d..f393e9b8 100644 --- a/limitador-server/src/envoy_rls/server.rs +++ b/limitador-server/src/envoy_rls/server.rs @@ -40,6 +40,7 @@ impl MyRateLimiter { #[tonic::async_trait] impl RateLimitService for MyRateLimiter { + #[tracing::instrument(skip_all)] async fn should_rate_limit( &self, request: Request, diff --git a/limitador-server/src/http_api/server.rs b/limitador-server/src/http_api/server.rs index aad0c4e8..3509b5d4 100644 --- a/limitador-server/src/http_api/server.rs +++ b/limitador-server/src/http_api/server.rs @@ -45,6 +45,7 @@ async fn status() -> web::Json<()> { } #[api_v2_operation] +#[tracing::instrument(skip(data))] async fn metrics(data: web::Data>) -> String { match data.get_ref().as_ref() { Limiter::Blocking(limiter) => limiter.gather_prometheus_metrics(), @@ -53,6 +54,7 @@ async fn metrics(data: web::Data>) -> String { } #[api_v2_operation] +#[tracing::instrument(skip(data))] async fn get_limits( data: web::Data>, namespace: web::Path, @@ -67,6 +69,7 @@ async fn get_limits( } #[api_v2_operation] +#[tracing::instrument(skip(data))] async fn get_counters( data: web::Data>, namespace: web::Path, @@ -90,6 +93,7 @@ async fn get_counters( } #[api_v2_operation] +#[tracing::instrument(skip(state))] async fn check( state: web::Data>, request: web::Json, @@ -118,6 +122,7 @@ async fn check( } #[api_v2_operation] +#[tracing::instrument(skip(data))] async fn report( data: web::Data>, request: web::Json, @@ -140,6 +145,7 @@ async fn report( } #[api_v2_operation] +#[tracing::instrument(skip(data))] async fn check_and_report( data: web::Data>, request: web::Json, diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs index 39fc1570..bd80d7b5 100644 --- a/limitador-server/src/main.rs +++ b/limitador-server/src/main.rs @@ -41,6 +41,7 @@ use sysinfo::{RefreshKind, System, SystemExt}; use thiserror::Error; use tokio::runtime::Handle; use tracing::level_filters::LevelFilter; +use tracing_subscriber::fmt::format::FmtSpan; mod envoy_rls; mod http_api; @@ -284,16 +285,17 @@ async fn main() -> Result<(), Box> { let config = { let (config, version) = create_config(); println!("{LIMITADOR_HEADER} {version}"); - let builder = if let Some(level) = config.log_level { - tracing_subscriber::fmt().with_max_level(level) + let level = config.log_level.unwrap_or_else(|| { + tracing_subscriber::filter::EnvFilter::from_default_env() + .max_level_hint() + .unwrap_or(LevelFilter::ERROR) + }); + let builder = if level >= LevelFilter::DEBUG { + tracing_subscriber::fmt().with_span_events(FmtSpan::CLOSE) } else { - tracing_subscriber::fmt().with_max_level( - tracing_subscriber::filter::EnvFilter::from_default_env() - .max_level_hint() - .unwrap_or(LevelFilter::ERROR), - ) + tracing_subscriber::fmt() }; - builder.init(); + builder.with_max_level(level).init(); info!("Version: {}", version); info!("Using config: {:?}", config); diff --git a/limitador/Cargo.toml b/limitador/Cargo.toml index 6c15a85a..0f2336b7 100644 --- a/limitador/Cargo.toml +++ b/limitador/Cargo.toml @@ -34,6 +34,7 @@ async-trait = "0.1" cfg-if = "1" prometheus = "0.13" lazy_static = "1" +tracing = "0.1.40" # Optional dependencies rocksdb = { version = "0.21.0", optional = true, features = ["multi-threaded-cf"] } diff --git a/limitador/src/storage/disk/rocksdb_storage.rs b/limitador/src/storage/disk/rocksdb_storage.rs index f99ea49d..2b959c4c 100644 --- a/limitador/src/storage/disk/rocksdb_storage.rs +++ b/limitador/src/storage/disk/rocksdb_storage.rs @@ -18,22 +18,26 @@ pub struct RocksDbStorage { } impl CounterStorage for RocksDbStorage { + #[tracing::instrument(skip_all)] fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result { let key = key_for_counter(counter); let value = self.insert_or_update(&key, counter, 0)?; Ok(counter.max_value() >= value.value() + delta) } + #[tracing::instrument(skip_all)] fn add_counter(&self, _limit: &Limit) -> Result<(), StorageErr> { Ok(()) } + #[tracing::instrument(skip_all)] fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { let key = key_for_counter(counter); self.insert_or_update(&key, counter, delta)?; Ok(()) } + #[tracing::instrument(skip_all)] fn check_and_update( &self, counters: &mut Vec, @@ -75,6 +79,7 @@ impl CounterStorage for RocksDbStorage { Ok(Authorization::Ok) } + #[tracing::instrument(skip_all)] fn get_counters(&self, limits: &HashSet) -> Result, StorageErr> { let mut counters = HashSet::default(); let namepaces: BTreeSet<&str> = limits.iter().map(|l| l.namespace().as_ref()).collect(); @@ -103,6 +108,7 @@ impl CounterStorage for RocksDbStorage { Ok(counters) } + #[tracing::instrument(skip_all)] fn delete_counters(&self, limits: HashSet) -> Result<(), StorageErr> { let counters = self.get_counters(&limits)?; for counter in &counters { @@ -111,6 +117,7 @@ impl CounterStorage for RocksDbStorage { Ok(()) } + #[tracing::instrument(skip_all)] fn clear(&self) -> Result<(), StorageErr> { for entry in self.db.iterator(IteratorMode::Start) { self.db.delete(entry?.0)? diff --git a/limitador/src/storage/in_memory.rs b/limitador/src/storage/in_memory.rs index a1b21dc5..35f2a681 100644 --- a/limitador/src/storage/in_memory.rs +++ b/limitador/src/storage/in_memory.rs @@ -17,6 +17,7 @@ pub struct InMemoryStorage { } impl CounterStorage for InMemoryStorage { + #[tracing::instrument(skip_all)] fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result { let limits_by_namespace = self.limits_for_namespace.read().unwrap(); @@ -35,6 +36,7 @@ impl CounterStorage for InMemoryStorage { Ok(counter.max_value() >= value + delta) } + #[tracing::instrument(skip_all)] fn add_counter(&self, limit: &Limit) -> Result<(), StorageErr> { if limit.variables().is_empty() { let mut limits_by_namespace = self.limits_for_namespace.write().unwrap(); @@ -47,6 +49,7 @@ impl CounterStorage for InMemoryStorage { Ok(()) } + #[tracing::instrument(skip_all)] fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { let mut limits_by_namespace = self.limits_for_namespace.write().unwrap(); let now = SystemTime::now(); @@ -90,6 +93,7 @@ impl CounterStorage for InMemoryStorage { Ok(()) } + #[tracing::instrument(skip_all)] fn check_and_update( &self, counters: &mut Vec, @@ -175,6 +179,7 @@ impl CounterStorage for InMemoryStorage { Ok(Authorization::Ok) } + #[tracing::instrument(skip_all)] fn get_counters(&self, limits: &HashSet) -> Result, StorageErr> { let mut res = HashSet::new(); @@ -215,6 +220,7 @@ impl CounterStorage for InMemoryStorage { Ok(res) } + #[tracing::instrument(skip_all)] fn delete_counters(&self, limits: HashSet) -> Result<(), StorageErr> { for limit in limits { self.delete_counters_of_limit(&limit); @@ -222,6 +228,7 @@ impl CounterStorage for InMemoryStorage { Ok(()) } + #[tracing::instrument(skip_all)] fn clear(&self) -> Result<(), StorageErr> { self.limits_for_namespace.write().unwrap().clear(); Ok(()) diff --git a/limitador/src/storage/infinispan/infinispan_storage.rs b/limitador/src/storage/infinispan/infinispan_storage.rs index f3829658..ac9d8349 100644 --- a/limitador/src/storage/infinispan/infinispan_storage.rs +++ b/limitador/src/storage/infinispan/infinispan_storage.rs @@ -30,6 +30,7 @@ pub struct InfinispanStorageBuilder { #[async_trait] impl AsyncCounterStorage for InfinispanStorage { + #[tracing::instrument(skip_all)] async fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result { let counter_key = key_for_counter(counter); let counter_val = @@ -41,6 +42,7 @@ impl AsyncCounterStorage for InfinispanStorage { } } + #[tracing::instrument(skip_all)] async fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { let counter_key = key_for_counter(counter); @@ -65,6 +67,7 @@ impl AsyncCounterStorage for InfinispanStorage { Ok(()) } + #[tracing::instrument(skip_all)] async fn check_and_update( &self, counters: &mut Vec, @@ -130,6 +133,7 @@ impl AsyncCounterStorage for InfinispanStorage { Ok(Authorization::Ok) } + #[tracing::instrument(skip_all)] async fn get_counters(&self, limits: HashSet) -> Result, StorageErr> { let mut res = HashSet::new(); @@ -158,6 +162,7 @@ impl AsyncCounterStorage for InfinispanStorage { Ok(res) } + #[tracing::instrument(skip_all)] async fn delete_counters(&self, limits: HashSet) -> Result<(), StorageErr> { for limit in limits { self.delete_counters_associated_with_limit(&limit).await?; @@ -165,6 +170,7 @@ impl AsyncCounterStorage for InfinispanStorage { Ok(()) } + #[tracing::instrument(skip_all)] async fn clear(&self) -> Result<(), StorageErr> { let _ = self .infinispan diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs index 65659390..b123d315 100644 --- a/limitador/src/storage/redis/redis_async.rs +++ b/limitador/src/storage/redis/redis_async.rs @@ -30,6 +30,7 @@ pub struct AsyncRedisStorage { #[async_trait] impl AsyncCounterStorage for AsyncRedisStorage { + #[tracing::instrument(skip_all)] async fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result { let mut con = self.conn_manager.clone(); @@ -42,6 +43,7 @@ impl AsyncCounterStorage for AsyncRedisStorage { } } + #[tracing::instrument(skip_all)] async fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { let mut con = self.conn_manager.clone(); @@ -57,6 +59,7 @@ impl AsyncCounterStorage for AsyncRedisStorage { Ok(()) } + #[tracing::instrument(skip_all)] async fn check_and_update( &self, counters: &mut Vec, @@ -110,6 +113,7 @@ impl AsyncCounterStorage for AsyncRedisStorage { Ok(Authorization::Ok) } + #[tracing::instrument(skip_all)] async fn get_counters(&self, limits: HashSet) -> Result, StorageErr> { let mut res = HashSet::new(); @@ -143,6 +147,7 @@ impl AsyncCounterStorage for AsyncRedisStorage { Ok(res) } + #[tracing::instrument(skip_all)] async fn delete_counters(&self, limits: HashSet) -> Result<(), StorageErr> { for limit in limits { self.delete_counters_associated_with_limit(&limit).await?; @@ -150,6 +155,7 @@ impl AsyncCounterStorage for AsyncRedisStorage { Ok(()) } + #[tracing::instrument(skip_all)] async fn clear(&self) -> Result<(), StorageErr> { let mut con = self.conn_manager.clone(); redis::cmd("FLUSHDB").query_async(&mut con).await?; diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 34e05f3c..418892f5 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -47,12 +47,14 @@ pub struct CachedRedisStorage { #[async_trait] impl AsyncCounterStorage for CachedRedisStorage { + #[tracing::instrument(skip_all)] async fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result { self.async_redis_storage .is_within_limits(counter, delta) .await } + #[tracing::instrument(skip_all)] async fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { self.async_redis_storage .update_counter(counter, delta) @@ -63,6 +65,7 @@ impl AsyncCounterStorage for CachedRedisStorage { // limits. In order to do so, we'd need to run this whole function // atomically, but that'd be too slow. // This function trades accuracy for speed. + #[tracing::instrument(skip_all)] async fn check_and_update( &self, counters: &mut Vec, @@ -166,14 +169,17 @@ impl AsyncCounterStorage for CachedRedisStorage { Ok(Authorization::Ok) } + #[tracing::instrument(skip_all)] async fn get_counters(&self, limits: HashSet) -> Result, StorageErr> { self.async_redis_storage.get_counters(limits).await } + #[tracing::instrument(skip_all)] async fn delete_counters(&self, limits: HashSet) -> Result<(), StorageErr> { self.async_redis_storage.delete_counters(limits).await } + #[tracing::instrument(skip_all)] async fn clear(&self) -> Result<(), StorageErr> { self.async_redis_storage.clear().await } diff --git a/limitador/src/storage/redis/redis_sync.rs b/limitador/src/storage/redis/redis_sync.rs index 8d0861e5..d6dee2ec 100644 --- a/limitador/src/storage/redis/redis_sync.rs +++ b/limitador/src/storage/redis/redis_sync.rs @@ -23,6 +23,7 @@ pub struct RedisStorage { } impl CounterStorage for RedisStorage { + #[tracing::instrument(skip_all)] fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result { let mut con = self.conn_pool.get()?; @@ -32,10 +33,12 @@ impl CounterStorage for RedisStorage { } } + #[tracing::instrument(skip_all)] fn add_counter(&self, _limit: &Limit) -> Result<(), StorageErr> { Ok(()) } + #[tracing::instrument(skip_all)] fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { let mut con = self.conn_pool.get()?; @@ -50,6 +53,7 @@ impl CounterStorage for RedisStorage { Ok(()) } + #[tracing::instrument(skip_all)] fn check_and_update( &self, counters: &mut Vec, @@ -100,6 +104,7 @@ impl CounterStorage for RedisStorage { Ok(Authorization::Ok) } + #[tracing::instrument(skip_all)] fn get_counters(&self, limits: &HashSet) -> Result, StorageErr> { let mut res = HashSet::new(); @@ -132,6 +137,7 @@ impl CounterStorage for RedisStorage { Ok(res) } + #[tracing::instrument(skip_all)] fn delete_counters(&self, limits: HashSet) -> Result<(), StorageErr> { let mut con = self.conn_pool.get()?; @@ -147,6 +153,7 @@ impl CounterStorage for RedisStorage { Ok(()) } + #[tracing::instrument(skip_all)] fn clear(&self) -> Result<(), StorageErr> { let mut con = self.conn_pool.get()?; redis::cmd("FLUSHDB").execute(&mut *con);