Skip to content

Commit

Permalink
feat(sink): support redis cluster url (#16034)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Apr 9, 2024
1 parent 6fbd81c commit bdcc939
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 10 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions ci/redis-conf/redis-7000.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
port 7000

#enable cluster mode
cluster-enabled yes

#ms
cluster-node-timeout 15000

cluster-config-file "nodes-7000.conf"
9 changes: 9 additions & 0 deletions ci/redis-conf/redis-7001.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
port 7001

#enable cluster mode
cluster-enabled yes

#ms
cluster-node-timeout 15000

cluster-config-file "nodes-7001.conf"
9 changes: 9 additions & 0 deletions ci/redis-conf/redis-7002.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
port 7002

#enable cluster mode
cluster-enabled yes

#ms
cluster-node-timeout 15000

cluster-config-file "nodes-7002.conf"
20 changes: 20 additions & 0 deletions ci/scripts/e2e-redis-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,25 @@ else
exit 1
fi

echo "--- testing cluster sinks"
redis-server ./ci/redis-conf/redis-7000.conf --daemonize yes
redis-server ./ci/redis-conf/redis-7001.conf --daemonize yes
redis-server ./ci/redis-conf/redis-7002.conf --daemonize yes

echo "yes" | redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002

sqllogictest -p 4566 -d dev './e2e_test/sink/redis_cluster_sink.slt'

redis-cli -c --cluster call 127.0.0.1:7000 keys \* >> ./query_result_1.txt

line_count=$(wc -l < query_result_1.txt)
if [ "$line_count" -eq 4 ]; then
echo "Redis sink check passed"
else
cat ./query_result_1.txt
echo "The output is not as expected."
exit 1
fi

echo "--- Kill cluster"
risedev ci-kill
35 changes: 35 additions & 0 deletions e2e_test/sink/redis_cluster_sink.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
statement ok
CREATE TABLE t6 (v1 int primary key, v2 int);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;

statement ok
CREATE SINK s61
FROM
mv6 WITH (
primary_key = 'v1',
connector = 'redis',
redis.url= '["redis://127.0.0.1:7000/","redis://127.0.0.1:7001/","redis://127.0.0.1:7002/"]',
)FORMAT PLAIN ENCODE JSON(force_append_only='true');

statement ok
INSERT INTO t6 VALUES (1, 1);

statement ok
INSERT INTO t6 VALUES (2, 2);

statement ok
INSERT INTO t6 VALUES (3, 3);

statement ok
FLUSH;

statement ok
DROP SINK s61;

statement ok
DROP MATERIALIZED VIEW mv6;

statement ok
DROP TABLE t6;
2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ rdkafka = { workspace = true, features = [
"gssapi",
"zstd",
] }
redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp"] }
redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp","cluster-async"] }
regex = "1.4"
reqwest = { version = "0.12.2", features = ["json"] }
risingwave_common = { workspace = true }
Expand Down
85 changes: 76 additions & 9 deletions src/connector/src/sink/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ use std::collections::{HashMap, HashSet};

use anyhow::anyhow;
use async_trait::async_trait;
use redis::aio::MultiplexedConnection;
use redis::aio::{ConnectionLike, MultiplexedConnection};
use redis::cluster::ClusterClient;
use redis::cluster_async::ClusterConnection;
use redis::{Client as RedisClient, Pipeline};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::Deserialize;
use serde_json::Value;
use serde_with::serde_as;
use with_options::WithOptions;

Expand All @@ -46,11 +49,76 @@ pub struct RedisCommon {
#[serde(rename = "redis.url")]
pub url: String,
}
pub enum RedisConn {
// Redis deployed as a cluster, clusters with only one node should also use this conn
Cluster(ClusterConnection),
// Redis is not deployed as a cluster
Single(MultiplexedConnection),
}

impl ConnectionLike for RedisConn {
fn req_packed_command<'a>(
&'a mut self,
cmd: &'a redis::Cmd,
) -> redis::RedisFuture<'a, redis::Value> {
match self {
RedisConn::Cluster(conn) => conn.req_packed_command(cmd),
RedisConn::Single(conn) => conn.req_packed_command(cmd),
}
}

fn req_packed_commands<'a>(
&'a mut self,
cmd: &'a redis::Pipeline,
offset: usize,
count: usize,
) -> redis::RedisFuture<'a, Vec<redis::Value>> {
match self {
RedisConn::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
RedisConn::Single(conn) => conn.req_packed_commands(cmd, offset, count),
}
}

fn get_db(&self) -> i64 {
match self {
RedisConn::Cluster(conn) => conn.get_db(),
RedisConn::Single(conn) => conn.get_db(),
}
}
}

impl RedisCommon {
pub(crate) fn build_client(&self) -> ConnectorResult<RedisClient> {
let client = RedisClient::open(self.url.clone())?;
Ok(client)
pub async fn build_conn(&self) -> ConnectorResult<RedisConn> {
match serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e))) {
Ok(v) => {
if let Value::Array(list) = v {
let list = list
.into_iter()
.map(|s| {
if let Value::String(s) = s {
Ok(s)
} else {
Err(SinkError::Redis(
"redis.url must be array of string".to_string(),
)
.into())
}
})
.collect::<ConnectorResult<Vec<String>>>()?;

let client = ClusterClient::new(list)?;
Ok(RedisConn::Cluster(client.get_async_connection().await?))
} else {
Err(SinkError::Redis("redis.url must be array or string".to_string()).into())
}
}
Err(_) => {
let client = RedisClient::open(self.url.clone())?;
Ok(RedisConn::Single(
client.get_multiplexed_async_connection().await?,
))
}
}
}
}
#[serde_as]
Expand Down Expand Up @@ -123,8 +191,7 @@ impl Sink for RedisSink {
}

async fn validate(&self) -> Result<()> {
let client = self.config.common.build_client()?;
client.get_connection()?;
let _conn = self.config.common.build_conn().await?;
let all_set: HashSet<String> = self
.schema
.fields()
Expand Down Expand Up @@ -170,14 +237,14 @@ pub struct RedisSinkWriter {

struct RedisSinkPayloadWriter {
// connection to redis, one per executor
conn: Option<MultiplexedConnection>,
conn: Option<RedisConn>,
// the command pipeline for write-commit
pipe: Pipeline,
}
impl RedisSinkPayloadWriter {
pub async fn new(config: RedisConfig) -> Result<Self> {
let client = config.common.build_client()?;
let conn = Some(client.get_multiplexed_async_connection().await?);
let conn = config.common.build_conn().await?;
let conn = Some(conn);
let pipe = redis::pipe();

Ok(Self { conn, pipe })
Expand Down

0 comments on commit bdcc939

Please sign in to comment.