Skip to content

Commit

Permalink
feat: support create table with with non-null default columns and cor…
Browse files Browse the repository at this point in the history
…responding `insert` (#9521)

Signed-off-by: Clearlove <[email protected]>
Co-authored-by: Bugen Zhao <[email protected]>
  • Loading branch information
y-wei and BugenZhao authored May 5, 2023
1 parent cf358f1 commit 4222535
Show file tree
Hide file tree
Showing 26 changed files with 218 additions and 69 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions e2e_test/batch/basic/table_with_default_columns.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t1 (v1 int, v2 int default 2+3);

statement ok
insert into t1 values (0, 0);

query II
select * from t1;
----
0 0

statement ok
insert into t1 values (1);

query II
select * from t1 where v1 = 1;
----
1 5

statement ok
drop table t1;
16 changes: 6 additions & 10 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,18 @@ message FilterNode {
expr.ExprNode search_condition = 1;
}

message IndexAndExpr {
uint32 index = 1;
expr.ExprNode expr = 2;
}

message DefaultColumns {
repeated IndexAndExpr default_column = 1;
}

message InsertNode {
// Id of the table to perform inserting.
uint32 table_id = 1;
// Version of the table.
uint64 table_version_id = 5;
repeated uint32 column_indices = 2;
DefaultColumns default_columns = 6;

// deprecated, breaking change
// DefaultColumns default_columns = 6 [deprecated = true];
// reserved 6;

plan_common.DefaultColumns default_columns = 6;
// An optional field and will be `None` for tables without user-defined pk.
// The `BatchInsertExecutor` should add a column with NULL value which will
// be filled in streaming.
Expand Down
29 changes: 25 additions & 4 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,16 @@ message ColumnDesc {
// For example, when the type is created from a protobuf schema file,
// this field will store the message name.
string type_name = 5;
// Optional description for the generated column.
GeneratedColumnDesc generated_column = 6;

// deprecated, breaking change
// GeneratedColumnDesc generated_column = 6 [deprecated = true];
// reserved 6;

// Optional description for the generated column or default value.
oneof generated_or_default_column {
GeneratedColumnDesc generated_column = 6;
DefaultColumnDesc default_column = 7;
}
}

message ColumnCatalog {
Expand All @@ -40,6 +48,10 @@ message GeneratedColumnDesc {
expr.ExprNode expr = 1;
}

message DefaultColumnDesc {
expr.ExprNode expr = 1;
}

message StorageTableDesc {
uint32 table_id = 1;
repeated ColumnDesc columns = 2;
Expand All @@ -49,8 +61,8 @@ message StorageTableDesc {
uint32 retention_seconds = 5;
repeated uint32 value_indices = 6;
uint32 read_prefix_len_hint = 7;
// Whether the table is versioned. If `true`, column-aware row encoding will be used
// to be compatible with schema changes.
// Whether the table is versioned. If `true`, column-aware row encoding will
// be used to be compatible with schema changes.
bool versioned = 8;
}

Expand Down Expand Up @@ -83,3 +95,12 @@ enum RowFormatType {
UPSERT_AVRO = 11;
DEBEZIUM_MONGO_JSON = 12;
}

message IndexAndExpr {
uint32 index = 1;
expr.ExprNode expr = 2;
}

message DefaultColumns {
repeated IndexAndExpr default_columns = 1;
}
4 changes: 2 additions & 2 deletions src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use risingwave_common::types::DataType;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_expr::expr::{build_from_prost, BoxedExpression};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::IndexAndExpr;
use risingwave_pb::plan_common::IndexAndExpr;
use risingwave_source::dml_manager::DmlManagerRef;

use crate::executor::{
Expand Down Expand Up @@ -210,7 +210,7 @@ impl BoxedExecutorBuilder for InsertExecutor {
.collect();
let sorted_default_columns = if let Some(default_columns) = &insert_node.default_columns {
let mut default_columns = default_columns
.get_default_column()
.get_default_columns()
.iter()
.cloned()
.map(|IndexAndExpr { index: i, expr: e }| {
Expand Down
38 changes: 27 additions & 11 deletions src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
use std::borrow::Cow;

use itertools::Itertools;
use risingwave_pb::plan_common::{GeneratedColumnDesc, PbColumnCatalog, PbColumnDesc};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc};

use super::row_id_column_desc;
use crate::catalog::{Field, ROW_ID_COLUMN_ID};
Expand Down Expand Up @@ -93,7 +94,7 @@ pub struct ColumnDesc {
pub name: String,
pub field_descs: Vec<ColumnDesc>,
pub type_name: String,
pub generated_column: Option<GeneratedColumnDesc>,
pub generated_or_default_column: Option<GeneratedOrDefaultColumn>,
}

impl ColumnDesc {
Expand All @@ -104,7 +105,7 @@ impl ColumnDesc {
name: String::new(),
field_descs: vec![],
type_name: String::new(),
generated_column: None,
generated_or_default_column: None,
}
}

Expand All @@ -121,7 +122,7 @@ impl ColumnDesc {
.map(|f| f.to_protobuf())
.collect_vec(),
type_name: self.type_name.clone(),
generated_column: self.generated_column.clone(),
generated_or_default_column: self.generated_or_default_column.clone(),
}
}

Expand Down Expand Up @@ -164,7 +165,7 @@ impl ColumnDesc {
name: name.to_string(),
field_descs: vec![],
type_name: "".to_string(),
generated_column: None,
generated_or_default_column: None,
}
}

Expand All @@ -184,7 +185,7 @@ impl ColumnDesc {
name: name.to_string(),
field_descs: fields,
type_name: type_name.to_string(),
generated_column: None,
generated_or_default_column: None,
}
}

Expand All @@ -199,7 +200,7 @@ impl ColumnDesc {
.map(Self::from_field_without_column_id)
.collect_vec(),
type_name: field.type_name.clone(),
generated_column: None,
generated_or_default_column: None,
}
}

Expand All @@ -208,7 +209,17 @@ impl ColumnDesc {
}

pub fn is_generated(&self) -> bool {
self.generated_column.is_some()
matches!(
self.generated_or_default_column,
Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
)
}

pub fn is_default(&self) -> bool {
matches!(
self.generated_or_default_column,
Some(GeneratedOrDefaultColumn::DefaultColumn(_))
)
}
}

Expand All @@ -225,7 +236,7 @@ impl From<PbColumnDesc> for ColumnDesc {
name: prost.name,
type_name: prost.type_name,
field_descs,
generated_column: prost.generated_column,
generated_or_default_column: prost.generated_or_default_column,
}
}
}
Expand All @@ -244,7 +255,7 @@ impl From<&ColumnDesc> for PbColumnDesc {
name: c.name.clone(),
field_descs: c.field_descs.iter().map(ColumnDesc::to_protobuf).collect(),
type_name: c.type_name.clone(),
generated_column: c.generated_column.clone(),
generated_or_default_column: c.generated_or_default_column.clone(),
}
}
}
Expand All @@ -263,7 +274,12 @@ impl ColumnCatalog {

/// If the column is a generated column
pub fn is_generated(&self) -> bool {
self.column_desc.generated_column.is_some()
self.column_desc.is_generated()
}

/// If the column is a column with default expr
pub fn is_default(&self) -> bool {
self.column_desc.is_default()
}

/// Get a reference to the column desc's data type.
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub fn row_id_column_desc() -> ColumnDesc {
name: row_id_column_name(),
field_descs: vec![],
type_name: "".to_string(),
generated_column: None,
generated_or_default_column: None,
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/src/catalog/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl ColumnDescTestExt for ColumnDesc {
name: name.to_string(),
type_name: type_name.to_string(),
field_descs: fields,
generated_column: None,
generated_or_default_column: None,
}
}
}
2 changes: 1 addition & 1 deletion src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ async fn test_table_materialize() -> StreamResult<()> {
name: field.name,
field_descs: vec![],
type_name: "".to_string(),
generated_column: None,
generated_or_default_column: None,
})
.collect_vec();
let (barrier_tx, barrier_rx) = unbounded_channel();
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/avro/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub(crate) fn avro_field_to_column_desc(
name: name.to_owned(),
field_descs: vec_column,
type_name: schema_name.to_string(),
generated_column: None,
generated_or_default_column: None,
})
}
_ => {
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl ProtobufParserConfig {
column_type: Some(field_type.to_protobuf()),
field_descs,
type_name: m.full_name().to_string(),
generated_column: None,
generated_or_default_column: None,
})
} else {
*index += 1;
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl From<&SourceColumnDesc> for ColumnDesc {
name: s.name.clone(),
field_descs: s.fields.clone(),
type_name: "".to_string(),
generated_column: None,
generated_or_default_column: None,
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/planner_test/tests/testdata/insert.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -323,3 +323,14 @@
create table t (a int, b int);
insert into t (a) select * from t;
binder_error: 'Bind error: INSERT has more expressions than target columns'
- name: insert to a table with default columns
sql: |
create table t (a int, b int default 2+3);
insert into t values (1);
logical_plan: |
LogicalInsert { table: t, mapping: [0:0], default: [1<-(2:Int32 + 3:Int32)] }
└─LogicalValues { rows: [[1:Int32]], schema: Schema { fields: [*VALUES*_0.column_0:Int32] } }
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchInsert { table: t, mapping: [0:0], default: [1<-(2:Int32 + 3:Int32)] }
└─BatchValues { rows: [[1:Int32]] }
4 changes: 2 additions & 2 deletions src/frontend/src/binder/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ pub fn bind_struct_field(column_def: &StructField) -> Result<ColumnDesc> {
name: f.name.real_value(),
field_descs: vec![],
type_name: "".to_string(),
generated_column: None,
generated_or_default_column: None,
})
})
.collect::<Result<Vec<_>>>()?
Expand All @@ -528,7 +528,7 @@ pub fn bind_struct_field(column_def: &StructField) -> Result<ColumnDesc> {
name: column_def.name.real_value(),
field_descs,
type_name: "".to_string(),
generated_column: None,
generated_or_default_column: None,
})
}

