Skip to content

Commit

Permalink
fix(hummock): fix gc delete keys incorrectly (#14751)
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace authored Jan 24, 2024
1 parent 38785f7 commit 7b900e0
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 11 deletions.
13 changes: 5 additions & 8 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ impl HummockManager {
Some(task) => task,
};

let target_level_id = compact_task.input.target_level;
let target_level_id = compact_task.input.target_level as u32;

let compression_algorithm = match compact_task.compression_algorithm.as_str() {
"Lz4" => 1,
Expand All @@ -980,15 +980,12 @@ impl HummockManager {
watermark,
sorted_output_ssts: vec![],
task_id,
target_level: target_level_id as u32,
target_level: target_level_id,
// only gc delete keys in last level because there may be older version in more bottom
// level.
gc_delete_keys: target_level_id
== current_version
.get_compaction_group_levels(compaction_group_id)
.levels
.len()
- 1,
gc_delete_keys: current_version
.get_compaction_group_levels(compaction_group_id)
.is_last_level(target_level_id),
base_level: compact_task.base_level as u32,
task_status: TaskStatus::Pending as i32,
compaction_group_id: group_config.group_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,13 @@ impl Levels {
&mut self.levels[level_idx - 1]
}

pub fn is_last_level(&self, level_idx: u32) -> bool {
self.levels
.last()
.as_ref()
.map_or(false, |level| level.level_idx == level_idx)
}

pub fn count_ssts(&self) -> usize {
self.get_level0()
.get_sub_levels()
Expand Down
2 changes: 1 addition & 1 deletion src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1358,7 +1358,7 @@ pub(crate) mod tests {
.last()
.unwrap();
assert_eq!(1, output_level_info.table_infos.len());
assert_eq!(254, output_level_info.table_infos[0].total_key_count);
assert_eq!(252, output_level_info.table_infos[0].total_key_count);
}

type KeyValue = (FullKey<Vec<u8>>, HummockValue<Vec<u8>>);
Expand Down
14 changes: 12 additions & 2 deletions src/stream/src/executor/over_window/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use risingwave_common::util::sort_util::OrderType;
use risingwave_expr::window_function::{
create_window_state, StateKey, WindowFuncCall, WindowStates,
};
use risingwave_storage::row_serde::row_serde_util::serialize_pk_with_vnode;
use risingwave_storage::StateStore;

use super::over_partition::{
Expand Down Expand Up @@ -324,7 +325,8 @@ impl<S: StateStore> OverWindowExecutor<S> {
}

// `input pk` => `Record`
let mut key_change_update_buffer = BTreeMap::new();
let mut key_change_update_buffer: BTreeMap<DefaultOrdered<OwnedRow>, Record<OwnedRow>> =
BTreeMap::new();
let mut chunk_builder =
StreamChunkBuilder::new(this.chunk_size, this.info.schema.data_types());

Expand Down Expand Up @@ -382,7 +384,15 @@ impl<S: StateStore> OverWindowExecutor<S> {
yield chunk;
}
}
_ => panic!("other cases should not exist"),
(existed, record) => {
let vnode = this.state_table.compute_vnode_by_pk(&key.pk);
let raw_key = serialize_pk_with_vnode(
&key.pk,
this.state_table.pk_serde(),
vnode,
);
panic!("other cases should not exist. raw_key: {:?}, existed: {:?}, new: {:?}", raw_key, existed, record);
}
}
} else {
key_change_update_buffer.insert(pk, record);
Expand Down

0 comments on commit 7b900e0

Please sign in to comment.