Skip to content

Commit

Permalink
Added spans for data accesses, logged when DEBUG or above
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed Dec 20, 2023
1 parent 779dfef commit c8b8e4d
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions limitador-server/src/envoy_rls/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl MyRateLimiter {

#[tonic::async_trait]
impl RateLimitService for MyRateLimiter {
#[tracing::instrument(skip_all)]
async fn should_rate_limit(
&self,
request: Request<RateLimitRequest>,
Expand Down
6 changes: 6 additions & 0 deletions limitador-server/src/http_api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async fn status() -> web::Json<()> {
}

#[api_v2_operation]
#[tracing::instrument(skip(data))]
async fn metrics(data: web::Data<Arc<Limiter>>) -> String {
match data.get_ref().as_ref() {
Limiter::Blocking(limiter) => limiter.gather_prometheus_metrics(),
Expand All @@ -53,6 +54,7 @@ async fn metrics(data: web::Data<Arc<Limiter>>) -> String {
}

#[api_v2_operation]
#[tracing::instrument(skip(data))]
async fn get_limits(
data: web::Data<Arc<Limiter>>,
namespace: web::Path<String>,
Expand All @@ -67,6 +69,7 @@ async fn get_limits(
}

#[api_v2_operation]
#[tracing::instrument(skip(data))]
async fn get_counters(
data: web::Data<Arc<Limiter>>,
namespace: web::Path<String>,
Expand All @@ -90,6 +93,7 @@ async fn get_counters(
}

#[api_v2_operation]
#[tracing::instrument(skip(state))]
async fn check(
state: web::Data<Arc<Limiter>>,
request: web::Json<CheckAndReportInfo>,
Expand Down Expand Up @@ -118,6 +122,7 @@ async fn check(
}

#[api_v2_operation]
#[tracing::instrument(skip(data))]
async fn report(
data: web::Data<Arc<Limiter>>,
request: web::Json<CheckAndReportInfo>,
Expand All @@ -140,6 +145,7 @@ async fn report(
}

#[api_v2_operation]
#[tracing::instrument(skip(data))]
async fn check_and_report(
data: web::Data<Arc<Limiter>>,
request: web::Json<CheckAndReportInfo>,
Expand Down
18 changes: 10 additions & 8 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use sysinfo::{RefreshKind, System, SystemExt};
use thiserror::Error;
use tokio::runtime::Handle;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::fmt::format::FmtSpan;

mod envoy_rls;
mod http_api;
Expand Down Expand Up @@ -284,16 +285,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = {
let (config, version) = create_config();
println!("{LIMITADOR_HEADER} {version}");
let builder = if let Some(level) = config.log_level {
tracing_subscriber::fmt().with_max_level(level)
let level = config.log_level.unwrap_or_else(|| {
tracing_subscriber::filter::EnvFilter::from_default_env()
.max_level_hint()
.unwrap_or(LevelFilter::ERROR)
});
let builder = if level >= LevelFilter::DEBUG {
tracing_subscriber::fmt().with_span_events(FmtSpan::CLOSE)
} else {
tracing_subscriber::fmt().with_max_level(
tracing_subscriber::filter::EnvFilter::from_default_env()
.max_level_hint()
.unwrap_or(LevelFilter::ERROR),
)
tracing_subscriber::fmt()
};
builder.init();
builder.with_max_level(level).init();

info!("Version: {}", version);
info!("Using config: {:?}", config);
Expand Down
1 change: 1 addition & 0 deletions limitador/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ async-trait = "0.1"
cfg-if = "1"
prometheus = "0.13"
lazy_static = "1"
tracing = "0.1.40"

# Optional dependencies
rocksdb = { version = "0.21.0", optional = true, features = ["multi-threaded-cf"] }
Expand Down
7 changes: 7 additions & 0 deletions limitador/src/storage/disk/rocksdb_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,26 @@ pub struct RocksDbStorage {
}

impl CounterStorage for RocksDbStorage {
#[tracing::instrument(skip_all)]
fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result<bool, StorageErr> {
let key = key_for_counter(counter);
let value = self.insert_or_update(&key, counter, 0)?;
Ok(counter.max_value() >= value.value() + delta)
}

#[tracing::instrument(skip_all)]
fn add_counter(&self, _limit: &Limit) -> Result<(), StorageErr> {
Ok(())
}

#[tracing::instrument(skip_all)]
fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> {
let key = key_for_counter(counter);
self.insert_or_update(&key, counter, delta)?;
Ok(())
}

#[tracing::instrument(skip_all)]
fn check_and_update(
&self,
counters: &mut Vec<Counter>,
Expand Down Expand Up @@ -75,6 +79,7 @@ impl CounterStorage for RocksDbStorage {
Ok(Authorization::Ok)
}

#[tracing::instrument(skip_all)]
fn get_counters(&self, limits: &HashSet<Limit>) -> Result<HashSet<Counter>, StorageErr> {
let mut counters = HashSet::default();
let namepaces: BTreeSet<&str> = limits.iter().map(|l| l.namespace().as_ref()).collect();
Expand Down Expand Up @@ -103,6 +108,7 @@ impl CounterStorage for RocksDbStorage {
Ok(counters)
}

#[tracing::instrument(skip_all)]
fn delete_counters(&self, limits: HashSet<Limit>) -> Result<(), StorageErr> {
let counters = self.get_counters(&limits)?;
for counter in &counters {
Expand All @@ -111,6 +117,7 @@ impl CounterStorage for RocksDbStorage {
Ok(())
}

#[tracing::instrument(skip_all)]
fn clear(&self) -> Result<(), StorageErr> {
for entry in self.db.iterator(IteratorMode::Start) {
self.db.delete(entry?.0)?
Expand Down
7 changes: 7 additions & 0 deletions limitador/src/storage/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub struct InMemoryStorage {
}

impl CounterStorage for InMemoryStorage {
#[tracing::instrument(skip_all)]
fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result<bool, StorageErr> {
let limits_by_namespace = self.limits_for_namespace.read().unwrap();

Expand All @@ -35,6 +36,7 @@ impl CounterStorage for InMemoryStorage {
Ok(counter.max_value() >= value + delta)
}

#[tracing::instrument(skip_all)]
fn add_counter(&self, limit: &Limit) -> Result<(), StorageErr> {
if limit.variables().is_empty() {
let mut limits_by_namespace = self.limits_for_namespace.write().unwrap();
Expand All @@ -47,6 +49,7 @@ impl CounterStorage for InMemoryStorage {
Ok(())
}

#[tracing::instrument(skip_all)]
fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> {
let mut limits_by_namespace = self.limits_for_namespace.write().unwrap();
let now = SystemTime::now();
Expand Down Expand Up @@ -90,6 +93,7 @@ impl CounterStorage for InMemoryStorage {
Ok(())
}

#[tracing::instrument(skip_all)]
fn check_and_update(
&self,
counters: &mut Vec<Counter>,
Expand Down Expand Up @@ -175,6 +179,7 @@ impl CounterStorage for InMemoryStorage {
Ok(Authorization::Ok)
}

#[tracing::instrument(skip_all)]
fn get_counters(&self, limits: &HashSet<Limit>) -> Result<HashSet<Counter>, StorageErr> {
let mut res = HashSet::new();

Expand Down Expand Up @@ -215,13 +220,15 @@ impl CounterStorage for InMemoryStorage {
Ok(res)
}

#[tracing::instrument(skip_all)]
fn delete_counters(&self, limits: HashSet<Limit>) -> Result<(), StorageErr> {
for limit in limits {
self.delete_counters_of_limit(&limit);
}
Ok(())
}

#[tracing::instrument(skip_all)]
fn clear(&self) -> Result<(), StorageErr> {
self.limits_for_namespace.write().unwrap().clear();
Ok(())
Expand Down
6 changes: 6 additions & 0 deletions limitador/src/storage/infinispan/infinispan_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct InfinispanStorageBuilder {

#[async_trait]
impl AsyncCounterStorage for InfinispanStorage {
#[tracing::instrument(skip_all)]
async fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result<bool, StorageErr> {
let counter_key = key_for_counter(counter);
let counter_val =
Expand All @@ -41,6 +42,7 @@ impl AsyncCounterStorage for InfinispanStorage {
}
}

#[tracing::instrument(skip_all)]
async fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> {
let counter_key = key_for_counter(counter);

Expand All @@ -65,6 +67,7 @@ impl AsyncCounterStorage for InfinispanStorage {
Ok(())
}

#[tracing::instrument(skip_all)]
async fn check_and_update(
&self,
counters: &mut Vec<Counter>,
Expand Down Expand Up @@ -130,6 +133,7 @@ impl AsyncCounterStorage for InfinispanStorage {
Ok(Authorization::Ok)
}

#[tracing::instrument(skip_all)]
async fn get_counters(&self, limits: HashSet<Limit>) -> Result<HashSet<Counter>, StorageErr> {
let mut res = HashSet::new();

Expand Down Expand Up @@ -158,13 +162,15 @@ impl AsyncCounterStorage for InfinispanStorage {
Ok(res)
}

#[tracing::instrument(skip_all)]
async fn delete_counters(&self, limits: HashSet<Limit>) -> Result<(), StorageErr> {
for limit in limits {
self.delete_counters_associated_with_limit(&limit).await?;
}
Ok(())
}

#[tracing::instrument(skip_all)]
async fn clear(&self) -> Result<(), StorageErr> {
let _ = self
.infinispan
Expand Down
6 changes: 6 additions & 0 deletions limitador/src/storage/redis/redis_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct AsyncRedisStorage {

#[async_trait]
impl AsyncCounterStorage for AsyncRedisStorage {
#[tracing::instrument(skip_all)]
async fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result<bool, StorageErr> {
let mut con = self.conn_manager.clone();

Expand All @@ -42,6 +43,7 @@ impl AsyncCounterStorage for AsyncRedisStorage {
}
}

#[tracing::instrument(skip_all)]
async fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> {
let mut con = self.conn_manager.clone();

Expand All @@ -57,6 +59,7 @@ impl AsyncCounterStorage for AsyncRedisStorage {
Ok(())
}

#[tracing::instrument(skip_all)]
async fn check_and_update(
&self,
counters: &mut Vec<Counter>,
Expand Down Expand Up @@ -110,6 +113,7 @@ impl AsyncCounterStorage for AsyncRedisStorage {
Ok(Authorization::Ok)
}

#[tracing::instrument(skip_all)]
async fn get_counters(&self, limits: HashSet<Limit>) -> Result<HashSet<Counter>, StorageErr> {
let mut res = HashSet::new();

Expand Down Expand Up @@ -143,13 +147,15 @@ impl AsyncCounterStorage for AsyncRedisStorage {
Ok(res)
}

#[tracing::instrument(skip_all)]
async fn delete_counters(&self, limits: HashSet<Limit>) -> Result<(), StorageErr> {
for limit in limits {
self.delete_counters_associated_with_limit(&limit).await?;
}
Ok(())
}

#[tracing::instrument(skip_all)]
async fn clear(&self) -> Result<(), StorageErr> {
let mut con = self.conn_manager.clone();
redis::cmd("FLUSHDB").query_async(&mut con).await?;
Expand Down
6 changes: 6 additions & 0 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ pub struct CachedRedisStorage {

#[async_trait]
impl AsyncCounterStorage for CachedRedisStorage {
#[tracing::instrument(skip_all)]
async fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result<bool, StorageErr> {
self.async_redis_storage
.is_within_limits(counter, delta)
.await
}

#[tracing::instrument(skip_all)]
async fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> {
self.async_redis_storage
.update_counter(counter, delta)
Expand All @@ -63,6 +65,7 @@ impl AsyncCounterStorage for CachedRedisStorage {
// limits. In order to do so, we'd need to run this whole function
// atomically, but that'd be too slow.
// This function trades accuracy for speed.
#[tracing::instrument(skip_all)]
async fn check_and_update(
&self,
counters: &mut Vec<Counter>,
Expand Down Expand Up @@ -166,14 +169,17 @@ impl AsyncCounterStorage for CachedRedisStorage {
Ok(Authorization::Ok)
}

#[tracing::instrument(skip_all)]
async fn get_counters(&self, limits: HashSet<Limit>) -> Result<HashSet<Counter>, StorageErr> {
self.async_redis_storage.get_counters(limits).await
}

#[tracing::instrument(skip_all)]
async fn delete_counters(&self, limits: HashSet<Limit>) -> Result<(), StorageErr> {
self.async_redis_storage.delete_counters(limits).await
}

#[tracing::instrument(skip_all)]
async fn clear(&self) -> Result<(), StorageErr> {
self.async_redis_storage.clear().await
}
Expand Down
Loading

0 comments on commit c8b8e4d

Please sign in to comment.