support cache block
Signed-off-by: Little-Wallace <[email protected]>
Little-Wallace committed Jun 19, 2023
1 parent cc9dc29 commit 95a4a7d
Showing 1 changed file with 121 additions and 172 deletions.
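In outline, the commit drops the heap-based MergeSstableBlockIterator and instead, for each SST to be inserted, counts how many blocks of the removed SSTs that fall inside the new SST's key range are currently hot in the block cache; the new SST is streamed into the cache only when that count reaches MIN_OVERLAP_HOT_BLOCK_COUNT and at least one twentieth of its own block count. The snippet below is a minimal, self-contained sketch of that decision rule, not the RisingWave API: the types, names, and helper are hypothetical simplifications (RemovedSst stands in for TableHolder/SstableBlocksInfo, and plain byte ordering stands in for KeyComparator on encoded full keys).

// Sketch only: simplified stand-ins for the real RisingWave types.
struct RemovedSst {
    /// Smallest key of each block, sorted ascending (stands in for block_metas).
    block_smallest_keys: Vec<Vec<u8>>,
    /// Whether the corresponding block is currently hot in the block cache.
    blocks_in_cache: Vec<bool>,
}

const MIN_OVERLAP_HOT_BLOCK_COUNT: usize = 4; // same threshold as in the diff below

/// Count hot blocks of the removed SSTs inside [left, right] and apply the two
/// thresholds used by the new preload_l1_data. The diff additionally trims the
/// last block when a removed SST's largest key exceeds `right`; omitted here.
fn should_prefetch(removed: &[RemovedSst], left: &[u8], right: &[u8], new_sst_blocks: usize) -> bool {
    let mut replace_hot_block = 0;
    for sst in removed {
        // First block whose smallest key is greater than the bound (mirrors partition_point).
        let start = sst.block_smallest_keys.partition_point(|k| k.as_slice() <= left);
        let end = sst.block_smallest_keys.partition_point(|k| k.as_slice() <= right);
        replace_hot_block += sst.blocks_in_cache[start..end].iter().filter(|hot| **hot).count();
    }
    replace_hot_block >= MIN_OVERLAP_HOT_BLOCK_COUNT && replace_hot_block >= new_sst_blocks / 20
}

fn main() {
    let removed = vec![RemovedSst {
        block_smallest_keys: vec![b"a".to_vec(), b"c".to_vec(), b"e".to_vec(), b"g".to_vec(), b"i".to_vec(), b"k".to_vec()],
        blocks_in_cache: vec![true, true, true, true, true, false],
    }];
    // Blocks 1..=4 overlap [b, j] and are hot: 4 >= 4 and 4 >= 40 / 20, so prefetch.
    assert!(should_prefetch(&removed, b"b", b"j", 40));
}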
293 changes: 121 additions & 172 deletions src/storage/src/hummock/event_handler/cache_refill_policy.rs
@@ -90,13 +90,12 @@ impl CacheRefillPolicy {
                    {
                        for sst_id in &level_delta.removed_table_ids {
                            if let Some(sstable) = self.sstable_store.lookup_sstable(sst_id) {
-                                sstable_iters.push(SstableBlockIterator::new(sstable));
+                                sstable_iters.push(SstableBlocksInfo::new(sstable));
                            }
                        }
                    }
                }
-                let iter = MergeSstableBlockIterator::new(sstable_iters);
-                preload_ssts.push((iter, ssts));
+                preload_ssts.push((sstable_iters, ssts));
            }
        }

@@ -108,8 +107,8 @@ impl CacheRefillPolicy {
            if try_join_all(flatten_reqs).await.is_err() {
                return;
            }
-            for (iter, ssts) in preload_ssts {
-                if let Err(e) = preload_l1_data(iter, ssts, &sstable_store).await {
+            for (iters, ssts) in preload_ssts {
+                if let Err(e) = preload_l1_data(iters, ssts, &sstable_store).await {
                    warn!("preload data meet error: {:?}", e);
                }
            }
@@ -123,198 +122,148 @@ impl CacheRefillPolicy {
    }
}

-pub struct SstableBlockIterator {
+pub struct SstableBlocksInfo {
    sstable: TableHolder,
-    block_idx: usize,
+    blocks_in_cache: Vec<bool>,
}

-impl SstableBlockIterator {
+impl SstableBlocksInfo {
    pub fn new(sstable: TableHolder) -> Self {
        Self {
            sstable,
-            block_idx: 0,
+            blocks_in_cache: vec![false; sstable.value().meta.block_metas.len()],
        }
    }
-
-    pub fn seek(&mut self, full_key: &[u8]) {
-        self.block_idx = self
-            .sstable
-            .value()
-            .meta
-            .block_metas
-            .partition_point(|meta| {
-                KeyComparator::compare_encoded_full_key(&meta.smallest_key, full_key)
-                    != std::cmp::Ordering::Greater
-            })
-            .saturating_sub(1);
-    }
-
-    #[inline(always)]
-    pub fn next(&mut self) {
-        self.block_idx += 1;
-    }
-
-    #[inline(always)]
-    pub fn is_valid(&self) -> bool {
-        self.block_idx < self.sstable.value().meta.block_metas.len()
-    }
-
-    pub fn current_block_smallest(&self) -> &[u8] {
-        &self.sstable.value().meta.block_metas[self.block_idx].smallest_key
-    }
-
-    pub fn current_block_largest(&self) -> &[u8] {
-        if self.block_idx + 1 < self.sstable.value().meta.block_metas.len() {
-            &self.sstable.value().meta.block_metas[self.block_idx + 1].smallest_key
-        } else {
-            &self.sstable.value().meta.largest_key
-        }
-    }
}

-impl PartialEq for SstableBlockIterator {
-    fn eq(&self, other: &Self) -> bool {
-        self.current_block_smallest() == other.current_block_smallest()
-    }
-}
-
-impl Eq for SstableBlockIterator {}
-
-impl PartialOrd for SstableBlockIterator {
-    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-        if !self.is_valid() || !other.is_valid() {
-            return None;
-        }
-        Some(KeyComparator::compare_encoded_full_key(
-            self.current_block_smallest(),
-            other.current_block_smallest(),
-        ))
-    }
-}
-
-impl Ord for SstableBlockIterator {
-    fn cmp(&self, other: &Self) -> Ordering {
-        self.partial_cmp(other).unwrap()
-    }
-}
-
-struct MergeSstableBlockIterator {
-    heap: BinaryHeap<Reverse<SstableBlockIterator>>,
-    unused_iters: Vec<SstableBlockIterator>,
-}
-
-impl MergeSstableBlockIterator {
-    pub fn new(unused_iters: Vec<SstableBlockIterator>) -> Self {
-        Self {
-            unused_iters,
-            heap: BinaryHeap::default(),
-        }
-    }
-
-    pub fn seek(&mut self, full_key: &[u8]) {
-        self.unused_iters
-            .extend(self.heap.drain().map(|iter| iter.0));
-        self.heap = self
-            .unused_iters
-            .drain_filter(|iter| {
-                iter.seek(full_key);
-                iter.is_valid()
-            })
-            .map(|iter| Reverse(iter))
-            .collect();
-    }
-
-    pub fn is_valid(&self) -> bool {
-        !self.heap.is_empty()
-    }
-
-    pub fn next(&mut self) {
-        let mut top = self.heap.peek_mut().unwrap();
-        top.0.next();
-        if !top.0.is_valid() {
-            let iter = PeekMut::pop(top);
-            self.unused_iters.push(iter.0);
-        }
-    }
-
-    pub fn smallest_block(&self) -> &[u8] {
-        self.heap.peek().unwrap().0.current_block_smallest()
-    }
-
-    pub fn current_block(&self) -> (HummockSstableObjectId, usize) {
-        let top = self.heap.peek().unwrap();
-        (top.0.sstable.value().id, top.0.block_idx)
-    }
-}

async fn preload_l1_data(
-    mut iter: MergeSstableBlockIterator,
+    mut removed_sstables: Vec<SstableBlocksInfo>,
    insert_ssts: Vec<SstableInfo>,
    sstable_store: &SstableStoreRef,
) -> HummockResult<()> {
    let mut stats = StoreLocalStatistic::default();
-    let mut local_block_cache: HashMap<(HummockSstableObjectId, usize), bool> = HashMap::default();
+    for iter in &mut removed_sstables {
+        for idx in 0..iter.blocks_in_cache.len() {
+            iter.blocks_in_cache[idx] = sstable_store.is_hot_block(iter.sstable.value().id, idx);
+        }
+    }
+
+    const MIN_OVERLAP_HOT_BLOCK_COUNT: usize = 4;
    for sst in insert_ssts {
-        let sstable = sstable_store.sstable(&sst, &mut stats).await?;
-        iter.seek(&sstable.value().meta.smallest_key);
-        let mut blocks = vec![false; sstable.value().meta.block_metas.len()];
-        let mut insert_iter = SstableBlockIterator::new(sstable);
-        let mut hot_block_count = 0;
-        while insert_iter.is_valid() && iter.is_valid() {
-            let mut need_preload = false;
-            while iter.is_valid() {
-                if KeyComparator::compare_encoded_full_key(
-                    insert_iter.current_block_largest(),
-                    iter.smallest_block(),
-                ) != std::cmp::Ordering::Greater
-                {
-                    break;
-                }
-                let block = iter.current_block();
-                if let Some(ret) = local_block_cache.get(&block) {
-                    if *ret {
-                        need_preload = true;
-                    }
-                } else {
-                    let ret = sstable_store.is_hot_block(block.0, block.1);
-                    if ret {
-                        need_preload = true;
-                    }
-                    local_block_cache.insert(block, ret);
-                }
-                iter.next();
+        let key_range = sst.key_range.as_ref().unwrap();
+        let mut replace_hot_block = 0;
+        for remove_sst in &removed_sstables {
+            let start_block_idx =
+                remove_sst
+                    .sstable
+                    .value()
+                    .meta
+                    .block_metas
+                    .partition_point(|meta| {
+                        KeyComparator::compare_encoded_full_key(&meta.smallest_key, &key_range.left)
+                            != std::cmp::Ordering::Greater
+                    });
+            let mut end_block_idx =
+                remove_sst
+                    .sstable
+                    .value()
+                    .meta
+                    .block_metas
+                    .partition_point(|meta| {
+                        KeyComparator::compare_encoded_full_key(
+                            &meta.smallest_key,
+                            &key_range.right,
+                        ) != Ordering::Greater
+                    });
+            if end_block_idx > 0
+                && KeyComparator::compare_encoded_full_key(
+                    &remove_sst.sstable.value().meta.largest_key,
+                    &key_range.right,
+                ) == Ordering::Greater
+            {
+                end_block_idx -= 1;
+            }
-            if need_preload {
-                hot_block_count += 1;
-                blocks[insert_iter.block_idx] = true;
+            for idx in start_block_idx..end_block_idx {
+                if remove_sst.blocks_in_cache[idx] {
+                    replace_hot_block += 1;
+                }
            }
-            insert_iter.next();
        }
-        if hot_block_count > 1
-            && hot_block_count * 3 > insert_iter.sstable.value().meta.block_metas.len()
-        {
+
+        if replace_hot_block < MIN_OVERLAP_HOT_BLOCK_COUNT {
+            continue;
+        }
+        let sstable = sstable_store.sstable(&sst, &mut stats).await?;
+        if replace_hot_block < sstable.value().meta.block_metas.len() / 20 {
            info!(
-                "preload sstable-{} because there are {} blocks is hot",
-                sst.sst_id, hot_block_count
+                "skip prefetch for sst-{} hot blocks: {}",
+                sst.sst_id, replace_hot_block
            );
-            let mut block_index = None;
-            let mut preload_index = 0;
-            for (index, preload) in blocks.iter().enumerate() {
-                if *preload {
-                    block_index = Some(index);
-                    preload_index = index;
+            continue;
+        }
+        let mut smallest_key = sstable.value().meta.largest_key.clone();
+        let mut largest_key = sstable.value().meta.smallest_key.clone();
+        for info in &removed_sstables {
+            let remove_meta = &info.sstable.value().meta;
+            for idx in 0..info.blocks_in_cache.len() {
+                if info.blocks_in_cache[idx] {
+                    if KeyComparator::encoded_full_key_less_than(
+                        remove_meta.block_metas[idx].smallest_key.as_ref(),
+                        &smallest_key,
+                    ) {
+                        smallest_key = remove_meta.block_metas[idx].smallest_key.clone();
+                    }
                    break;
                }
            }
-            let mut preload_iter = sstable_store
-                .get_stream(insert_iter.sstable.value().as_ref(), block_index)
-                .await?;
-            while let Some(block) = preload_iter.next().await? {
-                if blocks[preload_index] {
-                    sstable_store.insert_block_cache(sst.object_id, preload_index as u64, block);
+            for idx in (0..info.blocks_in_cache.len()).rev() {
+                if info.blocks_in_cache[idx] {
+                    let next_key = if idx + 1 < info.blocks_in_cache.len() {
+                        remove_meta.block_metas[idx + 1].smallest_key.as_ref()
+                    } else {
+                        remove_meta.largest_key.as_ref()
+                    };
+                    if KeyComparator::encoded_full_key_less_than(&largest_key, next_key) {
+                        largest_key = next_key.to_vec();
+                    }
+                    break;
                }
-                preload_index += 1;
-                assert_eq!(preload_iter.get_block_index(), preload_index);
            }
        }

+        if !KeyComparator::encoded_full_key_less_than(&smallest_key, &largest_key) {
+            return Ok(());
+        }
+        let mut start_index = 0;
+        let sstable_meta = sstable.value().as_ref();
+        for idx in 0..sstable_meta.meta.block_metas.len() {
+            if KeyComparator::encoded_full_key_less_than(
+                &smallest_key,
+                &sstable_meta.meta.block_metas[idx].smallest_key,
+            ) {
+                break;
+            }
+            start_index = idx;
+        }
+        info!(
+            "prefetch for sst-{} hot blocks: {}",
+            sst.sst_id, replace_hot_block
+        );
+        let mut preload_iter = sstable_store
+            .get_stream(sstable_meta, Some(start_index))
+            .await?;
+        let mut index = start_index;
+        while let Some(block) = preload_iter.next().await? {
+            sstable_store.insert_block_cache(sstable_meta.id, index as u64, block);
+            index += 1;
+            if index >= sstable_meta.meta.block_metas.len()
+                || index - start_index > replace_hot_block
+                || KeyComparator::encoded_full_key_less_than(
+                    &largest_key,
+                    &sstable_meta.meta.block_metas[index].smallest_key,
+                )
+            {
+                break;
+            }
+        }
    }
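A second piece of the new logic bounds how much of the added SST actually gets streamed: across the removed SSTs it takes the smallest key of the first cached block and the smallest key of the block following the last cached block (falling back to that SST's largest key), then streams the new SST's blocks starting from the last block whose smallest key is not above the lower bound, stopping at the upper bound or once more than replace_hot_block blocks have been inserted. The sketch below models that span computation for a single removed SST; parameter names and plain byte ordering are hypothetical stand-ins for the real metadata and KeyComparator::encoded_full_key_less_than, and the diff itself takes the minimum and maximum of these bounds over all removed SSTs.

/// Sketch only: key span covered by the hot blocks of one removed SST.
/// `block_smallest_keys` and `blocks_in_cache` are assumed to be parallel, one entry per block.
fn hot_block_span<'a>(
    block_smallest_keys: &'a [Vec<u8>],
    blocks_in_cache: &[bool],
    sst_largest_key: &'a [u8],
) -> Option<(&'a [u8], &'a [u8])> {
    let first_hot = blocks_in_cache.iter().position(|hot| *hot)?;
    let last_hot = blocks_in_cache.iter().rposition(|hot| *hot)?;
    let lower = block_smallest_keys[first_hot].as_slice();
    // Upper bound: the next block's smallest key, or the SST's largest key for the final block.
    let upper = if last_hot + 1 < block_smallest_keys.len() {
        block_smallest_keys[last_hot + 1].as_slice()
    } else {
        sst_largest_key
    };
    (lower < upper).then_some((lower, upper))
}

fn main() {
    let keys = vec![b"a".to_vec(), b"c".to_vec(), b"e".to_vec(), b"g".to_vec()];
    let hot = [false, true, true, false];
    // Hot blocks 1 and 2 cover [c, g): stream only the new SST's blocks in that range.
    assert_eq!(hot_block_span(&keys, &hot, b"z"), Some((b"c".as_slice(), b"g".as_slice())));
}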
