From 75395226bdae224902a09827dbe6c04910e70513 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 7 Sep 2023 09:18:29 +0000 Subject: [PATCH] fix(eowc): fix SortBuffer when there are multiple records having same timestamp (#12146) (#12149) Co-authored-by: Richard Chien --- e2e_test/streaming/watermark.slt | 4 +++- src/stream/src/executor/sort_buffer.rs | 19 +++++++++---------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/e2e_test/streaming/watermark.slt b/e2e_test/streaming/watermark.slt index d177d258b7c0c..66d3a360fcb70 100644 --- a/e2e_test/streaming/watermark.slt +++ b/e2e_test/streaming/watermark.slt @@ -12,7 +12,7 @@ statement ok create materialized view mv as select * from t emit on window close; statement ok -insert into t values ('2023-05-06 16:51:00', 1); +insert into t values ('2023-05-06 16:51:00', 1), ('2023-05-06 16:51:00', 2), ('2023-05-06 16:51:00', 3); statement ok insert into t values ('2023-05-06 16:56:01', 1); @@ -25,6 +25,8 @@ query TI select * from mv; ---- 2023-05-06 16:51:00 1 +2023-05-06 16:51:00 2 +2023-05-06 16:51:00 3 statement ok drop materialized view mv; diff --git a/src/stream/src/executor/sort_buffer.rs b/src/stream/src/executor/sort_buffer.rs index 091b0ee8f085f..2069de09ac159 100644 --- a/src/stream/src/executor/sort_buffer.rs +++ b/src/stream/src/executor/sort_buffer.rs @@ -141,19 +141,19 @@ impl SortBuffer { watermark: ScalarImpl, buffer_table: &'a mut StateTable, ) { - let mut last_timestamp = None; + let mut last_table_pk = None; loop { if !self.cache.is_synced() { // Refill the cache, then consume from the cache, to ensure strong row ordering // and prefetch for the next watermark. - self.refill_cache(last_timestamp.take(), buffer_table) + self.refill_cache(last_table_pk.take(), buffer_table) .await?; } #[for_await] for res in self.consume_from_cache(watermark.as_scalar_ref_impl()) { - let ((timestamp_val, _), row) = res?; - last_timestamp = Some(timestamp_val.into_inner()); + let row = res?; + last_table_pk = Some((&row).project(buffer_table.pk_indices()).into_owned_row()); yield row; } @@ -169,7 +169,7 @@ impl SortBuffer { buffer_table.update_watermark(watermark, true); } - #[try_stream(ok = (CacheKey, OwnedRow), error = StreamExecutorError)] + #[try_stream(ok = OwnedRow, error = StreamExecutorError)] async fn consume_from_cache<'a>(&'a mut self, watermark: ScalarRefImpl<'a>) { while self.cache.is_synced() { let Some(key) = self.cache.first_key_value().map(|(k, _)| k.clone()) else { @@ -177,7 +177,7 @@ impl SortBuffer { }; if key.0.as_scalar_ref_impl().default_cmp(&watermark).is_lt() { let row = self.cache.delete(&key).unwrap(); - yield (key, row); + yield row; } else { break; } @@ -187,15 +187,14 @@ impl SortBuffer { /// Clear the cache and refill it with the current content of the buffer table. pub async fn refill_cache( &mut self, - last_timestamp: Option, + last_table_pk: Option, buffer_table: &StateTable, ) -> StreamExecutorResult<()> { let mut filler = self.cache.begin_syncing(); let pk_range = ( - last_timestamp - .as_ref() - .map(|v| Bound::Excluded([Some(v.as_scalar_ref_impl())])) + last_table_pk + .map(Bound::Excluded) .unwrap_or(Bound::Unbounded), Bound::::Unbounded, );