Skip to content

Commit

Permalink
fix(over window): don't expect stream keys in UpdateDelete and Update…
Browse files Browse the repository at this point in the history
…Insert the same (#12536)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored and stdrc committed Sep 26, 2023
1 parent b511d31 commit f0a9133
Showing 1 changed file with 9 additions and 32 deletions.
41 changes: 9 additions & 32 deletions src/stream/src/executor/over_window/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{RowRef, StreamChunk};
use risingwave_common::array::{Op, RowRef, StreamChunk};
use risingwave_common::catalog::Field;
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy;
Expand Down Expand Up @@ -225,26 +225,25 @@ impl<S: StateStore> OverWindowExecutor<S> {
chunk: &'a StreamChunk,
) -> impl Iterator<Item = Record<RowRef<'a>>> {
let mut changes_merged = BTreeMap::new();
for record in chunk.records() {
match record {
Record::Insert { new_row } => {
let pk = DefaultOrdered(this.get_input_pk(new_row));
for (op, row) in chunk.rows() {
let pk = DefaultOrdered(this.get_input_pk(row));
match op {
Op::Insert | Op::UpdateInsert => {
if let Some(prev_change) = changes_merged.get_mut(&pk) {
match prev_change {
Record::Delete { old_row } => {
*prev_change = Record::Update {
old_row: *old_row,
new_row,
new_row: row,
};
}
_ => panic!("inconsistent changes in input chunk"),
}
} else {
changes_merged.insert(pk, record);
changes_merged.insert(pk, Record::Insert { new_row: row });
}
}
Record::Delete { old_row } => {
let pk = DefaultOrdered(this.get_input_pk(old_row));
Op::Delete | Op::UpdateDelete => {
if let Some(prev_change) = changes_merged.get_mut(&pk) {
match prev_change {
Record::Insert { .. } => {
Expand All @@ -261,29 +260,7 @@ impl<S: StateStore> OverWindowExecutor<S> {
_ => panic!("inconsistent changes in input chunk"),
}
} else {
changes_merged.insert(pk, record);
}
}
Record::Update { old_row, new_row } => {
let pk = DefaultOrdered(this.get_input_pk(old_row));
if let Some(prev_change) = changes_merged.get_mut(&pk) {
match prev_change {
Record::Insert { .. } => {
*prev_change = Record::Insert { new_row };
}
Record::Update {
old_row: real_old_row,
..
} => {
*prev_change = Record::Update {
old_row: *real_old_row,
new_row,
};
}
_ => panic!("inconsistent changes in input chunk"),
}
} else {
changes_merged.insert(pk, record);
changes_merged.insert(pk, Record::Delete { old_row: row });
}
}
}
Expand Down

0 comments on commit f0a9133

Please sign in to comment.