Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: compact_noop_update_after_outer_join #17568

Merged
merged 8 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/common/src/array/stream_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,10 @@ impl OpRowMutRef<'_> {
}

impl StreamChunkMut {
pub fn capacity(&self) -> usize {
self.vis.len()
}

pub fn vis(&self, i: usize) -> bool {
self.vis.is_set(i)
}
Expand Down
75 changes: 69 additions & 6 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,10 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
.collect_vec();

// If pk is contained in join key.
let pk_contained_in_jk_l = is_subset(state_pk_indices_l, state_join_key_indices_l.clone());
let pk_contained_in_jk_r = is_subset(state_pk_indices_r, state_join_key_indices_r.clone());
let pk_contained_in_jk_l =
is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone());
let pk_contained_in_jk_r =
is_subset(state_pk_indices_r.clone(), state_join_key_indices_r.clone());

// check whether join key contains pk in both side
let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;
Expand Down Expand Up @@ -3001,8 +3003,8 @@ mod tests {
chunk,
StreamChunk::from_pretty(
" I I I I
U- 2 5 . .
U+ 2 5 2 7
U- 2 5 . .
U+ 2 5 2 7
+ . . 4 8
+ . . 6 9"
)
Expand All @@ -3023,6 +3025,67 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
let (mut tx_l, mut tx_r, mut hash_join) =
create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;

// push the init barrier for left and right
tx_l.push_barrier(test_epoch(1), false);
tx_r.push_barrier(test_epoch(1), false);
hash_join.next_unwrap_ready_barrier()?;

tx_l.push_chunk(StreamChunk::from_pretty(
" I I
+ 1 1
",
));
let chunk = hash_join.next_unwrap_ready_chunk()?;
assert_eq!(
chunk,
StreamChunk::from_pretty(
" I I I I
+ 1 1 . ."
)
);

tx_r.push_chunk(StreamChunk::from_pretty(
" I I
+ 1 1
",
));
let chunk = hash_join.next_unwrap_ready_chunk()?;

assert_eq!(
chunk,
StreamChunk::from_pretty(
" I I I I
U- 1 1 . .
U+ 1 1 1 1"
)
);

tx_l.push_chunk(StreamChunk::from_pretty(
" I I
- 1 1
+ 1 2
",
));
let chunk = hash_join.next_unwrap_ready_chunk()?;
let chunk = chunk.compact();
assert_eq!(
chunk,
StreamChunk::from_pretty(
" I I I I
- 1 1 1 1
+ 1 2 1 1
"
)
);

Ok(())
}

#[tokio::test]
async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
{
Expand Down Expand Up @@ -3093,8 +3156,8 @@ mod tests {
chunk,
StreamChunk::from_pretty(
" I I I I
U- 2 5 . .
U+ 2 5 2 6
U- 2 5 . .
U+ 2 5 2 6
+ . . 4 8
+ . . 3 4" /* regression test (#2420): 3 4 should be forwarded only once
* despite matching on eq join on 2
Expand Down
62 changes: 59 additions & 3 deletions src/stream/src/executor/join/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::array::stream_chunk::StreamChunkMut;
use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
use risingwave_common::array::{Op, RowRef, StreamChunk};
use risingwave_common::row::{OwnedRow, Row};
Expand Down Expand Up @@ -156,6 +157,48 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
}
}

pub fn post_process(c: StreamChunk) -> StreamChunk {
let mut c = StreamChunkMut::from(c);
Comment on lines +160 to +161
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some comment about this.

Also is it possible to add this in append_row_matched etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only do it on the JoinChunkBuilder but not for the JoinStreamChunkBuilder

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not adding comments :(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot. 😶‍🌫️


// NOTE(st1page): remove the pattern `UpdateDel(k, old), UpdateIns(k, NULL), UpdateDel(k, NULL), UpdateIns(k, new)`
// to avoid this issue <https://github.com/risingwavelabs/risingwave/issues/17450>
let mut i = 2;
while i < c.capacity() {
if c.op(i - 1) == Op::UpdateInsert
&& c.op(i) == Op::UpdateDelete
st1page marked this conversation as resolved.
Show resolved Hide resolved
&& c.row_ref(i) == c.row_ref(i - 1)
{
if c.op(i - 2) == Op::UpdateDelete && c.op(i + 1) == Op::UpdateInsert {
c.set_op(i - 2, Op::Delete);
c.set_vis(i - 1, false);
c.set_vis(i, false);
c.set_op(i + 1, Op::Insert);
i += 3;
} else {
debug_assert!(
false,
"unexpected Op sequences {:?}, {:?}, {:?}, {:?}",
c.op(i - 2),
c.op(i - 1),
c.op(i),
c.op(i + 1)
);
warn!(
"unexpected Op sequences {:?}, {:?}, {:?}, {:?}",
c.op(i - 2),
c.op(i - 1),
c.op(i),
c.op(i + 1)
);
i += 1;
}
} else {
i += 1;
}
}
c.into()
}

pub fn with_match_on_insert(
&mut self,
row: &RowRef<'_>,
Expand All @@ -166,6 +209,7 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
self.stream_chunk_builder
.append_row_matched(Op::Delete, &matched_row.row)
.map(Self::post_process)
} else {
None
}
Expand All @@ -174,6 +218,7 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
self.stream_chunk_builder
.append_row_matched(Op::Insert, &matched_row.row)
.map(Self::post_process)
} else {
None
}
Expand All @@ -191,10 +236,12 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
}
self.stream_chunk_builder
.append_row(Op::UpdateInsert, row, &matched_row.row)
.map(Self::post_process)
// Inner sides
} else {
self.stream_chunk_builder
.append_row(Op::Insert, row, &matched_row.row)
.map(Self::post_process)
}
}

Expand All @@ -208,6 +255,7 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
self.stream_chunk_builder
.append_row_matched(Op::Insert, &matched_row.row)
.map(Self::post_process)
} else {
None
}
Expand All @@ -216,6 +264,7 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
self.stream_chunk_builder
.append_row_matched(Op::Delete, &matched_row.row)
.map(Self::post_process)
} else {
None
}
Expand All @@ -232,6 +281,8 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
}
self.stream_chunk_builder
.append_row_matched(Op::UpdateInsert, &matched_row.row)
.map(|c: StreamChunk| Self::post_process(c))

// Inner sides
} else {
// concat with the matched_row and append the new
Expand All @@ -241,6 +292,7 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
// the assumption for U+ after U-.
self.stream_chunk_builder
.append_row(Op::Delete, row, &matched_row.row)
.map(Self::post_process)
}
}

Expand All @@ -252,7 +304,9 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
) -> Option<StreamChunk> {
// if it's a semi join and the side needs to be maintained.
if is_semi(T) && forward_exactly_once(T, SIDE) {
self.stream_chunk_builder.append_row_update(op, row)
self.stream_chunk_builder
.append_row_update(op, row)
.map(Self::post_process)
} else {
None
}
Expand All @@ -262,14 +316,16 @@ impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder
pub fn forward_if_not_matched(&mut self, op: Op, row: RowRef<'_>) -> Option<StreamChunk> {
// if it's outer join or anti join and the side needs to be maintained.
if (is_anti(T) && forward_exactly_once(T, SIDE)) || is_outer_side(T, SIDE) {
self.stream_chunk_builder.append_row_update(op, row)
self.stream_chunk_builder
.append_row_update(op, row)
.map(Self::post_process)
} else {
None
}
}

#[inline]
pub fn take(&mut self) -> Option<StreamChunk> {
self.stream_chunk_builder.take()
self.stream_chunk_builder.take().map(Self::post_process)
}
}
Loading