From 64cc6ce3ccc97a704ab6654bfafa9223691c9bc5 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 26 Nov 2024 22:27:50 +0800 Subject: [PATCH] fix cdc Signed-off-by: xxchan --- ci/scripts/e2e-source-test.sh | 2 +- src/frontend/src/handler/create_source.rs | 31 +++++++++-------------- 2 files changed, 13 insertions(+), 20 deletions(-) diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 4491db5633ea8..b6ea8267a1690 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -36,7 +36,7 @@ python3 -m pip install --break-system-packages requests protobuf fastavro conflu apt-get -y install jq echo "--- e2e, inline test" -RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ +RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info,risingwave_meta=info" \ risedev ci-start ci-inline-source-test risedev slt './e2e_test/source_inline/**/*.slt' -j16 risedev slt './e2e_test/source_inline/**/*.slt.serial' diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 2e1ba73c20606..a8fe232236eeb 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -1495,7 +1495,7 @@ pub async fn bind_create_source_or_table_with_connector( col_id_gen: &mut ColumnIdGenerator, // `true` for "create source", `false` for "create table with connector" is_create_source: bool, - is_shared: bool, + is_shared_non_cdc: bool, source_rate_limit: Option, ) -> Result<(SourceCatalog, DatabaseId, SchemaId)> { let session = &handler_args.session; @@ -1559,7 +1559,8 @@ pub async fn bind_create_source_or_table_with_connector( check_and_add_timestamp_column(&with_properties, &mut columns); // For shared sources, we will include partition and offset cols in the SourceExecutor's *output*, to be used by the SourceBackfillExecutor. - if is_shared { + // For shared CDC source, the schema is different. See debezium_cdc_source_schema, CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS + if is_shared_non_cdc { let (columns_exist, additional_columns) = source_add_partition_offset_cols( &columns, &with_properties.get_connector().unwrap(), @@ -1687,14 +1688,14 @@ pub async fn handle_create_source( let with_properties = bind_connector_props(&handler_args, &format_encode, true)?; let create_cdc_source_job = with_properties.is_shareable_cdc_connector(); - let is_shared = create_cdc_source_job - || (with_properties.is_shareable_non_cdc_connector() - && session - .env() - .streaming_config() - .developer - .enable_shared_source - && session.config().streaming_use_shared_source()); + let is_shared_non_cdc = with_properties.is_shareable_non_cdc_connector() + && session + .env() + .streaming_config() + .developer + .enable_shared_source + && session.config().streaming_use_shared_source(); + let is_shared = create_cdc_source_job || is_shared_non_cdc; let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job { bind_columns_from_source_for_cdc(&session, &format_encode)? @@ -1722,7 +1723,7 @@ pub async fn handle_create_source( stmt.include_column_options, &mut col_id_gen, true, - is_shared, + is_shared_non_cdc, overwrite_options.source_rate_limit, ) .await?; @@ -1965,14 +1966,6 @@ pub mod tests { "_rw_table_name", Varchar, ), - ( - "_rw_mysql-cdc_partition", - Varchar, - ), - ( - "_rw_mysql-cdc_offset", - Varchar, - ), ( "_row_id", Serial,