Merge branch 'main' into xxchan/painful-tarantula
xxchan authored Nov 14, 2024
2 parents fe734a0 + c920694 commit b0312ab
Showing 54 changed files with 958 additions and 262 deletions.
1 change: 1 addition & 0 deletions ci/scripts/e2e-iceberg-sink-v2-test.sh
@@ -49,6 +49,7 @@ poetry run python main.py -t ./test_case/iceberg_select_empty_table.toml
 poetry run python main.py -t ./test_case/iceberg_source_equality_delete.toml
 poetry run python main.py -t ./test_case/iceberg_source_position_delete.toml
 poetry run python main.py -t ./test_case/iceberg_source_all_delete.toml
+poetry run python main.py -t ./test_case/iceberg_predicate_pushdown.toml


 echo "--- Kill cluster"
1 change: 0 additions & 1 deletion ci/scripts/run-e2e-test.sh
@@ -136,7 +136,6 @@ sqllogictest -p 4566 -d dev './e2e_test/udf/external_udf.slt'
 pkill java

 echo "--- e2e, $mode, embedded udf"
-python3 -m pip install --break-system-packages flask waitress
 sqllogictest -p 4566 -d dev './e2e_test/udf/wasm_udf.slt'
 sqllogictest -p 4566 -d dev './e2e_test/udf/rust_udf.slt'
 sqllogictest -p 4566 -d dev './e2e_test/udf/js_udf.slt'
3 changes: 1 addition & 2 deletions ci/workflows/main-cron.yml
@@ -414,14 +414,13 @@ steps:
     depends_on:
       - "build"
       - "build-other"
-
     plugins:
       - docker-compose#v5.1.0:
           run: rw-build-env
           config: ci/docker-compose.yml
           mount-buildkite-agent: true
       - ./ci/plugins/upload-failure-logs
-    timeout_in_minutes: 7
+    timeout_in_minutes: 9
     retry: *auto-retry

   - label: "end-to-end iceberg sink v2 test (release)"
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
@@ -270,7 +270,7 @@ steps:
           config: ci/docker-compose.yml
           mount-buildkite-agent: true
       - ./ci/plugins/upload-failure-logs
-    timeout_in_minutes: 15
+    timeout_in_minutes: 17
     retry: *auto-retry

   - label: "end-to-end iceberg cdc test"
17 changes: 17 additions & 0 deletions e2e_test/batch/catalog/slash_l_database.slt.part
@@ -0,0 +1,17 @@
# wrapped test of `\l` command for better consistency.
query T
SELECT count(*) > 0
FROM
(SELECT
d.datname AS "Name",
pg_catalog.pg_get_userbyid (d.datdba) AS "Owner",
pg_catalog.pg_encoding_to_char (d.encoding) AS "Encoding",
d.datcollate AS "Collate",
d.datctype AS "Ctype",
pg_catalog.array_to_string (d.datacl, E'\n') AS "Access privileges"
FROM
pg_catalog.pg_database AS d
ORDER BY
1);
----
t
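
Note: psql expands `\l` into essentially this query over `pg_catalog.pg_database`, so the test exercises the catalog functions the command depends on (`pg_get_userbyid`, `pg_encoding_to_char`, `array_to_string`); asserting only `count(*) > 0` keeps the expected output stable across environments.
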
2 changes: 1 addition & 1 deletion e2e_test/iceberg/start_spark_connect_server.sh
@@ -11,7 +11,7 @@ PACKAGES="$PACKAGES,org.apache.spark:spark-connect_2.12:$SPARK_VERSION"
 SPARK_FILE="spark-${SPARK_VERSION}-bin-hadoop3.tgz"

 if [ ! -d "spark-${SPARK_VERSION}-bin-hadoop3" ];then
-  wget https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/$SPARK_FILE
+  wget --no-verbose https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/$SPARK_FILE
   tar -xzf $SPARK_FILE --no-same-owner
 fi

143 changes: 143 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt
@@ -0,0 +1,143 @@
statement ok
set sink_decouple = false;

statement ok
set streaming_parallelism=4;

statement ok
drop table if exists s1 cascade;

statement ok
CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar);

statement ok
insert into s1 select x, 'some str', 'another str' from generate_series(1, 500) t(x);

statement ok
insert into s1 select x, null as y, null as z from generate_series(501, 1000) t(x);

statement ok
flush;

statement ok
CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1;

statement ok
CREATE SINK sink1 AS select * from mv1 WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true',
database.name = 'demo_db',
table.name = 't1',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1,
create_table_if_not_exists = 'true'
);

statement ok
drop source if exists iceberg_t1_source;

statement ok
CREATE SOURCE iceberg_t1_source
WITH (
connector = 'iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
s3.path.style.access = 'true',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
table.name = 't1',
);

statement ok
flush;

query I
select * from iceberg_t1_source order by i1 limit 1;
----
1 some str another str

query I
select count(*) from iceberg_t1_source;
----
1000

query I
select * from iceberg_t1_source where i1 > 990 order by i1;
----
991 NULL NULL
992 NULL NULL
993 NULL NULL
994 NULL NULL
995 NULL NULL
996 NULL NULL
997 NULL NULL
998 NULL NULL
999 NULL NULL
1000 NULL NULL

query I
explain select * from iceberg_t1_source where i1 > 500 and i1 < 600 and i1 >= 550 and i1 <= 590 and i1 != 570 and i1 = 580;
----
BatchExchange { order: [], dist: Single }
└─BatchIcebergScan { source: iceberg_t1_source, columns: [i1, i2, i3], predicate: (((((i1 = 580) AND (i1 > 500)) AND (i1 < 600)) AND (i1 >= 550)) AND (i1 <= 590)) AND (i1 != 570) }

query I
select i1 from iceberg_t1_source where i1 > 500 and i1 < 600 and i1 >= 550 and i1 <= 590 and i1 != 570 and i1 = 580;
----
580

query I
explain select * from iceberg_t1_source where i1 in (1, 2, 3, 4, 5);
----
BatchExchange { order: [], dist: Single }
└─BatchIcebergScan { source: iceberg_t1_source, columns: [i1, i2, i3], predicate: i1 IN (5, 4, 1, 3, 2) }

query I
select i1 from iceberg_t1_source where i1 in (1, 2, 3, 4, 5) order by i1;
----
1
2
3
4
5

query I
select count(*), i2, i3 from iceberg_t1_source where i2 = 'some str' and i3 = 'another str' group by i2, i3;
----
500 some str another str

query I
explain select i1 from iceberg_t1_source where i1 > 500 and i2 = i3;
----
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [i1] }
└─BatchFilter { predicate: (i2 = i3) }
└─BatchIcebergScan { source: iceberg_t1_source, columns: [i1, i2, i3], predicate: i1 > 500 }

query I
select i1 from iceberg_t1_source where i1 > 500 and i2 = i3;
----

# Empty splits should not panic
query I
select i1 from iceberg_t1_source where i1 > 1001;
----

statement ok
DROP SINK sink1;

statement ok
DROP SOURCE iceberg_t1_source;

statement ok
DROP TABLE s1 cascade;
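
Only comparisons of a column against literal values can be translated into an Iceberg scan predicate, which is why the plans above fold `i1 > 500` into `BatchIcebergScan` but keep the column-to-column comparison `i2 = i3` in a separate `BatchFilter`. As a rough illustration (not RisingWave's actual conversion code), the pushed-down filter could be written with the iceberg-rust expression API, assuming the `iceberg` crate's `Reference`, `Datum`, and `Predicate` types:

```rust
use iceberg::expr::{Predicate, Reference};
use iceberg::spec::Datum;

// Hypothetical translation of the pushed-down predicate from the
// EXPLAIN output above:
// (i1 = 580) AND (i1 > 500) AND (i1 < 600) AND (i1 >= 550)
//            AND (i1 <= 590) AND (i1 != 570)
fn pushed_down_filter() -> Predicate {
    Reference::new("i1")
        .equal_to(Datum::int(580))
        .and(Reference::new("i1").greater_than(Datum::int(500)))
        .and(Reference::new("i1").less_than(Datum::int(600)))
        .and(Reference::new("i1").greater_than_or_equal_to(Datum::int(550)))
        .and(Reference::new("i1").less_than_or_equal_to(Datum::int(590)))
        .and(Reference::new("i1").not_equal_to(Datum::int(570)))
}

fn main() {
    let _filter = pushed_down_filter();
}
```

An expression like `i2 = i3` compares two references, which this predicate form cannot represent, so it stays behind and is evaluated in the engine.
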
11 changes: 11 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_predicate_pushdown.toml
@@ -0,0 +1,11 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.t1',
]

slt = 'test_case/iceberg_predicate_pushdown.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.t1',
'DROP SCHEMA IF EXISTS demo_db',
]
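
This follows the layout the iceberg e2e harness expects: `init_sqls` run against the external catalog (through the Spark connect server started by the script above) before the SLT file executes, and `drop_sqls` clean up afterwards.
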
@@ -19,9 +19,9 @@ publication.autocreate.mode=disabled
 publication.name=${publication.name:-rw_publication}
 # default heartbeat interval 5 mins
 heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-300000}
-# emit a WAL message to the replication stream
+# emit a transactional WAL message to the replication stream
 # see https://github.com/risingwavelabs/risingwave/issues/16697 for more details
-heartbeat.action.query=SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)
+heartbeat.action.query=SELECT pg_logical_emit_message(true, 'heartbeat', now()::varchar)
 # In sharing cdc source mode, we will subscribe to multiple tables in the given database,
 # so here we set ${table.name} to a default value `RW_CDC_Sharing` just for display.
 name=${hostname}:${port}:${database.name}.${schema.name}.${table.name:-RW_CDC_Sharing}
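
For context on this Debezium Postgres connector properties change: PostgreSQL's `pg_logical_emit_message(transactional boolean, prefix text, content text)` writes a logical decoding message into the WAL and returns its LSN. Flipping the first argument to `true` makes the heartbeat part of a committed transaction, so it flows through logical decoding like a normal change event and the replication slot can keep advancing on otherwise idle databases; see the linked issue for the full rationale.
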
12 changes: 11 additions & 1 deletion proto/stream_service.proto
@@ -63,10 +63,19 @@ message WaitEpochCommitResponse {
 }

 message StreamingControlStreamRequest {
-  message InitRequest {
+  message InitialPartialGraph {
+    uint64 partial_graph_id = 1;
     repeated stream_plan.SubscriptionUpstreamInfo subscriptions = 2;
   }

+  message InitRequest {
+    repeated InitialPartialGraph graphs = 1;
+  }
+
+  message CreatePartialGraphRequest {
+    uint64 partial_graph_id = 1;
+  }
+
   message RemovePartialGraphRequest {
     repeated uint64 partial_graph_ids = 1;
   }
@@ -75,6 +84,7 @@ message StreamingControlStreamRequest {
     InitRequest init = 1;
     InjectBarrierRequest inject_barrier = 2;
     RemovePartialGraphRequest remove_partial_graph = 3;
+    CreatePartialGraphRequest create_partial_graph = 4;
   }
 }

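For illustration, with RisingWave's prost-generated bindings (the `risingwave_pb` crate) the new request variant would be constructed roughly as follows; this is a sketch assuming standard prost/tonic codegen naming, not code from this commit:

```rust
use risingwave_pb::stream_service::{
    streaming_control_stream_request::{CreatePartialGraphRequest, Request},
    StreamingControlStreamRequest,
};

fn main() {
    // Register a new partial graph on a compute node before any barrier
    // is injected for it (the request variant added by this commit);
    // the id 42 is an arbitrary example value.
    let _req = StreamingControlStreamRequest {
        request: Some(Request::CreatePartialGraph(CreatePartialGraphRequest {
            partial_graph_id: 42,
        })),
    };
}
```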