From 6f099ae97269c630864c5d71917d45cef7e95c9c Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 28 Sep 2023 11:03:56 +0800 Subject: [PATCH] fix(citus-cdc): fix publication alter of citus connector (#12488) --- integration_tests/citus-cdc/create_source.sql | 27 ++++++++++++++- .../source/SourceValidateHandler.java | 12 +++---- .../source/common/CitusValidator.java | 34 +++++++++++++++++++ .../source/common/PostgresValidator.java | 2 +- 4 files changed, 65 insertions(+), 10 deletions(-) create mode 100644 java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java diff --git a/integration_tests/citus-cdc/create_source.sql b/integration_tests/citus-cdc/create_source.sql index 106f7cda8f457..9b69abed873ad 100644 --- a/integration_tests/citus-cdc/create_source.sql +++ b/integration_tests/citus-cdc/create_source.sql @@ -20,5 +20,30 @@ CREATE TABLE orders_rw ( schema.name = 'public', table.name = 'orders', slot.name = 'orders_dbz_slot', - publication.create.enable = 'true' ); + +DROP TABLE orders_rw; + +CREATE TABLE orders_rw ( + o_orderkey bigint, + o_custkey bigint, + o_orderstatus varchar, + o_totalprice decimal, + o_orderdate date, + o_orderpriority varchar, + o_clerk varchar, + o_shippriority bigint, + o_comment varchar, + PRIMARY KEY (o_orderkey) +) WITH ( + connector = 'citus-cdc', + hostname = 'citus-master', + port = '5432', + username = 'myuser', + password = '123456', + database.servers = 'citus-worker-1:5432,citus-worker-2:5432', + database.name = 'mydb', + schema.name = 'public', + table.name = 'orders', + slot.name = 'orders_dbz_slot' +); \ No newline at end of file diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java index 38fd23ae1c1aa..18517ebb6dbf3 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java @@ -15,10 +15,7 @@ package com.risingwave.connector.source; import com.risingwave.connector.api.TableSchema; -import com.risingwave.connector.source.common.DbzConnectorConfig; -import com.risingwave.connector.source.common.MySqlValidator; -import com.risingwave.connector.source.common.PostgresValidator; -import com.risingwave.connector.source.common.ValidatorUtils; +import com.risingwave.connector.source.common.*; import com.risingwave.proto.ConnectorServiceProto; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; @@ -97,7 +94,7 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re case CITUS: ensurePropNotNull(props, DbzConnectorConfig.PG_SCHEMA_NAME); - try (var coordinatorValidator = new PostgresValidator(props, tableSchema)) { + try (var coordinatorValidator = new CitusValidator(props, tableSchema)) { coordinatorValidator.validateDistributedTable(); coordinatorValidator.validateTable(); } @@ -110,13 +107,12 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re for (String workerAddr : workerServers) { String[] hostPort = StringUtils.split(workerAddr, ':'); if (hostPort.length != 2) { - throw ValidatorUtils.invalidArgument( - String.format("invalid database.servers")); + throw ValidatorUtils.invalidArgument("invalid database.servers"); } // set HOST for each worker server mutableProps.put(DbzConnectorConfig.HOST, hostPort[0]); mutableProps.put(DbzConnectorConfig.PORT, hostPort[1]); - try (var workerValidator = new PostgresValidator(mutableProps, tableSchema)) { + try (var workerValidator = new CitusValidator(mutableProps, tableSchema)) { workerValidator.validateDbConfig(); workerValidator.validateUserPrivilege(); } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java new file mode 100644 index 0000000000000..db9a85b548d36 --- /dev/null +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java @@ -0,0 +1,34 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.risingwave.connector.source.common; + +import com.risingwave.connector.api.TableSchema; +import java.sql.SQLException; +import java.util.Map; + +public class CitusValidator extends PostgresValidator { + public CitusValidator(Map userProps, TableSchema tableSchema) + throws SQLException { + super(userProps, tableSchema); + } + + @Override + protected void alterPublicationIfNeeded() throws SQLException { + // do nothing for citus worker node, + // since we created a FOR ALL TABLES publication when creating the connector, + // which will replicates changes for all tables in the database, including tables created in + // the future. + } +} diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index a90ea91de7c01..25aced532112b 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -460,7 +460,7 @@ private void validatePublicationPrivileges() throws SQLException { } } - private void alterPublicationIfNeeded() throws SQLException { + protected void alterPublicationIfNeeded() throws SQLException { String alterPublicationSql = String.format( "ALTER PUBLICATION %s ADD TABLE %s", pubName, schemaName + "." + tableName);