diff --git a/src/stream/src/executor/join/hash_join.rs b/src/stream/src/executor/join/hash_join.rs index 0c6a12b1e68f5..ab9b4138cba4e 100644 --- a/src/stream/src/executor/join/hash_join.rs +++ b/src/stream/src/executor/join/hash_join.rs @@ -13,12 +13,13 @@ // limitations under the License. use std::alloc::Global; +use std::cmp::Ordering; use std::ops::{Bound, Deref, DerefMut}; use std::sync::Arc; use anyhow::Context; use futures::future::{join, try_join}; -use futures::StreamExt; +use futures::{pin_mut, stream, StreamExt}; use futures_async_stream::for_await; use local_stats_alloc::{SharedStatsAlloc, StatsAlloc}; use risingwave_common::buffer::Bitmap; @@ -27,6 +28,7 @@ use risingwave_common::metrics::LabelGuardedIntCounter; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::epoch::EpochPair; +use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_common::util::sort_util::OrderType; use risingwave_common_estimate_size::EstimateSize; @@ -37,7 +39,7 @@ use super::row::{DegreeType, EncodedJoinRow}; use crate::cache::{new_with_hasher_in, ManagedLruCache}; use crate::common::metrics::MetricsInfo; use crate::common::table::state_table::StateTable; -use crate::consistency::{consistency_error, enable_strict_consistency}; +use crate::consistency::{consistency_error, consistency_panic, enable_strict_consistency}; use crate::executor::error::StreamExecutorResult; use crate::executor::join::row::JoinRow; use crate::executor::monitor::StreamingMetrics; @@ -387,39 +389,114 @@ impl JoinHashMap { let mut pinned_table_iter = std::pin::pin!(table_iter); let mut pinned_degree_table_iter = std::pin::pin!(degree_table_iter); + + // For better tolerating inconsistent stream, we have to first buffer all rows and + // degree rows, and check the number of them, then iterate on them. + let mut rows = vec![]; + let mut degree_rows = vec![]; + let mut inconsistency_happened = false; loop { - // Iterate on both iterators and ensure they have same size. Basically `zip_eq()`. - let (row, degree) = + let (row, degree_row) = join(pinned_table_iter.next(), pinned_degree_table_iter.next()).await; - let (row, degree) = match (row, degree) { + let (row, degree_row) = match (row, degree_row) { (None, None) => break, - (None, Some(_)) | (Some(_), None) => { - panic!("mismatched row and degree table of join key: {:?}", &key) + (None, Some(_)) => { + inconsistency_happened = true; + consistency_panic!( + "mismatched row and degree table of join key: {:?}, degree table has more rows", + &key + ); + break; + } + (Some(_), None) => { + inconsistency_happened = true; + consistency_panic!( + "mismatched row and degree table of join key: {:?}, input table has more rows", + &key + ); + break; } (Some(r), Some(d)) => (r, d), }; let row = row?; - let degree_row = degree?; - let pk1 = row.key(); - let pk2 = degree_row.key(); - debug_assert_eq!( - pk1, pk2, - "mismatched pk in degree table: pk1: {pk1:?}, pk2: {pk2:?}", - ); - let pk = row - .as_ref() - .project(&self.state.pk_indices) - .memcmp_serialize(&self.pk_serializer); - let degree_i64 = degree_row - .datum_at(degree_row.len() - 1) - .expect("degree should not be NULL"); - entry_state - .insert( - pk, - JoinRow::new(row.row(), degree_i64.into_int64() as u64).encode(), - ) - .with_context(|| self.state.error_context(row.row()))?; + let degree_row = degree_row?; + rows.push(row); + degree_rows.push(degree_row); + } + + if inconsistency_happened { + // Pk-based row-degree pairing. + assert_ne!(rows.len(), degree_rows.len()); + + let row_iter = stream::iter(rows.into_iter()).peekable(); + let degree_row_iter = stream::iter(degree_rows.into_iter()).peekable(); + pin_mut!(row_iter); + pin_mut!(degree_row_iter); + + loop { + match join(row_iter.as_mut().peek(), degree_row_iter.as_mut().peek()).await { + (None, _) | (_, None) => break, + (Some(row), Some(degree_row)) => match row.key().cmp(degree_row.key()) { + Ordering::Greater => { + degree_row_iter.next().await; + } + Ordering::Less => { + row_iter.next().await; + } + Ordering::Equal => { + let row = row_iter.next().await.unwrap(); + let degree_row = degree_row_iter.next().await.unwrap(); + + let pk = row + .as_ref() + .project(&self.state.pk_indices) + .memcmp_serialize(&self.pk_serializer); + let degree_i64 = degree_row + .datum_at(degree_row.len() - 1) + .expect("degree should not be NULL"); + entry_state + .insert( + pk, + JoinRow::new(row.row(), degree_i64.into_int64() as u64) + .encode(), + ) + .with_context(|| self.state.error_context(row.row()))?; + } + }, + } + } + } else { + // 1 to 1 row-degree pairing. + // Actually it's possible that both the input data table and the degree table missed + // some equal number of rows, but let's ignore this case because it should be rare. + + assert_eq!(rows.len(), degree_rows.len()); + + #[for_await] + for (row, degree_row) in + stream::iter(rows.into_iter().zip_eq_fast(degree_rows.into_iter())) + { + let pk1 = row.key(); + let pk2 = degree_row.key(); + debug_assert_eq!( + pk1, pk2, + "mismatched pk in degree table: pk1: {pk1:?}, pk2: {pk2:?}", + ); + let pk = row + .as_ref() + .project(&self.state.pk_indices) + .memcmp_serialize(&self.pk_serializer); + let degree_i64 = degree_row + .datum_at(degree_row.len() - 1) + .expect("degree should not be NULL"); + entry_state + .insert( + pk, + JoinRow::new(row.row(), degree_i64.into_int64() as u64).encode(), + ) + .with_context(|| self.state.error_context(row.row()))?; + } } } else { let sub_range: &(Bound, Bound) = @@ -603,9 +680,10 @@ impl JoinHashMap { join_row: &mut JoinRow, ) { self.manipulate_degree(join_row_ref, join_row, |d| { - *d = d - .checked_sub(1) - .expect("Tried to decrement zero join row degree") + *d = d.checked_sub(1).unwrap_or_else(|| { + consistency_panic!("Tried to decrement zero join row degree"); + 0 + }); }) }