Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cdc): map upstream table schema automatically for cdc table #16986

Merged
merged 42 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
e697aae
WIP: cdc meta columns
StrikeW May 16, 2024
2c67c65
cdc timestamp column
StrikeW May 19, 2024
430d479
Merge branch 'siyuan/cdc-metadata-columns' of github.com:risingwavela…
StrikeW May 19, 2024
e301335
support include timestamp
StrikeW May 20, 2024
1a4f871
clean code
StrikeW May 20, 2024
05520e8
minor
StrikeW May 20, 2024
35a35d0
fix
StrikeW May 20, 2024
f3e2b15
e2e test for pg and mysql
StrikeW May 21, 2024
e27684e
include timestamp for mongo
StrikeW May 21, 2024
b75c56b
fix comment
StrikeW May 22, 2024
ccf2f8c
fix
StrikeW May 23, 2024
41c3a49
fix ci
StrikeW May 23, 2024
d98c1f7
fix validate
StrikeW May 23, 2024
d4c15d3
fix e2e
StrikeW May 23, 2024
bc7d9ba
fix e2e
StrikeW May 23, 2024
0089cc0
fix e2e
StrikeW May 24, 2024
4cb7cb6
decouple cdc backfill table addi cols
StrikeW May 24, 2024
360a5ff
clean code
StrikeW May 27, 2024
d7ce5d0
fix
StrikeW May 27, 2024
57edad0
WIP: pg external table
StrikeW May 28, 2024
0de0f15
refactor
StrikeW May 28, 2024
b9d0513
Merge remote-tracking branch 'origin/main' into siyuan/auto-map-schema
StrikeW May 28, 2024
173c91f
PG: auto map schema
StrikeW May 29, 2024
31a8485
WIP: mysql auto map schema
StrikeW May 29, 2024
a025420
clean code
StrikeW May 29, 2024
3117fac
fix mysql named param
StrikeW May 30, 2024
ede9377
Merge remote-tracking branch 'origin/main' into siyuan/auto-map-schema
StrikeW Jun 3, 2024
240302c
add e2e test
StrikeW Jun 3, 2024
6b45cc3
mysql e2e
StrikeW Jun 4, 2024
2a5ae54
fix
StrikeW Jun 4, 2024
a73b3d9
enhance e2e
StrikeW Jun 4, 2024
24cef4c
* and columns cannot used together
StrikeW Jun 4, 2024
0d5b582
minor
StrikeW Jun 4, 2024
c430283
fix mysql e2e
StrikeW Jun 4, 2024
b131e30
clean code
StrikeW Jun 4, 2024
64678d0
minor
StrikeW Jun 4, 2024
c455b70
fix
StrikeW Jun 4, 2024
27fc504
Merge remote-tracking branch 'origin/main' into siyuan/auto-map-schema
StrikeW Jun 6, 2024
0e09023
Merge remote-tracking branch 'origin/main' into siyuan/auto-map-schema
StrikeW Jun 6, 2024
b75c779
minor
StrikeW Jun 6, 2024
a9525a4
minor
StrikeW Jun 10, 2024
38ff10e
Merge remote-tracking branch 'origin/main' into siyuan/auto-map-schema
StrikeW Jun 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ sea-orm = { version = "0.12.14", features = [
"sqlx-sqlite",
"runtime-tokio-native-tls",
] }
sqlx = "0.7"
tokio-util = "0.7"
tracing-opentelemetry = "0.22"
rand = { version = "0.8", features = ["small_rng"] }
Expand Down
118 changes: 118 additions & 0 deletions e2e_test/source/cdc_inline/auto_schema_map_mysql.slt
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to organize it under the new folder e2e_test/source_inline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will refactor in another pr.

Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
control substitution on

# test cases need to cover all data types
system ok
mysql --protocol=tcp -u root -e "DROP DATABASE IF EXISTS mytest; CREATE DATABASE mytest;"
StrikeW marked this conversation as resolved.
Show resolved Hide resolved

