
bug: 'primary_key' is allowed by sink types other than 'upsert' #9443

Closed
neverchanje opened this issue Apr 26, 2023 · 10 comments

@neverchanje
Contributor

Describe the bug

```sql
create sink invalid_pk_sink from t with (connector = 'blackhole', primary_key = 'v1');
```

To Reproduce

As above.

Expected behavior

We should only allow primary_key when upsert is enabled.

Additional context

No response

@github-actions
Contributor

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.

@neverchanje
Contributor Author

neverchanje commented Oct 11, 2023

This bug still persists in the current version. Could you help with the fix? @jetjinser

@xiangjinwu
Contributor

As far as I can tell from digging into the actual code implementation, there are 4 possible states of type = for a sink today:

  • upsert, which requires primary key.
  • Unspecified or specified to an unrecognized value. This is the case for the example statement above (blackhole), as well as elasticsearch sink. It will actually default the SinkType enum to upsert (src). This is likely a bug and we would require format xxx soon under the new syntax. We need to clarify the actual behavior, connector by connector, for example:
    • blackhole seems to act as if append-only, when unspecified. Then primary_key is unnecessary.
    • elasticsearch seems to act as if upsert, when unspecified. Then primary_key should actually be required.
  • debezium, which also requires primary key (src). Due to the same logic above, its SinkType is still upsert.
  • append-only. In the current implementation, providing primary_key does make a difference:
    • Without primary_key, all Kafka messages would have key = {} (an empty JSON object) and would be sent to the same Kafka partition, leading to data skew.
    • With primary_key, the values of those columns are included in the Kafka key, which is then hashed to decide which Kafka partition a message should be written to.
    • Practically, an empty primary_key is allowed right now but is bad practice. We should first fix the behavior to round-robin before disallowing primary_key here. Or do we still want to give users the ability to select a partition based on some columns, even when the sink is append-only?
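The key-hash partitioning described in the append-only case can be sketched as follows. This is a simplified illustrative model, not RisingWave's actual code, and Kafka's real default partitioner uses murmur2 rather than crc32:

```python
import json
import zlib

# Simplified model of key-based partition selection for an append-only sink.
# NOT RisingWave's actual implementation; crc32 stands in for Kafka's murmur2.
def choose_partition(primary_key_values, num_partitions):
    """Pick a partition from the serialized message key, as the thread describes."""
    if not primary_key_values:
        # Without primary_key, the current sink emits key = {} (2 bytes),
        # so every message hashes to the same partition -> data skew.
        key_bytes = b"{}"
    else:
        key_bytes = json.dumps(primary_key_values, sort_keys=True).encode()
    return zlib.crc32(key_bytes) % num_partitions

# All keyless messages land on a single partition:
assert len({choose_partition(None, 12) for _ in range(100)}) == 1

# With primary_key columns, keys spread across partitions:
assert len({choose_partition({"v1": i}, 12) for i in range(100)}) > 1
```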

@neverchanje @tabVersion Could you help clarify what we should do in each case?

@tabVersion
Contributor

Here are my thoughts.

  • connector = 'es' only works with format upsert encode native, and I think it always works in an upsert way, so primary_key is required.
  • connector = 'blackhole' does nothing (works with format append_only encode native). I am OK with treating it as append-only, so we should reject primary_key in this case, just for the blackhole sink.
  • connector = mq with format debezium encode json/avro/protobuf is another upsert case, and primary_key is required.

Overall, we are going to require primary_key for the upsert and debezium formats, but I want to make it optional for the append-only format. I've seen requests where the user just wants to partition the data by some column, and we can offer that ability without much effort.
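The rule proposed above could be sketched like this. The function name and structure are hypothetical, purely for illustration, and not RisingWave's actual validation code:

```python
# Hypothetical sketch of the primary_key validation rule proposed in this
# thread; names and structure are illustrative, not RisingWave's code.
def validate_primary_key(connector, sink_format, has_primary_key):
    if sink_format in ("upsert", "debezium"):
        # Both formats need a key to identify the row being updated/deleted.
        if not has_primary_key:
            raise ValueError(f"primary_key is required for format '{sink_format}'")
    elif sink_format == "append_only":
        # primary_key stays optional here: it only selects the Kafka partition,
        # which is meaningless for a sink that discards its input.
        if has_primary_key and connector == "blackhole":
            raise ValueError("primary_key is not allowed for the blackhole sink")
    else:
        raise ValueError(f"unrecognized sink format '{sink_format}'")

validate_primary_key("kafka", "upsert", True)        # ok
validate_primary_key("kafka", "append_only", True)   # ok: partitioning hint
try:
    validate_primary_key("blackhole", "append_only", True)
except ValueError:
    pass  # rejected, as proposed above
```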

Without primary_key, all Kafka messages would have key = {} (an empty JSON object) and would be sent to the same Kafka partition, leading to data skew.

Not true, Kafka will use Round Robin for these messages and I don't think there is data skew.

@xiangjinwu
Contributor

Not true, Kafka will use Round Robin for these messages and I don't think there is data skew.

Kafka would use Round Robin when key is unset. But our current implementation always sets key = {} (2 bytes). This is a minor implementation bug we need to fix.
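The distinction is easy to verify: an empty JSON object still serializes to two real bytes, which a Kafka producer treats as a present key and hashes to a single partition, whereas only an absent (null) key triggers the round-robin/sticky fallback:

```python
import json

# An "empty" key is not an absent key: json.dumps({}) still produces 2 bytes.
empty_key = json.dumps({}).encode()
assert empty_key == b"{}"
assert len(empty_key) == 2

# For the partitioner to fall back to round-robin / sticky partitioning,
# the key must be omitted entirely (None/null), not set to b"{}".
absent_key = None
assert absent_key is None
assert absent_key != empty_key
```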

@tabVersion
Contributor

Not true, Kafka will use Round Robin for these messages and I don't think there is data skew.

Kafka would use Round Robin when key is unset. But our current implementation always sets key = {} (2 bytes). This is a minor implementation bug we need to fix.

OMG, that's a bug indeed.

@xiangjinwu
Contributor

Seems to be a misunderstanding because #9768 was not done.

@neverchanje
Contributor Author

neverchanje commented Oct 17, 2023

I've seen requests where the user just wants to partition the data by some column, and we can offer that ability without much effort.

In that case, the user should use the upsert mode + primary_key, right? Does it lead to different behavior from append_only + primary_key?

Overall, I think primary_key should be set when the mode is upsert or debezium, and left unset otherwise.

@fuyufjh fuyufjh removed this from the release-1.4 milestone Nov 8, 2023
@fuyufjh
Member

fuyufjh commented Nov 9, 2023

Not true, Kafka will use Round Robin for these messages and I don't think there is data skew.

Kafka would use Round Robin when key is unset. But our current implementation always sets key = {} (2 bytes). This is a minor implementation bug we need to fix.

OMG, that's a bug indeed.

Let's follow this?

https://github.com/apache/kafka/blob/f1e58a35d7aebbe72844faf3e5019d9aa7a85e4a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L28-L32

```java
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose the sticky partition that changes when the batch is full.
```
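The quoted strategy can be modeled roughly like this. This is an illustrative Python sketch, not the Java implementation: real clients use murmur2 hashing and track a per-batch "sticky" partition, for which crc32 and a fixed parameter stand in here:

```python
import zlib

# Simplified model of Kafka's DefaultPartitioner strategy quoted above.
def default_partition(record_partition, key, num_partitions, sticky_partition):
    if record_partition is not None:
        return record_partition                  # rule 1: explicit partition wins
    if key is not None:
        return zlib.crc32(key) % num_partitions  # rule 2: hash of the key
    return sticky_partition                      # rule 3: sticky until batch is full

assert default_partition(3, b"v1", 8, 5) == 3        # explicit partition
assert default_partition(None, None, 8, 5) == 5      # no key -> sticky
assert 0 <= default_partition(None, b"v1", 8, 5) < 8 # keyed -> hashed
```

Under this strategy, fixing the sink to send a null key (instead of `{}`) would make keyless append-only messages fall through to rule 3 and spread across partitions.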

@fuyufjh
Member

fuyufjh commented Nov 9, 2023

Seems nothing else to do

6 participants