diff --git a/src/stream/src/executor/top_n/top_n_cache.rs b/src/stream/src/executor/top_n/top_n_cache.rs index b627328cebf3e..d8211b4ad076c 100644 --- a/src/stream/src/executor/top_n/top_n_cache.rs +++ b/src/stream/src/executor/top_n/top_n_cache.rs @@ -30,7 +30,7 @@ use crate::executor::error::StreamExecutorResult; /// `CacheKey` is composed of `(order_by, remaining columns of pk)`. pub type CacheKey = (Vec, Vec); -type Cache = EstimatedBTreeMap; +pub type Cache = EstimatedBTreeMap; const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2; const TOPN_CACHE_MIN_CAPACITY: usize = 10; @@ -47,19 +47,32 @@ const TOPN_CACHE_MIN_CAPACITY: usize = 10; /// `OFFSET m FETCH FIRST n ROWS WITH TIES` and `m <= RANK() <= n` are not supported now, /// since they have different semantics. pub struct TopNCache { - /// Rows in the range `[0, offset)` - pub low: Cache, - /// Rows in the range `[offset, offset+limit)` + /// Rows in the range `[0, offset)`. Should always be synced with state table. + pub low: Option, + + /// Rows in the range `[offset, offset+limit)`. Should always be synced with state table. /// /// When `WITH_TIES` is true, it also stores ties for the last element, /// and thus the size can be larger than `limit`. pub middle: Cache, - /// Rows in the range `[offset+limit, offset+limit+high_capacity)` + + /// Cache of the beginning rows in the range `[offset+limit, ...)`. /// - /// When `WITH_TIES` is true, it also stores ties for the last element, - /// and thus the size can be larger than `high_capacity`. + /// This is very similar to [`TopNStateCache`], which only caches the top-N rows in the table + /// and only accepts new records that are less than the largest in the cache. + /// + /// When `WITH_TIES` is true, it guarantees that the ties of the last element are in the cache, + /// and thus the size can be larger than `rest_cache_capacity`. + /// + /// When the cache becomes empty, if the `table_row_count` is not matched, we need to view the cache + /// as unsynced and refill it from the state table. + /// + /// TODO(rc): later we should reuse [`TopNStateCache`] here. + /// + /// [`TopNStateCache`]: crate::common::state_cache::TopNStateCache pub high: Cache, - pub high_capacity: usize, + pub high_cache_capacity: usize, + pub offset: usize, /// Assumption: `limit != 0` pub limit: usize, @@ -87,8 +100,8 @@ impl Debug for TopNCache { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "TopNCache {{\n offset: {}, limit: {}, high_capacity: {},\n", - self.offset, self.limit, self.high_capacity + "TopNCache {{\n offset: {}, limit: {}, high_cache_capacity: {},\n", + self.offset, self.limit, self.high_cache_capacity )?; fn format_cache( @@ -113,7 +126,11 @@ impl Debug for TopNCache { } writeln!(f, " low:")?; - format_cache(f, &self.low, &self.data_types)?; + if let Some(low) = &self.low { + format_cache(f, low, &self.data_types)?; + } else { + writeln!(f, " ")?; + } writeln!(f, "\n middle:")?; format_cache(f, &self.middle, &self.data_types)?; writeln!(f, "\n high:")?; @@ -146,7 +163,7 @@ pub trait TopNCacheTrait { /// Changes in `self.middle` is recorded to `res_ops` and `res_rows`, which will be /// used to generate messages to be sent to downstream operators. /// - /// Because we may need to add data from the state table to `self.high` during the delete + /// Because we may need to refill data from the state table to `self.high` during the delete /// operation, we need to pass in `group_key`, `epoch` and `managed_state` to do a prefix /// scan of the state table. #[allow(clippy::too_many_arguments)] @@ -164,21 +181,22 @@ pub trait TopNCacheTrait { impl TopNCache { /// `data_types` -- Data types for the full row. pub fn new(offset: usize, limit: usize, data_types: Vec) -> Self { - assert!(limit != 0); + assert!(limit > 0); if WITH_TIES { // It's trickier to support. // Also `OFFSET WITH TIES` has different semantic with `a < RANK() < b` assert!(offset == 0, "OFFSET is not supported with WITH TIES"); } + let high_cache_capacity = offset + .checked_add(limit) + .and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR)) + .unwrap_or(usize::MAX) + .max(TOPN_CACHE_MIN_CAPACITY); Self { - low: Cache::new(), + low: if offset > 0 { Some(Cache::new()) } else { None }, middle: Cache::new(), high: Cache::new(), - high_capacity: offset - .checked_add(limit) - .and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR)) - .unwrap_or(usize::MAX) - .max(TOPN_CACHE_MIN_CAPACITY), + high_cache_capacity, offset, limit, table_row_count: None, @@ -189,37 +207,35 @@ impl TopNCache { /// Clear the cache. After this, the cache must be `init` again before use. #[allow(dead_code)] pub fn clear(&mut self) { - self.low.clear(); + self.low.as_mut().map(Cache::clear); self.middle.clear(); self.high.clear(); } /// Get total count of entries in the cache. pub fn len(&self) -> usize { - self.low.len() + self.middle.len() + self.high.len() + self.low.as_ref().map(Cache::len).unwrap_or(0) + self.middle.len() + self.high.len() } pub(super) fn update_table_row_count(&mut self, table_row_count: usize) { self.table_row_count = Some(table_row_count) } - fn table_row_count_matched(&self) -> bool { - self.table_row_count - .map(|n| n == self.len()) - .unwrap_or(false) - } - - pub fn is_low_cache_full(&self) -> bool { - assert!(self.low.len() <= self.offset); - let full = self.low.len() == self.offset; - if !full { - assert!(self.middle.is_empty()); - assert!(self.high.is_empty()); + pub fn low_is_full(&self) -> bool { + if let Some(low) = &self.low { + assert!(low.len() <= self.offset); + let full = low.len() == self.offset; + if !full { + assert!(self.middle.is_empty()); + assert!(self.high.is_empty()); + } + full + } else { + true } - full } - pub fn is_middle_cache_full(&self) -> bool { + pub fn middle_is_full(&self) -> bool { // For WITH_TIES, the middle cache can exceed the capacity. if !WITH_TIES { assert!( @@ -229,7 +245,7 @@ impl TopNCache { } let full = self.middle.len() >= self.limit; if full { - assert!(self.is_low_cache_full()); + assert!(self.low_is_full()); } else { assert!( self.high.is_empty(), @@ -239,46 +255,34 @@ impl TopNCache { full } - pub fn is_high_cache_full(&self) -> bool { + pub fn high_is_full(&self) -> bool { // For WITH_TIES, the high cache can exceed the capacity. if !WITH_TIES { - assert!(self.high.len() <= self.high_capacity); + assert!(self.high.len() <= self.high_cache_capacity); } - self.high.len() >= self.high_capacity + self.high.len() >= self.high_cache_capacity } - fn last_cache_key_before_high(&self) -> Option<&CacheKey> { - let middle_last_key = self.middle.last_key_value().map(|(k, _)| k); - middle_last_key.or_else(|| self.low.last_key_value().map(|(k, _)| k)) - } - - /// Use this method instead of `self.high.insert` directly when possible. - /// - /// It only inserts into high cache if the key is smaller than the largest key in the high - /// cache. Otherwise, we simply ignore the row. We will wait until the high cache becomes - /// empty and fill it at that time. - fn insert_high_cache(&mut self, cache_key: CacheKey, row: CompactedRow, is_from_middle: bool) { - if !self.is_high_cache_full() { - if is_from_middle { - self.high.insert(cache_key, row); - return; - } - // For direct insert, we need to check if the key is smaller than the largest key - if let Some(high_last) = self.high.last_key_value() - && cache_key <= *high_last.0 - { - debug_assert!(cache_key != *high_last.0, "cache_key should be unique"); - self.high.insert(cache_key, row); - } + fn high_is_synced(&self) -> bool { + if !self.high.is_empty() { + true } else { - let high_last = self.high.last_entry().unwrap(); - if cache_key <= *high_last.key() { - debug_assert!(cache_key != *high_last.key(), "cache_key should be unique"); - high_last.remove_entry(); - self.high.insert(cache_key, row); - } + // check if table row count matches + self.table_row_count + .map(|n| n == self.len()) + .unwrap_or(false) } } + + fn last_cache_key_before_high(&self) -> Option<&CacheKey> { + let middle_last_key = self.middle.last_key_value().map(|(k, _)| k); + middle_last_key.or_else(|| { + self.low + .as_ref() + .and_then(Cache::last_key_value) + .map(|(k, _)| k) + }) + } } impl TopNCacheTrait for TopNCache { @@ -293,59 +297,85 @@ impl TopNCacheTrait for TopNCache { *row_count += 1; } - if !self.is_low_cache_full() { - self.low.insert(cache_key, (&row).into()); - return; - } - let elem_to_compare_with_middle = if let Some(low_last) = self.low.last_entry() - && cache_key <= *low_last.key() - { - // Take the last element of `cache.low` and insert input row to it. - let low_last = low_last.remove_entry(); - self.low.insert(cache_key, (&row).into()); - low_last - } else { - (cache_key, (&row).into()) + let mut append_res = |op: Op, row: CompactedRow| { + res_ops.push(op); + res_rows.push(row); }; - if !self.is_middle_cache_full() { - self.middle.insert( - elem_to_compare_with_middle.0, - elem_to_compare_with_middle.1.clone(), - ); - res_ops.push(Op::Insert); - res_rows.push(elem_to_compare_with_middle.1); + let mut to_insert = (cache_key, (&row).into()); + let mut is_last_of_lower_cache = false; // for saving one key comparison + + let low_is_full = self.low_is_full(); + if let Some(low) = &mut self.low { + // try insert into low cache + + if !low_is_full { + low.insert(to_insert.0, to_insert.1); + return; + } + + // low cache is full + let low_last = low.last_entry().unwrap(); + if &to_insert.0 < low_last.key() { + // make space for the new entry + let low_last = low_last.remove_entry(); + low.insert(to_insert.0, to_insert.1); + to_insert = low_last; // move the last entry to the middle cache + is_last_of_lower_cache = true; + } + } + + // try insert into middle cache + + if !self.middle_is_full() { + self.middle.insert(to_insert.0, to_insert.1.clone()); + append_res(Op::Insert, to_insert.1); return; } - let mut is_from_middle = false; - let elem_to_compare_with_high = { - let middle_last = self.middle.last_entry().unwrap(); - if elem_to_compare_with_middle.0 <= *middle_last.key() { - // If the row in the range of [offset, offset+limit), the largest row in - // `cache.middle` needs to be moved to `cache.high` - let res = middle_last.remove_entry(); - res_ops.push(Op::Delete); - res_rows.push(res.1.clone()); - res_ops.push(Op::Insert); - res_rows.push(elem_to_compare_with_middle.1.clone()); - self.middle - .insert(elem_to_compare_with_middle.0, elem_to_compare_with_middle.1); - is_from_middle = true; - res - } else { - elem_to_compare_with_middle + // middle cache is full + let middle_last = self.middle.last_entry().unwrap(); + if is_last_of_lower_cache || &to_insert.0 < middle_last.key() { + // make space for the new entry + let middle_last = middle_last.remove_entry(); + self.middle.insert(to_insert.0, to_insert.1.clone()); + + append_res(Op::Delete, middle_last.1.clone()); + append_res(Op::Insert, to_insert.1); + + to_insert = middle_last; // move the last entry to the high cache + is_last_of_lower_cache = true; + } + + // try insert into high cache + + // The logic is a bit different from the other two caches, because high cache is not + // guaranteed to be fully synced with the "high part" of the table. + + if is_last_of_lower_cache || self.high_is_synced() { + // For `is_last_of_lower_cache`, an obvious observation is that the key to insert is + // always smaller than any key in the high part of the table. + + if self.high.is_empty() { + // if high cache is empty, we can insert directly + self.high.insert(to_insert.0, to_insert.1); + return; } - }; - self.insert_high_cache( - elem_to_compare_with_high.0, - elem_to_compare_with_high.1, - is_from_middle, - ); + let high_is_full = self.high_is_full(); + let high_last = self.high.last_entry().unwrap(); + + if is_last_of_lower_cache || &to_insert.0 < high_last.key() { + // we can only insert if the key is smaller than the largest key in the high cache + if high_is_full { + // make space for the new entry + high_last.remove_entry(); + } + self.high.insert(to_insert.0, to_insert.1); + } + } } - #[allow(clippy::too_many_arguments)] async fn delete( &mut self, group_key: Option, @@ -365,66 +395,94 @@ impl TopNCacheTrait for TopNCache { *row_count -= 1; } - if self.is_middle_cache_full() && cache_key > *self.middle.last_key_value().unwrap().0 { - // The row is in high + let mut append_res = |op: Op, row: CompactedRow| { + res_ops.push(op); + res_rows.push(row); + }; + + if self.middle_is_full() && &cache_key > self.middle.last_key_value().unwrap().0 { + // the row is in high self.high.remove(&cache_key); - } else if self.is_low_cache_full() - && (self.offset == 0 || cache_key > *self.low.last_key_value().unwrap().0) + } else if self.low_is_full() + && self + .low + .as_ref() + .map(|low| &cache_key > low.last_key_value().unwrap().0) + .unwrap_or( + true, // if low is None, `cache_key` should be in middle + ) { - // The row is in mid - self.middle.remove(&cache_key); - res_ops.push(Op::Delete); - res_rows.push((&row).into()); + // the row is in middle + let removed = self.middle.remove(&cache_key); + append_res(Op::Delete, (&row).into()); + + if removed.is_none() { + // the middle cache should always be synced, if the key is not found, then it also doesn't + // exist in the state table + consistency_error!( + ?group_key, + ?cache_key, + "cache key not found in middle cache" + ); + return Ok(()); + } - // Try to fill the high cache if it is empty - if self.high.is_empty() && !self.table_row_count_matched() { + // refill the high cache if it's not synced + if !self.high_is_synced() { + self.high.clear(); managed_state .fill_high_cache( group_key, self, self.last_cache_key_before_high().cloned(), - self.high_capacity, + self.high_cache_capacity, ) .await?; } - // Bring one element, if any, from high cache to middle cache + // bring one element, if any, from high cache to middle cache if !self.high.is_empty() { let high_first = self.high.pop_first().unwrap(); - res_ops.push(Op::Insert); - res_rows.push(high_first.1.clone()); + append_res(Op::Insert, high_first.1.clone()); self.middle.insert(high_first.0, high_first.1); } assert!(self.high.is_empty() || self.middle.len() == self.limit); } else { - // The row is in low - self.low.remove(&cache_key); + // the row is in low + let low = self.low.as_mut().unwrap(); + let removed = low.remove(&cache_key); + + if removed.is_none() { + // the low cache should always be synced, if the key is not found, then it also doesn't + // exist in the state table + consistency_error!(?group_key, ?cache_key, "cache key not found in low cache"); + return Ok(()); + } - // Bring one element, if any, from middle cache to low cache + // bring one element, if any, from middle cache to low cache if !self.middle.is_empty() { let middle_first = self.middle.pop_first().unwrap(); - res_ops.push(Op::Delete); - res_rows.push(middle_first.1.clone()); - self.low.insert(middle_first.0, middle_first.1); + append_res(Op::Delete, middle_first.1.clone()); + low.insert(middle_first.0, middle_first.1); - // Try to fill the high cache if it is empty - if self.high.is_empty() && !self.table_row_count_matched() { + // fill the high cache if it's not synced + if !self.high_is_synced() { + self.high.clear(); managed_state .fill_high_cache( group_key, self, self.last_cache_key_before_high().cloned(), - self.high_capacity, + self.high_cache_capacity, ) .await?; } - // Bring one element, if any, from high cache to middle cache + // bring one element, if any, from high cache to middle cache if !self.high.is_empty() { let high_first = self.high.pop_first().unwrap(); - res_ops.push(Op::Insert); - res_rows.push(high_first.1.clone()); + append_res(Op::Insert, high_first.1.clone()); self.middle.insert(high_first.0, high_first.1); } } @@ -447,32 +505,36 @@ impl TopNCacheTrait for TopNCache { } assert!( - self.low.is_empty(), - "Offset is not supported yet for WITH TIES, so low cache should be empty" + self.low.is_none(), + "Offset is not supported yet for WITH TIES, so low cache should be None" ); - let elem_to_compare_with_middle = (cache_key, row); + let mut append_res = |op: Op, row: CompactedRow| { + res_ops.push(op); + res_rows.push(row); + }; - if !self.is_middle_cache_full() { - self.middle.insert( - elem_to_compare_with_middle.0.clone(), - (&elem_to_compare_with_middle.1).into(), - ); - res_ops.push(Op::Insert); - res_rows.push((&elem_to_compare_with_middle.1).into()); + let to_insert: (CacheKey, CompactedRow) = (cache_key, (&row).into()); + + // try insert into middle cache + + if !self.middle_is_full() { + self.middle.insert(to_insert.0.clone(), to_insert.1.clone()); + append_res(Op::Insert, to_insert.1); return; } - let sort_key = &elem_to_compare_with_middle.0 .0; - let middle_last = self.middle.last_key_value().unwrap(); - let middle_last_order_by = &middle_last.0 .0.clone(); + // middle cache is full - match sort_key.cmp(middle_last_order_by) { + let to_insert_sort_key = &(to_insert.0).0; + let middle_last_sort_key = self.middle.last_key().unwrap().0.clone(); + + match to_insert_sort_key.cmp(&middle_last_sort_key) { Ordering::Less => { - // The row is in middle. - let num_ties = self + // the row is in middle + let n_ties_of_last = self .middle - .range((middle_last_order_by.clone(), vec![])..) + .range((middle_last_sort_key.clone(), vec![])..) .count(); // We evict the last row and its ties only if the number of remaining rows still is // still larger than limit, i.e., there are limit-1 other rows. @@ -481,51 +543,67 @@ impl TopNCacheTrait for TopNCache { // insert 0 -> [0,1,1,1,1] // insert 0 -> [0,0,1,1,1,1] // insert 0 -> [0,0,0] - if self.middle.len() - num_ties + 1 >= self.limit { + if self.middle.len() + 1 - n_ties_of_last >= self.limit { + // Middle will be full without the last element and its ties after insertion. + // Let's move the last element and its ties to high cache first. while let Some(middle_last) = self.middle.last_entry() - && middle_last.key().0 == middle_last_order_by.clone() + && middle_last.key().0 == middle_last_sort_key { let middle_last = middle_last.remove_entry(); - res_ops.push(Op::Delete); - res_rows.push(middle_last.1.clone()); + append_res(Op::Delete, middle_last.1.clone()); + // we can blindly move entries from middle cache to high cache no matter high cache is synced or not self.high.insert(middle_last.0, middle_last.1); } } - if self.high.len() >= self.high_capacity { + if self.high.len() > self.high_cache_capacity { + // evict some entries from high cache if it exceeds the capacity let high_last = self.high.pop_last().unwrap(); - let high_last_order_by = high_last.0 .0; - self.high.retain(|k, _| k.0 != high_last_order_by); + let high_last_sort_key = (high_last.0).0; + // Remove all ties of the last element in high cache, for the sake of simplicity. + // This may cause repeatedly refill the high cache if number of ties is large. + self.high.retain(|k, _| k.0 != high_last_sort_key); } - res_ops.push(Op::Insert); - res_rows.push((&elem_to_compare_with_middle.1).into()); - self.middle.insert( - elem_to_compare_with_middle.0, - (&elem_to_compare_with_middle.1).into(), - ); + append_res(Op::Insert, to_insert.1.clone()); + self.middle.insert(to_insert.0, to_insert.1); } Ordering::Equal => { - // The row is in middle and is a tie with the last row. - res_ops.push(Op::Insert); - res_rows.push((&elem_to_compare_with_middle.1).into()); - self.middle.insert( - elem_to_compare_with_middle.0, - (&elem_to_compare_with_middle.1).into(), - ); + // the row is in middle and is a tie of the last row + append_res(Op::Insert, to_insert.1.clone()); + self.middle.insert(to_insert.0, to_insert.1); } Ordering::Greater => { - // The row is in high. - let elem_to_compare_with_high = elem_to_compare_with_middle; - self.insert_high_cache( - elem_to_compare_with_high.0, - elem_to_compare_with_high.1.into(), - false, - ); + // the row is in high + + if self.high_is_synced() { + // only insert into high cache if it is synced + + if self.high.is_empty() { + // if high cache is empty, we can insert directly + self.high.insert(to_insert.0, to_insert.1); + return; + } + + if to_insert_sort_key <= &self.high.last_key().unwrap().0 { + // We can only insert if the key is <= the largest key in the high cache. + // Note that we have all ties of the last element in the high cache, so we can + // safely compare only the sort key. + self.high.insert(to_insert.0, to_insert.1); + } + + if self.high.len() > self.high_cache_capacity { + // evict some entries from high cache if it exceeds the capacity + let high_last = self.high.pop_last().unwrap(); + let high_last_sort_key = (high_last.0).0; + // Remove all ties of the last element in high cache, for the sake of simplicity. + // This may cause repeatedly refill the high cache if number of ties is large. + self.high.retain(|k, _| k.0 != high_last_sort_key); + } + } } } } - #[allow(clippy::too_many_arguments)] async fn delete( &mut self, group_key: Option, @@ -544,56 +622,65 @@ impl TopNCacheTrait for TopNCache { *row_count -= 1; } - // Since low cache is always empty for WITH_TIES, this unwrap is safe. - let middle_last = self.middle.last_key_value().unwrap(); - let middle_last_order_by = middle_last.0 .0.clone(); + assert!( + self.low.is_none(), + "Offset is not supported yet for WITH TIES, so low cache should be None" + ); - let sort_key = cache_key.0.clone(); - if sort_key > middle_last_order_by { - // The row is in high. + let mut append_res = |op: Op, row: CompactedRow| { + res_ops.push(op); + res_rows.push(row); + }; + + if self.middle.is_empty() { + consistency_error!( + ?group_key, + ?cache_key, + "middle cache is empty, but we receive a DELETE operation" + ); + append_res(Op::Delete, (&row).into()); + return Ok(()); + } + + let middle_last_sort_key = self.middle.last_key().unwrap().0.clone(); + + let to_delete_sort_key = cache_key.0.clone(); + if to_delete_sort_key > middle_last_sort_key { + // the row is in high self.high.remove(&cache_key); } else { - // The row is in middle + // the row is in middle self.middle.remove(&cache_key); - res_ops.push(Op::Delete); - res_rows.push((&row).into()); + append_res(Op::Delete, (&row).into()); if self.middle.len() >= self.limit { - // This can happen when there are ties. + // this can happen when there are ties return Ok(()); } - // Try to fill the high cache if it is empty - if self.high.is_empty() && !self.table_row_count_matched() { + // refill the high cache if it's not synced + if !self.high_is_synced() { managed_state .fill_high_cache( group_key, self, self.last_cache_key_before_high().cloned(), - self.high_capacity, + self.high_cache_capacity, ) .await?; } - // Bring elements with the same sort key, if any, from high cache to middle cache. + // bring the first element and its ties, if any, from high cache to middle cache if !self.high.is_empty() { let high_first = self.high.pop_first().unwrap(); - let high_first_order_by = high_first.0 .0.clone(); - assert!(high_first_order_by > middle_last_order_by); + let high_first_sort_key = (high_first.0).0.clone(); + assert!(high_first_sort_key > middle_last_sort_key); - res_ops.push(Op::Insert); - res_rows.push(high_first.1.clone()); + append_res(Op::Insert, high_first.1.clone()); self.middle.insert(high_first.0, high_first.1); - // We need to trigger insert for all rows with prefix `high_first_order_by` - // in high cache. - for (ordered_pk_row, row) in self.high.extract_if(|k, _| k.0 == high_first_order_by) - { - if ordered_pk_row.0 != high_first_order_by { - break; - } - res_ops.push(Op::Insert); - res_rows.push(row.clone()); - self.middle.insert(ordered_pk_row, row); + for (cache_key, row) in self.high.extract_if(|k, _| k.0 == high_first_sort_key) { + append_res(Op::Insert, row.clone()); + self.middle.insert(cache_key, row); } } } @@ -632,51 +719,56 @@ impl AppendOnlyTopNCacheTrait for TopNCache { managed_state: &mut ManagedTopNState, row_deserializer: &RowDeserializer, ) -> StreamExecutorResult<()> { - if self.is_middle_cache_full() && &cache_key >= self.middle.last_key_value().unwrap().0 { + if self.middle_is_full() && &cache_key >= self.middle.last_key().unwrap() { return Ok(()); } managed_state.insert(row_ref); - // Then insert input row to corresponding cache range according to its order key - if !self.is_low_cache_full() { - self.low.insert(cache_key, row_ref.into()); - return Ok(()); + let mut append_res = |op: Op, row: CompactedRow| { + res_ops.push(op); + res_rows.push(row); + }; + + // insert input row into corresponding cache according to its sort key + let mut to_insert = (cache_key, row_ref.into()); + + let low_is_full = self.low_is_full(); + if let Some(low) = &mut self.low { + // try insert into low cache + + if !low_is_full { + low.insert(to_insert.0, to_insert.1); + return Ok(()); + } + + // low cache is full + let low_last = low.last_entry().unwrap(); + if &to_insert.0 < low_last.key() { + // make space for the new entry + let low_last = low_last.remove_entry(); + low.insert(to_insert.0, to_insert.1); + to_insert = low_last; // move the last entry to the middle cache + } } - let elem_to_insert_into_middle = if let Some(low_last) = self.low.last_entry() - && &cache_key <= low_last.key() - { - // Take the last element of `cache.low` and insert input row to it. - let low_last = low_last.remove_entry(); - self.low.insert(cache_key, row_ref.into()); - low_last - } else { - (cache_key, row_ref.into()) - }; + // try insert into middle cache - if !self.is_middle_cache_full() { - self.middle.insert( - elem_to_insert_into_middle.0, - elem_to_insert_into_middle.1.clone(), - ); - res_ops.push(Op::Insert); - res_rows.push(elem_to_insert_into_middle.1); + if !self.middle_is_full() { + self.middle.insert(to_insert.0, to_insert.1.clone()); + append_res(Op::Insert, to_insert.1); return Ok(()); } // The row must be in the range of [offset, offset+limit). // the largest row in `cache.middle` needs to be removed. let middle_last = self.middle.pop_last().unwrap(); - debug_assert!(elem_to_insert_into_middle.0 < middle_last.0); + debug_assert!(to_insert.0 < middle_last.0); - res_ops.push(Op::Delete); - res_rows.push(middle_last.1.clone()); + append_res(Op::Delete, middle_last.1.clone()); managed_state.delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?); - res_ops.push(Op::Insert); - res_rows.push(elem_to_insert_into_middle.1.clone()); - self.middle - .insert(elem_to_insert_into_middle.0, elem_to_insert_into_middle.1); + append_res(Op::Insert, to_insert.1.clone()); + self.middle.insert(to_insert.0, to_insert.1); // Unlike normal topN, append only topN does not use the high part of the cache. @@ -695,31 +787,38 @@ impl AppendOnlyTopNCacheTrait for TopNCache { row_deserializer: &RowDeserializer, ) -> StreamExecutorResult<()> { assert!( - self.low.is_empty(), + self.low.is_none(), "Offset is not supported yet for WITH TIES, so low cache should be empty" ); - let elem_to_compare_with_middle = (cache_key, row_ref); - - if !self.is_middle_cache_full() { - let row: CompactedRow = elem_to_compare_with_middle.1.into(); - managed_state.insert(elem_to_compare_with_middle.1); - self.middle - .insert(elem_to_compare_with_middle.0.clone(), row.clone()); - res_ops.push(Op::Insert); + + let mut append_res = |op: Op, row: CompactedRow| { + res_ops.push(op); res_rows.push(row); + }; + + let to_insert = (cache_key, row_ref); + + // try insert into middle cache + + if !self.middle_is_full() { + managed_state.insert(to_insert.1); + let row: CompactedRow = to_insert.1.into(); + self.middle.insert(to_insert.0, row.clone()); + append_res(Op::Insert, row); return Ok(()); } - let sort_key = &elem_to_compare_with_middle.0 .0; - let middle_last = self.middle.last_key_value().unwrap(); - let middle_last_order_by = &middle_last.0 .0.clone(); + // middle cache is full + + let to_insert_sort_key = &(to_insert.0).0; + let middle_last_sort_key = self.middle.last_key().unwrap().0.clone(); - match sort_key.cmp(middle_last_order_by) { + match to_insert_sort_key.cmp(&middle_last_sort_key) { Ordering::Less => { - // The row is in middle. - let num_ties = self + // the row is in middle + let n_ties_of_last = self .middle - .range((middle_last_order_by.clone(), vec![])..) + .range((middle_last_sort_key.clone(), vec![])..) .count(); // We evict the last row and its ties only if the number of remaining rows is // still larger than limit, i.e., there are limit-1 other rows. @@ -728,38 +827,34 @@ impl AppendOnlyTopNCacheTrait for TopNCache { // insert 0 -> [0,1,1,1,1] // insert 0 -> [0,0,1,1,1,1] // insert 0 -> [0,0,0] - if self.middle.len() - num_ties + 1 >= self.limit { + if self.middle.len() + 1 - n_ties_of_last >= self.limit { + // middle will be full without the last element and its ties after insertion while let Some(middle_last) = self.middle.last_entry() - && &middle_last.key().0 == middle_last_order_by + && middle_last.key().0 == middle_last_sort_key { let middle_last = middle_last.remove_entry(); - res_ops.push(Op::Delete); - res_rows.push(middle_last.1.clone()); + append_res(Op::Delete, middle_last.1.clone()); + + // we don't need to maintain the high part so just delete it from state table managed_state .delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?); } } - managed_state.insert(elem_to_compare_with_middle.1); - res_ops.push(Op::Insert); - res_rows.push((&elem_to_compare_with_middle.1).into()); - self.middle.insert( - elem_to_compare_with_middle.0, - (&elem_to_compare_with_middle.1).into(), - ); + managed_state.insert(to_insert.1); + let row: CompactedRow = to_insert.1.into(); + append_res(Op::Insert, row.clone()); + self.middle.insert(to_insert.0, row); } Ordering::Equal => { - // The row is in middle and is a tie with the last row. - managed_state.insert(elem_to_compare_with_middle.1); - res_ops.push(Op::Insert); - res_rows.push((&elem_to_compare_with_middle.1).into()); - self.middle.insert( - elem_to_compare_with_middle.0, - (&elem_to_compare_with_middle.1).into(), - ); + // the row is in middle and is a tie of the last row + managed_state.insert(to_insert.1); + let row: CompactedRow = to_insert.1.into(); + append_res(Op::Insert, row.clone()); + self.middle.insert(to_insert.0, row); } Ordering::Greater => { - // The row is in high. Do nothing. + // the row is in high, do nothing } } diff --git a/src/stream/src/executor/top_n/top_n_plain.rs b/src/stream/src/executor/top_n/top_n_plain.rs index 63b9ca94961f8..5c9370a9f35db 100644 --- a/src/stream/src/executor/top_n/top_n_plain.rs +++ b/src/stream/src/executor/top_n/top_n_plain.rs @@ -68,7 +68,7 @@ impl TopNExecutor { let mut inner = InnerTopNExecutor::new(schema, storage_key, offset_and_limit, order_by, state_table)?; - inner.cache.high_capacity = 2; + inner.cache.high_cache_capacity = 2; Ok(TopNExecutorWrapper { input, ctx, inner }) } diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs index 7cce8ecb4c6ac..dd03fed93ce99 100644 --- a/src/stream/src/executor/top_n/top_n_state.rs +++ b/src/stream/src/executor/top_n/top_n_state.rs @@ -27,6 +27,7 @@ use super::top_n_cache::CacheKey; use super::{serialize_pk_to_cache_key, CacheKeySerde, GroupKey, TopNCache}; use crate::common::table::state_table::StateTable; use crate::executor::error::StreamExecutorResult; +use crate::executor::top_n::top_n_cache::Cache; /// * For TopN, the storage key is: `[ order_by + remaining columns of pk ]` /// * For group TopN, the storage key is: `[ group_key + order_by + remaining columns of pk ]` @@ -144,8 +145,10 @@ impl ManagedTopNState { start_key: Option, cache_size_limit: usize, ) -> StreamExecutorResult<()> { - let cache = &mut topn_cache.high; + let high_cache = &mut topn_cache.high; + assert!(high_cache.is_empty()); + // TODO(rc): iterate from `start_key` let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let state_table_iter = self .state_table @@ -172,13 +175,13 @@ impl ManagedTopNState { { continue; } - cache.insert(topn_row.cache_key, (&topn_row.row).into()); - if cache.len() == cache_size_limit { + high_cache.insert(topn_row.cache_key, (&topn_row.row).into()); + if high_cache.len() == cache_size_limit { break; } } - if WITH_TIES && topn_cache.is_high_cache_full() { + if WITH_TIES && topn_cache.high_is_full() { let high_last_sort_key = topn_cache.high.last_key_value().unwrap().0 .0.clone(); while let Some(item) = state_table_iter.next().await { group_row_count += 1; @@ -207,9 +210,10 @@ impl ManagedTopNState { group_key: Option, topn_cache: &mut TopNCache, ) -> StreamExecutorResult<()> { - assert!(topn_cache.low.is_empty()); + assert!(topn_cache.low.as_ref().map(Cache::is_empty).unwrap_or(true)); assert!(topn_cache.middle.is_empty()); assert!(topn_cache.high.is_empty()); + let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let state_table_iter = self .state_table @@ -226,14 +230,12 @@ impl ManagedTopNState { let mut group_row_count = 0; - if topn_cache.offset > 0 { + if let Some(low) = &mut topn_cache.low { while let Some(item) = state_table_iter.next().await { group_row_count += 1; let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len()); - topn_cache - .low - .insert(topn_row.cache_key, (&topn_row.row).into()); - if topn_cache.low.len() == topn_cache.offset { + low.insert(topn_row.cache_key, (&topn_row.row).into()); + if low.len() == topn_cache.offset { break; } } @@ -250,7 +252,7 @@ impl ManagedTopNState { break; } } - if WITH_TIES && topn_cache.is_middle_cache_full() { + if WITH_TIES && topn_cache.middle_is_full() { let middle_last_sort_key = topn_cache.middle.last_key_value().unwrap().0 .0.clone(); while let Some(item) = state_table_iter.next().await { group_row_count += 1; @@ -269,10 +271,10 @@ impl ManagedTopNState { } assert!( - topn_cache.high_capacity > 0, + topn_cache.high_cache_capacity > 0, "topn cache high_capacity should always > 0" ); - while !topn_cache.is_high_cache_full() + while !topn_cache.high_is_full() && let Some(item) = state_table_iter.next().await { group_row_count += 1; @@ -281,7 +283,7 @@ impl ManagedTopNState { .high .insert(topn_row.cache_key, (&topn_row.row).into()); } - if WITH_TIES && topn_cache.is_high_cache_full() { + if WITH_TIES && topn_cache.high_is_full() { let high_last_sort_key = topn_cache.high.last_key_value().unwrap().0 .0.clone(); while let Some(item) = state_table_iter.next().await { group_row_count += 1;