Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into siyuan/cdc-backfill-pg
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Dec 27, 2023
2 parents 486020d + fcaa318 commit a4f765d
Show file tree
Hide file tree
Showing 155 changed files with 3,658 additions and 1,058 deletions.
37 changes: 30 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ci/scripts/gen-integration-test-yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
'nats': ['json'],
'doris-sink': ['json'],
'starrocks-sink': ['json'],
'deltalake-sink': ['json'],
}

def gen_pipeline_steps():
Expand Down
1 change: 1 addition & 0 deletions ci/scripts/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"vector-json": ["tao"],
"doris-sink": ["xinhao"],
"starrocks-sink": ["xinhao"],
"deltalake-sink": ["xinhao"],
}

def get_failed_tests(get_test_status, test_map):
Expand Down
4 changes: 1 addition & 3 deletions e2e_test/batch/catalog/pg_attribute.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ select i.relname, a.attname, ix.indkey from pg_catalog.pg_class t
join pg_catalog.pg_attribute a on t.oid = a.attrelid and a.attnum = ANY(ix.indkey)
where t.relname = 'tmp' order by a.attnum;
----
tmp_idx id2 {2,3,4,5}
tmp_idx id3 {2,3,4,5}
tmp_idx id4 {2,3,4,5}
tmp_idx id2 {2}

statement ok
drop table tmp;
6 changes: 3 additions & 3 deletions e2e_test/batch/catalog/pg_index.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ select ix.indnatts, ix.indkey from pg_catalog.pg_class t
join pg_catalog.pg_class i on i.oid = ix.indexrelid
where t.relname = 'tmp' and i.relname = 'tmp_id2_idx';
----
2 {2,3}
1 {2}

statement ok
create index tmp_id2_idx_include_id1 on tmp(id2) include(id1);
Expand All @@ -21,7 +21,7 @@ select ix.indnatts, ix.indkey from pg_catalog.pg_class t
join pg_catalog.pg_class i on i.oid = ix.indexrelid
where t.relname = 'tmp' and i.relname = 'tmp_id2_idx_include_id1';
----
3 {2,3,4}
1 {2}

statement ok
create index tmp_id1_id2_idx on tmp(id1, id2);
Expand All @@ -32,7 +32,7 @@ select ix.indnatts, ix.indkey from pg_catalog.pg_class t
join pg_catalog.pg_class i on i.oid = ix.indexrelid
where t.relname = 'tmp' and i.relname = 'tmp_id1_id2_idx';
----
3 {2,3,4}
2 {1,2}

statement ok
drop table tmp;
32 changes: 24 additions & 8 deletions e2e_test/over_window/generated/batch/main.slt.part
Original file line number Diff line number Diff line change
@@ -1,11 +1,27 @@
# This file is generated by `gen.py`. Do not edit it manually!

statement ok
SET RW_IMPLICIT_FLUSH TO true;

include ./basic/mod.slt.part
include ./rank_func/mod.slt.part
include ./expr_in_win_func/mod.slt.part
include ./agg_in_win_func/mod.slt.part
include ./opt_agg_then_join/mod.slt.part
include ./with_filter/mod.slt.part
set rw_implicit_flush = true;

statement ok
set rw_streaming_over_window_cache_policy = full;

include ./run_all.slt.part

statement ok
set rw_streaming_over_window_cache_policy = recent;

include ./run_all.slt.part

statement ok
set rw_streaming_over_window_cache_policy = recent_first_n;

include ./run_all.slt.part

statement ok
set rw_streaming_over_window_cache_policy = recent_last_n;

include ./run_all.slt.part

statement ok
set rw_streaming_over_window_cache_policy = default;
8 changes: 8 additions & 0 deletions e2e_test/over_window/generated/batch/run_all.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# This file is generated by `gen.py`. Do not edit it manually!

include ./basic/mod.slt.part
include ./rank_func/mod.slt.part
include ./expr_in_win_func/mod.slt.part
include ./agg_in_win_func/mod.slt.part
include ./opt_agg_then_join/mod.slt.part
include ./with_filter/mod.slt.part
32 changes: 24 additions & 8 deletions e2e_test/over_window/generated/streaming/main.slt.part
Original file line number Diff line number Diff line change
@@ -1,11 +1,27 @@
# This file is generated by `gen.py`. Do not edit it manually!

statement ok
SET RW_IMPLICIT_FLUSH TO true;

include ./basic/mod.slt.part
include ./rank_func/mod.slt.part
include ./expr_in_win_func/mod.slt.part
include ./agg_in_win_func/mod.slt.part
include ./opt_agg_then_join/mod.slt.part
include ./with_filter/mod.slt.part
set rw_implicit_flush = true;

statement ok
set rw_streaming_over_window_cache_policy = full;

include ./run_all.slt.part

statement ok
set rw_streaming_over_window_cache_policy = recent;

include ./run_all.slt.part

statement ok
set rw_streaming_over_window_cache_policy = recent_first_n;

include ./run_all.slt.part

statement ok
set rw_streaming_over_window_cache_policy = recent_last_n;

include ./run_all.slt.part

statement ok
set rw_streaming_over_window_cache_policy = default;
8 changes: 8 additions & 0 deletions e2e_test/over_window/generated/streaming/run_all.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# This file is generated by `gen.py`. Do not edit it manually!

