diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index ca67737aeafe0..d6a6ae0ed67e9 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -463,22 +463,18 @@ message AsOfJoinNode { catalog.Table left_table = 4; // Used for internal table states. catalog.Table right_table = 5; - // Used for internal table states. - catalog.Table left_degree_table = 6; - // Used for internal table states. - catalog.Table right_degree_table = 7; // The output indices of current node - repeated uint32 output_indices = 8; + repeated uint32 output_indices = 6; // Left deduped input pk indices. The pk of the left_table and // The pk of the left_table is [left_join_key | left_inequality_key | left_deduped_input_pk_indices] // left_inequality_key is not used but for forward compatibility. - repeated uint32 left_deduped_input_pk_indices = 9; + repeated uint32 left_deduped_input_pk_indices = 7; // Right deduped input pk indices. // The pk of the right_table is [right_join_key | right_inequality_key | right_deduped_input_pk_indices] // right_inequality_key is not used but for forward compatibility. - repeated uint32 right_deduped_input_pk_indices = 10; - repeated bool null_safe = 11; - optional plan_common.AsOfJoinDesc asof_desc = 12; + repeated uint32 right_deduped_input_pk_indices = 8; + repeated bool null_safe = 9; + optional plan_common.AsOfJoinDesc asof_desc = 10; } message TemporalJoinNode { diff --git a/src/stream/src/executor/asof_join.rs b/src/stream/src/executor/asof_join.rs index b2cd81a04d3b4..74e55deca75fa 100644 --- a/src/stream/src/executor/asof_join.rs +++ b/src/stream/src/executor/asof_join.rs @@ -186,9 +186,7 @@ impl AsOfJoinExecutor null_safe: Vec, output_indices: Vec, state_table_l: StateTable, - degree_state_table_l: StateTable, state_table_r: StateTable, - degree_state_table_r: StateTable, watermark_epoch: AtomicU64Ref, metrics: Arc, chunk_size: usize, @@ -219,22 +217,9 @@ impl AsOfJoinExecutor let state_pk_indices_l = input_l.pk_indices().to_vec(); let state_pk_indices_r = input_r.pk_indices().to_vec(); - let state_order_key_indices_l = state_table_l.pk_indices(); - let state_order_key_indices_r = state_table_r.pk_indices(); - let state_join_key_indices_l = params_l.join_key_indices; let state_join_key_indices_r = params_r.join_key_indices; - let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec(); - let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec(); - - let degree_pk_indices_l = (state_join_key_indices_l.len() - ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len()) - .collect_vec(); - let degree_pk_indices_r = (state_join_key_indices_r.len() - ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len()) - .collect_vec(); - // If pk is contained in join key. let pk_contained_in_jk_l = is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone()); @@ -253,20 +238,8 @@ impl AsOfJoinExecutor assert_eq!(join_key_data_types_l, join_key_data_types_r); - let degree_all_data_types_l = state_order_key_indices_l - .iter() - .map(|idx| state_all_data_types_l[*idx].clone()) - .collect_vec(); - let degree_all_data_types_r = state_order_key_indices_r - .iter() - .map(|idx| state_all_data_types_r[*idx].clone()) - .collect_vec(); - let null_matched = K::Bitmap::from_bool_vec(null_safe); - let need_degree_table_l = false; - let need_degree_table_r = false; - let (left_to_output, right_to_output) = { let (left_len, right_len) = if is_left_semi_or_anti(T) { (state_all_data_types_l.len(), 0usize) @@ -303,12 +276,8 @@ impl AsOfJoinExecutor state_all_data_types_l.clone(), state_table_l, params_l.deduped_pk_indices, - degree_join_key_indices_l, - degree_all_data_types_l, - degree_state_table_l, - degree_pk_indices_l, + None, null_matched.clone(), - need_degree_table_l, pk_contained_in_jk_l, inequal_key_idx_l, metrics.clone(), @@ -321,7 +290,7 @@ impl AsOfJoinExecutor i2o_mapping: left_to_output, i2o_mapping_indexed: l2o_indexed, start_pos: 0, - need_degree_table: need_degree_table_l, + need_degree_table: false, }, side_r: JoinSide { ht: JoinHashMap::new( @@ -331,12 +300,8 @@ impl AsOfJoinExecutor state_all_data_types_r.clone(), state_table_r, params_r.deduped_pk_indices, - degree_join_key_indices_r, - degree_all_data_types_r, - degree_state_table_r, - degree_pk_indices_r, + None, null_matched, - need_degree_table_r, pk_contained_in_jk_r, inequal_key_idx_r, metrics.clone(), @@ -349,7 +314,7 @@ impl AsOfJoinExecutor start_pos: side_l_column_n, i2o_mapping: right_to_output, i2o_mapping_indexed: r2o_indexed, - need_degree_table: need_degree_table_r, + need_degree_table: false, }, metrics, chunk_size, @@ -675,7 +640,7 @@ impl AsOfJoinExecutor { yield chunk; } - side_update.ht.insert_row(key, row).await?; + side_update.ht.insert_row(key, row)?; } Op::Delete | Op::UpdateDelete => { if let Some(matched_row_by_inequality) = matched_row_by_inequality { @@ -924,7 +889,7 @@ impl AsOfJoinExecutor match op { Op::Insert | Op::UpdateInsert => { - side_update.ht.insert_row(key, row).await?; + side_update.ht.insert_row(key, row)?; } Op::Delete | Op::UpdateDelete => { side_update.ht.delete_row(key, row)?; @@ -977,13 +942,13 @@ mod tests { order_types: &[OrderType], pk_indices: &[usize], table_id: u32, - ) -> (StateTable, StateTable) { + ) -> StateTable { let column_descs = data_types .iter() .enumerate() .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone())) .collect_vec(); - let state_table = StateTable::from_table_catalog( + StateTable::from_table_catalog( &gen_pbtable( TableId::new(table_id), column_descs, @@ -994,33 +959,7 @@ mod tests { mem_state.clone(), None, ) - .await; - - // Create degree table - let mut degree_table_column_descs = vec![]; - pk_indices.iter().enumerate().for_each(|(pk_id, idx)| { - degree_table_column_descs.push(ColumnDesc::unnamed( - ColumnId::new(pk_id as i32), - data_types[*idx].clone(), - )) - }); - degree_table_column_descs.push(ColumnDesc::unnamed( - ColumnId::new(pk_indices.len() as i32), - DataType::Int64, - )); - let degree_state_table = StateTable::from_table_catalog( - &gen_pbtable( - TableId::new(table_id + 1), - degree_table_column_descs, - order_types.to_vec(), - pk_indices.to_vec(), - 0, - ), - mem_state, - None, - ) - .await; - (state_table, degree_state_table) + .await } async fn create_executor( @@ -1042,7 +981,7 @@ mod tests { let mem_state = MemoryStateStore::new(); - let (state_l, degree_state_l) = create_in_memory_state_table( + let state_l = create_in_memory_state_table( mem_state.clone(), &[DataType::Int64, DataType::Int64, DataType::Int64], &[ @@ -1055,7 +994,7 @@ mod tests { ) .await; - let (state_r, degree_state_r) = create_in_memory_state_table( + let state_r = create_in_memory_state_table( mem_state, &[DataType::Int64, DataType::Int64, DataType::Int64], &[ @@ -1064,7 +1003,7 @@ mod tests { OrderType::ascending(), ], &[0, asof_desc.right_idx, 1], - 2, + 1, ) .await; @@ -1089,9 +1028,7 @@ mod tests { vec![false], (0..schema_len).collect_vec(), state_l, - degree_state_l, state_r, - degree_state_r, Arc::new(AtomicU64::new(0)), Arc::new(StreamingMetrics::unused()), 1024, diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index 2920d5799feb7..6c3e51f5ee55f 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -248,9 +248,6 @@ impl HashJoinExecutor HashJoinExecutor HashJoinExecutor HashJoinExecutor HashJoinExecutor { /// - Full Outer: both side /// - Left Outer/Semi/Anti: left side /// - Right Outer/Semi/Anti: right side - /// - Inner: None. - degree_state: TableInner, + /// - Inner: neither side. + /// + /// Should be set to `None` if `need_degree_table` was set to `false`. + degree_state: Option>, /// If degree table is need need_degree_table: bool, /// Pk is part of the join key. @@ -206,20 +208,27 @@ pub struct JoinHashMap { metrics: JoinHashMapMetrics, } -struct TableInner { +pub struct TableInner { /// Indices of the (cache) pk in a state row pk_indices: Vec, /// Indices of the join key in a state row join_key_indices: Vec, // This should be identical to the pk in state table. order_key_indices: Vec, - // This should be identical to the data types in table schema. - #[expect(dead_code)] - all_data_types: Vec, pub(crate) table: StateTable, } impl TableInner { + pub fn new(pk_indices: Vec, join_key_indices: Vec, table: StateTable) -> Self { + let order_key_indices = table.pk_indices().to_vec(); + Self { + pk_indices, + join_key_indices, + order_key_indices, + table, + } + } + fn error_context(&self, row: &impl Row) -> String { let pk = row.project(&self.pk_indices); let jk = row.project(&self.join_key_indices); @@ -243,12 +252,8 @@ impl JoinHashMap { state_all_data_types: Vec, state_table: StateTable, state_pk_indices: Vec, - degree_join_key_indices: Vec, - degree_all_data_types: Vec, - degree_table: StateTable, - degree_pk_indices: Vec, + degree_state: Option>, null_matched: K::Bitmap, - need_degree_table: bool, pk_contained_in_jk: bool, inequality_key_idx: Option, metrics: Arc, @@ -280,17 +285,10 @@ impl JoinHashMap { pk_indices: state_pk_indices, join_key_indices: state_join_key_indices, order_key_indices: state_table.pk_indices().to_vec(), - all_data_types: state_all_data_types, table: state_table, }; - let degree_state = TableInner { - pk_indices: degree_pk_indices, - join_key_indices: degree_join_key_indices, - order_key_indices: degree_table.pk_indices().to_vec(), - all_data_types: degree_all_data_types, - table: degree_table, - }; + let need_degree_table = degree_state.is_some(); let metrics_info = MetricsInfo::new( metrics.clone(), @@ -322,14 +320,19 @@ impl JoinHashMap { pub fn init(&mut self, epoch: EpochPair) { self.state.table.init_epoch(epoch); - self.degree_state.table.init_epoch(epoch); + if let Some(degree_state) = &mut self.degree_state { + degree_state.table.init_epoch(epoch); + } } /// Update the vnode bitmap and manipulate the cache if necessary. pub fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc) -> bool { let (_previous_vnode_bitmap, cache_may_stale) = self.state.table.update_vnode_bitmap(vnode_bitmap.clone()); - let _ = self.degree_state.table.update_vnode_bitmap(vnode_bitmap); + let _ = self + .degree_state + .as_mut() + .map(|degree_state| degree_state.table.update_vnode_bitmap(vnode_bitmap.clone())); if cache_may_stale { self.inner.clear(); @@ -341,7 +344,9 @@ impl JoinHashMap { pub fn update_watermark(&mut self, watermark: ScalarImpl) { // TODO: remove data in cache. self.state.table.update_watermark(watermark.clone()); - self.degree_state.table.update_watermark(watermark); + if let Some(degree_state) = &mut self.degree_state { + degree_state.table.update_watermark(watermark); + } } /// Take the state for the given `key` out of the hash table and return it. One **MUST** call @@ -380,11 +385,11 @@ impl JoinHashMap { self.state .table .iter_with_prefix(&key, sub_range, PrefetchOptions::default()); - let degree_table_iter_fut = self.degree_state.table.iter_with_prefix( - &key, - sub_range, - PrefetchOptions::default(), - ); + let degree_state = self.degree_state.as_ref().unwrap(); + let degree_table_iter_fut = + degree_state + .table + .iter_with_prefix(&key, sub_range, PrefetchOptions::default()); let (table_iter, degree_table_iter) = try_join(table_iter_fut, degree_table_iter_fut).await?; @@ -542,19 +547,22 @@ impl JoinHashMap { pub async fn flush(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { self.metrics.flush(); self.state.table.commit(epoch).await?; - self.degree_state.table.commit(epoch).await?; + if let Some(degree_state) = &mut self.degree_state { + degree_state.table.commit(epoch).await?; + } Ok(()) } pub async fn try_flush(&mut self) -> StreamExecutorResult<()> { self.state.table.try_flush().await?; - self.degree_state.table.try_flush().await?; + if let Some(degree_state) = &mut self.degree_state { + degree_state.table.try_flush().await?; + } Ok(()) } /// Insert a join row - #[allow(clippy::unused_async)] - pub async fn insert(&mut self, key: &K, value: JoinRow) -> StreamExecutorResult<()> { + pub fn insert(&mut self, key: &K, value: JoinRow) -> StreamExecutorResult<()> { let pk = self.serialize_pk_from_row(&value.row); let inequality_key = self @@ -581,42 +589,21 @@ impl JoinHashMap { } // Update the flush buffer. - let (row, degree) = value.to_table_rows(&self.state.order_key_indices); - self.state.table.insert(row); - self.degree_state.table.insert(degree); + if let Some(degree_state) = self.degree_state.as_mut() { + let (row, degree) = value.to_table_rows(&self.state.order_key_indices); + self.state.table.insert(row); + degree_state.table.insert(degree); + } else { + self.state.table.insert(value.row); + } Ok(()) } /// Insert a row. /// Used when the side does not need to update degree. - #[allow(clippy::unused_async)] - pub async fn insert_row(&mut self, key: &K, value: impl Row) -> StreamExecutorResult<()> { + pub fn insert_row(&mut self, key: &K, value: impl Row) -> StreamExecutorResult<()> { let join_row = JoinRow::new(&value, 0); - let pk = self.serialize_pk_from_row(&value); - let inequality_key = self - .inequality_key_desc - .as_ref() - .map(|desc| desc.serialize_inequal_key_from_row(&value)); - // TODO(yuhao): avoid this `contains`. - // https://github.com/risingwavelabs/risingwave/issues/9233 - if self.inner.contains(key) { - // Update cache - let mut entry = self.inner.get_mut(key).unwrap(); - entry - .insert(pk, join_row.encode(), inequality_key) - .with_context(|| self.state.error_context(&value))?; - } else if self.pk_contained_in_jk { - // Refill cache when the join key exist in neither cache or storage. - self.metrics.insert_cache_miss_count += 1; - let mut state = JoinEntryState::default(); - state - .insert(pk, join_row.encode(), inequality_key) - .with_context(|| self.state.error_context(&value))?; - self.update_state(key, state.into()); - } - - // Update the flush buffer. - self.state.table.insert(value); + self.insert(key, join_row)?; Ok(()) } @@ -638,7 +625,8 @@ impl JoinHashMap { // If no cache maintained, only update the state table. let (row, degree) = value.to_table_rows(&self.state.order_key_indices); self.state.table.delete(row); - self.degree_state.table.delete(degree); + let degree_state = self.degree_state.as_mut().unwrap(); + degree_state.table.delete(degree); Ok(()) } @@ -687,8 +675,8 @@ impl JoinHashMap { action(&mut join_row.degree); let new_degree = join_row.to_table_rows(&self.state.order_key_indices).1; - - self.degree_state.table.update(old_degree, new_degree); + let degree_state = self.degree_state.as_mut().unwrap(); + degree_state.table.update(old_degree, new_degree); } /// Increment the degree of the given [`JoinRow`] and [`EncodedJoinRow`] with `action`, both in diff --git a/src/stream/src/from_proto/asof_join.rs b/src/stream/src/from_proto/asof_join.rs index 3d74ac884b4f0..f82b4ed490117 100644 --- a/src/stream/src/from_proto/asof_join.rs +++ b/src/stream/src/from_proto/asof_join.rs @@ -44,10 +44,7 @@ impl ExecutorBuilder for AsOfJoinExecutorBuilder { let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap(); let table_l = node.get_left_table()?; - let degree_table_l = node.get_left_degree_table()?; - let table_r = node.get_right_table()?; - let degree_table_r = node.get_right_degree_table()?; let params_l = JoinParams::new( node.get_left_key() @@ -84,14 +81,9 @@ impl ExecutorBuilder for AsOfJoinExecutorBuilder { let state_table_l = StateTable::from_table_catalog(table_l, store.clone(), Some(vnodes.clone())).await; - let degree_state_table_l = - StateTable::from_table_catalog(degree_table_l, store.clone(), Some(vnodes.clone())) - .await; let state_table_r = StateTable::from_table_catalog(table_r, store.clone(), Some(vnodes.clone())).await; - let degree_state_table_r = - StateTable::from_table_catalog(degree_table_r, store, Some(vnodes)).await; let join_type_proto = node.get_join_type()?; let as_of_desc_proto = node.get_asof_desc()?; @@ -107,9 +99,7 @@ impl ExecutorBuilder for AsOfJoinExecutorBuilder { null_safe, output_indices, state_table_l, - degree_state_table_l, state_table_r, - degree_state_table_r, lru_manager: params.watermark_epoch, metrics: params.executor_stats, join_type_proto, @@ -138,9 +128,7 @@ struct AsOfJoinExecutorDispatcherArgs { null_safe: Vec, output_indices: Vec, state_table_l: StateTable, - degree_state_table_l: StateTable, state_table_r: StateTable, - degree_state_table_r: StateTable, lru_manager: AtomicU64Ref, metrics: Arc, join_type_proto: JoinTypeProto, @@ -167,9 +155,7 @@ impl HashKeyDispatcher for AsOfJoinExecutorDispatcherArgs { self.null_safe, self.output_indices, self.state_table_l, - self.degree_state_table_l, self.state_table_r, - self.degree_state_table_r, self.lru_manager, self.metrics, self.chunk_size,