Skip to content

Commit

Permalink
fall back to pk-based row-degree pairing when inconsistency happens
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed May 21, 2024
1 parent 0c6de0f commit 099212e
Showing 1 changed file with 103 additions and 26 deletions.
129 changes: 103 additions & 26 deletions src/stream/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
// limitations under the License.

use std::alloc::Global;
use std::cmp::Ordering;
use std::ops::{Bound, Deref, DerefMut};
use std::sync::Arc;

use anyhow::Context;
use futures::future::{join, try_join};
use futures::StreamExt;
use futures::{pin_mut, stream, StreamExt};
use futures_async_stream::for_await;
use local_stats_alloc::{SharedStatsAlloc, StatsAlloc};
use risingwave_common::buffer::Bitmap;
Expand All @@ -27,6 +28,7 @@ use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common_estimate_size::EstimateSize;
Expand Down Expand Up @@ -387,39 +389,114 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {

let mut pinned_table_iter = std::pin::pin!(table_iter);
let mut pinned_degree_table_iter = std::pin::pin!(degree_table_iter);

// To better tolerate an inconsistent stream, we first buffer all rows and degree
// rows, compare their counts, and only then iterate over them.
let mut rows = vec![];
let mut degree_rows = vec![];
let mut inconsistency_happened = false;
loop {
// Iterate on both iterators and ensure they have the same size. Basically `zip_eq()`.
let (row, degree) =
let (row, degree_row) =
join(pinned_table_iter.next(), pinned_degree_table_iter.next()).await;
let (row, degree) = match (row, degree) {
let (row, degree_row) = match (row, degree_row) {
(None, None) => break,
(None, Some(_)) | (Some(_), None) => {
panic!("mismatched row and degree table of join key: {:?}", &key)
(None, Some(_)) => {
inconsistency_happened = true;
consistency_panic!(
"mismatched row and degree table of join key: {:?}, degree table has more rows",
&key
);
break;
}
(Some(_), None) => {
inconsistency_happened = true;
consistency_panic!(
"mismatched row and degree table of join key: {:?}, input table has more rows",
&key
);
break;
}
(Some(r), Some(d)) => (r, d),
};

let row = row?;
let degree_row = degree?;
let pk1 = row.key();
let pk2 = degree_row.key();
debug_assert_eq!(
pk1, pk2,
"mismatched pk in degree table: pk1: {pk1:?}, pk2: {pk2:?}",
);
let pk = row
.as_ref()
.project(&self.state.pk_indices)
.memcmp_serialize(&self.pk_serializer);
let degree_i64 = degree_row
.datum_at(degree_row.len() - 1)
.expect("degree should not be NULL");
entry_state
.insert(
pk,
JoinRow::new(row.row(), degree_i64.into_int64() as u64).encode(),
)
.with_context(|| self.state.error_context(row.row()))?;
let degree_row = degree_row?;
rows.push(row);
degree_rows.push(degree_row);
}

if inconsistency_happened {
// Pk-based row-degree pairing.
assert_ne!(rows.len(), degree_rows.len());

let row_iter = stream::iter(rows.into_iter()).peekable();
let degree_row_iter = stream::iter(degree_rows.into_iter()).peekable();
pin_mut!(row_iter);
pin_mut!(degree_row_iter);

loop {
match join(row_iter.as_mut().peek(), degree_row_iter.as_mut().peek()).await {
(None, _) | (_, None) => break,
(Some(row), Some(degree_row)) => match row.key().cmp(degree_row.key()) {
Ordering::Greater => {
degree_row_iter.next().await;
}
Ordering::Less => {
row_iter.next().await;
}
Ordering::Equal => {
let row = row_iter.next().await.unwrap();
let degree_row = degree_row_iter.next().await.unwrap();

let pk = row
.as_ref()
.project(&self.state.pk_indices)
.memcmp_serialize(&self.pk_serializer);
let degree_i64 = degree_row
.datum_at(degree_row.len() - 1)
.expect("degree should not be NULL");
entry_state
.insert(
pk,
JoinRow::new(row.row(), degree_i64.into_int64() as u64)
.encode(),
)
.with_context(|| self.state.error_context(row.row()))?;
}
},
}
}
} else {
// 1 to 1 row-degree pairing.
// It is still possible that the input data table and the degree table are both missing
// the same number of rows; we ignore that case because it should be rare.

assert_eq!(rows.len(), degree_rows.len());

#[for_await]
for (row, degree_row) in
stream::iter(rows.into_iter().zip_eq_fast(degree_rows.into_iter()))
{
let pk1 = row.key();
let pk2 = degree_row.key();
debug_assert_eq!(
pk1, pk2,
"mismatched pk in degree table: pk1: {pk1:?}, pk2: {pk2:?}",
);
let pk = row
.as_ref()
.project(&self.state.pk_indices)
.memcmp_serialize(&self.pk_serializer);
let degree_i64 = degree_row
.datum_at(degree_row.len() - 1)
.expect("degree should not be NULL");
entry_state
.insert(
pk,
JoinRow::new(row.row(), degree_i64.into_int64() as u64).encode(),
)
.with_context(|| self.state.error_context(row.row()))?;
}
}
} else {
let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
Expand Down

0 comments on commit 099212e

Please sign in to comment.