Skip to content

Commit

Permalink
fix(obserivation): fix missed hitmap updates, fix drop order (#14788)
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Jan 25, 2024
1 parent 372c2d7 commit 996a4ba
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 33 deletions.
8 changes: 4 additions & 4 deletions src/storage/src/hummock/sstable/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ pub const DEFAULT_BLOCK_SIZE: usize = 4 * 1024;
pub const DEFAULT_RESTART_INTERVAL: usize = 16;
pub const DEFAULT_ENTRY_SIZE: usize = 24; // table_id(u64) + primary_key(u64) + epoch(u64)

pub const HITMAP_ELEMS: usize = 4;

#[allow(non_camel_case_types)]
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum LenType {
Expand Down Expand Up @@ -154,7 +152,7 @@ pub struct Block {
/// Restart points.
restart_points: Vec<RestartPoint>,

hitmap: Hitmap<HITMAP_ELEMS>,
hitmap: Hitmap<{ Self::HITMAP_ELEMS }>,
}

impl Clone for Block {
Expand All @@ -180,6 +178,8 @@ impl Debug for Block {
}

impl Block {
pub const HITMAP_ELEMS: usize = 4;

pub fn get_algorithm(buf: &Bytes) -> HummockResult<CompressionAlgorithm> {
let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
Ok(compression)
Expand Down Expand Up @@ -335,7 +335,7 @@ impl Block {
&self.data[..]
}

pub fn hitmap(&self) -> &Hitmap<HITMAP_ELEMS> {
pub fn hitmap(&self) -> &Hitmap<{ Self::HITMAP_ELEMS }> {
&self.hitmap
}

Expand Down
53 changes: 36 additions & 17 deletions src/storage/src/hummock/sstable/block_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use bytes::BytesMut;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::FullKey;

use super::{KeyPrefix, LenType, RestartPoint, HITMAP_ELEMS};
use super::{Block, KeyPrefix, LenType, RestartPoint};
use crate::hummock::BlockHolder;
use crate::monitor::LocalHitmap;

Expand All @@ -41,13 +41,22 @@ pub struct BlockIterator {
last_key_len_type: LenType,
last_value_len_type: LenType,

/// NOTE: `hitmap` is supposed be updated every time when `value_range` is updated.
hitmap: LocalHitmap<HITMAP_ELEMS>,
/// NOTE:
///
/// - `hitmap` is supposed to be updated each time accessing the block data in a new position.
/// - `hitmap` must be reported to the block hitmap before drop.
hitmap: LocalHitmap<{ Block::HITMAP_ELEMS }>,
}

impl Drop for BlockIterator {
fn drop(&mut self) {
self.block.hitmap().report(&mut self.hitmap);
}
}

impl BlockIterator {
pub fn new(block: BlockHolder) -> Self {
let hitmap = block.hitmap().local();
let hitmap = LocalHitmap::default();
Self {
block,
offset: usize::MAX,
Expand Down Expand Up @@ -175,7 +184,8 @@ impl BlockIterator {
self.offset = offset;
self.entry_len = prefix.entry_len();

self.update_hitmap();
self.hitmap
.fill_with_range(self.offset, self.value_range.end, self.block.len());

true
}
Expand Down Expand Up @@ -246,27 +256,41 @@ impl BlockIterator {
}

/// Searches the restart point index that the given `key` belongs to.
fn search_restart_point_index_by_key(&self, key: FullKey<&[u8]>) -> usize {
fn search_restart_point_index_by_key(&mut self, key: FullKey<&[u8]>) -> usize {
// Find the largest restart point that restart key equals or less than the given key.
self.block
let res = self
.block
.search_restart_partition_point(
|&RestartPoint {
offset: probe,
key_len_type,
value_len_type,
}| {
let prefix =
self.decode_prefix_at(probe as usize, key_len_type, value_len_type);
let probe = probe as usize;
let prefix = KeyPrefix::decode(
&mut &self.block.data()[probe..],
probe,
key_len_type,
value_len_type,
);
let probe_key = &self.block.data()[prefix.diff_key_range()];
let full_probe_key =
FullKey::from_slice_without_table_id(self.block.table_id(), probe_key);
self.hitmap.fill_with_range(
probe,
prefix.diff_key_range().end,
self.block.len(),
);
match full_probe_key.cmp(&key) {
Ordering::Less | Ordering::Equal => true,
Ordering::Greater => false,
}
},
)
.saturating_sub(1) // Prevent from underflowing when given is smaller than the first.
// Prevent from underflowing when given is smaller than the first.
.saturating_sub(1);

res
}

/// Seeks to the restart point that the given `key` belongs to.
Expand All @@ -291,7 +315,8 @@ impl BlockIterator {
self.entry_len = prefix.entry_len();
self.update_restart_point(index);

self.update_hitmap();
self.hitmap
.fill_with_range(self.offset, self.value_range.end, self.block.len());
}

fn update_restart_point(&mut self, index: usize) {
Expand All @@ -301,12 +326,6 @@ impl BlockIterator {
self.last_key_len_type = restart_point.key_len_type;
self.last_value_len_type = restart_point.value_len_type;
}

/// Update the local hitmap of the block based on the current iterator position.
fn update_hitmap(&mut self) {
self.hitmap
.fill_with_range(self.offset, self.value_range.end, self.block.len());
}
}

#[cfg(test)]
Expand Down
41 changes: 29 additions & 12 deletions src/storage/src/monitor/hitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,11 @@ impl<const N: usize> Hitmap<N> {
N * 8
}

pub fn local(&self) -> LocalHitmap<N> {
LocalHitmap::new(self)
}

pub fn apply(&self, local: &LocalHitmap<N>) {
pub fn report(&self, local: &mut LocalHitmap<N>) {
for (global, local) in self.data.iter().zip_eq_fast(local.data.iter()) {
global.fetch_or(*local, Ordering::Relaxed);
}
local.reset();
}

pub fn ones(&self) -> usize {
Expand Down Expand Up @@ -80,10 +77,15 @@ impl<const N: usize> Hitmap<N> {

#[derive(Debug)]
pub struct LocalHitmap<const N: usize> {
owner: Hitmap<N>,
data: Box<[u64; N]>,
}

impl<const N: usize> Default for LocalHitmap<N> {
fn default() -> Self {
Self::new()
}
}

impl<const N: usize> LocalHitmap<N> {
pub const fn bits() -> usize {
N * u64::BITS as usize
Expand All @@ -93,17 +95,29 @@ impl<const N: usize> LocalHitmap<N> {
N * 8
}

pub fn new(owner: &Hitmap<N>) -> Self {
pub fn new() -> Self {
Self {
owner: owner.clone(),
data: Box::new([0; N]),
}
}

pub fn reset(&mut self) {
for elem in &mut *self.data {
*elem = 0;
}
}

pub fn merge(&mut self, other: &mut Self) {
for (elem, e) in self.data.iter_mut().zip_eq_fast(other.data.iter()) {
*elem |= *e;
}
other.reset();
}

pub fn fill(&mut self, start_bit: usize, end_bit: usize) {
const MASK: usize = (1 << 6) - 1;

let end_bit = std::cmp::min(end_bit, Self::bits());
let end_bit = end_bit.clamp(start_bit + 1, Self::bits());

let head_bits = start_bit & MASK;
let tail_bits_rev = end_bit & MASK;
Expand Down Expand Up @@ -153,9 +167,12 @@ impl<const N: usize> LocalHitmap<N> {
}
}

#[cfg(debug_assertions)]
impl<const N: usize> Drop for LocalHitmap<N> {
fn drop(&mut self) {
self.owner.apply(self);
if self.ones() > 0 {
panic!("LocalHitmap is not reported!");
}
}
}

Expand All @@ -168,7 +185,7 @@ mod tests {
// hex: high <== low
let g = Hitmap::<4>::default();

let mut h = g.local();
let mut h = LocalHitmap::new();
assert_eq!(
h.to_hex_vec(),
vec![
Expand Down Expand Up @@ -223,7 +240,7 @@ mod tests {
]
);
assert_eq!(h.ones(), 256);
drop(h);
g.report(&mut h);
assert_eq!(
g.to_hex_vec(),
vec![
Expand Down

0 comments on commit 996a4ba

Please sign in to comment.