
Commit

fix
fmt
xxhZs committed Aug 19, 2024
1 parent 497352e commit 2552e1f
Showing 9 changed files with 26 additions and 54 deletions.
9 changes: 4 additions & 5 deletions proto/batch_plan.proto
@@ -58,18 +58,17 @@ message ScanRange {
}

message SourceNode {
-enum IcebergSourceType {
-ICEBERG_TYPE_UNSPECIFIED = 0;
-SCAN = 1;
-COUNT_STAR = 2;
+enum SourceType {
+SCAN = 0;
+COUNT_STAR = 1;
}
uint32 source_id = 1;
repeated plan_common.ColumnCatalog columns = 2;
map<string, string> with_properties = 3;
repeated bytes split = 4;
catalog.StreamSourceInfo info = 5;
map<string, secret.SecretRef> secret_refs = 6;
-IcebergSourceType iceberg_source_type = 7;
+SourceType source_type = 7;
}

message FileScanNode {
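For context (not part of this diff): a minimal sketch of how the renamed field is read on the Rust side, mirroring the executor change in src/batch/src/executor/source.rs below. The describe() helper is hypothetical; SourceType::try_from is the prost-generated i32-to-enum conversion that the executor itself calls.

use risingwave_pb::batch_plan::source_node::SourceType;
use risingwave_pb::batch_plan::SourceNode;

// Hypothetical helper: dispatch on the new `source_type` field, the same way the
// batch executor picks between IcebergScanExecutor and IcebergCountStarExecutor.
fn describe(source_node: &SourceNode) -> &'static str {
    match SourceType::try_from(source_node.source_type) {
        Ok(SourceType::Scan) => "plain batch scan",
        Ok(SourceType::CountStar) => "count(*) fast path",
        Err(_) => "unknown source type",
    }
}

Because SCAN is now the enum's zero value, plan nodes that never take the Iceberg path (BatchKafkaScan, BatchSource) simply emit SourceType::Scan, which is what the later hunks use in place of the removed ICEBERG_TYPE_UNSPECIFIED variant.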
9 changes: 4 additions & 5 deletions src/batch/src/executor/source.rs
@@ -29,7 +29,7 @@ use risingwave_connector::source::{
};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_pb::batch_plan::plan_node::NodeBody;
-use risingwave_pb::batch_plan::source_node::IcebergSourceType;
+use risingwave_pb::batch_plan::source_node::SourceType;

use super::iceberg_count_star_scan::IcebergCountStarExecutor;
use super::Executor;
@@ -110,8 +110,8 @@ impl BoxedExecutorBuilder for SourceExecutor {
assert_eq!(split_list.len(), 1);
if let SplitImpl::Iceberg(split) = &split_list[0] {
let split: IcebergSplit = split.clone();
-match IcebergSourceType::try_from(source_node.iceberg_source_type).unwrap() {
-IcebergSourceType::Scan => Ok(Box::new(IcebergScanExecutor::new(
+match SourceType::try_from(source_node.source_type).unwrap() {
+SourceType::Scan => Ok(Box::new(IcebergScanExecutor::new(
iceberg_properties.to_iceberg_config(),
Some(split.snapshot_id),
split.table_meta.deserialize(),
@@ -120,8 +120,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
schema,
source.plan_node().get_identity().clone(),
))),
-IcebergSourceType::IcebergTypeUnspecified => unreachable!(),
-IcebergSourceType::CountStar => Ok(Box::new(IcebergCountStarExecutor::new(
+SourceType::CountStar => Ok(Box::new(IcebergCountStarExecutor::new(
schema,
source.plan_node().get_identity().clone(),
split.record_counts,
4 changes: 1 addition & 3 deletions src/frontend/src/handler/query.rs
@@ -64,9 +64,7 @@ pub async fn handle_query(
let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?;
gen_batch_plan_fragmenter(&session, plan_result)?
};
-Ok(execute(session, plan_fragmenter_result, formats)
-.await
-.unwrap())
+execute(session, plan_fragmenter_result, formats).await
}

pub fn handle_parse(
src/frontend/src/optimizer/plan_node/batch_iceberg_count_star_scan.rs
@@ -18,7 +18,7 @@ use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
use risingwave_common::types::DataType;
use risingwave_pb::batch_plan::plan_node::NodeBody;
-use risingwave_pb::batch_plan::source_node::IcebergSourceType;
+use risingwave_pb::batch_plan::source_node::SourceType;
use risingwave_pb::batch_plan::SourceNode;
use risingwave_sqlparser::ast::AsOf;

@@ -63,15 +63,6 @@ impl BatchIcebergCountStarScan {
self.core.catalog.clone()
}

-pub fn clone_with_core(&self, core: generic::Source) -> Self {
-let base = PlanBase::new_batch_with_core(
-&core,
-self.base.distribution().clone(),
-self.base.order().clone(),
-);
-Self { base, core }
-}

pub fn clone_with_dist(&self) -> Self {
let base = self
.base
@@ -128,7 +119,7 @@ impl ToBatchPb for BatchIcebergCountStarScan {
with_properties,
split: vec![],
secret_refs,
-iceberg_source_type: IcebergSourceType::CountStar.into(),
+source_type: SourceType::CountStar.into(),
})
}
}
13 changes: 2 additions & 11 deletions src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
@@ -16,7 +16,7 @@ use std::rc::Rc;

use pretty_xmlish::{Pretty, XmlNode};
use risingwave_pb::batch_plan::plan_node::NodeBody;
-use risingwave_pb::batch_plan::source_node::IcebergSourceType;
+use risingwave_pb::batch_plan::source_node::SourceType;
use risingwave_pb::batch_plan::SourceNode;
use risingwave_sqlparser::ast::AsOf;

@@ -56,15 +56,6 @@ impl BatchIcebergScan {
self.core.catalog.clone()
}

-pub fn clone_with_core(&self, core: generic::Source) -> Self {
-let base = PlanBase::new_batch_with_core(
-&core,
-self.base.distribution().clone(),
-self.base.order().clone(),
-);
-Self { base, core }
-}

pub fn clone_with_dist(&self) -> Self {
let base = self
.base
@@ -121,7 +112,7 @@ impl ToBatchPb for BatchIcebergScan {
with_properties,
split: vec![],
secret_refs,
-iceberg_source_type: IcebergSourceType::Scan.into(),
+source_type: SourceType::Scan.into(),
})
}
}
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs
@@ -18,7 +18,7 @@ use std::rc::Rc;

use pretty_xmlish::{Pretty, XmlNode};
use risingwave_pb::batch_plan::plan_node::NodeBody;
-use risingwave_pb::batch_plan::source_node::IcebergSourceType;
+use risingwave_pb::batch_plan::source_node::SourceType;
use risingwave_pb::batch_plan::SourceNode;

use super::batch::prelude::*;
@@ -134,7 +134,7 @@ impl ToBatchPb for BatchKafkaScan {
with_properties,
split: vec![],
secret_refs,
-iceberg_source_type: IcebergSourceType::IcebergTypeUnspecified.into(),
+source_type: SourceType::Scan.into(),
})
}
}
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/batch_source.rs
@@ -16,7 +16,7 @@ use std::rc::Rc;

use pretty_xmlish::{Pretty, XmlNode};
use risingwave_pb::batch_plan::plan_node::NodeBody;
-use risingwave_pb::batch_plan::source_node::IcebergSourceType;
+use risingwave_pb::batch_plan::source_node::SourceType;
use risingwave_pb::batch_plan::SourceNode;
use risingwave_sqlparser::ast::AsOf;

@@ -116,7 +116,7 @@ impl ToBatchPb for BatchSource {
with_properties,
split: vec![],
secret_refs,
-iceberg_source_type: IcebergSourceType::IcebergTypeUnspecified.into(),
+source_type: SourceType::Scan.into(),
})
}
}
5 changes: 0 additions & 5 deletions src/frontend/src/optimizer/plan_node/logical_iceberg_scan.rs
@@ -51,11 +51,6 @@ impl LogicalIcebergScan {
LogicalIcebergScan { base, core }
}

-pub fn new_with_core(core: generic::Source) -> Self {
-let base = PlanBase::new_logical_with_core(&core);
-LogicalIcebergScan { base, core }
-}

pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
self.core.catalog.clone()
}
19 changes: 9 additions & 10 deletions src/frontend/src/scheduler/plan_fragmenter.rs
@@ -41,7 +41,7 @@ use risingwave_connector::source::{
ConnectorProperties, SourceEnumeratorContext, SplitEnumerator, SplitImpl,
};
use risingwave_pb::batch_plan::plan_node::NodeBody;
-use risingwave_pb::batch_plan::source_node::IcebergSourceType;
+use risingwave_pb::batch_plan::source_node::SourceType;
use risingwave_pb::batch_plan::{ExchangeInfo, ScanRange as ScanRangeProto};
use risingwave_pb::common::Buffer;
use risingwave_pb::plan_common::Field as PbField;
@@ -275,7 +275,7 @@ pub struct SourceFetchInfo {
pub connector: ConnectorProperties,
pub timebound: (Option<i64>, Option<i64>),
pub as_of: Option<AsOf>,
-pub iceberg_source_type: IcebergSourceType,
+pub source_type: SourceType,
}

#[derive(Clone, Debug)]
@@ -361,9 +361,8 @@ impl SourceScanInfo {
None => None,
};

-let split_info = match fetch_info.iceberg_source_type {
-IcebergSourceType::IcebergTypeUnspecified => unreachable!(),
-IcebergSourceType::Scan => iceberg_enumerator
+let split_info = match fetch_info.source_type {
+SourceType::Scan => iceberg_enumerator
.list_splits_batch_scan(
fetch_info.schema,
time_travel_info,
@@ -373,7 +372,7 @@
.into_iter()
.map(SplitImpl::Iceberg)
.collect_vec(),
-IcebergSourceType::CountStar => iceberg_enumerator
+SourceType::CountStar => iceberg_enumerator
.list_splits_batch_count_star(time_travel_info, batch_parallelism)
.await?
.into_iter()
@@ -1069,7 +1068,7 @@ impl BatchPlanFragmenter {
connector: property,
timebound: timestamp_bound,
as_of: None,
-iceberg_source_type: IcebergSourceType::IcebergTypeUnspecified,
+source_type: SourceType::Scan,
})));
}
} else if let Some(batch_iceberg_scan) = node.as_batch_iceberg_scan() {
@@ -1084,7 +1083,7 @@
connector: property,
timebound: (None, None),
as_of,
-iceberg_source_type: IcebergSourceType::Scan,
+source_type: SourceType::Scan,
})));
}
} else if let Some(batch_iceberg_count_star_scan) = node.as_batch_iceberg_count_star_scan()
@@ -1101,7 +1100,7 @@
connector: property,
timebound: (None, None),
as_of,
-iceberg_source_type: IcebergSourceType::CountStar,
+source_type: SourceType::CountStar,
})));
}
} else if let Some(source_node) = node.as_batch_source() {
@@ -1117,7 +1116,7 @@
connector: property,
timebound: (None, None),
as_of,
-iceberg_source_type: IcebergSourceType::IcebergTypeUnspecified,
+source_type: SourceType::Scan,
})));
}
}
