From 1dd3ba09103eb1438d172e62850ed38848acdcde Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Fri, 28 Jun 2024 17:14:28 -0400 Subject: [PATCH 1/5] feat(debeizum): upgrade the dependencies of debeizum from 2.4.2.Final to 2.7.0.Fianl --- java/connector-node/risingwave-source-cdc/pom.xml | 3 ++- java/pom.xml | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/java/connector-node/risingwave-source-cdc/pom.xml b/java/connector-node/risingwave-source-cdc/pom.xml index 5ee531ef805e..839de9c1f319 100644 --- a/java/connector-node/risingwave-source-cdc/pom.xml +++ b/java/connector-node/risingwave-source-cdc/pom.xml @@ -56,7 +56,8 @@ com.zendesk mysql-binlog-connector-java - 0.27.2 + + 0.29.0 org.postgresql diff --git a/java/pom.xml b/java/pom.xml index 644588c9d6b4..58f33e9da56d 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -74,7 +74,7 @@ 2.11.0 1.10.0 3.12.0 - 2.4.2.Final + 2.6.2.Final 2.15.0 3.3.1 3.4.0 From 3f177a542410011d30dd8260c57247dee351123b Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Sun, 30 Jun 2024 18:41:48 -0400 Subject: [PATCH 2/5] fix snapshot mode --- .../source/common/DbzConnectorConfig.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index a5804974fb29..432fc1309753 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -146,21 +146,21 @@ public DbzConnectorConfig( // If cdc backfill enabled, the source only emit incremental changes, so we must // rewind to the given offset and continue binlog reading from there if (null != startOffset && !startOffset.isBlank()) { - mysqlProps.setProperty("snapshot.mode", "schema_only_recovery"); + mysqlProps.setProperty("snapshot.mode", "recovery"); mysqlProps.setProperty( ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); } else { // read upstream table schemas and emit incremental changes only - mysqlProps.setProperty("snapshot.mode", "schema_only"); + mysqlProps.setProperty("snapshot.mode", "never"); } } else { // if snapshot phase is finished and offset is specified, we will continue binlog // reading from the given offset if (snapshotDone && null != startOffset && !startOffset.isBlank()) { - // 'snapshot.mode=schema_only_recovery' must be configured if binlog offset is + // 'snapshot.mode=recovery' must be configured if binlog offset is // specified. It only snapshots the schemas, not the data, and continue binlog // reading from the specified offset - mysqlProps.setProperty("snapshot.mode", "schema_only_recovery"); + mysqlProps.setProperty("snapshot.mode", "recovery"); mysqlProps.setProperty( ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); } @@ -180,7 +180,7 @@ public DbzConnectorConfig( } if (isCdcBackfill) { // skip the initial snapshot for cdc backfill - postgresProps.setProperty("snapshot.mode", "never"); + postgresProps.setProperty("snapshot.mode", "no_data"); // if startOffset is specified, we should continue // reading changes from the given offset @@ -193,7 +193,7 @@ public DbzConnectorConfig( // if snapshot phase is finished and offset is specified, we will continue reading // changes from the given offset if (snapshotDone && null != startOffset && !startOffset.isBlank()) { - postgresProps.setProperty("snapshot.mode", "never"); + postgresProps.setProperty("snapshot.mode", "no_data"); postgresProps.setProperty( ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); } @@ -224,7 +224,7 @@ public DbzConnectorConfig( // if snapshot phase is finished and offset is specified, we will continue reading // changes from the given offset if (snapshotDone && null != startOffset && !startOffset.isBlank()) { - postgresProps.setProperty("snapshot.mode", "never"); + postgresProps.setProperty("snapshot.mode", "no_data"); postgresProps.setProperty( ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); } @@ -235,7 +235,7 @@ public DbzConnectorConfig( // if snapshot phase is finished and offset is specified, we will continue reading // changes from the given offset if (snapshotDone && null != startOffset && !startOffset.isBlank()) { - mongodbProps.setProperty("snapshot.mode", "never"); + mongodbProps.setProperty("snapshot.mode", "no_data"); mongodbProps.setProperty( ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); } From 654d105264d5a595bbd2c56f899e0d30079cd324 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Mon, 22 Jul 2024 14:32:40 -0400 Subject: [PATCH 3/5] fix test --- src/connector/src/parser/unified/json.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index ca709e2eebc7..bf1444288af4 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -261,7 +261,7 @@ impl JsonParseOptions { DataType::Int16, ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, ) => value.try_as_i16().map_err(|_| create_error())?.into(), - + (DataType::Int16, ValueType::Bool) => (value.as_bool().unwrap() as i16).into(), (DataType::Int16, ValueType::String) if matches!( self.numeric_handling, @@ -282,7 +282,7 @@ impl JsonParseOptions { DataType::Int32, ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, ) => value.try_as_i32().map_err(|_| create_error())?.into(), - + (DataType::Int32, ValueType::Bool) => (value.as_bool().unwrap() as i32).into(), (DataType::Int32, ValueType::String) if matches!( self.numeric_handling, @@ -303,7 +303,7 @@ impl JsonParseOptions { DataType::Int64, ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, ) => value.try_as_i64().map_err(|_| create_error())?.into(), - + (DataType::Int64, ValueType::Bool) => (value.as_bool().unwrap() as i64).into(), (DataType::Int64, ValueType::String) if matches!( self.numeric_handling, From 36c0fa41ea6ce8197c190ecfcc1d01c8ce11dc6d Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Thu, 25 Jul 2024 22:45:55 -0400 Subject: [PATCH 4/5] fix --- .../risingwave/connector/source/common/DbzConnectorConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index 432fc1309753..faae0048649b 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -151,7 +151,7 @@ public DbzConnectorConfig( ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); } else { // read upstream table schemas and emit incremental changes only - mysqlProps.setProperty("snapshot.mode", "never"); + mysqlProps.setProperty("snapshot.mode", "no_data"); } } else { // if snapshot phase is finished and offset is specified, we will continue binlog From dcb2c96425f85b17776b6b10ba92bb7fc5aa52fb Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Fri, 26 Jul 2024 14:21:19 -0400 Subject: [PATCH 5/5] fix --- src/connector/src/parser/unified/json.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index bf1444288af4..ca709e2eebc7 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -261,7 +261,7 @@ impl JsonParseOptions { DataType::Int16, ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, ) => value.try_as_i16().map_err(|_| create_error())?.into(), - (DataType::Int16, ValueType::Bool) => (value.as_bool().unwrap() as i16).into(), + (DataType::Int16, ValueType::String) if matches!( self.numeric_handling, @@ -282,7 +282,7 @@ impl JsonParseOptions { DataType::Int32, ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, ) => value.try_as_i32().map_err(|_| create_error())?.into(), - (DataType::Int32, ValueType::Bool) => (value.as_bool().unwrap() as i32).into(), + (DataType::Int32, ValueType::String) if matches!( self.numeric_handling, @@ -303,7 +303,7 @@ impl JsonParseOptions { DataType::Int64, ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, ) => value.try_as_i64().map_err(|_| create_error())?.into(), - (DataType::Int64, ValueType::Bool) => (value.as_bool().unwrap() as i64).into(), + (DataType::Int64, ValueType::String) if matches!( self.numeric_handling,