Skip to content

Commit

Permalink
test: add upsert test for doris/starrocks-sink integration test (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
xuefengze authored and Little-Wallace committed Jan 20, 2024
1 parent 1387bd1 commit 467e3db
Show file tree
Hide file tree
Showing 24 changed files with 181 additions and 100 deletions.
14 changes: 4 additions & 10 deletions integration_tests/doris-sink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,10 @@ GRANT ALL ON *.* TO 'users'@'%';

4. Execute the SQL queries in sequence:

- append-only sql:
- create_source.sql
- create_mv.sql
- create_sink.sql

- upsert sql:
- upsert/create_table.sql
- upsert/create_mv.sql
- upsert/create_sink.sql
- upsert/insert_update_delete.sql
- create_source.sql
- create_mv.sql
- create_sink.sql
- update_delete.sql

The `upsert` sink type is only supported for Doris tables that are defined with a `UNIQUE KEY`.

Expand Down
8 changes: 8 additions & 0 deletions integration_tests/doris-sink/create_mv.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,11 @@ SELECT
event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local
FROM
user_behaviors;

-- Projection of upsert_user_behaviors that the upsert Doris sink reads from.
-- event_timestamp (TIMESTAMPTZ) is converted to Asia/Shanghai local time.
CREATE MATERIALIZED VIEW upsert_bhv_mv AS
SELECT
    user_id,
    target_id,
    event_timestamp AT TIME ZONE 'Asia/Shanghai' AS event_timestamp_local
FROM upsert_user_behaviors;
15 changes: 14 additions & 1 deletion integration_tests/doris-sink/create_sink.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,17 @@ FROM
doris.database = 'demo',
doris.table='demo_bhv_table',
force_append_only='true'
);
);

-- Upsert sink: delivers the changes of upsert_bhv_mv to the Doris table
-- demo.upsert_table. primary_key names the same column that the Doris table
-- declares as its UNIQUE KEY.
CREATE SINK upsert_doris_sink FROM upsert_bhv_mv
WITH (
    connector = 'doris',
    type = 'upsert',
    doris.url = 'http://fe:8030',
    doris.user = 'users',
    doris.password = '123456',
    doris.database = 'demo',
    doris.table = 'upsert_table',
    primary_key = 'user_id'
);
17 changes: 17 additions & 0 deletions integration_tests/doris-sink/create_source.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,20 @@ CREATE table user_behaviors (
fields.user_id.end = '1000',
datagen.rows.per.second = '100'
) FORMAT PLAIN ENCODE JSON;

-- Table backing the upsert test. It has no connector, so rows are written
-- with plain INSERT / UPDATE / DELETE statements; user_id is the primary key.
CREATE TABLE upsert_user_behaviors (
    user_id INT,
    target_id VARCHAR,
    target_type VARCHAR,
    event_timestamp TIMESTAMPTZ,
    behavior_type VARCHAR,
    parent_target_type VARCHAR,
    parent_target_id VARCHAR,
    PRIMARY KEY (user_id)
);

-- Seed four rows (user_id 1..4) for the upsert test. The column list is
-- spelled out so the statement does not depend on the table's column order.
INSERT INTO upsert_user_behaviors (
    user_id,
    target_id,
    target_type,
    event_timestamp,
    behavior_type,
    parent_target_type,
    parent_target_id
)
VALUES
    (1, '1', '1', '2020-01-01T01:01:01Z', '1', '1', '1'),
    (2, '2', '2', '2020-01-01T01:01:02Z', '2', '2', '2'),
    (3, '3', '3', '2020-01-01T01:01:03Z', '3', '3', '3'),
    (4, '4', '4', '2020-01-01T01:01:04Z', '4', '4', '4');
9 changes: 9 additions & 0 deletions integration_tests/doris-sink/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ services:
networks:
mynetwork:
ipv4_address: 172.21.0.9
# Client-only container: sink_check.py runs `psql` inside it to apply
# update_delete.sql to RisingWave.
postgres:
# NOTE(review): `latest` is an unpinned tag — consider pinning a version for reproducible runs.
image: postgres:latest
# Replaces the image's default command with a no-op that keeps the container
# running so `docker compose exec` can be used against it.
command: tail -f /dev/null
volumes:
# Makes the update/delete script available at the container root.
- "./update_delete.sql:/update_delete.sql"
restart: on-failure
networks:
mynetwork:
ipv4_address: 172.21.0.11
volumes:
risingwave-standalone:
external: false
Expand Down
10 changes: 10 additions & 0 deletions integration_tests/doris-sink/doris_prepare.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,15 @@ PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

