Skip to content

Commit

Permalink
feat(mysql-cdc): deprecate server.id WITH option and generate it internally (#13031)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Oct 26, 2023
1 parent d6e5bec commit b0f266b
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ echo "waiting for connector node to start"
wait_for_connector_node_start

echo "--- inline cdc test"
export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456
sqllogictest -p 4566 -d dev './e2e_test/source/cdc_inline/**/*.slt'

echo "--- mysql & postgres cdc validate test"
Expand Down
133 changes: 133 additions & 0 deletions e2e_test/source/cdc_inline/mysql/mysql_create_drop.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# create and drop CDC mysql tables concurrently

control substitution on

statement ok
ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 4;

system ok
mysql --protocol=tcp -u root -e "
DROP DATABASE IF EXISTS testdb1; CREATE DATABASE testdb1;
USE testdb1;
CREATE TABLE tt1 (v1 int primary key, v2 timestamp);
INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00');
CREATE TABLE tt2 (v1 int primary key, v2 timestamp);
INSERT INTO tt2 VALUES (2, '2023-10-23 11:00:00');
CREATE TABLE tt3 (v1 int primary key, v2 timestamp);
INSERT INTO tt3 VALUES (3, '2023-10-23 12:00:00');
CREATE TABLE tt4 (v1 int primary key, v2 timestamp);
INSERT INTO tt4 VALUES (4, '2023-10-23 13:00:00');
CREATE TABLE tt5 (v1 int primary key, v2 timestamp);
INSERT INTO tt5 VALUES (5, '2023-10-23 14:00:00');"

statement ok
create table tt1 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'dbz',
password = '${MYSQL_PWD:}',
database.name = 'testdb1',
table.name = 'tt1',
);

statement ok
create table tt2 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'dbz',
password = '${MYSQL_PWD:}',
database.name = 'testdb1',
table.name = 'tt2',
);

statement ok
create table tt3 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'dbz',
password = '${MYSQL_PWD:}',
database.name = 'testdb1',
table.name = 'tt3',
);

statement ok
create table tt4 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'dbz',
password = '${MYSQL_PWD:}',
database.name = 'testdb1',
table.name = 'tt4',
);

statement ok
create table tt5 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'dbz',
password = '${MYSQL_PWD:}',
database.name = 'testdb1',
table.name = 'tt5',
);

sleep 3s

query IT
select * from tt1;
----
1 2023-10-23 10:00:00+00:00

query IT
select * from tt2;
----
2 2023-10-23 11:00:00+00:00

query IT
select * from tt3;
----
3 2023-10-23 12:00:00+00:00

query IT
select * from tt4;
----
4 2023-10-23 13:00:00+00:00

query IT
select * from tt5;
----
5 2023-10-23 14:00:00+00:00

statement ok
drop table tt1;

statement ok
drop table tt2;

statement ok
drop table tt3;

statement ok
drop table tt4;

statement ok
drop table tt5;
5 changes: 4 additions & 1 deletion src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,13 @@ macro_rules! impl_cdc_source_type {
$(
$cdc_source_type,
)*
Unspecified,
}

