Skip to content

Commit

Permalink
default column impure
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Oct 8, 2023
1 parent ab85dc4 commit a575b99
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 62 deletions.
1 change: 1 addition & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ message GeneratedColumnDesc {

// Describes a column that has a default value.
message DefaultColumnDesc {
// The default expression. The original (un-rewritten) expression is persisted here.
expr.ExprNode expr = 1;
// A snapshot of the value obtained by evaluating `expr` when the column is bound.
// It is used to fill in missing values in the case of schema change.
// Default columns in old tables may not have this field set; readers then fall
// back to evaluating `expr`.
data.Datum snapshot_value = 2;
}

message StorageTableDesc {
Expand Down
31 changes: 14 additions & 17 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,23 +429,20 @@ impl TableCatalog {
}

pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
self.columns
.iter()
.enumerate()
.filter(|(_, c)| c.is_default())
.map(|(i, c)| {
if let GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr }) =
c.column_desc.generated_or_default_column.clone().unwrap()
{
(
i,
ExprImpl::from_expr_proto(&expr.unwrap())
.expect("expr in default columns corrupted"),
)
} else {
unreachable!()
}
})
self.columns.iter().enumerate().filter_map(|(i, c)| {
if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
expr, ..
})) = c.column_desc.generated_or_default_column.as_ref()
{
Some((
i,
ExprImpl::from_expr_proto(expr.as_ref().unwrap())
.expect("expr in default columns corrupted"),
))
} else {
None
}
})
}

pub fn has_generated_column(&self) -> bool {
Expand Down
8 changes: 7 additions & 1 deletion src/frontend/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,13 @@ impl ExprImpl {
if self.is_const() {
self.eval_row(&OwnedRow::empty())
.now_or_never()
.expect("constant expression should not be async")
.unwrap_or_else(|| {
// TODO: use a more specific error type
Err(ErrorCode::ExprError(
anyhow::anyhow!("constant expression should not be async").into(),
)
.into())
})
.into()
} else {
None
Expand Down
60 changes: 32 additions & 28 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use risingwave_common::catalog::{
use risingwave_common::constants::hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND;
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_connector::source::external::ExternalTableType;
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, WatermarkDesc};
Expand All @@ -40,7 +41,7 @@ use super::RwPgResponse;
use crate::binder::{bind_data_type, bind_struct_field, Clause};
use crate::catalog::table_catalog::TableVersion;
use crate::catalog::{check_valid_column_name, CatalogError, ColumnId};
use crate::expr::{Expr, ExprImpl};
use crate::expr::{Expr, ExprImpl, ExprRewriter, InlineNowProcTime};
use crate::handler::create_source::{
bind_source_watermark, check_source_schema, try_bind_columns_from_source,
validate_compatibility, UPSTREAM_SOURCE_KEY,
Expand Down Expand Up @@ -231,27 +232,8 @@ fn check_generated_column_constraints(
Ok(())
}

/// Validates the expression bound as a column's default value.
///
/// Returns a `BindError` if `expr` contains any input reference (i.e. it refers to
/// another column), or if `expr.is_impure()` reports the expression as impure.
/// Returns `Ok(())` otherwise.
fn check_default_column_constraints(
expr: &ExprImpl,
column_catalogs: &[ColumnCatalog],
) -> Result<()> {
// Collect the input references of `expr`, sized by the number of columns.
let input_refs = expr.collect_input_refs(column_catalogs.len());
// A non-zero count means at least one input reference was found.
if input_refs.count_ones(..) > 0 {
return Err(ErrorCode::BindError(
"Default can not reference another column, and you should try generated column instead."
.to_string(),
)
.into());
}
// Impure default expressions are rejected outright.
if expr.is_impure() {
return Err(
ErrorCode::BindError("impure default expr is not supported.".to_string()).into(),
);
}
Ok(())
}

/// Binds constraints that can be only specified in column definitions.
/// Binds constraints that can be only specified in column definitions,
/// currently generated columns and default columns.
pub fn bind_sql_column_constraints(
session: &SessionImpl,
table_name: String,
Expand Down Expand Up @@ -307,12 +289,34 @@ pub fn bind_sql_column_constraints(
.bind_expr(expr)?
.cast_assign(column_catalogs[idx].data_type().clone())?;

check_default_column_constraints(&expr_impl, column_catalogs)?;

column_catalogs[idx].column_desc.generated_or_default_column =
Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
expr: Some(expr_impl.to_expr_proto()),
}));
// Rewrite expressions to evaluate a snapshot value, used for missing values in the case of
// schema change.
//
// TODO: Currently we don't support impure expressions other than `now()` (like `random()`),
// so the rewritten expression should almost always be pure and we directly call
// `try_fold_const` here. Strictly speaking, purity is not required at this point, since we
// only need to obtain a snapshot value.
let rewritten_expr_impl =
InlineNowProcTime::new(session.pinned_snapshot().epoch())
.rewrite_expr(expr_impl.clone());

if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
let snapshot_value = snapshot_value?;

column_catalogs[idx].column_desc.generated_or_default_column = Some(
GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
snapshot_value: Some(snapshot_value.to_protobuf()),
expr: Some(expr_impl.to_expr_proto()), /* persist the original expression */
}),
);
} else {
return Err(ErrorCode::BindError(format!(
"Default expression used in column `{}` cannot be evaluated. \
Use generated columns instead if you mean to reference other columns.",
column.name
))
.into());
}
}
_ => {}
}
Expand Down
38 changes: 22 additions & 16 deletions src/storage/src/row_serde/value_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use risingwave_common::util::value_encoding::column_aware_row_encoding::{
};
use risingwave_common::util::value_encoding::error::ValueEncodingError;
use risingwave_common::util::value_encoding::{
BasicSerde, BasicSerializer, EitherSerde, ValueRowDeserializer, ValueRowSerdeKind,
ValueRowSerializer,
BasicSerde, BasicSerializer, DatumFromProtoExt, EitherSerde, ValueRowDeserializer,
ValueRowSerdeKind, ValueRowSerializer,
};
use risingwave_expr::expr::build_from_prost;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
Expand Down Expand Up @@ -98,21 +98,27 @@ impl ValueRowSerdeNew for ColumnAwareSerde {
}

let column_with_default = table_columns.iter().enumerate().filter_map(|(i, c)| {
if c.is_default() {
if let GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr }) =
c.generated_or_default_column.clone().unwrap()
{
Some((
i,
build_from_prost(&expr.expect("expr should not be none"))
.expect("build_from_prost error")
.eval_row_infallible(&OwnedRow::empty())
.now_or_never()
.expect("constant expression should not be async"),
))
if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
snapshot_value,
expr,
})) = c.generated_or_default_column.clone()
{
// TODO: consider returning an error here instead of panicking
let value = if let Some(snapshot_value) = snapshot_value {
// If there's a `snapshot_value`, we can use it directly.
Datum::from_protobuf(&snapshot_value, &c.data_type)
.expect("invalid default value")
} else {
unreachable!()
}
// For backward compatibility, default columns in old tables may not have `snapshot_value`.
// In this case, we need to evaluate the expression to get the default value.
// It's okay since we previously banned impure expressions in default columns.
build_from_prost(&expr.expect("expr should not be none"))
.expect("build_from_prost error")
.eval_row_infallible(&OwnedRow::empty())
.now_or_never()
.expect("constant expression should not be async")
};
Some((i, value))
} else {
None
}
Expand Down

0 comments on commit a575b99

Please sign in to comment.