From 7bb54804afd1f27d29e3f4361c2d1cab79c6db21 Mon Sep 17 00:00:00 2001
From: Noel Kwan <47273164+kwannoel@users.noreply.github.com>
Date: Tue, 21 May 2024 00:17:51 +0800
Subject: [PATCH] feat(stream): always log hash join large amplification
 records (#16840)

---
 src/stream/src/executor/hash_join.rs | 31 +++++++++++++++++-----------
 1 file changed, 19 insertions(+), 12 deletions(-)

diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs
index 427b036fe7fe..f16f918ddea0 100644
--- a/src/stream/src/executor/hash_join.rs
+++ b/src/stream/src/executor/hash_join.rs
@@ -13,12 +13,14 @@
 // limitations under the License.
 
 use std::collections::{BTreeMap, HashSet};
+use std::sync::LazyLock;
 use std::time::Duration;
 
 use itertools::Itertools;
 use multimap::MultiMap;
 use risingwave_common::array::Op;
 use risingwave_common::hash::{HashKey, NullBitmap};
+use risingwave_common::log::LogSuppresser;
 use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
 use risingwave_common::util::epoch::EpochPair;
 use risingwave_common::util::iter_util::ZipEqDebug;
@@ -829,18 +831,23 @@ impl<K: HashKey, S: StateStore> HashJoinExecutor<K, S> {
                 if let Some(rows) = &matched_rows {
                     join_matched_join_keys.observe(rows.len() as _);
-                    if rows.len() > 10000 {
-                        let join_key_data_types = side_update.ht.join_key_data_types();
-                        let key = key.deserialize(join_key_data_types)?;
-                        tracing::debug!(target: "hash_join_amplification",
-                            matched_rows_len = rows.len(),
-                            update_table_id = side_update.ht.table_id(),
-                            match_table_id = side_match.ht.table_id(),
-                            join_key = ?key,
-                            actor_id = ctx.id,
-                            fragment_id = ctx.fragment_id,
-                            "large rows matched for join key"
-                        );
+                    if rows.len() >= 10000 {
+                        static LOG_SUPPRESSER: LazyLock<LogSuppresser> =
+                            LazyLock::new(LogSuppresser::default);
+                        if let Ok(suppressed_count) = LOG_SUPPRESSER.check() {
+                            let join_key_data_types = side_update.ht.join_key_data_types();
+                            let key = key.deserialize(join_key_data_types)?;
+                            tracing::warn!(target: "hash_join_amplification",
+                                suppressed_count,
+                                matched_rows_len = rows.len(),
+                                update_table_id = side_update.ht.table_id(),
+                                match_table_id = side_match.ht.table_id(),
+                                join_key = ?key,
+                                actor_id = ctx.id,
+                                fragment_id = ctx.fragment_id,
+                                "large rows matched for join key"
+                            );
+                        }
                     }
                 } else {
                     join_matched_join_keys.observe(0.0)