feat(frontend): support iceberg predicate pushdown (#19228)
kwannoel authored Nov 14, 2024
1 parent 9aded71 · commit 1a97b4c
Showing 14 changed files with 599 additions and 28 deletions.
1 change: 1 addition & 0 deletions ci/scripts/e2e-iceberg-sink-v2-test.sh
@@ -49,6 +49,7 @@ poetry run python main.py -t ./test_case/iceberg_select_empty_table.toml
poetry run python main.py -t ./test_case/iceberg_source_equality_delete.toml
poetry run python main.py -t ./test_case/iceberg_source_position_delete.toml
poetry run python main.py -t ./test_case/iceberg_source_all_delete.toml
poetry run python main.py -t ./test_case/iceberg_predicate_pushdown.toml


echo "--- Kill cluster"
3 changes: 1 addition & 2 deletions ci/workflows/main-cron.yml
@@ -414,14 +414,13 @@ steps:
depends_on:
- "build"
- "build-other"

plugins:
- docker-compose#v5.1.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 7
timeout_in_minutes: 9
retry: *auto-retry

- label: "end-to-end iceberg sink v2 test (release)"
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
@@ -270,7 +270,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 15
timeout_in_minutes: 17
retry: *auto-retry

- label: "end-to-end iceberg cdc test"
2 changes: 1 addition & 1 deletion e2e_test/iceberg/start_spark_connect_server.sh
@@ -11,7 +11,7 @@ PACKAGES="$PACKAGES,org.apache.spark:spark-connect_2.12:$SPARK_VERSION"
SPARK_FILE="spark-${SPARK_VERSION}-bin-hadoop3.tgz"

if [ ! -d "spark-${SPARK_VERSION}-bin-hadoop3" ];then
wget https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/$SPARK_FILE
wget --no-verbose https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/$SPARK_FILE
tar -xzf $SPARK_FILE --no-same-owner
fi

143 changes: 143 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt
@@ -0,0 +1,143 @@
statement ok
set sink_decouple = false;

statement ok
set streaming_parallelism=4;

statement ok
drop table if exists s1 cascade;

statement ok
CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar);

statement ok
insert into s1 select x, 'some str', 'another str' from generate_series(1, 500) t(x);

statement ok
insert into s1 select x, null as y, null as z from generate_series(501, 1000) t(x);

statement ok
flush;

statement ok
CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1;

statement ok
CREATE SINK sink1 AS select * from mv1 WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true',
database.name = 'demo_db',
table.name = 't1',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1,
create_table_if_not_exists = 'true'
);

statement ok
drop source if exists iceberg_t1_source;

statement ok
CREATE SOURCE iceberg_t1_source
WITH (
connector = 'iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
s3.path.style.access = 'true',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
table.name = 't1',
);

statement ok
flush;

query I
select * from iceberg_t1_source order by i1 limit 1;
----
1 some str another str

query I
select count(*) from iceberg_t1_source;
----
1000

query I
select * from iceberg_t1_source where i1 > 990 order by i1;
----
991 NULL NULL
992 NULL NULL
993 NULL NULL
994 NULL NULL
995 NULL NULL
996 NULL NULL
997 NULL NULL
998 NULL NULL
999 NULL NULL
1000 NULL NULL

query I
explain select * from iceberg_t1_source where i1 > 500 and i1 < 600 and i1 >= 550 and i1 <= 590 and i1 != 570 and i1 = 580;
----
BatchExchange { order: [], dist: Single }
└─BatchIcebergScan { source: iceberg_t1_source, columns: [i1, i2, i3], predicate: (((((i1 = 580) AND (i1 > 500)) AND (i1 < 600)) AND (i1 >= 550)) AND (i1 <= 590)) AND (i1 != 570) }

query I
select i1 from iceberg_t1_source where i1 > 500 and i1 < 600 and i1 >= 550 and i1 <= 590 and i1 != 570 and i1 = 580;
----
580

query I
explain select * from iceberg_t1_source where i1 in (1, 2, 3, 4, 5);
----
BatchExchange { order: [], dist: Single }
└─BatchIcebergScan { source: iceberg_t1_source, columns: [i1, i2, i3], predicate: i1 IN (5, 4, 1, 3, 2) }

query I
select i1 from iceberg_t1_source where i1 in (1, 2, 3, 4, 5) order by i1;
----
1
2
3
4
5

query I
select count(*), i2, i3 from iceberg_t1_source where i2 = 'some str' and i3 = 'another str' group by i2, i3;
----
500 some str another str

query I
explain select i1 from iceberg_t1_source where i1 > 500 and i2 = i3;
----
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [i1] }
└─BatchFilter { predicate: (i2 = i3) }
└─BatchIcebergScan { source: iceberg_t1_source, columns: [i1, i2, i3], predicate: i1 > 500 }

query I
select i1 from iceberg_t1_source where i1 > 500 and i2 = i3;
----

# Empty splits should not panic
query I
select i1 from iceberg_t1_source where i1 > 1001;
----

statement ok
DROP SINK sink1;

statement ok
DROP SOURCE iceberg_t1_source;

statement ok
DROP TABLE s1 cascade;
11 changes: 11 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_predicate_pushdown.toml
@@ -0,0 +1,11 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.t1',
]

slt = 'test_case/iceberg_predicate_pushdown.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.t1',
'DROP SCHEMA IF EXISTS demo_db',
]
42 changes: 32 additions & 10 deletions src/connector/src/source/iceberg/mod.rs
@@ -19,6 +19,7 @@ use std::collections::HashMap;
use anyhow::anyhow;
use async_trait::async_trait;
use futures_async_stream::for_await;
use iceberg::expr::Predicate as IcebergPredicate;
use iceberg::scan::FileScanTask;
use iceberg::spec::TableMetadata;
use iceberg::table::Table;
@@ -137,6 +138,19 @@ pub struct IcebergSplit {
pub position_delete_files: Vec<IcebergFileScanTaskJsonStr>,
}

impl IcebergSplit {
pub fn empty(table_meta: TableMetadataJsonStr) -> Self {
Self {
split_id: 0,
snapshot_id: 0,
table_meta,
files: vec![],
equality_delete_files: vec![],
position_delete_files: vec![],
}
}
}

impl SplitMetaData for IcebergSplit {
fn id(&self) -> SplitId {
self.split_id.to_string().into()
@@ -189,6 +203,7 @@ impl IcebergSplitEnumerator {
schema: Schema,
time_traval_info: Option<IcebergTimeTravelInfo>,
batch_parallelism: usize,
predicate: IcebergPredicate,
) -> ConnectorResult<Vec<IcebergSplit>> {
if batch_parallelism == 0 {
bail!("Batch parallelism is 0. Cannot split the iceberg files.");
@@ -199,14 +214,9 @@
let current_snapshot = table.metadata().current_snapshot();
if current_snapshot.is_none() {
// If there is no snapshot, we will return a mock `IcebergSplit` with empty files.
return Ok(vec![IcebergSplit {
split_id: 0,
snapshot_id: 0,
table_meta: TableMetadataJsonStr::serialize(table.metadata()),
files: vec![],
equality_delete_files: vec![],
position_delete_files: vec![],
}]);
return Ok(vec![IcebergSplit::empty(TableMetadataJsonStr::serialize(
table.metadata(),
))]);
}

let snapshot_id = match time_traval_info {
@@ -246,11 +256,15 @@

let require_names = Self::get_require_field_names(&table, snapshot_id, &schema).await?;

let table_schema = table.metadata().current_schema();
tracing::debug!("iceberg_table_schema: {:?}", table_schema);

let mut position_delete_files = vec![];
let mut data_files = vec![];
let mut equality_delete_files = vec![];
let scan = table
.scan()
.with_filter(predicate)
.snapshot_id(snapshot_id)
.select(require_names)
.build()
@@ -302,10 +316,18 @@
.files
.push(data_files[split_num * split_size + i].clone());
}
Ok(splits
let splits = splits
.into_iter()
.filter(|split| !split.files.is_empty())
.collect_vec())
.collect_vec();

if splits.is_empty() {
return Ok(vec![IcebergSplit::empty(TableMetadataJsonStr::serialize(
table.metadata(),
))]);
}

Ok(splits)
}

/// The required field names are the intersection of the output schema and the equality delete columns.
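A note on the enumerator change above: `list_splits_batch` now takes an `iceberg::expr::Predicate` and forwards it via `table.scan().with_filter(predicate)`, letting Iceberg prune data files before splits are formed (and falling back to a single empty split when nothing survives pruning). Below is a minimal sketch of constructing such a predicate with the iceberg-rust expression builders; the column name `i1` mirrors the slt test, and the builder methods (`Reference`, `Datum`) reflect my reading of the crate's API rather than code from this commit.

use iceberg::expr::{Predicate as IcebergPredicate, Reference};
use iceberg::spec::Datum;

// Sketch: a predicate equivalent to `i1 > 500 AND i1 <= 590`, the kind of
// value the frontend would hand to `list_splits_batch`.
fn example_pushdown_predicate() -> IcebergPredicate {
    let lower = Reference::new("i1").greater_than(Datum::long(500));
    let upper = Reference::new("i1").less_than_or_equal_to(Datum::long(590));
    // Conjuncts combine with `and`; whatever cannot be translated stays in a
    // residual BatchFilter (see the EXPLAIN output in the slt test above).
    lower.and(upper)
}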
8 changes: 8 additions & 0 deletions src/frontend/src/optimizer/mod.rs
@@ -402,6 +402,14 @@ impl PlanRoot {
ApplyOrder::BottomUp,
));

// For iceberg scan, we do iceberg predicate pushdown
// BatchFilter -> BatchIcebergScan
let plan = plan.optimize_by_rules(&OptimizationStage::new(
"Iceberg Predicate Pushdown",
vec![BatchIcebergPredicatePushDownRule::create()],
ApplyOrder::BottomUp,
));

assert_eq!(plan.convention(), Convention::Batch);
Ok(plan)
}
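The rule registered here, `BatchIcebergPredicatePushDownRule`, lives in one of the changed files not expanded on this page, so the following is only a standalone sketch of the rewrite it performs: split the `BatchFilter`'s conjuncts into a part Iceberg can evaluate and a residual part, move the former onto the `BatchIcebergScan`, and keep a filter node only for the latter. All types below are simplified stand-ins, not the commit's actual plan nodes.

#[derive(Debug)]
enum Conjunct {
    Pushable(String), // e.g. `i1 > 500`: column vs. constant, translatable to Iceberg
    Residual(String), // e.g. `i2 = i3`: must stay in the RisingWave filter
}

#[derive(Debug)]
enum Plan {
    Scan { pushed: Vec<String> },
    Filter { kept: Vec<String>, input: Box<Plan> },
}

// Core of the rewrite: BatchFilter(BatchIcebergScan) becomes either a bare
// scan carrying the predicate, or a smaller filter over such a scan.
fn push_down(conjuncts: Vec<Conjunct>) -> Plan {
    let (mut pushed, mut kept) = (Vec::new(), Vec::new());
    for c in conjuncts {
        match c {
            Conjunct::Pushable(p) => pushed.push(p),
            Conjunct::Residual(p) => kept.push(p),
        }
    }
    let scan = Plan::Scan { pushed };
    if kept.is_empty() {
        scan // the filter disappears entirely, as in the `i1 = 580` EXPLAIN above
    } else {
        Plan::Filter { kept, input: Box::new(scan) }
    }
}

fn main() {
    // Mirrors the slt test: `i1 > 500` is pushed down, `i2 = i3` is kept.
    let plan = push_down(vec![
        Conjunct::Pushable("i1 > 500".into()),
        Conjunct::Residual("i2 = i3".into()),
    ]);
    println!("{plan:?}");
}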
6 changes: 6 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_filter.rs
@@ -45,6 +45,12 @@ impl BatchFilter {
pub fn predicate(&self) -> &Condition {
&self.core.predicate
}

pub fn clone_with_predicate(&self, predicate: Condition) -> Self {
let mut core = self.core.clone();
core.predicate = predicate;
Self::new(core)
}
}
impl_distill_by_unit!(BatchFilter, core, "BatchFilter");

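A plausible reading of this helper: the pushdown rule uses `clone_with_predicate` to rebuild the `BatchFilter` with only the conjuncts that could not be translated into the Iceberg predicate (for example `i2 = i3` in the slt test above), and drops the filter node altogether when every conjunct was pushed, which matches the two EXPLAIN plans in the test file.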