Skip to content

Commit

Permalink
Merge branch 'main' into rc/sentinel-cmp-by
Browse files Browse the repository at this point in the history
  • Loading branch information
stdrc authored Jan 16, 2024
2 parents 61647ed + 83e829e commit 895edb7
Show file tree
Hide file tree
Showing 65 changed files with 531 additions and 270 deletions.
35 changes: 27 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ members = [
"src/tests/sqlsmith",
"src/tests/state_cleaning_test",
"src/utils/delta_btree_map",
"src/utils/futures_util",
"src/utils/local_stats_alloc",
"src/utils/pgwire",
"src/utils/runtime",
Expand Down Expand Up @@ -120,7 +121,7 @@ tonic = { package = "madsim-tonic", version = "0.4.1" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "58c1f003484449d7c6dd693b348bf19dd44889cb" }
prost = { version = "0.12" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "b4f4ca3c6d29092bd331925ead0bcceaa38bdd57", features = [
icelake = { git = "https://github.com/icelake-io/icelake", rev = "32c0bbf242f5c47b1e743f10577012fe7436c770", features = [
"prometheus",
] }
arrow-array = "49"
Expand Down Expand Up @@ -187,6 +188,7 @@ risingwave_udf = { path = "./src/expr/udf" }
risingwave_variables = { path = "./src/utils/variables" }
risingwave_java_binding = { path = "./src/java_binding" }
risingwave_jni_core = { path = "src/jni_core" }
rw_futures_util = { path = "src/utils/futures_util" }
tokio-util = "0.7"

[workspace.lints.rust]
Expand Down
22 changes: 4 additions & 18 deletions e2e_test/source/basic/pubsub.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@
statement error
CREATE TABLE s1 (v1 int, v2 varchar) WITH (
pubsub.subscription = 'test-subscription-1',
pubsub.emulator_host = 'localhost:5981',
pubsub.split_count = 3
pubsub.emulator_host = 'invalid_host:5981'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE TABLE s1 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-1',
pubsub.emulator_host = 'localhost:5980',
pubsub.split_count = 3
pubsub.emulator_host = 'localhost:5980'
) FORMAT PLAIN ENCODE JSON;

statement ok
Expand All @@ -25,25 +23,14 @@ statement error
CREATE TABLE s2 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-not-2',
pubsub.emulator_host = 'localhost:5980',
pubsub.split_count = 3
pubsub.emulator_host = 'localhost:5980'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE TABLE s2 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-2',
pubsub.emulator_host = 'localhost:5980',
pubsub.split_count = 3
) FORMAT PLAIN ENCODE JSON;

# fail with invalid split count
statement error
CREATE TABLE s3 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-3',
pubsub.emulator_host = 'localhost:5980',
pubsub.split_count = 0
pubsub.emulator_host = 'localhost:5980'
) FORMAT PLAIN ENCODE JSON;

# fail if both start_offset and start_snapshot are provided
Expand All @@ -52,7 +39,6 @@ CREATE TABLE s3 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-3',
pubsub.emulator_host = 'localhost:5980',
pubsub.split_count = 2,
pubsub.start_offset = "121212",
pubsub.start_snapshot = "snapshot-that-doesnt-exist"
) FORMAT PLAIN ENCODE JSON;
Expand Down
14 changes: 4 additions & 10 deletions integration_tests/doris-sink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,10 @@ GRANT ALL ON *.* TO 'users'@'%';

4. Execute the SQL queries in sequence:

- append-only sql:
- create_source.sql
- create_mv.sql
- create_sink.sql

- upsert sql:
- upsert/create_table.sql
- upsert/create_mv.sql
- upsert/create_sink.sql
- upsert/insert_update_delete.sql
- create_source.sql
- create_mv.sql
- create_sink.sql
- update_delete.sql

We only support `upsert` sinks for Doris tables that are defined with a `UNIQUE KEY`.

Expand Down
8 changes: 8 additions & 0 deletions integration_tests/doris-sink/create_mv.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,11 @@ SELECT
event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local
FROM
user_behaviors;

CREATE MATERIALIZED VIEW upsert_bhv_mv AS
SELECT
user_id,
target_id,
event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local
FROM
upsert_user_behaviors;
15 changes: 14 additions & 1 deletion integration_tests/doris-sink/create_sink.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,17 @@ FROM
doris.database = 'demo',
doris.table='demo_bhv_table',
force_append_only='true'
);
);

