From 201f90301041657d47a88ff3560febde16d4f123 Mon Sep 17 00:00:00 2001 From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> Date: Tue, 30 Apr 2024 12:04:54 +0800 Subject: [PATCH] fix(Sink): fix redis sink Cluster error (#16405) --- ci/scripts/e2e-redis-sink-test.sh | 2 +- e2e_test/sink/redis_cluster_sink.slt | 8 +- src/connector/src/sink/redis.rs | 141 +++++++++++++++------------ 3 files changed, 83 insertions(+), 68 deletions(-) diff --git a/ci/scripts/e2e-redis-sink-test.sh b/ci/scripts/e2e-redis-sink-test.sh index c7567dd51ed0..45e578b9268a 100755 --- a/ci/scripts/e2e-redis-sink-test.sh +++ b/ci/scripts/e2e-redis-sink-test.sh @@ -56,7 +56,7 @@ sqllogictest -p 4566 -d dev './e2e_test/sink/redis_cluster_sink.slt' redis-cli -c --cluster call 127.0.0.1:7000 keys \* >> ./query_result_1.txt line_count=$(wc -l < query_result_1.txt) -if [ "$line_count" -eq 4 ]; then +if [ "$line_count" -eq 16 ]; then echo "Redis sink check passed" else cat ./query_result_1.txt diff --git a/e2e_test/sink/redis_cluster_sink.slt b/e2e_test/sink/redis_cluster_sink.slt index 5d2d84b77336..03d197485777 100644 --- a/e2e_test/sink/redis_cluster_sink.slt +++ b/e2e_test/sink/redis_cluster_sink.slt @@ -14,13 +14,7 @@ FROM )FORMAT PLAIN ENCODE JSON(force_append_only='true'); statement ok -INSERT INTO t6 VALUES (1, 1); - -statement ok -INSERT INTO t6 VALUES (2, 2); - -statement ok -INSERT INTO t6 VALUES (3, 3); +INSERT INTO t6 VALUES (1, 1),(2, 2),(3, 3),(4, 4),(5, 5),(6, 6),(7, 7),(8, 8),(9, 9),(10, 10),(11, 11),(12, 12),(13, 13),(14, 14),(15, 15); statement ok FLUSH; diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index 96b524d5b7b2..10ecff85f2cc 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -16,9 +16,8 @@ use std::collections::{HashMap, HashSet}; use anyhow::anyhow; use async_trait::async_trait; -use redis::aio::{ConnectionLike, MultiplexedConnection}; -use redis::cluster::ClusterClient; -use redis::cluster_async::ClusterConnection; +use redis::aio::MultiplexedConnection; +use redis::cluster::{ClusterClient, ClusterConnection, ClusterPipeline}; use redis::{Client as RedisClient, Pipeline}; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; @@ -49,46 +48,62 @@ pub struct RedisCommon { #[serde(rename = "redis.url")] pub url: String, } -pub enum RedisConn { - // Redis deployed as a cluster, clusters with only one node should also use this conn - Cluster(ClusterConnection), - // Redis is not deployed as a cluster - Single(MultiplexedConnection), +pub enum RedisPipe { + Cluster(ClusterPipeline), + Single(Pipeline), } +impl RedisPipe { + pub async fn query( + &self, + conn: &mut RedisConn, + ) -> ConnectorResult { + match (self, conn) { + (RedisPipe::Cluster(pipe), RedisConn::Cluster(conn)) => Ok(pipe.query(conn)?), + (RedisPipe::Single(pipe), RedisConn::Single(conn)) => { + Ok(pipe.query_async(conn).await?) + } + _ => Err(SinkError::Redis("RedisPipe and RedisConn not match".to_string()).into()), + } + } -impl ConnectionLike for RedisConn { - fn req_packed_command<'a>( - &'a mut self, - cmd: &'a redis::Cmd, - ) -> redis::RedisFuture<'a, redis::Value> { + pub fn clear(&mut self) { match self { - RedisConn::Cluster(conn) => conn.req_packed_command(cmd), - RedisConn::Single(conn) => conn.req_packed_command(cmd), + RedisPipe::Cluster(pipe) => pipe.clear(), + RedisPipe::Single(pipe) => pipe.clear(), } } - fn req_packed_commands<'a>( - &'a mut self, - cmd: &'a redis::Pipeline, - offset: usize, - count: usize, - ) -> redis::RedisFuture<'a, Vec> { + pub fn set(&mut self, k: String, v: Vec) { match self { - RedisConn::Cluster(conn) => conn.req_packed_commands(cmd, offset, count), - RedisConn::Single(conn) => conn.req_packed_commands(cmd, offset, count), - } + RedisPipe::Cluster(pipe) => { + pipe.set(k, v); + } + RedisPipe::Single(pipe) => { + pipe.set(k, v); + } + }; } - fn get_db(&self) -> i64 { + pub fn del(&mut self, k: String) { match self { - RedisConn::Cluster(conn) => conn.get_db(), - RedisConn::Single(conn) => conn.get_db(), - } + RedisPipe::Cluster(pipe) => { + pipe.del(k); + } + RedisPipe::Single(pipe) => { + pipe.del(k); + } + }; } } +pub enum RedisConn { + // Redis deployed as a cluster, clusters with only one node should also use this conn + Cluster(ClusterConnection), + // Redis is not deployed as a cluster + Single(MultiplexedConnection), +} impl RedisCommon { - pub async fn build_conn(&self) -> ConnectorResult { + pub async fn build_conn_and_pipe(&self) -> ConnectorResult<(RedisConn, RedisPipe)> { match serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e))) { Ok(v) => { if let Value::Array(list) = v { @@ -107,15 +122,19 @@ impl RedisCommon { .collect::>>()?; let client = ClusterClient::new(list)?; - Ok(RedisConn::Cluster(client.get_async_connection().await?)) + Ok(( + RedisConn::Cluster(client.get_connection()?), + RedisPipe::Cluster(redis::cluster::cluster_pipe()), + )) } else { Err(SinkError::Redis("redis.url must be array or string".to_string()).into()) } } Err(_) => { let client = RedisClient::open(self.url.clone())?; - Ok(RedisConn::Single( - client.get_multiplexed_async_connection().await?, + Ok(( + RedisConn::Single(client.get_multiplexed_async_connection().await?), + RedisPipe::Single(redis::pipe()), )) } } @@ -191,7 +210,7 @@ impl Sink for RedisSink { } async fn validate(&self) -> Result<()> { - let _conn = self.config.common.build_conn().await?; + self.config.common.build_conn_and_pipe().await?; let all_set: HashSet = self .schema .fields() @@ -239,13 +258,12 @@ struct RedisSinkPayloadWriter { // connection to redis, one per executor conn: Option, // the command pipeline for write-commit - pipe: Pipeline, + pipe: RedisPipe, } impl RedisSinkPayloadWriter { pub async fn new(config: RedisConfig) -> Result { - let conn = config.common.build_conn().await?; + let (conn, pipe) = config.common.build_conn_and_pipe().await?; let conn = Some(conn); - let pipe = redis::pipe(); Ok(Self { conn, pipe }) } @@ -253,7 +271,7 @@ impl RedisSinkPayloadWriter { #[cfg(test)] pub fn mock() -> Self { let conn = None; - let pipe = redis::pipe(); + let pipe = RedisPipe::Single(redis::pipe()); Self { conn, pipe } } @@ -264,7 +282,7 @@ impl RedisSinkPayloadWriter { return Ok(()); } } - self.pipe.query_async(self.conn.as_mut().unwrap()).await?; + self.pipe.query(self.conn.as_mut().unwrap()).await?; self.pipe.clear(); Ok(()) } @@ -353,6 +371,7 @@ impl AsyncTruncateSinkWriter for RedisSinkWriter { #[cfg(test)] mod test { + use core::panic; use std::collections::BTreeMap; use rdkafka::message::FromBytes; @@ -413,17 +432,18 @@ mod test { (2, "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":3}\r\n$23\r\n{\"id\":3,\"name\":\"Clare\"}\r\n"), ]; - redis_sink_writer - .payload_writer - .pipe - .cmd_iter() - .enumerate() - .zip_eq_debug(expected_a.clone()) - .for_each(|((i, cmd), (exp_i, exp_cmd))| { - if exp_i == i { - assert_eq!(exp_cmd, str::from_bytes(&cmd.get_packed_command()).unwrap()) - } - }); + if let RedisPipe::Single(pipe) = &redis_sink_writer.payload_writer.pipe { + pipe.cmd_iter() + .enumerate() + .zip_eq_debug(expected_a.clone()) + .for_each(|((i, cmd), (exp_i, exp_cmd))| { + if exp_i == i { + assert_eq!(exp_cmd, str::from_bytes(&cmd.get_packed_command()).unwrap()) + } + }); + } else { + panic!("pipe type not match") + } } #[tokio::test] @@ -488,16 +508,17 @@ mod test { ), ]; - redis_sink_writer - .payload_writer - .pipe - .cmd_iter() - .enumerate() - .zip_eq_debug(expected_a.clone()) - .for_each(|((i, cmd), (exp_i, exp_cmd))| { - if exp_i == i { - assert_eq!(exp_cmd, str::from_bytes(&cmd.get_packed_command()).unwrap()) - } - }); + if let RedisPipe::Single(pipe) = &redis_sink_writer.payload_writer.pipe { + pipe.cmd_iter() + .enumerate() + .zip_eq_debug(expected_a.clone()) + .for_each(|((i, cmd), (exp_i, exp_cmd))| { + if exp_i == i { + assert_eq!(exp_cmd, str::from_bytes(&cmd.get_packed_command()).unwrap()) + } + }); + } else { + panic!("pipe type not match") + }; } }