From caa060b65117433acd3b557ad115e89828ecf4e6 Mon Sep 17 00:00:00 2001 From: Bohan Zhang Date: Fri, 12 Jan 2024 15:40:56 +0800 Subject: [PATCH] RFC: use `include ... as ...` to ingest more message parts (#79) * new rfc * rename * Update 0079-include-key-as.md update offset and partition column type to make it consistent with existing source impl * Update rfcs/0079-include-key-as.md Co-authored-by: xxchan --------- Co-authored-by: xxchan --- rfcs/0079-include-key-as.md | 94 +++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 rfcs/0079-include-key-as.md diff --git a/rfcs/0079-include-key-as.md b/rfcs/0079-include-key-as.md new file mode 100644 index 0000000..68b02e7 --- /dev/null +++ b/rfcs/0079-include-key-as.md @@ -0,0 +1,94 @@ +--- +feature: New Syntax Include Key As +authors: + - "TabVersion" +start_date: "2023/11/22" +--- + +# New Syntax: Include \ as \ + +## Background + +> This RFC is a follow-up to [RFC-0064](./0064-new-source-ddl.md). + +In previous implementation, we only load message key into a [~~hidden~~](https://github.com/risingwavelabs/risingwave/pull/13521) +column called `_rw_key` as `bytea` on default. + +Here are some cases when RisingWave derives the `_rw_key` column. + +* For all connectors with `format upsert`, RisingWave derives the column as primary key to perform upsert semantic. + * An exception: for `format upsert encode json`, RisingWave allows the PK to be part of the message payload instead of + the message key. This behavior is more like `format plain encode json` with PK constraint. + * **To Be Discussed**: whether we can afford to perform a breaking change for the above exception. +* For all connectors with `format plain`, RisingWave derives the `_rw_key` column as a normal one + ([ref](https://github.com/risingwavelabs/risingwave/pull/13278)) at a POC request. This enforces an additional column + for all tables, which is not ideal. + +Aside from message keys, users may want to ingest the timestamp as well as the topic headers into the table for further +analysis. + +This RFC proposes a new syntax to allow users to specify the column name for message components other than payload. + +## Proposed Syntax + +```sql +create source/table ( + .., + primary key ( ) +) + [ include key [ as ] ] + [ include timestamp [ as ] ] + [ include header [as ] ] + [ include ... [ as ... ] ] +with ( ... ) +format ... encode ... ( ... ) +[ key encode [ ( ... ) ] ] +``` + +* If `as` name is not specified, a connector-component naming template will be applied + * For connector kafka and component key, the derived message key column name is `_rw_kafka_key`. +* The default type for message key column is `bytea`. The priority of the type definition is: + `key encode` > infer from `format ... encode ...` > default type +* **Important**: `include key` is required for `format upsert` and RisingWave will use the key column as one and + only primary key to perform upsert semantic. It does not allow to specify multiple columns as primary key + even if they are part of the key. + +## Specifications + +### Kafka + +| Allowed Components | Default Type | Note | +|--------------------|--------------------------------------------|---------------------------------------------------| +| key | `bytea` | Allow overwritten by `encode` and `key encode` | +| timestamp | `timestamp with time zone` (i64 in millis) | Refer to `CreateTime` rather than `LogAppendTime` | +| partition | `varchar` | The message is from which partition | +| offset | `varchar` | The offset in the partition | +| header | `struct[]` | KV pairs along with message | + +### Pulsar + +| Allowed Components | Default Type | Note | +|--------------------|--------------|-------------------------------------------------------------------------------------------| +| key | `bytea` | Allow overwritten by `encode` and `key encode`. Refer to `MessageMetadata::partition_key` | +| partition | `varchar` | The message is from which partition | +| offset | `varchar` | The offset in the partition | + +More components are available at [here](https://docs.rs/pulsar/latest/pulsar/message/proto/struct.MessageMetadata.html). + +### Kinesis + +| Allowed Components | Default Type | Note | +|--------------------|------------------------------------------|----------------------------------------------------------------------------------| +| key | `bytea` | Allow overwritten by `encode` and `key encode`. Refer to `Record::partition_key` | +| timestamp | `timestamp with time zone` (from chrono) | refer to `Record::approximate_arrival_timestamp` | +| partition | `varchar` | The message is from which partition | +| offset | `varchar` | The offset in the partition. Corresponds to Kinesis sequence numbers. | + +More components are available at [here](https://docs.rs/aws-sdk-kinesis/latest/aws_sdk_kinesis/types/struct.Record.html). + +### S3/GCS + +| Allowed Components | Default Type | Note | +|--------------------|--------------|-----------------------------------| +| file | `varchar` | The record comes from which file. | +| offset | `varchar` | The offset in the file |