diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs
index 834c293f212b..14fa66be68d5 100644
--- a/src/frontend/src/optimizer/logical_optimization.rs
+++ b/src/frontend/src/optimizer/logical_optimization.rs
@@ -427,7 +427,10 @@ static LOGICAL_FILTER_EXPRESSION_SIMPLIFY: LazyLock<OptimizationStage> = LazyLoc
 static REWRITE_SOURCE_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::new(|| {
     OptimizationStage::new(
         "Rewrite Source For Batch",
-        vec![SourceToKafkaScanRule::create()],
+        vec![
+            SourceToKafkaScanRule::create(),
+            SourceToIcebergScanRule::create(),
+        ],
         ApplyOrder::TopDown,
     )
 });
diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs
index f68f2f3bc9e0..e0b69d27f647 100644
--- a/src/frontend/src/optimizer/mod.rs
+++ b/src/frontend/src/optimizer/mod.rs
@@ -975,6 +975,7 @@ fn require_additional_exchange_on_root_in_distributed_mode(plan: PlanRef) -> boo
     fn is_source(plan: &PlanRef) -> bool {
         plan.node_type() == PlanNodeType::BatchSource
             || plan.node_type() == PlanNodeType::BatchKafkaScan
+            || plan.node_type() == PlanNodeType::BatchIcebergScan
     }
 
     fn is_insert(plan: &PlanRef) -> bool {
@@ -1007,6 +1008,7 @@ fn require_additional_exchange_on_root_in_local_mode(plan: PlanRef) -> bool {
     fn is_source(plan: &PlanRef) -> bool {
         plan.node_type() == PlanNodeType::BatchSource
             || plan.node_type() == PlanNodeType::BatchKafkaScan
+            || plan.node_type() == PlanNodeType::BatchIcebergScan
     }
 
     fn is_insert(plan: &PlanRef) -> bool {
diff --git a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
new file mode 100644
index 000000000000..699d32c5a2bf
--- /dev/null
+++ b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
@@ -0,0 +1,118 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::rc::Rc;
+
+use pretty_xmlish::{Pretty, XmlNode};
+use risingwave_pb::batch_plan::plan_node::NodeBody;
+use risingwave_pb::batch_plan::SourceNode;
+use risingwave_sqlparser::ast::AsOf;
+
+use super::batch::prelude::*;
+use super::utils::{childless_record, column_names_pretty, Distill};
+use super::{
+    generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, ToLocalBatch,
+};
+use crate::catalog::source_catalog::SourceCatalog;
+use crate::error::Result;
+use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
+use crate::optimizer::property::{Distribution, Order};
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct BatchIcebergScan {
+    pub base: PlanBase<Batch>,
+    pub core: generic::Source,
+}
+
+impl BatchIcebergScan {
+    pub fn new(core: generic::Source) -> Self {
+        let base = PlanBase::new_batch_with_core(
+            &core,
+            // Use `Single` by default, will be updated later with `clone_with_dist`.
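+            // (`clone_with_dist` below replaces this with `Distribution::SomeShard`
+            // when the plan is lowered to local or distributed batch execution.)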
+            Distribution::Single,
+            Order::any(),
+        );
+
+        Self { base, core }
+    }
+
+    pub fn column_names(&self) -> Vec<&str> {
+        self.schema().names_str()
+    }
+
+    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
+        self.core.catalog.clone()
+    }
+
+    pub fn clone_with_dist(&self) -> Self {
+        let base = self
+            .base
+            .clone_with_new_distribution(Distribution::SomeShard);
+        Self {
+            base,
+            core: self.core.clone(),
+        }
+    }
+
+    pub fn as_of(&self) -> Option<AsOf> {
+        self.core.as_of.clone()
+    }
+}
+
+impl_plan_tree_node_for_leaf! { BatchIcebergScan }
+
+impl Distill for BatchIcebergScan {
+    fn distill<'a>(&self) -> XmlNode<'a> {
+        let src = Pretty::from(self.source_catalog().unwrap().name.clone());
+        let fields = vec![
+            ("source", src),
+            ("columns", column_names_pretty(self.schema())),
+        ];
+        childless_record("BatchIcebergScan", fields)
+    }
+}
+
+impl ToLocalBatch for BatchIcebergScan {
+    fn to_local(&self) -> Result<PlanRef> {
+        Ok(self.clone_with_dist().into())
+    }
+}
+
+impl ToDistributedBatch for BatchIcebergScan {
+    fn to_distributed(&self) -> Result<PlanRef> {
+        Ok(self.clone_with_dist().into())
+    }
+}
+
+impl ToBatchPb for BatchIcebergScan {
+    fn to_batch_prost_body(&self) -> NodeBody {
+        let source_catalog = self.source_catalog().unwrap();
+        NodeBody::Source(SourceNode {
+            source_id: source_catalog.id,
+            info: Some(source_catalog.info.clone()),
+            columns: self
+                .core
+                .column_catalog
+                .iter()
+                .map(|c| c.to_protobuf())
+                .collect(),
+            with_properties: source_catalog.with_properties.clone().into_iter().collect(),
+            split: vec![],
+        })
+    }
+}
+
+impl ExprRewritable for BatchIcebergScan {}
+
+impl ExprVisitable for BatchIcebergScan {}
diff --git a/src/frontend/src/optimizer/plan_node/logical_iceberg_scan.rs b/src/frontend/src/optimizer/plan_node/logical_iceberg_scan.rs
new file mode 100644
index 000000000000..f4a14b837f1b
--- /dev/null
+++ b/src/frontend/src/optimizer/plan_node/logical_iceberg_scan.rs
@@ -0,0 +1,116 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::rc::Rc;
+
+use pretty_xmlish::{Pretty, XmlNode};
+
+use super::generic::GenericPlanRef;
+use super::utils::{childless_record, Distill};
+use super::{
+    generic, ColPrunable, ExprRewritable, Logical, LogicalProject, PlanBase, PlanRef,
+    PredicatePushdown, ToBatch, ToStream,
+};
+use crate::catalog::source_catalog::SourceCatalog;
+use crate::error::Result;
+use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
+use crate::optimizer::plan_node::utils::column_names_pretty;
+use crate::optimizer::plan_node::{
+    BatchIcebergScan, ColumnPruningContext, LogicalFilter, LogicalSource, PredicatePushdownContext,
+    RewriteStreamContext, ToStreamContext,
+};
+use crate::utils::{ColIndexMapping, Condition};
+
+/// `LogicalIcebergScan` is only used by batch queries. At the beginning of the batch
+/// query optimization, a `LogicalSource` with an iceberg property is converted into a
+/// `LogicalIcebergScan`.
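+///
+/// It is a leaf plan node and never reaches stream planning; the `ToStream`
+/// implementation below is deliberately `unreachable!()`.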
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct LogicalIcebergScan {
+    pub base: PlanBase<Logical>,
+    pub core: generic::Source,
+}
+
+impl LogicalIcebergScan {
+    pub fn new(logical_source: &LogicalSource) -> Self {
+        assert!(logical_source.core.is_iceberg_connector());
+
+        let core = logical_source.core.clone();
+        let base = PlanBase::new_logical_with_core(&core);
+
+        assert!(logical_source.output_exprs.is_none());
+
+        LogicalIcebergScan { base, core }
+    }
+
+    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
+        self.core.catalog.clone()
+    }
+}
+
+impl_plan_tree_node_for_leaf! {LogicalIcebergScan}
+impl Distill for LogicalIcebergScan {
+    fn distill<'a>(&self) -> XmlNode<'a> {
+        let fields = if let Some(catalog) = self.source_catalog() {
+            let src = Pretty::from(catalog.name.clone());
+            vec![
+                ("source", src),
+                ("columns", column_names_pretty(self.schema())),
+            ]
+        } else {
+            vec![]
+        };
+        childless_record("LogicalIcebergScan", fields)
+    }
+}
+
+impl ColPrunable for LogicalIcebergScan {
+    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
+        // TODO: support column pruning for iceberg scan
+        let mapping = ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
+        LogicalProject::with_mapping(self.clone().into(), mapping).into()
+    }
+}
+
+impl ExprRewritable for LogicalIcebergScan {}
+
+impl ExprVisitable for LogicalIcebergScan {}
+
+impl PredicatePushdown for LogicalIcebergScan {
+    fn predicate_pushdown(
+        &self,
+        predicate: Condition,
+        _ctx: &mut PredicatePushdownContext,
+    ) -> PlanRef {
+        // No pushdown.
+        LogicalFilter::create(self.clone().into(), predicate)
+    }
+}
+
+impl ToBatch for LogicalIcebergScan {
+    fn to_batch(&self) -> Result<PlanRef> {
+        let plan: PlanRef = BatchIcebergScan::new(self.core.clone()).into();
+        Ok(plan)
+    }
+}
+
+impl ToStream for LogicalIcebergScan {
+    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
+        unreachable!()
+    }
+
+    fn logical_rewrite_for_stream(
+        &self,
+        _ctx: &mut RewriteStreamContext,
+    ) -> Result<(PlanRef, ColIndexMapping)> {
+        unreachable!()
+    }
+}
diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index 8a8fd37bd66b..50a28494e81d 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -315,6 +315,10 @@ impl ToBatch for LogicalSource {
             !self.core.is_kafka_connector(),
             "LogicalSource with a kafka property should be converted to LogicalKafkaScan"
         );
+        assert!(
+            !self.core.is_iceberg_connector(),
+            "LogicalSource with an iceberg property should be converted to LogicalIcebergScan"
+        );
         let mut plan: PlanRef = BatchSource::new(self.core.clone()).into();
 
         if let Some(exprs) = &self.output_exprs {
diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs
index c2c46e0766ef..e7ad78f373ba 100644
--- a/src/frontend/src/optimizer/plan_node/mod.rs
+++ b/src/frontend/src/optimizer/plan_node/mod.rs
@@ -904,8 +904,10 @@ mod stream_topn;
 mod stream_values;
 mod stream_watermark_filter;
 
+mod batch_iceberg_scan;
 mod batch_kafka_scan;
 mod derive;
+mod logical_iceberg_scan;
 mod stream_cdc_table_scan;
 mod stream_share;
 mod stream_temporal_join;
@@ -920,6 +922,7 @@ pub use batch_group_topn::BatchGroupTopN;
 pub use batch_hash_agg::BatchHashAgg;
 pub use batch_hash_join::BatchHashJoin;
 pub use batch_hop_window::BatchHopWindow;
+pub use batch_iceberg_scan::BatchIcebergScan;
 pub use batch_insert::BatchInsert;
 pub use batch_kafka_scan::BatchKafkaScan;
 pub use batch_limit::BatchLimit;
@@ -949,6 +952,7 @@ pub use logical_except::LogicalExcept;
 pub use logical_expand::LogicalExpand;
 pub use logical_filter::LogicalFilter;
 pub use logical_hop_window::LogicalHopWindow;
+pub use logical_iceberg_scan::LogicalIcebergScan;
 pub use logical_insert::LogicalInsert;
 pub use logical_intersect::LogicalIntersect;
 pub use logical_join::LogicalJoin;
@@ -1058,6 +1062,7 @@ macro_rules! for_all_plan_nodes {
             , { Logical, Except }
             , { Logical, MaxOneRow }
             , { Logical, KafkaScan }
+            , { Logical, IcebergScan }
             , { Batch, SimpleAgg }
             , { Batch, HashAgg }
             , { Batch, SortAgg }
@@ -1086,6 +1091,7 @@ macro_rules! for_all_plan_nodes {
             , { Batch, OverWindow }
             , { Batch, MaxOneRow }
             , { Batch, KafkaScan }
+            , { Batch, IcebergScan }
             , { Stream, Project }
             , { Stream, Filter }
             , { Stream, TableScan }
@@ -1158,6 +1164,7 @@ macro_rules! for_logical_plan_nodes {
             , { Logical, Except }
             , { Logical, MaxOneRow }
             , { Logical, KafkaScan }
+            , { Logical, IcebergScan }
         }
     };
 }
@@ -1195,6 +1202,7 @@ macro_rules! for_batch_plan_nodes {
             , { Batch, OverWindow }
             , { Batch, MaxOneRow }
             , { Batch, KafkaScan }
+            , { Batch, IcebergScan }
         }
     };
 }
diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs
index e52e0ca7a8b3..9364a6f2b7f5 100644
--- a/src/frontend/src/optimizer/rule/mod.rs
+++ b/src/frontend/src/optimizer/rule/mod.rs
@@ -157,10 +157,13 @@ pub use apply_hop_window_transpose_rule::*;
 mod agg_call_merge_rule;
 pub use agg_call_merge_rule::*;
 mod pull_up_correlated_predicate_agg_rule;
+mod source_to_iceberg_scan_rule;
 mod source_to_kafka_scan_rule;
 mod values_extract_project_rule;
+
 pub use batch::batch_push_limit_to_scan_rule::*;
 pub use pull_up_correlated_predicate_agg_rule::*;
+pub use source_to_iceberg_scan_rule::*;
 pub use source_to_kafka_scan_rule::*;
 pub use values_extract_project_rule::*;
@@ -236,6 +239,7 @@ macro_rules! for_all_rules {
             , { BatchPushLimitToScanRule }
             , { PullUpCorrelatedPredicateAggRule }
             , { SourceToKafkaScanRule }
+            , { SourceToIcebergScanRule }
         }
     };
 }
diff --git a/src/frontend/src/optimizer/rule/source_to_iceberg_scan_rule.rs b/src/frontend/src/optimizer/rule/source_to_iceberg_scan_rule.rs
new file mode 100644
index 000000000000..a69fb8e193b0
--- /dev/null
+++ b/src/frontend/src/optimizer/rule/source_to_iceberg_scan_rule.rs
@@ -0,0 +1,35 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
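+
+// Rewrites a `LogicalSource` whose connector is iceberg into a `LogicalIcebergScan`.
+// The rule is registered in the "Rewrite Source For Batch" stage
+// (`REWRITE_SOURCE_FOR_BATCH` in `logical_optimization.rs`) and applied top-down.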
+
+use super::{BoxedRule, Rule};
+use crate::optimizer::plan_node::{LogicalIcebergScan, LogicalSource};
+use crate::optimizer::PlanRef;
+
+pub struct SourceToIcebergScanRule {}
+impl Rule for SourceToIcebergScanRule {
+    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
+        let source: &LogicalSource = plan.as_logical_source()?;
+        if source.core.is_iceberg_connector() {
+            Some(LogicalIcebergScan::new(source).into())
+        } else {
+            None
+        }
+    }
+}
+
+impl SourceToIcebergScanRule {
+    pub fn create() -> BoxedRule {
+        Box::new(SourceToIcebergScanRule {})
+    }
+}
diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs
index 32dac7200619..193ee6417abd 100644
--- a/src/frontend/src/scheduler/distributed/stage.rs
+++ b/src/frontend/src/scheduler/distributed/stage.rs
@@ -987,7 +987,9 @@ impl StageRunner {
                     node_body: Some(NodeBody::RowSeqScan(scan_node)),
                 }
             }
-            PlanNodeType::BatchSource | PlanNodeType::BatchKafkaScan => {
+            PlanNodeType::BatchSource
+            | PlanNodeType::BatchKafkaScan
+            | PlanNodeType::BatchIcebergScan => {
                 let node_body = execution_plan_node.node.clone();
                 let NodeBody::Source(mut source_node) = node_body else {
                     unreachable!();
diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs
index c6340189743f..7877ab658b79 100644
--- a/src/frontend/src/scheduler/local.rs
+++ b/src/frontend/src/scheduler/local.rs
@@ -472,7 +472,9 @@ impl LocalQueryExecution {
                     node_body: Some(node_body),
                 })
             }
-            PlanNodeType::BatchSource | PlanNodeType::BatchKafkaScan => {
+            PlanNodeType::BatchSource
+            | PlanNodeType::BatchKafkaScan
+            | PlanNodeType::BatchIcebergScan => {
                 let mut node_body = execution_plan_node.node.clone();
                 match &mut node_body {
                     NodeBody::Source(ref mut source_node) => {
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 9fc9316d0281..f5ed1b1205b6 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -54,7 +54,9 @@ use crate::catalog::catalog_service::CatalogReader;
 use crate::catalog::TableId;
 use crate::error::RwError;
 use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
-use crate::optimizer::plan_node::{BatchKafkaScan, BatchSource, PlanNodeId, PlanNodeType};
+use crate::optimizer::plan_node::{
+    BatchIcebergScan, BatchKafkaScan, BatchSource, PlanNodeId, PlanNodeType,
+};
 use crate::optimizer::property::Distribution;
 use crate::optimizer::PlanRef;
 use crate::scheduler::SchedulerResult;
@@ -1018,6 +1020,21 @@ impl BatchPlanFragmenter {
                     as_of: None,
                 })));
             }
+        } else if let Some(batch_iceberg_scan) = node.as_batch_iceberg_scan() {
+            let batch_iceberg_scan: &BatchIcebergScan = batch_iceberg_scan;
+            let source_catalog = batch_iceberg_scan.source_catalog();
+            if let Some(source_catalog) = source_catalog {
+                let property = ConnectorProperties::extract(
+                    source_catalog.with_properties.clone().into_iter().collect(),
+                    false,
+                )?;
+                let as_of = batch_iceberg_scan.as_of();
+                return Ok(Some(SourceScanInfo::new(SourceFetchInfo {
+                    connector: property,
+                    timebound: (None, None),
+                    as_of,
+                })));
+            }
         } else if let Some(source_node) = node.as_batch_source() {
             // TODO: use specific batch operator instead of batch source.
             let source_node: &BatchSource = source_node;