-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(source): encode json
option timestamptz.handling.mode
#16265
Conversation
@@ -66,7 +67,7 @@ impl PlainParser { | |||
}; | |||
|
|||
let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson( | |||
DebeziumJsonAccessBuilder::new()?, | |||
DebeziumJsonAccessBuilder::new(TimestamptzHandling::GuessNumberUnit)?, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@StrikeW Why do we have DebeziumJsonAccessBuilder
as part of PlainParser
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shared cdc source use the plain parser, and to parse transaction metadata, we need a Debezium parser.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given it is system metadata I believe [1973-03-03 09:46:40, 5138-11-16 09:46:40)
is enough. Additionally, the structure of the metadata may be static rather than dynamic, so maybe we can derive(serde::Deserialize)
rather than accessing a json Value
. But this can be a separate issue.
@@ -607,6 +607,7 @@ pub async fn transform_upstream(upstream: BoxedMessageStream, schema: &Schema) { | |||
key_encoding_config: None, | |||
encoding_config: EncodingProperties::Json(JsonProperties { | |||
use_schema_registry: false, | |||
timestamptz_handling: None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@StrikeW Is it possible timestamptz here is always one of micro
/milli
/utc_string
? Being specific can read better than the legacy default guess_number_unit
, but if you are unsure now, at least the current implementation is semantically equivalent as previous.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is possible. But we need to implement a CustomConverter to enforce that. The default behavior is up to the precision of upstream data types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. So this code path would remain limited by [1973-03-03 09:46:40, 5138-11-16 09:46:40)
.
The approach is LGTM since the guessing one can be wrong sometimes. Just wonder what we do if the schema contains multiple timestamptz fields and they have different units? 😇 |
When there are multiple timestamptz fields with different units, a per-column
See #16097 (comment) for a concrete example of This PR supports the 3rd use case and we would support the former 2 kafka connect |
@@ -66,7 +67,7 @@ impl PlainParser { | |||
}; | |||
|
|||
let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson( | |||
DebeziumJsonAccessBuilder::new()?, | |||
DebeziumJsonAccessBuilder::new(TimestamptzHandling::GuessNumberUnit)?, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shared cdc source use the plain parser, and to parse transaction metadata, we need a Debezium parser.
@@ -607,6 +607,7 @@ pub async fn transform_upstream(upstream: BoxedMessageStream, schema: &Schema) { | |||
key_encoding_config: None, | |||
encoding_config: EncodingProperties::Json(JsonProperties { | |||
use_schema_registry: false, | |||
timestamptz_handling: None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is possible. But we need to implement a CustomConverter to enforce that. The default behavior is up to the precision of upstream data types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, do we have plan on publishing other params in JsonParseOptions
Yes. And unifying them with sink ones (similar yet with minor differences). But it does seem |
… (#16281) Co-authored-by: xiangjinwu <[email protected]>
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Similar to sink, we allow specifying timestamptz number units:
micro
milli
guess_number_unit
: This has been the default, but limits the range to[1973-03-03 09:46:40, 5138-11-16 09:46:40)
utc_string
: This format is least ambiguous and can mostly be inferred correctly without the need to be specified.utc_without_suffix
: This allows the user to acknowledge naive timestamp is in UTC rather than local time.Note the corresponding
format
has to be one ofplain | upsert | debezium
. The followingformat
s do NOT accept this option as of now:debezium_mongo
canal
maxwell
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
https://docs.risingwave.com/docs/current/supported-sources-and-formats/#debezium-json
When using
format plain | upsert | debezium encode json
(but notformat debezium_mongo | canal | maxwell encode json
), there is a new optiontimestamptz.handling.mode
. There are 5 possible choices. See description above for the meaning of each one.