diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs index 091e199d7b52a..5a01fabd25149 100644 --- a/src/stream/src/executor/over_window/general.rs +++ b/src/stream/src/executor/over_window/general.rs @@ -21,7 +21,7 @@ use futures::StreamExt; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::stream_record::Record; -use risingwave_common::array::{RowRef, StreamChunk}; +use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::catalog::Field; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy; @@ -225,26 +225,25 @@ impl OverWindowExecutor { chunk: &'a StreamChunk, ) -> impl Iterator>> { let mut changes_merged = BTreeMap::new(); - for record in chunk.records() { - match record { - Record::Insert { new_row } => { - let pk = DefaultOrdered(this.get_input_pk(new_row)); + for (op, row) in chunk.rows() { + let pk = DefaultOrdered(this.get_input_pk(row)); + match op { + Op::Insert | Op::UpdateInsert => { if let Some(prev_change) = changes_merged.get_mut(&pk) { match prev_change { Record::Delete { old_row } => { *prev_change = Record::Update { old_row: *old_row, - new_row, + new_row: row, }; } _ => panic!("inconsistent changes in input chunk"), } } else { - changes_merged.insert(pk, record); + changes_merged.insert(pk, Record::Insert { new_row: row }); } } - Record::Delete { old_row } => { - let pk = DefaultOrdered(this.get_input_pk(old_row)); + Op::Delete | Op::UpdateDelete => { if let Some(prev_change) = changes_merged.get_mut(&pk) { match prev_change { Record::Insert { .. } => { @@ -261,29 +260,7 @@ impl OverWindowExecutor { _ => panic!("inconsistent changes in input chunk"), } } else { - changes_merged.insert(pk, record); - } - } - Record::Update { old_row, new_row } => { - let pk = DefaultOrdered(this.get_input_pk(old_row)); - if let Some(prev_change) = changes_merged.get_mut(&pk) { - match prev_change { - Record::Insert { .. } => { - *prev_change = Record::Insert { new_row }; - } - Record::Update { - old_row: real_old_row, - .. - } => { - *prev_change = Record::Update { - old_row: *real_old_row, - new_row, - }; - } - _ => panic!("inconsistent changes in input chunk"), - } - } else { - changes_merged.insert(pk, record); + changes_merged.insert(pk, Record::Delete { old_row: row }); } } }