Skip to content

Commit

Permalink
feat(cdc): support postgres cdc backfill (risingwavelabs#13958)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Dec 28, 2023
1 parent fffff6d commit aa9dcac
Show file tree
Hide file tree
Showing 44 changed files with 1,334 additions and 216 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 10 additions & 5 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.check.slt'

# kill cluster
cargo make kill
echo "cluster killed "
echo "> cluster killed "

echo "--- mysql & postgres recovery check"
# insert into mytest database (cdc.share_stream.slt)
mysql --protocol=tcp -u root mytest -e "INSERT INTO products
VALUES (default,'RisingWave','Next generation Streaming Database'),
Expand All @@ -84,16 +85,19 @@ mysql --protocol=tcp -u root mytest -e "INSERT INTO products

# insert new rows
mysql --host=mysql --port=3306 -u root -p123456 < ./e2e_test/source/cdc/mysql_cdc_insert.sql
echo "> inserted new rows into mysql"

psql < ./e2e_test/source/cdc/postgres_cdc_insert.sql
echo "inserted new rows into mysql and postgres"
echo "> inserted new rows into postgres"

# start cluster w/o clean-data
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
unset RISINGWAVE_CI
export RUST_LOG="events::stream::message::chunk=trace,risingwave_stream=debug,risingwave_batch=info,risingwave_storage=info" \

cargo make dev ci-1cn-1fe-with-recovery
echo "wait for cluster recovery finish"
echo "> wait for cluster recovery finish"
sleep 20
echo "check mviews after cluster recovery"
echo "> check mviews after cluster recovery"
# check results
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.check_new_rows.slt'

Expand All @@ -104,6 +108,7 @@ echo "--- Kill cluster"
cargo make ci-kill

echo "--- e2e, ci-1cn-1fe, protobuf schema registry"
export RISINGWAVE_CI=true
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
cargo make ci-start ci-1cn-1fe
python3 -m pip install requests protobuf confluent-kafka
Expand Down
5 changes: 0 additions & 5 deletions e2e_test/source/cdc/cdc.check.slt
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ select count(*) from t1_rw;
----
1

query I
select count(*) from person_rw;
----
3

query I
select count(*) from tt3_rw;
----
Expand Down
15 changes: 12 additions & 3 deletions e2e_test/source/cdc/cdc.check_new_rows.slt
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,22 @@ select cnt from shipments_cnt;
4

query ITTTT
select * from person_rw order by id;
select * from person_new order by id;
----
1000 vicky noris [email protected] 7878 5821 1864 2539 cheyenne
1001 peter white [email protected] 1781 2313 8157 6974 boise
1002 sarah spencer [email protected] 3453 4987 9481 6270 los angeles
1003 张三 [email protected] 5536 1959 5460 2096 北京
1004 李四 [email protected] 0052 8113 1582 4430 上海
1100 noris [email protected] 1864 2539 enne
1101 white [email protected] 8157 6974 se
1102 spencer [email protected] 9481 6270 angeles
1203 张三 [email protected] 5536 1959 5460 2096 北京
1204 李四 [email protected] 0052 8113 1582 4430 上海

# 2 rows inserted after cluster is stopped
query I
SELECT * from person_new_cnt
----
8

query ITTTTT
select order_id,order_date,customer_name,product_id,order_status from orders order by order_id;
Expand Down Expand Up @@ -65,3 +73,4 @@ SELECT * FROM products_test order by id limit 3
101 RW Small 2-wheel scooter
102 RW 12V car battery
103 RW 12-pack of drill bits with sizes ranging from #40 to #3

63 changes: 21 additions & 42 deletions e2e_test/source/cdc/cdc.load.slt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# CDC source basic test
control substitution on

statement ok
create table products ( id INT,
Expand Down Expand Up @@ -52,11 +53,11 @@ create table shipments (
PRIMARY KEY (shipment_id)
) with (
connector = 'postgres-cdc',
hostname = 'db',
port = '5432',
username = 'postgres',
password = 'postgres',
database.name = 'cdc_test',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
table.name = 'shipments',
slot.name = 'shipments'
);
Expand Down Expand Up @@ -148,11 +149,11 @@ create table shipments_2 (
PRIMARY KEY (shipment_id)
) with (
connector = 'postgres-cdc',
hostname = 'db',
port = '5432',
username = 'postgres',
password = 'postgres',
database.name = 'cdc_test',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
table.name = 'shipments'
);

Expand All @@ -164,53 +165,31 @@ create table t1_rw (
v3 varchar
) with (
connector = 'postgres-cdc',
hostname = 'db',
port = '5432',
username = 'postgres',
password='postgres',
database.name='cdc_test',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
table.name='t1',
schema.name='abs',
slot.name='t1_slot',
publication.name='my_publicaton',
publication.create.enable='false'
);

statement ok
create table person_rw (
id int primary key,
name varchar,
email_address varchar,
credit_card varchar,
city varchar
) with (
connector = 'postgres-cdc',
hostname = 'db',
port = '5432',
username = 'postgres',
password='postgres',
database.name='cdc_test',
table.name='person',
publication.name='my_publicaton',
publication.create.enable='false'
);

statement error
create table person_rw (
id int primary key,
name varchar,
email_address varchar
) with (
connector = 'postgres-cdc',
hostname = 'db',
port = '5432',
username='postgres',
password='postgres',
database.name='cdc_test',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
table.name='person',
publication.name='dumb_publicaton',
publication.create.enable='false'
);

statement ok
create materialized view person_cnt as select count(*) as cnt from person_rw;
111 changes: 111 additions & 0 deletions e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ create source mysql_mytest with (
server.id = '5601'
);

statement error The upstream table name must contain database name prefix*
create table products_test ( id INT,
name STRING,
description STRING,
PRIMARY KEY (id)
) from mysql_mytest table 'products';

statement ok
create table products_test ( id INT,
name STRING,
Expand Down Expand Up @@ -127,3 +134,107 @@ SELECT c_tinyint, c_smallint, c_mediumint, c_integer, c_bigint, c_decimal, c_flo
----
-128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b 1001-01-01 00:00:00 1998-01-01 00:00:00 1970-01-01 00:00:01+00:00
NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d 1001-01-01 NULL 2000-01-01 00:00:00 NULL

statement ok
create source pg_source with (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
slot.name = 'pg_slot'
);

# test postgres backfill data types
statement ok
CREATE TABLE IF NOT EXISTS postgres_all_types(
c_boolean boolean,
c_smallint smallint,
c_integer integer,
c_bigint bigint,
c_decimal decimal,
c_real real,
c_double_precision double precision,
c_varchar varchar,
c_bytea bytea,
c_date date,
c_time time,
c_timestamp timestamp,
c_timestamptz timestamptz,
c_interval interval,
c_jsonb jsonb,
c_boolean_array boolean[],
c_smallint_array smallint[],
c_integer_array integer[],
c_bigint_array bigint[],
c_decimal_array decimal[],
c_real_array real[],
c_double_precision_array double precision[],
c_varchar_array varchar[],
c_bytea_array bytea[],
c_date_array date[],
c_time_array time[],
c_timestamp_array timestamp[],
c_timestamptz_array timestamptz[],
c_interval_array interval[],
c_jsonb_array jsonb[],
PRIMARY KEY (c_boolean,c_bigint,c_date)
) from pg_source table 'public.postgres_all_types';

statement error The upstream table name must contain schema name prefix*
CREATE TABLE person_new (
id int,
name varchar,
email_address varchar,
credit_card varchar,
city varchar,
PRIMARY KEY (id)
) FROM pg_source TABLE 'person';

statement ok
CREATE TABLE person_new (
id int,
name varchar,
email_address varchar,
credit_card varchar,
city varchar,
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.person';

statement ok
CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_new;

sleep 3s

query TTTTTTT
SELECT c_boolean,c_date,c_time,c_timestamp,c_jsonb,c_smallint_array,c_timestamp_array FROM postgres_all_types where c_bigint=-9223372036854775807
----
f 0001-01-01 00:00:00 0001-01-01 00:00:00 {} {-32767} {"0001-01-01 00:00:00"}


# postgres streaming test
system ok
psql -c "
INSERT INTO person VALUES (1100, 'noris', '[email protected]', '1864 2539', 'enne');
INSERT INTO person VALUES (1101, 'white', '[email protected]', '8157 6974', 'se');
INSERT INTO person VALUES (1102, 'spencer', '[email protected]', '9481 6270', 'angeles');
"

sleep 3s

# 3 history, 3 new rows
query I
SELECT * from person_new_cnt
----
6

query ITTTT
SELECT * from person_new order by id;
----
1000 vicky noris [email protected] 7878 5821 1864 2539 cheyenne
1001 peter white [email protected] 1781 2313 8157 6974 boise
1002 sarah spencer [email protected] 3453 4987 9481 6270 los angeles
1100 noris [email protected] 1864 2539 enne
1101 white [email protected] 8157 6974 se
1102 spencer [email protected] 9481 6270 angeles
39 changes: 38 additions & 1 deletion e2e_test/source/cdc/postgres_cdc.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,41 @@ INSERT INTO person VALUES (1002, 'sarah spencer', '[email protected]', '3453 498
create schema abs;
create table abs.t1 (v1 int primary key, v2 double precision, v3 varchar, v4 numeric);
create publication my_publicaton for table abs.t1 (v1, v3);
insert into abs.t1 values (1, 1.1, 'aaa', '5431.1234');
insert into abs.t1 values (1, 1.1, 'aaa', '5431.1234');


CREATE TABLE IF NOT EXISTS postgres_all_types(
c_boolean boolean,
c_smallint smallint,
c_integer integer,
c_bigint bigint,
c_decimal decimal,
c_real real,
c_double_precision double precision,
c_varchar varchar,
c_bytea bytea,
c_date date,
c_time time,
c_timestamp timestamp,
c_timestamptz timestamptz,
c_interval interval,
c_jsonb jsonb,
c_boolean_array boolean[],
c_smallint_array smallint[],
c_integer_array integer[],
c_bigint_array bigint[],
c_decimal_array decimal[],
c_real_array real[],
c_double_precision_array double precision[],
c_varchar_array varchar[],
c_bytea_array bytea[],
c_date_array date[],
c_time_array time[],
c_timestamp_array timestamp[],
c_timestamptz_array timestamptz[],
c_interval_array interval[],
c_jsonb_array jsonb[],
PRIMARY KEY (c_boolean,c_bigint,c_date)
);
INSERT INTO postgres_all_types VALUES ( False, 0, 0, 0, 0, 0, 0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[]);
INSERT INTO postgres_all_types VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['\x00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['0001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['0001-01-01 00:00:00'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[]);
10 changes: 8 additions & 2 deletions e2e_test/source/cdc/postgres_cdc_insert.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
SELECT pg_current_wal_lsn();

INSERT INTO shipments
VALUES (default,10004,'Beijing','Shanghai',false);

INSERT INTO person VALUES (1003, '张三', '[email protected]', '5536 1959 5460 2096', '北京');
INSERT INTO person VALUES (1004, '李四', '[email protected]', '0052 8113 1582 4430', '上海');
INSERT INTO person VALUES (1203, '张三', '[email protected]', '5536 1959 5460 2096', '北京');
INSERT INTO person VALUES (1204, '李四', '[email protected]', '0052 8113 1582 4430', '上海');

insert into abs.t1 values (2, 2.2, 'bbb', '1234.5431');

SELECT pg_current_wal_lsn();
select * from pg_publication_tables where pubname='rw_publication';
select * from public.person order by id;
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re
TableSchema tableSchema = TableSchema.fromProto(request.getTableSchema());
switch (request.getSourceType()) {
case POSTGRES:
ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_SLOT_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_CREATE);
try (var validator = new PostgresValidator(props, tableSchema)) {
try (var validator =
new PostgresValidator(props, tableSchema, isMultiTableShared)) {
validator.validateAll(isMultiTableShared);
}
break;
Expand Down
Loading

0 comments on commit aa9dcac

Please sign in to comment.