Expand Down
15 changes: 12 additions & 3 deletions src/frontend/src/binder/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};

use itertools::Itertools;
use risingwave_common::catalog::{ColumnCatalog, Schema, TableVersionId};
Expand Down Expand Up @@ -101,12 +101,16 @@ impl Binder {
self.bind_table(schema_name.as_deref(), &table_name, None)?;

let table_catalog = self.resolve_dml_table(schema_name.as_deref(), &table_name, true)?;
let default_columns_from_catalog =
table_catalog.default_columns().collect::<BTreeMap<_, _>>();
let table_id = table_catalog.id;
let owner = table_catalog.owner;
let table_version_id = table_catalog.version_id().expect("table must be versioned");
let cols_to_insert_in_table = table_catalog.columns_to_insert().cloned().collect_vec();

let generated_column_names: HashSet<_> = table_catalog.generated_column_names().collect();
let generated_column_names = table_catalog
.generated_column_names()
.collect::<HashSet<_>>();
for col in &cols_to_insert_by_user {
let query_col_name = col.real_value();
if generated_column_names.contains(query_col_name.as_str()) {
Expand Down Expand Up @@ -249,7 +253,12 @@ impl Binder {
.map(|i| {
(
i,
ExprImpl::literal_null(cols_to_insert_in_table[i].data_type().clone()),
default_columns_from_catalog
.get(&i)
.cloned()
.unwrap_or_else(|| {
ExprImpl::literal_null(cols_to_insert_in_table[i].data_type().clone())
}),
)
})
.collect_vec();
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/catalog/system_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ macro_rules! def_sys_catalog {
name: col.1.to_string(),
field_descs: vec![],
type_name: "".to_string(),
generated_column: None,
generated_or_default_column: None,
},
is_hidden: false,
})
Expand Down
Loading

0 comments on commit 4222535

Please sign in to comment.