Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce include clause to add additional connector columns #13707

Merged
merged 60 commits into from
Dec 20, 2023

Conversation

tabVersion
Copy link
Contributor

@tabVersion tabVersion commented Nov 29, 2023

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

risingwavelabs/rfcs#79

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

tabVersion and others added 7 commits November 29, 2023 16:17
- Modified the `bind_columns_from_source` function to remove the `include_columns_options` parameter.
- Added the `handle_addition_columns` function to add connector-specific columns to the column catalog.
- Modified the `handle_create_source` function to handle additional columns, primary key constraints, source watermarks, SQL column constraints, source schema checks, resolving private link connection for Kafka sources, and creating streaming jobs for CDC sources.
- Added the `handle_create_source` function to create a catalog source.
- Updated the `TestCase` struct to include methods for creating a table with a connector, creating a source, and running SQL queries.
- Implemented various methods in the `TestCase` struct for creating schemas, explaining queries, and creating views and indexes.
- Modified the `gen_create_table_plan_with_source` function and added the `handle_addition_columns` function to handle additional columns and primary key binding.
- Updated the `TestCaseResult` struct to include the logical, optimized, batch, local batch, and stream plans, as well as any encountered errors during planning.

Signed-off-by: tabVersion <[email protected]>
Copy link
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license-eye has totally checked 4505 files.

Valid Invalid Ignored Fixed
1980 1 2524 0
Click to see the invalid file list
  • src/connector/src/parser/additional_columns.rs

src/connector/src/parser/additional_columns.rs Outdated Show resolved Hide resolved
- Added import for `format` from `std::fmt` module in `create_source.rs`
- Updated the `additional_column_names` vector with additional column names
- Implemented validation for additional columns in Debezium, DebeziumMongo, Maxwell, and Canal formats in `create_source.rs`

Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
…ports

- Import `SourceColumnDesc` from `crate::source` in `upsert.rs`, `maxwell.rs`, and `debezium.rs`
- Modify the `access_field` function to take a `desc: &SourceColumnDesc` parameter instead of separate `name` and `type_expected` parameters in `upsert.rs`, `maxwell.rs`, and `debezium.rs`
- Remove unreachable code in `upsert_parser.rs`
- Change the parameter signature of the `apply_row_operation_on_stream_chunk_writer_with_op` function in `util.rs`
- Fix error messages and add error messages for various cases in `create_source.rs`

Signed-off-by: tabVersion <[email protected]>
- Make `get_key_column_name` public in `upsert_parser.rs`
- Update `PlainParser` struct in `plain_parser.rs` to include a new `key_builder` field and initialize it appropriately
- Modify `parse_inner` method in `plain_parser.rs` to handle key and payload parameters as options and use `key_builder` and `payload_builder` accessors
- Import new dependencies and update logic for parsing key and payload in `plain_parser.rs`
- Modify `access_field` function in `upsert.rs` to handle accessing key columns differently and remove unreachable code
- Remove `key_builder` attribute from `PlainParser` struct in `parser.rs` and add new test functions and module

Signed-off-by: tabVersion <[email protected]>
…th different formats

- Handle various cases of error messages for different format and encoding combinations
- Rename a variable and remove previously deprecated logic for avro format
- Extract primary key columns from schema registry in certain cases
- Add error messages for the presence or absence of additional columns and key specifications in different formats

Signed-off-by: tabVersion <[email protected]>
Copy link
Member

@fuyufjh fuyufjh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally LGTM. 2 left comments are about style and would be nice to fix.


pub fn get_connector_compatible_additional_columns(
connector_name: &str,
) -> Option<Vec<(&'static str, CompatibleAdditionalColumnsFn)>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Continue from #13707 (comment))

I think it's not necessary to return a function here (aka. high-order function). Exemplified in #13707 (comment):

pub fn get_additional_column_catalog(connector_name: &str, column_name: &str, column_id: &ColumnId) -> Result<ColumnCatalog>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh sorry, I misunderstood your comments.

pub fn get_additional_column_catalog(connector_name: &str, column_name: &str, column_id: &ColumnId) -> Result<ColumnCatalog>

It is not an acceptable solution for me because I intend to insert all accepted columns in order.
For example, the output schema will always follow the same order (key, partition, offset)

1)
include key as aaa, partition as bbb, offset as ccc
2)
include offset as ccc, partition as bbb, key as aaa

@tabVersion tabVersion added the ci/run-backwards-compat-tests Run backwards compatibility tests in your PR. label Dec 20, 2023
Copy link