CREATE SINK upsert_doris_sink
FROM
upsert_bhv_mv WITH (
connector = 'doris',
type = 'upsert',
doris.url = 'http://fe:8030',
doris.user = 'users',
doris.password = '123456',
doris.database = 'demo',
doris.table='upsert_table',
primary_key = 'user_id'
);
17 changes: 17 additions & 0 deletions integration_tests/doris-sink/create_source.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,20 @@ CREATE table user_behaviors (
fields.user_id.end = '1000',
datagen.rows.per.second = '100'
) FORMAT PLAIN ENCODE JSON;

CREATE table upsert_user_behaviors (
user_id int,
target_id VARCHAR,
target_type VARCHAR,
event_timestamp TIMESTAMPTZ,
behavior_type VARCHAR,
parent_target_type VARCHAR,
parent_target_id VARCHAR,
PRIMARY KEY(user_id)
);

INSERT INTO upsert_user_behaviors VALUES
(1,'1','1','2020-01-01T01:01:01Z','1','1','1'),
(2,'2','2','2020-01-01T01:01:02Z','2','2','2'),
(3,'3','3','2020-01-01T01:01:03Z','3','3','3'),
(4,'4','4','2020-01-01T01:01:04Z','4','4','4');
9 changes: 9 additions & 0 deletions integration_tests/doris-sink/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ services:
networks:
mynetwork:
ipv4_address: 172.21.0.9
postgres:
image: postgres:latest
command: tail -f /dev/null
volumes:
- "./update_delete.sql:/update_delete.sql"
restart: on-failure
networks:
mynetwork:
ipv4_address: 172.21.0.11
volumes:
risingwave-standalone:
external: false
Expand Down
10 changes: 10 additions & 0 deletions integration_tests/doris-sink/doris_prepare.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,15 @@ PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

CREATE table upsert_table(
user_id int,
target_id text,
event_timestamp_local datetime
) UNIQUE KEY(`user_id`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

CREATE USER 'users'@'%' IDENTIFIED BY '123456';
GRANT ALL ON *.* TO 'users'@'%';
26 changes: 25 additions & 1 deletion integration_tests/doris-sink/sink_check.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import subprocess
import sys

relations = ['demo.demo_bhv_table']
relations = ['demo.demo_bhv_table', 'demo.upsert_table']

failed_cases = []
for rel in relations:
Expand All @@ -18,6 +18,30 @@
if rows < 1:
failed_cases.append(rel)

# apply the DELETE and UPDATE statements in update_delete.sql through RisingWave
subprocess.run(["docker", "compose", "exec", "postgres", "bash", "-c", "psql -h risingwave-standalone -p 4566 -d dev -U root -f update_delete.sql"], check=True)

# verify the delete: the row count in demo.upsert_table should now be 3
sql = f"SELECT COUNT(*) FROM demo.upsert_table;"
command = f'mysql -uroot -P9030 -hfe -e "{sql}"'
output = subprocess.check_output(
["docker", "compose", "exec", "mysql", "bash", "-c", command])
rows = int(output.decode('utf-8').split('\n')[1])
print(f"{rows} rows in demo.upsert_table")
if rows != 3:
print(f"rows expected 3, get {rows}")
failed_cases.append("delete demo.upsert_table")

# verify the update: the row with user_id = 3 should now have target_id 30
sql = f"SELECT target_id FROM demo.upsert_table WHERE user_id = 3;"
command = f'mysql -uroot -P9030 -hfe -e "{sql}"'
output = subprocess.check_output(
["docker", "compose", "exec", "mysql", "bash", "-c", command])
id = int(output.decode('utf-8').split('\n')[1])
if id != 30:
print(f"target_id expected 30, get {id}")
failed_cases.append("update demo.upsert_table")

if len(failed_cases) != 0:
print(f"Data check failed for case {failed_cases}")
sys.exit(1)
5 changes: 5 additions & 0 deletions integration_tests/doris-sink/update_delete.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
DELETE FROM upsert_user_behaviors WHERE user_id = 2;

UPDATE upsert_user_behaviors SET target_id = 30 WHERE user_id = 3;

FLUSH;
Loading

0 comments on commit 895edb7

Please sign in to comment.