Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add assertion to ensure monotonically decreasing user key epoch #14488

Merged
merged 11 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions src/storage/hummock_sdk/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,15 @@ impl<T: AsRef<[u8]> + Ord + Eq> PartialOrd for FullKey<T> {
}
}

/// Converts a borrowed `FullKey<&[u8]>` into an owned `FullKey<T>` by copying
/// the underlying key bytes (delegates to `copy_into`).
impl<'a, T> From<FullKey<&'a [u8]>> for FullKey<T>
where
    T: AsRef<[u8]> + CopyFromSlice,
{
    fn from(value: FullKey<&'a [u8]>) -> Self {
        value.copy_into()
    }
}

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct PointRange<T: AsRef<[u8]>> {
// When comparing `PointRange`, we first compare `left_user_key`, then
Expand Down Expand Up @@ -921,6 +930,98 @@ pub fn bound_table_key_range<T: AsRef<[u8]> + EmptySliceRef>(
(start, end)
}

/// Tracks the latest full key observed during a forward iteration and asserts
/// the expected ordering: user keys must be monotonically increasing, and
/// epochs must be strictly decreasing within the same user key.
pub struct FullKeyTracker<T: AsRef<[u8]> + Ord + Eq> {
    /// The first full key observed for the current user key. Note: its
    /// `epoch_with_gap` is NOT updated when later epochs of the same user key
    /// are observed (see `last_observed_epoch_with_gap` for that).
    pub latest_full_key: FullKey<T>,
    /// Epoch (with gap) of the most recently observed key; used to enforce the
    /// strictly-decreasing-epoch invariant within a user key.
    last_observed_epoch_with_gap: EpochWithGap,
}

impl<T: AsRef<[u8]> + Ord + Eq> FullKeyTracker<T> {
    /// Creates a tracker seeded with `init_full_key`: it becomes the first
    /// observed full key, and its epoch the last observed epoch.
    pub fn new(init_full_key: FullKey<T>) -> Self {
        let epoch_with_gap = init_full_key.epoch_with_gap;
        Self {
            latest_full_key: init_full_key,
            last_observed_epoch_with_gap: epoch_with_gap,
        }
    }

    /// Check and observe a new full key during iteration
    ///
    /// # Examples:
    /// ```
    /// use bytes::Bytes;
    /// use risingwave_common::catalog::TableId;
    /// use risingwave_common::util::epoch::EPOCH_AVAILABLE_BITS;
    /// use risingwave_hummock_sdk::EpochWithGap;
    /// use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, TableKey};
    ///
    /// let table_id = TableId { table_id: 1 };
    /// let full_key1 = FullKey::new(table_id, TableKey(Bytes::from("c")), 5 << EPOCH_AVAILABLE_BITS);
    /// let mut a = FullKeyTracker::<Bytes>::new(full_key1.clone());
    ///
    /// // Panic on non-decreasing epoch observed for the same user key.
    /// // let full_key_with_larger_epoch = FullKey::new(table_id, TableKey(Bytes::from("c")), 6 << EPOCH_AVAILABLE_BITS);
    /// // a.observe(full_key_with_larger_epoch);
    ///
    /// // Panic on decreasing user key observed.
    /// // let full_key_with_smaller_user_key = FullKey::new(table_id, TableKey(Bytes::from("b")), 3 << EPOCH_AVAILABLE_BITS);
    /// // a.observe(full_key_with_smaller_user_key);
    ///
    /// let full_key2 = FullKey::new(table_id, TableKey(Bytes::from("c")), 3 << EPOCH_AVAILABLE_BITS);
    /// assert_eq!(a.observe(full_key2), None);
    ///
    /// let full_key3 = FullKey::new(table_id, TableKey(Bytes::from("f")), 4 << EPOCH_AVAILABLE_BITS);
    /// assert_eq!(a.observe(full_key3), Some(full_key1));
    /// ```
    ///
    /// # Panics
    /// - If `key`'s user key is smaller than the latest observed user key.
    /// - If `key` has the same user key as the latest observed one but an
    ///   epoch that is not strictly smaller than the last observed epoch.
    ///
    /// Return:
    /// - If the provided `key` contains a new user key, return the latest full key observed for the previous user key.
    /// - Otherwise: return None
    pub fn observe<F>(&mut self, key: FullKey<F>) -> Option<FullKey<T>>
    where
        FullKey<F>: Into<FullKey<T>>,
        F: AsRef<[u8]>,
    {
        match self
            .latest_full_key
            .user_key
            .as_ref()
            .cmp(&key.user_key.as_ref())
        {
            Ordering::Less => {
                // Observe a new user key

                // Reset epochs
                self.last_observed_epoch_with_gap = key.epoch_with_gap;

                // Take the previous key and set latest key
                Some(std::mem::replace(&mut self.latest_full_key, key.into()))
            }
            Ordering::Equal => {
                if key.epoch_with_gap >= self.last_observed_epoch_with_gap {
                    // Epoch from the same user key should be monotonically decreasing
                    panic!(
                        "key {:?} epoch {:?} >= prev epoch {:?}",
                        key.user_key, key.epoch_with_gap, self.last_observed_epoch_with_gap
                    );
                }
                self.last_observed_epoch_with_gap = key.epoch_with_gap;
                None
            }
            Ordering::Greater => {
                // User key should be monotonically increasing
                panic!(
                    "key {:?} <= prev key {:?}",
                    key,
                    FullKey {
                        user_key: self.latest_full_key.user_key.as_ref(),
                        epoch_with_gap: self.last_observed_epoch_with_gap
                    }
                );
            }
        }
    }
}

#[cfg(test)]
mod tests {
use std::cmp::Ordering;
Expand Down
17 changes: 7 additions & 10 deletions src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_common::util::epoch::is_max_epoch;
use risingwave_hummock_sdk::compact::{
compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
};
use risingwave_hummock_sdk::key::{FullKey, PointRange};
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, PointRange};
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_hummock_sdk::table_stats::{add_table_stats_map, TableStats, TableStatsMap};
use risingwave_hummock_sdk::{can_concat, EpochWithGap, HummockEpoch};
Expand Down Expand Up @@ -732,7 +732,7 @@ where
};
let max_key = end_key.to_ref();

