Skip to content

Commit

Permalink
fix: fix fill table id for cdc backfill source (#14516)
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored and Little-Wallace committed Jan 20, 2024
1 parent 2dd61a6 commit 4d23e5e
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 38 deletions.
5 changes: 2 additions & 3 deletions src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ impl DdlServiceImpl {
if let Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)) =
table.optional_associated_source_id
{
let source = source.as_mut().unwrap();
source.id = source_id;
source.as_mut().unwrap().id = source_id;
fill_table_stream_graph_info(
source,
&mut source,
&mut table,
TableJobType::General,
&mut fragment_graph,
Expand Down
73 changes: 39 additions & 34 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,9 +644,11 @@ impl DdlController {
stream_job.set_id(id);

match &mut stream_job {
StreamingJob::Table(Some(src), table, job_type) => {
StreamingJob::Table(src, table, job_type) => {
// If we're creating a table with connector, we should additionally fill its ID first.
src.id = self.gen_unique_id::<{ IdCategory::Table }>().await?;
if let Some(src) = src {
src.id = self.gen_unique_id::<{ IdCategory::Table }>().await?;
}
fill_table_stream_graph_info(src, table, *job_type, &mut fragment_graph);
}
StreamingJob::Source(_) => {
Expand Down Expand Up @@ -1979,19 +1981,12 @@ impl DdlController {

/// Fill in necessary information for table stream graph.
pub fn fill_table_stream_graph_info(
source: &mut PbSource,
source: &mut Option<PbSource>,
table: &mut PbTable,
table_job_type: TableJobType,
fragment_graph: &mut PbStreamFragmentGraph,
) {
let mut source_count = 0;
// Fill in the correct table id for source.
source.optional_associated_table_id =
Some(OptionalAssociatedTableId::AssociatedTableId(table.id));
// Fill in the correct source id for mview.
table.optional_associated_source_id =
Some(OptionalAssociatedSourceId::AssociatedSourceId(source.id));

for fragment in fragment_graph.fragments.values_mut() {
visit_fragment(fragment, |node_body| {
if let NodeBody::Source(source_node) = node_body {
Expand All @@ -2001,26 +1996,40 @@ pub fn fill_table_stream_graph_info(
}

// If we're creating a table with connector, we should additionally fill its ID first.
source_node.source_inner.as_mut().unwrap().source_id = source.id;
source_count += 1;

// Generate a random server id for mysql cdc source if needed
// `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication
// group (that is, different from any other server id being used by any master or slave)
if let Some(connector) = source.with_properties.get(UPSTREAM_SOURCE_KEY)
&& matches!(
CdcSourceType::from(connector.as_str()),
CdcSourceType::Mysql
)
{
let props = &mut source_node.source_inner.as_mut().unwrap().with_properties;
let rand_server_id = rand::thread_rng().gen_range(1..u32::MAX);
props
.entry("server.id".to_string())
.or_insert(rand_server_id.to_string());

// make these two `Source` consistent
props.clone_into(&mut source.with_properties);
if let Some(source) = source {
source_node.source_inner.as_mut().unwrap().source_id = source.id;
source_count += 1;

// Generate a random server id for mysql cdc source if needed
// `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication
// group (that is, different from any other server id being used by any master or slave)
if let Some(connector) = source.with_properties.get(UPSTREAM_SOURCE_KEY)
&& matches!(
CdcSourceType::from(connector.as_str()),
CdcSourceType::Mysql
)
{
let props = &mut source_node.source_inner.as_mut().unwrap().with_properties;
let rand_server_id = rand::thread_rng().gen_range(1..u32::MAX);
props
.entry("server.id".to_string())
.or_insert(rand_server_id.to_string());

// make these two `Source` consistent
props.clone_into(&mut source.with_properties);
}

assert_eq!(
source_count, 1,
"require exactly 1 external stream source when creating table with a connector"
);

// Fill in the correct table id for source.
source.optional_associated_table_id =
Some(OptionalAssociatedTableId::AssociatedTableId(table.id));
// Fill in the correct source id for mview.
table.optional_associated_source_id =
Some(OptionalAssociatedSourceId::AssociatedSourceId(source.id));
}
}

Expand All @@ -2034,8 +2043,4 @@ pub fn fill_table_stream_graph_info(
}
});
}
assert_eq!(
source_count, 1,
"require exactly 1 external stream source when creating table with a connector"
);
}
2 changes: 1 addition & 1 deletion src/meta/src/rpc/ddl_controller_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl DdlController {
let job_id = streaming_job.id();

match &mut streaming_job {
StreamingJob::Table(Some(src), table, job_type) => {
StreamingJob::Table(src, table, job_type) => {
// If we're creating a table with connector, we should additionally fill its ID first.
fill_table_stream_graph_info(src, table, *job_type, &mut fragment_graph);
}
Expand Down

0 comments on commit 4d23e5e

Please sign in to comment.