From 2f6d63579d25945b9b0c98fa0ff218043171c11a Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 3 Apr 2024 00:24:11 +0800 Subject: [PATCH] feat(stream): use `LogSuppresser` for consistency error log (#16082) Signed-off-by: Richard Chien --- .../src/executor/aggregation/agg_group.rs | 4 +-- src/stream/src/executor/join/hash_join.rs | 9 ++---- .../managed_state/join/join_entry_state.rs | 9 ++---- src/stream/src/lib.rs | 29 +++++++++++++++++-- 4 files changed, 34 insertions(+), 17 deletions(-) diff --git a/src/stream/src/executor/aggregation/agg_group.rs b/src/stream/src/executor/aggregation/agg_group.rs index f5d4a50ce29c0..b374b3d3165ff 100644 --- a/src/stream/src/executor/aggregation/agg_group.rs +++ b/src/stream/src/executor/aggregation/agg_group.rs @@ -30,7 +30,7 @@ use risingwave_storage::StateStore; use super::agg_state::{AggState, AggStateStorage}; use crate::common::table::state_table::StateTable; -use crate::consistency::inconsistency_panic; +use crate::consistency::consistency_panic; use crate::executor::error::StreamExecutorResult; use crate::executor::PkIndices; @@ -317,7 +317,7 @@ impl AggGroup { .expect("row count state should not be NULL") .as_int64(); if row_count < 0 { - inconsistency_panic!(group = ?self.group_key_row(), row_count, "row count should be non-negative"); + consistency_panic!(group = ?self.group_key_row(), row_count, "row count should be non-negative"); // NOTE: Here is the case that an inconsistent `DELETE` arrives at HashAgg executor, and there's no // corresponding group existing before (or has been deleted). In this case, `prev_row_count()` will diff --git a/src/stream/src/executor/join/hash_join.rs b/src/stream/src/executor/join/hash_join.rs index f7088181e3c3b..88b3ede24b619 100644 --- a/src/stream/src/executor/join/hash_join.rs +++ b/src/stream/src/executor/join/hash_join.rs @@ -37,7 +37,7 @@ use super::row::{DegreeType, EncodedJoinRow}; use crate::cache::{new_with_hasher_in, ManagedLruCache}; use crate::common::metrics::MetricsInfo; use crate::common::table::state_table::StateTable; -use crate::consistency::enable_strict_consistency; +use crate::consistency::{consistency_error, enable_strict_consistency}; use crate::executor::error::StreamExecutorResult; use crate::executor::join::row::JoinRow; use crate::executor::monitor::StreamingMetrics; @@ -685,7 +685,7 @@ impl JoinEntryState { assert!(ret.is_ok(), "we have removed existing entry, if any"); if removed { // if not silent, we should log the error - tracing::error!(?key, "double inserting a join state entry"); + consistency_error!(?key, "double inserting a join state entry"); } } @@ -700,10 +700,7 @@ impl JoinEntryState { } else if enable_strict_consistency() { Err(JoinEntryError::RemoveError) } else { - tracing::error!( - ?pk, - "removing a join state entry but it is not in the cache" - ); + consistency_error!(?pk, "removing a join state entry but it's not in the cache"); Ok(()) } } diff --git a/src/stream/src/executor/managed_state/join/join_entry_state.rs b/src/stream/src/executor/managed_state/join/join_entry_state.rs index da4db92f3eb84..a4dab83240147 100644 --- a/src/stream/src/executor/managed_state/join/join_entry_state.rs +++ b/src/stream/src/executor/managed_state/join/join_entry_state.rs @@ -16,7 +16,7 @@ use risingwave_common_estimate_size::KvSize; use thiserror::Error; use super::*; -use crate::consistency::enable_strict_consistency; +use crate::consistency::{consistency_error, enable_strict_consistency}; /// We manages a `HashMap` in memory for all entries belonging to a join key. /// When evicted, `cached` does not hold any entries. @@ -70,7 +70,7 @@ impl JoinEntryState { assert!(ret.is_ok(), "we have removed existing entry, if any"); if removed { // if not silent, we should log the error - tracing::error!(?key, "double inserting a join state entry"); + consistency_error!(?key, "double inserting a join state entry"); } } @@ -85,10 +85,7 @@ impl JoinEntryState { } else if enable_strict_consistency() { Err(JoinEntryError::RemoveError) } else { - tracing::error!( - ?pk, - "removing a join state entry but it is not in the cache" - ); + consistency_error!(?pk, "removing a join state entry but it's not in the cache"); Ok(()) } } diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index 4214c38bdb779..1a78320de8cd1 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -74,10 +74,12 @@ mod consistency { static INSANE_MODE: LazyLock = LazyLock::new(|| env_var_is_true("RW_UNSAFE_ENABLE_INSANE_MODE")); + /// Check if the insane mode is enabled. pub(crate) fn insane() -> bool { *INSANE_MODE } + /// Check if strict consistency is required. pub(crate) fn enable_strict_consistency() -> bool { let res = crate::CONFIG.try_with(|config| config.unsafe_enable_strict_consistency); if cfg!(test) { @@ -88,13 +90,34 @@ mod consistency { } } - macro_rules! inconsistency_panic { + /// Log an error message for breaking consistency. Must only be called in non-strict mode. + /// The log message will be suppressed if it is called too frequently. + macro_rules! consistency_error { + ($($arg:tt)*) => { + debug_assert!(!crate::consistency::enable_strict_consistency()); + + use std::sync::LazyLock; + use risingwave_common::log::LogSuppresser; + + static LOG_SUPPERSSER: LazyLock = LazyLock::new(LogSuppresser::default); + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { + tracing::error!(suppressed_count, $($arg)*); + } + }; + } + pub(crate) use consistency_error; + + /// Log an error message for breaking consistency, then panic if strict consistency is required. + /// The log message will be suppressed if it is called too frequently. + macro_rules! consistency_panic { ($($arg:tt)*) => { - tracing::error!($($arg)*); if crate::consistency::enable_strict_consistency() { + tracing::error!($($arg)*); panic!("inconsistency happened, see error log for details"); + } else { + crate::consistency::consistency_error!($($arg)*); } }; } - pub(crate) use inconsistency_panic; + pub(crate) use consistency_panic; }