Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: extract some common functions for join #14868

Merged
merged 2 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/stream/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub use builder::*;
pub use column_mapping::*;
pub use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;

mod builder;
pub mod cache;
mod column_mapping;
pub mod log_store_impl;
Expand Down
222 changes: 8 additions & 214 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,112 +33,26 @@ use risingwave_expr::ExprError;
use risingwave_storage::StateStore;
use tokio::time::Instant;

use self::JoinType::{FullOuter, LeftOuter, LeftSemi, RightAnti, RightOuter, RightSemi};
use self::builder::JoinChunkBuilder;
use super::barrier_align::*;
use super::error::{StreamExecutorError, StreamExecutorResult};
use super::managed_state::join::*;
use super::join::hash_join::*;
use super::join::row::JoinRow;
use super::join::{JoinTypePrimitive, SideTypePrimitive, *};
use super::monitor::StreamingMetrics;
use super::watermark::*;
use super::{
ActorContextRef, BoxedExecutor, BoxedMessageStream, Executor, ExecutorInfo, Message,
PkIndicesRef, Watermark,
};
use crate::common::table::state_table::StateTable;
use crate::common::JoinStreamChunkBuilder;
use crate::executor::expect_first_barrier_from_aligned_stream;
use crate::executor::JoinType::LeftAnti;
use crate::executor::join::builder::JoinStreamChunkBuilder;
use crate::task::AtomicU64Ref;

/// The `JoinType` and `SideType` are to mimic a enum, because currently
/// enum is not supported in const generic.
// TODO: Use enum to replace this once [feature(adt_const_params)](https://github.com/rust-lang/rust/issues/95174) get completed.
pub type JoinTypePrimitive = u8;

/// Evict the cache every n rows.
const EVICT_EVERY_N_ROWS: u32 = 16;

#[allow(non_snake_case, non_upper_case_globals)]
pub mod JoinType {
use super::JoinTypePrimitive;
pub const Inner: JoinTypePrimitive = 0;
pub const LeftOuter: JoinTypePrimitive = 1;
pub const RightOuter: JoinTypePrimitive = 2;
pub const FullOuter: JoinTypePrimitive = 3;
pub const LeftSemi: JoinTypePrimitive = 4;
pub const LeftAnti: JoinTypePrimitive = 5;
pub const RightSemi: JoinTypePrimitive = 6;
pub const RightAnti: JoinTypePrimitive = 7;
}

pub type SideTypePrimitive = u8;
#[allow(non_snake_case, non_upper_case_globals)]
pub mod SideType {
use super::SideTypePrimitive;
pub const Left: SideTypePrimitive = 0;
pub const Right: SideTypePrimitive = 1;
}

const fn is_outer_side(join_type: JoinTypePrimitive, side_type: SideTypePrimitive) -> bool {
join_type == JoinType::FullOuter
|| (join_type == JoinType::LeftOuter && side_type == SideType::Left)
|| (join_type == JoinType::RightOuter && side_type == SideType::Right)
}

const fn outer_side_null(join_type: JoinTypePrimitive, side_type: SideTypePrimitive) -> bool {
join_type == JoinType::FullOuter
|| (join_type == JoinType::LeftOuter && side_type == SideType::Right)
|| (join_type == JoinType::RightOuter && side_type == SideType::Left)
}

/// Send the update only once if the join type is semi/anti and the update is the same side as the
/// join
const fn forward_exactly_once(join_type: JoinTypePrimitive, side_type: SideTypePrimitive) -> bool {
((join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti)
&& side_type == SideType::Left)
|| ((join_type == JoinType::RightSemi || join_type == JoinType::RightAnti)
&& side_type == SideType::Right)
}

const fn only_forward_matched_side(
join_type: JoinTypePrimitive,
side_type: SideTypePrimitive,
) -> bool {
((join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti)
&& side_type == SideType::Right)
|| ((join_type == JoinType::RightSemi || join_type == JoinType::RightAnti)
&& side_type == SideType::Left)
}

const fn is_semi(join_type: JoinTypePrimitive) -> bool {
join_type == JoinType::LeftSemi || join_type == JoinType::RightSemi
}

const fn is_anti(join_type: JoinTypePrimitive) -> bool {
join_type == JoinType::LeftAnti || join_type == JoinType::RightAnti
}

const fn is_left_semi_or_anti(join_type: JoinTypePrimitive) -> bool {
join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti
}

const fn is_right_semi_or_anti(join_type: JoinTypePrimitive) -> bool {
join_type == JoinType::RightSemi || join_type == JoinType::RightAnti
}

const fn need_left_degree(join_type: JoinTypePrimitive) -> bool {
join_type == FullOuter
|| join_type == LeftOuter
|| join_type == LeftAnti
|| join_type == LeftSemi
}

