diff --git a/src/frontend/src/optimizer/rule/distinct_agg_rule.rs b/src/frontend/src/optimizer/rule/distinct_agg_rule.rs index 5349945b03a00..4d7a64a3d0038 100644 --- a/src/frontend/src/optimizer/rule/distinct_agg_rule.rs +++ b/src/frontend/src/optimizer/rule/distinct_agg_rule.rs @@ -39,15 +39,22 @@ impl Rule for DistinctAggRule { let (mut agg_calls, mut agg_group_keys, input) = agg.clone().decompose(); let original_group_keys_len = agg_group_keys.len(); - if self.for_stream { + let (node, flag_values, has_expand) = + Self::build_expand(input, &mut agg_group_keys, &mut agg_calls)?; + + if self.for_stream && !agg_group_keys.is_empty() && has_expand { // Due to performance issue, we don't do 2-phase agg for stream distinct agg with group // by. See https://github.com/risingwavelabs/risingwave/issues/7271 for more. - // TODO(rc): may be we can still apply the rule if there is only one distinct column + // So basically we have three cases here: + // 1. group by + multiple distinct columns => skip this rule, use backend impl + // 2. no group by + multiple distinct columns => apply this rule + // 3. single distinct column (can be optimized w/o `Expand`) => apply this rule + // TODO(rc): In the last two cases, backend distinct agg implementation can bring + // performance increase, but we don't do it for now cuz it may harm system scalability. + // Need more evaluations later. return None; } - let (node, flag_values, has_expand) = - Self::build_expand(input, &mut agg_group_keys, &mut agg_calls)?; let mid_agg = Self::build_middle_agg(node, agg_group_keys, agg_calls.clone(), has_expand); Some(Self::build_final_agg( mid_agg, @@ -120,7 +127,7 @@ impl DistinctAggRule { let n_different_distinct = distinct_aggs .iter() - .unique_by(|agg_call| agg_call.input_indices()) + .unique_by(|agg_call| agg_call.input_indices()[0]) .count(); assert_ne!(n_different_distinct, 0); // since `distinct_aggs` is not empty here if n_different_distinct == 1 {