Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(consistency): tolerate inconsistent stream in OverWindow #17168

Merged
merged 6 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/stream/src/executor/over_window/frame_finder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub(super) fn merge_rows_frames(rows_frames: &[&RowsFrameBounds]) -> RowsFrameBo
///
/// More examples can be found in the comment inside [`find_curr_for_rows_frame`].
pub(super) fn find_first_curr_for_rows_frame<'cache>(
frame_bounds: &'_ RowsFrameBounds,
frame_bounds: &RowsFrameBounds,
part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
delta_key: &'cache CacheKey,
) -> &'cache CacheKey {
Expand All @@ -116,7 +116,7 @@ pub(super) fn find_first_curr_for_rows_frame<'cache>(
///
/// This is the symmetric function of [`find_first_curr_for_rows_frame`].
pub(super) fn find_last_curr_for_rows_frame<'cache>(
frame_bounds: &'_ RowsFrameBounds,
frame_bounds: &RowsFrameBounds,
part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
delta_key: &'cache CacheKey,
) -> &'cache CacheKey {
Expand All @@ -127,7 +127,7 @@ pub(super) fn find_last_curr_for_rows_frame<'cache>(
/// to some CURRENT ROW, find the cache key corresponding to the start row in
/// that frame.
pub(super) fn find_frame_start_for_rows_frame<'cache>(
frame_bounds: &'_ RowsFrameBounds,
frame_bounds: &RowsFrameBounds,
part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
curr_key: &'cache CacheKey,
) -> &'cache CacheKey {
Expand All @@ -140,7 +140,7 @@ pub(super) fn find_frame_start_for_rows_frame<'cache>(
///
/// This is the symmetric function of [`find_frame_start_for_rows_frame`].
pub(super) fn find_frame_end_for_rows_frame<'cache>(
frame_bounds: &'_ RowsFrameBounds,
frame_bounds: &RowsFrameBounds,
part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
curr_key: &'cache CacheKey,
) -> &'cache CacheKey {
Expand Down Expand Up @@ -189,7 +189,7 @@ pub(super) fn find_left_for_range_frames<'cache>(
logical_order_value: impl ToDatumRef,
cache_key_pk_len: usize, // this is dirty but we have no better choice
) -> &'cache CacheKey {
find_for_range_frames::<true>(
find_for_range_frames::<true /* LEFT */>(
range_frames,
part_with_delta,
logical_order_value,
Expand All @@ -206,7 +206,7 @@ pub(super) fn find_right_for_range_frames<'cache>(
logical_order_value: impl ToDatumRef,
cache_key_pk_len: usize, // this is dirty but we have no better choice
) -> &'cache CacheKey {
find_for_range_frames::<false>(
find_for_range_frames::<false /* RIGHT */>(
range_frames,
part_with_delta,
logical_order_value,
Expand All @@ -217,7 +217,7 @@ pub(super) fn find_right_for_range_frames<'cache>(
// -------------------------- ↑ PUBLIC INTERFACE ↑ --------------------------

fn find_curr_for_rows_frame<'cache, const LEFT: bool>(
frame_bounds: &'_ RowsFrameBounds,
frame_bounds: &RowsFrameBounds,
part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
delta_key: &'cache CacheKey,
) -> &'cache CacheKey {
Expand Down Expand Up @@ -329,7 +329,7 @@ fn find_curr_for_rows_frame<'cache, const LEFT: bool>(
}

fn find_boundary_for_rows_frame<'cache, const LEFT: bool>(
frame_bounds: &'_ RowsFrameBounds,
frame_bounds: &RowsFrameBounds,
part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
curr_key: &'cache CacheKey,
) -> &'cache CacheKey {
Expand Down
58 changes: 43 additions & 15 deletions src/stream/src/executor/over_window/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ use risingwave_common::util::sort_util::OrderType;
use risingwave_expr::window_function::{
create_window_state, StateKey, WindowFuncCall, WindowStates,
};
use risingwave_storage::row_serde::row_serde_util::serialize_pk_with_vnode;

