Skip to content

Commit

Permalink
fix shared cdc source
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Mar 13, 2024
1 parent 6aa8502 commit 4c7ae69
Showing 1 changed file with 3 additions and 2 deletions.
5 changes: 3 additions & 2 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1307,7 +1307,8 @@ pub async fn handle_create_source(
let sql_pk_names = bind_sql_pk_names(&stmt.columns, &stmt.constraints)?;

let create_cdc_source_job = with_properties.is_shared_cdc_source();
let has_streaming_job = create_cdc_source_job || with_properties.is_kafka_connector();
let has_streaming_job = create_cdc_source_job
|| (with_properties.is_kafka_connector() && session.config().rw_enable_reusable_source());

let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &source_schema, &with_properties)?
Expand Down Expand Up @@ -1414,7 +1415,7 @@ pub async fn handle_create_source(

let catalog_writer = session.catalog_writer()?;

if has_streaming_job && session.config().rw_enable_reusable_source() {
if has_streaming_job {
let graph = {
let context = OptimizerContext::from_handler_args(handler_args);
let source_node = LogicalSource::with_catalog(
Expand Down

0 comments on commit 4c7ae69

Please sign in to comment.