include ./basic/mod.slt.part
include ./rank_func/mod.slt.part
include ./expr_in_win_func/mod.slt.part
include ./agg_in_win_func/mod.slt.part
include ./opt_agg_then_join/mod.slt.part
include ./with_filter/mod.slt.part
32 changes: 24 additions & 8 deletions e2e_test/over_window/templates/main.slt.part
Original file line number Diff line number Diff line change
@@ -1,9 +1,25 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

include ./basic/mod.slt.part
include ./rank_func/mod.slt.part
include ./expr_in_win_func/mod.slt.part
include ./agg_in_win_func/mod.slt.part
include ./opt_agg_then_join/mod.slt.part
include ./with_filter/mod.slt.part
set rw_implicit_flush = true;

statement ok
set rw_streaming_over_window_cache_policy = full;

include ./run_all.slt.part

statement ok
set rw_streaming_over_window_cache_policy = recent;

include ./run_all.slt.part

statement ok
set rw_streaming_over_window_cache_policy = recent_first_n;

include ./run_all.slt.part

statement ok
set rw_streaming_over_window_cache_policy = recent_last_n;

include ./run_all.slt.part

statement ok
set rw_streaming_over_window_cache_policy = default;
6 changes: 6 additions & 0 deletions e2e_test/over_window/templates/run_all.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Run the full over-window test suite once; the main template includes this
# file repeatedly, once per rw_streaming_over_window_cache_policy setting.
include ./basic/mod.slt.part
include ./rank_func/mod.slt.part
include ./expr_in_win_func/mod.slt.part
include ./agg_in_win_func/mod.slt.part
include ./opt_agg_then_join/mod.slt.part
include ./with_filter/mod.slt.part
25 changes: 25 additions & 0 deletions integration_tests/deltalake-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Demo: Sinking to Delta Lake

In this demo, we will create an append-only source via our datagen source,
and sink the data generated by that source into a downstream Delta Lake table
stored on MinIO.

1. Launch the cluster via docker compose
```
docker compose up -d
```

2. Create a delta lake table on minio
```
docker compose exec minio-0 mkdir /data/deltalake
docker compose exec spark bash /spark-script/run-sql-file.sh create-table
```

3. Execute the SQL queries in sequence:
- create_source.sql
- create_sink.sql

4. Query the Delta Lake table. The following command returns the total count of records.
```
docker compose exec spark bash /spark-script/run-sql-file.sh query-table
```
10 changes: 10 additions & 0 deletions integration_tests/deltalake-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Sink every row of `source` into the Delta Lake table on MinIO
-- (the table is created beforehand by spark-script/create-table).
-- Keywords uppercased and `=` spacing made consistent across all options
-- (force_append_only previously lacked surrounding spaces).
CREATE SINK delta_lake_sink FROM source
WITH (
    connector = 'deltalake',
    type = 'append-only',
    -- NOTE(review): presumably required to allow an append-only sink on a
    -- table that could receive updates — confirm against connector docs.
    force_append_only = 'true',
    location = 's3a://deltalake/delta',
    s3.access.key = 'hummockadmin',
    s3.secret.key = 'hummockadmin',
    s3.endpoint = 'http://minio-0:9301'
);
3 changes: 3 additions & 0 deletions integration_tests/deltalake-sink/create_source.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Upstream table for the Delta Lake sink demo, seeded with a few rows.
-- Keyword casing normalized (was `CREATE table` / `INSERT into`).
CREATE TABLE source (id INT, name VARCHAR);

-- Explicit column list guards against silent breakage on schema change.
INSERT INTO source (id, name) VALUES (1, 'a'), (2, 'b'), (3, 'c');
43 changes: 43 additions & 0 deletions integration_tests/deltalake-sink/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
---
version: "3"
services:
  # Spark is used only to run the SQL scripts that create and query the
  # Delta Lake table; `tail -f /dev/null` keeps the container alive.
  spark:
    image: apache/spark:3.3.1
    command: tail -f /dev/null
    depends_on:
      - minio-0
    volumes:
      - "./spark-script:/spark-script"
    container_name: spark
  # The RisingWave services below reuse definitions from the shared
  # compose file via `extends`.
  risingwave-standalone:
    extends:
      file: ../../docker/docker-compose.yml
      service: risingwave-standalone
  etcd-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: etcd-0
  grafana-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: grafana-0
  # MinIO doubles as both RisingWave state store and the S3-compatible
  # backing store for the Delta Lake table.
  minio-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: minio-0
  prometheus-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: prometheus-0
volumes:
  compute-node-0:
    external: false
  etcd-0:
    external: false
  grafana-0:
    external: false
  minio-0:
    external: false
  prometheus-0:
    external: false
name: risingwave-compose
7 changes: 7 additions & 0 deletions integration_tests/deltalake-sink/prepare.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash

set -euo pipefail

# Prepare the Delta Lake sink demo: create the target directory on MinIO,
# then create the Delta table via Spark.
# `mkdir -p` keeps this idempotent — plain `mkdir` would exit non-zero
# under `set -e` when the directory is left over from a previous run.
docker compose exec minio-0 mkdir -p /data/deltalake
docker compose exec spark bash /spark-script/run-sql-file.sh create-table
Loading

0 comments on commit a4f765d

Please sign in to comment.