Skip to content

Commit

Permalink
fix: remove the log suppression for join amplification (#17861)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh authored Jul 31, 2024
1 parent 8cb8694 commit 2c23809
Showing 1 changed file with 11 additions and 23 deletions.
34 changes: 11 additions & 23 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashSet};
use std::num::NonZeroU32;
use std::sync::LazyLock;
use std::time::Duration;

use governor::{Quota, RateLimiter};
use itertools::Itertools;
use multimap::MultiMap;
use risingwave_common::array::Op;
use risingwave_common::hash::{HashKey, NullBitmap};
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqDebug;
Expand Down Expand Up @@ -840,25 +836,17 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
if let Some(rows) = &matched_rows {
join_matched_join_keys.observe(rows.len() as _);
if rows.len() > high_join_amplification_threshold {
static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(|| {
LogSuppresser::new(RateLimiter::direct(Quota::per_minute(
NonZeroU32::new(1).unwrap(),
)))
});
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
let join_key_data_types = side_update.ht.join_key_data_types();
let key = key.deserialize(join_key_data_types)?;
tracing::warn!(target: "high_join_amplification",
suppressed_count,
matched_rows_len = rows.len(),
update_table_id = side_update.ht.table_id(),
match_table_id = side_match.ht.table_id(),
join_key = ?key,
actor_id = ctx.id,
fragment_id = ctx.fragment_id,
"large rows matched for join key"
);
}
let join_key_data_types = side_update.ht.join_key_data_types();
let key = key.deserialize(join_key_data_types)?;
tracing::warn!(target: "high_join_amplification",
matched_rows_len = rows.len(),
update_table_id = side_update.ht.table_id(),
match_table_id = side_match.ht.table_id(),
join_key = ?key,
actor_id = ctx.id,
fragment_id = ctx.fragment_id,
"large rows matched for join key"
);
}
} else {
join_matched_join_keys.observe(0.0)
Expand Down

0 comments on commit 2c23809

Please sign in to comment.