diff --git a/integration_tests/redis-sink/create_sink.sql b/integration_tests/redis-sink/create_sink.sql index 0c725a51e34e4..2ba9ba67feb39 100644 --- a/integration_tests/redis-sink/create_sink.sql +++ b/integration_tests/redis-sink/create_sink.sql @@ -12,4 +12,4 @@ FROM primary_key = 'user_id', connector = 'redis', redis.url= 'redis://127.0.0.1:6379/', -)FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', redis_key_format = 'UserID:{user_id}', redis_value_format = 'TargetID:{target_id},EventTimestamp{event_timestamp}'); \ No newline at end of file +)FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', key_format = 'UserID:{user_id}', value_format = 'TargetID:{target_id},EventTimestamp{event_timestamp}'); \ No newline at end of file diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index 1d2b56c03ee95..ca3a09e7f2eda 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -140,7 +140,6 @@ impl SinkFormatDesc { use crate::sink::kafka::KafkaSink; use crate::sink::kinesis::KinesisSink; use crate::sink::pulsar::PulsarSink; - use crate::sink::redis::RedisSink; use crate::sink::Sink as _; let format = match r#type { @@ -155,10 +154,9 @@ impl SinkFormatDesc { } }; let encode = match connector { - KafkaSink::SINK_NAME - | KinesisSink::SINK_NAME - | PulsarSink::SINK_NAME - | RedisSink::SINK_NAME => SinkEncode::Json, + KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => { + SinkEncode::Json + } _ => return Ok(None), }; Ok(Some(Self { diff --git a/src/connector/src/sink/encoder/template.rs b/src/connector/src/sink/encoder/template.rs index 85f085989b6c4..97d8271f9e83a 100644 --- a/src/connector/src/sink/encoder/template.rs +++ b/src/connector/src/sink/encoder/template.rs @@ -12,11 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; + +use regex::Regex; use risingwave_common::catalog::Schema; use risingwave_common::row::Row; use risingwave_common::types::ToText; use super::{Result, RowEncoder}; +use crate::sink::SinkError; /// Encode a row according to a specified string template `user_id:{user_id}` pub struct TemplateEncoder { @@ -34,6 +38,24 @@ impl TemplateEncoder { template, } } + + pub fn check_string_format(format: &str, set: &HashSet) -> Result<()> { + // We will check if the string inside {} corresponds to a column name in rw. + // In other words, the content within {} should exclusively consist of column names from rw, + // which means '{{column_name}}' or '{{column_name1},{column_name2}}' would be incorrect. + let re = Regex::new(r"\{([^}]*)\}").unwrap(); + if !re.is_match(format) { + return Err(SinkError::Redis( + "Can't find {} in key_format or value_format".to_string(), + )); + } + for capture in re.captures_iter(format) { + if let Some(inner_content) = capture.get(1) && !set.contains(inner_content.as_str()){ + return Err(SinkError::Redis(format!("Can't find field({:?}) in key_format or value_format",inner_content.as_str()))) + } + } + Ok(()) + } } impl RowEncoder for TemplateEncoder { diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index ff468e8b20063..17cb708292890 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -29,7 +29,7 @@ pub use upsert::UpsertFormatter; use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc}; use super::encoder::template::TemplateEncoder; use super::encoder::KafkaConnectParams; -use super::redis::{REDIS_KEY_FORMAT, REDIS_VALUE_FORMAT}; +use super::redis::{KEY_FORMAT, VALUE_FORMAT}; use crate::sink::encoder::{JsonEncoder, ProtoEncoder, TimestampHandlingMode}; /// Transforms a `StreamChunk` into a sequence of key-value pairs according a specific format, @@ -93,7 +93,7 @@ impl SinkFormatterImpl { let key_encoder = (!pk_indices.is_empty()).then(|| { JsonEncoder::new( schema.clone(), - Some(pk_indices), + Some(pk_indices.clone()), TimestampHandlingMode::Milli, ) }); @@ -116,9 +116,28 @@ impl SinkFormatterImpl { Ok(SinkFormatterImpl::AppendOnlyProto(formatter)) } SinkEncode::Avro => err_unsupported(), - SinkEncode::Template => Err(SinkError::Config(anyhow!( - "Template only support with redis sink" - ))), + SinkEncode::Template => { + let key_format = format_desc.options.get(KEY_FORMAT).ok_or_else(|| { + SinkError::Config(anyhow!( + "Cannot find 'key_format',please set it or use JSON" + )) + })?; + let value_format = + format_desc.options.get(VALUE_FORMAT).ok_or_else(|| { + SinkError::Config(anyhow!( + "Cannot find 'redis_value_format',please set it or use JSON" + )) + })?; + let key_encoder = TemplateEncoder::new( + schema.clone(), + Some(pk_indices), + key_format.clone(), + ); + let val_encoder = TemplateEncoder::new(schema, None, value_format.clone()); + Ok(SinkFormatterImpl::AppendOnlyTemplate( + AppendOnlyFormatter::new(Some(key_encoder), val_encoder), + )) + } } } SinkFormat::Debezium => { @@ -135,99 +154,66 @@ impl SinkFormatterImpl { ))) } SinkFormat::Upsert => { - if format_desc.encode != SinkEncode::Json { - return err_unsupported(); - } + match format_desc.encode { + SinkEncode::Json => { + let mut key_encoder = JsonEncoder::new( + schema.clone(), + Some(pk_indices), + TimestampHandlingMode::Milli, + ); + let mut val_encoder = + JsonEncoder::new(schema, None, TimestampHandlingMode::Milli); - let mut key_encoder = JsonEncoder::new( - schema.clone(), - Some(pk_indices), - TimestampHandlingMode::Milli, - ); - let mut val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli); - - if let Some(s) = format_desc.options.get("schemas.enable") { - match s.to_lowercase().parse::() { - Ok(true) => { - let kafka_connect = KafkaConnectParams { - schema_name: format!("{}.{}", db_name, sink_from_name), - }; - key_encoder = key_encoder.with_kafka_connect(kafka_connect.clone()); - val_encoder = val_encoder.with_kafka_connect(kafka_connect); - } - Ok(false) => {} - _ => { - return Err(SinkError::Config(anyhow!( - "schemas.enable is expected to be `true` or `false`, got {}", - s - ))); - } + if let Some(s) = format_desc.options.get("schemas.enable") { + match s.to_lowercase().parse::() { + Ok(true) => { + let kafka_connect = KafkaConnectParams { + schema_name: format!("{}.{}", db_name, sink_from_name), + }; + key_encoder = + key_encoder.with_kafka_connect(kafka_connect.clone()); + val_encoder = val_encoder.with_kafka_connect(kafka_connect); + } + Ok(false) => {} + _ => { + return Err(SinkError::Config(anyhow!( + "schemas.enable is expected to be `true` or `false`, got {}", + s + ))); + } + } + }; + + // Initialize the upsert_stream + let formatter = UpsertFormatter::new(key_encoder, val_encoder); + Ok(SinkFormatterImpl::UpsertJson(formatter)) } - }; - - // Initialize the upsert_stream - let formatter = UpsertFormatter::new(key_encoder, val_encoder); - Ok(SinkFormatterImpl::UpsertJson(formatter)) - } - } - } - - pub fn new_with_redis( - schema: Schema, - pk_indices: Vec, - format_desc: &SinkFormatDesc, - ) -> Result { - match format_desc.encode { - SinkEncode::Json => { - let key_encoder = JsonEncoder::new( - schema.clone(), - Some(pk_indices), - TimestampHandlingMode::String, - ); - let val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::String); - match format_desc.format { - SinkFormat::AppendOnly => Ok(SinkFormatterImpl::AppendOnlyJson( - AppendOnlyFormatter::new(Some(key_encoder), val_encoder), - )), - SinkFormat::Upsert => Ok(SinkFormatterImpl::UpsertJson(UpsertFormatter::new( - key_encoder, - val_encoder, - ))), - _ => Err(SinkError::Config(anyhow!( - "Redis sink only support Append_Only and Upsert" - ))), - } - } - SinkEncode::Template => { - let key_format = format_desc.options.get(REDIS_KEY_FORMAT).ok_or_else(|| { - SinkError::Config(anyhow!( - "Cannot find 'redis_key_format',please set it or use JSON" - )) - })?; - let value_format = - format_desc.options.get(REDIS_VALUE_FORMAT).ok_or_else(|| { - SinkError::Config(anyhow!( - "Cannot find 'redis_value_format',please set it or use JSON" - )) - })?; - let key_encoder = - TemplateEncoder::new(schema.clone(), Some(pk_indices), key_format.clone()); - let val_encoder = TemplateEncoder::new(schema, None, value_format.clone()); - match format_desc.format { - SinkFormat::AppendOnly => Ok(SinkFormatterImpl::AppendOnlyTemplate( - AppendOnlyFormatter::new(Some(key_encoder), val_encoder), - )), - SinkFormat::Upsert => Ok(SinkFormatterImpl::UpsertTemplate( - UpsertFormatter::new(key_encoder, val_encoder), - )), - _ => Err(SinkError::Config(anyhow!( - "Redis sink only support Append_Only and Upsert" - ))), + SinkEncode::Template => { + let key_format = format_desc.options.get(KEY_FORMAT).ok_or_else(|| { + SinkError::Config(anyhow!( + "Cannot find 'key_format',please set it or use JSON" + )) + })?; + let value_format = + format_desc.options.get(VALUE_FORMAT).ok_or_else(|| { + SinkError::Config(anyhow!( + "Cannot find 'redis_value_format',please set it or use JSON" + )) + })?; + let key_encoder = TemplateEncoder::new( + schema.clone(), + Some(pk_indices), + key_format.clone(), + ); + let val_encoder = TemplateEncoder::new(schema, None, value_format.clone()); + Ok(SinkFormatterImpl::UpsertTemplate(UpsertFormatter::new( + key_encoder, + val_encoder, + ))) + } + _ => err_unsupported(), } } - _ => Err(SinkError::Config(anyhow!( - "Redis sink only support Json and Template" - ))), } } } diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index ab141d01f6a20..6120075a049df 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -18,13 +18,13 @@ use anyhow::anyhow; use async_trait::async_trait; use redis::aio::Connection; use redis::{Client as RedisClient, Pipeline}; -use regex::Regex; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; use serde_derive::{Deserialize, Serialize}; use serde_with::serde_as; use super::catalog::SinkFormatDesc; +use super::encoder::template::TemplateEncoder; use super::formatter::SinkFormatterImpl; use super::writer::FormattedSink; use super::{SinkError, SinkParam}; @@ -33,8 +33,8 @@ use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; pub const REDIS_SINK: &str = "redis"; -pub const REDIS_KEY_FORMAT: &str = "redis_key_format"; -pub const REDIS_VALUE_FORMAT: &str = "redis_value_format"; +pub const KEY_FORMAT: &str = "key_format"; +pub const VALUE_FORMAT: &str = "value_format"; #[derive(Deserialize, Serialize, Debug, Clone)] pub struct RedisCommon { #[serde(rename = "redis.url")] @@ -69,24 +69,8 @@ pub struct RedisSink { schema: Schema, pk_indices: Vec, format_desc: SinkFormatDesc, -} - -fn check_string_format(format: &str, set: &HashSet) -> Result<()> { - // We will check if the string inside {} corresponds to a column name in rw. - // In other words, the content within {} should exclusively consist of column names from rw, - // which means '{{column_name}}' or '{{column_name1},{column_name2}}' would be incorrect. - let re = Regex::new(r"\{([^}]*)\}").unwrap(); - if !re.is_match(format) { - return Err(SinkError::Redis( - "Can't find {} in key_format or value_format".to_string(), - )); - } - for capture in re.captures_iter(format) { - if let Some(inner_content) = capture.get(1) && !set.contains(inner_content.as_str()){ - return Err(SinkError::Redis(format!("Can't find field({:?}) in key_format or value_format",inner_content.as_str()))) - } - } - Ok(()) + db_name: String, + sink_from_name: String, } #[async_trait] @@ -107,6 +91,8 @@ impl TryFrom for RedisSink { format_desc: param .format_desc .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?, + db_name: param.db_name, + sink_from_name: param.sink_from_name, }) } } @@ -123,6 +109,8 @@ impl Sink for RedisSink { self.schema.clone(), self.pk_indices.clone(), &self.format_desc, + self.db_name.clone(), + self.sink_from_name.clone(), ) .await? .into_log_sinker(writer_param.sink_metrics)) @@ -149,26 +137,18 @@ impl Sink for RedisSink { self.format_desc.encode, super::catalog::SinkEncode::Template ) { - let key_format = self - .format_desc - .options - .get(REDIS_KEY_FORMAT) - .ok_or_else(|| { - SinkError::Config(anyhow!( - "Cannot find 'redis_key_format',please set it or use JSON" - )) - })?; - let value_format = self - .format_desc - .options - .get(REDIS_VALUE_FORMAT) - .ok_or_else(|| { - SinkError::Config(anyhow!( - "Cannot find 'redis_value_format',please set it or use JSON" - )) - })?; - check_string_format(key_format, &pk_set)?; - check_string_format(value_format, &all_set)?; + let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| { + SinkError::Config(anyhow!( + "Cannot find 'key_format',please set it or use JSON" + )) + })?; + let value_format = self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| { + SinkError::Config(anyhow!( + "Cannot find 'value_format',please set it or use JSON" + )) + })?; + TemplateEncoder::check_string_format(key_format, &pk_set)?; + TemplateEncoder::check_string_format(value_format, &all_set)?; } Ok(()) } @@ -231,10 +211,18 @@ impl RedisSinkWriter { schema: Schema, pk_indices: Vec, format_desc: &SinkFormatDesc, + db_name: String, + sink_from_name: String, ) -> Result { let payload_writer = RedisSinkPayloadWriter::new(config.clone()).await?; - let formatter = - SinkFormatterImpl::new_with_redis(schema.clone(), pk_indices.clone(), format_desc)?; + let formatter = SinkFormatterImpl::new( + format_desc, + schema.clone(), + pk_indices.clone(), + db_name, + sink_from_name, + ) + .await?; Ok(Self { schema, @@ -246,13 +234,19 @@ impl RedisSinkWriter { } #[cfg(test)] - pub fn mock( + pub async fn mock( schema: Schema, pk_indices: Vec, format_desc: &SinkFormatDesc, ) -> Result { - let formatter = - SinkFormatterImpl::new_with_redis(schema.clone(), pk_indices.clone(), format_desc)?; + let formatter = SinkFormatterImpl::new( + format_desc, + schema.clone(), + pk_indices.clone(), + "d1".to_string(), + "t1".to_string(), + ) + .await?; Ok(Self { schema, pk_indices, @@ -320,7 +314,9 @@ mod test { options: BTreeMap::default(), }; - let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc).unwrap(); + let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc) + .await + .unwrap(); let chunk_a = StreamChunk::new( vec![Op::Insert, Op::Insert, Op::Insert], @@ -372,9 +368,9 @@ mod test { ]); let mut btree_map = BTreeMap::default(); - btree_map.insert(REDIS_KEY_FORMAT.to_string(), "key-{id}".to_string()); + btree_map.insert(KEY_FORMAT.to_string(), "key-{id}".to_string()); btree_map.insert( - REDIS_VALUE_FORMAT.to_string(), + VALUE_FORMAT.to_string(), "values:{id:{id},name:{name}}".to_string(), ); let format_desc = SinkFormatDesc { @@ -383,7 +379,9 @@ mod test { options: btree_map, }; - let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc).unwrap(); + let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc) + .await + .unwrap(); let chunk_a = StreamChunk::new( vec![Op::Insert, Op::Insert, Op::Insert],