Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(expr): cleanup all RwErrors #4873

Merged
merged 7 commits into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ impl HashAggExecutorBuilder {
task_id: TaskId,
identity: String,
) -> Result<BoxedExecutor> {
let agg_factories = hash_agg_node
let agg_factories: Vec<_> = hash_agg_node
.get_agg_calls()
.iter()
.map(AggStateFactory::new)
.collect::<Result<Vec<AggStateFactory>>>()?;
.try_collect()?;

let group_key_columns = hash_agg_node
.get_group_key()
Expand Down
16 changes: 10 additions & 6 deletions src/batch/src/executor/sort_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,12 @@ impl SortAggExecutor {
.map(|expr| expr.eval(&child_chunk))
.try_collect()?;

let groups = self
let groups: Vec<_> = self
.sorted_groupers
.iter()
.zip_eq(&group_columns)
.map(|(grouper, array)| grouper.detect_groups(array))
.collect::<Result<Vec<EqGroups>>>()?;
.try_collect()?;

let groups = EqGroups::intersect(&groups);

Expand Down Expand Up @@ -219,6 +219,7 @@ impl SortAggExecutor {
.iter_mut()
.zip_eq(group_columns)
.try_for_each(|(grouper, column)| grouper.update(column, start_row_idx, end_row_idx))
.map_err(Into::into)
}

fn update_agg_states(
Expand All @@ -230,6 +231,7 @@ impl SortAggExecutor {
agg_states
.iter_mut()
.try_for_each(|state| state.update_multi(child_chunk, start_row_idx, end_row_idx))
.map_err(Into::into)
}

fn output_sorted_groupers(
Expand All @@ -240,6 +242,7 @@ impl SortAggExecutor {
.iter_mut()
.zip_eq(group_builders)
.try_for_each(|(grouper, builder)| grouper.output(builder))
.map_err(Into::into)
}

fn output_agg_states(
Expand All @@ -250,6 +253,7 @@ impl SortAggExecutor {
.iter_mut()
.zip_eq(agg_builders)
.try_for_each(|(state, builder)| state.output(builder))
.map_err(Into::into)
}

fn create_builders(
Expand Down Expand Up @@ -442,10 +446,10 @@ mod tests {
})
.try_collect()?;

let sorted_groupers = group_exprs
let sorted_groupers: Vec<_> = group_exprs
.iter()
.map(|e| create_sorted_grouper(e.return_type()))
.collect::<Result<Vec<BoxedSortedGrouper>>>()?;
.try_collect()?;

let agg_states = vec![count_star];

Expand Down Expand Up @@ -780,10 +784,10 @@ mod tests {
})
.try_collect()?;

let sorted_groupers = group_exprs
let sorted_groupers: Vec<_> = group_exprs
.iter()
.map(|e| create_sorted_grouper(e.return_type()))
.collect::<Result<Vec<BoxedSortedGrouper>>>()?;
.try_collect()?;

let agg_states = vec![sum_agg];

Expand Down
10 changes: 10 additions & 0 deletions src/expr/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use regex;
use risingwave_common::array::ArrayError;
use risingwave_common::error::{ErrorCode, RwError};
use risingwave_common::types::DataType;
use risingwave_pb::ProstFieldNotFound;
use thiserror::Error;

#[derive(Error, Debug)]
Expand Down Expand Up @@ -64,3 +65,12 @@ impl From<regex::Error> for ExprError {
}
}
}

impl From<ProstFieldNotFound> for ExprError {
fn from(err: ProstFieldNotFound) -> Self {
Self::Internal(anyhow::anyhow!(
"Failed to decode prost: field not found `{}`",
err.0
))
}
}
8 changes: 5 additions & 3 deletions src/expr/src/expr/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

use std::convert::TryFrom;

use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::bail;
use risingwave_pb::expr::agg_call::Type;

use crate::{ExprError, Result};

/// Kind of aggregation function
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum AggKind {
Expand Down Expand Up @@ -53,7 +55,7 @@ impl std::fmt::Display for AggKind {
}

impl TryFrom<Type> for AggKind {
type Error = RwError;
type Error = ExprError;

fn try_from(prost: Type) -> Result<Self> {
match prost {
Expand All @@ -66,7 +68,7 @@ impl TryFrom<Type> for AggKind {
Type::SingleValue => Ok(AggKind::SingleValue),
Type::ApproxCountDistinct => Ok(AggKind::ApproxCountDistinct),
Type::ArrayAgg => Ok(AggKind::ArrayAgg),
Type::Unspecified => Err(ErrorCode::InternalError("Unrecognized agg.".into()).into()),
Type::Unspecified => bail!("Unrecognized agg."),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/expr/src/expr/expr_is_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ mod tests {

use risingwave_common::array::column::Column;
use risingwave_common::array::{ArrayBuilder, ArrayImpl, DataChunk, DecimalArrayBuilder, Row};
use risingwave_common::error::Result;
use risingwave_common::types::{DataType, Decimal};

use crate::expr::expr_is_null::{IsNotNullExpression, IsNullExpression};
use crate::expr::{BoxedExpression, InputRefExpression};
use crate::Result;

fn do_test(
expr: BoxedExpression,
Expand Down
14 changes: 4 additions & 10 deletions src/expr/src/vector_op/agg/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;

use dyn_clone::DynClone;
use risingwave_common::array::*;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::bail;
use risingwave_common::types::*;
use risingwave_common::util::sort_util::{OrderPair, OrderType};
use risingwave_pb::expr::AggCall;
Expand All @@ -32,6 +32,7 @@ use crate::vector_op::agg::count_star::CountStar;
use crate::vector_op::agg::functions::*;
use crate::vector_op::agg::general_agg::*;
use crate::vector_op::agg::general_distinct_agg::*;
use crate::Result;

/// An `Aggregator` supports `update` data and `output` result.
pub trait Aggregator: Send + DynClone + 'static {
Expand Down Expand Up @@ -141,11 +142,7 @@ impl AggStateFactory {
filter,
)?
}
_ => {
return Err(
ErrorCode::InternalError(format!("Invalid agg call: {:?}", agg_kind)).into(),
);
}
_ => bail!("Invalid agg call: {:?}", agg_kind),
};

Ok(Self {
Expand Down Expand Up @@ -201,12 +198,9 @@ pub fn create_agg_state_unary(
},
)*
(unimpl_input, unimpl_agg, unimpl_ret, distinct) => {
return Err(
ErrorCode::InternalError(format!(
bail!(
"unsupported aggregator: type={:?} input={:?} output={:?} distinct={}",
unimpl_agg, unimpl_input, unimpl_ret, distinct
))
.into(),
)
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/expr/src/vector_op/agg/approx_count_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

use risingwave_common::array::*;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::bail;
use risingwave_common::types::*;

use crate::expr::ExpressionRef;
use crate::vector_op::agg::aggregator::Aggregator;
use crate::Result;

const INDEX_BITS: u8 = 14; // number of bits used for finding the index of each 64-bit hash
const NUM_OF_REGISTERS: usize = 1 << INDEX_BITS; // number of indices available
Expand Down Expand Up @@ -171,7 +172,7 @@ impl Aggregator for ApproxCountDistinct {
self.registers = [0; NUM_OF_REGISTERS];
match builder {
ArrayBuilderImpl::Int64(b) => b.append(Some(result)).map_err(Into::into),
_ => Err(ErrorCode::InternalError("Unexpected builder for count(*).".into()).into()),
_ => bail!("Unexpected builder for count(*)."),
}
}
}
Expand Down
13 changes: 4 additions & 9 deletions src/expr/src/vector_op/agg/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
// limitations under the License.

use risingwave_common::array::{ArrayBuilder, ArrayBuilderImpl, DataChunk, ListValue, RowRef};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::bail;
use risingwave_common::types::{DataType, Datum, Scalar};
use risingwave_common::util::ordered::OrderedRow;
use risingwave_common::util::sort_util::{OrderPair, OrderType};

use crate::vector_op::agg::aggregator::Aggregator;
use crate::Result;

#[derive(Clone)]
struct ArrayAggUnordered {
Expand Down Expand Up @@ -76,10 +77,7 @@ impl Aggregator for ArrayAggUnordered {
.append(Some(self.get_result_and_reset().as_scalar_ref()))
.map_err(Into::into)
} else {
Err(
ErrorCode::InternalError(format!("Builder fail to match {}.", stringify!(Utf8)))
.into(),
)
bail!("Builder fail to match {}.", stringify!(Utf8))
}
}
}
Expand Down Expand Up @@ -156,10 +154,7 @@ impl Aggregator for ArrayAggOrdered {
.append(Some(self.get_result_and_reset().as_scalar_ref()))
.map_err(Into::into)
} else {
Err(
ErrorCode::InternalError(format!("Builder fail to match {}.", stringify!(Utf8)))
.into(),
)
bail!("Builder fail to match {}.", stringify!(Utf8))
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/expr/src/vector_op/agg/count_star.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
// limitations under the License.

use risingwave_common::array::*;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::bail;
use risingwave_common::types::*;

use crate::expr::ExpressionRef;
use crate::vector_op::agg::aggregator::Aggregator;
use crate::Result;

#[derive(Clone)]
pub struct CountStar {
Expand Down Expand Up @@ -103,7 +104,7 @@ impl Aggregator for CountStar {
let res = std::mem::replace(&mut self.result, 0) as i64;
match builder {
ArrayBuilderImpl::Int64(b) => b.append(Some(res)).map_err(Into::into),
_ => Err(ErrorCode::InternalError("Unexpected builder for count(*).".into()).into()),
_ => bail!("Unexpected builder for count(*)."),
}
}
}
9 changes: 5 additions & 4 deletions src/expr/src/vector_op/agg/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
// limitations under the License.

use risingwave_common::array::{Array, ListRef, StructRef};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::bail;

use crate::Result;

/// Essentially `RTFn` is an alias of the specific Fn. It was aliased not to
/// shorten the `where` clause of `GeneralAgg`, but to workaround an compiler
Expand Down Expand Up @@ -172,10 +174,9 @@ where
) -> Result<Option<<T as Array>::RefItem<'a>>> {
self.count += 1;
if self.count > 1 {
Err(ErrorCode::InternalError(
"SingleValue aggregation can only accept exactly one value. But there is more than one.".to_string(),
bail!(
"SingleValue aggregation can only accept exactly one value. But there is more than one.",
)
.into())
} else {
Ok(input)
}
Expand Down
20 changes: 6 additions & 14 deletions src/expr/src/vector_op/agg/general_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
use std::marker::PhantomData;

use risingwave_common::array::*;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::bail;
use risingwave_common::types::*;

use crate::expr::ExpressionRef;
use crate::vector_op::agg::aggregator::Aggregator;
use crate::vector_op::agg::functions::RTFn;
use crate::Result;

#[derive(Clone)]
pub struct GeneralAgg<T, F, R>
Expand Down Expand Up @@ -139,11 +140,7 @@ macro_rules! impl_aggregator {
{
self.update_single_concrete(i, input, row_id)
} else {
Err(ErrorCode::InternalError(format!(
"Input fail to match {}.",
stringify!($input_variant)
))
.into())
bail!("Input fail to match {}.", stringify!($input_variant))
}
}

Expand All @@ -158,24 +155,19 @@ macro_rules! impl_aggregator {
{
self.update_multi_concrete(i, input, start_row_id, end_row_id)
} else {
Err(ErrorCode::InternalError(format!(
bail!(
"Input fail to match {} or builder fail to match {}.",
stringify!($input_variant),
stringify!($result_variant)
))
.into())
)
}
}

fn output(&mut self, builder: &mut ArrayBuilderImpl) -> Result<()> {
if let ArrayBuilderImpl::$result_variant(b) = builder {
self.output_concrete(b)
} else {
Err(ErrorCode::InternalError(format!(
"Builder fail to match {}.",
stringify!($result_variant)
))
.into())
bail!("Builder fail to match {}.", stringify!($result_variant))
}
}
}
Expand Down
Loading