Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iceberg): support position delete for iceberg source #18579

Merged
merged 9 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ci/scripts/e2e-iceberg-sink-v2-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ poetry run python main.py -t ./test_case/range_partition_append_only.toml
poetry run python main.py -t ./test_case/range_partition_upsert.toml
poetry run python main.py -t ./test_case/append_only_with_checkpoint_interval.toml
poetry run python main.py -t ./test_case/iceberg_select_empty_table.toml
poetry run python main.py -t ./test_case/iceberg_source_eq_delete.toml
poetry run python main.py -t ./test_case/iceberg_source_equality_delete.toml
poetry run python main.py -t ./test_case/iceberg_source_position_delete.toml
poetry run python main.py -t ./test_case/iceberg_source_all_delete.toml


echo "--- Kill cluster"
Expand Down
98 changes: 98 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_source_all_delete.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Deletions in a single commit are posistion delete, deletions across multiple commits are equail delete. sink_decouple = default(true), so we'll commit every 10s.
statement ok
set streaming_parallelism=4;

statement ok
CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar);

statement ok
CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1;

statement ok
CREATE SINK sink1 AS select * from mv1 WITH (
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
connector = 'iceberg',
type = 'upsert',
database.name = 'demo_db',
table.name = 'test_all_delete',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
create_table_if_not_exists = 'true',
commit_checkpoint_interval = 5,
primary_key = 'i1,i2',
);

statement ok
INSERT INTO s1 (i1, i2, i3)
SELECT s, s::text, s::text FROM generate_series(1, 10000) s;

statement ok
flush

statement ok
DELETE FROM s1
WHERE i1 IN (
SELECT s
FROM generate_series(1, 10000, 2) s
);

sleep 15s

statement ok
CREATE SOURCE iceberg_t1_source
WITH (
connector = 'iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
table.name = 'test_all_delete',
);

statement ok
DELETE FROM s1
WHERE i1 IN (
SELECT s
FROM generate_series(1, 10000, 3) s
);

sleep 15s

query I
select * from iceberg_t1_source order by i1 limit 5;
----
2 2 2
6 6 6
8 8 8
12 12 12
14 14 14

query I
select * from iceberg_t1_source order by i1 desc limit 5;
----
9998 9998 9998
9996 9996 9996
9992 9992 9992
9990 9990 9990
9986 9986 9986

query I
select count(*) from iceberg_t1_source
----
3333

statement ok
DROP SINK sink1;

statement ok
DROP SOURCE iceberg_t1_source;

statement ok
DROP TABLE s1 cascade;
11 changes: 11 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_source_all_delete.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.test_all_delete',
]

slt = 'test_case/iceberg_source_equality_delete.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.test_all_delete',
'DROP SCHEMA IF EXISTS demo_db',
]
11 changes: 0 additions & 11 deletions e2e_test/iceberg/test_case/iceberg_source_eq_delete.toml

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# Deletions in a single commit are posistion delete, deletions across multiple commits are equail delete. sink_decouple = false, so we'll commit every 1s.
statement ok
set sink_decouple = false;

Expand All @@ -15,7 +16,7 @@ CREATE SINK sink1 AS select * from mv1 WITH (
connector = 'iceberg',
type = 'upsert',
database.name = 'demo_db',
table.name = 't1',
table.name = 'test_equality_delete',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
Expand Down Expand Up @@ -58,7 +59,7 @@ WITH (
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
table.name = 't1',
table.name = 'test_equality_delete',
);

query I
Expand Down
11 changes: 11 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_source_equality_delete.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.test_equality_delete',
]

slt = 'test_case/iceberg_source_equality_delete.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.test_equality_delete',
'DROP SCHEMA IF EXISTS demo_db',
]
89 changes: 89 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_source_position_delete.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Deletions in a single commit are posistion delete, deletions across multiple commits are equail delete. sink_decouple = default(true), so we'll commit every 10s.
statement ok
set streaming_parallelism=4;

statement ok
CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar);

statement ok
CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1;

statement ok
CREATE SINK sink1 AS select * from mv1 WITH (
connector = 'iceberg',
type = 'upsert',
database.name = 'demo_db',
table.name = 'test_position_delete',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
create_table_if_not_exists = 'true',
commit_checkpoint_interval = 5,
primary_key = 'i1,i2',
);

statement ok
INSERT INTO s1 (i1, i2, i3)
SELECT s, s::text, s::text FROM generate_series(1, 10000) s;

statement ok
flush

statement ok
DELETE FROM s1
WHERE i1 IN (
SELECT s
FROM generate_series(1, 10000, 2) s
);

sleep 15s

statement ok
CREATE SOURCE iceberg_t1_source
WITH (
connector = 'iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
table.name = 'test_position_delete',
);

query I
select * from iceberg_t1_source order by i1 limit 5;
----
2 2 2
4 4 4
6 6 6
8 8 8
10 10 10

query I
select * from iceberg_t1_source order by i1 desc limit 5;
----
10000 10000 10000
9998 9998 9998
9996 9996 9996
9994 9994 9994
9992 9992 9992

query I
select count(*) from iceberg_t1_source
----
5000

statement ok
DROP SINK sink1;

statement ok
DROP SOURCE iceberg_t1_source;

statement ok
DROP TABLE s1 cascade;
11 changes: 11 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_source_position_delete.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.test_position_delete',
]

slt = 'test_case/iceberg_source_position_delete.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.test_position_delete',
'DROP SCHEMA IF EXISTS demo_db',
]
Loading
Loading