diff --git a/src/stream/src/executor/aggregation/agg_group.rs b/src/stream/src/executor/aggregation/agg_group.rs index d0e97cd4783e9..b876347e6f179 100644 --- a/src/stream/src/executor/aggregation/agg_group.rs +++ b/src/stream/src/executor/aggregation/agg_group.rs @@ -310,14 +310,25 @@ impl AggGroup { self.states[self.row_count_index], AggState::Value(ref state) => state ); - let row_count = *row_count_state + let mut row_count = *row_count_state .as_datum() .as_ref() .expect("row count state should not be NULL") .as_int64(); if row_count < 0 { tracing::error!(group = ?self.group_key_row(), "bad row count"); - panic!("row count should be non-negative") + if cfg!(debug_assertions) { + // TODO: need strict mode sys param / session var + panic!("row count should be non-negative"); + } + + // NOTE: Here is the case that an inconsistent `DELETE` arrives at HashAgg executor, and there's no + // corresponding group existing before (or has been deleted). In this case, `prev_row_count()` will + // report `0`. To ignore the inconsistent, we set `curr_row_count` to `0` here, so that `OnlyOutputIfHasInput` + // will return no change, so that the inconsistent will be hidden from downstream. This won't prevent from + // incorrect results of existing groups, but at least can prevent from downstream panicking due to non-existing + // keys. See https://github.com/risingwavelabs/risingwave/issues/14031 for more information. + row_count = 0; } row_count.try_into().unwrap() }