Skip to content

Commit

Permalink
feat(stream): add debug logs for join key with high amplification (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored May 17, 2024
1 parent 30f1ebe commit 31cc7c0
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 1 deletion.
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 21
timeout_in_minutes: 23
retry: *auto-retry

- label: "end-to-end test (parallel)"
Expand Down
13 changes: 13 additions & 0 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,19 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,

if let Some(rows) = &matched_rows {
join_matched_join_keys.observe(rows.len() as _);
if rows.len() > 10000 {
let join_key_data_types = side_update.ht.join_key_data_types();
let key = key.deserialize(join_key_data_types)?;
tracing::debug!(target: "hash_join_amplification",
matched_rows_len = rows.len(),
update_table_id = side_update.ht.table_id(),
match_table_id = side_match.ht.table_id(),
join_key = ?key,
actor_id = ctx.id,
fragment_id = ctx.fragment_id,
"large rows matched for join key"
);
}
} else {
join_matched_join_keys.observe(0.0)
}
Expand Down
4 changes: 4 additions & 0 deletions src/stream/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,10 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
pub fn table_id(&self) -> u32 {
self.state.table.table_id()
}

pub fn join_key_data_types(&self) -> &[DataType] {
&self.join_key_data_types
}
}

use risingwave_common_estimate_size::KvSize;
Expand Down

0 comments on commit 31cc7c0

Please sign in to comment.