Skip to content

Commit

Permalink
fix(multitenancy): add a fallback for get commands in redis (#7043)
Browse files Browse the repository at this point in the history
  • Loading branch information
dracarys18 authored Jan 22, 2025
1 parent d53ade2 commit 1d76f2c
Show file tree
Hide file tree
Showing 34 changed files with 362 additions and 204 deletions.
10 changes: 5 additions & 5 deletions crates/drainer/src/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,29 +165,29 @@ impl HealthCheckInterface for Store {
let redis_conn = self.redis_conn.clone();

redis_conn
.serialize_and_set_key_with_expiry("test_key", "test_value", 30)
.serialize_and_set_key_with_expiry(&"test_key".into(), "test_value", 30)
.await
.change_context(HealthCheckRedisError::SetFailed)?;

logger::debug!("Redis set_key was successful");

redis_conn
.get_key::<()>("test_key")
.get_key::<()>(&"test_key".into())
.await
.change_context(HealthCheckRedisError::GetFailed)?;

logger::debug!("Redis get_key was successful");

redis_conn
.delete_key("test_key")
.delete_key(&"test_key".into())
.await
.change_context(HealthCheckRedisError::DeleteFailed)?;

logger::debug!("Redis delete_key was successful");

redis_conn
.stream_append_entry(
TEST_STREAM_NAME,
&TEST_STREAM_NAME.into(),
&redis_interface::RedisEntryId::AutoGeneratedID,
TEST_STREAM_DATA.to_vec(),
)
Expand Down Expand Up @@ -216,7 +216,7 @@ impl HealthCheckInterface for Store {

redis_conn
.stream_trim_entries(
TEST_STREAM_NAME,
&TEST_STREAM_NAME.into(),
(
redis_interface::StreamCapKind::MinID,
redis_interface::StreamCapTrim::Exact,
Expand Down
8 changes: 4 additions & 4 deletions crates/drainer/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl Store {

match self
.redis_conn
.set_key_if_not_exists_with_expiry(stream_key_flag.as_str(), true, None)
.set_key_if_not_exists_with_expiry(&stream_key_flag.as_str().into(), true, None)
.await
{
Ok(resp) => resp == redis::types::SetnxReply::KeySet,
Expand All @@ -43,7 +43,7 @@ impl Store {
}

pub async fn make_stream_available(&self, stream_name_flag: &str) -> errors::DrainerResult<()> {
match self.redis_conn.delete_key(stream_name_flag).await {
match self.redis_conn.delete_key(&stream_name_flag.into()).await {
Ok(redis::DelReply::KeyDeleted) => Ok(()),
Ok(redis::DelReply::KeyNotDeleted) => {
logger::error!("Tried to unlock a stream which is already unlocked");
Expand Down Expand Up @@ -87,14 +87,14 @@ impl Store {
common_utils::date_time::time_it::<errors::DrainerResult<_>, _, _>(|| async {
let trim_result = self
.redis_conn
.stream_trim_entries(stream_name, (trim_kind, trim_type, trim_id))
.stream_trim_entries(&stream_name.into(), (trim_kind, trim_type, trim_id))
.await
.map_err(errors::DrainerError::from)?;

// Since xtrim deletes entries below given id excluding the given id.
// Hence, deleting the minimum entry id
self.redis_conn
.stream_delete_entries(stream_name, minimum_entry_id)
.stream_delete_entries(&stream_name.into(), minimum_entry_id)
.await
.map_err(errors::DrainerError::from)?;

Expand Down
3 changes: 3 additions & 0 deletions crates/redis_interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ rust-version.workspace = true
readme = "README.md"
license.workspace = true

[features]
multitenancy_fallback = []

[dependencies]
error-stack = "0.4.1"
fred = { version = "7.1.2", features = ["metrics", "partial-tracing", "subscriber-client", "check-unresponsive"] }
Expand Down
Loading

0 comments on commit 1d76f2c

Please sign in to comment.