diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 99aae245125eb..cd276e8a966f4 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -976,6 +976,11 @@ pub struct StreamingDeveloperConfig { /// If true, the arrangement backfill will be disabled, /// even if session variable set. pub enable_arrangement_backfill: bool, + + #[serde(default = "default::developer::stream_high_join_amplification_threshold")] + /// If number of hash join matches exceeds this threshold number, + /// it will be logged. + pub high_join_amplification_threshold: usize, } /// The subsections `[batch.developer]`. @@ -1733,6 +1738,10 @@ pub mod default { pub fn stream_enable_arrangement_backfill() -> bool { true } + + pub fn stream_high_join_amplification_threshold() -> usize { + 2048 + } } pub use crate::system_param::default as system; diff --git a/src/config/example.toml b/src/config/example.toml index fc70258788bbc..a50b4b8c10d65 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -118,6 +118,7 @@ stream_memory_controller_eviction_factor_stable = 1.0 stream_memory_controller_sequence_tls_step = 128 stream_memory_controller_sequence_tls_lag = 32 stream_enable_arrangement_backfill = true +stream_high_join_amplification_threshold = 2048 [storage] share_buffers_sync_parallelism = 1 diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index 264f6cbb67a81..0bf825138d92a 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -11,11 +11,12 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - use std::collections::{BTreeMap, HashSet}; +use std::num::NonZeroU32; use std::sync::LazyLock; use std::time::Duration; +use governor::{Quota, RateLimiter}; use itertools::Itertools; use multimap::MultiMap; use risingwave_common::array::Op; @@ -159,6 +160,8 @@ pub struct HashJoinExecutor `BufferedWatermarks` watermark_buffers: BTreeMap>, + + high_join_amplification_threshold: usize, } impl std::fmt::Debug @@ -195,6 +198,7 @@ struct EqJoinArgs<'a, K: HashKey, S: StateStore> { append_only_optimize: bool, chunk_size: usize, cnt_rows_received: &'a mut u32, + high_join_amplification_threshold: usize, } impl HashJoinExecutor { @@ -218,6 +222,7 @@ impl HashJoinExecutor, chunk_size: usize, + high_join_amplification_threshold: usize, ) -> Self { let side_l_column_n = input_l.schema().len(); @@ -446,6 +451,7 @@ impl HashJoinExecutor HashJoinExecutor HashJoinExecutor HashJoinExecutor HashJoinExecutor= 10000 { - static LOG_SUPPERSSER: LazyLock = - LazyLock::new(LogSuppresser::default); + if rows.len() > high_join_amplification_threshold { + static LOG_SUPPERSSER: LazyLock = LazyLock::new(|| { + LogSuppresser::new(RateLimiter::direct(Quota::per_minute( + NonZeroU32::new(1).unwrap(), + ))) + }); if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { let join_key_data_types = side_update.ht.join_key_data_types(); let key = key.deserialize(join_key_data_types)?; - tracing::warn!(target: "hash_join_amplification", + tracing::warn!(target: "high_join_amplification", suppressed_count, matched_rows_len = rows.len(), update_table_id = side_update.ht.table_id(), @@ -1205,6 +1217,7 @@ mod tests { false, Arc::new(StreamingMetrics::unused()), 1024, + 2048, ); (tx_l, tx_r, executor.boxed().execute()) } @@ -1297,6 +1310,7 @@ mod tests { true, Arc::new(StreamingMetrics::unused()), 1024, + 2048, ); (tx_l, tx_r, executor.boxed().execute()) } diff --git a/src/stream/src/from_proto/hash_join.rs b/src/stream/src/from_proto/hash_join.rs index 28a6aa72d4a6f..2d421274cec39 100644 --- a/src/stream/src/from_proto/hash_join.rs +++ b/src/stream/src/from_proto/hash_join.rs @@ -155,6 +155,11 @@ impl ExecutorBuilder for HashJoinExecutorBuilder { join_type_proto: node.get_join_type()?, join_key_data_types, chunk_size: params.env.config().developer.chunk_size, + high_join_amplification_threshold: params + .env + .config() + .developer + .high_join_amplification_threshold, }; let exec = args.dispatch()?; @@ -183,6 +188,7 @@ struct HashJoinExecutorDispatcherArgs { join_type_proto: JoinTypeProto, join_key_data_types: Vec, chunk_size: usize, + high_join_amplification_threshold: usize, } impl HashKeyDispatcher for HashJoinExecutorDispatcherArgs { @@ -211,6 +217,7 @@ impl HashKeyDispatcher for HashJoinExecutorDispatcherArgs { self.is_append_only, self.metrics, self.chunk_size, + self.high_join_amplification_threshold, ) .boxed()) };