diff --git a/e2e_test/source/cdc/cdc.check_new_rows.slt b/e2e_test/source/cdc/cdc.check_new_rows.slt index cac4896f3fa58..9cc1257a1914a 100644 --- a/e2e_test/source/cdc/cdc.check_new_rows.slt +++ b/e2e_test/source/cdc/cdc.check_new_rows.slt @@ -321,3 +321,57 @@ NULL NULL NULL {} {} {} NULL NULL NULL {NULL,"{\"key\": \"value\"}"} {NULL,123e4567-e89b-12d3-a456-426614174000} NULL + +query TTT +SELECT * FROM partitioned_timestamp_table ORDER BY c_boolean, c_int; +---- +1 f 2023-02-01 10:30:00 +2 f 2023-05-15 11:45:00 +3 f 2023-11-03 12:15:00 +4 f 2024-01-04 13:00:00 +5 f 2024-03-05 09:30:00 +6 f 2024-06-06 14:20:00 +7 f 2024-09-07 16:45:00 +8 f 2025-01-08 18:30:00 +9 f 2025-07-09 07:10:00 +11 t 2023-02-01 10:30:00 +12 t 2023-05-15 11:45:00 +13 t 2023-11-03 12:15:00 +14 t 2024-01-04 13:00:00 +15 t 2024-03-05 09:30:00 +16 t 2024-06-06 14:20:00 +17 t 2024-09-07 16:45:00 +18 t 2025-01-08 18:30:00 +19 t 2025-07-09 07:10:00 + +query TTT +SELECT * FROM partitioned_timestamp_table_2023 ORDER BY c_boolean, c_int; +---- +1 f 2023-02-01 10:30:00 +2 f 2023-05-15 11:45:00 +3 f 2023-11-03 12:15:00 +11 t 2023-02-01 10:30:00 +12 t 2023-05-15 11:45:00 +13 t 2023-11-03 12:15:00 + +query TTT +SELECT * FROM partitioned_timestamp_table_shared ORDER BY c_boolean, c_int; +---- +1 f 2023-02-01 10:30:00 +2 f 2023-05-15 11:45:00 +3 f 2023-11-03 12:15:00 +4 f 2024-01-04 13:00:00 +5 f 2024-03-05 09:30:00 +6 f 2024-06-06 14:20:00 +7 f 2024-09-07 16:45:00 +8 f 2025-01-08 18:30:00 +9 f 2025-07-09 07:10:00 +11 t 2023-02-01 10:30:00 +12 t 2023-05-15 11:45:00 +13 t 2023-11-03 12:15:00 +14 t 2024-01-04 13:00:00 +15 t 2024-03-05 09:30:00 +16 t 2024-06-06 14:20:00 +17 t 2024-09-07 16:45:00 +18 t 2025-01-08 18:30:00 +19 t 2025-07-09 07:10:00 diff --git a/e2e_test/source/cdc/cdc.load.slt b/e2e_test/source/cdc/cdc.load.slt index 599394c8a0047..a50a0b7c9458d 100644 --- a/e2e_test/source/cdc/cdc.load.slt +++ b/e2e_test/source/cdc/cdc.load.slt @@ -258,3 +258,43 @@ create table upper_orders ( table.name = 'Orders', slot.name = 'orders' ); + +# for the partitioned table +statement ok +CREATE TABLE IF NOT EXISTS partitioned_timestamp_table( + c_int int, + c_boolean boolean, + c_timestamp timestamp, + PRIMARY KEY (c_int, c_timestamp) +) WITH ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + schema.name = 'public', + table.name = 'partitioned_timestamp_table', + publication.name = 'rw_publication_partition_root', + slot.name = 'my_slot_partition' +); + +# for only one partition, as Postgres does not support adding both a partitioned tableand its individual partitions to the same publication, we use different publication for the partition +statement ok +CREATE TABLE IF NOT EXISTS partitioned_timestamp_table_2023( + c_int int, + c_boolean boolean, + c_timestamp timestamp, + PRIMARY KEY (c_int, c_timestamp) +) WITH ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + schema.name = 'public', + table.name = 'partitioned_timestamp_table_2023', + publication.name = 'rw_publication_partition', + slot.name = 'my_slot_partition_2' +); diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index b180a67c2cb2a..83840239b623a 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -288,6 +288,14 @@ INCLUDE SCHEMA_NAME as schema_name INCLUDE TABLE_NAME as table_name FROM pg_source TABLE 'public.person'; +statement ok +CREATE TABLE partitioned_timestamp_table_shared( + c_int int, + c_boolean boolean, + c_timestamp timestamp, + PRIMARY KEY (c_int, c_timestamp) +) from pg_source table 'public.partitioned_timestamp_table'; + statement ok CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_new; diff --git a/e2e_test/source/cdc/cdc.validate.postgres.slt b/e2e_test/source/cdc/cdc.validate.postgres.slt index 1392357298fc3..1b14942c7af63 100644 --- a/e2e_test/source/cdc/cdc.validate.postgres.slt +++ b/e2e_test/source/cdc/cdc.validate.postgres.slt @@ -243,3 +243,22 @@ create table shipments ( statement ok drop table shipments; + +statement error Table 'partitioned_timestamp_table' has partitions, which requires publication 'rw_publication_pubviaroot_false' to be created with `publish_via_partition_root = true`. +CREATE TABLE IF NOT EXISTS partitioned_timestamp_table( + c_int int, + c_boolean boolean, + c_timestamp timestamp, + PRIMARY KEY (c_int, c_timestamp) +) WITH ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + schema.name = 'public', + table.name = 'partitioned_timestamp_table', + publication.name = 'rw_publication_pubviaroot_false', + slot.name = 'my_slot_partition' +); diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql index 20b6e7b414469..f80dfa267432b 100644 --- a/e2e_test/source/cdc/postgres_cdc.sql +++ b/e2e_test/source/cdc/postgres_cdc.sql @@ -112,3 +112,34 @@ CREATE TABLE "Orders" ( name varchar ); INSERT INTO "Orders" VALUES (1, 'happy'); + + +CREATE TABLE IF NOT EXISTS partitioned_timestamp_table( + c_int int, + c_boolean boolean, + c_timestamp timestamp, + PRIMARY KEY (c_int, c_timestamp) +) PARTITION BY RANGE (c_timestamp); + +CREATE TABLE partitioned_timestamp_table_2023 PARTITION OF partitioned_timestamp_table + FOR VALUES FROM ('2023-01-01') TO ('2023-12-31'); + +CREATE TABLE partitioned_timestamp_table_2024 PARTITION OF partitioned_timestamp_table + FOR VALUES FROM ('2024-01-01') TO ('2024-12-31'); + +CREATE TABLE partitioned_timestamp_table_2025 PARTITION OF partitioned_timestamp_table + FOR VALUES FROM ('2025-01-01') TO ('2025-12-31'); + +INSERT INTO partitioned_timestamp_table (c_int, c_boolean, c_timestamp) VALUES +(1, false, '2023-02-01 10:30:00'), +(2, false, '2023-05-15 11:45:00'), +(3, false, '2023-11-03 12:15:00'), +(4, false, '2024-01-04 13:00:00'), +(5, false, '2024-03-05 09:30:00'), +(6, false, '2024-06-06 14:20:00'), +(7, false, '2024-09-07 16:45:00'), +(8, false, '2025-01-08 18:30:00'), +(9, false, '2025-07-09 07:10:00'); + +-- Here we create this publication without `WITH ( publish_via_partition_root = true )` only for tests. Normally, it should be added. +create publication rw_publication_pubviaroot_false for TABLE partitioned_timestamp_table; diff --git a/e2e_test/source/cdc/postgres_cdc_insert.sql b/e2e_test/source/cdc/postgres_cdc_insert.sql index 4c0d0dee48b42..8054ec1508778 100644 --- a/e2e_test/source/cdc/postgres_cdc_insert.sql +++ b/e2e_test/source/cdc/postgres_cdc_insert.sql @@ -32,3 +32,13 @@ INSERT INTO list_with_null VALUES (3, '{NULL,-3,-4}', '{NULL,nan,-inf}', '{NULL, INSERT INTO list_with_null VALUES (4, '{-4,-5,-6}', '{NULL,-99999999999999999.9999}', '{NULL,-99999999999999999.9999}', '{NULL,-99999999999999999.9999}', '{NULL,sad,ok}', '{b2e4636d-fa03-4ad4-bf16-029a79dca3e2}', '{\\x88,\\x99,\\xAA}'); INSERT INTO list_with_null VALUES (6, NULL, NULL, NULL, NULL, NULL, NULL, NULL); +INSERT INTO partitioned_timestamp_table (c_int, c_boolean, c_timestamp) VALUES +(11, true, '2023-02-01 10:30:00'), +(12, true, '2023-05-15 11:45:00'), +(13, true, '2023-11-03 12:15:00'), +(14, true, '2024-01-04 13:00:00'), +(15, true, '2024-03-05 09:30:00'), +(16, true, '2024-06-06 14:20:00'), +(17, true, '2024-09-07 16:45:00'), +(18, true, '2025-01-08 18:30:00'), +(19, true, '2025-07-09 07:10:00'); diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java index 13ca00261287b..6ab10bf18eb04 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java @@ -83,11 +83,13 @@ public static void createPostgresPublicationIfNeeded( if (schemaTableName.isPresent()) { createPublicationSql = String.format( - "CREATE PUBLICATION %s FOR TABLE %s;", + "CREATE PUBLICATION %s FOR TABLE %s WITH ( publish_via_partition_root = true );", quotePostgres(pubName), schemaTableName.get()); } else { createPublicationSql = - String.format("CREATE PUBLICATION %s", quotePostgres(pubName)); + String.format( + "CREATE PUBLICATION %s WITH ( publish_via_partition_root = true );", + quotePostgres(pubName)); } try (var stmt = jdbcConnection.createStatement()) { LOG.info( diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index 6e50cb0772ae7..9d5b65c1d73af 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -331,6 +331,7 @@ private void validateTablePrivileges(boolean isSuperUser) throws SQLException { private void validatePublicationConfig(boolean isSuperUser) throws SQLException { boolean isPublicationCoversTable = false; boolean isPublicationExists = false; + boolean isPublicationViaRoot = false; boolean isPartialPublicationEnabled = false; // Check whether publication exists @@ -340,7 +341,8 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException stmt.setString(1, pubName); var res = stmt.executeQuery(); while (res.next()) { - isPublicationExists = res.getBoolean(1); + isPublicationViaRoot = res.getBoolean(1); + isPublicationExists = true; } } @@ -370,6 +372,31 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException isPartialPublicationEnabled = res.getBoolean(1); } } + + List partitions = new ArrayList<>(); + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("postgres.partition_names"))) { + stmt.setString(1, schemaName); + stmt.setString(2, tableName); + var res = stmt.executeQuery(); + while (res.next()) { + partitions.add(res.getString(1)); + } + } + if (!partitions.isEmpty() && isPublicationExists && !isPublicationViaRoot) { + // make sure the publication are created with `publish_via_partition_root = true`, which + // is required by partitioned tables. + throw ValidatorUtils.invalidArgument( + "Table '" + + tableName + + "' has partitions, which requires publication '" + + pubName + + "' to be created with `publish_via_partition_root = true`. \nHint: you can run `SELECT pubviaroot from pg_publication WHERE pubname = '" + + pubName + + "'` in the upstream Postgres to check."); + } + // PG 15 and up supports partial publication of table // check whether publication covers all columns of the table schema if (isPartialPublicationEnabled) { diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties index cb092acb5a26a..0288b7b37fcac 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties @@ -17,8 +17,9 @@ postgres.table_read_privilege.check=SELECT (COUNT(*) = 1) FROM information_schem postgres.table_owner=SELECT tableowner FROM pg_tables WHERE schemaname = ? and tablename = ? postgres.publication_att_exists=SELECT count(*) > 0 FROM information_schema.columns WHERE table_name = 'pg_publication_tables' AND column_name = 'attnames' postgres.publication_attnames=SELECT attnames FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ? -postgres.publication_exist=SELECT COUNT(*) > 0 from pg_publication WHERE pubname = ? +postgres.publication_exist=SELECT pubviaroot from pg_publication WHERE pubname = ? postgres.publication_has_table=SELECT COUNT(*) > 0 AS count FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ? +postgres.partition_names=SELECT c.relname AS partition_name FROM pg_inherits AS i JOIN pg_class AS c ON i.inhrelid = c.oid JOIN pg_class AS p ON i.inhparent = p.oid JOIN pg_namespace AS n ON p.relnamespace = n.oid WHERE n.nspname = ? AND p.relname = ?; postgres.users_of_group=WITH RECURSIVE base (g, m) AS (( \ SELECT r1.rolname as group, ARRAY_AGG(DISTINCT(r2.rolname)) as members FROM pg_auth_members am \ INNER JOIN pg_roles r1 ON r1.oid = am.roleid \ diff --git a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java index 5ccb24cef3ffc..3e7a02ca1c111 100644 --- a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java +++ b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java @@ -177,7 +177,8 @@ public void testPermissionCheck() throws SQLException { "CREATE TABLE IF NOT EXISTS orders (o_key BIGINT NOT NULL, o_val INT, PRIMARY KEY (o_key))"; SourceTestClient.performQuery(connDbz, query); // create a partial publication, check whether error is reported - query = "CREATE PUBLICATION rw_publication FOR TABLE orders (o_key)"; + query = + "CREATE PUBLICATION rw_publication FOR TABLE orders (o_key) WITH ( publish_via_partition_root = true );"; SourceTestClient.performQuery(connDbz, query); ConnectorServiceProto.TableSchema tableSchema = ConnectorServiceProto.TableSchema.newBuilder()