-- Target table of the upsert sink. The columns mirror upsert_bhv_mv, and
-- user_id is the UNIQUE KEY that the sink's primary_key refers to.
CREATE TABLE upsert_table (
    user_id INT,
    target_id TEXT,
    event_timestamp_local DATETIME
)
UNIQUE KEY(`user_id`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);

CREATE USER 'users'@'%' IDENTIFIED BY '123456';
GRANT ALL ON *.* TO 'users'@'%';
26 changes: 25 additions & 1 deletion integration_tests/doris-sink/sink_check.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import subprocess
import sys

relations = ['demo.demo_bhv_table']
relations = ['demo.demo_bhv_table', 'demo.upsert_table']

failed_cases = []
for rel in relations:
Expand All @@ -18,6 +18,30 @@
if rows < 1:
failed_cases.append(rel)

# Apply the DELETE/UPDATE statements (followed by FLUSH) to RisingWave through
# the psql client container.
# NOTE(review): the checks below run immediately afterwards and assume the sink
# has already delivered the changes to Doris — confirm, or add a retry.
subprocess.run(["docker", "compose", "exec", "postgres", "bash", "-c", "psql -h risingwave-standalone -p 4566 -d dev -U root -f update_delete.sql"], check=True)


def query_scalar(sql):
    """Run `sql` against Doris and return the first value of the first row.

    The query is executed with the mysql CLI inside the `mysql` container.
    The first output line is the column header, the second the first data row.

    Returns:
        The stripped text of the first data line, or None when the query
        produced no rows.

    Raises:
        subprocess.CalledProcessError: if the mysql command exits non-zero.
    """
    command = f'mysql -uroot -P9030 -hfe -e "{sql}"'
    output = subprocess.check_output(
        ["docker", "compose", "exec", "mysql", "bash", "-c", command])
    lines = output.decode('utf-8').split('\n')
    if len(lines) < 2 or not lines[1].strip():
        return None
    return lines[1].strip()


# delete: user_id = 2 was removed, so 3 of the 4 seeded rows must remain
count_text = query_scalar("SELECT COUNT(*) FROM demo.upsert_table;")
# A missing or non-numeric count is reported as a failed case, not a crash.
rows = int(count_text) if count_text is not None and count_text.isdigit() else -1
print(f"{rows} rows in demo.upsert_table")
if rows != 3:
    print(f"rows expected 3, get {rows}")
    failed_cases.append("delete demo.upsert_table")

# update: target_id of user_id = 3 must now be 30. target_id is a text column,
# so it is compared as a string; a missing row yields None and fails the check.
target_id = query_scalar("SELECT target_id FROM demo.upsert_table WHERE user_id = 3;")
if target_id != "30":
    print(f"target_id expected 30, get {target_id}")
    failed_cases.append("update demo.upsert_table")

if len(failed_cases) != 0:
print(f"Data check failed for case {failed_cases}")
sys.exit(1)
5 changes: 5 additions & 0 deletions integration_tests/doris-sink/update_delete.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Exercises the upsert sink: afterwards sink_check.py expects 3 rows in the
-- downstream table and target_id = 30 for user_id = 3.
DELETE FROM upsert_user_behaviors WHERE user_id = 2;

-- target_id is VARCHAR, so assign a string literal instead of relying on an
-- implicit int -> varchar cast.
UPDATE upsert_user_behaviors SET target_id = '30' WHERE user_id = 3;

-- Commit the pending changes before the checker queries the downstream table.
FLUSH;
7 changes: 0 additions & 7 deletions integration_tests/doris-sink/upsert/create_mv.sql

This file was deleted.

12 changes: 0 additions & 12 deletions integration_tests/doris-sink/upsert/create_sink.sql

This file was deleted.

10 changes: 0 additions & 10 deletions integration_tests/doris-sink/upsert/create_table.sql

This file was deleted.

8 changes: 0 additions & 8 deletions integration_tests/doris-sink/upsert/insert_update_delete.sql

This file was deleted.

