diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto
index 8c74b93d96fad..88c615e3680bd 100644
--- a/proto/batch_plan.proto
+++ b/proto/batch_plan.proto
@@ -58,12 +58,18 @@ message ScanRange {
 }
 
 message SourceNode {
+  enum SourceType {
+    UNSPECIFIED = 0;
+    SCAN = 1;
+    COUNT_STAR = 2;
+  }
   uint32 source_id = 1;
   repeated plan_common.ColumnCatalog columns = 2;
   map<string, string> with_properties = 3;
   repeated bytes split = 4;
   catalog.StreamSourceInfo info = 5;
   map<string, secret.SecretRef> secret_refs = 6;
+  SourceType source_type = 7;
 }
 
 message IcebergScanNode {
diff --git a/src/batch/src/executor/iceberg_count_star_scan.rs b/src/batch/src/executor/iceberg_count_star_scan.rs
new file mode 100644
index 0000000000000..b35231b88c42d
--- /dev/null
+++ b/src/batch/src/executor/iceberg_count_star_scan.rs
@@ -0,0 +1,64 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::mem;
+use std::sync::Arc;
+
+use futures_async_stream::try_stream;
+use futures_util::stream::StreamExt;
+use risingwave_common::array::I64Array;
+use risingwave_common::catalog::Schema;
+
+use crate::error::BatchError;
+use crate::executor::{DataChunk, Executor};
+
+pub struct IcebergCountStarExecutor {
+    schema: Schema,
+    identity: String,
+    record_counts: Vec<u64>,
+}
+
+impl Executor for IcebergCountStarExecutor {
+    fn schema(&self) -> &risingwave_common::catalog::Schema {
+        &self.schema
+    }
+
+    fn identity(&self) -> &str {
+        &self.identity
+    }
+
+    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
+        self.do_execute().boxed()
+    }
+}
+
+impl IcebergCountStarExecutor {
+    pub fn new(schema: Schema, identity: String, record_counts: Vec<u64>) -> Self {
+        Self {
+            schema,
+            identity,
+            record_counts,
+        }
+    }
+
+    #[try_stream(ok = DataChunk, error = BatchError)]
+    async fn do_execute(mut self: Box<Self>) {
+        // Sum the per-file record counts collected from the iceberg manifests
+        // and emit a single one-row chunk.
+        let record_count = mem::take(&mut self.record_counts).into_iter().sum::<u64>() as i64;
+        let chunk = DataChunk::new(
+            vec![Arc::new(I64Array::from_iter([record_count]).into())],
+            1,
+        );
+        yield chunk;
+    }
+}
diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs
index 07be18ca72988..c3727c99abb86 100644
--- a/src/batch/src/executor/mod.rs
+++ b/src/batch/src/executor/mod.rs
@@ -20,6 +20,7 @@ mod generic_exchange;
 mod group_top_n;
 mod hash_agg;
 mod hop_window;
+mod iceberg_count_star_scan;
 mod iceberg_scan;
 mod insert;
 mod join;
diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs
index 7a37be9183898..e971f64ad3b15 100644
--- a/src/batch/src/executor/source.rs
+++ b/src/batch/src/executor/source.rs
@@ -28,7 +28,9 @@ use risingwave_connector::source::{
 };
 use risingwave_connector::WithOptionsSecResolved;
 use risingwave_pb::batch_plan::plan_node::NodeBody;
+use risingwave_pb::batch_plan::source_node::SourceType;
 
+use super::iceberg_count_star_scan::IcebergCountStarExecutor;
 use super::Executor;
 use crate::error::{BatchError, Result};
 use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
@@ -101,7 +103,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
             })
             .collect();
         let schema = Schema::new(fields);
-
+        assert!(!matches!(config, ConnectorProperties::Iceberg(_)));
         let source_reader = SourceReader {
             config,
diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
index 845ffb66804d3..e1c0e1b4e5757 100644
--- a/src/connector/src/source/iceberg/mod.rs
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -20,7 +20,7 @@ use anyhow::anyhow;
 use async_trait::async_trait;
 use futures_async_stream::for_await;
 use iceberg::scan::FileScanTask;
-use iceberg::spec::TableMetadata;
+use iceberg::spec::{ManifestList, TableMetadata};
 use iceberg::table::Table;
 use itertools::Itertools;
 pub use parquet_file_reader::*;
@@ -28,6 +28,7 @@ use risingwave_common::bail;
 use risingwave_common::catalog::Schema;
 use risingwave_common::types::JsonbVal;
 use serde::{Deserialize, Serialize};
+use tokio_stream::StreamExt;
 
 use crate::error::{ConnectorError, ConnectorResult};
 use crate::parser::ParserConfig;
@@ -145,6 +146,7 @@ pub struct IcebergSplit {
     pub snapshot_id: i64,
     pub table_meta: TableMetadataJsonStr,
     pub files: Vec<IcebergFileScanTaskJsonStr>,
+    pub record_counts: Vec<u64>,
     pub eq_delete_files: Vec<IcebergFileScanTaskJsonStr>,
 }
@@ -198,9 +200,8 @@ pub enum IcebergTimeTravelInfo {
 }
 
 impl IcebergSplitEnumerator {
-    pub async fn list_splits_batch(
+    pub async fn list_splits_batch_count_star(
         &self,
-        schema: Schema,
         time_travel_info: Option<IcebergTimeTravelInfo>,
         batch_parallelism: usize,
     ) -> ConnectorResult<Vec<IcebergSplit>> {
@@ -208,53 +209,81 @@ impl IcebergSplitEnumerator {
         if batch_parallelism == 0 {
             bail!("Batch parallelism is 0. Cannot split the iceberg files.");
         }
         let table = self.config.load_table_v2().await?;
-
-        let current_snapshot = table.metadata().current_snapshot();
-        if current_snapshot.is_none() {
+        let table_meta = TableMetadataJsonStr::serialize(table.metadata());
+        let Some(snapshot_id) = self.get_snapshot_id(&table, time_travel_info)? else {
             // If there is no snapshot, we will return a mock `IcebergSplit` with empty files.
-            return Ok(vec![IcebergSplit {
-                split_id: 0,
-                snapshot_id: 0, // unused
-                table_meta: TableMetadataJsonStr::serialize(table.metadata()),
-                files: vec![],
-                eq_delete_files: vec![],
-            }]);
-        }
+            return Ok(vec![IcebergSplit {
+                split_id: 0,
+                snapshot_id: 0,
+                table_meta,
+                files: vec![],
+                record_counts: vec![],
+                eq_delete_files: vec![],
+            }]);
+        };
+        let mut record_counts = vec![];
+        let manifest_list: ManifestList = table
+            .metadata()
+            .snapshot_by_id(snapshot_id)
+            .unwrap()
+            .load_manifest_list(table.file_io(), table.metadata())
+            .await
+            .map_err(|e| anyhow!(e))?;
+
+        for entry in manifest_list.entries() {
+            let manifest = entry
+                .load_manifest(table.file_io())
+                .await
+                .map_err(|e| anyhow!(e))?;
+            let mut manifest_entries_stream =
+                futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive()));
+
+            while let Some(manifest_entry) = manifest_entries_stream.next().await {
+                let file = manifest_entry.data_file();
+                record_counts.push(file.record_count());
+            }
+        }
+        let split_num = batch_parallelism;
+        // evenly split the files into splits based on the parallelism.
+        let split_size = record_counts.len() / split_num;
+        let remaining = record_counts.len() % split_num;
+        let mut splits = vec![];
+        for i in 0..split_num {
+            let start = i * split_size;
+            let end = (i + 1) * split_size;
+            let split = IcebergSplit {
+                split_id: i as i64,
+                snapshot_id,
+                table_meta: table_meta.clone(),
+                files: vec![],
+                record_counts: record_counts[start..end].to_vec(),
+                eq_delete_files: vec![],
+            };
+            splits.push(split);
+        }
+        // Distribute the `remaining` leftover counts one per split.
+        for i in 0..remaining {
+            splits[i]
+                .record_counts
+                .push(record_counts[split_num * split_size + i]);
+        }
-        let snapshot_id = match time_travel_info {
-            Some(IcebergTimeTravelInfo::Version(version)) => {
-                let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
-                    bail!("Cannot find the snapshot id in the iceberg table.");
-                };
-                snapshot.snapshot_id()
-            }
-            Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
-                let snapshot = table
-                    .metadata()
-                    .snapshots()
-                    .map(|snapshot| snapshot.timestamp().map(|ts| ts.timestamp_millis()))
-                    .collect::<Result<Vec<_>, _>>()?
-                    .into_iter()
-                    .filter(|&snapshot_millis| snapshot_millis <= timestamp)
-                    .max_by_key(|&snapshot_millis| snapshot_millis);
-                match snapshot {
-                    Some(snapshot) => snapshot,
-                    None => {
-                        // convert unix time to human readable time
-                        let time = chrono::DateTime::from_timestamp_millis(timestamp);
-                        if time.is_some() {
-                            bail!("Cannot find a snapshot older than {}", time.unwrap());
-                        } else {
-                            bail!("Cannot find a snapshot");
-                        }
-                    }
-                }
-            }
-            None => {
-                assert!(current_snapshot.is_some());
-                current_snapshot.unwrap().snapshot_id()
-            }
-        };
+        Ok(splits
+            .into_iter()
+            .filter(|split| !split.record_counts.is_empty())
+            .collect_vec())
+    }
+
+    pub async fn list_splits_batch_scan(
+        &self,
+        schema: Schema,
+        time_travel_info: Option<IcebergTimeTravelInfo>,
+        batch_parallelism: usize,
+    ) -> ConnectorResult<Vec<IcebergSplit>> {
+        if batch_parallelism == 0 {
+            bail!("Batch parallelism is 0. Cannot split the iceberg files.");
+        }
+        let table = self.config.load_table_v2().await?;
+
+        let table_meta = TableMetadataJsonStr::serialize(table.metadata());
+
+        let Some(snapshot_id) = self.get_snapshot_id(&table, time_travel_info)? else {
+            // If there is no snapshot, we will return a mock `IcebergSplit` with empty files.
+            return Ok(vec![IcebergSplit {
+                split_id: 0,
+                snapshot_id: 0,
+                table_meta,
+                files: vec![],
+                record_counts: vec![],
+                eq_delete_files: vec![],
+            }]);
+        };
+
         let require_names = Self::get_require_field_names(&table, snapshot_id, schema).await?;
 
         let mut data_files = vec![];
@@ -286,8 +315,6 @@ impl IcebergSplitEnumerator {
             }
         }
 
-        let table_meta = TableMetadataJsonStr::serialize(table.metadata());
-
         let split_num = batch_parallelism;
         // evenly split the files into splits based on the parallelism.
         let split_size = data_files.len() / split_num;
@@ -300,6 +327,7 @@ impl IcebergSplitEnumerator {
                 split_id: i as i64,
                 snapshot_id,
                 table_meta: table_meta.clone(),
+                record_counts: vec![],
                 files: data_files[start..end].to_vec(),
                 eq_delete_files: eq_delete_files.clone(),
             };
@@ -356,6 +384,48 @@ impl IcebergSplitEnumerator {
         }
         Ok(require_field_names)
     }
+
+    fn get_snapshot_id(
+        &self,
+        table: &Table,
+        time_travel_info: Option<IcebergTimeTravelInfo>,
+    ) -> ConnectorResult<Option<i64>> {
+        let current_snapshot = table.metadata().current_snapshot();
+        if current_snapshot.is_none() {
+            // If there is no snapshot, return `None`; the caller is expected to
+            // produce a mock `IcebergSplit` with empty files.
+            return Ok(None);
+        }
+        match time_travel_info {
+            Some(IcebergTimeTravelInfo::Version(version)) => {
+                let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
+                    bail!("Cannot find the snapshot id in the iceberg table.");
+                };
+                Ok(Some(snapshot.snapshot_id()))
+            }
+            Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
+                let snapshot = table
+                    .metadata()
+                    .snapshots()
+                    .map(|snapshot| snapshot.timestamp().map(|ts| ts.timestamp_millis()))
+                    .collect::<Result<Vec<_>, _>>()?
+                    .into_iter()
+                    .filter(|&snapshot_millis| snapshot_millis <= timestamp)
+                    .max_by_key(|&snapshot_millis| snapshot_millis);
+                match snapshot {
+                    Some(snapshot) => Ok(Some(snapshot)),
+                    None => {
+                        // convert unix time to human readable time
+                        let time = chrono::DateTime::from_timestamp_millis(timestamp);
+                        if time.is_some() {
+                            bail!("Cannot find a snapshot older than {}", time.unwrap());
+                        } else {
+                            bail!("Cannot find a snapshot");
+                        }
+                    }
+                }
+            }
+            None => {
+                assert!(current_snapshot.is_some());
+                Ok(Some(current_snapshot.unwrap().snapshot_id()))
+            }
+        }
+    }
 }
 
 #[derive(Debug)]
@@ -380,3 +450,4 @@ impl SplitReader for IcebergFileReader {
         unimplemented!()
     }
 }
+
diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs
index de5c3deaf0d6b..b26d230725459 100644
--- a/src/frontend/src/optimizer/mod.rs
+++ b/src/frontend/src/optimizer/mod.rs
@@ -401,6 +401,12 @@ impl PlanRoot {
             ApplyOrder::BottomUp,
         ));
 
+        let plan = plan.optimize_by_rules(&OptimizationStage::new(
+            "Iceberg count star",
+            vec![BatchAggCountForIcebergRule::create()],
+            ApplyOrder::TopDown,
+        ));
+
         assert_eq!(plan.convention(), Convention::Batch);
         Ok(plan)
     }
@@ -444,6 +450,12 @@ impl PlanRoot {
             ApplyOrder::BottomUp,
         ));
 
+        let plan = plan.optimize_by_rules(&OptimizationStage::new(
+            "Iceberg count star",
+            vec![BatchAggCountForIcebergRule::create()],
+            ApplyOrder::TopDown,
+        ));
+
         assert_eq!(plan.convention(), Convention::Batch);
         Ok(plan)
     }
@@ -1068,6 +1080,7 @@ fn require_additional_exchange_on_root_in_distributed_mode(plan: PlanRef) -> bool {
     plan.node_type() == PlanNodeType::BatchSource
         || plan.node_type() == PlanNodeType::BatchKafkaScan
        || plan.node_type() == PlanNodeType::BatchIcebergScan
+        || plan.node_type() == PlanNodeType::BatchIcebergCountStarScan
 }
 
 fn is_insert(plan: &PlanRef) -> bool {
@@ -1102,6 +1115,7 @@ fn require_additional_exchange_on_root_in_local_mode(plan: PlanRef) -> bool {
     plan.node_type() == PlanNodeType::BatchSource
         || plan.node_type() == PlanNodeType::BatchKafkaScan
        || plan.node_type() == PlanNodeType::BatchIcebergScan
+        || plan.node_type() == PlanNodeType::BatchIcebergCountStarScan
 }
 
 fn is_insert(plan: &PlanRef) -> bool {
diff --git a/src/frontend/src/optimizer/plan_node/batch_iceberg_count_star_scan.rs b/src/frontend/src/optimizer/plan_node/batch_iceberg_count_star_scan.rs
new file mode 100644
index 0000000000000..65e78e09db116
--- /dev/null
+++ b/src/frontend/src/optimizer/plan_node/batch_iceberg_count_star_scan.rs
@@ -0,0 +1,129 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::rc::Rc;
+
+use pretty_xmlish::{Pretty, XmlNode};
+use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
+use risingwave_common::types::DataType;
+use risingwave_pb::batch_plan::plan_node::NodeBody;
+use risingwave_pb::batch_plan::source_node::SourceType;
+use risingwave_pb::batch_plan::SourceNode;
+use risingwave_sqlparser::ast::AsOf;
+
+use super::batch::prelude::*;
+use super::utils::{childless_record, column_names_pretty, Distill};
+use super::{
+    generic, BatchIcebergScan, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch,
+    ToLocalBatch,
+};
+use crate::catalog::source_catalog::SourceCatalog;
+use crate::error::Result;
+use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
+use crate::optimizer::property::Distribution;
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct BatchIcebergCountStarScan {
+    pub base: PlanBase<Batch>,
+    pub core: generic::Source,
+}
+
+impl BatchIcebergCountStarScan {
+    pub fn new_with_batch_iceberg_scan(batch_iceberg_scan: &BatchIcebergScan) -> Self {
+        // Replace the scan's output with a single int64 `count` column.
+        let mut core = batch_iceberg_scan.core.clone();
+        core.column_catalog = vec![ColumnCatalog::visible(ColumnDesc::named(
+            "count",
+            ColumnId::first_user_column(),
+            DataType::Int64,
+        ))];
+        let base = PlanBase::new_batch_with_core(
+            &core,
+            batch_iceberg_scan.base.distribution().clone(),
+            batch_iceberg_scan.base.order().clone(),
+        );
+        Self { base, core }
+    }
+
+    pub fn column_names(&self) -> Vec<&str> {
+        self.schema().names_str()
+    }
+
+    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
+        self.core.catalog.clone()
+    }
+
+    pub fn clone_with_dist(&self) -> Self {
+        let base = self
+            .base
+            .clone_with_new_distribution(Distribution::SomeShard);
+        Self {
+            base,
+            core: self.core.clone(),
+        }
+    }
+
+    pub fn as_of(&self) -> Option<AsOf> {
+        self.core.as_of.clone()
+    }
+}
+
+impl_plan_tree_node_for_leaf! { BatchIcebergCountStarScan }
+
+impl Distill for BatchIcebergCountStarScan {
+    fn distill<'a>(&self) -> XmlNode<'a> {
+        let src = Pretty::from(self.source_catalog().unwrap().name.clone());
+        let fields = vec![
+            ("source", src),
+            ("columns", column_names_pretty(self.schema())),
+        ];
+        childless_record("BatchIcebergCountStarScan", fields)
+    }
+}
+
+impl ToLocalBatch for BatchIcebergCountStarScan {
+    fn to_local(&self) -> Result<PlanRef> {
+        Ok(self.clone_with_dist().into())
+    }
+}
+
+impl ToDistributedBatch for BatchIcebergCountStarScan {
+    fn to_distributed(&self) -> Result<PlanRef> {
+        Ok(self.clone_with_dist().into())
+    }
+}
+
+impl ToBatchPb for BatchIcebergCountStarScan {
+    fn to_batch_prost_body(&self) -> NodeBody {
+        let source_catalog = self.source_catalog().unwrap();
+        let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
+        NodeBody::Source(SourceNode {
+            source_id: source_catalog.id,
+            info: Some(source_catalog.info.clone()),
+            columns: self
+                .core
+                .column_catalog
+                .iter()
+                .map(|c| c.to_protobuf())
+                .collect(),
+            with_properties,
+            split: vec![],
+            secret_refs,
+            source_type: SourceType::CountStar.into(),
+        })
+    }
+}
+
+impl ExprRewritable for BatchIcebergCountStarScan {}
+
+impl ExprVisitable for BatchIcebergCountStarScan {}
diff --git a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
index 4333fcaa3e90a..69dbb0bdf3e7e 100644
--- a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
@@ -16,6 +16,8 @@ use std::rc::Rc;
 
 use pretty_xmlish::{Pretty, XmlNode};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
+use risingwave_pb::batch_plan::source_node::SourceType;
+use risingwave_pb::batch_plan::SourceNode;
 use risingwave_pb::batch_plan::IcebergScanNode;
 use risingwave_sqlparser::ast::AsOf;
 
@@ -109,6 +111,7 @@ impl ToBatchPb for BatchIcebergScan {
             with_properties,
             split: vec![],
             secret_refs,
+            source_type: SourceType::Scan.into(),
         })
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs b/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs
index 0883f3aa697cd..76db232b12d32 100644
--- a/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs
@@ -18,6 +18,7 @@ use std::rc::Rc;
 
 use pretty_xmlish::{Pretty, XmlNode};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
+use risingwave_pb::batch_plan::source_node::SourceType;
 use risingwave_pb::batch_plan::SourceNode;
 
 use super::batch::prelude::*;
@@ -133,6 +134,7 @@ impl ToBatchPb for BatchKafkaScan {
             with_properties,
             split: vec![],
             secret_refs,
+            source_type: SourceType::Scan.into(),
         })
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs b/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs
index 894ad92011008..f56b32fc14b41 100644
--- a/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs
@@ -29,7 +29,7 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist};
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct BatchSimpleAgg {
     pub base: PlanBase<Batch>,
-    core: generic::Agg<PlanRef>,
+    pub core: generic::Agg<PlanRef>,
 }
 
 impl BatchSimpleAgg {
diff --git a/src/frontend/src/optimizer/plan_node/batch_source.rs b/src/frontend/src/optimizer/plan_node/batch_source.rs
index fd33d2dba0035..00877c8b4b754 100644
--- a/src/frontend/src/optimizer/plan_node/batch_source.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_source.rs
@@ -16,6 +16,7 @@ use std::rc::Rc;
 
 use pretty_xmlish::{Pretty, XmlNode};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
+use risingwave_pb::batch_plan::source_node::SourceType;
 use risingwave_pb::batch_plan::SourceNode;
 use risingwave_sqlparser::ast::AsOf;
 
@@ -115,6 +116,7 @@ impl ToBatchPb for BatchSource {
             with_properties,
             split: vec![],
             secret_refs,
+            source_type: SourceType::Scan.into(),
         })
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs
index db1200de2a27a..41e9093e7a10f 100644
--- a/src/frontend/src/optimizer/plan_node/mod.rs
+++ b/src/frontend/src/optimizer/plan_node/mod.rs
@@ -918,6 +918,7 @@ mod stream_values;
 mod stream_watermark_filter;
 
 mod batch_file_scan;
+mod batch_iceberg_count_star_scan;
 mod batch_iceberg_scan;
 mod batch_kafka_scan;
 mod derive;
@@ -938,6 +939,7 @@ pub use batch_group_topn::BatchGroupTopN;
 pub use batch_hash_agg::BatchHashAgg;
 pub use batch_hash_join::BatchHashJoin;
 pub use batch_hop_window::BatchHopWindow;
+pub use batch_iceberg_count_star_scan::BatchIcebergCountStarScan;
 pub use batch_iceberg_scan::BatchIcebergScan;
 pub use batch_insert::BatchInsert;
 pub use batch_kafka_scan::BatchKafkaScan;
@@ -1121,6 +1123,7 @@ macro_rules! for_all_plan_nodes {
             , { Batch, MaxOneRow }
             , { Batch, KafkaScan }
             , { Batch, IcebergScan }
+            , { Batch, IcebergCountStarScan }
             , { Batch, FileScan }
             , { Stream, Project }
             , { Stream, Filter }
@@ -1241,6 +1244,7 @@ macro_rules! for_batch_plan_nodes {
             , { Batch, MaxOneRow }
             , { Batch, KafkaScan }
             , { Batch, IcebergScan }
+            , { Batch, IcebergCountStarScan }
             , { Batch, FileScan }
         }
     };
diff --git a/src/frontend/src/optimizer/rule/batch/batch_agg_count_for_iceberg_rule.rs b/src/frontend/src/optimizer/rule/batch/batch_agg_count_for_iceberg_rule.rs
new file mode 100644
index 0000000000000..d712da7af79c0
--- /dev/null
+++ b/src/frontend/src/optimizer/rule/batch/batch_agg_count_for_iceberg_rule.rs
@@ -0,0 +1,39 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use crate::optimizer::plan_node::{BatchIcebergCountStarScan, PlanAggCall};
+use crate::optimizer::{BoxedRule, PlanRef, Rule};
+
+pub struct BatchAggCountForIcebergRule {}
+
+impl Rule for BatchAggCountForIcebergRule {
+    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
+        let agg = plan.as_batch_simple_agg()?;
+        // Only rewrite a global `count(*)` (no group keys, a single agg call)
+        // sitting directly on top of a `BatchIcebergScan`.
+        if agg.core.group_key.is_empty()
+            && agg.agg_calls().len() == 1
+            && agg.agg_calls()[0].eq(&PlanAggCall::count_star())
+        {
+            let batch_iceberg = agg.core.input.as_batch_iceberg_scan()?;
+            return Some(
+                BatchIcebergCountStarScan::new_with_batch_iceberg_scan(batch_iceberg).into(),
+            );
+        }
+        None
+    }
+}
+
+impl BatchAggCountForIcebergRule {
+    pub fn create() -> BoxedRule {
+        Box::new(BatchAggCountForIcebergRule {})
+    }
+}
diff --git a/src/frontend/src/optimizer/rule/batch/mod.rs b/src/frontend/src/optimizer/rule/batch/mod.rs
index 6061c985b6696..72537a6d06a1f 100644
--- a/src/frontend/src/optimizer/rule/batch/mod.rs
+++ b/src/frontend/src/optimizer/rule/batch/mod.rs
@@ -12,5 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+pub mod batch_agg_count_for_iceberg_rule;
 pub(crate) mod batch_project_merge_rule;
 pub mod batch_push_limit_to_scan_rule;
diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs
index 180dafa0c79b6..d4babb799c09a 100644
--- a/src/frontend/src/optimizer/rule/mod.rs
+++ b/src/frontend/src/optimizer/rule/mod.rs
@@ -163,6 +163,7 @@ mod source_to_kafka_scan_rule;
 mod table_function_to_file_scan_rule;
 mod values_extract_project_rule;
 
+pub use batch::batch_agg_count_for_iceberg_rule::*;
 pub use batch::batch_push_limit_to_scan_rule::*;
 pub use pull_up_correlated_predicate_agg_rule::*;
 pub use source_to_iceberg_scan_rule::*;
@@ -245,6 +246,7 @@ macro_rules! for_all_rules {
             , { PullUpCorrelatedPredicateAggRule }
             , { SourceToKafkaScanRule }
             , { SourceToIcebergScanRule }
+            , { BatchAggCountForIcebergRule }
         }
     };
 }
diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs
index e933d3f271108..93958e5095066 100644
--- a/src/frontend/src/scheduler/distributed/stage.rs
+++ b/src/frontend/src/scheduler/distributed/stage.rs
@@ -1072,7 +1072,7 @@ impl StageRunner {
                     node_body: Some(NodeBody::Source(source_node)),
                 }
             }
-            PlanNodeType::BatchIcebergScan => {
+            PlanNodeType::BatchIcebergScan | PlanNodeType::BatchIcebergCountStarScan => {
                 let node_body = execution_plan_node.node.clone();
                 let NodeBody::IcebergScan(mut iceberg_scan_node) = node_body else {
                     unreachable!();
diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs
index fcd15368bb5fc..24627f96f76ab 100644
--- a/src/frontend/src/scheduler/local.rs
+++ b/src/frontend/src/scheduler/local.rs
@@ -577,7 +577,7 @@ impl LocalQueryExecution {
                     node_body: Some(node_body),
                 })
             }
-            PlanNodeType::BatchIcebergScan => {
+            PlanNodeType::BatchIcebergScan | PlanNodeType::BatchIcebergCountStarScan => {
                 let mut node_body = execution_plan_node.node.clone();
                 match &mut node_body {
                     NodeBody::IcebergScan(ref mut iceberg_scan_node) => {
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 6777f9373b841..443a96d597a10 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -43,6 +43,7 @@ use risingwave_connector::source::{
     ConnectorProperties, SourceEnumeratorContext, SplitEnumerator, SplitImpl,
 };
 use risingwave_pb::batch_plan::plan_node::NodeBody;
+use risingwave_pb::batch_plan::source_node::SourceType;
 use risingwave_pb::batch_plan::{ExchangeInfo, ScanRange as ScanRangeProto};
 use risingwave_pb::plan_common::Field as PbField;
 use risingwave_sqlparser::ast::AsOf;
@@ -56,7 +57,8 @@ use crate::catalog::TableId;
 use crate::error::RwError;
 use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
 use crate::optimizer::plan_node::{
-    BatchIcebergScan, BatchKafkaScan, BatchSource, PlanNodeId, PlanNodeType,
+    BatchIcebergCountStarScan, BatchIcebergScan, BatchKafkaScan, BatchSource, PlanNodeId,
+    PlanNodeType,
 };
 use crate::optimizer::property::Distribution;
 use crate::optimizer::PlanRef;
@@ -274,6 +276,7 @@ pub struct SourceFetchInfo {
     pub connector: ConnectorProperties,
     pub timebound: (Option<i64>, Option<i64>),
     pub as_of: Option<AsOf>,
+    pub source_type: SourceType,
 }
 
 #[derive(Clone, Debug)]
@@ -368,12 +371,25 @@ impl SourceScanInfo {
             None => None,
         };
 
-        let split_info = iceberg_enumerator
-            .list_splits_batch(fetch_info.schema, time_travel_info, batch_parallelism)
-            .await?
-            .into_iter()
-            .map(SplitImpl::Iceberg)
-            .collect_vec();
+        let split_info = match fetch_info.source_type {
+            SourceType::Scan => iceberg_enumerator
+                .list_splits_batch_scan(
+                    fetch_info.schema,
+                    time_travel_info,
+                    batch_parallelism,
+                )
+                .await?
+                .into_iter()
+                .map(SplitImpl::Iceberg)
+                .collect_vec(),
+            SourceType::CountStar => iceberg_enumerator
+                .list_splits_batch_count_star(time_travel_info, batch_parallelism)
+                .await?
+                .into_iter()
+                .map(SplitImpl::Iceberg)
+                .collect_vec(),
+            SourceType::Unspecified => unreachable!(),
+        };
 
         Ok(SourceScanInfo::Complete(split_info))
     }
@@ -1070,6 +1086,7 @@ impl BatchPlanFragmenter {
                     connector: property,
                     timebound: timestamp_bound,
                     as_of: None,
+                    source_type: SourceType::Scan,
                 })));
             }
         } else if let Some(batch_iceberg_scan) = node.as_batch_iceberg_scan() {
@@ -1084,6 +1101,24 @@ impl BatchPlanFragmenter {
                     connector: property,
                     timebound: (None, None),
                     as_of,
+                    source_type: SourceType::Scan,
+                })));
+            }
+        } else if let Some(batch_iceberg_count_star_scan) = node.as_batch_iceberg_count_star_scan()
+        {
+            let batch_iceberg_count_star_scan: &BatchIcebergCountStarScan =
+                batch_iceberg_count_star_scan;
+            let source_catalog = batch_iceberg_count_star_scan.source_catalog();
+            if let Some(source_catalog) = source_catalog {
+                let property =
+                    ConnectorProperties::extract(source_catalog.with_properties.clone(), false)?;
+                let as_of = batch_iceberg_count_star_scan.as_of();
+                return Ok(Some(SourceScanInfo::new(SourceFetchInfo {
+                    schema: batch_iceberg_count_star_scan.base.schema().clone(),
+                    connector: property,
+                    timebound: (None, None),
+                    as_of,
+                    source_type: SourceType::CountStar,
                 })));
             }
         } else if let Some(source_node) = node.as_batch_source() {
@@ -1099,6 +1134,7 @@ impl BatchPlanFragmenter {
                     connector: property,
                     timebound: (None, None),
                     as_of,
+                    source_type: SourceType::Scan,
                 })));
             }
         }
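
Note for reviewers: the count-star path never opens a data file. It collects the per-file `record_count` values from the manifest entries and spreads them evenly over `batch_parallelism` splits; each `IcebergCountStarExecutor` then folds its split's counts into a single row (`do_execute` above). A minimal, self-contained sketch of the distribution scheme in `list_splits_batch_count_star` (the helper name `distribute_counts` is illustrative, not part of this patch):

```rust
// Model of the split distribution: each split gets `len / parallelism` counts,
// and the first `len % parallelism` splits receive one extra leftover.
fn distribute_counts(record_counts: &[u64], parallelism: usize) -> Vec<Vec<u64>> {
    assert!(parallelism > 0, "batch parallelism must be non-zero");
    let split_size = record_counts.len() / parallelism;
    let remaining = record_counts.len() % parallelism;
    let mut splits: Vec<Vec<u64>> = (0..parallelism)
        .map(|i| record_counts[i * split_size..(i + 1) * split_size].to_vec())
        .collect();
    // Push the leftovers one-by-one onto the leading splits, mirroring the
    // `for i in 0..remaining` loop in the patch.
    for i in 0..remaining {
        splits[i].push(record_counts[parallelism * split_size + i]);
    }
    // Drop empty splits, like `filter(|split| !split.record_counts.is_empty())`.
    splits.retain(|s| !s.is_empty());
    splits
}

fn main() {
    // Five data files at parallelism 3: base size 1 each, two leftovers.
    let splits = distribute_counts(&[10, 20, 30, 40, 50], 3);
    assert_eq!(splits, vec![vec![10, 40], vec![20, 50], vec![30]]);
    // COUNT(*) is just the sum of all per-file record counts.
    let total: u64 = splits.iter().flatten().sum();
    assert_eq!(total, 150);
}
```

Since the answer is derived entirely from manifest metadata, the splits carry no `files` or `eq_delete_files`, and no Parquet bytes are read at any point.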