Skip to content

Commit

Permalink
rename other agg kind to agg_type
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Sep 23, 2024
1 parent f9485f5 commit 9988c60
Show file tree
Hide file tree
Showing 8 changed files with 18 additions and 18 deletions.
8 changes: 4 additions & 4 deletions src/expr/core/src/aggregate/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ use crate::Result;
// advanced features like order by, filter, distinct, etc. should be handled by the upper layer.
#[derive(Debug, Clone)]
pub struct AggCall {
/// Aggregation kind for constructing agg state.
pub kind: AggType,
/// Aggregation type for constructing agg state.
pub agg_type: AggType,

/// Arguments of aggregation function input.
pub args: AggArgs,
Expand Down Expand Up @@ -96,7 +96,7 @@ impl AggCall {
})
.collect_vec();
Ok(AggCall {
kind: agg_type,
agg_type,
args,
return_type: DataType::from(agg_call.get_return_type()?),
column_orders,
Expand Down Expand Up @@ -160,7 +160,7 @@ impl<Iter: Iterator<Item = Token>> Parser<Iter> {
self.tokens.next(); // Consume the RParen

AggCall {
kind: AggType::from_protobuf(func, None, None).unwrap(),
agg_type: AggType::from_protobuf(func, None, None).unwrap(),
args: AggArgs {
data_types: children.iter().map(|(_, ty)| ty.clone()).collect(),
val_indices: children.iter().map(|(idx, _)| *idx).collect(),
Expand Down
2 changes: 1 addition & 1 deletion src/expr/core/src/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pub fn build_retractable(agg: &AggCall) -> Result<BoxedAggregateFunction> {
/// `AggCall`. Such operations should be done in batch or streaming executors.
pub fn build(agg: &AggCall, prefer_append_only: bool) -> Result<BoxedAggregateFunction> {
// handle special kinds
let kind = match &agg.kind {
let kind = match &agg.agg_type {
AggType::UserDefined(udf) => {
return user_defined::new_user_defined(&agg.return_type, udf);
}
Expand Down
2 changes: 1 addition & 1 deletion src/expr/core/src/sig/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ impl FuncName {

pub fn as_aggregate(&self) -> PbAggKind {
match self {
Self::Aggregate(ty) => *ty,
Self::Aggregate(kind) => *kind,
_ => panic!("Expected an aggregate function"),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/expr/impl/benches/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ fn bench_expr(c: &mut Criterion) {
continue;
}
let agg = match build_append_only(&AggCall {
kind: sig.name.as_aggregate().into(),
agg_type: sig.name.as_aggregate().into(),
args: sig
.inputs_type
.iter()
Expand Down
2 changes: 1 addition & 1 deletion src/expr/impl/src/window_function/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub(super) fn new(call: &WindowFuncCall) -> Result<BoxedWindowState> {
let agg_type = must_match!(&call.kind, WindowFuncKind::Aggregate(agg_type) => agg_type);
let arg_data_types = call.args.arg_types().to_vec();
let agg_call = AggCall {
kind: agg_type.clone(),
agg_type: agg_type.clone(),
args: call.args.clone(),
return_type: call.return_type.clone(),
column_orders: Vec::new(), // the input is already sorted
Expand Down
14 changes: 7 additions & 7 deletions src/stream/src/executor/aggregation/minput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl MaterializedInputState {
.collect_vec();
let cache_key_serializer = OrderedRowSerde::new(cache_key_data_types, order_types);

let cache: Box<dyn AggStateCache + Send + Sync> = match agg_call.kind {
let cache: Box<dyn AggStateCache + Send + Sync> = match agg_call.agg_type {
AggType::Builtin(
PbAggKind::Min | PbAggKind::Max | PbAggKind::FirstValue | PbAggKind::LastValue,
) => Box::new(GenericAggStateCache::new(
Expand All @@ -142,12 +142,12 @@ impl MaterializedInputState {
agg_call.args.arg_types(),
)),
_ => panic!(
"Agg kind `{}` is not expected to have materialized input state",
agg_call.kind
"Agg type `{}` is not expected to have materialized input state",
agg_call.agg_type
),
};
let output_first_value = matches!(
agg_call.kind,
agg_call.agg_type,
AggType::Builtin(
PbAggKind::Min | PbAggKind::Max | PbAggKind::FirstValue | PbAggKind::LastValue
)
Expand Down Expand Up @@ -245,12 +245,12 @@ fn generate_order_columns_before_version_issue_13465(
arg_col_indices: &[usize],
) -> (Vec<usize>, Vec<OrderType>) {
let (mut order_col_indices, mut order_types) = if matches!(
agg_call.kind,
agg_call.agg_type,
AggType::Builtin(PbAggKind::Min | PbAggKind::Max)
) {
// `min`/`max` need not to order by any other columns, but have to
// order by the agg value implicitly.
let order_type = if matches!(agg_call.kind, AggType::Builtin(PbAggKind::Min)) {
let order_type = if matches!(agg_call.agg_type, AggType::Builtin(PbAggKind::Min)) {
OrderType::ascending()
} else {
OrderType::descending()
Expand All @@ -263,7 +263,7 @@ fn generate_order_columns_before_version_issue_13465(
.map(|p| {
(
p.column_index,
if matches!(agg_call.kind, AggType::Builtin(PbAggKind::LastValue)) {
if matches!(agg_call.agg_type, AggType::Builtin(PbAggKind::LastValue)) {
p.order_type.reverse()
} else {
p.order_type
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/aggregation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub async fn agg_call_filter_res(
) -> StreamExecutorResult<Bitmap> {
let mut vis = chunk.visibility().clone();
if matches!(
agg_call.kind,
agg_call.agg_type,
AggType::Builtin(PbAggKind::Min | PbAggKind::Max | PbAggKind::StringAgg)
) {
// should skip NULL value for these kinds of agg function
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ pub mod agg_executor {
input_fields: Vec<Field>,
is_append_only: bool,
) -> AggStateStorage<S> {
match agg_call.kind {
match agg_call.agg_type {
AggType::Builtin(PbAggKind::Min | PbAggKind::Max) if !is_append_only => {
let mut column_descs = Vec::new();
let mut order_types = Vec::new();
Expand All @@ -353,7 +353,7 @@ pub mod agg_executor {
add_column(*idx, input_fields[*idx].data_type(), None);
}

add_column(agg_call.args.val_indices()[0], agg_call.args.arg_types()[0].clone(), if matches!(agg_call.kind, AggType::Builtin(PbAggKind::Max)) {
add_column(agg_call.args.val_indices()[0], agg_call.args.arg_types()[0].clone(), if matches!(agg_call.agg_type, AggType::Builtin(PbAggKind::Max)) {
Some(OrderType::descending())
} else {
Some(OrderType::ascending())
Expand Down

0 comments on commit 9988c60

Please sign in to comment.