Skip to content

Commit

Permalink
feat: compact_noop_update_after_outer_join (#17568)
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page authored Jul 9, 2024
1 parent 607a2af commit 9da0627
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 9 deletions.
4 changes: 4 additions & 0 deletions src/common/src/array/stream_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,10 @@ impl OpRowMutRef<'_> {
}

impl StreamChunkMut {
pub fn capacity(&self) -> usize {
self.vis.len()
}

pub fn vis(&self, i: usize) -> bool {
self.vis.is_set(i)
}
Expand Down
75 changes: 69 additions & 6 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,10 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
.collect_vec();

// If pk is contained in join key.
let pk_contained_in_jk_l = is_subset(state_pk_indices_l, state_join_key_indices_l.clone());
let pk_contained_in_jk_r = is_subset(state_pk_indices_r, state_join_key_indices_r.clone());
let pk_contained_in_jk_l =
is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone());
let pk_contained_in_jk_r =
is_subset(state_pk_indices_r.clone(), state_join_key_indices_r.clone());

// check whether join key contains pk in both side
let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;
Expand Down Expand Up @@ -3001,8 +3003,8 @@ mod tests {
chunk,
StreamChunk::from_pretty(
" I I I I
U- 2 5 . .
U+ 2 5 2 7
U- 2 5 . .
U+ 2 5 2 7
+ . . 4 8
+ . . 6 9"
)
Expand All @@ -3023,6 +3025,67 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
let (mut tx_l, mut tx_r, mut hash_join) =
create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;

// push the init barrier for left and right
tx_l.push_barrier(test_epoch(1), false);
tx_r.push_barrier(test_epoch(1), false);
hash_join.next_unwrap_ready_barrier()?;

tx_l.push_chunk(StreamChunk::from_pretty(
" I I
+ 1 1
",
));
let chunk = hash_join.next_unwrap_ready_chunk()?;
assert_eq!(
chunk,
StreamChunk::from_pretty(
" I I I I
+ 1 1 . ."
)
);

tx_r.push_chunk(StreamChunk::from_pretty(
" I I
+ 1 1
",
));
let chunk = hash_join.next_unwrap_ready_chunk()?;

assert_eq!(
chunk,
StreamChunk::from_pretty(
" I I I I
U- 1 1 . .
U+ 1 1 1 1"
)
);

tx_l.push_chunk(StreamChunk::from_pretty(
" I I
- 1 1
+ 1 2
",
));
let chunk = hash_join.next_unwrap_ready_chunk()?;
let chunk = chunk.compact();
assert_eq!(
chunk,
StreamChunk::from_pretty(
" I I I I
- 1 1 1 1
+ 1 2 1 1
"
)
);

Ok(())
}

#[tokio::test]
async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
{
Expand Down Expand Up @@ -3093,8 +3156,8 @@ mod tests {
chunk,
StreamChunk::from_pretty(
" I I I I
U- 2 5 . .
U+ 2 5 2 6
U- 2 5 . .
U+ 2 5 2 6
+ . . 4 8
+ . . 3 4" /* regression test (#2420): 3 4 should be forwarded only once
* despite matching on eq join on 2
Expand Down
62 changes: 59 additions & 3 deletions src/stream/src/executor/join/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::array::stream_chunk::StreamChunkMut;
use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
use risingwave_common::array::{Op, RowRef, StreamChunk};
use risingwave_common::row::{OwnedRow, Row};
Expand Down Expand Up @@ -156,6 +157,48 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
}
}

pub fn post_process(c: StreamChunk) -> StreamChunk {
let mut c = StreamChunkMut::from(c);

// NOTE(st1page): remove the pattern `UpdateDel(k, old), UpdateIns(k, NULL), UpdateDel(k, NULL), UpdateIns(k, new)`
// to avoid this issue <https://github.com/risingwavelabs/risingwave/issues/17450>
let mut i = 2;
while i < c.capacity() {
if c.op(i - 1) == Op::UpdateInsert
&& c.op(i) == Op::UpdateDelete
&& c.row_ref(i) == c.row_ref(i - 1)
{
if c.op(i - 2) == Op::UpdateDelete && c.op(i + 1) == Op::UpdateInsert {
c.set_op(i - 2, Op::Delete);
c.set_vis(i - 1, false);
c.set_vis(i, false);
c.set_op(i + 1, Op::Insert);
i += 3;
} else {
debug_assert!(
false,
"unexpected Op sequences {:?}, {:?}, {:?}, {:?}",
c.op(i - 2),
c.op(i - 1),
c.op(i),
c.op(i + 1)
);
warn!(
"unexpected Op sequences {:?}, {:?}, {:?}, {:?}",
c.op(i - 2),
c.op(i - 1),
c.op(i),
c.op(i + 1)
);
i += 1;
}
} else {
i += 1;
}
}
c.into()
}

pub fn with_match_on_insert(
&mut self,
row: &RowRef<'_>,
Expand All @@ -166,6 +209,7 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
self.stream_chunk_builder
.append_row_matched(Op::Delete, &matched_row.row)
.map(Self::post_process)
} else {
None
}
Expand All @@ -174,6 +218,7 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
self.stream_chunk_builder
.append_row_matched(Op::Insert, &matched_row.row)
.map(Self::post_process)
} else {
None
}
Expand All @@ -191,10 +236,12 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
}
self.stream_chunk_builder
.append_row(Op::UpdateInsert, row, &matched_row.row)
.map(Self::post_process)
// Inner sides
} else {
self.stream_chunk_builder
.append_row(Op::Insert, row, &matched_row.row)
.map(Self::post_process)
}
}

Expand All @@ -208,6 +255,7 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
self.stream_chunk_builder
.append_row_matched(Op::Insert, &matched_row.row)
.map(Self::post_process)
} else {
None
}
Expand All @@ -216,6 +264,7 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
self.stream_chunk_builder
.append_row_matched(Op::Delete, &matched_row.row)
.map(Self::post_process)
} else {
None
}
Expand All @@ -232,6 +281,8 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
}
self.stream_chunk_builder
.append_row_matched(Op::UpdateInsert, &matched_row.row)
.map(|c: StreamChunk| Self::post_process(c))

// Inner sides
} else {
// concat with the matched_row and append the new
Expand All @@ -241,6 +292,7 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
// the assumption for U+ after U-.
self.stream_chunk_builder
.append_row(Op::Delete, row, &matched_row.row)
.map(Self::post_process)
}
}

Expand All @@ -252,7 +304,9 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
) -> Option<StreamChunk> {
// if it's a semi join and the side needs to be maintained.
if is_semi(T) && forward_exactly_once(T, SIDE) {
self.stream_chunk_builder.append_row_update(op, row)
self.stream_chunk_builder
.append_row_update(op, row)
.map(Self::post_process)
} else {
None
}
Expand All @@ -262,14 +316,16 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
pub fn forward_if_not_matched(&mut self, op: Op, row: RowRef<'_>) -> Option<StreamChunk> {
// if it's outer join or anti join and the side needs to be maintained.
if (is_anti(T) && forward_exactly_once(T, SIDE)) || is_outer_side(T, SIDE) {
self.stream_chunk_builder.append_row_update(op, row)
self.stream_chunk_builder
.append_row_update(op, row)
.map(Self::post_process)
} else {
None
}
}

#[inline]
pub fn take(&mut self) -> Option<StreamChunk> {
self.stream_chunk_builder.take()
self.stream_chunk_builder.take().map(Self::post_process)
}
}

0 comments on commit 9da0627

Please sign in to comment.