From 120103df7e4238ee08d3a6bd2f5450e3502488e2 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 2 Dec 2024 11:32:19 +0800 Subject: [PATCH] write row --- src/stream/src/executor/hash_join.rs | 95 ++++++++++++++--------- src/stream/src/executor/join/hash_join.rs | 8 ++ 2 files changed, 67 insertions(+), 36 deletions(-) diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index 7cb6345a3d3b7..7ae7af7fed484 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -739,6 +739,7 @@ impl HashJoinExecutor) -> Option> { if !key.null_bitmap().is_subset(ht.null_matched()) { + tracing::info!("no match for join key: {:?}", key); None } else { Some(ht.take_state_opt(key)) @@ -788,7 +789,6 @@ impl HashJoinExecutor, append_only_optimize: bool, - entry_state_ref: &'a mut Option, ) { let mut entry_state = JoinEntryState::default(); let mut entry_state_count = 0; @@ -819,10 +819,18 @@ impl HashJoinExecutor { + tracing::info!("insert row: {:?}", row); + side_update.ht.insert_row(key, row)?; + } + Op::Delete | Op::UpdateDelete => { + side_update.ht.delete_row(key, row)?; + } + } if entry_state_count <= entry_state_max_rows { - *entry_state_ref = Some(entry_state); + side_match.ht.update_state(key, Box::new(entry_state)); } else { - *entry_state_ref = None; } } @@ -1137,41 +1145,56 @@ impl HashJoinExecutor { + #[for_await] + for chunk in Self::handle_cached_matched_rows( + Some(rows), + row, + op, + key, + &mut hashjoin_chunk_builder, + side_match, + side_update, + &useful_state_clean_columns, + cond, + append_only_optimize, + ) { + yield chunk?; + } } - } else { - let mut entry_state = None; - #[for_await] - for chunk in Self::handle_fetch_matched_rows( - row, - key, - op, - &mut hashjoin_chunk_builder, - side_match, - side_update, - &useful_state_clean_columns, - cond, - append_only_optimize, - &mut entry_state, - ) { - yield chunk?; + Some(None) => { + #[for_await] + for chunk in Self::handle_fetch_matched_rows( + row, + key, + op, + &mut hashjoin_chunk_builder, + side_match, + side_update, + &useful_state_clean_columns, + cond, + append_only_optimize, + ) { + yield chunk?; + } } - if let Some(entry_state) = entry_state { - side_match.ht.update_state(key, Box::new(entry_state)); + None => { + #[for_await] + for chunk in Self::handle_cached_matched_rows( + None, + row, + op, + key, + &mut hashjoin_chunk_builder, + side_match, + side_update, + &useful_state_clean_columns, + cond, + append_only_optimize, + ) { + yield chunk?; + } } } } diff --git a/src/stream/src/executor/join/hash_join.rs b/src/stream/src/executor/join/hash_join.rs index cbaecc71f4505..1b626eb91bcd1 100644 --- a/src/stream/src/executor/join/hash_join.rs +++ b/src/stream/src/executor/join/hash_join.rs @@ -361,11 +361,13 @@ impl JoinHashMap { pub fn take_state_opt(&mut self, key: &K) -> Option { self.metrics.total_lookup_count += 1; if self.inner.contains(key) { + tracing::info!("hit cache for join key: {:?}", key); // Do not update the LRU statistics here with `peek_mut` since we will put the state // back. let mut state = self.inner.peek_mut(key).unwrap(); Some(state.take()) } else { + tracing::info!("miss cache for join key: {:?}", key); None } } @@ -394,6 +396,7 @@ impl JoinHashMap { #[try_stream(ok = (Vec, JoinRow), error = StreamExecutorError)] pub async fn fetch_matched_rows<'a>(&'a self, key: &'a K) { + tracing::debug!("fetching matched rows for join key: {:?}", key); let key = key.deserialize(&self.join_key_data_types)?; let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let table_iter = self @@ -409,6 +412,11 @@ impl JoinHashMap { .as_ref() .project(&self.state.pk_indices) .memcmp_serialize(&self.pk_serializer); + tracing::debug!( + "found matching rows for join key: {:?}, row: {:?}", + key, + row + ); yield (pk, JoinRow::new(row.into_owned_row(), 0)); } }