Skip to content

Commit

Permalink
fix(expr): correctly handle visibility and nested array in ANY/ALL (
Browse files Browse the repository at this point in the history
#10520)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Jun 26, 2023
1 parent 135e4bf commit 92722c9
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 11 deletions.
2 changes: 1 addition & 1 deletion ci/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ steps:
- ./ci/plugins/upload-failure-logs
# Extra 2 minutes to account for docker-compose latency.
# See: https://github.com/risingwavelabs/risingwave/issues/9423#issuecomment-1521222169
timeout_in_minutes: 7
timeout_in_minutes: 10

- label: "release"
command: "ci/scripts/release.sh"
Expand Down
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 7
timeout_in_minutes: 10

- label: "regress test"
command: "ci/scripts/regress-test.sh -p ci-dev"
Expand Down
12 changes: 12 additions & 0 deletions e2e_test/batch/basic/all_any_some.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,15 @@ drop materialized view mv2;

statement ok
drop table tmp;

statement ok
create materialized view mv as with cte(v1, v2) as (values ('a1', array[1,2]), ('a2', array[3,4]), ('b', array[5,6])) select v1, 1 = any(v2) as any from cte where v1 like 'a%';

query TT rowsort
select * from mv;
----
a1 t
a2 f

statement ok
drop materialized view mv;
15 changes: 15 additions & 0 deletions src/common/src/array/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ impl<'a> ListRef<'a> {
self.len() == 0
}

/// Returns the elements in the flattened list.
pub fn flatten(self) -> Vec<DatumRef<'a>> {
// XXX: avoid using vector
iter_elems_ref!(self, it, {
Expand All @@ -449,6 +450,20 @@ impl<'a> ListRef<'a> {
})
}

/// Returns the total number of elements in the flattened list.
pub fn flatten_len(self) -> usize {
iter_elems_ref!(self, it, {
it.map(|datum_ref| {
if let Some(ScalarRefImpl::List(list_ref)) = datum_ref {
list_ref.flatten_len()
} else {
1
}
})
.sum()
})
}

/// Iterates over the elements of the list.
///
/// Prefer using the macro `iter_elems_ref!` if possible to avoid the cost of enum dispatching.
Expand Down
26 changes: 17 additions & 9 deletions src/expr/src/expr/expr_some_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use itertools::{multizip, Itertools};
use risingwave_common::array::{Array, ArrayRef, BoolArray, DataChunk};
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum, Scalar, ScalarImpl, ScalarRefImpl};
use risingwave_common::types::{DataType, Datum, ListRef, Scalar, ScalarImpl, ScalarRefImpl};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::{bail, ensure};
use risingwave_pb::expr::expr_node::{RexNode, Type};
Expand Down Expand Up @@ -92,7 +92,7 @@ impl Expression for SomeAllExpression {
let capacity = arr_right_inner
.iter()
.flatten()
.map(|list_ref| list_ref.flatten().len())
.map(ListRef::flatten_len)
.sum();

let mut unfolded_arr_left_builder = arr_left.create_builder(capacity);
Expand All @@ -110,10 +110,11 @@ impl Expression for SomeAllExpression {
let datum_right = right.unwrap();
match datum_right {
ScalarRefImpl::List(array) => {
let len = array.iter().len();
let flattened = array.flatten();
let len = flattened.len();
num_array.push(Some(len));
unfolded_arr_left_builder.append_n(len, left);
for item in array.iter() {
for item in flattened {
unfolded_arr_right_builder.append(item);
}
}
Expand Down Expand Up @@ -142,12 +143,19 @@ impl Expression for SomeAllExpression {
}
}

assert_eq!(num_array.len(), data_chunk.capacity());

let unfolded_arr_left = unfolded_arr_left_builder.finish();
let unfolded_arr_right = unfolded_arr_right_builder.finish();

// Unfolded array are actually compacted, and the visibility of the output array will be
// further restored by `num_array`.
assert_eq!(unfolded_arr_left.len(), unfolded_arr_right.len());
let unfolded_compact_len = unfolded_arr_left.len();

let data_chunk = DataChunk::new(
vec![
unfolded_arr_left_builder.finish().into(),
unfolded_arr_right_builder.finish().into(),
],
capacity,
vec![unfolded_arr_left.into(), unfolded_arr_right.into()],
unfolded_compact_len,
);

let func_results = self.func.eval(&data_chunk).await?;
Expand Down

0 comments on commit 92722c9

Please sign in to comment.