Skip to content

Commit

Permalink
fix(pgcdc): fix uppercase-identifier of pgcdc (#17754)
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang authored Jul 23, 2024
1 parent 243c25e commit 5bd3e3e
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 3 deletions.
2 changes: 1 addition & 1 deletion ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ services:
interval: 5s
timeout: 5s
retries: 5
command: [ "postgres", "-c", "wal_level=logical" ]
command: [ "postgres", "-c", "wal_level=logical", "-c", "max_replication_slots=30" ]

mysql:
image: mysql:8.0
Expand Down
5 changes: 5 additions & 0 deletions e2e_test/source/cdc/cdc.check.slt
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,8 @@ select * from enum_to_varchar order by id;
----
1 happy
2 ok

query II
select * from upper_orders order by id;
----
1 happy
16 changes: 16 additions & 0 deletions e2e_test/source/cdc/cdc.load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,19 @@ create table enum_to_varchar (
table.name = 'enum_table',
slot.name = 'enum_to_varchar'
);

statement ok
create table upper_orders (
id int,
name varchar,
PRIMARY KEY (id)
) with (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
table.name = 'Orders',
slot.name = 'orders'
);
11 changes: 11 additions & 0 deletions e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,12 @@ CREATE TABLE person_new (
city varchar
) FROM pg_source TABLE 'person';

statement ok
CREATE TABLE upper_orders_shared (
id int PRIMARY KEY,
name varchar
) FROM pg_source TABLE 'public.Orders';

statement ok
CREATE TABLE person_new (
id int,
Expand Down Expand Up @@ -509,3 +515,8 @@ select * from enum_to_varchar_shared order by id;
----
1 happy
2 ok

query II
select * from upper_orders_shared order by id;
----
1 happy
6 changes: 6 additions & 0 deletions e2e_test/source/cdc/postgres_cdc.sql
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,9 @@ CREATE TABLE list_with_null(id int primary key, my_int int[], my_num numeric[],
INSERT INTO list_with_null VALUES (1, '{1,2,NULL}', '{1.1,inf,NULL}', '{1.1,inf,NULL}', '{1.1,inf,NULL}', '{happy,ok,NULL}', '{bb488f9b-330d-4012-b849-12adeb49e57e,bb488f9b-330d-4012-b849-12adeb49e57f, NULL}', '{\\x00,\\x01,NULL}');
INSERT INTO list_with_null VALUES (2, '{NULL,3,4}', '{2.2,0,NULL}' , '{2.2,0,NULL}', '{2.2,0,NULL}', '{happy,ok,sad}', '{2de296df-eda7-4202-a81f-1036100ef4f6,2977afbc-0b12-459c-a36f-f623fc9e9840}', '{\\x00,\\x01,\\x02}');
INSERT INTO list_with_null VALUES (5, NULL, NULL, NULL, NULL, NULL, NULL, NULL);

CREATE TABLE "Orders" (
id int PRIMARY KEY,
name varchar
);
INSERT INTO "Orders" VALUES (1, 'happy');
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ private void validateTableSchema() throws SQLException {
// check primary key
// reference: https://wiki.postgresql.org/wiki/Retrieve_primary_key_columns
try (var stmt = jdbcConnection.prepareStatement(ValidatorUtils.getSql("postgres.pk"))) {
stmt.setString(1, this.schemaName + "." + this.tableName);
stmt.setString(1, String.format("\"%s\".\"%s\"", this.schemaName, this.tableName));
var res = stmt.executeQuery();
var pkFields = new HashSet<String>();
while (res.next()) {
Expand Down Expand Up @@ -521,7 +521,8 @@ protected void alterPublicationIfNeeded() throws SQLException {

String alterPublicationSql =
String.format(
"ALTER PUBLICATION %s ADD TABLE %s", pubName, schemaName + "." + tableName);
"ALTER PUBLICATION %s ADD TABLE %s",
pubName, String.format("\"%s\".\"%s\"", this.schemaName, this.tableName));
try (var stmt = jdbcConnection.createStatement()) {
LOG.info("Altered publication with statement: {}", alterPublicationSql);
stmt.execute(alterPublicationSql);
Expand Down

0 comments on commit 5bd3e3e

Please sign in to comment.