
feat: allow configure other additional columns for connectors #14215

Merged
merged 24 commits into main from tab/addi-columns on Jan 10, 2024

Conversation

tabVersion
Contributor

@tabVersion tabVersion commented Dec 26, 2023

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Following #13707, this PR implements the final part of risingwavelabs/rfcs#79.

The syntax looks like:

create table t (..schema.. )
  include key as some_key
  include partition
  include offset
with (...) format ... encode ...

The columns accepted for each connector are listed in https://github.com/risingwavelabs/rfcs/blob/tab/include-key-as/rfcs/0079-include-key-as.md
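
For illustration, a minimal end-to-end sketch of the new clause on a Kafka table (the topic name and column aliases here are made up for the example, not taken from this PR):

create table t (a int)
  include key as some_key
  include partition
  include offset
  include timestamp as some_ts
  include header as some_headers   -- type: struct<key varchar, value bytea>[]
with (
  connector = 'kafka',
  topic = 'test_topic',                               -- hypothetical topic
  properties.bootstrap.server = 'message_queue:29092'
) format plain encode json;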

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

described above.

A special note on batch queries on sources

In the previous implementation, we always inserted a timestamptz column into the catalog for every source using Kafka.
With the new `include timestamp` clause the semantics are the same, so in this PR we no longer insert that column if `include timestamp` is already specified.

A minor change for batch queries

For sources like

create source s ( ... ) with ( ... ) format ... encode ... 
create source s ( ... ) include timestamp with ( ... ) format ... encode ... 

the query `select * from s where _rw_kafka_timestamp > '1977-01-01 00:00:00'` works.

But if an alias is specified

create source s ( ... ) include timestamp as some_ts with ( ... ) format ... encode ... 

the query should be `select * from s where some_ts > '1977-01-01 00:00:00'`.

@tabVersion tabVersion marked this pull request as ready for review January 4, 2024 08:58
@tabVersion tabVersion requested a review from a team as a code owner January 4, 2024 08:58
@tabVersion
Contributor Author

On second thought, we are going to use both the partition column and the offset column to record consumption progress, and deprecate StreamChunkWithState.
So there is no need to spend effort on the tests here; the later refactor will cover most of the logic.

@tabVersion tabVersion added ci/run-s3-source-tests ci/run-backwards-compat-tests Run backwards compatibility tests in your PR. labels Jan 4, 2024
Contributor

@st1page st1page left a comment

generally LGTM

e2e_test/source/basic/inlcude_key_as.slt
AND timestamp_col IS NOT NULL
AND header_col IS NOT NULL
----
101
Contributor

How do we know what count to expect here 🤔

Where can I find the input data?

Contributor Author

for i in {0..100}; do echo "key$i:{\"a\": $i}" | ${KCAT_BIN} -P -b message_queue:29092 -t ${ADDI_COLUMN_TOPIC} -K : -H "header1=v1" -H "header2=v2"; done

It will generate messages like:

key    | payload    | header
key1   | {"a": 1}   | [(header1, v1), (header2, v2)]

Member

We may want to mention this in a comment to avoid confusion.

name,
id,
DataType::List(get_kafka_header_item_datatype().into()),
AdditionalColumnType::Header,
Contributor

Should this be DataType::Struct instead?

Contributor Author

No, a Kafka header is a list of key-value pairs with schema (varchar, bytes). The list can be empty (i.e. no headers) or contain multiple pairs.

here is an example

for i in {0..100}; do echo "key$i:{\"a\": $i}" | ${KCAT_BIN} -P -b message_queue:29092 -t ${ADDI_COLUMN_TOPIC} -K : -H "header1=v1" -H "header2=v2"; done

kcat will generate 101 messages, each carrying a header with two key-value pairs: (header1, v1) and (header2, v2).
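
In SQL terms the included column is therefore a list of structs rather than a single struct. A rough sketch of the resulting shape (the alias header_col is assumed for illustration, not taken from this PR):

-- declared via: include header as header_col
-- header_col has type struct<key varchar, value bytea>[]
-- a message produced by the loop above carries [(header1, v1), (header2, v2)];
-- a message with no headers carries an empty list
select header_col from t;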

Member

@xxchan xxchan left a comment

Is legacy code for _rw_kafka_timestamp considered?

@tabVersion
Contributor Author

Is legacy code for _rw_kafka_timestamp considered?

Good question, I almost forgot about it.

@tabVersion
Contributor Author

tabVersion commented Jan 10, 2024

I remember it's the case, just to confirm: #13707 will be included in v1.6, but not documented, right? So this PR includes some changes to that, but it's OK. BTW, we might mention this in the Release note section for the doc team to understand it better.

Yes, #13707 is in v1.6.0 and will not be in the docs; I will notify the doc team.

- Add `_rw_kafka_timestamp` column to messages from Kafka source
- Handle addition of columns and bind primary key columns
- Set connector to backfill mode and enable CDC sharing mode
- Check and add timestamp column before generating column IDs
- Throw error if source does not support PRIMARY KEY constraint
- Bind source watermark based on columns
- Resolve privatelink connection for Kafka source
- Create PbSource object with provided properties
- Import `KAFKA_TIMESTAMP_COLUMN_NAME` and handle legacy column in `trad_source.rs`

Signed-off-by: tabVersion <[email protected]>
// }),
// ),
(
"header", // type: struct<key varchar, value bytea>[]
Member

@fuyufjh fuyufjh Jan 10, 2024

I think JSONB is better for storing Kafka headers because you can get (->) a value easily.

A related topic was discussed at #13387 (Summary at #13387 (comment))

Contributor Author

The problem is that the jsonb type does not support bytes inside.

Member

@fuyufjh fuyufjh Jan 10, 2024

Good point. 🤣 Thinking...

The downside of struct<key varchar, value bytea>[] is obvious: RW/PG doesn't provide any function to get a value by key. I don't know how users can do that...

Contributor

We can support array_filter() (https://github.com/risingwavelabs/rfcs/pull/69/files#diff-857a6f40f71644499fee9c269c260a570942420de9a0225b059508d02c1fe98bR127-R138) or array_find for it, if there are not too many fields in the array.
Or support a Map datatype...
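
Until array_filter/array_find or a Map type lands, a possible workaround sketch, assuming unnest and struct field access behave as in Postgres (header_col is the assumed alias from the sketch above, not a real column in this PR):

select (h).value
from (
  select unnest(header_col) as h
  from t
) u
where (h).key = 'header1';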

Contributor Author

maybe lambda can do the work 😈

Member

@fuyufjh fuyufjh left a comment

LGTM for the rest

@tabVersion
Contributor Author

Btw, it seems like the additional offset column can't be used for the rate limit implementation of source then?

Will the offset column only be present when the user includes it? Or will it always be parsed, but marked as hidden?

Previous thread: #13800 (comment)

Oh, I plan to do the refactor in #14384. After this, we will always derive offset and partition columns for sources and tables with connectors, regardless of whether users explicitly include them. The clause just changes the visibility of the two columns.

- Added new source `s10` with columns `v1` and `v2`
- Included a timestamp column `some_ts` in the `s10` source
- Configured `s10` source as a Kafka connector with topic, bootstrap server, and startup mode properties
- Implemented a query to filter rows from `s10` based on a specific timestamp
- Dropped tables `s8` and `s9`
- Removed source `s9`
- Removed source `s10`

Signed-off-by: tabVersion <[email protected]>
@tabVersion tabVersion enabled auto-merge January 10, 2024 13:30
Signed-off-by: tabVersion <[email protected]>
@tabVersion tabVersion added this pull request to the merge queue Jan 10, 2024
Merged via the queue into main with commit b03a641 Jan 10, 2024
27 of 28 checks passed
@tabVersion tabVersion deleted the tab/addi-columns branch January 10, 2024 15:20
@xxchan
Member

xxchan commented Jan 10, 2024

but if there is an alias specified

create source s ( ... ) include timestamp as some_ts with ( ... ) format ... encode ...
the query should be select * from s where some_ts > '1977-01-01 00:00:00'

I think expr_to_kafka_timestamp_range still uses the hard-coded KAFKA_TIMESTAMP_COLUMN_NAME, so that query doesn't work as expected...

@tabVersion
Contributor Author

but if there is an alias specified
create source s ( ... ) include timestamp as some_ts with ( ... ) format ... encode ...
the query should be select * from s where some_ts > '1977-01-01 00:00:00'

I think expr_to_kafka_timestamp_range still uses the hard-coded KAFKA_TIMESTAMP_COLUMN_NAME, so that query doesn't work as expected...

Yes, but the example above works. Let me find out why...

@xxchan
Member

xxchan commented Jan 17, 2024

I think it may be because the predicate becomes a FILTER above the SOURCE, rather than being pushed down into the source.

@tabVersion
Contributor Author

I think it may be because the predicate becomes a FILTER above the SOURCE, rather than being pushed down into the source.

Alright, can you help remove the hard-coded column to prevent a future panic?

@xxchan
Member

xxchan commented Jan 17, 2024 via email