gitguardian bot commented Dec 20, 2023

⚠️ GitGuardian has uncovered 1 secret following the scan of your pull request.

Please consider investigating the findings and remediating the incidents. Failure to do so may lead to compromising the associated services or software components.

🔎 Detected hardcoded secret in your pull request
GitGuardian id Secret Commit Filename
7648819 Generic Private Key 46df0fc src/utils/pgwire/tests/ssl/demo.key View secret
🛠 Guidelines to remediate hardcoded secrets
  1. Understand the implications of revoking this secret by investigating where it is used in your code.
  2. Replace and store your secret safely. Learn here the best practices.
  3. Revoke and rotate this secret.
  4. If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.

To avoid such incidents in the future consider


🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.

Our GitHub checks need improvements? Share your feedbacks!

@tabVersion tabVersion added this pull request to the merge queue Dec 20, 2023
Merged via the queue into main with commit feadac7 Dec 20, 2023
29 of 30 checks passed
@tabVersion tabVersion deleted the tab/include-opts branch December 20, 2023 06:31
);
match (&desc.column_type, &desc.additional_column_type) {
(&SourceColumnType::Offset | &SourceColumnType::RowId, _) => {
// SourceColumnType is for CDC source only.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes me wondering whether we are going to support include for CDC sources... Seems not discussed in RFC.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is two separate things actually.
The code here is for direct-cdc tables, which uses column_type to mark the metadata column.
For mq related sources, we use additional_column_type.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I know how it works currently. It just makes me pondering that stuff.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just makes me pondering that stuff.

yes it can be merged but I chose to add a new one to prevent mixing logic with CDC stuff.

Comment on lines -757 to -759
(ProtocolProperties::Plain, EncodingProperties::Json(_)) => {
JsonParser::new(parser_config.specific, rw_columns, source_ctx).map(Self::Json)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here I remove the plain json parser as it is hard to ingest kafka key. This can have overhead because it uses a [&str] to fetch both key and payload fields.

@mingfang
Copy link

I'm seeing two problems when using include with CDC
1-include is not working for CDC at all

CREATE SOURCE pg_mydb 
INCLUDE timestamp AS ts
WITH (
    connector = 'postgres-cdc', 
   ...
) 
FORMAT PLAIN ENCODE JSON;
ERROR:  Failed to run the query

Caused by:
  Protocol error: Connector postgres-cdc accepts no additional column but got [IncludeOptionItem { column_type: Ident { value: "timestamp", quote_style: None }, column_alias: Some(Ident { value: "ts", quote_style: None }), inner_field: None, header_inner_expect_type: None }]

2-It is also not working when create table from CDC source.

CREATE TABLE courses (id INTEGER, title TEXT, PRIMARY KEY (id)) 
INCLUDE timestamp AS ts
FROM pg_mydb TABLE 'public.courses';
ERROR:  Failed to run the query

Caused by:
  Invalid input syntax: INCLUDE should be used with a connector

@tabVersion
Copy link
Contributor Author

I'm seeing two problems when using include with CDC 1-include is not working for CDC at all

CREATE SOURCE pg_mydb 
INCLUDE timestamp AS ts
WITH (
    connector = 'postgres-cdc', 
   ...
) 
FORMAT PLAIN ENCODE JSON;
ERROR:  Failed to run the query

Caused by:
  Protocol error: Connector postgres-cdc accepts no additional column but got [IncludeOptionItem { column_type: Ident { value: "timestamp", quote_style: None }, column_alias: Some(Ident { value: "ts", quote_style: None }), inner_field: None, header_inner_expect_type: None }]

2-It is also not working when create table from CDC source.

CREATE TABLE courses (id INTEGER, title TEXT, PRIMARY KEY (id)) 
INCLUDE timestamp AS ts
FROM pg_mydb TABLE 'public.courses';
ERROR:  Failed to run the query

Caused by:
  Invalid input syntax: INCLUDE should be used with a connector

Sorry, CDC connectors do not have additional columns atm. The team has not decided on what additional columns should they have. Can you elaborate on what columns you want to have and how you want to use them?

@@ -1054,6 +1078,7 @@ pub async fn generate_stream_graph_for_table(
source_watermarks,
col_id_gen,
append_only,
vec![],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason the real parameter is not passed but a dummy vec![] is provided here, leading to bug #19384.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ci/run-backwards-compat-tests Run backwards compatibility tests in your PR. type/feature user-facing-changes Contains changes that are visible to users
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants