diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs index e3776de6d0fbc..8db25f363979e 100644 --- a/src/common/src/array/stream_chunk.rs +++ b/src/common/src/array/stream_chunk.rs @@ -525,6 +525,10 @@ impl OpRowMutRef<'_> { } impl StreamChunkMut { + pub fn capacity(&self) -> usize { + self.vis.len() + } + pub fn vis(&self, i: usize) -> bool { self.vis.is_set(i) } diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index 0bf825138d92a..66e91cf2ecf2a 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -269,8 +269,10 @@ impl HashJoinExecutor StreamExecutorResult<()> { + let (mut tx_l, mut tx_r, mut hash_join) = + create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await; + + // push the init barrier for left and right + tx_l.push_barrier(test_epoch(1), false); + tx_r.push_barrier(test_epoch(1), false); + hash_join.next_unwrap_ready_barrier()?; + + tx_l.push_chunk(StreamChunk::from_pretty( + " I I + + 1 1 + ", + )); + let chunk = hash_join.next_unwrap_ready_chunk()?; + assert_eq!( + chunk, + StreamChunk::from_pretty( + " I I I I + + 1 1 . ." + ) + ); + + tx_r.push_chunk(StreamChunk::from_pretty( + " I I + + 1 1 + ", + )); + let chunk = hash_join.next_unwrap_ready_chunk()?; + + assert_eq!( + chunk, + StreamChunk::from_pretty( + " I I I I + U- 1 1 . . + U+ 1 1 1 1" + ) + ); + + tx_l.push_chunk(StreamChunk::from_pretty( + " I I + - 1 1 + + 1 2 + ", + )); + let chunk = hash_join.next_unwrap_ready_chunk()?; + let chunk = chunk.compact(); + assert_eq!( + chunk, + StreamChunk::from_pretty( + " I I I I + - 1 1 1 1 + + 1 2 1 1 + " + ) + ); + + Ok(()) + } + #[tokio::test] async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()> { @@ -3093,8 +3156,8 @@ mod tests { chunk, StreamChunk::from_pretty( " I I I I - U- 2 5 . . - U+ 2 5 2 6 + U- 2 5 . . + U+ 2 5 2 6 + . . 4 8 + . . 3 4" /* regression test (#2420): 3 4 should be forwarded only once * despite matching on eq join on 2 diff --git a/src/stream/src/executor/join/builder.rs b/src/stream/src/executor/join/builder.rs index 72208aa45ded8..c39e385fdee96 100644 --- a/src/stream/src/executor/join/builder.rs +++ b/src/stream/src/executor/join/builder.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common::array::stream_chunk::StreamChunkMut; use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder; use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::row::{OwnedRow, Row}; @@ -156,6 +157,48 @@ impl JoinChunkBuilder } } + pub fn post_process(c: StreamChunk) -> StreamChunk { + let mut c = StreamChunkMut::from(c); + + // NOTE(st1page): remove the pattern `UpdateDel(k, old), UpdateIns(k, NULL), UpdateDel(k, NULL), UpdateIns(k, new)` + // to avoid this issue + let mut i = 2; + while i < c.capacity() { + if c.op(i - 1) == Op::UpdateInsert + && c.op(i) == Op::UpdateDelete + && c.row_ref(i) == c.row_ref(i - 1) + { + if c.op(i - 2) == Op::UpdateDelete && c.op(i + 1) == Op::UpdateInsert { + c.set_op(i - 2, Op::Delete); + c.set_vis(i - 1, false); + c.set_vis(i, false); + c.set_op(i + 1, Op::Insert); + i += 3; + } else { + debug_assert!( + false, + "unexpected Op sequences {:?}, {:?}, {:?}, {:?}", + c.op(i - 2), + c.op(i - 1), + c.op(i), + c.op(i + 1) + ); + warn!( + "unexpected Op sequences {:?}, {:?}, {:?}, {:?}", + c.op(i - 2), + c.op(i - 1), + c.op(i), + c.op(i + 1) + ); + i += 1; + } + } else { + i += 1; + } + } + c.into() + } + pub fn with_match_on_insert( &mut self, row: &RowRef<'_>, @@ -166,6 +209,7 @@ impl JoinChunkBuilder if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) { self.stream_chunk_builder .append_row_matched(Op::Delete, &matched_row.row) + .map(Self::post_process) } else { None } @@ -174,6 +218,7 @@ impl JoinChunkBuilder if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) { self.stream_chunk_builder .append_row_matched(Op::Insert, &matched_row.row) + .map(Self::post_process) } else { None } @@ -191,10 +236,12 @@ impl JoinChunkBuilder } self.stream_chunk_builder .append_row(Op::UpdateInsert, row, &matched_row.row) + .map(Self::post_process) // Inner sides } else { self.stream_chunk_builder .append_row(Op::Insert, row, &matched_row.row) + .map(Self::post_process) } } @@ -208,6 +255,7 @@ impl JoinChunkBuilder if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) { self.stream_chunk_builder .append_row_matched(Op::Insert, &matched_row.row) + .map(Self::post_process) } else { None } @@ -216,6 +264,7 @@ impl JoinChunkBuilder if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) { self.stream_chunk_builder .append_row_matched(Op::Delete, &matched_row.row) + .map(Self::post_process) } else { None } @@ -232,6 +281,8 @@ impl JoinChunkBuilder } self.stream_chunk_builder .append_row_matched(Op::UpdateInsert, &matched_row.row) + .map(|c: StreamChunk| Self::post_process(c)) + // Inner sides } else { // concat with the matched_row and append the new @@ -241,6 +292,7 @@ impl JoinChunkBuilder // the assumption for U+ after U-. self.stream_chunk_builder .append_row(Op::Delete, row, &matched_row.row) + .map(Self::post_process) } } @@ -252,7 +304,9 @@ impl JoinChunkBuilder ) -> Option { // if it's a semi join and the side needs to be maintained. if is_semi(T) && forward_exactly_once(T, SIDE) { - self.stream_chunk_builder.append_row_update(op, row) + self.stream_chunk_builder + .append_row_update(op, row) + .map(Self::post_process) } else { None } @@ -262,7 +316,9 @@ impl JoinChunkBuilder pub fn forward_if_not_matched(&mut self, op: Op, row: RowRef<'_>) -> Option { // if it's outer join or anti join and the side needs to be maintained. if (is_anti(T) && forward_exactly_once(T, SIDE)) || is_outer_side(T, SIDE) { - self.stream_chunk_builder.append_row_update(op, row) + self.stream_chunk_builder + .append_row_update(op, row) + .map(Self::post_process) } else { None } @@ -270,6 +326,6 @@ impl JoinChunkBuilder #[inline] pub fn take(&mut self) -> Option { - self.stream_chunk_builder.take() + self.stream_chunk_builder.take().map(Self::post_process) } }