diff --git a/proto/plan_common.proto b/proto/plan_common.proto index a88242a572693..d4c7a2e04f138 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -106,6 +106,7 @@ enum EncodeType { ENCODE_TYPE_PROTOBUF = 4; ENCODE_TYPE_JSON = 5; ENCODE_TYPE_BYTES = 6; + ENCODE_TYPE_TEMPLATE = 7; } enum RowFormatType { diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 05f7bfa8b6f3e..ff468e8b20063 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -116,11 +116,9 @@ impl SinkFormatterImpl { Ok(SinkFormatterImpl::AppendOnlyProto(formatter)) } SinkEncode::Avro => err_unsupported(), - SinkEncode::Template => { - return Err(SinkError::Config(anyhow!( - "Template only support with redis sink" - ))) - } + SinkEncode::Template => Err(SinkError::Config(anyhow!( + "Template only support with redis sink" + ))), } } SinkFormat::Debezium => { @@ -184,9 +182,9 @@ impl SinkFormatterImpl { let key_encoder = JsonEncoder::new( schema.clone(), Some(pk_indices), - TimestampHandlingMode::Milli, + TimestampHandlingMode::String, ); - let val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli); + let val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::String); match format_desc.format { SinkFormat::AppendOnly => Ok(SinkFormatterImpl::AppendOnlyJson( AppendOnlyFormatter::new(Some(key_encoder), val_encoder), @@ -227,11 +225,9 @@ impl SinkFormatterImpl { ))), } } - _ => { - return Err(SinkError::Config(anyhow!( - "Redis sink only support Json and Template" - ))) - } + _ => Err(SinkError::Config(anyhow!( + "Redis sink only support Json and Template" + ))), } } } diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index 2a7738ac30e74..ab141d01f6a20 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -71,7 +71,7 @@ pub struct RedisSink { format_desc: SinkFormatDesc, } -fn check_string_format(format: &String, set: &HashSet) -> Result<()> { +fn check_string_format(format: &str, set: &HashSet) -> Result<()> { // We will check if the string inside {} corresponds to a column name in rw. // In other words, the content within {} should exclusively consist of column names from rw, // which means '{{column_name}}' or '{{column_name1},{column_name2}}' would be incorrect. @@ -380,7 +380,7 @@ mod test { let format_desc = SinkFormatDesc { format: SinkFormat::AppendOnly, encode: SinkEncode::Template, - options: BTreeMap::default(), + options: btree_map, }; let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc).unwrap();