impl From<PbSourceType> for CdcSourceType {
fn from(value: PbSourceType) -> Self {
match value {
PbSourceType::Unspecified => unreachable!(),
PbSourceType::Unspecified => CdcSourceType::Unspecified,
$(
PbSourceType::$cdc_source_type => CdcSourceType::$cdc_source_type,
)*
Expand All @@ -253,8 +254,10 @@ macro_rules! impl_cdc_source_type {
$(
CdcSourceType::$cdc_source_type => PbSourceType::$cdc_source_type,
)*
CdcSourceType::Unspecified => PbSourceType::Unspecified,
}
}
}

}
}
2 changes: 1 addition & 1 deletion src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::{

const SPLIT_TYPE_FIELD: &str = "split_type";
const SPLIT_INFO_FIELD: &str = "split_info";
const UPSTREAM_SOURCE_KEY: &str = "connector";
pub const UPSTREAM_SOURCE_KEY: &str = "connector";

pub trait TryFromHashmap: Sized {
fn try_from_hashmap(props: HashMap<String, String>) -> Result<Self>;
Expand Down
11 changes: 11 additions & 0 deletions src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ pub trait CdcSourceTypeTrait: Send + Sync + Clone + 'static {

for_all_classified_sources!(impl_cdc_source_type);

impl<'a> From<&'a str> for CdcSourceType {
fn from(name: &'a str) -> Self {
match name {
MYSQL_CDC_CONNECTOR => CdcSourceType::Mysql,
POSTGRES_CDC_CONNECTOR => CdcSourceType::Postgres,
CITUS_CDC_CONNECTOR => CdcSourceType::Citus,
_ => CdcSourceType::Unspecified,
}
}
}

#[derive(Clone, Debug, Default)]
pub struct CdcProperties<T: CdcSourceTypeTrait> {
/// Properties specified in the WITH clause by user
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
parser_config,
source_ctx,
}),
CdcSourceType::Unspecified => {
unreachable!();
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub mod monitor;
pub mod nats;
pub mod nexmark;
pub mod pulsar;
pub use base::*;
pub use base::{UPSTREAM_SOURCE_KEY, *};
pub(crate) use common::*;
pub use google_pubsub::GOOGLE_PUBSUB_CONNECTOR;
pub use kafka::KAFKA_CONNECTOR;
Expand Down
1 change: 1 addition & 0 deletions src/meta/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ async-trait = "0.1"
either = "1"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
itertools = "0.11"
rand = "0.8"
regex = "1"
risingwave_common = { workspace = true }
risingwave_connector = { workspace = true }
Expand Down
29 changes: 29 additions & 0 deletions src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ use std::collections::HashMap;
use std::sync::Arc;

use anyhow::anyhow;
use rand::Rng;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::stream_graph_visitor::visit_fragment;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_connector::source::cdc::CdcSourceType;
use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
use risingwave_pb::catalog::connection::private_link_service::{
PbPrivateLinkProvider, PrivateLinkProvider,
};
Expand Down Expand Up @@ -428,6 +431,16 @@ impl DdlService for DdlServiceImpl {
// Generate source id.
let source_id = self.gen_unique_id::<{ IdCategory::Table }>().await?; // TODO: Use source category
fill_table_source(source, source_id, &mut mview, table_id, &mut fragment_graph);

// Modify properties for cdc sources if needed
if let Some(connector) = source.properties.get(UPSTREAM_SOURCE_KEY) {
if matches!(
CdcSourceType::from(connector.as_str()),
CdcSourceType::Mysql
) {
fill_cdc_mysql_server_id(&mut fragment_graph);
}
}
}

let mut stream_job = StreamingJob::Table(source, mview);
Expand Down Expand Up @@ -827,3 +840,19 @@ fn fill_table_source(
table.optional_associated_source_id =
Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
}

/// Fills a randomly generated `server.id` into the properties of every source
/// node in the fragment graph, unless the user already specified one in the
/// WITH options.
///
/// `server.id` must lie in the range 1 to 2^32 - 1 and MUST be unique across
/// the whole MySQL replication group (that is, different from any other server
/// id being used by any master or slave).
fn fill_cdc_mysql_server_id(fragment_graph: &mut PbStreamFragmentGraph) {
    for fragment in fragment_graph.fragments.values_mut() {
        visit_fragment(fragment, |node_body| {
            if let NodeBody::Source(source_node) = node_body {
                let props = &mut source_node.source_inner.as_mut().unwrap().properties;
                // The closure form only samples the RNG when the key is absent,
                // i.e. when the user did not supply `server.id` themselves.
                // NOTE(review): a random draw does not guarantee uniqueness
                // across concurrently created jobs — collisions are unlikely
                // for a u32 range but not impossible.
                props.entry("server.id".to_string()).or_insert_with(|| {
                    // `1..=u32::MAX` covers the full documented range including
                    // the maximum; the exclusive `1..u32::MAX` left out 2^32-1.
                    rand::thread_rng().gen_range(1..=u32::MAX).to_string()
                });
            }
        });
    }
}
2 changes: 1 addition & 1 deletion src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl ConnectorSource {
let to_reader_splits = splits.into_iter().map(|split| vec![split]);

try_join_all(to_reader_splits.into_iter().map(|splits| {
tracing::debug!("spawning connector split reader for split {:?}", splits);
tracing::debug!(?splits, ?prop, "spawning connector split reader");
let props = prop.clone();
let data_gen_columns = data_gen_columns.clone();
let parser_config = parser_config.clone();
Expand Down

0 comments on commit b0f266b

Please sign in to comment.