-
Notifications
You must be signed in to change notification settings - Fork 596
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sink): upsert avro with schema registry #13007
Conversation
0cf2f02
to
c1d6029
Compare
Codecov Report
@@ Coverage Diff @@
## main #13007 +/- ##
==========================================
- Coverage 68.57% 68.55% -0.02%
==========================================
Files 1496 1497 +1
Lines 251405 251538 +133
==========================================
+ Hits 172389 172447 +58
- Misses 79016 79091 +75
Flags with carried forward coverage won't be shown. Click here to find out more.
... and 4 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
This comment was marked as off-topic.
This comment was marked as off-topic.
b32cc4e
to
9158f9b
Compare
Conflicts resolved and ready for review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
generally LGTM
pub async fn fetch_schema( | ||
format_options: &BTreeMap<String, String>, | ||
topic: &str, | ||
) -> Result<(SchemaWithId, SchemaWithId), SchemaFetchError> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
copy from the source part?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They will be merged eventually.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Note that apeend-only protobuf with file / https in kafka has been supported by #12858. We will fill the support matrix gradually.
SinkFormatterImpl::new
will be refactored in a followup.topic
is used in schema registry name strategy, but does not apply to Redis.db_name
andsink_from_name
is only useful for kafka connect (incl debezium) metadata.AwsAuthProps
is only useful when loading schema definition from s3.Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
Supports kafka sink with upsert avro using schema registry.
Example
create sink
command can be found ine2e_test/sink/kafka/avro.slt
Same as source, the following options can be used after
format upsert encode avro
:schema.registry
schema.registry.username
schema.registry.password
schema.registry.name.strategy
(optional)key.message
(only when name strategy is set torecord_name_strategy
ortopic_record_name_strategy
)message
(only when name strategy is set torecord_name_strategy
ortopic_record_name_strategy
)