Skip to content

Commit

Permalink
refactor(frontend): improve readability of LogicalSource (#14687)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan authored Jan 22, 2024
1 parent f5c6389 commit b4f9b48
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 136 deletions.
10 changes: 4 additions & 6 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ use crate::handler::util::{
get_connector, is_cdc_connector, is_kafka_connector, SourceSchemaCompatExt,
};
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
use crate::session::SessionImpl;
use crate::utils::resolve_privatelink_in_with_option;
Expand Down Expand Up @@ -1262,12 +1263,9 @@ pub async fn handle_create_source(
let graph = {
let context = OptimizerContext::from_handler_args(handler_args);
// cdc source is an append-only source in plain json format
let source_node = LogicalSource::new(
Some(Rc::new(SourceCatalog::from(&source))),
columns.clone(),
row_id_index,
false,
false,
let source_node = LogicalSource::with_catalog(
Rc::new(SourceCatalog::from(&source)),
SourceNodeKind::CreateSourceWithStreamjob,
context.into(),
)?;

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use crate::handler::create_source::{
check_source_schema, handle_addition_columns, validate_compatibility, UPSTREAM_SOURCE_KEY,
};
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
Expand Down Expand Up @@ -682,8 +683,7 @@ fn gen_table_plan_inner(
source_catalog.clone(),
columns.clone(),
row_id_index,
false,
true,
SourceNodeKind::CreateTable,
context.clone(),
)?
.into();
Expand Down
5 changes: 2 additions & 3 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ use self::property::{Cardinality, RequiredDist};
use self::rule::*;
use crate::catalog::table_catalog::{TableType, TableVersion};
use crate::expr::TimestamptzExprFinder;
use crate::optimizer::plan_node::generic::Union;
use crate::optimizer::plan_node::generic::{SourceNodeKind, Union};
use crate::optimizer::plan_node::{
BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, StreamExchange, StreamUnion,
ToStream, VisitExprsRecursive,
Expand Down Expand Up @@ -622,8 +622,7 @@ impl PlanRoot {
None,
columns.clone(),
row_id_index,
false,
true,
SourceNodeKind::CreateTable,
context.clone(),
)
.and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
Expand Down
54 changes: 48 additions & 6 deletions src/frontend/src/optimizer/plan_node/generic/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use educe::Educe;
use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_connector::source::ConnectorProperties;

use super::super::utils::TableCatalogBuilder;
use super::GenericPlanNode;
Expand All @@ -28,20 +29,37 @@ use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;
use crate::{TableCatalog, WithOptions};

/// In which scenario the source node is created.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[expect(clippy::enum_variant_names)]
pub enum SourceNodeKind {
/// `CREATE TABLE` with a connector.
CreateTable,
/// `CREATE SOURCE` with a streaming job (backfill-able source).
CreateSourceWithStreamjob,
/// `CREATE MATERIALIZED VIEW` which selects from a source.
///
/// Note:
/// - For a non backfill-able source, `CREATE SOURCE` will not create a source node, and `CREATE MATERIALIZED VIEW` will create a `LogicalSource`.
/// - For a backfill-able source, `CREATE MATERIALIZED VIEW` will create `LogicalSourceBackfill` instead of `LogicalSource`.
CreateMViewOrBatch,
}

/// [`Source`] returns contents of a table or other equivalent object
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct Source {
/// If there is an external stream source, `catalog` will be `Some`. Otherwise, it is `None`.
pub catalog: Option<Rc<SourceCatalog>>,
/// NOTE(Yuanxin): Here we store column descriptions, pk column ids, and row id index for plan
/// generating, even if there is no external stream source.

// NOTE: Here we store `column_catalog` and `row_id_index`
// because they are needed when `catalog` is None.
// When `catalog` is Some, they are the same as these fields in `catalog`.
pub column_catalog: Vec<ColumnCatalog>,
pub row_id_index: Option<usize>,
/// Whether the "SourceNode" should generate the row id column for append only source
pub gen_row_id: bool,
/// True if it is a source created when creating table with a source.
pub for_table: bool,

pub kind: SourceNodeKind,

#[educe(PartialEq(ignore))]
#[educe(Hash(ignore))]
pub ctx: OptimizerContextRef,
Expand Down Expand Up @@ -80,6 +98,30 @@ impl GenericPlanNode for Source {
}

impl Source {
/// Whether this source is backed by one of the "new" filesystem connectors,
/// judged from the connector properties in the catalog.
///
/// A source without a catalog can never be a new fs connector.
pub fn is_new_fs_connector(&self) -> bool {
    match &self.catalog {
        Some(catalog) => {
            ConnectorProperties::is_new_fs_connector_b_tree_map(&catalog.with_properties)
        }
        None => false,
    }
}

/// The columns in a stream/batch source node indicate the actual columns it will produce,
/// instead of the columns defined in the source catalog. The difference is the generated
/// columns, which are removed here.
///
/// Consumes `self` and returns the node with generated columns stripped from
/// `column_catalog` and `row_id_index` re-based onto the remaining columns, together with
/// the ORIGINAL `row_id_index` (an index into the full catalog schema) for callers that
/// still need it.
pub fn exclude_generated_columns(mut self) -> (Self, Option<usize>) {
    let original_row_id_index = self.row_id_index;
    // Shift `row_id_index` left by the number of generated columns preceding it,
    // since those columns are removed below. The `idx + 1` bound also inspects the
    // row-id column itself; that column is presumably never generated, so this
    // matches counting strictly-preceding columns — TODO confirm against catalog
    // invariants.
    self.row_id_index = original_row_id_index.map(|idx| {
        let generated_before = self
            .column_catalog
            .iter()
            .take(idx + 1)
            .filter(|col| col.is_generated())
            .count();
        idx - generated_before
    });
    self.column_catalog.retain(|c| !c.is_generated());
    (self, original_row_id_index)
}

pub fn kafka_timestamp_range_value(&self) -> (Option<i64>, Option<i64>) {
let (lower_bound, upper_bound) = &self.kafka_timestamp_range;
let lower_bound = match lower_bound {
Expand Down
Loading

0 comments on commit b4f9b48

Please sign in to comment.