Skip to content

Commit

Permalink
feat(storage): pass per-vnode watermark to hummock (#13429)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Dec 25, 2023
1 parent ee6378b commit 4ad1940
Show file tree
Hide file tree
Showing 13 changed files with 774 additions and 90 deletions.
115 changes: 113 additions & 2 deletions src/storage/hummock_sdk/src/table_watermark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use risingwave_common::buffer::{Bitmap, BitmapBuilder};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks;
use risingwave_pb::hummock::{PbTableWatermarks, PbVnodeWatermark};
use tracing::debug;
use tracing::{debug, warn};

use crate::key::{prefix_slice_with_vnode, vnode_range, TableKey, TableKeyRange};
use crate::HummockEpoch;
Expand Down Expand Up @@ -82,6 +82,10 @@ impl TableWatermarksIndex {
}
}

pub fn index(&self) -> &HashMap<VirtualNode, BTreeMap<HummockEpoch, Bytes>> {
&self.index
}

pub fn read_watermark(&self, vnode: VirtualNode, epoch: HummockEpoch) -> Option<Bytes> {
self.index.get(&vnode).and_then(|epoch_watermarks| {
epoch_watermarks
Expand Down Expand Up @@ -171,6 +175,54 @@ impl TableWatermarksIndex {
}
}

pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
let mut ret = Vec::with_capacity(watermarks.len());
for watermark in watermarks.drain(..) {
let mut regress_vnodes = None;
for vnode in watermark.vnode_bitmap.iter_vnodes() {
if let Some(prev_watermark) = self.latest_watermark(vnode) {
let is_regress = match self.direction() {
WatermarkDirection::Ascending => prev_watermark > watermark.watermark,
WatermarkDirection::Descending => prev_watermark < watermark.watermark,
};
if is_regress {
warn!(
"table watermark regress: {:?} {} {:?} {:?}",
self.direction(),
vnode,
watermark.watermark,
prev_watermark
);
regress_vnodes
.get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::COUNT))
.set(vnode.to_index(), true);
}
}
}
if let Some(regress_vnodes) = regress_vnodes {
let mut bitmap_builder = None;
for vnode in watermark.vnode_bitmap.iter_vnodes() {
let vnode_index = vnode.to_index();
if !regress_vnodes.is_set(vnode_index) {
bitmap_builder
.get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::COUNT))
.set(vnode_index, true);
}
}
if let Some(bitmap_builder) = bitmap_builder {
ret.push(VnodeWatermark::new(
Arc::new(bitmap_builder.finish()),
watermark.watermark,
));
}
} else {
// no vnode has regress watermark
ret.push(watermark);
}
}
*watermarks = ret;
}

pub fn direction(&self) -> WatermarkDirection {
self.watermark_direction
}
Expand Down Expand Up @@ -238,7 +290,7 @@ impl WatermarkDirection {
}
}

#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq)]
pub struct VnodeWatermark {
vnode_bitmap: Arc<Bitmap>,
watermark: Bytes,
Expand Down Expand Up @@ -826,6 +878,9 @@ mod tests {
prefixed_range_with_vnode(range, TEST_SINGLE_VNODE)
}

/// Build and return a watermark index with the following watermarks
/// EPOCH1 bitmap(0, 1, 2, 3) watermark1
/// EPOCH2 bitmap(1, 2, 3, 4) watermark2
fn build_and_test_watermark_index(
direction: WatermarkDirection,
watermark1: Bytes,
Expand Down Expand Up @@ -1030,4 +1085,60 @@ mod tests {
}
}
}

#[test]
fn test_filter_regress_watermark() {
let watermark1 = Bytes::from_static(b"watermark1");
let watermark2 = Bytes::from_static(b"watermark2");
let watermark3 = Bytes::from_static(b"watermark3");
let index = build_and_test_watermark_index(
WatermarkDirection::Ascending,
watermark1.clone(),
watermark2.clone(),
watermark3.clone(),
);

let mut new_watermarks = vec![
// Partial regress
VnodeWatermark {
vnode_bitmap: build_bitmap(0..2),
watermark: watermark1.clone(),
},
// All not regress
VnodeWatermark {
vnode_bitmap: build_bitmap(2..4),
watermark: watermark3.clone(),
},
// All regress
VnodeWatermark {
vnode_bitmap: build_bitmap(4..5),
watermark: watermark1.clone(),
},
// All newly set vnode
VnodeWatermark {
vnode_bitmap: build_bitmap(5..6),
watermark: watermark3.clone(),
},
];

index.filter_regress_watermarks(&mut new_watermarks);

assert_eq!(
new_watermarks,
vec![
VnodeWatermark {
vnode_bitmap: build_bitmap(0..1),
watermark: watermark1,
},
VnodeWatermark {
vnode_bitmap: build_bitmap(2..4),
watermark: watermark3.clone(),
},
VnodeWatermark {
vnode_bitmap: build_bitmap(5..6),
watermark: watermark3,
},
]
);
}
}
Loading

0 comments on commit 4ad1940

Please sign in to comment.