refactor: separate CdcScan from Scan in logical and core (#13494)

Co-authored-by: Eric Fu <[email protected]>
kwannoel and fuyufjh authored Nov 17, 2023
1 parent 4833477 commit a34f46a
Showing 14 changed files with 452 additions and 155 deletions.
20 changes: 20 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/create_source.yaml
@@ -33,6 +33,26 @@
) FORMAT PLAIN ENCODE CSV (delimiter = E'\t', without_header = true);
expected_outputs:
- explain_output
- id: create_source_with_cdc_backfill
sql: |
create source mysql_mydb with (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
server.id = 5888
);
explain (logical) create table t1_rw (
v1 int,
v2 int,
primary key(v1)
) from mysql_mydb table 'mydb.t1';
expected_outputs:
- explain_output
with_config_map:
CDC_BACKFILL: 'true'
- id: create_source_with_cdc_backfill
sql: |
create source mysql_mydb with (
20 changes: 20 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/create_source.yaml
@@ -45,6 +45,26 @@
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamDml { columns: [v1, v2, _row_id] }
└─StreamSource
- id: create_source_with_cdc_backfill
sql: |
create source mysql_mydb with (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
server.id = 5888
);
explain (logical) create table t1_rw (
v1 int,
v2 int,
primary key(v1)
) from mysql_mydb table 'mydb.t1';
explain_output: |
LogicalCdcScan { table: mydb.t1, columns: [v1, v2] }
with_config_map:
CDC_BACKFILL: 'true'
- id: create_source_with_cdc_backfill
sql: |
create source mysql_mydb with (
2 changes: 0 additions & 2 deletions src/frontend/src/handler/create_index.rs
Expand Up @@ -35,7 +35,6 @@ use crate::catalog::root_catalog::SchemaPath;
use crate::expr::{Expr, ExprImpl, InputRef};
use crate::handler::privilege::ObjectCheckItem;
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::generic::ScanTableType;
use crate::optimizer::plan_node::{Explain, LogicalProject, LogicalScan, StreamMaterialize};
use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
@@ -325,7 +324,6 @@ fn assemble_materialize(

let logical_scan = LogicalScan::create(
table_name,
ScanTableType::default(),
table_desc.clone(),
// Index table has no indexes.
vec![],
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_table.rs
@@ -55,7 +55,7 @@ use crate::handler::create_source::{
check_source_schema, validate_compatibility, UPSTREAM_SOURCE_KEY,
};
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::{LogicalScan, LogicalSource};
use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
use crate::session::SessionImpl;
@@ -863,7 +863,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(

tracing::debug!(?cdc_table_desc, "create cdc table");

let logical_scan = LogicalScan::create_for_cdc(
let logical_scan = LogicalCdcScan::create(
external_table_name,
Rc::new(cdc_table_desc),
context.clone(),
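Taken together with the `ScanTableType` removal in `create_index.rs` above, this call-site change shows the shape of the refactor: instead of one `LogicalScan` carrying a scan-kind discriminator, the CDC table-creation path now builds its own `LogicalCdcScan` node. A toy sketch of that design choice, using made-up stand-in types rather than the real plan nodes:

```rust
// Illustrative stand-ins only; not the real RisingWave plan nodes.

/// Before: a single scan node tagged with a runtime discriminator,
/// which every caller had to supply even when it was irrelevant.
#[allow(dead_code)]
#[derive(Debug)]
enum ScanKindSketch {
    Table,
    Cdc,
}

#[derive(Debug)]
struct TaggedScanSketch {
    table_name: String,
    kind: ScanKindSketch,
}

/// After: separate node types, so CDC-only concerns (external table
/// metadata, backfill) stay out of the ordinary table scan.
#[derive(Debug)]
struct ScanSketch {
    table_name: String,
}

#[derive(Debug)]
struct CdcOnlyScanSketch {
    external_table_name: String,
}

fn main() {
    // Old style: the CDC path reused the general scan node with a kind tag.
    let old_style = TaggedScanSketch {
        table_name: "mydb.t1".into(),
        kind: ScanKindSketch::Cdc,
    };
    // New style: the CDC table-creation path constructs a dedicated node.
    let plain = ScanSketch { table_name: "t1".into() };
    let cdc = CdcOnlyScanSketch { external_table_name: "mydb.t1".into() };
    println!("{old_style:?}\n{plain:?}\n{cdc:?}");
}
```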
173 changes: 173 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs
@@ -0,0 +1,173 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::rc::Rc;

use educe::Educe;
use fixedbitset::FixedBitSet;
use pretty_xmlish::Pretty;
use risingwave_common::catalog::{CdcTableDesc, ColumnDesc, Field, Schema};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::sort_util::ColumnOrder;

use super::GenericPlanNode;
use crate::catalog::ColumnId;
use crate::expr::ExprRewriter;
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;

/// [`CdcScan`] reads rows of a table from an external upstream database
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct CdcScan {
pub table_name: String,
/// The indices of the columns to output
pub output_col_idx: Vec<usize>,
/// Descriptor of the external table for CDC
pub cdc_table_desc: Rc<CdcTableDesc>,
#[educe(PartialEq(ignore))]
#[educe(Hash(ignore))]
pub ctx: OptimizerContextRef,
}

impl CdcScan {
pub fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) {}

/// Get the ids of the output columns.
pub fn output_column_ids(&self) -> Vec<ColumnId> {
self.output_col_idx
.iter()
.map(|i| self.get_table_columns()[*i].column_id)
.collect()
}

pub fn primary_key(&self) -> &[ColumnOrder] {
&self.cdc_table_desc.pk
}

pub fn watermark_columns(&self) -> FixedBitSet {
FixedBitSet::with_capacity(self.get_table_columns().len())
}

pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
self.output_col_idx
.iter()
.map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
.collect()
}

pub(crate) fn column_names(&self) -> Vec<String> {
self.output_col_idx
.iter()
.map(|&i| self.get_table_columns()[i].name.clone())
.collect()
}

/// Get the mapping from internal column index to output column index
pub fn i2o_col_mapping(&self) -> ColIndexMapping {
ColIndexMapping::with_remaining_columns(
&self.output_col_idx,
self.get_table_columns().len(),
)
}

/// Get the ids of the output columns and primary key columns.
pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
let mut ids = self.output_column_ids();
for column_order in self.primary_key() {
let id = self.get_table_columns()[column_order.column_index].column_id;
if !ids.contains(&id) {
ids.push(id);
}
}
ids
}

/// Create a logical scan node for CDC backfill
pub(crate) fn new(
table_name: String,
output_col_idx: Vec<usize>, // the column index in the table
cdc_table_desc: Rc<CdcTableDesc>,
ctx: OptimizerContextRef,
) -> Self {
Self {
table_name,
output_col_idx,
cdc_table_desc,
ctx,
}
}

pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
Pretty::Array(
match verbose {
true => self.column_names_with_table_prefix(),
false => self.column_names(),
}
.into_iter()
.map(Pretty::from)
.collect(),
)
}
}

// TODO: extend for cdc table
impl GenericPlanNode for CdcScan {
fn schema(&self) -> Schema {
let fields = self
.output_col_idx
.iter()
.map(|tb_idx| {
let col = &self.get_table_columns()[*tb_idx];
Field::from_with_table_name_prefix(col, &self.table_name)
})
.collect();
Schema { fields }
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.cdc_table_desc.stream_key.clone())
}

fn ctx(&self) -> OptimizerContextRef {
self.ctx.clone()
}

fn functional_dependency(&self) -> FunctionalDependencySet {
let pk_indices = self.stream_key();
let col_num = self.output_col_idx.len();
match &pk_indices {
Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
None => FunctionalDependencySet::new(col_num),
}
}
}

impl CdcScan {
pub fn get_table_columns(&self) -> &[ColumnDesc] {
&self.cdc_table_desc.columns
}

pub fn append_only(&self) -> bool {
false
}

/// Get the descs of the output columns.
pub fn column_descs(&self) -> Vec<ColumnDesc> {
self.output_col_idx
.iter()
.map(|&i| self.get_table_columns()[i].clone())
.collect()
}
}
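For orientation, here is a minimal, self-contained sketch of how the explain line in the planner test output above falls out of a node shaped like this one. The types below are simplified stand-ins (the real `CdcScan` holds a `CdcTableDesc`, whose fields are not shown in this diff), so treat it as an illustration rather than the actual code:

```rust
// Illustrative stand-ins; the real node uses CdcTableDesc, ColumnDesc, Field, etc.

struct ColumnSketch {
    name: String,
}

struct GenericCdcScanSketch {
    table_name: String,
    output_col_idx: Vec<usize>,
    columns: Vec<ColumnSketch>,
}

impl GenericCdcScanSketch {
    /// Mirrors `column_names`: the non-verbose column list used by `columns_pretty`.
    fn column_names(&self) -> Vec<String> {
        self.output_col_idx
            .iter()
            .map(|&i| self.columns[i].name.clone())
            .collect()
    }

    /// Roughly the shape of the `LogicalCdcScan { table: ..., columns: [...] }`
    /// line in the planner test output above (exact rendering is assumed).
    fn explain_line(&self) -> String {
        format!(
            "LogicalCdcScan {{ table: {}, columns: [{}] }}",
            self.table_name,
            self.column_names().join(", ")
        )
    }
}

fn main() {
    let scan = GenericCdcScanSketch {
        table_name: "mydb.t1".into(),
        output_col_idx: vec![0, 1],
        columns: vec![
            ColumnSketch { name: "v1".into() },
            ColumnSketch { name: "v2".into() },
        ],
    };
    assert_eq!(
        scan.explain_line(),
        "LogicalCdcScan { table: mydb.t1, columns: [v1, v2] }"
    );
}
```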
4 changes: 4 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/mod.rs
@@ -44,6 +44,10 @@ mod scan;
pub use scan::*;
mod sys_scan;
pub use sys_scan::*;

mod cdc_scan;
pub use cdc_scan::*;

mod union;
pub use union::*;
mod top_n;