Skip to content
This repository has been archived by the owner on Aug 4, 2024. It is now read-only.

feat: implement BloomFilter to replace external dependencies #59

Merged
merged 3 commits into from
Dec 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ tracing-subscriber = "0.3"
lz4 = "1.23.1"
integer-encoding = "3.0.4"
clap = { version = "4.4.6", features = ["derive"] }
growable-bloom-filter = "2.0.1"
itertools = "0.10.3"
chrono = "0.4.19"
parking_lot = "0.12.1"
Expand Down
38 changes: 33 additions & 5 deletions src/kernel/lsm/table/ss_table/block.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use crate::kernel::lsm::storage::Config;
use crate::kernel::utils::bloom_filter::BloomFilter;
use crate::kernel::utils::lru_cache::ShardingLruCache;
use crate::kernel::KernelResult;
use crate::KernelError;
use bytes::{Buf, BufMut, Bytes};
use futures::future;
use growable_bloom_filter::GrowableBloom;
use integer_encoding::{FixedInt, VarIntReader, VarIntWriter};
use itertools::Itertools;
use lz4::Decoder;
use serde::{Deserialize, Serialize};
use std::cmp::min;
use std::collections::Bound;
use std::io::{Cursor, Read, Write};
Expand Down Expand Up @@ -195,14 +194,43 @@ pub(crate) enum CompressType {
LZ4,
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug)]
pub(crate) struct MetaBlock {
pub(crate) filter: GrowableBloom,
pub(crate) filter: BloomFilter<[u8]>,
pub(crate) len: usize,
pub(crate) index_restart_interval: usize,
pub(crate) data_restart_interval: usize,
}

impl MetaBlock {
pub(crate) fn to_raw(&self) -> Vec<u8> {
let mut bytes = u32::encode_fixed_vec(self.len as u32);

bytes.append(&mut u32::encode_fixed_vec(
self.index_restart_interval as u32,
));
bytes.append(&mut u32::encode_fixed_vec(
self.data_restart_interval as u32,
));
bytes.append(&mut self.filter.to_raw());
bytes
}

pub(crate) fn from_raw(bytes: &[u8]) -> Self {
let len = u32::decode_fixed(&bytes[0..4]) as usize;
let index_restart_interval = u32::decode_fixed(&bytes[4..8]) as usize;
let data_restart_interval = u32::decode_fixed(&bytes[8..12]) as usize;
let filter = BloomFilter::from_raw(&bytes[12..]);

Self {
filter,
len,
index_restart_interval,
data_restart_interval,
}
}
}

/// Block SSTable最小的存储单位
///
/// 分为DataBlock和IndexBlock
Expand Down Expand Up @@ -614,7 +642,7 @@ where
}

/// 批量以restart_interval进行shared_len的获取
fn sharding_shared_len<T>(vec_kv: &Vec<KeyValue<T>>, restart_interval: usize) -> Vec<usize>
fn sharding_shared_len<T>(vec_kv: &[KeyValue<T>], restart_interval: usize) -> Vec<usize>
where
T: BlockItem,
{
Expand Down
11 changes: 6 additions & 5 deletions src/kernel/lsm/table/ss_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ use crate::kernel::lsm::table::ss_table::block::{
use crate::kernel::lsm::table::ss_table::footer::{Footer, TABLE_FOOTER_SIZE};
use crate::kernel::lsm::table::ss_table::iter::SSTableIter;
use crate::kernel::lsm::table::Table;
use crate::kernel::utils::bloom_filter::BloomFilter;
use crate::kernel::KernelResult;
use crate::KernelError;
use bytes::Bytes;
use growable_bloom_filter::GrowableBloom;
use core::slice::SlicePattern;
use itertools::Itertools;
use parking_lot::Mutex;
use std::io::SeekFrom;
Expand Down Expand Up @@ -53,7 +54,7 @@ impl SSTable {
let len = vec_data.len();
let data_restart_interval = config.data_restart_interval;
let index_restart_interval = config.index_restart_interval;
let mut filter = GrowableBloom::new(config.desired_error_prob, len);
let mut filter = BloomFilter::new(len, config.desired_error_prob);

let mut builder = BlockBuilder::new(
BlockOptions::from(config)
Expand All @@ -63,7 +64,7 @@ impl SSTable {
);
for data in vec_data {
let (key, value) = data;
let _ = filter.insert(&key);
filter.insert(key.as_slice());
builder.add((key, Value::from(value)));
}
let meta = MetaBlock {
Expand All @@ -73,7 +74,7 @@ impl SSTable {
data_restart_interval,
};
let (data_bytes, index_bytes) = builder.build().await?;
let meta_bytes = bincode::serialize(&meta)?;
let meta_bytes = meta.to_raw();
let footer = Footer {
level: level as u8,
index_offset: data_bytes.len() as u32,
Expand Down Expand Up @@ -133,7 +134,7 @@ impl SSTable {
let _ = reader.seek(SeekFrom::Start(*meta_offset as u64))?;
let _ = reader.read(&mut buf)?;

let meta = bincode::deserialize(&buf)?;
let meta = MetaBlock::from_raw(&buf);
let reader = Mutex::new(reader);
Ok(SSTable {
footer,
Expand Down
Loading
Loading