const fn need_right_degree(join_type: JoinTypePrimitive) -> bool {
join_type == FullOuter
|| join_type == RightOuter
|| join_type == RightAnti
|| join_type == RightSemi
}

fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
HashSet::<usize>::from_iter(vec1).is_subset(&vec2.into_iter().collect())
}
Expand Down Expand Up @@ -295,10 +209,6 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> Executor for HashJoi
}
}

struct HashJoinChunkBuilder<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> {
stream_chunk_builder: JoinStreamChunkBuilder,
}

struct EqJoinArgs<'a, K: HashKey, S: StateStore> {
ctx: &'a ActorContextRef,
side_l: &'a mut JoinSide<K, S>,
Expand All @@ -312,121 +222,6 @@ struct EqJoinArgs<'a, K: HashKey, S: StateStore> {
cnt_rows_received: &'a mut u32,
}

impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> HashJoinChunkBuilder<T, SIDE> {
fn with_match_on_insert(
&mut self,
row: &RowRef<'_>,
matched_row: &JoinRow<OwnedRow>,
) -> Option<StreamChunk> {
// Left/Right Anti sides
if is_anti(T) {
if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
self.stream_chunk_builder
.append_row_matched(Op::Delete, &matched_row.row)
} else {
None
}
// Left/Right Semi sides
} else if is_semi(T) {
if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
self.stream_chunk_builder
.append_row_matched(Op::Insert, &matched_row.row)
} else {
None
}
// Outer sides
} else if matched_row.is_zero_degree() && outer_side_null(T, SIDE) {
// if the matched_row does not have any current matches
// `StreamChunkBuilder` guarantees that `UpdateDelete` will never
// issue an output chunk.
if self
.stream_chunk_builder
.append_row_matched(Op::UpdateDelete, &matched_row.row)
.is_some()
{
unreachable!("`Op::UpdateDelete` should not yield chunk");
}
self.stream_chunk_builder
.append_row(Op::UpdateInsert, row, &matched_row.row)
// Inner sides
} else {
self.stream_chunk_builder
.append_row(Op::Insert, row, &matched_row.row)
}
}

fn with_match_on_delete(
&mut self,
row: &RowRef<'_>,
matched_row: &JoinRow<OwnedRow>,
) -> Option<StreamChunk> {
// Left/Right Anti sides
if is_anti(T) {
if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
self.stream_chunk_builder
.append_row_matched(Op::Insert, &matched_row.row)
} else {
None
}
// Left/Right Semi sides
} else if is_semi(T) {
if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
self.stream_chunk_builder
.append_row_matched(Op::Delete, &matched_row.row)
} else {
None
}
// Outer sides
} else if matched_row.is_zero_degree() && outer_side_null(T, SIDE) {
// if the matched_row does not have any current
// matches
if self
.stream_chunk_builder
.append_row(Op::UpdateDelete, row, &matched_row.row)
.is_some()
{
unreachable!("`Op::UpdateDelete` should not yield chunk");
}
self.stream_chunk_builder
.append_row_matched(Op::UpdateInsert, &matched_row.row)
// Inner sides
} else {
// concat with the matched_row and append the new
// row
// FIXME: we always use `Op::Delete` here to avoid
// violating
// the assumption for U+ after U-.
self.stream_chunk_builder
.append_row(Op::Delete, row, &matched_row.row)
}
}

#[inline]
fn forward_exactly_once_if_matched(&mut self, op: Op, row: RowRef<'_>) -> Option<StreamChunk> {
// if it's a semi join and the side needs to be maintained.
if is_semi(T) && forward_exactly_once(T, SIDE) {
self.stream_chunk_builder.append_row_update(op, row)
} else {
None
}
}

#[inline]
fn forward_if_not_matched(&mut self, op: Op, row: RowRef<'_>) -> Option<StreamChunk> {
// if it's outer join or anti join and the side needs to be maintained.
if (is_anti(T) && forward_exactly_once(T, SIDE)) || is_outer_side(T, SIDE) {
self.stream_chunk_builder.append_row_update(op, row)
} else {
None
}
}

#[inline]
fn take(&mut self) -> Option<StreamChunk> {
self.stream_chunk_builder.take()
}
}

impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K, S, T> {
#[allow(clippy::too_many_arguments)]
pub fn new(
Expand Down Expand Up @@ -1025,14 +820,13 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
})
.collect_vec();

let mut hashjoin_chunk_builder = HashJoinChunkBuilder::<T, SIDE> {
stream_chunk_builder: JoinStreamChunkBuilder::new(
let mut hashjoin_chunk_builder =
JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems the names become more confused after renaming.

chunk_size,
actual_output_data_types.to_vec(),
side_update.i2o_mapping.clone(),
side_match.i2o_mapping.clone(),
),
};
));

let join_matched_join_keys = ctx
.streaming_metrics
Expand Down
Loading
Loading