Skip to content

Commit

Permalink
evaluate scalar expr non-strictly in streaming
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 committed Jun 4, 2024
1 parent 144c7cc commit 6152a42
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 66 deletions.
20 changes: 20 additions & 0 deletions e2e_test/streaming/bug_fixes/issue_12474.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# https://github.com/risingwavelabs/risingwave/issues/12474
#
# Regression test: a materialized view selects a scalar expression that can
# fail (`1 / x[1]`) next to a set-returning function (`unnest(x)`).
# The expected result below shows NULL in the scalar column for the rows where
# the expression cannot be evaluated, while `unnest` still produces its rows.

statement ok
create table t(x int[]);

# `bomb` is the scalar select item, `unnest` is the set-returning select item.
statement ok
create materialized view mv as select 1 / x[1] as bomb, unnest(x) as unnest from t;

# array[0, 1]: `1 / x[1]` is presumably a division by zero (x[1] = 0).
# array[1]:    `1 / x[1]` evaluates to 1.
statement ok
insert into t values (array[0, 1]), (array[1]);

statement ok
flush;

# Both rows unnested from array[0, 1] carry NULL in `bomb`;
# the single row unnested from array[1] carries 1.
query II rowsort
select * from mv;
----
NULL 0
NULL 1
1 1
58 changes: 56 additions & 2 deletions src/batch/src/executor/project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
use either::Either;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::array::{ArrayRef, DataChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::{DataType, DatumRef};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::table_function::ProjectSetSelectItem;
use risingwave_expr::expr::{self, BoxedExpression};
use risingwave_expr::table_function::{self, BoxedTableFunction, TableFunctionOutputIter};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::expr::project_set_select_item::PbSelectItem;
use risingwave_pb::expr::PbProjectSetSelectItem;

use crate::error::{BatchError, Result};
use crate::executor::{
Expand Down Expand Up @@ -170,6 +173,57 @@ impl BoxedExecutorBuilder for ProjectSetExecutor {
}
}

/// Either a scalar expression or a set-returning function.
///
/// This is the select item used by the batch `ProjectSetExecutor`; it is built
/// from a [`PbProjectSetSelectItem`] via `from_prost`.
///
/// See also [`PbProjectSetSelectItem`]
#[derive(Debug)]
pub enum ProjectSetSelectItem {
/// A scalar expression. `eval` propagates any error returned by the expression.
Scalar(BoxedExpression),
/// A table function; `eval` exposes its output through a `TableFunctionOutputIter`.
Set(BoxedTableFunction),
}

impl From<BoxedTableFunction> for ProjectSetSelectItem {
fn from(table_function: BoxedTableFunction) -> Self {
ProjectSetSelectItem::Set(table_function)
}
}

impl From<BoxedExpression> for ProjectSetSelectItem {
fn from(expr: BoxedExpression) -> Self {
ProjectSetSelectItem::Scalar(expr)
}
}

impl ProjectSetSelectItem {
    /// Builds a select item from its protobuf representation.
    ///
    /// An `Expr` item is built with `expr::build_from_prost`; a `TableFunction`
    /// item is built with `table_function::build_from_prost` using `chunk_size`.
    ///
    /// # Errors
    ///
    /// Returns the error of the underlying builder, converted by `?`.
    ///
    /// # Panics
    ///
    /// Panics if `prost.select_item` is `None`.
    pub fn from_prost(prost: &PbProjectSetSelectItem, chunk_size: usize) -> Result<Self> {
        match prost.select_item.as_ref().unwrap() {
            PbSelectItem::Expr(expr) => {
                let scalar = expr::build_from_prost(expr)?;
                Ok(Self::Scalar(scalar))
            }
            PbSelectItem::TableFunction(tf) => {
                let set = table_function::build_from_prost(tf, chunk_size)?;
                Ok(Self::Set(set))
            }
        }
    }

    /// Returns the return type of the wrapped expression or table function.
    pub fn return_type(&self) -> DataType {
        match self {
            Self::Scalar(expr) => expr.return_type(),
            Self::Set(tf) => tf.return_type(),
        }
    }

    /// Evaluates the item against `input`.
    ///
    /// Yields `Either::Left` with a row iterator over the table function output,
    /// or `Either::Right` with the array produced by the scalar expression.
    ///
    /// # Errors
    ///
    /// Propagates any error from evaluating the expression or from creating the
    /// table function output iterator.
    pub async fn eval<'a>(
        &'a self,
        input: &'a DataChunk,
    ) -> Result<Either<TableFunctionOutputIter<'a>, ArrayRef>> {
        let output = match self {
            Self::Scalar(expr) => Either::Right(expr.eval(input).await?),
            Self::Set(tf) => {
                let rows = TableFunctionOutputIter::new(tf.eval(input).await).await?;
                Either::Left(rows)
            }
        };
        Ok(output)
    }
}

