From cc0532783ab9ff2302ef819d7c7c8977ff69e801 Mon Sep 17 00:00:00 2001 From: "Meir Shpilraien (Spielrein)" Date: Wed, 25 Jan 2023 18:15:25 +0200 Subject: [PATCH] Added API to read and trim a stream. (#266) * Added API to read and trim a stream. * fix command flags * Apply suggestions from code review Co-authored-by: Guy Korland * Format fixes * Fix complition. * Format fixes * Added reverse option. Co-authored-by: Guy Korland --- Cargo.toml | 4 ++ examples/stream.rs | 46 +++++++++++++++++++++ src/key.rs | 39 +++++++++++++++++- src/lib.rs | 1 + src/stream.rs | 98 ++++++++++++++++++++++++++++++++++++++++++++ tests/integration.rs | 40 ++++++++++++++++++ 6 files changed, 226 insertions(+), 2 deletions(-) create mode 100644 examples/stream.rs create mode 100644 src/stream.rs diff --git a/Cargo.toml b/Cargo.toml index bb164968..03bac1fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,10 @@ required-features = [] name = "scan_keys" crate-type = ["cdylib"] +[[example]] +name = "stream" +crate-type = ["cdylib"] + [dependencies] bitflags = "1.2" libc = "0.2" diff --git a/examples/stream.rs b/examples/stream.rs new file mode 100644 index 00000000..6f13fa32 --- /dev/null +++ b/examples/stream.rs @@ -0,0 +1,46 @@ +#[macro_use] +extern crate redis_module; + +use redis_module::raw::{KeyType, RedisModuleStreamID}; +use redis_module::{Context, NextArg, RedisError, RedisResult, RedisString, RedisValue}; + +fn stream_read_from(ctx: &Context, args: Vec) -> RedisResult { + let mut args = args.into_iter().skip(1); + + let stream_key = args.next_arg()?; + + let stream = ctx.open_key(&stream_key); + let key_type = stream.key_type(); + + if key_type != KeyType::Stream { + return Err(RedisError::WrongType); + } + + let mut iter = stream.get_stream_iterator(false)?; + let element = iter.next(); + let id_to_keep = iter.next().as_ref().map_or_else( + || RedisModuleStreamID { + ms: u64::MAX, + seq: u64::MAX, + }, + |e| e.id, + ); + + let stream = ctx.open_key_writable(&stream_key); + stream.trim_stream_by_id(id_to_keep, false)?; + Ok(match element { + Some(e) => RedisValue::BulkString(format!("{}-{}", e.id.ms, e.id.seq)), + None => RedisValue::Null, + }) +} + +////////////////////////////////////////////////////// + +redis_module! { + name: "stream", + version: 1, + data_types: [], + commands: [ + ["STREAM_POP", stream_read_from, "write", 1, 1, 1], + ], +} diff --git a/src/key.rs b/src/key.rs index ef4e7945..d2a3245e 100644 --- a/src/key.rs +++ b/src/key.rs @@ -12,6 +12,7 @@ use raw::KeyType; use crate::native_types::RedisType; use crate::raw; use crate::redismodule::REDIS_OK; +use crate::stream::StreamIterator; use crate::RedisError; use crate::RedisResult; use crate::RedisString; @@ -32,8 +33,8 @@ pub enum KeyMode { #[derive(Debug)] pub struct RedisKey { - ctx: *mut raw::RedisModuleCtx, - key_inner: *mut raw::RedisModuleKey, + pub(crate) ctx: *mut raw::RedisModuleCtx, + pub(crate) key_inner: *mut raw::RedisModuleKey, } impl RedisKey { @@ -136,6 +137,20 @@ impl RedisKey { }; Ok(val) } + + pub fn get_stream_iterator(&self, reverse: bool) -> Result { + StreamIterator::new(self, None, None, false, reverse) + } + + pub fn get_stream_range_iterator( + &self, + from: Option, + to: Option, + exclusive: bool, + reverse: bool, + ) -> Result { + StreamIterator::new(self, from, to, exclusive, reverse) + } } impl Drop for RedisKey { @@ -351,6 +366,26 @@ impl RedisKeyWritable { status.into() } + + pub fn trim_stream_by_id( + &self, + mut id: raw::RedisModuleStreamID, + approx: bool, + ) -> Result { + let flags = if approx { + raw::REDISMODULE_STREAM_TRIM_APPROX + } else { + 0 + }; + let res = unsafe { + raw::RedisModule_StreamTrimByID.unwrap()(self.key_inner, flags as i32, &mut id) + }; + if res <= 0 { + Err(RedisError::Str("Failed trimming the stream")) + } else { + Ok(res as usize) + } + } } /// Opaque type used to hold multi-get results. Use the provided methods to convert diff --git a/src/lib.rs b/src/lib.rs index ec0907db..ac708498 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ pub mod rediserror; mod redismodule; pub mod redisraw; pub mod redisvalue; +pub mod stream; mod context; pub mod key; diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 00000000..a05dbef6 --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,98 @@ +use crate::key::RedisKey; +use crate::raw; +use crate::RedisError; +use crate::RedisString; +use crate::Status; +use std::os::raw::c_long; +use std::ptr; + +pub struct StreamRecord { + pub id: raw::RedisModuleStreamID, + pub fields: Vec<(RedisString, RedisString)>, +} + +pub struct StreamIterator<'key> { + key: &'key RedisKey, +} + +impl<'key> StreamIterator<'key> { + pub(crate) fn new( + key: &RedisKey, + mut from: Option, + mut to: Option, + exclusive: bool, + reverse: bool, + ) -> Result { + let mut flags = if exclusive { + raw::REDISMODULE_STREAM_ITERATOR_EXCLUSIVE as i32 + } else { + 0 + }; + + flags |= if reverse { + raw::REDISMODULE_STREAM_ITERATOR_REVERSE as i32 + } else { + 0 + }; + + let res = unsafe { + raw::RedisModule_StreamIteratorStart.unwrap()( + key.key_inner, + flags, + from.as_mut().map_or(ptr::null_mut(), |v| v), + to.as_mut().map_or(ptr::null_mut(), |v| v), + ) + }; + if Status::Ok == res.into() { + Ok(StreamIterator { key }) + } else { + Err(RedisError::Str("Failed creating stream iterator")) + } + } +} + +impl<'key> Iterator for StreamIterator<'key> { + type Item = StreamRecord; + + fn next(&mut self) -> Option { + let mut id = raw::RedisModuleStreamID { ms: 0, seq: 0 }; + let mut num_fields: c_long = 0; + let mut field_name: *mut raw::RedisModuleString = ptr::null_mut(); + let mut field_val: *mut raw::RedisModuleString = ptr::null_mut(); + if Status::Ok + != unsafe { + raw::RedisModule_StreamIteratorNextID.unwrap()( + self.key.key_inner, + &mut id, + &mut num_fields, + ) + } + .into() + { + return None; + } + let mut fields = Vec::new(); + while Status::Ok + == unsafe { + raw::RedisModule_StreamIteratorNextField.unwrap()( + self.key.key_inner, + &mut field_name, + &mut field_val, + ) + .into() + } + { + fields.push(( + RedisString::from_redis_module_string(self.key.ctx, field_name), + RedisString::from_redis_module_string(self.key.ctx, field_val), + )); + } + Some(StreamRecord { id, fields }) + } +} + +impl<'key> Drop for StreamIterator<'key> { + fn drop(&mut self) { + unsafe { raw::RedisModule_StreamIteratorDelete.unwrap()(self.key.key_inner) }; + } +} diff --git a/tests/integration.rs b/tests/integration.rs index 374715e0..aaba0f35 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -194,3 +194,43 @@ fn test_scan() -> Result<()> { Ok(()) } + +#[test] +fn test_stream_reader() -> Result<()> { + let port: u16 = 6487; + let _guards = vec![start_redis_server_with_module("stream", port) + .with_context(|| "failed to start redis server")?]; + let mut con = + get_redis_connection(port).with_context(|| "failed to connect to redis server")?; + + let _: String = redis::cmd("XADD") + .arg(&["s", "1-1", "foo", "bar"]) + .query(&mut con) + .with_context(|| "failed to add data to the stream")?; + + let _: String = redis::cmd("XADD") + .arg(&["s", "1-2", "foo", "bar"]) + .query(&mut con) + .with_context(|| "failed to add data to the stream")?; + + let res: String = redis::cmd("STREAM_POP") + .arg(&["s"]) + .query(&mut con) + .with_context(|| "failed to run keys_pos")?; + assert_eq!(res, "1-1"); + + let res: String = redis::cmd("STREAM_POP") + .arg(&["s"]) + .query(&mut con) + .with_context(|| "failed to run keys_pos")?; + assert_eq!(res, "1-2"); + + let res: usize = redis::cmd("XLEN") + .arg(&["s"]) + .query(&mut con) + .with_context(|| "failed to add data to the stream")?; + + assert_eq!(res, 0); + + Ok(()) +}