From becb896acd442d32eab4c14dccf558a4b346c643 Mon Sep 17 00:00:00 2001 From: Li0k Date: Sun, 8 Sep 2024 01:24:11 +0800 Subject: [PATCH] fix(iceberg): fix select empty iceberg table (#18449) --- ci/scripts/e2e-iceberg-sink-v2-test.sh | 1 + .../test_case/iceberg_select_empty_table.slt | 60 +++++++++++++++++++ .../test_case/iceberg_select_empty_table.toml | 11 ++++ src/connector/src/source/iceberg/mod.rs | 19 ++++-- 4 files changed, 87 insertions(+), 4 deletions(-) create mode 100644 e2e_test/iceberg/test_case/iceberg_select_empty_table.slt create mode 100644 e2e_test/iceberg/test_case/iceberg_select_empty_table.toml diff --git a/ci/scripts/e2e-iceberg-sink-v2-test.sh b/ci/scripts/e2e-iceberg-sink-v2-test.sh index dd2f78037a5f2..1a46f30682bdd 100755 --- a/ci/scripts/e2e-iceberg-sink-v2-test.sh +++ b/ci/scripts/e2e-iceberg-sink-v2-test.sh @@ -45,6 +45,7 @@ poetry run python main.py -t ./test_case/partition_upsert.toml poetry run python main.py -t ./test_case/range_partition_append_only.toml poetry run python main.py -t ./test_case/range_partition_upsert.toml poetry run python main.py -t ./test_case/append_only_with_checkpoint_interval.toml +poetry run python main.py -t ./test_case/iceberg_select_empty_table.toml echo "--- Kill cluster" diff --git a/e2e_test/iceberg/test_case/iceberg_select_empty_table.slt b/e2e_test/iceberg/test_case/iceberg_select_empty_table.slt new file mode 100644 index 0000000000000..832a7b781f7fb --- /dev/null +++ b/e2e_test/iceberg/test_case/iceberg_select_empty_table.slt @@ -0,0 +1,60 @@ +statement ok +set sink_decouple = false; + +statement ok +set streaming_parallelism=4; + +statement ok +CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar); + +statement ok +CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1; + +statement ok +CREATE SINK sink1 AS select * from mv1 WITH ( + connector = 'iceberg', + type = 'append-only', + force_append_only = 'true', + database.name = 'demo_db', + table.name = 't1', + catalog.name = 'demo', + catalog.type = 'storage', + warehouse.path = 's3a://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + commit_checkpoint_interval = 1, + create_table_if_not_exists = 'true' +); + +statement ok +CREATE SOURCE iceberg_t1_source +WITH ( + connector = 'iceberg', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + catalog.type = 'storage', + warehouse.path = 's3a://icebergdata/demo', + database.name = 'demo_db', + table.name = 't1', +); + +statement ok +flush; + +query I +select count(*) from iceberg_t1_source; +---- +0 + +statement ok +DROP SINK sink1; + +statement ok +DROP SOURCE iceberg_t1_source; + +statement ok +DROP TABLE s1 cascade; diff --git a/e2e_test/iceberg/test_case/iceberg_select_empty_table.toml b/e2e_test/iceberg/test_case/iceberg_select_empty_table.toml new file mode 100644 index 0000000000000..fa6eeff134c26 --- /dev/null +++ b/e2e_test/iceberg/test_case/iceberg_select_empty_table.toml @@ -0,0 +1,11 @@ +init_sqls = [ + 'CREATE SCHEMA IF NOT EXISTS demo_db', + 'DROP TABLE IF EXISTS demo_db.t1', +] + +slt = 'test_case/iceberg_select_empty_table.slt' + +drop_sqls = [ + 'DROP TABLE IF EXISTS demo_db.t1', + 'DROP SCHEMA IF EXISTS demo_db', +] diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index f101ff9ed6d4b..d65929faafba1 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -206,6 +206,17 @@ impl IcebergSplitEnumerator { bail!("Batch parallelism is 0. Cannot split the iceberg files."); } let table = self.config.load_table_v2().await?; + let current_snapshot = table.metadata().current_snapshot(); + if current_snapshot.is_none() { + // If there is no snapshot, we will return a mock `IcebergSplit` with empty files. + return Ok(vec![IcebergSplit { + split_id: 0, + snapshot_id: 0, // unused + table_meta: TableMetadataJsonStr::serialize(table.metadata()), + files: vec![], + }]); + } + let snapshot_id = match time_traval_info { Some(IcebergTimeTravelInfo::Version(version)) => { let Some(snapshot) = table.metadata().snapshot_by_id(version) else { @@ -232,10 +243,10 @@ impl IcebergSplitEnumerator { } } } - None => match table.metadata().current_snapshot() { - Some(snapshot) => snapshot.snapshot_id(), - None => bail!("Cannot find the current snapshot id in the iceberg table."), - }, + None => { + assert!(current_snapshot.is_some()); + current_snapshot.unwrap().snapshot_id() + } }; let mut files = vec![];