feat(cdc): map upstream table schema automatically for cdc table #16986
Merged
Commits (42 total; changes shown from 39 commits)
e697aae WIP: cdc meta columns (StrikeW)
2c67c65 cdc timestamp column (StrikeW)
430d479 Merge branch 'siyuan/cdc-metadata-columns' of github.com:risingwavela… (StrikeW)
e301335 support include timestamp (StrikeW)
1a4f871 clean code (StrikeW)
05520e8 minor (StrikeW)
35a35d0 fix (StrikeW)
f3e2b15 e2e test for pg and mysql (StrikeW)
e27684e include timestamp for mongo (StrikeW)
b75c56b fix comment (StrikeW)
ccf2f8c fix (StrikeW)
41c3a49 fix ci (StrikeW)
d98c1f7 fix validate (StrikeW)
d4c15d3 fix e2e (StrikeW)
bc7d9ba fix e2e (StrikeW)
0089cc0 fix e2e (StrikeW)
4cb7cb6 decouple cdc backfill table addi cols (StrikeW)
360a5ff clean code (StrikeW)
d7ce5d0 fix (StrikeW)
57edad0 WIP: pg external table (StrikeW)
0de0f15 refactor (StrikeW)
b9d0513 Merge remote-tracking branch 'origin/main' into siyuan/auto-map-schema (StrikeW)
173c91f PG: auto map schema (StrikeW)
31a8485 WIP: mysql auto map schema (StrikeW)
a025420 clean code (StrikeW)
3117fac fix mysql named param (StrikeW)
ede9377 Merge remote-tracking branch 'origin/main' into siyuan/auto-map-schema (StrikeW)
240302c add e2e test (StrikeW)
6b45cc3 mysql e2e (StrikeW)
2a5ae54 fix (StrikeW)
a73b3d9 enhance e2e (StrikeW)
24cef4c * and columns cannot used together (StrikeW)
0d5b582 minor (StrikeW)
c430283 fix mysql e2e (StrikeW)
b131e30 clean code (StrikeW)
64678d0 minor (StrikeW)
c455b70 fix (StrikeW)
27fc504 Merge remote-tracking branch 'origin/main' into siyuan/auto-map-schema (StrikeW)
0e09023 Merge remote-tracking branch 'origin/main' into siyuan/auto-map-schema (StrikeW)
b75c779 minor (StrikeW)
a9525a4 minor (StrikeW)
38ff10e Merge remote-tracking branch 'origin/main' into siyuan/auto-map-schema (StrikeW)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
control substitution on | ||
|
||
# test cases need to cover all data types | ||
system ok | ||
mysql --protocol=tcp -u root -e "DROP DATABASE IF EXISTS mytest; CREATE DATABASE mytest;" | ||
|
||
|
||
system ok | ||
mysql --protocol=tcp -u root mytest -e " | ||
DROP TABLE IF EXISTS mysql_types_test; | ||
CREATE TABLE IF NOT EXISTS mysql_types_test( | ||
c_boolean boolean, | ||
c_bit bit, | ||
c_tinyint tinyint, | ||
c_smallint smallint, | ||
c_mediumint mediumint, | ||
c_integer integer, | ||
c_Bigint bigint, | ||
c_decimal decimal, | ||
c_float float, | ||
c_double double, | ||
c_char_255 char(255), | ||
c_varchar_10000 varchar(10000), | ||
c_binary_255 binary(255), | ||
c_varbinary_10000 varbinary(10000), | ||
c_date date, | ||
c_time time, | ||
c_datetime datetime, | ||
c_timestamp timestamp, | ||
c_enum ENUM('happy','sad','ok'), | ||
c_json JSON, | ||
PRIMARY KEY (c_boolean,c_Bigint,c_date) | ||
); | ||
INSERT INTO mysql_types_test VALUES ( False, 0, null, null, -8388608, -2147483647, 9223372036854775806, -10.0, -9999.999999, -10000.0, 'c', 'd', '', '', '1001-01-01', '-838:59:59.000000', '2000-01-01 00:00:00.000000', null, 'happy', '[1,2]'); | ||
INSERT INTO mysql_types_test VALUES ( True, 1, -128, -32767, -8388608, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'a', 'b', '', '', '1001-01-01', '00:00:00', '1998-01-01 00:00:00.000000', '1970-01-01 00:00:01', 'sad', '[3,4]'); | ||
" | ||
|
||
|
||
statement ok | ||
create source mysql_source with ( | ||
connector = 'mysql-cdc', | ||
hostname = '${MYSQL_HOST:localhost}', | ||
port = '${MYSQL_TCP_PORT:8306}', | ||
username = 'root', | ||
password = '${MYSQL_PWD:}', | ||
||
database.name = 'mytest', | ||
server.id = '5601' | ||
); | ||
|
||
|
||
statement ok | ||
create table rw_mysql_types_test (*) from mysql_source table 'mytest.mysql_types_test'; | ||
|
||
sleep 3s | ||
|
||
# Name, Type, Is Hidden, Description | ||
query TTTT | ||
describe rw_mysql_types_test; | ||
---- | ||
c_boolean smallint false NULL | ||
c_bit boolean false NULL | ||
c_tinyint smallint false NULL | ||
c_smallint smallint false NULL | ||
c_mediumint integer false NULL | ||
c_integer integer false NULL | ||
c_Bigint bigint false NULL | ||
c_decimal numeric false NULL | ||
c_float real false NULL | ||
c_double double precision false NULL | ||
c_char_255 character varying false NULL | ||
c_varchar_10000 character varying false NULL | ||
c_binary_255 bytea false NULL | ||
c_varbinary_10000 bytea false NULL | ||
c_date date false NULL | ||
c_time time without time zone false NULL | ||
c_datetime timestamp without time zone false NULL | ||
c_timestamp timestamp with time zone false NULL | ||
c_enum character varying false NULL | ||
c_json jsonb false NULL | ||
primary key c_boolean, c_Bigint, c_date NULL NULL | ||
distribution key c_boolean, c_Bigint, c_date NULL NULL | ||
table description rw_mysql_types_test NULL NULL | ||
|
||
query TTTTTTTTTTTTT | ||
SELECT | ||
c_boolean, | ||
c_bit, | ||
c_tinyint, | ||
c_smallint, | ||
c_mediumint, | ||
c_integer, | ||
"c_Bigint", | ||
c_decimal, | ||
c_float, | ||
c_double, | ||
c_char_255, | ||
c_varchar_10000, | ||
c_binary_255 | ||
FROM rw_mysql_types_test order by c_boolean; | ||
---- | ||
0 NULL NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 | ||
1 NULL -128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 | ||
|
||
query TTTTTTT | ||
SELECT | ||
c_varbinary_10000, | ||
c_date, | ||
c_time, | ||
c_datetime, | ||
c_timestamp, | ||
c_enum, | ||
c_json | ||
FROM rw_mysql_types_test order by c_boolean; | ||
---- | ||
\x 1001-01-01 NULL 2000-01-01 00:00:00 NULL happy [1, 2] | ||
\x 1001-01-01 00:00:00 1998-01-01 00:00:00 1970-01-01 00:00:01+00:00 sad [3, 4] | ||
|
||
statement ok | ||
drop source mysql_source cascade; |
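The expected `describe` output in the MySQL test above implies a mapping from upstream MySQL column types to RisingWave types. As a reading aid only, here is a minimal Python sketch of that mapping. It is transcribed from the test's expected output and is not the connector's implementation; `MYSQL_TO_RW` and `map_mysql_type` are hypothetical names introduced for illustration.

```python
# Mapping transcribed from the expected `describe` output of the MySQL test.
# Assumption: this is a lookup table for illustration, NOT the connector code.
MYSQL_TO_RW = {
    "boolean": "smallint",
    "bit": "boolean",  # the test only covers a bare `bit` column
    "tinyint": "smallint",
    "smallint": "smallint",
    "mediumint": "integer",
    "integer": "integer",
    "bigint": "bigint",
    "decimal": "numeric",
    "float": "real",
    "double": "double precision",
    "char": "character varying",
    "varchar": "character varying",
    "binary": "bytea",
    "varbinary": "bytea",
    "date": "date",
    "time": "time without time zone",
    "datetime": "timestamp without time zone",
    "timestamp": "timestamp with time zone",
    "enum": "character varying",
    "json": "jsonb",
}


def map_mysql_type(declared: str) -> str:
    """Drop any length or value list (e.g. 'char(255)' -> 'char'), then look up."""
    base = declared.strip().lower().split("(", 1)[0].strip()
    return MYSQL_TO_RW[base]
```

For example, `map_mysql_type("char(255)")` and `map_mysql_type("ENUM('happy','sad','ok')")` both give `character varying`, matching the `c_char_255` and `c_enum` rows of the expected output.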
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
control substitution on | ||
|
||
# test cases need to cover all data types | ||
system ok | ||
psql -c " | ||
DROP TABLE IF EXISTS postgres_types_test; | ||
CREATE TABLE IF NOT EXISTS postgres_types_test( | ||
c_boolean boolean, | ||
c_smallint smallint, | ||
c_integer integer, | ||
c_bigint bigint, | ||
c_decimal decimal, | ||
c_real real, | ||
c_double_precision double precision, | ||
c_varchar varchar, | ||
c_bytea bytea, | ||
c_date date, | ||
c_time time, | ||
c_timestamp timestamp, | ||
c_timestamptz timestamptz, | ||
c_interval interval, | ||
c_jsonb jsonb, | ||
c_uuid uuid, | ||
c_enum mood, | ||
c_boolean_array boolean[], | ||
c_smallint_array smallint[], | ||
c_integer_array integer[], | ||
c_bigint_array bigint[], | ||
c_decimal_array decimal[], | ||
c_real_array real[], | ||
c_double_precision_array double precision[], | ||
c_varchar_array varchar[], | ||
c_bytea_array bytea[], | ||
c_date_array date[], | ||
c_time_array time[], | ||
c_timestamp_array timestamp[], | ||
c_timestamptz_array timestamptz[], | ||
c_interval_array interval[], | ||
c_jsonb_array jsonb[], | ||
c_uuid_array uuid[], | ||
c_enum_array mood[], | ||
PRIMARY KEY (c_boolean,c_bigint,c_date) | ||
); | ||
INSERT INTO postgres_types_test VALUES ( False, 0, 0, 0, 0, 0, 0, '', '00'::bytea, '0001-01-01', '00:00:00', '2001-01-01 00:00:00'::timestamp, '2001-01-01 00:00:00-8'::timestamptz, interval '0 second', '{}', null, 'sad', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[], array[]::uuid[], array[]::mood[]); | ||
INSERT INTO postgres_types_test VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'd', '00'::bytea, '0001-01-01', '00:00:00', '2001-01-01 00:00:00'::timestamp, '2001-01-01 00:00:00-8'::timestamptz, interval '0 second', '{}', 'bb488f9b-330d-4012-b849-12adeb49e57e', 'happy', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['2001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['2001-01-01 00:00:00-8'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], '{bb488f9b-330d-4012-b849-12adeb49e57e}', '{happy,ok,sad}'); | ||
INSERT INTO postgres_types_test VALUES ( False, 1, 123, 1234567890, 123.45, 123.45, 123.456, 'a_varchar', 'DEADBEEF'::bytea, '0024-01-01', '12:34:56', '2024-05-19 12:34:56', '2024-05-19 12:34:56+00', INTERVAL '1 day', to_jsonb('hello'::text), '123e4567-e89b-12d3-a456-426614174000', 'happy', ARRAY[NULL, TRUE]::boolean[], ARRAY[NULL, 1::smallint], ARRAY[NULL, 123], ARRAY[NULL, 1234567890], ARRAY[NULL, 123.45::numeric], ARRAY[NULL, 123.45::real], ARRAY[NULL, 123.456], ARRAY[NULL, 'a_varchar'], ARRAY[NULL, 'DEADBEEF'::bytea], ARRAY[NULL, '2024-05-19'::date], ARRAY[NULL, '12:34:56'::time], ARRAY[NULL, '2024-05-19 12:34:56'::timestamp], ARRAY[NULL, '2024-05-19 12:34:56+00'::timestamptz], ARRAY[NULL, INTERVAL '1 day'], ARRAY[NULL, to_jsonb('hello'::text)], ARRAY[NULL, '123e4567-e89b-12d3-a456-426614174000'::uuid], ARRAY[NULL, 'happy'::mood]); | ||
INSERT INTO postgres_types_test VALUES ( False, NULL, NULL, 1, NULL, NULL, NULL, NULL, NULL, '0024-05-19', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); | ||
" | ||
|
||
statement ok | ||
create source pg_source with ( | ||
connector = 'postgres-cdc', | ||
hostname = '${PGHOST:localhost}', | ||
port = '${PGPORT:5432}', | ||
username = '${PGUSER:$USER}', | ||
password = '${PGPASSWORD:}', | ||
database.name = '${PGDATABASE:postgres}', | ||
slot.name = 'pg_slot' | ||
); | ||
|
||
|
||
statement ok | ||
create table rw_postgres_types_test (*) from pg_source table 'public.postgres_types_test'; | ||
|
||
sleep 3s | ||
|
||
# Name, Type, Is Hidden, Description | ||
query TTTT | ||
describe rw_postgres_types_test; | ||
---- | ||
c_boolean boolean false NULL | ||
c_smallint smallint false NULL | ||
c_integer integer false NULL | ||
c_bigint bigint false NULL | ||
c_decimal numeric false NULL | ||
c_real real false NULL | ||
c_double_precision double precision false NULL | ||
c_varchar character varying false NULL | ||
c_bytea bytea false NULL | ||
c_date date false NULL | ||
c_time time without time zone false NULL | ||
c_timestamp timestamp without time zone false NULL | ||
c_timestamptz timestamp with time zone false NULL | ||
c_interval interval false NULL | ||
c_jsonb jsonb false NULL | ||
c_uuid character varying false NULL | ||
c_enum character varying false NULL | ||
c_boolean_array boolean[] false NULL | ||
c_smallint_array smallint[] false NULL | ||
c_integer_array integer[] false NULL | ||
c_bigint_array bigint[] false NULL | ||
c_decimal_array numeric[] false NULL | ||
c_real_array real[] false NULL | ||
c_double_precision_array double precision[] false NULL | ||
c_varchar_array character varying[] false NULL | ||
c_bytea_array bytea[] false NULL | ||
c_date_array date[] false NULL | ||
c_time_array time without time zone[] false NULL | ||
c_timestamp_array timestamp without time zone[] false NULL | ||
c_timestamptz_array timestamp with time zone[] false NULL | ||
c_interval_array interval[] false NULL | ||
c_jsonb_array jsonb[] false NULL | ||
c_uuid_array character varying[] false NULL | ||
c_enum_array character varying[] false NULL | ||
primary key c_boolean, c_bigint, c_date NULL NULL | ||
distribution key c_boolean, c_bigint, c_date NULL NULL | ||
table description rw_postgres_types_test NULL NULL | ||
|
||
query TTTTTTTTT | ||
SELECT | ||
c_boolean, | ||
c_smallint, | ||
c_integer, | ||
c_bigint, | ||
c_decimal, | ||
c_real, | ||
c_double_precision, | ||
c_varchar, | ||
c_bytea from rw_postgres_types_test where c_enum = 'happy' order by c_integer; | ||
---- | ||
f -32767 -2147483647 -9223372036854775807 -10.0 -10000 -10000 d \x3030 | ||
f 1 123 1234567890 123.45 123.45 123.456 a_varchar \x4445414442454546 | ||
|
||
query TTTTT | ||
SELECT | ||
c_date, | ||
c_time, | ||
c_timestamp, | ||
c_timestamptz, | ||
c_interval from rw_postgres_types_test where c_enum = 'happy' order by c_integer; | ||
---- | ||
0001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 08:00:00+00:00 00:00:00 | ||
0024-01-01 12:34:56 2024-05-19 12:34:56 2024-05-19 12:34:56+00:00 1 day | ||
|
||
query TTTTTTT | ||
SELECT | ||
c_jsonb, | ||
c_uuid, | ||
c_enum, | ||
c_boolean_array, | ||
c_smallint_array, | ||
c_integer_array, | ||
c_bigint_array from rw_postgres_types_test where c_enum = 'happy' order by c_integer; | ||
---- | ||
{} bb488f9b-330d-4012-b849-12adeb49e57e happy {f} {-32767} {-2147483647} {-9223372036854775807} | ||
"hello" 123e4567-e89b-12d3-a456-426614174000 happy {NULL,t} {NULL,1} {NULL,123} {NULL,1234567890} | ||
|
||
query TTTTTTTTTTTTT | ||
SELECT | ||
c_decimal_array, | ||
c_real_array, | ||
c_double_precision_array, | ||
c_varchar_array, | ||
c_bytea_array, | ||
c_date_array, | ||
c_time_array, | ||
c_timestamp_array, | ||
c_timestamptz_array, | ||
c_interval_array, | ||
c_jsonb_array, | ||
c_uuid_array, | ||
c_enum_array from rw_postgres_types_test where c_enum = 'happy' order by c_integer; | ||
---- | ||
{-10.0} {-10000} {-10000} {""} {"\\x3030"} {0001-01-01} {00:00:00} {"2001-01-01 00:00:00"} {"2001-01-01 08:00:00+00:00"} NULL {"{}"} {bb488f9b-330d-4012-b849-12adeb49e57e} {happy,ok,sad} | ||
{NULL,123.45} {NULL,123.45} {NULL,123.456} {NULL,a_varchar} {NULL,"\\x4445414442454546"} {NULL,2024-05-19} {NULL,12:34:56} {NULL,"2024-05-19 12:34:56"} {NULL,"2024-05-19 12:34:56+00:00"} NULL {NULL,"\"hello\""} {NULL,123e4567-e89b-12d3-a456-426614174000} NULL | ||
|
||
statement ok | ||
drop source pg_source cascade; |
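Two value conversions in the Postgres expected results above are easy to misread: `timestamptz` values come back rendered in UTC (`'2001-01-01 00:00:00-8'` appears as `2001-01-01 08:00:00+00:00`), and `'00'::bytea` is the two ASCII characters `0` and `0`, so it appears as `\x3030`. The Python sketch below reproduces both renderings for illustration. The helper names `to_utc_text` and `bytea_hex` are hypothetical, and the offset is written as `-08:00` because `datetime.fromisoformat` may not accept the shorter `-8` form on older Python versions.

```python
from datetime import datetime, timezone


def to_utc_text(ts: str) -> str:
    """Parse an ISO-8601 timestamp with a UTC offset and render it in UTC."""
    dt = datetime.fromisoformat(ts).astimezone(timezone.utc)
    return dt.isoformat(sep=" ")


def bytea_hex(text: str) -> str:
    """Render the ASCII bytes of `text` in Postgres-style hex form (\\x...)."""
    return "\\x" + text.encode("ascii").hex()


print(to_utc_text("2001-01-01 00:00:00-08:00"))  # 2001-01-01 08:00:00+00:00
print(bytea_hex("00"))                           # \x3030
print(bytea_hex("DEADBEEF"))                     # \x4445414442454546
```

The printed values match the `c_timestamptz` and `c_bytea` columns in the expected query results.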
Better to organize it under the new folder e2e_test/source_inline.
Will refactor in another PR.