system ok
mysql --protocol=tcp -u root mytest -e "
DROP TABLE IF EXISTS mysql_types_test;
CREATE TABLE IF NOT EXISTS mysql_types_test(
c_boolean boolean,
c_bit bit,
c_tinyint tinyint,
c_smallint smallint,
c_mediumint mediumint,
c_integer integer,
c_Bigint bigint,
c_decimal decimal,
c_float float,
c_double double,
c_char_255 char(255),
c_varchar_10000 varchar(10000),
c_binary_255 binary(255),
c_varbinary_10000 varbinary(10000),
c_date date,
c_time time,
c_datetime datetime,
c_timestamp timestamp,
c_enum ENUM('happy','sad','ok'),
c_json JSON,
PRIMARY KEY (c_boolean,c_Bigint,c_date)
);
INSERT INTO mysql_types_test VALUES ( False, 0, null, null, -8388608, -2147483647, 9223372036854775806, -10.0, -9999.999999, -10000.0, 'c', 'd', '', '', '1001-01-01', '-838:59:59.000000', '2000-01-01 00:00:00.000000', null, 'happy', '[1,2]');
INSERT INTO mysql_types_test VALUES ( True, 1, -128, -32767, -8388608, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'a', 'b', '', '', '1001-01-01', '00:00:00', '1998-01-01 00:00:00.000000', '1970-01-01 00:00:01', 'sad', '[3,4]');
"


statement ok
create source mysql_source with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'root',
password = '${MYSQL_PWD:}',
Comment on lines +41 to +44
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use ${RISEDEV_MYSQL_WITH_OPTIONS_COMMON}.

database.name = 'mytest',
server.id = '5601'
);


statement ok
create table rw_mysql_types_test (*) from mysql_source table 'mytest.mysql_types_test';

sleep 3s

# Name, Type, Is Hidden, Description
query TTTT
describe rw_mysql_types_test;
----
c_boolean smallint false NULL
c_bit boolean false NULL
c_tinyint smallint false NULL
c_smallint smallint false NULL
c_mediumint integer false NULL
c_integer integer false NULL
c_Bigint bigint false NULL
c_decimal numeric false NULL
c_float real false NULL
c_double double precision false NULL
c_char_255 character varying false NULL
c_varchar_10000 character varying false NULL
c_binary_255 bytea false NULL
c_varbinary_10000 bytea false NULL
c_date date false NULL
c_time time without time zone false NULL
c_datetime timestamp without time zone false NULL
c_timestamp timestamp with time zone false NULL
c_enum character varying false NULL
c_json jsonb false NULL
primary key c_boolean, c_Bigint, c_date NULL NULL
distribution key c_boolean, c_Bigint, c_date NULL NULL
table description rw_mysql_types_test NULL NULL

query TTTTTTTTTTTTT
SELECT
c_boolean,
c_bit,
c_tinyint,
c_smallint,
c_mediumint,
c_integer,
"c_Bigint",
c_decimal,
c_float,
c_double,
c_char_255,
c_varchar_10000,
c_binary_255
FROM rw_mysql_types_test order by c_boolean;
----
0 NULL NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
1 NULL -128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000

query TTTTTTTT
SELECT
c_varbinary_10000,
c_date,
c_time,
c_datetime,
c_timestamp,
c_enum,
c_json
FROM rw_mysql_types_test order by c_boolean;
----
\x 1001-01-01 NULL 2000-01-01 00:00:00 NULL happy [1, 2]
\x 1001-01-01 00:00:00 1998-01-01 00:00:00 1970-01-01 00:00:01+00:00 sad [3, 4]

statement ok
drop source mysql_source cascade;
168 changes: 168 additions & 0 deletions e2e_test/source/cdc_inline/auto_schema_map_pg.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
control substitution on

