diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs index b80e7a0530d0..8db25f363979 100644 --- a/src/common/src/array/stream_chunk.rs +++ b/src/common/src/array/stream_chunk.rs @@ -525,12 +525,10 @@ impl OpRowMutRef<'_> { } impl StreamChunkMut { - pub fn capacity(&self) -> usize { self.vis.len() } - pub fn vis(&self, i: usize) -> bool { self.vis.is_set(i) } diff --git a/src/stream/src/executor/join/builder.rs b/src/stream/src/executor/join/builder.rs index e8240773b415..c39e385fdee9 100644 --- a/src/stream/src/executor/join/builder.rs +++ b/src/stream/src/executor/join/builder.rs @@ -162,17 +162,36 @@ impl JoinChunkBuilder // NOTE(st1page): remove the pattern `UpdateDel(k, old), UpdateIns(k, NULL), UpdateDel(k, NULL), UpdateIns(k, new)` // to avoid this issue - let mut i = 1; + let mut i = 2; while i < c.capacity() { if c.op(i - 1) == Op::UpdateInsert && c.op(i) == Op::UpdateDelete && c.row_ref(i) == c.row_ref(i - 1) { - c.set_op(i - 2, Op::Delete); - c.set_vis(i - 1, false); - c.set_vis(i, false); - c.set_op(i + 1, Op::Insert); - i += 3; + if c.op(i - 2) == Op::UpdateDelete && c.op(i + 1) == Op::UpdateInsert { + c.set_op(i - 2, Op::Delete); + c.set_vis(i - 1, false); + c.set_vis(i, false); + c.set_op(i + 1, Op::Insert); + i += 3; + } else { + debug_assert!( + false, + "unexpected Op sequences {:?}, {:?}, {:?}, {:?}", + c.op(i - 2), + c.op(i - 1), + c.op(i), + c.op(i + 1) + ); + warn!( + "unexpected Op sequences {:?}, {:?}, {:?}, {:?}", + c.op(i - 2), + c.op(i - 1), + c.op(i), + c.op(i + 1) + ); + i += 1; + } } else { i += 1; }