From b877e18e8f22a78b859ecb37d6b7d645bb9be2d8 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Tue, 4 Jun 2024 13:49:59 +0800 Subject: [PATCH 1/5] evaluate scalar expr non-strictly in streaming Signed-off-by: Runji Wang --- e2e_test/streaming/bug_fixes/issue_12474.slt | 26 +++++++ src/batch/src/executor/project_set.rs | 58 ++++++++++++++- src/expr/core/src/table_function/mod.rs | 53 +------------- src/stream/src/executor/project_set.rs | 77 +++++++++++++++++--- src/stream/src/from_proto/project_set.rs | 9 ++- 5 files changed, 157 insertions(+), 66 deletions(-) create mode 100644 e2e_test/streaming/bug_fixes/issue_12474.slt diff --git a/e2e_test/streaming/bug_fixes/issue_12474.slt b/e2e_test/streaming/bug_fixes/issue_12474.slt new file mode 100644 index 0000000000000..bb4d8e9d991eb --- /dev/null +++ b/e2e_test/streaming/bug_fixes/issue_12474.slt @@ -0,0 +1,26 @@ +# https://github.com/risingwavelabs/risingwave/issues/12474 + +statement ok +create table t(x int[]); + +statement ok +create materialized view mv as select 1 / x[1] as bomb, unnest(x) as unnest from t; + +statement ok +insert into t values (array[0, 1]), (array[1]); + +statement ok +flush; + +query II rowsort +select * from mv; +---- +NULL 0 +NULL 1 +1 1 + +statement ok +drop materialized view mv; + +statement ok +drop table t; diff --git a/src/batch/src/executor/project_set.rs b/src/batch/src/executor/project_set.rs index 2d50c1039743c..7784159eb6c70 100644 --- a/src/batch/src/executor/project_set.rs +++ b/src/batch/src/executor/project_set.rs @@ -15,13 +15,16 @@ use either::Either; use futures_async_stream::try_stream; use itertools::Itertools; -use risingwave_common::array::DataChunk; +use risingwave_common::array::{ArrayRef, DataChunk}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::{DataType, DatumRef}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_expr::table_function::ProjectSetSelectItem; +use risingwave_expr::expr::{self, BoxedExpression}; +use risingwave_expr::table_function::{self, BoxedTableFunction, TableFunctionOutputIter}; use risingwave_pb::batch_plan::plan_node::NodeBody; +use risingwave_pb::expr::project_set_select_item::PbSelectItem; +use risingwave_pb::expr::PbProjectSetSelectItem; use crate::error::{BatchError, Result}; use crate::executor::{ @@ -170,6 +173,57 @@ impl BoxedExecutorBuilder for ProjectSetExecutor { } } +/// Either a scalar expression or a set-returning function. +/// +/// See also [`PbProjectSetSelectItem`] +#[derive(Debug)] +pub enum ProjectSetSelectItem { + Scalar(BoxedExpression), + Set(BoxedTableFunction), +} + +impl From for ProjectSetSelectItem { + fn from(table_function: BoxedTableFunction) -> Self { + ProjectSetSelectItem::Set(table_function) + } +} + +impl From for ProjectSetSelectItem { + fn from(expr: BoxedExpression) -> Self { + ProjectSetSelectItem::Scalar(expr) + } +} + +impl ProjectSetSelectItem { + pub fn from_prost(prost: &PbProjectSetSelectItem, chunk_size: usize) -> Result { + Ok(match prost.select_item.as_ref().unwrap() { + PbSelectItem::Expr(expr) => Self::Scalar(expr::build_from_prost(expr)?), + PbSelectItem::TableFunction(tf) => { + Self::Set(table_function::build_from_prost(tf, chunk_size)?) + } + }) + } + + pub fn return_type(&self) -> DataType { + match self { + ProjectSetSelectItem::Scalar(expr) => expr.return_type(), + ProjectSetSelectItem::Set(tf) => tf.return_type(), + } + } + + pub async fn eval<'a>( + &'a self, + input: &'a DataChunk, + ) -> Result, ArrayRef>> { + match self { + Self::Set(tf) => Ok(Either::Left( + TableFunctionOutputIter::new(tf.eval(input).await).await?, + )), + Self::Scalar(expr) => Ok(Either::Right(expr.eval(input).await?)), + } + } +} + #[cfg(test)] mod tests { use futures::stream::StreamExt; diff --git a/src/expr/core/src/table_function/mod.rs b/src/expr/core/src/table_function/mod.rs index b87d6f020b093..c14a50a8f41a4 100644 --- a/src/expr/core/src/table_function/mod.rs +++ b/src/expr/core/src/table_function/mod.rs @@ -12,15 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use either::Either; use futures_async_stream::try_stream; use futures_util::stream::BoxStream; use futures_util::StreamExt; -use risingwave_common::array::{Array, ArrayBuilder, ArrayImpl, ArrayRef, DataChunk}; +use risingwave_common::array::{Array, ArrayBuilder, ArrayImpl, DataChunk}; use risingwave_common::types::{DataType, DatumRef}; -use risingwave_pb::expr::project_set_select_item::SelectItem; use risingwave_pb::expr::table_function::PbType; -use risingwave_pb::expr::{PbProjectSetSelectItem, PbTableFunction}; +use risingwave_pb::expr::PbTableFunction; use super::{ExprError, Result}; use crate::expr::{build_from_prost as expr_build_from_prost, BoxedExpression}; @@ -141,53 +139,6 @@ pub fn build( desc.build_table(return_type, chunk_size, children) } -/// See also [`PbProjectSetSelectItem`] -#[derive(Debug)] -pub enum ProjectSetSelectItem { - TableFunction(BoxedTableFunction), - Expr(BoxedExpression), -} - -impl From for ProjectSetSelectItem { - fn from(table_function: BoxedTableFunction) -> Self { - ProjectSetSelectItem::TableFunction(table_function) - } -} - -impl From for ProjectSetSelectItem { - fn from(expr: BoxedExpression) -> Self { - ProjectSetSelectItem::Expr(expr) - } -} - -impl ProjectSetSelectItem { - pub fn from_prost(prost: &PbProjectSetSelectItem, chunk_size: usize) -> Result { - match prost.select_item.as_ref().unwrap() { - SelectItem::Expr(expr) => expr_build_from_prost(expr).map(Into::into), - SelectItem::TableFunction(tf) => build_from_prost(tf, chunk_size).map(Into::into), - } - } - - pub fn return_type(&self) -> DataType { - match self { - ProjectSetSelectItem::TableFunction(tf) => tf.return_type(), - ProjectSetSelectItem::Expr(expr) => expr.return_type(), - } - } - - pub async fn eval<'a>( - &'a self, - input: &'a DataChunk, - ) -> Result, ArrayRef>> { - match self { - Self::TableFunction(tf) => Ok(Either::Left( - TableFunctionOutputIter::new(tf.eval(input).await).await?, - )), - Self::Expr(expr) => expr.eval(input).await.map(Either::Right), - } - } -} - /// A wrapper over the output of table function that allows iteration by rows. /// /// If the table function returns multiple columns, the output will be struct values. diff --git a/src/stream/src/executor/project_set.rs b/src/stream/src/executor/project_set.rs index 43a2d65cbfb11..701f4fc5d27df 100644 --- a/src/stream/src/executor/project_set.rs +++ b/src/stream/src/executor/project_set.rs @@ -14,13 +14,16 @@ use either::Either; use multimap::MultiMap; -use risingwave_common::array::Op; +use risingwave_common::array::{ArrayRef, DataChunk, Op}; use risingwave_common::bail; use risingwave_common::row::RowExt; use risingwave_common::types::ToOwnedDatum; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_expr::expr::{LogReport, NonStrictExpression}; -use risingwave_expr::table_function::ProjectSetSelectItem; +use risingwave_expr::expr::{self, EvalErrorReport, NonStrictExpression}; +use risingwave_expr::table_function::{self, BoxedTableFunction, TableFunctionOutputIter}; +use risingwave_expr::ExprError; +use risingwave_pb::expr::project_set_select_item::PbSelectItem; +use risingwave_pb::expr::PbProjectSetSelectItem; use crate::executor::prelude::*; @@ -226,17 +229,13 @@ impl Inner { for expr_idx in expr_indices { let expr_idx = *expr_idx; let derived_watermark = match &self.select_list[expr_idx] { - ProjectSetSelectItem::Expr(expr) => { + ProjectSetSelectItem::Scalar(expr) => { watermark .clone() - .transform_with_expr( - // TODO: should we build `expr` in non-strict mode? - &NonStrictExpression::new_topmost(expr, LogReport), - expr_idx + PROJ_ROW_ID_OFFSET, - ) + .transform_with_expr(expr, expr_idx + PROJ_ROW_ID_OFFSET) .await } - ProjectSetSelectItem::TableFunction(_) => { + ProjectSetSelectItem::Set(_) => { bail!("Watermark should not be produced by a table function"); } }; @@ -253,3 +252,61 @@ impl Inner { Ok(ret) } } + +/// Either a scalar expression or a set-returning function. +/// +/// See also [`PbProjectSetSelectItem`] +#[derive(Debug)] +pub enum ProjectSetSelectItem { + Scalar(NonStrictExpression), + Set(BoxedTableFunction), +} + +impl From for ProjectSetSelectItem { + fn from(table_function: BoxedTableFunction) -> Self { + ProjectSetSelectItem::Set(table_function) + } +} + +impl From for ProjectSetSelectItem { + fn from(expr: NonStrictExpression) -> Self { + ProjectSetSelectItem::Scalar(expr) + } +} + +impl ProjectSetSelectItem { + pub fn from_prost( + prost: &PbProjectSetSelectItem, + error_report: impl EvalErrorReport + 'static, + chunk_size: usize, + ) -> Result { + match prost.select_item.as_ref().unwrap() { + PbSelectItem::Expr(expr) => { + expr::build_non_strict_from_prost(expr, error_report).map(Self::Scalar) + } + PbSelectItem::TableFunction(tf) => { + table_function::build_from_prost(tf, chunk_size).map(Self::Set) + } + } + } + + pub fn return_type(&self) -> DataType { + match self { + ProjectSetSelectItem::Scalar(expr) => expr.return_type(), + ProjectSetSelectItem::Set(tf) => tf.return_type(), + } + } + + pub async fn eval<'a>( + &'a self, + input: &'a DataChunk, + ) -> Result, ArrayRef>, ExprError> { + match self { + Self::Scalar(expr) => Ok(Either::Right(expr.eval_infallible(input).await)), + // FIXME(runji): table function should also be evaluated non strictly + Self::Set(tf) => Ok(Either::Left( + TableFunctionOutputIter::new(tf.eval(input).await).await?, + )), + } + } +} diff --git a/src/stream/src/from_proto/project_set.rs b/src/stream/src/from_proto/project_set.rs index c2338394b33ef..155cc20a203a2 100644 --- a/src/stream/src/from_proto/project_set.rs +++ b/src/stream/src/from_proto/project_set.rs @@ -14,11 +14,10 @@ use multimap::MultiMap; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_expr::table_function::ProjectSetSelectItem; use risingwave_pb::stream_plan::ProjectSetNode; use super::*; -use crate::executor::ProjectSetExecutor; +use crate::executor::{ProjectSetExecutor, ProjectSetSelectItem}; pub struct ProjectSetExecutorBuilder; @@ -35,7 +34,11 @@ impl ExecutorBuilder for ProjectSetExecutorBuilder { .get_select_list() .iter() .map(|proto| { - ProjectSetSelectItem::from_prost(proto, params.env.config().developer.chunk_size) + ProjectSetSelectItem::from_prost( + proto, + params.eval_error_report.clone(), + params.env.config().developer.chunk_size, + ) }) .try_collect()?; let watermark_derivations = MultiMap::from_iter( From 56c03ae16809f42555b46fa8fb2126149aeb6ad6 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Tue, 4 Jun 2024 14:59:32 +0800 Subject: [PATCH 2/5] fix build Signed-off-by: Runji Wang --- src/stream/tests/integration_tests/project_set.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/tests/integration_tests/project_set.rs b/src/stream/tests/integration_tests/project_set.rs index 5cff03284f4b6..3f54056b9487c 100644 --- a/src/stream/tests/integration_tests/project_set.rs +++ b/src/stream/tests/integration_tests/project_set.rs @@ -30,8 +30,8 @@ fn create_executor() -> (MessageSender, BoxedMessageStream) { let (tx, source) = MockSource::channel(); let source = source.into_executor(schema, PkIndices::new()); - let test_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)").into_inner(); - let test_expr_watermark = build_from_pretty("(add:int8 $0:int8 1:int8)").into_inner(); + let test_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)"); + let test_expr_watermark = build_from_pretty("(add:int8 $0:int8 1:int8)"); let tf1 = repeat(build_from_pretty("1:int4").into_inner(), 1); let tf2 = repeat(build_from_pretty("2:int4").into_inner(), 2); From afe88d8d542ad02c20154bc8e1a7d3b5e9a27d62 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Tue, 4 Jun 2024 15:00:25 +0800 Subject: [PATCH 3/5] fix slt Signed-off-by: Runji Wang --- e2e_test/streaming/bug_fixes/issue_12474.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/streaming/bug_fixes/issue_12474.slt b/e2e_test/streaming/bug_fixes/issue_12474.slt index bb4d8e9d991eb..245ca499a255d 100644 --- a/e2e_test/streaming/bug_fixes/issue_12474.slt +++ b/e2e_test/streaming/bug_fixes/issue_12474.slt @@ -15,9 +15,9 @@ flush; query II rowsort select * from mv; ---- +1 1 NULL 0 NULL 1 -1 1 statement ok drop materialized view mv; From 3e1e6ca57d2e059314ffc82d1ea82bc837548eae Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Tue, 4 Jun 2024 15:05:08 +0800 Subject: [PATCH 4/5] add comment Signed-off-by: Runji Wang --- src/stream/src/executor/project_set.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/stream/src/executor/project_set.rs b/src/stream/src/executor/project_set.rs index 701f4fc5d27df..12f9d431c7c22 100644 --- a/src/stream/src/executor/project_set.rs +++ b/src/stream/src/executor/project_set.rs @@ -255,7 +255,10 @@ impl Inner { /// Either a scalar expression or a set-returning function. /// -/// See also [`PbProjectSetSelectItem`] +/// See also [`PbProjectSetSelectItem`]. +/// +/// A similar enum is defined in the `batch` module. The difference is that +/// we use `NonStrictExpression` instead of `BoxedExpression` here. #[derive(Debug)] pub enum ProjectSetSelectItem { Scalar(NonStrictExpression), From 017aee3cb66e590c0ffda962d8c4834bd6f8afa5 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Tue, 4 Jun 2024 16:20:40 +0800 Subject: [PATCH 5/5] trigger recovery by another bug Signed-off-by: Runji Wang --- e2e_test/error_ui/simple/recovery.slt | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/e2e_test/error_ui/simple/recovery.slt b/e2e_test/error_ui/simple/recovery.slt index b47afc94c47ad..152854c278483 100644 --- a/e2e_test/error_ui/simple/recovery.slt +++ b/e2e_test/error_ui/simple/recovery.slt @@ -1,15 +1,15 @@ -# TODO: the test triggers a recovery caused by a known issue: https://github.com/risingwavelabs/risingwave/issues/12474. +# TODO: the test triggers a recovery caused by a known issue: https://github.com/risingwavelabs/risingwave/issues/11915. # We should consider using a mechanism designed for testing recovery instead of depending on a bug. statement ok -create table t (v int); +create table t (v decimal); statement ok -create materialized view mv as select generate_series(1, 10), coalesce(pg_sleep(2), v) / 0 bomb from t; +create materialized view mv as select sum(coalesce(pg_sleep(1), v)) from t; -# The bomb will be triggered after 2 seconds of sleep, so the insertion should return successfully. +# The bomb will be triggered after 1 seconds of sleep, so the insertion should return successfully. statement ok -insert into t values (1); +insert into t values (4e28), (4e28); # Wait for recovery to complete. sleep 15s @@ -25,7 +25,7 @@ with error as ( limit 1 ) select -case when error like '%Actor % exited unexpectedly: Executor error: Chunk operation error: Division by zero%' then 'ok' +case when error like '%Actor % exited unexpectedly: Executor error: Chunk operation error: Numeric out of range%' then 'ok' else error end as result from error;