diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 710bf68e44af..c6172225c5b4 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -23,7 +23,7 @@ use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{ ColumnCatalog, ColumnDesc, Field, Schema, KAFKA_TIMESTAMP_COLUMN_NAME, }; -use risingwave_common::error::Result; +use risingwave_common::error::{ErrorCode, Result, RwError, TrackingIssue}; use risingwave_connector::source::{ConnectorProperties, DataType}; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::GeneratedColumnDesc; @@ -513,7 +513,10 @@ impl ToBatch for LogicalSource { &self.core.catalog.as_ref().unwrap().properties, ) { - todo!() + return Err(RwError::from(ErrorCode::NotImplemented( + "New S3 connector for batch".to_string(), + TrackingIssue::from(None), + ))); } let source = self.wrap_with_optional_generated_columns_batch_proj()?; Ok(source) diff --git a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs index 01440b1275d3..61c83dbe97b6 100644 --- a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs +++ b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs @@ -16,9 +16,10 @@ use fixedbitset::FixedBitSet; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_pb::stream_plan::stream_node::NodeBody; -use super::{PlanBase, PlanRef}; +use super::{PlanBase, PlanRef, PlanTreeNodeUnary}; use crate::optimizer::plan_node::utils::{childless_record, Distill}; use crate::optimizer::plan_node::{generic, ExprRewritable, StreamNode}; +use crate::optimizer::property::Distribution; use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -28,19 +29,25 @@ pub struct StreamFsFetch { source: generic::Source, } -impl_plan_tree_node_for_leaf!(StreamFsFetch); +impl PlanTreeNodeUnary for StreamFsFetch { + fn input(&self) -> PlanRef { + self.input.clone() + } + + fn clone_with_input(&self, input: PlanRef) -> Self { + Self::new(input, self.source.clone()) + } +} +impl_plan_tree_node_for_unary! { StreamFsFetch } impl StreamFsFetch { pub fn new(input: PlanRef, source: generic::Source) -> Self { - let base = PlanBase::new_stream( - input.ctx(), - input.schema().clone(), - input.logical_pk().to_vec(), - input.functional_dependency().clone(), - input.distribution().clone(), + let base = PlanBase::new_stream_with_logical( + &source, + Distribution::SomeShard, source.catalog.as_ref().map_or(true, |s| s.append_only), false, - FixedBitSet::with_capacity(input.schema().len()), + FixedBitSet::with_capacity(source.column_catalog.len()), ); Self {