
Commit

rename
xxhZs committed Jun 26, 2024
1 parent 0b21756 commit 056daf7
Showing 18 changed files with 62 additions and 62 deletions.
2 changes: 1 addition & 1 deletion e2e_test/streaming/changed_log.slt
@@ -25,7 +25,7 @@ select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22 from sub1

statement ok
create materialized view mv5 as with sub1 as changelog from t1, sub2 as changelog from t2
-select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22, sub1.change_log_op as op1, sub2.change_log_op as op2 from sub1 inner join sub2 on sub1.v1 = sub2.v1;
+select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22, sub1.changelog_op as op1, sub2.changelog_op as op2 from sub1 inner join sub2 on sub1.v1 = sub2.v1;

statement ok
create materialized view mv6 as with sub as changelog from t3 select * from sub;
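
For context, the statements above exercise RisingWave's changelog CTE syntax; this commit renames the exposed op column from change_log_op to changelog_op. A minimal, self-contained sketch of the renamed syntax (table names and column types here are illustrative, not taken from the commit):

create table t1 (v1 int, v2 int);
create table t2 (v1 int, v2 int);
create materialized view mv_ops as
with sub1 as changelog from t1, sub2 as changelog from t2
select sub1.v1, sub1.changelog_op as op1, sub2.changelog_op as op2
from sub1 inner join sub2 on sub1.v1 = sub2.v1;
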
2 changes: 1 addition & 1 deletion proto/stream_plan.proto
@@ -828,7 +828,7 @@ message StreamNode {
StreamCdcScanNode stream_cdc_scan = 139;
CdcFilterNode cdc_filter = 140;
SourceBackfillNode source_backfill = 142;
-ChangeLogNode change_log = 143;
+ChangeLogNode changelog = 143;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
6 changes: 3 additions & 3 deletions src/frontend/src/binder/relation/share.rs
@@ -20,7 +20,7 @@ use crate::binder::bind_context::RecursiveUnion;
use crate::binder::statement::RewriteExprsRecursive;
use crate::binder::{BoundQuery, Relation, ShareId};
use crate::error::{ErrorCode, Result};
-use crate::optimizer::plan_node::generic::{CHANGE_LOG_OP, _CHANGE_LOG_ROW_ID};
+use crate::optimizer::plan_node::generic::{CHANGELOG_OP, _CHANGELOG_ROW_ID};

/// Share a relation during binding and planning.
/// It could be used to share a (recursive) CTE, a source, a view and so on.
@@ -74,14 +74,14 @@ impl BoundShareInput {
false,
Field::with_name(
risingwave_common::types::DataType::Int16,
-CHANGE_LOG_OP.to_string(),
+CHANGELOG_OP.to_string(),
),
),
(
true,
Field::with_name(
risingwave_common::types::DataType::Serial,
-_CHANGE_LOG_ROW_ID.to_string(),
+_CHANGELOG_ROW_ID.to_string(),
),
),
])
6 changes: 3 additions & 3 deletions src/frontend/src/binder/select.rs
@@ -30,7 +30,7 @@ use crate::binder::{Binder, Relation};
use crate::catalog::check_valid_column_name;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{CorrelatedId, Depth, Expr as _, ExprImpl, ExprType, FunctionCall, InputRef};
-use crate::optimizer::plan_node::generic::CHANGE_LOG_OP;
+use crate::optimizer::plan_node::generic::CHANGELOG_OP;
use crate::utils::group_by::GroupBy;

#[derive(Debug, Clone)]
@@ -285,10 +285,10 @@ impl Binder {

if let Some(Relation::Share(bound)) = &from {
if matches!(bound.input, BoundShareInput::ChangeLog(_))
-&& fields.iter().filter(|&x| x.name.eq(CHANGE_LOG_OP)).count() > 1
+&& fields.iter().filter(|&x| x.name.eq(CHANGELOG_OP)).count() > 1
{
return Err(ErrorCode::BindError(
"The source table of changelog cannot have `change_log_op`, please rename it first".to_string()
"The source table of changelog cannot have `changelog_op`, please rename it first".to_string()
)
.into());
}
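
The new error message above is raised when the changelog's source already exposes a column named changelog_op, because the generated op column would collide with it. A hypothetical reproduction (table name and schema invented for illustration):

create table t_conflict (v1 int, changelog_op int);
-- expected to fail with the bind error quoted above; rename the conflicting
-- column in the source table before taking a changelog of it
create materialized view mv_conflict as
with sub as changelog from t_conflict select * from sub;
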
@@ -23,27 +23,27 @@ use crate::optimizer::property::FunctionalDependencySet;
use crate::utils::ColIndexMappingRewriteExt;
use crate::OptimizerContextRef;

-pub const CHANGE_LOG_OP: &str = "change_log_op";
-pub const _CHANGE_LOG_ROW_ID: &str = "_change_log_row_id";
+pub const CHANGELOG_OP: &str = "changelog_op";
+pub const _CHANGELOG_ROW_ID: &str = "_changelog_row_id";
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ChangeLog<PlanRef> {
pub input: PlanRef,
// If there is no op in the output result, it is false, example 'create materialized view mv1 as with sub as changelog from t1 select v1 from sub;'
pub need_op: bool,
// False before rewrite, true after rewrite
-pub need_change_log_row_id: bool,
+pub need_changelog_row_id: bool,
}
impl<PlanRef: GenericPlanRef> DistillUnit for ChangeLog<PlanRef> {
fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
childless_record(name, vec![])
}
}
impl<PlanRef: GenericPlanRef> ChangeLog<PlanRef> {
-pub fn new(input: PlanRef, need_op: bool, need_change_log_row_id: bool) -> Self {
+pub fn new(input: PlanRef, need_op: bool, need_changelog_row_id: bool) -> Self {
ChangeLog {
input,
need_op,
-need_change_log_row_id,
+need_changelog_row_id,
}
}

@@ -59,20 +59,20 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for ChangeLog<PlanRef> {
if self.need_op {
fields.push(Field::with_name(
risingwave_common::types::DataType::Int16,
-CHANGE_LOG_OP,
+CHANGELOG_OP,
));
}
-if self.need_change_log_row_id {
+if self.need_changelog_row_id {
fields.push(Field::with_name(
risingwave_common::types::DataType::Serial,
-_CHANGE_LOG_ROW_ID,
+_CHANGELOG_ROW_ID,
));
}
Schema::new(fields)
}

fn stream_key(&self) -> Option<Vec<usize>> {
-if self.need_change_log_row_id {
+if self.need_changelog_row_id {
let keys = vec![self.schema().len() - 1];
Some(keys)
} else {
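
As the flags above indicate, the ChangeLog operator only emits the Int16 changelog_op column when the query actually uses it, and after the rewrite it appends an internal Serial _changelog_row_id column that is used as the stream key. A rough illustration of the two cases, assuming a table t1 with a column v1:

-- references changelog_op, so need_op is true for this plan
create materialized view mv_with_op as
with sub as changelog from t1 select v1, changelog_op from sub;

-- never references changelog_op, so the op column can be pruned (need_op is false)
create materialized view mv_without_op as
with sub as changelog from t1 select v1 from sub;
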
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/mod.rs
@@ -78,8 +78,8 @@ mod cte_ref;
pub use cte_ref::*;
mod recursive_union;
pub use recursive_union::*;
-mod change_log;
-pub use change_log::*;
+mod changelog;
+pub use changelog::*;
mod now;
pub use now::*;

@@ -15,7 +15,7 @@
use itertools::Itertools;

use super::expr_visitable::ExprVisitable;
-use super::generic::{GenericPlanRef, CHANGE_LOG_OP, _CHANGE_LOG_ROW_ID};
+use super::generic::{GenericPlanRef, CHANGELOG_OP, _CHANGELOG_ROW_ID};
use super::utils::impl_distill_by_unit;
use super::{
gen_filter_and_pushdown, generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical,
@@ -40,8 +40,8 @@ impl LogicalChangeLog {
Self::new(input, true, true).into()
}

-pub fn new(input: PlanRef, need_op: bool, need_change_log_row_id: bool) -> Self {
-let core = generic::ChangeLog::new(input, need_op, need_change_log_row_id);
+pub fn new(input: PlanRef, need_op: bool, need_changelog_row_id: bool) -> Self {
+let core = generic::ChangeLog::new(input, need_op, need_changelog_row_id);
Self::with_core(core)
}

@@ -57,23 +57,23 @@ impl PlanTreeNodeUnary for LogicalChangeLog {
}

fn clone_with_input(&self, input: PlanRef) -> Self {
-Self::new(input, self.core.need_op, self.core.need_change_log_row_id)
+Self::new(input, self.core.need_op, self.core.need_changelog_row_id)
}

fn rewrite_with_input(
&self,
input: PlanRef,
input_col_change: ColIndexMapping,
) -> (Self, ColIndexMapping) {
-let change_log = Self::new(input, self.core.need_op, true);
+let changelog = Self::new(input, self.core.need_op, true);
if self.core.need_op {
let mut output_vec = input_col_change.to_parts().0.to_vec();
let len = input_col_change.to_parts().1;
output_vec.push(Some(len));
let out_col_change = ColIndexMapping::new(output_vec, len + 1);
-(change_log, out_col_change)
+(changelog, out_col_change)
} else {
-(change_log, input_col_change)
+(changelog, input_col_change)
}
}
}
@@ -99,16 +99,16 @@ impl ColPrunable for LogicalChangeLog {
fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
let fields = self.schema().fields();
let mut need_op = false;
-let mut need_change_log_row_id = false;
+let mut need_changelog_row_id = false;
let new_required_cols: Vec<_> = required_cols
.iter()
.filter_map(|a| {
if let Some(f) = fields.get(*a) {
-if f.name == CHANGE_LOG_OP {
+if f.name == CHANGELOG_OP {
need_op = true;
None
-} else if f.name == _CHANGE_LOG_ROW_ID {
-need_change_log_row_id = true;
+} else if f.name == _CHANGELOG_ROW_ID {
+need_changelog_row_id = true;
None
} else {
Some(*a)
@@ -120,7 +120,7 @@
.collect();

let new_input = self.input().prune_col(&new_required_cols, ctx);
-Self::new(new_input, need_op, need_change_log_row_id).into()
+Self::new(new_input, need_op, need_changelog_row_id).into()
}
}

@@ -167,7 +167,7 @@ impl ToStream for LogicalChangeLog {
.collect_vec();
let project = LogicalProject::new(input.clone(), exprs);
let (project, out_col_change) = project.rewrite_with_input(input, input_col_change);
-let (change_log, out_col_change) = self.rewrite_with_input(project.into(), out_col_change);
-Ok((change_log.into(), out_col_change))
+let (changelog, out_col_change) = self.rewrite_with_input(project.into(), out_col_change);
+Ok((changelog.into(), out_col_change))
}
}
8 changes: 4 additions & 4 deletions src/frontend/src/optimizer/plan_node/mod.rs
@@ -850,7 +850,7 @@ mod batch_values;
mod logical_agg;
mod logical_apply;
mod logical_cdc_scan;
-mod logical_change_log;
+mod logical_changelog;
mod logical_cte_ref;
mod logical_dedup;
mod logical_delete;
@@ -879,7 +879,7 @@ mod logical_topn;
mod logical_union;
mod logical_update;
mod logical_values;
-mod stream_change_log;
+mod stream_changelog;
mod stream_dedup;
mod stream_delta_join;
mod stream_dml;
@@ -953,7 +953,7 @@ pub use batch_values::BatchValues;
pub use logical_agg::LogicalAgg;
pub use logical_apply::LogicalApply;
pub use logical_cdc_scan::LogicalCdcScan;
-pub use logical_change_log::LogicalChangeLog;
+pub use logical_changelog::LogicalChangeLog;
pub use logical_cte_ref::LogicalCteRef;
pub use logical_dedup::LogicalDedup;
pub use logical_delete::LogicalDelete;
Expand Down Expand Up @@ -984,7 +984,7 @@ pub use logical_union::LogicalUnion;
pub use logical_update::LogicalUpdate;
pub use logical_values::LogicalValues;
pub use stream_cdc_table_scan::StreamCdcTableScan;
-pub use stream_change_log::StreamChangeLog;
+pub use stream_changelog::StreamChangeLog;
pub use stream_dedup::StreamDedup;
pub use stream_delta_join::StreamDeltaJoin;
pub use stream_dml::StreamDml;
@@ -68,7 +68,7 @@ impl_distill_by_unit!(StreamChangeLog, core, "StreamChangeLog");

impl StreamNode for StreamChangeLog {
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
-PbNodeBody::ChangeLog(ChangeLogNode {
+PbNodeBody::Changelog(ChangeLogNode {
need_op: self.core.need_op,
})
}
@@ -17,7 +17,7 @@ use crate::optimizer::plan_node::LogicalChangeLog;
use crate::{PlanRef, Planner};

impl Planner {
-pub(super) fn plan_change_log(&mut self, relation: Relation) -> Result<PlanRef> {
+pub(super) fn plan_changelog(&mut self, relation: Relation) -> Result<PlanRef> {
let root = self.plan_relation(relation)?;
let plan = LogicalChangeLog::create(root);
Ok(plan)
2 changes: 1 addition & 1 deletion src/frontend/src/planner/mod.rs
@@ -18,7 +18,7 @@ use crate::binder::{BoundStatement, ShareId};
use crate::error::Result;
use crate::optimizer::{OptimizerContextRef, PlanRoot};

-mod change_log;
+mod changelog;
mod delete;
mod insert;
mod query;
2 changes: 1 addition & 1 deletion src/frontend/src/planner/relation.rs
@@ -246,7 +246,7 @@ impl Planner {
),
BoundShareInput::ChangeLog(relation) => {
let id = share.share_id;
-let result = self.plan_change_log(relation)?;
+let result = self.plan_changelog(relation)?;
let logical_share = LogicalShare::create(result);
self.share_cache.insert(id, logical_share.clone());
Ok(logical_share)
6 changes: 3 additions & 3 deletions src/sqlparser/src/parser.rs
@@ -4001,9 +4001,9 @@ impl Parser<'_> {
self.expect_token(&Token::RParen)?;
Ok(CteInner::Query(query))
} else {
-let change_log = self.parse_identifier_non_reserved()?;
-if change_log.to_string().to_lowercase() != "changelog" {
-parser_err!("Expected 'changelog' but found '{}'", change_log);
+let changelog = self.parse_identifier_non_reserved()?;
+if changelog.to_string().to_lowercase() != "changelog" {
+parser_err!("Expected 'changelog' but found '{}'", changelog);
}
self.expect_keyword(Keyword::FROM)?;
Ok(CteInner::ChangeLog(self.parse_identifier()?))
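
Because the identifier is lower-cased before the comparison above, the changelog marker in a CTE is accepted case-insensitively; both of the following (illustrative) statements should parse to the same CteInner::ChangeLog:

with sub as changelog from t1 select * from sub;
with sub as CHANGELOG from t1 select * from sub;
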
20 changes: 10 additions & 10 deletions src/sqlparser/tests/sqlparser_common.rs
@@ -2745,14 +2745,14 @@ fn parse_ctes() {
}

#[test]
-fn parse_change_log_ctes() {
+fn parse_changelog_ctes() {
let cte_sqls = vec!["foo", "bar"];
let with = &format!(
"WITH a AS changelog from {}, b AS changelog from {} SELECT foo + bar FROM a, b",
cte_sqls[0], cte_sqls[1]
);

-fn assert_change_log_ctes(expected: &[&str], sel: &Query) {
+fn assert_changelog_ctes(expected: &[&str], sel: &Query) {
for (i, exp) in expected.iter().enumerate() {
let Cte { alias, cte_inner } = &sel.with.as_ref().unwrap().cte_tables[i];
if let CteInner::ChangeLog(from) = cte_inner {
@@ -2773,13 +2773,13 @@
}

// Top-level CTE
-assert_change_log_ctes(&cte_sqls, &verified_query(with));
+assert_changelog_ctes(&cte_sqls, &verified_query(with));
// CTE in a subquery
let sql = &format!("SELECT ({})", with);
let select = verified_only_select(sql);
match expr_from_projection(only(&select.projection)) {
Expr::Subquery(ref subquery) => {
-assert_change_log_ctes(&cte_sqls, subquery.as_ref());
+assert_changelog_ctes(&cte_sqls, subquery.as_ref());
}
_ => panic!("expected subquery"),
}
@@ -2788,21 +2788,21 @@ fn parse_change_log_ctes() {
let select = verified_only_select(sql);
match only(select.from).relation {
TableFactor::Derived { subquery, .. } => {
-assert_change_log_ctes(&cte_sqls, subquery.as_ref())
+assert_changelog_ctes(&cte_sqls, subquery.as_ref())
}
_ => panic!("expected derived table"),
}
// CTE in a view
let sql = &format!("CREATE VIEW v AS {}", with);
match verified_stmt(sql) {
-Statement::CreateView { query, .. } => assert_change_log_ctes(&cte_sqls, &query),
+Statement::CreateView { query, .. } => assert_changelog_ctes(&cte_sqls, &query),
_ => panic!("expected CREATE VIEW"),
}
// CTE in a CTE...
let sql = &format!("WITH outer_cte AS ({}) SELECT * FROM outer_cte", with);
let select = verified_query(sql);
if let CteInner::Query(query) = &only(&select.with.unwrap().cte_tables).cte_inner {
-assert_change_log_ctes(&cte_sqls, query);
+assert_changelog_ctes(&cte_sqls, query);
} else {
panic!("expected CteInner::Query")
}
@@ -2824,12 +2824,12 @@ fn parse_cte_renamed_columns() {
.columns
);

-let sql_change_log = "WITH cte (col1, col2) AS changelog from baz SELECT * FROM cte";
+let sql_changelog = "WITH cte (col1, col2) AS changelog from baz SELECT * FROM cte";

-let query_change_log = verified_query(sql_change_log);
+let query_changelog = verified_query(sql_changelog);
assert_eq!(
vec![Ident::new_unchecked("col1"), Ident::new_unchecked("col2")],
-query_change_log
+query_changelog
.with
.unwrap()
.cte_tables