From 69a688a9f1537c7598e1c67e959d3f519d1e0a39 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 22 Aug 2024 12:53:47 +0800 Subject: [PATCH] refine --- Cargo.lock | 1 - src/connector/src/parser/plain_parser.rs | 6 +++++- src/connector/src/parser/unified/debezium.rs | 15 ++++----------- src/meta/Cargo.toml | 1 - .../tests/integration_tests/sink/recovery.rs | 9 --------- 5 files changed, 9 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 842fefb04bd9c..d007b6c6f4f3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11159,7 +11159,6 @@ dependencies = [ "prometheus-http-query", "prost 0.13.1", "rand", - "regex", "risingwave_backup", "risingwave_common", "risingwave_common_heap_profiling", diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index b9ba2a0a53590..797b38168074e 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -125,7 +125,11 @@ impl PlainParser { .generate_accessor(data) .await?; - return match parse_schema_change(&accessor, &self.source_ctx.connector_props) { + return match parse_schema_change( + &accessor, + self.source_ctx.source_id.into(), + &self.source_ctx.connector_props, + ) { Ok(schema_change) => Ok(ParseResult::SchemaChange(schema_change)), Err(err) => Err(err)?, }; diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index 0452610854a6a..d99811ba1f470 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -73,7 +73,6 @@ const BEFORE: &str = "before"; const AFTER: &str = "after"; const UPSTREAM_DDL: &str = "ddl"; -const CDC_SOURCE_NAME_PREFIX: &str = "RW_CDC_"; const SOURCE: &str = "source"; const SOURCE_TS_MS: &str = "ts_ms"; const SOURCE_DB: &str = "db"; @@ -156,8 +155,12 @@ macro_rules! jsonb_access_field { }; } +/// Parse the schema change message from Debezium. +/// The layout of MySQL schema change message can refer to +/// https://debezium.io/documentation/reference/2.6/connectors/mysql.html#mysql-schema-change-topic pub fn parse_schema_change( accessor: &impl Access, + source_id: u32, connector_props: &ConnectorProperties, ) -> AccessResult { let mut schema_changes = vec![]; @@ -169,16 +172,6 @@ pub fn parse_schema_change( .as_utf8() .to_string(); - let source_id = if let Some(ScalarRefImpl::Jsonb(source_field)) = - accessor.access(&[SOURCE], &DataType::Jsonb)?.to_datum_ref() - { - let name: String = jsonb_access_field!(source_field, "name", string); - let id = name.strip_prefix(CDC_SOURCE_NAME_PREFIX).unwrap_or("0"); - id.parse::().unwrap_or_default() - } else { - 0 - }; - if let Some(ScalarRefImpl::List(table_changes)) = accessor .access(&[TABLE_CHANGES], &DataType::List(Box::new(DataType::Jsonb)))? .to_datum_ref() diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 727310b03ddda..4511e9f61d894 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -50,7 +50,6 @@ prometheus = "0.13" prometheus-http-query = "0.8" prost = { workspace = true } rand = { workspace = true } -regex = "1.10" risingwave_backup = { workspace = true } risingwave_common = { workspace = true } risingwave_common_heap_profiling = { workspace = true } diff --git a/src/tests/simulation/tests/integration_tests/sink/recovery.rs b/src/tests/simulation/tests/integration_tests/sink/recovery.rs index 774b916db49af..6b4f71d7d508e 100644 --- a/src/tests/simulation/tests/integration_tests/sink/recovery.rs +++ b/src/tests/simulation/tests/integration_tests/sink/recovery.rs @@ -93,21 +93,12 @@ async fn recovery_test_inner(is_decouple: bool) -> Result<()> { Ok(()) } -fn init_logger() { - let _ = tracing_subscriber::fmt() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .with_ansi(false) - .try_init(); -} - #[tokio::test] async fn test_sink_recovery() -> Result<()> { - init_logger(); recovery_test_inner(false).await } #[tokio::test] async fn test_sink_decouple_recovery() -> Result<()> { - init_logger(); recovery_test_inner(true).await }