let mut last_key = FullKey::default();
let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
let mut watermark_can_see_last_key = false;
let mut user_key_last_delete_epoch = HummockEpoch::MAX;
let mut local_stats = StoreLocalStatistic::default();
Expand All @@ -746,9 +746,7 @@ where
let mut iter_key = iter.key();
compaction_statistics.iter_total_key_counts += 1;

let mut is_new_user_key =
last_key.is_empty() || iter_key.user_key != last_key.user_key.as_ref();

let mut is_new_user_key = full_key_tracker.observe(iter.key()).is_some();
let mut drop = false;

let epoch = iter_key.epoch_with_gap.pure_epoch();
Expand All @@ -757,7 +755,6 @@ where
if !max_key.is_empty() && iter_key >= max_key {
break;
}
last_key.set(iter_key);
watermark_can_see_last_key = false;
user_key_last_delete_epoch = HummockEpoch::MAX;
if value.is_delete() {
Expand All @@ -768,12 +765,12 @@ where
}

if last_table_id.map_or(true, |last_table_id| {
last_table_id != last_key.user_key.table_id.table_id
last_table_id != iter_key.user_key.table_id.table_id
}) {
if let Some(last_table_id) = last_table_id.take() {
table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
}
last_table_id = Some(last_key.user_key.table_id.table_id);
last_table_id = Some(iter_key.user_key.table_id.table_id);
}

let target_extended_user_key = PointRange::from_user_key(iter_key.user_key, false);
Expand Down Expand Up @@ -824,13 +821,13 @@ where

let should_count = match task_config.stats_target_table_ids.as_ref() {
Some(target_table_ids) => {
target_table_ids.contains(&last_key.user_key.table_id.table_id)
target_table_ids.contains(&iter_key.user_key.table_id.table_id)
}
None => true,
};
if should_count {
last_table_stats.total_key_count -= 1;
last_table_stats.total_key_size -= last_key.encoded_len() as i64;
last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
}
iter.next()
Expand Down
84 changes: 56 additions & 28 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ use bytes::{Bytes, BytesMut};
use futures::future::try_join_all;
use futures::{stream, StreamExt, TryFutureExt};
use itertools::Itertools;
use more_asserts::debug_assert_lt;
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::key::{FullKey, PointRange, TableKey, UserKey};
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, PointRange, TableKey, UserKey};
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::{CompactionGroupId, EpochWithGap, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::compact_task;
Expand Down Expand Up @@ -355,13 +356,16 @@ pub async fn merge_imms_in_memory(
}

let mut merged_payload: Vec<SharedBufferVersionedEntry> = Vec::new();
let mut pivot = items
let first_item_key = items
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe off topic: can we assert that items is non-empty instead of creating an implicit default value?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I planned to do that before but I later realized it is possible that the items are empty when imm only contains range tombstone.

However, I am planning to disallow imm merge when there are range tombstones in imm, which can simplify the logic a lot.

.first()
.map(|((k, _), _)| k.clone())
.unwrap_or_default();

let mut monotonic_tombstone_events = vec![];
let target_extended_user_key =
PointRange::from_user_key(UserKey::new(table_id, TableKey(pivot.as_ref())), false);
let target_extended_user_key = PointRange::from_user_key(
UserKey::new(table_id, TableKey(first_item_key.as_ref())),
false,
);
while del_iter.is_valid() && del_iter.key().le(&target_extended_user_key) {
let event_key = del_iter.key().to_vec();
del_iter.next().await?;
Expand All @@ -371,19 +375,35 @@ pub async fn merge_imms_in_memory(
});
}

