Skip to content

Commit

Permalink
fix(udf): don't fallback to row-based evaluation for udf (#14147)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <wangrunji0408@163.com>
wangrunji0408 authored Dec 22, 2023
1 parent adaf0a7 commit 188228c
Showing 2 changed files with 95 additions and 4 deletions.
15 changes: 11 additions & 4 deletions src/expr/core/src/expr/build.rs
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ use risingwave_pb::expr::ExprNode;

use super::expr_some_all::SomeAllExpression;
use super::expr_udf::UdfExpression;
use super::non_strict::NonStrictNoFallback;
use super::wrapper::checked::Checked;
use super::wrapper::non_strict::NonStrict;
use super::wrapper::EvalErrorReport;
@@ -75,11 +76,15 @@ where

/// Attach wrappers to an expression.
#[expect(clippy::let_and_return)]
fn wrap(&self, expr: impl Expression + 'static) -> BoxedExpression {
fn wrap(&self, expr: impl Expression + 'static, no_fallback: bool) -> BoxedExpression {
let checked = Checked(expr);

let may_non_strict = if let Some(error_report) = &self.error_report {
NonStrict::new(checked, error_report.clone()).boxed()
if no_fallback {
NonStrictNoFallback::new(checked, error_report.clone()).boxed()
} else {
NonStrict::new(checked, error_report.clone()).boxed()
}
} else {
checked.boxed()
};
@@ -90,7 +95,9 @@ where
/// Build an expression with `build_inner` and attach some wrappers.
fn build(&self, prost: &ExprNode) -> Result<BoxedExpression> {
let expr = self.build_inner(prost)?;
Ok(self.wrap(expr))
// no fallback to row-based evaluation for UDF
let no_fallback = matches!(prost.get_rex_node().unwrap(), RexNode::Udf(_));
Ok(self.wrap(expr, no_fallback))
}

/// Build an expression from protobuf.
@@ -209,7 +216,7 @@ pub fn build_func_non_strict(
error_report: impl EvalErrorReport + 'static,
) -> Result<NonStrictExpression> {
let expr = build_func(func, ret_type, children)?;
let wrapped = NonStrictExpression(ExprBuilder::new_non_strict(error_report).wrap(expr));
let wrapped = NonStrictExpression(ExprBuilder::new_non_strict(error_report).wrap(expr, false));

Ok(wrapped)
}
84 changes: 84 additions & 0 deletions src/expr/core/src/expr/wrapper/non_strict.rs
Original file line number Diff line number Diff line change
@@ -152,3 +152,87 @@ where
self.inner.input_ref_index()
}
}

/// Similar to the [`NonStrict`] wrapper, but does not fall back to row-based evaluation when an
/// error occurs.
///
/// When evaluating the wrapped expression fails, the error is passed to the reporter and NULL is
/// produced in place of the failed result (for chunk evaluation, every row becomes NULL).
pub(crate) struct NonStrictNoFallback<E, R> {
    /// The wrapped expression whose evaluation errors are intercepted.
    inner: E,
    /// Receives every evaluation error that this wrapper swallows.
    report: R,
}

// Debug output prints the wrapped expression and, instead of the reporter value itself, the
// reporter's type name, so `R` is not required to implement `Debug`.
impl<E: std::fmt::Debug, R> std::fmt::Debug for NonStrictNoFallback<E, R> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let reporter_type = std::any::type_name::<R>();

        let mut out = f.debug_struct("NonStrictNoFallback");
        out.field("inner", &self.inner);
        out.field("report", &reporter_type);
        out.finish()
    }
}

impl<E, R> NonStrictNoFallback<E, R>
where
E: Expression,
R: EvalErrorReport,
{
pub fn new(inner: E, report: R) -> Self {
Self { inner, report }
}
}

// TODO: avoid the overhead of extra boxing.
#[async_trait]
impl<E: Expression, R: EvalErrorReport> Expression for NonStrictNoFallback<E, R> {
    /// Same return type as the wrapped expression.
    fn return_type(&self) -> DataType {
        self.inner.return_type()
    }

    /// Evaluates the wrapped expression on the whole chunk.
    ///
    /// On failure the error is reported and an all-NULL array of `input.capacity()` entries is
    /// returned; there is no row-by-row retry.
    async fn eval(&self, input: &DataChunk) -> Result<ArrayRef> {
        let error = match self.inner.eval(input).await {
            Ok(array) => return Ok(array),
            Err(error) => error,
        };
        self.report.report(error);

        // No fallback: every row of the chunk becomes NULL.
        let len = input.capacity();
        let mut nulls = self.return_type().create_array_builder(len);
        nulls.append_n_null(len);
        Ok(nulls.finish().into())
    }

    /// Like [`Self::eval`], but a failure yields a NULL scalar value carrying the chunk's
    /// capacity.
    async fn eval_v2(&self, input: &DataChunk) -> Result<ValueImpl> {
        let outcome = self.inner.eval_v2(input).await;
        Ok(outcome.unwrap_or_else(|error| {
            self.report.report(error);
            ValueImpl::Scalar {
                value: None,
                capacity: input.capacity(),
            }
        }))
    }

    /// Evaluates the wrapped expression on a single row; a failure is reported and turned into
    /// NULL.
    async fn eval_row(&self, input: &OwnedRow) -> Result<Datum> {
        let outcome = self.inner.eval_row(input).await;
        Ok(outcome.unwrap_or_else(|error| {
            self.report.report(error);
            None // NULL
        }))
    }

    /// Delegates to the wrapped expression; errors are deliberately passed through unhandled.
    fn eval_const(&self) -> Result<Datum> {
        self.inner.eval_const()
    }

    /// Delegates to the wrapped expression.
    fn input_ref_index(&self) -> Option<usize> {
        self.inner.input_ref_index()
    }
}

0 comments on commit 188228c

Please sign in to comment.