fix: cdc source should not add row id gen
xxchan committed Jan 19, 2024
1 parent 98ab255 commit e36c80a
Showing 6 changed files with 77 additions and 48 deletions.
3 changes: 2 additions & 1 deletion src/frontend/src/handler/create_source.rs
@@ -71,6 +71,7 @@ use crate::handler::util::{
get_connector, is_cdc_connector, is_kafka_connector, SourceSchemaCompatExt,
};
use crate::handler::HandlerArgs;
+use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
use crate::session::SessionImpl;
use crate::utils::resolve_privatelink_in_with_option;
@@ -1264,7 +1265,7 @@ pub async fn handle_create_source(
// cdc source is an append-only source in plain json format
let source_node = LogicalSource::with_catalog(
Rc::new(SourceCatalog::from(&source)),
-            false,
+            SourceNodeKind::CreateSourceWithStreamjob,
context.into(),
)?;

3 changes: 2 additions & 1 deletion src/frontend/src/handler/create_table.rs
@@ -59,6 +59,7 @@ use crate::handler::create_source::{
check_source_schema, handle_addition_columns, validate_compatibility, UPSTREAM_SOURCE_KEY,
};
use crate::handler::HandlerArgs;
+use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
@@ -682,7 +683,7 @@ fn gen_table_plan_inner(
source_catalog.clone(),
columns.clone(),
row_id_index,
-        true,
+        SourceNodeKind::CreateTable,
context.clone(),
)?
.into();
13 changes: 9 additions & 4 deletions src/frontend/src/optimizer/mod.rs
@@ -70,7 +70,7 @@ use self::property::{Cardinality, RequiredDist};
use self::rule::*;
use crate::catalog::table_catalog::{TableType, TableVersion};
use crate::expr::TimestamptzExprFinder;
-use crate::optimizer::plan_node::generic::Union;
+use crate::optimizer::plan_node::generic::{SourceNodeKind, Union};
use crate::optimizer::plan_node::{
BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, StreamExchange, StreamUnion,
ToStream, VisitExprsRecursive,
@@ -618,9 +618,14 @@ impl PlanRoot {
}
};

-        let dummy_source_node =
-            LogicalSource::new(None, columns.clone(), row_id_index, true, context.clone())
-                .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
+        let dummy_source_node = LogicalSource::new(
+            None,
+            columns.clone(),
+            row_id_index,
+            SourceNodeKind::CreateTable,
+            context.clone(),
+        )
+        .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;

let dml_node = inject_dml_node(
&columns,
19 changes: 17 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/source.rs
@@ -29,6 +29,22 @@ use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;
use crate::{TableCatalog, WithOptions};

+/// In which scenario the source node is created
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[expect(clippy::enum_variant_names)]
+pub enum SourceNodeKind {
+    /// `CREATE TABLE` with a connector.
+    CreateTable,
+    /// `CREATE SOURCE` with a streaming job (backfill-able source).
+    CreateSourceWithStreamjob,
+    /// `CREATE MATERIALIZED VIEW` which selects from a non backfill-able source.
+    ///
+    /// Note:
+    /// - For a non backfill-able source, `CREATE SOURCE` will not create a source node.
+    /// - For a backfill-able source, `CREATE MATERIALIZED VIEW` will not create a source node.
+    CreateMView,
+}
+
/// [`Source`] returns contents of a table or other equivalent object
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
@@ -42,8 +58,7 @@ pub struct Source {
pub column_catalog: Vec<ColumnCatalog>,
pub row_id_index: Option<usize>,

-    /// True if it is a source created when creating table with a source.
-    pub for_table: bool,
+    pub kind: SourceNodeKind,

#[educe(PartialEq(ignore))]
#[educe(Hash(ignore))]
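The `SourceNodeKind` enum added above drives the planner's decision about when a row-id generator belongs in the stream plan (see the `to_stream` change in logical_source.rs below). The following is a minimal, self-contained sketch of that idea only; it is not RisingWave code, and the names `PlanStage` and `build_stream_plan` are hypothetical, introduced purely for illustration.

// Hypothetical, simplified stand-ins for illustration -- not RisingWave types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SourceNodeKind {
    CreateTable,
    CreateSourceWithStreamjob,
    CreateMView,
}

#[derive(Debug, PartialEq)]
enum PlanStage {
    Source,
    RowIdGen,
}

// Mirrors the shape of the new `to_stream` match: only the MV-on-source plan appends a
// row-id generator; `CREATE TABLE` handles row ids later in its own table plan, and a
// shared (e.g. CDC) source must not generate row ids at all.
fn build_stream_plan(kind: SourceNodeKind) -> Vec<PlanStage> {
    let mut plan = vec![PlanStage::Source];
    if kind == SourceNodeKind::CreateMView {
        plan.push(PlanStage::RowIdGen);
    }
    plan
}

fn main() {
    assert_eq!(
        build_stream_plan(SourceNodeKind::CreateSourceWithStreamjob),
        vec![PlanStage::Source]
    );
    assert_eq!(
        build_stream_plan(SourceNodeKind::CreateTable),
        vec![PlanStage::Source]
    );
    assert_eq!(
        build_stream_plan(SourceNodeKind::CreateMView),
        vec![PlanStage::Source, PlanStage::RowIdGen]
    );
}

Under these hypothetical names, the `CreateSourceWithStreamjob` (shared/CDC source) case yields a plan without `RowIdGen`, which is the behavior the `to_stream` change in this commit enforces.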
79 changes: 40 additions & 39 deletions src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -28,7 +28,7 @@ use risingwave_connector::source::DataType;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::GeneratedColumnDesc;

-use super::generic::GenericPlanRef;
+use super::generic::{GenericPlanRef, SourceNodeKind};
use super::stream_watermark_filter::StreamWatermarkFilter;
use super::utils::{childless_record, Distill};
use super::{
@@ -69,15 +69,15 @@ impl LogicalSource {
source_catalog: Option<Rc<SourceCatalog>>,
column_catalog: Vec<ColumnCatalog>,
row_id_index: Option<usize>,
-        for_table: bool,
+        kind: SourceNodeKind,
ctx: OptimizerContextRef,
) -> Result<Self> {
let kafka_timestamp_range = (Bound::Unbounded, Bound::Unbounded);
let core = generic::Source {
catalog: source_catalog,
column_catalog,
row_id_index,
-            for_table,
+            kind,
ctx,
kafka_timestamp_range,
};
@@ -97,7 +97,7 @@ impl LogicalSource {

pub fn with_catalog(
source_catalog: Rc<SourceCatalog>,
-        for_table: bool,
+        kind: SourceNodeKind,
ctx: OptimizerContextRef,
) -> Result<Self> {
let column_catalogs = source_catalog.columns.clone();
@@ -110,7 +110,7 @@ impl LogicalSource {
Some(source_catalog),
column_catalogs,
row_id_index,
-            for_table,
+            kind,
ctx,
)
}
@@ -506,45 +506,46 @@ impl ToBatch for LogicalSource {
impl ToStream for LogicalSource {
fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
let mut plan: PlanRef;

-        if self.core.for_table {
-            // Note: for create table, row_id and generated columns is created in plan_root.gen_table_plan
-            if self.core.is_new_fs_connector() {
-                plan = Self::create_fs_list_plan(self.core.clone())?;
-                plan = StreamFsFetch::new(plan, self.core.clone()).into();
-            } else {
-                plan = StreamSource::new(self.core.clone()).into()
-            }
-        } else {
-            // Create MV on source.
-            if self.core.is_new_fs_connector() {
-                plan = Self::create_fs_list_plan(self.core.clone())?;
-                plan = StreamFsFetch::new(plan, self.core.clone()).into();
-            } else {
-                plan = StreamSource::new(self.core.clone()).into()
+        match self.core.kind {
+            SourceNodeKind::CreateTable | SourceNodeKind::CreateSourceWithStreamjob => {
+                // Note: for create table, row_id and generated columns is created in plan_root.gen_table_plan
+                if self.core.is_new_fs_connector() {
+                    plan = Self::create_fs_list_plan(self.core.clone())?;
+                    plan = StreamFsFetch::new(plan, self.core.clone()).into();
+                } else {
+                    plan = StreamSource::new(self.core.clone()).into()
+                }
+            }
+            SourceNodeKind::CreateMView => {
+                // Create MV on source.
+                if self.core.is_new_fs_connector() {
+                    plan = Self::create_fs_list_plan(self.core.clone())?;
+                    plan = StreamFsFetch::new(plan, self.core.clone()).into();
+                } else {
+                    plan = StreamSource::new(self.core.clone()).into()
                }

-            if let Some(exprs) = &self.output_exprs {
-                let logical_project = generic::Project::new(exprs.to_vec(), plan);
-                plan = StreamProject::new(logical_project).into();
-            }
+                if let Some(exprs) = &self.output_exprs {
+                    let logical_project = generic::Project::new(exprs.to_vec(), plan);
+                    plan = StreamProject::new(logical_project).into();
+                }

-            if let Some(catalog) = self.source_catalog()
-                && !catalog.watermark_descs.is_empty()
-            {
-                plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into();
-            }
+                if let Some(catalog) = self.source_catalog()
+                    && !catalog.watermark_descs.is_empty()
+                {
+                    plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into();
+                }

-            if let Some(row_id_index) = self.output_row_id_index {
-                plan = StreamRowIdGen::new_with_dist(
-                    plan,
-                    row_id_index,
-                    HashShard(vec![row_id_index]),
-                )
-                .into();
+                if let Some(row_id_index) = self.output_row_id_index {
+                    plan = StreamRowIdGen::new_with_dist(
+                        plan,
+                        row_id_index,
+                        HashShard(vec![row_id_index]),
+                    )
+                    .into();
+                }
+            }
+        };

-        }
Ok(plan)
}

8 changes: 7 additions & 1 deletion src/frontend/src/planner/relation.rs
@@ -25,6 +25,7 @@ use crate::binder::{
BoundWindowTableFunction, Relation, WindowTableFunctionKind,
};
use crate::expr::{Expr, ExprImpl, ExprType, FunctionCall, InputRef};
+use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{
LogicalApply, LogicalHopWindow, LogicalJoin, LogicalProject, LogicalScan, LogicalShare,
LogicalSource, LogicalSysScan, LogicalTableFunction, LogicalValues, PlanRef,
@@ -85,7 +86,12 @@ impl Planner {
}

pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
-        Ok(LogicalSource::with_catalog(Rc::new(source.catalog), false, self.ctx())?.into())
+        Ok(LogicalSource::with_catalog(
+            Rc::new(source.catalog),
+            SourceNodeKind::CreateMView,
+            self.ctx(),
+        )?
+        .into())
}

pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {