diff --git a/src/stream/src/executor/managed_state/join/mod.rs b/src/stream/src/executor/managed_state/join/mod.rs index ff20e5346fc34..a1ca7bcdd7891 100644 --- a/src/stream/src/executor/managed_state/join/mod.rs +++ b/src/stream/src/executor/managed_state/join/mod.rs @@ -20,7 +20,7 @@ use std::ops::{Bound, Deref, DerefMut}; use std::sync::Arc; use anyhow::Context; -use futures::future::try_join; +use futures::future::{join, try_join}; use futures::StreamExt; use futures_async_stream::for_await; pub(super) use join_entry_state::JoinEntryState; @@ -450,8 +450,20 @@ impl JoinHashMap { let (table_iter, degree_table_iter) = try_join(table_iter_fut, degree_table_iter_fut).await?; - #[for_await] - for (row, degree) in table_iter.zip(degree_table_iter) { + let mut pinned_table_iter = std::pin::pin!(table_iter); + let mut pinned_degree_table_iter = std::pin::pin!(degree_table_iter); + loop { + // Iterate on both iterators and ensure they have same size. Basically `zip_eq()`. + let (row, degree) = + join(pinned_table_iter.next(), pinned_degree_table_iter.next()).await; + let (row, degree) = match (row, degree) { + (None, None) => break, + (None, Some(_)) | (Some(_), None) => { + panic!("mismatched row and degree table of join key: {:?}", &key) + } + (Some(r), Some(d)) => (r, d), + }; + let row = row?; let degree_row = degree?; let pk1 = row.key();