Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(stream): make degree_state field in JoinHashMap an Option #18539

Merged
merged 4 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@
bool is_append_only = 14;
}

message AsOfJoinNode {

Check failure on line 458 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "11" with name "null_safe" on message "AsOfJoinNode" was deleted without reserving the name "null_safe".

Check failure on line 458 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "12" with name "asof_desc" on message "AsOfJoinNode" was deleted without reserving the name "asof_desc".

Check failure on line 458 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "11" with name "null_safe" on message "AsOfJoinNode" was deleted without reserving the number "11".

Check failure on line 458 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "12" with name "asof_desc" on message "AsOfJoinNode" was deleted without reserving the number "12".
plan_common.AsOfJoinType join_type = 1;
repeated int32 left_key = 2;
repeated int32 right_key = 3;
Expand All @@ -463,22 +463,18 @@
catalog.Table left_table = 4;
// Used for internal table states.
catalog.Table right_table = 5;
// Used for internal table states.
catalog.Table left_degree_table = 6;
// Used for internal table states.
catalog.Table right_degree_table = 7;
// The output indices of current node
repeated uint32 output_indices = 8;
repeated uint32 output_indices = 6;

Check failure on line 467 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "6" with name "output_indices" on message "AsOfJoinNode" changed option "json_name" from "leftDegreeTable" to "outputIndices".

Check failure on line 467 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "6" with name "output_indices" on message "AsOfJoinNode" changed cardinality from "optional with explicit presence" to "repeated".

Check failure on line 467 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "6" with name "output_indices" on message "AsOfJoinNode" changed type from "message" to "uint32". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.

Check failure on line 467 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "6" on message "AsOfJoinNode" changed name from "left_degree_table" to "output_indices".
// Left deduped input pk indices.
// The pk of the left_table is [left_join_key | left_inequality_key | left_deduped_input_pk_indices]
// left_inequality_key is not used, but is kept for forward compatibility.
repeated uint32 left_deduped_input_pk_indices = 9;
repeated uint32 left_deduped_input_pk_indices = 7;

Check failure on line 471 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "7" with name "left_deduped_input_pk_indices" on message "AsOfJoinNode" changed option "json_name" from "rightDegreeTable" to "leftDedupedInputPkIndices".

Check failure on line 471 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "7" with name "left_deduped_input_pk_indices" on message "AsOfJoinNode" changed cardinality from "optional with explicit presence" to "repeated".
// Right deduped input pk indices.
// The pk of the right_table is [right_join_key | right_inequality_key | right_deduped_input_pk_indices]
// right_inequality_key is not used, but is kept for forward compatibility.
repeated uint32 right_deduped_input_pk_indices = 10;
repeated bool null_safe = 11;
optional plan_common.AsOfJoinDesc asof_desc = 12;
repeated uint32 right_deduped_input_pk_indices = 8;
repeated bool null_safe = 9;
optional plan_common.AsOfJoinDesc asof_desc = 10;
}

