From 2552e1ff177f923f5bbea78f9354e462dbf3264d Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 19 Aug 2024 13:47:39 +0800 Subject: [PATCH] fix fmt --- proto/batch_plan.proto | 9 ++++----- src/batch/src/executor/source.rs | 9 ++++----- src/frontend/src/handler/query.rs | 4 +--- .../batch_iceberg_count_star_scan.rs | 13 ++----------- .../optimizer/plan_node/batch_iceberg_scan.rs | 13 ++----------- .../optimizer/plan_node/batch_kafka_scan.rs | 4 ++-- .../src/optimizer/plan_node/batch_source.rs | 4 ++-- .../plan_node/logical_iceberg_scan.rs | 5 ----- src/frontend/src/scheduler/plan_fragmenter.rs | 19 +++++++++---------- 9 files changed, 26 insertions(+), 54 deletions(-) diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 8464cbe94672b..f2ae4158fe5d9 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -58,10 +58,9 @@ message ScanRange { } message SourceNode { - enum IcebergSourceType { - ICEBERG_TYPE_UNSPECIFIED = 0; - SCAN = 1; - COUNT_STAR = 2; + enum SourceType { + SCAN = 0; + COUNT_STAR = 1; } uint32 source_id = 1; repeated plan_common.ColumnCatalog columns = 2; @@ -69,7 +68,7 @@ message SourceNode { repeated bytes split = 4; catalog.StreamSourceInfo info = 5; map secret_refs = 6; - IcebergSourceType iceberg_source_type = 7; + SourceType source_type = 7; } message FileScanNode { diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs index f884ae80e8290..32e71dd599374 100644 --- a/src/batch/src/executor/source.rs +++ b/src/batch/src/executor/source.rs @@ -29,7 +29,7 @@ use risingwave_connector::source::{ }; use risingwave_connector::WithOptionsSecResolved; use risingwave_pb::batch_plan::plan_node::NodeBody; -use risingwave_pb::batch_plan::source_node::IcebergSourceType; +use risingwave_pb::batch_plan::source_node::SourceType; use super::iceberg_count_star_scan::IcebergCountStarExecutor; use super::Executor; @@ -110,8 +110,8 @@ impl BoxedExecutorBuilder for SourceExecutor { assert_eq!(split_list.len(), 1); if let SplitImpl::Iceberg(split) = &split_list[0] { let split: IcebergSplit = split.clone(); - match IcebergSourceType::try_from(source_node.iceberg_source_type).unwrap() { - IcebergSourceType::Scan => Ok(Box::new(IcebergScanExecutor::new( + match SourceType::try_from(source_node.source_type).unwrap() { + SourceType::Scan => Ok(Box::new(IcebergScanExecutor::new( iceberg_properties.to_iceberg_config(), Some(split.snapshot_id), split.table_meta.deserialize(), @@ -120,8 +120,7 @@ impl BoxedExecutorBuilder for SourceExecutor { schema, source.plan_node().get_identity().clone(), ))), - IcebergSourceType::IcebergTypeUnspecified => unreachable!(), - IcebergSourceType::CountStar => Ok(Box::new(IcebergCountStarExecutor::new( + SourceType::CountStar => Ok(Box::new(IcebergCountStarExecutor::new( schema, source.plan_node().get_identity().clone(), split.record_counts, diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index d244745e2029c..bdb32b590300b 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -64,9 +64,7 @@ pub async fn handle_query( let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?; gen_batch_plan_fragmenter(&session, plan_result)? }; - Ok(execute(session, plan_fragmenter_result, formats) - .await - .unwrap()) + execute(session, plan_fragmenter_result, formats).await } pub fn handle_parse( diff --git a/src/frontend/src/optimizer/plan_node/batch_iceberg_count_star_scan.rs b/src/frontend/src/optimizer/plan_node/batch_iceberg_count_star_scan.rs index 91672f5b0c3c3..65e78e09db116 100644 --- a/src/frontend/src/optimizer/plan_node/batch_iceberg_count_star_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_iceberg_count_star_scan.rs @@ -18,7 +18,7 @@ use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; use risingwave_common::types::DataType; use risingwave_pb::batch_plan::plan_node::NodeBody; -use risingwave_pb::batch_plan::source_node::IcebergSourceType; +use risingwave_pb::batch_plan::source_node::SourceType; use risingwave_pb::batch_plan::SourceNode; use risingwave_sqlparser::ast::AsOf; @@ -63,15 +63,6 @@ impl BatchIcebergCountStarScan { self.core.catalog.clone() } - pub fn clone_with_core(&self, core: generic::Source) -> Self { - let base = PlanBase::new_batch_with_core( - &core, - self.base.distribution().clone(), - self.base.order().clone(), - ); - Self { base, core } - } - pub fn clone_with_dist(&self) -> Self { let base = self .base @@ -128,7 +119,7 @@ impl ToBatchPb for BatchIcebergCountStarScan { with_properties, split: vec![], secret_refs, - iceberg_source_type: IcebergSourceType::CountStar.into(), + source_type: SourceType::CountStar.into(), }) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs index 9abc508ce34e2..97db5a449ed74 100644 --- a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs @@ -16,7 +16,7 @@ use std::rc::Rc; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_pb::batch_plan::plan_node::NodeBody; -use risingwave_pb::batch_plan::source_node::IcebergSourceType; +use risingwave_pb::batch_plan::source_node::SourceType; use risingwave_pb::batch_plan::SourceNode; use risingwave_sqlparser::ast::AsOf; @@ -56,15 +56,6 @@ impl BatchIcebergScan { self.core.catalog.clone() } - pub fn clone_with_core(&self, core: generic::Source) -> Self { - let base = PlanBase::new_batch_with_core( - &core, - self.base.distribution().clone(), - self.base.order().clone(), - ); - Self { base, core } - } - pub fn clone_with_dist(&self) -> Self { let base = self .base @@ -121,7 +112,7 @@ impl ToBatchPb for BatchIcebergScan { with_properties, split: vec![], secret_refs, - iceberg_source_type: IcebergSourceType::Scan.into(), + source_type: SourceType::Scan.into(), }) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs b/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs index 759547c5056fb..76db232b12d32 100644 --- a/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs @@ -18,7 +18,7 @@ use std::rc::Rc; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_pb::batch_plan::plan_node::NodeBody; -use risingwave_pb::batch_plan::source_node::IcebergSourceType; +use risingwave_pb::batch_plan::source_node::SourceType; use risingwave_pb::batch_plan::SourceNode; use super::batch::prelude::*; @@ -134,7 +134,7 @@ impl ToBatchPb for BatchKafkaScan { with_properties, split: vec![], secret_refs, - iceberg_source_type: IcebergSourceType::IcebergTypeUnspecified.into(), + source_type: SourceType::Scan.into(), }) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_source.rs b/src/frontend/src/optimizer/plan_node/batch_source.rs index 763773bb5d739..00877c8b4b754 100644 --- a/src/frontend/src/optimizer/plan_node/batch_source.rs +++ b/src/frontend/src/optimizer/plan_node/batch_source.rs @@ -16,7 +16,7 @@ use std::rc::Rc; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_pb::batch_plan::plan_node::NodeBody; -use risingwave_pb::batch_plan::source_node::IcebergSourceType; +use risingwave_pb::batch_plan::source_node::SourceType; use risingwave_pb::batch_plan::SourceNode; use risingwave_sqlparser::ast::AsOf; @@ -116,7 +116,7 @@ impl ToBatchPb for BatchSource { with_properties, split: vec![], secret_refs, - iceberg_source_type: IcebergSourceType::IcebergTypeUnspecified.into(), + source_type: SourceType::Scan.into(), }) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_iceberg_scan.rs b/src/frontend/src/optimizer/plan_node/logical_iceberg_scan.rs index 8bc2ca33f0a64..b355d6f2057c8 100644 --- a/src/frontend/src/optimizer/plan_node/logical_iceberg_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_iceberg_scan.rs @@ -51,11 +51,6 @@ impl LogicalIcebergScan { LogicalIcebergScan { base, core } } - pub fn new_with_core(core: generic::Source) -> Self { - let base = PlanBase::new_logical_with_core(&core); - LogicalIcebergScan { base, core } - } - pub fn source_catalog(&self) -> Option> { self.core.catalog.clone() } diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 6aa8f7d157b53..26e399223ffc9 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -41,7 +41,7 @@ use risingwave_connector::source::{ ConnectorProperties, SourceEnumeratorContext, SplitEnumerator, SplitImpl, }; use risingwave_pb::batch_plan::plan_node::NodeBody; -use risingwave_pb::batch_plan::source_node::IcebergSourceType; +use risingwave_pb::batch_plan::source_node::SourceType; use risingwave_pb::batch_plan::{ExchangeInfo, ScanRange as ScanRangeProto}; use risingwave_pb::common::Buffer; use risingwave_pb::plan_common::Field as PbField; @@ -275,7 +275,7 @@ pub struct SourceFetchInfo { pub connector: ConnectorProperties, pub timebound: (Option, Option), pub as_of: Option, - pub iceberg_source_type: IcebergSourceType, + pub source_type: SourceType, } #[derive(Clone, Debug)] @@ -361,9 +361,8 @@ impl SourceScanInfo { None => None, }; - let split_info = match fetch_info.iceberg_source_type { - IcebergSourceType::IcebergTypeUnspecified => unreachable!(), - IcebergSourceType::Scan => iceberg_enumerator + let split_info = match fetch_info.source_type { + SourceType::Scan => iceberg_enumerator .list_splits_batch_scan( fetch_info.schema, time_travel_info, @@ -373,7 +372,7 @@ impl SourceScanInfo { .into_iter() .map(SplitImpl::Iceberg) .collect_vec(), - IcebergSourceType::CountStar => iceberg_enumerator + SourceType::CountStar => iceberg_enumerator .list_splits_batch_count_star(time_travel_info, batch_parallelism) .await? .into_iter() @@ -1069,7 +1068,7 @@ impl BatchPlanFragmenter { connector: property, timebound: timestamp_bound, as_of: None, - iceberg_source_type: IcebergSourceType::IcebergTypeUnspecified, + source_type: SourceType::Scan, }))); } } else if let Some(batch_iceberg_scan) = node.as_batch_iceberg_scan() { @@ -1084,7 +1083,7 @@ impl BatchPlanFragmenter { connector: property, timebound: (None, None), as_of, - iceberg_source_type: IcebergSourceType::Scan, + source_type: SourceType::Scan, }))); } } else if let Some(batch_iceberg_count_star_scan) = node.as_batch_iceberg_count_star_scan() @@ -1101,7 +1100,7 @@ impl BatchPlanFragmenter { connector: property, timebound: (None, None), as_of, - iceberg_source_type: IcebergSourceType::CountStar, + source_type: SourceType::CountStar, }))); } } else if let Some(source_node) = node.as_batch_source() { @@ -1117,7 +1116,7 @@ impl BatchPlanFragmenter { connector: property, timebound: (None, None), as_of, - iceberg_source_type: IcebergSourceType::IcebergTypeUnspecified, + source_type: SourceType::Scan, }))); } }