From 87ff2e88d6aa952d3de3cef339cdb61d265812a3 Mon Sep 17 00:00:00 2001
From: Richard Chien
Date: Thu, 6 Jun 2024 14:32:22 +0800
Subject: [PATCH 1/3] add comment to be clearer

Signed-off-by: Richard Chien
---
 src/stream/src/executor/over_window/frame_finder.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/stream/src/executor/over_window/frame_finder.rs b/src/stream/src/executor/over_window/frame_finder.rs
index 3154284653f11..d98ac5cebcb49 100644
--- a/src/stream/src/executor/over_window/frame_finder.rs
+++ b/src/stream/src/executor/over_window/frame_finder.rs
@@ -189,7 +189,7 @@ pub(super) fn find_left_for_range_frames<'cache>(
     logical_order_value: impl ToDatumRef,
     cache_key_pk_len: usize, // this is dirty but we have no better choice
 ) -> &'cache CacheKey {
-    find_for_range_frames::<true>(
+    find_for_range_frames::<true /* LEFT */>(
         range_frames,
         part_with_delta,
         logical_order_value,
@@ -206,7 +206,7 @@ pub(super) fn find_right_for_range_frames<'cache>(
     logical_order_value: impl ToDatumRef,
     cache_key_pk_len: usize, // this is dirty but we have no better choice
 ) -> &'cache CacheKey {
-    find_for_range_frames::<false>(
+    find_for_range_frames::<false /* RIGHT */>(
         range_frames,
         part_with_delta,
         logical_order_value,

From f9de5d3299fabd635c0b9d278819a813b8bb1328 Mon Sep 17 00:00:00 2001
From: Richard Chien
Date: Thu, 6 Jun 2024 16:24:59 +0800
Subject: [PATCH 2/3] simplify lifetime

Signed-off-by: Richard Chien
---
 .../src/executor/over_window/frame_finder.rs  | 12 ++--
 .../src/executor/over_window/general.rs       |  2 -
 .../executor/over_window/over_partition.rs    | 44 +++++++++----------
 3 files changed, 28 insertions(+), 30 deletions(-)

diff --git a/src/stream/src/executor/over_window/frame_finder.rs b/src/stream/src/executor/over_window/frame_finder.rs
index d98ac5cebcb49..3c019e2b1354d 100644
--- a/src/stream/src/executor/over_window/frame_finder.rs
+++ b/src/stream/src/executor/over_window/frame_finder.rs
@@ -103,7 +103,7 @@ pub(super) fn merge_rows_frames(rows_frames: &[&RowsFrameBounds]) -> RowsFrameBo
 ///
 /// More examples can be found in the comment inside [`find_curr_for_rows_frame`].
 pub(super) fn find_first_curr_for_rows_frame<'cache>(
-    frame_bounds: &'_ RowsFrameBounds,
+    frame_bounds: &RowsFrameBounds,
     part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
     delta_key: &'cache CacheKey,
 ) -> &'cache CacheKey {
@@ -116,7 +116,7 @@ pub(super) fn find_first_curr_for_rows_frame<'cache>(
 ///
 /// This is the symmetric function of [`find_first_curr_for_rows_frame`].
 pub(super) fn find_last_curr_for_rows_frame<'cache>(
-    frame_bounds: &'_ RowsFrameBounds,
+    frame_bounds: &RowsFrameBounds,
     part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
     delta_key: &'cache CacheKey,
 ) -> &'cache CacheKey {
@@ -127,7 +127,7 @@ pub(super) fn find_last_curr_for_rows_frame<'cache>(
 /// to some CURRENT ROW, find the cache key corresponding to the start row in
 /// that frame.
 pub(super) fn find_frame_start_for_rows_frame<'cache>(
-    frame_bounds: &'_ RowsFrameBounds,
+    frame_bounds: &RowsFrameBounds,
     part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
     curr_key: &'cache CacheKey,
 ) -> &'cache CacheKey {
