diff --git a/src/connector/src/with_options.rs b/src/connector/src/with_options.rs index 37cca7ec09bed..6c857de148fad 100644 --- a/src/connector/src/with_options.rs +++ b/src/connector/src/with_options.rs @@ -18,6 +18,7 @@ use risingwave_pb::secret::PbSecretRef; use crate::sink::catalog::SinkFormatDesc; use crate::source::cdc::external::CdcTableType; +use crate::source::cdc::MYSQL_CDC_CONNECTOR; use crate::source::iceberg::ICEBERG_CONNECTOR; use crate::source::{ AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, @@ -104,6 +105,14 @@ pub trait WithPropertiesExt: Get + Sized { connector == KAFKA_CONNECTOR } + #[inline(always)] + fn is_mysql_cdc_connector(&self) -> bool { + let Some(connector) = self.get_connector() else { + return false; + }; + connector == MYSQL_CDC_CONNECTOR + } + #[inline(always)] fn is_cdc_connector(&self) -> bool { let Some(connector) = self.get_connector() else { diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 3616997d384cb..0eed92c79e74d 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -21,6 +21,7 @@ use either::Either; use itertools::Itertools; use maplit::{convert_args, hashmap}; use pgwire::pg_response::{PgResponse, StatementType}; +use rand::Rng; use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert}; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ @@ -1465,6 +1466,14 @@ pub fn bind_connector_props( .to_string(), ); } + if !is_create_source && with_properties.is_mysql_cdc_connector() { + // Generate a random server id for mysql cdc source if needed + // `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication + // group (that is, different from any other server id being used by any master or slave) + with_properties + .entry("server.id".to_string()) + .or_insert(rand::thread_rng().gen_range(1..u32::MAX).to_string()); + } Ok(with_properties) } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index c2629cc3718f6..78f724a5a0d10 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -20,7 +20,6 @@ use std::time::Duration; use anyhow::{anyhow, Context}; use itertools::Itertools; -use rand::Rng; use risingwave_common::bitmap::Bitmap; use risingwave_common::config::DefaultParallelism; use risingwave_common::hash::{ActorMapping, VnodeCountCompat}; @@ -32,10 +31,8 @@ use risingwave_common::util::stream_graph_visitor::{ }; use risingwave_common::{bail, hash, must_match}; use risingwave_connector::error::ConnectorError; -use risingwave_connector::source::cdc::CdcSourceType; use risingwave_connector::source::{ ConnectorProperties, SourceEnumeratorContext, SourceProperties, SplitEnumerator, - UPSTREAM_SOURCE_KEY, }; use risingwave_connector::{dispatch_source_prop, WithOptionsSecResolved}; use risingwave_meta_model::object::ObjectType; @@ -2009,25 +2006,6 @@ pub fn fill_table_stream_graph_info( source_node.source_inner.as_mut().unwrap().source_id = source.id; source_count += 1; - // Generate a random server id for mysql cdc source if needed - // `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication - // group (that is, different from any other server id being used by any master or slave) - if let Some(connector) = source.with_properties.get(UPSTREAM_SOURCE_KEY) - && matches!( - CdcSourceType::from(connector.as_str()), - CdcSourceType::Mysql - ) - { - let props = &mut source_node.source_inner.as_mut().unwrap().with_properties; - let rand_server_id = rand::thread_rng().gen_range(1..u32::MAX); - props - .entry("server.id".to_string()) - .or_insert(rand_server_id.to_string()); - - // make these two `Source` consistent - props.clone_into(&mut source.with_properties); - } - assert_eq!( source_count, 1, "require exactly 1 external stream source when creating table with a connector"