diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 5b07ff5be5123..717724823a190 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -959,7 +959,7 @@ impl HummockManager { Some(task) => task, }; - let target_level_id = compact_task.input.target_level; + let target_level_id = compact_task.input.target_level as u32; let compression_algorithm = match compact_task.compression_algorithm.as_str() { "Lz4" => 1, @@ -975,15 +975,12 @@ impl HummockManager { watermark, sorted_output_ssts: vec![], task_id, - target_level: target_level_id as u32, + target_level: target_level_id, // only gc delete keys in last level because there may be older version in more bottom // level. - gc_delete_keys: target_level_id - == current_version - .get_compaction_group_levels(compaction_group_id) - .levels - .len() - - 1, + gc_delete_keys: current_version + .get_compaction_group_levels(compaction_group_id) + .is_last_level(target_level_id), base_level: compact_task.base_level as u32, task_status: TaskStatus::Pending as i32, compaction_group_id: group_config.group_id, diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index a6950c909e958..9e07598d07920 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -668,6 +668,13 @@ impl Levels { &mut self.levels[level_idx - 1] } + pub fn is_last_level(&self, level_idx: u32) -> bool { + self.levels + .last() + .as_ref() + .map_or(false, |level| level.level_idx == level_idx) + } + pub fn count_ssts(&self) -> usize { self.get_level0() .get_sub_levels() diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 7f3810ccb4c49..1a32053f544db 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -1361,7 +1361,7 @@ pub(crate) mod tests { .last() .unwrap(); assert_eq!(1, output_level_info.table_infos.len()); - assert_eq!(254, output_level_info.table_infos[0].total_key_count); + assert_eq!(252, output_level_info.table_infos[0].total_key_count); } type KeyValue = (FullKey>, HummockValue>); diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs index 0ba4808b93624..1e0d7a21a0740 100644 --- a/src/stream/src/executor/over_window/general.rs +++ b/src/stream/src/executor/over_window/general.rs @@ -32,6 +32,7 @@ use risingwave_common::util::sort_util::OrderType; use risingwave_expr::window_function::{ create_window_state, StateKey, WindowFuncCall, WindowStates, }; +use risingwave_storage::row_serde::row_serde_util::serialize_pk_with_vnode; use risingwave_storage::StateStore; use super::over_partition::{ @@ -324,7 +325,8 @@ impl OverWindowExecutor { } // `input pk` => `Record` - let mut key_change_update_buffer = BTreeMap::new(); + let mut key_change_update_buffer: BTreeMap, Record> = + BTreeMap::new(); let mut chunk_builder = StreamChunkBuilder::new(this.chunk_size, this.info.schema.data_types()); @@ -382,7 +384,15 @@ impl OverWindowExecutor { yield chunk; } } - _ => panic!("other cases should not exist"), + (existed, record) => { + let vnode = this.state_table.compute_vnode_by_pk(&key.pk); + let raw_key = serialize_pk_with_vnode( + &key.pk, + this.state_table.pk_serde(), + vnode, + ); + panic!("other cases should not exist. raw_key: {:?}, existed: {:?}, new: {:?}", raw_key, existed, record); + } } } else { key_change_update_buffer.insert(pk, record);