Skip to content

Commit

Permalink
fix(citus-cdc): fix publication alter of citus connector (risingwavel…
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Sep 28, 2023
1 parent 48b8501 commit 6f099ae
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 10 deletions.
27 changes: 26 additions & 1 deletion integration_tests/citus-cdc/create_source.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,30 @@ CREATE TABLE orders_rw (
schema.name = 'public',
table.name = 'orders',
slot.name = 'orders_dbz_slot',
publication.create.enable = 'true'
);

DROP TABLE orders_rw;

CREATE TABLE orders_rw (
o_orderkey bigint,
o_custkey bigint,
o_orderstatus varchar,
o_totalprice decimal,
o_orderdate date,
o_orderpriority varchar,
o_clerk varchar,
o_shippriority bigint,
o_comment varchar,
PRIMARY KEY (o_orderkey)
) WITH (
connector = 'citus-cdc',
hostname = 'citus-master',
port = '5432',
username = 'myuser',
password = '123456',
database.servers = 'citus-worker-1:5432,citus-worker-2:5432',
database.name = 'mydb',
schema.name = 'public',
table.name = 'orders',
slot.name = 'orders_dbz_slot'
);
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
package com.risingwave.connector.source;

import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.source.common.DbzConnectorConfig;
import com.risingwave.connector.source.common.MySqlValidator;
import com.risingwave.connector.source.common.PostgresValidator;
import com.risingwave.connector.source.common.ValidatorUtils;
import com.risingwave.connector.source.common.*;
import com.risingwave.proto.ConnectorServiceProto;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -97,7 +94,7 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re

case CITUS:
ensurePropNotNull(props, DbzConnectorConfig.PG_SCHEMA_NAME);
try (var coordinatorValidator = new PostgresValidator(props, tableSchema)) {
try (var coordinatorValidator = new CitusValidator(props, tableSchema)) {
coordinatorValidator.validateDistributedTable();
coordinatorValidator.validateTable();
}
Expand All @@ -110,13 +107,12 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re
for (String workerAddr : workerServers) {
String[] hostPort = StringUtils.split(workerAddr, ':');
if (hostPort.length != 2) {
throw ValidatorUtils.invalidArgument(
String.format("invalid database.servers"));
throw ValidatorUtils.invalidArgument("invalid database.servers");
}
// set HOST for each worker server
mutableProps.put(DbzConnectorConfig.HOST, hostPort[0]);
mutableProps.put(DbzConnectorConfig.PORT, hostPort[1]);
try (var workerValidator = new PostgresValidator(mutableProps, tableSchema)) {
try (var workerValidator = new CitusValidator(mutableProps, tableSchema)) {
workerValidator.validateDbConfig();
workerValidator.validateUserPrivilege();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.connector.source.common;

import com.risingwave.connector.api.TableSchema;
import java.sql.SQLException;
import java.util.Map;

public class CitusValidator extends PostgresValidator {
public CitusValidator(Map<String, String> userProps, TableSchema tableSchema)
throws SQLException {
super(userProps, tableSchema);
}

@Override
protected void alterPublicationIfNeeded() throws SQLException {
// do nothing for citus worker node,
// since we created a FOR ALL TABLES publication when creating the connector,
// which will replicates changes for all tables in the database, including tables created in
// the future.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ private void validatePublicationPrivileges() throws SQLException {
}
}

private void alterPublicationIfNeeded() throws SQLException {
protected void alterPublicationIfNeeded() throws SQLException {
String alterPublicationSql =
String.format(
"ALTER PUBLICATION %s ADD TABLE %s", pubName, schemaName + "." + tableName);
Expand Down

0 comments on commit 6f099ae

Please sign in to comment.