Skip to content

Commit

Permalink
feat(stream): always log hash join large amplification records (#16840)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored May 20, 2024
1 parent de86f94 commit 7bb5480
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
// limitations under the License.

use std::collections::{BTreeMap, HashSet};
use std::sync::LazyLock;
use std::time::Duration;

use itertools::Itertools;
use multimap::MultiMap;
use risingwave_common::array::Op;
use risingwave_common::hash::{HashKey, NullBitmap};
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqDebug;
Expand Down Expand Up @@ -829,18 +831,23 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,

if let Some(rows) = &matched_rows {
join_matched_join_keys.observe(rows.len() as _);
if rows.len() > 10000 {
let join_key_data_types = side_update.ht.join_key_data_types();
let key = key.deserialize(join_key_data_types)?;
tracing::debug!(target: "hash_join_amplification",
matched_rows_len = rows.len(),
update_table_id = side_update.ht.table_id(),
match_table_id = side_match.ht.table_id(),
join_key = ?key,
actor_id = ctx.id,
fragment_id = ctx.fragment_id,
"large rows matched for join key"
);
if rows.len() >= 10000 {
static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
LazyLock::new(LogSuppresser::default);
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
let join_key_data_types = side_update.ht.join_key_data_types();
let key = key.deserialize(join_key_data_types)?;
tracing::warn!(target: "hash_join_amplification",
suppressed_count,
matched_rows_len = rows.len(),
update_table_id = side_update.ht.table_id(),
match_table_id = side_match.ht.table_id(),
join_key = ?key,
actor_id = ctx.id,
fragment_id = ctx.fragment_id,
"large rows matched for join key"
);
}
}
} else {
join_matched_join_keys.observe(0.0)
Expand Down

0 comments on commit 7bb5480

Please sign in to comment.