Skip to content

Commit

Permalink
Update Redis crate and use pipeline to update counters
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Snaps <[email protected]>
  • Loading branch information
alexsnaps committed Oct 8, 2024
1 parent 9cb6336 commit b1d2651
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 34 deletions.
36 changes: 28 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions limitador/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ metrics = "0.22.3"

# Optional dependencies
rocksdb = { version = "0.22", optional = true, features = ["multi-threaded-cf"] }
redis = { version = "0.25", optional = true, features = [
redis = { version = "0.27", optional = true, features = [
"connection-manager",
"tokio-comp",
"tls-native-tls",
"tokio-native-tls-comp",
"script",
] }
r2d2 = { version = "0.8", optional = true }
tokio = { version = "1", optional = true, features = [
Expand All @@ -62,8 +63,8 @@ time = "0.3.36"
[dev-dependencies]
serial_test = "3.0"
criterion = { version = "0.5.1", features = ["html_reports", "async_tokio"] }
redis-test = { version = "0.4.0", features = ["aio"] }
redis = { version = "0.25", features = [
redis-test = { version = "0.6.0", features = ["aio"] }
redis = { version = "0.27", features = [
"connection-manager",
"tokio-comp",
"tls-native-tls",
Expand Down
31 changes: 19 additions & 12 deletions limitador/src/storage/redis/redis_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl AsyncCounterStorage for AsyncRedisStorage {
.key(key_for_counters_of_limit(counter.limit()))
.arg(counter.window().as_secs())
.arg(delta)
.invoke_async::<_, ()>(&mut con)
.invoke_async::<_>(&mut con)
.instrument(info_span!("datastore"))
.await?;

Expand Down Expand Up @@ -112,18 +112,25 @@ impl AsyncCounterStorage for AsyncRedisStorage {
}
}

// TODO: this can be optimized by using pipelines with multiple updates
for (counter_idx, key) in counter_keys.into_iter().enumerate() {
let script = redis::Script::new(SCRIPT_UPDATE_COUNTER);
let mut pipeline = redis::pipe();
let mut pipeline = &mut pipeline;
for (counter_idx, key) in counter_keys.iter().enumerate() {
let counter = &counters[counter_idx];
redis::Script::new(SCRIPT_UPDATE_COUNTER)
.key(key)
.key(key_for_counters_of_limit(counter.limit()))
.arg(counter.window().as_secs())
.arg(delta)
.invoke_async::<_, _>(&mut con)
.instrument(info_span!("datastore"))
.await?
pipeline = pipeline
.invoke_script(
script
.key(key)
.key(key_for_counters_of_limit(counter.limit()))
.arg(counter.window().as_secs())
.arg(delta),
)
.ignore();
}
pipeline
.query_async(&mut con)
.instrument(info_span!("datastore"))
.await?;

Ok(Authorization::Ok)
}
Expand Down Expand Up @@ -191,7 +198,7 @@ impl AsyncCounterStorage for AsyncRedisStorage {
async fn clear(&self) -> Result<(), StorageErr> {
let mut con = self.conn_manager.clone();
redis::cmd("FLUSHDB")
.query_async::<_, ()>(&mut con)
.query_async::<_>(&mut con)
.instrument(info_span!("datastore"))
.await?;
Ok(())
Expand Down
18 changes: 8 additions & 10 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::storage::redis::{
use crate::storage::{AsyncCounterStorage, Authorization, StorageErr};
use async_trait::async_trait;
use metrics::gauge;
use redis::aio::{ConnectionLike, ConnectionManager};
use redis::aio::{ConnectionLike, ConnectionManager, ConnectionManagerConfig};
use redis::{ConnectionInfo, RedisError};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
Expand Down Expand Up @@ -170,15 +170,13 @@ impl CachedRedisStorage {
response_timeout: Duration,
) -> Result<Self, RedisError> {
let info = ConnectionInfo::from_str(redis_url)?;
let redis_conn_manager = ConnectionManager::new_with_backoff_and_timeouts(
let redis_conn_manager = ConnectionManager::new_with_config(
redis::Client::open(info)
.expect("This couldn't fail in the past, yet now it did somehow!"),
2,
100,
1,
response_timeout,
// TLS handshake might result in an additional 2 RTTs to Redis, adding some headroom as well
(response_timeout * 3) + Duration::from_millis(50),
ConnectionManagerConfig::default()
.set_connection_timeout((response_timeout * 3) + Duration::from_millis(50))
.set_response_timeout(response_timeout)
.set_number_of_retries(1),
)
.await?;

Expand Down Expand Up @@ -456,7 +454,7 @@ mod tests {
counters_and_deltas.insert(counter.clone(), arc);

let one_sec_from_now = SystemTime::now().add(Duration::from_secs(1));
let mock_response = Value::Bulk(vec![
let mock_response = Value::Array(vec![
Value::Int(NEW_VALUE_FROM_REDIS as i64),
Value::Int(
one_sec_from_now
Expand Down Expand Up @@ -510,7 +508,7 @@ mod tests {
Default::default(),
);

let mock_response = Value::Bulk(vec![
let mock_response = Value::Array(vec![
Value::Int(8),
Value::Int(
SystemTime::now()
Expand Down
2 changes: 1 addition & 1 deletion limitador/src/storage/redis/redis_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl CounterStorage for RedisStorage {
#[tracing::instrument(skip_all)]
fn clear(&self) -> Result<(), StorageErr> {
let mut con = self.conn_pool.get()?;
redis::cmd("FLUSHDB").execute(&mut *con);
redis::cmd("FLUSHDB").exec(&mut *con)?;
Ok(())
}
}
Expand Down

0 comments on commit b1d2651

Please sign in to comment.