diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index 427b036fe7fe..f16f918ddea0 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -13,12 +13,14 @@ // limitations under the License. use std::collections::{BTreeMap, HashSet}; +use std::sync::LazyLock; use std::time::Duration; use itertools::Itertools; use multimap::MultiMap; use risingwave_common::array::Op; use risingwave_common::hash::{HashKey, NullBitmap}; +use risingwave_common::log::LogSuppresser; use risingwave_common::types::{DefaultOrd, ToOwnedDatum}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqDebug; @@ -829,18 +831,23 @@ impl HashJoinExecutor 10000 { - let join_key_data_types = side_update.ht.join_key_data_types(); - let key = key.deserialize(join_key_data_types)?; - tracing::debug!(target: "hash_join_amplification", - matched_rows_len = rows.len(), - update_table_id = side_update.ht.table_id(), - match_table_id = side_match.ht.table_id(), - join_key = ?key, - actor_id = ctx.id, - fragment_id = ctx.fragment_id, - "large rows matched for join key" - ); + if rows.len() >= 10000 { + static LOG_SUPPRESSER: LazyLock<LogSuppresser> = + LazyLock::new(LogSuppresser::default); + if let Ok(suppressed_count) = LOG_SUPPRESSER.check() { + let join_key_data_types = side_update.ht.join_key_data_types(); + let key = key.deserialize(join_key_data_types)?; + tracing::warn!(target: "hash_join_amplification", + suppressed_count, + matched_rows_len = rows.len(), + update_table_id = side_update.ht.table_id(), + match_table_id = side_match.ht.table_id(), + join_key = ?key, + actor_id = ctx.id, + fragment_id = ctx.fragment_id, + "large rows matched for join key" + ); + } } } else { join_matched_join_keys.observe(0.0)