Skip to content

Commit

Permalink
feat(iceberg): support eq delete merge on read for iceberg (#18448)
Browse files Browse the repository at this point in the history
Co-authored-by: Dylan Chen <[email protected]>
  • Loading branch information
xxhZs and chenzl25 authored Sep 13, 2024
1 parent f226e21 commit 870bcde
Show file tree
Hide file tree
Showing 15 changed files with 429 additions and 95 deletions.
70 changes: 46 additions & 24 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,10 @@ arrow-array-iceberg = { package = "arrow-array", version = "52" }
arrow-schema-iceberg = { package = "arrow-schema", version = "52" }
arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" }
arrow-cast-iceberg = { package = "arrow-cast", version = "52" }
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "df076a9949e9ed6431a6dd3998ac0c49152dda9c" }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "df076a9949e9ed6431a6dd3998ac0c49152dda9c" }
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "df076a9949e9ed6431a6dd3998ac0c49152dda9c" }
# branch dev
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "84bf51c9d0d5886e4ee306ca4f383f029e1767a4" }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "84bf51c9d0d5886e4ee306ca4f383f029e1767a4" }
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "84bf51c9d0d5886e4ee306ca4f383f029e1767a4" }
opendal = "0.47"
arrow-array = "50"
arrow-arith = "50"
Expand Down
1 change: 1 addition & 0 deletions ci/scripts/e2e-iceberg-sink-v2-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ poetry run python main.py -t ./test_case/range_partition_append_only.toml
poetry run python main.py -t ./test_case/range_partition_upsert.toml
poetry run python main.py -t ./test_case/append_only_with_checkpoint_interval.toml
poetry run python main.py -t ./test_case/iceberg_select_empty_table.toml
poetry run python main.py -t ./test_case/iceberg_source_eq_delete.toml


echo "--- Kill cluster"
Expand Down
113 changes: 113 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
statement ok
set sink_decouple = false;

statement ok
set streaming_parallelism=4;

statement ok
CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar);

statement ok
CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1;

statement ok
CREATE SINK sink1 AS select * from mv1 WITH (
connector = 'iceberg',
type = 'upsert',
database.name = 'demo_db',
table.name = 't1',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
create_table_if_not_exists = 'true',
primary_key = 'i1,i2',
);

statement ok
insert into s1 values(1,'2','3');

statement ok
insert into s1 values(7,'8','9');

statement ok
insert into s1 values(4,'5','6');

statement ok
flush;

statement ok
delete from s1 where i1 = 7;

statement ok
flush;

sleep 5s

statement ok
CREATE SOURCE iceberg_t1_source
WITH (
connector = 'iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
table.name = 't1',
);

query I
select * from iceberg_t1_source order by i1;
----
1 2 3
4 5 6

query I
select i1,i2,i3 from iceberg_t1_source order by i1;
----
1 2 3
4 5 6

query I
select i3,i2 from iceberg_t1_source order by i2;
----
3 2
6 5

query I
select i2,i1 from iceberg_t1_source order by i1;
----
2 1
5 4

query I
select i1 from iceberg_t1_source order by i1;
----
1
4

query I
select i2 from iceberg_t1_source order by i2;
----
2
5

query I
select i3 from iceberg_t1_source order by i3;
----
3
6

statement ok
DROP SINK sink1;

statement ok
DROP SOURCE iceberg_t1_source;

statement ok
DROP TABLE s1 cascade;
11 changes: 11 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_source_eq_delete.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.t1',
]

slt = 'test_case/iceberg_source_eq_delete.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.t1',
'DROP SCHEMA IF EXISTS demo_db',
]
Loading

0 comments on commit 870bcde

Please sign in to comment.