Skip to content

Commit

Permalink
fix ut
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 committed Jan 18, 2024
1 parent f7a9ee8 commit 9af52c9
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 14 deletions.
8 changes: 6 additions & 2 deletions src/storage/hummock_sdk/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,10 @@ impl<T: AsRef<[u8]> + Ord + Eq> FullKeyTracker<T> {
///
/// # Examples:
/// ```
/// use bytes::Bytes;
/// use risingwave_common::catalog::TableId;
/// use risingwave_hummock_sdk::key::{FullKey, TableKey};
///
/// let table_id = TableId { table_id: 1 };
/// let full_key1 = FullKey::new(table_id, TableKey(Bytes::from("c")), 5);
/// let a = FullKeyTracker::<Bytes>::new(full_key1);
Expand All @@ -961,10 +965,10 @@ impl<T: AsRef<[u8]> + Ord + Eq> FullKeyTracker<T> {
/// // a.observe(full_key_with_smaller_user_key);
///
/// let full_key2 = FullKey::new(table_id, TableKey(Bytes::from("c")), 3);
/// assert_eq!(a.observe(full_key), None);
/// assert_eq!(a.observe(full_key1), None);
///
/// let full_key3 = FullKey::new(table_id, TableKey(Bytes::from("f")), 4);
/// assert_eq!(a.observe(full_key), Some(full_key1));
/// assert_eq!(a.observe(full_key1), Some(full_key1));
/// ```
///
/// Return:
Expand Down
32 changes: 20 additions & 12 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use bytes::{Bytes, BytesMut};
use futures::future::try_join_all;
use futures::{stream, StreamExt, TryFutureExt};
use itertools::Itertools;
use more_asserts::debug_assert_lt;
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
Expand Down Expand Up @@ -455,9 +456,18 @@ pub async fn merge_imms_in_memory(
if let Some(last_full_key) = full_key_tracker.observe(full_key)
&& !last_full_key.is_empty()
{
// Record kv entries
merged_payload.push((last_full_key.user_key.table_key, table_key_versions));

// Reset state before moving onto the new table key
table_key_versions = vec![];
table_key_last_delete_epoch = HummockEpoch::MAX;

// Record range tombstones if any
let target_extended_user_key =
PointRange::from_user_key(last_full_key.user_key.as_ref(), false);
let target_extended_user_key = PointRange::from_user_key(
full_key_tracker.latest_full_key.user_key.as_ref(),
false,
);
while del_iter.is_valid() && del_iter.key().le(&target_extended_user_key) {
let event_key = del_iter.key().to_vec();
del_iter.next().await?;
Expand All @@ -466,23 +476,21 @@ pub async fn merge_imms_in_memory(
new_epoch: del_iter.earliest_epoch(),
});
}

// Record kv entries
merged_payload.push((last_full_key.user_key.table_key, table_key_versions));

// Reset state before moving onto the new table key
table_key_versions = vec![];
table_key_last_delete_epoch = HummockEpoch::MAX;
}
let earliest_range_delete_which_can_see_key =
del_iter.earliest_delete_since(epoch_with_gap.pure_epoch());
if value.is_delete() {
table_key_last_delete_epoch = epoch_with_gap.pure_epoch();
} else if earliest_range_delete_which_can_see_key < table_key_last_delete_epoch {
debug_assert!(
epoch_with_gap.pure_epoch() < earliest_range_delete_which_can_see_key
&& earliest_range_delete_which_can_see_key < table_key_last_delete_epoch
debug_assert_lt!(
epoch_with_gap.pure_epoch(),
earliest_range_delete_which_can_see_key
);
debug_assert_lt!(
earliest_range_delete_which_can_see_key,
table_key_last_delete_epoch
);

table_key_last_delete_epoch = earliest_range_delete_which_can_see_key;
// In each merged immutable memtable, since a union set of delete ranges is constructed
// and thus original delete ranges are replaced with the union set and not
Expand Down

0 comments on commit 9af52c9

Please sign in to comment.