From cf690387bd4694d522bb7933384741e6e4f8d020 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 27 Dec 2023 16:02:28 +0800 Subject: [PATCH 1/5] comment out `bad row count` panic Signed-off-by: Richard Chien --- src/stream/src/executor/aggregation/agg_group.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/stream/src/executor/aggregation/agg_group.rs b/src/stream/src/executor/aggregation/agg_group.rs index d0e97cd4783e..77ca2f805347 100644 --- a/src/stream/src/executor/aggregation/agg_group.rs +++ b/src/stream/src/executor/aggregation/agg_group.rs @@ -310,14 +310,22 @@ impl AggGroup { self.states[self.row_count_index], AggState::Value(ref state) => state ); - let row_count = *row_count_state + let mut row_count = *row_count_state .as_datum() .as_ref() .expect("row count state should not be NULL") .as_int64(); if row_count < 0 { tracing::error!(group = ?self.group_key_row(), "bad row count"); - panic!("row count should be non-negative") + // panic!("row count should be non-negative") // TODO: need strict mode sys param / session var + + // NOTE: Here is the case that an inconsistent `DELETE` arrives at HashAgg executor, and there's no + // corresponding group exists before (or has been deleted). In this case, `prev_row_count()` will + // report `0`. To ignore the inconsistent, we set `curr_row_count` to `0` here, so that `OnlyOutputIfHasInput` + // will return no change, so that the inconsistent will be hidden from downstream. This won't prevent from + // incorrect results of existing groups, but at lease can prevent from downstream panicking due to non-existing + // keys. + row_count = 0; } row_count.try_into().unwrap() } From 69c9244bf8bbf364439aab2e21f60722c85b5827 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 27 Dec 2023 16:07:17 +0800 Subject: [PATCH 2/5] typo Signed-off-by: Richard Chien --- src/stream/src/executor/aggregation/agg_group.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/src/executor/aggregation/agg_group.rs b/src/stream/src/executor/aggregation/agg_group.rs index 77ca2f805347..4a55e351debe 100644 --- a/src/stream/src/executor/aggregation/agg_group.rs +++ b/src/stream/src/executor/aggregation/agg_group.rs @@ -320,7 +320,7 @@ impl AggGroup { // panic!("row count should be non-negative") // TODO: need strict mode sys param / session var // NOTE: Here is the case that an inconsistent `DELETE` arrives at HashAgg executor, and there's no - // corresponding group exists before (or has been deleted). In this case, `prev_row_count()` will + // corresponding group existing before (or has been deleted). In this case, `prev_row_count()` will // report `0`. To ignore the inconsistent, we set `curr_row_count` to `0` here, so that `OnlyOutputIfHasInput` // will return no change, so that the inconsistent will be hidden from downstream. This won't prevent from // incorrect results of existing groups, but at lease can prevent from downstream panicking due to non-existing From d80c28596a5314545259efd78f8237d999579864 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 27 Dec 2023 16:09:41 +0800 Subject: [PATCH 3/5] typo Signed-off-by: Richard Chien --- src/stream/src/executor/aggregation/agg_group.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/src/executor/aggregation/agg_group.rs b/src/stream/src/executor/aggregation/agg_group.rs index 4a55e351debe..b94cc9da6c89 100644 --- a/src/stream/src/executor/aggregation/agg_group.rs +++ b/src/stream/src/executor/aggregation/agg_group.rs @@ -323,7 +323,7 @@ impl AggGroup { // corresponding group existing before (or has been deleted). In this case, `prev_row_count()` will // report `0`. To ignore the inconsistent, we set `curr_row_count` to `0` here, so that `OnlyOutputIfHasInput` // will return no change, so that the inconsistent will be hidden from downstream. This won't prevent from - // incorrect results of existing groups, but at lease can prevent from downstream panicking due to non-existing + // incorrect results of existing groups, but at least can prevent from downstream panicking due to non-existing // keys. row_count = 0; } From 69021df5ef6b43fac883508944310dd99234ab06 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 27 Dec 2023 16:18:22 +0800 Subject: [PATCH 4/5] Update src/stream/src/executor/aggregation/agg_group.rs Co-authored-by: stonepage <40830455+st1page@users.noreply.github.com> --- src/stream/src/executor/aggregation/agg_group.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/src/executor/aggregation/agg_group.rs b/src/stream/src/executor/aggregation/agg_group.rs index b94cc9da6c89..e06c943c7295 100644 --- a/src/stream/src/executor/aggregation/agg_group.rs +++ b/src/stream/src/executor/aggregation/agg_group.rs @@ -324,7 +324,7 @@ impl AggGroup { // report `0`. To ignore the inconsistent, we set `curr_row_count` to `0` here, so that `OnlyOutputIfHasInput` // will return no change, so that the inconsistent will be hidden from downstream. This won't prevent from // incorrect results of existing groups, but at least can prevent from downstream panicking due to non-existing - // keys. + // keys. See https://github.com/risingwavelabs/risingwave/issues/14031 for more information. row_count = 0; } row_count.try_into().unwrap() From aa00786a555aa8d33b1f1aa7f46d140cda1c0ec8 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 27 Dec 2023 16:19:50 +0800 Subject: [PATCH 5/5] panic in debug mode Signed-off-by: Richard Chien --- src/stream/src/executor/aggregation/agg_group.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/stream/src/executor/aggregation/agg_group.rs b/src/stream/src/executor/aggregation/agg_group.rs index e06c943c7295..b876347e6f17 100644 --- a/src/stream/src/executor/aggregation/agg_group.rs +++ b/src/stream/src/executor/aggregation/agg_group.rs @@ -317,7 +317,10 @@ impl AggGroup { .as_int64(); if row_count < 0 { tracing::error!(group = ?self.group_key_row(), "bad row count"); - // panic!("row count should be non-negative") // TODO: need strict mode sys param / session var + if cfg!(debug_assertions) { + // TODO: need strict mode sys param / session var + panic!("row count should be non-negative"); + } // NOTE: Here is the case that an inconsistent `DELETE` arrives at HashAgg executor, and there's no // corresponding group existing before (or has been deleted). In this case, `prev_row_count()` will