Allow kafka sink to encode as PLAIN AVRO #16912

Closed
maingoh opened this issue May 23, 2024 · 8 comments

@maingoh

maingoh commented May 23, 2024

Is your feature request related to a problem? Please describe.

I would like to export the results of a materialized view to Kafka in Avro format, but it does not appear to be possible. Is it much different from PLAIN JSON?

Bind error: connector kafka does not support format Plain with encode Avro

Describe the solution you'd like

Be able to do:

{{ config(materialized="sink") }}
CREATE SINK {{ sink }} FROM {{ mv }} WITH (
  connector = 'kafka',
  topic = '{{ sink_topic }}',
  ...
) FORMAT PLAIN ENCODE AVRO (
  ...
);

Describe alternatives you've considered

No response

Additional context

No response

@github-actions github-actions bot added this to the release-1.10 milestone May 23, 2024
@xiangjinwu xiangjinwu self-assigned this May 24, 2024
@xiangjinwu
Contributor

It is doable. Just to confirm:

  • Is your Avro schema definition stored in Confluent Schema Registry (or one compatible with it, like Redpanda or Upstash)?
  • Are you going to use (a) a subset of output columns as an Avro-encoded Kafka key, (b) a single simple output column as a string/text Kafka key, or (c) no Kafka key and round-robin partitioning?

@maingoh
Author

maingoh commented May 24, 2024

Thank you for your quick reply! I may have misunderstood the UPSERT keyword. It didn't make sense to me, since Kafka is append-only. I tried an UPSERT AVRO sink, and it actually appends messages with the same primary key. That is the behavior I want, but it looks more like PLAIN AVRO to me.

To answer your questions:

  • Yes, that is the case. However, it should not be mandatory for either PLAIN or UPSERT, should it?
  • I actually use an "id" column from my materialized view as the key, though that should not be mandatory either, should it?

@tabVersion
Contributor

> I may have misunderstood the UPSERT keyword. It didn't make sense to me, since Kafka is append-only. I tried an UPSERT AVRO sink, and it actually appends messages with the same primary key. That is the behavior I want, but it looks more like PLAIN AVRO to me.

Yes, this is the tricky part. If the Kafka key is aligned with the downstream primary key, it is the upsert format in RisingWave. If the Kafka key is not the downstream primary key and is used only for partitioning, it is the append-only format with a specified key in RisingWave. To Kafka itself, the two look the same.
Besides, Kafka has a "compaction" feature that runs in the broker: if two messages have the same key, it deletes the earlier one and keeps only the later one. So we want to confirm whether users intend to upsert.
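
The compaction behavior described in this comment can be sketched as follows. This is a simplified in-memory model, not Kafka code: the `compact` function and the `Message` type are hypothetical names, and real brokers compact only topics configured with `cleanup.policy=compact`, and do so asynchronously.

```python
from typing import List, Optional, Tuple

# (key, value); a value of None stands for a message with a key but no payload.
Message = Tuple[str, Optional[str]]


def compact(log: List[Message]) -> List[Message]:
    """Keep only the latest message for each key (hypothetical helper).

    Surviving messages stay in their original log order.
    """
    # Later occurrences overwrite earlier ones, so each key maps to its last index.
    last_index = {key: i for i, (key, _) in enumerate(log)}
    return [msg for i, msg in enumerate(log) if last_index[msg[0]] == i]


log = [("id-1", "v1"), ("id-2", "v1"), ("id-1", "v2")]
print(compact(log))  # [('id-2', 'v1'), ('id-1', 'v2')]
```

Under this model, a sink that reuses a key only for partitioning would lose older messages on a compacted topic, which is why the intent behind the key matters.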

@xiangjinwu
Contributor

> • Yes, that is the case. However, it should not be mandatory for either PLAIN or UPSERT, should it?

Use of a schema registry is mandatory for Avro unless the schema never evolves. Unlike JSON or Protobuf, decoding an Avro message that was encoded with one schema version into another compatible version requires the definitions (avsc) of both versions to be available at decode time.
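
To see why the writer's schema is needed, here is a toy sketch of Avro's binary encoding for a flat record. It is not a real Avro library: it supports only `long` (zigzag varint) and `string` (length-prefixed UTF-8), and the `Schema` type and all function names are hypothetical. The point is that the payload carries no field names or tags, so a decoder holding a different schema misreads the bytes without noticing.

```python
from typing import Dict, List, Tuple, Union

# Ordered (field name, type) pairs; type is "long" or "string". Illustration only.
Schema = List[Tuple[str, str]]
Record = Dict[str, Union[int, str]]


def encode_long(n: int) -> bytes:
    """Encode a 64-bit integer as zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def decode_long(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode a zigzag varint at pos; return (value, next position)."""
    z, shift = 0, 0
    while True:
        b = buf[pos]
        pos += 1
        z |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (z >> 1) ^ -(z & 1), pos


def encode_record(schema: Schema, record: Record) -> bytes:
    """Concatenate field values in schema order; no names are written."""
    out = bytearray()
    for name, typ in schema:
        if typ == "long":
            out += encode_long(record[name])
        else:  # "string": byte length as a long, then the UTF-8 bytes
            data = record[name].encode("utf-8")
            out += encode_long(len(data)) + data
    return bytes(out)


def decode_record(schema: Schema, buf: bytes) -> Record:
    """Read fields in schema order; correctness depends entirely on the schema."""
    pos, record = 0, {}
    for name, typ in schema:
        if typ == "long":
            record[name], pos = decode_long(buf, pos)
        else:
            length, pos = decode_long(buf, pos)
            record[name] = buf[pos:pos + length].decode("utf-8")
            pos += length
    return record


writer_schema = [("id", "long"), ("name", "string")]
other_schema = [("id", "long"), ("age", "long")]  # a different, hypothetical schema

payload = encode_record(writer_schema, {"id": 7, "name": "ab"})
print(decode_record(writer_schema, payload))  # {'id': 7, 'name': 'ab'}
print(decode_record(other_schema, payload))   # {'id': 7, 'age': 2}
```

In the second decode, the string's length prefix is silently read as `age`. A real Avro reader avoids this by resolving the writer's schema against its own, which is what a schema registry makes possible.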

> • I actually use an "id" column from my materialized view as the key, though that should not be mandatory either, should it?

It is not mandatory for plain: option (c) above, no key with round-robin partitioning, covers that case. It is mandatory for upsert, as @tabVersion explained above.

@maingoh
Author

maingoh commented Jun 20, 2024

> Yes, this is the tricky part. If the Kafka key is aligned with the downstream primary key, it is the upsert format in RisingWave. If the Kafka key is not the downstream primary key and is used only for partitioning, it is the append-only format with a specified key in RisingWave. To Kafka itself, the two look the same.
> Besides, Kafka has a "compaction" feature that runs in the broker: if two messages have the same key, it deletes the earlier one and keeps only the later one. So we want to confirm whether users intend to upsert.

It looks to me like compaction is a topic property and Kafka's responsibility, so RW should not really care whether the downstream system is upserting or appending. So yes, in my opinion PLAIN and UPSERT are the same for Kafka (and could be optional, because they don't really make sense?).

I think my initial issue has been fixed by #17216. Am I missing something?

@xiangjinwu
Contributor

> I think my initial issue has been fixed by #17216. Am I missing something?

Yes.

> It looks to me like compaction is a topic property and Kafka's responsibility, so RW should not really care whether the downstream system is upserting or appending. So yes, in my opinion PLAIN and UPSERT are the same for Kafka (and could be optional, because they don't really make sense?).

Yes, it is Kafka's responsibility and outside RisingWave, but the RisingWave options are supposed to follow its conventions. To avoid confusion, let me focus on the difference in RisingWave's behavior:

  • When the stream being sunk contains no DELETE, there is no difference.
  • When there is a DELETE, upsert writes out a message with a key but no value, while plain (force_append_only = true) skips the entry.
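
The two cases above can be sketched as a small model. This is not RisingWave code: `sink_messages`, `Change`, and `KafkaMessage` are hypothetical names, and only INSERT and DELETE are modeled because those are the only operations this comment describes.

```python
from typing import List, Optional, Tuple

# (op, key, value); op is "INSERT" or "DELETE". Hypothetical change-event shape.
Change = Tuple[str, str, Optional[str]]
# (key, value); value None means a message with a key but no value.
KafkaMessage = Tuple[str, Optional[str]]


def sink_messages(changes: List[Change], fmt: str) -> List[KafkaMessage]:
    """Map change events to Kafka messages for fmt "upsert" or "plain".

    "plain" here assumes force_append_only = true, as in the comment above.
    """
    if fmt not in ("upsert", "plain"):
        raise ValueError(f"unknown format: {fmt}")
    out: List[KafkaMessage] = []
    for op, key, value in changes:
        if op == "INSERT":
            out.append((key, value))  # identical in both formats
        elif op == "DELETE" and fmt == "upsert":
            out.append((key, None))   # key with no value
        # DELETE under plain with force_append_only: the entry is skipped
    return out


changes = [("INSERT", "1", "a"), ("DELETE", "1", None)]
print(sink_messages(changes, "upsert"))  # [('1', 'a'), ('1', None)]
print(sink_messages(changes, "plain"))   # [('1', 'a')]
```

With no DELETE in `changes`, both calls return the same list, matching the first bullet.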

@maingoh
Author

maingoh commented Jun 27, 2024

Thank you for the details, @xiangjinwu! I understand the difference now. Maybe it could be highlighted in the documentation?

@xiangjinwu
Contributor

Yes, I am working on a documentation refactor of the relevant pages:
https://github.com/risingwavelabs/risingwave-docs/pull/2294
