diff --git a/limitador/src/storage/distributed/cr_counter_value.rs b/limitador/src/storage/distributed/cr_counter_value.rs index ebf0216c..2caaa666 100644 --- a/limitador/src/storage/distributed/cr_counter_value.rs +++ b/limitador/src/storage/distributed/cr_counter_value.rs @@ -133,15 +133,12 @@ impl CrCounterValue { (expiry.into_inner(), map) } - pub fn into_ourselves_inner(self) -> (SystemTime, A, u64) { - let Self { - ourselves, - max_value: _, - value, - others: _, - expiry, - } = self; - (expiry.into_inner(), ourselves, value.into_inner()) + pub fn local_values(&self) -> (SystemTime, &A, u64) { + ( + self.expiry.clone().into_inner(), + &self.ourselves, + self.value.load(Ordering::Relaxed), + ) } fn reset(&self, expiry: SystemTime) { diff --git a/limitador/src/storage/distributed/mod.rs b/limitador/src/storage/distributed/mod.rs index e2ca8c42..00506b24 100644 --- a/limitador/src/storage/distributed/mod.rs +++ b/limitador/src/storage/distributed/mod.rs @@ -251,7 +251,6 @@ impl CrInMemoryStorage { { let limits = limits.clone(); tokio::spawn(async move { - let limits = limits.clone(); while let Some(sender) = re_sync_queue_rx.recv().await { process_re_sync(&limits, sender).await; } @@ -311,11 +310,11 @@ async fn process_re_sync( let update = { let limits = limits.read().unwrap(); limits.get(&key).and_then(|store_value| { - let (expiry, ourself, value) = store_value.clone().into_ourselves_inner(); - if value == 0 { + let (expiry, ourself, value) = store_value.local_values(); + if value == 0 || expiry <= SystemTime::now() { None // no point in sending a counter that is empty } else { - let values = HashMap::from([(ourself, value)]); + let values = HashMap::from([(ourself.clone(), value)]); Some(CounterUpdate { key: key.clone(), values,