diff --git a/Cargo.toml b/Cargo.toml index e9f5c44..efa4412 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,6 @@ tracing-subscriber = "0.3" lz4 = "1.23.1" integer-encoding = "3.0.4" clap = { version = "4.4.6", features = ["derive"] } -growable-bloom-filter = "2.0.1" itertools = "0.10.3" chrono = "0.4.19" parking_lot = "0.12.1" diff --git a/src/kernel/lsm/table/ss_table/block.rs b/src/kernel/lsm/table/ss_table/block.rs index 21cf952..10af548 100644 --- a/src/kernel/lsm/table/ss_table/block.rs +++ b/src/kernel/lsm/table/ss_table/block.rs @@ -1,14 +1,13 @@ use crate::kernel::lsm::storage::Config; +use crate::kernel::utils::bloom_filter::BloomFilter; use crate::kernel::utils::lru_cache::ShardingLruCache; use crate::kernel::KernelResult; use crate::KernelError; use bytes::{Buf, BufMut, Bytes}; use futures::future; -use growable_bloom_filter::GrowableBloom; use integer_encoding::{FixedInt, VarIntReader, VarIntWriter}; use itertools::Itertools; use lz4::Decoder; -use serde::{Deserialize, Serialize}; use std::cmp::min; use std::collections::Bound; use std::io::{Cursor, Read, Write}; @@ -195,14 +194,43 @@ pub(crate) enum CompressType { LZ4, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug)] pub(crate) struct MetaBlock { - pub(crate) filter: GrowableBloom, + pub(crate) filter: BloomFilter<[u8]>, pub(crate) len: usize, pub(crate) index_restart_interval: usize, pub(crate) data_restart_interval: usize, } +impl MetaBlock { + pub(crate) fn to_raw(&self) -> Vec { + let mut bytes = u32::encode_fixed_vec(self.len as u32); + + bytes.append(&mut u32::encode_fixed_vec( + self.index_restart_interval as u32, + )); + bytes.append(&mut u32::encode_fixed_vec( + self.data_restart_interval as u32, + )); + bytes.append(&mut self.filter.to_raw()); + bytes + } + + pub(crate) fn from_raw(bytes: &[u8]) -> Self { + let len = u32::decode_fixed(&bytes[0..4]) as usize; + let index_restart_interval = u32::decode_fixed(&bytes[4..8]) as usize; + let data_restart_interval = u32::decode_fixed(&bytes[8..12]) as usize; + let filter = BloomFilter::from_raw(&bytes[12..]); + + Self { + filter, + len, + index_restart_interval, + data_restart_interval, + } + } +} + /// Block SSTable最小的存储单位 /// /// 分为DataBlock和IndexBlock @@ -614,7 +642,7 @@ where } /// 批量以restart_interval进行shared_len的获取 -fn sharding_shared_len(vec_kv: &Vec>, restart_interval: usize) -> Vec +fn sharding_shared_len(vec_kv: &[KeyValue], restart_interval: usize) -> Vec where T: BlockItem, { diff --git a/src/kernel/lsm/table/ss_table/mod.rs b/src/kernel/lsm/table/ss_table/mod.rs index 705aee7..ba32fc3 100644 --- a/src/kernel/lsm/table/ss_table/mod.rs +++ b/src/kernel/lsm/table/ss_table/mod.rs @@ -9,10 +9,11 @@ use crate::kernel::lsm::table::ss_table::block::{ use crate::kernel::lsm::table::ss_table::footer::{Footer, TABLE_FOOTER_SIZE}; use crate::kernel::lsm::table::ss_table::iter::SSTableIter; use crate::kernel::lsm::table::Table; +use crate::kernel::utils::bloom_filter::BloomFilter; use crate::kernel::KernelResult; use crate::KernelError; use bytes::Bytes; -use growable_bloom_filter::GrowableBloom; +use core::slice::SlicePattern; use itertools::Itertools; use parking_lot::Mutex; use std::io::SeekFrom; @@ -53,7 +54,7 @@ impl SSTable { let len = vec_data.len(); let data_restart_interval = config.data_restart_interval; let index_restart_interval = config.index_restart_interval; - let mut filter = GrowableBloom::new(config.desired_error_prob, len); + let mut filter = BloomFilter::new(len, config.desired_error_prob); let mut builder = BlockBuilder::new( BlockOptions::from(config) @@ -63,7 +64,7 @@ impl SSTable { ); for data in vec_data { let (key, value) = data; - let _ = filter.insert(&key); + filter.insert(key.as_slice()); builder.add((key, Value::from(value))); } let meta = MetaBlock { @@ -73,7 +74,7 @@ impl SSTable { data_restart_interval, }; let (data_bytes, index_bytes) = builder.build().await?; - let meta_bytes = bincode::serialize(&meta)?; + let meta_bytes = meta.to_raw(); let footer = Footer { level: level as u8, index_offset: data_bytes.len() as u32, @@ -133,7 +134,7 @@ impl SSTable { let _ = reader.seek(SeekFrom::Start(*meta_offset as u64))?; let _ = reader.read(&mut buf)?; - let meta = bincode::deserialize(&buf)?; + let meta = MetaBlock::from_raw(&buf); let reader = Mutex::new(reader); Ok(SSTable { footer, diff --git a/src/kernel/utils/bloom_filter.rs b/src/kernel/utils/bloom_filter.rs new file mode 100644 index 0000000..7949d35 --- /dev/null +++ b/src/kernel/utils/bloom_filter.rs @@ -0,0 +1,288 @@ +use integer_encoding::FixedInt; +use itertools::Itertools; +use std::hash::{DefaultHasher, Hash, Hasher}; +use std::marker::PhantomData; +use std::slice; + +pub(crate) const DEFAULT_HASH_SEED_1: u64 = 31; + +pub(crate) const DEFAULT_HASH_SEED_2: u64 = 37; + +// https://rust-algo.club/collections/bloom_filter/ +#[derive(Debug, Default)] +pub struct BloomFilter { + bits: BitVector, + hash_fn_count: u64, + hashers: [FixedHasher; 2], + _phantom: PhantomData, +} + +impl BloomFilter { + pub fn new(len: usize, err_rate: f64) -> Self { + let bits_count = Self::optimal_bits_count(len, err_rate); + let hash_fn_count = Self::optimal_hashers_count(err_rate); + let hashers = [ + FixedHasher::new(DEFAULT_HASH_SEED_1), + FixedHasher::new(DEFAULT_HASH_SEED_2), + ]; + + Self { + bits: BitVector::new(bits_count), + hash_fn_count, + hashers, + _phantom: PhantomData, + } + } + + pub fn insert(&mut self, elem: &T) + where + T: Hash, + { + // g_i(x) = h1(x) + i * h2(x) + let hashes = self.make_hash(elem); + for fn_i in 0..self.hash_fn_count { + let index = self.get_index(hashes, fn_i); + self.bits.set_bit(index, true); + } + } + + pub fn contains(&self, elem: &T) -> bool + where + T: Hash, + { + let hashes = self.make_hash(elem); + (0..self.hash_fn_count).all(|fn_i| { + let index = self.get_index(hashes, fn_i); + self.bits.get_bit(index) + }) + } + + fn get_index(&self, (h1, h2): (u64, u64), fn_i: u64) -> usize { + (h1.wrapping_add(fn_i.wrapping_mul(h2)) % self.bits.len() as u64) as usize + } + + fn make_hash(&self, elem: &T) -> (u64, u64) + where + T: Hash, + { + let hasher1 = &mut self.hashers[0].clone(); + let hasher2 = &mut self.hashers[1].clone(); + + elem.hash(hasher1); + elem.hash(hasher2); + + (hasher1.finish(), hasher2.finish()) + } + + /// m = -1 * (n * ln ε) / (ln 2)^2 + fn optimal_bits_count(len: usize, err_rate: f64) -> usize { + let ln_2_2 = std::f64::consts::LN_2.powf(2f64); + (-1f64 * len as f64 * err_rate.ln() / ln_2_2).ceil() as usize + } + + /// k = -log_2 ε + fn optimal_hashers_count(err_rate: f64) -> u64 { + (-1f64 * err_rate.log2()).ceil() as u64 + } + + pub fn to_raw(&self) -> Vec { + let mut bytes = u64::encode_fixed_vec(self.hash_fn_count); + + bytes.append(&mut self.bits.to_raw()); + bytes + } + + pub fn from_raw(bytes: &[u8]) -> Self { + let hash_fn_count = u64::decode_fixed(&bytes[0..8]); + let bits = BitVector::from_raw(&bytes[8..]); + let hashers = [ + FixedHasher::new(DEFAULT_HASH_SEED_1), + FixedHasher::new(DEFAULT_HASH_SEED_2), + ]; + + BloomFilter { + bits, + hash_fn_count, + hashers, + _phantom: PhantomData, + } + } +} + +#[derive(Debug, Default)] +pub struct BitVector { + len: u64, + bit_groups: Vec, +} + +impl BitVector { + pub fn new(len: usize) -> BitVector { + BitVector { + len: len as u64, + bit_groups: vec![0; (len + 7) / 8], + } + } + + pub fn set_bit(&mut self, index: usize, value: bool) { + let byte_index = index / 8; + let bit_index = index % 8; + + if value { + self.bit_groups[byte_index] |= 1 << bit_index; + } else { + self.bit_groups[byte_index] &= !(1 << bit_index); + } + } + + pub fn get_bit(&self, index: usize) -> bool { + self.bit_groups[index / 8] >> (index % 8) & 1 != 0 + } + + pub fn len(&self) -> usize { + self.len as usize + } + + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + pub fn to_raw(&self) -> Vec { + let mut bytes = u64::encode_fixed_vec(self.len); + + for bits in &self.bit_groups { + bytes.append(&mut bits.encode_fixed_vec()); + } + bytes + } + + pub fn from_raw(bytes: &[u8]) -> Self { + let len = u64::decode_fixed(&bytes[0..8]); + let bit_groups = bytes[8..] + .iter() + .map(|bit| i8::decode_fixed(slice::from_ref(bit))) + .collect_vec(); + + BitVector { len, bit_groups } + } +} + +#[derive(Debug, Default, Copy, Clone)] +pub struct FixedHasher { + seed: u64, +} + +impl FixedHasher { + pub fn new(seed: u64) -> Self { + FixedHasher { seed } + } +} + +impl Hasher for FixedHasher { + fn write(&mut self, bytes: &[u8]) { + let mut hasher = DefaultHasher::new(); + self.seed.hash(&mut hasher); + bytes.hash(&mut hasher); + self.seed = hasher.finish(); + } + + fn finish(&self) -> u64 { + self.seed + } +} + +#[cfg(test)] +mod tests { + use crate::kernel::utils::bloom_filter::{BitVector, BloomFilter}; + use rand::Rng; + use std::collections::HashSet; + + #[test] + fn bit_vector_serialization() { + let mut vector = BitVector::new(100); + + vector.set_bit(99, true); + + let bytes = vector.to_raw(); + let vector = BitVector::from_raw(&bytes); + + for i in 0..98 { + assert!(!vector.get_bit(i)); + } + assert!(vector.get_bit(99)); + } + + #[test] + fn bit_vector_simple() { + let mut vector = BitVector::new(100); + + vector.set_bit(99, true); + + for i in 0..98 { + assert!(!vector.get_bit(i)); + } + assert!(vector.get_bit(99)); + } + + #[test] + fn bloom_filter_serialization() { + let mut bf = BloomFilter::new(100, 0.01); + + bf.insert(&1); + + let bytes = bf.to_raw(); + let bf = BloomFilter::from_raw(&bytes); + + assert!(bf.contains(&1)); + assert!(!bf.contains(&2)); + } + + #[test] + fn bloom_filter_simple() { + let mut bf = BloomFilter::new(100, 0.01); + + bf.insert(&1); + + assert!(bf.contains(&1)); + assert!(!bf.contains(&2)); + } + + #[test] + fn bloom_filter_fpr_test() { + let cnt = 500000; + let rate = 0.01; + + let mut bf = BloomFilter::new(cnt, rate); + let mut set: HashSet = HashSet::new(); + let mut rng = rand::thread_rng(); + + let mut i = 0; + + while i < cnt { + let v = rng.gen::(); + set.insert(v); + bf.insert(&v); + i += 1; + } + + i = 0; + let mut false_positives = 0; + while i < cnt { + let v = rng.gen::(); + match (bf.contains(&v), set.contains(&v)) { + (true, false) => { + false_positives += 1; + } + (false, true) => { + unreachable!() + } // should never happen + _ => {} + } + i += 1; + } + + // make sure we're not too far off + let actual_rate = false_positives as f64 / cnt as f64; + assert!(actual_rate > (rate - 0.001)); + assert!(actual_rate < (rate + 0.001)); + } +} diff --git a/src/kernel/utils/mod.rs b/src/kernel/utils/mod.rs index 6c350a2..9e1e2e3 100644 --- a/src/kernel/utils/mod.rs +++ b/src/kernel/utils/mod.rs @@ -1 +1,2 @@ +pub mod bloom_filter; pub mod lru_cache; diff --git a/src/lib.rs b/src/lib.rs index ed96651..5da3a4f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,8 @@ #![feature(slice_pattern)] #![feature(bound_map)] +extern crate core; + pub mod config; pub mod error; pub mod kernel;