From 4f41f926204b6641a7f174537078947fadbd0661 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Mon, 12 Aug 2024 12:28:36 +0800
Subject: [PATCH 1/7] support count(*) for iceberg source

---
 proto/batch_plan.proto                        |  1 +
 risedev.yml                                   |  4 +--
 src/batch/src/executor/iceberg_scan.rs        | 20 ++++++++++-
 src/batch/src/executor/source.rs              |  2 ++
 src/connector/src/source/iceberg/mod.rs       | 29 ++++++++++++++-
 src/frontend/src/optimizer/mod.rs             | 12 +++++++
 .../optimizer/plan_node/batch_iceberg_scan.rs |  1 +
 .../optimizer/plan_node/batch_kafka_scan.rs   |  1 +
 .../optimizer/plan_node/batch_simple_agg.rs   |  2 +-
 .../src/optimizer/plan_node/batch_source.rs   |  1 +
 .../src/optimizer/plan_node/generic/source.rs | 10 ++++--
 .../plan_node/logical_iceberg_scan.rs         |  5 +++
 .../src/optimizer/plan_node/logical_source.rs |  1 +
 .../rule/agg_count_for_iceberg_rule.rs        | 36 +++++++++++++++++++
 src/frontend/src/optimizer/rule/mod.rs        |  3 ++
 src/frontend/src/scheduler/plan_fragmenter.rs |  2 ++
 16 files changed, 123 insertions(+), 7 deletions(-)
 create mode 100644 src/frontend/src/optimizer/rule/agg_count_for_iceberg_rule.rs

diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto
index 9b12d0b583d1..d0e820862393 100644
--- a/proto/batch_plan.proto
+++ b/proto/batch_plan.proto
@@ -64,6 +64,7 @@ message SourceNode {
   repeated bytes split = 4;
   catalog.StreamSourceInfo info = 5;
   map<string, secret.SecretRef> secret_refs = 6;
+  bool is_iceberg_count = 7;
 }
 
 message FileScanNode {
diff --git a/risedev.yml b/risedev.yml
index b4b558face0e..9c084b9b5227 100644
--- a/risedev.yml
+++ b/risedev.yml
@@ -20,7 +20,7 @@ profile:
     #     config-path: src/config/example.toml
     steps:
       # If you want to use the local s3 storage, enable the following line
-      # - use: minio
+      - use: minio
 
       # If you want to use aws-s3, configure AK and SK in env var and enable the following lines:
       # - use: aws-s3
@@ -40,7 +40,7 @@ profile:
       - use: frontend
 
       # If you want to enable compactor, uncomment the following line, and enable either minio or aws-s3 as well.
-      # - use: compactor
+      - use: compactor
 
       # If you want to create source from Kafka, uncomment the following lines
       # - use: kafka
diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs
index ee4e463422c1..f39f4c3564f4 100644
--- a/src/batch/src/executor/iceberg_scan.rs
+++ b/src/batch/src/executor/iceberg_scan.rs
@@ -13,12 +13,14 @@
 // limitations under the License.
 
 use std::mem;
+use std::sync::Arc;
 
 use futures_async_stream::try_stream;
 use futures_util::stream::StreamExt;
 use iceberg::scan::FileScanTask;
 use iceberg::spec::TableMetadata;
 use risingwave_common::array::arrow::IcebergArrowConvert;
+use risingwave_common::array::I64Array;
 use risingwave_common::catalog::Schema;
 use risingwave_connector::sink::iceberg::IcebergConfig;
 
@@ -34,6 +36,8 @@ pub struct IcebergScanExecutor {
     batch_size: usize,
     schema: Schema,
     identity: String,
+    is_iceberg_count: bool,
+    record_counts: Vec<u64>,
 }
 
 impl Executor for IcebergScanExecutor {
@@ -59,6 +63,8 @@ impl IcebergScanExecutor {
         batch_size: usize,
         schema: Schema,
         identity: String,
+        is_iceberg_count: bool,
+        record_counts: Vec<u64>,
     ) -> Self {
         Self {
             iceberg_config,
@@ -68,12 +74,23 @@ impl IcebergScanExecutor {
             batch_size,
             schema,
             identity,
+            is_iceberg_count,
+            record_counts,
         }
     }
 
     #[try_stream(ok = DataChunk, error = BatchError)]
     async fn do_execute(mut self: Box<Self>) {
-        let table = self
+        println!("is_iceberg_count: {}", self.is_iceberg_count);
+        if self.is_iceberg_count{
+            let record_count = mem::take(&mut self.record_counts).into_iter().sum::<u64>() as i64;
+            println!("record_count: {}", record_count);
+            let chunk = DataChunk::new(vec![
+                Arc::new(I64Array::from_iter([record_count]).into()),
+            ], 1);
+            yield chunk;
+        }else{
+            let table = self
             .iceberg_config
             .load_table_v2_with_metadata(self.table_meta)
             .await?;
@@ -106,5 +123,6 @@ impl IcebergScanExecutor {
             debug_assert_eq!(chunk.data_types(), data_types);
             yield chunk;
         }
+        }
     }
 }
diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs
index e01ee9e4b7de..1b7a95cc7fc7 100644
--- a/src/batch/src/executor/source.rs
+++ b/src/batch/src/executor/source.rs
@@ -116,6 +116,8 @@ impl BoxedExecutorBuilder for SourceExecutor {
                 source.context.get_config().developer.chunk_size,
                 schema,
                 source.plan_node().get_identity().clone(),
+                source_node.is_iceberg_count,
+                split.record_counts,
             )))
         } else {
             unreachable!()
diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
index f101ff9ed6d4..c1f55de793a3 100644
--- a/src/connector/src/source/iceberg/mod.rs
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -20,13 +20,14 @@ use anyhow::anyhow;
 use async_trait::async_trait;
 use futures_async_stream::for_await;
 use iceberg::scan::FileScanTask;
-use iceberg::spec::TableMetadata;
+use iceberg::spec::{ManifestList, TableMetadata};
 use itertools::Itertools;
 pub use parquet_file_reader::*;
 use risingwave_common::bail;
 use risingwave_common::catalog::Schema;
 use risingwave_common::types::JsonbVal;
 use serde::{Deserialize, Serialize};
+use tokio_stream::StreamExt;
 
 use crate::error::ConnectorResult;
 use crate::parser::ParserConfig;
@@ -144,6 +145,7 @@ pub struct IcebergSplit {
     pub snapshot_id: i64,
     pub table_meta: TableMetadataJsonStr,
     pub files: Vec<IcebergFileScanTaskJsonStr>,
+    pub record_counts: Vec<u64>,
 }
 
 impl SplitMetaData for IcebergSplit {
@@ -238,6 +240,27 @@ impl IcebergSplitEnumerator {
             },
         };
         let mut files = vec![];
+        let mut record_counts = vec![];
+
+        let manifest_list: ManifestList = table.metadata().snapshot_by_id(snapshot_id)
+            .unwrap()
+            .load_manifest_list(table.file_io(), table.metadata())
+            .await
+            .map_err(|e| anyhow!(e))?;
+
+        for entry in manifest_list.entries() {
+            let manifest = entry
+                .load_manifest(table.file_io())
+                .await
+                .map_err(|e| anyhow!(e))?;
+            let mut manifest_entries_stream =
+                futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive()));
+
+            while let Some(manifest_entry) = manifest_entries_stream.next().await {
+                let file = manifest_entry.data_file();
+                record_counts.push(file.record_count());
+            }
+        }
 
         let scan = table
             .scan()
@@ -255,6 +278,9 @@ impl IcebergSplitEnumerator {
         }
 
         let table_meta = TableMetadataJsonStr::serialize(table.metadata());
+        if files.len() != record_counts.len() {
+            bail!("The number of files does not match the number of record counts.");
+        }
 
         let split_num = batch_parallelism;
         // evenly split the files into splits based on the parallelism.
@@ -269,6 +295,7 @@ impl IcebergSplitEnumerator {
                 snapshot_id,
                 table_meta: table_meta.clone(),
                 files: files[start..end].to_vec(),
+                record_counts: record_counts[start..end].to_vec(),
             };
             splits.push(split);
         }
diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs
index f59c0635b8bd..a07da2e60047 100644
--- a/src/frontend/src/optimizer/mod.rs
+++ b/src/frontend/src/optimizer/mod.rs
@@ -401,6 +401,12 @@ impl PlanRoot {
             ApplyOrder::BottomUp,
         ));
 
+        let plan = plan.optimize_by_rules(&OptimizationStage::new(
+            "Iceberg count star",
+            vec![AggCountForIcebergRule::create()],
+            ApplyOrder::TopDown,
+        ));
+
         assert_eq!(plan.convention(), Convention::Batch);
         Ok(plan)
     }
@@ -444,6 +450,12 @@ impl PlanRoot {
             ApplyOrder::BottomUp,
         ));
 
+        let plan = plan.optimize_by_rules(&OptimizationStage::new(
+            "Iceberg count star",
+            vec![AggCountForIcebergRule::create()],
+            ApplyOrder::TopDown,
+        ));
+
         assert_eq!(plan.convention(), Convention::Batch);
         Ok(plan)
     }
diff --git a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
index 3433feb8d210..118fb7351bcf 100644
--- a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
@@ -111,6 +111,7 @@ impl ToBatchPb for BatchIcebergScan {
             with_properties,
             split: vec![],
             secret_refs,
+            is_iceberg_count: self.core.is_iceberg_count,
         })
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs b/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs
index 0883f3aa697c..6fd329e5c9dd 100644
--- a/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs
@@ -133,6 +133,7 @@ impl ToBatchPb for BatchKafkaScan {
             with_properties,
             split: vec![],
             secret_refs,
+            is_iceberg_count: self.core.is_iceberg_count,
         })
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs b/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs
index ff20df7a4d17..bdee2412ae68 100644
--- a/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs
@@ -28,7 +28,7 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist};
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct BatchSimpleAgg {
     pub base: PlanBase<Batch>,
-    core: generic::Agg<PlanRef>,
+    pub core: generic::Agg<PlanRef>,
 }
 
 impl BatchSimpleAgg {
diff --git a/src/frontend/src/optimizer/plan_node/batch_source.rs b/src/frontend/src/optimizer/plan_node/batch_source.rs
index fd33d2dba003..40d43f9392b7 100644
--- a/src/frontend/src/optimizer/plan_node/batch_source.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_source.rs
@@ -115,6 +115,7 @@ impl ToBatchPb for BatchSource {
             with_properties,
             split: vec![],
             secret_refs,
+            is_iceberg_count: self.core.is_iceberg_count,
         })
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs
index af138f7dd80a..c8f25259e973 100644
--- a/src/frontend/src/optimizer/plan_node/generic/source.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/source.rs
@@ -64,15 +64,21 @@ pub struct Source {
     pub ctx: OptimizerContextRef,
 
     pub as_of: Option<AsOf>,
+
+    pub is_iceberg_count: bool,
 }
 
 impl GenericPlanNode for Source {
     fn schema(&self) -> Schema {
-        let fields = self
+        let fields = if self.is_iceberg_count{
+            vec![Field::with_name(DataType::Int64, "count".to_string())]
+        }else{
+            self
             .column_catalog
             .iter()
             .map(|c| (&c.column_desc).into())
-            .collect();
+            .collect()
+        };
         Schema { fields }
     }
diff --git a/src/frontend/src/optimizer/plan_node/logical_iceberg_scan.rs b/src/frontend/src/optimizer/plan_node/logical_iceberg_scan.rs
index b355d6f2057c..8bc2ca33f0a6 100644
--- a/src/frontend/src/optimizer/plan_node/logical_iceberg_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_iceberg_scan.rs
@@ -51,6 +51,11 @@ impl LogicalIcebergScan {
         LogicalIcebergScan { base, core }
     }
 
+    pub fn new_with_core(core: generic::Source) -> Self {
+        let base = PlanBase::new_logical_with_core(&core);
+        LogicalIcebergScan { base, core }
+    }
+
     pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
         self.core.catalog.clone()
     }
diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index 50024b4274e7..1f51bc01186f 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -83,6 +83,7 @@ impl LogicalSource {
             kind,
             ctx,
             as_of,
+            is_iceberg_count: false,
         };
 
         if core.as_of.is_some() && !core.support_time_travel() {
diff --git a/src/frontend/src/optimizer/rule/agg_count_for_iceberg_rule.rs b/src/frontend/src/optimizer/rule/agg_count_for_iceberg_rule.rs
new file mode 100644
index 000000000000..c18b1c6e0d23
--- /dev/null
+++ b/src/frontend/src/optimizer/rule/agg_count_for_iceberg_rule.rs
@@ -0,0 +1,36 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::{BoxedRule, Rule};
+use crate::optimizer::{plan_node::{BatchIcebergScan, LogicalIcebergScan, PlanAggCall}, PlanRef};
+
+pub struct AggCountForIcebergRule {}
+impl Rule for AggCountForIcebergRule {
+    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
+        let agg = plan.as_batch_simple_agg()?;
+        if agg.core.group_key.is_empty() && agg.agg_calls().len() == 1 && agg.agg_calls()[0].eq(&PlanAggCall::count_star()){
+            let mut source = agg.core.input.as_batch_iceberg_scan()?.core.clone();
+            source.is_iceberg_count = true;
+            println!("plan213321{:?}",source);
+            return Some(BatchIcebergScan::new(source).into())
+        }
+        None
+    }
+}
+
+impl AggCountForIcebergRule {
+    pub fn create() -> BoxedRule {
+        Box::new(AggCountForIcebergRule {})
+    }
+}
\ No newline at end of file
diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs
index 180dafa0c79b..6dd8298f95d8 100644
--- a/src/frontend/src/optimizer/rule/mod.rs
+++ b/src/frontend/src/optimizer/rule/mod.rs
@@ -159,6 +159,7 @@ mod agg_call_merge_rule;
 pub use agg_call_merge_rule::*;
 mod pull_up_correlated_predicate_agg_rule;
 mod source_to_iceberg_scan_rule;
+mod agg_count_for_iceberg_rule;
 mod source_to_kafka_scan_rule;
 mod table_function_to_file_scan_rule;
 mod values_extract_project_rule;
@@ -166,6 +167,7 @@ mod values_extract_project_rule;
 pub use batch::batch_push_limit_to_scan_rule::*;
 pub use pull_up_correlated_predicate_agg_rule::*;
 pub use source_to_iceberg_scan_rule::*;
+pub use agg_count_for_iceberg_rule::*;
 pub use source_to_kafka_scan_rule::*;
 pub use table_function_to_file_scan_rule::*;
 pub use values_extract_project_rule::*;
@@ -245,6 +247,7 @@ macro_rules! for_all_rules {
             , { PullUpCorrelatedPredicateAggRule }
             , { SourceToKafkaScanRule }
             , { SourceToIcebergScanRule }
+            , { AggCountForIcebergRule }
         }
     };
 }
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 7643e5c5e7ba..8aa48000ff86 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -191,6 +191,7 @@ impl BatchPlanFragmenter {
 
     /// Split the plan node into each stages, based on exchange node.
     fn split_into_stage(&mut self, batch_node: PlanRef) -> SchedulerResult<()> {
+        println!("split_into_stage{:?}",batch_node);
         let root_stage = self.new_stage(
             batch_node,
             Some(Distribution::Single.to_prost(
@@ -884,6 +885,7 @@ impl BatchPlanFragmenter {
         } else {
             None
         };
+        println!("split_into_stage{:?},{:?},{:?}",source_info,table_scan_info,root.distribution());
 
         let mut has_lookup_join = false;
         let parallelism = match root.distribution() {
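The crux of this first patch: a bare count(*) on an Iceberg table can be answered from snapshot metadata alone, because every manifest entry already records how many rows its data file holds. A minimal standalone sketch of that metadata walk, reusing the same iceberg-rust calls the patch makes and assuming an already-loaded iceberg::table::Table:

    use anyhow::{anyhow, Result};
    use iceberg::table::Table;

    /// Sum the record counts of all live manifest entries in the current
    /// snapshot, without opening a single data file.
    async fn metadata_only_count(table: &Table) -> Result<u64> {
        let snapshot = table
            .metadata()
            .current_snapshot()
            .ok_or_else(|| anyhow!("table has no snapshot"))?;
        let manifest_list = snapshot
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .map_err(|e| anyhow!(e))?;
        let mut count = 0u64;
        for entry in manifest_list.entries() {
            let manifest = entry
                .load_manifest(table.file_io())
                .await
                .map_err(|e| anyhow!(e))?;
            // `is_alive()` skips deleted manifest entries, mirroring the patch's filter.
            count += manifest
                .entries()
                .iter()
                .filter(|e| e.is_alive())
                .map(|e| e.data_file().record_count())
                .sum::<u64>();
        }
        Ok(count)
    }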
From 6eb747171e9b974822776142145e4da4032ba624 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Fri, 16 Aug 2024 13:45:51 +0800
Subject: [PATCH 2/7] support count(*) splits in the iceberg split enumerator

---
 src/batch/src/executor/source.rs              |   1 +
 src/connector/src/source/iceberg/mod.rs       | 138 ++++++++++--------
 src/frontend/src/handler/query.rs             |   3 +-
 .../optimizer/plan_node/batch_iceberg_scan.rs |  12 ++
 .../rule/agg_count_for_iceberg_rule.rs        |   5 +-
 .../src/scheduler/distributed/stage.rs        |   1 +
 src/frontend/src/scheduler/plan_fragmenter.rs |   6 +-
 7 files changed, 105 insertions(+), 61 deletions(-)

diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs
index 1b7a95cc7fc7..dd95344cfaf6 100644
--- a/src/batch/src/executor/source.rs
+++ b/src/batch/src/executor/source.rs
@@ -56,6 +56,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
         source: &ExecutorBuilder<'_, C>,
         inputs: Vec<BoxedExecutor>,
     ) -> Result<BoxedExecutor> {
+        println!("source_executor_builder");
         ensure!(inputs.is_empty(), "Source should not have input executor!");
         let source_node = try_match_expand!(
             source.plan_node().get_node_body().unwrap(),
diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
index c1f55de793a3..243de37e8a92 100644
--- a/src/connector/src/source/iceberg/mod.rs
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -203,6 +203,7 @@ impl IcebergSplitEnumerator {
         schema: Schema,
         time_traval_info: Option<IcebergTimeTravelInfo>,
         batch_parallelism: usize,
+        is_iceberg_count: bool,
     ) -> ConnectorResult<Vec<IcebergSplit>> {
         if batch_parallelism == 0 {
             bail!("Batch parallelism is 0. Cannot split the iceberg files.");
@@ -239,74 +240,97 @@ impl IcebergSplitEnumerator {
                 None => bail!("Cannot find the current snapshot id in the iceberg table."),
             },
         };
-        let mut files = vec![];
-        let mut record_counts = vec![];
-
-        let manifest_list: ManifestList = table.metadata().snapshot_by_id(snapshot_id)
+        let table_meta = TableMetadataJsonStr::serialize(table.metadata());
+        let splits = if is_iceberg_count {
+            let mut record_counts = vec![];
+            let manifest_list: ManifestList = table.metadata().snapshot_by_id(snapshot_id)
                 .unwrap()
                 .load_manifest_list(table.file_io(), table.metadata())
                 .await
                 .map_err(|e| anyhow!(e))?;
 
-        for entry in manifest_list.entries() {
-            let manifest = entry
-                .load_manifest(table.file_io())
-                .await
-                .map_err(|e| anyhow!(e))?;
-            let mut manifest_entries_stream =
-                futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive()));
-
-            while let Some(manifest_entry) = manifest_entries_stream.next().await {
-                let file = manifest_entry.data_file();
-                record_counts.push(file.record_count());
+            for entry in manifest_list.entries() {
+                let manifest = entry
+                    .load_manifest(table.file_io())
+                    .await
+                    .map_err(|e| anyhow!(e))?;
+                let mut manifest_entries_stream =
+                    futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive()));
+
+                while let Some(manifest_entry) = manifest_entries_stream.next().await {
+                    let file = manifest_entry.data_file();
+                    record_counts.push(file.record_count());
+                }
             }
-        }
-
-        let scan = table
-            .scan()
-            .snapshot_id(snapshot_id)
-            .select(schema.names())
-            .build()
-            .map_err(|e| anyhow!(e))?;
-
-        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
+            let split_num = batch_parallelism;
+            println!("record_counts: {:?},{:?}", record_counts,split_num);
+            // evenly split the files into splits based on the parallelism.
+            let split_size = record_counts.len() / split_num;
+            let remaining = record_counts.len() % split_num;
+            let mut splits = vec![];
+            for i in 0..split_num {
+                let start = i * split_size;
+                let end = (i + 1) * split_size;
+                let split = IcebergSplit {
+                    split_id: i as i64,
+                    snapshot_id,
+                    table_meta: table_meta.clone(),
+                    files: vec![],
+                    record_counts: record_counts[start..end].to_vec(),
+                };
+                splits.push(split);
+            }
+            for i in 0..remaining {
+                splits[i]
+                    .record_counts
+                    .push(record_counts[split_num * split_size + i].clone());
+            }
+            splits
+
+        }else{
+            let mut files = vec![];
+            let scan = table
+                .scan()
+                .snapshot_id(snapshot_id)
+                .select(schema.names())
+                .build()
+                .map_err(|e| anyhow!(e))?;
 
-        #[for_await]
-        for task in file_scan_stream {
-            let task = task.map_err(|e| anyhow!(e))?;
-            files.push(IcebergFileScanTaskJsonStr::serialize(&task));
-        }
+            let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
 
-        let table_meta = TableMetadataJsonStr::serialize(table.metadata());
-        if files.len() != record_counts.len() {
-            bail!("The number of files does not match the number of record counts.");
-        }
+            #[for_await]
+            for task in file_scan_stream {
+                let task = task.map_err(|e| anyhow!(e))?;
+                files.push(IcebergFileScanTaskJsonStr::serialize(&task));
+            }
 
-        let split_num = batch_parallelism;
-        // evenly split the files into splits based on the parallelism.
-        let split_size = files.len() / split_num;
-        let remaining = files.len() % split_num;
-        let mut splits = vec![];
-        for i in 0..split_num {
-            let start = i * split_size;
-            let end = (i + 1) * split_size;
-            let split = IcebergSplit {
-                split_id: i as i64,
-                snapshot_id,
-                table_meta: table_meta.clone(),
-                files: files[start..end].to_vec(),
-                record_counts: record_counts[start..end].to_vec(),
-            };
-            splits.push(split);
-        }
-        for i in 0..remaining {
-            splits[i]
-                .files
-                .push(files[split_num * split_size + i].clone());
-        }
+            let split_num = batch_parallelism;
+            // evenly split the files into splits based on the parallelism.
+            let split_size = files.len() / split_num;
+            let remaining = files.len() % split_num;
+            let mut splits = vec![];
+            for i in 0..split_num {
+                let start = i * split_size;
+                let end = (i + 1) * split_size;
+                let split = IcebergSplit {
+                    split_id: i as i64,
+                    snapshot_id,
+                    table_meta: table_meta.clone(),
+                    files: files[start..end].to_vec(),
+                    record_counts: vec![],
+                };
+                splits.push(split);
+            }
+            for i in 0..remaining {
+                splits[i]
+                    .files
+                    .push(files[split_num * split_size + i].clone());
+            }
+            splits
+        };
         Ok(splits
             .into_iter()
-            .filter(|split| !split.files.is_empty())
+            .filter(|split| !split.files.is_empty() | !split.record_counts.is_empty())
             .collect_vec())
     }
 }
diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs
index bdb32b590300..0bd31191612f 100644
--- a/src/frontend/src/handler/query.rs
+++ b/src/frontend/src/handler/query.rs
@@ -64,7 +64,8 @@ pub async fn handle_query(
         let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?;
         gen_batch_plan_fragmenter(&session, plan_result)?
     };
-    execute(session, plan_fragmenter_result, formats).await
+    Ok(execute(session, plan_fragmenter_result, formats).await.unwrap())
+
 }
 
 pub fn handle_parse(
diff --git a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
index 118fb7351bcf..0419f3c19d25 100644
--- a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
@@ -55,6 +55,18 @@ impl BatchIcebergScan {
         self.core.catalog.clone()
     }
 
+    pub fn clone_with_core(&self, core: generic::Source) -> Self {
+        let base = PlanBase::new_batch_with_core(
+            &core,
+            self.base.distribution().clone(),
+            self.base.order().clone(),
+        );
+        Self {
+            base,
+            core,
+        }
+    }
+
     pub fn clone_with_dist(&self) -> Self {
         let base = self
             .base
diff --git a/src/frontend/src/optimizer/rule/agg_count_for_iceberg_rule.rs b/src/frontend/src/optimizer/rule/agg_count_for_iceberg_rule.rs
index c18b1c6e0d23..8789123c83d6 100644
--- a/src/frontend/src/optimizer/rule/agg_count_for_iceberg_rule.rs
+++ b/src/frontend/src/optimizer/rule/agg_count_for_iceberg_rule.rs
@@ -20,10 +20,11 @@ impl Rule for AggCountForIcebergRule {
     fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
         let agg = plan.as_batch_simple_agg()?;
         if agg.core.group_key.is_empty() && agg.agg_calls().len() == 1 && agg.agg_calls()[0].eq(&PlanAggCall::count_star()){
-            let mut source = agg.core.input.as_batch_iceberg_scan()?.core.clone();
+            let batch_iceberg = agg.core.input.as_batch_iceberg_scan()?;
+            let mut source = batch_iceberg.core.clone();
             source.is_iceberg_count = true;
             println!("plan213321{:?}",source);
-            return Some(BatchIcebergScan::new(source).into())
+            return Some(batch_iceberg.clone_with_core(source).into())
         }
         None
     }
diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs
index 543d0c0a3ae6..fa3b56485215 100644
--- a/src/frontend/src/scheduler/distributed/stage.rs
+++ b/src/frontend/src/scheduler/distributed/stage.rs
@@ -380,6 +380,7 @@ impl StageRunner {
                 ));
             }
         } else if let Some(source_info) = self.stage.source_info.as_ref() {
+            println!("source_info: {:?}", source_info);
             let chunk_size = (source_info.split_info().unwrap().len() as f32
                 / self.stage.parallelism.unwrap() as f32)
                 .ceil() as usize;
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 8aa48000ff86..620efa8b4110 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -274,6 +274,7 @@ pub struct SourceFetchInfo {
     pub connector: ConnectorProperties,
     pub timebound: (Option<i64>, Option<i64>),
     pub as_of: Option<AsOf>,
+    pub is_iceberg_count: bool,
 }
 
 #[derive(Clone, Debug)]
@@ -360,7 +361,7 @@ impl SourceScanInfo {
                 };
 
                 let split_info = iceberg_enumerator
-                    .list_splits_batch(fetch_info.schema, time_travel_info, batch_parallelism)
+                    .list_splits_batch(fetch_info.schema, time_travel_info, batch_parallelism,fetch_info.is_iceberg_count)
                     .await?
                     .into_iter()
                     .map(SplitImpl::Iceberg)
@@ -1055,6 +1056,7 @@ impl BatchPlanFragmenter {
                     connector: property,
                     timebound: timestamp_bound,
                     as_of: None,
+                    is_iceberg_count: false,
                 })));
             }
         } else if let Some(batch_iceberg_scan) = node.as_batch_iceberg_scan() {
@@ -1069,6 +1071,7 @@ impl BatchPlanFragmenter {
                     connector: property,
                     timebound: (None, None),
                     as_of,
+                    is_iceberg_count: batch_iceberg_scan.core.is_iceberg_count,
                 })));
             }
         } else if let Some(source_node) = node.as_batch_source() {
@@ -1084,6 +1087,7 @@ impl BatchPlanFragmenter {
                     connector: property,
                     timebound: (None, None),
                     as_of,
+                    is_iceberg_count: false,
                 })));
             }
         }
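Both branches of the enumerator now share one distribution scheme: each of the batch_parallelism splits gets len / n items, and the first len % n splits each receive one extra trailing item. The same scheme in isolation, as a hypothetical helper that is not part of the patch:

    /// Distribute `items` over `parts` splits as evenly as possible:
    /// each split gets len / parts items, and the first len % parts
    /// splits get one extra item from the tail, exactly as the patch does.
    fn split_evenly<T: Clone>(items: &[T], parts: usize) -> Vec<Vec<T>> {
        let size = items.len() / parts;
        let remaining = items.len() % parts;
        let mut splits: Vec<Vec<T>> = (0..parts)
            .map(|i| items[i * size..(i + 1) * size].to_vec())
            .collect();
        for i in 0..remaining {
            splits[i].push(items[parts * size + i].clone());
        }
        splits
    }

    // e.g. split_evenly(&[1, 2, 3, 4, 5], 2) == vec![vec![1, 2, 5], vec![3, 4]]

Note that the leftover items come from the tail of the input, so the splits are even in size but not contiguous ranges; that is harmless here, since each split is an independent unit of work.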
From a2ec2591271679054dc6c5ced8d739b77b87c43f Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Fri, 16 Aug 2024 15:55:14 +0800
Subject: [PATCH 3/7] add IcebergSourceType and a dedicated count(*) scan path

---
 proto/batch_plan.proto                        |   7 +-
 src/batch/src/executor/iceberg_scan.rs        |  20 +-
 src/batch/src/executor/mod.rs                 |   1 +
 src/batch/src/executor/source.rs              |  30 ++-
 src/connector/src/source/iceberg/mod.rs       | 219 ++++++++++--------
 src/frontend/src/handler/query.rs             |   5 +-
 src/frontend/src/optimizer/mod.rs             |   2 +
 .../optimizer/plan_node/batch_iceberg_scan.rs |  10 +-
 .../optimizer/plan_node/batch_kafka_scan.rs   |   3 +-
 .../src/optimizer/plan_node/batch_source.rs   |   3 +-
 .../src/optimizer/plan_node/generic/source.rs |  10 +-
 .../src/optimizer/plan_node/logical_source.rs |   1 -
 src/frontend/src/optimizer/plan_node/mod.rs   |   4 +
 .../rule/agg_count_for_iceberg_rule.rs        |  17 +-
 src/frontend/src/optimizer/rule/mod.rs        |   4 +-
 .../src/scheduler/distributed/stage.rs        |   1 +
 src/frontend/src/scheduler/local.rs           |   1 +
 src/frontend/src/scheduler/plan_fragmenter.rs |  63 +++--
 18 files changed, 230 insertions(+), 171 deletions(-)

diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto
index d0e820862393..743a21f6ee2f 100644
--- a/proto/batch_plan.proto
+++ b/proto/batch_plan.proto
@@ -58,13 +58,18 @@ message ScanRange {
 }
 
 message SourceNode {
+  enum IcebergSourceType {
+    ICEBERG_TYPE_UNSPECIFIED = 0;
+    SCAN = 1;
+    COUNT_STAR = 2;
+  }
   uint32 source_id = 1;
   repeated plan_common.ColumnCatalog columns = 2;
   map<string, string> with_properties = 3;
   repeated bytes split = 4;
   catalog.StreamSourceInfo info = 5;
   map<string, secret.SecretRef> secret_refs = 6;
-  bool is_iceberg_count = 7;
+  IcebergSourceType iceberg_source_type = 7;
 }
 
 message FileScanNode {
diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs
index f39f4c3564f4..ee4e463422c1 100644
--- a/src/batch/src/executor/iceberg_scan.rs
+++ b/src/batch/src/executor/iceberg_scan.rs
@@ -13,14 +13,12 @@
 // limitations under the License.
 
 use std::mem;
-use std::sync::Arc;
 
 use futures_async_stream::try_stream;
 use futures_util::stream::StreamExt;
 use iceberg::scan::FileScanTask;
 use iceberg::spec::TableMetadata;
 use risingwave_common::array::arrow::IcebergArrowConvert;
-use risingwave_common::array::I64Array;
 use risingwave_common::catalog::Schema;
 use risingwave_connector::sink::iceberg::IcebergConfig;
 
@@ -34,8 +34,6 @@ pub struct IcebergScanExecutor {
     batch_size: usize,
     schema: Schema,
     identity: String,
-    is_iceberg_count: bool,
-    record_counts: Vec<u64>,
 }
 
 impl Executor for IcebergScanExecutor {
@@ -59,8 +59,6 @@ impl IcebergScanExecutor {
         batch_size: usize,
         schema: Schema,
         identity: String,
-        is_iceberg_count: bool,
-        record_counts: Vec<u64>,
     ) -> Self {
         Self {
             iceberg_config,
@@ -68,23 +68,12 @@ impl IcebergScanExecutor {
             batch_size,
             schema,
             identity,
-            is_iceberg_count,
-            record_counts,
         }
     }
 
     #[try_stream(ok = DataChunk, error = BatchError)]
     async fn do_execute(mut self: Box<Self>) {
-        println!("is_iceberg_count: {}", self.is_iceberg_count);
-        if self.is_iceberg_count{
-            let record_count = mem::take(&mut self.record_counts).into_iter().sum::<u64>() as i64;
-            println!("record_count: {}", record_count);
-            let chunk = DataChunk::new(vec![
-                Arc::new(I64Array::from_iter([record_count]).into()),
-            ], 1);
-            yield chunk;
-        }else{
-            let table = self
+        let table = self
             .iceberg_config
             .load_table_v2_with_metadata(self.table_meta)
             .await?;
@@ -123,6 +106,5 @@ impl IcebergScanExecutor {
             debug_assert_eq!(chunk.data_types(), data_types);
             yield chunk;
         }
-        }
     }
 }
diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs
index 3a64901c64a0..e6223c664cbf 100644
--- a/src/batch/src/executor/mod.rs
+++ b/src/batch/src/executor/mod.rs
@@ -20,6 +20,7 @@ mod generic_exchange;
 mod group_top_n;
 mod hash_agg;
 mod hop_window;
+mod iceberg_count_star_scan;
 mod iceberg_scan;
 mod insert;
 mod join;
diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs
index dd95344cfaf6..0e8d5c075a9c 100644
--- a/src/batch/src/executor/source.rs
+++ b/src/batch/src/executor/source.rs
@@ -29,7 +29,9 @@ use risingwave_connector::source::{
 };
 use risingwave_connector::WithOptionsSecResolved;
 use risingwave_pb::batch_plan::plan_node::NodeBody;
+use risingwave_pb::batch_plan::source_node::IcebergSourceType;
 
+use super::iceberg_count_star_scan::IcebergCountStarExecutor;
 use super::Executor;
 use crate::error::{BatchError, Result};
 use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder, IcebergScanExecutor};
@@ -109,17 +111,23 @@ impl BoxedExecutorBuilder for SourceExecutor {
             assert_eq!(split_list.len(), 1);
             if let SplitImpl::Iceberg(split) = &split_list[0] {
                 let split: IcebergSplit = split.clone();
-                Ok(Box::new(IcebergScanExecutor::new(
-                    iceberg_properties.to_iceberg_config(),
-                    Some(split.snapshot_id),
-                    split.table_meta.deserialize(),
-                    split.files.into_iter().map(|x| x.deserialize()).collect(),
-                    source.context.get_config().developer.chunk_size,
-                    schema,
-                    source.plan_node().get_identity().clone(),
-                    source_node.is_iceberg_count,
-                    split.record_counts,
-                )))
+                match IcebergSourceType::try_from(source_node.iceberg_source_type).unwrap() {
+                    IcebergSourceType::Scan => Ok(Box::new(IcebergScanExecutor::new(
+                        iceberg_properties.to_iceberg_config(),
+                        Some(split.snapshot_id),
+                        split.table_meta.deserialize(),
+                        split.files.into_iter().map(|x| x.deserialize()).collect(),
+                        source.context.get_config().developer.chunk_size,
+                        schema,
+                        source.plan_node().get_identity().clone(),
+                    ))),
+                    IcebergSourceType::IcebergTypeUnspecified => unreachable!(),
+                    IcebergSourceType::CountStar => Ok(Box::new(IcebergCountStarExecutor::new(
+                        schema,
+                        source.plan_node().get_identity().clone(),
+                        split.record_counts,
+                    ))),
+                }
             } else {
                 unreachable!()
             }
diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
index 243de37e8a92..8f89a6580b28 100644
--- a/src/connector/src/source/iceberg/mod.rs
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -21,6 +21,7 @@ use async_trait::async_trait;
 use futures_async_stream::for_await;
 use iceberg::scan::FileScanTask;
 use iceberg::spec::{ManifestList, TableMetadata};
+use iceberg::table::Table;
 use itertools::Itertools;
 pub use parquet_file_reader::*;
 use risingwave_common::bail;
@@ -198,23 +199,135 @@ pub enum IcebergTimeTravelInfo {
 }
 
 impl IcebergSplitEnumerator {
-    pub async fn list_splits_batch(
+    pub async fn list_splits_batch_count_star(
+        &self,
+        time_traval_info: Option<IcebergTimeTravelInfo>,
+        batch_parallelism: usize,
+    ) -> ConnectorResult<Vec<IcebergSplit>> {
+        if batch_parallelism == 0 {
+            bail!("Batch parallelism is 0. Cannot split the iceberg files.");
+        }
+        let table = self.config.load_table_v2().await?;
+        let snapshot_id = self.get_snapshot_id(&table, time_traval_info)?;
+        let table_meta = TableMetadataJsonStr::serialize(table.metadata());
+        let mut record_counts = vec![];
+        let manifest_list: ManifestList = table
+            .metadata()
+            .snapshot_by_id(snapshot_id)
+            .unwrap()
+            .load_manifest_list(table.file_io(), table.metadata())
+            .await
+            .map_err(|e| anyhow!(e))?;
+
+        for entry in manifest_list.entries() {
+            let manifest = entry
+                .load_manifest(table.file_io())
+                .await
+                .map_err(|e| anyhow!(e))?;
+            let mut manifest_entries_stream =
+                futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive()));
+
+            while let Some(manifest_entry) = manifest_entries_stream.next().await {
+                let file = manifest_entry.data_file();
+                record_counts.push(file.record_count());
+            }
+        }
+        let split_num = batch_parallelism;
+        // evenly split the files into splits based on the parallelism.
+        let split_size = record_counts.len() / split_num;
+        let remaining = record_counts.len() % split_num;
+        let mut splits = vec![];
+        for i in 0..split_num {
+            let start = i * split_size;
+            let end = (i + 1) * split_size;
+            let split = IcebergSplit {
+                split_id: i as i64,
+                snapshot_id,
+                table_meta: table_meta.clone(),
+                files: vec![],
+                record_counts: record_counts[start..end].to_vec(),
+            };
+            splits.push(split);
+        }
+        for i in 0..remaining {
+            splits[i]
+                .record_counts
+                .push(record_counts[split_num * split_size + i]);
+        }
+
+        Ok(splits
+            .into_iter()
+            .filter(|split| !split.record_counts.is_empty())
+            .collect_vec())
+    }
+
+    pub async fn list_splits_batch_scan(
         &self,
         schema: Schema,
         time_traval_info: Option<IcebergTimeTravelInfo>,
         batch_parallelism: usize,
-        is_iceberg_count: bool,
     ) -> ConnectorResult<Vec<IcebergSplit>> {
         if batch_parallelism == 0 {
             bail!("Batch parallelism is 0. Cannot split the iceberg files.");
         }
         let table = self.config.load_table_v2().await?;
-        let snapshot_id = match time_traval_info {
+        let snapshot_id = self.get_snapshot_id(&table, time_traval_info)?;
+        let table_meta = TableMetadataJsonStr::serialize(table.metadata());
+        let mut files = vec![];
+        let scan = table
+            .scan()
+            .snapshot_id(snapshot_id)
+            .select(schema.names())
+            .build()
+            .map_err(|e| anyhow!(e))?;
+
+        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
+
+        #[for_await]
+        for task in file_scan_stream {
+            let task = task.map_err(|e| anyhow!(e))?;
+            files.push(IcebergFileScanTaskJsonStr::serialize(&task));
+        }
+
+        let split_num = batch_parallelism;
+        // evenly split the files into splits based on the parallelism.
+        let split_size = files.len() / split_num;
+        let remaining = files.len() % split_num;
+        let mut splits = vec![];
+        for i in 0..split_num {
+            let start = i * split_size;
+            let end = (i + 1) * split_size;
+            let split = IcebergSplit {
+                split_id: i as i64,
+                snapshot_id,
+                table_meta: table_meta.clone(),
+                files: files[start..end].to_vec(),
+                record_counts: vec![],
+            };
+            splits.push(split);
+        }
+        for i in 0..remaining {
+            splits[i]
+                .files
+                .push(files[split_num * split_size + i].clone());
+        }
+        Ok(splits
+            .into_iter()
+            .filter(|split| !split.files.is_empty())
+            .collect_vec())
+    }
+
+    fn get_snapshot_id(
+        &self,
+        table: &Table,
+        time_traval_info: Option<IcebergTimeTravelInfo>,
+    ) -> ConnectorResult<i64> {
+        match time_traval_info {
             Some(IcebergTimeTravelInfo::Version(version)) => {
                 let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
                     bail!("Cannot find the snapshot id in the iceberg table.");
                 };
-                snapshot.snapshot_id()
+                Ok(snapshot.snapshot_id())
             }
             Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
                 let snapshot = table
@@ -223,7 +336,7 @@ impl IcebergSplitEnumerator {
                     .filter(|snapshot| snapshot.timestamp().timestamp_millis() <= timestamp)
                     .max_by_key(|snapshot| snapshot.timestamp().timestamp_millis());
                 match snapshot {
-                    Some(snapshot) => snapshot.snapshot_id(),
+                    Some(snapshot) => Ok(snapshot.snapshot_id()),
                     None => {
                         // convert unix time to human readable time
                         let time = chrono::DateTime::from_timestamp_millis(timestamp);
@@ -236,102 +349,10 @@ impl IcebergSplitEnumerator {
                 }
             }
             None => match table.metadata().current_snapshot() {
-                Some(snapshot) => snapshot.snapshot_id(),
+                Some(snapshot) => Ok(snapshot.snapshot_id()),
                 None => bail!("Cannot find the current snapshot id in the iceberg table."),
             },
-        };
-        let table_meta = TableMetadataJsonStr::serialize(table.metadata());
-        let splits = if is_iceberg_count {
-            let mut record_counts = vec![];
-            let manifest_list: ManifestList = table.metadata().snapshot_by_id(snapshot_id)
-                .unwrap()
-                .load_manifest_list(table.file_io(), table.metadata())
-                .await
-                .map_err(|e| anyhow!(e))?;
-
-            for entry in manifest_list.entries() {
-                let manifest = entry
-                    .load_manifest(table.file_io())
-                    .await
-                    .map_err(|e| anyhow!(e))?;
-                let mut manifest_entries_stream =
-                    futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive()));
-
-                while let Some(manifest_entry) = manifest_entries_stream.next().await {
-                    let file = manifest_entry.data_file();
-                    record_counts.push(file.record_count());
-                }
-            }
-            let split_num = batch_parallelism;
-            println!("record_counts: {:?},{:?}", record_counts,split_num);
-            // evenly split the files into splits based on the parallelism.
-            let split_size = record_counts.len() / split_num;
-            let remaining = record_counts.len() % split_num;
-            let mut splits = vec![];
-            for i in 0..split_num {
-                let start = i * split_size;
-                let end = (i + 1) * split_size;
-                let split = IcebergSplit {
-                    split_id: i as i64,
-                    snapshot_id,
-                    table_meta: table_meta.clone(),
-                    files: vec![],
-                    record_counts: record_counts[start..end].to_vec(),
-                };
-                splits.push(split);
-            }
-            for i in 0..remaining {
-                splits[i]
-                    .record_counts
-                    .push(record_counts[split_num * split_size + i].clone());
-            }
-            splits
-
-        }else{
-            let mut files = vec![];
-            let scan = table
-                .scan()
-                .snapshot_id(snapshot_id)
-                .select(schema.names())
-                .build()
-                .map_err(|e| anyhow!(e))?;
-
-            let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
-
-            #[for_await]
-            for task in file_scan_stream {
-                let task = task.map_err(|e| anyhow!(e))?;
-                files.push(IcebergFileScanTaskJsonStr::serialize(&task));
-            }
-
-            let split_num = batch_parallelism;
-            // evenly split the files into splits based on the parallelism.
-            let split_size = files.len() / split_num;
-            let remaining = files.len() % split_num;
-            let mut splits = vec![];
-            for i in 0..split_num {
-                let start = i * split_size;
-                let end = (i + 1) * split_size;
-                let split = IcebergSplit {
-                    split_id: i as i64,
-                    snapshot_id,
-                    table_meta: table_meta.clone(),
-                    files: files[start..end].to_vec(),
-                    record_counts: vec![],
-                };
-                splits.push(split);
-            }
-            for i in 0..remaining {
-                splits[i]
-                    .files
-                    .push(files[split_num * split_size + i].clone());
-            }
-            splits
-        };
-        Ok(splits
-            .into_iter()
-            .filter(|split| !split.files.is_empty() | !split.record_counts.is_empty())
-            .collect_vec())
+        }
     }
 }
diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs
index 0bd31191612f..d244745e2029 100644
--- a/src/frontend/src/handler/query.rs
+++ b/src/frontend/src/handler/query.rs
@@ -64,8 +64,9 @@ pub async fn handle_query(
         let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?;
         gen_batch_plan_fragmenter(&session, plan_result)?
     };
-    Ok(execute(session, plan_fragmenter_result, formats).await.unwrap())
-
+    Ok(execute(session, plan_fragmenter_result, formats)
+        .await
+        .unwrap())
 }
 
 pub fn handle_parse(
diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs
index a07da2e60047..715667354cb7 100644
--- a/src/frontend/src/optimizer/mod.rs
+++ b/src/frontend/src/optimizer/mod.rs
@@ -1060,6 +1060,7 @@ fn require_additional_exchange_on_root_in_distributed_mode(plan: PlanRef) -> bool {
     plan.node_type() == PlanNodeType::BatchSource
         || plan.node_type() == PlanNodeType::BatchKafkaScan
         || plan.node_type() == PlanNodeType::BatchIcebergScan
+        || plan.node_type() == PlanNodeType::BatchIcebergCountStarScan
 }
 
 fn is_insert(plan: &PlanRef) -> bool {
@@ -1094,6 +1095,7 @@ fn require_additional_exchange_on_root_in_local_mode(plan: PlanRef) -> bool {
     plan.node_type() == PlanNodeType::BatchSource
         || plan.node_type() == PlanNodeType::BatchKafkaScan
         || plan.node_type() == PlanNodeType::BatchIcebergScan
+        || plan.node_type() == PlanNodeType::BatchIcebergCountStarScan
 }
 
 fn is_insert(plan: &PlanRef) -> bool {
diff --git a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
index 0419f3c19d25..9abc508ce34e 100644
--- a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
@@ -16,6 +16,7 @@ use std::rc::Rc;
 
 use pretty_xmlish::{Pretty, XmlNode};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
+use risingwave_pb::batch_plan::source_node::IcebergSourceType;
 use risingwave_pb::batch_plan::SourceNode;
 use risingwave_sqlparser::ast::AsOf;
 
@@ -59,12 +60,9 @@ impl BatchIcebergScan {
         let base = PlanBase::new_batch_with_core(
             &core,
             self.base.distribution().clone(),
-            self.base.order().clone(), 
+            self.base.order().clone(),
         );
-        Self {
-            base,
-            core,
-        }
+        Self { base, core }
     }
 
     pub fn clone_with_dist(&self) -> Self {
@@ -123,7 +121,7 @@ impl ToBatchPb for BatchIcebergScan {
             with_properties,
             split: vec![],
             secret_refs,
-            is_iceberg_count: self.core.is_iceberg_count,
+            iceberg_source_type: IcebergSourceType::Scan.into(),
         })
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs b/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs
index 6fd329e5c9dd..759547c5056f 100644
--- a/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs
@@ -18,6 +18,7 @@ use std::rc::Rc;
 
 use pretty_xmlish::{Pretty, XmlNode};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
+use risingwave_pb::batch_plan::source_node::IcebergSourceType;
 use risingwave_pb::batch_plan::SourceNode;
 
 use super::batch::prelude::*;
@@ -133,7 +134,7 @@ impl ToBatchPb for BatchKafkaScan {
             with_properties,
             split: vec![],
             secret_refs,
-            is_iceberg_count: self.core.is_iceberg_count,
+            iceberg_source_type: IcebergSourceType::IcebergTypeUnspecified.into(),
         })
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/batch_source.rs b/src/frontend/src/optimizer/plan_node/batch_source.rs
index 40d43f9392b7..763773bb5d73 100644
--- a/src/frontend/src/optimizer/plan_node/batch_source.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_source.rs
@@ -16,6 +16,7 @@ use std::rc::Rc;
 
 use pretty_xmlish::{Pretty, XmlNode};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
+use risingwave_pb::batch_plan::source_node::IcebergSourceType;
 use risingwave_pb::batch_plan::SourceNode;
 use risingwave_sqlparser::ast::AsOf;
 
@@ -115,7 +116,7 @@ impl ToBatchPb for BatchSource {
             with_properties,
             split: vec![],
             secret_refs,
-            is_iceberg_count: self.core.is_iceberg_count,
+            iceberg_source_type: IcebergSourceType::IcebergTypeUnspecified.into(),
         })
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs
index c8f25259e973..af138f7dd80a 100644
--- a/src/frontend/src/optimizer/plan_node/generic/source.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/source.rs
@@ -64,21 +64,15 @@ pub struct Source {
     pub ctx: OptimizerContextRef,
 
     pub as_of: Option<AsOf>,
-
-    pub is_iceberg_count: bool,
 }
 
 impl GenericPlanNode for Source {
     fn schema(&self) -> Schema {
-        let fields = if self.is_iceberg_count{
-            vec![Field::with_name(DataType::Int64, "count".to_string())]
-        }else{
-            self
+        let fields = self
             .column_catalog
             .iter()
             .map(|c| (&c.column_desc).into())
-            .collect()
-        };
+            .collect();
         Schema { fields }
     }
diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index 1f51bc01186f..50024b4274e7 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -83,7 +83,6 @@ impl LogicalSource {
             kind,
             ctx,
             as_of,
-            is_iceberg_count: false,
         };
 
         if core.as_of.is_some() && !core.support_time_travel() {
diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs
index 2cf7e67dd2b6..1b7155674254 100644
--- a/src/frontend/src/optimizer/plan_node/mod.rs
+++ b/src/frontend/src/optimizer/plan_node/mod.rs
@@ -918,6 +918,7 @@ mod stream_values;
 mod stream_watermark_filter;
 
 mod batch_file_scan;
+mod batch_iceberg_count_star_scan;
 mod batch_iceberg_scan;
 mod batch_kafka_scan;
 mod derive;
@@ -938,6 +939,7 @@ pub use batch_group_topn::BatchGroupTopN;
 pub use batch_hash_agg::BatchHashAgg;
 pub use batch_hash_join::BatchHashJoin;
 pub use batch_hop_window::BatchHopWindow;
+pub use batch_iceberg_count_star_scan::BatchIcebergCountStarScan;
 pub use batch_iceberg_scan::BatchIcebergScan;
 pub use batch_insert::BatchInsert;
 pub use batch_kafka_scan::BatchKafkaScan;
@@ -1121,6 +1123,7 @@ macro_rules! for_all_plan_nodes {
             , { Batch, MaxOneRow }
             , { Batch, KafkaScan }
             , { Batch, IcebergScan }
+            , { Batch, IcebergCountStarScan }
             , { Batch, FileScan }
             , { Stream, Project }
             , { Stream, Filter }
@@ -1241,6 +1244,7 @@ macro_rules! for_batch_plan_nodes {
             , { Batch, MaxOneRow }
             , { Batch, KafkaScan }
             , { Batch, IcebergScan }
+            , { Batch, IcebergCountStarScan }
             , { Batch, FileScan }
         }
     };
diff --git a/src/frontend/src/optimizer/rule/agg_count_for_iceberg_rule.rs b/src/frontend/src/optimizer/rule/agg_count_for_iceberg_rule.rs
index 8789123c83d6..740c69d03b65 100644
--- a/src/frontend/src/optimizer/rule/agg_count_for_iceberg_rule.rs
+++ b/src/frontend/src/optimizer/rule/agg_count_for_iceberg_rule.rs
@@ -13,18 +13,21 @@
 // limitations under the License.
 
 use super::{BoxedRule, Rule};
-use crate::optimizer::{plan_node::{BatchIcebergScan, LogicalIcebergScan, PlanAggCall}, PlanRef};
+use crate::optimizer::plan_node::{BatchIcebergCountStarScan, PlanAggCall};
+use crate::optimizer::PlanRef;
 
 pub struct AggCountForIcebergRule {}
 impl Rule for AggCountForIcebergRule {
     fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
         let agg = plan.as_batch_simple_agg()?;
-        if agg.core.group_key.is_empty() && agg.agg_calls().len() == 1 && agg.agg_calls()[0].eq(&PlanAggCall::count_star()){
+        if agg.core.group_key.is_empty()
+            && agg.agg_calls().len() == 1
+            && agg.agg_calls()[0].eq(&PlanAggCall::count_star())
+        {
             let batch_iceberg = agg.core.input.as_batch_iceberg_scan()?;
-            let mut source = batch_iceberg.core.clone();
-            source.is_iceberg_count = true;
-            println!("plan213321{:?}",source);
-            return Some(batch_iceberg.clone_with_core(source).into())
+            return Some(
+                BatchIcebergCountStarScan::new_with_batch_iceberg_scan(batch_iceberg).into(),
+            );
         }
         None
     }
@@ -34,4 +37,4 @@ impl AggCountForIcebergRule {
     pub fn create() -> BoxedRule {
         Box::new(AggCountForIcebergRule {})
     }
-}
\ No newline at end of file
+}
diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs
index 6dd8298f95d8..4b01c52da94e 100644
--- a/src/frontend/src/optimizer/rule/mod.rs
+++ b/src/frontend/src/optimizer/rule/mod.rs
@@ -157,17 +157,17 @@ mod apply_hop_window_transpose_rule;
 pub use apply_hop_window_transpose_rule::*;
 mod agg_call_merge_rule;
 pub use agg_call_merge_rule::*;
+mod agg_count_for_iceberg_rule;
 mod pull_up_correlated_predicate_agg_rule;
 mod source_to_iceberg_scan_rule;
-mod agg_count_for_iceberg_rule;
 mod source_to_kafka_scan_rule;
 mod table_function_to_file_scan_rule;
 mod values_extract_project_rule;
 
+pub use agg_count_for_iceberg_rule::*;
 pub use batch::batch_push_limit_to_scan_rule::*;
 pub use pull_up_correlated_predicate_agg_rule::*;
 pub use source_to_iceberg_scan_rule::*;
-pub use agg_count_for_iceberg_rule::*;
 pub use source_to_kafka_scan_rule::*;
 pub use table_function_to_file_scan_rule::*;
 pub use values_extract_project_rule::*;
diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs
index fa3b56485215..11c1626f441a 100644
--- a/src/frontend/src/scheduler/distributed/stage.rs
+++ b/src/frontend/src/scheduler/distributed/stage.rs
@@ -1028,6 +1028,7 @@ impl StageRunner {
                 }
                 PlanNodeType::BatchSource
                 | PlanNodeType::BatchKafkaScan
+                | PlanNodeType::BatchIcebergCountStarScan
                 | PlanNodeType::BatchIcebergScan => {
                     let node_body = execution_plan_node.node.clone();
                     let NodeBody::Source(mut source_node) = node_body else {
diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs
index a7ff6eabdf7f..f8dd17d2ebef 100644
--- a/src/frontend/src/scheduler/local.rs
+++ b/src/frontend/src/scheduler/local.rs
@@ -556,6 +556,7 @@ impl LocalQueryExecution {
             }
             PlanNodeType::BatchSource
             | PlanNodeType::BatchKafkaScan
+            | PlanNodeType::BatchIcebergCountStarScan
             | PlanNodeType::BatchIcebergScan => {
                 let mut node_body = execution_plan_node.node.clone();
                 match &mut node_body {
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 620efa8b4110..05fe90487a79 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -41,6 +41,7 @@ use risingwave_connector::source::{
     ConnectorProperties, SourceEnumeratorContext, SplitEnumerator, SplitImpl,
 };
 use risingwave_pb::batch_plan::plan_node::NodeBody;
+use risingwave_pb::batch_plan::source_node::IcebergSourceType;
 use risingwave_pb::batch_plan::{ExchangeInfo, ScanRange as ScanRangeProto};
 use risingwave_pb::common::Buffer;
 use risingwave_pb::plan_common::Field as PbField;
@@ -55,7 +56,8 @@ use crate::catalog::TableId;
 use crate::error::RwError;
 use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
 use crate::optimizer::plan_node::{
-    BatchIcebergScan, BatchKafkaScan, BatchSource, PlanNodeId, PlanNodeType,
+    BatchIcebergCountStarScan, BatchIcebergScan, BatchKafkaScan, BatchSource, PlanNodeId,
+    PlanNodeType,
 };
 use crate::optimizer::property::Distribution;
 use crate::optimizer::PlanRef;
@@ -191,7 +193,7 @@ impl BatchPlanFragmenter {
 
     /// Split the plan node into each stages, based on exchange node.
     fn split_into_stage(&mut self, batch_node: PlanRef) -> SchedulerResult<()> {
-        println!("split_into_stage{:?}",batch_node);
+        println!("split_into_stage{:?}", batch_node);
         let root_stage = self.new_stage(
             batch_node,
             Some(Distribution::Single.to_prost(
@@ -274,7 +276,7 @@ pub struct SourceFetchInfo {
     pub connector: ConnectorProperties,
     pub timebound: (Option<i64>, Option<i64>),
     pub as_of: Option<AsOf>,
-    pub is_iceberg_count: bool,
+    pub iceberg_source_type: IcebergSourceType,
 }
 
 #[derive(Clone, Debug)]
@@ -360,12 +362,25 @@ impl SourceScanInfo {
                     None => None,
                 };
 
-                let split_info = iceberg_enumerator
-                    .list_splits_batch(fetch_info.schema, time_travel_info, batch_parallelism,fetch_info.is_iceberg_count)
-                    .await?
-                    .into_iter()
-                    .map(SplitImpl::Iceberg)
-                    .collect_vec();
+                let split_info = match fetch_info.iceberg_source_type {
+                    IcebergSourceType::IcebergTypeUnspecified => unreachable!(),
+                    IcebergSourceType::Scan => iceberg_enumerator
+                        .list_splits_batch_scan(
+                            fetch_info.schema,
+                            time_travel_info,
+                            batch_parallelism,
+                        )
+                        .await?
+                        .into_iter()
+                        .map(SplitImpl::Iceberg)
+                        .collect_vec(),
+                    IcebergSourceType::CountStar => iceberg_enumerator
+                        .list_splits_batch_count_star(time_travel_info, batch_parallelism)
+                        .await?
+                        .into_iter()
+                        .map(SplitImpl::Iceberg)
+                        .collect_vec(),
+                };
@@ -886,7 +901,12 @@ impl BatchPlanFragmenter {
         } else {
             None
         };
-        println!("split_into_stage{:?},{:?},{:?}",source_info,table_scan_info,root.distribution());
+        println!(
+            "split_into_stage{:?},{:?},{:?}",
+            source_info,
+            table_scan_info,
+            root.distribution()
+        );
 
         let mut has_lookup_join = false;
         let parallelism = match root.distribution() {
@@ -1056,7 +1076,7 @@ impl BatchPlanFragmenter {
                     connector: property,
                     timebound: timestamp_bound,
                     as_of: None,
-                    is_iceberg_count: false,
+                    iceberg_source_type: IcebergSourceType::IcebergTypeUnspecified,
                 })));
             }
         } else if let Some(batch_iceberg_scan) = node.as_batch_iceberg_scan() {
@@ -1071,7 +1091,24 @@ impl BatchPlanFragmenter {
                     connector: property,
                     timebound: (None, None),
                     as_of,
-                    is_iceberg_count: batch_iceberg_scan.core.is_iceberg_count,
+                    iceberg_source_type: IcebergSourceType::Scan,
+                })));
+            }
+        } else if let Some(batch_iceberg_count_star_scan) = node.as_batch_iceberg_count_star_scan()
+        {
+            let batch_iceberg_count_star_scan: &BatchIcebergCountStarScan =
+                batch_iceberg_count_star_scan;
+            let source_catalog = batch_iceberg_count_star_scan.source_catalog();
+            if let Some(source_catalog) = source_catalog {
+                let property =
+                    ConnectorProperties::extract(source_catalog.with_properties.clone(), false)?;
+                let as_of = batch_iceberg_count_star_scan.as_of();
+                return Ok(Some(SourceScanInfo::new(SourceFetchInfo {
+                    schema: batch_iceberg_count_star_scan.base.schema().clone(),
+                    connector: property,
+                    timebound: (None, None),
+                    as_of,
+                    iceberg_source_type: IcebergSourceType::CountStar,
                 })));
             }
         } else if let Some(source_node) = node.as_batch_source() {
@@ -1084,7 +1124,7 @@ impl BatchPlanFragmenter {
                     connector: property,
                     timebound: (None, None),
                     as_of,
-                    is_iceberg_count: false,
+                    iceberg_source_type: IcebergSourceType::IcebergTypeUnspecified,
                 })));
             }
         }
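Patch 3 swaps the boolean flag for a proto enum, which crosses the wire as a raw i32; that is why the executor builder above decodes it with IcebergSourceType::try_from(...). A small sketch of the round trip, assuming the usual prost-generated conversions for proto enums:

    // prost generates `From<IcebergSourceType> for i32` and
    // `TryFrom<i32> for IcebergSourceType` (Err on unknown discriminants).
    let wire: i32 = IcebergSourceType::CountStar.into();
    match IcebergSourceType::try_from(wire) {
        Ok(IcebergSourceType::Scan) => { /* plan a full file scan */ }
        Ok(IcebergSourceType::CountStar) => { /* metadata-only count */ }
        Ok(IcebergSourceType::IcebergTypeUnspecified) | Err(_) => unreachable!(),
    }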
+ +use std::mem; +use std::sync::Arc; + +use futures_async_stream::try_stream; +use futures_util::stream::StreamExt; +use risingwave_common::array::I64Array; +use risingwave_common::catalog::Schema; + +use crate::error::BatchError; +use crate::executor::{DataChunk, Executor}; + +pub struct IcebergCountStarExecutor { + schema: Schema, + identity: String, + record_counts: Vec, +} + +impl Executor for IcebergCountStarExecutor { + fn schema(&self) -> &risingwave_common::catalog::Schema { + &self.schema + } + + fn identity(&self) -> &str { + &self.identity + } + + fn execute(self: Box) -> super::BoxedDataChunkStream { + self.do_execute().boxed() + } +} + +impl IcebergCountStarExecutor { + pub fn new(schema: Schema, identity: String, record_counts: Vec) -> Self { + Self { + schema, + identity, + record_counts, + } + } + + #[try_stream(ok = DataChunk, error = BatchError)] + async fn do_execute(mut self: Box) { + let record_count = mem::take(&mut self.record_counts).into_iter().sum::() as i64; + let chunk = DataChunk::new( + vec![Arc::new(I64Array::from_iter([record_count]).into())], + 1, + ); + yield chunk; + } +} diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs index 0e8d5c075a9c..f884ae80e829 100644 --- a/src/batch/src/executor/source.rs +++ b/src/batch/src/executor/source.rs @@ -58,7 +58,6 @@ impl BoxedExecutorBuilder for SourceExecutor { source: &ExecutorBuilder<'_, C>, inputs: Vec, ) -> Result { - println!("source_executor_builder"); ensure!(inputs.is_empty(), "Source should not have input executor!"); let source_node = try_match_expand!( source.plan_node().get_node_body().unwrap(), diff --git a/src/frontend/src/optimizer/plan_node/batch_iceberg_count_star_scan.rs b/src/frontend/src/optimizer/plan_node/batch_iceberg_count_star_scan.rs new file mode 100644 index 000000000000..91672f5b0c3c --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/batch_iceberg_count_star_scan.rs @@ -0,0 +1,138 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::rc::Rc; + +use pretty_xmlish::{Pretty, XmlNode}; +use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; +use risingwave_common::types::DataType; +use risingwave_pb::batch_plan::plan_node::NodeBody; +use risingwave_pb::batch_plan::source_node::IcebergSourceType; +use risingwave_pb::batch_plan::SourceNode; +use risingwave_sqlparser::ast::AsOf; + +use super::batch::prelude::*; +use super::utils::{childless_record, column_names_pretty, Distill}; +use super::{ + generic, BatchIcebergScan, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, + ToLocalBatch, +}; +use crate::catalog::source_catalog::SourceCatalog; +use crate::error::Result; +use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::property::Distribution; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct BatchIcebergCountStarScan { + pub base: PlanBase, + pub core: generic::Source, +} + +impl BatchIcebergCountStarScan { + pub fn new_with_batch_iceberg_scan(batch_iceberg_scan: &BatchIcebergScan) -> Self { + let mut core = batch_iceberg_scan.core.clone(); + core.column_catalog = vec![ColumnCatalog::visible(ColumnDesc::named( + "count", + ColumnId::first_user_column(), + DataType::Int64, + ))]; + let base = PlanBase::new_batch_with_core( + &core, + batch_iceberg_scan.base.distribution().clone(), + batch_iceberg_scan.base.order().clone(), + ); + Self { base, core } + } + + pub fn column_names(&self) -> Vec<&str> { + self.schema().names_str() + } + + pub fn source_catalog(&self) -> Option> { + self.core.catalog.clone() + } + + pub fn clone_with_core(&self, core: generic::Source) -> Self { + let base = PlanBase::new_batch_with_core( + &core, + self.base.distribution().clone(), + self.base.order().clone(), + ); + Self { base, core } + } + + pub fn clone_with_dist(&self) -> Self { + let base = self + .base + .clone_with_new_distribution(Distribution::SomeShard); + Self { + base, + core: self.core.clone(), + } + } + + pub fn as_of(&self) -> Option { + self.core.as_of.clone() + } +} + +impl_plan_tree_node_for_leaf! 
+
+impl Distill for BatchIcebergCountStarScan {
+    fn distill<'a>(&self) -> XmlNode<'a> {
+        let src = Pretty::from(self.source_catalog().unwrap().name.clone());
+        let fields = vec![
+            ("source", src),
+            ("columns", column_names_pretty(self.schema())),
+        ];
+        childless_record("BatchIcebergCountStarScan", fields)
+    }
+}
+
+impl ToLocalBatch for BatchIcebergCountStarScan {
+    fn to_local(&self) -> Result<PlanRef> {
+        Ok(self.clone_with_dist().into())
+    }
+}
+
+impl ToDistributedBatch for BatchIcebergCountStarScan {
+    fn to_distributed(&self) -> Result<PlanRef> {
+        Ok(self.clone_with_dist().into())
+    }
+}
+
+impl ToBatchPb for BatchIcebergCountStarScan {
+    fn to_batch_prost_body(&self) -> NodeBody {
+        let source_catalog = self.source_catalog().unwrap();
+        let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
+        NodeBody::Source(SourceNode {
+            source_id: source_catalog.id,
+            info: Some(source_catalog.info.clone()),
+            columns: self
+                .core
+                .column_catalog
+                .iter()
+                .map(|c| c.to_protobuf())
+                .collect(),
+            with_properties,
+            split: vec![],
+            secret_refs,
+            iceberg_source_type: IcebergSourceType::CountStar.into(),
+        })
+    }
+}
+
+impl ExprRewritable for BatchIcebergCountStarScan {}
+
+impl ExprVisitable for BatchIcebergCountStarScan {}
diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs
index 11c1626f441a..c5eec1543e53 100644
--- a/src/frontend/src/scheduler/distributed/stage.rs
+++ b/src/frontend/src/scheduler/distributed/stage.rs
@@ -380,7 +380,6 @@ impl StageRunner {
                 ));
             }
         } else if let Some(source_info) = self.stage.source_info.as_ref() {
-            println!("source_info: {:?}", source_info);
             let chunk_size = (source_info.split_info().unwrap().len() as f32
                 / self.stage.parallelism.unwrap() as f32)
                 .ceil() as usize;
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 05fe90487a79..6aa8f7d157b5 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -193,7 +193,6 @@ impl BatchPlanFragmenter {
     /// Split the plan node into each stages, based on exchange node.
fn split_into_stage(&mut self, batch_node: PlanRef) -> SchedulerResult<()> { - println!("split_into_stage{:?}", batch_node); let root_stage = self.new_stage( batch_node, Some(Distribution::Single.to_prost( @@ -901,12 +900,6 @@ impl BatchPlanFragmenter { } else { None }; - println!( - "split_into_stage{:?},{:?},{:?}", - source_info, - table_scan_info, - root.distribution() - ); let mut has_lookup_join = false; let parallelism = match root.distribution() { From 1c526be4e490a96250e2a48ca986f09fc8e24114 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Fri, 16 Aug 2024 16:07:52 +0800 Subject: [PATCH 5/7] mv rule in batch --- src/frontend/src/optimizer/mod.rs | 4 ++-- .../batch_agg_count_for_iceberg_rule.rs} | 11 +++++------ src/frontend/src/optimizer/rule/batch/mod.rs | 1 + src/frontend/src/optimizer/rule/mod.rs | 5 ++--- 4 files changed, 10 insertions(+), 11 deletions(-) rename src/frontend/src/optimizer/rule/{agg_count_for_iceberg_rule.rs => batch/batch_agg_count_for_iceberg_rule.rs} (84%) diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 715667354cb7..8dd485ebe158 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -403,7 +403,7 @@ impl PlanRoot { let plan = plan.optimize_by_rules(&OptimizationStage::new( "Iceberg count star", - vec![AggCountForIcebergRule::create()], + vec![BatchAggCountForIcebergRule::create()], ApplyOrder::TopDown, )); @@ -452,7 +452,7 @@ impl PlanRoot { let plan = plan.optimize_by_rules(&OptimizationStage::new( "Iceberg count star", - vec![AggCountForIcebergRule::create()], + vec![BatchAggCountForIcebergRule::create()], ApplyOrder::TopDown, )); diff --git a/src/frontend/src/optimizer/rule/agg_count_for_iceberg_rule.rs b/src/frontend/src/optimizer/rule/batch/batch_agg_count_for_iceberg_rule.rs similarity index 84% rename from src/frontend/src/optimizer/rule/agg_count_for_iceberg_rule.rs rename to src/frontend/src/optimizer/rule/batch/batch_agg_count_for_iceberg_rule.rs index 740c69d03b65..d712da7af79c 100644 --- a/src/frontend/src/optimizer/rule/agg_count_for_iceberg_rule.rs +++ b/src/frontend/src/optimizer/rule/batch/batch_agg_count_for_iceberg_rule.rs @@ -12,12 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use super::{BoxedRule, Rule};
 use crate::optimizer::plan_node::{BatchIcebergCountStarScan, PlanAggCall};
-use crate::optimizer::PlanRef;
+use crate::optimizer::{BoxedRule, PlanRef, Rule};
 
-pub struct AggCountForIcebergRule {}
-impl Rule for AggCountForIcebergRule {
+pub struct BatchAggCountForIcebergRule {}
+impl Rule for BatchAggCountForIcebergRule {
     fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
         let agg = plan.as_batch_simple_agg()?;
         if agg.core.group_key.is_empty()
@@ -33,8 +32,8 @@ impl Rule for AggCountForIcebergRule {
     }
 }
 
-impl AggCountForIcebergRule {
+impl BatchAggCountForIcebergRule {
     pub fn create() -> BoxedRule {
-        Box::new(AggCountForIcebergRule {})
+        Box::new(BatchAggCountForIcebergRule {})
     }
 }
diff --git a/src/frontend/src/optimizer/rule/batch/mod.rs b/src/frontend/src/optimizer/rule/batch/mod.rs
index 6061c985b669..328821e692c2 100644
--- a/src/frontend/src/optimizer/rule/batch/mod.rs
+++ b/src/frontend/src/optimizer/rule/batch/mod.rs
@@ -14,3 +14,4 @@
 pub(crate) mod batch_project_merge_rule;
 pub mod batch_push_limit_to_scan_rule;
+pub mod batch_agg_count_for_iceberg_rule;
diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs
index 4b01c52da94e..d4babb799c09 100644
--- a/src/frontend/src/optimizer/rule/mod.rs
+++ b/src/frontend/src/optimizer/rule/mod.rs
@@ -157,14 +157,13 @@ mod apply_hop_window_transpose_rule;
 pub use apply_hop_window_transpose_rule::*;
 mod agg_call_merge_rule;
 pub use agg_call_merge_rule::*;
-mod agg_count_for_iceberg_rule;
 mod pull_up_correlated_predicate_agg_rule;
 mod source_to_iceberg_scan_rule;
 mod source_to_kafka_scan_rule;
 mod table_function_to_file_scan_rule;
 mod values_extract_project_rule;
 
-pub use agg_count_for_iceberg_rule::*;
+pub use batch::batch_agg_count_for_iceberg_rule::*;
 pub use batch::batch_push_limit_to_scan_rule::*;
 pub use pull_up_correlated_predicate_agg_rule::*;
 pub use source_to_iceberg_scan_rule::*;
@@ -247,7 +246,7 @@ macro_rules! for_all_rules {
             , { PullUpCorrelatedPredicateAggRule }
             , { SourceToKafkaScanRule }
             , { SourceToIcebergScanRule }
-            , { AggCountForIcebergRule }
+            , { BatchAggCountForIcebergRule }
         }
     };
 }

From 497352e336d2bcf216aa293d1894a8f88a52f5c6 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Fri, 16 Aug 2024 16:41:28 +0800
Subject: [PATCH 6/7] fmt fix

---
 proto/batch_plan.proto                       | 2 +-
 risedev.yml                                  | 4 ++--
 src/frontend/src/optimizer/rule/batch/mod.rs | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto
index 743a21f6ee2f..8464cbe94672 100644
--- a/proto/batch_plan.proto
+++ b/proto/batch_plan.proto
@@ -58,7 +58,7 @@ message ScanRange {
 }
 
 message SourceNode {
-  enum IcebergSourceType{
+  enum IcebergSourceType {
     ICEBERG_TYPE_UNSPECIFIED = 0;
     SCAN = 1;
     COUNT_STAR = 2;
diff --git a/risedev.yml b/risedev.yml
index 9c084b9b5227..b4b558face0e 100644
--- a/risedev.yml
+++ b/risedev.yml
@@ -20,7 +20,7 @@ profile:
     #   config-path: src/config/example.toml
     steps:
       # If you want to use the local s3 storage, enable the following line
-      - use: minio
+      # - use: minio
 
       # If you want to use aws-s3, configure AK and SK in env var and enable the following lines:
       #   - use: aws-s3
@@ -40,7 +40,7 @@ profile:
       - use: frontend
 
       # If you want to enable compactor, uncomment the following line, and enable either minio or aws-s3 as well.
-      - use: compactor
+      # - use: compactor
 
       # If you want to create source from Kafka, uncomment the following lines
       # - use: kafka
diff --git a/src/frontend/src/optimizer/rule/batch/mod.rs b/src/frontend/src/optimizer/rule/batch/mod.rs
index 328821e692c2..72537a6d06a1 100644
--- a/src/frontend/src/optimizer/rule/batch/mod.rs
+++ b/src/frontend/src/optimizer/rule/batch/mod.rs
@@ -12,6 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+pub mod batch_agg_count_for_iceberg_rule;
 pub(crate) mod batch_project_merge_rule;
 pub mod batch_push_limit_to_scan_rule;
-pub mod batch_agg_count_for_iceberg_rule;

From ae95e1e43a02ce1c9f119f1be5293a0c7c901e91 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Mon, 19 Aug 2024 13:47:39 +0800
Subject: [PATCH 7/7] fix fmt

fix ci
---
 proto/batch_plan.proto                        |  6 +++---
 src/batch/src/executor/source.rs              | 10 +++++-----
 src/frontend/src/handler/query.rs             |  4 +---
 .../batch_iceberg_count_star_scan.rs          | 13 ++----------
 .../optimizer/plan_node/batch_iceberg_scan.rs | 13 ++----------
 .../optimizer/plan_node/batch_kafka_scan.rs   |  4 ++--
 .../src/optimizer/plan_node/batch_source.rs   |  4 ++--
 .../plan_node/logical_iceberg_scan.rs         |  5 -----
 src/frontend/src/scheduler/plan_fragmenter.rs | 20 +++++++++----------
 9 files changed, 27 insertions(+), 52 deletions(-)

diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto
index 8464cbe94672..fde633c51626 100644
--- a/proto/batch_plan.proto
+++ b/proto/batch_plan.proto
@@ -58,8 +58,8 @@ message ScanRange {
 }
 
 message SourceNode {
-  enum IcebergSourceType {
-    ICEBERG_TYPE_UNSPECIFIED = 0;
+  enum SourceType {
+    UNSPECIFIED = 0;
     SCAN = 1;
     COUNT_STAR = 2;
   }
@@ -69,7 +69,7 @@ message SourceNode {
   repeated bytes split = 4;
   catalog.StreamSourceInfo info = 5;
   map<string, secret.SecretRef> secret_refs = 6;
-  IcebergSourceType iceberg_source_type = 7;
+  SourceType source_type = 7;
 }
 
 message FileScanNode {
diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs
index f884ae80e829..5debb209c53b 100644
--- a/src/batch/src/executor/source.rs
+++ b/src/batch/src/executor/source.rs
@@ -29,7 +29,7 @@ use risingwave_connector::source::{
 };
 use risingwave_connector::WithOptionsSecResolved;
 use risingwave_pb::batch_plan::plan_node::NodeBody;
-use risingwave_pb::batch_plan::source_node::IcebergSourceType;
+use risingwave_pb::batch_plan::source_node::SourceType;
 
 use super::iceberg_count_star_scan::IcebergCountStarExecutor;
 use super::Executor;
@@ -110,8 +110,8 @@ impl BoxedExecutorBuilder for SourceExecutor {
             assert_eq!(split_list.len(), 1);
             if let SplitImpl::Iceberg(split) = &split_list[0] {
                 let split: IcebergSplit = split.clone();
-                match IcebergSourceType::try_from(source_node.iceberg_source_type).unwrap() {
-                    IcebergSourceType::Scan => Ok(Box::new(IcebergScanExecutor::new(
+                match SourceType::try_from(source_node.source_type).unwrap() {
+                    SourceType::Scan => Ok(Box::new(IcebergScanExecutor::new(
                         iceberg_properties.to_iceberg_config(),
                         Some(split.snapshot_id),
                         split.table_meta.deserialize(),
@@ -120,12 +120,12 @@ impl BoxedExecutorBuilder for SourceExecutor {
                         schema,
                         source.plan_node().get_identity().clone(),
                     ))),
-                    IcebergSourceType::IcebergTypeUnspecified => unreachable!(),
-                    IcebergSourceType::CountStar => Ok(Box::new(IcebergCountStarExecutor::new(
+                    SourceType::CountStar => Ok(Box::new(IcebergCountStarExecutor::new(
                         schema,
                         source.plan_node().get_identity().clone(),
                         split.record_counts,
                     ))),
+                    SourceType::Unspecified => unreachable!(),
                 }
             } else {
                 unreachable!()
diff --git 
a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index d244745e2029..bdb32b590300 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -64,9 +64,7 @@ pub async fn handle_query( let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?; gen_batch_plan_fragmenter(&session, plan_result)? }; - Ok(execute(session, plan_fragmenter_result, formats) - .await - .unwrap()) + execute(session, plan_fragmenter_result, formats).await } pub fn handle_parse( diff --git a/src/frontend/src/optimizer/plan_node/batch_iceberg_count_star_scan.rs b/src/frontend/src/optimizer/plan_node/batch_iceberg_count_star_scan.rs index 91672f5b0c3c..65e78e09db11 100644 --- a/src/frontend/src/optimizer/plan_node/batch_iceberg_count_star_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_iceberg_count_star_scan.rs @@ -18,7 +18,7 @@ use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; use risingwave_common::types::DataType; use risingwave_pb::batch_plan::plan_node::NodeBody; -use risingwave_pb::batch_plan::source_node::IcebergSourceType; +use risingwave_pb::batch_plan::source_node::SourceType; use risingwave_pb::batch_plan::SourceNode; use risingwave_sqlparser::ast::AsOf; @@ -63,15 +63,6 @@ impl BatchIcebergCountStarScan { self.core.catalog.clone() } - pub fn clone_with_core(&self, core: generic::Source) -> Self { - let base = PlanBase::new_batch_with_core( - &core, - self.base.distribution().clone(), - self.base.order().clone(), - ); - Self { base, core } - } - pub fn clone_with_dist(&self) -> Self { let base = self .base @@ -128,7 +119,7 @@ impl ToBatchPb for BatchIcebergCountStarScan { with_properties, split: vec![], secret_refs, - iceberg_source_type: IcebergSourceType::CountStar.into(), + source_type: SourceType::CountStar.into(), }) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs index 9abc508ce34e..97db5a449ed7 100644 --- a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs @@ -16,7 +16,7 @@ use std::rc::Rc; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_pb::batch_plan::plan_node::NodeBody; -use risingwave_pb::batch_plan::source_node::IcebergSourceType; +use risingwave_pb::batch_plan::source_node::SourceType; use risingwave_pb::batch_plan::SourceNode; use risingwave_sqlparser::ast::AsOf; @@ -56,15 +56,6 @@ impl BatchIcebergScan { self.core.catalog.clone() } - pub fn clone_with_core(&self, core: generic::Source) -> Self { - let base = PlanBase::new_batch_with_core( - &core, - self.base.distribution().clone(), - self.base.order().clone(), - ); - Self { base, core } - } - pub fn clone_with_dist(&self) -> Self { let base = self .base @@ -121,7 +112,7 @@ impl ToBatchPb for BatchIcebergScan { with_properties, split: vec![], secret_refs, - iceberg_source_type: IcebergSourceType::Scan.into(), + source_type: SourceType::Scan.into(), }) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs b/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs index 759547c5056f..76db232b12d3 100644 --- a/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs @@ -18,7 +18,7 @@ use std::rc::Rc; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_pb::batch_plan::plan_node::NodeBody; -use risingwave_pb::batch_plan::source_node::IcebergSourceType; +use 
risingwave_pb::batch_plan::source_node::SourceType;
 use risingwave_pb::batch_plan::SourceNode;
 
 use super::batch::prelude::*;
@@ -134,7 +134,7 @@ impl ToBatchPb for BatchKafkaScan {
             with_properties,
             split: vec![],
             secret_refs,
-            iceberg_source_type: IcebergSourceType::IcebergTypeUnspecified.into(),
+            source_type: SourceType::Scan.into(),
         })
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/batch_source.rs b/src/frontend/src/optimizer/plan_node/batch_source.rs
index 763773bb5d73..00877c8b4b75 100644
--- a/src/frontend/src/optimizer/plan_node/batch_source.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_source.rs
@@ -16,7 +16,7 @@ use std::rc::Rc;
 
 use pretty_xmlish::{Pretty, XmlNode};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
-use risingwave_pb::batch_plan::source_node::IcebergSourceType;
+use risingwave_pb::batch_plan::source_node::SourceType;
 use risingwave_pb::batch_plan::SourceNode;
 use risingwave_sqlparser::ast::AsOf;
 
@@ -116,7 +116,7 @@ impl ToBatchPb for BatchSource {
             with_properties,
             split: vec![],
             secret_refs,
-            iceberg_source_type: IcebergSourceType::IcebergTypeUnspecified.into(),
+            source_type: SourceType::Scan.into(),
         })
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/logical_iceberg_scan.rs b/src/frontend/src/optimizer/plan_node/logical_iceberg_scan.rs
index 8bc2ca33f0a6..b355d6f2057c 100644
--- a/src/frontend/src/optimizer/plan_node/logical_iceberg_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_iceberg_scan.rs
@@ -51,11 +51,6 @@ impl LogicalIcebergScan {
         LogicalIcebergScan { base, core }
     }
 
-    pub fn new_with_core(core: generic::Source) -> Self {
-        let base = PlanBase::new_logical_with_core(&core);
-        LogicalIcebergScan { base, core }
-    }
-
     pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
         self.core.catalog.clone()
     }
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 6aa8f7d157b5..b870d2c362b5 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -41,7 +41,7 @@ use risingwave_connector::source::{
     ConnectorProperties, SourceEnumeratorContext, SplitEnumerator, SplitImpl,
 };
 use risingwave_pb::batch_plan::plan_node::NodeBody;
-use risingwave_pb::batch_plan::source_node::IcebergSourceType;
+use risingwave_pb::batch_plan::source_node::SourceType;
 use risingwave_pb::batch_plan::{ExchangeInfo, ScanRange as ScanRangeProto};
 use risingwave_pb::common::Buffer;
 use risingwave_pb::plan_common::Field as PbField;
@@ -275,7 +275,7 @@ pub struct SourceFetchInfo {
     pub connector: ConnectorProperties,
     pub timebound: (Option<i64>, Option<i64>),
     pub as_of: Option<AsOf>,
-    pub iceberg_source_type: IcebergSourceType,
+    pub source_type: SourceType,
 }
 
 #[derive(Clone, Debug)]
@@ -361,9 +361,8 @@ impl SourceScanInfo {
             None => None,
         };
 
-        let split_info = match fetch_info.iceberg_source_type {
-            IcebergSourceType::IcebergTypeUnspecified => unreachable!(),
-            IcebergSourceType::Scan => iceberg_enumerator
+        let split_info = match fetch_info.source_type {
+            SourceType::Scan => iceberg_enumerator
                 .list_splits_batch_scan(
                     fetch_info.schema,
                     time_travel_info,
                     batch_parallelism,
                 )
                 .await?
                 .into_iter()
                 .map(SplitImpl::Iceberg)
                 .collect_vec(),
-            IcebergSourceType::CountStar => iceberg_enumerator
+            SourceType::CountStar => iceberg_enumerator
                 .list_splits_batch_count_star(time_travel_info, batch_parallelism)
                 .await?
.into_iter() .map(SplitImpl::Iceberg) .collect_vec(), + SourceType::Unspecified => unreachable!(), }; Ok(SourceScanInfo::Complete(split_info)) @@ -1069,7 +1069,7 @@ impl BatchPlanFragmenter { connector: property, timebound: timestamp_bound, as_of: None, - iceberg_source_type: IcebergSourceType::IcebergTypeUnspecified, + source_type: SourceType::Scan, }))); } } else if let Some(batch_iceberg_scan) = node.as_batch_iceberg_scan() { @@ -1084,7 +1084,7 @@ impl BatchPlanFragmenter { connector: property, timebound: (None, None), as_of, - iceberg_source_type: IcebergSourceType::Scan, + source_type: SourceType::Scan, }))); } } else if let Some(batch_iceberg_count_star_scan) = node.as_batch_iceberg_count_star_scan() @@ -1101,7 +1101,7 @@ impl BatchPlanFragmenter { connector: property, timebound: (None, None), as_of, - iceberg_source_type: IcebergSourceType::CountStar, + source_type: SourceType::CountStar, }))); } } else if let Some(source_node) = node.as_batch_source() { @@ -1117,7 +1117,7 @@ impl BatchPlanFragmenter { connector: property, timebound: (None, None), as_of, - iceberg_source_type: IcebergSourceType::IcebergTypeUnspecified, + source_type: SourceType::Scan, }))); } }
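
Taken together, the series wires a metadata-only `count(*)` path end to end: BatchAggCountForIcebergRule rewrites a group-less `count(*)` over a BatchIcebergScan into a BatchIcebergCountStarScan, the plan node serializes `SourceType::CountStar` into its SourceNode, the fragmenter lists a count-star split carrying per-file `record_counts`, and IcebergCountStarExecutor answers with a single Int64 row without opening any data file. So `SELECT COUNT(*) FROM iceberg_source` reduces to summing manifest statistics. A minimal standalone sketch of the executor's arithmetic (plain Rust, no RisingWave types):

fn count_star(record_counts: Vec<u64>) -> i64 {
    // Sum the per-data-file row counts collected from manifest metadata.
    record_counts.into_iter().sum::<u64>() as i64
}

fn main() {
    // e.g. a snapshot with three data files holding 100, 200 and 42 rows
    assert_eq!(count_star(vec![100, 200, 42]), 342);
}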
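The `record_counts` themselves are gathered during split enumeration: patch 1's IcebergSplitEnumerator loads the snapshot's manifest list, loads each manifest, keeps only alive entries, and collects per-file row counts. A standalone model of that walk, using a hypothetical stand-in struct since the iceberg-rust manifest types are not reproduced here:

// `ManifestEntry` is a simplified stand-in, not the iceberg-rust type.
struct ManifestEntry {
    alive: bool,       // entries for deleted data files are skipped
    record_count: u64, // per-data-file row count stored in the manifest
}

fn snapshot_row_count(entries: &[ManifestEntry]) -> u64 {
    entries
        .iter()
        .filter(|e| e.alive)
        .map(|e| e.record_count)
        .sum()
}

fn main() {
    let entries = [
        ManifestEntry { alive: true, record_count: 100 },
        ManifestEntry { alive: false, record_count: 7 }, // deleted: skipped
        ManifestEntry { alive: true, record_count: 42 },
    ];
    assert_eq!(snapshot_row_count(&entries), 142);
}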
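The rewrite is only sound for a bare `count(*)`: the rule matches a BatchSimpleAgg with an empty group key directly over a BatchIcebergScan, and the conditions elided from the hunk above presumably also constrain the single agg call. A sketch of that guard, where the filter/distinct checks are assumptions rather than quoted from the patch:

#[derive(PartialEq)]
enum AggKind { Count, Other }

struct AggCall { kind: AggKind, has_filter: bool, distinct: bool }

// Assumed shape of the guard in BatchAggCountForIcebergRule::apply.
fn can_rewrite_to_count_star(group_key_is_empty: bool, calls: &[AggCall]) -> bool {
    group_key_is_empty
        && calls.len() == 1
        && calls[0].kind == AggKind::Count
        && !calls[0].has_filter
        && !calls[0].distinct
}

fn main() {
    let bare_count = AggCall { kind: AggKind::Count, has_filter: false, distinct: false };
    assert!(can_rewrite_to_count_star(true, &[bare_count]));
    assert!(!can_rewrite_to_count_star(false, &[]));
}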