message TemporalJoinNode {
Expand Down
79 changes: 11 additions & 68 deletions src/stream/src/executor/asof_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,7 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor
null_safe: Vec<bool>,
output_indices: Vec<usize>,
state_table_l: StateTable<S>,
degree_state_table_l: StateTable<S>,
state_table_r: StateTable<S>,
degree_state_table_r: StateTable<S>,
watermark_epoch: AtomicU64Ref,
metrics: Arc<StreamingMetrics>,
chunk_size: usize,
Expand Down Expand Up @@ -219,22 +217,9 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor
let state_pk_indices_l = input_l.pk_indices().to_vec();
let state_pk_indices_r = input_r.pk_indices().to_vec();

let state_order_key_indices_l = state_table_l.pk_indices();
let state_order_key_indices_r = state_table_r.pk_indices();

let state_join_key_indices_l = params_l.join_key_indices;
let state_join_key_indices_r = params_r.join_key_indices;

let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();

let degree_pk_indices_l = (state_join_key_indices_l.len()
..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
.collect_vec();
let degree_pk_indices_r = (state_join_key_indices_r.len()
..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
.collect_vec();

// If pk is contained in join key.
let pk_contained_in_jk_l =
is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone());
Expand All @@ -253,20 +238,8 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor

assert_eq!(join_key_data_types_l, join_key_data_types_r);

let degree_all_data_types_l = state_order_key_indices_l
.iter()
.map(|idx| state_all_data_types_l[*idx].clone())
.collect_vec();
let degree_all_data_types_r = state_order_key_indices_r
.iter()
.map(|idx| state_all_data_types_r[*idx].clone())
.collect_vec();

let null_matched = K::Bitmap::from_bool_vec(null_safe);

let need_degree_table_l = false;
let need_degree_table_r = false;

let (left_to_output, right_to_output) = {
let (left_len, right_len) = if is_left_semi_or_anti(T) {
(state_all_data_types_l.len(), 0usize)
Expand Down Expand Up @@ -303,12 +276,8 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor
state_all_data_types_l.clone(),
state_table_l,
params_l.deduped_pk_indices,
degree_join_key_indices_l,
degree_all_data_types_l,
degree_state_table_l,
degree_pk_indices_l,
None,
null_matched.clone(),
need_degree_table_l,
pk_contained_in_jk_l,
inequal_key_idx_l,
metrics.clone(),
Expand All @@ -321,7 +290,7 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor
i2o_mapping: left_to_output,
i2o_mapping_indexed: l2o_indexed,
start_pos: 0,
need_degree_table: need_degree_table_l,
need_degree_table: false,
},
side_r: JoinSide {
ht: JoinHashMap::new(
Expand All @@ -331,12 +300,8 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor
state_all_data_types_r.clone(),
state_table_r,
params_r.deduped_pk_indices,
degree_join_key_indices_r,
degree_all_data_types_r,
degree_state_table_r,
degree_pk_indices_r,
None,
null_matched,
need_degree_table_r,
pk_contained_in_jk_r,
inequal_key_idx_r,
metrics.clone(),
Expand All @@ -349,7 +314,7 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor
start_pos: side_l_column_n,
i2o_mapping: right_to_output,
i2o_mapping_indexed: r2o_indexed,
need_degree_table: need_degree_table_r,
need_degree_table: false,
},
metrics,
chunk_size,
Expand Down Expand Up @@ -675,7 +640,7 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor
{
yield chunk;
}
side_update.ht.insert_row(key, row).await?;
side_update.ht.insert_row(key, row)?;
}
Op::Delete | Op::UpdateDelete => {
if let Some(matched_row_by_inequality) = matched_row_by_inequality {
Expand Down Expand Up @@ -924,7 +889,7 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor

match op {
Op::Insert | Op::UpdateInsert => {
side_update.ht.insert_row(key, row).await?;
side_update.ht.insert_row(key, row)?;
}
Op::Delete | Op::UpdateDelete => {
side_update.ht.delete_row(key, row)?;
Expand Down Expand Up @@ -976,7 +941,7 @@ mod tests {
order_types: &[OrderType],
pk_indices: &[usize],
table_id: u32,
) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
) -> StateTable<MemoryStateStore> {
let column_descs = data_types
.iter()
.enumerate()
Expand All @@ -991,27 +956,7 @@ mod tests {
)
.await;

// Create degree table
let mut degree_table_column_descs = vec![];
pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
degree_table_column_descs.push(ColumnDesc::unnamed(
ColumnId::new(pk_id as i32),
data_types[*idx].clone(),
))
});
degree_table_column_descs.push(ColumnDesc::unnamed(
ColumnId::new(pk_indices.len() as i32),
DataType::Int64,
));
let degree_state_table = StateTable::new_without_distribution(
mem_state,
TableId::new(table_id + 1),
degree_table_column_descs,
order_types.to_vec(),
pk_indices.to_vec(),
)
.await;
(state_table, degree_state_table)
state_table
}

async fn create_executor<const T: AsOfJoinTypePrimitive>(
Expand All @@ -1033,7 +978,7 @@ mod tests {

let mem_state = MemoryStateStore::new();

let (state_l, degree_state_l) = create_in_memory_state_table(
let state_l = create_in_memory_state_table(
mem_state.clone(),
&[DataType::Int64, DataType::Int64, DataType::Int64],
&[
Expand All @@ -1046,7 +991,7 @@ mod tests {
)
.await;

let (state_r, degree_state_r) = create_in_memory_state_table(
let state_r = create_in_memory_state_table(
mem_state,
&[DataType::Int64, DataType::Int64, DataType::Int64],
&[
Expand All @@ -1055,7 +1000,7 @@ mod tests {
OrderType::ascending(),
],
&[0, asof_desc.right_idx, 1],
2,
1,
)
.await;

Expand All @@ -1080,9 +1025,7 @@ mod tests {
vec![false],
(0..schema_len).collect_vec(),
state_l,
degree_state_l,
state_r,
degree_state_r,
Arc::new(AtomicU64::new(0)),
Arc::new(StreamingMetrics::unused()),
1024,
Expand Down
52 changes: 24 additions & 28 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,6 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
let state_pk_indices_l = input_l.pk_indices().to_vec();
let state_pk_indices_r = input_r.pk_indices().to_vec();

let state_order_key_indices_l = state_table_l.pk_indices();
let state_order_key_indices_r = state_table_r.pk_indices();

let state_join_key_indices_l = params_l.join_key_indices;
let state_join_key_indices_r = params_r.join_key_indices;

Expand Down Expand Up @@ -285,20 +282,26 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,

assert_eq!(join_key_data_types_l, join_key_data_types_r);

let degree_all_data_types_l = state_order_key_indices_l
.iter()
.map(|idx| state_all_data_types_l[*idx].clone())
.collect_vec();
let degree_all_data_types_r = state_order_key_indices_r
.iter()
.map(|idx| state_all_data_types_r[*idx].clone())
.collect_vec();

let null_matched = K::Bitmap::from_bool_vec(null_safe);

let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;

let degree_state_l = need_degree_table_l.then(|| {
TableInner::new(
degree_pk_indices_l,
degree_join_key_indices_l,
degree_state_table_l,
)
});
let degree_state_r = need_degree_table_r.then(|| {
TableInner::new(
degree_pk_indices_r,
degree_join_key_indices_r,
degree_state_table_r,
)
});

let (left_to_output, right_to_output) = {
let (left_len, right_len) = if is_left_semi_or_anti(T) {
(state_all_data_types_l.len(), 0usize)
Expand Down Expand Up @@ -389,12 +392,8 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
state_all_data_types_l.clone(),
state_table_l,
params_l.deduped_pk_indices,
degree_join_key_indices_l,
degree_all_data_types_l,
degree_state_table_l,
degree_pk_indices_l,
degree_state_l,
null_matched.clone(),
need_degree_table_l,
pk_contained_in_jk_l,
None,
metrics.clone(),
Expand All @@ -420,12 +419,8 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
state_all_data_types_r.clone(),
state_table_r,
params_r.deduped_pk_indices,
degree_join_key_indices_r,
degree_all_data_types_r,
degree_state_table_r,
degree_pk_indices_r,
degree_state_r,
null_matched,
need_degree_table_r,
pk_contained_in_jk_r,
None,
metrics.clone(),
Expand Down Expand Up @@ -946,14 +941,15 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
}

if append_only_optimize && let Some(row) = append_only_matched_row {
side_match.ht.delete(key, row)?;
if side_match.need_degree_table {
side_match.ht.delete(key, row)?;
} else {
side_match.ht.delete_row(key, row.row)?;
}
} else if side_update.need_degree_table {
side_update
.ht
.insert(key, JoinRow::new(row, degree))
.await?;
side_update.ht.insert(key, JoinRow::new(row, degree))?;
} else {
side_update.ht.insert_row(key, row).await?;
side_update.ht.insert_row(key, row)?;
}
} else {
// Row which violates null-safe bitmap will never be matched so we need not
Expand Down
Loading
Loading