feat(consistency): tolerate bad join degree (#16859)
Signed-off-by: Richard Chien <[email protected]>
stdrc authored May 24, 2024
1 parent e5ad5d9 commit c40e4f8
Showing 1 changed file with 108 additions and 30 deletions.
138 changes: 108 additions & 30 deletions src/stream/src/executor/join/hash_join.rs
@@ -13,12 +13,13 @@
// limitations under the License.

use std::alloc::Global;
use std::cmp::Ordering;
use std::ops::{Bound, Deref, DerefMut};
use std::sync::Arc;

use anyhow::Context;
use futures::future::{join, try_join};
use futures::StreamExt;
use futures::{pin_mut, stream, StreamExt};
use futures_async_stream::for_await;
use local_stats_alloc::{SharedStatsAlloc, StatsAlloc};
use risingwave_common::buffer::Bitmap;
@@ -27,6 +28,7 @@ use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common_estimate_size::EstimateSize;
@@ -37,7 +39,7 @@ use super::row::{DegreeType, EncodedJoinRow};
use crate::cache::{new_with_hasher_in, ManagedLruCache};
use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTable;
use crate::consistency::{consistency_error, enable_strict_consistency};
use crate::consistency::{consistency_error, consistency_panic, enable_strict_consistency};
use crate::executor::error::StreamExecutorResult;
use crate::executor::join::row::JoinRow;
use crate::executor::monitor::StreamingMetrics;
@@ -387,39 +389,114 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {

let mut pinned_table_iter = std::pin::pin!(table_iter);
let mut pinned_degree_table_iter = std::pin::pin!(degree_table_iter);

// To better tolerate an inconsistent stream, we first buffer all rows and
// degree rows, check whether their counts match, and then iterate over them.
let mut rows = vec![];
let mut degree_rows = vec![];
let mut inconsistency_happened = false;
loop {
// Iterate over both iterators and ensure they have the same size. Basically `zip_eq()`.
let (row, degree) =
let (row, degree_row) =
join(pinned_table_iter.next(), pinned_degree_table_iter.next()).await;
let (row, degree) = match (row, degree) {
let (row, degree_row) = match (row, degree_row) {
(None, None) => break,
(None, Some(_)) | (Some(_), None) => {
panic!("mismatched row and degree table of join key: {:?}", &key)
(None, Some(_)) => {
inconsistency_happened = true;
consistency_panic!(
"mismatched row and degree table of join key: {:?}, degree table has more rows",
&key
);
break;
}
(Some(_), None) => {
inconsistency_happened = true;
consistency_panic!(
"mismatched row and degree table of join key: {:?}, input table has more rows",
&key
);
break;
}
(Some(r), Some(d)) => (r, d),
};

let row = row?;
let degree_row = degree?;
let pk1 = row.key();
let pk2 = degree_row.key();
debug_assert_eq!(
pk1, pk2,
"mismatched pk in degree table: pk1: {pk1:?}, pk2: {pk2:?}",
);
let pk = row
.as_ref()
.project(&self.state.pk_indices)
.memcmp_serialize(&self.pk_serializer);
let degree_i64 = degree_row
.datum_at(degree_row.len() - 1)
.expect("degree should not be NULL");
entry_state
.insert(
pk,
JoinRow::new(row.row(), degree_i64.into_int64() as u64).encode(),
)
.with_context(|| self.state.error_context(row.row()))?;
let degree_row = degree_row?;
rows.push(row);
degree_rows.push(degree_row);
}

if inconsistency_happened {
// Pk-based row-degree pairing.
assert_ne!(rows.len(), degree_rows.len());

let row_iter = stream::iter(rows.into_iter()).peekable();
let degree_row_iter = stream::iter(degree_rows.into_iter()).peekable();
pin_mut!(row_iter);
pin_mut!(degree_row_iter);

loop {
match join(row_iter.as_mut().peek(), degree_row_iter.as_mut().peek()).await {
(None, _) | (_, None) => break,
(Some(row), Some(degree_row)) => match row.key().cmp(degree_row.key()) {
Ordering::Greater => {
degree_row_iter.next().await;
}
Ordering::Less => {
row_iter.next().await;
}
Ordering::Equal => {
let row = row_iter.next().await.unwrap();
let degree_row = degree_row_iter.next().await.unwrap();

let pk = row
.as_ref()
.project(&self.state.pk_indices)
.memcmp_serialize(&self.pk_serializer);
let degree_i64 = degree_row
.datum_at(degree_row.len() - 1)
.expect("degree should not be NULL");
entry_state
.insert(
pk,
JoinRow::new(row.row(), degree_i64.into_int64() as u64)
.encode(),
)
.with_context(|| self.state.error_context(row.row()))?;
}
},
}
}
} else {
// 1-to-1 row-degree pairing.
// It's possible that the input data table and the degree table both missed the
// same number of rows, but we ignore this case because it should be rare.

assert_eq!(rows.len(), degree_rows.len());

#[for_await]
for (row, degree_row) in
stream::iter(rows.into_iter().zip_eq_fast(degree_rows.into_iter()))
{
let pk1 = row.key();
let pk2 = degree_row.key();
debug_assert_eq!(
pk1, pk2,
"mismatched pk in degree table: pk1: {pk1:?}, pk2: {pk2:?}",
);
let pk = row
.as_ref()
.project(&self.state.pk_indices)
.memcmp_serialize(&self.pk_serializer);
let degree_i64 = degree_row
.datum_at(degree_row.len() - 1)
.expect("degree should not be NULL");
entry_state
.insert(
pk,
JoinRow::new(row.row(), degree_i64.into_int64() as u64).encode(),
)
.with_context(|| self.state.error_context(row.row()))?;
}
}
} else {
let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
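The pk-based recovery above is a sort-merge pairing over two key-sorted iterators: advance whichever side holds the smaller key, and emit only when the keys are equal, so rows that exist on just one side are dropped. Below is a minimal standalone sketch of the same technique over plain sorted `Vec`s in place of the executor's pinned state-table streams; the name `pair_by_key` and the `(key, value)` tuple shape are illustrative, not from the commit.

```rust
use std::cmp::Ordering;

/// Pair two key-sorted sequences, skipping entries that exist on
/// only one side (the inconsistent leftovers).
fn pair_by_key<K: Ord, A, B>(rows: Vec<(K, A)>, degrees: Vec<(K, B)>) -> Vec<(K, A, B)> {
    let mut out = Vec::new();
    let mut rows = rows.into_iter().peekable();
    let mut degrees = degrees.into_iter().peekable();
    loop {
        match (rows.peek(), degrees.peek()) {
            (None, _) | (_, None) => break,
            (Some((rk, _)), Some((dk, _))) => match rk.cmp(dk) {
                // A degree row with no matching input row: skip it.
                Ordering::Greater => {
                    degrees.next();
                }
                // An input row with no matching degree row: skip it.
                Ordering::Less => {
                    rows.next();
                }
                // Keys match: emit the pair.
                Ordering::Equal => {
                    let (k, a) = rows.next().unwrap();
                    let (_, b) = degrees.next().unwrap();
                    out.push((k, a, b));
                }
            },
        }
    }
    out
}

fn main() {
    let rows = vec![(1, "a"), (2, "b"), (4, "c")];
    let degrees = vec![(1, 10), (3, 30), (4, 40)];
    // Keys 2 and 3 exist on only one side and are dropped.
    assert_eq!(pair_by_key(rows, degrees), vec![(1, "a", 10), (4, "c", 40)]);
}
```

Buffering both sides up front, as the commit does, is what makes the count check and this fallback possible on top of one-pass state-table iterators.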
@@ -603,9 +680,10 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
join_row: &mut JoinRow<OwnedRow>,
) {
self.manipulate_degree(join_row_ref, join_row, |d| {
*d = d
.checked_sub(1)
.expect("Tried to decrement zero join row degree")
*d = d.checked_sub(1).unwrap_or_else(|| {
consistency_panic!("Tried to decrement zero join row degree");
0
});
})
}
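The new `unwrap_or_else` arm turns degree underflow from a hard panic into a recoverable event: `consistency_panic!` aborts only when strict consistency is enabled, and otherwise reports the violation so the code can clamp the degree at zero. Here is a rough sketch of that pattern, assuming a global boolean switch; `STRICT`, `decrement_degree`, and the macro body are stand-ins, while the real macro lives in `crate::consistency`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Stand-in for a global "strict consistency" switch (hypothetical).
static STRICT: AtomicBool = AtomicBool::new(true);

/// Panic under strict consistency; otherwise report and keep going.
macro_rules! consistency_panic {
    ($($arg:tt)*) => {
        if STRICT.load(Ordering::Relaxed) {
            panic!($($arg)*);
        } else {
            eprintln!("consistency violation: {}", format_args!($($arg)*));
        }
    };
}

/// Tolerant version of "degree -= 1": clamp at zero instead of underflowing.
fn decrement_degree(d: &mut u64) {
    *d = d.checked_sub(1).unwrap_or_else(|| {
        consistency_panic!("tried to decrement zero join row degree");
        0
    });
}

fn main() {
    STRICT.store(false, Ordering::Relaxed);
    let mut degree = 0u64;
    decrement_degree(&mut degree); // reports instead of panicking
    assert_eq!(degree, 0);
}
```

With the switch off, a corrupted degree is logged and clamped, so the join executor can keep processing instead of crashing the actor.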
