diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs
index f85f69322e6f..7b7d46260052 100644
--- a/src/meta/service/src/ddl_service.rs
+++ b/src/meta/service/src/ddl_service.rs
@@ -88,10 +88,9 @@ impl DdlServiceImpl {
         if let Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)) =
             table.optional_associated_source_id
         {
-            let source = source.as_mut().unwrap();
-            source.id = source_id;
+            source.as_mut().unwrap().id = source_id;
             fill_table_stream_graph_info(
-                source,
+                &mut source,
                 &mut table,
                 TableJobType::General,
                 &mut fragment_graph,
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 3bff278d32e0..3937944e66d9 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -644,9 +644,11 @@ impl DdlController {
         stream_job.set_id(id);
 
         match &mut stream_job {
-            StreamingJob::Table(Some(src), table, job_type) => {
+            StreamingJob::Table(src, table, job_type) => {
                 // If we're creating a table with connector, we should additionally fill its ID first.
-                src.id = self.gen_unique_id::<{ IdCategory::Table }>().await?;
+                if let Some(src) = src {
+                    src.id = self.gen_unique_id::<{ IdCategory::Table }>().await?;
+                }
                 fill_table_stream_graph_info(src, table, *job_type, &mut fragment_graph);
             }
             StreamingJob::Source(_) => {
@@ -1979,19 +1981,12 @@ impl DdlController {
 
 /// Fill in necessary information for table stream graph.
 pub fn fill_table_stream_graph_info(
-    source: &mut PbSource,
+    source: &mut Option<PbSource>,
     table: &mut PbTable,
     table_job_type: TableJobType,
     fragment_graph: &mut PbStreamFragmentGraph,
 ) {
     let mut source_count = 0;
-    // Fill in the correct table id for source.
-    source.optional_associated_table_id =
-        Some(OptionalAssociatedTableId::AssociatedTableId(table.id));
-    // Fill in the correct source id for mview.
-    table.optional_associated_source_id =
-        Some(OptionalAssociatedSourceId::AssociatedSourceId(source.id));
-
     for fragment in fragment_graph.fragments.values_mut() {
         visit_fragment(fragment, |node_body| {
             if let NodeBody::Source(source_node) = node_body {
@@ -2001,26 +1996,40 @@ pub fn fill_table_stream_graph_info(
                 }
 
                 // If we're creating a table with connector, we should additionally fill its ID first.
-                source_node.source_inner.as_mut().unwrap().source_id = source.id;
-                source_count += 1;
-
-                // Generate a random server id for mysql cdc source if needed
-                // `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication
-                // group (that is, different from any other server id being used by any master or slave)
-                if let Some(connector) = source.with_properties.get(UPSTREAM_SOURCE_KEY)
-                    && matches!(
-                        CdcSourceType::from(connector.as_str()),
-                        CdcSourceType::Mysql
-                    )
-                {
-                    let props = &mut source_node.source_inner.as_mut().unwrap().with_properties;
-                    let rand_server_id = rand::thread_rng().gen_range(1..u32::MAX);
-                    props
-                        .entry("server.id".to_string())
-                        .or_insert(rand_server_id.to_string());
-
-                    // make these two `Source` consistent
-                    props.clone_into(&mut source.with_properties);
+                if let Some(source) = source {
+                    source_node.source_inner.as_mut().unwrap().source_id = source.id;
+                    source_count += 1;
+
+                    // Generate a random server id for mysql cdc source if needed
+                    // `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication
+                    // group (that is, different from any other server id being used by any master or slave)
+                    if let Some(connector) = source.with_properties.get(UPSTREAM_SOURCE_KEY)
+                        && matches!(
+                            CdcSourceType::from(connector.as_str()),
+                            CdcSourceType::Mysql
+                        )
+                    {
+                        let props = &mut source_node.source_inner.as_mut().unwrap().with_properties;
+                        let rand_server_id = rand::thread_rng().gen_range(1..u32::MAX);
+                        props
+                            .entry("server.id".to_string())
+                            .or_insert(rand_server_id.to_string());
+
+                        // make these two `Source` consistent
+                        props.clone_into(&mut source.with_properties);
+                    }
+
+                    assert_eq!(
+                        source_count, 1,
+                        "require exactly 1 external stream source when creating table with a connector"
+                    );
+
+                    // Fill in the correct table id for source.
+                    source.optional_associated_table_id =
+                        Some(OptionalAssociatedTableId::AssociatedTableId(table.id));
+                    // Fill in the correct source id for mview.
+                    table.optional_associated_source_id =
+                        Some(OptionalAssociatedSourceId::AssociatedSourceId(source.id));
                 }
             }
 
@@ -2034,8 +2043,4 @@ pub fn fill_table_stream_graph_info(
             }
         });
     }
-    assert_eq!(
-        source_count, 1,
-        "require exactly 1 external stream source when creating table with a connector"
-    );
 }
diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs
index 852d9776d61d..4a83914f4f08 100644
--- a/src/meta/src/rpc/ddl_controller_v2.rs
+++ b/src/meta/src/rpc/ddl_controller_v2.rs
@@ -46,7 +46,7 @@ impl DdlController {
         let job_id = streaming_job.id();
 
         match &mut streaming_job {
-            StreamingJob::Table(Some(src), table, job_type) => {
+            StreamingJob::Table(src, table, job_type) => {
                 // If we're creating a table with connector, we should additionally fill its ID first.
                 fill_table_stream_graph_info(src, table, *job_type, &mut fragment_graph);
             }
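
Note on the signature change: what follows is a minimal, self-contained Rust sketch of the pattern this patch applies, threading an `Option` through the fill step so a plain table (no connector) simply skips the source-specific bookkeeping. The `Source`, `Table`, and `fill_info` names are simplified stand-ins chosen for illustration, not the real RisingWave pb types or the actual `fill_table_stream_graph_info` function.

// Hypothetical, simplified stand-ins for PbSource / PbTable, used only to show
// how accepting `&mut Option<_>` lets the connector-free path skip the source
// bookkeeping that the patch moves inside `if let Some(source)`.
#[derive(Debug, Default)]
struct Source {
    id: u32,
    associated_table_id: Option<u32>,
}

#[derive(Debug, Default)]
struct Table {
    id: u32,
    associated_source_id: Option<u32>,
}

// Sketch of the new shape: per-source work only runs when a source is present.
fn fill_info(source: &mut Option<Source>, table: &mut Table) {
    if let Some(source) = source {
        // Cross-link the IDs, analogous to the pb field updates in the patch.
        source.associated_table_id = Some(table.id);
        table.associated_source_id = Some(source.id);
    }
    // When `source` is None (plain CREATE TABLE), nothing is touched.
}

fn main() {
    // Table created with a connector: IDs get cross-linked.
    let mut src = Some(Source { id: 7, ..Default::default() });
    let mut table = Table { id: 42, ..Default::default() };
    fill_info(&mut src, &mut table);
    assert_eq!(table.associated_source_id, Some(7));
    assert_eq!(src.as_ref().unwrap().associated_table_id, Some(42));

    // Plain table: `None` is now a valid input instead of being rejected by a
    // `Some(src)` pattern at the call site.
    let mut none: Option<Source> = None;
    let mut plain = Table { id: 43, ..Default::default() };
    fill_info(&mut none, &mut plain);
    assert_eq!(plain.associated_source_id, None);
}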