diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index b55575d6eb1a8..2fb2d2a8c0645 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -36,7 +36,7 @@ python3 -m pip install --break-system-packages requests protobuf fastavro conflu apt-get -y install jq echo "--- e2e, inline test" -RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ +RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info,risingwave_meta=info" \ risedev ci-start ci-inline-source-test risedev slt './e2e_test/source_inline/**/*.slt' -j16 risedev slt './e2e_test/source_inline/**/*.slt.serial' diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 12c71013556b7..e5dba23ef1faa 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -1478,7 +1478,7 @@ pub async fn bind_create_source_or_table_with_connector( col_id_gen: &mut ColumnIdGenerator, // `true` for "create source", `false` for "create table with connector" is_create_source: bool, - is_shared: bool, + is_shared_non_cdc: bool, source_rate_limit: Option, ) -> Result<(SourceCatalog, DatabaseId, SchemaId)> { let session = &handler_args.session; @@ -1543,7 +1543,8 @@ pub async fn bind_create_source_or_table_with_connector( check_and_add_timestamp_column(&with_properties, &mut columns); // For shared sources, we will include partition and offset cols in the SourceExecutor's *output*, to be used by the SourceBackfillExecutor. - if is_shared { + // For shared CDC source, the schema is different. See debezium_cdc_source_schema, CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS + if is_shared_non_cdc { let (columns_exist, additional_columns) = source_add_partition_offset_cols( &columns, &with_properties.get_connector().unwrap(), @@ -1672,14 +1673,14 @@ pub async fn handle_create_source( let with_properties = bind_connector_props(&handler_args, &source_schema, true)?; let create_cdc_source_job = with_properties.is_shareable_cdc_connector(); - let is_shared = create_cdc_source_job - || (with_properties.is_shareable_non_cdc_connector() - && session - .env() - .streaming_config() - .developer - .enable_shared_source - && session.config().streaming_use_shared_source()); + let is_shared_non_cdc = with_properties.is_shareable_non_cdc_connector() + && session + .env() + .streaming_config() + .developer + .enable_shared_source + && session.config().streaming_use_shared_source(); + let is_shared = create_cdc_source_job || is_shared_non_cdc; let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job { bind_columns_from_source_for_cdc(&session, &source_schema)? @@ -1707,7 +1708,7 @@ pub async fn handle_create_source( stmt.include_column_options, &mut col_id_gen, true, - is_shared, + is_shared_non_cdc, overwrite_options.source_rate_limit, ) .await?;