From 467e3db1234d80920aabd4e88afd26fe59b26dfe Mon Sep 17 00:00:00 2001 From: xfz <73645462+xuefengze@users.noreply.github.com> Date: Tue, 16 Jan 2024 16:39:27 +0800 Subject: [PATCH] test: add `upsert` test for `doris/starrocks-sink` integration test (#14587) --- integration_tests/doris-sink/README.md | 14 +++------- integration_tests/doris-sink/create_mv.sql | 8 ++++++ integration_tests/doris-sink/create_sink.sql | 15 ++++++++++- .../doris-sink/create_source.sql | 17 ++++++++++++ .../doris-sink/docker-compose.yml | 9 +++++++ .../doris-sink/doris_prepare.sql | 10 +++++++ integration_tests/doris-sink/sink_check.py | 26 ++++++++++++++++++- .../doris-sink/update_delete.sql | 5 ++++ .../doris-sink/upsert/create_mv.sql | 7 ----- .../doris-sink/upsert/create_sink.sql | 12 --------- .../doris-sink/upsert/create_table.sql | 10 ------- .../upsert/insert_update_delete.sql | 8 ------ integration_tests/starrocks-sink/README.md | 14 +++------- .../starrocks-sink/create_mv.sql | 8 ++++++ .../starrocks-sink/create_sink.sql | 17 +++++++++++- .../starrocks-sink/create_source.sql | 17 ++++++++++++ .../starrocks-sink/docker-compose.yml | 6 +++++ .../starrocks-sink/sink_check.py | 26 ++++++++++++++++++- .../starrocks-sink/starrocks_prepare.sql | 8 ++++++ .../starrocks-sink/update_delete.sql | 5 ++++ .../starrocks-sink/upsert/create_mv.sql | 7 ----- .../starrocks-sink/upsert/create_sink.sql | 14 ---------- .../starrocks-sink/upsert/create_table.sql | 10 ------- .../upsert/insert_update_delete.sql | 8 ------ 24 files changed, 181 insertions(+), 100 deletions(-) create mode 100644 integration_tests/doris-sink/update_delete.sql delete mode 100644 integration_tests/doris-sink/upsert/create_mv.sql delete mode 100644 integration_tests/doris-sink/upsert/create_sink.sql delete mode 100644 integration_tests/doris-sink/upsert/create_table.sql delete mode 100644 integration_tests/doris-sink/upsert/insert_update_delete.sql create mode 100644 integration_tests/starrocks-sink/update_delete.sql delete mode 100644 integration_tests/starrocks-sink/upsert/create_mv.sql delete mode 100644 integration_tests/starrocks-sink/upsert/create_sink.sql delete mode 100644 integration_tests/starrocks-sink/upsert/create_table.sql delete mode 100644 integration_tests/starrocks-sink/upsert/insert_update_delete.sql diff --git a/integration_tests/doris-sink/README.md b/integration_tests/doris-sink/README.md index 75baa2d2449f1..b62c2d2e3adcf 100644 --- a/integration_tests/doris-sink/README.md +++ b/integration_tests/doris-sink/README.md @@ -45,16 +45,10 @@ GRANT ALL ON *.* TO 'users'@'%'; 4. Execute the SQL queries in sequence: -- append-only sql: - - create_source.sql - - create_mv.sql - - create_sink.sql - -- upsert sql: - - upsert/create_table.sql - - upsert/create_mv.sql - - upsert/create_sink.sql - - upsert/insert_update_delete.sql +- create_source.sql +- create_mv.sql +- create_sink.sql +- update_delete.sql We only support `upsert` with doris' `UNIQUE KEY` diff --git a/integration_tests/doris-sink/create_mv.sql b/integration_tests/doris-sink/create_mv.sql index c367e6f2baa94..6e466703b0769 100644 --- a/integration_tests/doris-sink/create_mv.sql +++ b/integration_tests/doris-sink/create_mv.sql @@ -5,3 +5,11 @@ SELECT event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local FROM user_behaviors; + +CREATE MATERIALIZED VIEW upsert_bhv_mv AS +SELECT + user_id, + target_id, + event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local +FROM + upsert_user_behaviors; diff --git a/integration_tests/doris-sink/create_sink.sql b/integration_tests/doris-sink/create_sink.sql index fa0cfddf7bf16..7cd1ac24857e9 100644 --- a/integration_tests/doris-sink/create_sink.sql +++ b/integration_tests/doris-sink/create_sink.sql @@ -9,4 +9,17 @@ FROM doris.database = 'demo', doris.table='demo_bhv_table', force_append_only='true' -); \ No newline at end of file +); + +CREATE SINK upsert_doris_sink +FROM + upsert_bhv_mv WITH ( + connector = 'doris', + type = 'upsert', + doris.url = 'http://fe:8030', + doris.user = 'users', + doris.password = '123456', + doris.database = 'demo', + doris.table='upsert_table', + primary_key = 'user_id' +); diff --git a/integration_tests/doris-sink/create_source.sql b/integration_tests/doris-sink/create_source.sql index ed7c02341638a..0e42308511121 100644 --- a/integration_tests/doris-sink/create_source.sql +++ b/integration_tests/doris-sink/create_source.sql @@ -14,3 +14,20 @@ CREATE table user_behaviors ( fields.user_id.end = '1000', datagen.rows.per.second = '100' ) FORMAT PLAIN ENCODE JSON; + +CREATE table upsert_user_behaviors ( + user_id int, + target_id VARCHAR, + target_type VARCHAR, + event_timestamp TIMESTAMPTZ, + behavior_type VARCHAR, + parent_target_type VARCHAR, + parent_target_id VARCHAR, + PRIMARY KEY(user_id) +); + +INSERT INTO upsert_user_behaviors VALUES + (1,'1','1','2020-01-01T01:01:01Z','1','1','1'), + (2,'2','2','2020-01-01T01:01:02Z','2','2','2'), + (3,'3','3','2020-01-01T01:01:03Z','3','3','3'), + (4,'4','4','2020-01-01T01:01:04Z','4','4','4'); diff --git a/integration_tests/doris-sink/docker-compose.yml b/integration_tests/doris-sink/docker-compose.yml index 74fecbee2baab..fc7cfd751e989 100644 --- a/integration_tests/doris-sink/docker-compose.yml +++ b/integration_tests/doris-sink/docker-compose.yml @@ -74,6 +74,15 @@ services: networks: mynetwork: ipv4_address: 172.21.0.9 + postgres: + image: postgres:latest + command: tail -f /dev/null + volumes: + - "./update_delete.sql:/update_delete.sql" + restart: on-failure + networks: + mynetwork: + ipv4_address: 172.21.0.11 volumes: risingwave-standalone: external: false diff --git a/integration_tests/doris-sink/doris_prepare.sql b/integration_tests/doris-sink/doris_prepare.sql index c95e8ac3f9b32..b65e419999caf 100644 --- a/integration_tests/doris-sink/doris_prepare.sql +++ b/integration_tests/doris-sink/doris_prepare.sql @@ -11,5 +11,15 @@ PROPERTIES ( "replication_allocation" = "tag.location.default: 1" ); +CREATE table upsert_table( + user_id int, + target_id text, + event_timestamp_local datetime +) UNIQUE KEY(`user_id`) +DISTRIBUTED BY HASH(`user_id`) BUCKETS 1 +PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" +); + CREATE USER 'users'@'%' IDENTIFIED BY '123456'; GRANT ALL ON *.* TO 'users'@'%'; diff --git a/integration_tests/doris-sink/sink_check.py b/integration_tests/doris-sink/sink_check.py index 39109f4194fef..510cc867dcda4 100644 --- a/integration_tests/doris-sink/sink_check.py +++ b/integration_tests/doris-sink/sink_check.py @@ -1,7 +1,7 @@ import subprocess import sys -relations = ['demo.demo_bhv_table'] +relations = ['demo.demo_bhv_table', 'demo.upsert_table'] failed_cases = [] for rel in relations: @@ -18,6 +18,30 @@ if rows < 1: failed_cases.append(rel) +# update data +subprocess.run(["docker", "compose", "exec", "postgres", "bash", "-c", "psql -h risingwave-standalone -p 4566 -d dev -U root -f update_delete.sql"], check=True) + +# delete +sql = f"SELECT COUNT(*) FROM demo.upsert_table;" +command = f'mysql -uroot -P9030 -hfe -e "{sql}"' +output = subprocess.check_output( + ["docker", "compose", "exec", "mysql", "bash", "-c", command]) +rows = int(output.decode('utf-8').split('\n')[1]) +print(f"{rows} rows in demo.upsert_table") +if rows != 3: + print(f"rows expected 3, get {rows}") + failed_cases.append("delete demo.upsert_table") + +# update +sql = f"SELECT target_id FROM demo.upsert_table WHERE user_id = 3;" +command = f'mysql -uroot -P9030 -hfe -e "{sql}"' +output = subprocess.check_output( + ["docker", "compose", "exec", "mysql", "bash", "-c", command]) +id = int(output.decode('utf-8').split('\n')[1]) +if id != 30: + print(f"target_id expected 30, get {id}") + failed_cases.append("update demo.upsert_table") + if len(failed_cases) != 0: print(f"Data check failed for case {failed_cases}") sys.exit(1) diff --git a/integration_tests/doris-sink/update_delete.sql b/integration_tests/doris-sink/update_delete.sql new file mode 100644 index 0000000000000..adabd5163ef44 --- /dev/null +++ b/integration_tests/doris-sink/update_delete.sql @@ -0,0 +1,5 @@ +DELETE FROM upsert_user_behaviors WHERE user_id = 2; + +UPDATE upsert_user_behaviors SET target_id = 30 WHERE user_id = 3; + +FLUSH; diff --git a/integration_tests/doris-sink/upsert/create_mv.sql b/integration_tests/doris-sink/upsert/create_mv.sql deleted file mode 100644 index c367e6f2baa94..0000000000000 --- a/integration_tests/doris-sink/upsert/create_mv.sql +++ /dev/null @@ -1,7 +0,0 @@ -CREATE MATERIALIZED VIEW bhv_mv AS -SELECT - user_id, - target_id, - event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local -FROM - user_behaviors; diff --git a/integration_tests/doris-sink/upsert/create_sink.sql b/integration_tests/doris-sink/upsert/create_sink.sql deleted file mode 100644 index e7bd5445ba557..0000000000000 --- a/integration_tests/doris-sink/upsert/create_sink.sql +++ /dev/null @@ -1,12 +0,0 @@ -CREATE SINK bhv_doris_sink -FROM - bhv_mv WITH ( - connector = 'doris', - type = 'upsert', - doris.url = 'http://fe:8030', - doris.user = 'users', - doris.password = '123456', - doris.database = 'demo', - doris.table='demo_bhv_table', - primary_key = 'user_id' -); \ No newline at end of file diff --git a/integration_tests/doris-sink/upsert/create_table.sql b/integration_tests/doris-sink/upsert/create_table.sql deleted file mode 100644 index c6cfa87eed3c8..0000000000000 --- a/integration_tests/doris-sink/upsert/create_table.sql +++ /dev/null @@ -1,10 +0,0 @@ -CREATE table user_behaviors ( - user_id int, - target_id VARCHAR, - target_type VARCHAR, - event_timestamp TIMESTAMPTZ, - behavior_type VARCHAR, - parent_target_type VARCHAR, - parent_target_id VARCHAR, - PRIMARY KEY(user_id) -); diff --git a/integration_tests/doris-sink/upsert/insert_update_delete.sql b/integration_tests/doris-sink/upsert/insert_update_delete.sql deleted file mode 100644 index f21353c161154..0000000000000 --- a/integration_tests/doris-sink/upsert/insert_update_delete.sql +++ /dev/null @@ -1,8 +0,0 @@ -INSERT INTO user_behaviors VALUES(1,'1','1','2020-01-01T01:01:01Z','1','1','1'), -(2,'2','2','2020-01-01T01:01:02Z','2','2','2'), -(3,'3','3','2020-01-01T01:01:03Z','3','3','3'), -(4,'4','4','2020-01-01T01:01:04Z','4','4','4'); - -DELETE FROM user_behaviors WHERE user_id = 2; - -UPDATE user_behaviors SET target_id = 30 WHERE user_id = 3; diff --git a/integration_tests/starrocks-sink/README.md b/integration_tests/starrocks-sink/README.md index 817ab57481e43..30cb79623d1e8 100644 --- a/integration_tests/starrocks-sink/README.md +++ b/integration_tests/starrocks-sink/README.md @@ -37,16 +37,10 @@ GRANT ALL ON *.* TO 'users'@'%'; 3. Execute the SQL queries in sequence: -- append-only sql: - - create_source.sql - - create_mv.sql - - create_sink.sql - -- upsert sql: - - upsert/create_table.sql - - upsert/create_mv.sql - - upsert/create_sink.sql - - upsert/insert_update_delete.sql +- create_source.sql +- create_mv.sql +- create_sink.sql +- update_delete.sql We only support `upsert` with starrocks' `PRIMARY KEY` diff --git a/integration_tests/starrocks-sink/create_mv.sql b/integration_tests/starrocks-sink/create_mv.sql index c367e6f2baa94..6e466703b0769 100644 --- a/integration_tests/starrocks-sink/create_mv.sql +++ b/integration_tests/starrocks-sink/create_mv.sql @@ -5,3 +5,11 @@ SELECT event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local FROM user_behaviors; + +CREATE MATERIALIZED VIEW upsert_bhv_mv AS +SELECT + user_id, + target_id, + event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local +FROM + upsert_user_behaviors; diff --git a/integration_tests/starrocks-sink/create_sink.sql b/integration_tests/starrocks-sink/create_sink.sql index 56d1b227512de..f2f5b5eac9653 100644 --- a/integration_tests/starrocks-sink/create_sink.sql +++ b/integration_tests/starrocks-sink/create_sink.sql @@ -11,4 +11,19 @@ FROM starrocks.database = 'demo', starrocks.table = 'demo_bhv_table', force_append_only='true' -); \ No newline at end of file +); + +CREATE SINK upsert_starrocks_sink +FROM + upsert_bhv_mv WITH ( + connector = 'starrocks', + type = 'upsert', + starrocks.host = 'starrocks-fe', + starrocks.mysqlport = '9030', + starrocks.httpport = '8030', + starrocks.user = 'users', + starrocks.password = '123456', + starrocks.database = 'demo', + starrocks.table = 'upsert_table', + primary_key = 'user_id' +); diff --git a/integration_tests/starrocks-sink/create_source.sql b/integration_tests/starrocks-sink/create_source.sql index ed7c02341638a..0e42308511121 100644 --- a/integration_tests/starrocks-sink/create_source.sql +++ b/integration_tests/starrocks-sink/create_source.sql @@ -14,3 +14,20 @@ CREATE table user_behaviors ( fields.user_id.end = '1000', datagen.rows.per.second = '100' ) FORMAT PLAIN ENCODE JSON; + +CREATE table upsert_user_behaviors ( + user_id int, + target_id VARCHAR, + target_type VARCHAR, + event_timestamp TIMESTAMPTZ, + behavior_type VARCHAR, + parent_target_type VARCHAR, + parent_target_id VARCHAR, + PRIMARY KEY(user_id) +); + +INSERT INTO upsert_user_behaviors VALUES + (1,'1','1','2020-01-01T01:01:01Z','1','1','1'), + (2,'2','2','2020-01-01T01:01:02Z','2','2','2'), + (3,'3','3','2020-01-01T01:01:03Z','3','3','3'), + (4,'4','4','2020-01-01T01:01:04Z','4','4','4'); diff --git a/integration_tests/starrocks-sink/docker-compose.yml b/integration_tests/starrocks-sink/docker-compose.yml index 41dabac20dc7f..4210206aa7705 100644 --- a/integration_tests/starrocks-sink/docker-compose.yml +++ b/integration_tests/starrocks-sink/docker-compose.yml @@ -52,6 +52,12 @@ services: extends: file: ../../docker/docker-compose.yml service: prometheus-0 + postgres: + image: postgres:latest + command: tail -f /dev/null + volumes: + - "./update_delete.sql:/update_delete.sql" + restart: on-failure volumes: risingwave-standalone: external: false diff --git a/integration_tests/starrocks-sink/sink_check.py b/integration_tests/starrocks-sink/sink_check.py index 699304854dc1f..7ab27e1e01cd1 100644 --- a/integration_tests/starrocks-sink/sink_check.py +++ b/integration_tests/starrocks-sink/sink_check.py @@ -1,7 +1,7 @@ import subprocess import sys -relations = ['demo.demo_bhv_table'] +relations = ['demo.demo_bhv_table', 'demo.upsert_table'] failed_cases = [] for rel in relations: @@ -18,6 +18,30 @@ if rows < 1: failed_cases.append(rel) +# update data +subprocess.run(["docker", "compose", "exec", "postgres", "bash", "-c", "psql -h risingwave-standalone -p 4566 -d dev -U root -f update_delete.sql"], check=True) + +# delete +sql = f"SELECT COUNT(*) FROM demo.upsert_table;" +command = f'mysql -uroot -P9030 -h127.0.0.1 -e "{sql}"' +output = subprocess.check_output( + ["docker", "compose", "exec", "starrocks-fe", "bash", "-c", command]) +rows = int(output.decode('utf-8').split('\n')[1]) +print(f"{rows} rows in demo.upsert_table") +if rows != 3: + print(f"rows expected 3, get {rows}") + failed_cases.append("delete demo.upsert_table") + +# update +sql = f"SELECT target_id FROM demo.upsert_table WHERE user_id = 3;" +command = f'mysql -uroot -P9030 -h127.0.0.1 -e "{sql}"' +output = subprocess.check_output( + ["docker", "compose", "exec", "starrocks-fe", "bash", "-c", command]) +id = int(output.decode('utf-8').split('\n')[1]) +if id != 30: + print(f"target_id expected 30, get {id}") + failed_cases.append("update demo.upsert_table") + if len(failed_cases) != 0: print(f"Data check failed for case {failed_cases}") sys.exit(1) diff --git a/integration_tests/starrocks-sink/starrocks_prepare.sql b/integration_tests/starrocks-sink/starrocks_prepare.sql index aadaf85289b3c..6b304534061fe 100644 --- a/integration_tests/starrocks-sink/starrocks_prepare.sql +++ b/integration_tests/starrocks-sink/starrocks_prepare.sql @@ -9,5 +9,13 @@ CREATE table demo_bhv_table( PRIMARY KEY(`user_id`) DISTRIBUTED BY HASH(`user_id`) properties("replication_num" = "1"); +CREATE table upsert_table( + user_id int, + target_id text, + event_timestamp_local datetime +) ENGINE=OLAP +PRIMARY KEY(`user_id`) +DISTRIBUTED BY HASH(`user_id`) properties("replication_num" = "1"); + CREATE USER 'users'@'%' IDENTIFIED BY '123456'; GRANT ALL ON *.* TO 'users'@'%'; diff --git a/integration_tests/starrocks-sink/update_delete.sql b/integration_tests/starrocks-sink/update_delete.sql new file mode 100644 index 0000000000000..adabd5163ef44 --- /dev/null +++ b/integration_tests/starrocks-sink/update_delete.sql @@ -0,0 +1,5 @@ +DELETE FROM upsert_user_behaviors WHERE user_id = 2; + +UPDATE upsert_user_behaviors SET target_id = 30 WHERE user_id = 3; + +FLUSH; diff --git a/integration_tests/starrocks-sink/upsert/create_mv.sql b/integration_tests/starrocks-sink/upsert/create_mv.sql deleted file mode 100644 index c367e6f2baa94..0000000000000 --- a/integration_tests/starrocks-sink/upsert/create_mv.sql +++ /dev/null @@ -1,7 +0,0 @@ -CREATE MATERIALIZED VIEW bhv_mv AS -SELECT - user_id, - target_id, - event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local -FROM - user_behaviors; diff --git a/integration_tests/starrocks-sink/upsert/create_sink.sql b/integration_tests/starrocks-sink/upsert/create_sink.sql deleted file mode 100644 index d7557bc1bd4fc..0000000000000 --- a/integration_tests/starrocks-sink/upsert/create_sink.sql +++ /dev/null @@ -1,14 +0,0 @@ -CREATE SINK bhv_starrocks_sink -FROM - bhv_mv WITH ( - connector = 'starrocks', - type = 'upsert', - starrocks.host = 'starrocks-fe', - starrocks.mysqlport = '9030', - starrocks.httpport = '8030', - starrocks.user = 'users', - starrocks.password = '123456', - starrocks.database = 'demo', - starrocks.table = 'demo_bhv_table', - primary_key = 'user_id' -); \ No newline at end of file diff --git a/integration_tests/starrocks-sink/upsert/create_table.sql b/integration_tests/starrocks-sink/upsert/create_table.sql deleted file mode 100644 index c6cfa87eed3c8..0000000000000 --- a/integration_tests/starrocks-sink/upsert/create_table.sql +++ /dev/null @@ -1,10 +0,0 @@ -CREATE table user_behaviors ( - user_id int, - target_id VARCHAR, - target_type VARCHAR, - event_timestamp TIMESTAMPTZ, - behavior_type VARCHAR, - parent_target_type VARCHAR, - parent_target_id VARCHAR, - PRIMARY KEY(user_id) -); diff --git a/integration_tests/starrocks-sink/upsert/insert_update_delete.sql b/integration_tests/starrocks-sink/upsert/insert_update_delete.sql deleted file mode 100644 index f21353c161154..0000000000000 --- a/integration_tests/starrocks-sink/upsert/insert_update_delete.sql +++ /dev/null @@ -1,8 +0,0 @@ -INSERT INTO user_behaviors VALUES(1,'1','1','2020-01-01T01:01:01Z','1','1','1'), -(2,'2','2','2020-01-01T01:01:02Z','2','2','2'), -(3,'3','3','2020-01-01T01:01:03Z','3','3','3'), -(4,'4','4','2020-01-01T01:01:04Z','4','4','4'); - -DELETE FROM user_behaviors WHERE user_id = 2; - -UPDATE user_behaviors SET target_id = 30 WHERE user_id = 3;