Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: use include ... as ... to ingest more message parts #79

Merged
merged 4 commits into from
Jan 12, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions rfcs/0079-include-key-as.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
---
feature: New Syntax Include Key As
authors:
- "TabVersion"
start_date: "2023/11/22"
---

# New Syntax: Include \<column\> as \<new name\>

## Background

> This RFC is a follow-up to [RFC-0064](./0064-new-source-ddl.md).

In previous implementation, we only load message key into a [~~hidden~~](https://github.com/risingwavelabs/risingwave/pull/13521)
column called `_rw_key` as `bytea` on default.

Here are some cases when RisingWave derives the `_rw_key` column.

* For all connectors with `format upsert`, RisingWave derives the column as primary key to perform upsert semantic.
* An exception: for `format upsert encode json`, RisingWave allows the PK to be part of the message payload instead of
the message key. This behavior is more like `format plain encode json` with PK constraint.
* **To Be Discussed**: whether we can afford to perform a breaking change for the above exception.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even more, We need a method to define pk from the part of the whole json message key. Not sure if it can be the default behavior when user defining the primary key constraint.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • An exception: for format upsert encode json, RisingWave allows the PK to be part of the message payload instead of the message key. This behavior is more like format plain encode json with PK constraint.

IIRC, according to our previous discussion, this will be format insert + normal PK definition i.e. primary key (foo, bar).

The exception looks very inconsistent and I'd like to get rid of it.

Copy link
Contributor

@st1page st1page Nov 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In some cases, user want to determine the primary key of the table for some use cases such as

  • better batching/ serving performance
  • streaming temporal join

And in those cases, user can make sure the primary keys column is exactly the message key in Kafka.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If format upsert is specific to MQ with delete tombsonte (null value), I think the PK must be the MQ message key and cannot be a field in the value.

Copy link
Member

@fuyufjh fuyufjh Nov 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think either of the following behavior is acceptable:

  1. All fields in message key are used as PK, and empty message body will be considered as delete tomb
  2. All/Some fields in message key/body are used as PK, and empty message body will not be considered as delete tomb.

1 corresponds to upsert semantics, while 2 corresponds to plain(aka.insert) semantics. Whether to consider empty message as delete tomb is the key difference between upsert and plain.

Example:

/* message key:  {"a":1,"b":2} 
   message body: {"a":1,"b":2,"c":12,"d":34} */

-- case 1 --
create table t ( a int, b int, c int, d int)
format upsert
key encode json include key as pk;
/* NOTE: The output table will contain a `pk jsonb` column */

-- case 2 --
create table t ( a int, b int, c int, d int, primary key (a,b))
format plain 
[key encode json include key as pk] -- optional!

* For all connectors with `format plain`, RisingWave derives the `_rw_key` column as a normal one
([ref](https://github.com/risingwavelabs/risingwave/pull/13278)) at a POC request. This enforces an additional column
for all tables, which is not ideal.

Aside from message keys, users may want to ingest the timestamp as well as the topic headers into the table for further
analysis.

This RFC proposes a new syntax to allow users to specify the column name for message components other than payload.

## Proposed Syntax

```sql
create source/table (
..,
primary key ( <key-column> )
)
[ include key [ as <key-column> ] ]
[ include timestamp [ as <timestamp-column>] ]
[ include header [as <header-column>] ]
[ include ... [ as ... ] ]
xxchan marked this conversation as resolved.
Show resolved Hide resolved
with ( ... )
format ... encode ... ( ... )
[ key encode [ ( ... ) ] ]
```

* If `as` name is not specified, a connector-component naming template will be applied
* For connector kafka and component key, the derived message key column name is `_rw_kafka_key`.
* The default type for message key column is `bytea`. The priority of the type definition is:
`key encode` > infer from `format ... encode ...` > default type
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

infer from format ... encode ...

What dose it mean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there is a schema registry and we can infer the key schema from it.

* **Important**: `include key` is required for `format upsert` and RisingWave will use the key column as one and
only primary key to perform upsert semantic. It does not allow to specify multiple columns as primary key
even if they are part of the key.
Comment on lines +52 to +54
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So,

  • if I specify key encode avro and include key as x, x will be struct
  • if I specify key encode json and include key as x, x will be JSONB

Right?


## Specifications

### Kafka

| Allowed Components | Default Type | Note |
|--------------------|--------------------------------------------|---------------------------------------------------|
| key | `bytea` | Allow overwritten by `encode` and `key encode` |
| timestamp | `timestamp with time zone` (i64 in millis) | Refer to `CreateTime` rather than `LogAppendTime` |
| partition | `i64` | The message is from which partition |
| offset | `i64` | The offset in the partition |
| header | `struct<varchar, bytea>[]` | KV pairs along with message |

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plural: headers


### Pulsar

| Allowed Components | Default Type | Note |
|--------------------|--------------|-------------------------------------------------------------------------------------------|
| key | `bytea` | Allow overwritten by `encode` and `key encode`. Refer to `MessageMetadata::partition_key` |

More components are available at [here](https://docs.rs/pulsar/latest/pulsar/message/proto/struct.MessageMetadata.html).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we decide which metadata fields to expose? Do we intend to expose similar (not sure if same) concepts from different connectors using the same word? For example

  • kafka key, pulsar partition_key, kinesis partition_key
  • kafka offset, pulsar sequence_id, kinesis sequence_number
    • i64 vs u64 vs string
  • kafka timestamp, pulsar publish_time, kinesis approximate_arrival_timestamp
    • There is also pulsar event_time
  • kafka headers, pulsar properties
    • kafka headers is struct<varchar, bytea>[] but pulsar properties is struct<varchar, varchar>[]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we intend to expose similar (not sure if same) concepts from different connectors using the same word?

I think it's not necessary to unify them. If there is ambiguity, I tend to let them have different names.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: now we do want to unify partition and offset for them.. @tabVersion Can you add the rationale for that in the RFC? 😇

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we are going to unify the partition and offset column type for all connectors.
The basic reason is that we want to impl a source exec level throttling and requires a chunk to be cut anywhere with maintaining the offset info. Besides, the source backfill feature also rely on the behavior to tell when to end the backfill stage.


### Kinesis

| Allowed Components | Default Type | Note |
|--------------------|------------------------------------------|----------------------------------------------------------------------------------|
| key | `bytea` | Allow overwritten by `encode` and `key encode`. Refer to `Record::partition_key` |

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: partition key in kinesis is always a unicode string

Copy link
Contributor Author

@tabVersion tabVersion Dec 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I make it bytea here to make it consistent with other connectors. There are planned reactors relying on the unified type in source state table.

| timestamp | `timestamp with time zone` (from chrono) | refer to `Record::approximate_arrival_timestamp` |

More components are available at [here](https://docs.rs/aws-sdk-kinesis/latest/aws_sdk_kinesis/types/struct.Record.html).

### S3/GCS

| Allowed Components | Default Type | Note |
|--------------------|--------------|-----------------------------------|
| file | varchar | The record comes from which file. |