Skip to content

Commit

Permalink
write row
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Dec 2, 2024
1 parent 6dffc0d commit 120103d
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 36 deletions.
95 changes: 59 additions & 36 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
/// data chunk with the executor state
fn hash_eq_match_opt(key: &K, ht: &mut JoinHashMap<K, S>) -> Option<Option<HashValueType>> {
if !key.null_bitmap().is_subset(ht.null_matched()) {
tracing::info!("no match for join key: {:?}", key);
None
} else {
Some(ht.take_state_opt(key))
Expand Down Expand Up @@ -788,7 +789,6 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
useful_state_clean_columns: &'a [(usize, &'a Watermark)],
cond: &'a mut Option<NonStrictExpression>,
append_only_optimize: bool,
entry_state_ref: &'a mut Option<JoinEntryState>,
) {
let mut entry_state = JoinEntryState::default();
let mut entry_state_count = 0;
Expand Down Expand Up @@ -819,10 +819,18 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
}
}
}
match op {
Op::Insert | Op::UpdateInsert => {
tracing::info!("insert row: {:?}", row);
side_update.ht.insert_row(key, row)?;
}
Op::Delete | Op::UpdateDelete => {
side_update.ht.delete_row(key, row)?;
}
}
if entry_state_count <= entry_state_max_rows {
*entry_state_ref = Some(entry_state);
side_match.ht.update_state(key, Box::new(entry_state));
} else {
*entry_state_ref = None;
}
}

Expand Down Expand Up @@ -1137,41 +1145,56 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
// join_matched_join_keys.observe(0.0)
// }

if let Some(rows) = matched_rows {
#[for_await]
for chunk in Self::handle_cached_matched_rows(
rows,
row,
op,
key,
&mut hashjoin_chunk_builder,
side_match,
side_update,
&useful_state_clean_columns,
cond,
append_only_optimize,
) {
yield chunk?;
match matched_rows {
Some(Some(rows)) => {
#[for_await]
for chunk in Self::handle_cached_matched_rows(
Some(rows),
row,
op,
key,
&mut hashjoin_chunk_builder,
side_match,
side_update,
&useful_state_clean_columns,
cond,
append_only_optimize,
) {
yield chunk?;
}
}
} else {
let mut entry_state = None;
#[for_await]
for chunk in Self::handle_fetch_matched_rows(
row,
key,
op,
&mut hashjoin_chunk_builder,
side_match,
side_update,
&useful_state_clean_columns,
cond,
append_only_optimize,
&mut entry_state,
) {
yield chunk?;
Some(None) => {
#[for_await]
for chunk in Self::handle_fetch_matched_rows(
row,
key,
op,
&mut hashjoin_chunk_builder,
side_match,
side_update,
&useful_state_clean_columns,
cond,
append_only_optimize,
) {
yield chunk?;
}
}
if let Some(entry_state) = entry_state {
side_match.ht.update_state(key, Box::new(entry_state));
None => {
#[for_await]
for chunk in Self::handle_cached_matched_rows(
None,
row,
op,
key,
&mut hashjoin_chunk_builder,
side_match,
side_update,
&useful_state_clean_columns,
cond,
append_only_optimize,
) {
yield chunk?;
}
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/stream/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,11 +361,13 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
pub fn take_state_opt(&mut self, key: &K) -> Option<HashValueType> {
self.metrics.total_lookup_count += 1;
if self.inner.contains(key) {
tracing::info!("hit cache for join key: {:?}", key);
// Do not update the LRU statistics here with `peek_mut` since we will put the state
// back.
let mut state = self.inner.peek_mut(key).unwrap();
Some(state.take())
} else {
tracing::info!("miss cache for join key: {:?}", key);
None
}
}
Expand Down Expand Up @@ -394,6 +396,7 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {

#[try_stream(ok = (Vec<u8>, JoinRow<OwnedRow>), error = StreamExecutorError)]
pub async fn fetch_matched_rows<'a>(&'a self, key: &'a K) {
tracing::debug!("fetching matched rows for join key: {:?}", key);
let key = key.deserialize(&self.join_key_data_types)?;
let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);
let table_iter = self
Expand All @@ -409,6 +412,11 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
.as_ref()
.project(&self.state.pk_indices)
.memcmp_serialize(&self.pk_serializer);
tracing::debug!(
"found matching rows for join key: {:?}, row: {:?}",
key,
row
);
yield (pk, JoinRow::new(row.into_owned_row(), 0));
}
}
Expand Down

0 comments on commit 120103d

Please sign in to comment.