#[cfg(test)]
mod tests {
use futures::stream::StreamExt;
Expand Down
53 changes: 2 additions & 51 deletions src/expr/core/src/table_function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use either::Either;
use futures_async_stream::try_stream;
use futures_util::stream::BoxStream;
use futures_util::StreamExt;
use risingwave_common::array::{Array, ArrayBuilder, ArrayImpl, ArrayRef, DataChunk};
use risingwave_common::array::{Array, ArrayBuilder, ArrayImpl, DataChunk};
use risingwave_common::types::{DataType, DatumRef};
use risingwave_pb::expr::project_set_select_item::SelectItem;
use risingwave_pb::expr::table_function::PbType;
use risingwave_pb::expr::{PbProjectSetSelectItem, PbTableFunction};
use risingwave_pb::expr::PbTableFunction;

use super::{ExprError, Result};
use crate::expr::{build_from_prost as expr_build_from_prost, BoxedExpression};
Expand Down Expand Up @@ -141,53 +139,6 @@ pub fn build(
desc.build_table(return_type, chunk_size, children)
}

/// Either a table function or a scalar expression.
///
/// See also [`PbProjectSetSelectItem`]
#[derive(Debug)]
pub enum ProjectSetSelectItem {
/// A table function; `eval` exposes its output through a [`TableFunctionOutputIter`].
TableFunction(BoxedTableFunction),
/// A scalar expression; `eval` returns the array it produces.
Expr(BoxedExpression),
}

impl From<BoxedTableFunction> for ProjectSetSelectItem {
fn from(table_function: BoxedTableFunction) -> Self {
ProjectSetSelectItem::TableFunction(table_function)
}
}

impl From<BoxedExpression> for ProjectSetSelectItem {
fn from(expr: BoxedExpression) -> Self {
ProjectSetSelectItem::Expr(expr)
}
}

impl ProjectSetSelectItem {
    /// Builds a select item from its protobuf representation.
    ///
    /// An `Expr` item is built with `expr_build_from_prost`; a `TableFunction`
    /// item is built with `build_from_prost` using `chunk_size`.
    ///
    /// # Panics
    ///
    /// Panics if `prost.select_item` is `None`.
    pub fn from_prost(prost: &PbProjectSetSelectItem, chunk_size: usize) -> Result<Self> {
        let item = match prost.select_item.as_ref().unwrap() {
            SelectItem::Expr(expr) => Self::Expr(expr_build_from_prost(expr)?),
            SelectItem::TableFunction(tf) => {
                Self::TableFunction(build_from_prost(tf, chunk_size)?)
            }
        };
        Ok(item)
    }

    /// Returns the return type of the wrapped table function or expression.
    pub fn return_type(&self) -> DataType {
        match self {
            Self::Expr(expr) => expr.return_type(),
            Self::TableFunction(tf) => tf.return_type(),
        }
    }

    /// Evaluates the item against `input`.
    ///
    /// Yields `Either::Left` with a row iterator over the table function output,
    /// or `Either::Right` with the array produced by the scalar expression.
    pub async fn eval<'a>(
        &'a self,
        input: &'a DataChunk,
    ) -> Result<Either<TableFunctionOutputIter<'a>, ArrayRef>> {
        let output = match self {
            Self::Expr(expr) => Either::Right(expr.eval(input).await?),
            Self::TableFunction(tf) => {
                let rows = TableFunctionOutputIter::new(tf.eval(input).await).await?;
                Either::Left(rows)
            }
        };
        Ok(output)
    }
}

/// A wrapper over the output of table function that allows iteration by rows.
///
/// If the table function returns multiple columns, the output will be struct values.
Expand Down
77 changes: 67 additions & 10 deletions src/stream/src/executor/project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@

use either::Either;
use multimap::MultiMap;
use risingwave_common::array::Op;
use risingwave_common::array::{ArrayRef, DataChunk, Op};
use risingwave_common::bail;
use risingwave_common::row::RowExt;
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::expr::{LogReport, NonStrictExpression};
use risingwave_expr::table_function::ProjectSetSelectItem;
use risingwave_expr::expr::{self, EvalErrorReport, NonStrictExpression};
use risingwave_expr::table_function::{self, BoxedTableFunction, TableFunctionOutputIter};
use risingwave_expr::ExprError;
use risingwave_pb::expr::project_set_select_item::PbSelectItem;
use risingwave_pb::expr::PbProjectSetSelectItem;

use crate::executor::prelude::*;

