diff --git a/limitador/src/storage/disk/rocksdb_storage.rs b/limitador/src/storage/disk/rocksdb_storage.rs index 2b959c4c..a57a610d 100644 --- a/limitador/src/storage/disk/rocksdb_storage.rs +++ b/limitador/src/storage/disk/rocksdb_storage.rs @@ -12,6 +12,7 @@ use rocksdb::{ }; use std::collections::{BTreeSet, HashSet}; use std::time::{Duration, SystemTime}; +use tracing::trace_span; pub struct RocksDbStorage { db: DBWithThreadMode, @@ -49,7 +50,12 @@ impl CounterStorage for RocksDbStorage { for counter in &mut *counters { let key = key_for_counter(counter); let slice: &[u8] = key.as_ref(); - let (val, ttl) = match self.db.get(slice)? { + let entry = { + let span = trace_span!("datastore"); + let _entered = span.enter(); + self.db.get(slice)? + }; + let (val, ttl) = match entry { None => (0, Duration::from_secs(counter.limit().seconds())), Some(raw) => { let slice: &[u8] = raw.as_ref(); @@ -84,25 +90,38 @@ impl CounterStorage for RocksDbStorage { let mut counters = HashSet::default(); let namepaces: BTreeSet<&str> = limits.iter().map(|l| l.namespace().as_ref()).collect(); for ns in namepaces { - for entry in self.db.prefix_iterator(prefix_for_namespace(ns)) { - let (key, value) = entry?; - let mut counter = partial_counter_from_counter_key(key.as_ref()); - if counter.namespace().as_ref() != ns { - break; - } - let value: ExpiringValue = value.as_ref().try_into()?; - for limit in limits { - if limit == counter.limit() { - counter.update_to_limit(limit); - let ttl = value.ttl(); - counter.set_expires_in(ttl); - counter.set_remaining(limit.max_value() - value.value()); - break; + let mut iterator = self.db.prefix_iterator(prefix_for_namespace(ns)); + loop { + let option = { + let span = trace_span!("datastore"); + let _entered = span.enter(); + iterator.next() + }; + let next = option; + match next { + None => break, + Some(entry) => { + let (key, value) = entry?; + let mut counter = partial_counter_from_counter_key(key.as_ref()); + if counter.namespace().as_ref() != ns { + break; + } + let value: ExpiringValue = value.as_ref().try_into()?; + for limit in limits { + if limit == counter.limit() { + counter.update_to_limit(limit); + let ttl = value.ttl(); + counter.set_expires_in(ttl); + counter.set_remaining(limit.max_value() - value.value()); + break; + } + } + if counter.expires_in().expect("Duration needs to be set") > Duration::ZERO + { + counters.insert(counter); + } } } - if counter.expires_in().expect("Duration needs to be set") > Duration::ZERO { - counters.insert(counter); - } } } Ok(counters) @@ -112,6 +131,8 @@ impl CounterStorage for RocksDbStorage { fn delete_counters(&self, limits: HashSet) -> Result<(), StorageErr> { let counters = self.get_counters(&limits)?; for counter in &counters { + let span = trace_span!("datastore"); + let _entered = span.enter(); self.db.delete(key_for_counter(counter))?; } Ok(()) @@ -119,7 +140,11 @@ impl CounterStorage for RocksDbStorage { #[tracing::instrument(skip_all)] fn clear(&self) -> Result<(), StorageErr> { + let span = trace_span!("datastore"); + let _entered = span.enter(); for entry in self.db.iterator(IteratorMode::Start) { + let span = trace_span!("datastore"); + let _entered = span.enter(); self.db.delete(entry?.0)? } Ok(()) @@ -170,7 +195,12 @@ impl RocksDbStorage { delta: i64, ) -> Result { let now = SystemTime::now(); - let value = match self.db.get(key)? { + let entry = { + let span = trace_span!("datastore"); + let _entered = span.enter(); + self.db.get(key)? + }; + let value = match entry { None => ExpiringValue::default(), Some(raw) => { let slice: &[u8] = raw.as_ref(); @@ -180,6 +210,8 @@ impl RocksDbStorage { if value.value_at(now) + delta <= counter.max_value() { let expiring_value = ExpiringValue::new(delta, now + Duration::from_secs(counter.limit().seconds())); + let span = trace_span!("datastore"); + let _entered = span.enter(); self.db .merge(key, >>::into(expiring_value))?; return Ok(value.update(delta, counter.seconds(), now));