From b7e9e467eddd9a400f017fb2d15a900cc03f57cb Mon Sep 17 00:00:00 2001 From: xiangjinwu <17769960+xiangjinwu@users.noreply.github.com> Date: Fri, 12 Apr 2024 16:47:42 +0800 Subject: [PATCH] feat(source): `encode json` option `timestamptz.handling.mode` (#16265) --- e2e_test/source/basic/handling_mode.slt | 136 ++++++++++++++++++ .../json_timestamptz_handling_mode.1 | 4 + src/connector/benches/parser.rs | 1 + .../src/parser/debezium/debezium_parser.rs | 11 +- .../src/parser/debezium/simd_json_parser.rs | 23 ++- src/connector/src/parser/json_parser.rs | 15 +- .../src/parser/maxwell/simd_json_parser.rs | 1 + src/connector/src/parser/mod.rs | 9 +- src/connector/src/parser/plain_parser.rs | 3 +- src/connector/src/parser/unified/json.rs | 109 +++++++++++--- src/connector/src/schema/mod.rs | 2 +- .../src/source/datagen/source/generator.rs | 1 + src/frontend/src/handler/create_source.rs | 17 ++- .../src/executor/backfill/cdc/cdc_backfill.rs | 1 + 14 files changed, 294 insertions(+), 39 deletions(-) create mode 100644 e2e_test/source/basic/handling_mode.slt create mode 100644 scripts/source/test_data/json_timestamptz_handling_mode.1 diff --git a/e2e_test/source/basic/handling_mode.slt b/e2e_test/source/basic/handling_mode.slt new file mode 100644 index 0000000000000..caf9be9a01fce --- /dev/null +++ b/e2e_test/source/basic/handling_mode.slt @@ -0,0 +1,136 @@ +statement error unrecognized +create table t ( + payload struct>) +with ( + connector = 'kafka', + properties.bootstrap.server='message_queue:29092', + topic = 'json_timestamptz_handling_mode') +format plain encode json (timestamptz.handling.mode = 'mili'); + +# spelling error on config name would raise a non-fatal notice and use the default +statement ok +create table plain_guess ( + "case" varchar, + payload struct>) +with ( + connector = 'kafka', + properties.bootstrap.server='message_queue:29092', + topic = 'json_timestamptz_handling_mode') +format plain encode json (timestamptz.handling.mod = 'mili'); + +statement ok +create table plain_milli ( + "case" varchar, + payload struct>) +with ( + connector = 'kafka', + properties.bootstrap.server='message_queue:29092', + topic = 'json_timestamptz_handling_mode') +format plain encode json (timestamptz.handling.mode = 'milli'); + +statement ok +create table plain_micro ( + "case" varchar, + payload struct>) +with ( + connector = 'kafka', + properties.bootstrap.server='message_queue:29092', + topic = 'json_timestamptz_handling_mode') +format plain encode json (timestamptz.handling.mode = 'micro'); + +statement ok +create table plain_utc ( + "case" varchar, + payload struct>) +with ( + connector = 'kafka', + properties.bootstrap.server='message_queue:29092', + topic = 'json_timestamptz_handling_mode') +format plain encode json (timestamptz.handling.mode = 'utc_string'); + +statement ok +create table plain_naive ( + "case" varchar, + payload struct>) +with ( + connector = 'kafka', + properties.bootstrap.server='message_queue:29092', + topic = 'json_timestamptz_handling_mode') +format plain encode json (timestamptz.handling.mode = 'utc_without_suffix'); + +statement ok +create table debezium_milli ( + "case" varchar, at timestamptz, primary key("case")) +with ( + connector = 'kafka', + properties.bootstrap.server='message_queue:29092', + topic = 'json_timestamptz_handling_mode') +format debezium encode json (timestamptz.handling.mode = 'milli'); + +sleep 2s + +query TT +select "case", (payload).after.at from plain_guess order by 1; +---- +0 number small 1970-01-01 00:01:40+00:00 +1 number recent 2024-04-11 02:00:00.123456+00:00 +2 string utc 2024-04-11 02:00:00.654321+00:00 +3 string naive NULL + +query TT +select "case", (payload).after.at from plain_milli order by 1; +---- +0 number small 1970-01-01 00:00:00.100+00:00 +1 number recent 56246-07-01 08:02:03.456+00:00 +2 string utc 2024-04-11 02:00:00.654321+00:00 +3 string naive NULL + +query TT +select "case", (payload).after.at from plain_micro order by 1; +---- +0 number small 1970-01-01 00:00:00.000100+00:00 +1 number recent 2024-04-11 02:00:00.123456+00:00 +2 string utc 2024-04-11 02:00:00.654321+00:00 +3 string naive NULL + +query TT +select "case", (payload).after.at from plain_utc order by 1; +---- +0 number small NULL +1 number recent NULL +2 string utc 2024-04-11 02:00:00.654321+00:00 +3 string naive NULL + +query TT +select "case", (payload).after.at from plain_naive order by 1; +---- +0 number small NULL +1 number recent NULL +2 string utc NULL +3 string naive 2024-04-11 02:00:00.234321+00:00 + +query TT +select "case", at from debezium_milli order by 1; +---- +0 number small 1970-01-01 00:00:00.100+00:00 +1 number recent 56246-07-01 08:02:03.456+00:00 +2 string utc 2024-04-11 02:00:00.654321+00:00 +3 string naive NULL + +statement ok +drop table plain_guess; + +statement ok +drop table plain_milli; + +statement ok +drop table plain_micro; + +statement ok +drop table plain_utc; + +statement ok +drop table plain_naive; + +statement ok +drop table debezium_milli; diff --git a/scripts/source/test_data/json_timestamptz_handling_mode.1 b/scripts/source/test_data/json_timestamptz_handling_mode.1 new file mode 100644 index 0000000000000..4ff1440b47f29 --- /dev/null +++ b/scripts/source/test_data/json_timestamptz_handling_mode.1 @@ -0,0 +1,4 @@ +{"case":"0 number small","payload":{"after":{"case":"0 number small","at":100},"op":"r"}} +{"case":"1 number recent","payload":{"after":{"case":"1 number recent","at":1712800800123456},"op":"r"}} +{"case":"2 string utc","payload":{"after":{"case":"2 string utc","at":"2024-04-11T02:00:00.654321Z"},"op":"r"}} +{"case":"3 string naive","payload":{"after":{"case":"3 string naive","at":"2024-04-11 02:00:00.234321"},"op":"r"}} diff --git a/src/connector/benches/parser.rs b/src/connector/benches/parser.rs index eff6f310bc54b..f0527e119086a 100644 --- a/src/connector/benches/parser.rs +++ b/src/connector/benches/parser.rs @@ -77,6 +77,7 @@ fn create_parser( key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, + timestamptz_handling: None, }), protocol_config: ProtocolProperties::Plain, }; diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 1903c00a3b96b..7fb497c399c61 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -21,6 +21,7 @@ use super::{DebeziumAvroAccessBuilder, DebeziumAvroParserConfig}; use crate::error::ConnectorResult; use crate::extract_key_config; use crate::parser::unified::debezium::DebeziumChangeEvent; +use crate::parser::unified::json::TimestamptzHandling; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; use crate::parser::{ AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, JsonProperties, @@ -69,8 +70,12 @@ async fn build_accessor_builder( DebeziumAvroAccessBuilder::new(config, encoding_type)?, )) } - EncodingProperties::Json(_) => Ok(AccessBuilderImpl::DebeziumJson( - DebeziumJsonAccessBuilder::new()?, + EncodingProperties::Json(json_config) => Ok(AccessBuilderImpl::DebeziumJson( + DebeziumJsonAccessBuilder::new( + json_config + .timestamptz_handling + .unwrap_or(TimestamptzHandling::GuessNumberUnit), + )?, )), EncodingProperties::Protobuf(_) => { Ok(AccessBuilderImpl::new_default(config, encoding_type).await?) @@ -111,6 +116,7 @@ impl DebeziumParser { key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, + timestamptz_handling: None, }), protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()), }; @@ -216,6 +222,7 @@ mod tests { key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, + timestamptz_handling: None, }), protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()), }; diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index f61545a5443fc..72c1688d95e75 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -20,18 +20,22 @@ use simd_json::BorrowedValue; use crate::error::ConnectorResult; use crate::parser::unified::debezium::MongoJsonAccess; -use crate::parser::unified::json::{JsonAccess, JsonParseOptions}; +use crate::parser::unified::json::{JsonAccess, JsonParseOptions, TimestamptzHandling}; use crate::parser::unified::AccessImpl; use crate::parser::AccessBuilder; #[derive(Debug)] pub struct DebeziumJsonAccessBuilder { value: Option>, + json_parse_options: JsonParseOptions, } impl DebeziumJsonAccessBuilder { - pub fn new() -> ConnectorResult { - Ok(Self { value: None }) + pub fn new(timestamptz_handling: TimestamptzHandling) -> ConnectorResult { + Ok(Self { + value: None, + json_parse_options: JsonParseOptions::new_for_debezium(timestamptz_handling), + }) } } @@ -51,7 +55,7 @@ impl AccessBuilder for DebeziumJsonAccessBuilder { Ok(AccessImpl::Json(JsonAccess::new_with_options( payload, - &JsonParseOptions::DEBEZIUM, + &self.json_parse_options, ))) } } @@ -59,11 +63,17 @@ impl AccessBuilder for DebeziumJsonAccessBuilder { #[derive(Debug)] pub struct DebeziumMongoJsonAccessBuilder { value: Option>, + json_parse_options: JsonParseOptions, } impl DebeziumMongoJsonAccessBuilder { pub fn new() -> anyhow::Result { - Ok(Self { value: None }) + Ok(Self { + value: None, + json_parse_options: JsonParseOptions::new_for_debezium( + TimestamptzHandling::GuessNumberUnit, + ), + }) } } @@ -82,7 +92,7 @@ impl AccessBuilder for DebeziumMongoJsonAccessBuilder { }; Ok(AccessImpl::MongoJson(MongoJsonAccess::new( - JsonAccess::new_with_options(payload, &JsonParseOptions::DEBEZIUM), + JsonAccess::new_with_options(payload, &self.json_parse_options), ))) } } @@ -128,6 +138,7 @@ mod tests { key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, + timestamptz_handling: None, }), protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()), }; diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index 47db36a0b2b8f..454e8f1d0dd56 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -23,7 +23,7 @@ use risingwave_pb::plan_common::ColumnDesc; use super::avro::schema_resolver::ConfluentSchemaResolver; use super::util::{bytes_from_url, get_kafka_topic}; -use super::{EncodingProperties, SchemaRegistryAuth, SpecificParserConfig}; +use super::{EncodingProperties, JsonProperties, SchemaRegistryAuth, SpecificParserConfig}; use crate::error::ConnectorResult; use crate::only_parse_payload; use crate::parser::avro::util::avro_schema_to_column_descs; @@ -40,6 +40,7 @@ use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; pub struct JsonAccessBuilder { value: Option>, payload_start_idx: usize, + json_parse_options: JsonParseOptions, } impl AccessBuilder for JsonAccessBuilder { @@ -58,16 +59,21 @@ impl AccessBuilder for JsonAccessBuilder { value, // Debezium and Canal have their special json access builder and will not // use this - &JsonParseOptions::DEFAULT, + &self.json_parse_options, ))) } } impl JsonAccessBuilder { - pub fn new(use_schema_registry: bool) -> ConnectorResult { + pub fn new(config: JsonProperties) -> ConnectorResult { + let mut json_parse_options = JsonParseOptions::DEFAULT; + if let Some(mode) = config.timestamptz_handling { + json_parse_options.timestamptz_handling = mode; + } Ok(Self { value: None, - payload_start_idx: if use_schema_registry { 5 } else { 0 }, + payload_start_idx: if config.use_schema_registry { 5 } else { 0 }, + json_parse_options, }) } } @@ -595,6 +601,7 @@ mod tests { key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, + timestamptz_handling: None, }), protocol_config: ProtocolProperties::Upsert, }; diff --git a/src/connector/src/parser/maxwell/simd_json_parser.rs b/src/connector/src/parser/maxwell/simd_json_parser.rs index 45bb4dfbc0dc8..d018ef73b0506 100644 --- a/src/connector/src/parser/maxwell/simd_json_parser.rs +++ b/src/connector/src/parser/maxwell/simd_json_parser.rs @@ -36,6 +36,7 @@ mod tests { key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, + timestamptz_handling: None, }), protocol_config: ProtocolProperties::Maxwell, }; diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index c4cae0036fda9..eb7df338bcd3e 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -45,6 +45,7 @@ pub use self::mysql::mysql_row_to_owned_row; use self::plain_parser::PlainParser; pub use self::postgres::postgres_row_to_owned_row; use self::simd_json_parser::DebeziumJsonAccessBuilder; +pub use self::unified::json::TimestamptzHandling; use self::unified::AccessImpl; use self::upsert_parser::UpsertParser; use self::util::get_kafka_topic; @@ -876,7 +877,7 @@ impl AccessBuilderImpl { AccessBuilderImpl::Bytes(BytesAccessBuilder::new(config)?) } EncodingProperties::Json(config) => { - AccessBuilderImpl::Json(JsonAccessBuilder::new(config.use_schema_registry)?) + AccessBuilderImpl::Json(JsonAccessBuilder::new(config)?) } _ => unreachable!(), }; @@ -1006,6 +1007,7 @@ impl SpecificParserConfig { key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, + timestamptz_handling: None, }), protocol_config: ProtocolProperties::Plain, }; @@ -1046,6 +1048,7 @@ pub struct CsvProperties { #[derive(Debug, Default, Clone)] pub struct JsonProperties { pub use_schema_registry: bool, + pub timestamptz_handling: Option, } #[derive(Debug, Default, Clone)] @@ -1200,10 +1203,14 @@ impl SpecificParserConfig { SourceEncode::Json, ) => EncodingProperties::Json(JsonProperties { use_schema_registry: info.use_schema_registry, + timestamptz_handling: TimestamptzHandling::from_options( + &info.format_encode_options, + )?, }), (SourceFormat::DebeziumMongo, SourceEncode::Json) => { EncodingProperties::Json(JsonProperties { use_schema_registry: false, + timestamptz_handling: None, }) } (SourceFormat::Plain, SourceEncode::Bytes) => { diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index 4dfc59c1a1aa6..b50f0d57645fe 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -14,6 +14,7 @@ use risingwave_common::bail; +use super::unified::json::TimestamptzHandling; use super::{ AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, SourceStreamChunkRowWriter, SpecificParserConfig, @@ -66,7 +67,7 @@ impl PlainParser { }; let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson( - DebeziumJsonAccessBuilder::new()?, + DebeziumJsonAccessBuilder::new(TimestamptzHandling::GuessNumberUnit)?, )); Ok(Self { key_builder, diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index a765f333ef314..ae3da8de4dc3b 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -34,6 +34,7 @@ use thiserror_ext::AsReport; use super::{Access, AccessError, AccessResult}; use crate::parser::common::json_object_get_case_insensitive; use crate::parser::unified::avro::extract_decimal; +use crate::schema::{bail_invalid_option_error, InvalidOptionError}; #[derive(Clone, Debug)] pub enum ByteaHandling { @@ -46,6 +47,44 @@ pub enum TimeHandling { Milli, Micro, } + +#[derive(Clone, Debug)] +pub enum TimestamptzHandling { + /// `"2024-04-11T02:00:00.123456Z"` + UtcString, + /// `"2024-04-11 02:00:00.123456"` + UtcWithoutSuffix, + /// `1712800800123` + Milli, + /// `1712800800123456` + Micro, + /// Both `1712800800123` (ms) and `1712800800123456` (us) maps to `2024-04-11`. + /// + /// Only works for `[1973-03-03 09:46:40, 5138-11-16 09:46:40)`. + /// + /// This option is backward compatible. + GuessNumberUnit, +} + +impl TimestamptzHandling { + pub const OPTION_KEY: &'static str = "timestamptz.handling.mode"; + + pub fn from_options( + options: &std::collections::BTreeMap, + ) -> Result, InvalidOptionError> { + let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) { + Some("utc_string") => Self::UtcString, + Some("utc_without_suffix") => Self::UtcWithoutSuffix, + Some("micro") => Self::Micro, + Some("milli") => Self::Milli, + Some("guess_number_unit") => Self::GuessNumberUnit, + Some(v) => bail_invalid_option_error!("unrecognized {} value {}", Self::OPTION_KEY, v), + None => return Ok(None), + }; + Ok(Some(mode)) + } +} + #[derive(Clone, Debug)] pub enum JsonValueHandling { AsValue, @@ -95,6 +134,7 @@ pub enum StructHandling { pub struct JsonParseOptions { pub bytea_handling: ByteaHandling, pub time_handling: TimeHandling, + pub timestamptz_handling: TimestamptzHandling, pub json_value_handling: JsonValueHandling, pub numeric_handling: NumericHandling, pub boolean_handling: BooleanHandling, @@ -113,6 +153,7 @@ impl JsonParseOptions { pub const CANAL: JsonParseOptions = JsonParseOptions { bytea_handling: ByteaHandling::Standard, time_handling: TimeHandling::Micro, + timestamptz_handling: TimestamptzHandling::GuessNumberUnit, // backward-compatible json_value_handling: JsonValueHandling::AsValue, numeric_handling: NumericHandling::Relax { string_parsing: true, @@ -125,24 +166,10 @@ impl JsonParseOptions { struct_handling: StructHandling::Strict, ignoring_keycase: true, }; - pub const DEBEZIUM: JsonParseOptions = JsonParseOptions { - bytea_handling: ByteaHandling::Base64, - time_handling: TimeHandling::Micro, - json_value_handling: JsonValueHandling::AsString, - numeric_handling: NumericHandling::Relax { - string_parsing: false, - }, - boolean_handling: BooleanHandling::Relax { - string_parsing: false, - string_integer_parsing: false, - }, - varchar_handling: VarcharHandling::Strict, - struct_handling: StructHandling::Strict, - ignoring_keycase: true, - }; pub const DEFAULT: JsonParseOptions = JsonParseOptions { bytea_handling: ByteaHandling::Standard, time_handling: TimeHandling::Micro, + timestamptz_handling: TimestamptzHandling::GuessNumberUnit, // backward-compatible json_value_handling: JsonValueHandling::AsValue, numeric_handling: NumericHandling::Relax { string_parsing: true, @@ -153,6 +180,25 @@ impl JsonParseOptions { ignoring_keycase: true, }; + pub fn new_for_debezium(timestamptz_handling: TimestamptzHandling) -> Self { + Self { + bytea_handling: ByteaHandling::Base64, + time_handling: TimeHandling::Micro, + timestamptz_handling, + json_value_handling: JsonValueHandling::AsString, + numeric_handling: NumericHandling::Relax { + string_parsing: false, + }, + boolean_handling: BooleanHandling::Relax { + string_parsing: false, + string_integer_parsing: false, + }, + varchar_handling: VarcharHandling::Strict, + struct_handling: StructHandling::Strict, + ignoring_keycase: true, + } + } + pub fn parse( &self, value: &BorrowedValue<'_>, @@ -436,17 +482,34 @@ impl JsonParseOptions { .map_err(|_| create_error())? .into(), // ---- Timestamptz ----- - (Some(DataType::Timestamptz), ValueType::String) => value - .as_str() - .unwrap() - .parse::() - .map_err(|_| create_error())? - .into(), + (Some(DataType::Timestamptz), ValueType::String) => match self.timestamptz_handling { + TimestamptzHandling::UtcWithoutSuffix => value + .as_str() + .unwrap() + .parse::() + .map(|naive_utc| Timestamptz::from_micros(naive_utc.0.timestamp_micros())) + .map_err(|_| create_error())? + .into(), + // Unless explicitly requested `utc_without_utc`, we parse string with `YYYY-MM-DDTHH:MM:SSZ`. + _ => value + .as_str() + .unwrap() + .parse::() + .map_err(|_| create_error())? + .into(), + } ( Some(DataType::Timestamptz), ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, - ) => i64_to_timestamptz(value.as_i64().unwrap()) - .map_err(|_| create_error())? + ) => value.as_i64() + .and_then(|num| match self.timestamptz_handling { + TimestamptzHandling::GuessNumberUnit => i64_to_timestamptz(num).ok(), + TimestamptzHandling::Micro => Some(Timestamptz::from_micros(num)), + TimestamptzHandling::Milli => Timestamptz::from_millis(num), + // When explicitly requested string format, number without units are rejected. + TimestamptzHandling::UtcString | TimestamptzHandling::UtcWithoutSuffix => None, + }) + .ok_or_else(create_error)? .into(), // ---- Interval ----- (Some(DataType::Interval), ValueType::String) => { diff --git a/src/connector/src/schema/mod.rs b/src/connector/src/schema/mod.rs index 28151e60895e9..585dd43fa8bf1 100644 --- a/src/connector/src/schema/mod.rs +++ b/src/connector/src/schema/mod.rs @@ -30,7 +30,7 @@ const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy"; #[derive(Debug, thiserror::Error, thiserror_ext::Macro)] #[error("Invalid option: {message}")] pub struct InvalidOptionError { - message: String, + pub message: String, // #[backtrace] // source: Option, } diff --git a/src/connector/src/source/datagen/source/generator.rs b/src/connector/src/source/datagen/source/generator.rs index 1c05c6b4ffc8f..5716631dff620 100644 --- a/src/connector/src/source/datagen/source/generator.rs +++ b/src/connector/src/source/datagen/source/generator.rs @@ -276,6 +276,7 @@ mod tests { protocol_config: ProtocolProperties::Plain, encoding_config: EncodingProperties::Json(crate::parser::JsonProperties { use_schema_registry: false, + timestamptz_handling: None, }), key_encoding_config: None, }, diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 301ce2d9a8969..48f49695ae159 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -32,7 +32,7 @@ use risingwave_connector::parser::additional_columns::{ }; use risingwave_connector::parser::{ schema_to_columns, AvroParserConfig, DebeziumAvroParserConfig, ProtobufParserConfig, - SpecificParserConfig, DEBEZIUM_IGNORE_KEY, + SpecificParserConfig, TimestamptzHandling, DEBEZIUM_IGNORE_KEY, }; use risingwave_connector::schema::schema_registry::{ name_strategy_from_str, SchemaRegistryAuth, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME, @@ -432,6 +432,21 @@ pub(crate) async fn bind_columns_from_source( Format::Plain | Format::Upsert | Format::Maxwell | Format::Canal | Format::Debezium, Encode::Json, ) => { + if matches!( + source_schema.format, + Format::Plain | Format::Upsert | Format::Debezium + ) { + // Parse the value but throw it away. + // It would be too late to report error in `SpecificParserConfig::new`, + // which leads to recovery loop. + TimestamptzHandling::from_options(&format_encode_options_to_consume) + .map_err(|err| InvalidInputSyntax(err.message))?; + try_consume_string_from_options( + &mut format_encode_options_to_consume, + TimestamptzHandling::OPTION_KEY, + ); + } + let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?; stream_source_info.use_schema_registry = json_schema_infer_use_schema_registry(&schema_config); diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 7218a6947de12..3956efc4218b1 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -607,6 +607,7 @@ pub async fn transform_upstream(upstream: BoxedMessageStream, schema: &Schema) { key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, + timestamptz_handling: None, }), // the cdc message is generated internally so the key must exist. protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),