Expand Down Expand Up @@ -226,17 +229,13 @@ impl Inner {
for expr_idx in expr_indices {
let expr_idx = *expr_idx;
let derived_watermark = match &self.select_list[expr_idx] {
ProjectSetSelectItem::Expr(expr) => {
ProjectSetSelectItem::Scalar(expr) => {
watermark
.clone()
.transform_with_expr(
// TODO: should we build `expr` in non-strict mode?
&NonStrictExpression::new_topmost(expr, LogReport),
expr_idx + PROJ_ROW_ID_OFFSET,
)
.transform_with_expr(expr, expr_idx + PROJ_ROW_ID_OFFSET)
.await
}
ProjectSetSelectItem::TableFunction(_) => {
ProjectSetSelectItem::Set(_) => {
bail!("Watermark should not be produced by a table function");
}
};
Expand All @@ -253,3 +252,61 @@ impl Inner {
Ok(ret)
}
}

/// Either a scalar expression or a set-returning function.
///
/// This is the streaming counterpart of the select item: the scalar variant
/// holds a [`NonStrictExpression`] rather than a plain boxed expression.
///
/// See also [`PbProjectSetSelectItem`]
#[derive(Debug)]
pub enum ProjectSetSelectItem {
/// A scalar expression built in non-strict mode; `eval` calls `eval_infallible` on it.
Scalar(NonStrictExpression),
/// A table function; `eval` exposes its output through a `TableFunctionOutputIter`.
Set(BoxedTableFunction),
}

impl From<BoxedTableFunction> for ProjectSetSelectItem {
fn from(table_function: BoxedTableFunction) -> Self {
ProjectSetSelectItem::Set(table_function)
}
}

impl From<NonStrictExpression> for ProjectSetSelectItem {
fn from(expr: NonStrictExpression) -> Self {
ProjectSetSelectItem::Scalar(expr)
}
}

impl ProjectSetSelectItem {
    /// Builds a select item from its protobuf representation.
    ///
    /// An `Expr` item is built with `expr::build_non_strict_from_prost`, which
    /// receives `error_report`; a `TableFunction` item is built with
    /// `table_function::build_from_prost` using `chunk_size`.
    ///
    /// # Errors
    ///
    /// Returns the [`ExprError`] of the underlying builder.
    ///
    /// # Panics
    ///
    /// Panics if `prost.select_item` is `None`.
    pub fn from_prost(
        prost: &PbProjectSetSelectItem,
        error_report: impl EvalErrorReport + 'static,
        chunk_size: usize,
    ) -> Result<Self, ExprError> {
        let item = match prost.select_item.as_ref().unwrap() {
            PbSelectItem::Expr(expr) => {
                Self::Scalar(expr::build_non_strict_from_prost(expr, error_report)?)
            }
            PbSelectItem::TableFunction(tf) => {
                Self::Set(table_function::build_from_prost(tf, chunk_size)?)
            }
        };
        Ok(item)
    }

    /// Returns the return type of the wrapped expression or table function.
    pub fn return_type(&self) -> DataType {
        match self {
            Self::Scalar(expr) => expr.return_type(),
            Self::Set(tf) => tf.return_type(),
        }
    }

    /// Evaluates the item against `input`.
    ///
    /// Yields `Either::Right` with the array from `eval_infallible` for a scalar
    /// expression, or `Either::Left` with a row iterator over the table function
    /// output.
    ///
    /// # Errors
    ///
    /// Only the table function path can fail: an error from creating the output
    /// iterator is returned as [`ExprError`].
    pub async fn eval<'a>(
        &'a self,
        input: &'a DataChunk,
    ) -> Result<Either<TableFunctionOutputIter<'a>, ArrayRef>, ExprError> {
        let output = match self {
            Self::Scalar(expr) => Either::Right(expr.eval_infallible(input).await),
            // FIXME(runji): table function should also be evaluated non-strictly
            Self::Set(tf) => {
                let rows = TableFunctionOutputIter::new(tf.eval(input).await).await?;
                Either::Left(rows)
            }
        };
        Ok(output)
    }
}
9 changes: 6 additions & 3 deletions src/stream/src/from_proto/project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@

use multimap::MultiMap;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::table_function::ProjectSetSelectItem;
use risingwave_pb::stream_plan::ProjectSetNode;

use super::*;
use crate::executor::ProjectSetExecutor;
use crate::executor::{ProjectSetExecutor, ProjectSetSelectItem};

pub struct ProjectSetExecutorBuilder;

Expand All @@ -35,7 +34,11 @@ impl ExecutorBuilder for ProjectSetExecutorBuilder {
.get_select_list()
.iter()
.map(|proto| {
ProjectSetSelectItem::from_prost(proto, params.env.config().developer.chunk_size)
ProjectSetSelectItem::from_prost(
proto,
params.eval_error_report.clone(),
params.env.config().developer.chunk_size,
)
})
.try_collect()?;
let watermark_derivations = MultiMap::from_iter(
Expand Down

0 comments on commit 6152a42

Please sign in to comment.