Skip to content

Commit

Permalink
handle case of empty splits
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Nov 9, 2024
1 parent 326cd17 commit c38a2cd
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 5 deletions.
5 changes: 5 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ query I
select i1 from iceberg_t1_source where i1 > 500 and i2 = i3;
----

# Empty splits should not panic
query I
select i1 from iceberg_t1_source where i1 > 1001;
----

statement ok
DROP SINK sink1;

Expand Down
29 changes: 28 additions & 1 deletion src/batch/src/executor/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::HashMap;
use std::mem;

use futures_async_stream::try_stream;
use futures_util::stream;
use futures_util::stream::StreamExt;
use iceberg::scan::FileScanTask;
use iceberg::spec::TableMetadata;
Expand All @@ -42,6 +43,26 @@ use crate::task::BatchTaskContext;

static POSITION_DELETE_FILE_FILE_PATH_INDEX: usize = 0;
static POSITION_DELETE_FILE_POS: usize = 1;

pub struct IcebergScanNoDataExecutor {
schema: Schema,
identity: String,
}

impl Executor for IcebergScanNoDataExecutor {
fn schema(&self) -> &risingwave_common::catalog::Schema {
&self.schema
}

fn identity(&self) -> &str {
&self.identity
}

fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
stream::empty().boxed()
}
}

pub struct IcebergScanExecutor {
iceberg_config: IcebergProperties,
#[allow(dead_code)]
Expand Down Expand Up @@ -199,7 +220,6 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
.iter()
.map(|split| SplitImpl::restore_from_bytes(split).unwrap())
.collect_vec();
assert_eq!(split_list.len(), 1);

let fields = source_node
.columns
Expand All @@ -214,6 +234,13 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
let schema = Schema::new(fields);
let metrics = source.context.batch_metrics().clone();

if split_list.is_empty() {
return Ok(Box::new(IcebergScanNoDataExecutor {
schema,
identity: source.plan_node().get_identity().clone(),
}));
}

if let ConnectorProperties::Iceberg(iceberg_properties) = config
&& let SplitImpl::Iceberg(split) = &split_list[0]
{
Expand Down
4 changes: 0 additions & 4 deletions src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,6 @@ impl IcebergSplitEnumerator {
.filter(|split| !split.files.is_empty())
.collect_vec();

if splits.is_empty() {
bail!("No splits found for the iceberg table");
}

Ok(splits)
}

Expand Down

0 comments on commit c38a2cd

Please sign in to comment.