fix(eowc): fix SortBuffer when there are multiple records having same timestamp (#12146)

Signed-off-by: Richard Chien <[email protected]>
stdrc authored Sep 7, 2023
1 parent 183a98b commit f1672f7
Showing 2 changed files with 12 additions and 11 deletions.
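
The consume loop refills its in-memory cache from the buffer table in batches. Before this change it resumed the scan strictly after the last emitted timestamp, so when several buffered rows share a timestamp, the ones not yet loaded into the cache could be skipped on the next refill. Below is a minimal sketch of that effect with a plain `BTreeMap` keyed by `(timestamp, pk)` standing in for the buffer table; treating the old timestamp-only exclusive bound as skipping the whole timestamp group is an assumption about the storage range semantics, and all names are illustrative rather than the actual `SortBuffer` API.

```rust
use std::collections::BTreeMap;
use std::ops::Bound;

fn main() {
    // Stand-in for the buffer table: rows keyed by (event timestamp, table pk).
    // Three rows share timestamp 100, mirroring the new e2e test.
    let buffer: BTreeMap<(i64, i64), &str> = BTreeMap::from([
        ((100, 1), "a"),
        ((100, 2), "b"),
        ((100, 3), "c"),
        ((200, 1), "d"),
    ]);

    // Suppose the previous batch ended after emitting row (100, 1).
    let last_emitted = (100i64, 1i64);

    // Old resume point: strictly after the last *timestamp* only
    // (modeled here as excluding the whole timestamp group).
    let buggy: Vec<_> = buffer
        .range((Bound::Excluded((last_emitted.0, i64::MAX)), Bound::Unbounded))
        .collect();
    assert_eq!(buggy.len(), 1); // only (200, 1) is seen; (100, 2) and (100, 3) are lost

    // New resume point: strictly after the full (timestamp, pk) of the last row.
    let fixed: Vec<_> = buffer
        .range((Bound::Excluded(last_emitted), Bound::Unbounded))
        .collect();
    assert_eq!(fixed.len(), 3); // (100, 2), (100, 3), (200, 1) are all still scanned
}
```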
4 changes: 3 additions & 1 deletion e2e_test/streaming/watermark.slt
@@ -12,7 +12,7 @@ statement ok
 create materialized view mv as select * from t emit on window close;
 
 statement ok
-insert into t values ('2023-05-06 16:51:00', 1);
+insert into t values ('2023-05-06 16:51:00', 1), ('2023-05-06 16:51:00', 2), ('2023-05-06 16:51:00', 3);
 
 statement ok
 insert into t values ('2023-05-06 16:56:01', 1);
@@ -25,6 +25,8 @@ query TI
 select * from mv;
 ----
 2023-05-06 16:51:00 1
+2023-05-06 16:51:00 2
+2023-05-06 16:51:00 3
 
 statement ok
 drop materialized view mv;
19 changes: 9 additions & 10 deletions src/stream/src/executor/sort_buffer.rs
@@ -141,19 +141,19 @@ impl<S: StateStore> SortBuffer<S> {
         watermark: ScalarImpl,
         buffer_table: &'a mut StateTable<S>,
     ) {
-        let mut last_timestamp = None;
+        let mut last_table_pk = None;
         loop {
             if !self.cache.is_synced() {
                 // Refill the cache, then consume from the cache, to ensure strong row ordering
                 // and prefetch for the next watermark.
-                self.refill_cache(last_timestamp.take(), buffer_table)
+                self.refill_cache(last_table_pk.take(), buffer_table)
                     .await?;
             }
 
             #[for_await]
             for res in self.consume_from_cache(watermark.as_scalar_ref_impl()) {
-                let ((timestamp_val, _), row) = res?;
-                last_timestamp = Some(timestamp_val.into_inner());
+                let row = res?;
+                last_table_pk = Some((&row).project(buffer_table.pk_indices()).into_owned_row());
                 yield row;
             }
@@ -169,15 +169,15 @@ impl<S: StateStore> SortBuffer<S> {
         buffer_table.update_watermark(watermark, true);
     }
 
-    #[try_stream(ok = (CacheKey, OwnedRow), error = StreamExecutorError)]
+    #[try_stream(ok = OwnedRow, error = StreamExecutorError)]
     async fn consume_from_cache<'a>(&'a mut self, watermark: ScalarRefImpl<'a>) {
         while self.cache.is_synced() {
             let Some(key) = self.cache.first_key_value().map(|(k, _)| k.clone()) else {
                 break;
             };
             if key.0.as_scalar_ref_impl().default_cmp(&watermark).is_lt() {
                 let row = self.cache.delete(&key).unwrap();
-                yield (key, row);
+                yield row;
             } else {
                 break;
             }
@@ -187,15 +187,14 @@ impl<S: StateStore> SortBuffer<S> {
     /// Clear the cache and refill it with the current content of the buffer table.
     pub async fn refill_cache(
         &mut self,
-        last_timestamp: Option<ScalarImpl>,
+        last_table_pk: Option<OwnedRow>,
         buffer_table: &StateTable<S>,
     ) -> StreamExecutorResult<()> {
         let mut filler = self.cache.begin_syncing();
 
         let pk_range = (
-            last_timestamp
-                .as_ref()
-                .map(|v| Bound::Excluded([Some(v.as_scalar_ref_impl())]))
+            last_table_pk
+                .map(Bound::Excluded)
                 .unwrap_or(Bound::Unbounded),
             Bound::<row::Empty>::Unbounded,
        );
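
For reference, here is a hedged sketch of the two moving parts on the Rust side, using plain `Vec`/`Option` stand-ins instead of the real `Datum`, `OwnedRow`, and `StateTable` types: the caller now remembers the full buffer-table pk of the last yielded row by projecting the row onto the pk indices, and `refill_cache` turns that pk into an exclusive start bound for the next scan.

```rust
use std::ops::Bound;

// Illustrative stand-ins for RisingWave's Datum / OwnedRow; not the real types.
type Datum = Option<i64>;
type Pk = Vec<Datum>;

// Mirrors `(&row).project(buffer_table.pk_indices()).into_owned_row()`:
// pick the pk columns out of a yielded row.
fn project_pk(row: &[Datum], pk_indices: &[usize]) -> Pk {
    pk_indices.iter().map(|&i| row[i]).collect()
}

// Mirrors how the new `refill_cache` builds its scan start bound: resume strictly
// after the last emitted row's full pk, or from the beginning if nothing was emitted.
fn scan_start(last_table_pk: Option<Pk>) -> Bound<Pk> {
    last_table_pk.map(Bound::Excluded).unwrap_or(Bound::Unbounded)
}

fn main() {
    // A yielded row (event_time, value); assume the buffer table pk is (event_time, value).
    let row: Vec<Datum> = vec![Some(100), Some(2)];
    let last_table_pk = project_pk(&row, &[0, 1]);

    assert_eq!(scan_start(None), Bound::Unbounded);
    assert_eq!(
        scan_start(Some(last_table_pk)),
        Bound::Excluded(vec![Some(100), Some(2)])
    );
}
```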
