From b877e18e8f22a78b859ecb37d6b7d645bb9be2d8 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Tue, 4 Jun 2024 13:49:59 +0800 Subject: [PATCH] evaluate scalar expr non-strictly in streaming Signed-off-by: Runji Wang --- e2e_test/streaming/bug_fixes/issue_12474.slt | 26 +++++++ src/batch/src/executor/project_set.rs | 58 ++++++++++++++- src/expr/core/src/table_function/mod.rs | 53 +------------- src/stream/src/executor/project_set.rs | 77 +++++++++++++++++--- src/stream/src/from_proto/project_set.rs | 9 ++- 5 files changed, 157 insertions(+), 66 deletions(-) create mode 100644 e2e_test/streaming/bug_fixes/issue_12474.slt diff --git a/e2e_test/streaming/bug_fixes/issue_12474.slt b/e2e_test/streaming/bug_fixes/issue_12474.slt new file mode 100644 index 000000000000..bb4d8e9d991e --- /dev/null +++ b/e2e_test/streaming/bug_fixes/issue_12474.slt @@ -0,0 +1,26 @@ +# https://github.com/risingwavelabs/risingwave/issues/12474 + +statement ok +create table t(x int[]); + +statement ok +create materialized view mv as select 1 / x[1] as bomb, unnest(x) as unnest from t; + +statement ok +insert into t values (array[0, 1]), (array[1]); + +statement ok +flush; + +query II rowsort +select * from mv; +---- +NULL 0 +NULL 1 +1 1 + +statement ok +drop materialized view mv; + +statement ok +drop table t; diff --git a/src/batch/src/executor/project_set.rs b/src/batch/src/executor/project_set.rs index 2d50c1039743..7784159eb6c7 100644 --- a/src/batch/src/executor/project_set.rs +++ b/src/batch/src/executor/project_set.rs @@ -15,13 +15,16 @@ use either::Either; use futures_async_stream::try_stream; use itertools::Itertools; -use risingwave_common::array::DataChunk; +use risingwave_common::array::{ArrayRef, DataChunk}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::{DataType, DatumRef}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_expr::table_function::ProjectSetSelectItem; +use risingwave_expr::expr::{self, BoxedExpression}; +use risingwave_expr::table_function::{self, BoxedTableFunction, TableFunctionOutputIter}; use risingwave_pb::batch_plan::plan_node::NodeBody; +use risingwave_pb::expr::project_set_select_item::PbSelectItem; +use risingwave_pb::expr::PbProjectSetSelectItem; use crate::error::{BatchError, Result}; use crate::executor::{ @@ -170,6 +173,57 @@ impl BoxedExecutorBuilder for ProjectSetExecutor { } } +/// Either a scalar expression or a set-returning function. +/// +/// See also [`PbProjectSetSelectItem`] +#[derive(Debug)] +pub enum ProjectSetSelectItem { + Scalar(BoxedExpression), + Set(BoxedTableFunction), +} + +impl From for ProjectSetSelectItem { + fn from(table_function: BoxedTableFunction) -> Self { + ProjectSetSelectItem::Set(table_function) + } +} + +impl From for ProjectSetSelectItem { + fn from(expr: BoxedExpression) -> Self { + ProjectSetSelectItem::Scalar(expr) + } +} + +impl ProjectSetSelectItem { + pub fn from_prost(prost: &PbProjectSetSelectItem, chunk_size: usize) -> Result { + Ok(match prost.select_item.as_ref().unwrap() { + PbSelectItem::Expr(expr) => Self::Scalar(expr::build_from_prost(expr)?), + PbSelectItem::TableFunction(tf) => { + Self::Set(table_function::build_from_prost(tf, chunk_size)?) + } + }) + } + + pub fn return_type(&self) -> DataType { + match self { + ProjectSetSelectItem::Scalar(expr) => expr.return_type(), + ProjectSetSelectItem::Set(tf) => tf.return_type(), + } + } + + pub async fn eval<'a>( + &'a self, + input: &'a DataChunk, + ) -> Result, ArrayRef>> { + match self { + Self::Set(tf) => Ok(Either::Left( + TableFunctionOutputIter::new(tf.eval(input).await).await?, + )), + Self::Scalar(expr) => Ok(Either::Right(expr.eval(input).await?)), + } + } +} + #[cfg(test)] mod tests { use futures::stream::StreamExt; diff --git a/src/expr/core/src/table_function/mod.rs b/src/expr/core/src/table_function/mod.rs index b87d6f020b09..c14a50a8f41a 100644 --- a/src/expr/core/src/table_function/mod.rs +++ b/src/expr/core/src/table_function/mod.rs @@ -12,15 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use either::Either; use futures_async_stream::try_stream; use futures_util::stream::BoxStream; use futures_util::StreamExt; -use risingwave_common::array::{Array, ArrayBuilder, ArrayImpl, ArrayRef, DataChunk}; +use risingwave_common::array::{Array, ArrayBuilder, ArrayImpl, DataChunk}; use risingwave_common::types::{DataType, DatumRef}; -use risingwave_pb::expr::project_set_select_item::SelectItem; use risingwave_pb::expr::table_function::PbType; -use risingwave_pb::expr::{PbProjectSetSelectItem, PbTableFunction}; +use risingwave_pb::expr::PbTableFunction; use super::{ExprError, Result}; use crate::expr::{build_from_prost as expr_build_from_prost, BoxedExpression}; @@ -141,53 +139,6 @@ pub fn build( desc.build_table(return_type, chunk_size, children) } -/// See also [`PbProjectSetSelectItem`] -#[derive(Debug)] -pub enum ProjectSetSelectItem { - TableFunction(BoxedTableFunction), - Expr(BoxedExpression), -} - -impl From for ProjectSetSelectItem { - fn from(table_function: BoxedTableFunction) -> Self { - ProjectSetSelectItem::TableFunction(table_function) - } -} - -impl From for ProjectSetSelectItem { - fn from(expr: BoxedExpression) -> Self { - ProjectSetSelectItem::Expr(expr) - } -} - -impl ProjectSetSelectItem { - pub fn from_prost(prost: &PbProjectSetSelectItem, chunk_size: usize) -> Result { - match prost.select_item.as_ref().unwrap() { - SelectItem::Expr(expr) => expr_build_from_prost(expr).map(Into::into), - SelectItem::TableFunction(tf) => build_from_prost(tf, chunk_size).map(Into::into), - } - } - - pub fn return_type(&self) -> DataType { - match self { - ProjectSetSelectItem::TableFunction(tf) => tf.return_type(), - ProjectSetSelectItem::Expr(expr) => expr.return_type(), - } - } - - pub async fn eval<'a>( - &'a self, - input: &'a DataChunk, - ) -> Result, ArrayRef>> { - match self { - Self::TableFunction(tf) => Ok(Either::Left( - TableFunctionOutputIter::new(tf.eval(input).await).await?, - )), - Self::Expr(expr) => expr.eval(input).await.map(Either::Right), - } - } -} - /// A wrapper over the output of table function that allows iteration by rows. /// /// If the table function returns multiple columns, the output will be struct values. diff --git a/src/stream/src/executor/project_set.rs b/src/stream/src/executor/project_set.rs index 43a2d65cbfb1..701f4fc5d27d 100644 --- a/src/stream/src/executor/project_set.rs +++ b/src/stream/src/executor/project_set.rs @@ -14,13 +14,16 @@ use either::Either; use multimap::MultiMap; -use risingwave_common::array::Op; +use risingwave_common::array::{ArrayRef, DataChunk, Op}; use risingwave_common::bail; use risingwave_common::row::RowExt; use risingwave_common::types::ToOwnedDatum; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_expr::expr::{LogReport, NonStrictExpression}; -use risingwave_expr::table_function::ProjectSetSelectItem; +use risingwave_expr::expr::{self, EvalErrorReport, NonStrictExpression}; +use risingwave_expr::table_function::{self, BoxedTableFunction, TableFunctionOutputIter}; +use risingwave_expr::ExprError; +use risingwave_pb::expr::project_set_select_item::PbSelectItem; +use risingwave_pb::expr::PbProjectSetSelectItem; use crate::executor::prelude::*; @@ -226,17 +229,13 @@ impl Inner { for expr_idx in expr_indices { let expr_idx = *expr_idx; let derived_watermark = match &self.select_list[expr_idx] { - ProjectSetSelectItem::Expr(expr) => { + ProjectSetSelectItem::Scalar(expr) => { watermark .clone() - .transform_with_expr( - // TODO: should we build `expr` in non-strict mode? - &NonStrictExpression::new_topmost(expr, LogReport), - expr_idx + PROJ_ROW_ID_OFFSET, - ) + .transform_with_expr(expr, expr_idx + PROJ_ROW_ID_OFFSET) .await } - ProjectSetSelectItem::TableFunction(_) => { + ProjectSetSelectItem::Set(_) => { bail!("Watermark should not be produced by a table function"); } }; @@ -253,3 +252,61 @@ impl Inner { Ok(ret) } } + +/// Either a scalar expression or a set-returning function. +/// +/// See also [`PbProjectSetSelectItem`] +#[derive(Debug)] +pub enum ProjectSetSelectItem { + Scalar(NonStrictExpression), + Set(BoxedTableFunction), +} + +impl From for ProjectSetSelectItem { + fn from(table_function: BoxedTableFunction) -> Self { + ProjectSetSelectItem::Set(table_function) + } +} + +impl From for ProjectSetSelectItem { + fn from(expr: NonStrictExpression) -> Self { + ProjectSetSelectItem::Scalar(expr) + } +} + +impl ProjectSetSelectItem { + pub fn from_prost( + prost: &PbProjectSetSelectItem, + error_report: impl EvalErrorReport + 'static, + chunk_size: usize, + ) -> Result { + match prost.select_item.as_ref().unwrap() { + PbSelectItem::Expr(expr) => { + expr::build_non_strict_from_prost(expr, error_report).map(Self::Scalar) + } + PbSelectItem::TableFunction(tf) => { + table_function::build_from_prost(tf, chunk_size).map(Self::Set) + } + } + } + + pub fn return_type(&self) -> DataType { + match self { + ProjectSetSelectItem::Scalar(expr) => expr.return_type(), + ProjectSetSelectItem::Set(tf) => tf.return_type(), + } + } + + pub async fn eval<'a>( + &'a self, + input: &'a DataChunk, + ) -> Result, ArrayRef>, ExprError> { + match self { + Self::Scalar(expr) => Ok(Either::Right(expr.eval_infallible(input).await)), + // FIXME(runji): table function should also be evaluated non strictly + Self::Set(tf) => Ok(Either::Left( + TableFunctionOutputIter::new(tf.eval(input).await).await?, + )), + } + } +} diff --git a/src/stream/src/from_proto/project_set.rs b/src/stream/src/from_proto/project_set.rs index c2338394b33e..155cc20a203a 100644 --- a/src/stream/src/from_proto/project_set.rs +++ b/src/stream/src/from_proto/project_set.rs @@ -14,11 +14,10 @@ use multimap::MultiMap; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_expr::table_function::ProjectSetSelectItem; use risingwave_pb::stream_plan::ProjectSetNode; use super::*; -use crate::executor::ProjectSetExecutor; +use crate::executor::{ProjectSetExecutor, ProjectSetSelectItem}; pub struct ProjectSetExecutorBuilder; @@ -35,7 +34,11 @@ impl ExecutorBuilder for ProjectSetExecutorBuilder { .get_select_list() .iter() .map(|proto| { - ProjectSetSelectItem::from_prost(proto, params.env.config().developer.chunk_size) + ProjectSetSelectItem::from_prost( + proto, + params.eval_error_report.clone(), + params.env.config().developer.chunk_size, + ) }) .try_collect()?; let watermark_derivations = MultiMap::from_iter(