From 3ece3337f4988478a25bb1543d3796348ac5c738 Mon Sep 17 00:00:00 2001
From: Runji Wang
Date: Sat, 16 Sep 2023 02:00:04 +0800
Subject: [PATCH] fix visibility handling

Signed-off-by: Runji Wang
---
 e2e_test/udf/udf.slt                        | 26 +++++---------
 src/common/src/array/arrow.rs               |  6 ++--
 src/common/src/array/data_chunk.rs          | 22 ++++++++++++
 src/expr/src/table_function/mod.rs          |  4 +--
 src/expr/src/table_function/user_defined.rs | 38 ++++++++++++++++-----
 5 files changed, 67 insertions(+), 29 deletions(-)

diff --git a/e2e_test/udf/udf.slt b/e2e_test/udf/udf.slt
index 1ebb3ed30f4b..110b1c0f373d 100644
--- a/e2e_test/udf/udf.slt
+++ b/e2e_test/udf/udf.slt
@@ -231,32 +231,24 @@ statement ok
 create table t (x int);
 
 statement ok
-create materialized view mv1 as select gcd(x, x) from t where x = 1;
+create materialized view mv as select gcd(x, x), series(x) from t where x <> 2;
 
 statement ok
-create materialized view mv2 as select series(x) from t where x = 1;
-
-statement ok
-insert into t values (1), (2);
+insert into t values (1), (2), (3);
 
 statement ok
 flush;
 
-query T
-select * from mv1;
-----
-1
-
-query T
-select * from mv2;
+query II
+select * from mv;
 ----
-0
-
-statement ok
-drop materialized view mv1;
+1 0
+3 0
+3 1
+3 2
 
 statement ok
-drop materialized view mv2;
+drop materialized view mv;
 
 statement ok
 drop table t;
diff --git a/src/common/src/array/arrow.rs b/src/common/src/array/arrow.rs
index 9b4165b608d9..0f89e6b4f53f 100644
--- a/src/common/src/array/arrow.rs
+++ b/src/common/src/array/arrow.rs
@@ -27,6 +27,7 @@ use crate::util::iter_util::ZipEqDebug;
 
 // Implement bi-directional `From` between `DataChunk` and `arrow_array::RecordBatch`.
+// note: DataChunk -> arrow RecordBatch will IGNORE the visibilities.
 impl TryFrom<&DataChunk> for arrow_array::RecordBatch {
     type Error = ArrayError;
 
@@ -47,8 +48,9 @@ impl TryFrom<&DataChunk> for arrow_array::RecordBatch {
             .collect();
 
         let schema = Arc::new(Schema::new(fields));
-
-        arrow_array::RecordBatch::try_new(schema, columns)
+        let opts =
+            arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity()));
+        arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts)
             .map_err(|err| ArrayError::ToArrow(err.to_string()))
     }
 }
diff --git a/src/common/src/array/data_chunk.rs b/src/common/src/array/data_chunk.rs
index cc4bef12cccf..657dfd3c366f 100644
--- a/src/common/src/array/data_chunk.rs
+++ b/src/common/src/array/data_chunk.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::borrow::Cow;
 use std::fmt::Display;
 use std::hash::BuildHasher;
 use std::sync::Arc;
@@ -265,6 +266,27 @@ impl DataChunk {
         }
     }
 
+    /// Convert the chunk to compact format.
+    ///
+    /// If the chunk is not compacted, return a new compacted chunk; otherwise, return a reference to `self`.
+    pub fn compact_cow(&self) -> Cow<'_, Self> {
+        match &self.vis2 {
+            Vis::Compact(_) => Cow::Borrowed(self),
+            Vis::Bitmap(visibility) => {
+                let cardinality = visibility.count_ones();
+                let columns = self
+                    .columns
+                    .iter()
+                    .map(|col| {
+                        let array = col;
+                        array.compact(visibility, cardinality).into()
+                    })
+                    .collect::<Vec<_>>();
+                Cow::Owned(Self::new(columns, cardinality))
+            }
+        }
+    }
+
     pub fn from_protobuf(proto: &PbDataChunk) -> ArrayResult<Self> {
         let mut columns = vec![];
         for any_col in proto.get_columns() {
diff --git a/src/expr/src/table_function/mod.rs b/src/expr/src/table_function/mod.rs
index 23453d9f7b95..245eebd7f372 100644
--- a/src/expr/src/table_function/mod.rs
+++ b/src/expr/src/table_function/mod.rs
@@ -51,7 +51,7 @@ pub trait TableFunction: std::fmt::Debug + Sync + Send {
     /// # Contract of the output
     ///
     /// The returned `DataChunk` contains exactly two columns:
-    /// - The first column is an I32Array containing row indexes of input chunk. It should be
+    /// - The first column is an I32Array containing row indices of the input chunk. It should be
     ///   monotonically increasing.
     /// - The second column is the output values. The data type of the column is `return_type`.
     ///
@@ -82,7 +82,7 @@ pub trait TableFunction: std::fmt::Debug + Sync + Send {
     /// (You don't need to understand this section to implement a `TableFunction`)
     ///
     /// The output of the `TableFunction` is different from the output of the `ProjectSet` executor.
-    /// `ProjectSet` executor uses the row indexes to stitch multiple table functions and produces
+    /// The `ProjectSet` executor uses the row indices to stitch multiple table functions and produces
     /// `projected_row_id`.
     ///
     /// ## Example
diff --git a/src/expr/src/table_function/user_defined.rs b/src/expr/src/table_function/user_defined.rs
index ef4f4255b561..813cf2350448 100644
--- a/src/expr/src/table_function/user_defined.rs
+++ b/src/expr/src/table_function/user_defined.rs
@@ -14,9 +14,10 @@
 
 use std::sync::Arc;
 
+use arrow_array::RecordBatch;
 use arrow_schema::{Field, Fields, Schema, SchemaRef};
 use futures_util::stream;
-use risingwave_common::array::DataChunk;
+use risingwave_common::array::{DataChunk, I32Array};
 use risingwave_common::bail;
 use risingwave_udf::ArrowFlightUdfClient;
 
@@ -25,6 +26,7 @@ use super::*;
 #[derive(Debug)]
 pub struct UserDefinedTableFunction {
     children: Vec<BoxedExpression>,
+    #[allow(dead_code)]
     arg_schema: SchemaRef,
     return_type: DataType,
     client: Arc<ArrowFlightUdfClient>,
@@ -49,25 +51,42 @@ impl TableFunction for UserDefinedTableFunction {
 impl UserDefinedTableFunction {
     #[try_stream(boxed, ok = DataChunk, error = ExprError)]
     async fn eval_inner<'a>(&'a self, input: &'a DataChunk) {
+        // evaluate children expressions
        let mut columns = Vec::with_capacity(self.children.len());
        for c in &self.children {
-            let val = c.eval_checked(input).await?.as_ref().try_into()?;
+            let val = c.eval_checked(input).await?;
            columns.push(val);
        }
+        let direct_input = DataChunk::new(columns, input.vis().clone());
+
+        // compact the input chunk and record the row mapping
+        let visible_rows = direct_input.vis().iter_ones().collect_vec();
+        let compacted_input = direct_input.compact_cow();
+        let arrow_input = RecordBatch::try_from(compacted_input.as_ref())?;
 
-        let opts =
-            arrow_array::RecordBatchOptions::default().with_row_count(Some(input.capacity()));
-        let input =
-            arrow_array::RecordBatch::try_new_with_options(self.arg_schema.clone(), columns, &opts)
-                .expect("failed to build record batch");
+        // call UDTF
        #[for_await]
        for res in self
            .client
-            .call_stream(&self.identifier, stream::once(async { input }))
+            .call_stream(&self.identifier, stream::once(async { arrow_input }))
            .await?
        {
            let output = DataChunk::try_from(&res?)?;
            self.check_output(&output)?;
+
+            // we send the compacted input to the UDF, so we need to map the row indices back to the original input
+            let origin_indices = output
+                .column_at(0)
+                .as_int32()
+                .raw_iter()
+                // we have checked all indices are non-negative
+                .map(|idx| visible_rows[idx as usize] as i32)
+                .collect::<I32Array>();
+
+            let output = DataChunk::new(
+                vec![origin_indices.into_ref(), output.column_at(1).clone()],
+                output.vis().clone(),
+            );
            yield output;
        }
    }
@@ -87,6 +106,9 @@ impl UserDefinedTableFunction {
                DataType::Int32,
            );
        }
+        if output.column_at(0).as_int32().raw_iter().any(|i| i < 0) {
+            bail!("UDF returned negative row index");
+        }
        if !output
            .column_at(1)
            .data_type()
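
Note on the arrow.rs change: a RecordBatch built from zero columns (e.g. the input of a UDF that takes no arguments) cannot infer its row count from the column list, so plain `try_new` has no way to carry the chunk's cardinality. Pinning the row count explicitly preserves it. A minimal standalone sketch of this arrow-rs behavior (not RisingWave code):

    use std::sync::Arc;

    use arrow_array::{RecordBatch, RecordBatchOptions};
    use arrow_schema::Schema;

    fn main() -> Result<(), arrow_schema::ArrowError> {
        // No columns, so the row count cannot be inferred; pin it explicitly,
        // as the patch does with `with_row_count(Some(chunk.capacity()))`.
        let schema = Arc::new(Schema::empty());
        let opts = RecordBatchOptions::default().with_row_count(Some(4));
        let batch = RecordBatch::try_new_with_options(schema, vec![], &opts)?;
        assert_eq!(batch.num_rows(), 4);
        Ok(())
    }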
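
Note on the index remapping in eval_inner: the UDF only ever sees the compacted chunk, so the row indices it returns refer to positions among the visible rows, and `visible_rows[idx]` translates them back to positions in the original chunk. A self-contained sketch of the mapping, using plain Vecs in place of RisingWave's Bitmap and I32Array (illustrative code, not the project's API):

    fn main() {
        // Visibility of a 5-row chunk: rows 1 and 3 are visible.
        let visibility = [false, true, false, true, false];

        // Equivalent of `vis().iter_ones()`: positions of the visible rows.
        let visible_rows: Vec<usize> = visibility
            .iter()
            .enumerate()
            .filter_map(|(i, vis)| vis.then_some(i))
            .collect();
        assert_eq!(visible_rows, vec![1, 3]);

        // The UDTF sees only the 2 compacted rows, so it reports indices 0 and 1
        // (index 1 appears twice here, e.g. `series` emitted two rows for it).
        let udtf_indices = [0_i32, 1, 1];

        // Map back to the original chunk, mirroring
        // `visible_rows[idx as usize] as i32` in the patch.
        let origin_indices: Vec<i32> = udtf_indices
            .iter()
            .map(|&idx| visible_rows[idx as usize] as i32)
            .collect();
        assert_eq!(origin_indices, vec![1, 3, 3]);
    }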
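
Note on compact_cow: returning Cow::Borrowed when the chunk is already compact makes the common case zero-copy, while the bitmap case pays for exactly one filtered copy. The same pattern on a plain slice (a hypothetical helper, not part of RisingWave):

    use std::borrow::Cow;

    // Return a borrow when no filtering is needed; allocate only otherwise.
    fn compact_cow<'a>(data: &'a [i32], visibility: Option<&[bool]>) -> Cow<'a, [i32]> {
        match visibility {
            None => Cow::Borrowed(data), // already compact: zero-copy
            Some(vis) => Cow::Owned(
                data.iter()
                    .zip(vis)
                    .filter_map(|(v, &keep)| keep.then_some(*v))
                    .collect(),
            ),
        }
    }

    fn main() {
        assert!(matches!(compact_cow(&[1, 2, 3], None), Cow::Borrowed(_)));
        assert_eq!(
            compact_cow(&[1, 2, 3], Some(&[true, false, true])).into_owned(),
            vec![1, 3]
        );
    }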