From 5a4ec9b557dd739cf1a107d152134594a6391b82 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 26 Sep 2023 18:29:22 +0800 Subject: [PATCH 1/4] fix unary StreamFsFetch --- .../src/optimizer/plan_node/stream_fs_fetch.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs index 01440b1275d3..966326024dfe 100644 --- a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs +++ b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs @@ -16,7 +16,7 @@ use fixedbitset::FixedBitSet; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_pb::stream_plan::stream_node::NodeBody; -use super::{PlanBase, PlanRef}; +use super::{PlanBase, PlanRef, PlanTreeNodeUnary}; use crate::optimizer::plan_node::utils::{childless_record, Distill}; use crate::optimizer::plan_node::{generic, ExprRewritable, StreamNode}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -28,7 +28,16 @@ pub struct StreamFsFetch { source: generic::Source, } -impl_plan_tree_node_for_leaf!(StreamFsFetch); +impl PlanTreeNodeUnary for StreamFsFetch { + fn input(&self) -> PlanRef { + self.input.clone() + } + + fn clone_with_input(&self, input: PlanRef) -> Self { + Self::new(input, self.source.clone()) + } +} +impl_plan_tree_node_for_unary! { StreamFsFetch } impl StreamFsFetch { pub fn new(input: PlanRef, source: generic::Source) -> Self { From 9c0e62d67c4c1f0388d09351071015c25baf7e5b Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 26 Sep 2023 19:31:17 +0800 Subject: [PATCH 2/4] refactor: Refactor stream fetch to support distribution properties - Add `crate::optimizer::property::Distribution` module import - Add `Distribution::SomeShard` as an argument in `PlanBase::new_stream_with_logical` function call Signed-off-by: tabVersion --- .../src/optimizer/plan_node/stream_fs_fetch.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs index 966326024dfe..61c83dbe97b6 100644 --- a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs +++ b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs @@ -19,6 +19,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody; use super::{PlanBase, PlanRef, PlanTreeNodeUnary}; use crate::optimizer::plan_node::utils::{childless_record, Distill}; use crate::optimizer::plan_node::{generic, ExprRewritable, StreamNode}; +use crate::optimizer::property::Distribution; use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -41,15 +42,12 @@ impl_plan_tree_node_for_unary! { StreamFsFetch } impl StreamFsFetch { pub fn new(input: PlanRef, source: generic::Source) -> Self { - let base = PlanBase::new_stream( - input.ctx(), - input.schema().clone(), - input.logical_pk().to_vec(), - input.functional_dependency().clone(), - input.distribution().clone(), + let base = PlanBase::new_stream_with_logical( + &source, + Distribution::SomeShard, source.catalog.as_ref().map_or(true, |s| s.append_only), false, - FixedBitSet::with_capacity(input.schema().len()), + FixedBitSet::with_capacity(source.column_catalog.len()), ); Self { From c4fed1219eff84856aaf4b71ef420cdad238dcd5 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Thu, 28 Sep 2023 11:49:57 +0800 Subject: [PATCH 3/4] return error for new s3 source on batch --- src/frontend/src/optimizer/plan_node/logical_source.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 710bf68e44af..c79d847822d2 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -23,7 +23,7 @@ use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{ ColumnCatalog, ColumnDesc, Field, Schema, KAFKA_TIMESTAMP_COLUMN_NAME, }; -use risingwave_common::error::Result; +use risingwave_common::error::{ErrorCode, Result, RwError, TrackingIssue}; use risingwave_connector::source::{ConnectorProperties, DataType}; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::GeneratedColumnDesc; @@ -513,7 +513,10 @@ impl ToBatch for LogicalSource { &self.core.catalog.as_ref().unwrap().properties, ) { - todo!() + return Err(RwError::from(ErrorCode::NotImplemented( + "New S3 connector for batch".to_string(), + TrackingIssue(None), + ))); } let source = self.wrap_with_optional_generated_columns_batch_proj()?; Ok(source) From fff6d47d897b859e1128029a3fe19b5d41a029f8 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Thu, 28 Sep 2023 11:54:26 +0800 Subject: [PATCH 4/4] fix --- src/frontend/src/optimizer/plan_node/logical_source.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index c79d847822d2..c6172225c5b4 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -515,7 +515,7 @@ impl ToBatch for LogicalSource { { return Err(RwError::from(ErrorCode::NotImplemented( "New S3 connector for batch".to_string(), - TrackingIssue(None), + TrackingIssue::from(None), ))); } let source = self.wrap_with_optional_generated_columns_batch_proj()?;