Skip to content

Commit

Permalink
feat(source): encode json option timestamptz.handling.mode (#16265)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored and xiangjinwu committed Apr 12, 2024
1 parent 00cb2cc commit b7e9e46
Show file tree
Hide file tree
Showing 14 changed files with 294 additions and 39 deletions.
136 changes: 136 additions & 0 deletions e2e_test/source/basic/handling_mode.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
statement error unrecognized
create table t (
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'mili');

# spelling error on config name would raise a non-fatal notice and use the default
statement ok
create table plain_guess (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mod = 'mili');

statement ok
create table plain_milli (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'milli');

statement ok
create table plain_micro (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'micro');

statement ok
create table plain_utc (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'utc_string');

statement ok
create table plain_naive (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'utc_without_suffix');

statement ok
create table debezium_milli (
"case" varchar, at timestamptz, primary key("case"))
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
topic = 'json_timestamptz_handling_mode')
format debezium encode json (timestamptz.handling.mode = 'milli');

sleep 2s

query TT
select "case", (payload).after.at from plain_guess order by 1;
----
0 number small 1970-01-01 00:01:40+00:00
1 number recent 2024-04-11 02:00:00.123456+00:00
2 string utc 2024-04-11 02:00:00.654321+00:00
3 string naive NULL

query TT
select "case", (payload).after.at from plain_milli order by 1;
----
0 number small 1970-01-01 00:00:00.100+00:00
1 number recent 56246-07-01 08:02:03.456+00:00
2 string utc 2024-04-11 02:00:00.654321+00:00
3 string naive NULL

query TT
select "case", (payload).after.at from plain_micro order by 1;
----
0 number small 1970-01-01 00:00:00.000100+00:00
1 number recent 2024-04-11 02:00:00.123456+00:00
2 string utc 2024-04-11 02:00:00.654321+00:00
3 string naive NULL

query TT
select "case", (payload).after.at from plain_utc order by 1;
----
0 number small NULL
1 number recent NULL
2 string utc 2024-04-11 02:00:00.654321+00:00
3 string naive NULL

query TT
select "case", (payload).after.at from plain_naive order by 1;
----
0 number small NULL
1 number recent NULL
2 string utc NULL
3 string naive 2024-04-11 02:00:00.234321+00:00

query TT
select "case", at from debezium_milli order by 1;
----
0 number small 1970-01-01 00:00:00.100+00:00
1 number recent 56246-07-01 08:02:03.456+00:00
2 string utc 2024-04-11 02:00:00.654321+00:00
3 string naive NULL

statement ok
drop table plain_guess;

statement ok
drop table plain_milli;

statement ok
drop table plain_micro;

statement ok
drop table plain_utc;

statement ok
drop table plain_naive;

statement ok
drop table debezium_milli;
4 changes: 4 additions & 0 deletions scripts/source/test_data/json_timestamptz_handling_mode.1
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"case":"0 number small","payload":{"after":{"case":"0 number small","at":100},"op":"r"}}
{"case":"1 number recent","payload":{"after":{"case":"1 number recent","at":1712800800123456},"op":"r"}}
{"case":"2 string utc","payload":{"after":{"case":"2 string utc","at":"2024-04-11T02:00:00.654321Z"},"op":"r"}}
{"case":"3 string naive","payload":{"after":{"case":"3 string naive","at":"2024-04-11 02:00:00.234321"},"op":"r"}}
1 change: 1 addition & 0 deletions src/connector/benches/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ fn create_parser(
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
}),
protocol_config: ProtocolProperties::Plain,
};
Expand Down
11 changes: 9 additions & 2 deletions src/connector/src/parser/debezium/debezium_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use super::{DebeziumAvroAccessBuilder, DebeziumAvroParserConfig};
use crate::error::ConnectorResult;
use crate::extract_key_config;
use crate::parser::unified::debezium::DebeziumChangeEvent;
use crate::parser::unified::json::TimestamptzHandling;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::{
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, JsonProperties,
Expand Down Expand Up @@ -69,8 +70,12 @@ async fn build_accessor_builder(
DebeziumAvroAccessBuilder::new(config, encoding_type)?,
))
}
EncodingProperties::Json(_) => Ok(AccessBuilderImpl::DebeziumJson(
DebeziumJsonAccessBuilder::new()?,
EncodingProperties::Json(json_config) => Ok(AccessBuilderImpl::DebeziumJson(
DebeziumJsonAccessBuilder::new(
json_config
.timestamptz_handling
.unwrap_or(TimestamptzHandling::GuessNumberUnit),
)?,
)),
EncodingProperties::Protobuf(_) => {
Ok(AccessBuilderImpl::new_default(config, encoding_type).await?)
Expand Down Expand Up @@ -111,6 +116,7 @@ impl DebeziumParser {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
}),
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
};
Expand Down Expand Up @@ -216,6 +222,7 @@ mod tests {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
}),
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
};
Expand Down
23 changes: 17 additions & 6 deletions src/connector/src/parser/debezium/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,22 @@ use simd_json::BorrowedValue;

use crate::error::ConnectorResult;
use crate::parser::unified::debezium::MongoJsonAccess;
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::json::{JsonAccess, JsonParseOptions, TimestamptzHandling};
use crate::parser::unified::AccessImpl;
use crate::parser::AccessBuilder;

#[derive(Debug)]
pub struct DebeziumJsonAccessBuilder {
value: Option<Vec<u8>>,
json_parse_options: JsonParseOptions,
}

