Skip to content

Commit

Permalink
fix(stream):hash join: check degree & row iterator have same length (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh authored Jan 24, 2024
1 parent 603f5ea commit a9ce6c7
Showing 1 changed file with 15 additions and 3 deletions.
18 changes: 15 additions & 3 deletions src/stream/src/executor/managed_state/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::ops::{Bound, Deref, DerefMut};
use std::sync::Arc;

use anyhow::Context;
use futures::future::try_join;
use futures::future::{join, try_join};
use futures::StreamExt;
use futures_async_stream::for_await;
pub(super) use join_entry_state::JoinEntryState;
Expand Down Expand Up @@ -450,8 +450,20 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
let (table_iter, degree_table_iter) =
try_join(table_iter_fut, degree_table_iter_fut).await?;

#[for_await]
for (row, degree) in table_iter.zip(degree_table_iter) {
let mut pinned_table_iter = std::pin::pin!(table_iter);
let mut pinned_degree_table_iter = std::pin::pin!(degree_table_iter);
loop {
// Iterate on both iterators and ensure they have same size. Basically `zip_eq()`.
let (row, degree) =
join(pinned_table_iter.next(), pinned_degree_table_iter.next()).await;
let (row, degree) = match (row, degree) {
(None, None) => break,
(None, Some(_)) | (Some(_), None) => {
panic!("mismatched row and degree table of join key: {:?}", &key)
}
(Some(r), Some(d)) => (r, d),
};

let row = row?;
let degree_row = degree?;
let pk1 = row.key();
Expand Down

0 comments on commit a9ce6c7

Please sign in to comment.