Skip to content

Commit

Permalink
fix test err
Browse files Browse the repository at this point in the history
fix

use string for timestamp
  • Loading branch information
xxhZs committed Oct 23, 2023
1 parent a3fcb92 commit c8d0223
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 14 deletions.
1 change: 1 addition & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ enum EncodeType {
ENCODE_TYPE_PROTOBUF = 4;
ENCODE_TYPE_JSON = 5;
ENCODE_TYPE_BYTES = 6;
ENCODE_TYPE_TEMPLATE = 7;
}

enum RowFormatType {
Expand Down
20 changes: 8 additions & 12 deletions src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,9 @@ impl SinkFormatterImpl {
Ok(SinkFormatterImpl::AppendOnlyProto(formatter))
}
SinkEncode::Avro => err_unsupported(),
SinkEncode::Template => {
return Err(SinkError::Config(anyhow!(
"Template only support with redis sink"
)))
}
SinkEncode::Template => Err(SinkError::Config(anyhow!(
"Template only support with redis sink"
))),
}
}
SinkFormat::Debezium => {
Expand Down Expand Up @@ -184,9 +182,9 @@ impl SinkFormatterImpl {
let key_encoder = JsonEncoder::new(
schema.clone(),
Some(pk_indices),
TimestampHandlingMode::Milli,
TimestampHandlingMode::String,
);
let val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);
let val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::String);
match format_desc.format {
SinkFormat::AppendOnly => Ok(SinkFormatterImpl::AppendOnlyJson(
AppendOnlyFormatter::new(Some(key_encoder), val_encoder),
Expand Down Expand Up @@ -227,11 +225,9 @@ impl SinkFormatterImpl {
))),
}
}
_ => {
return Err(SinkError::Config(anyhow!(
"Redis sink only support Json and Template"
)))
}
_ => Err(SinkError::Config(anyhow!(
"Redis sink only support Json and Template"
))),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/sink/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub struct RedisSink {
format_desc: SinkFormatDesc,
}

fn check_string_format(format: &String, set: &HashSet<String>) -> Result<()> {
fn check_string_format(format: &str, set: &HashSet<String>) -> Result<()> {
// We will check if the string inside {} corresponds to a column name in rw.
// In other words, the content within {} should exclusively consist of column names from rw,
// which means '{{column_name}}' or '{{column_name1},{column_name2}}' would be incorrect.
Expand Down Expand Up @@ -380,7 +380,7 @@ mod test {
let format_desc = SinkFormatDesc {
format: SinkFormat::AppendOnly,
encode: SinkEncode::Template,
options: BTreeMap::default(),
options: btree_map,
};

let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc).unwrap();
Expand Down

0 comments on commit c8d0223

Please sign in to comment.