feat: only ingest key-ed value in additional header column #14628

Merged
merged 31 commits into from
Jan 25, 2024

@tabVersion tabVersion commented Jan 17, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Adds the syntax INCLUDE HEADER 'header_col' AS column_name. Only the header column can specify an inner field.
If no alias is specified, a header column with an inner field is named _rw_kafka_header_{inner field}.

If an inner field name is specified, the column type becomes bytea instead of Array[Struct<Varchar, Bytea>].

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

Support a dedicated syntax for header columns:

create table/source s (...) include header '<header col name>' [varchar/bytea] [as <alias>]

A desired header key can now be specified and exposed as its own column.


Here is an example. Given a header [(key1, value1), (key2, value2)]:

If the clause is include header 'key1', the column content is value1 as bytes.
If the clause is include header 'key1' varchar, the column content is value1 as varchar.
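
The lookup described in this example can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `TypeHint`, `HeaderValue`, and `extract_header` are hypothetical names, and returning `None` for a missing key or for bytes that are not valid UTF-8 is an assumption of the sketch.

```rust
/// Hypothetical type hint mirroring the optional `varchar`/`bytea` in the clause.
#[derive(Debug, Clone, Copy, PartialEq)]
enum TypeHint {
    Bytea,
    Varchar,
}

/// Hypothetical column value: raw bytes or decoded text.
#[derive(Debug, PartialEq)]
enum HeaderValue {
    Bytea(Vec<u8>),
    Varchar(String),
}

/// Look up `key` among the header pairs and return its value under the hint.
/// Assumption: a missing key, or non-UTF-8 bytes under `Varchar`, yields `None`.
fn extract_header(
    headers: &[(&str, &[u8])],
    key: &str,
    hint: TypeHint,
) -> Option<HeaderValue> {
    // Take the first pair whose key matches the requested inner field.
    let (_, raw) = headers.iter().find(|(k, _)| *k == key)?;
    match hint {
        TypeHint::Bytea => Some(HeaderValue::Bytea(raw.to_vec())),
        TypeHint::Varchar => String::from_utf8(raw.to_vec())
            .ok()
            .map(HeaderValue::Varchar),
    }
}
```

With the header from the example, asking for `key1` under `TypeHint::Bytea` gives the bytes of `value1`, and under `TypeHint::Varchar` gives the string `value1`.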

About the default naming:

In the previous implementation, the default name is _rw_{connector name}_{additional column type}.
This PR introduces an inner field and a type hint, so the name can be _rw_{connector name}_{additional column type}_{inner field name}_{type hint}.
Specifically, for include header 'header1' bytea the name is _rw_kafka_header_header1_bytea, and for include header 'header2' varchar the name is _rw_kafka_header_header2_varchar.
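
The naming rule above could be written as a small helper along these lines. `default_column_name` and its parameters are hypothetical and only restate the pattern from this release note; the actual implementation may differ.

```rust
/// Build the default column name `_rw_{connector}_{column type}`, appending
/// `_{inner field}_{type hint}` only when an inner field is given.
/// Hypothetical helper that restates the naming pattern in the release note.
fn default_column_name(
    connector: &str,
    column_type: &str,
    inner: Option<(&str, &str)>, // (inner field name, type hint)
) -> String {
    match inner {
        Some((field, hint)) => {
            format!("_rw_{}_{}_{}_{}", connector, column_type, field, hint)
        }
        None => format!("_rw_{}_{}", connector, column_type),
    }
}
```

For example, `default_column_name("kafka", "header", Some(("header1", "bytea")))` produces `_rw_kafka_header_header1_bytea`, matching the name given above.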

…ages

- Refactored the `do_action` method in `mod.rs` for improved source column description processing
- Added support for additional column types in the `wrapped_f` closure in `mod.rs`
- Updated error handling for failed access to non-primary key columns in the `wrapped_f` closure in `mod.rs`
- Added rollback functionality to the `do_action` method in `mod.rs`
- Modified the function `extract_headers_from_meta` in `util.rs` to accept an additional parameter
- Updated the implementation of `extract_headers_from_meta` to call `kafka_meta.extract_headers(inner_field)` in `util.rs`
- Added an optional `inner_field` parameter to the `extract_headers` function in `message.rs`
- Updated the implementation of `extract_headers` to handle the changes in `message.rs`

Signed-off-by: tabVersion <[email protected]>
@tabVersion tabVersion marked this pull request as ready for review January 19, 2024 14:12
Comment on lines 67 to 59
AdditionalColumnType additional_column_type = 9;

// deprecated, use AdditionalColumn instead
// AdditionalColumnType additional_column_type = 9;
reserved 9;
Member

additional_column_type is included in 1.6, but not documented.

#14215 (comment)

Why do we need to deprecate this field here?

Member

Oh, sources created in 1.6 will have AdditionalColumnType::NORMAL, so we cannot change the type of field 9.

Member

Have we told any PoC users to try include before? We might need to tell them to rebuild their sources later. Or maybe we just document this breaking change in the release note.

Contributor Author

Yes, the code is in v1.6.0, but the feature is not considered released. It is OK to ignore the non-normal columns.
I don't want to make breaking changes to normal columns here, so I chose to use a new field.

I want to keep things flexible when handling additional columns. As this change shows, the previous enum cannot handle an extra inner field argument. I don't know what comes next, so I made every column a message instead of an enum.
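
The difference between the two shapes can be illustrated with a rough Rust analogy. This is not the proto definition from this PR: `ColumnKindSketch`, `ColumnSketch`, and `requested_header_key` are hypothetical stand-ins, and the variant names are illustrative only.

```rust
/// Stand-in for the previous proto enum (variant names are illustrative):
/// it can say "this is a header column" but cannot say which key.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum ColumnKindSketch {
    Normal,
    Header,
}

/// Stand-in for a message-style description: each kind carries its own fields,
/// so a later argument can be added without redefining the kinds.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum ColumnSketch {
    Normal,
    /// `inner_field: None` means the whole header array.
    Header { inner_field: Option<String> },
}

/// Return the requested header key, if the column is a keyed header column.
fn requested_header_key(col: &ColumnSketch) -> Option<&str> {
    match col {
        ColumnSketch::Header { inner_field } => inner_field.as_deref(),
        ColumnSketch::Normal => None,
    }
}
```

The first type has no place to store `'key1'`; the second does, which is the flexibility argued for here.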

Member

The breaking change looks acceptable to me, although it seems not hard to make it backward compatible.


// deprecated, use AdditionalColumn instead
// AdditionalColumnType additional_column_type = 9;
reserved 9;

ColumnDescVersion version = 10;
Member

Do we need to add a new ColumnDescVersion? TBH I'm not sure why it was added, and it doesn't seem to be needed here. Asking just in case.

Contributor Author

The field was introduced in #13707 to deal with a future change to DEFAULT_KEY_COLUMN_NAME.
The discussion is available at #13707 (comment).

@tabVersion tabVersion added the user-facing-changes Contains changes that are visible to users label Jan 19, 2024
@tabVersion tabVersion requested a review from st1page January 19, 2024 15:40
Member

fuyufjh commented Jan 22, 2024

Materialize parses the value as a UTF-8 string by default, unless the data type BYTES is specified.

Ref. https://materialize.com/docs/sql/create-source/kafka/#syntax

I think we should follow the design, because in most cases the value will just be used as varchar instead of bytea, and converting bytea to varchar is verbose.

My idea:

INCLUDE HEADER key                  -- decode as UTF-8 string
INCLUDE HEADER key AS name          -- decode as UTF-8 string
INCLUDE HEADER key AS name VARCHAR  -- decode as UTF-8 string
INCLUDE HEADER key AS name BYTEA    -- output raw bytes
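
The semantics proposed in this comment could look roughly like the sketch below. `Hint`, `Decoded`, and `decode_header_value` are hypothetical names, and mapping invalid UTF-8 to `None` (NULL) is an assumption of the sketch, not something stated in the proposal.

```rust
/// Hypothetical type hint: the optional VARCHAR/BYTEA at the end of the clause.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Hint {
    Varchar,
    Bytea,
}

/// Hypothetical decoded value of a single header.
#[derive(Debug, PartialEq)]
enum Decoded {
    Text(String),
    Bytes(Vec<u8>),
}

/// Decode a header value, defaulting to a UTF-8 string when no hint is given.
/// Assumption: bytes that are not valid UTF-8 become `None` (NULL).
fn decode_header_value(raw: &[u8], hint: Option<Hint>) -> Option<Decoded> {
    match hint.unwrap_or(Hint::Varchar) {
        Hint::Varchar => std::str::from_utf8(raw)
            .ok()
            .map(|s| Decoded::Text(s.to_owned())),
        Hint::Bytea => Some(Decoded::Bytes(raw.to_vec())),
    }
}
```

Under this sketch only an explicit `Hint::Bytea` returns raw bytes, which mirrors the four cases listed above.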

@tabVersion
Contributor Author

That solution seems a little verbose to me. The original purpose of this new syntax is to solve the problem that users can have trouble finding the key they want in Array[Struct<varchar, bytea>], and it solves that well.
Users have no trouble converting bytea to varchar in SQL, and the efficiency is good. I'd keep the conversion external because this syntax serves as a shortcut in the parser and does not share the logic that fills NULL on parsing failure. We don't want to be responsible for unexpected behavior caused by users forgetting to specify varchar in the clause.

Member

fuyufjh commented Jan 22, 2024

As a connector, I would hope to complete all parsing work inside it. I consider parsing a header value into a string part of this.

I can foresee users frequently asking you two questions without such an option.

  1. How to convert bytea to varchar with PG functions? -- Please use encode()
  2. How to create a table with a header value in varchar type? -- Please use generated columns.

Both are natural requirements, but the solutions are obscure, so you can't really blame the users.

@tabVersion
Contributor Author

Already implemented. Please review.

Member

@fuyufjh fuyufjh left a comment

LGTM!

@tabVersion tabVersion enabled auto-merge January 25, 2024 03:07
@tabVersion tabVersion added this pull request to the merge queue Jan 25, 2024
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Jan 25, 2024
@tabVersion tabVersion added this pull request to the merge queue Jan 25, 2024
Merged via the queue into main with commit df87c2d Jan 25, 2024
31 of 32 checks passed
@tabVersion tabVersion deleted the tab/header-col branch January 25, 2024 05:04
Labels
type/feature user-facing-changes Contains changes that are visible to users
4 participants