-
Notifications
You must be signed in to change notification settings - Fork 592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: only ingest key-ed value in additional header column #14628
Conversation
…ages - Refactored the `do_action` method in `mod.rs` for improved source column description processing - Added support for additional column types in the `wrapped_f` closure in `mod.rs` - Updated error handling for failed access to non-primary key columns in the `wrapped_f` closure in `mod.rs` - Added rollback functionality to the `do_action` method in `mod.rs` - Modified the function `extract_headers_from_meta` in `util.rs` to accept an additional parameter - Updated the implementation of `extract_headers_from_meta` to call `kafka_meta.extract_headers(inner_field)` in `util.rs` - Added an optional `inner_field` parameter to the `extract_headers` function in `message.rs` - Updated the implementation of `extract_headers` to handle the changes in `message.rs` Signed-off-by: tabVersion <[email protected]>
proto/plan_common.proto
Outdated
AdditionalColumnType additional_column_type = 9; | ||
|
||
// deprecated, use AdditionalColumn instead | ||
// AdditionalColumnType additional_column_type = 9; | ||
reserved 9; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
additional_column_type
is included in 1.6, but not documented.
Why do we need to deprecate this field here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, for source created in 1.6, it will have AdditionalColumnType::NORMAL
. So we cannot change the type for field 9
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have we told any poc user to try include
before? We might need to tell them to rebuild the sources later. Or maybe we just document this breaking change in the release note.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the code is in v1.6.0 but the feature but not considered as released. It is ok to ignore the non-normal columns.
And I don't want to make breaking changes to normal columns here so I choose to use a new field.
I want to make things flexible when handling additional columns. Just like this change, the prev enum is not sufficient with handling an extra inner field arg. I don't know what comes next, so I make all columns a message instead of an enum.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The breaking change looks acceptable to me, although it seems not hard to make it backward compatible.
|
||
// deprecated, use AdditionalColumn instead | ||
// AdditionalColumnType additional_column_type = 9; | ||
reserved 9; | ||
|
||
ColumnDescVersion version = 10; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to add a new ColumnDescVersion
? TBH I'm not sure about why it's added, and it doesn't seem to be needed here. Ask just in case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the field is introduced in #13707 to deal with DEFAULT_KEY_COLUMN_NAME
change in future.
discussions are available here #13707 (comment)
Materialize parse the value to UTF-8 string by default, unless specifying the data type Ref. https://materialize.com/docs/sql/create-source/kafka/#syntax I think we should follow the design, because in most cases the value will just be used as My idea: INCLUDE HEADER key -- decode as UTF-8 string
INCLUDE HEADER key AS name -- decode as UTF-8 string
INCLUDE HEADER key AS name VARCHAR -- decode as UTF-8 string
INCLUDE HEADER key AS name BYTEA -- output raw bytes |
proto/plan_common.proto
Outdated
AdditionalColumnType additional_column_type = 9; | ||
|
||
// deprecated, use AdditionalColumn instead | ||
// AdditionalColumnType additional_column_type = 9; | ||
reserved 9; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The breaking change looks acceptable to me, although it seems not hard to make it backward compatible.
The solution seems a little verbose to me, the original purpose for this new syntax is to handle the problem that users can have trouble finding the key they want in |
As a connector, I would hope to complete every parsing work inside of it. I consider parsing a header value to string as part of this. I can forecast that users will ask you 2 questions frequently without such an option.
Both are natural requirements but the solution is obscure, so you can't blame the users actually. |
Signed-off-by: tabVersion <[email protected]>
Already implemented. Please review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
syntax
INCLUDE HEADER 'header_col' AS column_name
and only header can specify the inner field part.if alias not specified, the header with inner field will be named
_rw_kafka_header_{inner field}
if an inner field name is specified, the column type becomes
bytea
instead ofArray[Struct<Varchar, Bytea>]
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.
support syntax specially for header
now we can specify desired keys in header and make it a column
here is an example: given a header
[(key1, value1), (key2, value2)]
and the clause is
include header 'key1'
, the column content isvalue1
in bytesif the clause is
include header 'key1' varchar
the column content isvalue1
in varcharabout the default naming:
in prev impl, the default name is
_rw_{connector name}_{additional column type}
In this pr we introduced inner field and type hint, so it can be
_rw_{connector name}_{additional column type}_{<inner field name>}_{type hint}
to be specific, for
include header 'header1' bytea
, the name is_rw_kafka_header_header1_bytea
and forinclude header 'header2' varchar
the name is_rw_kafka_header_header2_varchar