Skip to content

Commit

Permalink
chore(agg): comment out panic (#14232) (#14247)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
Co-authored-by: Richard Chien <[email protected]>
Co-authored-by: stonepage <[email protected]>
  • Loading branch information
3 people authored Dec 28, 2023
1 parent 76bea0f commit 0c64dea
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions src/stream/src/executor/aggregation/agg_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,14 +310,25 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
self.states[self.row_count_index],
AggState::Value(ref state) => state
);
let row_count = *row_count_state
let mut row_count = *row_count_state
.as_datum()
.as_ref()
.expect("row count state should not be NULL")
.as_int64();
if row_count < 0 {
tracing::error!(group = ?self.group_key_row(), "bad row count");
panic!("row count should be non-negative")
if cfg!(debug_assertions) {
// TODO: need strict mode sys param / session var
panic!("row count should be non-negative");
}

// NOTE: Here is the case that an inconsistent `DELETE` arrives at HashAgg executor, and there's no
// corresponding group existing before (or has been deleted). In this case, `prev_row_count()` will
// report `0`. To ignore the inconsistent, we set `curr_row_count` to `0` here, so that `OnlyOutputIfHasInput`
// will return no change, so that the inconsistent will be hidden from downstream. This won't prevent from
// incorrect results of existing groups, but at least can prevent from downstream panicking due to non-existing
// keys. See https://github.com/risingwavelabs/risingwave/issues/14031 for more information.
row_count = 0;
}
row_count.try_into().unwrap()
}
Expand Down

0 comments on commit 0c64dea

Please sign in to comment.