14 changes: 4 additions & 10 deletions integration_tests/starrocks-sink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,10 @@ GRANT ALL ON *.* TO 'users'@'%';

3. Execute the SQL queries in sequence:

- append-only sql:
- create_source.sql
- create_mv.sql
- create_sink.sql

- upsert sql:
- upsert/create_table.sql
- upsert/create_mv.sql
- upsert/create_sink.sql
- upsert/insert_update_delete.sql
- create_source.sql
- create_mv.sql
- create_sink.sql
- update_delete.sql

The `upsert` sink type is only supported for StarRocks tables that are defined with a `PRIMARY KEY`.

Expand Down
8 changes: 8 additions & 0 deletions integration_tests/starrocks-sink/create_mv.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,11 @@ SELECT
event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local
FROM
user_behaviors;

-- Projection of upsert_user_behaviors that the upsert StarRocks sink reads from.
-- event_timestamp (TIMESTAMPTZ) is converted to Asia/Shanghai local time.
CREATE MATERIALIZED VIEW upsert_bhv_mv AS
SELECT
    user_id,
    target_id,
    event_timestamp AT TIME ZONE 'Asia/Shanghai' AS event_timestamp_local
FROM upsert_user_behaviors;
17 changes: 16 additions & 1 deletion integration_tests/starrocks-sink/create_sink.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,19 @@ FROM
starrocks.database = 'demo',
starrocks.table = 'demo_bhv_table',
force_append_only='true'
);
);

-- Upsert sink: delivers the changes of upsert_bhv_mv to the StarRocks table
-- demo.upsert_table. primary_key names the same column that the StarRocks
-- table declares as its PRIMARY KEY.
CREATE SINK upsert_starrocks_sink FROM upsert_bhv_mv
WITH (
    connector = 'starrocks',
    type = 'upsert',
    starrocks.host = 'starrocks-fe',
    starrocks.mysqlport = '9030',
    starrocks.httpport = '8030',
    starrocks.user = 'users',
    starrocks.password = '123456',
    starrocks.database = 'demo',
    starrocks.table = 'upsert_table',
    primary_key = 'user_id'
);
17 changes: 17 additions & 0 deletions integration_tests/starrocks-sink/create_source.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,20 @@ CREATE table user_behaviors (
fields.user_id.end = '1000',
datagen.rows.per.second = '100'
) FORMAT PLAIN ENCODE JSON;

-- Table backing the upsert test. It has no connector, so rows are written
-- with plain INSERT / UPDATE / DELETE statements; user_id is the primary key.
CREATE TABLE upsert_user_behaviors (
    user_id INT,
    target_id VARCHAR,
    target_type VARCHAR,
    event_timestamp TIMESTAMPTZ,
    behavior_type VARCHAR,
    parent_target_type VARCHAR,
    parent_target_id VARCHAR,
    PRIMARY KEY (user_id)
);

-- Seed four rows (user_id 1..4) for the upsert test. The column list is
-- spelled out so the statement does not depend on the table's column order.
INSERT INTO upsert_user_behaviors (
    user_id,
    target_id,
    target_type,
    event_timestamp,
    behavior_type,
    parent_target_type,
    parent_target_id
)
VALUES
    (1, '1', '1', '2020-01-01T01:01:01Z', '1', '1', '1'),
    (2, '2', '2', '2020-01-01T01:01:02Z', '2', '2', '2'),
    (3, '3', '3', '2020-01-01T01:01:03Z', '3', '3', '3'),
    (4, '4', '4', '2020-01-01T01:01:04Z', '4', '4', '4');
6 changes: 6 additions & 0 deletions integration_tests/starrocks-sink/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ services:
extends:
file: ../../docker/docker-compose.yml
service: prometheus-0
# Client-only container: sink_check.py runs `psql` inside it to apply
# update_delete.sql to RisingWave.
postgres:
# NOTE(review): `latest` is an unpinned tag — consider pinning a version for reproducible runs.
image: postgres:latest
# Replaces the image's default command with a no-op that keeps the container
# running so `docker compose exec` can be used against it.
command: tail -f /dev/null
volumes:
# Makes the update/delete script available at the container root.
- "./update_delete.sql:/update_delete.sql"
restart: on-failure
volumes:
risingwave-standalone:
external: false
Expand Down
26 changes: 25 additions & 1 deletion integration_tests/starrocks-sink/sink_check.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import subprocess
import sys