use super::over_partition::{
new_empty_partition_cache, shrink_partition_cache, CacheKey, OverPartition, PartitionCache,
PartitionDelta,
};
use crate::cache::ManagedLruCache;
use crate::common::metrics::MetricsInfo;
use crate::consistency::consistency_panic;
use crate::executor::monitor::OverWindowMetrics;
use crate::executor::over_window::over_partition::AffectedRange;
use crate::executor::prelude::*;
Expand Down Expand Up @@ -212,7 +212,20 @@ impl<S: StateStore> OverWindowExecutor<S> {
new_row: row,
};
}
_ => panic!("inconsistent changes in input chunk"),
_ => {
consistency_panic!(
?pk,
"inconsistent changes in input chunk, double-inserting"
);
if let Record::Update { old_row, .. } = prev_change {
*prev_change = Record::Update {
old_row: *old_row,
new_row: row,
};
} else {
*prev_change = Record::Insert { new_row: row };
}
}
}
} else {
changes_merged.insert(pk, Record::Insert { new_row: row });
Expand All @@ -232,7 +245,13 @@ impl<S: StateStore> OverWindowExecutor<S> {
old_row: *real_old_row,
};
}
_ => panic!("inconsistent changes in input chunk"),
_ => {
consistency_panic!(
?pk,
"inconsistent changes in input chunk, double-deleting"
);
*prev_change = Record::Delete { old_row: row };
}
}
} else {
changes_merged.insert(pk, Record::Delete { old_row: row });
Expand Down Expand Up @@ -357,13 +376,17 @@ impl<S: StateStore> OverWindowExecutor<S> {
}
}
(existed, record) => {
let vnode = this.state_table.compute_vnode_by_pk(&key.pk);
let raw_key = serialize_pk_with_vnode(
&key.pk,
this.state_table.pk_serde(),
vnode,
// when stream is inconsistent, there may be an `Update` of which the old pk does not actually exist
consistency_panic!(
?existed,
?record,
"other cases should not exist",
);
panic!("other cases should not exist. raw_key: {:?}, existed: {:?}, new: {:?}", raw_key, existed, record);

key_change_update_buffer.insert(pk, record);
if let Some(chunk) = chunk_builder.append_record(existed) {
yield chunk;
}
}
}
} else {
Expand All @@ -375,6 +398,15 @@ impl<S: StateStore> OverWindowExecutor<S> {
partition.write_record(&mut this.state_table, key, record);
}

if !key_change_update_buffer.is_empty() {
consistency_panic!(
?key_change_update_buffer,
"key-change update buffer should be empty after processing"
);
// if in non-strict mode, we can reach here, but we don't know the `StateKey`,
// so just ignore the buffer.
}

let cache_len = partition.cache_real_len();
let stats = partition.summarize();
metrics
Expand Down Expand Up @@ -423,21 +455,17 @@ impl<S: StateStore> OverWindowExecutor<S> {
async fn build_changes_for_partition(
this: &ExecutorInner<S>,
partition: &mut OverPartition<'_, S>,
delta: PartitionDelta,
mut delta: PartitionDelta,
) -> StreamExecutorResult<(
BTreeMap<StateKey, Record<OwnedRow>>,
Option<RangeInclusive<StateKey>>,
)> {
assert!(!delta.is_empty(), "if there's no delta, we won't be here");

let mut part_changes = BTreeMap::new();

// Find affected ranges, this also ensures that all rows in the affected ranges are loaded
// into the cache.
// TODO(rc): maybe we can find affected ranges for each window function call (each frame) to simplify
// the implementation of `find_affected_ranges`
let (part_with_delta, affected_ranges) = partition
.find_affected_ranges(&this.state_table, &delta)
.find_affected_ranges(&this.state_table, &mut delta)
.await?;

let snapshot = part_with_delta.snapshot();
Expand Down
110 changes: 77 additions & 33 deletions src/stream/src/executor/over_window/over_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use static_assertions::const_assert;

use super::general::RowConverter;
use crate::common::table::state_table::StateTable;
use crate::consistency::{consistency_error, enable_strict_consistency};
use crate::executor::over_window::frame_finder::*;
use crate::executor::StreamExecutorResult;

Expand Down Expand Up @@ -255,19 +256,19 @@ pub(super) struct OverPartitionStats {
/// included for computing the new state.
#[derive(Debug, Educe)]
#[educe(Clone, Copy)]
pub(super) struct AffectedRange<'cache> {
pub first_frame_start: &'cache CacheKey,
pub first_curr_key: &'cache CacheKey,
pub last_curr_key: &'cache CacheKey,
pub last_frame_end: &'cache CacheKey,
pub(super) struct AffectedRange<'a> {
pub first_frame_start: &'a CacheKey,
pub first_curr_key: &'a CacheKey,
pub last_curr_key: &'a CacheKey,
pub last_frame_end: &'a CacheKey,
}

impl<'cache> AffectedRange<'cache> {
impl<'a> AffectedRange<'a> {
fn new(
first_frame_start: &'cache CacheKey,
first_curr_key: &'cache CacheKey,
last_curr_key: &'cache CacheKey,
last_frame_end: &'cache CacheKey,
first_frame_start: &'a CacheKey,
first_curr_key: &'a CacheKey,
last_curr_key: &'a CacheKey,
last_frame_end: &'a CacheKey,
) -> Self {
Self {
first_frame_start,
Expand Down Expand Up @@ -436,43 +437,39 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
/// Find all ranges in the partition that are affected by the given delta.
/// The returned ranges are guaranteed to be sorted and non-overlapping. All keys in the ranges
/// are guaranteed to be cached, which means they should be [`Sentinelled::Normal`]s.
pub async fn find_affected_ranges<'s, 'cache>(
pub async fn find_affected_ranges<'s, 'delta>(
&'s mut self,
table: &'_ StateTable<S>,
delta: &'cache PartitionDelta,
table: &StateTable<S>,
delta: &'delta mut PartitionDelta,
) -> StreamExecutorResult<(
DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
Vec<AffectedRange<'cache>>,
DeltaBTreeMap<'delta, CacheKey, OwnedRow>,
Vec<AffectedRange<'delta>>,
)>
where
's: 'cache,
'a: 'delta,
's: 'delta,
{
self.ensure_delta_in_cache(table, delta).await?;
let delta = &*delta; // let's make it immutable

if delta.is_empty() {
return Ok((DeltaBTreeMap::new(self.range_cache.inner(), delta), vec![]));
}

let delta_first = delta.first_key_value().unwrap().0.as_normal_expect();
let delta_last = delta.last_key_value().unwrap().0.as_normal_expect();

let range_frame_logical_curr =
calc_logical_curr_for_range_frames(&self.range_frames, delta_first, delta_last);

if self.cache_policy.is_full() {
// ensure everything is in the cache
self.extend_cache_to_boundary(table).await?;
} else {
// TODO(rc): later we should extend cache using `self.super_rows_frame_bounds` and
// `range_frame_logical_curr` as hints.

// ensure the cache covers all delta (if possible)
self.extend_cache_by_range(table, delta_first..=delta_last)
.await?;
}

loop {
// TERMINATION: `extend_cache_leftward_by_n` and `extend_cache_rightward_by_n` keep
// pushing the cache to the boundary of current partition. In these two methods, when
// any side of boundary is reached, the sentinel key will be removed, so finally
// `Self::find_affected_ranges_readonly` will return `Ok`.

// SAFETY: Here we shortly borrow the range cache and turn the reference into a
// `'cache` one to bypass the borrow checker. This is safe because we only return
// `'delta` one to bypass the borrow checker. This is safe because we only return
// the reference once we don't need to do any further mutation.
let cache_inner = unsafe { &*(self.range_cache.inner() as *const _) };
let part_with_delta = DeltaBTreeMap::new(cache_inner, delta);
Expand Down Expand Up @@ -502,6 +499,53 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
}
}

/// Prepare the range cache for `delta` and, in non-strict consistency mode, prune `delta`
/// entries that contradict the cache.
///
/// Steps:
/// 1. Return immediately if `delta` is empty.
/// 2. Extend the cache: under a full cache policy, extend it to the partition boundary;
///    otherwise extend it to cover the key range spanned by `delta` (if possible).
/// 3. If strict consistency is disabled, drop every `Change::Delete` whose key is absent
///    from the cache (a "ghost-delete"), reporting each one via `consistency_error!`.
///    `Change::Insert` entries are always kept.
///
/// Returns an error only if extending the cache fails.
///
/// NOTE(review): `as_normal_expect` presumably panics when the first/last delta key is a
/// sentinel rather than a normal key -- confirm against its definition.
async fn ensure_delta_in_cache(
    &mut self,
    table: &StateTable<S>,
    delta: &mut PartitionDelta,
) -> StreamExecutorResult<()> {
    // Nothing to load or validate when there is no change at all.
    if delta.is_empty() {
        return Ok(());
    }

    // Both bounds are resolved up front, regardless of the cache policy.
    let (lowest_key, _) = delta.first_key_value().unwrap();
    let (highest_key, _) = delta.last_key_value().unwrap();
    let lowest_key = lowest_key.as_normal_expect();
    let highest_key = highest_key.as_normal_expect();

    if !self.cache_policy.is_full() {
        // TODO(rc): later we should extend cache using `self.super_rows_frame_bounds` and
        // `range_frame_logical_curr` as hints.

        // Make the cache cover the whole delta range (if possible).
        self.extend_cache_by_range(table, lowest_key..=highest_key)
            .await?;
    } else {
        // Full cache policy: pull everything up to the partition boundary into the cache.
        self.extend_cache_to_boundary(table).await?;
    }

    // In strict mode the delta is trusted as-is.
    if enable_strict_consistency() {
        return Ok(());
    }

    // Non-strict mode: reconcile the delta with what is actually in the cache.
    let cached_rows = self.range_cache.inner();
    delta.retain(|key, change| match change {
        // Double-inserts and ghost-updates also land here, but that information is
        // already lost at this point, so the entry is simply kept.
        Change::Insert(_) => true,
        // A delete for a key the cache does not hold is a ghost-delete: report and drop it.
        Change::Delete => {
            let is_cached = cached_rows.contains_key(key);
            if !is_cached {
                consistency_error!(?key, "removing a row with non-existing key");
            }
            is_cached
        }
    });

    Ok(())
}

/// Try to find affected ranges on immutable range cache + delta. If the algorithm reaches
/// any sentinel node in the cache, which means some entries in the affected range may be
/// in the state table, it returns an `Err((bool, bool))` to notify the caller that the
Expand All @@ -510,11 +554,11 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
/// TODO(rc): Currently at most one range will be in the result vector. Ideally we should
/// recognize non-contiguous changes in the delta and find multiple ranges, but that will be
/// too complex for now.
fn find_affected_ranges_readonly<'cache>(
&'_ self,
part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
fn find_affected_ranges_readonly<'delta>(
&self,
part_with_delta: DeltaBTreeMap<'delta, CacheKey, OwnedRow>,
range_frame_logical_curr: Option<&(Sentinelled<Datum>, Sentinelled<Datum>)>,
) -> std::result::Result<Vec<AffectedRange<'cache>>, (bool, bool)> {
) -> std::result::Result<Vec<AffectedRange<'delta>>, (bool, bool)> {
if part_with_delta.first_key().is_none() {
// nothing is left after applying the delta, meaning all entries are deleted
return Ok(vec![]);
Expand Down
Loading