From 1a97b4cccf266594477e487c063de76f2a085944 Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Thu, 14 Nov 2024 17:35:07 +0800 Subject: [PATCH] feat(frontend): support iceberg predicate pushdown (#19228) --- ci/scripts/e2e-iceberg-sink-v2-test.sh | 1 + ci/workflows/main-cron.yml | 3 +- ci/workflows/pull-request.yml | 2 +- .../iceberg/start_spark_connect_server.sh | 2 +- .../test_case/iceberg_predicate_pushdown.slt | 143 ++++++++ .../test_case/iceberg_predicate_pushdown.toml | 11 + src/connector/src/source/iceberg/mod.rs | 42 ++- src/frontend/src/optimizer/mod.rs | 8 + .../src/optimizer/plan_node/batch_filter.rs | 6 + .../optimizer/plan_node/batch_iceberg_scan.rs | 46 ++- .../batch/batch_iceberg_predicate_pushdown.rs | 305 ++++++++++++++++++ src/frontend/src/optimizer/rule/batch/mod.rs | 1 + src/frontend/src/optimizer/rule/mod.rs | 2 + src/frontend/src/scheduler/plan_fragmenter.rs | 55 +++- 14 files changed, 599 insertions(+), 28 deletions(-) create mode 100644 e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt create mode 100644 e2e_test/iceberg/test_case/iceberg_predicate_pushdown.toml create mode 100644 src/frontend/src/optimizer/rule/batch/batch_iceberg_predicate_pushdown.rs diff --git a/ci/scripts/e2e-iceberg-sink-v2-test.sh b/ci/scripts/e2e-iceberg-sink-v2-test.sh index bcb530ae9fdd..27fc92a789d1 100755 --- a/ci/scripts/e2e-iceberg-sink-v2-test.sh +++ b/ci/scripts/e2e-iceberg-sink-v2-test.sh @@ -49,6 +49,7 @@ poetry run python main.py -t ./test_case/iceberg_select_empty_table.toml poetry run python main.py -t ./test_case/iceberg_source_equality_delete.toml poetry run python main.py -t ./test_case/iceberg_source_position_delete.toml poetry run python main.py -t ./test_case/iceberg_source_all_delete.toml +poetry run python main.py -t ./test_case/iceberg_predicate_pushdown.toml echo "--- Kill cluster" diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 3f3cd705f09f..e8a0fa32f101 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -414,14 +414,13 @@ steps: depends_on: - "build" - "build-other" - plugins: - docker-compose#v5.1.0: run: rw-build-env config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 7 + timeout_in_minutes: 9 retry: *auto-retry - label: "end-to-end iceberg sink v2 test (release)" diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 2991397dfb1c..e10ffb2d0091 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -270,7 +270,7 @@ steps: config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 15 + timeout_in_minutes: 17 retry: *auto-retry - label: "end-to-end iceberg cdc test" diff --git a/e2e_test/iceberg/start_spark_connect_server.sh b/e2e_test/iceberg/start_spark_connect_server.sh index 8f0c2640a1b5..7996899f7a4d 100755 --- a/e2e_test/iceberg/start_spark_connect_server.sh +++ b/e2e_test/iceberg/start_spark_connect_server.sh @@ -11,7 +11,7 @@ PACKAGES="$PACKAGES,org.apache.spark:spark-connect_2.12:$SPARK_VERSION" SPARK_FILE="spark-${SPARK_VERSION}-bin-hadoop3.tgz" if [ ! 
-d "spark-${SPARK_VERSION}-bin-hadoop3" ];then - wget https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/$SPARK_FILE + wget --no-verbose https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/$SPARK_FILE tar -xzf $SPARK_FILE --no-same-owner fi diff --git a/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt b/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt new file mode 100644 index 000000000000..2075d129a8a1 --- /dev/null +++ b/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt @@ -0,0 +1,143 @@ +statement ok +set sink_decouple = false; + +statement ok +set streaming_parallelism=4; + +statement ok +drop table if exists s1 cascade; + +statement ok +CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar); + +statement ok +insert into s1 select x, 'some str', 'another str' from generate_series(1, 500) t(x); + +statement ok +insert into s1 select x, null as y, null as z from generate_series(501, 1000) t(x); + +statement ok +flush; + +statement ok +CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1; + +statement ok +CREATE SINK sink1 AS select * from mv1 WITH ( + connector = 'iceberg', + type = 'append-only', + force_append_only = 'true', + database.name = 'demo_db', + table.name = 't1', + catalog.name = 'demo', + catalog.type = 'storage', + warehouse.path = 's3a://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + commit_checkpoint_interval = 1, + create_table_if_not_exists = 'true' +); + +statement ok +drop source if exists iceberg_t1_source; + +statement ok +CREATE SOURCE iceberg_t1_source +WITH ( + connector = 'iceberg', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + s3.path.style.access = 'true', + catalog.type = 'storage', + warehouse.path = 's3a://icebergdata/demo', + database.name = 'demo_db', + table.name = 't1', +); + +statement ok +flush; + +query I +select * from iceberg_t1_source order by i1 limit 1; +---- +1 some str another str + +query I +select count(*) from iceberg_t1_source; +---- +1000 + +query I +select * from iceberg_t1_source where i1 > 990 order by i1; +---- +991 NULL NULL +992 NULL NULL +993 NULL NULL +994 NULL NULL +995 NULL NULL +996 NULL NULL +997 NULL NULL +998 NULL NULL +999 NULL NULL +1000 NULL NULL + +query I +explain select * from iceberg_t1_source where i1 > 500 and i1 < 600 and i1 >= 550 and i1 <= 590 and i1 != 570 and i1 = 580; +---- + BatchExchange { order: [], dist: Single } + └─BatchIcebergScan { source: iceberg_t1_source, columns: [i1, i2, i3], predicate: (((((i1 = 580) AND (i1 > 500)) AND (i1 < 600)) AND (i1 >= 550)) AND (i1 <= 590)) AND (i1 != 570) } + +query I +select i1 from iceberg_t1_source where i1 > 500 and i1 < 600 and i1 >= 550 and i1 <= 590 and i1 != 570 and i1 = 580; +---- +580 + +query I +explain select * from iceberg_t1_source where i1 in (1, 2, 3, 4, 5); +---- + BatchExchange { order: [], dist: Single } + └─BatchIcebergScan { source: iceberg_t1_source, columns: [i1, i2, i3], predicate: i1 IN (5, 4, 1, 3, 2) } + +query I +select i1 from iceberg_t1_source where i1 in (1, 2, 3, 4, 5) order by i1; +---- +1 +2 +3 +4 +5 + +query I +select count(*), i2, i3 from iceberg_t1_source where i2 = 'some str' and i3 = 'another str' group by i2, i3; +---- +500 some str another str + +query I +explain select i1 from iceberg_t1_source where i1 > 500 and i2 = i3; +---- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [i1] } + 
└─BatchFilter { predicate: (i2 = i3) } + └─BatchIcebergScan { source: iceberg_t1_source, columns: [i1, i2, i3], predicate: i1 > 500 } + +query I +select i1 from iceberg_t1_source where i1 > 500 and i2 = i3; +---- + +# Empty splits should not panic +query I +select i1 from iceberg_t1_source where i1 > 1001; +---- + +statement ok +DROP SINK sink1; + +statement ok +DROP SOURCE iceberg_t1_source; + +statement ok +DROP TABLE s1 cascade; diff --git a/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.toml b/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.toml new file mode 100644 index 000000000000..c08dcbb827db --- /dev/null +++ b/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.toml @@ -0,0 +1,11 @@ +init_sqls = [ + 'CREATE SCHEMA IF NOT EXISTS demo_db', + 'DROP TABLE IF EXISTS demo_db.t1', +] + +slt = 'test_case/iceberg_predicate_pushdown.slt' + +drop_sqls = [ + 'DROP TABLE IF EXISTS demo_db.t1', + 'DROP SCHEMA IF EXISTS demo_db', +] diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index 60a26e43e1d3..aeb642c80a01 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use anyhow::anyhow; use async_trait::async_trait; use futures_async_stream::for_await; +use iceberg::expr::Predicate as IcebergPredicate; use iceberg::scan::FileScanTask; use iceberg::spec::TableMetadata; use iceberg::table::Table; @@ -137,6 +138,19 @@ pub struct IcebergSplit { pub position_delete_files: Vec, } +impl IcebergSplit { + pub fn empty(table_meta: TableMetadataJsonStr) -> Self { + Self { + split_id: 0, + snapshot_id: 0, + table_meta, + files: vec![], + equality_delete_files: vec![], + position_delete_files: vec![], + } + } +} + impl SplitMetaData for IcebergSplit { fn id(&self) -> SplitId { self.split_id.to_string().into() @@ -189,6 +203,7 @@ impl IcebergSplitEnumerator { schema: Schema, time_traval_info: Option, batch_parallelism: usize, + predicate: IcebergPredicate, ) -> ConnectorResult> { if batch_parallelism == 0 { bail!("Batch parallelism is 0. Cannot split the iceberg files."); @@ -199,14 +214,9 @@ impl IcebergSplitEnumerator { let current_snapshot = table.metadata().current_snapshot(); if current_snapshot.is_none() { // If there is no snapshot, we will return a mock `IcebergSplit` with empty files. 
- return Ok(vec![IcebergSplit { - split_id: 0, - snapshot_id: 0, - table_meta: TableMetadataJsonStr::serialize(table.metadata()), - files: vec![], - equality_delete_files: vec![], - position_delete_files: vec![], - }]); + return Ok(vec![IcebergSplit::empty(TableMetadataJsonStr::serialize( + table.metadata(), + ))]); } let snapshot_id = match time_traval_info { @@ -246,11 +256,15 @@ impl IcebergSplitEnumerator { let require_names = Self::get_require_field_names(&table, snapshot_id, &schema).await?; + let table_schema = table.metadata().current_schema(); + tracing::debug!("iceberg_table_schema: {:?}", table_schema); + let mut position_delete_files = vec![]; let mut data_files = vec![]; let mut equality_delete_files = vec![]; let scan = table .scan() + .with_filter(predicate) .snapshot_id(snapshot_id) .select(require_names) .build() @@ -302,10 +316,18 @@ impl IcebergSplitEnumerator { .files .push(data_files[split_num * split_size + i].clone()); } - Ok(splits + let splits = splits .into_iter() .filter(|split| !split.files.is_empty()) - .collect_vec()) + .collect_vec(); + + if splits.is_empty() { + return Ok(vec![IcebergSplit::empty(TableMetadataJsonStr::serialize( + table.metadata(), + ))]); + } + + Ok(splits) } /// The required field names are the intersection of the output shema and the equality delete columns. diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index e08f6b2c4dd4..30d51bb93326 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -402,6 +402,14 @@ impl PlanRoot { ApplyOrder::BottomUp, )); + // For iceberg scan, we do iceberg predicate pushdown + // BatchFilter -> BatchIcebergScan + let plan = plan.optimize_by_rules(&OptimizationStage::new( + "Iceberg Predicate Pushdown", + vec![BatchIcebergPredicatePushDownRule::create()], + ApplyOrder::BottomUp, + )); + assert_eq!(plan.convention(), Convention::Batch); Ok(plan) } diff --git a/src/frontend/src/optimizer/plan_node/batch_filter.rs b/src/frontend/src/optimizer/plan_node/batch_filter.rs index ff89eacd485c..6404fd852e6d 100644 --- a/src/frontend/src/optimizer/plan_node/batch_filter.rs +++ b/src/frontend/src/optimizer/plan_node/batch_filter.rs @@ -45,6 +45,12 @@ impl BatchFilter { pub fn predicate(&self) -> &Condition { &self.core.predicate } + + pub fn clone_with_predicate(&self, predicate: Condition) -> Self { + let mut core = self.core.clone(); + core.predicate = predicate; + Self::new(core) + } } impl_distill_by_unit!(BatchFilter, core, "BatchFilter"); diff --git a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs index 4333fcaa3e90..815b711faa29 100644 --- a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::hash::{Hash, Hasher}; use std::rc::Rc; +use iceberg::expr::Predicate as IcebergPredicate; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::IcebergScanNode; @@ -29,10 +31,36 @@ use crate::error::Result; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::property::{Distribution, Order}; -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone)] pub struct BatchIcebergScan { pub base: PlanBase, pub core: generic::Source, + pub predicate: IcebergPredicate, +} + +impl PartialEq for BatchIcebergScan { + fn eq(&self, other: &Self) -> bool { + if self.predicate == IcebergPredicate::AlwaysTrue + && other.predicate == IcebergPredicate::AlwaysTrue + { + self.base == other.base && self.core == other.core + } else { + panic!("BatchIcebergScan::eq: comparing non-AlwaysTrue predicates is not supported") + } + } +} + +impl Eq for BatchIcebergScan {} + +impl Hash for BatchIcebergScan { + fn hash(&self, state: &mut H) { + if self.predicate != IcebergPredicate::AlwaysTrue { + panic!("BatchIcebergScan::hash: hashing non-AlwaysTrue predicates is not supported") + } else { + self.base.hash(state); + self.core.hash(state); + } + } } impl BatchIcebergScan { @@ -44,7 +72,11 @@ impl BatchIcebergScan { Order::any(), ); - Self { base, core } + Self { + base, + core, + predicate: IcebergPredicate::AlwaysTrue, + } } pub fn column_names(&self) -> Vec<&str> { @@ -62,6 +94,15 @@ impl BatchIcebergScan { Self { base, core: self.core.clone(), + predicate: self.predicate.clone(), + } + } + + pub fn clone_with_predicate(&self, predicate: IcebergPredicate) -> Self { + Self { + base: self.base.clone(), + core: self.core.clone(), + predicate, } } @@ -78,6 +119,7 @@ impl Distill for BatchIcebergScan { let fields = vec![ ("source", src), ("columns", column_names_pretty(self.schema())), + ("predicate", Pretty::from(self.predicate.to_string())), ]; childless_record("BatchIcebergScan", fields) } diff --git a/src/frontend/src/optimizer/rule/batch/batch_iceberg_predicate_pushdown.rs b/src/frontend/src/optimizer/rule/batch/batch_iceberg_predicate_pushdown.rs new file mode 100644 index 000000000000..8df8777d5938 --- /dev/null +++ b/src/frontend/src/optimizer/rule/batch/batch_iceberg_predicate_pushdown.rs @@ -0,0 +1,305 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +use chrono::Datelike; +use iceberg::expr::{Predicate as IcebergPredicate, Reference}; +use iceberg::spec::Datum as IcebergDatum; +use risingwave_common::catalog::Field; +use risingwave_common::types::{Decimal, ScalarImpl}; + +use crate::expr::{ExprImpl, ExprType, Literal}; +use crate::optimizer::plan_node::generic::GenericPlanRef; +use crate::optimizer::plan_node::{BatchFilter, BatchIcebergScan, PlanTreeNodeUnary}; +use crate::optimizer::rule::{BoxedRule, Rule}; +use crate::optimizer::PlanRef; +use crate::utils::Condition; + +/// NOTE(kwannoel): We do predicate pushdown to the iceberg-sdk here. +/// zone-map is used to evaluate predicates on iceberg tables. +/// Without zone-map, iceberg-sdk will still apply the predicate on its own. +/// See: . +pub struct BatchIcebergPredicatePushDownRule {} + +impl Rule for BatchIcebergPredicatePushDownRule { + fn apply(&self, plan: PlanRef) -> Option { + let filter: &BatchFilter = plan.as_batch_filter()?; + let input = filter.input(); + let scan: &BatchIcebergScan = input.as_batch_iceberg_scan()?; + // NOTE(kwannoel): We only fill iceberg predicate here. + assert_eq!(scan.predicate, IcebergPredicate::AlwaysTrue); + + let predicate = filter.predicate().clone(); + let (iceberg_predicate, rw_predicate) = + rw_predicate_to_iceberg_predicate(predicate, scan.schema().fields()); + let scan = scan.clone_with_predicate(iceberg_predicate); + if rw_predicate.always_true() { + Some(scan.into()) + } else { + let filter = filter + .clone_with_input(scan.into()) + .clone_with_predicate(rw_predicate); + Some(filter.into()) + } + } +} + +fn rw_literal_to_iceberg_datum(literal: &Literal) -> Option { + let Some(scalar) = literal.get_data() else { + return None; + }; + match scalar { + ScalarImpl::Bool(b) => Some(IcebergDatum::bool(*b)), + ScalarImpl::Int32(i) => Some(IcebergDatum::int(*i)), + ScalarImpl::Int64(i) => Some(IcebergDatum::long(*i)), + ScalarImpl::Float32(f) => Some(IcebergDatum::float(*f)), + ScalarImpl::Float64(f) => Some(IcebergDatum::double(*f)), + ScalarImpl::Decimal(d) => { + let Decimal::Normalized(d) = d else { + return None; + }; + let Ok(d) = IcebergDatum::decimal(*d) else { + return None; + }; + Some(d) + } + ScalarImpl::Date(d) => { + let Ok(datum) = IcebergDatum::date_from_ymd(d.0.year(), d.0.month(), d.0.day()) else { + return None; + }; + Some(datum) + } + ScalarImpl::Timestamp(t) => Some(IcebergDatum::timestamp_nanos( + t.0.and_utc().timestamp_nanos_opt()?, + )), + ScalarImpl::Timestamptz(t) => Some(IcebergDatum::timestamptz_micros(t.timestamp_micros())), + ScalarImpl::Utf8(s) => Some(IcebergDatum::string(s)), + ScalarImpl::Bytea(b) => Some(IcebergDatum::binary(b.clone())), + _ => None, + } +} + +fn rw_expr_to_iceberg_predicate(expr: &ExprImpl, fields: &[Field]) -> Option { + match expr { + ExprImpl::Literal(l) => match l.get_data() { + Some(ScalarImpl::Bool(b)) => { + if *b { + Some(IcebergPredicate::AlwaysTrue) + } else { + Some(IcebergPredicate::AlwaysFalse) + } + } + _ => None, + }, + ExprImpl::FunctionCall(f) => { + let args = f.inputs(); + match f.func_type() { + ExprType::Not => { + let arg = rw_expr_to_iceberg_predicate(&args[0], fields)?; + Some(IcebergPredicate::negate(arg)) + } + ExprType::And => { + let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?; + let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?; + Some(IcebergPredicate::and(arg0, arg1)) + } + ExprType::Or => { + let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?; + let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?; + 
Some(IcebergPredicate::or(arg0, arg1)) + } + ExprType::Equal => match [&args[0], &args[1]] { + [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] + | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.equal_to(datum)) + } + _ => None, + }, + ExprType::NotEqual => match [&args[0], &args[1]] { + [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] + | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.not_equal_to(datum)) + } + _ => None, + }, + ExprType::GreaterThan => match [&args[0], &args[1]] { + [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.greater_than(datum)) + } + [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.less_than_or_equal_to(datum)) + } + _ => None, + }, + ExprType::GreaterThanOrEqual => match [&args[0], &args[1]] { + [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.greater_than_or_equal_to(datum)) + } + [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.less_than(datum)) + } + _ => None, + }, + ExprType::LessThan => match [&args[0], &args[1]] { + [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.less_than(datum)) + } + [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.greater_than_or_equal_to(datum)) + } + _ => None, + }, + ExprType::LessThanOrEqual => match [&args[0], &args[1]] { + [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.less_than_or_equal_to(datum)) + } + [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.greater_than(datum)) + } + _ => None, + }, + ExprType::IsNull => match &args[0] { + ExprImpl::InputRef(lhs) => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + Some(reference.is_null()) + } + _ => None, + }, + ExprType::IsNotNull => match &args[0] { + ExprImpl::InputRef(lhs) => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + Some(reference.is_not_null()) + } + _ => None, + }, + ExprType::In => match &args[0] { + ExprImpl::InputRef(lhs) => { + let column_name = 
&fields[lhs.index].name; + let reference = Reference::new(column_name); + let mut datums = Vec::with_capacity(args.len() - 1); + for arg in &args[1..] { + if let ExprImpl::Literal(l) = arg { + if let Some(datum) = rw_literal_to_iceberg_datum(l) { + datums.push(datum); + } else { + return None; + } + } else { + return None; + } + } + Some(reference.is_in(datums)) + } + _ => None, + }, + _ => None, + } + } + _ => None, + } +} +fn rw_predicate_to_iceberg_predicate( + predicate: Condition, + fields: &[Field], +) -> (IcebergPredicate, Condition) { + if predicate.always_true() { + return (IcebergPredicate::AlwaysTrue, predicate); + } + + let mut conjunctions = predicate.conjunctions; + let mut ignored_conjunctions: Vec = Vec::with_capacity(conjunctions.len()); + + let mut iceberg_condition_root = None; + while let Some(conjunction) = conjunctions.pop() { + match rw_expr_to_iceberg_predicate(&conjunction, fields) { + iceberg_predicate @ Some(_) => { + iceberg_condition_root = iceberg_predicate; + break; + } + None => { + ignored_conjunctions.push(conjunction); + continue; + } + } + } + + let mut iceberg_condition_root = match iceberg_condition_root { + Some(p) => p, + None => { + return ( + IcebergPredicate::AlwaysTrue, + Condition { + conjunctions: ignored_conjunctions, + }, + ) + } + }; + + for rw_condition in conjunctions { + match rw_expr_to_iceberg_predicate(&rw_condition, fields) { + Some(iceberg_predicate) => { + iceberg_condition_root = iceberg_condition_root.and(iceberg_predicate) + } + None => ignored_conjunctions.push(rw_condition), + } + } + ( + iceberg_condition_root, + Condition { + conjunctions: ignored_conjunctions, + }, + ) +} + +impl BatchIcebergPredicatePushDownRule { + pub fn create() -> BoxedRule { + Box::new(BatchIcebergPredicatePushDownRule {}) + } +} diff --git a/src/frontend/src/optimizer/rule/batch/mod.rs b/src/frontend/src/optimizer/rule/batch/mod.rs index 6061c985b669..c4d31faf3cfb 100644 --- a/src/frontend/src/optimizer/rule/batch/mod.rs +++ b/src/frontend/src/optimizer/rule/batch/mod.rs @@ -12,5 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod batch_iceberg_predicate_pushdown; pub(crate) mod batch_project_merge_rule; pub mod batch_push_limit_to_scan_rule; diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs index 7468f1c96524..e9bd08e6c679 100644 --- a/src/frontend/src/optimizer/rule/mod.rs +++ b/src/frontend/src/optimizer/rule/mod.rs @@ -165,6 +165,7 @@ mod table_function_to_mysql_query_rule; mod table_function_to_postgres_query_rule; mod values_extract_project_rule; +pub use batch::batch_iceberg_predicate_pushdown::*; pub use batch::batch_push_limit_to_scan_rule::*; pub use pull_up_correlated_predicate_agg_rule::*; pub use source_to_iceberg_scan_rule::*; @@ -248,6 +249,7 @@ macro_rules! 
for_all_rules { , { AggCallMergeRule } , { ValuesExtractProjectRule } , { BatchPushLimitToScanRule } + , { BatchIcebergPredicatePushDownRule } , { PullUpCorrelatedPredicateAggRule } , { SourceToKafkaScanRule } , { SourceToIcebergScanRule } diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 90984750bc46..9cec27601a24 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -22,6 +22,7 @@ use anyhow::anyhow; use async_recursion::async_recursion; use enum_as_inner::EnumAsInner; use futures::TryStreamExt; +use iceberg::expr::Predicate as IcebergPredicate; use itertools::Itertools; use pgwire::pg_server::SessionId; use risingwave_batch::error::BatchError; @@ -268,11 +269,25 @@ impl Query { } } +#[derive(Debug, Clone)] +pub enum SourceFetchParameters { + IcebergPredicate(IcebergPredicate), + KafkaTimebound { + lower: Option, + upper: Option, + }, + Empty, +} + #[derive(Debug, Clone)] pub struct SourceFetchInfo { pub schema: Schema, + /// These are user-configured connector properties. + /// e.g. host, username, etc... pub connector: ConnectorProperties, - pub timebound: (Option, Option), + /// These parameters are internally derived by the plan node. + /// e.g. predicate pushdown for iceberg, timebound for kafka. + pub fetch_parameters: SourceFetchParameters, pub as_of: Option, } @@ -295,13 +310,16 @@ impl SourceScanInfo { unreachable!("Never call complete when SourceScanInfo is already complete") } }; - match fetch_info.connector { - ConnectorProperties::Kafka(prop) => { + match (fetch_info.connector, fetch_info.fetch_parameters) { + ( + ConnectorProperties::Kafka(prop), + SourceFetchParameters::KafkaTimebound { lower, upper }, + ) => { let mut kafka_enumerator = KafkaSplitEnumerator::new(*prop, SourceEnumeratorContext::dummy().into()) .await?; let split_info = kafka_enumerator - .list_splits_batch(fetch_info.timebound.0, fetch_info.timebound.1) + .list_splits_batch(lower, upper) .await? 
.into_iter() .map(SplitImpl::Kafka) @@ -309,7 +327,7 @@ impl SourceScanInfo { Ok(SourceScanInfo::Complete(split_info)) } - ConnectorProperties::OpendalS3(prop) => { + (ConnectorProperties::OpendalS3(prop), SourceFetchParameters::Empty) => { let lister: OpendalEnumerator = OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?; let stream = build_opendal_fs_list_for_batch(lister); @@ -322,7 +340,7 @@ impl SourceScanInfo { Ok(SourceScanInfo::Complete(res)) } - ConnectorProperties::Gcs(prop) => { + (ConnectorProperties::Gcs(prop), SourceFetchParameters::Empty) => { let lister: OpendalEnumerator = OpendalEnumerator::new_gcs_source(*prop)?; let stream = build_opendal_fs_list_for_batch(lister); @@ -331,7 +349,7 @@ impl SourceScanInfo { Ok(SourceScanInfo::Complete(res)) } - ConnectorProperties::Azblob(prop) => { + (ConnectorProperties::Azblob(prop), SourceFetchParameters::Empty) => { let lister: OpendalEnumerator = OpendalEnumerator::new_azblob_source(*prop)?; let stream = build_opendal_fs_list_for_batch(lister); @@ -340,7 +358,10 @@ impl SourceScanInfo { Ok(SourceScanInfo::Complete(res)) } - ConnectorProperties::Iceberg(prop) => { + ( + ConnectorProperties::Iceberg(prop), + SourceFetchParameters::IcebergPredicate(predicate), + ) => { let iceberg_enumerator = IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::dummy().into()) .await?; @@ -369,7 +390,12 @@ impl SourceScanInfo { }; let split_info = iceberg_enumerator - .list_splits_batch(fetch_info.schema, time_travel_info, batch_parallelism) + .list_splits_batch( + fetch_info.schema, + time_travel_info, + batch_parallelism, + predicate, + ) .await? .into_iter() .map(SplitImpl::Iceberg) @@ -1068,7 +1094,10 @@ impl BatchPlanFragmenter { return Ok(Some(SourceScanInfo::new(SourceFetchInfo { schema: batch_kafka_scan.base.schema().clone(), connector: property, - timebound: timestamp_bound, + fetch_parameters: SourceFetchParameters::KafkaTimebound { + lower: timestamp_bound.0, + upper: timestamp_bound.1, + }, as_of: None, }))); } @@ -1082,7 +1111,9 @@ impl BatchPlanFragmenter { return Ok(Some(SourceScanInfo::new(SourceFetchInfo { schema: batch_iceberg_scan.base.schema().clone(), connector: property, - timebound: (None, None), + fetch_parameters: SourceFetchParameters::IcebergPredicate( + batch_iceberg_scan.predicate.clone(), + ), as_of, }))); } @@ -1097,7 +1128,7 @@ impl BatchPlanFragmenter { return Ok(Some(SourceScanInfo::new(SourceFetchInfo { schema: source_node.base.schema().clone(), connector: property, - timebound: (None, None), + fetch_parameters: SourceFetchParameters::Empty, as_of, }))); }
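
Illustrative sketch (not part of the patch): the new rule folds each translatable conjunct of the RisingWave filter into an iceberg-rust Predicate using the same Reference/Datum builder calls that appear in rw_expr_to_iceberg_predicate above. Below is a minimal standalone example, assuming the iceberg crate used by this patch and borrowing the column names and literal values from the slt test (i1 > 500 AND i2 IS NOT NULL AND i1 IN (580, 590)); the function name is hypothetical.

use iceberg::expr::{Predicate, Reference};
use iceberg::spec::Datum;

// Illustrative only (not part of the patch): builds the iceberg-rust predicate
// that the rule would emit for the filter
// `i1 > 500 AND i2 IS NOT NULL AND i1 IN (580, 590)`.
fn example_pushdown_predicate() -> Predicate {
    let gt = Reference::new("i1").greater_than(Datum::int(500));
    let not_null = Reference::new("i2").is_not_null();
    let in_list = Reference::new("i1").is_in(vec![Datum::int(580), Datum::int(590)]);
    // Translatable conjuncts are folded with `and`; any conjunct that cannot be
    // translated (e.g. `i2 = i3`) stays behind in the residual `BatchFilter`.
    gt.and(not_null).and(in_list)
}

The folded predicate travels from BatchIcebergScan through SourceFetchParameters::IcebergPredicate into IcebergSplitEnumerator::list_splits_batch, which hands it to the table scan via .with_filter(predicate), so file pruning happens inside the iceberg-sdk before splits are built.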