diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs index 711f06f754414..0fd5563d4a790 100644 --- a/src/common/src/array/stream_chunk.rs +++ b/src/common/src/array/stream_chunk.rs @@ -303,6 +303,33 @@ impl StreamChunk { } } + /// Remove the adjacent delete-insert if their row value are the same. + pub fn eliminate_adjacent_noop_update(self) -> Self { + let len = self.data_chunk().capacity(); + let mut c: StreamChunkMut = self.into(); + let mut prev_r = None; + for curr in 0..len { + if !c.vis(curr) { + continue; + } + if let Some(prev) = prev_r { + if matches!(c.op(prev), Op::UpdateDelete | Op::Delete) + && matches!(c.op(curr), Op::UpdateInsert | Op::Insert) + && c.row_ref(prev) == c.row_ref(curr) + { + c.set_vis(prev, false); + c.set_vis(curr, false); + prev_r = None; + } else { + prev_r = Some(curr) + } + } else { + prev_r = Some(curr); + } + } + c.into() + } + /// Reorder columns and set visibility. pub fn project_with_vis(&self, indices: &[usize], vis: Bitmap) -> Self { Self { @@ -492,6 +519,18 @@ impl OpRowMutRef<'_> { } impl StreamChunkMut { + pub fn vis(&self, i: usize) -> bool { + self.vis.is_set(i) + } + + pub fn op(&self, i: usize) -> Op { + self.ops.get(i) + } + + pub fn row_ref(&self, i: usize) -> RowRef<'_> { + RowRef::with_columns(self.columns(), i) + } + pub fn set_vis(&mut self, n: usize, val: bool) { self.vis.set(n, val); } @@ -788,6 +827,36 @@ mod tests { "\ +---+---+---+ | - | 2 | | ++---+---+---+" + ); + } + + #[test] + fn test_eliminate_adjacent_noop_update() { + let c = StreamChunk::from_pretty( + " I I + - 1 6 D + - 2 2 + + 2 3 + - 2 3 + + 1 6 + - 1 7 + + 1 10 D + + 1 7 + U- 3 7 + U+ 3 7 + + 2 3", + ); + let c = c.eliminate_adjacent_noop_update(); + assert_eq!( + c.to_pretty().to_string(), + "\ ++---+---+---+ +| - | 2 | 2 | +| + | 2 | 3 | +| - | 2 | 3 | +| + | 1 | 6 | +| + | 2 | 3 | +---+---+---+" ); } diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index c958d6ab09556..f824ebc7857d5 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -619,7 +619,14 @@ impl RoundRobinDataDispatcher { impl Dispatcher for RoundRobinDataDispatcher { async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> { - let chunk = chunk.project(&self.output_indices); + let chunk = if self.output_indices.len() < chunk.columns().len() { + chunk + .project(&self.output_indices) + .eliminate_adjacent_noop_update() + } else { + chunk.project(&self.output_indices) + }; + self.outputs[self.cur].send(Message::Chunk(chunk)).await?; self.cur += 1; self.cur %= self.outputs.len(); @@ -739,7 +746,13 @@ impl Dispatcher for HashDataDispatcher { let mut new_ops: Vec = Vec::with_capacity(chunk.capacity()); // Apply output indices after calculating the vnode. - let chunk = chunk.project(&self.output_indices); + let chunk = if self.output_indices.len() < chunk.columns().len() { + chunk + .project(&self.output_indices) + .eliminate_adjacent_noop_update() + } else { + chunk.project(&self.output_indices) + }; for ((vnode, &op), visible) in vnodes .iter() @@ -858,7 +871,13 @@ impl BroadcastDispatcher { impl Dispatcher for BroadcastDispatcher { async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> { - let chunk = chunk.project(&self.output_indices); + let chunk = if self.output_indices.len() < chunk.columns().len() { + chunk + .project(&self.output_indices) + .eliminate_adjacent_noop_update() + } else { + chunk.project(&self.output_indices) + }; broadcast_concurrent(self.outputs.values_mut(), Message::Chunk(chunk)).await } @@ -956,7 +975,13 @@ impl Dispatcher for SimpleDispatcher { .exactly_one() .expect("expect exactly one output"); - let chunk = chunk.project(&self.output_indices); + let chunk = if self.output_indices.len() < chunk.columns().len() { + chunk + .project(&self.output_indices) + .eliminate_adjacent_noop_update() + } else { + chunk.project(&self.output_indices) + }; output.send(Message::Chunk(chunk)).await }