RFC: use `include ... as ...` to ingest more message parts #79
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
--- | ||
feature: New Syntax Include Key As | ||
authors: | ||
- "TabVersion" | ||
start_date: "2023/11/22" | ||
--- | ||
|
||
# New Syntax: Include \<column\> as \<new name\> | ||
|
||
## Background | ||
|
||
> This RFC is a follow-up to [RFC-0064](./0064-new-source-ddl.md). | ||
|
||
In the previous implementation, we only load the message key into a [~~hidden~~](https://github.com/risingwavelabs/risingwave/pull/13521) | ||
column called `_rw_key`, typed `bytea` by default. | ||
|
||
Here are some cases when RisingWave derives the `_rw_key` column. | ||
|
||
* For all connectors with `format upsert`, RisingWave derives the column as the primary key to implement upsert semantics. | ||
* An exception: for `format upsert encode json`, RisingWave allows the PK to be part of the message payload instead of | ||
the message key. This behavior is closer to `format plain encode json` with a PK constraint. | ||
* **To Be Discussed**: whether we can afford a breaking change for the above exception. | ||
* For all connectors with `format plain`, RisingWave derives the `_rw_key` column as a normal column | ||
([ref](https://github.com/risingwavelabs/risingwave/pull/13278)), added at the request of a POC. This forces an additional column | ||
onto every table, which is not ideal. | ||
|
||
Aside from message keys, users may want to ingest the timestamp as well as the topic headers into the table for further | ||
analysis. | ||
|
||
This RFC proposes a new syntax to allow users to specify the column name for message components other than payload. | ||
|
||
## Proposed Syntax | ||
|
||
```sql | ||
create source/table ( | ||
.., | ||
primary key ( <key-column> ) | ||
) | ||
[ include key [ as <key-column> ] ] | ||
[ include timestamp [ as <timestamp-column> ] ] | ||
[ include header [ as <header-column> ] ] | ||
[ include ... [ as ... ] ] | ||
|
||
with ( ... ) | ||
format ... encode ... ( ... ) | ||
[ key encode [ ( ... ) ] ] | ||
``` | ||
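
As an illustration of the grammar above, here is a minimal sketch of a Kafka-backed table that ingests the message key, timestamp, and headers under user-chosen names. The table name, column names, topic, and broker address are hypothetical placeholders, and the statement follows the syntax proposed in this RFC rather than any released behavior.

```sql
-- Sketch only: `orders`, its columns, the topic, and the broker address
-- are placeholders chosen for illustration.
create table orders (
    order_id bigint,
    amount double precision
)
include key as order_key           -- message key; `bytea` unless overridden
include timestamp as event_time    -- message timestamp
include header as kafka_headers    -- key-value pairs carried with the message
with (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'localhost:9092'
)
format plain encode json;
```

Each `include` clause is optional and independent, so a table that only needs the timestamp can list that clause alone.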
|
||
* If the `as` name is not specified, a connector-component naming template is applied. | ||
* For connector kafka and component key, the derived message key column name is `_rw_kafka_key`. | ||
* The default type for message key column is `bytea`. The priority of the type definition is: | ||
`key encode` > infer from `format ... encode ...` > default type | ||
What does it mean?
If there is a schema registry, we can infer the key schema from it. |
||
* **Important**: `include key` is required for `format upsert`, and RisingWave uses the key column as the one and | ||
only primary key to implement upsert semantics. Specifying multiple columns as the primary key is not allowed, | ||
even if they are all part of the key. | ||
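
The upsert rule can be sketched as follows, assuming a Kafka topic whose message key identifies the row. All names and connector properties are hypothetical. With no `key encode` clause, and assuming no schema registry to infer from, the key column would fall back to the default `bytea` type described above.

```sql
-- Sketch only: the table, topic, and broker address are hypothetical.
create table user_profiles (
    name varchar,
    email varchar,
    primary key (user_key)    -- must be exactly the included key column
)
include key as user_key       -- required for `format upsert`
with (
    connector = 'kafka',
    topic = 'user_profiles',
    properties.bootstrap.server = 'localhost:9092'
)
format upsert encode json;
```

Under the rule stated above, a definition such as `primary key (user_key, name)` would be rejected, because only the key column itself may serve as the primary key.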
Comment on lines +52 to +54
So,
Right? |
||
|
||
## Specifications | ||
|
||
### Kafka | ||
|
||
| Allowed Components | Default Type | Note | | ||
|--------------------|--------------------------------------------|---------------------------------------------------| | ||
| key | `bytea` | Can be overridden by `encode` and `key encode` | | ||
| timestamp | `timestamp with time zone` (i64 in millis) | Refers to `CreateTime` rather than `LogAppendTime` | | ||
| partition | `varchar` | The partition the message comes from | | ||
| offset | `varchar` | The offset in the partition | | ||
| header | `struct<varchar, bytea>[]` | Key-value pairs attached to the message | | ||
plural: |
||
|
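
A short sketch of the positional Kafka components, under the same proposed grammar. The source name, columns, topic, and broker address are hypothetical. Per the table above, both `partition` and `offset` would surface as `varchar` columns.

```sql
-- Sketch only: every identifier and property value here is a placeholder.
create source clicks (
    user_id bigint,
    url varchar
)
include partition as kafka_partition   -- `varchar`
include offset as kafka_offset         -- `varchar`
with (
    connector = 'kafka',
    topic = 'clicks',
    properties.bootstrap.server = 'localhost:9092'
)
format plain encode json;
```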
||
### Pulsar | ||
|
||
| Allowed Components | Default Type | Note | | ||
|--------------------|--------------|-------------------------------------------------------------------------------------------| | ||
| key | `bytea` | Can be overridden by `encode` and `key encode`. Refers to `MessageMetadata::partition_key` | | ||
| partition | `varchar` | The partition the message comes from | | ||
| offset | `varchar` | The offset in the partition | | ||
|
||
More components are listed [here](https://docs.rs/pulsar/latest/pulsar/message/proto/struct.MessageMetadata.html). | ||
How do we decide which metadata fields to expose? Do we intend to expose similar (not sure if same) concepts from different connectors using the same word? For example
I think it's not necessary to unify them. If there is ambiguity, I tend to let them have different names.
Update: now we do want to unify
Yes, we are going to unify the partition and offset column type for all connectors. |
||
|
||
### Kinesis | ||
|
||
| Allowed Components | Default Type | Note | | ||
|--------------------|------------------------------------------|----------------------------------------------------------------------------------| | ||
| key | `bytea` | Can be overridden by `encode` and `key encode`. Refers to `Record::partition_key` | | ||
nit: the partition key in Kinesis is always a Unicode string
Yes, I make it |
||
| timestamp | `timestamp with time zone` (from chrono) | Refers to `Record::approximate_arrival_timestamp` | | ||
| partition | `varchar` | The partition the message comes from | | ||
| offset | `varchar` | The offset in the partition. Corresponds to Kinesis sequence numbers. | | ||
|
||
More components are listed [here](https://docs.rs/aws-sdk-kinesis/latest/aws_sdk_kinesis/types/struct.Record.html). | ||
|
||
### S3/GCS | ||
|
||
| Allowed Components | Default Type | Note | | ||
|--------------------|--------------|-----------------------------------| | ||
| file | `varchar` | The file the record comes from | | ||
| offset | `varchar` | The offset in the file | |
Furthermore, we need a way to define the PK from part of the whole JSON message key. Not sure whether it can be the default behavior when the user defines a primary key constraint.
IIRC, according to our previous discussion, this will be `format insert` + a normal PK definition, i.e. `primary key (foo, bar)`. The exception looks very inconsistent and I'd like to get rid of it.
In some cases, users want to determine the primary key of the table themselves, for use cases such as
And in those cases, users can make sure the primary key columns are exactly the message key in Kafka.
If `format upsert` is specific to MQs with delete tombstones (null value), I think the PK must be the MQ message key and cannot be a field in the value.
I think either of the following behaviors is acceptable:
1 corresponds to `upsert` semantics, while 2 corresponds to `plain` (aka `insert`) semantics. Whether to treat an empty message as a delete tombstone is the key difference between `upsert` and `plain`.
Example: