Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(expr): tolerate table function errors on streams #17156

Merged
merged 9 commits into from
Jun 7, 2024
Merged
41 changes: 41 additions & 0 deletions e2e_test/streaming/bug_fixes/issue_11915.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# https://github.com/risingwavelabs/risingwave/issues/11915

statement ok
create table t(x int);

statement ok
create materialized view mv as select x, generate_series(1, 2, x) from t;

# x = 0 causes generate_series(1, 2, x) to return an error.
statement ok
insert into t values (0), (1);

statement ok
flush;

# Output 0 row when the set-returning function returns error.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was an discussion on the behavior in this case. #12474 (comment) cc @fuyufjh

Not sure which one we should adopt.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default behavior for a table function not being called (e.g. with null input) is returning no rows. So I vote for this for the sake of consistency.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default behavior for a table function not being called (e.g. with null input) is returning no rows. So I vote for this for the sake of consistency.

I agree that these 2 cases should be consistent

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is Option 1 in the original issue. And I agree it is consistent with null input. In other words, the table function returns null (which is similar to {} but not {null}) on error.

Related edges cases on the differences among null, {} and {null}:

^ Just mentioning these cases. Not suggesting any value is always preferred than the other 2.

query II rowsort
select * from mv;
----
1 1
1 2

# Delete the error row.
statement ok
delete from t where x = 0;

statement ok
flush;

# The result should be the same as before.
query II rowsort
select * from mv;
----
1 1
1 2

statement ok
drop materialized view mv;