relations = ['demo.demo_bhv_table']
relations = ['demo.demo_bhv_table', 'demo.upsert_table']

failed_cases = []
for rel in relations:
Expand All @@ -18,6 +18,30 @@
if rows < 1:
failed_cases.append(rel)

# Apply the DELETE/UPDATE statements (followed by FLUSH) to RisingWave through
# the psql client container.
# NOTE(review): the checks below run immediately afterwards and assume the sink
# has already delivered the changes to StarRocks — confirm, or add a retry.
subprocess.run(["docker", "compose", "exec", "postgres", "bash", "-c", "psql -h risingwave-standalone -p 4566 -d dev -U root -f update_delete.sql"], check=True)


def query_scalar(sql):
    """Run `sql` against StarRocks and return the first value of the first row.

    The query is executed with the mysql CLI inside the `starrocks-fe`
    container. The first output line is the column header, the second the
    first data row.

    Returns:
        The stripped text of the first data line, or None when the query
        produced no rows.

    Raises:
        subprocess.CalledProcessError: if the mysql command exits non-zero.
    """
    command = f'mysql -uroot -P9030 -h127.0.0.1 -e "{sql}"'
    output = subprocess.check_output(
        ["docker", "compose", "exec", "starrocks-fe", "bash", "-c", command])
    lines = output.decode('utf-8').split('\n')
    if len(lines) < 2 or not lines[1].strip():
        return None
    return lines[1].strip()


# delete: user_id = 2 was removed, so 3 of the 4 seeded rows must remain
count_text = query_scalar("SELECT COUNT(*) FROM demo.upsert_table;")
# A missing or non-numeric count is reported as a failed case, not a crash.
rows = int(count_text) if count_text is not None and count_text.isdigit() else -1
print(f"{rows} rows in demo.upsert_table")
if rows != 3:
    print(f"rows expected 3, get {rows}")
    failed_cases.append("delete demo.upsert_table")

# update: target_id of user_id = 3 must now be 30. target_id is a text column,
# so it is compared as a string; a missing row yields None and fails the check.
target_id = query_scalar("SELECT target_id FROM demo.upsert_table WHERE user_id = 3;")
if target_id != "30":
    print(f"target_id expected 30, get {target_id}")
    failed_cases.append("update demo.upsert_table")

if len(failed_cases) != 0:
print(f"Data check failed for case {failed_cases}")
sys.exit(1)
8 changes: 8 additions & 0 deletions integration_tests/starrocks-sink/starrocks_prepare.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,13 @@ CREATE table demo_bhv_table(
PRIMARY KEY(`user_id`)
DISTRIBUTED BY HASH(`user_id`) properties("replication_num" = "1");

-- Target table of the upsert sink. The columns mirror upsert_bhv_mv, and
-- user_id is the PRIMARY KEY that the sink's primary_key refers to.
CREATE TABLE upsert_table (
    user_id INT,
    target_id TEXT,
    event_timestamp_local DATETIME
)
ENGINE=OLAP
PRIMARY KEY(`user_id`)
DISTRIBUTED BY HASH(`user_id`)
PROPERTIES ("replication_num" = "1");

CREATE USER 'users'@'%' IDENTIFIED BY '123456';
GRANT ALL ON *.* TO 'users'@'%';
5 changes: 5 additions & 0 deletions integration_tests/starrocks-sink/update_delete.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Exercises the upsert sink: afterwards sink_check.py expects 3 rows in the
-- downstream table and target_id = 30 for user_id = 3.
DELETE FROM upsert_user_behaviors WHERE user_id = 2;

-- target_id is VARCHAR, so assign a string literal instead of relying on an
-- implicit int -> varchar cast.
UPDATE upsert_user_behaviors SET target_id = '30' WHERE user_id = 3;

-- Commit the pending changes before the checker queries the downstream table.
FLUSH;
7 changes: 0 additions & 7 deletions integration_tests/starrocks-sink/upsert/create_mv.sql

This file was deleted.

14 changes: 0 additions & 14 deletions integration_tests/starrocks-sink/upsert/create_sink.sql

This file was deleted.

10 changes: 0 additions & 10 deletions integration_tests/starrocks-sink/upsert/create_table.sql

This file was deleted.

This file was deleted.

0 comments on commit 467e3db

Please sign in to comment.