From aa9dcac98985f9650595ef4b67e43d2601e41a44 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 28 Dec 2023 18:47:26 +0800 Subject: [PATCH] feat(cdc): support postgres cdc backfill (#13958) --- Cargo.lock | 1 + ci/scripts/e2e-source-test.sh | 15 +- e2e_test/source/cdc/cdc.check.slt | 5 - e2e_test/source/cdc/cdc.check_new_rows.slt | 15 +- e2e_test/source/cdc/cdc.load.slt | 63 ++-- e2e_test/source/cdc/cdc.share_stream.slt | 111 ++++++ e2e_test/source/cdc/postgres_cdc.sql | 39 ++- e2e_test/source/cdc/postgres_cdc_insert.sql | 10 +- .../source/SourceValidateHandler.java | 4 +- .../source/common/CitusValidator.java | 2 +- .../source/common/DbzConnectorConfig.java | 39 ++- .../source/common/DbzSourceUtils.java | 84 ++++- .../source/common/PostgresValidator.java | 73 ++-- .../source/core/JniDbzSourceHandler.java | 11 +- .../src/main/resources/debezium.properties | 3 + .../src/main/resources/postgres.properties | 12 +- .../converters/DatetimeTypeConverter.java | 3 +- src/common/src/types/datetime.rs | 54 ++- src/common/src/types/decimal.rs | 73 ++-- src/common/src/types/mod.rs | 1 + src/common/src/types/ordered_float.rs | 30 ++ src/common/src/types/serial.rs | 17 +- src/common/src/types/timestamptz.rs | 17 +- src/common/src/types/to_sql.rs | 76 +++++ src/compute/tests/cdc_tests.rs | 7 +- src/connector/Cargo.toml | 1 + src/connector/src/error.rs | 3 + src/connector/src/parser/mod.rs | 4 +- src/connector/src/parser/mysql.rs | 6 +- src/connector/src/parser/postgres.rs | 291 ++++++++++++++++ .../{ => cdc/external}/mock_external_table.rs | 8 +- .../{external.rs => cdc/external/mod.rs} | 94 +++--- .../src/source/cdc/external/postgres.rs | 319 ++++++++++++++++++ src/connector/src/source/cdc/mod.rs | 1 + src/connector/src/source/cdc/split.rs | 2 +- src/connector/src/source/mod.rs | 3 - src/frontend/src/handler/create_source.rs | 7 +- src/frontend/src/handler/create_table.rs | 20 +- .../src/executor/backfill/cdc/cdc_backfill.rs | 9 +- src/stream/src/executor/backfill/cdc/state.rs | 2 +- .../backfill/cdc/upstream_table/external.rs | 2 +- .../backfill/cdc/upstream_table/snapshot.rs | 2 +- src/stream/src/executor/backfill/utils.rs | 9 +- src/stream/src/from_proto/stream_cdc_scan.rs | 2 +- 44 files changed, 1334 insertions(+), 216 deletions(-) create mode 100644 src/common/src/types/to_sql.rs create mode 100644 src/connector/src/parser/postgres.rs rename src/connector/src/source/{ => cdc/external}/mock_external_table.rs (94%) rename src/connector/src/source/{external.rs => cdc/external/mod.rs} (87%) create mode 100644 src/connector/src/source/cdc/external/postgres.rs diff --git a/Cargo.lock b/Cargo.lock index a3baa894a398c..c4c882ea9bde3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8507,6 +8507,7 @@ dependencies = [ "tempfile", "thiserror", "time", + "tokio-postgres", "tokio-retry", "tokio-stream", "tokio-util", diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 6ccb09dc72ff5..c1624230abc7e 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -72,8 +72,9 @@ sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.check.slt' # kill cluster cargo make kill -echo "cluster killed " +echo "> cluster killed " +echo "--- mysql & postgres recovery check" # insert into mytest database (cdc.share_stream.slt) mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES (default,'RisingWave','Next generation Streaming Database'), @@ -84,16 +85,19 @@ mysql --protocol=tcp -u root mytest -e "INSERT INTO products # insert new rows mysql --host=mysql --port=3306 
-u root -p123456 < ./e2e_test/source/cdc/mysql_cdc_insert.sql +echo "> inserted new rows into mysql" + psql < ./e2e_test/source/cdc/postgres_cdc_insert.sql -echo "inserted new rows into mysql and postgres" +echo "> inserted new rows into postgres" # start cluster w/o clean-data -RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ +unset RISINGWAVE_CI +export RUST_LOG="events::stream::message::chunk=trace,risingwave_stream=debug,risingwave_batch=info,risingwave_storage=info" \ cargo make dev ci-1cn-1fe-with-recovery -echo "wait for cluster recovery finish" +echo "> wait for cluster recovery finish" sleep 20 -echo "check mviews after cluster recovery" +echo "> check mviews after cluster recovery" # check results sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.check_new_rows.slt' @@ -104,6 +108,7 @@ echo "--- Kill cluster" cargo make ci-kill echo "--- e2e, ci-1cn-1fe, protobuf schema registry" +export RISINGWAVE_CI=true RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ cargo make ci-start ci-1cn-1fe python3 -m pip install requests protobuf confluent-kafka diff --git a/e2e_test/source/cdc/cdc.check.slt b/e2e_test/source/cdc/cdc.check.slt index f9feed9429d2c..b90137948be1e 100644 --- a/e2e_test/source/cdc/cdc.check.slt +++ b/e2e_test/source/cdc/cdc.check.slt @@ -42,11 +42,6 @@ select count(*) from t1_rw; ---- 1 -query I -select count(*) from person_rw; ----- -3 - query I select count(*) from tt3_rw; ---- diff --git a/e2e_test/source/cdc/cdc.check_new_rows.slt b/e2e_test/source/cdc/cdc.check_new_rows.slt index 271cbb56cc2f1..a5e7c271b465e 100644 --- a/e2e_test/source/cdc/cdc.check_new_rows.slt +++ b/e2e_test/source/cdc/cdc.check_new_rows.slt @@ -15,14 +15,22 @@ select cnt from shipments_cnt; 4 query ITTTT -select * from person_rw order by id; +select * from person_new order by id; ---- 1000 vicky noris yplkvgz@qbxfg.com 7878 5821 1864 2539 cheyenne 1001 peter white myckhsp@xpmpe.com 1781 2313 8157 6974 boise 1002 sarah spencer wipvdbm@dkaap.com 3453 4987 9481 6270 los angeles -1003 张三 kedmrpz@xiauh.com 5536 1959 5460 2096 北京 -1004 李四 egpemle@lrhcg.com 0052 8113 1582 4430 上海 +1100 noris ypl@qbxfg.com 1864 2539 enne +1101 white myc@xpmpe.com 8157 6974 se +1102 spencer wip@dkaap.com 9481 6270 angeles +1203 张三 kedmrpz@xiauh.com 5536 1959 5460 2096 北京 +1204 李四 egpemle@lrhcg.com 0052 8113 1582 4430 上海 +# 2 rows inserted after cluster is stopped +query I +SELECT * from person_new_cnt +---- +8 query ITTTTT select order_id,order_date,customer_name,product_id,order_status from orders order by order_id; @@ -65,3 +73,4 @@ SELECT * FROM products_test order by id limit 3 101 RW Small 2-wheel scooter 102 RW 12V car battery 103 RW 12-pack of drill bits with sizes ranging from #40 to #3 + diff --git a/e2e_test/source/cdc/cdc.load.slt b/e2e_test/source/cdc/cdc.load.slt index ce0926339c963..7dda34fe08518 100644 --- a/e2e_test/source/cdc/cdc.load.slt +++ b/e2e_test/source/cdc/cdc.load.slt @@ -1,4 +1,5 @@ # CDC source basic test +control substitution on statement ok create table products ( id INT, @@ -52,11 +53,11 @@ create table shipments ( PRIMARY KEY (shipment_id) ) with ( connector = 'postgres-cdc', - hostname = 'db', - port = '5432', - username = 'postgres', - password = 'postgres', - database.name = 'cdc_test', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', table.name = 'shipments', slot.name = 'shipments' ); @@ 
-148,11 +149,11 @@ create table shipments_2 ( PRIMARY KEY (shipment_id) ) with ( connector = 'postgres-cdc', - hostname = 'db', - port = '5432', - username = 'postgres', - password = 'postgres', - database.name = 'cdc_test', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', table.name = 'shipments' ); @@ -164,11 +165,11 @@ create table t1_rw ( v3 varchar ) with ( connector = 'postgres-cdc', - hostname = 'db', - port = '5432', - username = 'postgres', - password='postgres', - database.name='cdc_test', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', table.name='t1', schema.name='abs', slot.name='t1_slot', @@ -176,25 +177,6 @@ create table t1_rw ( publication.create.enable='false' ); -statement ok -create table person_rw ( - id int primary key, - name varchar, - email_address varchar, - credit_card varchar, - city varchar -) with ( - connector = 'postgres-cdc', - hostname = 'db', - port = '5432', - username = 'postgres', - password='postgres', - database.name='cdc_test', - table.name='person', - publication.name='my_publicaton', - publication.create.enable='false' -); - statement error create table person_rw ( id int primary key, @@ -202,15 +184,12 @@ create table person_rw ( email_address varchar ) with ( connector = 'postgres-cdc', - hostname = 'db', - port = '5432', - username='postgres', - password='postgres', - database.name='cdc_test', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', table.name='person', publication.name='dumb_publicaton', publication.create.enable='false' ); - -statement ok -create materialized view person_cnt as select count(*) as cnt from person_rw; diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index 0d46b8c51a57d..babfb685a945c 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -23,6 +23,13 @@ create source mysql_mytest with ( server.id = '5601' ); +statement error The upstream table name must contain database name prefix* +create table products_test ( id INT, + name STRING, + description STRING, + PRIMARY KEY (id) +) from mysql_mytest table 'products'; + statement ok create table products_test ( id INT, name STRING, @@ -127,3 +134,107 @@ SELECT c_tinyint, c_smallint, c_mediumint, c_integer, c_bigint, c_decimal, c_flo ---- -128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b 1001-01-01 00:00:00 1998-01-01 00:00:00 1970-01-01 00:00:01+00:00 NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d 1001-01-01 NULL 2000-01-01 00:00:00 NULL + +statement ok +create source pg_source with ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + slot.name = 'pg_slot' +); + +# test postgres backfill data types +statement ok +CREATE TABLE IF NOT EXISTS postgres_all_types( + c_boolean boolean, + c_smallint smallint, + c_integer integer, + c_bigint bigint, + c_decimal decimal, + c_real real, + c_double_precision double precision, + c_varchar varchar, + c_bytea bytea, + c_date date, + c_time time, + c_timestamp timestamp, + c_timestamptz 
timestamptz, + c_interval interval, + c_jsonb jsonb, + c_boolean_array boolean[], + c_smallint_array smallint[], + c_integer_array integer[], + c_bigint_array bigint[], + c_decimal_array decimal[], + c_real_array real[], + c_double_precision_array double precision[], + c_varchar_array varchar[], + c_bytea_array bytea[], + c_date_array date[], + c_time_array time[], + c_timestamp_array timestamp[], + c_timestamptz_array timestamptz[], + c_interval_array interval[], + c_jsonb_array jsonb[], + PRIMARY KEY (c_boolean,c_bigint,c_date) +) from pg_source table 'public.postgres_all_types'; + +statement error The upstream table name must contain schema name prefix* +CREATE TABLE person_new ( + id int, + name varchar, + email_address varchar, + credit_card varchar, + city varchar, + PRIMARY KEY (id) +) FROM pg_source TABLE 'person'; + +statement ok +CREATE TABLE person_new ( + id int, + name varchar, + email_address varchar, + credit_card varchar, + city varchar, + PRIMARY KEY (id) +) FROM pg_source TABLE 'public.person'; + +statement ok +CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_new; + +sleep 3s + +query TTTTTTT +SELECT c_boolean,c_date,c_time,c_timestamp,c_jsonb,c_smallint_array,c_timestamp_array FROM postgres_all_types where c_bigint=-9223372036854775807 +---- +f 0001-01-01 00:00:00 0001-01-01 00:00:00 {} {-32767} {"0001-01-01 00:00:00"} + + +# postgres streaming test +system ok +psql -c " +INSERT INTO person VALUES (1100, 'noris', 'ypl@qbxfg.com', '1864 2539', 'enne'); +INSERT INTO person VALUES (1101, 'white', 'myc@xpmpe.com', '8157 6974', 'se'); +INSERT INTO person VALUES (1102, 'spencer', 'wip@dkaap.com', '9481 6270', 'angeles'); +" + +sleep 3s + +# 3 history, 3 new rows +query I +SELECT * from person_new_cnt +---- +6 + +query ITTTT +SELECT * from person_new order by id; +---- +1000 vicky noris yplkvgz@qbxfg.com 7878 5821 1864 2539 cheyenne +1001 peter white myckhsp@xpmpe.com 1781 2313 8157 6974 boise +1002 sarah spencer wipvdbm@dkaap.com 3453 4987 9481 6270 los angeles +1100 noris ypl@qbxfg.com 1864 2539 enne +1101 white myc@xpmpe.com 8157 6974 se +1102 spencer wip@dkaap.com 9481 6270 angeles diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql index 74f9341da9a67..43dba14950b36 100644 --- a/e2e_test/source/cdc/postgres_cdc.sql +++ b/e2e_test/source/cdc/postgres_cdc.sql @@ -33,4 +33,41 @@ INSERT INTO person VALUES (1002, 'sarah spencer', 'wipvdbm@dkaap.com', '3453 498 create schema abs; create table abs.t1 (v1 int primary key, v2 double precision, v3 varchar, v4 numeric); create publication my_publicaton for table abs.t1 (v1, v3); -insert into abs.t1 values (1, 1.1, 'aaa', '5431.1234'); \ No newline at end of file +insert into abs.t1 values (1, 1.1, 'aaa', '5431.1234'); + + +CREATE TABLE IF NOT EXISTS postgres_all_types( + c_boolean boolean, + c_smallint smallint, + c_integer integer, + c_bigint bigint, + c_decimal decimal, + c_real real, + c_double_precision double precision, + c_varchar varchar, + c_bytea bytea, + c_date date, + c_time time, + c_timestamp timestamp, + c_timestamptz timestamptz, + c_interval interval, + c_jsonb jsonb, + c_boolean_array boolean[], + c_smallint_array smallint[], + c_integer_array integer[], + c_bigint_array bigint[], + c_decimal_array decimal[], + c_real_array real[], + c_double_precision_array double precision[], + c_varchar_array varchar[], + c_bytea_array bytea[], + c_date_array date[], + c_time_array time[], + c_timestamp_array timestamp[], + c_timestamptz_array timestamptz[], + 
c_interval_array interval[], + c_jsonb_array jsonb[], + PRIMARY KEY (c_boolean,c_bigint,c_date) +); +INSERT INTO postgres_all_types VALUES ( False, 0, 0, 0, 0, 0, 0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[]); +INSERT INTO postgres_all_types VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['\x00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['0001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['0001-01-01 00:00:00'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[]); diff --git a/e2e_test/source/cdc/postgres_cdc_insert.sql b/e2e_test/source/cdc/postgres_cdc_insert.sql index 0e8f64ca86e1b..84ae9068c187a 100644 --- a/e2e_test/source/cdc/postgres_cdc_insert.sql +++ b/e2e_test/source/cdc/postgres_cdc_insert.sql @@ -1,7 +1,13 @@ +SELECT pg_current_wal_lsn(); + INSERT INTO shipments VALUES (default,10004,'Beijing','Shanghai',false); -INSERT INTO person VALUES (1003, '张三', 'kedmrpz@xiauh.com', '5536 1959 5460 2096', '北京'); -INSERT INTO person VALUES (1004, '李四', 'egpemle@lrhcg.com', '0052 8113 1582 4430', '上海'); +INSERT INTO person VALUES (1203, '张三', 'kedmrpz@xiauh.com', '5536 1959 5460 2096', '北京'); +INSERT INTO person VALUES (1204, '李四', 'egpemle@lrhcg.com', '0052 8113 1582 4430', '上海'); insert into abs.t1 values (2, 2.2, 'bbb', '1234.5431'); + +SELECT pg_current_wal_lsn(); +select * from pg_publication_tables where pubname='rw_publication'; +select * from public.person order by id; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java index bf70e02326a75..91fc9ed4fe87b 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java @@ -89,12 +89,12 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re TableSchema tableSchema = TableSchema.fromProto(request.getTableSchema()); switch (request.getSourceType()) { case POSTGRES: - ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME); ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME); ensurePropNotBlank(props, DbzConnectorConfig.PG_SLOT_NAME); ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_NAME); ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_CREATE); - try (var validator = new 
PostgresValidator(props, tableSchema)) { + try (var validator = + new PostgresValidator(props, tableSchema, isMultiTableShared)) { validator.validateAll(isMultiTableShared); } break; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java index db9a85b548d36..756bc65350267 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CitusValidator.java @@ -21,7 +21,7 @@ public class CitusValidator extends PostgresValidator { public CitusValidator(Map userProps, TableSchema tableSchema) throws SQLException { - super(userProps, tableSchema); + super(userProps, tableSchema, false); } @Override diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index 2f27579082396..543efa12f91ee 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -153,13 +153,44 @@ public DbzConnectorConfig( dbzProps.putAll(mysqlProps); - } else if (source == SourceTypeE.POSTGRES || source == SourceTypeE.CITUS) { + } else if (source == SourceTypeE.POSTGRES) { var postgresProps = initiateDbConfig(POSTGRES_CONFIG_FILE, substitutor); - // citus needs all_tables publication to capture all shards - if (source == SourceTypeE.CITUS) { - postgresProps.setProperty("publication.autocreate.mode", "all_tables"); + // disable publication auto creation if needed + var pubAutoCreate = + Boolean.parseBoolean( + userProps.getOrDefault(DbzConnectorConfig.PG_PUB_CREATE, "true")); + if (!pubAutoCreate) { + postgresProps.setProperty("publication.autocreate.mode", "disabled"); } + if (isCdcBackfill) { + // skip the initial snapshot for cdc backfill + postgresProps.setProperty("snapshot.mode", "never"); + + // if startOffset is specified, we should continue + // reading changes from the given offset + if (null != startOffset && !startOffset.isBlank()) { + postgresProps.setProperty( + ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); + } + + } else { + // if snapshot phase is finished and offset is specified, we will continue reading + // changes from the given offset + if (snapshotDone && null != startOffset && !startOffset.isBlank()) { + postgresProps.setProperty("snapshot.mode", "never"); + postgresProps.setProperty( + ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); + } + } + + dbzProps.putAll(postgresProps); + + } else if (source == SourceTypeE.CITUS) { + var postgresProps = initiateDbConfig(POSTGRES_CONFIG_FILE, substitutor); + + // citus needs all_tables publication to capture all shards + postgresProps.setProperty("publication.autocreate.mode", "all_tables"); // disable publication auto creation if needed var pubAutoCreate = diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java 
b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java index ce9c746d9b9bd..4c0e4f928f522 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java @@ -16,24 +16,100 @@ import com.risingwave.connector.api.source.SourceTypeE; import java.lang.management.ManagementFactory; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import javax.management.JMException; import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DbzSourceUtils { static final Logger LOG = LoggerFactory.getLogger(DbzSourceUtils.class); + /** + * This method is used to create a publication for the cdc source job or cdc table if it doesn't + * exist. + */ + public static void createPostgresPublicationIfNeeded( + Map properties, long sourceId) throws SQLException { + boolean pubAutoCreate = + properties.get(DbzConnectorConfig.PG_PUB_CREATE).equalsIgnoreCase("true"); + if (!pubAutoCreate) { + LOG.info( + "Postgres publication auto creation is disabled, skip creation for source {}.", + sourceId); + return; + } + + var pubName = properties.get(DbzConnectorConfig.PG_PUB_NAME); + var dbHost = properties.get(DbzConnectorConfig.HOST); + var dbPort = properties.get(DbzConnectorConfig.PORT); + var dbName = properties.get(DbzConnectorConfig.DB_NAME); + var jdbcUrl = ValidatorUtils.getJdbcUrl(SourceTypeE.POSTGRES, dbHost, dbPort, dbName); + var user = properties.get(DbzConnectorConfig.USER); + var password = properties.get(DbzConnectorConfig.PASSWORD); + try (var jdbcConnection = DriverManager.getConnection(jdbcUrl, user, password)) { + boolean isPubExist = false; + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("postgres.publication_exist"))) { + stmt.setString(1, pubName); + var res = stmt.executeQuery(); + if (res.next()) { + isPubExist = res.getBoolean(1); + } + } + + if (!isPubExist) { + var schemaName = properties.get(DbzConnectorConfig.PG_SCHEMA_NAME); + var tableName = properties.get(DbzConnectorConfig.TABLE_NAME); + Optional schemaTableName; + if (StringUtils.isBlank(schemaName) || StringUtils.isBlank(tableName)) { + schemaTableName = Optional.empty(); + } else { + schemaTableName = + Optional.of(quotePostgres(schemaName) + "." 
+ quotePostgres(tableName)); + } + + // create the publication if it doesn't exist + String createPublicationSql; + if (schemaTableName.isPresent()) { + createPublicationSql = + String.format( + "CREATE PUBLICATION %s FOR TABLE %s;", + quotePostgres(pubName), schemaTableName.get()); + } else { + createPublicationSql = + String.format("CREATE PUBLICATION %s", quotePostgres(pubName)); + } + try (var stmt = jdbcConnection.createStatement()) { + LOG.info( + "Publication '{}' doesn't exist, created publication with statement: {}", + pubName, + createPublicationSql); + stmt.execute(createPublicationSql); + } + } + } + } + + private static String quotePostgres(String identifier) { + return "\"" + identifier + "\""; + } + public static boolean waitForStreamingRunning(SourceTypeE sourceType, String dbServerName) { - // Right now, we only needs to wait for MySQL source, as it's the only source that support - // backfill. After we support backfill for other sources, we need to wait for all sources - // too + // Wait for streaming source of source that supported backfill + LOG.info("Waiting for streaming source of {} to start", dbServerName); if (sourceType == SourceTypeE.MYSQL) { - LOG.info("Waiting for streaming source of {} to start", dbServerName); return waitForStreamingRunningInner("mysql", dbServerName); + } else if (sourceType == SourceTypeE.POSTGRES) { + return waitForStreamingRunningInner("postgres", dbServerName); } else { LOG.info("Unsupported backfill source, just return true for {}", dbServerName); return true; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index 25aced532112b..9d9137b4a7fb8 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -45,7 +45,12 @@ public class PostgresValidator extends DatabaseValidator implements AutoCloseabl private static final String AWS_RDS_HOST = "rds.amazonaws.com"; private final boolean isAwsRds; - public PostgresValidator(Map userProps, TableSchema tableSchema) + // Whether the properties to validate is shared by multiple tables. 
+ // If true, we will skip validation check for table + private final boolean isMultiTableShared; + + public PostgresValidator( + Map userProps, TableSchema tableSchema, boolean isMultiTableShared) throws SQLException { this.userProps = userProps; this.tableSchema = tableSchema; @@ -69,6 +74,7 @@ public PostgresValidator(Map userProps, TableSchema tableSchema) this.pubAutoCreate = userProps.get(DbzConnectorConfig.PG_PUB_CREATE).equalsIgnoreCase("true"); + this.isMultiTableShared = isMultiTableShared; } @Override @@ -146,6 +152,9 @@ public void validateDistributedTable() throws SQLException { } private void validateTableSchema() throws SQLException { + if (isMultiTableShared) { + return; + } try (var stmt = jdbcConnection.prepareStatement(ValidatorUtils.getSql("postgres.table"))) { stmt.setString(1, schemaName); stmt.setString(2, tableName); @@ -271,7 +280,7 @@ private void validatePrivileges() throws SQLException { } private void validateTablePrivileges(boolean isSuperUser) throws SQLException { - if (isSuperUser) { + if (isSuperUser || isMultiTableShared) { return; } @@ -297,9 +306,9 @@ private void validateTablePrivileges(boolean isSuperUser) throws SQLException { /* Check required privilege to create/alter a publication */ private void validatePublicationConfig(boolean isSuperUser) throws SQLException { - boolean publicationCoversTable = false; - boolean publicationExists = false; - boolean partialPublication = false; + boolean isPublicationCoversTable = false; + boolean isPublicationExists = false; + boolean isPartialPublicationEnabled = false; // Check whether publication exists try (var stmt = @@ -308,25 +317,39 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException stmt.setString(1, pubName); var res = stmt.executeQuery(); while (res.next()) { - publicationExists = res.getBoolean(1); + isPublicationExists = res.getBoolean(1); } } - if (!pubAutoCreate && !publicationExists) { - throw ValidatorUtils.invalidArgument( - "Publication '" + pubName + "' doesn't exist and auto create is disabled"); + if (!isPublicationExists) { + // We require a publication on upstream to publish table cdc events + if (!pubAutoCreate) { + throw ValidatorUtils.invalidArgument( + "Publication '" + pubName + "' doesn't exist and auto create is disabled"); + } else { + // createPublicationIfNeeded(Optional.empty()); + LOG.info( + "Publication '{}' doesn't exist, will be created in the process of streaming job.", + this.pubName); + } + } + + // If the source properties is shared by multiple tables, skip the following + // check of publication + if (isMultiTableShared) { + return; } // When publication exists, we should check whether it covers the table try (var stmt = jdbcConnection.createStatement()) { var res = stmt.executeQuery(ValidatorUtils.getSql("postgres.publication_att_exists")); while (res.next()) { - partialPublication = res.getBoolean(1); + isPartialPublicationEnabled = res.getBoolean(1); } } // PG 15 and up supports partial publication of table // check whether publication covers all columns of the table schema - if (partialPublication) { + if (isPartialPublicationEnabled) { try (var stmt = jdbcConnection.prepareStatement( ValidatorUtils.getSql("postgres.publication_attnames"))) { @@ -346,10 +369,10 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException pubName, schemaName + "." 
+ tableName)); } if (i == tableSchema.getNumColumns() - 1) { - publicationCoversTable = true; + isPublicationCoversTable = true; } } - if (publicationCoversTable) { + if (isPublicationCoversTable) { LOG.info( "The publication covers the table '{}'.", schemaName + "." + tableName); @@ -368,9 +391,11 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException stmt.setString(3, pubName); var res = stmt.executeQuery(); if (res.next()) { - publicationCoversTable = res.getBoolean(1); - if (publicationCoversTable) { - LOG.info("The publication covers the table."); + isPublicationCoversTable = res.getBoolean(1); + if (isPublicationCoversTable) { + LOG.info( + "The publication covers the table '{}'.", + schemaName + "." + tableName); } } } @@ -378,12 +403,12 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException // If auto create is enabled and the publication doesn't exist or doesn't cover the table, // we need to create or alter the publication. And we need to check the required privileges. - if (!publicationCoversTable) { + if (!isPublicationCoversTable) { // check whether the user has the CREATE privilege on database if (!isSuperUser) { validatePublicationPrivileges(); } - if (publicationExists) { + if (isPublicationExists) { alterPublicationIfNeeded(); } else { LOG.info( @@ -394,6 +419,11 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException } private void validatePublicationPrivileges() throws SQLException { + if (isMultiTableShared) { + throw ValidatorUtils.invalidArgument( + "The connector properties is shared by multiple tables unexpectedly"); + } + // check whether the user has the CREATE privilege on database try (var stmt = jdbcConnection.prepareStatement( @@ -461,11 +491,16 @@ private void validatePublicationPrivileges() throws SQLException { } protected void alterPublicationIfNeeded() throws SQLException { + if (isMultiTableShared) { + throw ValidatorUtils.invalidArgument( + "The connector properties is shared by multiple tables unexpectedly"); + } + String alterPublicationSql = String.format( "ALTER PUBLICATION %s ADD TABLE %s", pubName, schemaName + "." 
+ tableName); try (var stmt = jdbcConnection.createStatement()) { - LOG.info("Altered publication with statement: {}", stmt); + LOG.info("Altered publication with statement: {}", alterPublicationSql); stmt.execute(alterPublicationSql); } } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java index bbf99d3677926..4dd16100928c9 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java @@ -14,9 +14,12 @@ package com.risingwave.connector.source.core; +import static com.risingwave.proto.ConnectorServiceProto.SourceType.POSTGRES; + import com.risingwave.connector.api.source.CdcEngineRunner; import com.risingwave.connector.api.source.SourceTypeE; import com.risingwave.connector.source.common.DbzConnectorConfig; +import com.risingwave.connector.source.common.DbzSourceUtils; import com.risingwave.java.binding.Binding; import com.risingwave.metrics.ConnectorNodeMetrics; import com.risingwave.proto.ConnectorServiceProto; @@ -38,7 +41,7 @@ public JniDbzSourceHandler(DbzConnectorConfig config) { } public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long channelPtr) - throws com.google.protobuf.InvalidProtocolBufferException { + throws Exception { var request = ConnectorServiceProto.GetEventStreamRequest.parseFrom(getEventStreamRequestBytes); @@ -50,6 +53,12 @@ public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long mutableUserProps.put("source.id", Long.toString(request.getSourceId())); var commonParam = request.getCommonParam(); boolean isMultiTableShared = commonParam.getIsMultiTableShared(); + + if (request.getSourceType() == POSTGRES) { + DbzSourceUtils.createPostgresPublicationIfNeeded( + request.getPropertiesMap(), request.getSourceId()); + } + var config = new DbzConnectorConfig( SourceTypeE.valueOf(request.getSourceType()), diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties b/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties index 3337aebf92c6c..22b7f0c4689c5 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties @@ -12,3 +12,6 @@ interval.handling.mode=string max.batch.size=${debezium.max.batch.size:-1024} max.queue.size=${debezium.max.queue.size:-8192} time.precision.mode=adaptive_time_microseconds +# The maximum number of retries on connection errors before failing +# (-1 = no limit, 0 = disabled, > 0 = num of retries). 
+errors.max.retries=10 diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties b/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties index 4d71480dc3536..7f9785b7a34b1 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties @@ -12,10 +12,12 @@ table.include.list=${schema.name}.${table.name} slot.name=${slot.name} # default plugin name is 'pgoutput' plugin.name=${debezium.plugin.name:-pgoutput} -# allow to auto create publication for given tables -publication.autocreate.mode=${debezium.publication.autocreate.mode:-filtered} +# disable auto creation of debezium +publication.autocreate.mode=disabled publication.name=${publication.name:-rw_publication} -# default heartbeat interval 60 seconds -heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-60000} -name=${hostname}:${port}:${database.name}.${schema.name}.${table.name} +# default heartbeat interval 5 mins +heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-300000} +# In sharing cdc source mode, we will subscribe to multiple tables in the given database, +# so here we set ${table.name} to a default value `RW_CDC_Sharing` just for display. +name=${hostname}:${port}:${database.name}.${schema.name}.${table.name:-RW_CDC_Sharing} provide.transaction.metadata=${transactional:-false} diff --git a/java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/converters/DatetimeTypeConverter.java b/java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/converters/DatetimeTypeConverter.java index ca63649d27360..87f861c06008c 100644 --- a/java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/converters/DatetimeTypeConverter.java +++ b/java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/converters/DatetimeTypeConverter.java @@ -39,7 +39,8 @@ public void converterFor( SchemaBuilder schemaBuilder = null; Converter converter = null; if ("DATE".equals(sqlType)) { - schemaBuilder = SchemaBuilder.string().name("rw.cdc.date.string"); + // field schema should be optional + schemaBuilder = SchemaBuilder.string().name("rw.cdc.date.string").optional(); converter = this::convertDate; } if (schemaBuilder != null) { diff --git a/src/common/src/types/datetime.rs b/src/common/src/types/datetime.rs index 105f0f0da1dc4..6eca78fc2e13a 100644 --- a/src/common/src/types/datetime.rs +++ b/src/common/src/types/datetime.rs @@ -14,6 +14,7 @@ //! Date, time, and timestamp types. 
+use std::error::Error; use std::fmt::Display; use std::hash::Hash; use std::io::Write; @@ -21,7 +22,7 @@ use std::str::FromStr; use bytes::{Bytes, BytesMut}; use chrono::{Datelike, Days, Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Weekday}; -use postgres_types::{ToSql, Type}; +use postgres_types::{accepts, to_sql_checked, IsNull, ToSql, Type}; use thiserror::Error; use super::to_binary::ToBinary; @@ -70,6 +71,57 @@ impl_chrono_wrapper!(Date, NaiveDate); impl_chrono_wrapper!(Timestamp, NaiveDateTime); impl_chrono_wrapper!(Time, NaiveTime); +impl ToSql for Date { + accepts!(DATE); + + to_sql_checked!(); + + fn to_sql( + &self, + ty: &Type, + out: &mut BytesMut, + ) -> std::result::Result> + where + Self: Sized, + { + self.0.to_sql(ty, out) + } +} + +impl ToSql for Time { + accepts!(TIME); + + to_sql_checked!(); + + fn to_sql( + &self, + ty: &Type, + out: &mut BytesMut, + ) -> std::result::Result> + where + Self: Sized, + { + self.0.to_sql(ty, out) + } +} + +impl ToSql for Timestamp { + accepts!(TIMESTAMP); + + to_sql_checked!(); + + fn to_sql( + &self, + ty: &Type, + out: &mut BytesMut, + ) -> std::result::Result> + where + Self: Sized, + { + self.0.to_sql(ty, out) + } +} + /// Parse a date from varchar. /// /// # Example diff --git a/src/common/src/types/decimal.rs b/src/common/src/types/decimal.rs index 4e9ba74f2db63..affba896ba3ca 100644 --- a/src/common/src/types/decimal.rs +++ b/src/common/src/types/decimal.rs @@ -20,7 +20,7 @@ use bytes::{BufMut, Bytes, BytesMut}; use num_traits::{ CheckedAdd, CheckedDiv, CheckedMul, CheckedNeg, CheckedRem, CheckedSub, Num, One, Zero, }; -use postgres_types::{ToSql, Type}; +use postgres_types::{accepts, to_sql_checked, IsNull, ToSql, Type}; use rust_decimal::prelude::FromStr; use rust_decimal::{Decimal as RustDecimal, Error, MathematicalOps as _, RoundingStrategy}; @@ -85,33 +85,7 @@ impl ToBinary for Decimal { match ty { DataType::Decimal => { let mut output = BytesMut::new(); - match self { - Decimal::Normalized(d) => { - d.to_sql(&Type::ANY, &mut output).unwrap(); - return Ok(Some(output.freeze())); - } - Decimal::NaN => { - output.reserve(8); - output.put_u16(0); - output.put_i16(0); - output.put_u16(0xC000); - output.put_i16(0); - } - Decimal::PositiveInf => { - output.reserve(8); - output.put_u16(0); - output.put_i16(0); - output.put_u16(0xD000); - output.put_i16(0); - } - Decimal::NegativeInf => { - output.reserve(8); - output.put_u16(0); - output.put_i16(0); - output.put_u16(0xF000); - output.put_i16(0); - } - }; + self.to_sql(&Type::NUMERIC, &mut output).unwrap(); Ok(Some(output.freeze())) } _ => unreachable!(), @@ -119,6 +93,49 @@ impl ToBinary for Decimal { } } +impl ToSql for Decimal { + accepts!(NUMERIC); + + to_sql_checked!(); + + fn to_sql( + &self, + ty: &Type, + out: &mut BytesMut, + ) -> Result> + where + Self: Sized, + { + match self { + Decimal::Normalized(d) => { + return d.to_sql(ty, out); + } + Decimal::NaN => { + out.reserve(8); + out.put_u16(0); + out.put_i16(0); + out.put_u16(0xC000); + out.put_i16(0); + } + Decimal::PositiveInf => { + out.reserve(8); + out.put_u16(0); + out.put_i16(0); + out.put_u16(0xD000); + out.put_i16(0); + } + Decimal::NegativeInf => { + out.reserve(8); + out.put_u16(0); + out.put_i16(0); + out.put_u16(0xF000); + out.put_i16(0); + } + } + Ok(IsNull::No) + } +} + macro_rules! 
impl_convert_int { ($T:ty) => { impl core::convert::From<$T> for Decimal { diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index d8bcae757d530..757d7d218653a 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -66,6 +66,7 @@ mod struct_type; mod successor; mod timestamptz; mod to_binary; +mod to_sql; mod to_text; mod with_data_type; diff --git a/src/common/src/types/ordered_float.rs b/src/common/src/types/ordered_float.rs index f22fe395bea7a..4ce18a88b7577 100644 --- a/src/common/src/types/ordered_float.rs +++ b/src/common/src/types/ordered_float.rs @@ -49,12 +49,16 @@ use core::ops::{ Add, AddAssign, Div, DivAssign, Mul, MulAssign, Neg, Rem, RemAssign, Sub, SubAssign, }; use core::str::FromStr; +use std::error::Error; +use std::fmt::Debug; +use bytes::BytesMut; pub use num_traits::Float; use num_traits::{ Bounded, CheckedAdd, CheckedDiv, CheckedMul, CheckedNeg, CheckedRem, CheckedSub, Num, One, Pow, Zero, }; +use postgres_types::{accepts, to_sql_checked, IsNull, ToSql, Type}; // masks for the parts of the IEEE 754 float const SIGN_MASK: u64 = 0x8000000000000000u64; @@ -104,6 +108,32 @@ impl OrderedFloat { } } +impl ToSql for OrderedFloat { + accepts!(FLOAT4); + + to_sql_checked!(); + + fn to_sql(&self, ty: &Type, out: &mut BytesMut) -> Result> + where + Self: Sized, + { + self.0.to_sql(ty, out) + } +} + +impl ToSql for OrderedFloat { + accepts!(FLOAT8); + + to_sql_checked!(); + + fn to_sql(&self, ty: &Type, out: &mut BytesMut) -> Result> + where + Self: Sized, + { + self.0.to_sql(ty, out) + } +} + impl AsRef for OrderedFloat { #[inline] fn as_ref(&self) -> &T { diff --git a/src/common/src/types/serial.rs b/src/common/src/types/serial.rs index ad807a90abed9..bec576261862b 100644 --- a/src/common/src/types/serial.rs +++ b/src/common/src/types/serial.rs @@ -12,9 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::error::Error; use std::hash::Hash; -use postgres_types::{ToSql as _, Type}; +use bytes::BytesMut; +use postgres_types::{accepts, to_sql_checked, IsNull, ToSql, Type}; use serde::{Serialize, Serializer}; use crate::estimate_size::ZeroHeapSize; @@ -77,3 +79,16 @@ impl crate::types::to_binary::ToBinary for Serial { Ok(Some(output.freeze())) } } + +impl ToSql for Serial { + accepts!(INT8); + + to_sql_checked!(); + + fn to_sql(&self, ty: &Type, out: &mut BytesMut) -> Result> + where + Self: Sized, + { + self.0.to_sql(ty, out) + } +} diff --git a/src/common/src/types/timestamptz.rs b/src/common/src/types/timestamptz.rs index 2db126458b400..3913c6b3e6820 100644 --- a/src/common/src/types/timestamptz.rs +++ b/src/common/src/types/timestamptz.rs @@ -12,13 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::error::Error; use std::io::Write; use std::str::FromStr; use bytes::{Bytes, BytesMut}; use chrono::{TimeZone, Utc}; use chrono_tz::Tz; -use postgres_types::ToSql; +use postgres_types::{accepts, to_sql_checked, IsNull, ToSql, Type}; use serde::{Deserialize, Serialize}; use super::to_binary::ToBinary; @@ -36,6 +37,20 @@ pub struct Timestamptz(i64); impl ZeroHeapSize for Timestamptz {} +impl ToSql for Timestamptz { + accepts!(TIMESTAMPTZ); + + to_sql_checked!(); + + fn to_sql(&self, _: &Type, out: &mut BytesMut) -> Result> + where + Self: Sized, + { + let instant = self.to_datetime_utc(); + instant.to_sql(&Type::ANY, out) + } +} + impl ToBinary for Timestamptz { fn to_binary_with_type(&self, _ty: &DataType) -> super::to_binary::Result> { let instant = self.to_datetime_utc(); diff --git a/src/common/src/types/to_sql.rs b/src/common/src/types/to_sql.rs new file mode 100644 index 0000000000000..434efd39e614f --- /dev/null +++ b/src/common/src/types/to_sql.rs @@ -0,0 +1,76 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::error::Error; + +use bytes::BytesMut; +use postgres_types::{accepts, to_sql_checked, IsNull, ToSql, Type}; + +use crate::types::{JsonbRef, ScalarRefImpl}; + +impl ToSql for ScalarRefImpl<'_> { + to_sql_checked!(); + + fn to_sql(&self, ty: &Type, out: &mut BytesMut) -> Result> + where + Self: Sized, + { + match self { + ScalarRefImpl::Int16(v) => v.to_sql(ty, out), + ScalarRefImpl::Int32(v) => v.to_sql(ty, out), + ScalarRefImpl::Int64(v) => v.to_sql(ty, out), + ScalarRefImpl::Serial(v) => v.to_sql(ty, out), + ScalarRefImpl::Float32(v) => v.to_sql(ty, out), + ScalarRefImpl::Float64(v) => v.to_sql(ty, out), + ScalarRefImpl::Utf8(v) => v.to_sql(ty, out), + ScalarRefImpl::Bool(v) => v.to_sql(ty, out), + ScalarRefImpl::Decimal(v) => v.to_sql(ty, out), + ScalarRefImpl::Interval(v) => v.to_sql(ty, out), + ScalarRefImpl::Date(v) => v.to_sql(ty, out), + ScalarRefImpl::Timestamp(v) => v.to_sql(ty, out), + ScalarRefImpl::Timestamptz(v) => v.to_sql(ty, out), + ScalarRefImpl::Time(v) => v.to_sql(ty, out), + ScalarRefImpl::Bytea(v) => v.to_sql(ty, out), + ScalarRefImpl::Jsonb(_) // jsonbb::Value doesn't implement ToSql yet + | ScalarRefImpl::Int256(_) + | ScalarRefImpl::Struct(_) + | ScalarRefImpl::List(_) => { + bail_not_implemented!("the postgres encoding for {ty} is unsupported") + } + } + } + + // return true to accept all types + fn accepts(_ty: &Type) -> bool + where + Self: Sized, + { + true + } +} + +impl ToSql for JsonbRef<'_> { + accepts!(JSONB); + + to_sql_checked!(); + + fn to_sql(&self, _: &Type, out: &mut BytesMut) -> Result> + where + Self: Sized, + { + let buf = self.value_serialize(); + out.extend(buf); + Ok(IsNull::No) + } +} diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs index c733f78bf0b59..63d8aa9e44038 100644 --- a/src/compute/tests/cdc_tests.rs +++ b/src/compute/tests/cdc_tests.rs @@ -30,11 +30,12 @@ use risingwave_common::array::{Array, ArrayBuilder, DataChunk, Op, 
StreamChunk, use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId}; use risingwave_common::types::{Datum, JsonbVal}; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; -use risingwave_connector::source::cdc::{CdcSplitBase, DebeziumCdcSplit, MySqlCdcSplit}; -use risingwave_connector::source::external::{ +use risingwave_connector::source::cdc::external::mock_external_table::MockExternalTableReader; +use risingwave_connector::source::cdc::external::{ DebeziumOffset, DebeziumSourceOffset, ExternalTableReaderImpl, MySqlOffset, SchemaTableName, }; -use risingwave_connector::source::{MockExternalTableReader, SplitImpl}; +use risingwave_connector::source::cdc::{CdcSplitBase, DebeziumCdcSplit, MySqlCdcSplit}; +use risingwave_connector::source::SplitImpl; use risingwave_hummock_sdk::to_committed_batch_query_epoch; use risingwave_storage::memory::MemoryStateStore; use risingwave_storage::table::batch_table::storage_table::StorageTable; diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 459b85978e23a..70eb4a0435ab3 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -128,6 +128,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "signal", "fs", ] } +tokio-postgres = "0.7" tokio-retry = "0.3" tokio-stream = "0.1" tokio-util = { version = "0.7", features = ["codec", "io"] } diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs index 2e1c826aac98d..b9f1093ef0ca9 100644 --- a/src/connector/src/error.rs +++ b/src/connector/src/error.rs @@ -43,6 +43,9 @@ pub enum ConnectorError { #[error("MySQL error: {0}")] MySql(#[from] mysql_async::Error), + #[error("Postgres error: {0}")] + Postgres(#[from] tokio_postgres::Error), + #[error("Pulsar error: {0}")] Pulsar( #[source] diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 448c98ec571ae..8f714bc84413a 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -41,8 +41,9 @@ use tracing_futures::Instrument; use self::avro::AvroAccessBuilder; use self::bytes_parser::BytesAccessBuilder; -pub use self::mysql::mysql_row_to_datums; +pub use self::mysql::mysql_row_to_owned_row; use self::plain_parser::PlainParser; +pub use self::postgres::postgres_row_to_owned_row; use self::simd_json_parser::DebeziumJsonAccessBuilder; use self::unified::{AccessImpl, AccessResult}; use self::upsert_parser::UpsertParser; @@ -69,6 +70,7 @@ mod json_parser; mod maxwell; mod mysql; pub mod plain_parser; +mod postgres; mod protobuf; mod unified; mod upsert_parser; diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index 3e6842873db01..a7632499756f8 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -53,7 +53,7 @@ macro_rules! 
handle_data_type { }}; } -pub fn mysql_row_to_datums(mysql_row: &mut MysqlRow, schema: &Schema) -> OwnedRow { +pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> OwnedRow { let mut datums = vec![]; for i in 0..schema.fields.len() { let rw_field = &schema.fields[i]; @@ -165,7 +165,7 @@ mod tests { use risingwave_common::types::{DataType, ToText}; use tokio_stream::StreamExt; - use crate::parser::mysql_row_to_datums; + use crate::parser::mysql_row_to_owned_row; // manual test case #[ignore] @@ -189,7 +189,7 @@ mod tests { let row_stream = s.map(|row| { // convert mysql row into OwnedRow let mut mysql_row = row.unwrap(); - Ok::<_, anyhow::Error>(Some(mysql_row_to_datums(&mut mysql_row, &t1schema))) + Ok::<_, anyhow::Error>(Some(mysql_row_to_owned_row(&mut mysql_row, &t1schema))) }); pin_mut!(row_stream); while let Some(row) = row_stream.next().await { diff --git a/src/connector/src/parser/postgres.rs b/src/connector/src/parser/postgres.rs new file mode 100644 index 0000000000000..e158885a4cc58 --- /dev/null +++ b/src/connector/src/parser/postgres.rs @@ -0,0 +1,291 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::LazyLock; + +use chrono::{NaiveDate, Utc}; +use risingwave_common::catalog::Schema; +use risingwave_common::log::LogSuppresser; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{ + DataType, Date, Decimal, Interval, JsonbVal, ListValue, ScalarImpl, Time, Timestamp, + Timestamptz, +}; +use rust_decimal::Decimal as RustDecimal; + +static LOG_SUPPERSSER: LazyLock = LazyLock::new(LogSuppresser::default); + +macro_rules! handle_list_data_type { + ($row:expr, $i:expr, $name:expr, $type:ty, $builder:expr) => { + let res = $row.try_get::<_, Option>>($i); + match res { + Ok(val) => { + if let Some(v) = val { + v.into_iter() + .for_each(|val| $builder.append(Some(ScalarImpl::from(val)))) + } + } + Err(err) => { + if let Ok(sc) = LOG_SUPPERSSER.check() { + tracing::error!( + "parse column \"{}\" fail: {} ({} suppressed)", + $name, + err, + sc + ); + } + } + } + }; + ($row:expr, $i:expr, $name:expr, $type:ty, $builder:expr, $rw_type:ty) => { + let res = $row.try_get::<_, Option>>($i); + match res { + Ok(val) => { + if let Some(v) = val { + v.into_iter().for_each(|val| { + $builder.append(Some(ScalarImpl::from(<$rw_type>::from(val)))) + }) + } + } + Err(err) => { + if let Ok(sc) = LOG_SUPPERSSER.check() { + tracing::error!( + "parse column \"{}\" fail: {} ({} suppressed)", + $name, + err, + sc + ); + } + } + } + }; +} + +macro_rules! 
handle_data_type { + ($row:expr, $i:expr, $name:expr, $type:ty) => {{ + let res = $row.try_get::<_, Option<$type>>($i); + match res { + Ok(val) => val.map(|v| ScalarImpl::from(v)), + Err(err) => { + if let Ok(sc) = LOG_SUPPERSSER.check() { + tracing::error!( + "parse column \"{}\" fail: {} ({} suppressed)", + $name, + err, + sc + ); + } + None + } + } + }}; + ($row:expr, $i:expr, $name:expr, $type:ty, $rw_type:ty) => {{ + let res = $row.try_get::<_, Option<$type>>($i); + match res { + Ok(val) => val.map(|v| ScalarImpl::from(<$rw_type>::from(v))), + Err(err) => { + if let Ok(sc) = LOG_SUPPERSSER.check() { + tracing::error!( + "parse column \"{}\" fail: {} ({} suppressed)", + $name, + err, + sc + ); + } + None + } + } + }}; +} + +pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> OwnedRow { + let mut datums = vec![]; + for i in 0..schema.fields.len() { + let rw_field = &schema.fields[i]; + let name = rw_field.name.as_str(); + let datum = { + match &rw_field.data_type { + DataType::Boolean => { + handle_data_type!(row, i, name, bool) + } + DataType::Int16 => { + handle_data_type!(row, i, name, i16) + } + DataType::Int32 => { + handle_data_type!(row, i, name, i32) + } + DataType::Int64 => { + handle_data_type!(row, i, name, i64) + } + DataType::Float32 => { + handle_data_type!(row, i, name, f32) + } + DataType::Float64 => { + handle_data_type!(row, i, name, f64) + } + DataType::Decimal => { + handle_data_type!(row, i, name, RustDecimal, Decimal) + } + DataType::Varchar => { + handle_data_type!(row, i, name, String) + } + DataType::Date => { + handle_data_type!(row, i, name, NaiveDate, Date) + } + DataType::Time => { + handle_data_type!(row, i, name, chrono::NaiveTime, Time) + } + DataType::Timestamp => { + handle_data_type!(row, i, name, chrono::NaiveDateTime, Timestamp) + } + DataType::Timestamptz => { + handle_data_type!(row, i, name, chrono::DateTime, Timestamptz) + } + DataType::Bytea => { + let res = row.try_get::<_, Option>>(i); + match res { + Ok(val) => val.map(|v| ScalarImpl::from(v.into_boxed_slice())), + Err(err) => { + if let Ok(sc) = LOG_SUPPERSSER.check() { + tracing::error!( + "parse column \"{}\" fail: {} ({} suppressed)", + name, + err, + sc + ); + } + None + } + } + } + DataType::Jsonb => { + handle_data_type!(row, i, name, serde_json::Value, JsonbVal) + } + DataType::Interval => { + handle_data_type!(row, i, name, Interval) + } + DataType::List(dtype) => { + let mut builder = dtype.create_array_builder(0); + match **dtype { + DataType::Boolean => { + handle_list_data_type!(row, i, name, bool, builder); + } + DataType::Int16 => { + handle_list_data_type!(row, i, name, i16, builder); + } + DataType::Int32 => { + handle_list_data_type!(row, i, name, i32, builder); + } + DataType::Int64 => { + handle_list_data_type!(row, i, name, i64, builder); + } + DataType::Float32 => { + handle_list_data_type!(row, i, name, f32, builder); + } + DataType::Float64 => { + handle_list_data_type!(row, i, name, f64, builder); + } + DataType::Decimal => { + handle_list_data_type!(row, i, name, RustDecimal, builder, Decimal); + } + DataType::Date => { + handle_list_data_type!(row, i, name, NaiveDate, builder, Date); + } + DataType::Varchar => { + handle_list_data_type!(row, i, name, String, builder); + } + DataType::Time => { + handle_list_data_type!(row, i, name, chrono::NaiveTime, builder, Time); + } + DataType::Timestamp => { + handle_list_data_type!( + row, + i, + name, + chrono::NaiveDateTime, + builder, + Timestamp + ); + } + DataType::Timestamptz => { + 
handle_list_data_type!( + row, + i, + name, + chrono::DateTime, + builder, + Timestamptz + ); + } + DataType::Interval => { + handle_list_data_type!(row, i, name, Interval, builder); + } + DataType::Jsonb => { + handle_list_data_type!( + row, + i, + name, + serde_json::Value, + builder, + JsonbVal + ); + } + DataType::Bytea => { + let res = row.try_get::<_, Option>>>(i); + match res { + Ok(val) => { + if let Some(v) = val { + v.into_iter().for_each(|val| { + builder.append(Some(ScalarImpl::from( + val.into_boxed_slice(), + ))) + }) + } + } + Err(err) => { + if let Ok(sc) = LOG_SUPPERSSER.check() { + tracing::error!( + "parse column \"{}\" fail: {} ({} suppressed)", + name, + err, + sc + ); + } + } + } + } + DataType::Struct(_) + | DataType::List(_) + | DataType::Serial + | DataType::Int256 => { + tracing::warn!( + "unsupported List data type {:?}, set the List to empty", + **dtype + ); + } + }; + + Some(ScalarImpl::from(ListValue::new(builder.finish()))) + } + DataType::Struct(_) | DataType::Int256 | DataType::Serial => { + // Interval, Struct, List, Int256 are not supported + tracing::warn!(rw_field.name, ?rw_field.data_type, "unsupported data type, set to null"); + None + } + } + }; + datums.push(datum); + } + OwnedRow::new(datums) +} diff --git a/src/connector/src/source/mock_external_table.rs b/src/connector/src/source/cdc/external/mock_external_table.rs similarity index 94% rename from src/connector/src/source/mock_external_table.rs rename to src/connector/src/source/cdc/external/mock_external_table.rs index 24c43acfff769..91007ced55494 100644 --- a/src/connector/src/source/mock_external_table.rs +++ b/src/connector/src/source/cdc/external/mock_external_table.rs @@ -20,7 +20,7 @@ use risingwave_common::row::OwnedRow; use risingwave_common::types::ScalarImpl; use crate::error::ConnectorError; -use crate::source::external::{ +use crate::source::cdc::external::{ CdcOffset, ConnectorResult, ExternalTableReader, MySqlOffset, SchemaTableName, }; @@ -102,9 +102,11 @@ impl ExternalTableReader for MockExternalTableReader { } } - fn parse_binlog_offset(&self, offset: &str) -> ConnectorResult { + fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult { // same as mysql offset - Ok(CdcOffset::MySql(MySqlOffset::parse_str(offset)?)) + Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset( + offset, + )?)) } fn snapshot_read( diff --git a/src/connector/src/source/external.rs b/src/connector/src/source/cdc/external/mod.rs similarity index 87% rename from src/connector/src/source/external.rs rename to src/connector/src/source/cdc/external/mod.rs index 76c8bb5b3081e..776218cfd6e21 100644 --- a/src/connector/src/source/external.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+pub mod mock_external_table; +mod postgres; + use std::collections::HashMap; use anyhow::anyhow; @@ -30,8 +33,9 @@ use risingwave_common::util::iter_util::ZipEqFast; use serde_derive::{Deserialize, Serialize}; use crate::error::ConnectorError; -use crate::parser::mysql_row_to_datums; -use crate::source::MockExternalTableReader; +use crate::parser::mysql_row_to_owned_row; +use crate::source::cdc::external::mock_external_table::MockExternalTableReader; +use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset}; pub type ConnectorResult = std::result::Result; @@ -58,7 +62,7 @@ impl CdcTableType { } pub fn can_backfill(&self) -> bool { - matches!(self, Self::MySql) + matches!(self, Self::MySql | Self::Postgres) } pub async fn create_table_reader( @@ -70,6 +74,9 @@ impl CdcTableType { Self::MySql => Ok(ExternalTableReaderImpl::MySql( MySqlExternalTableReader::new(with_properties, schema).await?, )), + Self::Postgres => Ok(ExternalTableReaderImpl::Postgres( + PostgresExternalTableReader::new(with_properties, schema).await?, + )), _ => bail!(ConnectorError::Config(anyhow!( "invalid external table type: {:?}", *self @@ -133,13 +140,6 @@ impl MySqlOffset { } } -#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Serialize, Deserialize)] -pub struct PostgresOffset { - pub txid: u64, - pub lsn: u64, - pub tx_usec: u64, -} - #[derive(Debug, Clone, PartialEq, PartialOrd, Serialize, Deserialize)] pub enum CdcOffset { MySql(MySqlOffset), @@ -182,15 +182,15 @@ pub struct DebeziumSourceOffset { pub file: Option, pub pos: Option, - // postgres binlog offset + // postgres offset pub lsn: Option, #[serde(rename = "txId")] - pub txid: Option, + pub txid: Option, pub tx_usec: Option, } impl MySqlOffset { - pub fn parse_str(offset: &str) -> ConnectorResult { + pub fn parse_debezium_offset(offset: &str) -> ConnectorResult { let dbz_offset: DebeziumOffset = serde_json::from_str(offset).map_err(|e| { ConnectorError::Internal(anyhow!("invalid upstream offset: {}, error: {}", offset, e)) })?; @@ -213,7 +213,7 @@ pub trait ExternalTableReader { async fn current_cdc_offset(&self) -> ConnectorResult; - fn parse_binlog_offset(&self, offset: &str) -> ConnectorResult; + fn parse_cdc_offset(&self, dbz_offset: &str) -> ConnectorResult; fn snapshot_read( &self, @@ -226,6 +226,7 @@ pub trait ExternalTableReader { #[derive(Debug)] pub enum ExternalTableReaderImpl { MySql(MySqlExternalTableReader), + Postgres(PostgresExternalTableReader), Mock(MockExternalTableReader), } @@ -275,8 +276,10 @@ impl ExternalTableReader for MySqlExternalTableReader { })) } - fn parse_binlog_offset(&self, offset: &str) -> ConnectorResult { - Ok(CdcOffset::MySql(MySqlOffset::parse_str(offset)?)) + fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult { + Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset( + offset, + )?)) } fn snapshot_read( @@ -366,7 +369,7 @@ impl MySqlExternalTableReader { let row_stream = rs_stream.map(|row| { // convert mysql row into OwnedRow let mut row = row?; - Ok::<_, ConnectorError>(mysql_row_to_datums(&mut row, &self.rw_schema)) + Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema)) }); pin_mut!(row_stream); @@ -427,7 +430,7 @@ impl MySqlExternalTableReader { let row_stream = rs_stream.map(|row| { // convert mysql row into OwnedRow let mut row = row?; - Ok::<_, ConnectorError>(mysql_row_to_datums(&mut row, &self.rw_schema)) + Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema)) }); pin_mut!(row_stream); @@ -484,6 +487,9 @@ impl 
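// Hedged sketch of how a Postgres Debezium offset deserializes: only `lsn`
// and `txId` matter for resuming, matching the DebeziumSourceOffset fields
// above. The structs here are trimmed stand-ins for the example, not the
// crate's full definitions; the JSON shape follows the offset strings used
// in the tests below.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct SourceOffset {
    lsn: Option<u64>,
    #[serde(rename = "txId")]
    txid: Option<i64>,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct DbzOffset {
    source_offset: SourceOffset,
}

fn main() {
    let raw = r#"{ "sourcePartition": { "server": "test" },
                   "sourceOffset": { "lsn": 120, "txId": 42, "ts_usec": 1670876905 },
                   "isHeartbeat": false }"#;
    let off: DbzOffset = serde_json::from_str(raw).unwrap();
    assert_eq!(off.source_offset.lsn, Some(120));
    assert_eq!(off.source_offset.txid, Some(42));
}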
ExternalTableReader for ExternalTableReaderImpl { fn get_normalized_table_name(&self, table_name: &SchemaTableName) -> String { match self { ExternalTableReaderImpl::MySql(mysql) => mysql.get_normalized_table_name(table_name), + ExternalTableReaderImpl::Postgres(postgres) => { + postgres.get_normalized_table_name(table_name) + } ExternalTableReaderImpl::Mock(mock) => mock.get_normalized_table_name(table_name), } } @@ -491,14 +497,16 @@ impl ExternalTableReader for ExternalTableReaderImpl { async fn current_cdc_offset(&self) -> ConnectorResult { match self { ExternalTableReaderImpl::MySql(mysql) => mysql.current_cdc_offset().await, + ExternalTableReaderImpl::Postgres(postgres) => postgres.current_cdc_offset().await, ExternalTableReaderImpl::Mock(mock) => mock.current_cdc_offset().await, } } - fn parse_binlog_offset(&self, offset: &str) -> ConnectorResult { + fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult { match self { - ExternalTableReaderImpl::MySql(mysql) => mysql.parse_binlog_offset(offset), - ExternalTableReaderImpl::Mock(mock) => mock.parse_binlog_offset(offset), + ExternalTableReaderImpl::MySql(mysql) => mysql.parse_cdc_offset(offset), + ExternalTableReaderImpl::Postgres(postgres) => postgres.parse_cdc_offset(offset), + ExternalTableReaderImpl::Mock(mock) => mock.parse_cdc_offset(offset), } } @@ -524,6 +532,9 @@ impl ExternalTableReaderImpl { ExternalTableReaderImpl::MySql(mysql) => { mysql.snapshot_read(table_name, start_pk, primary_keys) } + ExternalTableReaderImpl::Postgres(postgres) => { + postgres.snapshot_read(table_name, start_pk, primary_keys) + } ExternalTableReaderImpl::Mock(mock) => { mock.snapshot_read(table_name, start_pk, primary_keys) } @@ -544,12 +555,10 @@ mod tests { use futures::pin_mut; use futures_async_stream::for_await; use maplit::{convert_args, hashmap}; - use risingwave_common::catalog::{ColumnDesc, ColumnId}; + use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema}; use risingwave_common::types::DataType; - use crate::sink::catalog::SinkType; - use crate::sink::SinkParam; - use crate::source::external::{ + use crate::source::cdc::external::{ CdcOffset, ExternalTableReader, MySqlExternalTableReader, MySqlOffset, SchemaTableName, }; @@ -575,11 +584,11 @@ mod tests { let off3_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#; let off4_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#; - let off0 = CdcOffset::MySql(MySqlOffset::parse_str(off0_str).unwrap()); - let off1 = CdcOffset::MySql(MySqlOffset::parse_str(off1_str).unwrap()); - let off2 = CdcOffset::MySql(MySqlOffset::parse_str(off2_str).unwrap()); - let off3 = CdcOffset::MySql(MySqlOffset::parse_str(off3_str).unwrap()); - let off4 = CdcOffset::MySql(MySqlOffset::parse_str(off4_str).unwrap()); + let off0 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off0_str).unwrap()); + let off1 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off1_str).unwrap()); + let off2 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off2_str).unwrap()); + let off3 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off3_str).unwrap()); + let off4 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off4_str).unwrap()); assert!(off0 <= off1); assert!(off1 > off2); @@ -591,24 +600,15 @@ mod tests { #[ignore] #[tokio::test] async fn 
test_mysql_table_reader() { - let param = SinkParam { - sink_id: Default::default(), - properties: Default::default(), - columns: vec![ - ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32), - ColumnDesc::named("v2", ColumnId::new(2), DataType::Decimal), - ColumnDesc::named("v3", ColumnId::new(3), DataType::Varchar), - ColumnDesc::named("v4", ColumnId::new(4), DataType::Date), - ], - downstream_pk: vec![0], - sink_type: SinkType::AppendOnly, - format_desc: None, - db_name: "db".into(), - sink_from_name: "table".into(), - target_table: None, + let columns = vec![ + ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32), + ColumnDesc::named("v2", ColumnId::new(2), DataType::Decimal), + ColumnDesc::named("v3", ColumnId::new(3), DataType::Varchar), + ColumnDesc::named("v4", ColumnId::new(4), DataType::Date), + ]; + let rw_schema = Schema { + fields: columns.iter().map(Field::from).collect(), }; - - let rw_schema = param.schema(); let props = convert_args!(hashmap!( "hostname" => "localhost", "port" => "8306", diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs new file mode 100644 index 0000000000000..6e1f7e6c2abf8 --- /dev/null +++ b/src/connector/src/source/cdc/external/postgres.rs @@ -0,0 +1,319 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::Ordering; +use std::collections::HashMap; + +use anyhow::anyhow; +use futures::stream::BoxStream; +use futures::{pin_mut, StreamExt}; +use futures_async_stream::try_stream; +use itertools::Itertools; +use risingwave_common::catalog::Schema; +use risingwave_common::row::{OwnedRow, Row}; +use risingwave_common::types::DatumRef; +use serde_derive::{Deserialize, Serialize}; +use tokio_postgres::types::PgLsn; +use tokio_postgres::NoTls; + +use crate::error::ConnectorError; +use crate::parser::postgres_row_to_owned_row; +use crate::source::cdc::external::{ + CdcOffset, ConnectorResult, DebeziumOffset, ExternalTableConfig, ExternalTableReader, + SchemaTableName, +}; + +#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] +pub struct PostgresOffset { + pub txid: i64, + // In postgres, an LSN is a 64-bit integer, representing a byte position in the write-ahead log stream. 
+ // It is printed as two hexadecimal numbers of up to 8 digits each, separated by a slash; for example, 16/B374D848 + pub lsn: u64, +} + +// only compare the lsn field +impl PartialOrd for PostgresOffset { + fn partial_cmp(&self, other: &Self) -> Option { + self.lsn.partial_cmp(&other.lsn) + } +} + +impl PostgresOffset { + pub fn parse_debezium_offset(offset: &str) -> ConnectorResult { + let dbz_offset: DebeziumOffset = serde_json::from_str(offset).map_err(|e| { + ConnectorError::Internal(anyhow!("invalid upstream offset: {}, error: {}", offset, e)) + })?; + + Ok(Self { + txid: dbz_offset + .source_offset + .txid + .ok_or_else(|| anyhow!("invalid postgres txid"))?, + lsn: dbz_offset + .source_offset + .lsn + .ok_or_else(|| anyhow!("invalid postgres lsn"))?, + }) + } +} + +#[derive(Debug)] +pub struct PostgresExternalTableReader { + config: ExternalTableConfig, + rw_schema: Schema, + field_names: String, + + client: tokio::sync::Mutex, +} + +impl ExternalTableReader for PostgresExternalTableReader { + fn get_normalized_table_name(&self, table_name: &SchemaTableName) -> String { + format!( + "\"{}\".\"{}\"", + table_name.schema_name, table_name.table_name + ) + } + + async fn current_cdc_offset(&self) -> ConnectorResult { + let mut client = self.client.lock().await; + // start a transaction to read current lsn and txid + let trxn = client.transaction().await?; + let row = trxn.query_one("SELECT pg_current_wal_lsn()", &[]).await?; + let mut pg_offset = PostgresOffset::default(); + let pg_lsn = row.get::<_, PgLsn>(0); + tracing::debug!("current lsn: {}", pg_lsn); + pg_offset.lsn = pg_lsn.into(); + + let txid_row = trxn.query_one("SELECT txid_current()", &[]).await?; + let txid: i64 = txid_row.get::<_, i64>(0); + pg_offset.txid = txid; + + // commit the transaction + trxn.commit().await?; + + Ok(CdcOffset::Postgres(pg_offset)) + } + + fn parse_cdc_offset(&self, offset: &str) -> ConnectorResult { + Ok(CdcOffset::Postgres(PostgresOffset::parse_debezium_offset( + offset, + )?)) + } + + fn snapshot_read( + &self, + table_name: SchemaTableName, + start_pk: Option, + primary_keys: Vec, + ) -> BoxStream<'_, ConnectorResult> { + self.snapshot_read_inner(table_name, start_pk, primary_keys) + } +} + +impl PostgresExternalTableReader { + pub async fn new( + properties: HashMap, + rw_schema: Schema, + ) -> ConnectorResult { + tracing::debug!(?rw_schema, "create postgres external table reader"); + + let config = serde_json::from_value::( + serde_json::to_value(properties).unwrap(), + ) + .map_err(|e| { + ConnectorError::Config(anyhow!( + "fail to extract postgres connector properties: {}", + e + )) + })?; + + let database_url = format!( + "postgresql://{}:{}@{}:{}/{}", + config.username, config.password, config.host, config.port, config.database + ); + + let (client, connection) = tokio_postgres::connect(&database_url, NoTls).await?; + + tokio::spawn(async move { + if let Err(e) = connection.await { + tracing::error!("connection error: {}", e); + } + }); + + let field_names = rw_schema + .fields + .iter() + .map(|f| Self::quote_column(&f.name)) + .join(","); + + Ok(Self { + config, + rw_schema, + field_names, + client: tokio::sync::Mutex::new(client), + }) + } + + #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)] + async fn snapshot_read_inner( + &self, + table_name: SchemaTableName, + start_pk_row: Option, + primary_keys: Vec, + ) { + let order_key = primary_keys.iter().join(","); + let sql = if start_pk_row.is_none() { + format!( + "SELECT {} FROM {} ORDER BY {}", + self.field_names, + 
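// Sketch of why PostgresOffset orders by `lsn` alone: the LSN is the monotone
// byte position in the WAL, while txid values are not ordered the same way
// across events. PgLsn parsing below assumes tokio_postgres' textual
// "XX/XXXXXXXX" form mentioned in the comment; the struct is a simplified
// stand-in, not the patch's type.
use std::cmp::Ordering;
use std::str::FromStr;
use tokio_postgres::types::PgLsn;

#[derive(Debug, Default, PartialEq)]
struct Offset {
    txid: i64,
    lsn: u64,
}

impl PartialOrd for Offset {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.lsn.partial_cmp(&other.lsn) // txid is deliberately ignored
    }
}

fn main() {
    let lsn: u64 = PgLsn::from_str("16/B374D848").unwrap().into();
    let a = Offset { txid: 9, lsn };
    let b = Offset { txid: 1, lsn: lsn + 8 };
    assert!(a < b); // the later WAL position wins, regardless of txid
}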
self.get_normalized_table_name(&table_name), + order_key + ) + } else { + let filter_expr = Self::filter_expression(&primary_keys); + format!( + "SELECT {} FROM {} WHERE {} ORDER BY {}", + self.field_names, + self.get_normalized_table_name(&table_name), + filter_expr, + order_key + ) + }; + + let client = self.client.lock().await; + client.execute("set time zone '+00:00'", &[]).await?; + + let params: Vec> = match start_pk_row { + Some(ref pk_row) => pk_row.iter().collect_vec(), + None => Vec::new(), + }; + + let stream = client.query_raw(&sql, ¶ms).await?; + let row_stream = stream.map(|row| { + let row = row?; + Ok::<_, anyhow::Error>(postgres_row_to_owned_row(row, &self.rw_schema)) + }); + + pin_mut!(row_stream); + #[for_await] + for row in row_stream { + let row = row?; + yield row; + } + } + + // row filter expression: (v1, v2, v3) > ($1, $2, $3) + fn filter_expression(columns: &[String]) -> String { + let mut col_expr = String::new(); + let mut arg_expr = String::new(); + for (i, column) in columns.iter().enumerate() { + if i > 0 { + col_expr.push_str(", "); + arg_expr.push_str(", "); + } + col_expr.push_str(&Self::quote_column(column)); + arg_expr.push_str(format!("${}", i + 1).as_str()); + } + format!("({}) > ({})", col_expr, arg_expr) + } + + fn quote_column(column: &str) -> String { + format!("\"{}\"", column) + } +} + +#[cfg(test)] +mod tests { + use futures::pin_mut; + use futures_async_stream::for_await; + use maplit::{convert_args, hashmap}; + use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema}; + use risingwave_common::row::OwnedRow; + use risingwave_common::types::{DataType, ScalarImpl}; + + use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset}; + use crate::source::cdc::external::{ExternalTableReader, SchemaTableName}; + + #[test] + fn test_postgres_offset() { + let off1 = PostgresOffset { txid: 4, lsn: 2 }; + let off2 = PostgresOffset { txid: 1, lsn: 3 }; + let off3 = PostgresOffset { txid: 5, lsn: 1 }; + + assert!(off1 < off2); + assert!(off3 < off1); + assert!(off2 > off3); + } + + #[test] + fn test_filter_expression() { + let cols = vec!["v1".to_string()]; + let expr = PostgresExternalTableReader::filter_expression(&cols); + assert_eq!(expr, "(\"v1\") > ($1)"); + + let cols = vec!["v1".to_string(), "v2".to_string()]; + let expr = PostgresExternalTableReader::filter_expression(&cols); + assert_eq!(expr, "(\"v1\", \"v2\") > ($1, $2)"); + + let cols = vec!["v1".to_string(), "v2".to_string(), "v3".to_string()]; + let expr = PostgresExternalTableReader::filter_expression(&cols); + assert_eq!(expr, "(\"v1\", \"v2\", \"v3\") > ($1, $2, $3)"); + } + + // manual test + #[ignore] + #[tokio::test] + async fn test_pg_table_reader() { + let columns = vec![ + ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32), + ColumnDesc::named("v2", ColumnId::new(2), DataType::Varchar), + ColumnDesc::named("v3", ColumnId::new(3), DataType::Decimal), + ColumnDesc::named("v4", ColumnId::new(4), DataType::Date), + ]; + let rw_schema = Schema { + fields: columns.iter().map(Field::from).collect(), + }; + + let props = convert_args!(hashmap!( + "hostname" => "localhost", + "port" => "8432", + "username" => "myuser", + "password" => "123456", + "database.name" => "mydb", + "schema.name" => "public", + "table.name" => "t1")); + let reader = PostgresExternalTableReader::new(props, rw_schema) + .await + .unwrap(); + + let offset = reader.current_cdc_offset().await.unwrap(); + println!("CdcOffset: {:?}", offset); + + let start_pk = 
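// Sketch of the keyset pagination used for snapshot reads: each chunked read
// resumes strictly after the last seen primary key via a row-value comparison,
// which Postgres evaluates lexicographically over the PK columns (matching
// the ORDER BY). This mirrors filter_expression above in a free function.
fn filter_expression(pk: &[&str]) -> String {
    let cols = pk.iter().map(|c| format!("\"{c}\"")).collect::<Vec<_>>().join(", ");
    let args = (1..=pk.len()).map(|i| format!("${i}")).collect::<Vec<_>>().join(", ");
    format!("({cols}) > ({args})")
}

fn main() {
    let sql = format!(
        "SELECT \"v1\",\"v2\" FROM \"public\".\"t1\" WHERE {} ORDER BY v1,v2",
        filter_expression(&["v1", "v2"])
    );
    assert_eq!(
        sql,
        "SELECT \"v1\",\"v2\" FROM \"public\".\"t1\" WHERE (\"v1\", \"v2\") > ($1, $2) ORDER BY v1,v2"
    );
}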
OwnedRow::new(vec![Some(ScalarImpl::from(3)), Some(ScalarImpl::from("c"))]); + let stream = reader.snapshot_read( + SchemaTableName { + schema_name: "public".to_string(), + table_name: "t1".to_string(), + }, + Some(start_pk), + vec!["v1".to_string(), "v2".to_string()], + ); + + pin_mut!(stream); + #[for_await] + for row in stream { + println!("OwnedRow: {:?}", row); + } + } +} diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index 485cb7529db81..136b0e7b4cb1b 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod enumerator; +pub mod external; pub mod source; pub mod split; use std::collections::HashMap; diff --git a/src/connector/src/source/cdc/split.rs b/src/connector/src/source/cdc/split.rs index d2bfebc27cf3d..f679ce6e72ca6 100644 --- a/src/connector/src/source/cdc/split.rs +++ b/src/connector/src/source/cdc/split.rs @@ -18,8 +18,8 @@ use anyhow::anyhow; use risingwave_common::types::JsonbVal; use serde::{Deserialize, Serialize}; +use crate::source::cdc::external::DebeziumOffset; use crate::source::cdc::CdcSourceTypeTrait; -use crate::source::external::DebeziumOffset; use crate::source::{SplitId, SplitMetaData}; /// The base states of a CDC split, which will be persisted to checkpoint. diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index 9273754797ea6..20a4efc68ba7e 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -31,13 +31,10 @@ pub use kafka::KAFKA_CONNECTOR; pub use kinesis::KINESIS_CONNECTOR; pub use nats::NATS_CONNECTOR; mod common; -pub mod external; mod manager; -mod mock_external_table; pub mod test_source; pub use manager::{SourceColumnDesc, SourceColumnType}; -pub use mock_external_table::MockExternalTableReader; pub use crate::parser::additional_columns::{ get_connector_compatible_additional_columns, CompatibleAdditionalColumnsFn, diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 75826e5a444e7..21f442af8f677 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -34,12 +34,12 @@ use risingwave_connector::parser::{ use risingwave_connector::schema::schema_registry::{ name_strategy_from_str, SchemaRegistryAuth, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME, }; +use risingwave_connector::source::cdc::external::CdcTableType; use risingwave_connector::source::cdc::{ CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CITUS_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, }; use risingwave_connector::source::datagen::DATAGEN_CONNECTOR; -use risingwave_connector::source::external::CdcTableType; use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType}; use risingwave_connector::source::test_source::TEST_CONNECTOR; use risingwave_connector::source::{ @@ -1092,17 +1092,16 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock vec![Encode::Csv, Encode::Json], ), MYSQL_CDC_CONNECTOR => hashmap!( - Format::Plain => vec![Encode::Bytes], Format::Debezium => vec![Encode::Json], // support source stream job Format::Plain => vec![Encode::Json], ), POSTGRES_CDC_CONNECTOR => hashmap!( - Format::Plain => vec![Encode::Bytes], Format::Debezium => vec![Encode::Json], + // support source stream job + Format::Plain => vec![Encode::Json], ), CITUS_CDC_CONNECTOR => hashmap!( - Format::Plain => vec![Encode::Bytes], Format::Debezium => 
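// Minimal sketch of the connector -> format -> encodings compatibility table
// edited above: postgres-cdc now accepts FORMAT PLAIN ENCODE JSON (used by the
// shared-source/backfill path) alongside FORMAT DEBEZIUM ENCODE JSON. The
// enums are simplified stand-ins for the frontend's Format/Encode types.
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Format { Plain, Debezium }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Encode { Json }

fn main() {
    let compat: HashMap<&str, HashMap<Format, Vec<Encode>>> = HashMap::from([(
        "postgres-cdc",
        HashMap::from([
            (Format::Debezium, vec![Encode::Json]),
            (Format::Plain, vec![Encode::Json]),
        ]),
    )]);
    assert!(compat["postgres-cdc"][&Format::Plain].contains(&Encode::Json));
}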
vec![Encode::Json], ), NATS_CONNECTOR => hashmap!( diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index f7843310e482d..ee93fc565958d 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -30,7 +30,9 @@ use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_common::util::value_encoding::DatumToProtoExt; use risingwave_connector::source; -use risingwave_connector::source::external::{DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY}; +use risingwave_connector::source::cdc::external::{ + DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY, +}; use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, Table, WatermarkDesc}; use risingwave_pb::ddl_service::TableJobType; @@ -867,17 +869,17 @@ fn derive_connect_properties( let prefix = format!("{}.", db_name.as_str()); external_table_name .strip_prefix(prefix.as_str()) - .ok_or_else(|| anyhow!("external table name must contain database prefix"))? + .ok_or_else(|| anyhow!("The upstream table name must contain database name prefix, e.g. 'mydb.table'."))? } POSTGRES_CDC_CONNECTOR => { - let schema_name = connect_properties - .get(SCHEMA_NAME_KEY) - .ok_or_else(|| anyhow!("{} not found in source properties", SCHEMA_NAME_KEY))?; + let (schema_name, table_name) = external_table_name + .split_once('.') + .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?; - let prefix = format!("{}.", schema_name.as_str()); - external_table_name - .strip_prefix(prefix.as_str()) - .ok_or_else(|| anyhow!("external table name must contain schema prefix"))? + // insert 'schema.name' into connect properties + connect_properties.insert(SCHEMA_NAME_KEY.into(), schema_name.into()); + + table_name } _ => { return Err(RwError::from(anyhow!( diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index ed33c2102a48d..6309aed97e9ef 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -29,7 +29,7 @@ use risingwave_connector::parser::{ DebeziumParser, EncodingProperties, JsonProperties, ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig, }; -use risingwave_connector::source::external::CdcOffset; +use risingwave_connector::source::cdc::external::CdcOffset; use risingwave_connector::source::{SourceColumnDesc, SourceContext}; use risingwave_storage::StateStore; @@ -119,12 +119,6 @@ impl CdcBackfillExecutor { // `None` means it starts from the beginning. let mut current_pk_pos: Option; - tracing::info!( - upstream_table_id, - upstream_table_name, - ?pk_in_output_indices - ); - // Poll the upstream to get the first barrier. 
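// Sketch of the new Postgres table-name handling in derive_connect_properties:
// the upstream name must be schema-qualified; the schema part is written into
// the connector properties under 'schema.name' and the bare table name is
// returned. Error handling is reduced to anyhow for this standalone example.
use std::collections::HashMap;
use anyhow::anyhow;

fn split_pg_table_name(
    external_table_name: &str,
    props: &mut HashMap<String, String>,
) -> anyhow::Result<String> {
    let (schema_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
        anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'")
    })?;
    props.insert("schema.name".into(), schema_name.into());
    Ok(table_name.into())
}

fn main() -> anyhow::Result<()> {
    let mut props = HashMap::new();
    assert_eq!(split_pg_table_name("public.orders", &mut props)?, "orders");
    assert_eq!(props["schema.name"], "public");
    assert!(split_pg_table_name("orders", &mut props).is_err());
    Ok(())
}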
let first_barrier = expect_first_barrier(&mut upstream).await?; @@ -164,6 +158,7 @@ impl CdcBackfillExecutor { tracing::info!( upstream_table_id, upstream_table_name, + initial_binlog_offset = ?last_binlog_offset, ?current_pk_pos, is_finished = state.is_finished, snapshot_row_count = total_snapshot_row_count, diff --git a/src/stream/src/executor/backfill/cdc/state.rs b/src/stream/src/executor/backfill/cdc/state.rs index ef46b02b222e2..53b80a51aad37 100644 --- a/src/stream/src/executor/backfill/cdc/state.rs +++ b/src/stream/src/executor/backfill/cdc/state.rs @@ -17,7 +17,7 @@ use risingwave_common::row; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{Datum, JsonbVal, ScalarImpl}; use risingwave_common::util::epoch::EpochPair; -use risingwave_connector::source::external::CdcOffset; +use risingwave_connector::source::cdc::external::CdcOffset; use risingwave_storage::StateStore; use crate::common::table::state_table::StateTable; diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/external.rs b/src/stream/src/executor/backfill/cdc/upstream_table/external.rs index f4d28225bc9a5..cc4da422ff9d1 100644 --- a/src/stream/src/executor/backfill/cdc/upstream_table/external.rs +++ b/src/stream/src/executor/backfill/cdc/upstream_table/external.rs @@ -14,7 +14,7 @@ use risingwave_common::catalog::{Schema, TableId}; use risingwave_common::util::sort_util::OrderType; -use risingwave_connector::source::external::{ExternalTableReaderImpl, SchemaTableName}; +use risingwave_connector::source::cdc::external::{ExternalTableReaderImpl, SchemaTableName}; /// This struct represents an external table to be read during backfill pub struct ExternalStorageTable { diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs index cc8883aeea6b1..27e4283e9a96b 100644 --- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs +++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs @@ -20,7 +20,7 @@ use itertools::Itertools; use risingwave_common::array::StreamChunk; use risingwave_common::row::OwnedRow; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; -use risingwave_connector::source::external::{CdcOffset, ExternalTableReader}; +use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReader}; use super::external::ExternalStorageTable; use crate::executor::backfill::utils::iter_chunks; diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs index c01e1cd87b72e..618604af81b56 100644 --- a/src/stream/src/executor/backfill/utils.rs +++ b/src/stream/src/executor/backfill/utils.rs @@ -34,7 +34,7 @@ use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_common::util::sort_util::{cmp_datum_iter, OrderType}; use risingwave_common::util::value_encoding::BasicSerde; use risingwave_connector::error::ConnectorError; -use risingwave_connector::source::external::{ +use risingwave_connector::source::cdc::external::{ CdcOffset, ExternalTableReader, ExternalTableReaderImpl, }; use risingwave_storage::table::{collect_data_chunk_with_builder, KeyedRow}; @@ -346,7 +346,7 @@ fn mark_cdc_chunk_inner( let offset_col_idx = data.dimension() - 1; for v in data.rows().map(|row| { let offset_datum = row.datum_at(offset_col_idx).unwrap(); - let event_offset = table_reader.parse_binlog_offset(offset_datum.into_utf8())?; + let event_offset = table_reader.parse_cdc_offset(offset_datum.into_utf8())?; let visible = { // filter 
changelog events with binlog range
                let in_binlog_range = if let Some(binlog_low) = &last_cdc_offset {
@@ -537,9 +537,8 @@ pub(crate) fn get_cdc_chunk_last_offset(
 ) -> StreamExecutorResult<Option<CdcOffset>> {
     let row = chunk.rows().last().unwrap().1;
     let offset_col = row.iter().last().unwrap();
-    let output = offset_col.map(|scalar| {
-        Ok::<_, ConnectorError>(table_reader.parse_binlog_offset(scalar.into_utf8()))?
-    });
+    let output = offset_col
+        .map(|scalar| Ok::<_, ConnectorError>(table_reader.parse_cdc_offset(scalar.into_utf8()))?);
     output.transpose().map_err(|e| e.into())
 }
 
diff --git a/src/stream/src/from_proto/stream_cdc_scan.rs b/src/stream/src/from_proto/stream_cdc_scan.rs
index d988e0a75704e..47879e5b73193 100644
--- a/src/stream/src/from_proto/stream_cdc_scan.rs
+++ b/src/stream/src/from_proto/stream_cdc_scan.rs
@@ -17,7 +17,7 @@ use std::sync::Arc;
 
 use risingwave_common::catalog::{Schema, TableId};
 use risingwave_common::util::sort_util::OrderType;
-use risingwave_connector::source::external::{CdcTableType, SchemaTableName};
+use risingwave_connector::source::cdc::external::{CdcTableType, SchemaTableName};
 use risingwave_pb::plan_common::ExternalTableDesc;
 use risingwave_pb::stream_plan::StreamCdcScanNode;
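// Closing sketch of the renamed helper's contract: get_cdc_chunk_last_offset
// takes the offset column (the chunk's last column, a Debezium offset string)
// of the last row and parses it with parse_cdc_offset. The parser is faked
// with a closure here so the example runs without a table reader; offsets are
// plain u64s for brevity.
fn last_offset<'a>(
    offset_cells: impl IntoIterator<Item = Option<&'a str>>,
    parse: impl Fn(&str) -> Result<u64, String>,
) -> Result<Option<u64>, String> {
    offset_cells.into_iter().last().flatten().map(|s| parse(s)).transpose()
}

fn main() {
    let parse = |s: &str| s.parse::<u64>().map_err(|e| e.to_string());
    assert_eq!(last_offset([Some("1"), Some("7")], parse), Ok(Some(7)));
    assert_eq!(last_offset([None::<&str>], parse), Ok(None));
    assert!(last_offset([Some("not-a-number")], parse).is_err());
}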