statement ok
drop table t;
2 changes: 1 addition & 1 deletion src/batch/src/executor/project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl ProjectSetExecutor {
&& i == row_idx
{
valid = true;
value
value?
} else {
None
}
Expand Down
3 changes: 2 additions & 1 deletion src/batch/src/executor/table_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use futures_async_stream::try_stream;
use risingwave_common::array::{ArrayImpl, DataChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_expr::table_function::{build_from_prost, BoxedTableFunction};
use risingwave_expr::table_function::{build_from_prost, check_error, BoxedTableFunction};
use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::{BoxedExecutor, BoxedExecutorBuilder};
Expand Down Expand Up @@ -54,6 +54,7 @@ impl TableFunctionExecutor {
#[for_await]
for chunk in self.table_function.eval(&dummy_chunk).await {
let chunk = chunk?;
check_error(&chunk)?;
// remove the first column and expand the second column if its data type is struct
yield match chunk.column_at(1).as_ref() {
ArrayImpl::Struct(struct_array) => struct_array.into(),
Expand Down
5 changes: 5 additions & 0 deletions src/common/src/buffer/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,11 @@ impl Bitmap {
self.count_ones
}

/// Returns true if any bit is set to 1.
pub fn any(&self) -> bool {
self.count_ones != 0
}

/// Returns the length of vector to store `num_bits` bits.
fn vec_len(num_bits: usize) -> usize {
(num_bits + BITS - 1) / BITS
Expand Down
42 changes: 34 additions & 8 deletions src/expr/core/src/table_function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,21 @@ pub use self::empty::*;
pub use self::repeat::*;
use self::user_defined::*;

/// Instance of a table function.
/// A table function takes a row as input and returns multiple rows as output.
///
/// A table function takes a row as input and returns a table. It is also known as Set-Returning
/// Function.
/// It is also known as Set-Returning Function.
#[async_trait::async_trait]
pub trait TableFunction: std::fmt::Debug + Sync + Send {
/// The data type of the output.
fn return_type(&self) -> DataType;

/// # Contract of the output
///
/// The returned `DataChunk` contains exact two columns:
/// The returned `DataChunk` contains two or three columns:
/// - The first column is an I32Array containing row indices of input chunk. It should be
/// monotonically increasing.
/// - The second column is the output values. The data type of the column is `return_type`.
/// - (Optional) If any error occurs, the error message is stored in the third column.
///
/// i.e., for the `i`-th input row, the output rows are `(i, output_1)`, `(i, output_2)`, ...
///
Expand Down Expand Up @@ -173,7 +174,7 @@ pub fn build(
/// for i in 0..4 {
/// let (index, value) = iter.peek().unwrap();
/// assert_eq!(index, i);
/// assert_eq!(value, Some((i as i64).into()));
/// assert_eq!(value, Ok(Some((i as i64).into())));
/// iter.next().await.unwrap();
/// }
/// assert!(iter.peek().is_none());
Expand All @@ -199,11 +200,19 @@ impl<'a> TableFunctionOutputIter<'a> {
}

/// Gets the current row.
pub fn peek(&'a self) -> Option<(usize, DatumRef<'a>)> {
pub fn peek(&'a self) -> Option<(usize, Result<DatumRef<'a>>)> {
let chunk = self.chunk.as_ref()?;
let index = chunk.column_at(0).as_int32().value_at(self.index).unwrap() as usize;
let value = chunk.column_at(1).value_at(self.index);
Some((index, value))
let result = if let Some(msg) = chunk
.columns()
.get(2)
.and_then(|errors| errors.as_utf8().value_at(self.index))
{
Err(ExprError::Custom(msg.into()))
} else {
Ok(chunk.column_at(1).value_at(self.index))
};
Some((index, result))
}

/// Moves to the next row.
Expand All @@ -229,3 +238,20 @@ impl<'a> TableFunctionOutputIter<'a> {
Ok(())
}
}

/// Checks if the output chunk returned by `TableFunction::eval` contains any error.
pub fn check_error(chunk: &DataChunk) -> Result<()> {
if let Some(errors) = chunk.columns().get(2) {
if errors.null_bitmap().any() {
return Err(ExprError::Custom(
errors
.as_utf8()
.iter()
.find_map(|s| s)
.expect("no error message")
.into(),
));
}
}
Ok(())
}
37 changes: 17 additions & 20 deletions src/expr/impl/src/table_function/generate_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@ use risingwave_expr::{function, ExprError, Result};

#[function("generate_series(int4, int4) -> setof int4")]
#[function("generate_series(int8, int8) -> setof int8")]
fn generate_series<T>(start: T, stop: T) -> Result<impl Iterator<Item = Result<T>>>
fn generate_series<T>(start: T, stop: T) -> Result<impl Iterator<Item = T>>
where
T: CheckedAdd<Output = T> + PartialOrd + Copy + One + IsNegative,
{
range_generic::<_, _, true>(start, stop, T::one())
}

#[function("generate_series(decimal, decimal) -> setof decimal")]
fn generate_series_decimal(
start: Decimal,
stop: Decimal,
) -> Result<impl Iterator<Item = Result<Decimal>>>
fn generate_series_decimal(start: Decimal, stop: Decimal) -> Result<impl Iterator<Item = Decimal>>
where
{
validate_range_parameters(start, stop, Decimal::one())?;
Expand All @@ -39,7 +36,7 @@ where
#[function("generate_series(int4, int4, int4) -> setof int4")]
#[function("generate_series(int8, int8, int8) -> setof int8")]
#[function("generate_series(timestamp, timestamp, interval) -> setof timestamp")]
fn generate_series_step<T, S>(start: T, stop: T, step: S) -> Result<impl Iterator<Item = Result<T>>>
fn generate_series_step<T, S>(start: T, stop: T, step: S) -> Result<impl Iterator<Item = T>>
where
T: CheckedAdd<S, Output = T> + PartialOrd + Copy,
S: IsNegative + Copy,
Expand All @@ -52,22 +49,22 @@ fn generate_series_step_decimal(
start: Decimal,
stop: Decimal,
step: Decimal,
) -> Result<impl Iterator<Item = Result<Decimal>>> {
) -> Result<impl Iterator<Item = Decimal>> {
validate_range_parameters(start, stop, step)?;
range_generic::<_, _, true>(start, stop, step)
}

#[function("range(int4, int4) -> setof int4")]
#[function("range(int8, int8) -> setof int8")]
fn range<T>(start: T, stop: T) -> Result<impl Iterator<Item = Result<T>>>
fn range<T>(start: T, stop: T) -> Result<impl Iterator<Item = T>>
where
T: CheckedAdd<Output = T> + PartialOrd + Copy + One + IsNegative,
{
range_generic::<_, _, false>(start, stop, T::one())
}

#[function("range(decimal, decimal) -> setof decimal")]
fn range_decimal(start: Decimal, stop: Decimal) -> Result<impl Iterator<Item = Result<Decimal>>>
fn range_decimal(start: Decimal, stop: Decimal) -> Result<impl Iterator<Item = Decimal>>
where
{
validate_range_parameters(start, stop, Decimal::one())?;
Expand All @@ -77,7 +74,7 @@ where
#[function("range(int4, int4, int4) -> setof int4")]
#[function("range(int8, int8, int8) -> setof int8")]
#[function("range(timestamp, timestamp, interval) -> setof timestamp")]
fn range_step<T, S>(start: T, stop: T, step: S) -> Result<impl Iterator<Item = Result<T>>>
fn range_step<T, S>(start: T, stop: T, step: S) -> Result<impl Iterator<Item = T>>
where
T: CheckedAdd<S, Output = T> + PartialOrd + Copy,
S: IsNegative + Copy,
Expand All @@ -90,7 +87,7 @@ fn range_step_decimal(
start: Decimal,
stop: Decimal,
step: Decimal,
) -> Result<impl Iterator<Item = Result<Decimal>>> {
) -> Result<impl Iterator<Item = Decimal>> {
validate_range_parameters(start, stop, step)?;
range_generic::<_, _, false>(start, stop, step)
}
Expand All @@ -100,7 +97,7 @@ fn range_generic<T, S, const INCLUSIVE: bool>(
start: T,
stop: T,
step: S,
) -> Result<impl Iterator<Item = Result<T>>>
) -> Result<impl Iterator<Item = T>>
where
T: CheckedAdd<S, Output = T> + PartialOrd + Copy,
S: IsNegative + Copy,
Expand All @@ -113,19 +110,19 @@ where
}
let mut cur = start;
let neg = step.is_negative();
let mut next = move || {
let next = move || {
match (INCLUSIVE, neg) {
(true, true) if cur < stop => return Ok(None),
(true, false) if cur > stop => return Ok(None),
(false, true) if cur <= stop => return Ok(None),
(false, false) if cur >= stop => return Ok(None),
(true, true) if cur < stop => return None,
(true, false) if cur > stop => return None,
(false, true) if cur <= stop => return None,
(false, false) if cur >= stop => return None,
_ => {}
};
let ret = cur;
cur = cur.checked_add(step).ok_or(ExprError::NumericOutOfRange)?;
Ok(Some(ret))
cur = cur.checked_add(step)?;
Some(ret)
};
Ok(std::iter::from_fn(move || next().transpose()))
Ok(std::iter::from_fn(next))
}

#[inline]
Expand Down
71 changes: 55 additions & 16 deletions src/expr/macro/src/gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1122,11 +1122,29 @@ impl FunctionAttr {
let iter = quote! { #fn_name(#(#inputs,)* #prebuilt_arg #context) };
let mut iter = match user_fn.return_type_kind {
ReturnTypeKind::T => quote! { #iter },
ReturnTypeKind::Result => quote! { #iter? },
ReturnTypeKind::Option => quote! { if let Some(it) = #iter { it } else { continue; } },
ReturnTypeKind::ResultOption => {
quote! { if let Some(it) = #iter? { it } else { continue; } }
}
ReturnTypeKind::Option => quote! { match #iter {
Some(it) => it,
None => continue,
} },
ReturnTypeKind::Result => quote! { match #iter {
Ok(it) => it,
Err(e) => {
index_builder.append(Some(i as i32));
#(#builders.append_null();)*
error_builder.append(Some(&e.to_string()));
wangrunji0408 marked this conversation as resolved.
Show resolved Hide resolved
continue;
}
} },
ReturnTypeKind::ResultOption => quote! { match #iter {
Ok(Some(it)) => it,
Ok(None) => continue,
Err(e) => {
index_builder.append(Some(i as i32));
#(#builders.append_null();)*
error_builder.append(Some(&e.to_string()));
continue;
}
} },
};
// if user function accepts non-option arguments, we assume the function
// returns empty on null input, so we need to unwrap the inputs before calling.
Expand All @@ -1153,11 +1171,24 @@ impl FunctionAttr {
"expect `impl Iterator` in return type",
)
})?;
let output = match iterator_item_type {
ReturnTypeKind::T => quote! { Some(output) },
ReturnTypeKind::Option => quote! { output },
ReturnTypeKind::Result => quote! { Some(output?) },
ReturnTypeKind::ResultOption => quote! { output? },
let append_output = match iterator_item_type {
ReturnTypeKind::T => quote! {
let (#(#outputs),*) = output;
#(#builders.append(#optioned_outputs);)* error_builder.append_null();
},
ReturnTypeKind::Option => quote! { match output {
Some((#(#outputs),*)) => { #(#builders.append(#optioned_outputs);)* error_builder.append_null(); }
None => { #(#builders.append_null();)* error_builder.append_null(); }
} },
ReturnTypeKind::Result => quote! { match output {
Ok((#(#outputs),*)) => { #(#builders.append(#optioned_outputs);)* error_builder.append_null(); }
Err(e) => { #(#builders.append_null();)* error_builder.append(Some(&e.to_string())); }
} },
ReturnTypeKind::ResultOption => quote! { match output {
Ok(Some((#(#outputs),*))) => { #(#builders.append(#optioned_outputs);)* error_builder.append_null(); }
Ok(None) => { #(#builders.append_null();)* error_builder.append_null(); }
Err(e) => { #(#builders.append_null();)* error_builder.append(Some(&e.to_string())); }
} },
};

Ok(quote! {
Expand Down Expand Up @@ -1215,6 +1246,7 @@ impl FunctionAttr {

let mut index_builder = I32ArrayBuilder::new(self.chunk_size);
#(let mut #builders = #builder_types::with_type(self.chunk_size, #return_types);)*
let mut error_builder = Utf8ArrayBuilder::new(self.chunk_size);

for i in 0..input.capacity() {
if unsafe { !input.visibility().is_set_unchecked(i) } {
Expand All @@ -1223,17 +1255,19 @@ impl FunctionAttr {
#(let #inputs = unsafe { #arrays.value_at_unchecked(i) };)*
for output in #iter {
index_builder.append(Some(i as i32));
match #output {
Some((#(#outputs),*)) => { #(#builders.append(#optioned_outputs);)* }
None => { #(#builders.append_null();)* }
}
#append_output

if index_builder.len() == self.chunk_size {
let len = index_builder.len();
let index_array = std::mem::replace(&mut index_builder, I32ArrayBuilder::new(self.chunk_size)).finish().into_ref();
let value_arrays = [#(std::mem::replace(&mut #builders, #builder_types::with_type(self.chunk_size, #return_types)).finish().into_ref()),*];
#build_value_array
yield DataChunk::new(vec![index_array, value_array], self.chunk_size);
let error_array = std::mem::replace(&mut error_builder, Utf8ArrayBuilder::new(self.chunk_size)).finish().into_ref();
wangrunji0408 marked this conversation as resolved.
Show resolved Hide resolved
if error_array.null_bitmap().any() {
yield DataChunk::new(vec![index_array, value_array, error_array], self.chunk_size);
} else {
yield DataChunk::new(vec![index_array, value_array], self.chunk_size);
}
}
}
}
Expand All @@ -1243,7 +1277,12 @@ impl FunctionAttr {
let index_array = index_builder.finish().into_ref();
let value_arrays = [#(#builders.finish().into_ref()),*];
#build_value_array
yield DataChunk::new(vec![index_array, value_array], len);
let error_array = error_builder.finish().into_ref();
if error_array.null_bitmap().any() {
yield DataChunk::new(vec![index_array, value_array, error_array], len);
} else {
yield DataChunk::new(vec![index_array, value_array], len);
}
}
}
}
Expand Down
Loading
Loading