Skip to content

Commit

Permalink
fix(Sink): fix redis sink Cluster error (#16405)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Apr 30, 2024
1 parent 6726ba7 commit 201f903
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 68 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/e2e-redis-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ sqllogictest -p 4566 -d dev './e2e_test/sink/redis_cluster_sink.slt'
redis-cli -c --cluster call 127.0.0.1:7000 keys \* >> ./query_result_1.txt

line_count=$(wc -l < query_result_1.txt)
if [ "$line_count" -eq 4 ]; then
if [ "$line_count" -eq 16 ]; then
echo "Redis sink check passed"
else
cat ./query_result_1.txt
Expand Down
8 changes: 1 addition & 7 deletions e2e_test/sink/redis_cluster_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,7 @@ FROM
)FORMAT PLAIN ENCODE JSON(force_append_only='true');

statement ok
INSERT INTO t6 VALUES (1, 1);

statement ok
INSERT INTO t6 VALUES (2, 2);

statement ok
INSERT INTO t6 VALUES (3, 3);
INSERT INTO t6 VALUES (1, 1),(2, 2),(3, 3),(4, 4),(5, 5),(6, 6),(7, 7),(8, 8),(9, 9),(10, 10),(11, 11),(12, 12),(13, 13),(14, 14),(15, 15);

statement ok
FLUSH;
Expand Down
141 changes: 81 additions & 60 deletions src/connector/src/sink/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ use std::collections::{HashMap, HashSet};

use anyhow::anyhow;
use async_trait::async_trait;
use redis::aio::{ConnectionLike, MultiplexedConnection};
use redis::cluster::ClusterClient;
use redis::cluster_async::ClusterConnection;
use redis::aio::MultiplexedConnection;
use redis::cluster::{ClusterClient, ClusterConnection, ClusterPipeline};
use redis::{Client as RedisClient, Pipeline};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
Expand Down Expand Up @@ -49,46 +48,62 @@ pub struct RedisCommon {
#[serde(rename = "redis.url")]
pub url: String,
}
pub enum RedisConn {
// Redis deployed as a cluster, clusters with only one node should also use this conn
Cluster(ClusterConnection),
// Redis is not deployed as a cluster
Single(MultiplexedConnection),
pub enum RedisPipe {
Cluster(ClusterPipeline),
Single(Pipeline),
}
impl RedisPipe {
pub async fn query<T: redis::FromRedisValue>(
&self,
conn: &mut RedisConn,
) -> ConnectorResult<T> {
match (self, conn) {
(RedisPipe::Cluster(pipe), RedisConn::Cluster(conn)) => Ok(pipe.query(conn)?),
(RedisPipe::Single(pipe), RedisConn::Single(conn)) => {
Ok(pipe.query_async(conn).await?)
}
_ => Err(SinkError::Redis("RedisPipe and RedisConn not match".to_string()).into()),
}
}

impl ConnectionLike for RedisConn {
fn req_packed_command<'a>(
&'a mut self,
cmd: &'a redis::Cmd,
) -> redis::RedisFuture<'a, redis::Value> {
pub fn clear(&mut self) {
match self {
RedisConn::Cluster(conn) => conn.req_packed_command(cmd),
RedisConn::Single(conn) => conn.req_packed_command(cmd),
RedisPipe::Cluster(pipe) => pipe.clear(),
RedisPipe::Single(pipe) => pipe.clear(),
}
}

fn req_packed_commands<'a>(
&'a mut self,
cmd: &'a redis::Pipeline,
offset: usize,
count: usize,
) -> redis::RedisFuture<'a, Vec<redis::Value>> {
pub fn set(&mut self, k: String, v: Vec<u8>) {
match self {
RedisConn::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
RedisConn::Single(conn) => conn.req_packed_commands(cmd, offset, count),
}
RedisPipe::Cluster(pipe) => {
pipe.set(k, v);
}
RedisPipe::Single(pipe) => {
pipe.set(k, v);
}
};
}

fn get_db(&self) -> i64 {
pub fn del(&mut self, k: String) {
match self {
RedisConn::Cluster(conn) => conn.get_db(),
RedisConn::Single(conn) => conn.get_db(),
}
RedisPipe::Cluster(pipe) => {
pipe.del(k);
}
RedisPipe::Single(pipe) => {
pipe.del(k);
}
};
}
}
pub enum RedisConn {
// Redis deployed as a cluster, clusters with only one node should also use this conn
Cluster(ClusterConnection),
// Redis is not deployed as a cluster
Single(MultiplexedConnection),
}