@@ -140,7 +140,7 @@ pub(super) fn find_frame_start_for_rows_frame<'cache>(
 ///
 /// This is the symmetric function of [`find_frame_start_for_rows_frame`].
 pub(super) fn find_frame_end_for_rows_frame<'cache>(
-    frame_bounds: &'_ RowsFrameBounds,
+    frame_bounds: &RowsFrameBounds,
     part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
     curr_key: &'cache CacheKey,
 ) -> &'cache CacheKey {
@@ -217,7 +217,7 @@ pub(super) fn find_right_for_range_frames<'cache>(
 // -------------------------- ↑ PUBLIC INTERFACE ↑ --------------------------
 
 fn find_curr_for_rows_frame<'cache, const LEFT: bool>(
-    frame_bounds: &'_ RowsFrameBounds,
+    frame_bounds: &RowsFrameBounds,
     part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
     delta_key: &'cache CacheKey,
 ) -> &'cache CacheKey {
@@ -329,7 +329,7 @@ fn find_curr_for_rows_frame<'cache, const LEFT: bool>(
 }
 
 fn find_boundary_for_rows_frame<'cache, const LEFT: bool>(
-    frame_bounds: &'_ RowsFrameBounds,
+    frame_bounds: &RowsFrameBounds,
     part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
     curr_key: &'cache CacheKey,
 ) -> &'cache CacheKey {
diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs
index 815e3b6698a51..c382681c4ee01 100644
--- a/src/stream/src/executor/over_window/general.rs
+++ b/src/stream/src/executor/over_window/general.rs
@@ -434,8 +434,6 @@ impl<S: StateStore> OverWindowExecutor<S> {
 
         // Find affected ranges, this also ensures that all rows in the affected ranges are loaded
         // into the cache.
-        // TODO(rc): maybe we can find affected ranges for each window function call (each frame) to simplify
-        // the implementation of `find_affected_ranges`
         let (part_with_delta, affected_ranges) = partition
             .find_affected_ranges(&this.state_table, &delta)
             .await?;
diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs
index 763965fa5227f..53dc6dafe7ca3 100644
--- a/src/stream/src/executor/over_window/over_partition.rs
+++ b/src/stream/src/executor/over_window/over_partition.rs
@@ -255,19 +255,19 @@ pub(super) struct OverPartitionStats {
 /// included for computing the new state.
 #[derive(Debug, Educe)]
 #[educe(Clone, Copy)]
-pub(super) struct AffectedRange<'cache> {
-    pub first_frame_start: &'cache CacheKey,
-    pub first_curr_key: &'cache CacheKey,
-    pub last_curr_key: &'cache CacheKey,
-    pub last_frame_end: &'cache CacheKey,
+pub(super) struct AffectedRange<'a> {
+    pub first_frame_start: &'a CacheKey,
+    pub first_curr_key: &'a CacheKey,
+    pub last_curr_key: &'a CacheKey,
+    pub last_frame_end: &'a CacheKey,
 }
 
-impl<'cache> AffectedRange<'cache> {
+impl<'a> AffectedRange<'a> {
     fn new(
-        first_frame_start: &'cache CacheKey,
-        first_curr_key: &'cache CacheKey,
-        last_curr_key: &'cache CacheKey,
-        last_frame_end: &'cache CacheKey,
+        first_frame_start: &'a CacheKey,
+        first_curr_key: &'a CacheKey,
+        last_curr_key: &'a CacheKey,
+        last_frame_end: &'a CacheKey,
     ) -> Self {
         Self {
             first_frame_start,
@@ -436,16 +436,16 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
     /// Find all ranges in the partition that are affected by the given delta.
     /// The returned ranges are guaranteed to be sorted and non-overlapping. All keys in the ranges
     /// are guaranteed to be cached, which means they should be [`Sentinelled::Normal`]s.
-    pub async fn find_affected_ranges<'s, 'cache>(
-        &'s mut self,
-        table: &'_ StateTable<S>,
-        delta: &'cache PartitionDelta,
+    pub async fn find_affected_ranges<'delta>(
+        &mut self,
+        table: &StateTable<S>,
+        delta: &'delta PartitionDelta,
     ) -> StreamExecutorResult<(
-        DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
-        Vec<AffectedRange<'cache>>,
+        DeltaBTreeMap<'delta, CacheKey, OwnedRow>,
+        Vec<AffectedRange<'delta>>,
     )>
     where
-        's: 'cache,
+        'a: 'delta,
     {
         let delta_first = delta.first_key_value().unwrap().0.as_normal_expect();
         let delta_last = delta.last_key_value().unwrap().0.as_normal_expect();
@@ -472,7 +472,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
             // `Self::find_affected_ranges_readonly` will return `Ok`.
 
             // SAFETY: Here we shortly borrow the range cache and turn the reference into a
-            // `'cache` one to bypass the borrow checker. This is safe because we only return
+            // `'delta` one to bypass the borrow checker. This is safe because we only return
             // the reference once we don't need to do any further mutation.
             let cache_inner = unsafe { &*(self.range_cache.inner() as *const _) };
             let part_with_delta = DeltaBTreeMap::new(cache_inner, delta);
@@ -510,11 +510,11 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
     /// TODO(rc): Currently at most one range will be in the result vector. Ideally we should
     /// recognize uncontinuous changes in the delta and find multiple ranges, but that will be
     /// too complex for now.
-    fn find_affected_ranges_readonly<'cache>(
-        &'_ self,
-        part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
+    fn find_affected_ranges_readonly<'delta>(
+        &self,
+        part_with_delta: DeltaBTreeMap<'delta, CacheKey, OwnedRow>,
         range_frame_logical_curr: Option<&(Sentinelled<Datum>, Sentinelled<Datum>)>,
-    ) -> std::result::Result<Vec<AffectedRange<'cache>>, (bool, bool)> {
+    ) -> std::result::Result<Vec<AffectedRange<'delta>>, (bool, bool)> {
         if part_with_delta.first_key().is_none() {
             // nothing is left after applying the delta, meaning all entries are deleted
             return Ok(vec![]);

From 491b6a7e0686b5bc8da71204b03667b3503792c5 Mon Sep 17 00:00:00 2001
From: Richard Chien
Date: Fri, 7 Jun 2024 16:38:33 +0800
Subject: [PATCH 3/3] tolerate inconsistency in OverWindow

Signed-off-by: Richard Chien
---
 .../src/executor/over_window/general.rs       | 56 ++++++++++----
 .../executor/over_window/over_partition.rs    | 74 +++++++++++++++----
 2 files changed, 102 insertions(+), 28 deletions(-)

diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs
index c382681c4ee01..23623be6e0f2c 100644
--- a/src/stream/src/executor/over_window/general.rs
+++ b/src/stream/src/executor/over_window/general.rs
@@ -29,7 +29,6 @@ use risingwave_common::util::sort_util::OrderType;
 use risingwave_expr::window_function::{
     create_window_state, StateKey, WindowFuncCall, WindowStates,
 };
-use risingwave_storage::row_serde::row_serde_util::serialize_pk_with_vnode;
 
 use super::over_partition::{
     new_empty_partition_cache, shrink_partition_cache, CacheKey, OverPartition, PartitionCache,
 };
 use crate::cache::ManagedLruCache;
 use crate::common::metrics::MetricsInfo;
+use crate::consistency::consistency_panic;
 use crate::executor::monitor::OverWindowMetrics;
 use crate::executor::over_window::over_partition::AffectedRange;
 use crate::executor::prelude::*;
@@ -212,7 +212,20 @@ impl<S: StateStore> OverWindowExecutor<S> {
                                     new_row: row,
                                 };
                             }
-                            _ => panic!("inconsistent changes in input chunk"),
+                            _ => {
+                                consistency_panic!(
+                                    ?pk,
+                                    "inconsistent changes in input chunk, double-inserting"
+                                );
+                                if let Record::Update { old_row, .. } = prev_change {
+                                    *prev_change = Record::Update {
+                                        old_row: *old_row,
+                                        new_row: row,
+                                    };
+                                } else {
+                                    *prev_change = Record::Insert { new_row: row };
+                                }
+                            }
                         }
                     } else {
                         changes_merged.insert(pk, Record::Insert { new_row: row });
@@ -232,7 +245,13 @@ impl<S: StateStore> OverWindowExecutor<S> {
                                     old_row: *real_old_row,
                                 };
                             }
-                            _ => panic!("inconsistent changes in input chunk"),
+                            _ => {
+                                consistency_panic!(
+                                    ?pk,
+                                    "inconsistent changes in input chunk, double-deleting"
+                                );
+                                *prev_change = Record::Delete { old_row: row };
+                            }
                         }
                     } else {
                         changes_merged.insert(pk, Record::Delete { old_row: row });
@@ -357,13 +376,17 @@ impl<S: StateStore> OverWindowExecutor<S> {
                         }
                     }
                     (existed, record) => {
-                        let vnode = this.state_table.compute_vnode_by_pk(&key.pk);
-                        let raw_key = serialize_pk_with_vnode(
-                            &key.pk,
-                            this.state_table.pk_serde(),
-                            vnode,
+                        // when stream is inconsistent, there may be an `Update` of which the old pk does not actually exist
+                        consistency_panic!(
+                            ?existed,
+                            ?record,
+                            "other cases should not exist",
                         );
-                        panic!("other cases should not exist. raw_key: {:?}, existed: {:?}, new: {:?}", raw_key, existed, record);
+
+                        key_change_update_buffer.insert(pk, record);
+                        if let Some(chunk) = chunk_builder.append_record(existed) {
+                            yield chunk;
+                        }
                     }
                 }
             } else {
@@ -375,6 +398,15 @@ impl<S: StateStore> OverWindowExecutor<S> {
             partition.write_record(&mut this.state_table, key, record);
         }
 
+        if !key_change_update_buffer.is_empty() {
+            consistency_panic!(
+                ?key_change_update_buffer,
+                "key-change update buffer should be empty after processing"
+            );
+            // if in non-strict mode, we can reach here, but we don't know the `StateKey`,
+            // so just ignore the buffer.
+        }
+
         let cache_len = partition.cache_real_len();
         let stats = partition.summarize();
         metrics
@@ -423,19 +455,17 @@ impl<S: StateStore> OverWindowExecutor<S> {
     async fn build_changes_for_partition(
         this: &ExecutorInner<S>,
         partition: &mut OverPartition<'_, S>,
-        delta: PartitionDelta,
+        mut delta: PartitionDelta,
     ) -> StreamExecutorResult<(
         BTreeMap<StateKey, Record<OwnedRow>>,
         Option<RangeInclusive<StateKey>>,
     )> {
-        assert!(!delta.is_empty(), "if there's no delta, we won't be here");
-
         let mut part_changes = BTreeMap::new();
 
         // Find affected ranges, this also ensures that all rows in the affected ranges are loaded
         // into the cache.
         let (part_with_delta, affected_ranges) = partition
-            .find_affected_ranges(&this.state_table, &delta)
+            .find_affected_ranges(&this.state_table, &mut delta)
             .await?;
diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs
index 53dc6dafe7ca3..efc405aeb265e 100644
--- a/src/stream/src/executor/over_window/over_partition.rs
+++ b/src/stream/src/executor/over_window/over_partition.rs
@@ -36,6 +36,7 @@ use static_assertions::const_assert;
 
 use super::general::RowConverter;
 use crate::common::table::state_table::StateTable;
+use crate::consistency::{consistency_error, enable_strict_consistency};
 use crate::executor::over_window::frame_finder::*;
 use crate::executor::StreamExecutorResult;
 
@@ -436,35 +437,31 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
     /// Find all ranges in the partition that are affected by the given delta.
     /// The returned ranges are guaranteed to be sorted and non-overlapping. All keys in the ranges
     /// are guaranteed to be cached, which means they should be [`Sentinelled::Normal`]s.
-    pub async fn find_affected_ranges<'delta>(
-        &mut self,
+    pub async fn find_affected_ranges<'s, 'delta>(
+        &'s mut self,
         table: &StateTable<S>,
-        delta: &'delta PartitionDelta,
+        delta: &'delta mut PartitionDelta,
     ) -> StreamExecutorResult<(
         DeltaBTreeMap<'delta, CacheKey, OwnedRow>,
         Vec<AffectedRange<'delta>>,
     )>
     where
         'a: 'delta,
+        's: 'delta,
     {
+        self.ensure_delta_in_cache(table, delta).await?;
+        let delta = &*delta; // let's make it immutable
+
+        if delta.is_empty() {
+            return Ok((DeltaBTreeMap::new(self.range_cache.inner(), delta), vec![]));
+        }
+
         let delta_first = delta.first_key_value().unwrap().0.as_normal_expect();
         let delta_last = delta.last_key_value().unwrap().0.as_normal_expect();
 
         let range_frame_logical_curr =
             calc_logical_curr_for_range_frames(&self.range_frames, delta_first, delta_last);
 
-        if self.cache_policy.is_full() {
-            // ensure everything is in the cache
-            self.extend_cache_to_boundary(table).await?;
-        } else {
-            // TODO(rc): later we should extend cache using `self.super_rows_frame_bounds` and
-            // `range_frame_logical_curr` as hints.
-
-            // ensure the cache covers all delta (if possible)
-            self.extend_cache_by_range(table, delta_first..=delta_last)
-                .await?;
-        }
-
         loop {
             // TERMINATEABILITY: `extend_cache_leftward_by_n` and `extend_cache_rightward_by_n` keep
             // pushing the cache to the boundary of current partition. In these two methods, when
@@ -502,6 +499,53 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
         }
     }
 
+    async fn ensure_delta_in_cache(
+        &mut self,
+        table: &StateTable<S>,
+        delta: &mut PartitionDelta,
+    ) -> StreamExecutorResult<()> {
+        if delta.is_empty() {
+            return Ok(());
+        }
+
+        let delta_first = delta.first_key_value().unwrap().0.as_normal_expect();
+        let delta_last = delta.last_key_value().unwrap().0.as_normal_expect();
+
+        if self.cache_policy.is_full() {
+            // ensure everything is in the cache
+            self.extend_cache_to_boundary(table).await?;
+        } else {
+            // TODO(rc): later we should extend cache using `self.super_rows_frame_bounds` and
+            // `range_frame_logical_curr` as hints.
+
+            // ensure the cache covers all delta (if possible)
+            self.extend_cache_by_range(table, delta_first..=delta_last)
+                .await?;
+        }
+
+        if !enable_strict_consistency() {
+            // in non-strict mode, we should ensure the delta is consistent with the cache
+            let cache = self.range_cache.inner();
+            delta.retain(|key, change| match &*change {
+                Change::Insert(_) => {
+                    // this also includes the case of double-insert and ghost-update,
+                    // but since we already lost the information, let's just ignore it
+                    true
+                }
+                Change::Delete => {
+                    // if the key is not in the cache, it's a ghost-delete
+                    let consistent = cache.contains_key(key);
+                    if !consistent {
+                        consistency_error!(?key, "removing a row with non-existing key");
+                    }
+                    consistent
+                }
+            });
+        }
+
+        Ok(())
+    }
+
     /// Try to find affected ranges on immutable range cache + delta. If the algorithm reaches
     /// any sentinel node in the cache, which means some entries in the affected range may be
     /// in the state table, it returns an `Err((bool, bool))` to notify the caller that the