From 6da4c48532a0e7549ad79defacfa288a90f61bac Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Wed, 4 May 2022 08:07:13 -0400 Subject: [PATCH 01/31] feat:bind func --- proto/expr.proto | 2 ++ src/frontend/src/binder/expr/function.rs | 18 +++++++++++ src/frontend/src/expr/function_call.rs | 2 ++ .../tests/testdata/basic_query.yaml | 31 +++++++++++++++++++ 4 files changed, 53 insertions(+) diff --git a/proto/expr.proto b/proto/expr.proto index 5a27a22712a4e..187b437cd412a 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -55,6 +55,8 @@ message ExprNode { ROUND = 214; ASCII = 215; TRANSLATE = 216; + NULLIF = 217; + COALESCE = 218; // Boolean comparison IS_TRUE = 301; IS_NOT_TRUE = 302; diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 08a820a5c0fe7..d6f9f77a881a3 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -58,6 +58,24 @@ impl Binder { "position" => ExprType::Position, "ltrim" => ExprType::Ltrim, "rtrim" => ExprType::Rtrim, + "nullif" => { + if inputs.len() != 2 { + return Err(ErrorCode::BindError( + "Nullif function must contain 2 arguments".to_string(), + ) + .into()); + } + ExprType::Nullif + } + "coalesce" => { + if inputs.len() <= 0 { + return Err(ErrorCode::BindError( + "Coalesce function must contain at least 1 argument".to_string(), + ) + .into()); + } + ExprType::Coalesce + }, "round" => { inputs = Self::rewrite_round_args(inputs); ExprType::RoundDigit diff --git a/src/frontend/src/expr/function_call.rs b/src/frontend/src/expr/function_call.rs index 5c866f8e3c4a7..c7d59b3eecd11 100644 --- a/src/frontend/src/expr/function_call.rs +++ b/src/frontend/src/expr/function_call.rs @@ -106,6 +106,8 @@ impl FunctionCall { align_types(inputs.iter_mut())?; Ok(DataType::Boolean) } + ExprType::Nullif => Ok(inputs[0].return_type()), + ExprType::Coalesce => Ok(inputs[0].return_type()), _ => infer_type( func_type, inputs.iter().map(|expr| expr.return_type()).collect(), diff --git a/src/frontend/test_runner/tests/testdata/basic_query.yaml b/src/frontend/test_runner/tests/testdata/basic_query.yaml index 1f8018d0664d4..fced182873752 100644 --- a/src/frontend/test_runner/tests/testdata/basic_query.yaml +++ b/src/frontend/test_runner/tests/testdata/basic_query.yaml @@ -119,3 +119,34 @@ BatchDelete { table: t } BatchFilter { predicate: ($1 = 1:Int32) } BatchScan { table: t, columns: [_row_id#0, v1, v2] } +- sql: | + create table t (v1 int); + select nullif(v1, 1) from t; + batch_plan: | + BatchExchange { order: [], dist: Single } + BatchProject { exprs: [Nullif($0, 1:Int32)], expr_alias: [ ] } + BatchScan { table: t, columns: [v1] } + stream_plan: | + StreamMaterialize { columns: [expr#0, _row_id#0(hidden)], pk_columns: [_row_id#0] } + StreamProject { exprs: [Nullif($0, 1:Int32), $1], expr_alias: [ , ] } + StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] } +- sql: | + create table t (v1 int); + select nullif(v1, 1, 2) from t; + binder_error: 'Bind error: Nullif function must contain 2 arguments' +- sql: | + create table t (v1 int); + select coalesce(v1, 1, 2) from t; + batch_plan: | + BatchExchange { order: [], dist: Single } + BatchProject { exprs: [Coalesce($0, 1:Int32, 2:Int32)], expr_alias: [ ] } + BatchScan { table: t, columns: [v1] } + stream_plan: | + StreamMaterialize { columns: [expr#0, _row_id#0(hidden)], pk_columns: [_row_id#0] } + StreamProject { exprs: [Coalesce($0, 1:Int32, 2:Int32), $1], expr_alias: [ , ] } + StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] } +- sql: | + create table t (v1 int); + select coalesce() from t; + binder_error: 'Bind error: Coalesce function must contain at least 1 argument' + From c4dda45f589cec5843eab565e20b3e2bde3e3ac5 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Wed, 4 May 2022 09:22:35 -0400 Subject: [PATCH 02/31] fix --- src/expr/src/expr/expr_nullif.rs | 80 ++++++++++++++++++++++++++++++++ src/expr/src/expr/mod.rs | 1 + 2 files changed, 81 insertions(+) create mode 100644 src/expr/src/expr/expr_nullif.rs diff --git a/src/expr/src/expr/expr_nullif.rs b/src/expr/src/expr/expr_nullif.rs new file mode 100644 index 0000000000000..c44225bde09b3 --- /dev/null +++ b/src/expr/src/expr/expr_nullif.rs @@ -0,0 +1,80 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::convert::TryFrom; +use itertools::Itertools; + +use risingwave_common::array::{Array, ArrayImpl, ArrayRef, DataChunk}; +use risingwave_common::error::{internal_error, ErrorCode, Result, RwError}; +use risingwave_common::types::DataType; +use risingwave_common::{ensure, ensure_eq, try_match_expand}; +use risingwave_pb::expr::expr_node::{RexNode, Type}; +use risingwave_pb::expr::ExprNode; + +use crate::expr::{build_from_prost as expr_build_from_prost, BoxedExpression, Expression}; + +#[derive(Debug)] +pub struct NullifExpression { + return_type: DataType, + left: BoxedExpression, + right: BoxedExpression, +} + +impl Expression for NullifExpression { + fn return_type(&self) -> DataType { + self.return_type.clone() + } + + fn eval(&self, input: &DataChunk) -> Result { + let array_left = self.left.eval(input)?; + let array_right = self.right.eval(input)?; + + if let ArrayImpl::Bool(bool_array) = array_right.as_ref() { + bool_array.iter().map(|c|{ + + }).collect_vec() + Ok(struct_array.get_children_by_index(self.index)) + } else { + Err(internal_error("expects a struct array ref")) + } + } +} + +impl NullifExpression { + pub fn new(return_type: DataType, left: BoxedExpression, right: BoxedExpression) -> Self { + NullifExpression { + return_type, + left, + right, + } + } +} + +impl<'a> TryFrom<&'a ExprNode> for NullifExpression { + type Error = RwError; + + fn try_from(prost: &'a ExprNode) -> Result { + ensure!(prost.get_expr_type()? == Type::Nullif); + + let ret_type = DataType::from(prost.get_return_type()?); + let func_call_node = try_match_expand!(prost.get_rex_node().unwrap(), RexNode::FuncCall)?; + + let children = func_call_node.children.to_vec(); + // Nullif `func_call_node` have 2 child nodes. + ensure_eq!(children.len(), 2); + let left = expr_build_from_prost(&children[0])?; + let right = expr_build_from_prost(&children[1])?; + Ok(NullifExpression::new(ret_type, left, right)) + } +} \ No newline at end of file diff --git a/src/expr/src/expr/mod.rs b/src/expr/src/expr/mod.rs index 80f09b9abc68e..508725b106bdb 100644 --- a/src/expr/src/expr/mod.rs +++ b/src/expr/src/expr/mod.rs @@ -20,6 +20,7 @@ pub mod expr_binary_nonnull; pub mod expr_binary_nullable; mod expr_case; mod expr_field; +mod expr_nullif; mod expr_in; mod expr_input_ref; mod expr_is_null; From 61e90abc911434d502f1c0d2f31f833e7a9eb8a1 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Wed, 4 May 2022 14:36:38 -0400 Subject: [PATCH 03/31] feat/nullif_expr --- proto/expr.proto | 1 - src/common/src/types/mod.rs | 10 +- src/expr/src/expr/expr_nullif.rs | 122 +++++++++++++++--- src/expr/src/expr/mod.rs | 2 +- src/frontend/src/binder/expr/function.rs | 27 ++-- src/frontend/src/expr/function_call.rs | 1 - .../tests/testdata/basic_query.yaml | 15 --- 7 files changed, 122 insertions(+), 56 deletions(-) diff --git a/proto/expr.proto b/proto/expr.proto index 187b437cd412a..c15f63112a1aa 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -56,7 +56,6 @@ message ExprNode { ASCII = 215; TRANSLATE = 216; NULLIF = 217; - COALESCE = 218; // Boolean comparison IS_TRUE = 301; IS_NOT_TRUE = 302; diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index 3bf52ee94d90e..5fb9b43d87051 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -125,9 +125,13 @@ impl DataType { DataType::Timestamp => NaiveDateTimeArrayBuilder::new(capacity)?.into(), DataType::Timestampz => PrimitiveArrayBuilder::::new(capacity)?.into(), DataType::Interval => IntervalArrayBuilder::new(capacity)?.into(), - DataType::Struct { .. } => { - todo!() - } + DataType::Struct { fields } => StructArrayBuilder::with_meta( + capacity, + ArrayMeta::Struct { + children: fields.clone(), + }, + )? + .into(), DataType::List { datatype } => ListArrayBuilder::with_meta( capacity, ArrayMeta::List { diff --git a/src/expr/src/expr/expr_nullif.rs b/src/expr/src/expr/expr_nullif.rs index c44225bde09b3..a107c6f076ab2 100644 --- a/src/expr/src/expr/expr_nullif.rs +++ b/src/expr/src/expr/expr_nullif.rs @@ -13,11 +13,12 @@ // limitations under the License. use std::convert::TryFrom; -use itertools::Itertools; +use std::sync::Arc; +use itertools::Itertools; use risingwave_common::array::{Array, ArrayImpl, ArrayRef, DataChunk}; -use risingwave_common::error::{internal_error, ErrorCode, Result, RwError}; -use risingwave_common::types::DataType; +use risingwave_common::error::{internal_error, Result, RwError}; +use risingwave_common::types::{DataType, ToOwnedDatum}; use risingwave_common::{ensure, ensure_eq, try_match_expand}; use risingwave_pb::expr::expr_node::{RexNode, Type}; use risingwave_pb::expr::ExprNode; @@ -27,8 +28,8 @@ use crate::expr::{build_from_prost as expr_build_from_prost, BoxedExpression, Ex #[derive(Debug)] pub struct NullifExpression { return_type: DataType, - left: BoxedExpression, - right: BoxedExpression, + origin: BoxedExpression, + is_equal: BoxedExpression, } impl Expression for NullifExpression { @@ -37,26 +38,34 @@ impl Expression for NullifExpression { } fn eval(&self, input: &DataChunk) -> Result { - let array_left = self.left.eval(input)?; - let array_right = self.right.eval(input)?; - - if let ArrayImpl::Bool(bool_array) = array_right.as_ref() { - bool_array.iter().map(|c|{ + let origin = self.origin.eval(input)?; + let is_equal = self.is_equal.eval(input)?; + let mut builder = self.return_type.create_array_builder(input.cardinality())?; - }).collect_vec() - Ok(struct_array.get_children_by_index(self.index)) + if let ArrayImpl::Bool(bool_array) = is_equal.as_ref() { + origin + .iter() + .zip_eq(bool_array.iter()) + .try_for_each(|(c, d)| { + if let Some(true) = d { + builder.append_datum(&c.to_owned_datum()) + } else { + builder.append_null() + } + })?; + Ok(Arc::new(builder.finish()?)) } else { - Err(internal_error("expects a struct array ref")) + Err(internal_error("expects a bool array ref")) } } } impl NullifExpression { - pub fn new(return_type: DataType, left: BoxedExpression, right: BoxedExpression) -> Self { + pub fn new(return_type: DataType, origin: BoxedExpression, is_equal: BoxedExpression) -> Self { NullifExpression { return_type, - left, - right, + origin, + is_equal, } } } @@ -73,8 +82,81 @@ impl<'a> TryFrom<&'a ExprNode> for NullifExpression { let children = func_call_node.children.to_vec(); // Nullif `func_call_node` have 2 child nodes. ensure_eq!(children.len(), 2); - let left = expr_build_from_prost(&children[0])?; - let right = expr_build_from_prost(&children[1])?; - Ok(NullifExpression::new(ret_type, left, right)) + let origin = expr_build_from_prost(&children[0])?; + let is_equal = expr_build_from_prost(&children[1])?; + Ok(NullifExpression::new(ret_type, origin, is_equal)) } -} \ No newline at end of file +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use risingwave_common::array::column::Column; + use risingwave_common::array::{BoolArray, DataChunk, PrimitiveArray, Utf8Array}; + use risingwave_common::types::ScalarImpl; + use risingwave_pb::data::data_type::TypeName; + use risingwave_pb::data::DataType as ProstDataType; + use risingwave_pb::expr::expr_node::RexNode; + use risingwave_pb::expr::expr_node::Type::Nullif; + use risingwave_pb::expr::{ExprNode, FunctionCall}; + + use crate::expr::expr_nullif::NullifExpression; + use crate::expr::test_utils::make_input_ref; + use crate::expr::Expression; + + pub fn make_nullif_function(children: Vec, ret: TypeName) -> ExprNode { + ExprNode { + expr_type: Nullif as i32, + return_type: Some(ProstDataType { + type_name: ret as i32, + ..Default::default() + }), + rex_node: Some(RexNode::FuncCall(FunctionCall { children })), + } + } + + #[test] + fn test_nullif_expr() { + let input_node1 = make_input_ref(0, TypeName::Int32); + let input_node2 = make_input_ref(1, TypeName::Int32); + let input_node3 = make_input_ref(2, TypeName::Varchar); + + let array = PrimitiveArray::::from_slice(&[Some(2), Some(2), Some(4), Some(3)]) + .map(|x| Arc::new(x.into())) + .unwrap(); + let col1 = Column::new(array); + let array = BoolArray::from_slice(&[Some(true), Some(true), Some(false), Some(false)]) + .map(|x| Arc::new(x.into())) + .unwrap(); + let col2 = Column::new(array); + let array = Utf8Array::from_slice(&[Some("2"), Some("2"), Some("4"), Some("3")]) + .map(|x| Arc::new(x.into())) + .unwrap(); + let col3 = Column::new(array); + + let data_chunk = DataChunk::builder().columns(vec![col1, col2, col3]).build(); + + let nullif_expr = NullifExpression::try_from(&make_nullif_function( + vec![input_node1, input_node2.clone()], + TypeName::Int32, + )) + .unwrap(); + let res = nullif_expr.eval(&data_chunk).unwrap(); + assert_eq!(res.datum_at(0), Some(ScalarImpl::Int32(2))); + assert_eq!(res.datum_at(1), Some(ScalarImpl::Int32(2))); + assert_eq!(res.datum_at(2), None); + assert_eq!(res.datum_at(3), None); + + let nullif_expr = NullifExpression::try_from(&make_nullif_function( + vec![input_node3, input_node2], + TypeName::Varchar, + )) + .unwrap(); + let res = nullif_expr.eval(&data_chunk).unwrap(); + assert_eq!(res.datum_at(0), Some(ScalarImpl::Utf8("2".to_string()))); + assert_eq!(res.datum_at(1), Some(ScalarImpl::Utf8("2".to_string()))); + assert_eq!(res.datum_at(2), None); + assert_eq!(res.datum_at(3), None); + } +} diff --git a/src/expr/src/expr/mod.rs b/src/expr/src/expr/mod.rs index 508725b106bdb..594263f7cc782 100644 --- a/src/expr/src/expr/mod.rs +++ b/src/expr/src/expr/mod.rs @@ -20,11 +20,11 @@ pub mod expr_binary_nonnull; pub mod expr_binary_nullable; mod expr_case; mod expr_field; -mod expr_nullif; mod expr_in; mod expr_input_ref; mod expr_is_null; mod expr_literal; +mod expr_nullif; mod expr_ternary_bytes; pub mod expr_unary; mod pg_sleep; diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index d6f9f77a881a3..cabfcb4534ab3 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -59,23 +59,9 @@ impl Binder { "ltrim" => ExprType::Ltrim, "rtrim" => ExprType::Rtrim, "nullif" => { - if inputs.len() != 2 { - return Err(ErrorCode::BindError( - "Nullif function must contain 2 arguments".to_string(), - ) - .into()); - } + inputs = Self::rewrite_nullif_args(inputs)?; ExprType::Nullif } - "coalesce" => { - if inputs.len() <= 0 { - return Err(ErrorCode::BindError( - "Coalesce function must contain at least 1 argument".to_string(), - ) - .into()); - } - ExprType::Coalesce - }, "round" => { inputs = Self::rewrite_round_args(inputs); ExprType::RoundDigit @@ -98,6 +84,17 @@ impl Binder { } } + fn rewrite_nullif_args(mut inputs: Vec) -> Result> { + if inputs.len() != 2 { + Err(ErrorCode::BindError("Nullif function must contain 2 arguments".to_string()).into()) + } else { + inputs[1] = + FunctionCall::new(ExprType::Equal, vec![inputs[0].clone(), inputs[1].clone()])? + .into(); + Ok(inputs) + } + } + /// Rewrite the arguments to be consistent with the `round` signature: /// - round(Decimal, Int32) -> Decimal /// - round(Decimal) -> Decimal diff --git a/src/frontend/src/expr/function_call.rs b/src/frontend/src/expr/function_call.rs index c7d59b3eecd11..b0294e554a326 100644 --- a/src/frontend/src/expr/function_call.rs +++ b/src/frontend/src/expr/function_call.rs @@ -107,7 +107,6 @@ impl FunctionCall { Ok(DataType::Boolean) } ExprType::Nullif => Ok(inputs[0].return_type()), - ExprType::Coalesce => Ok(inputs[0].return_type()), _ => infer_type( func_type, inputs.iter().map(|expr| expr.return_type()).collect(), diff --git a/src/frontend/test_runner/tests/testdata/basic_query.yaml b/src/frontend/test_runner/tests/testdata/basic_query.yaml index fced182873752..0d5410304b802 100644 --- a/src/frontend/test_runner/tests/testdata/basic_query.yaml +++ b/src/frontend/test_runner/tests/testdata/basic_query.yaml @@ -134,19 +134,4 @@ create table t (v1 int); select nullif(v1, 1, 2) from t; binder_error: 'Bind error: Nullif function must contain 2 arguments' -- sql: | - create table t (v1 int); - select coalesce(v1, 1, 2) from t; - batch_plan: | - BatchExchange { order: [], dist: Single } - BatchProject { exprs: [Coalesce($0, 1:Int32, 2:Int32)], expr_alias: [ ] } - BatchScan { table: t, columns: [v1] } - stream_plan: | - StreamMaterialize { columns: [expr#0, _row_id#0(hidden)], pk_columns: [_row_id#0] } - StreamProject { exprs: [Coalesce($0, 1:Int32, 2:Int32), $1], expr_alias: [ , ] } - StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] } -- sql: | - create table t (v1 int); - select coalesce() from t; - binder_error: 'Bind error: Coalesce function must contain at least 1 argument' From faf442a8fab39cda83b7ff74f9825bfae2647ee1 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Wed, 4 May 2022 14:47:31 -0400 Subject: [PATCH 04/31] feat: add test --- e2e_test/v2/batch/func.slt | 25 +++++++++++++++++++++++++ src/expr/src/expr/expr_nullif.rs | 4 ++-- src/expr/src/expr/mod.rs | 2 ++ 3 files changed, 29 insertions(+), 2 deletions(-) create mode 100644 e2e_test/v2/batch/func.slt diff --git a/e2e_test/v2/batch/func.slt b/e2e_test/v2/batch/func.slt new file mode 100644 index 0000000000000..561da943680f9 --- /dev/null +++ b/e2e_test/v2/batch/func.slt @@ -0,0 +1,25 @@ +query I +select nullif(2,2); +---- + + +query I +select nullif(1,2); +---- +1 + +statement ok +create table t3 (v1 int, v2 int, v3 int); + +statement ok +insert into t3 values (1) (2) (3); + +query I +select nullif(t3.v1,1) from t3; +---- + +2 +3 + +statement ok +drop table t3; \ No newline at end of file diff --git a/src/expr/src/expr/expr_nullif.rs b/src/expr/src/expr/expr_nullif.rs index a107c6f076ab2..5bd63af86dab1 100644 --- a/src/expr/src/expr/expr_nullif.rs +++ b/src/expr/src/expr/expr_nullif.rs @@ -47,7 +47,7 @@ impl Expression for NullifExpression { .iter() .zip_eq(bool_array.iter()) .try_for_each(|(c, d)| { - if let Some(true) = d { + if let Some(false) = d { builder.append_datum(&c.to_owned_datum()) } else { builder.append_null() @@ -126,7 +126,7 @@ mod tests { .map(|x| Arc::new(x.into())) .unwrap(); let col1 = Column::new(array); - let array = BoolArray::from_slice(&[Some(true), Some(true), Some(false), Some(false)]) + let array = BoolArray::from_slice(&[Some(false), Some(false), Some(true), Some(true)]) .map(|x| Arc::new(x.into())) .unwrap(); let col2 = Column::new(array); diff --git a/src/expr/src/expr/mod.rs b/src/expr/src/expr/mod.rs index 594263f7cc782..2a6deb639cf6d 100644 --- a/src/expr/src/expr/mod.rs +++ b/src/expr/src/expr/mod.rs @@ -45,6 +45,7 @@ use risingwave_pb::expr::ExprNode; use crate::expr::build_expr_from_prost::*; use crate::expr::expr_field::FieldExpression; +use crate::expr::expr_nullif::NullifExpression; pub type ExpressionRef = Arc; @@ -81,6 +82,7 @@ pub fn build_from_prost(prost: &ExprNode) -> Result { Add | Subtract | Multiply | Divide | Modulus => build_binary_expr_prost(prost), Extract | RoundDigit | TumbleStart | Position => build_binary_expr_prost(prost), StreamNullByRowCount | And | Or => build_nullable_binary_expr_prost(prost), + Nullif => NullifExpression::try_from(prost).map(|d| Box::new(d) as BoxedExpression), Substr => build_substr_expr(prost), Length => build_length_expr(prost), Replace => build_replace_expr(prost), From c4c53c519e91b97e2e7aed4afc325687a70fbc23 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Wed, 4 May 2022 15:02:05 -0400 Subject: [PATCH 05/31] fix --- src/frontend/test_runner/tests/testdata/basic_query.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/frontend/test_runner/tests/testdata/basic_query.yaml b/src/frontend/test_runner/tests/testdata/basic_query.yaml index 0d5410304b802..0c6d235350f64 100644 --- a/src/frontend/test_runner/tests/testdata/basic_query.yaml +++ b/src/frontend/test_runner/tests/testdata/basic_query.yaml @@ -124,11 +124,11 @@ select nullif(v1, 1) from t; batch_plan: | BatchExchange { order: [], dist: Single } - BatchProject { exprs: [Nullif($0, 1:Int32)], expr_alias: [ ] } + BatchProject { exprs: [Nullif($0, ($0 = 1:Int32))], expr_alias: [ ] } BatchScan { table: t, columns: [v1] } stream_plan: | StreamMaterialize { columns: [expr#0, _row_id#0(hidden)], pk_columns: [_row_id#0] } - StreamProject { exprs: [Nullif($0, 1:Int32), $1], expr_alias: [ , ] } + StreamProject { exprs: [Nullif($0, ($0 = 1:Int32)), $1], expr_alias: [ , ] } StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] } - sql: | create table t (v1 int); From a9f717db7441322aafdc911f206947448524b968 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Wed, 4 May 2022 15:22:38 -0400 Subject: [PATCH 06/31] fix --- e2e_test/v2/batch/func.slt | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/e2e_test/v2/batch/func.slt b/e2e_test/v2/batch/func.slt index 561da943680f9..0402f49d3d2a9 100644 --- a/e2e_test/v2/batch/func.slt +++ b/e2e_test/v2/batch/func.slt @@ -1,7 +1,7 @@ query I select nullif(2,2); ---- - +NULL query I select nullif(1,2); @@ -12,14 +12,12 @@ statement ok create table t3 (v1 int, v2 int, v3 int); statement ok -insert into t3 values (1) (2) (3); +insert into t3 values(1, 2, NULL); query I select nullif(t3.v1,1) from t3; ---- -2 -3 statement ok drop table t3; \ No newline at end of file From 00d6aef10ef446ef61d282f832ba7a0e88fd8c44 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Wed, 4 May 2022 16:06:17 -0400 Subject: [PATCH 07/31] fix --- e2e_test/v2/batch/func.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/v2/batch/func.slt b/e2e_test/v2/batch/func.slt index 0402f49d3d2a9..b63d3694faf70 100644 --- a/e2e_test/v2/batch/func.slt +++ b/e2e_test/v2/batch/func.slt @@ -17,7 +17,7 @@ insert into t3 values(1, 2, NULL); query I select nullif(t3.v1,1) from t3; ---- - +NULL statement ok drop table t3; \ No newline at end of file From b789f74006d091b1dc3f0df0fac1b422f5532ffe Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Wed, 4 May 2022 18:37:49 -0400 Subject: [PATCH 08/31] fix --- src/expr/src/expr/expr_nullif.rs | 1 + src/frontend/src/binder/expr/function.rs | 2 ++ 2 files changed, 3 insertions(+) diff --git a/src/expr/src/expr/expr_nullif.rs b/src/expr/src/expr/expr_nullif.rs index 5bd63af86dab1..4484eacfa5525 100644 --- a/src/expr/src/expr/expr_nullif.rs +++ b/src/expr/src/expr/expr_nullif.rs @@ -43,6 +43,7 @@ impl Expression for NullifExpression { let mut builder = self.return_type.create_array_builder(input.cardinality())?; if let ArrayImpl::Bool(bool_array) = is_equal.as_ref() { + // If `is_equal` is true, add null otherwise add `origin` value. origin .iter() .zip_eq(bool_array.iter()) diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index cabfcb4534ab3..ba77b027b5765 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -84,6 +84,8 @@ impl Binder { } } + /// Make sure inputs only have 2 value and rewrite the arguments. + /// Nullif(expr1,expr2) -> Nullif(expr1,Equal(expr1,expr2)). fn rewrite_nullif_args(mut inputs: Vec) -> Result> { if inputs.len() != 2 { Err(ErrorCode::BindError("Nullif function must contain 2 arguments".to_string()).into()) From 1d2c3f4d25f72c32e993c04a5e93f3f3253b59aa Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Wed, 4 May 2022 20:14:47 -0400 Subject: [PATCH 09/31] feat:coalesce --- proto/expr.proto | 1 + src/expr/src/expr/expr_coalesce.rs | 150 ++++++++++++++++++ src/expr/src/expr/mod.rs | 3 + src/frontend/src/binder/expr/function.rs | 27 ++++ src/frontend/src/expr/function_call.rs | 1 + .../tests/testdata/basic_query.yaml | 19 +++ 6 files changed, 201 insertions(+) create mode 100644 src/expr/src/expr/expr_coalesce.rs diff --git a/proto/expr.proto b/proto/expr.proto index c15f63112a1aa..187b437cd412a 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -56,6 +56,7 @@ message ExprNode { ASCII = 215; TRANSLATE = 216; NULLIF = 217; + COALESCE = 218; // Boolean comparison IS_TRUE = 301; IS_NOT_TRUE = 302; diff --git a/src/expr/src/expr/expr_coalesce.rs b/src/expr/src/expr/expr_coalesce.rs new file mode 100644 index 0000000000000..d191737bbcf2a --- /dev/null +++ b/src/expr/src/expr/expr_coalesce.rs @@ -0,0 +1,150 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::convert::TryFrom; +use std::sync::Arc; + +use risingwave_common::array::{ArrayRef, DataChunk}; +use risingwave_common::error::{Result, RwError}; +use risingwave_common::types::DataType; +use risingwave_common::{ensure, try_match_expand}; +use risingwave_pb::expr::expr_node::{RexNode, Type}; +use risingwave_pb::expr::ExprNode; + +use crate::expr::{build_from_prost as expr_build_from_prost, BoxedExpression, Expression}; + +#[derive(Debug)] +pub struct CoalesceExpression { + return_type: DataType, + children: Vec, +} + +impl Expression for CoalesceExpression { + fn return_type(&self) -> DataType { + self.return_type.clone() + } + + fn eval(&self, input: &DataChunk) -> Result { + let children_array = self + .children + .iter() + .map(|c| c.eval(input)) + .collect::>>()?; + let mut builder = self.return_type.create_array_builder(input.cardinality())?; + + let len = children_array[0].len(); + for i in 0..len { + let mut data = None; + for array in &children_array { + let datum = array.datum_at(i); + if datum.is_some() { + data = datum; + break; + } + } + builder.append_datum(&data)?; + } + Ok(Arc::new(builder.finish()?)) + } +} + +impl CoalesceExpression { + pub fn new(return_type: DataType, children: Vec) -> Self { + CoalesceExpression { + return_type, + children, + } + } +} + +impl<'a> TryFrom<&'a ExprNode> for CoalesceExpression { + type Error = RwError; + + fn try_from(prost: &'a ExprNode) -> Result { + ensure!(prost.get_expr_type()? == Type::Coalesce); + + let ret_type = DataType::from(prost.get_return_type()?); + let func_call_node = try_match_expand!(prost.get_rex_node().unwrap(), RexNode::FuncCall)?; + + let children = func_call_node + .children + .to_vec() + .iter() + .map(expr_build_from_prost) + .collect::>>()?; + Ok(CoalesceExpression::new(ret_type, children)) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use risingwave_common::array::column::Column; + use risingwave_common::array::{DataChunk, PrimitiveArray}; + use risingwave_common::types::ScalarImpl; + use risingwave_pb::data::data_type::TypeName; + use risingwave_pb::data::DataType as ProstDataType; + use risingwave_pb::expr::expr_node::RexNode; + use risingwave_pb::expr::expr_node::Type::Coalesce; + use risingwave_pb::expr::{ExprNode, FunctionCall}; + + use crate::expr::expr_coalesce::CoalesceExpression; + use crate::expr::test_utils::make_input_ref; + use crate::expr::Expression; + + pub fn make_coalesce_function(children: Vec, ret: TypeName) -> ExprNode { + ExprNode { + expr_type: Coalesce as i32, + return_type: Some(ProstDataType { + type_name: ret as i32, + ..Default::default() + }), + rex_node: Some(RexNode::FuncCall(FunctionCall { children })), + } + } + + #[test] + fn test_coalesce_expr() { + let input_node1 = make_input_ref(0, TypeName::Int32); + let input_node2 = make_input_ref(1, TypeName::Int32); + let input_node3 = make_input_ref(2, TypeName::Int32); + + let array = PrimitiveArray::::from_slice(&[Some(1), None, None, None]) + .map(|x| Arc::new(x.into())) + .unwrap(); + let col1 = Column::new(array); + let array = PrimitiveArray::::from_slice(&[None, Some(2), None, None]) + .map(|x| Arc::new(x.into())) + .unwrap(); + let col2 = Column::new(array); + let array = PrimitiveArray::::from_slice(&[None, None, Some(3), None]) + .map(|x| Arc::new(x.into())) + .unwrap(); + let col3 = Column::new(array); + + let data_chunk = DataChunk::builder().columns(vec![col1, col2, col3]).build(); + + let nullif_expr = CoalesceExpression::try_from(&make_coalesce_function( + vec![input_node1, input_node2, input_node3], + TypeName::Int32, + )) + .unwrap(); + let res = nullif_expr.eval(&data_chunk).unwrap(); + assert_eq!(res.datum_at(0), Some(ScalarImpl::Int32(1))); + assert_eq!(res.datum_at(1), Some(ScalarImpl::Int32(2))); + assert_eq!(res.datum_at(2), Some(ScalarImpl::Int32(3))); + assert_eq!(res.datum_at(3), None); + } +} diff --git a/src/expr/src/expr/mod.rs b/src/expr/src/expr/mod.rs index 2a6deb639cf6d..7c5d3ff672032 100644 --- a/src/expr/src/expr/mod.rs +++ b/src/expr/src/expr/mod.rs @@ -19,6 +19,7 @@ mod expr_binary_bytes; pub mod expr_binary_nonnull; pub mod expr_binary_nullable; mod expr_case; +mod expr_coalesce; mod expr_field; mod expr_in; mod expr_input_ref; @@ -44,6 +45,7 @@ use risingwave_common::types::DataType; use risingwave_pb::expr::ExprNode; use crate::expr::build_expr_from_prost::*; +use crate::expr::expr_coalesce::CoalesceExpression; use crate::expr::expr_field::FieldExpression; use crate::expr::expr_nullif::NullifExpression; @@ -83,6 +85,7 @@ pub fn build_from_prost(prost: &ExprNode) -> Result { Extract | RoundDigit | TumbleStart | Position => build_binary_expr_prost(prost), StreamNullByRowCount | And | Or => build_nullable_binary_expr_prost(prost), Nullif => NullifExpression::try_from(prost).map(|d| Box::new(d) as BoxedExpression), + Coalesce => CoalesceExpression::try_from(prost).map(|d| Box::new(d) as BoxedExpression), Substr => build_substr_expr(prost), Length => build_length_expr(prost), Replace => build_replace_expr(prost), diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index ba77b027b5765..38239e0b28f1e 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -62,6 +62,10 @@ impl Binder { inputs = Self::rewrite_nullif_args(inputs)?; ExprType::Nullif } + "coalesce" => { + inputs = Self::check_coalesce_args(inputs)?; + ExprType::Coalesce + } "round" => { inputs = Self::rewrite_round_args(inputs); ExprType::RoundDigit @@ -97,6 +101,29 @@ impl Binder { } } + /// Make sure inputs have more than 1 value and check the args have same `data_type`. + fn check_coalesce_args(inputs: Vec) -> Result> { + if inputs.is_empty() { + Err(ErrorCode::BindError( + "Coalesce function must contain at least 1 argument".to_string(), + ) + .into()) + } else { + let data_type = inputs[0].return_type(); + for input in &inputs { + if data_type != input.return_type() { + return Err(ErrorCode::BindError(format!( + "Coalesce function cannot match types {:?} and {:?}", + data_type, + input.return_type() + )) + .into()); + } + } + Ok(inputs) + } + } + /// Rewrite the arguments to be consistent with the `round` signature: /// - round(Decimal, Int32) -> Decimal /// - round(Decimal) -> Decimal diff --git a/src/frontend/src/expr/function_call.rs b/src/frontend/src/expr/function_call.rs index b0294e554a326..c7d59b3eecd11 100644 --- a/src/frontend/src/expr/function_call.rs +++ b/src/frontend/src/expr/function_call.rs @@ -107,6 +107,7 @@ impl FunctionCall { Ok(DataType::Boolean) } ExprType::Nullif => Ok(inputs[0].return_type()), + ExprType::Coalesce => Ok(inputs[0].return_type()), _ => infer_type( func_type, inputs.iter().map(|expr| expr.return_type()).collect(), diff --git a/src/frontend/test_runner/tests/testdata/basic_query.yaml b/src/frontend/test_runner/tests/testdata/basic_query.yaml index 0c6d235350f64..0038e5bb62391 100644 --- a/src/frontend/test_runner/tests/testdata/basic_query.yaml +++ b/src/frontend/test_runner/tests/testdata/basic_query.yaml @@ -134,4 +134,23 @@ create table t (v1 int); select nullif(v1, 1, 2) from t; binder_error: 'Bind error: Nullif function must contain 2 arguments' +- sql: | + create table t (v1 int); + select coalesce(v1, 1) from t; + batch_plan: | + BatchExchange { order: [], dist: Single } + BatchProject { exprs: [Coalesce($0, 1:Int32)], expr_alias: [ ] } + BatchScan { table: t, columns: [v1] } + stream_plan: | + StreamMaterialize { columns: [expr#0, _row_id#0(hidden)], pk_columns: [_row_id#0] } + StreamProject { exprs: [Coalesce($0, 1:Int32), $1], expr_alias: [ , ] } + StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] } +- sql: | + create table t (v1 int); + select coalesce() from t; + binder_error: 'Bind error: Coalesce function must contain at least 1 argument' +- sql: | + create table t (v1 int); + select coalesce(1,'a') from t; + binder_error: 'Bind error: Coalesce function cannot match types Int32 and Varchar' From 7a4d9837da9025bbc0f87d9665a941fef2750db3 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Wed, 4 May 2022 20:20:32 -0400 Subject: [PATCH 10/31] add test --- e2e_test/v2/batch/func.slt | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/e2e_test/v2/batch/func.slt b/e2e_test/v2/batch/func.slt index b63d3694faf70..8d6fd5f4cdcac 100644 --- a/e2e_test/v2/batch/func.slt +++ b/e2e_test/v2/batch/func.slt @@ -8,16 +8,28 @@ select nullif(1,2); ---- 1 +query I +select coalesce(1,2); +---- +1 + statement ok -create table t3 (v1 int, v2 int, v3 int); +create table t1 (v1 int, v2 int, v3 int); statement ok -insert into t3 values(1, 2, NULL); +insert into t1 values (1,null,null),(null,2,null),(null,null,3); query I -select nullif(t3.v1,1) from t3; +select nullif(t1.v1,1) from t1; ---- NULL +query I +select coalesce(v1,v2,v3) from t1; +---- +1 +2 +3 + statement ok -drop table t3; \ No newline at end of file +drop table t1; \ No newline at end of file From 6493e69d2768329f3046310fc4d5106dd25a4116 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Thu, 5 May 2022 07:10:42 -0400 Subject: [PATCH 11/31] fix --- src/expr/src/expr/expr_nullif.rs | 72 ++++++++---------------- src/frontend/src/binder/expr/function.rs | 15 +++-- 2 files changed, 34 insertions(+), 53 deletions(-) diff --git a/src/expr/src/expr/expr_nullif.rs b/src/expr/src/expr/expr_nullif.rs index 4484eacfa5525..c99d6a4df0725 100644 --- a/src/expr/src/expr/expr_nullif.rs +++ b/src/expr/src/expr/expr_nullif.rs @@ -16,8 +16,8 @@ use std::convert::TryFrom; use std::sync::Arc; use itertools::Itertools; -use risingwave_common::array::{Array, ArrayImpl, ArrayRef, DataChunk}; -use risingwave_common::error::{internal_error, Result, RwError}; +use risingwave_common::array::{ArrayRef, DataChunk}; +use risingwave_common::error::{Result, RwError}; use risingwave_common::types::{DataType, ToOwnedDatum}; use risingwave_common::{ensure, ensure_eq, try_match_expand}; use risingwave_pb::expr::expr_node::{RexNode, Type}; @@ -28,8 +28,8 @@ use crate::expr::{build_from_prost as expr_build_from_prost, BoxedExpression, Ex #[derive(Debug)] pub struct NullifExpression { return_type: DataType, - origin: BoxedExpression, - is_equal: BoxedExpression, + left: BoxedExpression, + right: BoxedExpression, } impl Expression for NullifExpression { @@ -38,35 +38,27 @@ impl Expression for NullifExpression { } fn eval(&self, input: &DataChunk) -> Result { - let origin = self.origin.eval(input)?; - let is_equal = self.is_equal.eval(input)?; + let left = self.left.eval(input)?; + let right = self.right.eval(input)?; let mut builder = self.return_type.create_array_builder(input.cardinality())?; - if let ArrayImpl::Bool(bool_array) = is_equal.as_ref() { - // If `is_equal` is true, add null otherwise add `origin` value. - origin - .iter() - .zip_eq(bool_array.iter()) - .try_for_each(|(c, d)| { - if let Some(false) = d { - builder.append_datum(&c.to_owned_datum()) - } else { - builder.append_null() - } - })?; - Ok(Arc::new(builder.finish()?)) - } else { - Err(internal_error("expects a bool array ref")) - } + left.iter().zip_eq(right.iter()).try_for_each(|(c, d)| { + if c != d { + builder.append_datum(&c.to_owned_datum()) + } else { + builder.append_null() + } + })?; + Ok(Arc::new(builder.finish()?)) } } impl NullifExpression { - pub fn new(return_type: DataType, origin: BoxedExpression, is_equal: BoxedExpression) -> Self { + pub fn new(return_type: DataType, left: BoxedExpression, right: BoxedExpression) -> Self { NullifExpression { return_type, - origin, - is_equal, + left, + right, } } } @@ -83,9 +75,9 @@ impl<'a> TryFrom<&'a ExprNode> for NullifExpression { let children = func_call_node.children.to_vec(); // Nullif `func_call_node` have 2 child nodes. ensure_eq!(children.len(), 2); - let origin = expr_build_from_prost(&children[0])?; - let is_equal = expr_build_from_prost(&children[1])?; - Ok(NullifExpression::new(ret_type, origin, is_equal)) + let left = expr_build_from_prost(&children[0])?; + let right = expr_build_from_prost(&children[1])?; + Ok(NullifExpression::new(ret_type, left, right)) } } @@ -94,7 +86,7 @@ mod tests { use std::sync::Arc; use risingwave_common::array::column::Column; - use risingwave_common::array::{BoolArray, DataChunk, PrimitiveArray, Utf8Array}; + use risingwave_common::array::{DataChunk, PrimitiveArray}; use risingwave_common::types::ScalarImpl; use risingwave_pb::data::data_type::TypeName; use risingwave_pb::data::DataType as ProstDataType; @@ -121,25 +113,20 @@ mod tests { fn test_nullif_expr() { let input_node1 = make_input_ref(0, TypeName::Int32); let input_node2 = make_input_ref(1, TypeName::Int32); - let input_node3 = make_input_ref(2, TypeName::Varchar); let array = PrimitiveArray::::from_slice(&[Some(2), Some(2), Some(4), Some(3)]) .map(|x| Arc::new(x.into())) .unwrap(); let col1 = Column::new(array); - let array = BoolArray::from_slice(&[Some(false), Some(false), Some(true), Some(true)]) + let array = PrimitiveArray::::from_slice(&[Some(1), Some(3), Some(4), Some(3)]) .map(|x| Arc::new(x.into())) .unwrap(); let col2 = Column::new(array); - let array = Utf8Array::from_slice(&[Some("2"), Some("2"), Some("4"), Some("3")]) - .map(|x| Arc::new(x.into())) - .unwrap(); - let col3 = Column::new(array); - let data_chunk = DataChunk::builder().columns(vec![col1, col2, col3]).build(); + let data_chunk = DataChunk::builder().columns(vec![col1, col2]).build(); let nullif_expr = NullifExpression::try_from(&make_nullif_function( - vec![input_node1, input_node2.clone()], + vec![input_node1, input_node2], TypeName::Int32, )) .unwrap(); @@ -148,16 +135,5 @@ mod tests { assert_eq!(res.datum_at(1), Some(ScalarImpl::Int32(2))); assert_eq!(res.datum_at(2), None); assert_eq!(res.datum_at(3), None); - - let nullif_expr = NullifExpression::try_from(&make_nullif_function( - vec![input_node3, input_node2], - TypeName::Varchar, - )) - .unwrap(); - let res = nullif_expr.eval(&data_chunk).unwrap(); - assert_eq!(res.datum_at(0), Some(ScalarImpl::Utf8("2".to_string()))); - assert_eq!(res.datum_at(1), Some(ScalarImpl::Utf8("2".to_string()))); - assert_eq!(res.datum_at(2), None); - assert_eq!(res.datum_at(3), None); } } diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 38239e0b28f1e..96cddc2189b58 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -59,7 +59,7 @@ impl Binder { "ltrim" => ExprType::Ltrim, "rtrim" => ExprType::Rtrim, "nullif" => { - inputs = Self::rewrite_nullif_args(inputs)?; + inputs = Self::check_nullif_args(inputs)?; ExprType::Nullif } "coalesce" => { @@ -90,13 +90,18 @@ impl Binder { /// Make sure inputs only have 2 value and rewrite the arguments. /// Nullif(expr1,expr2) -> Nullif(expr1,Equal(expr1,expr2)). - fn rewrite_nullif_args(mut inputs: Vec) -> Result> { + fn check_nullif_args(inputs: Vec) -> Result> { if inputs.len() != 2 { Err(ErrorCode::BindError("Nullif function must contain 2 arguments".to_string()).into()) } else { - inputs[1] = - FunctionCall::new(ExprType::Equal, vec![inputs[0].clone(), inputs[1].clone()])? - .into(); + if inputs[0].return_type() != inputs[1].return_type() { + return Err(ErrorCode::BindError(format!( + "Nullif function cannot match types {:?} and {:?}", + inputs[0].return_type(), + inputs[1].return_type() + )) + .into()); + } Ok(inputs) } } From 030670e382a70679a4f5ade1070eb2940efd6df2 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Thu, 5 May 2022 09:00:00 -0400 Subject: [PATCH 12/31] fix --- src/frontend/test_runner/tests/testdata/basic_query.yaml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/frontend/test_runner/tests/testdata/basic_query.yaml b/src/frontend/test_runner/tests/testdata/basic_query.yaml index 0038e5bb62391..6ba1e4d511969 100644 --- a/src/frontend/test_runner/tests/testdata/basic_query.yaml +++ b/src/frontend/test_runner/tests/testdata/basic_query.yaml @@ -124,16 +124,20 @@ select nullif(v1, 1) from t; batch_plan: | BatchExchange { order: [], dist: Single } - BatchProject { exprs: [Nullif($0, ($0 = 1:Int32))], expr_alias: [ ] } + BatchProject { exprs: [Nullif($0, 1:Int32)], expr_alias: [ ] } BatchScan { table: t, columns: [v1] } stream_plan: | StreamMaterialize { columns: [expr#0, _row_id#0(hidden)], pk_columns: [_row_id#0] } - StreamProject { exprs: [Nullif($0, ($0 = 1:Int32)), $1], expr_alias: [ , ] } + StreamProject { exprs: [Nullif($0, 1:Int32), $1], expr_alias: [ , ] } StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] } - sql: | create table t (v1 int); select nullif(v1, 1, 2) from t; binder_error: 'Bind error: Nullif function must contain 2 arguments' +- sql: | + create table t (v1 int); + select nullif(v1, 'a') from t; + binder_error: 'Bind error: Nullif function cannot match types Int32 and Varchar' - sql: | create table t (v1 int); select coalesce(v1, 1) from t; From e2435b799af154d00098f1e99b846d8c306f3d85 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Thu, 5 May 2022 09:02:45 -0400 Subject: [PATCH 13/31] fix --- e2e_test/v2/batch/func.slt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/e2e_test/v2/batch/func.slt b/e2e_test/v2/batch/func.slt index 8d6fd5f4cdcac..4ed3df9a74027 100644 --- a/e2e_test/v2/batch/func.slt +++ b/e2e_test/v2/batch/func.slt @@ -23,6 +23,8 @@ query I select nullif(t1.v1,1) from t1; ---- NULL +NULL +NULL query I select coalesce(v1,v2,v3) from t1; From 45f6f190bd99af6997fcca057709f0c2aa2846ca Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Thu, 5 May 2022 09:26:42 -0400 Subject: [PATCH 14/31] fix: make case support null --- src/expr/src/expr/expr_case.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/expr/src/expr/expr_case.rs b/src/expr/src/expr/expr_case.rs index 93732149d3b85..67f72185978f4 100644 --- a/src/expr/src/expr/expr_case.rs +++ b/src/expr/src/expr/expr_case.rs @@ -15,7 +15,7 @@ use itertools::Itertools; use risingwave_common::array::{ArrayRef, DataChunk}; use risingwave_common::error::Result; -use risingwave_common::types::DataType; +use risingwave_common::types::{DataType, ScalarRefImpl, ToOwnedDatum}; use crate::expr::{BoxedExpression, Expression}; @@ -77,10 +77,9 @@ impl Expression for CaseExpression { if let Some((_, t)) = when_thens .iter() .map(|(w, t)| (w.value_at(idx), t.value_at(idx))) - .find(|(w, _)| *w.unwrap().into_scalar_impl().as_bool()) + .find(|(w, _)| *w.unwrap_or(ScalarRefImpl::Bool(false)).into_scalar_impl().as_bool()) { - let t = Some(t.unwrap().into_scalar_impl()); - output_array.append_datum(&t)?; + output_array.append_datum(&t.to_owned_datum())?; } else if let Some(els) = els.as_mut() { let t = els.datum_at(idx); output_array.append_datum(&t)?; From 17ce8706f5fb45542882319b89d1da7ff01b1b80 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Thu, 5 May 2022 09:32:47 -0400 Subject: [PATCH 15/31] fix: make case support null --- src/expr/src/expr/expr_case.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/expr/src/expr/expr_case.rs b/src/expr/src/expr/expr_case.rs index 67f72185978f4..c80758670c031 100644 --- a/src/expr/src/expr/expr_case.rs +++ b/src/expr/src/expr/expr_case.rs @@ -77,7 +77,11 @@ impl Expression for CaseExpression { if let Some((_, t)) = when_thens .iter() .map(|(w, t)| (w.value_at(idx), t.value_at(idx))) - .find(|(w, _)| *w.unwrap_or(ScalarRefImpl::Bool(false)).into_scalar_impl().as_bool()) + .find(|(w, _)| { + *w.unwrap_or(ScalarRefImpl::Bool(false)) + .into_scalar_impl() + .as_bool() + }) { output_array.append_datum(&t.to_owned_datum())?; } else if let Some(els) = els.as_mut() { From c2318ed45cbe88b9d297d0dff7dfcccbf8667b42 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Thu, 5 May 2022 13:49:03 -0400 Subject: [PATCH 16/31] fix --- src/frontend/src/binder/expr/function.rs | 21 ++++++++----------- .../tests/testdata/basic_query.yaml | 6 +++--- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 96cddc2189b58..d0f7afb709d33 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -59,8 +59,8 @@ impl Binder { "ltrim" => ExprType::Ltrim, "rtrim" => ExprType::Rtrim, "nullif" => { - inputs = Self::check_nullif_args(inputs)?; - ExprType::Nullif + inputs = Self::write_nullif_to_case_args(inputs)?; + ExprType::Case } "coalesce" => { inputs = Self::check_coalesce_args(inputs)?; @@ -89,19 +89,16 @@ impl Binder { } /// Make sure inputs only have 2 value and rewrite the arguments. - /// Nullif(expr1,expr2) -> Nullif(expr1,Equal(expr1,expr2)). - fn check_nullif_args(inputs: Vec) -> Result> { + /// Nullif(expr1,expr2) -> Case(Equal(expr1 = expr2),null,expr1). + fn write_nullif_to_case_args(inputs: Vec) -> Result> { if inputs.len() != 2 { Err(ErrorCode::BindError("Nullif function must contain 2 arguments".to_string()).into()) } else { - if inputs[0].return_type() != inputs[1].return_type() { - return Err(ErrorCode::BindError(format!( - "Nullif function cannot match types {:?} and {:?}", - inputs[0].return_type(), - inputs[1].return_type() - )) - .into()); - } + let inputs = vec![ + FunctionCall::new(ExprType::Equal, inputs.clone())?.into(), + Literal::new(None, inputs[0].return_type()).into(), + inputs[0].clone(), + ]; Ok(inputs) } } diff --git a/src/frontend/test_runner/tests/testdata/basic_query.yaml b/src/frontend/test_runner/tests/testdata/basic_query.yaml index 6ba1e4d511969..32141c3f51649 100644 --- a/src/frontend/test_runner/tests/testdata/basic_query.yaml +++ b/src/frontend/test_runner/tests/testdata/basic_query.yaml @@ -124,11 +124,11 @@ select nullif(v1, 1) from t; batch_plan: | BatchExchange { order: [], dist: Single } - BatchProject { exprs: [Nullif($0, 1:Int32)], expr_alias: [ ] } + BatchProject { exprs: [Case(($0 = 1:Int32), null:Int32, $0)], expr_alias: [ ] } BatchScan { table: t, columns: [v1] } stream_plan: | StreamMaterialize { columns: [expr#0, _row_id#0(hidden)], pk_columns: [_row_id#0] } - StreamProject { exprs: [Nullif($0, 1:Int32), $1], expr_alias: [ , ] } + StreamProject { exprs: [Case(($0 = 1:Int32), null:Int32, $0), $1], expr_alias: [ , ] } StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] } - sql: | create table t (v1 int); @@ -137,7 +137,7 @@ - sql: | create table t (v1 int); select nullif(v1, 'a') from t; - binder_error: 'Bind error: Nullif function cannot match types Int32 and Varchar' + binder_error: 'Feature is not yet implemented: Equal[Int32, Varchar], Tracking issue: https://github.com/singularity-data/risingwave/issues/112' - sql: | create table t (v1 int); select coalesce(v1, 1) from t; From d71f203639ef09652ce02e522a0e0ebd2cea95b6 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Thu, 5 May 2022 14:17:42 -0400 Subject: [PATCH 17/31] fix --- proto/expr.proto | 3 +- src/expr/src/expr/expr_nullif.rs | 139 ------------------------- src/expr/src/expr/mod.rs | 3 - src/frontend/src/expr/function_call.rs | 1 - 4 files changed, 1 insertion(+), 145 deletions(-) delete mode 100644 src/expr/src/expr/expr_nullif.rs diff --git a/proto/expr.proto b/proto/expr.proto index 187b437cd412a..15a582ed4bf78 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -55,8 +55,7 @@ message ExprNode { ROUND = 214; ASCII = 215; TRANSLATE = 216; - NULLIF = 217; - COALESCE = 218; + COALESCE = 217; // Boolean comparison IS_TRUE = 301; IS_NOT_TRUE = 302; diff --git a/src/expr/src/expr/expr_nullif.rs b/src/expr/src/expr/expr_nullif.rs deleted file mode 100644 index c99d6a4df0725..0000000000000 --- a/src/expr/src/expr/expr_nullif.rs +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2022 Singularity Data -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::convert::TryFrom; -use std::sync::Arc; - -use itertools::Itertools; -use risingwave_common::array::{ArrayRef, DataChunk}; -use risingwave_common::error::{Result, RwError}; -use risingwave_common::types::{DataType, ToOwnedDatum}; -use risingwave_common::{ensure, ensure_eq, try_match_expand}; -use risingwave_pb::expr::expr_node::{RexNode, Type}; -use risingwave_pb::expr::ExprNode; - -use crate::expr::{build_from_prost as expr_build_from_prost, BoxedExpression, Expression}; - -#[derive(Debug)] -pub struct NullifExpression { - return_type: DataType, - left: BoxedExpression, - right: BoxedExpression, -} - -impl Expression for NullifExpression { - fn return_type(&self) -> DataType { - self.return_type.clone() - } - - fn eval(&self, input: &DataChunk) -> Result { - let left = self.left.eval(input)?; - let right = self.right.eval(input)?; - let mut builder = self.return_type.create_array_builder(input.cardinality())?; - - left.iter().zip_eq(right.iter()).try_for_each(|(c, d)| { - if c != d { - builder.append_datum(&c.to_owned_datum()) - } else { - builder.append_null() - } - })?; - Ok(Arc::new(builder.finish()?)) - } -} - -impl NullifExpression { - pub fn new(return_type: DataType, left: BoxedExpression, right: BoxedExpression) -> Self { - NullifExpression { - return_type, - left, - right, - } - } -} - -impl<'a> TryFrom<&'a ExprNode> for NullifExpression { - type Error = RwError; - - fn try_from(prost: &'a ExprNode) -> Result { - ensure!(prost.get_expr_type()? == Type::Nullif); - - let ret_type = DataType::from(prost.get_return_type()?); - let func_call_node = try_match_expand!(prost.get_rex_node().unwrap(), RexNode::FuncCall)?; - - let children = func_call_node.children.to_vec(); - // Nullif `func_call_node` have 2 child nodes. - ensure_eq!(children.len(), 2); - let left = expr_build_from_prost(&children[0])?; - let right = expr_build_from_prost(&children[1])?; - Ok(NullifExpression::new(ret_type, left, right)) - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use risingwave_common::array::column::Column; - use risingwave_common::array::{DataChunk, PrimitiveArray}; - use risingwave_common::types::ScalarImpl; - use risingwave_pb::data::data_type::TypeName; - use risingwave_pb::data::DataType as ProstDataType; - use risingwave_pb::expr::expr_node::RexNode; - use risingwave_pb::expr::expr_node::Type::Nullif; - use risingwave_pb::expr::{ExprNode, FunctionCall}; - - use crate::expr::expr_nullif::NullifExpression; - use crate::expr::test_utils::make_input_ref; - use crate::expr::Expression; - - pub fn make_nullif_function(children: Vec, ret: TypeName) -> ExprNode { - ExprNode { - expr_type: Nullif as i32, - return_type: Some(ProstDataType { - type_name: ret as i32, - ..Default::default() - }), - rex_node: Some(RexNode::FuncCall(FunctionCall { children })), - } - } - - #[test] - fn test_nullif_expr() { - let input_node1 = make_input_ref(0, TypeName::Int32); - let input_node2 = make_input_ref(1, TypeName::Int32); - - let array = PrimitiveArray::::from_slice(&[Some(2), Some(2), Some(4), Some(3)]) - .map(|x| Arc::new(x.into())) - .unwrap(); - let col1 = Column::new(array); - let array = PrimitiveArray::::from_slice(&[Some(1), Some(3), Some(4), Some(3)]) - .map(|x| Arc::new(x.into())) - .unwrap(); - let col2 = Column::new(array); - - let data_chunk = DataChunk::builder().columns(vec![col1, col2]).build(); - - let nullif_expr = NullifExpression::try_from(&make_nullif_function( - vec![input_node1, input_node2], - TypeName::Int32, - )) - .unwrap(); - let res = nullif_expr.eval(&data_chunk).unwrap(); - assert_eq!(res.datum_at(0), Some(ScalarImpl::Int32(2))); - assert_eq!(res.datum_at(1), Some(ScalarImpl::Int32(2))); - assert_eq!(res.datum_at(2), None); - assert_eq!(res.datum_at(3), None); - } -} diff --git a/src/expr/src/expr/mod.rs b/src/expr/src/expr/mod.rs index 7c5d3ff672032..16834146da90d 100644 --- a/src/expr/src/expr/mod.rs +++ b/src/expr/src/expr/mod.rs @@ -25,7 +25,6 @@ mod expr_in; mod expr_input_ref; mod expr_is_null; mod expr_literal; -mod expr_nullif; mod expr_ternary_bytes; pub mod expr_unary; mod pg_sleep; @@ -47,7 +46,6 @@ use risingwave_pb::expr::ExprNode; use crate::expr::build_expr_from_prost::*; use crate::expr::expr_coalesce::CoalesceExpression; use crate::expr::expr_field::FieldExpression; -use crate::expr::expr_nullif::NullifExpression; pub type ExpressionRef = Arc; @@ -84,7 +82,6 @@ pub fn build_from_prost(prost: &ExprNode) -> Result { Add | Subtract | Multiply | Divide | Modulus => build_binary_expr_prost(prost), Extract | RoundDigit | TumbleStart | Position => build_binary_expr_prost(prost), StreamNullByRowCount | And | Or => build_nullable_binary_expr_prost(prost), - Nullif => NullifExpression::try_from(prost).map(|d| Box::new(d) as BoxedExpression), Coalesce => CoalesceExpression::try_from(prost).map(|d| Box::new(d) as BoxedExpression), Substr => build_substr_expr(prost), Length => build_length_expr(prost), diff --git a/src/frontend/src/expr/function_call.rs b/src/frontend/src/expr/function_call.rs index c7d59b3eecd11..a9ea7d652cb25 100644 --- a/src/frontend/src/expr/function_call.rs +++ b/src/frontend/src/expr/function_call.rs @@ -106,7 +106,6 @@ impl FunctionCall { align_types(inputs.iter_mut())?; Ok(DataType::Boolean) } - ExprType::Nullif => Ok(inputs[0].return_type()), ExprType::Coalesce => Ok(inputs[0].return_type()), _ => infer_type( func_type, From 98c751b1a3e4ea0df643c1ee6f550b6afcdc1c80 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Thu, 5 May 2022 20:21:55 -0400 Subject: [PATCH 18/31] fix --- src/common/src/array/struct_array.rs | 4 ++-- src/frontend/src/binder/expr/mod.rs | 13 +++++++++-- src/frontend/src/binder/expr/value.rs | 2 +- src/frontend/src/binder/values.rs | 21 ++++++++++++++++-- .../tests/testdata/struct_query.yaml | 22 +++++++++++++++++++ 5 files changed, 55 insertions(+), 7 deletions(-) diff --git a/src/common/src/array/struct_array.rs b/src/common/src/array/struct_array.rs index 7865affd8c23d..b305f6fe4937e 100644 --- a/src/common/src/array/struct_array.rs +++ b/src/common/src/array/struct_array.rs @@ -275,8 +275,8 @@ pub struct StructValue { } impl fmt::Display for StructValue { - fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result { - Ok(()) + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self.fields) } } diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs index 0887a7bf6928d..aed826d723d31 100644 --- a/src/frontend/src/binder/expr/mod.rs +++ b/src/frontend/src/binder/expr/mod.rs @@ -14,14 +14,14 @@ use itertools::zip_eq; use risingwave_common::error::{ErrorCode, Result}; -use risingwave_common::types::DataType; +use risingwave_common::types::{DataType, Scalar}; use risingwave_sqlparser::ast::{ BinaryOperator, DataType as AstDataType, DateTimeField, Expr, Query, TrimWhereField, UnaryOperator, }; use crate::binder::Binder; -use crate::expr::{Expr as _, ExprImpl, ExprType, FunctionCall, SubqueryKind}; +use crate::expr::{Expr as _, ExprImpl, ExprType, FunctionCall, Literal, SubqueryKind}; mod binary_op; mod column; @@ -102,6 +102,15 @@ impl Binder { list, negated, } => self.bind_in_list(*expr, list, negated), + Expr::Row(exprs) => { + let value = self.bind_row(&exprs)?; + Ok(ExprImpl::Literal(Box::new(Literal::new( + Some(value.to_scalar_value()), + DataType::Struct { + fields: vec![].into(), + }, + )))) + } _ => Err(ErrorCode::NotImplemented( format!("unsupported expression {:?}", expr), 112.into(), diff --git a/src/frontend/src/binder/expr/value.rs b/src/frontend/src/binder/expr/value.rs index 2b92015cf6f94..8dde1e5a0dc1d 100644 --- a/src/frontend/src/binder/expr/value.rs +++ b/src/frontend/src/binder/expr/value.rs @@ -21,7 +21,7 @@ use crate::binder::Binder; use crate::expr::Literal; impl Binder { - pub(super) fn bind_value(&mut self, value: Value) -> Result { + pub fn bind_value(&mut self, value: Value) -> Result { match value { Value::Number(s, b) => self.bind_number(s, b), Value::SingleQuotedString(s) => self.bind_string(s), diff --git a/src/frontend/src/binder/values.rs b/src/frontend/src/binder/values.rs index 6efaacbaf131e..5ae7df53a9a33 100644 --- a/src/frontend/src/binder/values.rs +++ b/src/frontend/src/binder/values.rs @@ -13,10 +13,11 @@ // limitations under the License. use itertools::Itertools; +use risingwave_common::array::StructValue; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::error::{ErrorCode, Result}; -use risingwave_common::types::DataType; -use risingwave_sqlparser::ast::Values; +use risingwave_common::types::{DataType, Datum, Scalar}; +use risingwave_sqlparser::ast::{Expr, Values}; use super::bind_context::Clause; use crate::binder::Binder; @@ -80,6 +81,22 @@ impl Binder { schema, }) } + + pub fn bind_row(&mut self, exprs: &[Expr]) -> Result { + let fields = exprs + .iter() + .map(|e| match e { + Expr::Value(value) => Ok(self.bind_value(value.clone())?.get_data().clone()), + Expr::Row(expr) => Ok(Some(self.bind_row(expr)?.to_scalar_value())), + _ => Err(ErrorCode::NotImplemented( + format!("unsupported expression {:?}", e), + 112.into(), + ) + .into()), + }) + .collect::>>()?; + Ok(StructValue::new(fields)) + } } #[cfg(test)] diff --git a/src/frontend/test_runner/tests/testdata/struct_query.yaml b/src/frontend/test_runner/tests/testdata/struct_query.yaml index 92b8daade7ecd..5e1318a0b377c 100644 --- a/src/frontend/test_runner/tests/testdata/struct_query.yaml +++ b/src/frontend/test_runner/tests/testdata/struct_query.yaml @@ -315,3 +315,25 @@ string address = 1; string zipcode = 3; } +- sql: | + create materialized view t as select * from s; + insert into t values (1,2,(1,2,3)); + logical_plan: | + LogicalProject { exprs: [Field($1, 1:Int32), $1, Field(Field($1, 1:Int32), 0:Int32)], expr_alias: [city, country, address] } + LogicalScan { table: t, columns: [id, country, zipcode, rate, _row_id#0] } + create_source: + row_format: protobuf + name: s + file: | + syntax = "proto3"; + package test; + message TestRecord { + int32 v1 = 1; + int32 v2 = 2; + V v3 = 3; + } + message V { + int32 v1 = 1; + int32 v2 = 2; + int32 v3 = 3; + } From f3519193aefc8e3859400676d367876e319a7e65 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Thu, 5 May 2022 21:30:22 -0400 Subject: [PATCH 19/31] fix --- src/frontend/test_runner/src/lib.rs | 10 ++++++++-- .../test_runner/tests/testdata/struct_query.yaml | 4 ++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/frontend/test_runner/src/lib.rs b/src/frontend/test_runner/src/lib.rs index 1d9bc8506e9eb..45b65e9405e7e 100644 --- a/src/frontend/test_runner/src/lib.rs +++ b/src/frontend/test_runner/src/lib.rs @@ -93,6 +93,7 @@ pub struct CreateSource { row_format: String, name: String, file: Option, + materialized: Option, } #[serde_with::skip_serializing_none] @@ -200,11 +201,16 @@ impl TestCase { match self.create_source.clone() { Some(source) => { if let Some(content) = source.file { + let materialized = if let Some(true) = source.materialized { + "materialized".to_string() + } else { + "".to_string() + }; let sql = format!( - r#"CREATE SOURCE {} + r#"CREATE {} SOURCE {} WITH ('kafka.topic' = 'abc', 'kafka.servers' = 'localhost:1001') ROW FORMAT {} MESSAGE '.test.TestRecord' ROW SCHEMA LOCATION 'file://"#, - source.name, source.row_format + materialized, source.name, source.row_format ); let temp_file = create_proto_file(content.as_str()); self.run_sql( diff --git a/src/frontend/test_runner/tests/testdata/struct_query.yaml b/src/frontend/test_runner/tests/testdata/struct_query.yaml index 5e1318a0b377c..d8b9575f3ca1c 100644 --- a/src/frontend/test_runner/tests/testdata/struct_query.yaml +++ b/src/frontend/test_runner/tests/testdata/struct_query.yaml @@ -316,14 +316,14 @@ string zipcode = 3; } - sql: | - create materialized view t as select * from s; - insert into t values (1,2,(1,2,3)); + insert into s values (1,2,(1,2,3)); logical_plan: | LogicalProject { exprs: [Field($1, 1:Int32), $1, Field(Field($1, 1:Int32), 0:Int32)], expr_alias: [city, country, address] } LogicalScan { table: t, columns: [id, country, zipcode, rate, _row_id#0] } create_source: row_format: protobuf name: s + materialized: true file: | syntax = "proto3"; package test; From 3595deb4acbe7d54d3c00fa0dd96c651f8850134 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Sat, 7 May 2022 19:47:26 -0400 Subject: [PATCH 20/31] fix --- src/frontend/src/binder/expr/function.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 5f514fa84f503..185b6d5bef6bc 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -59,11 +59,7 @@ impl Binder { "ltrim" => ExprType::Ltrim, "rtrim" => ExprType::Rtrim, "nullif" => { -<<<<<<< HEAD - inputs = Self::write_nullif_to_case_args(inputs)?; -======= inputs = Self::rewrite_nullif_to_case_when(inputs)?; ->>>>>>> main ExprType::Case } "coalesce" => { From ee031dc0cbdbc0be93c171ba20dd2b23136de0b4 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Sat, 7 May 2022 20:07:38 -0400 Subject: [PATCH 21/31] fix --- src/common/src/array/struct_array.rs | 20 +++++++++- src/common/src/types/mod.rs | 38 ++++++++++++++++++- src/frontend/src/binder/expr/mod.rs | 5 +-- .../tests/testdata/struct_query.yaml | 11 ++++-- 4 files changed, 65 insertions(+), 9 deletions(-) diff --git a/src/common/src/array/struct_array.rs b/src/common/src/array/struct_array.rs index b305f6fe4937e..23c485b86f6bf 100644 --- a/src/common/src/array/struct_array.rs +++ b/src/common/src/array/struct_array.rs @@ -26,7 +26,7 @@ use super::{ }; use crate::array::ArrayRef; use crate::buffer::{Bitmap, BitmapBuilder}; -use crate::error::Result; +use crate::error::{internal_error, Result}; use crate::types::{to_datum_ref, DataType, Datum, DatumRef, Scalar, ScalarRefImpl}; /// This is a naive implementation of struct array. @@ -271,7 +271,7 @@ impl StructArray { #[derive(Clone, Debug, Eq, Default, PartialEq, Hash)] pub struct StructValue { - fields: Vec, + pub fields: Vec, } impl fmt::Display for StructValue { @@ -296,6 +296,22 @@ impl StructValue { pub fn new(fields: Vec) -> Self { Self { fields } } + + pub fn get_data_type(&self) -> Result { + let types = self + .fields + .iter() + .map(|d| match d { + None => Err(internal_error( + "cannot get data type from None Datum".to_string(), + )), + Some(s) => DataType::scalar_type_to_data_type(s), + }) + .collect::>>()?; + Ok(DataType::Struct { + fields: types.into(), + }) + } } #[derive(Copy, Clone)] diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index 64b59bb3a866a..faa1036abe6ac 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -19,7 +19,7 @@ use bytes::{Buf, BufMut}; use risingwave_pb::data::DataType as ProstDataType; use serde::{Deserialize, Serialize}; -use crate::error::{ErrorCode, Result, RwError}; +use crate::error::{internal_error, ErrorCode, Result, RwError}; mod native_type; mod scalar_impl; @@ -142,6 +142,42 @@ impl DataType { }) } + pub fn scalar_type_to_data_type(scalar: &ScalarImpl) -> Result { + let data_type = match &scalar { + ScalarImpl::Int16(_) => DataType::Int16, + ScalarImpl::Int32(_) => DataType::Int32, + ScalarImpl::Int64(_) => DataType::Int64, + ScalarImpl::Float32(_) => DataType::Float32, + ScalarImpl::Float64(_) => DataType::Float64, + ScalarImpl::Utf8(_) => DataType::Varchar, + ScalarImpl::Bool(_) => DataType::Boolean, + ScalarImpl::Decimal(_) => DataType::Decimal, + ScalarImpl::Interval(_) => DataType::Interval, + ScalarImpl::NaiveDate(_) => DataType::Date, + ScalarImpl::NaiveDateTime(_) => DataType::Timestamp, + ScalarImpl::NaiveTime(_) => DataType::Time, + ScalarImpl::Struct(data) => { + let types = data + .fields + .iter() + .map(|d| match d { + None => Err(internal_error( + "cannot get data type from None Datum".to_string(), + )), + Some(s) => DataType::scalar_type_to_data_type(s), + }) + .collect::>>()?; + DataType::Struct { + fields: types.into(), + } + } + ScalarImpl::List(_) => { + todo!() + } + }; + Ok(data_type) + } + fn prost_type_name(&self) -> TypeName { match self { DataType::Int16 => TypeName::Int16, diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs index aed826d723d31..b9f20d5c73726 100644 --- a/src/frontend/src/binder/expr/mod.rs +++ b/src/frontend/src/binder/expr/mod.rs @@ -104,11 +104,10 @@ impl Binder { } => self.bind_in_list(*expr, list, negated), Expr::Row(exprs) => { let value = self.bind_row(&exprs)?; + let data_type = value.get_data_type()?; Ok(ExprImpl::Literal(Box::new(Literal::new( Some(value.to_scalar_value()), - DataType::Struct { - fields: vec![].into(), - }, + data_type, )))) } _ => Err(ErrorCode::NotImplemented( diff --git a/src/frontend/test_runner/tests/testdata/struct_query.yaml b/src/frontend/test_runner/tests/testdata/struct_query.yaml index d8b9575f3ca1c..1dc348f82ab88 100644 --- a/src/frontend/test_runner/tests/testdata/struct_query.yaml +++ b/src/frontend/test_runner/tests/testdata/struct_query.yaml @@ -316,10 +316,10 @@ string zipcode = 3; } - sql: | - insert into s values (1,2,(1,2,3)); + insert into s values (1,2,(1,2,(1,2,3))); logical_plan: | - LogicalProject { exprs: [Field($1, 1:Int32), $1, Field(Field($1, 1:Int32), 0:Int32)], expr_alias: [city, country, address] } - LogicalScan { table: t, columns: [id, country, zipcode, rate, _row_id#0] } + LogicalInsert { table: s } + LogicalValues { rows: [[1:Int32, 2:Int32, [Some(Int32(1)), Some(Int32(2)), Some(Struct(StructValue { fields: [Some(Int32(1)), Some(Int32(2)), Some(Int32(3))] }))]:Struct { fields: [Int32, Int32, Struct { fields: [Int32, Int32, Int32] }] }]], schema: Schema { fields: [:Int32, :Int32, :Struct { fields: [Int32, Int32, Struct { fields: [Int32, Int32, Int32] }] }] } } create_source: row_format: protobuf name: s @@ -333,6 +333,11 @@ V v3 = 3; } message V { + int32 v1 = 1; + int32 v2 = 2; + U v3 = 3; + } + message U { int32 v1 = 1; int32 v2 = 2; int32 v3 = 3; From db2be86670089eac6c032b39d9a63be413651ba4 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Sat, 7 May 2022 20:31:17 -0400 Subject: [PATCH 22/31] fix --- src/batch/src/executor2/values.rs | 58 +++++++++++++++++++++++++++- src/common/src/array/struct_array.rs | 2 +- src/common/src/types/mod.rs | 18 +-------- src/expr/src/expr/expr_literal.rs | 1 + 4 files changed, 61 insertions(+), 18 deletions(-) diff --git a/src/batch/src/executor2/values.rs b/src/batch/src/executor2/values.rs index c0e8eaed0dd70..e7dd631f883e7 100644 --- a/src/batch/src/executor2/values.rs +++ b/src/batch/src/executor2/values.rs @@ -143,9 +143,12 @@ impl BoxedExecutor2Builder for ValuesExecutor2 { #[cfg(test)] mod tests { + use futures::stream::StreamExt; use risingwave_common::array; - use risingwave_common::array::{I16Array, I32Array, I64Array}; + use risingwave_common::array::{ + ArrayImpl, I16Array, I32Array, I64Array, StructArray, StructValue, + }; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_expr::expr::{BoxedExpression, LiteralExpression}; @@ -205,6 +208,59 @@ mod tests { } } + #[tokio::test] + async fn test_struct_values_executor() { + let value = StructValue::new(vec![ + Some(ScalarImpl::Int32(1)), + Some(ScalarImpl::Int32(2)), + Some(ScalarImpl::Int32(3)), + ]); + let exprs = vec![Box::new(LiteralExpression::new( + DataType::Struct { + fields: vec![DataType::Int32, DataType::Int32, DataType::Int32].into(), + }, + Some(ScalarImpl::Struct(value)), + )) as BoxedExpression]; + + let fields = exprs + .iter() // for each column + .map(|col| Field::unnamed(col.return_type())) + .collect::>(); + + let values_executor = Box::new(ValuesExecutor2 { + rows: vec![exprs].into_iter(), + schema: Schema { fields }, + identity: "ValuesExecutor2".to_string(), + chunk_size: 1024, + }); + + let fields = &values_executor.schema().fields; + assert_eq!( + fields[0].data_type, + DataType::Struct { + fields: vec![DataType::Int32, DataType::Int32, DataType::Int32].into() + } + ); + + let mut stream = values_executor.execute(); + let result = stream.next().await.unwrap(); + + let array: ArrayImpl = StructArray::from_slices( + &[true], + vec![ + array! { I32Array, [Some(1)] }.into(), + array! { I32Array, [Some(2)] }.into(), + array! { I32Array, [Some(3)] }.into(), + ], + vec![DataType::Int32, DataType::Int32, DataType::Int32], + ) + .unwrap() + .into(); + if let Ok(result) = result { + assert_eq!(*result.column_at(0).array(), array); + } + } + #[tokio::test] async fn test_chunk_split_size() { let rows = [ diff --git a/src/common/src/array/struct_array.rs b/src/common/src/array/struct_array.rs index 23c485b86f6bf..dd4a31244494c 100644 --- a/src/common/src/array/struct_array.rs +++ b/src/common/src/array/struct_array.rs @@ -271,7 +271,7 @@ impl StructArray { #[derive(Clone, Debug, Eq, Default, PartialEq, Hash)] pub struct StructValue { - pub fields: Vec, + fields: Vec, } impl fmt::Display for StructValue { diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index faa1036abe6ac..02d984594a91a 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -19,7 +19,7 @@ use bytes::{Buf, BufMut}; use risingwave_pb::data::DataType as ProstDataType; use serde::{Deserialize, Serialize}; -use crate::error::{internal_error, ErrorCode, Result, RwError}; +use crate::error::{ErrorCode, Result, RwError}; mod native_type; mod scalar_impl; @@ -156,21 +156,7 @@ impl DataType { ScalarImpl::NaiveDate(_) => DataType::Date, ScalarImpl::NaiveDateTime(_) => DataType::Timestamp, ScalarImpl::NaiveTime(_) => DataType::Time, - ScalarImpl::Struct(data) => { - let types = data - .fields - .iter() - .map(|d| match d { - None => Err(internal_error( - "cannot get data type from None Datum".to_string(), - )), - Some(s) => DataType::scalar_type_to_data_type(s), - }) - .collect::>>()?; - DataType::Struct { - fields: types.into(), - } - } + ScalarImpl::Struct(data) => data.get_data_type()?, ScalarImpl::List(_) => { todo!() } diff --git a/src/expr/src/expr/expr_literal.rs b/src/expr/src/expr/expr_literal.rs index 117f735b1dd50..6a5e5669616ae 100644 --- a/src/expr/src/expr/expr_literal.rs +++ b/src/expr/src/expr/expr_literal.rs @@ -103,6 +103,7 @@ fn literal_type_match(return_type: &DataType, literal: Option<&ScalarImpl>) -> b | (DataType::Timestamp, ScalarImpl::NaiveDateTime(_)) | (DataType::Decimal, ScalarImpl::Decimal(_)) | (DataType::Interval, ScalarImpl::Interval(_)) + | (DataType::Struct { .. }, ScalarImpl::Struct(_)) ) } None => true, From 6b2dacdac9b4937d95b77d1ac9ba481009bd55dc Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Sat, 7 May 2022 21:01:33 -0400 Subject: [PATCH 23/31] fix --- src/batch/src/executor2/insert.rs | 37 ++++++++++++++++++++++++++----- src/common/src/catalog/schema.rs | 18 +++++++++++++++ 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/src/batch/src/executor2/insert.rs b/src/batch/src/executor2/insert.rs index 32c7ce65bba85..e28c02de4641a 100644 --- a/src/batch/src/executor2/insert.rs +++ b/src/batch/src/executor2/insert.rs @@ -177,7 +177,7 @@ mod tests { use std::sync::Arc; use futures::StreamExt; - use risingwave_common::array::{Array, I64Array}; + use risingwave_common::array::{Array, I32Array, I64Array, StructArray}; use risingwave_common::catalog::{schema_test_utils, ColumnDesc, ColumnId}; use risingwave_common::column_nonnull; use risingwave_common::types::DataType; @@ -195,11 +195,11 @@ mod tests { let store = MemoryStateStore::new(); // Schema for mock executor. - let schema = schema_test_utils::ii(); + let schema = schema_test_utils::iis(); let mut mock_executor = MockExecutor::new(schema.clone()); // Schema of the table - let schema = schema_test_utils::iii(); + let schema = schema_test_utils::iiis(); let table_columns: Vec<_> = schema .fields @@ -216,7 +216,19 @@ mod tests { let col1 = column_nonnull! { I64Array, [1, 3, 5, 7, 9] }; let col2 = column_nonnull! { I64Array, [2, 4, 6, 8, 10] }; - let data_chunk: DataChunk = DataChunk::builder().columns(vec![col1, col2]).build(); + let array = StructArray::from_slices( + &[true, false, false, false,false], + vec![ + array! { I32Array, [Some(1),None,None,None,None] }.into(), + array! { I32Array, [Some(2),None,None,None,None] }.into(), + array! { I32Array, [Some(3),None,None,None,None] }.into(), + ], + vec![DataType::Int32, DataType::Int32, DataType::Int32], + ) + .map(|x| Arc::new(x.into())) + .unwrap(); + let col3 = Column::new(array); + let data_chunk: DataChunk = DataChunk::builder().columns(vec![col1, col2, col3]).build(); mock_executor.add(data_chunk.clone()); // Create the table. @@ -227,7 +239,7 @@ mod tests { let source_desc = source_manager.get_source(&table_id)?; let source = source_desc.source.as_table_v2().unwrap(); let mut reader = source - .stream_reader(vec![0.into(), 1.into(), 2.into()]) + .stream_reader(vec![0.into(), 1.into(), 2.into(), 3.into()]) .await?; // Insert @@ -276,6 +288,21 @@ mod tests { vec![Some(2), Some(4), Some(6), Some(8), Some(10)] ); + let array: ArrayImpl = StructArray::from_slices( + &[true, false, false, false,false], + vec![ + array! { I32Array, [Some(1),None,None,None,None] }.into(), + array! { I32Array, [Some(2),None,None,None,None] }.into(), + array! { I32Array, [Some(3),None,None,None,None] }.into(), + ], + vec![DataType::Int32, DataType::Int32, DataType::Int32], + ) + .unwrap() + .into(); + assert_eq!(*chunk.chunk.columns()[2] + .array(), array); + + // There's nothing in store since `TableSourceV2` has no side effect. // Data will be materialized in associated streaming task. let epoch = u64::MAX; diff --git a/src/common/src/catalog/schema.rs b/src/common/src/catalog/schema.rs index 3df42c5935b73..a8c4c99d66894 100644 --- a/src/common/src/catalog/schema.rs +++ b/src/common/src/catalog/schema.rs @@ -232,11 +232,29 @@ pub mod test_utils { int32_n::<2>() } + /// Create a util schema **for test only** with two int32 fields. + pub fn iis() -> Schema { + let mut schema = int32_n::<2>(); + schema.fields.push(Field::unnamed(DataType::Struct { + fields: vec![DataType::Int32, DataType::Int32, DataType::Int32].into(), + })); + schema + } + /// Create a util schema **for test only** with three int32 fields. pub fn iii() -> Schema { int32_n::<3>() } + /// Create a util schema **for test only** with two int32 fields. + pub fn iiis() -> Schema { + let mut schema = int32_n::<3>(); + schema.fields.push(Field::unnamed(DataType::Struct { + fields: vec![DataType::Int32, DataType::Int32, DataType::Int32].into(), + })); + schema + } + fn varchar_n() -> Schema { field_n::(DataType::Varchar) } From b8c3383c964f2c3a15868d75875d9bceb0668c10 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Sun, 8 May 2022 08:12:35 -0400 Subject: [PATCH 24/31] fix --- src/batch/src/executor2/insert.rs | 24 ++++++---- src/batch/src/executor2/values.rs | 76 +++++++++++-------------------- src/common/src/catalog/schema.rs | 18 -------- 3 files changed, 40 insertions(+), 78 deletions(-) diff --git a/src/batch/src/executor2/insert.rs b/src/batch/src/executor2/insert.rs index e28c02de4641a..06c7c92b389ed 100644 --- a/src/batch/src/executor2/insert.rs +++ b/src/batch/src/executor2/insert.rs @@ -194,13 +194,19 @@ mod tests { let source_manager = Arc::new(MemSourceManager::default()); let store = MemoryStateStore::new(); + // Make struct field + let struct_field = Field::unnamed(DataType::Struct { + fields: vec![DataType::Int32, DataType::Int32, DataType::Int32].into(), + }); + // Schema for mock executor. - let schema = schema_test_utils::iis(); + let mut schema = schema_test_utils::ii(); + schema.fields.push(struct_field.clone()); let mut mock_executor = MockExecutor::new(schema.clone()); // Schema of the table - let schema = schema_test_utils::iiis(); - + let mut schema = schema_test_utils::iii(); + schema.fields.push(struct_field); let table_columns: Vec<_> = schema .fields .iter() @@ -217,7 +223,7 @@ mod tests { let col1 = column_nonnull! { I64Array, [1, 3, 5, 7, 9] }; let col2 = column_nonnull! { I64Array, [2, 4, 6, 8, 10] }; let array = StructArray::from_slices( - &[true, false, false, false,false], + &[true, false, false, false, false], vec![ array! { I32Array, [Some(1),None,None,None,None] }.into(), array! { I32Array, [Some(2),None,None,None,None] }.into(), @@ -289,7 +295,7 @@ mod tests { ); let array: ArrayImpl = StructArray::from_slices( - &[true, false, false, false,false], + &[true, false, false, false, false], vec![ array! { I32Array, [Some(1),None,None,None,None] }.into(), array! { I32Array, [Some(2),None,None,None,None] }.into(), @@ -297,11 +303,9 @@ mod tests { ], vec![DataType::Int32, DataType::Int32, DataType::Int32], ) - .unwrap() - .into(); - assert_eq!(*chunk.chunk.columns()[2] - .array(), array); - + .unwrap() + .into(); + assert_eq!(*chunk.chunk.columns()[2].array(), array); // There's nothing in store since `TableSourceV2` has no side effect. // Data will be materialized in associated streaming task. diff --git a/src/batch/src/executor2/values.rs b/src/batch/src/executor2/values.rs index e7dd631f883e7..9f42c10cf58f7 100644 --- a/src/batch/src/executor2/values.rs +++ b/src/batch/src/executor2/values.rs @@ -157,6 +157,11 @@ mod tests { #[tokio::test] async fn test_values_executor() { + let value = StructValue::new(vec![ + Some(ScalarImpl::Int32(1)), + Some(ScalarImpl::Int32(2)), + Some(ScalarImpl::Int32(3)), + ]); let exprs = vec![ Box::new(LiteralExpression::new( DataType::Int16, @@ -170,6 +175,12 @@ mod tests { DataType::Int64, Some(ScalarImpl::Int64(3)), )), + Box::new(LiteralExpression::new( + DataType::Struct { + fields: vec![DataType::Int32, DataType::Int32, DataType::Int32].into(), + }, + Some(ScalarImpl::Struct(value)), + )) as BoxedExpression, ]; let fields = exprs @@ -188,55 +199,8 @@ mod tests { assert_eq!(fields[0].data_type, DataType::Int16); assert_eq!(fields[1].data_type, DataType::Int32); assert_eq!(fields[2].data_type, DataType::Int64); - - let mut stream = values_executor.execute(); - let result = stream.next().await.unwrap(); - - if let Ok(result) = result { - assert_eq!( - *result.column_at(0).array(), - array! {I16Array, [Some(1_i16)]}.into() - ); - assert_eq!( - *result.column_at(1).array(), - array! {I32Array, [Some(2)]}.into() - ); - assert_eq!( - *result.column_at(2).array(), - array! {I64Array, [Some(3)]}.into() - ); - } - } - - #[tokio::test] - async fn test_struct_values_executor() { - let value = StructValue::new(vec![ - Some(ScalarImpl::Int32(1)), - Some(ScalarImpl::Int32(2)), - Some(ScalarImpl::Int32(3)), - ]); - let exprs = vec![Box::new(LiteralExpression::new( - DataType::Struct { - fields: vec![DataType::Int32, DataType::Int32, DataType::Int32].into(), - }, - Some(ScalarImpl::Struct(value)), - )) as BoxedExpression]; - - let fields = exprs - .iter() // for each column - .map(|col| Field::unnamed(col.return_type())) - .collect::>(); - - let values_executor = Box::new(ValuesExecutor2 { - rows: vec![exprs].into_iter(), - schema: Schema { fields }, - identity: "ValuesExecutor2".to_string(), - chunk_size: 1024, - }); - - let fields = &values_executor.schema().fields; assert_eq!( - fields[0].data_type, + fields[3].data_type, DataType::Struct { fields: vec![DataType::Int32, DataType::Int32, DataType::Int32].into() } @@ -244,7 +208,6 @@ mod tests { let mut stream = values_executor.execute(); let result = stream.next().await.unwrap(); - let array: ArrayImpl = StructArray::from_slices( &[true], vec![ @@ -256,8 +219,21 @@ mod tests { ) .unwrap() .into(); + if let Ok(result) = result { - assert_eq!(*result.column_at(0).array(), array); + assert_eq!( + *result.column_at(0).array(), + array! {I16Array, [Some(1_i16)]}.into() + ); + assert_eq!( + *result.column_at(1).array(), + array! {I32Array, [Some(2)]}.into() + ); + assert_eq!( + *result.column_at(2).array(), + array! {I64Array, [Some(3)]}.into() + ); + assert_eq!(*result.column_at(3).array(), array); } } diff --git a/src/common/src/catalog/schema.rs b/src/common/src/catalog/schema.rs index a8c4c99d66894..3df42c5935b73 100644 --- a/src/common/src/catalog/schema.rs +++ b/src/common/src/catalog/schema.rs @@ -232,29 +232,11 @@ pub mod test_utils { int32_n::<2>() } - /// Create a util schema **for test only** with two int32 fields. - pub fn iis() -> Schema { - let mut schema = int32_n::<2>(); - schema.fields.push(Field::unnamed(DataType::Struct { - fields: vec![DataType::Int32, DataType::Int32, DataType::Int32].into(), - })); - schema - } - /// Create a util schema **for test only** with three int32 fields. pub fn iii() -> Schema { int32_n::<3>() } - /// Create a util schema **for test only** with two int32 fields. - pub fn iiis() -> Schema { - let mut schema = int32_n::<3>(); - schema.fields.push(Field::unnamed(DataType::Struct { - fields: vec![DataType::Int32, DataType::Int32, DataType::Int32].into(), - })); - schema - } - fn varchar_n() -> Schema { field_n::(DataType::Varchar) } From b253d602d068c295c33f28b614696092426a13f7 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Sun, 8 May 2022 08:32:47 -0400 Subject: [PATCH 25/31] fix --- src/common/src/types/mod.rs | 1 + src/frontend/src/binder/values.rs | 2 ++ 2 files changed, 3 insertions(+) diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index 02d984594a91a..58d263b1a58a3 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -142,6 +142,7 @@ impl DataType { }) } + /// Transfer `scalar_type` to `data_type`. pub fn scalar_type_to_data_type(scalar: &ScalarImpl) -> Result { let data_type = match &scalar { ScalarImpl::Int16(_) => DataType::Int16, diff --git a/src/frontend/src/binder/values.rs b/src/frontend/src/binder/values.rs index 5ae7df53a9a33..ed077108fb870 100644 --- a/src/frontend/src/binder/values.rs +++ b/src/frontend/src/binder/values.rs @@ -82,6 +82,8 @@ impl Binder { }) } + /// Bind row to `struct_value` for insert nested column. + /// Only accept value and row expr in row. pub fn bind_row(&mut self, exprs: &[Expr]) -> Result { let fields = exprs .iter() From d4c31531fdbc20c7fd44e67203478274d06093be Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Sun, 8 May 2022 21:30:24 -0400 Subject: [PATCH 26/31] fix --- src/common/src/array/list_array.rs | 4 ++++ src/common/src/array/struct_array.rs | 18 +++-------------- src/common/src/types/mod.rs | 30 +++++++++++++++++++++++----- src/frontend/src/binder/expr/mod.rs | 9 ++++++++- 4 files changed, 40 insertions(+), 21 deletions(-) diff --git a/src/common/src/array/list_array.rs b/src/common/src/array/list_array.rs index 24c2436f83470..d5ef8cfe90002 100644 --- a/src/common/src/array/list_array.rs +++ b/src/common/src/array/list_array.rs @@ -288,6 +288,10 @@ impl ListValue { pub fn new(values: Vec) -> Self { Self { values } } + + pub fn get_values(&self) -> &[Datum] { + &self.values + } } #[derive(Copy, Clone)] diff --git a/src/common/src/array/struct_array.rs b/src/common/src/array/struct_array.rs index dd4a31244494c..98d0bce0e997e 100644 --- a/src/common/src/array/struct_array.rs +++ b/src/common/src/array/struct_array.rs @@ -26,7 +26,7 @@ use super::{ }; use crate::array::ArrayRef; use crate::buffer::{Bitmap, BitmapBuilder}; -use crate::error::{internal_error, Result}; +use crate::error::Result; use crate::types::{to_datum_ref, DataType, Datum, DatumRef, Scalar, ScalarRefImpl}; /// This is a naive implementation of struct array. @@ -297,20 +297,8 @@ impl StructValue { Self { fields } } - pub fn get_data_type(&self) -> Result { - let types = self - .fields - .iter() - .map(|d| match d { - None => Err(internal_error( - "cannot get data type from None Datum".to_string(), - )), - Some(s) => DataType::scalar_type_to_data_type(s), - }) - .collect::>>()?; - Ok(DataType::Struct { - fields: types.into(), - }) + pub fn get_fields(&self) -> &[Datum] { + &self.fields } } diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index 58d263b1a58a3..d03117a3b2fe5 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -19,7 +19,7 @@ use bytes::{Buf, BufMut}; use risingwave_pb::data::DataType as ProstDataType; use serde::{Deserialize, Serialize}; -use crate::error::{ErrorCode, Result, RwError}; +use crate::error::{internal_error, ErrorCode, Result, RwError}; mod native_type; mod scalar_impl; @@ -143,7 +143,15 @@ impl DataType { } /// Transfer `scalar_type` to `data_type`. - pub fn scalar_type_to_data_type(scalar: &ScalarImpl) -> Result { + pub fn datum_type_to_data_type(datum: &Datum) -> Result { + let scalar = match datum { + Some(scalar) => scalar, + None => { + return Err(internal_error( + "cannot get data type from None Datum".to_string(), + )); + } + }; let data_type = match &scalar { ScalarImpl::Int16(_) => DataType::Int16, ScalarImpl::Int32(_) => DataType::Int32, @@ -157,9 +165,21 @@ impl DataType { ScalarImpl::NaiveDate(_) => DataType::Date, ScalarImpl::NaiveDateTime(_) => DataType::Timestamp, ScalarImpl::NaiveTime(_) => DataType::Time, - ScalarImpl::Struct(data) => data.get_data_type()?, - ScalarImpl::List(_) => { - todo!() + ScalarImpl::Struct(data) => { + let types = data + .get_fields() + .iter() + .map(DataType::datum_type_to_data_type) + .collect::>>()?; + DataType::Struct { + fields: types.into(), + } + } + ScalarImpl::List(data) => { + let data = data.get_values().get(0).ok_or_else(|| { + internal_error("cannot get data type from empty list".to_string()) + })?; + DataType::datum_type_to_data_type(data)? } }; Ok(data_type) diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs index b9f20d5c73726..9ff6284975381 100644 --- a/src/frontend/src/binder/expr/mod.rs +++ b/src/frontend/src/binder/expr/mod.rs @@ -104,7 +104,14 @@ impl Binder { } => self.bind_in_list(*expr, list, negated), Expr::Row(exprs) => { let value = self.bind_row(&exprs)?; - let data_type = value.get_data_type()?; + let data_type = DataType::Struct { + fields: value + .get_fields() + .iter() + .map(DataType::datum_type_to_data_type) + .collect::>>()? + .into(), + }; Ok(ExprImpl::Literal(Box::new(Literal::new( Some(value.to_scalar_value()), data_type, From fd23fc5f78e653b08764c5e13f79a5197d2c7782 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Mon, 9 May 2022 06:33:31 -0400 Subject: [PATCH 27/31] fix --- src/common/src/types/mod.rs | 2 +- src/frontend/src/binder/values.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index d03117a3b2fe5..5a77ab1271548 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -142,7 +142,7 @@ impl DataType { }) } - /// Transfer `scalar_type` to `data_type`. + /// Transfer `datum_type` to `data_type`. pub fn datum_type_to_data_type(datum: &Datum) -> Result { let scalar = match datum { Some(scalar) => scalar, diff --git a/src/frontend/src/binder/values.rs b/src/frontend/src/binder/values.rs index ed077108fb870..614241844eecc 100644 --- a/src/frontend/src/binder/values.rs +++ b/src/frontend/src/binder/values.rs @@ -82,7 +82,8 @@ impl Binder { }) } - /// Bind row to `struct_value` for insert nested column. + /// Bind row to `struct_value` for nested column, + /// e.g. Row(1,2,(1,2,3)). /// Only accept value and row expr in row. pub fn bind_row(&mut self, exprs: &[Expr]) -> Result { let fields = exprs From 9e34ea61643c70dfe5aafa489306bab8ec5094d3 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Mon, 9 May 2022 19:25:10 -0400 Subject: [PATCH 28/31] fix --- src/frontend/src/expr/type_inference.rs | 1 + .../tests/testdata/struct_query.yaml | 28 +++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/src/frontend/src/expr/type_inference.rs b/src/frontend/src/expr/type_inference.rs index 2c5809f46f8ab..1259a6a86e0e3 100644 --- a/src/frontend/src/expr/type_inference.rs +++ b/src/frontend/src/expr/type_inference.rs @@ -206,6 +206,7 @@ fn build_type_derive_map() -> HashMap { E::GreaterThanOrEqual, ]; build_binary_cmp_funcs(&mut map, cmp_exprs, &num_types); + build_binary_cmp_funcs(&mut map, cmp_exprs, &[T::Struct]); build_binary_cmp_funcs(&mut map, cmp_exprs, &[T::Date, T::Timestamp, T::Timestampz]); build_binary_cmp_funcs(&mut map, cmp_exprs, &[T::Time, T::Interval]); for e in cmp_exprs { diff --git a/src/frontend/test_runner/tests/testdata/struct_query.yaml b/src/frontend/test_runner/tests/testdata/struct_query.yaml index 1dc348f82ab88..fdc77ce44adbd 100644 --- a/src/frontend/test_runner/tests/testdata/struct_query.yaml +++ b/src/frontend/test_runner/tests/testdata/struct_query.yaml @@ -342,3 +342,31 @@ int32 v2 = 2; int32 v3 = 3; } +- sql: | + select * from s where s.v3 = (1,2,(1,2,3)); + logical_plan: | + LogicalProject { exprs: [$1, $2, $3], expr_alias: [v1, v2, v3] } + LogicalFilter { predicate: ($3 = [Some(Int32(1)), Some(Int32(2)), Some(Struct(StructValue { fields: [Some(Int32(1)), Some(Int32(2)), Some(Int32(3))] }))]:Struct { fields: [Int32, Int32, Struct { fields: [Int32, Int32, Int32] }] }) } + LogicalScan { table: s, columns: [_row_id#0, v1, v2, v3] } + create_source: + row_format: protobuf + name: s + materialized: true + file: | + syntax = "proto3"; + package test; + message TestRecord { + int32 v1 = 1; + int32 v2 = 2; + V v3 = 3; + } + message V { + int32 v1 = 1; + int32 v2 = 2; + U v3 = 3; + } + message U { + int32 v1 = 1; + int32 v2 = 2; + int32 v3 = 3; + } \ No newline at end of file From cfa1b7417f1c7d8d3d9efed84b52ae7f64c57aca Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Tue, 10 May 2022 07:25:10 -0400 Subject: [PATCH 29/31] fix --- src/common/src/array/list_array.rs | 2 +- src/common/src/array/struct_array.rs | 8 ++- src/common/src/types/mod.rs | 52 ++++--------------- src/common/src/types/scalar_impl.rs | 34 ++++++++++++ src/frontend/src/binder/expr/mod.rs | 24 ++------- src/frontend/src/binder/values.rs | 21 +++++--- .../tests/testdata/struct_query.yaml | 4 +- 7 files changed, 72 insertions(+), 73 deletions(-) diff --git a/src/common/src/array/list_array.rs b/src/common/src/array/list_array.rs index d5ef8cfe90002..6ccfb8360b619 100644 --- a/src/common/src/array/list_array.rs +++ b/src/common/src/array/list_array.rs @@ -289,7 +289,7 @@ impl ListValue { Self { values } } - pub fn get_values(&self) -> &[Datum] { + pub fn values(&self) -> &[Datum] { &self.values } } diff --git a/src/common/src/array/struct_array.rs b/src/common/src/array/struct_array.rs index 98d0bce0e997e..5980be39c6b8a 100644 --- a/src/common/src/array/struct_array.rs +++ b/src/common/src/array/struct_array.rs @@ -276,7 +276,11 @@ pub struct StructValue { impl fmt::Display for StructValue { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{:?}", self.fields) + write!( + f, + "{{{}}}", + self.fields.iter().map(|f| format!("{:?}", f)).join(", ") + ) } } @@ -297,7 +301,7 @@ impl StructValue { Self { fields } } - pub fn get_fields(&self) -> &[Datum] { + pub fn fields(&self) -> &[Datum] { &self.fields } } diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index 5a77ab1271548..03a267881a793 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -142,49 +142,6 @@ impl DataType { }) } - /// Transfer `datum_type` to `data_type`. - pub fn datum_type_to_data_type(datum: &Datum) -> Result { - let scalar = match datum { - Some(scalar) => scalar, - None => { - return Err(internal_error( - "cannot get data type from None Datum".to_string(), - )); - } - }; - let data_type = match &scalar { - ScalarImpl::Int16(_) => DataType::Int16, - ScalarImpl::Int32(_) => DataType::Int32, - ScalarImpl::Int64(_) => DataType::Int64, - ScalarImpl::Float32(_) => DataType::Float32, - ScalarImpl::Float64(_) => DataType::Float64, - ScalarImpl::Utf8(_) => DataType::Varchar, - ScalarImpl::Bool(_) => DataType::Boolean, - ScalarImpl::Decimal(_) => DataType::Decimal, - ScalarImpl::Interval(_) => DataType::Interval, - ScalarImpl::NaiveDate(_) => DataType::Date, - ScalarImpl::NaiveDateTime(_) => DataType::Timestamp, - ScalarImpl::NaiveTime(_) => DataType::Time, - ScalarImpl::Struct(data) => { - let types = data - .get_fields() - .iter() - .map(DataType::datum_type_to_data_type) - .collect::>>()?; - DataType::Struct { - fields: types.into(), - } - } - ScalarImpl::List(data) => { - let data = data.get_values().get(0).ok_or_else(|| { - internal_error("cannot get data type from empty list".to_string()) - })?; - DataType::datum_type_to_data_type(data)? - } - }; - Ok(data_type) - } - fn prost_type_name(&self) -> TypeName { match self { DataType::Int16 => TypeName::Int16, @@ -349,6 +306,15 @@ for_all_scalar_variants! { scalar_impl_enum } pub type Datum = Option; pub type DatumRef<'a> = Option>; +pub fn get_data_type_from_datum(datum: &Datum) -> Result { + match datum { + None => Err(internal_error( + "cannot get data type from None Datum".to_string(), + )), + Some(scalar) => scalar.data_type(), + } +} + /// Convert a [`Datum`] to a [`DatumRef`]. pub fn to_datum_ref(datum: &Datum) -> DatumRef<'_> { datum.as_ref().map(|d| d.as_scalar_ref_impl()) diff --git a/src/common/src/types/scalar_impl.rs b/src/common/src/types/scalar_impl.rs index c1fc04344e8a6..f4dc72fc608be 100644 --- a/src/common/src/types/scalar_impl.rs +++ b/src/common/src/types/scalar_impl.rs @@ -300,6 +300,40 @@ impl ScalarImpl { } for_all_scalar_variants! { impl_all_get_ident, self } } + + pub(crate) fn data_type(&self) -> Result { + let data_type = match self { + ScalarImpl::Int16(_) => DataType::Int16, + ScalarImpl::Int32(_) => DataType::Int32, + ScalarImpl::Int64(_) => DataType::Int64, + ScalarImpl::Float32(_) => DataType::Float32, + ScalarImpl::Float64(_) => DataType::Float64, + ScalarImpl::Utf8(_) => DataType::Varchar, + ScalarImpl::Bool(_) => DataType::Boolean, + ScalarImpl::Decimal(_) => DataType::Decimal, + ScalarImpl::Interval(_) => DataType::Interval, + ScalarImpl::NaiveDate(_) => DataType::Date, + ScalarImpl::NaiveDateTime(_) => DataType::Timestamp, + ScalarImpl::NaiveTime(_) => DataType::Time, + ScalarImpl::Struct(data) => { + let types = data + .fields() + .iter() + .map(get_data_type_from_datum) + .collect::>>()?; + DataType::Struct { + fields: types.into(), + } + } + ScalarImpl::List(data) => { + let data = data.values().get(0).ok_or_else(|| { + internal_error("cannot get data type from empty list".to_string()) + })?; + get_data_type_from_datum(data)? + } + }; + Ok(data_type) + } } impl<'scalar> ScalarRefImpl<'scalar> { diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs index 9ff6284975381..f326b65d09c70 100644 --- a/src/frontend/src/binder/expr/mod.rs +++ b/src/frontend/src/binder/expr/mod.rs @@ -13,15 +13,15 @@ // limitations under the License. use itertools::zip_eq; -use risingwave_common::error::{ErrorCode, Result}; -use risingwave_common::types::{DataType, Scalar}; +use risingwave_common::error::{ErrorCode, Result, TrackingIssue}; +use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{ BinaryOperator, DataType as AstDataType, DateTimeField, Expr, Query, TrimWhereField, UnaryOperator, }; use crate::binder::Binder; -use crate::expr::{Expr as _, ExprImpl, ExprType, FunctionCall, Literal, SubqueryKind}; +use crate::expr::{Expr as _, ExprImpl, ExprType, FunctionCall, SubqueryKind}; mod binary_op; mod column; @@ -102,24 +102,10 @@ impl Binder { list, negated, } => self.bind_in_list(*expr, list, negated), - Expr::Row(exprs) => { - let value = self.bind_row(&exprs)?; - let data_type = DataType::Struct { - fields: value - .get_fields() - .iter() - .map(DataType::datum_type_to_data_type) - .collect::>>()? - .into(), - }; - Ok(ExprImpl::Literal(Box::new(Literal::new( - Some(value.to_scalar_value()), - data_type, - )))) - } + Expr::Row(exprs) => Ok(ExprImpl::Literal(Box::new(self.bind_row(&exprs)?))), _ => Err(ErrorCode::NotImplemented( format!("unsupported expression {:?}", expr), - 112.into(), + TrackingIssue::none(), ) .into()), } diff --git a/src/frontend/src/binder/values.rs b/src/frontend/src/binder/values.rs index 614241844eecc..3ab8aa3a0add5 100644 --- a/src/frontend/src/binder/values.rs +++ b/src/frontend/src/binder/values.rs @@ -16,12 +16,12 @@ use itertools::Itertools; use risingwave_common::array::StructValue; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::error::{ErrorCode, Result}; -use risingwave_common::types::{DataType, Datum, Scalar}; +use risingwave_common::types::{get_data_type_from_datum, DataType, Datum, Scalar}; use risingwave_sqlparser::ast::{Expr, Values}; use super::bind_context::Clause; use crate::binder::Binder; -use crate::expr::{align_types, ExprImpl}; +use crate::expr::{align_types, ExprImpl, Literal}; #[derive(Debug)] pub struct BoundValues { @@ -85,12 +85,12 @@ impl Binder { /// Bind row to `struct_value` for nested column, /// e.g. Row(1,2,(1,2,3)). /// Only accept value and row expr in row. - pub fn bind_row(&mut self, exprs: &[Expr]) -> Result { - let fields = exprs + pub fn bind_row(&mut self, exprs: &[Expr]) -> Result { + let datums = exprs .iter() .map(|e| match e { Expr::Value(value) => Ok(self.bind_value(value.clone())?.get_data().clone()), - Expr::Row(expr) => Ok(Some(self.bind_row(expr)?.to_scalar_value())), + Expr::Row(expr) => Ok(self.bind_row(expr)?.get_data().clone()), _ => Err(ErrorCode::NotImplemented( format!("unsupported expression {:?}", e), 112.into(), @@ -98,7 +98,16 @@ impl Binder { .into()), }) .collect::>>()?; - Ok(StructValue::new(fields)) + let value = StructValue::new(datums); + let data_type = DataType::Struct { + fields: value + .fields() + .iter() + .map(get_data_type_from_datum) + .collect::>>()? + .into(), + }; + Ok(Literal::new(Some(value.to_scalar_value()), data_type)) } } diff --git a/src/frontend/test_runner/tests/testdata/struct_query.yaml b/src/frontend/test_runner/tests/testdata/struct_query.yaml index fdc77ce44adbd..6e2ed4b319493 100644 --- a/src/frontend/test_runner/tests/testdata/struct_query.yaml +++ b/src/frontend/test_runner/tests/testdata/struct_query.yaml @@ -319,7 +319,7 @@ insert into s values (1,2,(1,2,(1,2,3))); logical_plan: | LogicalInsert { table: s } - LogicalValues { rows: [[1:Int32, 2:Int32, [Some(Int32(1)), Some(Int32(2)), Some(Struct(StructValue { fields: [Some(Int32(1)), Some(Int32(2)), Some(Int32(3))] }))]:Struct { fields: [Int32, Int32, Struct { fields: [Int32, Int32, Int32] }] }]], schema: Schema { fields: [:Int32, :Int32, :Struct { fields: [Int32, Int32, Struct { fields: [Int32, Int32, Int32] }] }] } } + LogicalValues { rows: [[1:Int32, 2:Int32, {Some(Int32(1)), Some(Int32(2)), Some(Struct(StructValue { fields: [Some(Int32(1)), Some(Int32(2)), Some(Int32(3))] }))}:Struct { fields: [Int32, Int32, Struct { fields: [Int32, Int32, Int32] }] }]], schema: Schema { fields: [:Int32, :Int32, :Struct { fields: [Int32, Int32, Struct { fields: [Int32, Int32, Int32] }] }] } } create_source: row_format: protobuf name: s @@ -346,7 +346,7 @@ select * from s where s.v3 = (1,2,(1,2,3)); logical_plan: | LogicalProject { exprs: [$1, $2, $3], expr_alias: [v1, v2, v3] } - LogicalFilter { predicate: ($3 = [Some(Int32(1)), Some(Int32(2)), Some(Struct(StructValue { fields: [Some(Int32(1)), Some(Int32(2)), Some(Int32(3))] }))]:Struct { fields: [Int32, Int32, Struct { fields: [Int32, Int32, Int32] }] }) } + LogicalFilter { predicate: ($3 = {Some(Int32(1)), Some(Int32(2)), Some(Struct(StructValue { fields: [Some(Int32(1)), Some(Int32(2)), Some(Int32(3))] }))}:Struct { fields: [Int32, Int32, Struct { fields: [Int32, Int32, Int32] }] }) } LogicalScan { table: s, columns: [_row_id#0, v1, v2, v3] } create_source: row_format: protobuf From 4c827a81cbe28d3a06d854c50a46ff94f5058db7 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Tue, 10 May 2022 07:51:14 -0400 Subject: [PATCH 30/31] fix --- src/frontend/src/binder/expr/mod.rs | 4 ++-- src/frontend/src/binder/values.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs index f326b65d09c70..c8bb5bec5136e 100644 --- a/src/frontend/src/binder/expr/mod.rs +++ b/src/frontend/src/binder/expr/mod.rs @@ -13,7 +13,7 @@ // limitations under the License. use itertools::zip_eq; -use risingwave_common::error::{ErrorCode, Result, TrackingIssue}; +use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{ BinaryOperator, DataType as AstDataType, DateTimeField, Expr, Query, TrimWhereField, @@ -105,7 +105,7 @@ impl Binder { Expr::Row(exprs) => Ok(ExprImpl::Literal(Box::new(self.bind_row(&exprs)?))), _ => Err(ErrorCode::NotImplemented( format!("unsupported expression {:?}", expr), - TrackingIssue::none(), + 112.into(), ) .into()), } diff --git a/src/frontend/src/binder/values.rs b/src/frontend/src/binder/values.rs index 3ab8aa3a0add5..dc400ad478ad6 100644 --- a/src/frontend/src/binder/values.rs +++ b/src/frontend/src/binder/values.rs @@ -15,7 +15,7 @@ use itertools::Itertools; use risingwave_common::array::StructValue; use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::error::{ErrorCode, Result, TrackingIssue}; use risingwave_common::types::{get_data_type_from_datum, DataType, Datum, Scalar}; use risingwave_sqlparser::ast::{Expr, Values}; @@ -93,7 +93,7 @@ impl Binder { Expr::Row(expr) => Ok(self.bind_row(expr)?.get_data().clone()), _ => Err(ErrorCode::NotImplemented( format!("unsupported expression {:?}", e), - 112.into(), + TrackingIssue::none(), ) .into()), }) From c82805a91807ea2c4fa5a7bc72b549451f8f2ea5 Mon Sep 17 00:00:00 2001 From: cykbls01 <2541601705@qq.com> Date: Thu, 12 May 2022 07:17:33 -0400 Subject: [PATCH 31/31] fix --- src/common/src/types/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index 9f629fd7d7add..4cc675d4b98c8 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -328,6 +328,7 @@ pub type DatumRef<'a> = Option>; pub fn get_data_type_from_datum(datum: &Datum) -> Result { match datum { + // TODO: Predicate data type from None Datum None => Err(internal_error( "cannot get data type from None Datum".to_string(), )),