diff --git a/ci/scripts/e2e-iceberg-sink-v2-test.sh b/ci/scripts/e2e-iceberg-sink-v2-test.sh index bcb530ae9fdd9..a42d9718f625c 100755 --- a/ci/scripts/e2e-iceberg-sink-v2-test.sh +++ b/ci/scripts/e2e-iceberg-sink-v2-test.sh @@ -49,6 +49,7 @@ poetry run python main.py -t ./test_case/iceberg_select_empty_table.toml poetry run python main.py -t ./test_case/iceberg_source_equality_delete.toml poetry run python main.py -t ./test_case/iceberg_source_position_delete.toml poetry run python main.py -t ./test_case/iceberg_source_all_delete.toml +poetry run python main.py -t ./test_case/iceberg_source_explain_for_delete.toml echo "--- Kill cluster" diff --git a/e2e_test/iceberg/test_case/iceberg_source_explain_for_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_explain_for_delete.slt new file mode 100644 index 0000000000000..c004dce1ef4ba --- /dev/null +++ b/e2e_test/iceberg/test_case/iceberg_source_explain_for_delete.slt @@ -0,0 +1,88 @@ +# Deletions in a single commit are posistion delete, deletions across multiple commits are equail delete. sink_decouple = false, so we'll commit every 1s. +statement ok +set sink_decouple = false; + +statement ok +set streaming_parallelism=4; + +statement ok +CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar); + +statement ok +CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1; + +statement ok +CREATE SINK sink1 AS select * from mv1 WITH ( + connector = 'iceberg', + type = 'upsert', + database.name = 'demo_db', + table.name = 'test_all_delete', + catalog.name = 'demo', + catalog.type = 'storage', + warehouse.path = 's3a://hummock001/iceberg-data', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + create_table_if_not_exists = 'true', + primary_key = 'i1,i2', +); + +statement ok +insert into s1 values(1,'2','3'); + +statement ok +insert into s1 values(7,'8','9'); + +statement ok +insert into s1 values(4,'5','6'); + +statement ok +CREATE SOURCE iceberg_t1_source +WITH ( + connector = 'iceberg', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + catalog.type = 'storage', + warehouse.path = 's3a://hummock001/iceberg-data', + database.name = 'demo_db', + table.name = 'test_all_delete', +); + +query I +explain select * from iceberg_t1_source; +---- + BatchExchange { order: [], dist: Single } + └─BatchIcebergScan { source: iceberg_t1_source, columns: [i1, i2, i3], iceberg_scan_type: DataScan } + +statement ok +flush; + +statement ok +delete from s1 where i1 = 7; + +statement ok +flush; + +sleep 5s + +query I +explain select * from iceberg_t1_source; +---- + BatchExchange { order: [], dist: Single } + └─BatchHashJoin { type: LeftAnti, predicate: i1 = i1 AND i2 = i2 AND (_iceberg_sequence_number < _iceberg_sequence_number) } + ├─BatchExchange { order: [], dist: HashShard(i1, i2) } + │ └─BatchIcebergScan { source: iceberg_t1_source, columns: [i1, i2, i3, _iceberg_sequence_number], iceberg_scan_type: DataScan } + └─BatchExchange { order: [], dist: HashShard(i1, i2) } + └─BatchIcebergScan { source: iceberg_t1_source, columns: [i1, i2, _iceberg_sequence_number], iceberg_scan_type: EqualityDeleteScan } + +statement ok +DROP SINK sink1; + +statement ok +DROP SOURCE iceberg_t1_source; + +statement ok +DROP TABLE s1 cascade; diff --git a/e2e_test/iceberg/test_case/iceberg_source_explain_for_delete.toml b/e2e_test/iceberg/test_case/iceberg_source_explain_for_delete.toml new file mode 100644 index 0000000000000..fcd1af5774f92 --- /dev/null +++ b/e2e_test/iceberg/test_case/iceberg_source_explain_for_delete.toml @@ -0,0 +1,11 @@ +init_sqls = [ + 'CREATE SCHEMA IF NOT EXISTS demo_db', + 'DROP TABLE IF EXISTS demo_db.test_explain_for_delte', +] + +slt = 'test_case/iceberg_source_explain_for_delete.slt' + +drop_sqls = [ + 'DROP TABLE IF EXISTS demo_db.test_explain_for_delte', + 'DROP SCHEMA IF EXISTS demo_db', +] \ No newline at end of file