Skip to content

Commit

Permalink
feat(streaming): lower high join amplification logging to > 2048 records, and rate limit at 1 per minute (#16957)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored May 28, 2024
1 parent 44306fd commit e2bdd4f
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 5 deletions.
9 changes: 9 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,11 @@ pub struct StreamingDeveloperConfig {
/// If true, the arrangement backfill will be disabled,
/// even if session variable set.
pub enable_arrangement_backfill: bool,

#[serde(default = "default::developer::stream_high_join_amplification_threshold")]
/// If the number of hash join matches exceeds this threshold,
/// a warning will be logged.
pub high_join_amplification_threshold: usize,
}

/// The subsections `[batch.developer]`.
Expand Down Expand Up @@ -1733,6 +1738,10 @@ pub mod default {
/// Default for the streaming developer option that toggles arrangement
/// backfill; the feature is on unless configured otherwise.
pub fn stream_enable_arrangement_backfill() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}

/// Default value of `high_join_amplification_threshold`: the number of hash
/// join matches above which the amplification gets logged.
pub fn stream_high_join_amplification_threshold() -> usize {
    const DEFAULT_THRESHOLD: usize = 2048;
    DEFAULT_THRESHOLD
}
}

pub use crate::system_param::default as system;
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ stream_memory_controller_eviction_factor_stable = 1.0
stream_memory_controller_sequence_tls_step = 128
stream_memory_controller_sequence_tls_lag = 32
stream_enable_arrangement_backfill = true
stream_high_join_amplification_threshold = 2048

[storage]
share_buffers_sync_parallelism = 1
Expand Down
24 changes: 19 additions & 5 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashSet};
use std::num::NonZeroU32;
use std::sync::LazyLock;
use std::time::Duration;

use governor::{Quota, RateLimiter};
use itertools::Itertools;
use multimap::MultiMap;
use risingwave_common::array::Op;
Expand Down Expand Up @@ -159,6 +160,8 @@ pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitiv

/// watermark column index -> `BufferedWatermarks`
watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

high_join_amplification_threshold: usize,
}

impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> std::fmt::Debug
Expand Down Expand Up @@ -195,6 +198,7 @@ struct EqJoinArgs<'a, K: HashKey, S: StateStore> {
append_only_optimize: bool,
chunk_size: usize,
cnt_rows_received: &'a mut u32,
high_join_amplification_threshold: usize,
}

impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K, S, T> {
Expand All @@ -218,6 +222,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
is_append_only: bool,
metrics: Arc<StreamingMetrics>,
chunk_size: usize,
high_join_amplification_threshold: usize,
) -> Self {
let side_l_column_n = input_l.schema().len();

Expand Down Expand Up @@ -446,6 +451,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
chunk_size,
cnt_rows_received: 0,
watermark_buffers,
high_join_amplification_threshold,
}
}

Expand Down Expand Up @@ -539,6 +545,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
append_only_optimize: self.append_only_optimize,
chunk_size: self.chunk_size,
cnt_rows_received: &mut self.cnt_rows_received,
high_join_amplification_threshold: self.high_join_amplification_threshold,
}) {
left_time += left_start_time.elapsed();
yield Message::Chunk(chunk?);
Expand All @@ -563,6 +570,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
append_only_optimize: self.append_only_optimize,
chunk_size: self.chunk_size,
cnt_rows_received: &mut self.cnt_rows_received,
high_join_amplification_threshold: self.high_join_amplification_threshold,
}) {
right_time += right_start_time.elapsed();
yield Message::Chunk(chunk?);
Expand Down Expand Up @@ -773,6 +781,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
append_only_optimize,
chunk_size,
cnt_rows_received,
high_join_amplification_threshold,
..
} = args;

Expand Down Expand Up @@ -828,13 +837,16 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,

if let Some(rows) = &matched_rows {
join_matched_join_keys.observe(rows.len() as _);
if rows.len() >= 10000 {
static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
LazyLock::new(LogSuppresser::default);
if rows.len() > high_join_amplification_threshold {
static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(|| {
LogSuppresser::new(RateLimiter::direct(Quota::per_minute(
NonZeroU32::new(1).unwrap(),
)))
});
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
let join_key_data_types = side_update.ht.join_key_data_types();
let key = key.deserialize(join_key_data_types)?;
tracing::warn!(target: "hash_join_amplification",
tracing::warn!(target: "high_join_amplification",
suppressed_count,
matched_rows_len = rows.len(),
update_table_id = side_update.ht.table_id(),
Expand Down Expand Up @@ -1205,6 +1217,7 @@ mod tests {
false,
Arc::new(StreamingMetrics::unused()),
1024,
2048,
);
(tx_l, tx_r, executor.boxed().execute())
}
Expand Down Expand Up @@ -1297,6 +1310,7 @@ mod tests {
true,
Arc::new(StreamingMetrics::unused()),
1024,
2048,
);
(tx_l, tx_r, executor.boxed().execute())
}
Expand Down
7 changes: 7 additions & 0 deletions src/stream/src/from_proto/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ impl ExecutorBuilder for HashJoinExecutorBuilder {
join_type_proto: node.get_join_type()?,
join_key_data_types,
chunk_size: params.env.config().developer.chunk_size,
high_join_amplification_threshold: params
.env
.config()
.developer
.high_join_amplification_threshold,
};

let exec = args.dispatch()?;
Expand Down Expand Up @@ -183,6 +188,7 @@ struct HashJoinExecutorDispatcherArgs<S: StateStore> {
join_type_proto: JoinTypeProto,
join_key_data_types: Vec<DataType>,
chunk_size: usize,
high_join_amplification_threshold: usize,
}

impl<S: StateStore> HashKeyDispatcher for HashJoinExecutorDispatcherArgs<S> {
Expand Down Expand Up @@ -211,6 +217,7 @@ impl<S: StateStore> HashKeyDispatcher for HashJoinExecutorDispatcherArgs<S> {
self.is_append_only,
self.metrics,
self.chunk_size,
self.high_join_amplification_threshold,
)
.boxed())
};
Expand Down

0 comments on commit e2bdd4f

Please sign in to comment.