From a575b99b28c13001edfc6ca8164b3184df386b47 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Sun, 8 Oct 2023 16:59:34 +0800 Subject: [PATCH] default column impure Signed-off-by: Bugen Zhao --- proto/plan_common.proto | 1 + src/frontend/src/catalog/table_catalog.rs | 31 ++++++------ src/frontend/src/expr/mod.rs | 8 ++- src/frontend/src/handler/create_table.rs | 60 ++++++++++++----------- src/storage/src/row_serde/value_serde.rs | 38 ++++++++------ 5 files changed, 76 insertions(+), 62 deletions(-) diff --git a/proto/plan_common.proto b/proto/plan_common.proto index 24ed4c8372b86..e4f025589cd65 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -50,6 +50,7 @@ message GeneratedColumnDesc { message DefaultColumnDesc { expr.ExprNode expr = 1; + data.Datum snapshot_value = 2; } message StorageTableDesc { diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 778b43c0598f5..0fadf44ad40cf 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -429,23 +429,20 @@ impl TableCatalog { } pub fn default_columns(&self) -> impl Iterator + '_ { - self.columns - .iter() - .enumerate() - .filter(|(_, c)| c.is_default()) - .map(|(i, c)| { - if let GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr }) = - c.column_desc.generated_or_default_column.clone().unwrap() - { - ( - i, - ExprImpl::from_expr_proto(&expr.unwrap()) - .expect("expr in default columns corrupted"), - ) - } else { - unreachable!() - } - }) + self.columns.iter().enumerate().filter_map(|(i, c)| { + if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { + expr, .. + })) = c.column_desc.generated_or_default_column.as_ref() + { + Some(( + i, + ExprImpl::from_expr_proto(expr.as_ref().unwrap()) + .expect("expr in default columns corrupted"), + )) + } else { + None + } + }) } pub fn has_generated_column(&self) -> bool { diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index 0af30ccb364f8..561bd22c60567 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -323,7 +323,13 @@ impl ExprImpl { if self.is_const() { self.eval_row(&OwnedRow::empty()) .now_or_never() - .expect("constant expression should not be async") + .unwrap_or_else(|| { + // TODO: error type + Err(ErrorCode::ExprError( + anyhow::anyhow!("constant expression should not be async").into(), + ) + .into()) + }) .into() } else { None diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 81f60b64702f2..19d9a2f25c4b8 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -25,6 +25,7 @@ use risingwave_common::catalog::{ use risingwave_common::constants::hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; +use risingwave_common::util::value_encoding::DatumToProtoExt; use risingwave_connector::source::external::ExternalTableType; use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, WatermarkDesc}; @@ -40,7 +41,7 @@ use super::RwPgResponse; use crate::binder::{bind_data_type, bind_struct_field, Clause}; use crate::catalog::table_catalog::TableVersion; use crate::catalog::{check_valid_column_name, CatalogError, ColumnId}; -use crate::expr::{Expr, ExprImpl}; +use crate::expr::{Expr, ExprImpl, ExprRewriter, InlineNowProcTime}; use crate::handler::create_source::{ bind_source_watermark, check_source_schema, try_bind_columns_from_source, validate_compatibility, UPSTREAM_SOURCE_KEY, @@ -231,27 +232,8 @@ fn check_generated_column_constraints( Ok(()) } -fn check_default_column_constraints( - expr: &ExprImpl, - column_catalogs: &[ColumnCatalog], -) -> Result<()> { - let input_refs = expr.collect_input_refs(column_catalogs.len()); - if input_refs.count_ones(..) > 0 { - return Err(ErrorCode::BindError( - "Default can not reference another column, and you should try generated column instead." - .to_string(), - ) - .into()); - } - if expr.is_impure() { - return Err( - ErrorCode::BindError("impure default expr is not supported.".to_string()).into(), - ); - } - Ok(()) -} - -/// Binds constraints that can be only specified in column definitions. +/// Binds constraints that can be only specified in column definitions, +/// currently generated columns and default columns. pub fn bind_sql_column_constraints( session: &SessionImpl, table_name: String, @@ -307,12 +289,34 @@ pub fn bind_sql_column_constraints( .bind_expr(expr)? .cast_assign(column_catalogs[idx].data_type().clone())?; - check_default_column_constraints(&expr_impl, column_catalogs)?; - - column_catalogs[idx].column_desc.generated_or_default_column = - Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { - expr: Some(expr_impl.to_expr_proto()), - })); + // Rewrite expressions to evaluate a snapshot value, used for missing values in the case of + // schema change. + // + // TODO: Currently we don't support impure expressions other than `now()` (like `random()`), + // so the rewritten expression should almost always be pure and we directly call `fold_const` + // here. Actually we do not require purity of the expression here since we're only to get a + // snapshot value. + let rewritten_expr_impl = + InlineNowProcTime::new(session.pinned_snapshot().epoch()) + .rewrite_expr(expr_impl.clone()); + + if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() { + let snapshot_value = snapshot_value?; + + column_catalogs[idx].column_desc.generated_or_default_column = Some( + GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { + snapshot_value: Some(snapshot_value.to_protobuf()), + expr: Some(expr_impl.to_expr_proto()), /* persist the original expression */ + }), + ); + } else { + return Err(ErrorCode::BindError(format!( + "Default expression used in column `{}` cannot be evaluated. \ + Use generated columns instead if you mean to reference other columns.", + column.name + )) + .into()); + } } _ => {} } diff --git a/src/storage/src/row_serde/value_serde.rs b/src/storage/src/row_serde/value_serde.rs index 96ff78b9aa69e..5d56cdba2d96d 100644 --- a/src/storage/src/row_serde/value_serde.rs +++ b/src/storage/src/row_serde/value_serde.rs @@ -27,8 +27,8 @@ use risingwave_common::util::value_encoding::column_aware_row_encoding::{ }; use risingwave_common::util::value_encoding::error::ValueEncodingError; use risingwave_common::util::value_encoding::{ - BasicSerde, BasicSerializer, EitherSerde, ValueRowDeserializer, ValueRowSerdeKind, - ValueRowSerializer, + BasicSerde, BasicSerializer, DatumFromProtoExt, EitherSerde, ValueRowDeserializer, + ValueRowSerdeKind, ValueRowSerializer, }; use risingwave_expr::expr::build_from_prost; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; @@ -98,21 +98,27 @@ impl ValueRowSerdeNew for ColumnAwareSerde { } let column_with_default = table_columns.iter().enumerate().filter_map(|(i, c)| { - if c.is_default() { - if let GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr }) = - c.generated_or_default_column.clone().unwrap() - { - Some(( - i, - build_from_prost(&expr.expect("expr should not be none")) - .expect("build_from_prost error") - .eval_row_infallible(&OwnedRow::empty()) - .now_or_never() - .expect("constant expression should not be async"), - )) + if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { + snapshot_value, + expr, + })) = c.generated_or_default_column.clone() + { + // TODO: may not panic on error + let value = if let Some(snapshot_value) = snapshot_value { + // If there's a `snapshot_value`, we can use it directly. + Datum::from_protobuf(&snapshot_value, &c.data_type) + .expect("invalid default value") } else { - unreachable!() - } + // For backward compatibility, default columns in old tables may not have `snapshot_value`. + // In this case, we need to evaluate the expression to get the default value. + // It's okay since we previously banned impure expressions in default columns. + build_from_prost(&expr.expect("expr should not be none")) + .expect("build_from_prost error") + .eval_row_infallible(&OwnedRow::empty()) + .now_or_never() + .expect("constant expression should not be async") + }; + Some((i, value)) } else { None }