Skip to content

Commit

Permalink
Added trace spans to disk's datastore accesses?
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed Dec 21, 2023
1 parent 4421564 commit d5ebf71
Showing 1 changed file with 51 additions and 19 deletions.
70 changes: 51 additions & 19 deletions limitador/src/storage/disk/rocksdb_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use rocksdb::{
};
use std::collections::{BTreeSet, HashSet};
use std::time::{Duration, SystemTime};
use tracing::trace_span;

pub struct RocksDbStorage {
db: DBWithThreadMode<MultiThreaded>,
Expand Down Expand Up @@ -49,7 +50,12 @@ impl CounterStorage for RocksDbStorage {
for counter in &mut *counters {
let key = key_for_counter(counter);
let slice: &[u8] = key.as_ref();
let (val, ttl) = match self.db.get(slice)? {
let entry = {
let span = trace_span!("datastore");
let _entered = span.enter();
self.db.get(slice)?
};
let (val, ttl) = match entry {
None => (0, Duration::from_secs(counter.limit().seconds())),
Some(raw) => {
let slice: &[u8] = raw.as_ref();
Expand Down Expand Up @@ -84,25 +90,38 @@ impl CounterStorage for RocksDbStorage {
let mut counters = HashSet::default();
let namepaces: BTreeSet<&str> = limits.iter().map(|l| l.namespace().as_ref()).collect();
for ns in namepaces {
for entry in self.db.prefix_iterator(prefix_for_namespace(ns)) {
let (key, value) = entry?;
let mut counter = partial_counter_from_counter_key(key.as_ref());
if counter.namespace().as_ref() != ns {
break;
}
let value: ExpiringValue = value.as_ref().try_into()?;
for limit in limits {
if limit == counter.limit() {
counter.update_to_limit(limit);
let ttl = value.ttl();
counter.set_expires_in(ttl);
counter.set_remaining(limit.max_value() - value.value());
break;
let mut iterator = self.db.prefix_iterator(prefix_for_namespace(ns));
loop {
let option = {
let span = trace_span!("datastore");
let _entered = span.enter();
iterator.next()
};
let next = option;
match next {
None => break,
Some(entry) => {
let (key, value) = entry?;
let mut counter = partial_counter_from_counter_key(key.as_ref());
if counter.namespace().as_ref() != ns {
break;
}
let value: ExpiringValue = value.as_ref().try_into()?;
for limit in limits {
if limit == counter.limit() {
counter.update_to_limit(limit);
let ttl = value.ttl();
counter.set_expires_in(ttl);
counter.set_remaining(limit.max_value() - value.value());
break;
}
}
if counter.expires_in().expect("Duration needs to be set") > Duration::ZERO
{
counters.insert(counter);
}
}
}
if counter.expires_in().expect("Duration needs to be set") > Duration::ZERO {
counters.insert(counter);
}
}
}
Ok(counters)
Expand All @@ -112,14 +131,20 @@ impl CounterStorage for RocksDbStorage {
fn delete_counters(&self, limits: HashSet<Limit>) -> Result<(), StorageErr> {
let counters = self.get_counters(&limits)?;
for counter in &counters {
let span = trace_span!("datastore");
let _entered = span.enter();
self.db.delete(key_for_counter(counter))?;
}
Ok(())
}

#[tracing::instrument(skip_all)]
fn clear(&self) -> Result<(), StorageErr> {
let span = trace_span!("datastore");
let _entered = span.enter();
for entry in self.db.iterator(IteratorMode::Start) {
let span = trace_span!("datastore");
let _entered = span.enter();
self.db.delete(entry?.0)?
}
Ok(())
Expand Down Expand Up @@ -170,7 +195,12 @@ impl RocksDbStorage {
delta: i64,
) -> Result<ExpiringValue, StorageErr> {
let now = SystemTime::now();
let value = match self.db.get(key)? {
let entry = {
let span = trace_span!("datastore");
let _entered = span.enter();
self.db.get(key)?
};
let value = match entry {
None => ExpiringValue::default(),
Some(raw) => {
let slice: &[u8] = raw.as_ref();
Expand All @@ -180,6 +210,8 @@ impl RocksDbStorage {
if value.value_at(now) + delta <= counter.max_value() {
let expiring_value =
ExpiringValue::new(delta, now + Duration::from_secs(counter.limit().seconds()));
let span = trace_span!("datastore");
let _entered = span.enter();
self.db
.merge(key, <ExpiringValue as Into<Vec<u8>>>::into(expiring_value))?;
return Ok(value.update(delta, counter.seconds(), now));
Expand Down

0 comments on commit d5ebf71

Please sign in to comment.