Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce proper key'ing #397

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 48 additions & 10 deletions limitador/src/storage/disk/rocksdb_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,50 @@ impl CounterStorage for RocksDbStorage {

#[tracing::instrument(skip_all)]
fn check_and_update(
&self,
counters: &[Counter],
delta: u64,
) -> Result<Authorization, StorageErr> {
let mut keys: Vec<Vec<u8>> = Vec::with_capacity(counters.len());

for counter in counters {
let key = key_for_counter(counter);
let slice: &[u8] = key.as_ref();
let entry = {
let span = debug_span!("datastore");
let _entered = span.enter();
self.db.get(slice)?
};
let (val, _) = match entry {
None => (0, Duration::from_secs(counter.limit().seconds())),
Some(raw) => {
let slice: &[u8] = raw.as_ref();
let value: ExpiringValue = slice.try_into()?;
(value.value(), value.ttl())
}
};

if counter.max_value() < val + delta {
return Ok(Authorization::Limited(
counter.limit().name().map(|n| n.to_string()),
));
}

keys.push(key);
}

for (idx, counter) in counters.iter().enumerate() {
self.insert_or_update(&keys[idx], counter, delta)?;
}

Ok(Authorization::Ok)
}

#[tracing::instrument(skip_all)]
fn check_and_update_loading(
&self,
counters: &mut Vec<Counter>,
delta: u64,
load_counters: bool,
) -> Result<Authorization, StorageErr> {
let mut keys: Vec<Vec<u8>> = Vec::with_capacity(counters.len());

Expand All @@ -66,15 +106,13 @@ impl CounterStorage for RocksDbStorage {
}
};

if load_counters {
counter.set_expires_in(ttl);
counter.set_remaining(
counter
.max_value()
.checked_sub(val + delta)
.unwrap_or_default(),
);
}
counter.set_expires_in(ttl);
counter.set_remaining(
counter
.max_value()
.checked_sub(val + delta)
.unwrap_or_default(),
);

if counter.max_value() < val + delta {
return Ok(Authorization::Limited(
Expand Down
100 changes: 80 additions & 20 deletions limitador/src/storage/distributed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,25 +90,14 @@ impl CounterStorage for CrInMemoryStorage {
#[tracing::instrument(skip_all)]
fn check_and_update(
&self,
counters: &mut Vec<Counter>,
counters: &[Counter],
delta: u64,
load_counters: bool,
) -> Result<Authorization, StorageErr> {
let mut first_limited = None;
let mut counter_values_to_update: Vec<Vec<u8>> = Vec::new();
let now = SystemTime::now();

let mut process_counter =
|counter: &mut Counter, value: u64, delta: u64| -> Option<Authorization> {
if load_counters {
let remaining = counter.max_value().checked_sub(value + delta);
counter.set_remaining(remaining.unwrap_or(0));
if first_limited.is_none() && remaining.is_none() {
first_limited = Some(Authorization::Limited(
counter.limit().name().map(|n| n.to_owned()),
));
}
}
let process_counter =
|counter: &Counter, value: u64, delta: u64| -> Option<Authorization> {
if !Self::counter_is_within_limits(counter, Some(&value), delta) {
return Some(Authorization::Limited(
counter.limit().name().map(|n| n.to_owned()),
Expand All @@ -118,7 +107,7 @@ impl CounterStorage for CrInMemoryStorage {
};

// Process simple counters
for counter in counters.iter_mut() {
for counter in counters.iter() {
let key = encode_counter_to_key(counter);

// most of the time the counter should exist, so first try with a read only lock
Expand All @@ -132,9 +121,7 @@ impl CounterStorage for CrInMemoryStorage {
if let Some(limited) =
process_counter(counter, store_value.value.read(), delta)
{
if !load_counters {
return Ok(limited);
}
return Ok(limited);
}
counter_values_to_update.push(key);
true
Expand All @@ -157,10 +144,83 @@ impl CounterStorage for CrInMemoryStorage {
}));

if let Some(limited) = process_counter(counter, store_value.value.read(), delta) {
if !load_counters {
return Ok(limited);
return Ok(limited);
}
counter_values_to_update.push(key);
}
}

// Update counters
let limits = self.limits.read().unwrap();
counter_values_to_update.into_iter().for_each(|key| {
let store_value = limits.get(&key).unwrap();
self.increment_counter(store_value.clone(), delta, now);
});

Ok(Authorization::Ok)
}

#[tracing::instrument(skip_all)]
fn check_and_update_loading(
&self,
counters: &mut Vec<Counter>,
delta: u64,
) -> Result<Authorization, StorageErr> {
let mut first_limited = None;
let mut counter_values_to_update: Vec<Vec<u8>> = Vec::new();
let now = SystemTime::now();

let mut process_counter =
|counter: &mut Counter, value: u64, delta: u64| -> Option<Authorization> {
let remaining = counter.max_value().checked_sub(value + delta);
counter.set_remaining(remaining.unwrap_or(0));
if first_limited.is_none() && remaining.is_none() {
first_limited = Some(Authorization::Limited(
counter.limit().name().map(|n| n.to_owned()),
));
}
if !Self::counter_is_within_limits(counter, Some(&value), delta) {
return Some(Authorization::Limited(
counter.limit().name().map(|n| n.to_owned()),
));
}
None
};

// Process simple counters
for counter in counters.iter_mut() {
let key = encode_counter_to_key(counter);

// most of the time the counter should exist, so first try with a read only lock
// since that will allow us to have higher concurrency
let counter_existed = {
let key = key.clone();
let limits = self.limits.read().unwrap();
match limits.get(&key) {
None => false,
Some(store_value) => {
let _ = process_counter(counter, store_value.value.read(), delta);
counter_values_to_update.push(key);
true
}
}
};

// we need to take the slow path since we need to mutate the limits map.
if !counter_existed {
// try again with a write lock to create the counter if it's still missing.
let mut limits = self.limits.write().unwrap();
let store_value = limits.entry(key.clone()).or_insert(Arc::new(CounterEntry {
key: key.clone(),
counter: counter.clone(),
value: CrCounterValue::new(
self.identifier.clone(),
counter.max_value(),
counter.window(),
),
}));

let _ = process_counter(counter, store_value.value.read(), delta);
counter_values_to_update.push(key);
}
}
Expand Down
90 changes: 70 additions & 20 deletions limitador/src/storage/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,71 @@ impl CounterStorage for InMemoryStorage {

#[tracing::instrument(skip_all)]
fn check_and_update(
&self,
counters: &[Counter],
delta: u64,
) -> Result<Authorization, StorageErr> {
let limits_by_namespace = self.simple_limits.read().unwrap();
let mut counter_values_to_update: Vec<(&AtomicExpiringValue, Duration)> = Vec::new();
let mut qualified_counter_values_to_updated: Vec<(Arc<AtomicExpiringValue>, Duration)> =
Vec::new();
let now = SystemTime::now();

let process_counter =
|counter: &Counter, value: u64, delta: u64| -> Option<Authorization> {
if !Self::counter_is_within_limits(counter, Some(&value), delta) {
return Some(Authorization::Limited(
counter.limit().name().map(|n| n.to_owned()),
));
}
None
};

// Process simple counters
for counter in counters.iter().filter(|c| !c.is_qualified()) {
let atomic_expiring_value: &AtomicExpiringValue =
limits_by_namespace.get(counter.limit()).unwrap();

if let Some(limited) = process_counter(counter, atomic_expiring_value.value(), delta) {
return Ok(limited);
}
counter_values_to_update.push((atomic_expiring_value, counter.window()));
}

// Process qualified counters
for counter in counters.iter().filter(|c| c.is_qualified()) {
let value = match self.qualified_counters.get(counter) {
None => self.qualified_counters.get_with_by_ref(counter, || {
Arc::new(AtomicExpiringValue::new(0, now + counter.window()))
}),
Some(counter) => counter,
};

if let Some(limited) = process_counter(counter, value.value(), delta) {
return Ok(limited);
}

qualified_counter_values_to_updated.push((value, counter.window()));
}

// Update counters
counter_values_to_update.iter().for_each(|(v, ttl)| {
v.update(delta, *ttl, now);
});
qualified_counter_values_to_updated
.iter()
.for_each(|(v, ttl)| {
v.update(delta, *ttl, now);
});

Ok(Authorization::Ok)
}

#[tracing::instrument(skip_all)]
fn check_and_update_loading(
&self,
counters: &mut Vec<Counter>,
delta: u64,
load_counters: bool,
) -> Result<Authorization, StorageErr> {
let limits_by_namespace = self.simple_limits.read().unwrap();
let mut first_limited = None;
Expand All @@ -83,14 +144,12 @@ impl CounterStorage for InMemoryStorage {

let mut process_counter =
|counter: &mut Counter, value: u64, delta: u64| -> Option<Authorization> {
if load_counters {
let remaining = counter.max_value().checked_sub(value + delta);
counter.set_remaining(remaining.unwrap_or_default());
if first_limited.is_none() && remaining.is_none() {
first_limited = Some(Authorization::Limited(
counter.limit().name().map(|n| n.to_owned()),
));
}
let remaining = counter.max_value().checked_sub(value + delta);
counter.set_remaining(remaining.unwrap_or_default());
if first_limited.is_none() && remaining.is_none() {
first_limited = Some(Authorization::Limited(
counter.limit().name().map(|n| n.to_owned()),
));
}
if !Self::counter_is_within_limits(counter, Some(&value), delta) {
return Some(Authorization::Limited(
Expand All @@ -105,11 +164,7 @@ impl CounterStorage for InMemoryStorage {
let atomic_expiring_value: &AtomicExpiringValue =
limits_by_namespace.get(counter.limit()).unwrap();

if let Some(limited) = process_counter(counter, atomic_expiring_value.value(), delta) {
if !load_counters {
return Ok(limited);
}
}
let _ = process_counter(counter, atomic_expiring_value.value(), delta);
counter_values_to_update.push((atomic_expiring_value, counter.window()));
}

Expand All @@ -122,12 +177,7 @@ impl CounterStorage for InMemoryStorage {
Some(counter) => counter,
};

if let Some(limited) = process_counter(counter, value.value(), delta) {
if !load_counters {
return Ok(limited);
}
}

let _ = process_counter(counter, value.value(), delta);
qualified_counter_values_to_updated.push((value, counter.window()));
}

Expand Down
13 changes: 10 additions & 3 deletions limitador/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,11 @@ impl Storage {
delta: u64,
load_counters: bool,
) -> Result<Authorization, StorageErr> {
self.counters
.check_and_update(counters, delta, load_counters)
if load_counters {
self.counters.check_and_update_loading(counters, delta)
} else {
self.counters.check_and_update(counters, delta)
}
}

pub fn get_counters(&self, namespace: &Namespace) -> Result<HashSet<Counter>, StorageErr> {
Expand Down Expand Up @@ -281,10 +284,14 @@ pub trait CounterStorage: Sync + Send {
fn add_counter(&self, limit: &Limit) -> Result<(), StorageErr>;
fn update_counter(&self, counter: &Counter, delta: u64) -> Result<(), StorageErr>;
fn check_and_update(
&self,
counters: &[Counter],
delta: u64,
) -> Result<Authorization, StorageErr>;
fn check_and_update_loading(
&self,
counters: &mut Vec<Counter>,
delta: u64,
load_counters: bool,
) -> Result<Authorization, StorageErr>;
fn get_counters(&self, limits: &HashSet<Arc<Limit>>) -> Result<HashSet<Counter>, StorageErr>; // todo revise typing here?
fn delete_counters(&self, limits: &HashSet<Arc<Limit>>) -> Result<(), StorageErr>; // todo revise typing here?
Expand Down
Loading
Loading