Skip to content

Commit

Permalink
chore(expr): cleanup all RwErrors (#4873)
Browse files Browse the repository at this point in the history
* chore(expr): cleanup all `RwError`s

Signed-off-by: Bugen Zhao <[email protected]>

* fix

Signed-off-by: Bugen Zhao <[email protected]>

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Aug 25, 2022
1 parent 3f97643 commit 61021b8
Show file tree
Hide file tree
Showing 14 changed files with 69 additions and 98 deletions.
4 changes: 2 additions & 2 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ impl HashAggExecutorBuilder {
task_id: TaskId,
identity: String,
) -> Result<BoxedExecutor> {
let agg_factories = hash_agg_node
let agg_factories: Vec<_> = hash_agg_node
.get_agg_calls()
.iter()
.map(AggStateFactory::new)
.collect::<Result<Vec<AggStateFactory>>>()?;
.try_collect()?;

let group_key_columns = hash_agg_node
.get_group_key()
Expand Down
16 changes: 10 additions & 6 deletions src/batch/src/executor/sort_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,12 @@ impl SortAggExecutor {
.map(|expr| expr.eval(&child_chunk))
.try_collect()?;

let groups = self
let groups: Vec<_> = self
.sorted_groupers
.iter()
.zip_eq(&group_columns)
.map(|(grouper, array)| grouper.detect_groups(array))
.collect::<Result<Vec<EqGroups>>>()?;
.try_collect()?;

let groups = EqGroups::intersect(&groups);

Expand Down Expand Up @@ -219,6 +219,7 @@ impl SortAggExecutor {
.iter_mut()
.zip_eq(group_columns)
.try_for_each(|(grouper, column)| grouper.update(column, start_row_idx, end_row_idx))
.map_err(Into::into)
}

fn update_agg_states(
Expand All @@ -230,6 +231,7 @@ impl SortAggExecutor {
agg_states
.iter_mut()
.try_for_each(|state| state.update_multi(child_chunk, start_row_idx, end_row_idx))
.map_err(Into::into)
}

fn output_sorted_groupers(
Expand All @@ -240,6 +242,7 @@ impl SortAggExecutor {
.iter_mut()
.zip_eq(group_builders)
.try_for_each(|(grouper, builder)| grouper.output(builder))
.map_err(Into::into)
}

fn output_agg_states(
Expand All @@ -250,6 +253,7 @@ impl SortAggExecutor {
.iter_mut()
.zip_eq(agg_builders)
.try_for_each(|(state, builder)| state.output(builder))
.map_err(Into::into)
}

fn create_builders(
Expand Down Expand Up @@ -442,10 +446,10 @@ mod tests {
})
.try_collect()?;

let sorted_groupers = group_exprs
let sorted_groupers: Vec<_> = group_exprs
.iter()
.map(|e| create_sorted_grouper(e.return_type()))
.collect::<Result<Vec<BoxedSortedGrouper>>>()?;
.try_collect()?;

let agg_states = vec![count_star];

Expand Down Expand Up @@ -780,10 +784,10 @@ mod tests {
})
.try_collect()?;

let sorted_groupers = group_exprs
let sorted_groupers: Vec<_> = group_exprs
.iter()
.map(|e| create_sorted_grouper(e.return_type()))
.collect::<Result<Vec<BoxedSortedGrouper>>>()?;
.try_collect()?;

let agg_states = vec![sum_agg];

Expand Down
10 changes: 10 additions & 0 deletions src/expr/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use regex;
use risingwave_common::array::ArrayError;
use risingwave_common::error::{ErrorCode, RwError};
use risingwave_common::types::DataType;
use risingwave_pb::ProstFieldNotFound;
use thiserror::Error;

#[derive(Error, Debug)]
Expand Down Expand Up @@ -64,3 +65,12 @@ impl From<regex::Error> for ExprError {
}
}
}

impl From<ProstFieldNotFound> for ExprError {
fn from(err: ProstFieldNotFound) -> Self {
Self::Internal(anyhow::anyhow!(
"Failed to decode prost: field not found `{}`",
err.0
))
}
}
8 changes: 5 additions & 3 deletions src/expr/src/expr/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

use std::convert::TryFrom;

use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::bail;
use risingwave_pb::expr::agg_call::Type;

use crate::{ExprError, Result};

/// Kind of aggregation function
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum AggKind {
Expand Down Expand Up @@ -53,7 +55,7 @@ impl std::fmt::Display for AggKind {
}

impl TryFrom<Type> for AggKind {
type Error = RwError;
type Error = ExprError;

fn try_from(prost: Type) -> Result<Self> {
match prost {
Expand All @@ -66,7 +68,7 @@ impl TryFrom<Type> for AggKind {
Type::SingleValue => Ok(AggKind::SingleValue),
Type::ApproxCountDistinct => Ok(AggKind::ApproxCountDistinct),
Type::ArrayAgg => Ok(AggKind::ArrayAgg),
Type::Unspecified => Err(ErrorCode::InternalError("Unrecognized agg.".into()).into()),
Type::Unspecified => bail!("Unrecognized agg."),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/expr/src/expr/expr_is_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ mod tests {

use risingwave_common::array::column::Column;
use risingwave_common::array::{ArrayBuilder, ArrayImpl, DataChunk, DecimalArrayBuilder, Row};
use risingwave_common::error::Result;
use risingwave_common::types::{DataType, Decimal};

use crate::expr::expr_is_null::{IsNotNullExpression, IsNullExpression};
use crate::expr::{BoxedExpression, InputRefExpression};
use crate::Result;

fn do_test(
expr: BoxedExpression,
Expand Down
14 changes: 4 additions & 10 deletions src/expr/src/vector_op/agg/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;

use dyn_clone::DynClone;
use risingwave_common::array::*;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::bail;
use risingwave_common::types::*;
use risingwave_common::util::sort_util::{OrderPair, OrderType};
use risingwave_pb::expr::AggCall;
Expand All @@ -32,6 +32,7 @@ use crate::vector_op::agg::count_star::CountStar;
use crate::vector_op::agg::functions::*;
use crate::vector_op::agg::general_agg::*;
use crate::vector_op::agg::general_distinct_agg::*;
use crate::Result;

/// An `Aggregator` supports `update` data and `output` result.
pub trait Aggregator: Send + DynClone + 'static {
Expand Down Expand Up @@ -141,11 +142,7 @@ impl AggStateFactory {
filter,
)?
}
_ => {
return Err(
ErrorCode::InternalError(format!("Invalid agg call: {:?}", agg_kind)).into(),
);
}
_ => bail!("Invalid agg call: {:?}", agg_kind),
};

Ok(Self {
Expand Down Expand Up @@ -201,12 +198,9 @@ pub fn create_agg_state_unary(
},
)*
(unimpl_input, unimpl_agg, unimpl_ret, distinct) => {
return Err(
ErrorCode::InternalError(format!(
bail!(
"unsupported aggregator: type={:?} input={:?} output={:?} distinct={}",
unimpl_agg, unimpl_input, unimpl_ret, distinct
))
.into(),
)
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/expr/src/vector_op/agg/approx_count_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

use risingwave_common::array::*;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::bail;
use risingwave_common::types::*;

use crate::expr::ExpressionRef;
use crate::vector_op::agg::aggregator::Aggregator;
use crate::Result;

const INDEX_BITS: u8 = 14; // number of bits used for finding the index of each 64-bit hash
const NUM_OF_REGISTERS: usize = 1 << INDEX_BITS; // number of indices available
Expand Down Expand Up @@ -171,7 +172,7 @@ impl Aggregator for ApproxCountDistinct {
self.registers = [0; NUM_OF_REGISTERS];
match builder {
ArrayBuilderImpl::Int64(b) => b.append(Some(result)).map_err(Into::into),
_ => Err(ErrorCode::InternalError("Unexpected builder for count(*).".into()).into()),
_ => bail!("Unexpected builder for count(*)."),
}
}
}
Expand Down
13 changes: 4 additions & 9 deletions src/expr/src/vector_op/agg/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
// limitations under the License.

use risingwave_common::array::{ArrayBuilder, ArrayBuilderImpl, DataChunk, ListValue, RowRef};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::bail;
use risingwave_common::types::{DataType, Datum, Scalar};
use risingwave_common::util::ordered::OrderedRow;
use risingwave_common::util::sort_util::{OrderPair, OrderType};

use crate::vector_op::agg::aggregator::Aggregator;
use crate::Result;

#[derive(Clone)]
struct ArrayAggUnordered {
Expand Down Expand Up @@ -76,10 +77,7 @@ impl Aggregator for ArrayAggUnordered {
.append(Some(self.get_result_and_reset().as_scalar_ref()))
.map_err(Into::into)
} else {
Err(
ErrorCode::InternalError(format!("Builder fail to match {}.", stringify!(Utf8)))
.into(),
)
bail!("Builder fail to match {}.", stringify!(Utf8))
}
}
}
Expand Down Expand Up @@ -156,10 +154,7 @@ impl Aggregator for ArrayAggOrdered {
.append(Some(self.get_result_and_reset().as_scalar_ref()))
.map_err(Into::into)
} else {
Err(
ErrorCode::InternalError(format!("Builder fail to match {}.", stringify!(Utf8)))
.into(),
)
bail!("Builder fail to match {}.", stringify!(Utf8))
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/expr/src/vector_op/agg/count_star.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
// limitations under the License.

use risingwave_common::array::*;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::bail;
use risingwave_common::types::*;

use crate::expr::ExpressionRef;
use crate::vector_op::agg::aggregator::Aggregator;
use crate::Result;

#[derive(Clone)]
pub struct CountStar {
Expand Down Expand Up @@ -103,7 +104,7 @@ impl Aggregator for CountStar {
let res = std::mem::replace(&mut self.result, 0) as i64;
match builder {
ArrayBuilderImpl::Int64(b) => b.append(Some(res)).map_err(Into::into),
_ => Err(ErrorCode::InternalError("Unexpected builder for count(*).".into()).into()),
_ => bail!("Unexpected builder for count(*)."),
}
}
}
9 changes: 5 additions & 4 deletions src/expr/src/vector_op/agg/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
// limitations under the License.

use risingwave_common::array::{Array, ListRef, StructRef};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::bail;

use crate::Result;

/// Essentially `RTFn` is an alias of the specific Fn. It was aliased not to
/// shorten the `where` clause of `GeneralAgg`, but to workaround an compiler
Expand Down Expand Up @@ -172,10 +174,9 @@ where
) -> Result<Option<<T as Array>::RefItem<'a>>> {
self.count += 1;
if self.count > 1 {
Err(ErrorCode::InternalError(
"SingleValue aggregation can only accept exactly one value. But there is more than one.".to_string(),
bail!(
"SingleValue aggregation can only accept exactly one value. But there is more than one.",
)
.into())
} else {
Ok(input)
}
Expand Down
20 changes: 6 additions & 14 deletions src/expr/src/vector_op/agg/general_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
use std::marker::PhantomData;

use risingwave_common::array::*;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::bail;
use risingwave_common::types::*;

use crate::expr::ExpressionRef;
use crate::vector_op::agg::aggregator::Aggregator;
use crate::vector_op::agg::functions::RTFn;
use crate::Result;

#[derive(Clone)]
pub struct GeneralAgg<T, F, R>
Expand Down Expand Up @@ -139,11 +140,7 @@ macro_rules! impl_aggregator {
{
self.update_single_concrete(i, input, row_id)
} else {
Err(ErrorCode::InternalError(format!(
"Input fail to match {}.",
stringify!($input_variant)
))
.into())
bail!("Input fail to match {}.", stringify!($input_variant))
}
}

Expand All @@ -158,24 +155,19 @@ macro_rules! impl_aggregator {
{
self.update_multi_concrete(i, input, start_row_id, end_row_id)
} else {
Err(ErrorCode::InternalError(format!(
bail!(
"Input fail to match {} or builder fail to match {}.",
stringify!($input_variant),
stringify!($result_variant)
))
.into())
)
}
}

fn output(&mut self, builder: &mut ArrayBuilderImpl) -> Result<()> {
if let ArrayBuilderImpl::$result_variant(b) = builder {
self.output_concrete(b)
} else {
Err(ErrorCode::InternalError(format!(
"Builder fail to match {}.",
stringify!($result_variant)
))
.into())
bail!("Builder fail to match {}.", stringify!($result_variant))
}
}
}
Expand Down
Loading

0 comments on commit 61021b8

Please sign in to comment.