Commit e195c92: Merge branch 'main' into yiming/recreate-shared-context
wenym1 committed Feb 26, 2024 · 2 parents 058e75d + 1dd2c3d
Showing 269 changed files with 3,836 additions and 3,793 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

28 changes: 18 additions & 10 deletions backwards-compat-tests/scripts/utils.sh
@@ -103,19 +103,21 @@ insert_json_kafka() {
local JSON=$1
echo "$JSON" | "$KAFKA_PATH"/bin/kafka-console-producer.sh \
--topic backwards_compat_test_kafka_source \
- --bootstrap-server localhost:29092
+ --bootstrap-server localhost:29092 \
+ --property "parse.key=true" \
+ --property "key.separator=,"
}

seed_json_kafka() {
insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 1, "page_id": 1, "action": "gtrgretrg"}'
insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 2, "page_id": 1, "action": "fsdfgerrg"}'
insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 3, "page_id": 1, "action": "sdfergtth"}'
insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 4, "page_id": 2, "action": "erwerhghj"}'
insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 5, "page_id": 2, "action": "kiku7ikkk"}'
insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 6, "page_id": 3, "action": "6786745ge"}'
insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 7, "page_id": 3, "action": "fgbgfnyyy"}'
insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 8, "page_id": 4, "action": "werwerwwe"}'
insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 9, "page_id": 4, "action": "yjtyjtyyy"}'
insert_json_kafka '{"user_id": 1},{"timestamp": "2023-07-28 07:11:00", "user_id": 1, "page_id": 1, "action": "gtrgretrg"}'
insert_json_kafka '{"user_id": 2},{"timestamp": "2023-07-28 07:11:00", "user_id": 2, "page_id": 1, "action": "fsdfgerrg"}'
insert_json_kafka '{"user_id": 3},{"timestamp": "2023-07-28 07:11:00", "user_id": 3, "page_id": 1, "action": "sdfergtth"}'
insert_json_kafka '{"user_id": 4},{"timestamp": "2023-07-28 06:54:00", "user_id": 4, "page_id": 2, "action": "erwerhghj"}'
insert_json_kafka '{"user_id": 5},{"timestamp": "2023-07-28 06:54:00", "user_id": 5, "page_id": 2, "action": "kiku7ikkk"}'
insert_json_kafka '{"user_id": 6},{"timestamp": "2023-07-28 06:54:00", "user_id": 6, "page_id": 3, "action": "6786745ge"}'
insert_json_kafka '{"user_id": 7},{"timestamp": "2023-07-28 06:54:00", "user_id": 7, "page_id": 3, "action": "fgbgfnyyy"}'
insert_json_kafka '{"user_id": 8},{"timestamp": "2023-07-28 06:54:00", "user_id": 8, "page_id": 4, "action": "werwerwwe"}'
insert_json_kafka '{"user_id": 9},{"timestamp": "2023-07-28 06:54:00", "user_id": 9, "page_id": 4, "action": "yjtyjtyyy"}'
}
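
With parse.key=true, the console producer splits each input line at the first "," (the configured key.separator): the left side becomes the Kafka message key and the remainder the value, which is why every seed line now carries a {"user_id": N} prefix. For reference, a sketch of how to confirm the keys landed, assuming the same broker and topic:

"$KAFKA_PATH"/bin/kafka-console-consumer.sh \
  --topic backwards_compat_test_kafka_source \
  --bootstrap-server localhost:29092 \
  --from-beginning --max-messages 9 \
  --property print.key=true \
  --property key.separator=,
# each record prints as: {"user_id": 1},{"timestamp": "2023-07-28 07:11:00", ...}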

# https://stackoverflow.com/a/4024263
@@ -225,6 +227,12 @@ seed_old_cluster() {
create_kafka_topic
seed_json_kafka
sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/kafka/seed.slt"
+ # use the old syntax for versions at most 1.5.4
+ if version_le "$OLD_VERSION" "1.5.4" ; then
+   sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/kafka/upsert/deprecate_upsert.slt"
+ else
+   sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/kafka/upsert/include_key_as.slt"
+ fi

echo "--- KAFKA TEST: wait 5s for kafka to process data"
sleep 5
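
version_le itself is outside this hunk; per the Stack Overflow answer linked above, such helpers usually delegate to sort -V. A minimal sketch, assuming that approach and GNU coreutils:

# true iff $1 <= $2 in version order (equal versions count as "le")
version_le() {
  printf '%s\n%s\n' "$1" "$2" | sort -C -V
}
version_le "1.5.4" "1.5.4" && echo "use deprecated upsert syntax"
version_le "1.6.0" "1.5.4" || echo "use INCLUDE key syntax"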
16 changes: 16 additions & 0 deletions backwards-compat-tests/slt/kafka/upsert/deprecate_upsert.slt
@@ -0,0 +1,16 @@
statement ok
CREATE TABLE IF NOT EXISTS kafka_table
(
action varchar,
user_id integer,
obj_id integer,
name varchar,
page_id integer,
age integer
)
WITH (
connector='kafka',
topic='backwards_compat_test_kafka_source',
properties.bootstrap.server='localhost:29092',
scan.startup.mode='earliest',
) FORMAT UPSERT ENCODE JSON;
18 changes: 18 additions & 0 deletions backwards-compat-tests/slt/kafka/upsert/include_key_as.slt
@@ -0,0 +1,18 @@
statement ok
CREATE TABLE IF NOT EXISTS kafka_table
(
action varchar,
user_id integer,
obj_id integer,
name varchar,
page_id integer,
age integer,
primary key (_rw_key)
)
INCLUDE key as _rw_key
WITH (
connector='kafka',
topic='backwards_compat_test_kafka_source',
properties.bootstrap.server='localhost:29092',
scan.startup.mode='earliest',
) FORMAT UPSERT ENCODE JSON;
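
Unlike the deprecated form above, this variant surfaces the Kafka message key as a bytea column via INCLUDE key AS _rw_key, and FORMAT UPSERT requires that column as the primary key so later records with the same key overwrite earlier ones. A quick schema probe, sketched under the assumption of a psql client pointed at the dev cluster:

# _rw_key should appear as a bytea column and as the primary key
psql -h localhost -p 4566 -d dev -c 'DESCRIBE kafka_table;'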
13 changes: 13 additions & 0 deletions backwards-compat-tests/slt/kafka/validate_restart.slt
@@ -50,3 +50,16 @@ werwerwwe 8 NULL NULL 4 NULL
yjtyjtyyy 9 NULL NULL 4 NULL
yjtyjtyyy 9 NULL NULL 4 NULL

+ # kafka_table should do the upsert and overwrite the existing records
+ query I rowsort
+ SELECT action, user_id, obj_id, name, page_id, age, _rw_key FROM kafka_table;
+ ----
+ 6786745ge 6 NULL NULL 3 NULL \x7b22757365725f6964223a20367d
+ erwerhghj 4 NULL NULL 2 NULL \x7b22757365725f6964223a20347d
+ fgbgfnyyy 7 NULL NULL 3 NULL \x7b22757365725f6964223a20377d
+ fsdfgerrg 2 NULL NULL 1 NULL \x7b22757365725f6964223a20327d
+ gtrgretrg 1 NULL NULL 1 NULL \x7b22757365725f6964223a20317d
+ kiku7ikkk 5 NULL NULL 2 NULL \x7b22757365725f6964223a20357d
+ sdfergtth 3 NULL NULL 1 NULL \x7b22757365725f6964223a20337d
+ werwerwwe 8 NULL NULL 4 NULL \x7b22757365725f6964223a20387d
+ yjtyjtyyy 9 NULL NULL 4 NULL \x7b22757365725f6964223a20397d
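
The _rw_key column holds the raw key bytes, which sqllogictest renders as hex. Dropping the \x prefix and decoding recovers the JSON keys written by seed_json_kafka:

# decode the first key above back to text
echo '7b22757365725f6964223a20317d' | xxd -r -p; echo
# prints: {"user_id": 1}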
16 changes: 8 additions & 8 deletions ci/scripts/e2e-clickhouse-sink-test.sh
@@ -31,7 +31,7 @@ sleep 1
echo "--- create clickhouse table"
curl https://clickhouse.com/ | sh
sleep 2
- ./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test(v1 Int32,v2 Int64,v3 String)ENGINE = ReplacingMergeTree PRIMARY KEY (v1);"
+ ./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test(v1 Int32,v2 Int64,v3 String,v4 Enum16('A'=1,'B'=2))ENGINE = ReplacingMergeTree PRIMARY KEY (v1);"

echo "--- testing sinks"
sqllogictest -p 4566 -d dev './e2e_test/sink/clickhouse_sink.slt'
@@ -41,13 +41,13 @@ sleep 5

# check sink destination using shell
if cat ./query_result.csv | sort | awk -F "," '{
- if ($1 == 1 && $2 == 50 && $3 == "\"1-50\"") c1++;
- if ($1 == 13 && $2 == 2 && $3 == "\"13-2\"") c2++;
- if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"") c3++;
- if ($1 == 21 && $2 == 2 && $3 == "\"21-2\"") c4++;
- if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"") c5++;
- if ($1 == 5 && $2 == 2 && $3 == "\"5-2\"") c6++;
- if ($1 == 8 && $2 == 2 && $3 == "\"8-2\"") c7++; }
+ if ($1 == 1 && $2 == 50 && $3 == "\"1-50\"" && $4 == "\"A\"") c1++;
+ if ($1 == 13 && $2 == 2 && $3 == "\"13-2\"" && $4 == "\"B\"") c2++;
+ if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"" && $4 == "\"B\"") c3++;
+ if ($1 == 21 && $2 == 2 && $3 == "\"21-2\"" && $4 == "\"A\"") c4++;
+ if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"" && $4 == "\"A\"") c5++;
+ if ($1 == 5 && $2 == 2 && $3 == "\"5-2\"" && $4 == "\"B\"") c6++;
+ if ($1 == 8 && $2 == 2 && $3 == "\"8-2\"" && $4 == "\"A\"") c7++; }
END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1 && c6 == 1 && c7 == 1); }'; then
echo "Clickhouse sink check passed"
else
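
query_result.csv is produced earlier in the script, outside this hunk; presumably something along these lines, since ClickHouse's CSV output quotes String and Enum values, which is what the "A"/"B" comparisons in the awk check rely on:

# assumed sketch, not shown in this diff
./clickhouse client --host=clickhouse-server --port=9000 \
  --query='SELECT * FROM demo_test FORMAT CSV' > ./query_result.csv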
33 changes: 29 additions & 4 deletions e2e_test/batch/basic/make_time.slt.part
@@ -9,15 +9,20 @@ SELECT make_timestamptz(1973, 07, 15, 08, 15, 55.33);
query T
SELECT make_timestamptz(-1973, 07, 15, 08, 15, 55.33);
----
- -1972-07-15 08:15:55.330+00:00
+ 1973-07-15 08:15:55.330+00:00 BC

+ query T
+ SELECT make_timestamptz(20240, 1, 26, 14, 20, 26);
+ ----
+ 20240-01-26 14:20:26+00:00

query error Invalid parameter year, month, day: invalid date: -3-2-29
SELECT make_timestamptz(-4, 02, 29, 08, 15, 55.33);

query T
SELECT make_timestamptz(-5, 02, 29, 08, 15, 55.33);
----
- -0004-02-29 08:15:55.330+00:00
+ 0005-02-29 08:15:55.330+00:00 BC

query error Invalid parameter sec: invalid sec: -55.33
SELECT make_timestamptz(1973, 07, 15, 08, 15, -55.33);
@@ -105,6 +110,11 @@ SELECT make_date(2024, 1, 26);
----
2024-01-26

+ query T
+ SELECT make_date(20240, 1, 26);
+ ----
+ 20240-01-26

query T
SELECT make_date(-2024, 1, 26);
----
@@ -146,15 +156,30 @@ SELECT make_timestamp(2024, 1, 26, 14, 20, 26);
----
2024-01-26 14:20:26

+ query T
+ SELECT make_timestamp(20240, 1, 26, 14, 20, 26);
+ ----
+ 20240-01-26 14:20:26

query T
SELECT make_timestamp(-1973, 07, 15, 08, 15, 55.33);
----
- -1972-07-15 08:15:55.330
+ 1973-07-15 08:15:55.330 BC

query error Invalid parameter year, month, day: invalid date: -3-2-29
SELECT make_timestamp(-4, 02, 29, 08, 15, 55.33);

query T
SELECT make_timestamp(-5, 02, 29, 08, 15, 55.33);
----
- -0004-02-29 08:15:55.330
+ 0005-02-29 08:15:55.330 BC

+ query T
+ select '0001-01-01 12:34:56'::timestamp - '10 year'::interval;
+ ----
+ 0010-01-01 12:34:56 BC

+ query T
+ select '0001-01-01 12:34:56'::timestamptz - '10 year'::interval;
+ ----
+ 0010-01-01 12:34:56+00:00 BC
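
The reworked expectations follow PostgreSQL's year conventions: a negative year argument means years BC, and displayed BC years are offset by one from astronomical numbering (1 BC is astronomical year 0). So make_timestamptz(-5, ...) is 5 BC, astronomical year -4, a leap year, and February 29 is valid; -4 is 4 BC, astronomical year -3, not a leap year, which is exactly the "-3-2-29" in the error message. The same checks can be replayed by hand, assuming a psql client on the dev cluster:

psql -h localhost -p 4566 -d dev <<'EOF'
SELECT make_timestamp(-5, 02, 29, 08, 15, 55.33);               -- 0005-02-29 08:15:55.330 BC
SELECT '0001-01-01 12:34:56'::timestamp - '10 year'::interval;  -- 0010-01-01 12:34:56 BC
EOF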
4 changes: 2 additions & 2 deletions e2e_test/batch/catalog/pg_class.slt.part
@@ -11,7 +11,7 @@ SELECT oid,relname,relowner,relkind FROM pg_catalog.pg_class ORDER BY oid limit
8 pg_cast 1 r
9 pg_class 1 v
10 pg_collation 1 v
- 11 pg_constraint 1 v
+ 11 pg_constraint 1 r
12 pg_conversion 1 v
13 pg_database 1 v
14 pg_depend 1 v
@@ -20,4 +20,4 @@ SELECT oid,relname,relowner,relkind FROM pg_catalog.pg_class ORDER BY oid limit
query ITIT
SELECT oid,relname,relowner,relkind FROM pg_catalog.pg_class WHERE oid = 'pg_namespace'::regclass;
----
- 24 pg_namespace 1 v
+ 25 pg_namespace 1 v
10 changes: 10 additions & 0 deletions e2e_test/batch/catalog/pg_constraint.slt.part
@@ -0,0 +1,10 @@
statement ok
create table t(a int, b int, c varchar, primary key(a,b));

query TTT
select conname, contype, conkey from pg_constraint where conname='t_pkey';
----
t_pkey p {1,2}

statement ok
drop table t;
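
In pg_constraint, contype 'p' marks a primary key and conkey stores the 1-based ordinal positions of the constrained columns, so {1,2} refers to (a, b). The positions can be resolved to names through pg_attribute; a sketch, assuming the Postgres-compatible catalogs behave as in PostgreSQL:

# expected output: a, b
psql -h localhost -p 4566 -d dev -c "
SELECT a.attname
FROM pg_constraint c
JOIN pg_attribute a
  ON a.attrelid = c.conrelid AND a.attnum = ANY (c.conkey)
WHERE c.conname = 't_pkey';"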
5 changes: 5 additions & 0 deletions e2e_test/ddl/search_path.slt
@@ -76,6 +76,11 @@ select a from test order by a;
1
2

+ # Issue #15195
+ # index shall be created in `search_path_test2` (same as table) rather than `search_path_test1` (first in path)
+ statement ok
+ create index if not exists index1_test_a on test(a);

statement ok
drop table test;
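
The new statement guards against issue #15195, where the index landed in the first schema on the search path instead of the table's schema. A standalone reproduction sketch, assuming the schema names this test file sets up:

psql -h localhost -p 4566 -d dev <<'EOF'
SET search_path TO search_path_test1, search_path_test2;
CREATE TABLE IF NOT EXISTS search_path_test2.test (a int);
CREATE INDEX IF NOT EXISTS index1_test_a ON test(a);
-- index1_test_a must land in search_path_test2, next to its table,
-- not in search_path_test1 (first in the path)
EOF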

6 changes: 3 additions & 3 deletions e2e_test/sink/clickhouse_sink.slt
@@ -1,11 +1,11 @@
statement ok
- CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar);
+ CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar, v4 smallint);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;

statement ok
- CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
+ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4 from mv6 WITH (
connector = 'clickhouse',
type = 'append-only',
force_append_only='true',
@@ -17,7 +17,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH
);

statement ok
- INSERT INTO t6 VALUES (1, 50, '1-50'), (2, 2, '2-2'), (3, 2, '3-2'), (5, 2, '5-2'), (8, 2, '8-2'), (13, 2, '13-2'), (21, 2, '21-2');
+ INSERT INTO t6 VALUES (1, 50, '1-50', 1), (2, 2, '2-2', 2), (3, 2, '3-2', 1), (5, 2, '5-2', 2), (8, 2, '8-2', 1), (13, 2, '13-2', 2), (21, 2, '21-2', 1);

statement ok
FLUSH;
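
The new v4 smallint column feeds the Enum16('A'=1,'B'=2) column created by the CI script: ClickHouse stores the numeric value and renders the label on read, so the 1s and 2s inserted here surface as "A" and "B" in the CSV check. A read-back sketch:

# enum labels next to their underlying Int16 values
./clickhouse client --host=clickhouse-server --port=9000 \
  --query="SELECT v1, v4, CAST(v4, 'Int16') AS v4_raw FROM demo_test ORDER BY v1"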
3 changes: 2 additions & 1 deletion e2e_test/source/basic/ddl.slt
@@ -28,7 +28,8 @@ db error: ERROR: Failed to run the query
Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to create source worker
- 3: missing field `properties.bootstrap.server`
+ 3: failed to parse json
+ 4: missing field `properties.bootstrap.server`


statement error
File renamed without changes.
44 changes: 44 additions & 0 deletions e2e_test/streaming/rate_limit/upstream_amplification.slt
@@ -0,0 +1,44 @@
# This test checks that barrier latency does not spike
# when there's rate limit on source.
# The upstream side should backpressure the source reader,
# but still allow barriers to flow through.

statement ok
SET STREAMING_PARALLELISM=2;

statement ok
SET STREAMING_RATE_LIMIT=1;

statement ok
CREATE TABLE source_table (i1 int)
WITH (
connector = 'datagen',
fields.i1.start = '1',
fields.i1.end = '5',
datagen.rows.per.second = '10000'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SINK sink AS
SELECT x.i1 as i1 FROM source_table x
JOIN source_table s1 ON x.i1 = s1.i1
JOIN source_table s2 ON x.i1 = s2.i1
JOIN source_table s3 ON x.i1 = s3.i1
WITH (connector = 'blackhole');

# The following sequence of FLUSH should be fast, since barrier should be able to bypass sink.
# Otherwise, these FLUSH will take a long time to complete, and trigger timeout.
statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
drop sink sink;

statement ok
drop table source_table;
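
The three-way self-join amplifies every source row, so with STREAMING_RATE_LIMIT=1 the sink falls far behind while barriers must keep flowing; each FLUSH above completes only once a barrier does. When poking at this by hand, the latency is easy to observe; a sketch:

# should return in seconds even while the joins are throttled
time psql -h localhost -p 4566 -d dev -c 'FLUSH;'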
5 changes: 3 additions & 2 deletions integration_tests/starrocks-sink/docker-compose.yml
@@ -2,7 +2,7 @@
version: "3"
services:
starrocks-fe:
- image: starrocks/fe-ubuntu:latest
+ image: starrocks/fe-ubuntu:3.1.7
hostname: starrocks-fe
container_name: starrocks-fe
volumes:
@@ -19,14 +19,15 @@ services:
timeout: 5s
retries: 30
starrocks-be:
- image: starrocks/be-ubuntu:latest
+ image: starrocks/be-ubuntu:3.1.7
command:
- /bin/bash
- -c
- |
sleep 15s; mysql --connect-timeout 2 -h starrocks-fe -P9030 -uroot -e "alter system add backend \"starrocks-be:9050\";"
/opt/starrocks/be/bin/start_be.sh
ports:
- 9050:9050
+ - 8040:8040
hostname: starrocks-be
container_name: starrocks-be
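
Pinning fe and be to 3.1.7 keeps the test reproducible rather than drifting with latest, and the newly exposed 8040 is the BE HTTP port that StarRocks Stream Load redirects to, so the sink's load requests must be able to reach it from outside the compose network. A liveness probe, sketched under the assumption that FE's query port 9030 is also mapped to the host:

# confirm the backend registered and its web port answers
mysql --connect-timeout 2 -h 127.0.0.1 -P 9030 -uroot -e 'SHOW BACKENDS\G'
curl -s http://localhost:8040/api/health   # health path assumed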