feat(sink): support new syntax `FORMAT ... ENCODE ...` similar to source #12556
Codecov Report
@@ Coverage Diff @@
## main #12556 +/- ##
==========================================
- Coverage 69.32% 69.27% -0.05%
==========================================
Files 1469 1469
Lines 241065 241256 +191
==========================================
+ Hits 167111 167129 +18
- Misses 73954 74127 +173
... and 6 files with indirect coverage changes
LGTM. Maybe some parser tests are needed?
Also, should we notify users when they are using the legacy syntax? #12373
LGTM, but we still need a connector x format x encode table for the compatibility check:
https://github.com/singularity-data/risingwave/blob/fff6d47d897b859e1128029a3fe19b5d41a029f8/src/frontend/src/handler/create_source.rs#L853
Good point. But the new syntax is not actually effective yet. So maybe we can add them when it actually works.
As of today the old syntax compatibility check happens in compute node (
That being said, we could move or add the check to the frontend as a later refactor if appropriate.
2910cba to e6942fc
Major changes since the reviewed 3c0cfc9:
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
#11451
Old syntax:
New syntax (similar to source):
- Add `SinkFormatDesc` in Rust and proto to pass around this info.
- Extend `SinkDesc`/`SinkCatalog`/`SinkParam` in Rust and `SinkDesc`/`Sink`/`SinkParam` in proto with an extra optional field `format_desc`.
- Replace `SinkType` and `SINK_TYPE_OPTION` with `SinkFormatDesc` in kafka/kinesis/pulsar using `SinkFormatterImpl`.
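A minimal Rust sketch of the shape described above may help: a format descriptor carried as an optional field on the sink parameters. Only the names `SinkFormatDesc`, `SinkFormat`, `SinkParam`, and `format_desc` come from this PR; the enum variants, `SinkEncode`, the `connector` field, and the `clause` helper are assumptions for illustration and may not match the actual definitions.

```rust
/// The FORMAT part of the clause. Variant names are assumed from the
/// keywords in the release note, not taken from the real code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkFormat {
    Plain,
    Upsert,
    Debezium,
}

/// The ENCODE part of the clause (hypothetical type). Only JSON is
/// named in this PR; other encodings would be added later.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkEncode {
    Json,
}

/// Bundles format and encode so they can be passed around together.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkFormatDesc {
    pub format: SinkFormat,
    pub encode: SinkEncode,
}

impl SinkFormatDesc {
    /// Renders the descriptor as SQL-like text (hypothetical helper).
    pub fn clause(&self) -> String {
        let format = match self.format {
            SinkFormat::Plain => "PLAIN",
            SinkFormat::Upsert => "UPSERT",
            SinkFormat::Debezium => "DEBEZIUM",
        };
        let encode = match self.encode {
            SinkEncode::Json => "JSON",
        };
        format!("FORMAT {} ENCODE {}", format, encode)
    }
}

/// Simplified stand-in for the sink parameters: the descriptor is
/// optional, so connectors that have not migrated can leave it unset.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub connector: String,
    pub format_desc: Option<SinkFormatDesc>,
}
```

Keeping the field optional means connectors that do not yet understand the descriptor can continue to ignore it.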
Next step
In `SinkFormatterImpl::new`, convert `param.format_desc` to `ProtobufProperties`/`AvroProperties` and reuse `ProtobufParserConfig::new`/`AvroParserConfig::new` to get the `prost_reflect::MessageDescriptor`/`apache_avro::Schema` needed by encoders (#12425).
Compatibility
- The new syntax applies only when `connector` is kafka/kinesis/pulsar. We will test other connectors and migrate them one by one.
- `SinkFormatDesc` for the old syntax.
- In `SinkDesc`/`Sink`/`SinkParam`, `SinkFormatDesc` will also be derived from the old syntax.
Limitations
Some refactoring and reuse is possible but is not done here, to limit the scope:
- `SourceSchemaV2` is the same as `SinkSchema` (easy to rename).
- `SinkDesc`/`SinkCatalog`/`SinkParam` may be extracted to a struct.
- `SourceStruct` may be absorbed into `SpecificParserConfig`, then consolidated with `SinkFormatDesc`.
- `SinkType` is similar to `SinkFormat` but cannot be replaced yet:
Checklist
- `./risedev check` (or alias, `./risedev c`)
Documentation
Release note
For kafka/kinesis/pulsar sinks, the preferred syntax is now `FORMAT [PLAIN | UPSERT | DEBEZIUM] ENCODE JSON` rather than `type = ['append-only' | 'upsert' | 'debezium']`. This reads similarly to the source syntax and enables support for different encodings later. Other connectors (clickhouse, jdbc, etc.) still use the old `type =` syntax for now.
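The relationship between the two syntaxes can be sketched in Rust as follows. This is an illustration under stated assumptions, not the PR's implementation: the one-to-one pairing of `'append-only'`, `'upsert'`, and `'debezium'` with `PLAIN`, `UPSERT`, and `DEBEZIUM` is inferred from the order in which the release note lists them, and the function names `format_from_legacy_type` and `supports_format_encode` as well as the enum variants are hypothetical.

```rust
/// The FORMAT keyword of the new syntax. Variant names are assumed
/// from the release note keywords.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkFormat {
    Plain,
    Upsert,
    Debezium,
}

/// Maps a legacy `type = '...'` value to the corresponding FORMAT
/// keyword (assumed pairing). Returns `None` for unrecognized values.
pub fn format_from_legacy_type(sink_type: &str) -> Option<SinkFormat> {
    match sink_type {
        "append-only" => Some(SinkFormat::Plain),
        "upsert" => Some(SinkFormat::Upsert),
        "debezium" => Some(SinkFormat::Debezium),
        _ => None,
    }
}

/// Reports whether a connector is one of those the release note lists
/// as accepting `FORMAT ... ENCODE ...`; all others keep `type =`.
pub fn supports_format_encode(connector: &str) -> bool {
    matches!(connector, "kafka" | "kinesis" | "pulsar")
}
```

Under these assumptions, `format_from_legacy_type("append-only")` yields `Some(SinkFormat::Plain)`, while `supports_format_encode("jdbc")` is `false`, matching the note that such connectors still use the old syntax.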