Skip to content

Commit

Permalink
feat(services/redis): add support of list operation
Browse files Browse the repository at this point in the history
  • Loading branch information
PragmaTwice committed Nov 10, 2024
1 parent 20b7f34 commit e6d6a0b
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 11 deletions.
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ services-pcloud = []
services-persy = ["dep:persy", "internal-tokio-rt"]
services-postgresql = ["dep:sqlx", "sqlx?/postgres"]
services-redb = ["dep:redb", "internal-tokio-rt"]
services-redis = ["dep:redis", "dep:bb8", "redis?/tokio-rustls-comp"]
services-redis = ["dep:redis", "dep:bb8", "redis?/tokio-rustls-comp", "dep:ouroboros"]
services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"]
services-rocksdb = ["dep:rocksdb", "internal-tokio-rt"]
services-s3 = [
Expand Down
88 changes: 79 additions & 9 deletions core/src/services/redis/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
// under the License.

use bb8::RunError;
use futures::Stream;
use futures::StreamExt;
use http::Uri;
use ouroboros::self_referencing;
use redis::cluster::ClusterClient;
use redis::cluster::ClusterClientBuilder;
use redis::AsyncIter;
use redis::Client;
use redis::ConnectionAddr;
use redis::ConnectionInfo;
Expand All @@ -27,6 +31,9 @@ use redis::RedisConnectionInfo;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use tokio::sync::OnceCell;

Expand Down Expand Up @@ -291,7 +298,23 @@ impl Debug for Adapter {

impl Adapter {
async fn conn(&self) -> Result<bb8::PooledConnection<'_, RedisConnectionManager>> {
let pool = self
let pool = self.pool().await?;
Adapter::conn_from_pool(pool).await
}

async fn conn_from_pool(
pool: &bb8::Pool<RedisConnectionManager>,
) -> Result<bb8::PooledConnection<RedisConnectionManager>> {
pool.get().await.map_err(|err| match err {
RunError::TimedOut => {
Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary()
}
RunError::User(err) => err,
})
}

async fn pool(&self) -> Result<&bb8::Pool<RedisConnectionManager>> {
Ok(self
.conn
.get_or_try_init(|| async {
bb8::Pool::builder()
Expand All @@ -302,13 +325,7 @@ impl Adapter {
.set_source(err)
})
})
.await?;
pool.get().await.map_err(|err| match err {
RunError::TimedOut => {
Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary()
}
RunError::User(err) => err,
})
.await?)
}

fn get_redis_connection_manager(&self) -> RedisConnectionManager {
Expand All @@ -326,8 +343,43 @@ impl Adapter {
}
}

#[self_referencing]
struct RedisAsyncConnIter<'a> {
conn: bb8::PooledConnection<'a, RedisConnectionManager>,

#[borrows(mut conn)]
#[not_covariant]
iter: AsyncIter<'this, String>,
}

#[self_referencing]
pub struct RedisScanner {
pool: bb8::Pool<RedisConnectionManager>,
path: String,

#[borrows(pool, path)]
#[not_covariant]
inner: RedisAsyncConnIter<'this>,
}

unsafe impl Sync for RedisScanner {}

impl Stream for RedisScanner {
type Item = Result<String>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.with_inner_mut(|s| s.with_iter_mut(|v| v.poll_next_unpin(cx).map(|v| v.map(Ok))))
}
}

impl kv::Scan for RedisScanner {
async fn next(&mut self) -> Result<Option<String>> {
<Self as StreamExt>::next(self).await.transpose()
}
}

impl kv::Adapter for Adapter {
type Scanner = ();
type Scanner = RedisScanner;

fn info(&self) -> kv::Info {
kv::Info::new(
Expand All @@ -336,6 +388,7 @@ impl kv::Adapter for Adapter {
Capability {
read: true,
write: true,
list: true,

..Default::default()
},
Expand Down Expand Up @@ -366,4 +419,21 @@ impl kv::Adapter for Adapter {
conn.append(key, value).await?;
Ok(())
}

async fn scan(&self, path: &str) -> Result<Self::Scanner> {
let pool = self.pool().await?.clone();

Ok(
RedisScanner::try_new_async_send(pool, path.to_string(), |pool, path| {
Box::pin(async {
let conn = Adapter::conn_from_pool(pool).await?;
Ok(RedisAsyncConnIter::try_new_async_send(conn, |conn| {
Box::pin(async { conn.scan(path).await })
})
.await?)
})
})
.await?,
)
}
}
13 changes: 13 additions & 0 deletions core/src/services/redis/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use redis::cluster::ClusterClient;
use redis::cluster_async::ClusterConnection;
use redis::from_redis_value;
use redis::AsyncCommands;
use redis::AsyncIter;
use redis::Client;
use redis::RedisError;

Expand Down Expand Up @@ -105,6 +106,18 @@ impl RedisConnection {
}
Ok(())
}

pub async fn scan(&mut self, prefix: &str) -> crate::Result<AsyncIter<'_, String>> {
let pattern = format!("{}*", prefix);
Ok(match self {
RedisConnection::Normal(ref mut conn) => {
conn.scan_match(pattern).await.map_err(format_redis_error)?
}
RedisConnection::Cluster(ref mut conn) => {
conn.scan_match(pattern).await.map_err(format_redis_error)?
}
})
}
}

#[derive(Clone)]
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/redis/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ This service can be used to:
- [x] delete
- [x] copy
- [x] rename
- [ ] ~~list~~
- [x] list
- [ ] ~~presign~~
- [ ] blocking

Expand Down

0 comments on commit e6d6a0b

Please sign in to comment.