diff --git a/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt b/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt index cb0855c77593..2075d129a8a1 100644 --- a/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt +++ b/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt @@ -128,6 +128,11 @@ query I select i1 from iceberg_t1_source where i1 > 500 and i2 = i3; ---- +# Empty splits should not panic +query I +select i1 from iceberg_t1_source where i1 > 1001; +---- + statement ok DROP SINK sink1; diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index 229cff77e9b9..91d469f9a38b 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use std::mem; use futures_async_stream::try_stream; +use futures_util::stream; use futures_util::stream::StreamExt; use iceberg::scan::FileScanTask; use iceberg::spec::TableMetadata; @@ -42,6 +43,26 @@ use crate::task::BatchTaskContext; static POSITION_DELETE_FILE_FILE_PATH_INDEX: usize = 0; static POSITION_DELETE_FILE_POS: usize = 1; + +pub struct IcebergScanNoDataExecutor { + schema: Schema, + identity: String, +} + +impl Executor for IcebergScanNoDataExecutor { + fn schema(&self) -> &risingwave_common::catalog::Schema { + &self.schema + } + + fn identity(&self) -> &str { + &self.identity + } + + fn execute(self: Box) -> super::BoxedDataChunkStream { + stream::empty().boxed() + } +} + pub struct IcebergScanExecutor { iceberg_config: IcebergProperties, #[allow(dead_code)] @@ -199,7 +220,6 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder { .iter() .map(|split| SplitImpl::restore_from_bytes(split).unwrap()) .collect_vec(); - assert_eq!(split_list.len(), 1); let fields = source_node .columns @@ -214,6 +234,13 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder { let schema = Schema::new(fields); let metrics = source.context.batch_metrics().clone(); + if split_list.is_empty() { + return Ok(Box::new(IcebergScanNoDataExecutor { + schema, + identity: source.plan_node().get_identity().clone(), + })); + } + if let ConnectorProperties::Iceberg(iceberg_properties) = config && let SplitImpl::Iceberg(split) = &split_list[0] { diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index 5c879dea63f1..87f1565622e0 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -313,10 +313,6 @@ impl IcebergSplitEnumerator { .filter(|split| !split.files.is_empty()) .collect_vec(); - if splits.is_empty() { - bail!("No splits found for the iceberg table"); - } - Ok(splits) }