diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index d996f44fdfb6b..10d4524db5370 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -868,15 +868,18 @@ fn fill_table_stream_graph_info( for fragment in fragment_graph.fragments.values_mut() { visit_fragment(fragment, |node_body| { if let NodeBody::Source(source_node) = node_body { + if source_node.source_inner.is_none() { + // skip empty source for dml node + return; + } + // If we're creating a table with connector, we should additionally fill its ID first. if let Some(&mut (ref mut source, source_id)) = source_info.as_mut() { source.id = source_id; let mut source_count = 0; - if let Some(source_inner) = source_node.source_inner.as_mut() { - source_inner.source_id = source_id; - source_count += 1; - } + source_node.source_inner.as_mut().unwrap().source_id = source_id; + source_count += 1; // Generate a random server id for mysql cdc source if needed // `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication