diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs
index e3776de6d0fbc..b80e7a0530d01 100644
--- a/src/common/src/array/stream_chunk.rs
+++ b/src/common/src/array/stream_chunk.rs
@@ -525,6 +525,12 @@ impl OpRowMutRef<'_> {
 }
 
 impl StreamChunkMut {
+    /// Returns the length of the visibility bitmap, i.e. the number of row
+    /// slots in this chunk, counting rows that are currently invisible.
+    pub fn capacity(&self) -> usize {
+        self.vis.len()
+    }
+
     pub fn vis(&self, i: usize) -> bool {
         self.vis.is_set(i)
     }
diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs
index aa8afa35339e4..66e91cf2ecf2a 100644
--- a/src/stream/src/executor/hash_join.rs
+++ b/src/stream/src/executor/hash_join.rs
@@ -2607,8 +2607,8 @@ mod tests {
             chunk,
             StreamChunk::from_pretty(
                 " I I I I
-                 - 2 5 . .
-                 + 2 5 2 7"
+                U- 2 5 . .
+                U+ 2 5 2 7"
             )
         );
 
@@ -2619,8 +2619,8 @@ mod tests {
             chunk,
             StreamChunk::from_pretty(
                 " I I I I
-                 - 3 6 . .
-                 + 3 6 3 10"
+                U- 3 6 . .
+                U+ 3 6 3 10"
             )
         );
 
@@ -2691,8 +2691,8 @@ mod tests {
             chunk,
             StreamChunk::from_pretty(
                 " I I I I
-                 - 2 5 . .
-                 + 2 5 2 7"
+                U- 2 5 . .
+                U+ 2 5 2 7"
             )
         );
 
@@ -2703,8 +2703,8 @@ mod tests {
             chunk,
             StreamChunk::from_pretty(
                 " I I I I
-                 - . 6 . .
-                 + . 6 . 10"
+                U- . 6 . .
+                U+ . 6 . 10"
             )
         );
 
@@ -2844,10 +2844,10 @@ mod tests {
             chunk,
             StreamChunk::from_pretty(
                 " I I I I I I
-                 - 2 5 2 . . .
-                 + 2 5 2 2 5 1
-                 - 4 9 4 . . .
-                 + 4 9 4 4 9 2"
+                U- 2 5 2 . . .
+                U+ 2 5 2 2 5 1
+                U- 4 9 4 . . .
+                U+ 4 9 4 4 9 2"
             )
         );
 
@@ -2858,10 +2858,10 @@ mod tests {
             chunk,
             StreamChunk::from_pretty(
                 " I I I I I I
-                 - 1 4 1 . . .
-                 + 1 4 1 1 4 4
-                 - 3 6 3 . . .
-                 + 3 6 3 3 6 5"
+                U- 1 4 1 . . .
+                U+ 1 4 1 1 4 4
+                U- 3 6 3 . . .
+                U+ 3 6 3 3 6 5"
             )
         );
 
@@ -3003,8 +3003,8 @@ mod tests {
             chunk,
             StreamChunk::from_pretty(
                 " I I I I
-                 - 2 5 . .
-                 + 2 5 2 7
+                U- 2 5 . .
+                U+ 2 5 2 7
                  + . . 4 8
                  + . . 6 9"
             )
@@ -3060,8 +3060,8 @@ mod tests {
             chunk,
             StreamChunk::from_pretty(
                 " I I I I
-                 - 1 1 . .
-                 + 1 1 1 1"
+                U- 1 1 . .
+                U+ 1 1 1 1"
             )
         );
 
@@ -3156,8 +3156,8 @@ mod tests {
             chunk,
             StreamChunk::from_pretty(
                 " I I I I
-                 - 2 5 . .
-                 + 2 5 2 6
+                U- 2 5 . .
+                U+ 2 5 2 6
                  + . . 4 8
                  + . . 3 4" /* regression test (#2420): 3 4 should be forwarded only once
                              * despite matching on eq join on 2
diff --git a/src/stream/src/executor/join/builder.rs b/src/stream/src/executor/join/builder.rs
index d8c36e98ce0b3..e8240773b415e 100644
--- a/src/stream/src/executor/join/builder.rs
+++ b/src/stream/src/executor/join/builder.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use risingwave_common::array::stream_chunk::StreamChunkMut;
 use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
 use risingwave_common::array::{Op, RowRef, StreamChunk};
 use risingwave_common::row::{OwnedRow, Row};
@@ -157,7 +158,36 @@ impl JoinChunkBuilder
     }
 
+    /// Collapses back-to-back update pairs that cancel out in the middle.
+    ///
+    /// Whenever an `UpdateInsert` row is immediately followed by an `UpdateDelete`
+    /// of an identical row, both rows are hidden, and the rows surrounding them
+    /// (the first pair's `UpdateDelete` and the second pair's `UpdateInsert`) are
+    /// rewritten to a plain `Delete` and `Insert`.
     pub fn post_process(c: StreamChunk) -> StreamChunk {
-        todo!()
+        let mut c = StreamChunkMut::from(c);
+
+        // NOTE(st1page): remove the pattern `UpdateDel(k, old), UpdateIns(k, NULL), UpdateDel(k, NULL), UpdateIns(k, new)`
+        // to avoid this issue
+        //
+        // The match is anchored on the middle rows `i - 1` and `i` and also touches
+        // `i - 2` and `i + 1`, so the scan starts at 2 and stops while `i + 1` is
+        // still in range; this keeps every index below valid.
+        let mut i = 2;
+        while i + 1 < c.capacity() {
+            if c.op(i - 1) == Op::UpdateInsert
+                && c.op(i) == Op::UpdateDelete
+                && c.row_ref(i) == c.row_ref(i - 1)
+            {
+                c.set_op(i - 2, Op::Delete);
+                c.set_vis(i - 1, false);
+                c.set_vis(i, false);
+                c.set_op(i + 1, Op::Insert);
+                i += 3;
+            } else {
+                i += 1;
+            }
+        }
+        c.into()
     }
 
     pub fn with_match_on_insert(
@@ -287,8 +317,6 @@ impl JoinChunkBuilder
 
     #[inline]
     pub fn take(&mut self) -> Option<StreamChunk> {
-        self.stream_chunk_builder
-            .take()
-            .map(Self::post_process)
+        self.stream_chunk_builder.take().map(Self::post_process)
     }
 }