RFC: use `include ... as ...` to ingest more message parts (#79)
Conversation
> * If `as` name is not specified, a connector-component naming template will be applied.
> * For connector kafka and component key, the derived message key column name is `_rw_kafka_key`.
> * The default type for the message key column is `bytea`. The priority of the type definition is:
>   `key encode` > infer from `format ... encode ...` > default type
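A sketch of the naming and typing rules above, mirroring the example style used later in this thread (table and column names are illustrative, not part of the RFC):

```sql
-- explicit name: the key column defaults to `my_key bytea`
create table t1 (a int, b int)
format plain encode json
include key as my_key;

-- no `as` name: the connector-component template yields `_rw_kafka_key bytea` for kafka
create table t2 (a int, b int)
format plain encode json
include key;

-- `key encode` takes priority over the default type: `my_key` becomes `jsonb`
create table t3 (a int, b int)
format plain encode json
key encode json
include key as my_key;
```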
> infer from `format ... encode ...`

What does it mean?
If there is a schema registry, we can infer the key schema from it.
> * For all connectors with `format upsert`, RisingWave derives the column as primary key to perform upsert semantics.
> * An exception: for `format upsert encode json`, RisingWave allows the PK to be part of the message payload instead of the message key. This behavior is more like `format plain encode json` with a PK constraint.
> * **To Be Discussed**: whether we can afford to perform a breaking change for the above exception.
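For concreteness, the exception described above would look roughly like the following sketch (table and column names are illustrative):

```sql
-- the exception: PK columns `a`, `b` come from the json payload,
-- not from the message key
create table t (a int, b int, c int, primary key (a, b))
format upsert encode json;
-- this behaves much like `format plain encode json` with a PK constraint
```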
Even more, we need a method to derive the PK from part of the whole JSON message key. I am not sure whether that can be the default behavior when the user defines a primary key constraint.
> An exception: for `format upsert encode json`, RisingWave allows the PK to be part of the message payload instead of the message key. This behavior is more like `format plain encode json` with a PK constraint.

IIRC, according to our previous discussion, this will be `format insert` + a normal PK definition, i.e. `primary key (foo, bar)`.

The exception looks very inconsistent and I'd like to get rid of it.
In some cases, users want to determine the primary key of the table for use cases such as:
- better batching/serving performance
- streaming temporal joins

In those cases, the user can make sure the primary key columns are exactly the message key in Kafka.
If `format upsert` is specific to MQs with delete tombstones (null values), I think the PK must be the MQ message key and cannot be a field in the value.
I think either of the following behaviors is acceptable:
1. All fields in the message key are used as PK, and an empty message body is considered a delete tombstone.
2. All/some fields in the message key/body are used as PK, and an empty message body is not considered a delete tombstone.

1 corresponds to `upsert` semantics, while 2 corresponds to `plain` (aka `insert`) semantics. Whether to treat an empty message as a delete tombstone is the key difference between `upsert` and `plain`.
Example:

```sql
/* message key:  {"a":1,"b":2}
   message body: {"a":1,"b":2,"c":12,"d":34} */

-- case 1 --
create table t (a int, b int, c int, d int)
format upsert
key encode json include key as pk;
/* NOTE: the output table will contain a `pk jsonb` column */

-- case 2 --
create table t (a int, b int, c int, d int, primary key (a, b))
format plain
[key encode json include key as pk] -- optional!
```
Generally LGTM
> * **Important**: `include key` is required for `format upsert`, and RisingWave will use the key column as the one and only primary key to perform upsert semantics. It does not allow specifying multiple columns as primary key even if they are part of the key.
So,
- if I specify `key encode avro` and `include key as x`, `x` will be `struct`
- if I specify `key encode json` and `include key as x`, `x` will be `JSONB`

Right?
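If that reading is correct, the two cases would look like the following sketch (table and column names are illustrative, and the derived types are my understanding of the proposal rather than confirmed behavior):

```sql
-- key encode avro: `x` would be derived as a struct type from the key schema
create table t1 (v int)
format upsert
key encode avro include key as x;

-- key encode json: `x` would be derived as jsonb
create table t2 (v int)
format upsert
key encode json include key as x;
```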
> | Allowed Components | Default Type | Note |
> |--------------------|--------------|------|
> | key | `bytea` | Allow overwriting by `encode` and `key encode`. Refer to `Record::partition_key` |
Nit: the partition key in Kinesis is always a unicode string.
Yes, I make it `bytea` here to keep it consistent with other connectors. There are planned refactors relying on the unified type in the source state table.
> | Allowed Components | Default Type | Note |
> |--------------------|--------------|------|
> | key | `bytea` | Allow overwriting by `encode` and `key encode`. Refer to `MessageMetadata::partition_key` |
>
> More components are available [here](https://docs.rs/pulsar/latest/pulsar/message/proto/struct.MessageMetadata.html).
How do we decide which metadata fields to expose? Do we intend to expose similar (not sure if same) concepts from different connectors using the same word? For example:

- kafka `key`, pulsar `partition_key`, kinesis `partition_key`
  - There is also pulsar `ordering_key` or kinesis `ExplicitHashKey`
- kafka `offset`, pulsar `sequence_id`, kinesis `sequence_number`
  - i64 vs u64 vs string
- kafka `timestamp`, pulsar `publish_time`, kinesis `approximate_arrival_timestamp`
  - There is also pulsar `event_time`
- kafka `headers`, pulsar `properties`
  - kafka headers is `struct<varchar, bytea>[]` but pulsar properties is `struct<varchar, varchar>[]`
> Do we intend to expose similar (not sure if same) concepts from different connectors using the same word?

I think it's not necessary to unify them. If there is ambiguity, I tend to let them have different names.
Update: now we do want to unify `partition` and `offset` for them. @tabVersion Can you add the rationale for that in the RFC? 😇
Yes, we are going to unify the partition and offset column types for all connectors.
The basic reason is that we want to implement source-exec-level throttling, which requires a chunk to be cut anywhere while maintaining the offset info. Besides, the source backfill feature also relies on this behavior to tell when to end the backfill stage.
> | Allowed Components | Default Type | Note |
> |--------------------|--------------|------|
> | timestamp | `timestamp with time zone` (i64 in millis) | Refers to `CreateTime` rather than `LogAppendTime` |
> | partition | `i64` | The partition the message is from |
> | offset | `i64` | The offset in the partition |
> | header | `struct<varchar, bytea>[]` | KV pairs along with the message |
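Taken together, a table including these Kafka metadata components might look like the following sketch (column names are illustrative; the types follow the table above):

```sql
-- sketch: one `include ... as ...` clause per metadata component
create table t (a int, b int)
format plain encode json
include key as msg_key              -- bytea
include timestamp as msg_ts         -- timestamp with time zone
include partition as msg_partition  -- i64
include offset as msg_offset       -- i64
include header as msg_header;      -- struct<varchar, bytea>[]
```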
plural: headers
Would also like to mention an alternative syntax:

Pros and cons of this syntax, compared to

Regarding the upgrade, please automatically add

Currently, we do not allow users to define schema in column clauses when using schema registry or schema file.

FYI. Feel free to join the slack channel.
update offset and partition column type to make it consistent with existing source impl
Co-authored-by: xxchan <[email protected]>