From 3195ee53517b0eddc6ccfbf0b7ee9295fbf295a6 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 21 Nov 2024 15:52:59 +0800 Subject: [PATCH] perf(topn): compact the (Group)TopN output to avoid amplification (#19451) Signed-off-by: Richard Chien --- src/stream/src/executor/top_n/group_top_n.rs | 294 +++++++----- .../executor/top_n/group_top_n_appendonly.rs | 32 +- .../src/executor/top_n/top_n_appendonly.rs | 91 ++-- src/stream/src/executor/top_n/top_n_cache.rs | 241 +++++----- src/stream/src/executor/top_n/top_n_plain.rs | 418 +++++++----------- src/stream/src/executor/top_n/top_n_state.rs | 7 +- src/stream/src/executor/top_n/utils.rs | 39 +- 7 files changed, 555 insertions(+), 567 deletions(-) diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs index 9457e2f7729c6..c061efd36b75b 100644 --- a/src/stream/src/executor/top_n/group_top_n.rs +++ b/src/stream/src/executor/top_n/group_top_n.rs @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::ops::{Deref, DerefMut}; use risingwave_common::array::Op; use risingwave_common::buffer::Bitmap; use risingwave_common::hash::HashKey; -use risingwave_common::row::RowExt; +use risingwave_common::row::{RowDeserializer, RowExt}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_common::util::sort_util::ColumnOrder; @@ -157,14 +158,18 @@ impl TopNExecutorBase where TopNCache: TopNCacheTrait, { - async fn apply_chunk(&mut self, chunk: StreamChunk) -> StreamExecutorResult { - let mut res_ops = Vec::with_capacity(self.limit); - let mut res_rows = Vec::with_capacity(self.limit); + async fn apply_chunk( + &mut self, + chunk: StreamChunk, + ) -> StreamExecutorResult> { let keys = K::build_many(&self.group_by, chunk.data_chunk()); + let mut stagings = HashMap::new(); // K -> `TopNStaging` + for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) { let Some((op, row_ref)) = r else { continue; }; + // The pk without group by let pk_row = row_ref.project(&self.storage_key_indices[self.group_by.len()..]); let cache_key = serialize_pk_to_cache_key(pk_row, &self.cache_key_serde); @@ -184,12 +189,13 @@ where } let mut cache = self.caches.get_mut(group_cache_key).unwrap(); + let staging = stagings.entry(group_cache_key.clone()).or_default(); // apply the chunk to state table match op { Op::Insert | Op::UpdateInsert => { self.managed_state.insert(row_ref); - cache.insert(cache_key, row_ref, &mut res_ops, &mut res_rows); + cache.insert(cache_key, row_ref, staging); } Op::Delete | Op::UpdateDelete => { @@ -200,17 +206,27 @@ where &mut self.managed_state, cache_key, row_ref, - &mut res_ops, - &mut res_rows, + staging, ) .await?; } } } + self.metrics .group_top_n_cached_entry_count .set(self.caches.len() as i64); - generate_output(res_rows, res_ops, &self.schema) + + let data_types = self.schema.data_types(); + let deserializer = RowDeserializer::new(data_types.clone()); + let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(chunk.capacity())); + for staging in stagings.into_values() { + for res in staging.into_deserialized_changes(&deserializer) { + let (op, row) = res?; + let _none = chunk_builder.append_row(op, row); + } + } + Ok(chunk_builder.take()) } async fn flush_data(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { @@ -252,7 +268,6 @@ where mod tests { use std::sync::atomic::AtomicU64; - use assert_matches::assert_matches; use risingwave_common::array::stream_chunk::StreamChunkTestExt; use risingwave_common::catalog::Field; use risingwave_common::hash::SerializedKey; @@ -262,7 +277,7 @@ mod tests { use super::*; use crate::executor::test_utils::top_n_executor::create_in_memory_state_table; - use crate::executor::test_utils::MockSource; + use crate::executor::test_utils::{MockSource, StreamExecutorTestExt}; fn create_schema() -> Schema { Schema { @@ -359,7 +374,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = GroupTopNExecutor::::new( + let top_n = GroupTopNExecutor::::new( source, ActorContext::for_test(0), schema, @@ -371,14 +386,13 @@ mod tests { Arc::new(AtomicU64::new(0)), ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I + 10 9 1 + 8 8 2 @@ -386,58 +400,50 @@ mod tests { + 9 1 1 + 10 1 1 ", - ), + ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I - 10 9 1 - 8 8 2 - 10 1 1 + 8 1 3 ", - ), + ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I - 7 8 2 - 8 1 3 - 9 1 1 ", - ), + ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I + 5 1 1 + 2 1 1 ", - ), + ) + .sort_rows(), ); } @@ -455,7 +461,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = GroupTopNExecutor::::new( + let top_n = GroupTopNExecutor::::new( source, ActorContext::for_test(0), schema, @@ -467,66 +473,57 @@ mod tests { Arc::new(AtomicU64::new(0)), ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I + 8 8 2 + 10 1 1 + 8 1 3 ", - ), + ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I - 8 8 2 - 10 1 1 ", - ), + ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I - 8 1 3", - ), + ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I + 5 1 1 + 3 1 2 ", - ), + ) + .sort_rows(), ); } @@ -544,7 +541,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = GroupTopNExecutor::::new( + let top_n = GroupTopNExecutor::::new( source, ActorContext::for_test(0), schema, @@ -556,14 +553,13 @@ mod tests { Arc::new(AtomicU64::new(0)), ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I + 10 9 1 + 8 8 2 @@ -571,56 +567,148 @@ mod tests { + 9 1 1 + 10 1 1 + 8 1 3", - ), + ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I - 10 9 1 - 8 8 2 - 10 1 1", - ), + ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I - 7 8 2 - 8 1 3 - 9 1 1", - ), + ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I + 5 1 1 + 2 1 1 + 3 1 2 + 4 1 3", - ), + ) + .sort_rows(), ); } + + #[tokio::test] + async fn test_compact_changes() { + let schema = create_schema(); + let source = MockSource::with_messages(vec![ + Message::Barrier(Barrier::new_test_barrier(test_epoch(1))), + Message::Chunk(StreamChunk::from_pretty( + " I I I + + 0 0 9 + + 0 0 8 + + 0 0 7 + + 0 0 6 + + 0 1 15 + + 0 1 14", + )), + Message::Barrier(Barrier::new_test_barrier(test_epoch(2))), + Message::Chunk(StreamChunk::from_pretty( + " I I I + - 0 0 6 + - 0 0 8 + + 0 0 4 + + 0 0 3 + + 0 1 12 + + 0 2 26 + - 0 1 12 + + 0 1 11", + )), + Message::Barrier(Barrier::new_test_barrier(test_epoch(3))), + Message::Chunk(StreamChunk::from_pretty( + " I I I + + 0 0 11", // this should result in no chunk output + )), + Message::Barrier(Barrier::new_test_barrier(test_epoch(4))), + ]) + .into_executor(schema.clone(), vec![2]); + + let state_table = create_in_memory_state_table( + &schema.data_types(), + &[ + OrderType::ascending(), + OrderType::ascending(), + OrderType::ascending(), + ], + &[0, 1, 2], // table pk = group key (0, 1) + order key (2) + additional pk (empty) + ) + .await; + + let top_n = GroupTopNExecutor::::new( + source, + ActorContext::for_test(0), + schema, + vec![ + ColumnOrder::new(0, OrderType::ascending()), + ColumnOrder::new(1, OrderType::ascending()), + ColumnOrder::new(2, OrderType::ascending()), + ], + (0, 2), // (offset, limit) + vec![ColumnOrder::new(2, OrderType::ascending())], + vec![0, 1], + state_table, + Arc::new(AtomicU64::new(0)), + ) + .unwrap(); + let mut top_n = top_n.boxed().execute(); + + // initial barrier + top_n.expect_barrier().await; + + assert_eq!( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( + " I I I + + 0 0 7 + + 0 0 6 + + 0 1 15 + + 0 1 14", + ) + .sort_rows(), + ); + top_n.expect_barrier().await; + + assert_eq!( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( + " I I I + - 0 0 6 + - 0 0 7 + + 0 0 4 + + 0 0 3 + - 0 1 15 + + 0 1 11 + + 0 2 26", + ) + .sort_rows(), + ); + top_n.expect_barrier().await; + + // no output chunk for the last input chunk + top_n.expect_barrier().await; + } } diff --git a/src/stream/src/executor/top_n/group_top_n_appendonly.rs b/src/stream/src/executor/top_n/group_top_n_appendonly.rs index b8e4094486624..f6af3b6674929 100644 --- a/src/stream/src/executor/top_n/group_top_n_appendonly.rs +++ b/src/stream/src/executor/top_n/group_top_n_appendonly.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use risingwave_common::array::Op; use risingwave_common::buffer::Bitmap; use risingwave_common::hash::HashKey; @@ -137,17 +139,20 @@ impl TopNExecutorBase where TopNCache: AppendOnlyTopNCacheTrait, { - async fn apply_chunk(&mut self, chunk: StreamChunk) -> StreamExecutorResult { - let mut res_ops = Vec::with_capacity(self.limit); - let mut res_rows = Vec::with_capacity(self.limit); + async fn apply_chunk( + &mut self, + chunk: StreamChunk, + ) -> StreamExecutorResult> { let keys = K::build_many(&self.group_by, chunk.data_chunk()); + let mut stagings = HashMap::new(); // K -> `TopNStaging` let data_types = self.schema.data_types(); - let row_deserializer = RowDeserializer::new(data_types.clone()); + let deserializer = RowDeserializer::new(data_types.clone()); for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) { let Some((op, row_ref)) = r else { continue; }; + // The pk without group by let pk_row = row_ref.project(&self.storage_key_indices[self.group_by.len()..]); let cache_key = serialize_pk_to_cache_key(pk_row, &self.cache_key_serde); @@ -164,22 +169,33 @@ where .await?; self.caches.push(group_cache_key.clone(), topn_cache); } + let mut cache = self.caches.get_mut(group_cache_key).unwrap(); + let staging = stagings.entry(group_cache_key.clone()).or_default(); debug_assert_eq!(op, Op::Insert); cache.insert( cache_key, row_ref, - &mut res_ops, - &mut res_rows, + staging, &mut self.managed_state, - &row_deserializer, + &deserializer, )?; } + self.metrics .group_top_n_cached_entry_count .set(self.caches.len() as i64); - generate_output(res_rows, res_ops, &self.schema) + + let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(chunk.capacity())); + for staging in stagings.into_values() { + for res in staging.into_deserialized_changes(&deserializer) { + let (op, row) = res?; + let _none = chunk_builder.append_row(op, row); + } + } + + Ok(chunk_builder.take()) } async fn flush_data(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { diff --git a/src/stream/src/executor/top_n/top_n_appendonly.rs b/src/stream/src/executor/top_n/top_n_appendonly.rs index 1b45c82c9c83a..be76101756454 100644 --- a/src/stream/src/executor/top_n/top_n_appendonly.rs +++ b/src/stream/src/executor/top_n/top_n_appendonly.rs @@ -17,7 +17,7 @@ use risingwave_common::row::{RowDeserializer, RowExt}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::sort_util::ColumnOrder; -use super::top_n_cache::AppendOnlyTopNCacheTrait; +use super::top_n_cache::{AppendOnlyTopNCacheTrait, TopNStaging}; use super::utils::*; use super::{ManagedTopNState, TopNCache}; use crate::executor::prelude::*; @@ -104,11 +104,13 @@ impl TopNExecutorBase where TopNCache: AppendOnlyTopNCacheTrait, { - async fn apply_chunk(&mut self, chunk: StreamChunk) -> StreamExecutorResult { - let mut res_ops = Vec::with_capacity(self.cache.limit); - let mut res_rows = Vec::with_capacity(self.cache.limit); + async fn apply_chunk( + &mut self, + chunk: StreamChunk, + ) -> StreamExecutorResult> { + let mut staging = TopNStaging::new(); let data_types = self.schema.data_types(); - let row_deserializer = RowDeserializer::new(data_types); + let deserializer = RowDeserializer::new(data_types.clone()); // apply the chunk to state table for (op, row_ref) in chunk.rows() { debug_assert_eq!(op, Op::Insert); @@ -117,14 +119,21 @@ where self.cache.insert( cache_key, row_ref, - &mut res_ops, - &mut res_rows, + &mut staging, &mut self.managed_state, - &row_deserializer, + &deserializer, )?; } - generate_output(res_rows, res_ops, &self.schema) + if staging.is_empty() { + return Ok(None); + } + let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(staging.len())); + for res in staging.into_deserialized_changes(&deserializer) { + let (op, row) = res?; + let _none = chunk_builder.append_row(op, row); + } + Ok(chunk_builder.take()) } async fn flush_data(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { @@ -151,8 +160,6 @@ where #[cfg(test)] mod tests { - use assert_matches::assert_matches; - use futures::StreamExt; use risingwave_common::array::stream_chunk::StreamChunkTestExt; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::{Field, Schema}; @@ -162,7 +169,7 @@ mod tests { use super::AppendOnlyTopNExecutor; use crate::executor::test_utils::top_n_executor::create_in_memory_state_table; - use crate::executor::test_utils::MockSource; + use crate::executor::test_utils::{MockSource, StreamExecutorTestExt}; use crate::executor::{ActorContext, Barrier, Execute, Executor, Message, PkIndices}; fn create_stream_chunks() -> Vec { @@ -241,7 +248,7 @@ mod tests { .await; let schema = source.schema().clone(); - let top_n_executor = AppendOnlyTopNExecutor::<_, false>::new( + let top_n = AppendOnlyTopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -251,54 +258,43 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init epoch - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 1 0 + 2 1 + 3 2 - + 10 3 + 9 4 - - 10 3 + 8 5" ) + .sort_rows(), ); // We added (1, 2, 3, 10, 9, 8). // Now (1, 2, 3, 8, 9) // Barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 9 4 - + 7 6 - 8 5 + 3 7 - - 7 6 + 1 8" ) + .sort_rows(), ); // We added (7, 3, 1, 9). // Now (1, 1, 2, 3, 3) // Barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 3 7 @@ -306,6 +302,7 @@ mod tests { - 3 2 + 1 13" ) + .sort_rows(), ); // We added (1, 1, 2, 3). // Now (1, 1, 1, 1, 2) @@ -322,7 +319,7 @@ mod tests { .await; let schema = source.schema().clone(); - let top_n_executor = AppendOnlyTopNExecutor::<_, false>::new( + let top_n = AppendOnlyTopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -332,30 +329,26 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init epoch - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 10 3 + 9 4 + 8 5" ) + .sort_rows(), ); // We added (1, 2, 3, 10, 9, 8). // Now (1, 2, 3) -> (8, 9, 10) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 7 6 @@ -364,17 +357,14 @@ mod tests { - 9 4 + 3 2" ) + .sort_rows(), ); // We added (7, 3, 1, 9). // Now (1, 1, 2) -> (3, 3, 7, 8) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 8 5 @@ -384,6 +374,7 @@ mod tests { - 3 7 + 2 14" ) + .sort_rows(), ); // We added (1, 1, 2, 3). // Now (1, 1, 1) -> (1, 2, 2, 3) diff --git a/src/stream/src/executor/top_n/top_n_cache.rs b/src/stream/src/executor/top_n/top_n_cache.rs index d8211b4ad076c..c7536bf98bbb9 100644 --- a/src/stream/src/executor/top_n/top_n_cache.rs +++ b/src/stream/src/executor/top_n/top_n_cache.rs @@ -13,12 +13,13 @@ // limitations under the License. use std::cmp::Ordering; +use std::collections::BTreeMap; use std::fmt::Debug; use std::future::Future; use itertools::Itertools; use risingwave_common::array::{Op, RowRef}; -use risingwave_common::row::{CompactedRow, Row, RowDeserializer, RowExt}; +use risingwave_common::row::{CompactedRow, OwnedRow, Row, RowDeserializer, RowExt}; use risingwave_common::types::DataType; use risingwave_common_estimate_size::collections::EstimatedBTreeMap; use risingwave_common_estimate_size::EstimateSize; @@ -149,14 +150,7 @@ pub trait TopNCacheTrait { /// /// Changes in `self.middle` is recorded to `res_ops` and `res_rows`, which will be /// used to generate messages to be sent to downstream operators. - #[allow(clippy::too_many_arguments)] - fn insert( - &mut self, - cache_key: CacheKey, - row: impl Row + Send, - res_ops: &mut Vec, - res_rows: &mut Vec, - ); + fn insert(&mut self, cache_key: CacheKey, row: impl Row + Send, staging: &mut TopNStaging); /// Delete input row from the cache. /// @@ -166,15 +160,13 @@ pub trait TopNCacheTrait { /// Because we may need to refill data from the state table to `self.high` during the delete /// operation, we need to pass in `group_key`, `epoch` and `managed_state` to do a prefix /// scan of the state table. - #[allow(clippy::too_many_arguments)] fn delete( &mut self, group_key: Option, managed_state: &mut ManagedTopNState, cache_key: CacheKey, row: impl Row + Send, - res_ops: &mut Vec, - res_rows: &mut Vec, + staging: &mut TopNStaging, ) -> impl Future> + Send; } @@ -286,22 +278,11 @@ impl TopNCache { } impl TopNCacheTrait for TopNCache { - fn insert( - &mut self, - cache_key: CacheKey, - row: impl Row + Send, - res_ops: &mut Vec, - res_rows: &mut Vec, - ) { + fn insert(&mut self, cache_key: CacheKey, row: impl Row + Send, staging: &mut TopNStaging) { if let Some(row_count) = self.table_row_count.as_mut() { *row_count += 1; } - let mut append_res = |op: Op, row: CompactedRow| { - res_ops.push(op); - res_rows.push(row); - }; - let mut to_insert = (cache_key, (&row).into()); let mut is_last_of_lower_cache = false; // for saving one key comparison @@ -328,8 +309,8 @@ impl TopNCacheTrait for TopNCache { // try insert into middle cache if !self.middle_is_full() { - self.middle.insert(to_insert.0, to_insert.1.clone()); - append_res(Op::Insert, to_insert.1); + self.middle.insert(to_insert.0.clone(), to_insert.1.clone()); + staging.insert(to_insert.0, to_insert.1); return; } @@ -338,10 +319,10 @@ impl TopNCacheTrait for TopNCache { if is_last_of_lower_cache || &to_insert.0 < middle_last.key() { // make space for the new entry let middle_last = middle_last.remove_entry(); - self.middle.insert(to_insert.0, to_insert.1.clone()); + self.middle.insert(to_insert.0.clone(), to_insert.1.clone()); - append_res(Op::Delete, middle_last.1.clone()); - append_res(Op::Insert, to_insert.1); + staging.delete(middle_last.0.clone(), middle_last.1.clone()); + staging.insert(to_insert.0, to_insert.1); to_insert = middle_last; // move the last entry to the high cache is_last_of_lower_cache = true; @@ -382,8 +363,7 @@ impl TopNCacheTrait for TopNCache { managed_state: &mut ManagedTopNState, cache_key: CacheKey, row: impl Row + Send, - res_ops: &mut Vec, - res_rows: &mut Vec, + staging: &mut TopNStaging, ) -> StreamExecutorResult<()> { if !enable_strict_consistency() && self.table_row_count == Some(0) { // If strict consistency is disabled, and we receive a `DELETE` but the row count is 0, we @@ -395,11 +375,6 @@ impl TopNCacheTrait for TopNCache { *row_count -= 1; } - let mut append_res = |op: Op, row: CompactedRow| { - res_ops.push(op); - res_rows.push(row); - }; - if self.middle_is_full() && &cache_key > self.middle.last_key_value().unwrap().0 { // the row is in high self.high.remove(&cache_key); @@ -414,7 +389,7 @@ impl TopNCacheTrait for TopNCache { { // the row is in middle let removed = self.middle.remove(&cache_key); - append_res(Op::Delete, (&row).into()); + staging.delete(cache_key.clone(), (&row).into()); if removed.is_none() { // the middle cache should always be synced, if the key is not found, then it also doesn't @@ -443,8 +418,9 @@ impl TopNCacheTrait for TopNCache { // bring one element, if any, from high cache to middle cache if !self.high.is_empty() { let high_first = self.high.pop_first().unwrap(); - append_res(Op::Insert, high_first.1.clone()); - self.middle.insert(high_first.0, high_first.1); + self.middle + .insert(high_first.0.clone(), high_first.1.clone()); + staging.insert(high_first.0, high_first.1); } assert!(self.high.is_empty() || self.middle.len() == self.limit); @@ -463,7 +439,7 @@ impl TopNCacheTrait for TopNCache { // bring one element, if any, from middle cache to low cache if !self.middle.is_empty() { let middle_first = self.middle.pop_first().unwrap(); - append_res(Op::Delete, middle_first.1.clone()); + staging.delete(middle_first.0.clone(), middle_first.1.clone()); low.insert(middle_first.0, middle_first.1); // fill the high cache if it's not synced @@ -482,8 +458,9 @@ impl TopNCacheTrait for TopNCache { // bring one element, if any, from high cache to middle cache if !self.high.is_empty() { let high_first = self.high.pop_first().unwrap(); - append_res(Op::Insert, high_first.1.clone()); - self.middle.insert(high_first.0, high_first.1); + self.middle + .insert(high_first.0.clone(), high_first.1.clone()); + staging.insert(high_first.0, high_first.1); } } } @@ -493,13 +470,7 @@ impl TopNCacheTrait for TopNCache { } impl TopNCacheTrait for TopNCache { - fn insert( - &mut self, - cache_key: CacheKey, - row: impl Row + Send, - res_ops: &mut Vec, - res_rows: &mut Vec, - ) { + fn insert(&mut self, cache_key: CacheKey, row: impl Row + Send, staging: &mut TopNStaging) { if let Some(row_count) = self.table_row_count.as_mut() { *row_count += 1; } @@ -509,18 +480,13 @@ impl TopNCacheTrait for TopNCache { "Offset is not supported yet for WITH TIES, so low cache should be None" ); - let mut append_res = |op: Op, row: CompactedRow| { - res_ops.push(op); - res_rows.push(row); - }; - let to_insert: (CacheKey, CompactedRow) = (cache_key, (&row).into()); // try insert into middle cache if !self.middle_is_full() { self.middle.insert(to_insert.0.clone(), to_insert.1.clone()); - append_res(Op::Insert, to_insert.1); + staging.insert(to_insert.0.clone(), to_insert.1); return; } @@ -550,7 +516,7 @@ impl TopNCacheTrait for TopNCache { && middle_last.key().0 == middle_last_sort_key { let middle_last = middle_last.remove_entry(); - append_res(Op::Delete, middle_last.1.clone()); + staging.delete(middle_last.0.clone(), middle_last.1.clone()); // we can blindly move entries from middle cache to high cache no matter high cache is synced or not self.high.insert(middle_last.0, middle_last.1); } @@ -564,13 +530,13 @@ impl TopNCacheTrait for TopNCache { self.high.retain(|k, _| k.0 != high_last_sort_key); } - append_res(Op::Insert, to_insert.1.clone()); - self.middle.insert(to_insert.0, to_insert.1); + self.middle.insert(to_insert.0.clone(), to_insert.1.clone()); + staging.insert(to_insert.0, to_insert.1); } Ordering::Equal => { // the row is in middle and is a tie of the last row - append_res(Op::Insert, to_insert.1.clone()); - self.middle.insert(to_insert.0, to_insert.1); + self.middle.insert(to_insert.0.clone(), to_insert.1.clone()); + staging.insert(to_insert.0, to_insert.1); } Ordering::Greater => { // the row is in high @@ -610,8 +576,7 @@ impl TopNCacheTrait for TopNCache { managed_state: &mut ManagedTopNState, cache_key: CacheKey, row: impl Row + Send, - res_ops: &mut Vec, - res_rows: &mut Vec, + staging: &mut TopNStaging, ) -> StreamExecutorResult<()> { if !enable_strict_consistency() && self.table_row_count == Some(0) { // If strict consistency is disabled, and we receive a `DELETE` but the row count is 0, we @@ -627,18 +592,13 @@ impl TopNCacheTrait for TopNCache { "Offset is not supported yet for WITH TIES, so low cache should be None" ); - let mut append_res = |op: Op, row: CompactedRow| { - res_ops.push(op); - res_rows.push(row); - }; - if self.middle.is_empty() { consistency_error!( ?group_key, ?cache_key, "middle cache is empty, but we receive a DELETE operation" ); - append_res(Op::Delete, (&row).into()); + staging.delete(cache_key, (&row).into()); return Ok(()); } @@ -651,7 +611,7 @@ impl TopNCacheTrait for TopNCache { } else { // the row is in middle self.middle.remove(&cache_key); - append_res(Op::Delete, (&row).into()); + staging.delete(cache_key.clone(), (&row).into()); if self.middle.len() >= self.limit { // this can happen when there are ties return Ok(()); @@ -675,12 +635,13 @@ impl TopNCacheTrait for TopNCache { let high_first_sort_key = (high_first.0).0.clone(); assert!(high_first_sort_key > middle_last_sort_key); - append_res(Op::Insert, high_first.1.clone()); - self.middle.insert(high_first.0, high_first.1); + self.middle + .insert(high_first.0.clone(), high_first.1.clone()); + staging.insert(high_first.0, high_first.1); for (cache_key, row) in self.high.extract_if(|k, _| k.0 == high_first_sort_key) { - append_res(Op::Insert, row.clone()); - self.middle.insert(cache_key, row); + self.middle.insert(cache_key.clone(), row.clone()); + staging.insert(cache_key, row); } } } @@ -702,8 +663,7 @@ pub trait AppendOnlyTopNCacheTrait { &mut self, cache_key: CacheKey, row_ref: RowRef<'_>, - res_ops: &mut Vec, - res_rows: &mut Vec, + staging: &mut TopNStaging, managed_state: &mut ManagedTopNState, row_deserializer: &RowDeserializer, ) -> StreamExecutorResult<()>; @@ -714,8 +674,7 @@ impl AppendOnlyTopNCacheTrait for TopNCache { &mut self, cache_key: CacheKey, row_ref: RowRef<'_>, - res_ops: &mut Vec, - res_rows: &mut Vec, + staging: &mut TopNStaging, managed_state: &mut ManagedTopNState, row_deserializer: &RowDeserializer, ) -> StreamExecutorResult<()> { @@ -724,11 +683,6 @@ impl AppendOnlyTopNCacheTrait for TopNCache { } managed_state.insert(row_ref); - let mut append_res = |op: Op, row: CompactedRow| { - res_ops.push(op); - res_rows.push(row); - }; - // insert input row into corresponding cache according to its sort key let mut to_insert = (cache_key, row_ref.into()); @@ -754,8 +708,8 @@ impl AppendOnlyTopNCacheTrait for TopNCache { // try insert into middle cache if !self.middle_is_full() { - self.middle.insert(to_insert.0, to_insert.1.clone()); - append_res(Op::Insert, to_insert.1); + self.middle.insert(to_insert.0.clone(), to_insert.1.clone()); + staging.insert(to_insert.0, to_insert.1); return Ok(()); } @@ -763,12 +717,11 @@ impl AppendOnlyTopNCacheTrait for TopNCache { // the largest row in `cache.middle` needs to be removed. let middle_last = self.middle.pop_last().unwrap(); debug_assert!(to_insert.0 < middle_last.0); - - append_res(Op::Delete, middle_last.1.clone()); managed_state.delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?); + staging.delete(middle_last.0, middle_last.1); - append_res(Op::Insert, to_insert.1.clone()); - self.middle.insert(to_insert.0, to_insert.1); + self.middle.insert(to_insert.0.clone(), to_insert.1.clone()); + staging.insert(to_insert.0, to_insert.1); // Unlike normal topN, append only topN does not use the high part of the cache. @@ -781,8 +734,7 @@ impl AppendOnlyTopNCacheTrait for TopNCache { &mut self, cache_key: CacheKey, row_ref: RowRef<'_>, - res_ops: &mut Vec, - res_rows: &mut Vec, + staging: &mut TopNStaging, managed_state: &mut ManagedTopNState, row_deserializer: &RowDeserializer, ) -> StreamExecutorResult<()> { @@ -791,11 +743,6 @@ impl AppendOnlyTopNCacheTrait for TopNCache { "Offset is not supported yet for WITH TIES, so low cache should be empty" ); - let mut append_res = |op: Op, row: CompactedRow| { - res_ops.push(op); - res_rows.push(row); - }; - let to_insert = (cache_key, row_ref); // try insert into middle cache @@ -803,8 +750,8 @@ impl AppendOnlyTopNCacheTrait for TopNCache { if !self.middle_is_full() { managed_state.insert(to_insert.1); let row: CompactedRow = to_insert.1.into(); - self.middle.insert(to_insert.0, row.clone()); - append_res(Op::Insert, row); + self.middle.insert(to_insert.0.clone(), row.clone()); + staging.insert(to_insert.0, row); return Ok(()); } @@ -833,25 +780,24 @@ impl AppendOnlyTopNCacheTrait for TopNCache { && middle_last.key().0 == middle_last_sort_key { let middle_last = middle_last.remove_entry(); - append_res(Op::Delete, middle_last.1.clone()); - // we don't need to maintain the high part so just delete it from state table managed_state .delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?); + staging.delete(middle_last.0, middle_last.1); } } managed_state.insert(to_insert.1); let row: CompactedRow = to_insert.1.into(); - append_res(Op::Insert, row.clone()); - self.middle.insert(to_insert.0, row); + self.middle.insert(to_insert.0.clone(), row.clone()); + staging.insert(to_insert.0, row); } Ordering::Equal => { // the row is in middle and is a tie of the last row managed_state.insert(to_insert.1); let row: CompactedRow = to_insert.1.into(); - append_res(Op::Insert, row.clone()); - self.middle.insert(to_insert.0, row); + self.middle.insert(to_insert.0.clone(), row.clone()); + staging.insert(to_insert.0, row); } Ordering::Greater => { // the row is in high, do nothing @@ -861,3 +807,92 @@ impl AppendOnlyTopNCacheTrait for TopNCache { Ok(()) } } + +/// Used to build diff between before and after applying an input chunk, for `TopNCache` (of one group). +/// It should be maintained when an entry is inserted or deleted from the `middle` cache. +#[derive(Debug, Default)] +pub struct TopNStaging { + to_delete: BTreeMap, + to_insert: BTreeMap, + to_update: BTreeMap, +} + +impl TopNStaging { + pub fn new() -> Self { + Self::default() + } + + /// Insert a row into the staging changes. This method must be called when a row is + /// added to the `middle` cache. + fn insert(&mut self, cache_key: CacheKey, row: CompactedRow) { + if let Some(old_row) = self.to_delete.remove(&cache_key) { + if old_row != row { + self.to_update.insert(cache_key, (old_row, row)); + } + } else { + self.to_insert.insert(cache_key, row); + } + } + + /// Delete a row from the staging changes. This method must be called when a row is + /// removed from the `middle` cache. + fn delete(&mut self, cache_key: CacheKey, row: CompactedRow) { + if self.to_insert.remove(&cache_key).is_some() { + // do nothing more + } else if let Some((old_row, _)) = self.to_update.remove(&cache_key) { + self.to_delete.insert(cache_key, old_row); + } else { + self.to_delete.insert(cache_key, row); + } + } + + /// Get the count of effective changes in the staging. + pub fn len(&self) -> usize { + self.to_delete.len() + self.to_insert.len() + self.to_update.len() + } + + /// Check if the staging is empty. + pub fn is_empty(&self) -> bool { + self.to_delete.is_empty() && self.to_insert.is_empty() && self.to_update.is_empty() + } + + /// Iterate over the changes in the staging. + pub fn into_changes(self) -> impl Iterator { + #[cfg(debug_assertions)] + { + let keys = self + .to_delete + .keys() + .chain(self.to_insert.keys()) + .chain(self.to_update.keys()) + .unique() + .count(); + assert_eq!( + keys, + self.to_delete.len() + self.to_insert.len() + self.to_update.len(), + "should not have duplicate keys with different operations", + ); + } + + // We expect one `CacheKey` to appear at most once in the staging, and, the order of + // the outputs of `TopN` doesn't really matter, so we can simply chain the three maps. + // Although the output order is not important, we still ensure that `Delete`s are emitted + // before `Insert`s, so that we can avoid temporary violation of the `LIMIT` constraint. + self.to_update + .into_values() + .flat_map(|(old_row, new_row)| { + [(Op::UpdateDelete, old_row), (Op::UpdateInsert, new_row)] + }) + .chain(self.to_delete.into_values().map(|row| (Op::Delete, row))) + .chain(self.to_insert.into_values().map(|row| (Op::Insert, row))) + } + + /// Iterate over the changes in the staging, and deserialize the rows. + pub fn into_deserialized_changes( + self, + deserializer: &RowDeserializer, + ) -> impl Iterator> + '_ { + self.into_changes() + .map(|(op, row)| Ok((op, deserializer.deserialize(row.row.as_ref())?))) + } +} diff --git a/src/stream/src/executor/top_n/top_n_plain.rs b/src/stream/src/executor/top_n/top_n_plain.rs index 5c9370a9f35db..7dedb31e0f330 100644 --- a/src/stream/src/executor/top_n/top_n_plain.rs +++ b/src/stream/src/executor/top_n/top_n_plain.rs @@ -13,10 +13,11 @@ // limitations under the License. use risingwave_common::array::Op; -use risingwave_common::row::RowExt; +use risingwave_common::row::{RowDeserializer, RowExt}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::sort_util::ColumnOrder; +use super::top_n_cache::TopNStaging; use super::utils::*; use super::{ManagedTopNState, TopNCache, TopNCacheTrait}; use crate::executor::prelude::*; @@ -126,9 +127,11 @@ impl TopNExecutorBase for InnerTopNExecuto where TopNCache: TopNCacheTrait, { - async fn apply_chunk(&mut self, chunk: StreamChunk) -> StreamExecutorResult { - let mut res_ops = Vec::with_capacity(self.cache.limit); - let mut res_rows = Vec::with_capacity(self.cache.limit); + async fn apply_chunk( + &mut self, + chunk: StreamChunk, + ) -> StreamExecutorResult> { + let mut staging = TopNStaging::new(); // apply the chunk to state table for (op, row_ref) in chunk.rows() { @@ -138,8 +141,7 @@ where Op::Insert | Op::UpdateInsert => { // First insert input row to state store self.managed_state.insert(row_ref); - self.cache - .insert(cache_key, row_ref, &mut res_ops, &mut res_rows) + self.cache.insert(cache_key, row_ref, &mut staging) } Op::Delete | Op::UpdateDelete => { @@ -151,14 +153,24 @@ where &mut self.managed_state, cache_key, row_ref, - &mut res_ops, - &mut res_rows, + &mut staging, ) .await? } } } - generate_output(res_rows, res_ops, &self.schema) + + let data_types = self.schema.data_types(); + let deserializer = RowDeserializer::new(data_types.clone()); + if staging.is_empty() { + return Ok(None); + } + let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(staging.len())); + for res in staging.into_deserialized_changes(&deserializer) { + let (op, row) = res?; + let _none = chunk_builder.append_row(op, row); + } + Ok(chunk_builder.take()) } async fn flush_data(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { @@ -184,7 +196,6 @@ where #[cfg(test)] mod tests { - use assert_matches::assert_matches; use risingwave_common::array::stream_chunk::StreamChunkTestExt; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; @@ -200,6 +211,7 @@ mod tests { use risingwave_common::util::epoch::test_epoch; use super::*; + use crate::executor::test_utils::StreamExecutorTestExt; fn create_stream_chunks() -> Vec { let chunk1 = StreamChunk::from_pretty( @@ -287,7 +299,7 @@ mod tests { .await; let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::<_, false>::new( + let top_n = TopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -297,49 +309,38 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 10 3 + 9 4 + 8 5" ) + .sort_rows(), ); // Barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - + 7 6 - - 7 6 - - 8 5 - + 8 5 - 8 5 + 11 8" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); // (8, 9, 10, 11, 12, 13, 14) assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 8 5 @@ -347,29 +348,24 @@ mod tests { + 13 11 + 14 12" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; // (10, 12, 13, 14) - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 8 5 - 9 4 - 11 8" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } #[tokio::test] @@ -382,7 +378,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::<_, false>::new( + let top_n = TopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -392,76 +388,58 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 1 0 + 2 1 + 3 2 - + 10 3 - - 10 3 - + 9 4 - - 9 4 + 8 5" ) + .sort_rows(), ); // now () -> (1, 2, 3, 8) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - - 8 5 + 7 6 - 3 2 - + 8 5 - 1 0 - + 9 4 - - 9 4 + 5 7 - 2 1 + 9 4" ) + .sort_rows(), ); // (5, 7, 8, 9) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 9 4 + 6 9" ) + .sort_rows(), ); // (5, 6, 7, 8) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 5 7 @@ -469,13 +447,11 @@ mod tests { - 6 9 + 10 3" ) + .sort_rows(), ); // (7, 8, 9, 10) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } // Should have the same result as above, since there are no duplicate sort keys. @@ -489,7 +465,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::<_, true>::new( + let top_n = TopNExecutor::<_, true>::new( source, ActorContext::for_test(0), schema, @@ -499,76 +475,58 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 1 0 + 2 1 + 3 2 - + 10 3 - - 10 3 - + 9 4 - - 9 4 + 8 5" ) + .sort_rows(), ); // now () -> (1, 2, 3, 8) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - - 8 5 + 7 6 - 3 2 - + 8 5 - 1 0 - + 9 4 - - 9 4 + 5 7 - 2 1 + 9 4" ) + .sort_rows(), ); // (5, 7, 8, 9) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 9 4 + 6 9" ) + .sort_rows(), ); // (5, 6, 7, 8) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 5 7 @@ -576,13 +534,11 @@ mod tests { - 6 9 + 10 3" ) + .sort_rows(), ); // (7, 8, 9, 10) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } #[tokio::test] @@ -595,7 +551,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::<_, false>::new( + let top_n = TopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -605,60 +561,46 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 10 3 + 9 4 + 8 5" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - + 7 6 - - 7 6 - - 8 5 - + 8 5 - 8 5 + 11 8" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - + 8 5" + + 8 5" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 8 5 @@ -668,12 +610,10 @@ mod tests { - 11 8 + 14 12" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } } @@ -684,6 +624,7 @@ mod tests { use super::*; use crate::executor::test_utils::top_n_executor::create_in_memory_state_table_from_state_store; + use crate::executor::test_utils::StreamExecutorTestExt; fn create_source_new() -> Executor { let mut chunks = vec![ StreamChunk::from_pretty( @@ -812,7 +753,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::<_, false>::new( + let top_n = TopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -822,55 +763,42 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); - // should be empty assert_eq!( - *res.as_chunk().unwrap(), - StreamChunk::from_pretty(" I I I I") - ); - - let res = top_n_executor.next().await.unwrap().unwrap(); - assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I I I - + 5 1 4 1002 - " + + 5 1 4 1002" ) + .sort_rows(), ); - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I I I - + 1 9 1 1003 - + 9 8 1 1004 - - 9 8 1 1004 - + 1 1 4 1001", - ), + + 1 9 1 1003 + + 1 1 4 1001", + ) + .sort_rows(), ); - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I I I - - 5 1 4 1002 - + 1 0 2 1006", + - 5 1 4 1002 + + 1 0 2 1006", ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } #[tokio::test] @@ -890,7 +818,7 @@ mod tests { .await; let source = create_source_new_before_recovery(); let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::<_, false>::new( + let top_n = TopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -900,33 +828,22 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); - // should be empty assert_eq!( - *res.as_chunk().unwrap(), - StreamChunk::from_pretty(" I I I I") - ); - - let res = top_n_executor.next().await.unwrap().unwrap(); - assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I I I - + 5 1 4 1002 - " + + 5 1 4 1002" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; let state_table = create_in_memory_state_table_from_state_store( &[ @@ -944,7 +861,7 @@ mod tests { // recovery let source = create_source_new_after_recovery(); let schema = source.schema().clone(); - let top_n_executor_after_recovery = TopNExecutor::<_, false>::new( + let top_n_after_recovery = TopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -954,41 +871,33 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor_after_recovery.boxed().execute(); + let mut top_n = top_n_after_recovery.boxed().execute(); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I I I - + 1 9 1 1003 - + 9 8 1 1004 - - 9 8 1 1004 - + 1 1 4 1001", - ), + + 1 9 1 1003 + + 1 1 4 1001", + ) + .sort_rows(), ); - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I I I - - 5 1 4 1002 - + 1 0 2 1006", + - 5 1 4 1002 + + 1 0 2 1006", ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } } @@ -999,6 +908,7 @@ mod tests { use super::*; use crate::executor::test_utils::top_n_executor::create_in_memory_state_table_from_state_store; + use crate::executor::test_utils::StreamExecutorTestExt; fn create_source() -> Executor { let mut chunks = vec![ @@ -1070,7 +980,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::new_with_ties_for_test( + let top_n = TopNExecutor::new_with_ties_for_test( source, ActorContext::for_test(0), schema, @@ -1080,64 +990,56 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 1 0 + 2 1 + 3 2" ) + .sort_rows(), ); - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - + 3 6 - + 3 7 - - 3 7 - - 3 6 - 3 2 + 1 8 + 2 9" ) + .sort_rows(), ); - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 1 0" ) + .sort_rows(), ); // High cache has only 2 capacity, but we need to trigger 3 inserts here! - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 1 8 + 3 2 + 3 6 - + 3 7 - " + + 3 7" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } fn create_source_before_recovery() -> Executor { @@ -1149,8 +1051,7 @@ mod tests { + 3 2 + 10 3 + 9 4 - + 8 5 - ", + + 8 5", ), StreamChunk::from_pretty( " I I @@ -1214,7 +1115,7 @@ mod tests { .await; let source = create_source_before_recovery(); let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::new_with_ties_for_test( + let top_n = TopNExecutor::new_with_ties_for_test( source, ActorContext::for_test(0), schema, @@ -1224,41 +1125,34 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 1 0 + 2 1 + 3 2" ) + .sort_rows(), ); - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - + 3 6 - + 3 7 - - 3 7 - - 3 6 - 3 2 + 1 8 + 2 9" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; let state_table = create_in_memory_state_table_from_state_store( &[DataType::Int64, DataType::Int64], @@ -1271,7 +1165,7 @@ mod tests { // recovery let source = create_source_after_recovery(); let schema = source.schema().clone(); - let top_n_executor_after_recovery = TopNExecutor::new_with_ties_for_test( + let top_n_after_recovery = TopNExecutor::new_with_ties_for_test( source, ActorContext::for_test(0), schema, @@ -1281,42 +1175,34 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor_after_recovery.boxed().execute(); + let mut top_n = top_n_after_recovery.boxed().execute(); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 1 0" ) + .sort_rows(), ); // High cache has only 2 capacity, but we need to trigger 3 inserts here! - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 1 8 + 3 2 + 3 6 - + 3 7 - " + + 3 7" ) + .sort_rows(), ); - println!("hello"); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } } } diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs index dd03fed93ce99..e75f40bad81cc 100644 --- a/src/stream/src/executor/top_n/top_n_state.rs +++ b/src/stream/src/executor/top_n/top_n_state.rs @@ -327,7 +327,7 @@ mod tests { use super::*; use crate::executor::test_utils::top_n_executor::create_in_memory_state_table; - use crate::executor::top_n::top_n_cache::TopNCacheTrait; + use crate::executor::top_n::top_n_cache::{TopNCacheTrait, TopNStaging}; use crate::executor::top_n::{create_cache_key_serde, NO_GROUP_KEY}; use crate::row_nonnull; @@ -490,15 +490,14 @@ mod tests { let row1_bytes = serialize_pk_to_cache_key(row1.clone(), &cache_key_serde); let mut cache = TopNCache::::new(0, 1, data_types); - cache.insert(row1_bytes.clone(), row1.clone(), &mut vec![], &mut vec![]); + cache.insert(row1_bytes.clone(), row1.clone(), &mut TopNStaging::new()); cache .delete( NO_GROUP_KEY, &mut managed_state, row1_bytes, row1, - &mut vec![], - &mut vec![], + &mut TopNStaging::new(), ) .await .unwrap(); diff --git a/src/stream/src/executor/top_n/utils.rs b/src/stream/src/executor/top_n/utils.rs index f3adebdc8000b..930ed93d71f6e 100644 --- a/src/stream/src/executor/top_n/utils.rs +++ b/src/stream/src/executor/top_n/utils.rs @@ -14,11 +14,7 @@ use std::future::Future; -use itertools::Itertools; -use risingwave_common::array::Op; use risingwave_common::buffer::Bitmap; -use risingwave_common::row::{CompactedRow, RowDeserializer}; -use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_common::util::sort_util::ColumnOrder; @@ -28,10 +24,12 @@ use crate::executor::prelude::*; pub trait TopNExecutorBase: Send + 'static { /// Apply the chunk to the dirty state and get the diffs. + /// TODO(rc): There can be a 2 times amplification in terms of the chunk size, so we may need to + /// allow `apply_chunk` return a stream of chunks. Motivation is not quite strong though. fn apply_chunk( &mut self, chunk: StreamChunk, - ) -> impl Future> + Send; + ) -> impl Future>> + Send; /// Flush the buffered chunk to the storage backend. fn flush_data( @@ -102,7 +100,9 @@ where } } Message::Chunk(chunk) => { - yield Message::Chunk(self.inner.apply_chunk(chunk).await?); + if let Some(output_chunk) = self.inner.apply_chunk(chunk).await? { + yield Message::Chunk(output_chunk); + } self.inner.try_flush_data().await?; } Message::Barrier(barrier) => { @@ -120,33 +120,6 @@ where } } -pub fn generate_output( - new_rows: Vec, - new_ops: Vec, - schema: &Schema, -) -> StreamExecutorResult { - if !new_rows.is_empty() { - let mut data_chunk_builder = DataChunkBuilder::new(schema.data_types(), new_rows.len() + 1); - let row_deserializer = RowDeserializer::new(schema.data_types()); - for compacted_row in new_rows { - let res = data_chunk_builder - .append_one_row(row_deserializer.deserialize(compacted_row.row.as_ref())?); - debug_assert!(res.is_none()); - } - // since `new_rows` is not empty, we unwrap directly - let new_data_chunk = data_chunk_builder.consume_all().unwrap(); - let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec()); - Ok(new_stream_chunk) - } else { - let columns = schema - .create_array_builders(0) - .into_iter() - .map(|x| x.finish().into()) - .collect_vec(); - Ok(StreamChunk::new(vec![], columns)) - } -} - /// For a given pk (Row), it can be split into `order_key` and `additional_pk` according to /// `order_by_len`, and the two split parts are serialized separately. pub fn serialize_pk_to_cache_key(pk: impl Row, cache_key_serde: &CacheKeySerde) -> CacheKey {