From 746a793bbf0cddfcf4db7501b815a174d2fc9ae8 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Fri, 22 Sep 2023 14:58:36 +0800 Subject: [PATCH 1/4] fix citus publication alter --- .../source/SourceValidateHandler.java | 12 ++++-------- .../source/common/CitusValidator.java | 18 ++++++++++++++++++ .../source/common/PostgresValidator.java | 2 +- 3 files changed, 23 insertions(+), 9 deletions(-) create mode 100644 java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java index 2611d1cca676b..30141741055bc 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java @@ -15,10 +15,7 @@ package com.risingwave.connector.source; import com.risingwave.connector.api.TableSchema; -import com.risingwave.connector.source.common.DbzConnectorConfig; -import com.risingwave.connector.source.common.MySqlValidator; -import com.risingwave.connector.source.common.PostgresValidator; -import com.risingwave.connector.source.common.ValidatorUtils; +import com.risingwave.connector.source.common.*; import com.risingwave.proto.ConnectorServiceProto; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; @@ -97,7 +94,7 @@ private void validateSource(ConnectorServiceProto.ValidateSourceRequest request) case CITUS: ensurePropNotNull(props, DbzConnectorConfig.PG_SCHEMA_NAME); - try (var coordinatorValidator = new PostgresValidator(props, tableSchema)) { + try (var coordinatorValidator = new CitusValidator(props, tableSchema)) { coordinatorValidator.validateDistributedTable(); coordinatorValidator.validateTable(); } @@ -110,13 +107,12 @@ private void validateSource(ConnectorServiceProto.ValidateSourceRequest request) for (String workerAddr : workerServers) { String[] hostPort = StringUtils.split(workerAddr, ':'); if (hostPort.length != 2) { - throw ValidatorUtils.invalidArgument( - String.format("invalid database.servers")); + throw ValidatorUtils.invalidArgument("invalid database.servers"); } // set HOST for each worker server mutableProps.put(DbzConnectorConfig.HOST, hostPort[0]); mutableProps.put(DbzConnectorConfig.PORT, hostPort[1]); - try (var workerValidator = new PostgresValidator(mutableProps, tableSchema)) { + try (var workerValidator = new CitusValidator(mutableProps, tableSchema)) { workerValidator.validateDbConfig(); workerValidator.validateUserPrivilege(); } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java new file mode 100644 index 0000000000000..08f286e4f8a7c --- /dev/null +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java @@ -0,0 +1,18 @@ +package com.risingwave.connector.source.common; + +import com.risingwave.connector.api.TableSchema; + +import java.sql.SQLException; +import java.util.Map; + +public class CitusValidator extends PostgresValidator { + public CitusValidator(Map userProps, TableSchema tableSchema) throws SQLException { + super(userProps, tableSchema); + } + + @Override + protected void alterPublicationIfNeeded() throws SQLException { + // do nothing for citus worker node, + // since we created a FOR ALL TABLES publication when creating the connector + } +} diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index a90ea91de7c01..25aced532112b 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -460,7 +460,7 @@ private void validatePublicationPrivileges() throws SQLException { } } - private void alterPublicationIfNeeded() throws SQLException { + protected void alterPublicationIfNeeded() throws SQLException { String alterPublicationSql = String.format( "ALTER PUBLICATION %s ADD TABLE %s", pubName, schemaName + "." + tableName); From ba2db745023edacf9e3e06e9a5dfda67ddbc377f Mon Sep 17 00:00:00 2001 From: StrikeW Date: Fri, 22 Sep 2023 15:17:38 +0800 Subject: [PATCH 2/4] update test --- integration_tests/citus-cdc/create_source.sql | 27 ++++++++++++++++++- .../source/common/CitusValidator.java | 4 +-- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/integration_tests/citus-cdc/create_source.sql b/integration_tests/citus-cdc/create_source.sql index 106f7cda8f457..9b69abed873ad 100644 --- a/integration_tests/citus-cdc/create_source.sql +++ b/integration_tests/citus-cdc/create_source.sql @@ -20,5 +20,30 @@ CREATE TABLE orders_rw ( schema.name = 'public', table.name = 'orders', slot.name = 'orders_dbz_slot', - publication.create.enable = 'true' ); + +DROP TABLE orders_rw; + +CREATE TABLE orders_rw ( + o_orderkey bigint, + o_custkey bigint, + o_orderstatus varchar, + o_totalprice decimal, + o_orderdate date, + o_orderpriority varchar, + o_clerk varchar, + o_shippriority bigint, + o_comment varchar, + PRIMARY KEY (o_orderkey) +) WITH ( + connector = 'citus-cdc', + hostname = 'citus-master', + port = '5432', + username = 'myuser', + password = '123456', + database.servers = 'citus-worker-1:5432,citus-worker-2:5432', + database.name = 'mydb', + schema.name = 'public', + table.name = 'orders', + slot.name = 'orders_dbz_slot' +); \ No newline at end of file diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java index 08f286e4f8a7c..2517a9af44da4 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java @@ -1,12 +1,12 @@ package com.risingwave.connector.source.common; import com.risingwave.connector.api.TableSchema; - import java.sql.SQLException; import java.util.Map; public class CitusValidator extends PostgresValidator { - public CitusValidator(Map userProps, TableSchema tableSchema) throws SQLException { + public CitusValidator(Map userProps, TableSchema tableSchema) + throws SQLException { super(userProps, tableSchema); } From a3214f3c7cda2129dd933281dc85ec09824b4689 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Tue, 26 Sep 2023 10:26:36 +0800 Subject: [PATCH 3/4] fix license --- .../connector/source/common/CitusValidator.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java index 2517a9af44da4..d615a99d3084a 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java @@ -1,3 +1,17 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package com.risingwave.connector.source.common; import com.risingwave.connector.api.TableSchema; @@ -13,6 +27,7 @@ public CitusValidator(Map userProps, TableSchema tableSchema) @Override protected void alterPublicationIfNeeded() throws SQLException { // do nothing for citus worker node, - // since we created a FOR ALL TABLES publication when creating the connector + // since we created a FOR ALL TABLES publication when creating the connector, + // which will replicates changes for all tables in the database, including tables created in the future. } } From 8626543212d2fbadacde8e363ce507e6569ea89b Mon Sep 17 00:00:00 2001 From: StrikeW Date: Wed, 27 Sep 2023 23:25:44 +0800 Subject: [PATCH 4/4] minor --- .../com/risingwave/connector/source/common/CitusValidator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java index d615a99d3084a..db9a85b548d36 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java @@ -28,6 +28,7 @@ public CitusValidator(Map userProps, TableSchema tableSchema) protected void alterPublicationIfNeeded() throws SQLException { // do nothing for citus worker node, // since we created a FOR ALL TABLES publication when creating the connector, - // which will replicates changes for all tables in the database, including tables created in the future. + // which will replicates changes for all tables in the database, including tables created in + // the future. } }