impl RedisCommon {
pub async fn build_conn(&self) -> ConnectorResult<RedisConn> {
pub async fn build_conn_and_pipe(&self) -> ConnectorResult<(RedisConn, RedisPipe)> {
match serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e))) {
Ok(v) => {
if let Value::Array(list) = v {
Expand All @@ -107,15 +122,19 @@ impl RedisCommon {
.collect::<ConnectorResult<Vec<String>>>()?;

let client = ClusterClient::new(list)?;
Ok(RedisConn::Cluster(client.get_async_connection().await?))
Ok((
RedisConn::Cluster(client.get_connection()?),
RedisPipe::Cluster(redis::cluster::cluster_pipe()),
))
} else {
Err(SinkError::Redis("redis.url must be array or string".to_string()).into())
}
}
Err(_) => {
let client = RedisClient::open(self.url.clone())?;
Ok(RedisConn::Single(
client.get_multiplexed_async_connection().await?,
Ok((
RedisConn::Single(client.get_multiplexed_async_connection().await?),
RedisPipe::Single(redis::pipe()),
))
}
}
Expand Down Expand Up @@ -191,7 +210,7 @@ impl Sink for RedisSink {
}

async fn validate(&self) -> Result<()> {
let _conn = self.config.common.build_conn().await?;
self.config.common.build_conn_and_pipe().await?;
let all_set: HashSet<String> = self
.schema
.fields()
Expand Down Expand Up @@ -239,21 +258,20 @@ struct RedisSinkPayloadWriter {
// connection to redis, one per executor
conn: Option<RedisConn>,
// the command pipeline for write-commit
pipe: Pipeline,
pipe: RedisPipe,
}
impl RedisSinkPayloadWriter {
pub async fn new(config: RedisConfig) -> Result<Self> {
let conn = config.common.build_conn().await?;
let (conn, pipe) = config.common.build_conn_and_pipe().await?;
let conn = Some(conn);
let pipe = redis::pipe();

Ok(Self { conn, pipe })
}

#[cfg(test)]
pub fn mock() -> Self {
let conn = None;
let pipe = redis::pipe();
let pipe = RedisPipe::Single(redis::pipe());
Self { conn, pipe }
}

Expand All @@ -264,7 +282,7 @@ impl RedisSinkPayloadWriter {
return Ok(());
}
}
self.pipe.query_async(self.conn.as_mut().unwrap()).await?;
self.pipe.query(self.conn.as_mut().unwrap()).await?;
self.pipe.clear();
Ok(())
}
Expand Down Expand Up @@ -353,6 +371,7 @@ impl AsyncTruncateSinkWriter for RedisSinkWriter {

#[cfg(test)]
mod test {
use core::panic;
use std::collections::BTreeMap;

use rdkafka::message::FromBytes;
Expand Down Expand Up @@ -413,17 +432,18 @@ mod test {
(2, "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":3}\r\n$23\r\n{\"id\":3,\"name\":\"Clare\"}\r\n"),
];

redis_sink_writer
.payload_writer
.pipe
.cmd_iter()
.enumerate()
.zip_eq_debug(expected_a.clone())
.for_each(|((i, cmd), (exp_i, exp_cmd))| {
if exp_i == i {
assert_eq!(exp_cmd, str::from_bytes(&cmd.get_packed_command()).unwrap())
}
});
if let RedisPipe::Single(pipe) = &redis_sink_writer.payload_writer.pipe {
pipe.cmd_iter()
.enumerate()
.zip_eq_debug(expected_a.clone())
.for_each(|((i, cmd), (exp_i, exp_cmd))| {
if exp_i == i {
assert_eq!(exp_cmd, str::from_bytes(&cmd.get_packed_command()).unwrap())
}
});
} else {
panic!("pipe type not match")
}
}

#[tokio::test]
Expand Down Expand Up @@ -488,16 +508,17 @@ mod test {
),
];

redis_sink_writer
.payload_writer
.pipe
.cmd_iter()
.enumerate()
.zip_eq_debug(expected_a.clone())
.for_each(|((i, cmd), (exp_i, exp_cmd))| {
if exp_i == i {
assert_eq!(exp_cmd, str::from_bytes(&cmd.get_packed_command()).unwrap())
}
});
if let RedisPipe::Single(pipe) = &redis_sink_writer.payload_writer.pipe {
pipe.cmd_iter()
.enumerate()
.zip_eq_debug(expected_a.clone())
.for_each(|((i, cmd), (exp_i, exp_cmd))| {
if exp_i == i {
assert_eq!(exp_cmd, str::from_bytes(&cmd.get_packed_command()).unwrap())
}
});
} else {
panic!("pipe type not match")
};
}
}

0 comments on commit 201f903

Please sign in to comment.