-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: introduce include
clause to add additional connector columns
#13707
Conversation
…a/risingwave into tab/include-opts
- Modified the `bind_columns_from_source` function to remove the `include_columns_options` parameter. - Added the `handle_addition_columns` function to add connector-specific columns to the column catalog. - Modified the `handle_create_source` function to handle additional columns, primary key constraints, source watermarks, SQL column constraints, source schema checks, resolving private link connection for Kafka sources, and creating streaming jobs for CDC sources. - Added the `handle_create_source` function to create a catalog source. - Updated the `TestCase` struct to include methods for creating a table with a connector, creating a source, and running SQL queries. - Implemented various methods in the `TestCase` struct for creating schemas, explaining queries, and creating views and indexes. - Modified the `gen_create_table_plan_with_source` function and added the `handle_addition_columns` function to handle additional columns and primary key binding. - Updated the `TestCaseResult` struct to include the logical, optimized, batch, local batch, and stream plans, as well as any encountered errors during planning. Signed-off-by: tabVersion <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
license-eye has totally checked 4505 files.
Valid | Invalid | Ignored | Fixed |
---|---|---|---|
1980 | 1 | 2524 | 0 |
Click to see the invalid file list
- src/connector/src/parser/additional_columns.rs
- Added import for `format` from `std::fmt` module in `create_source.rs` - Updated the `additional_column_names` vector with additional column names - Implemented validation for additional columns in Debezium, DebeziumMongo, Maxwell, and Canal formats in `create_source.rs` Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
This reverts commit 4f6bca2
…ports - Import `SourceColumnDesc` from `crate::source` in `upsert.rs`, `maxwell.rs`, and `debezium.rs` - Modify the `access_field` function to take a `desc: &SourceColumnDesc` parameter instead of separate `name` and `type_expected` parameters in `upsert.rs`, `maxwell.rs`, and `debezium.rs` - Remove unreachable code in `upsert_parser.rs` - Change the parameter signature of the `apply_row_operation_on_stream_chunk_writer_with_op` function in `util.rs` - Fix error messages and add error messages for various cases in `create_source.rs` Signed-off-by: tabVersion <[email protected]>
- Make `get_key_column_name` public in `upsert_parser.rs` - Update `PlainParser` struct in `plain_parser.rs` to include a new `key_builder` field and initialize it appropriately - Modify `parse_inner` method in `plain_parser.rs` to handle key and payload parameters as options and use `key_builder` and `payload_builder` accessors - Import new dependencies and update logic for parsing key and payload in `plain_parser.rs` - Modify `access_field` function in `upsert.rs` to handle accessing key columns differently and remove unreachable code - Remove `key_builder` attribute from `PlainParser` struct in `parser.rs` and add new test functions and module Signed-off-by: tabVersion <[email protected]>
…th different formats - Handle various cases of error messages for different format and encoding combinations - Rename a variable and remove previously deprecated logic for avro format - Extract primary key columns from schema registry in certain cases - Add error messages for the presence or absence of additional columns and key specifications in different formats Signed-off-by: tabVersion <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally LGTM. 2 left comments are about style and would be nice to fix.
|
||
pub fn get_connector_compatible_additional_columns( | ||
connector_name: &str, | ||
) -> Option<Vec<(&'static str, CompatibleAdditionalColumnsFn)>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Continue from #13707 (comment))
I think it's not necessary to return a function here (aka. high-order function). Exemplified in #13707 (comment):
pub fn get_additional_column_catalog(connector_name: &str, column_name: &str, column_id: &ColumnId) -> Result<ColumnCatalog>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh sorry, I misunderstood your comments.
pub fn get_additional_column_catalog(connector_name: &str, column_name: &str, column_id: &ColumnId) -> Result<ColumnCatalog>
It is not an acceptable solution for me because I intend to insert all accepted columns in order.
For example, the output schema will always follow the same order (key, partition, offset)
1)
include key as aaa, partition as bbb, offset as ccc
2)
include offset as ccc, partition as bbb, key as aaa
|
GitGuardian id | Secret | Commit | Filename | |
---|---|---|---|---|
7648819 | Generic Private Key | 46df0fc | src/utils/pgwire/tests/ssl/demo.key | View secret |
🛠 Guidelines to remediate hardcoded secrets
- Understand the implications of revoking this secret by investigating where it is used in your code.
- Replace and store your secret safely. Learn here the best practices.
- Revoke and rotate this secret.
- If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.
To avoid such incidents in the future consider
- following these best practices for managing and storing secrets including API keys and other credentials
- install secret detection on pre-commit to catch secret before it leaves your machine and ease remediation.
🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.
Our GitHub checks need improvements? Share your feedbacks!
); | ||
match (&desc.column_type, &desc.additional_column_type) { | ||
(&SourceColumnType::Offset | &SourceColumnType::RowId, _) => { | ||
// SourceColumnType is for CDC source only. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes me wondering whether we are going to support include
for CDC sources... Seems not discussed in RFC.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I noticed in slack that we decided to delay it until needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is two separate things actually.
The code here is for direct-cdc tables, which uses column_type
to mark the metadata column.
For mq related sources, we use additional_column_type
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I know how it works currently. It just makes me pondering that stuff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It just makes me pondering that stuff.
yes it can be merged but I chose to add a new one to prevent mixing logic with CDC stuff.
(ProtocolProperties::Plain, EncodingProperties::Json(_)) => { | ||
JsonParser::new(parser_config.specific, rw_columns, source_ctx).map(Self::Json) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here I remove the plain json parser as it is hard to ingest kafka key. This can have overhead because it uses a [&str]
to fetch both key and payload fields.
I'm seeing two problems when using CREATE SOURCE pg_mydb
INCLUDE timestamp AS ts
WITH (
connector = 'postgres-cdc',
...
)
FORMAT PLAIN ENCODE JSON; ERROR: Failed to run the query
Caused by:
Protocol error: Connector postgres-cdc accepts no additional column but got [IncludeOptionItem { column_type: Ident { value: "timestamp", quote_style: None }, column_alias: Some(Ident { value: "ts", quote_style: None }), inner_field: None, header_inner_expect_type: None }] 2-It is also not working when create table from CDC source. CREATE TABLE courses (id INTEGER, title TEXT, PRIMARY KEY (id))
INCLUDE timestamp AS ts
FROM pg_mydb TABLE 'public.courses'; ERROR: Failed to run the query
Caused by:
Invalid input syntax: INCLUDE should be used with a connector |
Sorry, CDC connectors do not have additional columns atm. The team has not decided on what additional columns should they have. Can you elaborate on what columns you want to have and how you want to use them? |
@@ -1054,6 +1078,7 @@ pub async fn generate_stream_graph_for_table( | |||
source_watermarks, | |||
col_id_gen, | |||
append_only, | |||
vec![], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For some reason the real parameter is not passed but a dummy vec![]
is provided here, leading to bug #19384.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
risingwavelabs/rfcs#79
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.