diff --git a/Cargo.lock b/Cargo.lock index 2f2ee2991b10f..dbce3dfae9f3b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7778,6 +7778,7 @@ dependencies = [ "itertools 0.11.0", "madsim-tokio", "madsim-tonic", + "rand", "regex", "risingwave_common", "risingwave_connector", diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 59618d24641aa..2edd099e31187 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -81,6 +81,7 @@ echo "waiting for connector node to start" wait_for_connector_node_start echo "--- inline cdc test" +export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456 sqllogictest -p 4566 -d dev './e2e_test/source/cdc_inline/**/*.slt' echo "--- mysql & postgres cdc validate test" diff --git a/e2e_test/source/cdc_inline/mysql/mysql_create_drop.slt b/e2e_test/source/cdc_inline/mysql/mysql_create_drop.slt new file mode 100644 index 0000000000000..071fe0ef2da83 --- /dev/null +++ b/e2e_test/source/cdc_inline/mysql/mysql_create_drop.slt @@ -0,0 +1,133 @@ +# create and drop CDC mysql tables concurrently + +control substitution on + +statement ok +ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 4; + +system ok +mysql --protocol=tcp -u root -e " + DROP DATABASE IF EXISTS testdb1; CREATE DATABASE testdb1; + USE testdb1; + CREATE TABLE tt1 (v1 int primary key, v2 timestamp); + INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00'); + CREATE TABLE tt2 (v1 int primary key, v2 timestamp); + INSERT INTO tt2 VALUES (2, '2023-10-23 11:00:00'); + CREATE TABLE tt3 (v1 int primary key, v2 timestamp); + INSERT INTO tt3 VALUES (3, '2023-10-23 12:00:00'); + CREATE TABLE tt4 (v1 int primary key, v2 timestamp); + INSERT INTO tt4 VALUES (4, '2023-10-23 13:00:00'); + CREATE TABLE tt5 (v1 int primary key, v2 timestamp); + INSERT INTO tt5 VALUES (5, '2023-10-23 14:00:00');" + +statement ok +create table tt1 (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) with ( + connector = 'mysql-cdc', + hostname = '${MYSQL_HOST:localhost}', + port = '${MYSQL_TCP_PORT:8306}', + username = 'dbz', + password = '${MYSQL_PWD:}', + database.name = 'testdb1', + table.name = 'tt1', +); + +statement ok +create table tt2 (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) with ( + connector = 'mysql-cdc', + hostname = '${MYSQL_HOST:localhost}', + port = '${MYSQL_TCP_PORT:8306}', + username = 'dbz', + password = '${MYSQL_PWD:}', + database.name = 'testdb1', + table.name = 'tt2', +); + +statement ok +create table tt3 (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) with ( + connector = 'mysql-cdc', + hostname = '${MYSQL_HOST:localhost}', + port = '${MYSQL_TCP_PORT:8306}', + username = 'dbz', + password = '${MYSQL_PWD:}', + database.name = 'testdb1', + table.name = 'tt3', +); + +statement ok +create table tt4 (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) with ( + connector = 'mysql-cdc', + hostname = '${MYSQL_HOST:localhost}', + port = '${MYSQL_TCP_PORT:8306}', + username = 'dbz', + password = '${MYSQL_PWD:}', + database.name = 'testdb1', + table.name = 'tt4', +); + +statement ok +create table tt5 (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) with ( + connector = 'mysql-cdc', + hostname = '${MYSQL_HOST:localhost}', + port = '${MYSQL_TCP_PORT:8306}', + username = 'dbz', + password = '${MYSQL_PWD:}', + database.name = 'testdb1', + table.name = 'tt5', +); + +sleep 3s + +query IT +select * from tt1; +---- +1 2023-10-23 10:00:00+00:00 + +query IT +select * from tt2; +---- +2 2023-10-23 11:00:00+00:00 + +query IT +select * from tt3; +---- +3 2023-10-23 12:00:00+00:00 + +query IT +select * from tt4; +---- +4 2023-10-23 13:00:00+00:00 + +query IT +select * from tt5; +---- +5 2023-10-23 14:00:00+00:00 + +statement ok +drop table tt1; + +statement ok +drop table tt2; + +statement ok +drop table tt3; + +statement ok +drop table tt4; + +statement ok +drop table tt5; diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index 62a3cfdcd9682..fdc3ed8867297 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -234,12 +234,13 @@ macro_rules! impl_cdc_source_type { $( $cdc_source_type, )* + Unspecified, } impl From for CdcSourceType { fn from(value: PbSourceType) -> Self { match value { - PbSourceType::Unspecified => unreachable!(), + PbSourceType::Unspecified => CdcSourceType::Unspecified, $( PbSourceType::$cdc_source_type => CdcSourceType::$cdc_source_type, )* @@ -253,8 +254,10 @@ macro_rules! impl_cdc_source_type { $( CdcSourceType::$cdc_source_type => PbSourceType::$cdc_source_type, )* + CdcSourceType::Unspecified => PbSourceType::Unspecified, } } } + } } diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 6a8cd12ce9fac..49dc3b5d87119 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -51,7 +51,7 @@ use crate::{ const SPLIT_TYPE_FIELD: &str = "split_type"; const SPLIT_INFO_FIELD: &str = "split_info"; -const UPSTREAM_SOURCE_KEY: &str = "connector"; +pub const UPSTREAM_SOURCE_KEY: &str = "connector"; pub trait TryFromHashmap: Sized { fn try_from_hashmap(props: HashMap) -> Result; diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index 1d795a7141e84..d55273bf725db 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -42,6 +42,17 @@ pub trait CdcSourceTypeTrait: Send + Sync + Clone + 'static { for_all_classified_sources!(impl_cdc_source_type); +impl<'a> From<&'a str> for CdcSourceType { + fn from(name: &'a str) -> Self { + match name { + MYSQL_CDC_CONNECTOR => CdcSourceType::Mysql, + POSTGRES_CDC_CONNECTOR => CdcSourceType::Postgres, + CITUS_CDC_CONNECTOR => CdcSourceType::Citus, + _ => CdcSourceType::Unspecified, + } + } +} + #[derive(Clone, Debug, Default)] pub struct CdcProperties { /// Properties specified in the WITH clause by user diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index 7410834ce1daa..4d25d82c106c3 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -87,6 +87,9 @@ impl SplitReader for CdcSplitReader { parser_config, source_ctx, }), + CdcSourceType::Unspecified => { + unreachable!(); + } } } diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index 869b7089ac271..c866ed6c3c223 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -24,7 +24,7 @@ pub mod monitor; pub mod nats; pub mod nexmark; pub mod pulsar; -pub use base::*; +pub use base::{UPSTREAM_SOURCE_KEY, *}; pub(crate) use common::*; pub use google_pubsub::GOOGLE_PUBSUB_CONNECTOR; pub use kafka::KAFKA_CONNECTOR; diff --git a/src/meta/service/Cargo.toml b/src/meta/service/Cargo.toml index 87b293f64a5e6..d1c08a642c8ca 100644 --- a/src/meta/service/Cargo.toml +++ b/src/meta/service/Cargo.toml @@ -19,6 +19,7 @@ async-trait = "0.1" either = "1" futures = { version = "0.3", default-features = false, features = ["alloc"] } itertools = "0.11" +rand = "0.8" regex = "1" risingwave_common = { workspace = true } risingwave_connector = { workspace = true } diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 6f08ebfb18d17..5f73ffb815117 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -16,9 +16,12 @@ use std::collections::HashMap; use std::sync::Arc; use anyhow::anyhow; +use rand::Rng; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::visit_fragment; use risingwave_connector::sink::catalog::SinkId; +use risingwave_connector::source::cdc::CdcSourceType; +use risingwave_connector::source::UPSTREAM_SOURCE_KEY; use risingwave_pb::catalog::connection::private_link_service::{ PbPrivateLinkProvider, PrivateLinkProvider, }; @@ -428,6 +431,16 @@ impl DdlService for DdlServiceImpl { // Generate source id. let source_id = self.gen_unique_id::<{ IdCategory::Table }>().await?; // TODO: Use source category fill_table_source(source, source_id, &mut mview, table_id, &mut fragment_graph); + + // Modify properties for cdc sources if needed + if let Some(connector) = source.properties.get(UPSTREAM_SOURCE_KEY) { + if matches!( + CdcSourceType::from(connector.as_str()), + CdcSourceType::Mysql + ) { + fill_cdc_mysql_server_id(&mut fragment_graph); + } + } } let mut stream_job = StreamingJob::Table(source, mview); @@ -827,3 +840,19 @@ fn fill_table_source( table.optional_associated_source_id = Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)); } + +// `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication +// group (that is, different from any other server id being used by any master or slave) +fn fill_cdc_mysql_server_id(fragment_graph: &mut PbStreamFragmentGraph) { + for fragment in fragment_graph.fragments.values_mut() { + visit_fragment(fragment, |node_body| { + if let NodeBody::Source(source_node) = node_body { + let props = &mut source_node.source_inner.as_mut().unwrap().properties; + let rand_server_id = rand::thread_rng().gen_range(1..u32::MAX); + props + .entry("server.id".to_string()) + .or_insert(rand_server_id.to_string()); + } + }); + } +} diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index 733ba6a8c4a83..31ee763d2a0b9 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -155,7 +155,7 @@ impl ConnectorSource { let to_reader_splits = splits.into_iter().map(|split| vec![split]); try_join_all(to_reader_splits.into_iter().map(|splits| { - tracing::debug!("spawning connector split reader for split {:?}", splits); + tracing::debug!(?splits, ?prop, "spawning connector split reader"); let props = prop.clone(); let data_gen_columns = data_gen_columns.clone(); let parser_config = parser_config.clone();