From 5b625f81b06217c4350a57b8457006e6b9bfdd8d Mon Sep 17 00:00:00 2001 From: Yuhao Su <31772373+yuhao-su@users.noreply.github.com> Date: Wed, 18 Sep 2024 18:26:02 +0800 Subject: [PATCH 1/6] refactor(stream): make `degree_state` field in `JoinHashMap` an Option (#18539) --- proto/stream_plan.proto | 14 +-- src/stream/src/executor/asof_join.rs | 87 +++------------- src/stream/src/executor/hash_join.rs | 52 +++++----- src/stream/src/executor/join/hash_join.rs | 116 ++++++++++------------ src/stream/src/from_proto/asof_join.rs | 14 --- 5 files changed, 93 insertions(+), 190 deletions(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index ca67737aeafe0..d6a6ae0ed67e9 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -463,22 +463,18 @@ message AsOfJoinNode { catalog.Table left_table = 4; // Used for internal table states. catalog.Table right_table = 5; - // Used for internal table states. - catalog.Table left_degree_table = 6; - // Used for internal table states. - catalog.Table right_degree_table = 7; // The output indices of current node - repeated uint32 output_indices = 8; + repeated uint32 output_indices = 6; // Left deduped input pk indices. The pk of the left_table and // The pk of the left_table is [left_join_key | left_inequality_key | left_deduped_input_pk_indices] // left_inequality_key is not used but for forward compatibility. - repeated uint32 left_deduped_input_pk_indices = 9; + repeated uint32 left_deduped_input_pk_indices = 7; // Right deduped input pk indices. // The pk of the right_table is [right_join_key | right_inequality_key | right_deduped_input_pk_indices] // right_inequality_key is not used but for forward compatibility. - repeated uint32 right_deduped_input_pk_indices = 10; - repeated bool null_safe = 11; - optional plan_common.AsOfJoinDesc asof_desc = 12; + repeated uint32 right_deduped_input_pk_indices = 8; + repeated bool null_safe = 9; + optional plan_common.AsOfJoinDesc asof_desc = 10; } message TemporalJoinNode { diff --git a/src/stream/src/executor/asof_join.rs b/src/stream/src/executor/asof_join.rs index b2cd81a04d3b4..74e55deca75fa 100644 --- a/src/stream/src/executor/asof_join.rs +++ b/src/stream/src/executor/asof_join.rs @@ -186,9 +186,7 @@ impl AsOfJoinExecutor null_safe: Vec, output_indices: Vec, state_table_l: StateTable, - degree_state_table_l: StateTable, state_table_r: StateTable, - degree_state_table_r: StateTable, watermark_epoch: AtomicU64Ref, metrics: Arc, chunk_size: usize, @@ -219,22 +217,9 @@ impl AsOfJoinExecutor let state_pk_indices_l = input_l.pk_indices().to_vec(); let state_pk_indices_r = input_r.pk_indices().to_vec(); - let state_order_key_indices_l = state_table_l.pk_indices(); - let state_order_key_indices_r = state_table_r.pk_indices(); - let state_join_key_indices_l = params_l.join_key_indices; let state_join_key_indices_r = params_r.join_key_indices; - let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec(); - let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec(); - - let degree_pk_indices_l = (state_join_key_indices_l.len() - ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len()) - .collect_vec(); - let degree_pk_indices_r = (state_join_key_indices_r.len() - ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len()) - .collect_vec(); - // If pk is contained in join key. let pk_contained_in_jk_l = is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone()); @@ -253,20 +238,8 @@ impl AsOfJoinExecutor assert_eq!(join_key_data_types_l, join_key_data_types_r); - let degree_all_data_types_l = state_order_key_indices_l - .iter() - .map(|idx| state_all_data_types_l[*idx].clone()) - .collect_vec(); - let degree_all_data_types_r = state_order_key_indices_r - .iter() - .map(|idx| state_all_data_types_r[*idx].clone()) - .collect_vec(); - let null_matched = K::Bitmap::from_bool_vec(null_safe); - let need_degree_table_l = false; - let need_degree_table_r = false; - let (left_to_output, right_to_output) = { let (left_len, right_len) = if is_left_semi_or_anti(T) { (state_all_data_types_l.len(), 0usize) @@ -303,12 +276,8 @@ impl AsOfJoinExecutor state_all_data_types_l.clone(), state_table_l, params_l.deduped_pk_indices, - degree_join_key_indices_l, - degree_all_data_types_l, - degree_state_table_l, - degree_pk_indices_l, + None, null_matched.clone(), - need_degree_table_l, pk_contained_in_jk_l, inequal_key_idx_l, metrics.clone(), @@ -321,7 +290,7 @@ impl AsOfJoinExecutor i2o_mapping: left_to_output, i2o_mapping_indexed: l2o_indexed, start_pos: 0, - need_degree_table: need_degree_table_l, + need_degree_table: false, }, side_r: JoinSide { ht: JoinHashMap::new( @@ -331,12 +300,8 @@ impl AsOfJoinExecutor state_all_data_types_r.clone(), state_table_r, params_r.deduped_pk_indices, - degree_join_key_indices_r, - degree_all_data_types_r, - degree_state_table_r, - degree_pk_indices_r, + None, null_matched, - need_degree_table_r, pk_contained_in_jk_r, inequal_key_idx_r, metrics.clone(), @@ -349,7 +314,7 @@ impl AsOfJoinExecutor start_pos: side_l_column_n, i2o_mapping: right_to_output, i2o_mapping_indexed: r2o_indexed, - need_degree_table: need_degree_table_r, + need_degree_table: false, }, metrics, chunk_size, @@ -675,7 +640,7 @@ impl AsOfJoinExecutor { yield chunk; } - side_update.ht.insert_row(key, row).await?; + side_update.ht.insert_row(key, row)?; } Op::Delete | Op::UpdateDelete => { if let Some(matched_row_by_inequality) = matched_row_by_inequality { @@ -924,7 +889,7 @@ impl AsOfJoinExecutor match op { Op::Insert | Op::UpdateInsert => { - side_update.ht.insert_row(key, row).await?; + side_update.ht.insert_row(key, row)?; } Op::Delete | Op::UpdateDelete => { side_update.ht.delete_row(key, row)?; @@ -977,13 +942,13 @@ mod tests { order_types: &[OrderType], pk_indices: &[usize], table_id: u32, - ) -> (StateTable, StateTable) { + ) -> StateTable { let column_descs = data_types .iter() .enumerate() .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone())) .collect_vec(); - let state_table = StateTable::from_table_catalog( + StateTable::from_table_catalog( &gen_pbtable( TableId::new(table_id), column_descs, @@ -994,33 +959,7 @@ mod tests { mem_state.clone(), None, ) - .await; - - // Create degree table - let mut degree_table_column_descs = vec![]; - pk_indices.iter().enumerate().for_each(|(pk_id, idx)| { - degree_table_column_descs.push(ColumnDesc::unnamed( - ColumnId::new(pk_id as i32), - data_types[*idx].clone(), - )) - }); - degree_table_column_descs.push(ColumnDesc::unnamed( - ColumnId::new(pk_indices.len() as i32), - DataType::Int64, - )); - let degree_state_table = StateTable::from_table_catalog( - &gen_pbtable( - TableId::new(table_id + 1), - degree_table_column_descs, - order_types.to_vec(), - pk_indices.to_vec(), - 0, - ), - mem_state, - None, - ) - .await; - (state_table, degree_state_table) + .await } async fn create_executor( @@ -1042,7 +981,7 @@ mod tests { let mem_state = MemoryStateStore::new(); - let (state_l, degree_state_l) = create_in_memory_state_table( + let state_l = create_in_memory_state_table( mem_state.clone(), &[DataType::Int64, DataType::Int64, DataType::Int64], &[ @@ -1055,7 +994,7 @@ mod tests { ) .await; - let (state_r, degree_state_r) = create_in_memory_state_table( + let state_r = create_in_memory_state_table( mem_state, &[DataType::Int64, DataType::Int64, DataType::Int64], &[ @@ -1064,7 +1003,7 @@ mod tests { OrderType::ascending(), ], &[0, asof_desc.right_idx, 1], - 2, + 1, ) .await; @@ -1089,9 +1028,7 @@ mod tests { vec![false], (0..schema_len).collect_vec(), state_l, - degree_state_l, state_r, - degree_state_r, Arc::new(AtomicU64::new(0)), Arc::new(StreamingMetrics::unused()), 1024, diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index 2920d5799feb7..6c3e51f5ee55f 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -248,9 +248,6 @@ impl HashJoinExecutor HashJoinExecutor HashJoinExecutor HashJoinExecutor HashJoinExecutor { /// - Full Outer: both side /// - Left Outer/Semi/Anti: left side /// - Right Outer/Semi/Anti: right side - /// - Inner: None. - degree_state: TableInner, + /// - Inner: neither side. + /// + /// Should be set to `None` if `need_degree_table` was set to `false`. + degree_state: Option>, /// If degree table is need need_degree_table: bool, /// Pk is part of the join key. @@ -206,20 +208,27 @@ pub struct JoinHashMap { metrics: JoinHashMapMetrics, } -struct TableInner { +pub struct TableInner { /// Indices of the (cache) pk in a state row pk_indices: Vec, /// Indices of the join key in a state row join_key_indices: Vec, // This should be identical to the pk in state table. order_key_indices: Vec, - // This should be identical to the data types in table schema. - #[expect(dead_code)] - all_data_types: Vec, pub(crate) table: StateTable, } impl TableInner { + pub fn new(pk_indices: Vec, join_key_indices: Vec, table: StateTable) -> Self { + let order_key_indices = table.pk_indices().to_vec(); + Self { + pk_indices, + join_key_indices, + order_key_indices, + table, + } + } + fn error_context(&self, row: &impl Row) -> String { let pk = row.project(&self.pk_indices); let jk = row.project(&self.join_key_indices); @@ -243,12 +252,8 @@ impl JoinHashMap { state_all_data_types: Vec, state_table: StateTable, state_pk_indices: Vec, - degree_join_key_indices: Vec, - degree_all_data_types: Vec, - degree_table: StateTable, - degree_pk_indices: Vec, + degree_state: Option>, null_matched: K::Bitmap, - need_degree_table: bool, pk_contained_in_jk: bool, inequality_key_idx: Option, metrics: Arc, @@ -280,17 +285,10 @@ impl JoinHashMap { pk_indices: state_pk_indices, join_key_indices: state_join_key_indices, order_key_indices: state_table.pk_indices().to_vec(), - all_data_types: state_all_data_types, table: state_table, }; - let degree_state = TableInner { - pk_indices: degree_pk_indices, - join_key_indices: degree_join_key_indices, - order_key_indices: degree_table.pk_indices().to_vec(), - all_data_types: degree_all_data_types, - table: degree_table, - }; + let need_degree_table = degree_state.is_some(); let metrics_info = MetricsInfo::new( metrics.clone(), @@ -322,14 +320,19 @@ impl JoinHashMap { pub fn init(&mut self, epoch: EpochPair) { self.state.table.init_epoch(epoch); - self.degree_state.table.init_epoch(epoch); + if let Some(degree_state) = &mut self.degree_state { + degree_state.table.init_epoch(epoch); + } } /// Update the vnode bitmap and manipulate the cache if necessary. pub fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc) -> bool { let (_previous_vnode_bitmap, cache_may_stale) = self.state.table.update_vnode_bitmap(vnode_bitmap.clone()); - let _ = self.degree_state.table.update_vnode_bitmap(vnode_bitmap); + let _ = self + .degree_state + .as_mut() + .map(|degree_state| degree_state.table.update_vnode_bitmap(vnode_bitmap.clone())); if cache_may_stale { self.inner.clear(); @@ -341,7 +344,9 @@ impl JoinHashMap { pub fn update_watermark(&mut self, watermark: ScalarImpl) { // TODO: remove data in cache. self.state.table.update_watermark(watermark.clone()); - self.degree_state.table.update_watermark(watermark); + if let Some(degree_state) = &mut self.degree_state { + degree_state.table.update_watermark(watermark); + } } /// Take the state for the given `key` out of the hash table and return it. One **MUST** call @@ -380,11 +385,11 @@ impl JoinHashMap { self.state .table .iter_with_prefix(&key, sub_range, PrefetchOptions::default()); - let degree_table_iter_fut = self.degree_state.table.iter_with_prefix( - &key, - sub_range, - PrefetchOptions::default(), - ); + let degree_state = self.degree_state.as_ref().unwrap(); + let degree_table_iter_fut = + degree_state + .table + .iter_with_prefix(&key, sub_range, PrefetchOptions::default()); let (table_iter, degree_table_iter) = try_join(table_iter_fut, degree_table_iter_fut).await?; @@ -542,19 +547,22 @@ impl JoinHashMap { pub async fn flush(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { self.metrics.flush(); self.state.table.commit(epoch).await?; - self.degree_state.table.commit(epoch).await?; + if let Some(degree_state) = &mut self.degree_state { + degree_state.table.commit(epoch).await?; + } Ok(()) } pub async fn try_flush(&mut self) -> StreamExecutorResult<()> { self.state.table.try_flush().await?; - self.degree_state.table.try_flush().await?; + if let Some(degree_state) = &mut self.degree_state { + degree_state.table.try_flush().await?; + } Ok(()) } /// Insert a join row - #[allow(clippy::unused_async)] - pub async fn insert(&mut self, key: &K, value: JoinRow) -> StreamExecutorResult<()> { + pub fn insert(&mut self, key: &K, value: JoinRow) -> StreamExecutorResult<()> { let pk = self.serialize_pk_from_row(&value.row); let inequality_key = self @@ -581,42 +589,21 @@ impl JoinHashMap { } // Update the flush buffer. - let (row, degree) = value.to_table_rows(&self.state.order_key_indices); - self.state.table.insert(row); - self.degree_state.table.insert(degree); + if let Some(degree_state) = self.degree_state.as_mut() { + let (row, degree) = value.to_table_rows(&self.state.order_key_indices); + self.state.table.insert(row); + degree_state.table.insert(degree); + } else { + self.state.table.insert(value.row); + } Ok(()) } /// Insert a row. /// Used when the side does not need to update degree. - #[allow(clippy::unused_async)] - pub async fn insert_row(&mut self, key: &K, value: impl Row) -> StreamExecutorResult<()> { + pub fn insert_row(&mut self, key: &K, value: impl Row) -> StreamExecutorResult<()> { let join_row = JoinRow::new(&value, 0); - let pk = self.serialize_pk_from_row(&value); - let inequality_key = self - .inequality_key_desc - .as_ref() - .map(|desc| desc.serialize_inequal_key_from_row(&value)); - // TODO(yuhao): avoid this `contains`. - // https://github.com/risingwavelabs/risingwave/issues/9233 - if self.inner.contains(key) { - // Update cache - let mut entry = self.inner.get_mut(key).unwrap(); - entry - .insert(pk, join_row.encode(), inequality_key) - .with_context(|| self.state.error_context(&value))?; - } else if self.pk_contained_in_jk { - // Refill cache when the join key exist in neither cache or storage. - self.metrics.insert_cache_miss_count += 1; - let mut state = JoinEntryState::default(); - state - .insert(pk, join_row.encode(), inequality_key) - .with_context(|| self.state.error_context(&value))?; - self.update_state(key, state.into()); - } - - // Update the flush buffer. - self.state.table.insert(value); + self.insert(key, join_row)?; Ok(()) } @@ -638,7 +625,8 @@ impl JoinHashMap { // If no cache maintained, only update the state table. let (row, degree) = value.to_table_rows(&self.state.order_key_indices); self.state.table.delete(row); - self.degree_state.table.delete(degree); + let degree_state = self.degree_state.as_mut().unwrap(); + degree_state.table.delete(degree); Ok(()) } @@ -687,8 +675,8 @@ impl JoinHashMap { action(&mut join_row.degree); let new_degree = join_row.to_table_rows(&self.state.order_key_indices).1; - - self.degree_state.table.update(old_degree, new_degree); + let degree_state = self.degree_state.as_mut().unwrap(); + degree_state.table.update(old_degree, new_degree); } /// Increment the degree of the given [`JoinRow`] and [`EncodedJoinRow`] with `action`, both in diff --git a/src/stream/src/from_proto/asof_join.rs b/src/stream/src/from_proto/asof_join.rs index 3d74ac884b4f0..f82b4ed490117 100644 --- a/src/stream/src/from_proto/asof_join.rs +++ b/src/stream/src/from_proto/asof_join.rs @@ -44,10 +44,7 @@ impl ExecutorBuilder for AsOfJoinExecutorBuilder { let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap(); let table_l = node.get_left_table()?; - let degree_table_l = node.get_left_degree_table()?; - let table_r = node.get_right_table()?; - let degree_table_r = node.get_right_degree_table()?; let params_l = JoinParams::new( node.get_left_key() @@ -84,14 +81,9 @@ impl ExecutorBuilder for AsOfJoinExecutorBuilder { let state_table_l = StateTable::from_table_catalog(table_l, store.clone(), Some(vnodes.clone())).await; - let degree_state_table_l = - StateTable::from_table_catalog(degree_table_l, store.clone(), Some(vnodes.clone())) - .await; let state_table_r = StateTable::from_table_catalog(table_r, store.clone(), Some(vnodes.clone())).await; - let degree_state_table_r = - StateTable::from_table_catalog(degree_table_r, store, Some(vnodes)).await; let join_type_proto = node.get_join_type()?; let as_of_desc_proto = node.get_asof_desc()?; @@ -107,9 +99,7 @@ impl ExecutorBuilder for AsOfJoinExecutorBuilder { null_safe, output_indices, state_table_l, - degree_state_table_l, state_table_r, - degree_state_table_r, lru_manager: params.watermark_epoch, metrics: params.executor_stats, join_type_proto, @@ -138,9 +128,7 @@ struct AsOfJoinExecutorDispatcherArgs { null_safe: Vec, output_indices: Vec, state_table_l: StateTable, - degree_state_table_l: StateTable, state_table_r: StateTable, - degree_state_table_r: StateTable, lru_manager: AtomicU64Ref, metrics: Arc, join_type_proto: JoinTypeProto, @@ -167,9 +155,7 @@ impl HashKeyDispatcher for AsOfJoinExecutorDispatcherArgs { self.null_safe, self.output_indices, self.state_table_l, - self.degree_state_table_l, self.state_table_r, - self.degree_state_table_r, self.lru_manager, self.metrics, self.chunk_size, From ff479f60996f9ac61b0d8b5ddf40370c8b0217f8 Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 18 Sep 2024 20:36:50 +0800 Subject: [PATCH 2/6] refactor(storage): remove legacy delta type `GroupMetaChange` and `GroupTableChange` (#18585) --- proto/hummock.proto | 21 ++----- .../compaction_group/hummock_version_ext.rs | 60 +------------------ src/storage/hummock_sdk/src/version.rs | 33 +--------- 3 files changed, 8 insertions(+), 106 deletions(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index dc24870972566..0eb6aae8d5124 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -87,21 +87,6 @@ message GroupConstruct { CompatibilityVersion version = 6; } -message GroupMetaChange { - option deprecated = true; - repeated uint32 table_ids_add = 1 [deprecated = true]; - repeated uint32 table_ids_remove = 2 [deprecated = true]; -} - -message GroupTableChange { - option deprecated = true; - repeated uint32 table_ids = 1 [deprecated = true]; - uint64 target_group_id = 2; - uint64 origin_group_id = 3; - uint64 new_sst_start_id = 4; - CompatibilityVersion version = 5; -} - message GroupDestroy {} message GroupMerge { @@ -110,12 +95,14 @@ message GroupMerge { } message GroupDelta { + reserved 4; + reserved "group_meta_change"; + reserved 5; + reserved "group_table_change"; oneof delta_type { IntraLevelDelta intra_level = 1; GroupConstruct group_construct = 2; GroupDestroy group_destroy = 3; - GroupMetaChange group_meta_change = 4 [deprecated = true]; - GroupTableChange group_table_change = 5 [deprecated = true]; GroupMerge group_merge = 6; } } diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 376626e844242..c1d566b995a1c 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -22,8 +22,7 @@ use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::VnodeBitmapExt; use risingwave_pb::hummock::{ - CompactionConfig, CompatibilityVersion, GroupConstruct, GroupMerge, GroupMetaChange, - GroupTableChange, PbLevelType, + CompactionConfig, CompatibilityVersion, GroupConstruct, GroupMerge, PbLevelType, }; use tracing::warn; @@ -49,8 +48,6 @@ pub struct GroupDeltasSummary { pub insert_table_infos: Vec, pub group_construct: Option, pub group_destroy: Option, - pub group_meta_changes: Vec, - pub group_table_change: Option, pub new_vnode_partition_count: u32, pub group_merge: Option, } @@ -66,8 +63,6 @@ pub fn summarize_group_deltas( let mut insert_table_infos = vec![]; let mut group_construct = None; let mut group_destroy = None; - let mut group_meta_changes = vec![]; - let mut group_table_change = None; let mut new_vnode_partition_count = 0; let mut group_merge = None; @@ -93,12 +88,6 @@ pub fn summarize_group_deltas( assert!(group_destroy.is_none()); group_destroy = Some(compaction_group_id); } - GroupDelta::GroupMetaChange(meta_delta) => { - group_meta_changes.push(meta_delta.clone()); - } - GroupDelta::GroupTableChange(meta_delta) => { - group_table_change = Some(meta_delta.clone()); - } GroupDelta::GroupMerge(merge_delta) => { assert!(group_merge.is_none()); group_merge = Some(*merge_delta); @@ -118,8 +107,6 @@ pub fn summarize_group_deltas( insert_table_infos, group_construct, group_destroy, - group_meta_changes, - group_table_change, new_vnode_partition_count, group_merge, } @@ -404,8 +391,7 @@ impl HummockVersion { } } return; - } - if !self.levels.contains_key(&parent_group_id) { + } else if !self.levels.contains_key(&parent_group_id) { warn!(parent_group_id, "non-existing parent group id to init from"); return; } @@ -617,38 +603,6 @@ impl HummockVersion { member_table_ids, group_construct.get_new_sst_start_id(), ); - } else if let Some(group_change) = &summary.group_table_change { - // TODO: may deprecate this branch? This enum variant is not created anywhere - assert!( - group_change.version <= CompatibilityVersion::NoTrivialSplit as _, - "DeltaType::GroupTableChange is not used anymore after CompatibilityVersion::NoMemberTableIds is added" - ); - #[expect(deprecated)] - // for backward-compatibility of previous hummock version delta - self.init_with_parent_group( - group_change.origin_group_id, - group_change.target_group_id, - BTreeSet::from_iter(group_change.table_ids.clone()), - group_change.new_sst_start_id, - ); - - let levels = self - .levels - .get_mut(&group_change.origin_group_id) - .expect("compaction group should exist"); - #[expect(deprecated)] - // for backward-compatibility of previous hummock version delta - let mut moving_tables = levels - .member_table_ids - .extract_if(|t| group_change.table_ids.contains(t)) - .collect_vec(); - #[expect(deprecated)] - // for backward-compatibility of previous hummock version delta - self.levels - .get_mut(compaction_group_id) - .expect("compaction group should exist") - .member_table_ids - .append(&mut moving_tables); } else if let Some(group_merge) = &summary.group_merge { tracing::info!( "group_merge left {:?} right {:?}", @@ -662,16 +616,6 @@ impl HummockVersion { let levels = self.levels.get_mut(compaction_group_id).unwrap_or_else(|| { panic!("compaction group {} does not exist", compaction_group_id) }); - #[expect(deprecated)] // for backward-compatibility of previous hummock version delta - for group_meta_delta in &summary.group_meta_changes { - levels - .member_table_ids - .extend(group_meta_delta.table_ids_add.clone()); - levels - .member_table_ids - .retain(|t| !group_meta_delta.table_ids_remove.contains(t)); - levels.member_table_ids.sort(); - } assert!( visible_table_committed_epoch <= version_delta.visible_table_committed_epoch(), diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 4aecfcde0cf48..fe2825cc8ad0c 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -25,8 +25,8 @@ use risingwave_pb::hummock::group_delta::PbDeltaType; use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas; use risingwave_pb::hummock::{ CompactionConfig, PbGroupConstruct, PbGroupDelta, PbGroupDestroy, PbGroupMerge, - PbGroupMetaChange, PbGroupTableChange, PbHummockVersion, PbHummockVersionDelta, - PbIntraLevelDelta, PbSstableInfo, PbStateTableInfo, StateTableInfo, StateTableInfoDelta, + PbHummockVersion, PbHummockVersionDelta, PbIntraLevelDelta, PbSstableInfo, PbStateTableInfo, + StateTableInfo, StateTableInfoDelta, }; use tracing::warn; @@ -924,11 +924,6 @@ pub enum GroupDeltaCommon { IntraLevel(IntraLevelDeltaCommon), GroupConstruct(PbGroupConstruct), GroupDestroy(PbGroupDestroy), - GroupMetaChange(PbGroupMetaChange), - - #[allow(dead_code)] - GroupTableChange(PbGroupTableChange), - GroupMerge(PbGroupMerge), } @@ -949,12 +944,6 @@ where Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => { GroupDeltaCommon::GroupDestroy(pb_group_destroy) } - Some(PbDeltaType::GroupMetaChange(pb_group_meta_change)) => { - GroupDeltaCommon::GroupMetaChange(pb_group_meta_change) - } - Some(PbDeltaType::GroupTableChange(pb_group_table_change)) => { - GroupDeltaCommon::GroupTableChange(pb_group_table_change) - } Some(PbDeltaType::GroupMerge(pb_group_merge)) => { GroupDeltaCommon::GroupMerge(pb_group_merge) } @@ -978,12 +967,6 @@ where GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)), }, - GroupDeltaCommon::GroupMetaChange(pb_group_meta_change) => PbGroupDelta { - delta_type: Some(PbDeltaType::GroupMetaChange(pb_group_meta_change)), - }, - GroupDeltaCommon::GroupTableChange(pb_group_table_change) => PbGroupDelta { - delta_type: Some(PbDeltaType::GroupTableChange(pb_group_table_change)), - }, GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)), }, @@ -1006,12 +989,6 @@ where GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)), }, - GroupDeltaCommon::GroupMetaChange(pb_group_meta_change) => PbGroupDelta { - delta_type: Some(PbDeltaType::GroupMetaChange(pb_group_meta_change.clone())), - }, - GroupDeltaCommon::GroupTableChange(pb_group_table_change) => PbGroupDelta { - delta_type: Some(PbDeltaType::GroupTableChange(pb_group_table_change.clone())), - }, GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)), }, @@ -1034,12 +1011,6 @@ where Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => { GroupDeltaCommon::GroupDestroy(*pb_group_destroy) } - Some(PbDeltaType::GroupMetaChange(pb_group_meta_change)) => { - GroupDeltaCommon::GroupMetaChange(pb_group_meta_change.clone()) - } - Some(PbDeltaType::GroupTableChange(pb_group_table_change)) => { - GroupDeltaCommon::GroupTableChange(pb_group_table_change.clone()) - } Some(PbDeltaType::GroupMerge(pb_group_merge)) => { GroupDeltaCommon::GroupMerge(*pb_group_merge) } From 0f1984a58cdb318158c707723057eeded71bcc09 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Wed, 18 Sep 2024 22:13:55 +0800 Subject: [PATCH 3/6] chore(scale): Report an error instead of asserting for unsatisfied conditions in the plan. (#18589) Signed-off-by: Shanicky Chen --- src/meta/src/stream/scale.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 1cefc9350b93a..3c73b947d830a 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -1603,7 +1603,9 @@ impl ScaleController { for (worker_id, n) in decreased_actor_count { if let Some(actor_ids) = worker_to_actors.get(worker_id) { - assert!(actor_ids.len() >= n); + if actor_ids.len() < n { + bail!("plan illegal, for fragment {}, worker {} only has {} actors, but needs to reduce {}",fragment_id, worker_id, actor_ids.len(), n); + } let removed_actors: Vec<_> = actor_ids .iter() From d353d1332795c3c46d80f9b2950169029e6859d8 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Thu, 19 Sep 2024 10:19:15 +0800 Subject: [PATCH 4/6] feat: improve error of building key encoder (#18563) --- risedev.yml | 3 +++ src/connector/src/sink/encoder/avro.rs | 22 +++++++++++----------- src/connector/src/sink/encoder/mod.rs | 2 +- src/connector/src/sink/encoder/proto.rs | 8 ++++---- src/connector/src/sink/formatter/mod.rs | 10 +++++++--- src/connector/src/sink/mod.rs | 2 +- 6 files changed, 27 insertions(+), 20 deletions(-) diff --git a/risedev.yml b/risedev.yml index ce04cea773cac..de87b5acea4ee 100644 --- a/risedev.yml +++ b/risedev.yml @@ -59,6 +59,9 @@ profile: # - use: kafka # persist-data: true + # To enable Confluent schema registry, uncomment the following line + # - use: schema-registry + default-v6: steps: - use: meta-node diff --git a/src/connector/src/sink/encoder/avro.rs b/src/connector/src/sink/encoder/avro.rs index 1a9218572814f..0826e3d421d41 100644 --- a/src/connector/src/sink/encoder/avro.rs +++ b/src/connector/src/sink/encoder/avro.rs @@ -652,7 +652,7 @@ mod tests { } ] }"#, - "encode q error: avro name ref unsupported yet", + "encode 'q' error: avro name ref unsupported yet", ); test_err( @@ -663,7 +663,7 @@ mod tests { i64::MAX, ))), r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#, - "encode error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration", + "encode '' error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration", ); let avro_schema = AvroSchema::parse_str( @@ -738,7 +738,7 @@ mod tests { }; assert_eq!( err.to_string(), - "Encode error: encode req error: field not present but required" + "Encode error: encode 'req' error: field not present but required" ); let schema = Schema::new(vec![ @@ -751,7 +751,7 @@ mod tests { }; assert_eq!( err.to_string(), - "Encode error: encode extra error: field not in avro" + "Encode error: encode 'extra' error: field not in avro" ); let avro_schema = AvroSchema::parse_str(r#"["null", "long"]"#).unwrap(); @@ -761,14 +761,14 @@ mod tests { }; assert_eq!( err.to_string(), - r#"Encode error: encode error: expect avro record but got ["null","long"]"# + r#"Encode error: encode '' error: expect avro record but got ["null","long"]"# ); test_err( &DataType::Struct(StructType::new(vec![("f0", DataType::Boolean)])), (), r#"{"type": "record", "name": "T", "fields": [{"name": "f0", "type": "int"}]}"#, - "encode f0 error: cannot encode boolean column as \"int\" field", + "encode 'f0' error: cannot encode boolean column as \"int\" field", ); } @@ -790,7 +790,7 @@ mod tests { &DataType::List(DataType::Int32.into()), Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))).to_datum_ref(), avro_schema, - "encode error: found null but required", + "encode '' error: found null but required", ); test_ok( @@ -829,7 +829,7 @@ mod tests { &DataType::List(DataType::Boolean.into()), (), r#"{"type": "array", "items": "int"}"#, - "encode error: cannot encode boolean column as \"int\" field", + "encode '' error: cannot encode boolean column as \"int\" field", ); } @@ -863,14 +863,14 @@ mod tests { t, datum.to_datum_ref(), both, - r#"encode error: cannot encode timestamp with time zone column as [{"type":"long","logicalType":"timestamp-millis"},{"type":"long","logicalType":"timestamp-micros"}] field"#, + r#"encode '' error: cannot encode timestamp with time zone column as [{"type":"long","logicalType":"timestamp-millis"},{"type":"long","logicalType":"timestamp-micros"}] field"#, ); test_err( t, datum.to_datum_ref(), empty, - "encode error: cannot encode timestamp with time zone column as [] field", + "encode '' error: cannot encode timestamp with time zone column as [] field", ); test_ok( @@ -879,7 +879,7 @@ mod tests { one, Value::Union(0, Value::TimestampMillis(1).into()), ); - test_err(t, None, one, "encode error: found null but required"); + test_err(t, None, one, "encode '' error: found null but required"); test_ok( t, diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs index 0a8a9e5abce73..dc00a89143c57 100644 --- a/src/connector/src/sink/encoder/mod.rs +++ b/src/connector/src/sink/encoder/mod.rs @@ -201,7 +201,7 @@ impl std::fmt::Display for FieldEncodeError { write!( f, - "encode {} error: {}", + "encode '{}' error: {}", self.rev_path.iter().rev().join("."), self.message ) diff --git a/src/connector/src/sink/encoder/proto.rs b/src/connector/src/sink/encoder/proto.rs index 88fb445b1c3ba..a488526be7bfb 100644 --- a/src/connector/src/sink/encoder/proto.rs +++ b/src/connector/src/sink/encoder/proto.rs @@ -529,7 +529,7 @@ mod tests { .unwrap_err(); assert_eq!( err.to_string(), - "encode repeated_int_field error: cannot encode integer[] column as int32 field" + "encode 'repeated_int_field' error: cannot encode integer[] column as int32 field" ); let schema = Schema::new(vec![Field::with_name( @@ -554,7 +554,7 @@ mod tests { .unwrap_err(); assert_eq!( err.to_string(), - "encode repeated_int_field error: array containing null not allowed as repeated field" + "encode 'repeated_int_field' error: array containing null not allowed as repeated field" ); } @@ -573,7 +573,7 @@ mod tests { .unwrap_err(); assert_eq!( err.to_string(), - "encode not_exists error: field not in proto" + "encode 'not_exists' error: field not in proto" ); let err = validate_fields( @@ -583,7 +583,7 @@ mod tests { .unwrap_err(); assert_eq!( err.to_string(), - "encode map_field error: field not in proto" + "encode 'map_field' error: field not in proto" ); } } diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 9ac7d2114e458..bb0a41f63c33d 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use risingwave_common::array::StreamChunk; use crate::sink::{Result, SinkError}; @@ -279,8 +279,12 @@ impl FormatterBuild for AppendOnlyFormatter< impl FormatterBuild for UpsertFormatter { async fn build(b: FormatterParams<'_>) -> Result { - let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices)).await?; - let val_encoder = VE::build(b.builder, None).await?; + let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices)) + .await + .with_context(|| "Failed to build key encoder")?; + let val_encoder = VE::build(b.builder, None) + .await + .with_context(|| "Failed to build value encoder")?; Ok(UpsertFormatter::new(key_encoder, val_encoder)) } } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index b453af53cca41..4711dd1a3bdb6 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -644,7 +644,7 @@ pub enum SinkError { #[backtrace] anyhow::Error, ), - #[error("Internal error: {0}")] + #[error(transparent)] Internal( #[from] #[backtrace] From 57f828500e6343efc7a7e7bafd080b8cc8a7295f Mon Sep 17 00:00:00 2001 From: "Zhanxiang (Patrick) Huang" Date: Thu, 19 Sep 2024 14:18:38 +0800 Subject: [PATCH 5/6] fix: fix message_queue image tag in docker compose (#18602) --- ci/docker-compose.yml | 2 +- docker/docker-compose-distributed-etcd.yml | 2 +- docker/docker-compose-distributed.yml | 2 +- docker/docker-compose-etcd.yml | 2 +- docker/docker-compose-with-hdfs.yml | 2 +- docker/docker-compose-with-sqlite.yml | 2 +- docker/docker-compose.yml | 2 +- risedev.yml | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 11d29d7236367..d89459bc02865 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -34,7 +34,7 @@ services: retries: 5 message_queue: - image: "docker.vectorized.io/vectorized/redpanda:latest" + image: "redpandadata/redpanda:latest" command: - redpanda - start diff --git a/docker/docker-compose-distributed-etcd.yml b/docker/docker-compose-distributed-etcd.yml index 5fbfcf11e461c..626ee607ae688 100644 --- a/docker/docker-compose-distributed-etcd.yml +++ b/docker/docker-compose-distributed-etcd.yml @@ -329,7 +329,7 @@ services: retries: 5 restart: always message_queue: - image: "docker.vectorized.io/vectorized/redpanda:latest" + image: "redpandadata/redpanda:latest" command: - redpanda - start diff --git a/docker/docker-compose-distributed.yml b/docker/docker-compose-distributed.yml index 6eea5a1a1fb37..1aa8306b4a4c4 100644 --- a/docker/docker-compose-distributed.yml +++ b/docker/docker-compose-distributed.yml @@ -299,7 +299,7 @@ services: retries: 5 restart: always message_queue: - image: "docker.vectorized.io/vectorized/redpanda:latest" + image: "redpandadata/redpanda:latest" command: - redpanda - start diff --git a/docker/docker-compose-etcd.yml b/docker/docker-compose-etcd.yml index f44646f49768e..90c33fff03204 100644 --- a/docker/docker-compose-etcd.yml +++ b/docker/docker-compose-etcd.yml @@ -228,7 +228,7 @@ services: restart: always message_queue: - image: "docker.vectorized.io/vectorized/redpanda:latest" + image: "redpandadata/redpanda:latest" command: - redpanda - start diff --git a/docker/docker-compose-with-hdfs.yml b/docker/docker-compose-with-hdfs.yml index fc0fbb4067276..c8616e07a0853 100644 --- a/docker/docker-compose-with-hdfs.yml +++ b/docker/docker-compose-with-hdfs.yml @@ -275,7 +275,7 @@ services: retries: 5 restart: always message_queue: - image: "docker.vectorized.io/vectorized/redpanda:latest" + image: "redpandadata/redpanda:latest" command: - redpanda - start diff --git a/docker/docker-compose-with-sqlite.yml b/docker/docker-compose-with-sqlite.yml index 0dcdb6c11a814..c9b4e53576062 100644 --- a/docker/docker-compose-with-sqlite.yml +++ b/docker/docker-compose-with-sqlite.yml @@ -177,7 +177,7 @@ services: restart: always message_queue: - image: "docker.vectorized.io/vectorized/redpanda:latest" + image: "redpandadata/redpanda:latest" command: - redpanda - start diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 781e3e9a476f0..1197a4b3912fd 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -199,7 +199,7 @@ services: restart: always message_queue: - image: "docker.vectorized.io/vectorized/redpanda:latest" + image: "redpandadata/redpanda:latest" command: - redpanda - start diff --git a/risedev.yml b/risedev.yml index de87b5acea4ee..96443a7c0d6e7 100644 --- a/risedev.yml +++ b/risedev.yml @@ -1144,7 +1144,7 @@ compose: risingwave: "ghcr.io/risingwavelabs/risingwave:latest" prometheus: "prom/prometheus:latest" minio: "quay.io/minio/minio:latest" - redpanda: "docker.vectorized.io/vectorized/redpanda:latest" + redpanda: "redpandadata/redpanda:latest" grafana: "grafana/grafana-oss:latest" etcd: "quay.io/coreos/etcd:latest" tempo: "grafana/tempo:latest" From c7b6fff8a9328eedbae145697ab2eac0d1d84b89 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Thu, 19 Sep 2024 01:26:21 -0500 Subject: [PATCH 6/6] fix(catalog): add views' dependencies in `rw_depend` (#18596) --- e2e_test/batch/catalog/rw_depend.slt.part | 7 +++++++ src/meta/src/controller/catalog.rs | 22 ++++++++++++++++++++++ src/meta/src/manager/catalog/mod.rs | 8 ++++++++ 3 files changed, 37 insertions(+) diff --git a/e2e_test/batch/catalog/rw_depend.slt.part b/e2e_test/batch/catalog/rw_depend.slt.part index a41a9721365f3..40c781b148727 100644 --- a/e2e_test/batch/catalog/rw_depend.slt.part +++ b/e2e_test/batch/catalog/rw_depend.slt.part @@ -16,6 +16,9 @@ create materialized view mv1 as select t1.a from t1 join s1 on t1.a = s1.a; statement ok create materialized view mv2 as select * from mv1; +statement ok +create view v as select * from mv1; + statement ok create sink sink1 from mv2 with (connector='blackhole'); @@ -37,6 +40,7 @@ mv2 mv1 sink1 mv2 sink2 t1 t2 sink2 +v mv1 statement ok drop sink sink1; @@ -44,6 +48,9 @@ drop sink sink1; statement ok drop table t2 cascade; +statement ok +drop view v; + statement ok drop materialized view mv2; diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index c9c3210dd6c67..11fe99771a4b8 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -674,6 +674,28 @@ impl CatalogController { }) .collect_vec(); + let view_dependencies: Vec<(ObjectId, ObjectId)> = ObjectDependency::find() + .select_only() + .columns([ + object_dependency::Column::Oid, + object_dependency::Column::UsedBy, + ]) + .join( + JoinType::InnerJoin, + object_dependency::Relation::Object1.def(), + ) + .join(JoinType::InnerJoin, object::Relation::View.def()) + .into_tuple() + .all(&inner.db) + .await?; + + obj_dependencies.extend(view_dependencies.into_iter().map(|(view_id, table_id)| { + PbObjectDependencies { + object_id: table_id as _, + referenced_object_id: view_id as _, + } + })); + let sink_dependencies: Vec<(SinkId, TableId)> = Sink::find() .select_only() .columns([sink::Column::SinkId, sink::Column::TargetTable]) diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 4db6711862810..d139ec8e037cb 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -4217,6 +4217,14 @@ impl CatalogManager { } } } + for view in core.views.values() { + for referenced in &view.dependent_relations { + dependencies.push(PbObjectDependencies { + object_id: view.id, + referenced_object_id: *referenced, + }); + } + } for sink in core.sinks.values() { for referenced in &sink.dependent_relations { dependencies.push(PbObjectDependencies {