Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ensure each storage read only involves one vnode #15289

Merged
merged 1 commit into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/storage/hummock_sdk/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ pub fn vnode_range(range: &TableKeyRange) -> (usize, usize) {
(left, right)
}

/// Returns the single [`VirtualNode`] covered by the given table key range.
///
/// Used to enforce the invariant that a storage read involves exactly one
/// vnode: the half-open vnode span reported by `vnode_range` must be exactly
/// one vnode wide.
///
/// # Panics
///
/// Panics if `range` spans more than one vnode, or none at all.
pub fn vnode(range: &TableKeyRange) -> VirtualNode {
    let (l, r_exclusive) = vnode_range(range);
    // Caller must guarantee the range falls entirely within a single vnode.
    assert_eq!(r_exclusive - l, 1);
    VirtualNode::from_index(l)
}

/// Converts user key to full key by appending `epoch` to the user key.
pub fn key_with_epoch(mut user_key: Vec<u8>, epoch: HummockEpoch) -> Vec<u8> {
let res = epoch.to_be();
Expand Down
138 changes: 40 additions & 98 deletions src/storage/hummock_sdk/src/table_watermark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks;
use risingwave_pb::hummock::{PbTableWatermarks, PbVnodeWatermark};
use tracing::{debug, warn};

use crate::key::{prefix_slice_with_vnode, vnode_range, TableKey, TableKeyRange};
use crate::key::{prefix_slice_with_vnode, vnode, TableKey, TableKeyRange};
use crate::HummockEpoch;

#[derive(Clone)]
Expand Down Expand Up @@ -102,79 +102,54 @@ impl TableWatermarksIndex {
self.read_watermark(vnode, HummockEpoch::MAX)
}

pub fn range_watermarks(
pub fn rewrite_range_with_table_watermark(
&self,
epoch: HummockEpoch,
key_range: &mut TableKeyRange,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just question: why not return a new key_range after rewrite instead of "mut ref "?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we also do in-place update prior to this PR. I am okay both ways so I can also change that if you think it is important.

) -> Option<ReadTableWatermark> {
let mut ret = BTreeMap::new();
let (left, right) = vnode_range(key_range);
if right - left == 1 {
// the table key range falls in a single vnode. No table watermark will be returned, and instead the key range
// will be modified.
let vnode = VirtualNode::from_index(left);
if let Some(watermark) = self.read_watermark(vnode, epoch) {
match self.watermark_direction {
WatermarkDirection::Ascending => {
let overwrite_start_key = match &key_range.0 {
Included(start_key) | Excluded(start_key) => {
start_key.key_part() < watermark
}
Unbounded => true,
) {
let vnode = vnode(key_range);
if let Some(watermark) = self.read_watermark(vnode, epoch) {
match self.watermark_direction {
WatermarkDirection::Ascending => {
let overwrite_start_key = match &key_range.0 {
Included(start_key) | Excluded(start_key) => {
start_key.key_part() < watermark
}
Unbounded => true,
};
if overwrite_start_key {
let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
let fully_filtered = match &key_range.1 {
Included(end_key) => end_key < &watermark_key,
Excluded(end_key) => end_key <= &watermark_key,
Unbounded => false,
};
if overwrite_start_key {
let watermark_key =
TableKey(prefix_slice_with_vnode(vnode, &watermark));
let fully_filtered = match &key_range.1 {
Included(end_key) => end_key < &watermark_key,
Excluded(end_key) => end_key <= &watermark_key,
Unbounded => false,
};
if fully_filtered {
key_range.1 = Excluded(watermark_key.clone());
}
key_range.0 = Included(watermark_key);
if fully_filtered {
key_range.1 = Excluded(watermark_key.clone());
}
key_range.0 = Included(watermark_key);
}
WatermarkDirection::Descending => {
let overwrite_end_key = match &key_range.1 {
Included(end_key) | Excluded(end_key) => end_key.key_part() > watermark,
Unbounded => true,
}
WatermarkDirection::Descending => {
let overwrite_end_key = match &key_range.1 {
Included(end_key) | Excluded(end_key) => end_key.key_part() > watermark,
Unbounded => true,
};
if overwrite_end_key {
let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
let fully_filtered = match &key_range.0 {
Included(start_key) => start_key > &watermark_key,
Excluded(start_key) => start_key >= &watermark_key,
Unbounded => false,
};
if overwrite_end_key {
let watermark_key =
TableKey(prefix_slice_with_vnode(vnode, &watermark));
let fully_filtered = match &key_range.0 {
Included(start_key) => start_key > &watermark_key,
Excluded(start_key) => start_key >= &watermark_key,
Unbounded => false,
};
if fully_filtered {
*key_range =
(Included(watermark_key.clone()), Excluded(watermark_key));
} else {
key_range.1 = Included(watermark_key);
}
if fully_filtered {
*key_range = (Included(watermark_key.clone()), Excluded(watermark_key));
} else {
key_range.1 = Included(watermark_key);
}
}
}
}
None
} else {
for i in left..right {
let vnode = VirtualNode::from_index(i);
if let Some(watermark) = self.read_watermark(vnode, epoch) {
assert!(ret.insert(vnode, watermark).is_none());
}
}
if ret.is_empty() {
None
} else {
Some(ReadTableWatermark {
direction: self.direction(),
vnode_watermarks: ret,
})
}
}
}

Expand Down Expand Up @@ -606,10 +581,7 @@ mod tests {
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;

use crate::key::{
is_empty_key_range, map_table_key_range, prefix_slice_with_vnode,
prefixed_range_with_vnode, TableKeyRange,
};
use crate::key::{is_empty_key_range, prefixed_range_with_vnode, TableKeyRange};
use crate::table_watermark::{
merge_multiple_new_table_watermarks, TableWatermarks, TableWatermarksIndex, VnodeWatermark,
WatermarkDirection,
Expand Down Expand Up @@ -969,42 +941,12 @@ mod tests {
Some(watermark2.clone())
);

// test read from multiple vnodes
{
let range = map_table_key_range((
Included(prefix_slice_with_vnode(
VirtualNode::from_index(1),
b"begin",
)),
Excluded(prefix_slice_with_vnode(VirtualNode::from_index(2), b"end")),
));
let mut range_mut = range.clone();
let read_watermarks = index.range_watermarks(EPOCH2, &mut range_mut).unwrap();
assert_eq!(range_mut, range);
assert_eq!(direction, read_watermarks.direction);
assert_eq!(2, read_watermarks.vnode_watermarks.len());
assert_eq!(
&watermark2,
read_watermarks
.vnode_watermarks
.get(&VirtualNode::from_index(1))
.unwrap()
);
assert_eq!(
&watermark2,
read_watermarks
.vnode_watermarks
.get(&VirtualNode::from_index(2))
.unwrap()
);
}

// watermark is watermark2
let check_watermark_range =
|query_range: (Bound<Bytes>, Bound<Bytes>),
output_range: Option<(Bound<Bytes>, Bound<Bytes>)>| {
let mut range = build_watermark_range(direction, query_range);
assert!(index.range_watermarks(EPOCH2, &mut range).is_none());
index.rewrite_range_with_table_watermark(EPOCH2, &mut range);
if let Some(output_range) = output_range {
assert_eq!(range, build_watermark_range(direction, output_range));
} else {
Expand Down
24 changes: 18 additions & 6 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ pub(crate) mod tests {
use risingwave_hummock_sdk::can_concat;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::key::{
next_key, prefix_slice_with_vnode, FullKey, TableKey, TABLE_PREFIX_LEN,
next_key, prefix_slice_with_vnode, prefixed_range_with_vnode, FullKey, TableKey,
TABLE_PREFIX_LEN,
};
use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
Expand Down Expand Up @@ -156,7 +157,9 @@ pub(crate) mod tests {
value_size: usize,
epochs: Vec<u64>,
) {
let mut local = storage.new_local(Default::default()).await;
let mut local = storage
.new_local(NewLocalOptions::for_test(TableId::default()))
.await;
// 1. add sstables
let val = b"0"[..].repeat(value_size);
local.init_for_test(epochs[0]).await.unwrap();
Expand Down Expand Up @@ -730,6 +733,8 @@ pub(crate) mod tests {
StaticCompactionGroupId::StateDefault.into(),
)
.await;

let vnode = VirtualNode::from_index(1);
for index in 0..kv_count {
epoch += 1;
let next_epoch = epoch + 1;
Expand All @@ -746,7 +751,7 @@ pub(crate) mod tests {

let mut prefix = BytesMut::default();
let random_key = rand::thread_rng().gen::<[u8; 32]>();
prefix.put_u16(1);
prefix.extend_from_slice(&vnode.to_be_bytes());
prefix.put_slice(random_key.as_slice());

storage
Expand Down Expand Up @@ -852,7 +857,10 @@ pub(crate) mod tests {
// 7. scan kv to check key table_id
let scan_result = global_storage
.scan(
(Bound::Unbounded, Bound::Unbounded),
prefixed_range_with_vnode(
(Bound::<Bytes>::Unbounded, Bound::<Bytes>::Unbounded),
vnode,
),
epoch,
None,
ReadOptions {
Expand Down Expand Up @@ -922,6 +930,7 @@ pub(crate) mod tests {
let base_epoch = Epoch::now();
let mut epoch: u64 = base_epoch.0;
let millisec_interval_epoch: u64 = (1 << 16) * 100;
let vnode = VirtualNode::from_index(1);
let mut epoch_set = BTreeSet::new();

let mut local = storage
Expand All @@ -936,7 +945,7 @@ pub(crate) mod tests {
epoch_set.insert(epoch);
let mut prefix = BytesMut::default();
let random_key = rand::thread_rng().gen::<[u8; 32]>();
prefix.put_u16(1);
prefix.extend_from_slice(&vnode.to_be_bytes());
prefix.put_slice(random_key.as_slice());

local
Expand Down Expand Up @@ -1047,7 +1056,10 @@ pub(crate) mod tests {
// 6. scan kv to check key table_id
let scan_result = storage
.scan(
(Bound::Unbounded, Bound::Unbounded),
prefixed_range_with_vnode(
(Bound::<Bytes>::Unbounded, Bound::<Bytes>::Unbounded),
vnode,
),
epoch,
None,
ReadOptions {
Expand Down
4 changes: 3 additions & 1 deletion src/storage/hummock_test/src/failpoint_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ async fn test_failpoints_state_store_read_upload() {
.await
.unwrap();

let mut local = hummock_storage.new_local(NewLocalOptions::default()).await;
let mut local = hummock_storage
.new_local(NewLocalOptions::for_test(TableId::default()))
.await;

let anchor = gen_key_from_str(VirtualNode::ZERO, "aa");
let mut batch1 = vec![
Expand Down
Loading
Loading