Skip to content

Commit

Permalink
refactor: move server.id for MySQL CDC from meta to fe
Browse files Browse the repository at this point in the history
motivation is to simplify create stream job process in meta

Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Oct 29, 2024
1 parent 4c05b7a commit f59bc27
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 22 deletions.
9 changes: 9 additions & 0 deletions src/connector/src/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_pb::secret::PbSecretRef;

use crate::sink::catalog::SinkFormatDesc;
use crate::source::cdc::external::CdcTableType;
use crate::source::cdc::MYSQL_CDC_CONNECTOR;
use crate::source::iceberg::ICEBERG_CONNECTOR;
use crate::source::{
AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR,
Expand Down Expand Up @@ -104,6 +105,14 @@ pub trait WithPropertiesExt: Get + Sized {
connector == KAFKA_CONNECTOR
}

#[inline(always)]
fn is_mysql_cdc_connector(&self) -> bool {
let Some(connector) = self.get_connector() else {
return false;
};
connector == MYSQL_CDC_CONNECTOR
}

#[inline(always)]
fn is_cdc_connector(&self) -> bool {
let Some(connector) = self.get_connector() else {
Expand Down
9 changes: 9 additions & 0 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use either::Either;
use itertools::Itertools;
use maplit::{convert_args, hashmap};
use pgwire::pg_response::{PgResponse, StatementType};
use rand::Rng;
use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert};
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{
Expand Down Expand Up @@ -1465,6 +1466,14 @@ pub fn bind_connector_props(
.to_string(),
);
}
if !is_create_source && with_properties.is_mysql_cdc_connector() {
// Generate a random server id for mysql cdc source if needed
// `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication
// group (that is, different from any other server id being used by any master or slave)
with_properties
.entry("server.id".to_string())
.or_insert(rand::thread_rng().gen_range(1..u32::MAX).to_string());
}
Ok(with_properties)
}

Expand Down
22 changes: 0 additions & 22 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::time::Duration;

use anyhow::{anyhow, Context};
use itertools::Itertools;
use rand::Rng;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
Expand All @@ -32,10 +31,8 @@ use risingwave_common::util::stream_graph_visitor::{
};
use risingwave_common::{bail, hash, must_match};
use risingwave_connector::error::ConnectorError;
use risingwave_connector::source::cdc::CdcSourceType;
use risingwave_connector::source::{
ConnectorProperties, SourceEnumeratorContext, SourceProperties, SplitEnumerator,
UPSTREAM_SOURCE_KEY,
};
use risingwave_connector::{dispatch_source_prop, WithOptionsSecResolved};
use risingwave_meta_model::object::ObjectType;
Expand Down Expand Up @@ -2009,25 +2006,6 @@ pub fn fill_table_stream_graph_info(
source_node.source_inner.as_mut().unwrap().source_id = source.id;
source_count += 1;

// Generate a random server id for mysql cdc source if needed
// `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication
// group (that is, different from any other server id being used by any master or slave)
if let Some(connector) = source.with_properties.get(UPSTREAM_SOURCE_KEY)
&& matches!(
CdcSourceType::from(connector.as_str()),
CdcSourceType::Mysql
)
{
let props = &mut source_node.source_inner.as_mut().unwrap().with_properties;
let rand_server_id = rand::thread_rng().gen_range(1..u32::MAX);
props
.entry("server.id".to_string())
.or_insert(rand_server_id.to_string());

// make these two `Source` consistent
props.clone_into(&mut source.with_properties);
}

assert_eq!(
source_count, 1,
"require exactly 1 external stream source when creating table with a connector"
Expand Down

0 comments on commit f59bc27

Please sign in to comment.