Skip to content

Commit

Permalink
Allow lock Redis from global detached context. (#350)
Browse files Browse the repository at this point in the history
Allow lock Redis from global detached context.

Sometimes we need to perfrom operation on Redis from a background thread, for this we need to lock Redis.
We can use `ThreadSafeContext` but we might prefer not to for the following reasons:

1. Creating a `ThreadSafeContext` is costly.
2. `ThreadSafeContext` which is not attached to a client do not have the module pointer and this could cause some operations to fail.

The PR adds the ability to lock Redis using the global detached context. After locking, we will get `DetachedContextGuard` object
which will automatically unlock Redis when dispose. `DetachedContextGuard` implements `Deref<Context>` so it can be used just like
a regular `Context` to perform operations.

**Notice: This context should not be use to return any replies!!!**

Future improvement is to seperate contexts for command invocation and replies so those can not be accidently misstaken, notice that this PR do not introduce any regression regarding this topic because we already have this issue with `ThreadSafeContext`.
  • Loading branch information
MeirShpilraien authored Jun 20, 2023
1 parent 732b2bc commit 9fe7eac
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 9 deletions.
48 changes: 39 additions & 9 deletions examples/call.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use redis_module::{
redis_module, CallOptionResp, CallOptionsBuilder, CallReply, CallResult, Context,
PromiseCallReply, RedisError, RedisResult, RedisString, RedisValue, ThreadSafeContext,
redis_module, BlockedClient, CallOptionResp, CallOptionsBuilder, CallReply, CallResult,
Context, FutureCallReply, PromiseCallReply, RedisError, RedisResult, RedisString, RedisValue,
ThreadSafeContext,
};

use std::thread;

fn call_test(ctx: &Context, _: Vec<RedisString>) -> RedisResult {
let res: String = ctx.call("ECHO", &["TEST"])?.try_into()?;
if "TEST" != &res {
Expand Down Expand Up @@ -110,23 +113,49 @@ fn call_test(ctx: &Context, _: Vec<RedisString>) -> RedisResult {
Ok("pass".into())
}

fn call_blocking(ctx: &Context, _: Vec<RedisString>) -> RedisResult {
fn call_blocking_internal(ctx: &Context) -> PromiseCallReply {
let call_options = CallOptionsBuilder::new().build_blocking();
let res = ctx.call_blocking("blpop", &call_options, &["list", "1"]);
ctx.call_blocking("blpop", &call_options, &["list", "1"])
}

fn call_blocking_handle_future(ctx: &Context, f: FutureCallReply, blocked_client: BlockedClient) {
let future_handler = f.set_unblock_handler(move |_ctx, reply| {
let thread_ctx = ThreadSafeContext::with_blocked_client(blocked_client);
thread_ctx.reply(reply.map_or_else(|e| Err(e.into()), |v| Ok((&v).into())));
});
future_handler.dispose(ctx);
}

fn call_blocking(ctx: &Context, _: Vec<RedisString>) -> RedisResult {
let res = call_blocking_internal(ctx);
match res {
PromiseCallReply::Resolved(r) => r.map_or_else(|e| Err(e.into()), |v| Ok((&v).into())),
PromiseCallReply::Future(f) => {
let blocked_client = ctx.block_client();
let future_handler = f.set_unblock_handler(move |_ctx, reply| {
let thread_ctx = ThreadSafeContext::with_blocked_client(blocked_client);
thread_ctx.reply(reply.map_or_else(|e| Err(e.into()), |v| Ok((&v).into())));
});
future_handler.dispose(ctx);
call_blocking_handle_future(ctx, f, blocked_client);
Ok(RedisValue::NoReply)
}
}
}

fn call_blocking_from_detach_ctx(ctx: &Context, _: Vec<RedisString>) -> RedisResult {
let blocked_client = ctx.block_client();
thread::spawn(move || {
let ctx_guard = redis_module::MODULE_CONTEXT.lock();
let res = call_blocking_internal(&ctx_guard);
match res {
PromiseCallReply::Resolved(r) => {
let thread_ctx = ThreadSafeContext::with_blocked_client(blocked_client);
thread_ctx.reply(r.map_or_else(|e| Err(e.into()), |v| Ok((&v).into())));
}
PromiseCallReply::Future(f) => {
call_blocking_handle_future(&ctx_guard, f, blocked_client);
}
}
});
Ok(RedisValue::NoReply)
}

//////////////////////////////////////////////////////

redis_module! {
Expand All @@ -137,5 +166,6 @@ redis_module! {
commands: [
["call.test", call_test, "", 0, 0, 0],
["call.blocking", call_blocking, "", 0, 0, 0],
["call.blocking_from_detached_ctx", call_blocking_from_detach_ctx, "", 0, 0, 0],
],
}
40 changes: 40 additions & 0 deletions src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::raw::{ModuleOptions, Version};
use crate::redisvalue::RedisValueKey;
use crate::{add_info_field_long_long, add_info_field_str, raw, utils, Status};
use crate::{RedisError, RedisResult, RedisString, RedisValue};
use std::ops::Deref;

use std::ffi::CStr;

Expand Down Expand Up @@ -159,6 +160,34 @@ impl Default for DetachedContext {
}
}

/// This object is returned after locking Redis from [DetachedContext].
/// On dispose, Redis will be unlocked.
/// This object implements [Deref] for [Context] so it can be used
/// just like any Redis [Context] for command invocation.
/// **This object should not be used to return replies** because there is
/// no real client behind this context to return replies to.
pub struct DetachedContextGuard {
pub(crate) ctx: Context,
}

unsafe impl RedisLockIndicator for DetachedContextGuard {}

impl Drop for DetachedContextGuard {
fn drop(&mut self) {
unsafe {
raw::RedisModule_ThreadSafeContextUnlock.unwrap()(self.ctx.ctx);
};
}
}

impl Deref for DetachedContextGuard {
type Target = Context;

fn deref(&self) -> &Self::Target {
&self.ctx
}
}

impl DetachedContext {
pub fn log(&self, level: RedisLogLevel, message: &str) {
let c = self.ctx.load(Ordering::Relaxed);
Expand Down Expand Up @@ -190,6 +219,17 @@ impl DetachedContext {
self.ctx.store(ctx, Ordering::Relaxed);
Ok(())
}

/// Lock Redis for command invocation. Returns [DetachedContextGuard] which will unlock Redis when dispose.
/// [DetachedContextGuard] implements [Deref<Target = Context>] so it can be used just like any Redis [Context] for command invocation.
/// Locking Redis when Redis is already locked by the current thread is left unspecified.
/// However, this function will not return on the second call (it might panic or deadlock, for example)..
pub fn lock(&self) -> DetachedContextGuard {
let c = self.ctx.load(Ordering::Relaxed);
unsafe { raw::RedisModule_ThreadSafeContextLock.unwrap()(c) };
let ctx = Context::new(c);
DetachedContextGuard { ctx }
}
}

unsafe impl Send for DetachedContext {}
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,21 @@ pub use crate::raw::NotifyEvent;

pub use crate::configuration::ConfigurationValue;
pub use crate::configuration::EnumConfigurationValue;
pub use crate::context::call_reply::FutureCallReply;
pub use crate::context::call_reply::{CallReply, CallResult, ErrorReply, PromiseCallReply};
pub use crate::context::commands;
pub use crate::context::keys_cursor::KeysCursor;
pub use crate::context::server_events;
pub use crate::context::AclPermissions;
#[cfg(feature = "min-redis-compatibility-version-7-2")]
pub use crate::context::BlockingCallOptions;
pub use crate::context::CallOptionResp;
pub use crate::context::CallOptions;
pub use crate::context::CallOptionsBuilder;
pub use crate::context::Context;
pub use crate::context::ContextFlags;
pub use crate::context::DetachedContext;
pub use crate::context::DetachedContextGuard;
pub use crate::raw::*;
pub use crate::redismodule::*;
use backtrace::Backtrace;
Expand Down
6 changes: 6 additions & 0 deletions tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,5 +600,11 @@ fn test_call_blocking() -> Result<()> {

assert_eq!(res, None);

let res: Option<String> = redis::cmd("call.blocking_from_detached_ctx")
.query(&mut con)
.with_context(|| "failed to run string.set")?;

assert_eq!(res, None);

Ok(())
}

0 comments on commit 9fe7eac

Please sign in to comment.