let mut versions: Vec<(EpochWithGap, HummockValue<Bytes>)> = Vec::new();

let mut pivot_last_delete_epoch = HummockEpoch::MAX;

for ((key, value), epoch) in items {
assert!(key >= pivot, "key should be in ascending order");
if key != pivot {
merged_payload.push((pivot, versions));
pivot = key;
pivot_last_delete_epoch = HummockEpoch::MAX;
versions = vec![];
let target_extended_user_key =
PointRange::from_user_key(UserKey::new(table_id, TableKey(pivot.as_ref())), false);
    // Use first key, max epoch to initialize the tracker to ensure that the first call to full_key_tracker.observe will succeed
let mut full_key_tracker = FullKeyTracker::<Bytes>::new(FullKey::new_with_gap_epoch(
table_id,
first_item_key,
EpochWithGap::new_max_epoch(),
));
let mut table_key_versions: Vec<(EpochWithGap, HummockValue<Bytes>)> = Vec::new();
let mut table_key_last_delete_epoch = HummockEpoch::MAX;

for ((key, value), epoch_with_gap) in items {
let full_key = FullKey::new_with_gap_epoch(table_id, key, epoch_with_gap);
if let Some(last_full_key) = full_key_tracker.observe(full_key) {
let last_user_key = last_full_key.user_key;
// `epoch_with_gap` of the `last_full_key` may not reflect the real epoch in the items
// and should not be used because we use max epoch to initialize the tracker
let _epoch_with_gap = last_full_key.epoch_with_gap;

// Record kv entries
merged_payload.push((last_user_key.table_key, table_key_versions));

// Reset state before moving onto the new table key
table_key_versions = vec![];
table_key_last_delete_epoch = HummockEpoch::MAX;

// Record range tombstones if any
let target_extended_user_key = PointRange::from_user_key(
full_key_tracker.latest_full_key.user_key.as_ref(),
false,
);
while del_iter.is_valid() && del_iter.key().le(&target_extended_user_key) {
let event_key = del_iter.key().to_vec();
del_iter.next().await?;
Expand All @@ -394,26 +414,31 @@ pub async fn merge_imms_in_memory(
}
}
let earliest_range_delete_which_can_see_key =
del_iter.earliest_delete_since(epoch.pure_epoch());
del_iter.earliest_delete_since(epoch_with_gap.pure_epoch());
if value.is_delete() {
pivot_last_delete_epoch = epoch.pure_epoch();
} else if earliest_range_delete_which_can_see_key < pivot_last_delete_epoch {
debug_assert!(
epoch.pure_epoch() < earliest_range_delete_which_can_see_key
&& earliest_range_delete_which_can_see_key < pivot_last_delete_epoch
table_key_last_delete_epoch = epoch_with_gap.pure_epoch();
} else if earliest_range_delete_which_can_see_key < table_key_last_delete_epoch {
debug_assert_lt!(
epoch_with_gap.pure_epoch(),
earliest_range_delete_which_can_see_key
);
pivot_last_delete_epoch = earliest_range_delete_which_can_see_key;
debug_assert_lt!(
earliest_range_delete_which_can_see_key,
table_key_last_delete_epoch
);

table_key_last_delete_epoch = earliest_range_delete_which_can_see_key;
// In each merged immutable memtable, since a union set of delete ranges is constructed
// and thus original delete ranges are replaced with the union set and not
// used in read, we lose exact information about whether a key is deleted by
// a delete range in the merged imm which it belongs to. Therefore we need
// to construct a corresponding delete key to represent this.
versions.push((
table_key_versions.push((
EpochWithGap::new_from_epoch(earliest_range_delete_which_can_see_key),
HummockValue::Delete,
));
}
versions.push((epoch, value));
table_key_versions.push((epoch_with_gap, value));
}
while del_iter.is_valid() {
let event_key = del_iter.key().to_vec();
Expand All @@ -425,8 +450,11 @@ pub async fn merge_imms_in_memory(
}

// process the last key
if !versions.is_empty() {
merged_payload.push((pivot, versions));
if !table_key_versions.is_empty() {
merged_payload.push((
full_key_tracker.latest_full_key.user_key.table_key,
table_key_versions,
));
}

drop(del_iter);
Expand Down
Loading
Loading