From 61021b86ee84ba00bbc471db709bc865de2947e1 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 25 Aug 2022 19:26:57 +0800 Subject: [PATCH] chore(expr): cleanup all `RwError`s (#4873) * chore(expr): cleanup all `RwError`s Signed-off-by: Bugen Zhao * fix Signed-off-by: Bugen Zhao Signed-off-by: Bugen Zhao --- src/batch/src/executor/hash_agg.rs | 4 ++-- src/batch/src/executor/sort_agg.rs | 16 +++++++++----- src/expr/src/error.rs | 10 +++++++++ src/expr/src/expr/agg.rs | 8 ++++--- src/expr/src/expr/expr_is_null.rs | 2 +- src/expr/src/vector_op/agg/aggregator.rs | 14 ++++-------- .../vector_op/agg/approx_count_distinct.rs | 5 +++-- src/expr/src/vector_op/agg/array_agg.rs | 13 ++++------- src/expr/src/vector_op/agg/count_star.rs | 5 +++-- src/expr/src/vector_op/agg/functions.rs | 9 ++++---- src/expr/src/vector_op/agg/general_agg.rs | 20 +++++------------ .../src/vector_op/agg/general_distinct_agg.rs | 21 +++++------------- .../vector_op/agg/general_sorted_grouper.rs | 22 +++++-------------- src/expr/src/vector_op/agg/string_agg.rs | 18 +++++---------- 14 files changed, 69 insertions(+), 98 deletions(-) diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index b118a5837db99..830400dbacd56 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -74,11 +74,11 @@ impl HashAggExecutorBuilder { task_id: TaskId, identity: String, ) -> Result { - let agg_factories = hash_agg_node + let agg_factories: Vec<_> = hash_agg_node .get_agg_calls() .iter() .map(AggStateFactory::new) - .collect::>>()?; + .try_collect()?; let group_key_columns = hash_agg_node .get_group_key() diff --git a/src/batch/src/executor/sort_agg.rs b/src/batch/src/executor/sort_agg.rs index 70046d84efaa6..c41c9ef5026ca 100644 --- a/src/batch/src/executor/sort_agg.rs +++ b/src/batch/src/executor/sort_agg.rs @@ -128,12 +128,12 @@ impl SortAggExecutor { .map(|expr| expr.eval(&child_chunk)) .try_collect()?; - let groups = self + let groups: Vec<_> = self .sorted_groupers .iter() .zip_eq(&group_columns) .map(|(grouper, array)| grouper.detect_groups(array)) - .collect::>>()?; + .try_collect()?; let groups = EqGroups::intersect(&groups); @@ -219,6 +219,7 @@ impl SortAggExecutor { .iter_mut() .zip_eq(group_columns) .try_for_each(|(grouper, column)| grouper.update(column, start_row_idx, end_row_idx)) + .map_err(Into::into) } fn update_agg_states( @@ -230,6 +231,7 @@ impl SortAggExecutor { agg_states .iter_mut() .try_for_each(|state| state.update_multi(child_chunk, start_row_idx, end_row_idx)) + .map_err(Into::into) } fn output_sorted_groupers( @@ -240,6 +242,7 @@ impl SortAggExecutor { .iter_mut() .zip_eq(group_builders) .try_for_each(|(grouper, builder)| grouper.output(builder)) + .map_err(Into::into) } fn output_agg_states( @@ -250,6 +253,7 @@ impl SortAggExecutor { .iter_mut() .zip_eq(agg_builders) .try_for_each(|(state, builder)| state.output(builder)) + .map_err(Into::into) } fn create_builders( @@ -442,10 +446,10 @@ mod tests { }) .try_collect()?; - let sorted_groupers = group_exprs + let sorted_groupers: Vec<_> = group_exprs .iter() .map(|e| create_sorted_grouper(e.return_type())) - .collect::>>()?; + .try_collect()?; let agg_states = vec![count_star]; @@ -780,10 +784,10 @@ mod tests { }) .try_collect()?; - let sorted_groupers = group_exprs + let sorted_groupers: Vec<_> = group_exprs .iter() .map(|e| create_sorted_grouper(e.return_type())) - .collect::>>()?; + .try_collect()?; let agg_states = vec![sum_agg]; diff --git a/src/expr/src/error.rs b/src/expr/src/error.rs index 24fcd943b1107..96d1d850c6619 100644 --- a/src/expr/src/error.rs +++ b/src/expr/src/error.rs @@ -17,6 +17,7 @@ use regex; use risingwave_common::array::ArrayError; use risingwave_common::error::{ErrorCode, RwError}; use risingwave_common::types::DataType; +use risingwave_pb::ProstFieldNotFound; use thiserror::Error; #[derive(Error, Debug)] @@ -64,3 +65,12 @@ impl From for ExprError { } } } + +impl From for ExprError { + fn from(err: ProstFieldNotFound) -> Self { + Self::Internal(anyhow::anyhow!( + "Failed to decode prost: field not found `{}`", + err.0 + )) + } +} diff --git a/src/expr/src/expr/agg.rs b/src/expr/src/expr/agg.rs index 0d5309a0083bd..9bebb6e0412ae 100644 --- a/src/expr/src/expr/agg.rs +++ b/src/expr/src/expr/agg.rs @@ -14,9 +14,11 @@ use std::convert::TryFrom; -use risingwave_common::error::{ErrorCode, Result, RwError}; +use risingwave_common::bail; use risingwave_pb::expr::agg_call::Type; +use crate::{ExprError, Result}; + /// Kind of aggregation function #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub enum AggKind { @@ -53,7 +55,7 @@ impl std::fmt::Display for AggKind { } impl TryFrom for AggKind { - type Error = RwError; + type Error = ExprError; fn try_from(prost: Type) -> Result { match prost { @@ -66,7 +68,7 @@ impl TryFrom for AggKind { Type::SingleValue => Ok(AggKind::SingleValue), Type::ApproxCountDistinct => Ok(AggKind::ApproxCountDistinct), Type::ArrayAgg => Ok(AggKind::ArrayAgg), - Type::Unspecified => Err(ErrorCode::InternalError("Unrecognized agg.".into()).into()), + Type::Unspecified => bail!("Unrecognized agg."), } } } diff --git a/src/expr/src/expr/expr_is_null.rs b/src/expr/src/expr/expr_is_null.rs index 6858c691023c8..368f8904daf9a 100644 --- a/src/expr/src/expr/expr_is_null.rs +++ b/src/expr/src/expr/expr_is_null.rs @@ -103,11 +103,11 @@ mod tests { use risingwave_common::array::column::Column; use risingwave_common::array::{ArrayBuilder, ArrayImpl, DataChunk, DecimalArrayBuilder, Row}; - use risingwave_common::error::Result; use risingwave_common::types::{DataType, Decimal}; use crate::expr::expr_is_null::{IsNotNullExpression, IsNullExpression}; use crate::expr::{BoxedExpression, InputRefExpression}; + use crate::Result; fn do_test( expr: BoxedExpression, diff --git a/src/expr/src/vector_op/agg/aggregator.rs b/src/expr/src/vector_op/agg/aggregator.rs index 7a84702c7c631..ec61d68f67970 100644 --- a/src/expr/src/vector_op/agg/aggregator.rs +++ b/src/expr/src/vector_op/agg/aggregator.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use dyn_clone::DynClone; use risingwave_common::array::*; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::bail; use risingwave_common::types::*; use risingwave_common::util::sort_util::{OrderPair, OrderType}; use risingwave_pb::expr::AggCall; @@ -32,6 +32,7 @@ use crate::vector_op::agg::count_star::CountStar; use crate::vector_op::agg::functions::*; use crate::vector_op::agg::general_agg::*; use crate::vector_op::agg::general_distinct_agg::*; +use crate::Result; /// An `Aggregator` supports `update` data and `output` result. pub trait Aggregator: Send + DynClone + 'static { @@ -141,11 +142,7 @@ impl AggStateFactory { filter, )? } - _ => { - return Err( - ErrorCode::InternalError(format!("Invalid agg call: {:?}", agg_kind)).into(), - ); - } + _ => bail!("Invalid agg call: {:?}", agg_kind), }; Ok(Self { @@ -201,12 +198,9 @@ pub fn create_agg_state_unary( }, )* (unimpl_input, unimpl_agg, unimpl_ret, distinct) => { - return Err( - ErrorCode::InternalError(format!( + bail!( "unsupported aggregator: type={:?} input={:?} output={:?} distinct={}", unimpl_agg, unimpl_input, unimpl_ret, distinct - )) - .into(), ) } } diff --git a/src/expr/src/vector_op/agg/approx_count_distinct.rs b/src/expr/src/vector_op/agg/approx_count_distinct.rs index c436927fba91c..c2ba75a2a04e5 100644 --- a/src/expr/src/vector_op/agg/approx_count_distinct.rs +++ b/src/expr/src/vector_op/agg/approx_count_distinct.rs @@ -16,11 +16,12 @@ use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; use risingwave_common::array::*; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::bail; use risingwave_common::types::*; use crate::expr::ExpressionRef; use crate::vector_op::agg::aggregator::Aggregator; +use crate::Result; const INDEX_BITS: u8 = 14; // number of bits used for finding the index of each 64-bit hash const NUM_OF_REGISTERS: usize = 1 << INDEX_BITS; // number of indices available @@ -171,7 +172,7 @@ impl Aggregator for ApproxCountDistinct { self.registers = [0; NUM_OF_REGISTERS]; match builder { ArrayBuilderImpl::Int64(b) => b.append(Some(result)).map_err(Into::into), - _ => Err(ErrorCode::InternalError("Unexpected builder for count(*).".into()).into()), + _ => bail!("Unexpected builder for count(*)."), } } } diff --git a/src/expr/src/vector_op/agg/array_agg.rs b/src/expr/src/vector_op/agg/array_agg.rs index 76619ca8b6e14..65dbdfcad7586 100644 --- a/src/expr/src/vector_op/agg/array_agg.rs +++ b/src/expr/src/vector_op/agg/array_agg.rs @@ -13,12 +13,13 @@ // limitations under the License. use risingwave_common::array::{ArrayBuilder, ArrayBuilderImpl, DataChunk, ListValue, RowRef}; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::bail; use risingwave_common::types::{DataType, Datum, Scalar}; use risingwave_common::util::ordered::OrderedRow; use risingwave_common::util::sort_util::{OrderPair, OrderType}; use crate::vector_op::agg::aggregator::Aggregator; +use crate::Result; #[derive(Clone)] struct ArrayAggUnordered { @@ -76,10 +77,7 @@ impl Aggregator for ArrayAggUnordered { .append(Some(self.get_result_and_reset().as_scalar_ref())) .map_err(Into::into) } else { - Err( - ErrorCode::InternalError(format!("Builder fail to match {}.", stringify!(Utf8))) - .into(), - ) + bail!("Builder fail to match {}.", stringify!(Utf8)) } } } @@ -156,10 +154,7 @@ impl Aggregator for ArrayAggOrdered { .append(Some(self.get_result_and_reset().as_scalar_ref())) .map_err(Into::into) } else { - Err( - ErrorCode::InternalError(format!("Builder fail to match {}.", stringify!(Utf8))) - .into(), - ) + bail!("Builder fail to match {}.", stringify!(Utf8)) } } } diff --git a/src/expr/src/vector_op/agg/count_star.rs b/src/expr/src/vector_op/agg/count_star.rs index d4e529397ba48..ce3543da7d963 100644 --- a/src/expr/src/vector_op/agg/count_star.rs +++ b/src/expr/src/vector_op/agg/count_star.rs @@ -13,11 +13,12 @@ // limitations under the License. use risingwave_common::array::*; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::bail; use risingwave_common::types::*; use crate::expr::ExpressionRef; use crate::vector_op::agg::aggregator::Aggregator; +use crate::Result; #[derive(Clone)] pub struct CountStar { @@ -103,7 +104,7 @@ impl Aggregator for CountStar { let res = std::mem::replace(&mut self.result, 0) as i64; match builder { ArrayBuilderImpl::Int64(b) => b.append(Some(res)).map_err(Into::into), - _ => Err(ErrorCode::InternalError("Unexpected builder for count(*).".into()).into()), + _ => bail!("Unexpected builder for count(*)."), } } } diff --git a/src/expr/src/vector_op/agg/functions.rs b/src/expr/src/vector_op/agg/functions.rs index 6d127f3f4cee5..9b589f8f9fca6 100644 --- a/src/expr/src/vector_op/agg/functions.rs +++ b/src/expr/src/vector_op/agg/functions.rs @@ -13,7 +13,9 @@ // limitations under the License. use risingwave_common::array::{Array, ListRef, StructRef}; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::bail; + +use crate::Result; /// Essentially `RTFn` is an alias of the specific Fn. It was aliased not to /// shorten the `where` clause of `GeneralAgg`, but to workaround an compiler @@ -172,10 +174,9 @@ where ) -> Result::RefItem<'a>>> { self.count += 1; if self.count > 1 { - Err(ErrorCode::InternalError( - "SingleValue aggregation can only accept exactly one value. But there is more than one.".to_string(), + bail!( + "SingleValue aggregation can only accept exactly one value. But there is more than one.", ) - .into()) } else { Ok(input) } diff --git a/src/expr/src/vector_op/agg/general_agg.rs b/src/expr/src/vector_op/agg/general_agg.rs index c582fe80e01a1..a12ba5f26d6b5 100644 --- a/src/expr/src/vector_op/agg/general_agg.rs +++ b/src/expr/src/vector_op/agg/general_agg.rs @@ -15,12 +15,13 @@ use std::marker::PhantomData; use risingwave_common::array::*; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::bail; use risingwave_common::types::*; use crate::expr::ExpressionRef; use crate::vector_op::agg::aggregator::Aggregator; use crate::vector_op::agg::functions::RTFn; +use crate::Result; #[derive(Clone)] pub struct GeneralAgg @@ -139,11 +140,7 @@ macro_rules! impl_aggregator { { self.update_single_concrete(i, input, row_id) } else { - Err(ErrorCode::InternalError(format!( - "Input fail to match {}.", - stringify!($input_variant) - )) - .into()) + bail!("Input fail to match {}.", stringify!($input_variant)) } } @@ -158,12 +155,11 @@ macro_rules! impl_aggregator { { self.update_multi_concrete(i, input, start_row_id, end_row_id) } else { - Err(ErrorCode::InternalError(format!( + bail!( "Input fail to match {} or builder fail to match {}.", stringify!($input_variant), stringify!($result_variant) - )) - .into()) + ) } } @@ -171,11 +167,7 @@ macro_rules! impl_aggregator { if let ArrayBuilderImpl::$result_variant(b) = builder { self.output_concrete(b) } else { - Err(ErrorCode::InternalError(format!( - "Builder fail to match {}.", - stringify!($result_variant) - )) - .into()) + bail!("Builder fail to match {}.", stringify!($result_variant)) } } } diff --git a/src/expr/src/vector_op/agg/general_distinct_agg.rs b/src/expr/src/vector_op/agg/general_distinct_agg.rs index cb3a631ad886a..67ed108d6dbdb 100644 --- a/src/expr/src/vector_op/agg/general_distinct_agg.rs +++ b/src/expr/src/vector_op/agg/general_distinct_agg.rs @@ -16,12 +16,13 @@ use std::collections::HashSet; use std::marker::PhantomData; use risingwave_common::array::*; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::bail; use risingwave_common::types::*; use crate::expr::ExpressionRef; use crate::vector_op::agg::aggregator::Aggregator; use crate::vector_op::agg::functions::RTFn; +use crate::Result; /// Where the actual aggregation happens. /// @@ -144,11 +145,7 @@ macro_rules! impl_aggregator { } Ok(()) } else { - Err(ErrorCode::InternalError(format!( - "Input fail to match {}.", - stringify!($input_variant) - )) - .into()) + bail!("Input fail to match {}.", stringify!($input_variant)) } } @@ -163,11 +160,7 @@ macro_rules! impl_aggregator { { self.update_multi_concrete(i, input, start_row_id, end_row_id) } else { - Err(ErrorCode::InternalError(format!( - "Input fail to match {}.", - stringify!($input_variant) - )) - .into()) + bail!("Input fail to match {}.", stringify!($input_variant)) } } @@ -175,11 +168,7 @@ macro_rules! impl_aggregator { if let ArrayBuilderImpl::$result_variant(b) = builder { self.output_concrete(b) } else { - Err(ErrorCode::InternalError(format!( - "Builder fail to match {}.", - stringify!($result_variant) - )) - .into()) + bail!("Builder fail to match {}.", stringify!($result_variant)) } } } diff --git a/src/expr/src/vector_op/agg/general_sorted_grouper.rs b/src/expr/src/vector_op/agg/general_sorted_grouper.rs index 660a9bb058731..85f1e7256411e 100644 --- a/src/expr/src/vector_op/agg/general_sorted_grouper.rs +++ b/src/expr/src/vector_op/agg/general_sorted_grouper.rs @@ -13,9 +13,11 @@ // limitations under the License. use risingwave_common::array::*; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::bail; use risingwave_common::types::*; +use crate::Result; + /// `EqGroups` encodes the grouping information in the sort aggregate algorithm. /// /// - `EqGroups::intersect` combines `EqGroups` from each column into a single one. @@ -170,11 +172,7 @@ macro_rules! impl_sorted_grouper { if let ArrayImpl::$input_variant(i) = input { self.detect_groups_concrete(i) } else { - Err(ErrorCode::InternalError(format!( - "Input fail to match {}.", - stringify!($input_variant) - )) - .into()) + bail!("Input fail to match {}.", stringify!($input_variant)) } } @@ -187,11 +185,7 @@ macro_rules! impl_sorted_grouper { if let ArrayImpl::$input_variant(i) = input { self.update_concrete(i, start_idx, end_idx) } else { - Err(ErrorCode::InternalError(format!( - "Input fail to match {}.", - stringify!($input_variant) - )) - .into()) + bail!("Input fail to match {}.", stringify!($input_variant)) } } @@ -199,11 +193,7 @@ macro_rules! impl_sorted_grouper { if let ArrayBuilderImpl::$input_variant(b) = builder { self.output_concrete(b) } else { - Err(ErrorCode::InternalError(format!( - "Builder fail to match {}.", - stringify!($input_variant) - )) - .into()) + bail!("Builder fail to match {}.", stringify!($input_variant)) } } } diff --git a/src/expr/src/vector_op/agg/string_agg.rs b/src/expr/src/vector_op/agg/string_agg.rs index abf1177736e48..4d506e818251a 100644 --- a/src/expr/src/vector_op/agg/string_agg.rs +++ b/src/expr/src/vector_op/agg/string_agg.rs @@ -16,13 +16,14 @@ use std::collections::BinaryHeap; use std::sync::Arc; use risingwave_common::array::{Array, ArrayBuilder, ArrayBuilderImpl, ArrayImpl, DataChunk, Row}; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::bail; use risingwave_common::types::{DataType, Scalar, ScalarImpl}; use risingwave_common::util::encoding_for_comparison::{encode_row, is_type_encodable}; use risingwave_common::util::sort_util::{DescOrderedRow, OrderPair}; use crate::expr::ExpressionRef; use crate::vector_op::agg::aggregator::Aggregator; +use crate::Result; #[derive(Clone)] enum StringAggState { @@ -159,10 +160,7 @@ impl Aggregator for StringAgg { } Ok(()) } else { - Err( - ErrorCode::InternalError(format!("Input fail to match {}.", stringify!(Utf8))) - .into(), - ) + bail!("Input fail to match {}.", stringify!(Utf8)) } } @@ -184,10 +182,7 @@ impl Aggregator for StringAgg { } Ok(()) } else { - Err( - ErrorCode::InternalError(format!("Input fail to match {}.", stringify!(Utf8))) - .into(), - ) + bail!("Input fail to match {}.", stringify!(Utf8)) } } @@ -198,10 +193,7 @@ impl Aggregator for StringAgg { .append(res.as_ref().map(|x| x.as_scalar_ref())) .map_err(Into::into) } else { - Err( - ErrorCode::InternalError(format!("Builder fail to match {}.", stringify!(Utf8))) - .into(), - ) + bail!("Builder fail to match {}.", stringify!(Utf8)) } } }