impl DebeziumJsonAccessBuilder {
pub fn new() -> ConnectorResult<Self> {
Ok(Self { value: None })
pub fn new(timestamptz_handling: TimestamptzHandling) -> ConnectorResult<Self> {
Ok(Self {
value: None,
json_parse_options: JsonParseOptions::new_for_debezium(timestamptz_handling),
})
}
}

Expand All @@ -51,19 +55,25 @@ impl AccessBuilder for DebeziumJsonAccessBuilder {

Ok(AccessImpl::Json(JsonAccess::new_with_options(
payload,
&JsonParseOptions::DEBEZIUM,
&self.json_parse_options,
)))
}
}

#[derive(Debug)]
pub struct DebeziumMongoJsonAccessBuilder {
value: Option<Vec<u8>>,
json_parse_options: JsonParseOptions,
}

impl DebeziumMongoJsonAccessBuilder {
pub fn new() -> anyhow::Result<Self> {
Ok(Self { value: None })
Ok(Self {
value: None,
json_parse_options: JsonParseOptions::new_for_debezium(
TimestamptzHandling::GuessNumberUnit,
),
})
}
}

Expand All @@ -82,7 +92,7 @@ impl AccessBuilder for DebeziumMongoJsonAccessBuilder {
};

Ok(AccessImpl::MongoJson(MongoJsonAccess::new(
JsonAccess::new_with_options(payload, &JsonParseOptions::DEBEZIUM),
JsonAccess::new_with_options(payload, &self.json_parse_options),
)))
}
}
Expand Down Expand Up @@ -128,6 +138,7 @@ mod tests {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
}),
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
};
Expand Down
15 changes: 11 additions & 4 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use risingwave_pb::plan_common::ColumnDesc;

use super::avro::schema_resolver::ConfluentSchemaResolver;
use super::util::{bytes_from_url, get_kafka_topic};
use super::{EncodingProperties, SchemaRegistryAuth, SpecificParserConfig};
use super::{EncodingProperties, JsonProperties, SchemaRegistryAuth, SpecificParserConfig};
use crate::error::ConnectorResult;
use crate::only_parse_payload;
use crate::parser::avro::util::avro_schema_to_column_descs;
Expand All @@ -40,6 +40,7 @@ use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
pub struct JsonAccessBuilder {
value: Option<Vec<u8>>,
payload_start_idx: usize,
json_parse_options: JsonParseOptions,
}

impl AccessBuilder for JsonAccessBuilder {
Expand All @@ -58,16 +59,21 @@ impl AccessBuilder for JsonAccessBuilder {
value,
// Debezium and Canal have their special json access builder and will not
// use this
&JsonParseOptions::DEFAULT,
&self.json_parse_options,
)))
}
}

impl JsonAccessBuilder {
pub fn new(use_schema_registry: bool) -> ConnectorResult<Self> {
pub fn new(config: JsonProperties) -> ConnectorResult<Self> {
let mut json_parse_options = JsonParseOptions::DEFAULT;
if let Some(mode) = config.timestamptz_handling {
json_parse_options.timestamptz_handling = mode;
}
Ok(Self {
value: None,
payload_start_idx: if use_schema_registry { 5 } else { 0 },
payload_start_idx: if config.use_schema_registry { 5 } else { 0 },
json_parse_options,
})
}
}
Expand Down Expand Up @@ -595,6 +601,7 @@ mod tests {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
}),
protocol_config: ProtocolProperties::Upsert,
};
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/parser/maxwell/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ mod tests {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
}),
protocol_config: ProtocolProperties::Maxwell,
};
Expand Down
9 changes: 8 additions & 1 deletion src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub use self::mysql::mysql_row_to_owned_row;
use self::plain_parser::PlainParser;
pub use self::postgres::postgres_row_to_owned_row;
use self::simd_json_parser::DebeziumJsonAccessBuilder;
pub use self::unified::json::TimestamptzHandling;
use self::unified::AccessImpl;
use self::upsert_parser::UpsertParser;
use self::util::get_kafka_topic;
Expand Down Expand Up @@ -876,7 +877,7 @@ impl AccessBuilderImpl {
AccessBuilderImpl::Bytes(BytesAccessBuilder::new(config)?)
}
EncodingProperties::Json(config) => {
AccessBuilderImpl::Json(JsonAccessBuilder::new(config.use_schema_registry)?)
AccessBuilderImpl::Json(JsonAccessBuilder::new(config)?)
}
_ => unreachable!(),
};
Expand Down Expand Up @@ -1006,6 +1007,7 @@ impl SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
}),
protocol_config: ProtocolProperties::Plain,
};
Expand Down Expand Up @@ -1046,6 +1048,7 @@ pub struct CsvProperties {
#[derive(Debug, Default, Clone)]
pub struct JsonProperties {
pub use_schema_registry: bool,
pub timestamptz_handling: Option<TimestamptzHandling>,
}

#[derive(Debug, Default, Clone)]
Expand Down Expand Up @@ -1200,10 +1203,14 @@ impl SpecificParserConfig {
SourceEncode::Json,
) => EncodingProperties::Json(JsonProperties {
use_schema_registry: info.use_schema_registry,
timestamptz_handling: TimestamptzHandling::from_options(
&info.format_encode_options,
)?,
}),
(SourceFormat::DebeziumMongo, SourceEncode::Json) => {
EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
})
}
(SourceFormat::Plain, SourceEncode::Bytes) => {
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use risingwave_common::bail;

use super::unified::json::TimestamptzHandling;
use super::{
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType,
SourceStreamChunkRowWriter, SpecificParserConfig,
Expand Down Expand Up @@ -66,7 +67,7 @@ impl PlainParser {
};

let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson(
DebeziumJsonAccessBuilder::new()?,
DebeziumJsonAccessBuilder::new(TimestamptzHandling::GuessNumberUnit)?,
));
Ok(Self {
key_builder,
Expand Down
Loading

0 comments on commit b7e9e46

Please sign in to comment.