# test cases need to cover all data types
system ok
psql -c "
DROP TABLE IF EXISTS postgres_types_test;
CREATE TABLE IF NOT EXISTS postgres_types_test(
c_boolean boolean,
c_smallint smallint,
c_integer integer,
c_bigint bigint,
c_decimal decimal,
c_real real,
c_double_precision double precision,
c_varchar varchar,
c_bytea bytea,
c_date date,
c_time time,
c_timestamp timestamp,
c_timestamptz timestamptz,
c_interval interval,
c_jsonb jsonb,
c_uuid uuid,
c_enum mood,
c_boolean_array boolean[],
c_smallint_array smallint[],
c_integer_array integer[],
c_bigint_array bigint[],
c_decimal_array decimal[],
c_real_array real[],
c_double_precision_array double precision[],
c_varchar_array varchar[],
c_bytea_array bytea[],
c_date_array date[],
c_time_array time[],
c_timestamp_array timestamp[],
c_timestamptz_array timestamptz[],
c_interval_array interval[],
c_jsonb_array jsonb[],
c_uuid_array uuid[],
c_enum_array mood[],
PRIMARY KEY (c_boolean,c_bigint,c_date)
);
INSERT INTO postgres_types_test VALUES ( False, 0, 0, 0, 0, 0, 0, '', '00'::bytea, '0001-01-01', '00:00:00', '2001-01-01 00:00:00'::timestamp, '2001-01-01 00:00:00-8'::timestamptz, interval '0 second', '{}', null, 'sad', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[], array[]::uuid[], array[]::mood[]);
INSERT INTO postgres_types_test VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'd', '00'::bytea, '0001-01-01', '00:00:00', '2001-01-01 00:00:00'::timestamp, '2001-01-01 00:00:00-8'::timestamptz, interval '0 second', '{}', 'bb488f9b-330d-4012-b849-12adeb49e57e', 'happy', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['2001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['2001-01-01 00:00:00-8'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], '{bb488f9b-330d-4012-b849-12adeb49e57e}', '{happy,ok,sad}');
INSERT INTO postgres_types_test VALUES ( False, 1, 123, 1234567890, 123.45, 123.45, 123.456, 'a_varchar', 'DEADBEEF'::bytea, '0024-01-01', '12:34:56', '2024-05-19 12:34:56', '2024-05-19 12:34:56+00', INTERVAL '1 day', to_jsonb('hello'::text), '123e4567-e89b-12d3-a456-426614174000', 'happy', ARRAY[NULL, TRUE]::boolean[], ARRAY[NULL, 1::smallint], ARRAY[NULL, 123], ARRAY[NULL, 1234567890], ARRAY[NULL, 123.45::numeric], ARRAY[NULL, 123.45::real], ARRAY[NULL, 123.456], ARRAY[NULL, 'a_varchar'], ARRAY[NULL, 'DEADBEEF'::bytea], ARRAY[NULL, '2024-05-19'::date], ARRAY[NULL, '12:34:56'::time], ARRAY[NULL, '2024-05-19 12:34:56'::timestamp], ARRAY[NULL, '2024-05-19 12:34:56+00'::timestamptz], ARRAY[NULL, INTERVAL '1 day'], ARRAY[NULL, to_jsonb('hello'::text)], ARRAY[NULL, '123e4567-e89b-12d3-a456-426614174000'::uuid], ARRAY[NULL, 'happy'::mood]);
INSERT INTO postgres_types_test VALUES ( False, NULL, NULL, 1, NULL, NULL, NULL, NULL, NULL, '0024-05-19', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
"

statement ok
create source pg_source with (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
slot.name = 'pg_slot'
);


statement ok
create table rw_postgres_types_test (*) from pg_source table 'public.postgres_types_test';

sleep 3s

# Name, Type, Is Hidden, Description
query TTTT
describe rw_postgres_types_test;
----
c_boolean boolean false NULL
c_smallint smallint false NULL
c_integer integer false NULL
c_bigint bigint false NULL
c_decimal numeric false NULL
c_real real false NULL
c_double_precision double precision false NULL
c_varchar character varying false NULL
c_bytea bytea false NULL
c_date date false NULL
c_time time without time zone false NULL
c_timestamp timestamp without time zone false NULL
c_timestamptz timestamp with time zone false NULL
c_interval interval false NULL
c_jsonb jsonb false NULL
c_uuid character varying false NULL
c_enum character varying false NULL
c_boolean_array boolean[] false NULL
c_smallint_array smallint[] false NULL
c_integer_array integer[] false NULL
c_bigint_array bigint[] false NULL
c_decimal_array numeric[] false NULL
c_real_array real[] false NULL
c_double_precision_array double precision[] false NULL
c_varchar_array character varying[] false NULL
c_bytea_array bytea[] false NULL
c_date_array date[] false NULL
c_time_array time without time zone[] false NULL
c_timestamp_array timestamp without time zone[] false NULL
c_timestamptz_array timestamp with time zone[] false NULL
c_interval_array interval[] false NULL
c_jsonb_array jsonb[] false NULL
c_uuid_array character varying[] false NULL
c_enum_array character varying[] false NULL
primary key c_boolean, c_bigint, c_date NULL NULL
distribution key c_boolean, c_bigint, c_date NULL NULL
table description rw_postgres_types_test NULL NULL

query TTTTTTT
SELECT
c_boolean,
c_smallint,
c_integer,
c_bigint,
c_decimal,
c_real,
c_double_precision,
c_varchar,
c_bytea from rw_postgres_types_test where c_enum = 'happy' order by c_integer;
----
f -32767 -2147483647 -9223372036854775807 -10.0 -10000 -10000 d \x3030
f 1 123 1234567890 123.45 123.45 123.456 a_varchar \x4445414442454546

query TTTTT
SELECT
c_date,
c_time,
c_timestamp,
c_timestamptz,
c_interval from rw_postgres_types_test where c_enum = 'happy' order by c_integer;
----
0001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 08:00:00+00:00 00:00:00
0024-01-01 12:34:56 2024-05-19 12:34:56 2024-05-19 12:34:56+00:00 1 day

query TTTTTTT
SELECT
c_jsonb,
c_uuid,
c_enum,
c_boolean_array,
c_smallint_array,
c_integer_array,
c_bigint_array from rw_postgres_types_test where c_enum = 'happy' order by c_integer;
----
{} bb488f9b-330d-4012-b849-12adeb49e57e happy {f} {-32767} {-2147483647} {-9223372036854775807}
"hello" 123e4567-e89b-12d3-a456-426614174000 happy {NULL,t} {NULL,1} {NULL,123} {NULL,1234567890}

query TTTTTTTTTTTTT
SELECT
c_decimal_array,
c_real_array,
c_double_precision_array,
c_varchar_array,
c_bytea_array,
c_date_array,
c_time_array,
c_timestamp_array,
c_timestamptz_array,
c_interval_array,
c_jsonb_array,
c_uuid_array,
c_enum_array from rw_postgres_types_test where c_enum = 'happy' order by c_integer;
----
{-10.0} {-10000} {-10000} {""} {"\\x3030"} {0001-01-01} {00:00:00} {"2001-01-01 00:00:00"} {"2001-01-01 08:00:00+00:00"} NULL {"{}"} {bb488f9b-330d-4012-b849-12adeb49e57e} {happy,ok,sad}
{NULL,123.45} {NULL,123.45} {NULL,123.456} {NULL,a_varchar} {NULL,"\\x4445414442454546"} {NULL,2024-05-19} {NULL,12:34:56} {NULL,"2024-05-19 12:34:56"} {NULL,"2024-05-19 12:34:56+00:00"} NULL {NULL,"\"hello\""} {NULL,123e4567-e89b-12d3-a456-426614174000} NULL

statement ok
drop source pg_source cascade;
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,9 @@ private void validateTableSchema() throws SQLException {
var field = res.getString(1);
var dataType = res.getString(2);
var key = res.getString(3);
schema.put(field.toLowerCase(), dataType);
st1page marked this conversation as resolved.
Show resolved Hide resolved
schema.put(field, dataType);
if (key.equalsIgnoreCase("PRI")) {
// RisingWave always use lower case for column name
pkFields.add(field.toLowerCase());
pkFields.add(field);
}
}

Expand All @@ -208,7 +207,7 @@ private void validateTableSchema() throws SQLException {
if (e.getKey().startsWith(ValidatorUtils.INTERNAL_COLUMN_PREFIX)) {
continue;
}
var dataType = schema.get(e.getKey().toLowerCase());
var dataType = schema.get(e.getKey());
if (dataType == null) {
throw ValidatorUtils.invalidArgument(
"Column '" + e.getKey() + "' not found in the upstream database");
Expand Down
3 changes: 2 additions & 1 deletion src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ use risingwave_common::types::{Datum, JsonbVal};
use risingwave_common::util::epoch::{test_epoch, EpochExt};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::cdc::external::mock_external_table::MockExternalTableReader;
use risingwave_connector::source::cdc::external::mysql::MySqlOffset;
use risingwave_connector::source::cdc::external::{
DebeziumOffset, DebeziumSourceOffset, ExternalTableReaderImpl, MySqlOffset, SchemaTableName,
DebeziumOffset, DebeziumSourceOffset, ExternalTableReaderImpl, SchemaTableName,
};
use risingwave_connector::source::cdc::DebeziumCdcSplit;
use risingwave_connector::source::SplitImpl;
Expand Down
2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,13 @@ rustls-native-certs = "0.7"
rustls-pemfile = "2"
rustls-pki-types = "1"
rw_futures_util = { workspace = true }
sea-schema = { version = "0.14", features = ["default", "sqlx-postgres", "sqlx-mysql"] }
serde = { version = "1", features = ["derive", "rc"] }
serde_derive = "1"
serde_json = "1"
serde_with = { version = "3", features = ["json"] }
simd-json = "0.13.3"
sqlx = { workspace = true }
strum = "0.26"
strum_macros = "0.26"
tempfile = "3"
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def_anyhow_newtype! {
// Connector errors
opendal::Error => transparent, // believed to be self-explanatory

sqlx::Error => transparent, // believed to be self-explanatory
mysql_async::Error => "MySQL error",
tokio_postgres::Error => "Postgres error",
apache_avro::Error => "Avro error",
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ macro_rules! impl_cdc_source_type {
}
)*

#[derive(Clone)]
pub enum CdcSourceType {
$(
$cdc_source_type,
Expand Down
Loading
Loading