From f88df1c1b8decc22dcaa01f21913fb11714e83fe Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 28 Mar 2024 19:50:43 +0800 Subject: [PATCH 1/5] save save fix ci --- Cargo.lock | 10 +++ e2e_test/sink/redis_sink.slt | 4 +- integration_tests/redis-sink/create_sink.sql | 8 +- src/connector/Cargo.toml | 2 +- src/connector/src/sink/redis.rs | 77 ++++++++++++++++---- src/workspace-hack/Cargo.toml | 2 +- 6 files changed, 82 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 93596a423bf8..94a3ebf6d6cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2537,6 +2537,12 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484" +[[package]] +name = "crc16" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff" + [[package]] name = "crc32c" version = "0.6.4" @@ -8425,10 +8431,14 @@ dependencies = [ "async-trait", "bytes", "combine", + "crc16", + "futures", "futures-util", "itoa", + "log", "percent-encoding", "pin-project-lite", + "rand", "ryu", "sha1_smol", "socket2 0.5.3", diff --git a/e2e_test/sink/redis_sink.slt b/e2e_test/sink/redis_sink.slt index 7475a80ae696..92122fed8184 100644 --- a/e2e_test/sink/redis_sink.slt +++ b/e2e_test/sink/redis_sink.slt @@ -10,7 +10,7 @@ FROM mv6 WITH ( primary_key = 'v1', connector = 'redis', - redis.url= 'redis://redis-server:6379/', + redis.url= '["redis://redis-server:6379/"]', )FORMAT PLAIN ENCODE JSON(force_append_only='true'); statement ok @@ -19,7 +19,7 @@ FROM mv6 WITH ( primary_key = 'v1', connector = 'redis', - redis.url= 'redis://redis-server:6379/', + redis.url= '["redis://redis-server:6379/"]', )FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', key_format = 'V1:{v1}', value_format = 'V2:{v2},V3:{v3}'); statement ok diff --git a/integration_tests/redis-sink/create_sink.sql b/integration_tests/redis-sink/create_sink.sql index 61ffb6732622..1dc26a0facac 100644 --- a/integration_tests/redis-sink/create_sink.sql +++ b/integration_tests/redis-sink/create_sink.sql @@ -3,7 +3,7 @@ FROM bhv_mv WITH ( primary_key = 'user_id', connector = 'redis', - redis.url= 'redis://redis:6379/', + redis.url= '["redis://127.0.0.1:6379/"]', )FORMAT PLAIN ENCODE JSON(force_append_only='true'); CREATE SINK bhv_redis_sink_2 @@ -11,7 +11,7 @@ FROM bhv_mv WITH ( primary_key = 'user_id', connector = 'redis', - redis.url= 'redis://redis:6379/', + redis.url= '["redis://redis:6379/"]', )FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', key_format = 'UserID:{user_id}', value_format = 'TargetID:{target_id},EventTimestamp{event_timestamp}'); CREATE SINK redis_types_json_sink @@ -19,7 +19,7 @@ FROM redis_types WITH ( primary_key = 'types_id', connector = 'redis', - redis.url= 'redis://redis:6379/', + redis.url= '["redis://redis:6379/"]', )FORMAT PLAIN ENCODE JSON(force_append_only='true'); CREATE SINK redis_types_template_sink @@ -27,7 +27,7 @@ FROM redis_types WITH ( primary_key = 'types_id', connector = 'redis', - redis.url= 'redis://redis:6379/', + redis.url= '["redis://redis:6379/"]', )FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', key_format = 'TYPESID:{types_id}', value_format = ' diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 043e542e4a5c..c013b60dc710 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -106,7 +106,7 @@ rdkafka = { workspace = true, features = [ "gssapi", "zstd", ] } -redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp"] } +redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp","cluster-async"] } regex = "1.4" reqwest = { version = "0.11", features = ["json"] } risingwave_common = { workspace = true } diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index 1a579660ca91..7b5a28d112e7 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -16,12 +16,16 @@ use std::collections::{HashMap, HashSet}; use anyhow::anyhow; use async_trait::async_trait; -use redis::aio::MultiplexedConnection; +use redis::aio::{ConnectionLike, MultiplexedConnection}; +use redis::cluster::ClusterClient; +use redis::cluster_async::ClusterConnection; use redis::{Client as RedisClient, Pipeline}; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; use serde_derive::Deserialize; +use serde_json::Value; use serde_with::serde_as; +use simd_json::prelude::ArrayTrait; use with_options::WithOptions; use super::catalog::SinkFormatDesc; @@ -44,13 +48,57 @@ pub const VALUE_FORMAT: &str = "value_format"; #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct RedisCommon { #[serde(rename = "redis.url")] - pub url: String, + pub url: Vec, +} +pub enum RedisConn { + Cluster(ClusterConnection), + Single(MultiplexedConnection), +} + +impl ConnectionLike for RedisConn { + fn req_packed_command<'a>( + &'a mut self, + cmd: &'a redis::Cmd, + ) -> redis::RedisFuture<'a, redis::Value> { + match self { + RedisConn::Cluster(conn) => conn.req_packed_command(cmd), + RedisConn::Single(conn) => conn.req_packed_command(cmd), + } + } + + fn req_packed_commands<'a>( + &'a mut self, + cmd: &'a redis::Pipeline, + offset: usize, + count: usize, + ) -> redis::RedisFuture<'a, Vec> { + match self { + RedisConn::Cluster(conn) => conn.req_packed_commands(cmd, offset, count), + RedisConn::Single(conn) => conn.req_packed_commands(cmd, offset, count), + } + } + + fn get_db(&self) -> i64 { + match self { + RedisConn::Cluster(conn) => conn.get_db(), + RedisConn::Single(conn) => conn.get_db(), + } + } } impl RedisCommon { - pub(crate) fn build_client(&self) -> ConnectorResult { - let client = RedisClient::open(self.url.clone())?; - Ok(client) + pub async fn build_conn(&self) -> ConnectorResult { + if self.url.is_empty() { + Err(SinkError::Redis("Missing redis.url".to_string()).into()) + } else if self.url.len() == 1 { + let client = RedisClient::open(self.url.get(0).unwrap().clone())?; + Ok(RedisConn::Single( + client.get_multiplexed_async_connection().await?, + )) + } else { + let client = ClusterClient::new(self.url.clone())?; + Ok(RedisConn::Cluster(client.get_async_connection().await?)) + } } } #[serde_as] @@ -62,9 +110,13 @@ pub struct RedisConfig { impl RedisConfig { pub fn from_hashmap(properties: HashMap) -> Result { - let config = - serde_json::from_value::(serde_json::to_value(properties).unwrap()) - .map_err(|e| SinkError::Config(anyhow!(e)))?; + let mut json_map = serde_json::Map::new(); + for (key, value) in properties { + let json_value = serde_json::from_str(&value).unwrap_or(Value::Null); + json_map.insert(key, json_value); + } + let config = serde_json::from_value::(Value::Object(json_map)) + .map_err(|e| SinkError::Config(anyhow!(e)))?; Ok(config) } } @@ -123,8 +175,7 @@ impl Sink for RedisSink { } async fn validate(&self) -> Result<()> { - let client = self.config.common.build_client()?; - client.get_connection()?; + let _conn = self.config.common.build_conn().await?; let all_set: HashSet = self .schema .fields() @@ -170,14 +221,14 @@ pub struct RedisSinkWriter { struct RedisSinkPayloadWriter { // connection to redis, one per executor - conn: Option, + conn: Option, // the command pipeline for write-commit pipe: Pipeline, } impl RedisSinkPayloadWriter { pub async fn new(config: RedisConfig) -> Result { - let client = config.common.build_client()?; - let conn = Some(client.get_multiplexed_async_connection().await?); + let conn = config.common.build_conn().await?; + let conn = Some(conn); let pipe = redis::pipe(); Ok(Self { conn, pipe }) diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 1ba32e7c0198..1ae67e6d0295 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -108,7 +108,7 @@ quote = { version = "1" } rand = { version = "0.8", features = ["small_rng"] } rand_chacha = { version = "0.3" } rand_core = { version = "0.6", default-features = false, features = ["std"] } -redis = { version = "0.25", features = ["async-std-comp", "tokio-comp"] } +redis = { version = "0.25", features = ["async-std-comp", "cluster-async", "tokio-comp"] } regex = { version = "1" } regex-automata = { version = "0.4", default-features = false, features = ["dfa", "hybrid", "meta", "nfa", "perf", "unicode"] } regex-syntax = { version = "0.8" } From aaf91c3c260579af76a00eff35a54dfdd08081c1 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 2 Apr 2024 14:24:16 +0800 Subject: [PATCH 2/5] fix ci --- src/connector/with_options_sink.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 4d776cc1689f..6c8cbd903b0b 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -516,7 +516,7 @@ PulsarConfig: RedisConfig: fields: - name: redis.url - field_type: String + field_type: Vec required: true StarrocksConfig: fields: From 1c9371255d087820b03f315abf7c1833afd43427 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Sun, 7 Apr 2024 18:04:13 +0800 Subject: [PATCH 3/5] add ci --- ci/redis-conf/redis-7000.conf | 9 +++++++ ci/redis-conf/redis-7001.conf | 9 +++++++ ci/redis-conf/redis-7002.conf | 9 +++++++ ci/scripts/e2e-redis-sink-test.sh | 19 +++++++++++++++ e2e_test/sink/redis_cluster_sink.slt | 35 ++++++++++++++++++++++++++++ 5 files changed, 81 insertions(+) create mode 100644 ci/redis-conf/redis-7000.conf create mode 100644 ci/redis-conf/redis-7001.conf create mode 100644 ci/redis-conf/redis-7002.conf create mode 100644 e2e_test/sink/redis_cluster_sink.slt diff --git a/ci/redis-conf/redis-7000.conf b/ci/redis-conf/redis-7000.conf new file mode 100644 index 000000000000..a69a6f8d5cb0 --- /dev/null +++ b/ci/redis-conf/redis-7000.conf @@ -0,0 +1,9 @@ +port 7000 + +#enable cluster mode +cluster-enabled yes + +#ms +cluster-node-timeout 15000 + +cluster-config-file "nodes-7000.conf" \ No newline at end of file diff --git a/ci/redis-conf/redis-7001.conf b/ci/redis-conf/redis-7001.conf new file mode 100644 index 000000000000..db8ff90d1866 --- /dev/null +++ b/ci/redis-conf/redis-7001.conf @@ -0,0 +1,9 @@ +port 7001 + +#enable cluster mode +cluster-enabled yes + +#ms +cluster-node-timeout 15000 + +cluster-config-file "nodes-7001.conf" \ No newline at end of file diff --git a/ci/redis-conf/redis-7002.conf b/ci/redis-conf/redis-7002.conf new file mode 100644 index 000000000000..ed68ddbfe21f --- /dev/null +++ b/ci/redis-conf/redis-7002.conf @@ -0,0 +1,9 @@ +port 7002 + +#enable cluster mode +cluster-enabled yes + +#ms +cluster-node-timeout 15000 + +cluster-config-file "nodes-7002.conf" \ No newline at end of file diff --git a/ci/scripts/e2e-redis-sink-test.sh b/ci/scripts/e2e-redis-sink-test.sh index cf64662db405..6d19c450d2d5 100755 --- a/ci/scripts/e2e-redis-sink-test.sh +++ b/ci/scripts/e2e-redis-sink-test.sh @@ -44,5 +44,24 @@ else exit 1 fi +echo "--- testing cluster sinks" +redis-server ./ci/redis-conf/redis-7000.conf --daemonize yes +redis-server ./ci/redis-conf/redis-7001.conf --daemonize yes +redis-server ./ci/redis-conf/redis-7002.conf --daemonize yes + +echo "yes" | redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 + +sqllogictest -p 4566 -d dev './e2e_test/sink/redis_cluster_sink.slt' + +redis-cli -c --cluster call 127.0.0.1:7000 keys \* >> ./query_result_1.txt + +if cat ./query_result_1.txt | tr '\n' '\0' | xargs -0 -n1 bash -c '[[ "$0" == "{\"v1\":1}" || "$0" == "{\"v2\":2}" || "$0" == "{\"v3\":3}" ]]'; then + echo "Redis sink check passed" +else + cat ./query_result_1.txt + echo "The output is not as expected." + exit 1 +fi + echo "--- Kill cluster" cargo make ci-kill \ No newline at end of file diff --git a/e2e_test/sink/redis_cluster_sink.slt b/e2e_test/sink/redis_cluster_sink.slt new file mode 100644 index 000000000000..33138f0cb143 --- /dev/null +++ b/e2e_test/sink/redis_cluster_sink.slt @@ -0,0 +1,35 @@ +statement ok +CREATE TABLE t6 (v1 int primary key, v2 int); + +statement ok +CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6; + +statement ok +CREATE SINK s61 +FROM + mv6 WITH ( + primary_key = 'v1', + connector = 'redis', + redis.url= '["redis://redis-server:7000/","redis://redis-server:7001/","redis://redis-server:7002/"]', +)FORMAT PLAIN ENCODE JSON(force_append_only='true'); + +statement ok +INSERT INTO t6 VALUES (1, 1); + +statement ok +INSERT INTO t6 VALUES (2, 2); + +statement ok +INSERT INTO t6 VALUES (3, 3); + +statement ok +FLUSH; + +statement ok +DROP SINK s61; + +statement ok +DROP MATERIALIZED VIEW mv6; + +statement ok +DROP TABLE t6; \ No newline at end of file From c57e78dd426b9f589bc732f0089c7481978e6ca6 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 8 Apr 2024 15:25:37 +0800 Subject: [PATCH 4/5] add ci --- e2e_test/sink/redis_cluster_sink.slt | 2 +- e2e_test/sink/redis_sink.slt | 4 +- integration_tests/redis-sink/create_sink.sql | 8 +-- src/connector/src/sink/redis.rs | 51 ++++++++++++-------- 4 files changed, 39 insertions(+), 26 deletions(-) diff --git a/e2e_test/sink/redis_cluster_sink.slt b/e2e_test/sink/redis_cluster_sink.slt index 33138f0cb143..5d2d84b77336 100644 --- a/e2e_test/sink/redis_cluster_sink.slt +++ b/e2e_test/sink/redis_cluster_sink.slt @@ -10,7 +10,7 @@ FROM mv6 WITH ( primary_key = 'v1', connector = 'redis', - redis.url= '["redis://redis-server:7000/","redis://redis-server:7001/","redis://redis-server:7002/"]', + redis.url= '["redis://127.0.0.1:7000/","redis://127.0.0.1:7001/","redis://127.0.0.1:7002/"]', )FORMAT PLAIN ENCODE JSON(force_append_only='true'); statement ok diff --git a/e2e_test/sink/redis_sink.slt b/e2e_test/sink/redis_sink.slt index 92122fed8184..7475a80ae696 100644 --- a/e2e_test/sink/redis_sink.slt +++ b/e2e_test/sink/redis_sink.slt @@ -10,7 +10,7 @@ FROM mv6 WITH ( primary_key = 'v1', connector = 'redis', - redis.url= '["redis://redis-server:6379/"]', + redis.url= 'redis://redis-server:6379/', )FORMAT PLAIN ENCODE JSON(force_append_only='true'); statement ok @@ -19,7 +19,7 @@ FROM mv6 WITH ( primary_key = 'v1', connector = 'redis', - redis.url= '["redis://redis-server:6379/"]', + redis.url= 'redis://redis-server:6379/', )FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', key_format = 'V1:{v1}', value_format = 'V2:{v2},V3:{v3}'); statement ok diff --git a/integration_tests/redis-sink/create_sink.sql b/integration_tests/redis-sink/create_sink.sql index 1dc26a0facac..61ffb6732622 100644 --- a/integration_tests/redis-sink/create_sink.sql +++ b/integration_tests/redis-sink/create_sink.sql @@ -3,7 +3,7 @@ FROM bhv_mv WITH ( primary_key = 'user_id', connector = 'redis', - redis.url= '["redis://127.0.0.1:6379/"]', + redis.url= 'redis://redis:6379/', )FORMAT PLAIN ENCODE JSON(force_append_only='true'); CREATE SINK bhv_redis_sink_2 @@ -11,7 +11,7 @@ FROM bhv_mv WITH ( primary_key = 'user_id', connector = 'redis', - redis.url= '["redis://redis:6379/"]', + redis.url= 'redis://redis:6379/', )FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', key_format = 'UserID:{user_id}', value_format = 'TargetID:{target_id},EventTimestamp{event_timestamp}'); CREATE SINK redis_types_json_sink @@ -19,7 +19,7 @@ FROM redis_types WITH ( primary_key = 'types_id', connector = 'redis', - redis.url= '["redis://redis:6379/"]', + redis.url= 'redis://redis:6379/', )FORMAT PLAIN ENCODE JSON(force_append_only='true'); CREATE SINK redis_types_template_sink @@ -27,7 +27,7 @@ FROM redis_types WITH ( primary_key = 'types_id', connector = 'redis', - redis.url= '["redis://redis:6379/"]', + redis.url= 'redis://redis:6379/', )FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', key_format = 'TYPESID:{types_id}', value_format = ' diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index 7b5a28d112e7..95eaf702ed7e 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -25,7 +25,6 @@ use risingwave_common::catalog::Schema; use serde_derive::Deserialize; use serde_json::Value; use serde_with::serde_as; -use simd_json::prelude::ArrayTrait; use with_options::WithOptions; use super::catalog::SinkFormatDesc; @@ -48,7 +47,7 @@ pub const VALUE_FORMAT: &str = "value_format"; #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct RedisCommon { #[serde(rename = "redis.url")] - pub url: Vec, + pub url: String, } pub enum RedisConn { Cluster(ClusterConnection), @@ -88,16 +87,34 @@ impl ConnectionLike for RedisConn { impl RedisCommon { pub async fn build_conn(&self) -> ConnectorResult { - if self.url.is_empty() { - Err(SinkError::Redis("Missing redis.url".to_string()).into()) - } else if self.url.len() == 1 { - let client = RedisClient::open(self.url.get(0).unwrap().clone())?; - Ok(RedisConn::Single( - client.get_multiplexed_async_connection().await?, - )) - } else { - let client = ClusterClient::new(self.url.clone())?; - Ok(RedisConn::Cluster(client.get_async_connection().await?)) + let url: Value = + serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e)))?; + match url { + Value::String(s) => { + let client = RedisClient::open(s)?; + Ok(RedisConn::Single( + client.get_multiplexed_async_connection().await?, + )) + } + Value::Array(list) => { + let list = list + .into_iter() + .map(|s| { + if let Value::String(s) = s { + Ok(s) + } else { + Err( + SinkError::Redis("redis.url must be array of string".to_string()) + .into(), + ) + } + }) + .collect::>>()?; + + let client = ClusterClient::new(list)?; + Ok(RedisConn::Cluster(client.get_async_connection().await?)) + } + _ => Err(SinkError::Redis("redis.url must be array or string".to_string()).into()), } } } @@ -110,13 +127,9 @@ pub struct RedisConfig { impl RedisConfig { pub fn from_hashmap(properties: HashMap) -> Result { - let mut json_map = serde_json::Map::new(); - for (key, value) in properties { - let json_value = serde_json::from_str(&value).unwrap_or(Value::Null); - json_map.insert(key, json_value); - } - let config = serde_json::from_value::(Value::Object(json_map)) - .map_err(|e| SinkError::Config(anyhow!(e)))?; + let config = + serde_json::from_value::(serde_json::to_value(properties).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e)))?; Ok(config) } } From 3926983d8da4fb912c28734abacacc1e14023bc7 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 8 Apr 2024 17:47:52 +0800 Subject: [PATCH 5/5] fix fix fix ci fix ci fix ci --- ci/scripts/e2e-redis-sink-test.sh | 7 ++-- src/connector/src/sink/redis.rs | 51 +++++++++++++++------------- src/connector/with_options_sink.yaml | 2 +- 3 files changed, 32 insertions(+), 28 deletions(-) diff --git a/ci/scripts/e2e-redis-sink-test.sh b/ci/scripts/e2e-redis-sink-test.sh index 6d19c450d2d5..210bf27bbe79 100755 --- a/ci/scripts/e2e-redis-sink-test.sh +++ b/ci/scripts/e2e-redis-sink-test.sh @@ -55,12 +55,13 @@ sqllogictest -p 4566 -d dev './e2e_test/sink/redis_cluster_sink.slt' redis-cli -c --cluster call 127.0.0.1:7000 keys \* >> ./query_result_1.txt -if cat ./query_result_1.txt | tr '\n' '\0' | xargs -0 -n1 bash -c '[[ "$0" == "{\"v1\":1}" || "$0" == "{\"v2\":2}" || "$0" == "{\"v3\":3}" ]]'; then +line_count=$(wc -l < query_result_1.txt) +if [ "$line_count" -eq 4 ]; then echo "Redis sink check passed" else cat ./query_result_1.txt - echo "The output is not as expected." - exit 1 + echo "The output is not as expected." + exit 1 fi echo "--- Kill cluster" diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index 95eaf702ed7e..96b524d5b7b2 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -50,7 +50,9 @@ pub struct RedisCommon { pub url: String, } pub enum RedisConn { + // Redis deployed as a cluster, clusters with only one node should also use this conn Cluster(ClusterConnection), + // Redis is not deployed as a cluster Single(MultiplexedConnection), } @@ -87,34 +89,35 @@ impl ConnectionLike for RedisConn { impl RedisCommon { pub async fn build_conn(&self) -> ConnectorResult { - let url: Value = - serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e)))?; - match url { - Value::String(s) => { - let client = RedisClient::open(s)?; + match serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e))) { + Ok(v) => { + if let Value::Array(list) = v { + let list = list + .into_iter() + .map(|s| { + if let Value::String(s) = s { + Ok(s) + } else { + Err(SinkError::Redis( + "redis.url must be array of string".to_string(), + ) + .into()) + } + }) + .collect::>>()?; + + let client = ClusterClient::new(list)?; + Ok(RedisConn::Cluster(client.get_async_connection().await?)) + } else { + Err(SinkError::Redis("redis.url must be array or string".to_string()).into()) + } + } + Err(_) => { + let client = RedisClient::open(self.url.clone())?; Ok(RedisConn::Single( client.get_multiplexed_async_connection().await?, )) } - Value::Array(list) => { - let list = list - .into_iter() - .map(|s| { - if let Value::String(s) = s { - Ok(s) - } else { - Err( - SinkError::Redis("redis.url must be array of string".to_string()) - .into(), - ) - } - }) - .collect::>>()?; - - let client = ClusterClient::new(list)?; - Ok(RedisConn::Cluster(client.get_async_connection().await?)) - } - _ => Err(SinkError::Redis("redis.url must be array or string".to_string()).into()), } } } diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 1cba39927061..5ce2c96cc64e 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -523,7 +523,7 @@ PulsarConfig: RedisConfig: fields: - name: redis.url - field_type: Vec + field_type: String required: true StarrocksConfig: fields: