Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix fill table id for cdc backfill source #14516

Merged
merged 1 commit into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ impl DdlServiceImpl {
if let Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)) =
table.optional_associated_source_id
{
let source = source.as_mut().unwrap();
source.id = source_id;
source.as_mut().unwrap().id = source_id;
fill_table_stream_graph_info(
source,
&mut source,
&mut table,
TableJobType::General,
&mut fragment_graph,
Expand Down
73 changes: 39 additions & 34 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,9 +644,11 @@ impl DdlController {
stream_job.set_id(id);

match &mut stream_job {
StreamingJob::Table(Some(src), table, job_type) => {
StreamingJob::Table(src, table, job_type) => {
// If we're creating a table with connector, we should additionally fill its ID first.
src.id = self.gen_unique_id::<{ IdCategory::Table }>().await?;
if let Some(src) = src {
src.id = self.gen_unique_id::<{ IdCategory::Table }>().await?;
}
fill_table_stream_graph_info(src, table, *job_type, &mut fragment_graph);
}
StreamingJob::Source(_) => {
Expand Down Expand Up @@ -1979,19 +1981,12 @@ impl DdlController {

/// Fill in necessary information for table stream graph.
pub fn fill_table_stream_graph_info(
source: &mut PbSource,
source: &mut Option<PbSource>,
table: &mut PbTable,
table_job_type: TableJobType,
fragment_graph: &mut PbStreamFragmentGraph,
) {
let mut source_count = 0;
// Fill in the correct table id for source.
source.optional_associated_table_id =
Some(OptionalAssociatedTableId::AssociatedTableId(table.id));
// Fill in the correct source id for mview.
table.optional_associated_source_id =
Some(OptionalAssociatedSourceId::AssociatedSourceId(source.id));

for fragment in fragment_graph.fragments.values_mut() {
visit_fragment(fragment, |node_body| {
if let NodeBody::Source(source_node) = node_body {
Expand All @@ -2001,26 +1996,40 @@ pub fn fill_table_stream_graph_info(
}

// If we're creating a table with connector, we should additionally fill its ID first.
source_node.source_inner.as_mut().unwrap().source_id = source.id;
source_count += 1;

// Generate a random server id for mysql cdc source if needed
// `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication
// group (that is, different from any other server id being used by any master or slave)
if let Some(connector) = source.with_properties.get(UPSTREAM_SOURCE_KEY)
&& matches!(
CdcSourceType::from(connector.as_str()),
CdcSourceType::Mysql
)
{
let props = &mut source_node.source_inner.as_mut().unwrap().with_properties;
let rand_server_id = rand::thread_rng().gen_range(1..u32::MAX);
props
.entry("server.id".to_string())
.or_insert(rand_server_id.to_string());

// make these two `Source` consistent
props.clone_into(&mut source.with_properties);
if let Some(source) = source {
source_node.source_inner.as_mut().unwrap().source_id = source.id;
source_count += 1;

// Generate a random server id for mysql cdc source if needed
// `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication
// group (that is, different from any other server id being used by any master or slave)
if let Some(connector) = source.with_properties.get(UPSTREAM_SOURCE_KEY)
&& matches!(
CdcSourceType::from(connector.as_str()),
CdcSourceType::Mysql
)
{
let props = &mut source_node.source_inner.as_mut().unwrap().with_properties;
let rand_server_id = rand::thread_rng().gen_range(1..u32::MAX);
props
.entry("server.id".to_string())
.or_insert(rand_server_id.to_string());

// make these two `Source` consistent
props.clone_into(&mut source.with_properties);
}

assert_eq!(
source_count, 1,
"require exactly 1 external stream source when creating table with a connector"
);

// Fill in the correct table id for source.
source.optional_associated_table_id =
Some(OptionalAssociatedTableId::AssociatedTableId(table.id));
// Fill in the correct source id for mview.
table.optional_associated_source_id =
Some(OptionalAssociatedSourceId::AssociatedSourceId(source.id));
}
}

Expand All @@ -2034,8 +2043,4 @@ pub fn fill_table_stream_graph_info(
}
});
}
assert_eq!(
source_count, 1,
"require exactly 1 external stream source when creating table with a connector"
);
}
2 changes: 1 addition & 1 deletion src/meta/src/rpc/ddl_controller_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl DdlController {
let job_id = streaming_job.id();

match &mut streaming_job {
StreamingJob::Table(Some(src), table, job_type) => {
StreamingJob::Table(src, table, job_type) => {
// If we're creating a table with connector, we should additionally fill its ID first.
yezizp2012 marked this conversation as resolved.
Show resolved Hide resolved
fill_table_stream_graph_info(src, table, *job_type, &mut fragment_graph);
}
Expand Down
Loading