Skip to content

Commit

Permalink
change the impl
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page committed Jul 8, 2024
1 parent 089141e commit 08e558a
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 26 deletions.
6 changes: 6 additions & 0 deletions src/common/src/array/stream_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,12 @@ impl OpRowMutRef<'_> {
}

impl StreamChunkMut {

pub fn capacity(&self) -> usize {
self.vis.len()
}


pub fn vis(&self, i: usize) -> bool {
self.vis.is_set(i)
}
Expand Down
44 changes: 22 additions & 22 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2607,8 +2607,8 @@ mod tests {
chunk,
StreamChunk::from_pretty(
" I I I I
- 2 5 . .
+ 2 5 2 7"
U- 2 5 . .
U+ 2 5 2 7"
)
);

Expand All @@ -2619,8 +2619,8 @@ mod tests {
chunk,
StreamChunk::from_pretty(
" I I I I
- 3 6 . .
+ 3 6 3 10"
U- 3 6 . .
U+ 3 6 3 10"
)
);

Expand Down Expand Up @@ -2691,8 +2691,8 @@ mod tests {
chunk,
StreamChunk::from_pretty(
" I I I I
- 2 5 . .
+ 2 5 2 7"
U- 2 5 . .
U+ 2 5 2 7"
)
);

Expand All @@ -2703,8 +2703,8 @@ mod tests {
chunk,
StreamChunk::from_pretty(
" I I I I
- . 6 . .
+ . 6 . 10"
U- . 6 . .
U+ . 6 . 10"
)
);

Expand Down Expand Up @@ -2844,10 +2844,10 @@ mod tests {
chunk,
StreamChunk::from_pretty(
" I I I I I I
- 2 5 2 . . .
+ 2 5 2 2 5 1
- 4 9 4 . . .
+ 4 9 4 4 9 2"
U- 2 5 2 . . .
U+ 2 5 2 2 5 1
U- 4 9 4 . . .
U+ 4 9 4 4 9 2"
)
);

Expand All @@ -2858,10 +2858,10 @@ mod tests {
chunk,
StreamChunk::from_pretty(
" I I I I I I
- 1 4 1 . . .
+ 1 4 1 1 4 4
- 3 6 3 . . .
+ 3 6 3 3 6 5"
U- 1 4 1 . . .
U+ 1 4 1 1 4 4
U- 3 6 3 . . .
U+ 3 6 3 3 6 5"
)
);

Expand Down Expand Up @@ -3003,8 +3003,8 @@ mod tests {
chunk,
StreamChunk::from_pretty(
" I I I I
- 2 5 . .
+ 2 5 2 7
U- 2 5 . .
U+ 2 5 2 7
+ . . 4 8
+ . . 6 9"
)
Expand Down Expand Up @@ -3060,8 +3060,8 @@ mod tests {
chunk,
StreamChunk::from_pretty(
" I I I I
- 1 1 . .
+ 1 1 1 1"
U- 1 1 . .
U+ 1 1 1 1"
)
);

Expand Down Expand Up @@ -3156,8 +3156,8 @@ mod tests {
chunk,
StreamChunk::from_pretty(
" I I I I
- 2 5 . .
+ 2 5 2 6
U- 2 5 . .
U+ 2 5 2 6
+ . . 4 8
+ . . 3 4" /* regression test (#2420): 3 4 should be forwarded only once
* despite matching on eq join on 2
Expand Down
26 changes: 22 additions & 4 deletions src/stream/src/executor/join/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::array::stream_chunk::StreamChunkMut;
use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
use risingwave_common::array::{Op, RowRef, StreamChunk};
use risingwave_common::row::{OwnedRow, Row};
Expand Down Expand Up @@ -157,7 +158,26 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
}

pub fn post_process(c: StreamChunk) -> StreamChunk {
todo!()
let mut c = StreamChunkMut::from(c);

// NOTE(st1page): remove the pattern `UpdateDel(k, old), UpdateIns(k, NULL), UpdateDel(k, NULL), UpdateIns(k, new)`
// to avoid this issue <https://github.com/risingwavelabs/risingwave/issues/17450>
let mut i = 1;
while i < c.capacity() {
if c.op(i - 1) == Op::UpdateInsert
&& c.op(i) == Op::UpdateDelete
&& c.row_ref(i) == c.row_ref(i - 1)
{
c.set_op(i - 2, Op::Delete);
c.set_vis(i - 1, false);
c.set_vis(i, false);
c.set_op(i + 1, Op::Insert);
i += 3;
} else {
i += 1;
}
}
c.into()
}

pub fn with_match_on_insert(
Expand Down Expand Up @@ -287,8 +307,6 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder

#[inline]
pub fn take(&mut self) -> Option<StreamChunk> {
self.stream_chunk_builder
.take()
.map(Self::post_process)
self.stream_chunk_builder.take().map(Self::post_process)
}
}

0 comments on commit 08e558a

Please sign in to comment.