Skip to content

Commit

Permalink
feat(sink): support redis cluster url (#16034) (#16227)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Apr 10, 2024
1 parent 6705db9 commit 3ac8791
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 15 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions ci/redis-conf/redis-7000.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
port 7000

#enable cluster mode
cluster-enabled yes

#ms
cluster-node-timeout 15000

cluster-config-file "nodes-7000.conf"
9 changes: 9 additions & 0 deletions ci/redis-conf/redis-7001.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
port 7001

#enable cluster mode
cluster-enabled yes

#ms
cluster-node-timeout 15000

cluster-config-file "nodes-7001.conf"
9 changes: 9 additions & 0 deletions ci/redis-conf/redis-7002.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
port 7002

#enable cluster mode
cluster-enabled yes

#ms
cluster-node-timeout 15000

cluster-config-file "nodes-7002.conf"
20 changes: 20 additions & 0 deletions ci/scripts/e2e-redis-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,25 @@ else
exit 1
fi

echo "--- testing cluster sinks"
redis-server ./ci/redis-conf/redis-7000.conf --daemonize yes
redis-server ./ci/redis-conf/redis-7001.conf --daemonize yes
redis-server ./ci/redis-conf/redis-7002.conf --daemonize yes

echo "yes" | redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002

sqllogictest -p 4566 -d dev './e2e_test/sink/redis_cluster_sink.slt'

redis-cli -c --cluster call 127.0.0.1:7000 keys \* >> ./query_result_1.txt

line_count=$(wc -l < query_result_1.txt)
if [ "$line_count" -eq 4 ]; then
echo "Redis sink check passed"
else
cat ./query_result_1.txt
echo "The output is not as expected."
exit 1
fi

echo "--- Kill cluster"
cargo make ci-kill
35 changes: 35 additions & 0 deletions e2e_test/sink/redis_cluster_sink.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
statement ok
CREATE TABLE t6 (v1 int primary key, v2 int);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;

statement ok
CREATE SINK s61
FROM
mv6 WITH (
primary_key = 'v1',
connector = 'redis',
redis.url= '["redis://127.0.0.1:7000/","redis://127.0.0.1:7001/","redis://127.0.0.1:7002/"]',
)FORMAT PLAIN ENCODE JSON(force_append_only='true');

statement ok
INSERT INTO t6 VALUES (1, 1);

statement ok
INSERT INTO t6 VALUES (2, 2);

statement ok
INSERT INTO t6 VALUES (3, 3);

statement ok
FLUSH;

statement ok
DROP SINK s61;

statement ok
DROP MATERIALIZED VIEW mv6;

statement ok
DROP TABLE t6;
6 changes: 1 addition & 5 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,7 @@ rdkafka = { workspace = true, features = [
"gssapi",
"zstd",
] }
redis = { version = "0.25", features = [
"aio",
"tokio-comp",
"async-std-comp",
] }
redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp","cluster-async"] }
regex = "1.4"
reqwest = { version = "0.11", features = ["json"] }
risingwave_common = { workspace = true }
Expand Down
85 changes: 76 additions & 9 deletions src/connector/src/sink/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ use std::collections::{HashMap, HashSet};

use anyhow::anyhow;
use async_trait::async_trait;
use redis::aio::MultiplexedConnection;
use redis::aio::{ConnectionLike, MultiplexedConnection};
use redis::cluster::ClusterClient;
use redis::cluster_async::ClusterConnection;
use redis::{Client as RedisClient, Pipeline};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::Deserialize;
use serde_json::Value;
use serde_with::serde_as;
use with_options::WithOptions;

Expand All @@ -46,11 +49,76 @@ pub struct RedisCommon {
#[serde(rename = "redis.url")]
pub url: String,
}
pub enum RedisConn {
// Redis deployed as a cluster, clusters with only one node should also use this conn
Cluster(ClusterConnection),
// Redis is not deployed as a cluster
Single(MultiplexedConnection),
}

impl ConnectionLike for RedisConn {
fn req_packed_command<'a>(
&'a mut self,
cmd: &'a redis::Cmd,
) -> redis::RedisFuture<'a, redis::Value> {
match self {
RedisConn::Cluster(conn) => conn.req_packed_command(cmd),
RedisConn::Single(conn) => conn.req_packed_command(cmd),
}
}

fn req_packed_commands<'a>(
&'a mut self,
cmd: &'a redis::Pipeline,
offset: usize,
count: usize,
) -> redis::RedisFuture<'a, Vec<redis::Value>> {
match self {
RedisConn::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
RedisConn::Single(conn) => conn.req_packed_commands(cmd, offset, count),
}
}

fn get_db(&self) -> i64 {
match self {
RedisConn::Cluster(conn) => conn.get_db(),
RedisConn::Single(conn) => conn.get_db(),
}
}
}

impl RedisCommon {
pub(crate) fn build_client(&self) -> ConnectorResult<RedisClient> {
let client = RedisClient::open(self.url.clone())?;
Ok(client)
pub async fn build_conn(&self) -> ConnectorResult<RedisConn> {
match serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e))) {
Ok(v) => {
if let Value::Array(list) = v {
let list = list
.into_iter()
.map(|s| {
if let Value::String(s) = s {
Ok(s)
} else {
Err(SinkError::Redis(
"redis.url must be array of string".to_string(),
)
.into())
}
})
.collect::<ConnectorResult<Vec<String>>>()?;

let client = ClusterClient::new(list)?;
Ok(RedisConn::Cluster(client.get_async_connection().await?))
} else {
Err(SinkError::Redis("redis.url must be array or string".to_string()).into())
}
}
Err(_) => {
let client = RedisClient::open(self.url.clone())?;
Ok(RedisConn::Single(
client.get_multiplexed_async_connection().await?,
))
}
}
}
}
#[serde_as]
Expand Down Expand Up @@ -123,8 +191,7 @@ impl Sink for RedisSink {
}

async fn validate(&self) -> Result<()> {
let client = self.config.common.build_client()?;
client.get_connection()?;
let _conn = self.config.common.build_conn().await?;
let all_set: HashSet<String> = self
.schema
.fields()
Expand Down Expand Up @@ -170,14 +237,14 @@ pub struct RedisSinkWriter {

struct RedisSinkPayloadWriter {
// connection to redis, one per executor
conn: Option<MultiplexedConnection>,
conn: Option<RedisConn>,
// the command pipeline for write-commit
pipe: Pipeline,
}
impl RedisSinkPayloadWriter {
pub async fn new(config: RedisConfig) -> Result<Self> {
let client = config.common.build_client()?;
let conn = Some(client.get_multiplexed_async_connection().await?);
let conn = config.common.build_conn().await?;
let conn = Some(conn);
let pipe = redis::pipe();

Ok(Self { conn, pipe })
Expand Down
2 changes: 1 addition & 1 deletion src/workspace-hack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ quote = { version = "1" }
rand = { version = "0.8", features = ["small_rng"] }
rand_chacha = { version = "0.3" }
rand_core = { version = "0.6", default-features = false, features = ["std"] }
redis = { version = "0.25", features = ["async-std-comp", "tokio-comp"] }
redis = { version = "0.25", features = ["async-std-comp", "cluster-async", "tokio-comp"] }
regex = { version = "1" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa", "hybrid", "meta", "nfa", "perf", "unicode"] }
regex-syntax = { version = "0.8" }
Expand Down

0 comments on commit 3ac8791

Please sign in to comment.