From 4222535a02517a35409cb5d909be7bddb288e4ed Mon Sep 17 00:00:00 2001 From: Clearlove <52417396+Eurekaaw@users.noreply.github.com> Date: Fri, 5 May 2023 09:34:12 -0400 Subject: [PATCH] feat: support create table with with non-null default columns and corresponding `insert` (#9521) Signed-off-by: Clearlove Co-authored-by: Bugen Zhao --- Cargo.lock | 4 +- .../basic/table_with_default_columns.slt.part | 24 +++++++++++ proto/batch_plan.proto | 16 +++---- proto/plan_common.proto | 29 +++++++++++-- src/batch/src/executor/insert.rs | 4 +- src/common/src/catalog/column.rs | 38 +++++++++++----- src/common/src/catalog/mod.rs | 2 +- src/common/src/catalog/test_utils.rs | 2 +- src/compute/tests/integration_tests.rs | 2 +- src/connector/src/parser/avro/util.rs | 2 +- src/connector/src/parser/protobuf/parser.rs | 2 +- src/connector/src/source/manager.rs | 2 +- .../planner_test/tests/testdata/insert.yaml | 11 +++++ src/frontend/src/binder/expr/mod.rs | 4 +- src/frontend/src/binder/insert.rs | 15 +++++-- .../src/catalog/system_catalog/mod.rs | 2 +- src/frontend/src/catalog/table_catalog.rs | 25 ++++++++++- src/frontend/src/handler/create_source.rs | 6 +-- src/frontend/src/handler/create_table.rs | 43 ++++++++++++++++--- .../src/optimizer/plan_node/batch_insert.rs | 5 ++- .../src/optimizer/plan_node/logical_source.rs | 24 +++++++---- src/prost/build.rs | 5 +++ src/source/src/source_desc.rs | 2 +- src/sqlparser/src/ast/ddl.rs | 4 +- src/sqlparser/src/parser.rs | 2 +- src/sqlparser/tests/sqlparser_postgres.rs | 12 ++++-- 26 files changed, 218 insertions(+), 69 deletions(-) create mode 100644 e2e_test/batch/basic/table_with_default_columns.slt.part diff --git a/Cargo.lock b/Cargo.lock index 063ed111135ea..2324515724a02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2722,9 +2722,9 @@ dependencies = [ [[package]] name = "halfbrown" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "389d755cc2bcc8cdb7554b89ddaa59d9275a727ec8915b0e2bc4176c33cc39cb" +checksum = "6d9ab7d9233262d3b74ae4c4a7a090fc4379b07c6eb9b02ecab7cbb12ad67a58" dependencies = [ "hashbrown 0.13.2", "serde", diff --git a/e2e_test/batch/basic/table_with_default_columns.slt.part b/e2e_test/batch/basic/table_with_default_columns.slt.part new file mode 100644 index 0000000000000..1569190fa98be --- /dev/null +++ b/e2e_test/batch/basic/table_with_default_columns.slt.part @@ -0,0 +1,24 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table t1 (v1 int, v2 int default 2+3); + +statement ok +insert into t1 values (0, 0); + +query II +select * from t1; +---- +0 0 + +statement ok +insert into t1 values (1); + +query II +select * from t1 where v1 = 1; +---- +1 5 + +statement ok +drop table t1; diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 21c453465e371..df4488a610e18 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -74,22 +74,18 @@ message FilterNode { expr.ExprNode search_condition = 1; } -message IndexAndExpr { - uint32 index = 1; - expr.ExprNode expr = 2; -} - -message DefaultColumns { - repeated IndexAndExpr default_column = 1; -} - message InsertNode { // Id of the table to perform inserting. uint32 table_id = 1; // Version of the table. uint64 table_version_id = 5; repeated uint32 column_indices = 2; - DefaultColumns default_columns = 6; + + // deprecated, breaking change + // DefaultColumns default_columns = 6 [deprecated = true]; + // reserved 6; + + plan_common.DefaultColumns default_columns = 6; // An optional field and will be `None` for tables without user-defined pk. // The `BatchInsertExecutor` should add a column with NULL value which will // be filled in streaming. diff --git a/proto/plan_common.proto b/proto/plan_common.proto index ce5a0b4120534..68b9c76fc7989 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -27,8 +27,16 @@ message ColumnDesc { // For example, when the type is created from a protobuf schema file, // this field will store the message name. string type_name = 5; - // Optional description for the generated column. - GeneratedColumnDesc generated_column = 6; + + // deprecated, breaking change + // GeneratedColumnDesc generated_column = 6 [deprecated = true]; + // reserved 6; + + // Optional description for the generated column or default value. + oneof generated_or_default_column { + GeneratedColumnDesc generated_column = 6; + DefaultColumnDesc default_column = 7; + } } message ColumnCatalog { @@ -40,6 +48,10 @@ message GeneratedColumnDesc { expr.ExprNode expr = 1; } +message DefaultColumnDesc { + expr.ExprNode expr = 1; +} + message StorageTableDesc { uint32 table_id = 1; repeated ColumnDesc columns = 2; @@ -49,8 +61,8 @@ message StorageTableDesc { uint32 retention_seconds = 5; repeated uint32 value_indices = 6; uint32 read_prefix_len_hint = 7; - // Whether the table is versioned. If `true`, column-aware row encoding will be used - // to be compatible with schema changes. + // Whether the table is versioned. If `true`, column-aware row encoding will + // be used to be compatible with schema changes. bool versioned = 8; } @@ -83,3 +95,12 @@ enum RowFormatType { UPSERT_AVRO = 11; DEBEZIUM_MONGO_JSON = 12; } + +message IndexAndExpr { + uint32 index = 1; + expr.ExprNode expr = 2; +} + +message DefaultColumns { + repeated IndexAndExpr default_columns = 1; +} diff --git a/src/batch/src/executor/insert.rs b/src/batch/src/executor/insert.rs index 3b4d275cef5f4..7d5a2a708f5f2 100644 --- a/src/batch/src/executor/insert.rs +++ b/src/batch/src/executor/insert.rs @@ -28,7 +28,7 @@ use risingwave_common::types::DataType; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_expr::expr::{build_from_prost, BoxedExpression}; use risingwave_pb::batch_plan::plan_node::NodeBody; -use risingwave_pb::batch_plan::IndexAndExpr; +use risingwave_pb::plan_common::IndexAndExpr; use risingwave_source::dml_manager::DmlManagerRef; use crate::executor::{ @@ -210,7 +210,7 @@ impl BoxedExecutorBuilder for InsertExecutor { .collect(); let sorted_default_columns = if let Some(default_columns) = &insert_node.default_columns { let mut default_columns = default_columns - .get_default_column() + .get_default_columns() .iter() .cloned() .map(|IndexAndExpr { index: i, expr: e }| { diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index ed59d93447818..e9b2ace90e997 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -15,7 +15,8 @@ use std::borrow::Cow; use itertools::Itertools; -use risingwave_pb::plan_common::{GeneratedColumnDesc, PbColumnCatalog, PbColumnDesc}; +use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; +use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc}; use super::row_id_column_desc; use crate::catalog::{Field, ROW_ID_COLUMN_ID}; @@ -93,7 +94,7 @@ pub struct ColumnDesc { pub name: String, pub field_descs: Vec, pub type_name: String, - pub generated_column: Option, + pub generated_or_default_column: Option, } impl ColumnDesc { @@ -104,7 +105,7 @@ impl ColumnDesc { name: String::new(), field_descs: vec![], type_name: String::new(), - generated_column: None, + generated_or_default_column: None, } } @@ -121,7 +122,7 @@ impl ColumnDesc { .map(|f| f.to_protobuf()) .collect_vec(), type_name: self.type_name.clone(), - generated_column: self.generated_column.clone(), + generated_or_default_column: self.generated_or_default_column.clone(), } } @@ -164,7 +165,7 @@ impl ColumnDesc { name: name.to_string(), field_descs: vec![], type_name: "".to_string(), - generated_column: None, + generated_or_default_column: None, } } @@ -184,7 +185,7 @@ impl ColumnDesc { name: name.to_string(), field_descs: fields, type_name: type_name.to_string(), - generated_column: None, + generated_or_default_column: None, } } @@ -199,7 +200,7 @@ impl ColumnDesc { .map(Self::from_field_without_column_id) .collect_vec(), type_name: field.type_name.clone(), - generated_column: None, + generated_or_default_column: None, } } @@ -208,7 +209,17 @@ impl ColumnDesc { } pub fn is_generated(&self) -> bool { - self.generated_column.is_some() + matches!( + self.generated_or_default_column, + Some(GeneratedOrDefaultColumn::GeneratedColumn(_)) + ) + } + + pub fn is_default(&self) -> bool { + matches!( + self.generated_or_default_column, + Some(GeneratedOrDefaultColumn::DefaultColumn(_)) + ) } } @@ -225,7 +236,7 @@ impl From for ColumnDesc { name: prost.name, type_name: prost.type_name, field_descs, - generated_column: prost.generated_column, + generated_or_default_column: prost.generated_or_default_column, } } } @@ -244,7 +255,7 @@ impl From<&ColumnDesc> for PbColumnDesc { name: c.name.clone(), field_descs: c.field_descs.iter().map(ColumnDesc::to_protobuf).collect(), type_name: c.type_name.clone(), - generated_column: c.generated_column.clone(), + generated_or_default_column: c.generated_or_default_column.clone(), } } } @@ -263,7 +274,12 @@ impl ColumnCatalog { /// If the column is a generated column pub fn is_generated(&self) -> bool { - self.column_desc.generated_column.is_some() + self.column_desc.is_generated() + } + + /// If the column is a column with default expr + pub fn is_default(&self) -> bool { + self.column_desc.is_default() } /// Get a reference to the column desc's data type. diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index ddb78a4cc8f12..7b958673b0f6d 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -89,7 +89,7 @@ pub fn row_id_column_desc() -> ColumnDesc { name: row_id_column_name(), field_descs: vec![], type_name: "".to_string(), - generated_column: None, + generated_or_default_column: None, } } diff --git a/src/common/src/catalog/test_utils.rs b/src/common/src/catalog/test_utils.rs index e0df76709ad4c..2cce9b79b346e 100644 --- a/src/common/src/catalog/test_utils.rs +++ b/src/common/src/catalog/test_utils.rs @@ -56,7 +56,7 @@ impl ColumnDescTestExt for ColumnDesc { name: name.to_string(), type_name: type_name.to_string(), field_descs: fields, - generated_column: None, + generated_or_default_column: None, } } } diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index b211f6e419700..5bef206861e07 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -151,7 +151,7 @@ async fn test_table_materialize() -> StreamResult<()> { name: field.name, field_descs: vec![], type_name: "".to_string(), - generated_column: None, + generated_or_default_column: None, }) .collect_vec(); let (barrier_tx, barrier_rx) = unbounded_channel(); diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs index cae07a39e19a5..5adf7b9bc9c60 100644 --- a/src/connector/src/parser/avro/util.rs +++ b/src/connector/src/parser/avro/util.rs @@ -47,7 +47,7 @@ pub(crate) fn avro_field_to_column_desc( name: name.to_owned(), field_descs: vec_column, type_name: schema_name.to_string(), - generated_column: None, + generated_or_default_column: None, }) } _ => { diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index 4a8ed815ba4dd..d92679c0bc0ef 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -155,7 +155,7 @@ impl ProtobufParserConfig { column_type: Some(field_type.to_protobuf()), field_descs, type_name: m.full_name().to_string(), - generated_column: None, + generated_or_default_column: None, }) } else { *index += 1; diff --git a/src/connector/src/source/manager.rs b/src/connector/src/source/manager.rs index 466454a5c3d38..a3122ed6d4075 100644 --- a/src/connector/src/source/manager.rs +++ b/src/connector/src/source/manager.rs @@ -78,7 +78,7 @@ impl From<&SourceColumnDesc> for ColumnDesc { name: s.name.clone(), field_descs: s.fields.clone(), type_name: "".to_string(), - generated_column: None, + generated_or_default_column: None, } } } diff --git a/src/frontend/planner_test/tests/testdata/insert.yaml b/src/frontend/planner_test/tests/testdata/insert.yaml index 77dc718a2bc82..1776d9edc761b 100644 --- a/src/frontend/planner_test/tests/testdata/insert.yaml +++ b/src/frontend/planner_test/tests/testdata/insert.yaml @@ -323,3 +323,14 @@ create table t (a int, b int); insert into t (a) select * from t; binder_error: 'Bind error: INSERT has more expressions than target columns' +- name: insert to a table with default columns + sql: | + create table t (a int, b int default 2+3); + insert into t values (1); + logical_plan: | + LogicalInsert { table: t, mapping: [0:0], default: [1<-(2:Int32 + 3:Int32)] } + └─LogicalValues { rows: [[1:Int32]], schema: Schema { fields: [*VALUES*_0.column_0:Int32] } } + batch_plan: | + BatchExchange { order: [], dist: Single } + └─BatchInsert { table: t, mapping: [0:0], default: [1<-(2:Int32 + 3:Int32)] } + └─BatchValues { rows: [[1:Int32]] } diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs index 155b6d8c1aba2..257291c7c9517 100644 --- a/src/frontend/src/binder/expr/mod.rs +++ b/src/frontend/src/binder/expr/mod.rs @@ -515,7 +515,7 @@ pub fn bind_struct_field(column_def: &StructField) -> Result { name: f.name.real_value(), field_descs: vec![], type_name: "".to_string(), - generated_column: None, + generated_or_default_column: None, }) }) .collect::>>()? @@ -528,7 +528,7 @@ pub fn bind_struct_field(column_def: &StructField) -> Result { name: column_def.name.real_value(), field_descs, type_name: "".to_string(), - generated_column: None, + generated_or_default_column: None, }) } diff --git a/src/frontend/src/binder/insert.rs b/src/frontend/src/binder/insert.rs index 81f42fad536f9..11893ccad033a 100644 --- a/src/frontend/src/binder/insert.rs +++ b/src/frontend/src/binder/insert.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use itertools::Itertools; use risingwave_common::catalog::{ColumnCatalog, Schema, TableVersionId}; @@ -101,12 +101,16 @@ impl Binder { self.bind_table(schema_name.as_deref(), &table_name, None)?; let table_catalog = self.resolve_dml_table(schema_name.as_deref(), &table_name, true)?; + let default_columns_from_catalog = + table_catalog.default_columns().collect::>(); let table_id = table_catalog.id; let owner = table_catalog.owner; let table_version_id = table_catalog.version_id().expect("table must be versioned"); let cols_to_insert_in_table = table_catalog.columns_to_insert().cloned().collect_vec(); - let generated_column_names: HashSet<_> = table_catalog.generated_column_names().collect(); + let generated_column_names = table_catalog + .generated_column_names() + .collect::>(); for col in &cols_to_insert_by_user { let query_col_name = col.real_value(); if generated_column_names.contains(query_col_name.as_str()) { @@ -249,7 +253,12 @@ impl Binder { .map(|i| { ( i, - ExprImpl::literal_null(cols_to_insert_in_table[i].data_type().clone()), + default_columns_from_catalog + .get(&i) + .cloned() + .unwrap_or_else(|| { + ExprImpl::literal_null(cols_to_insert_in_table[i].data_type().clone()) + }), ) }) .collect_vec(); diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index 70b8cbf199f53..7826c19175976 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -131,7 +131,7 @@ macro_rules! def_sys_catalog { name: col.1.to_string(), field_descs: vec![], type_name: "".to_string(), - generated_column: None, + generated_or_default_column: None, }, is_hidden: false, }) diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 9e46bbb1f15fb..57fbcacd504d2 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -24,8 +24,11 @@ use risingwave_common::error::{ErrorCode, RwError}; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion}; use risingwave_pb::catalog::PbTable; +use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; +use risingwave_pb::plan_common::DefaultColumnDesc; use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId}; +use crate::expr::ExprImpl; use crate::user::UserId; use crate::WithOptions; @@ -390,6 +393,26 @@ impl TableCatalog { .map(|c| c.name()) } + pub fn default_columns(&self) -> impl Iterator + '_ { + self.columns + .iter() + .enumerate() + .filter(|(_, c)| c.is_default()) + .map(|(i, c)| { + if let GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr }) = + c.column_desc.generated_or_default_column.clone().unwrap() + { + ( + i, + ExprImpl::from_expr_proto(&expr.unwrap()) + .expect("expr in default columns corrupted"), + ) + } else { + unreachable!() + } + }) + } + pub fn has_generated_column(&self) -> bool { self.columns.iter().any(|c| c.is_generated()) } @@ -561,7 +584,7 @@ mod tests { ColumnDesc::new_atomic(DataType::Varchar, "zipcode", 3), ], type_name: ".test.Country".to_string(), - generated_column: None, + generated_or_default_column: None, }, is_hidden: false } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index b5e589ab06e24..c9e5ec0b69ad2 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -353,7 +353,7 @@ pub(crate) async fn resolve_source_schema( name: "_id".to_string(), field_descs: vec![], type_name: "".to_string(), - generated_column: None, + generated_or_default_column: None, }, is_hidden: false, }); @@ -364,7 +364,7 @@ pub(crate) async fn resolve_source_schema( name: "payload".to_string(), field_descs: vec![], type_name: "".to_string(), - generated_column: None, + generated_or_default_column: None, }, is_hidden: false, }); @@ -531,7 +531,7 @@ fn check_and_add_timestamp_column( name: KAFKA_TIMESTAMP_COLUMN_NAME.to_string(), field_descs: vec![], type_name: "".to_string(), - generated_column: None, + generated_or_default_column: None, }; column_descs.push(kafka_timestamp_column); } diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index ce7bd78b88412..689720f4ccf8c 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -25,7 +25,8 @@ use risingwave_common::catalog::{ use risingwave_common::error::{ErrorCode, Result}; use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, WatermarkDesc}; -use risingwave_pb::plan_common::GeneratedColumnDesc; +use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; +use risingwave_pb::plan_common::{DefaultColumnDesc, GeneratedColumnDesc}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_sqlparser::ast::{ ColumnDef, ColumnOption, DataType as AstDataType, ObjectName, SourceSchema, SourceWatermark, @@ -165,7 +166,7 @@ pub fn bind_sql_columns( name: name.real_value(), field_descs, type_name: "".to_string(), - generated_column: None, + generated_or_default_column: None, }); } @@ -194,6 +195,21 @@ fn check_generated_column_constraints( Ok(()) } +fn check_default_column_constraints( + expr: &ExprImpl, + column_catalogs: &[ColumnCatalog], +) -> Result<()> { + let input_refs = expr.collect_input_refs(column_catalogs.len()); + if input_refs.count_ones(..) > 0 { + return Err(ErrorCode::BindError( + "Default can not reference another column, and you should try generated column instead." + .to_string(), + ) + .into()); + } + Ok(()) +} + /// Binds constraints that can be only specified in column definitions. pub fn bind_sql_column_constraints( session: &SessionImpl, @@ -216,6 +232,7 @@ pub fn bind_sql_column_constraints( let mut binder = Binder::new_for_ddl(session); binder.bind_columns_to_context(table_name.clone(), column_catalogs.to_vec())?; + for column in columns { for option_def in column.options { match option_def.option { @@ -231,9 +248,23 @@ pub fn bind_sql_column_constraints( &generated_column_names, )?; - column_catalogs[idx].column_desc.generated_column = Some(GeneratedColumnDesc { - expr: Some(expr_impl.to_expr_proto()), - }); + column_catalogs[idx].column_desc.generated_or_default_column = Some( + GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc { + expr: Some(expr_impl.to_expr_proto()), + }), + ); + } + ColumnOption::DefaultColumns(expr) => { + let idx = binder + .get_column_binding_index(table_name.clone(), &column.name.real_value())?; + let expr_impl = binder.bind_expr(expr)?; + + check_default_column_constraints(&expr_impl, column_catalogs)?; + + column_catalogs[idx].column_desc.generated_or_default_column = + Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { + expr: Some(expr_impl.to_expr_proto()), + })); } ColumnOption::Unique { is_primary: true } => { // Bind primary key in `bind_sql_table_column_constraints` @@ -280,7 +311,7 @@ pub fn bind_sql_table_column_constraints( } pk_column_names.push(column.name.real_value()); } - ColumnOption::GeneratedColumns(_) => { + ColumnOption::GeneratedColumns(_) | ColumnOption::DefaultColumns(_) => { // Bind generated columns in `bind_sql_column_constraints` } _ => { diff --git a/src/frontend/src/optimizer/plan_node/batch_insert.rs b/src/frontend/src/optimizer/plan_node/batch_insert.rs index 762813fedbf52..95f88c0db19dd 100644 --- a/src/frontend/src/optimizer/plan_node/batch_insert.rs +++ b/src/frontend/src/optimizer/plan_node/batch_insert.rs @@ -17,7 +17,8 @@ use std::fmt; use itertools::Itertools; use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; -use risingwave_pb::batch_plan::{DefaultColumns, IndexAndExpr, InsertNode}; +use risingwave_pb::batch_plan::InsertNode; +use risingwave_pb::plan_common::{DefaultColumns, IndexAndExpr}; use super::{ ExprRewritable, LogicalInsert, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, @@ -85,7 +86,7 @@ impl ToBatchPb for BatchInsert { let default_columns = self.logical.default_columns(); let has_default_columns = !default_columns.is_empty(); let default_columns = DefaultColumns { - default_column: default_columns + default_columns: default_columns .into_iter() .map(|(i, expr)| IndexAndExpr { index: i as u32, diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index df658cd3a21ee..c39336f9f53d9 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -22,6 +22,7 @@ use itertools::Itertools; use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Schema}; use risingwave_common::error::Result; use risingwave_connector::source::DataType; +use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::GeneratedColumnDesc; use super::stream_watermark_filter::StreamWatermarkFilter; @@ -129,7 +130,7 @@ impl LogicalSource { pub fn gen_optional_generated_column_project_exprs( column_descs: Vec, ) -> Result>> { - if !column_descs.iter().any(|c| c.generated_column.is_some()) { + if !column_descs.iter().any(|c| c.is_generated()) { return Ok(None); } @@ -137,7 +138,7 @@ impl LogicalSource { let mut mapping = vec![None; column_descs.len()]; let mut cur = 0; for (idx, column_desc) in column_descs.iter().enumerate() { - if column_desc.generated_column.is_none() { + if !column_desc.is_generated() { mapping[idx] = Some(cur); cur += 1; } else { @@ -152,12 +153,19 @@ impl LogicalSource { let mut cur = 0; for column_desc in column_descs { let ret_data_type = column_desc.data_type.clone(); - if let Some(generated_column) = column_desc.generated_column { - let GeneratedColumnDesc { expr } = generated_column; - // TODO(yuhao): avoid this `from_expr_proto`. - let proj_expr = rewriter.rewrite_expr(ExprImpl::from_expr_proto(&expr.unwrap())?); - let casted_expr = proj_expr.cast_assign(column_desc.data_type)?; - exprs.push(casted_expr); + if column_desc.is_generated() { + if let GeneratedOrDefaultColumn::GeneratedColumn(generated_column) = + column_desc.generated_or_default_column.unwrap() + { + let GeneratedColumnDesc { expr } = generated_column; + // TODO(yuhao): avoid this `from_expr_proto`. + let proj_expr = + rewriter.rewrite_expr(ExprImpl::from_expr_proto(&expr.unwrap())?); + let casted_expr = proj_expr.cast_assign(column_desc.data_type)?; + exprs.push(casted_expr); + } else { + unreachable!() + } } else { let input_ref = InputRef { data_type: ret_data_type, diff --git a/src/prost/build.rs b/src/prost/build.rs index 05f1d2c4e70f6..bd1f748415d61 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -65,7 +65,12 @@ fn main() -> Result<(), Box> { .type_attribute("expr.FunctionCall", "#[derive(Eq, Hash)]") .type_attribute("expr.UserDefinedFunction", "#[derive(Eq, Hash)]") .type_attribute("catalog.StreamSourceInfo", "#[derive(Eq, Hash)]") + .type_attribute( + "plan_common.ColumnDesc.generated_or_default_column", + "#[derive(Eq, Hash)]", + ) .type_attribute("plan_common.GeneratedColumnDesc", "#[derive(Eq, Hash)]") + .type_attribute("plan_common.DefaultColumnDesc", "#[derive(Eq, Hash)]") .out_dir(out_dir.as_path()) .compile(&protos, &[proto_dir.to_string()]) .expect("Failed to compile grpc!"); diff --git a/src/source/src/source_desc.rs b/src/source/src/source_desc.rs index 397da173f6815..2b9fa9771ca00 100644 --- a/src/source/src/source_desc.rs +++ b/src/source/src/source_desc.rs @@ -193,7 +193,7 @@ pub mod test_utils { name: f.name.clone(), field_descs: vec![], type_name: "".to_string(), - generated_column: None, + generated_or_default_column: None, } .to_protobuf(), ), diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index 47ec88350d230..7b6bd3e911eb8 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -419,7 +419,7 @@ pub enum ColumnOption { /// `NOT NULL` NotNull, /// `DEFAULT ` - Default(Expr), + DefaultColumns(Expr), /// `{ PRIMARY KEY | UNIQUE }` Unique { is_primary: bool }, /// A referential integrity constraint (`[FOREIGN KEY REFERENCES @@ -449,7 +449,7 @@ impl fmt::Display for ColumnOption { match self { Null => write!(f, "NULL"), NotNull => write!(f, "NOT NULL"), - Default(expr) => write!(f, "DEFAULT {}", expr), + DefaultColumns(expr) => write!(f, "DEFAULT {}", expr), Unique { is_primary } => { write!(f, "{}", if *is_primary { "PRIMARY KEY" } else { "UNIQUE" }) } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 4b0aa39a211ca..5d356c206fd13 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -2327,7 +2327,7 @@ impl Parser { } else if self.parse_keyword(Keyword::NULL) { Ok(Some(ColumnOption::Null)) } else if self.parse_keyword(Keyword::DEFAULT) { - Ok(Some(ColumnOption::Default(self.parse_expr()?))) + Ok(Some(ColumnOption::DefaultColumns(self.parse_expr()?))) } else if self.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) { Ok(Some(ColumnOption::Unique { is_primary: true })) } else if self.parse_keyword(Keyword::UNIQUE) { diff --git a/src/sqlparser/tests/sqlparser_postgres.rs b/src/sqlparser/tests/sqlparser_postgres.rs index d78e71d99e080..4dd7e3197cf57 100644 --- a/src/sqlparser/tests/sqlparser_postgres.rs +++ b/src/sqlparser/tests/sqlparser_postgres.rs @@ -52,7 +52,7 @@ fn parse_create_table_with_defaults() { None, vec![ColumnOptionDef { name: None, - option: ColumnOption::Default(verified_expr( + option: ColumnOption::DefaultColumns(verified_expr( "nextval(public.customer_customer_id_seq)" )) }], @@ -101,7 +101,9 @@ fn parse_create_table_with_defaults() { vec![ ColumnOptionDef { name: None, - option: ColumnOption::Default(Expr::Value(Value::Boolean(true))), + option: ColumnOption::DefaultColumns(Expr::Value(Value::Boolean( + true + ))), }, ColumnOptionDef { name: None, @@ -116,7 +118,9 @@ fn parse_create_table_with_defaults() { vec![ ColumnOptionDef { name: None, - option: ColumnOption::Default(verified_expr("CAST(now() AS TEXT)")) + option: ColumnOption::DefaultColumns(verified_expr( + "CAST(now() AS TEXT)" + )) }, ColumnOptionDef { name: None, @@ -131,7 +135,7 @@ fn parse_create_table_with_defaults() { vec![ ColumnOptionDef { name: None, - option: ColumnOption::Default(verified_expr("now()")), + option: ColumnOption::DefaultColumns(verified_expr("now()")), }, ColumnOptionDef { name: None,