feat(bloom-filter): add bloom filter reader
Signed-off-by: Zhenchi <[email protected]>
zhongzc committed Dec 19, 2024
1 parent c6b7caa commit 10b0fdf
Showing 4 changed files with 284 additions and 8 deletions.
6 changes: 5 additions & 1 deletion src/index/src/bloom_filter.rs
@@ -15,11 +15,15 @@
use serde::{Deserialize, Serialize};

pub mod creator;
mod error;
pub mod error;
pub mod reader;

pub type Bytes = Vec<u8>;
pub type BytesRef<'a> = &'a [u8];

/// The seed used for the Bloom filter.
pub const SEED: u128 = 42;

/// The Meta information of the bloom filter stored in the file.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BloomFilterMeta {
8 changes: 2 additions & 6 deletions src/index/src/bloom_filter/creator.rs
@@ -18,12 +18,8 @@ use fastbloom::BloomFilter;
use futures::{AsyncWrite, AsyncWriteExt};
use snafu::ResultExt;

use super::error::{IoSnafu, SerdeJsonSnafu};
use crate::bloom_filter::error::Result;
use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes};

/// The seed used for the Bloom filter.
const SEED: u128 = 42;
use crate::bloom_filter::error::{IoSnafu, Result, SerdeJsonSnafu};
use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes, SEED};

/// The false positive rate of the Bloom filter.
const FALSE_POSITIVE_RATE: f64 = 0.01;
29 changes: 28 additions & 1 deletion src/index/src/bloom_filter/error.rs
@@ -39,6 +39,29 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to deserialize json"))]
DeserializeJson {
#[snafu(source)]
error: serde_json::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("File size too small for bloom filter"))]
FileSizeTooSmall {
size: u64,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Unexpected bloom filter meta size"))]
UnexpectedMetaSize {
max_meta_size: u64,
actual_meta_size: u64,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("External error"))]
External {
source: BoxedError,
@@ -52,7 +75,11 @@ impl ErrorExt for Error {
use Error::*;

match self {
Io { .. } | Self::SerdeJson { .. } => StatusCode::Unexpected,
Io { .. }
| SerdeJson { .. }
| FileSizeTooSmall { .. }
| UnexpectedMetaSize { .. }
| DeserializeJson { .. } => StatusCode::Unexpected,

External { source, .. } => source.status_code(),
}
249 changes: 249 additions & 0 deletions src/index/src/bloom_filter/reader.rs
@@ -0,0 +1,249 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Range;

use async_trait::async_trait;
use bytes::Bytes;
use common_base::range_read::RangeReader;
use fastbloom::BloomFilter;
use snafu::{ensure, ResultExt};

use super::error::{DeserializeJsonSnafu, IoSnafu};
use super::{BloomFilterSegmentLocation, SEED};
use crate::bloom_filter::error::{FileSizeTooSmallSnafu, Result, UnexpectedMetaSizeSnafu};
use crate::bloom_filter::BloomFilterMeta;

/// Size of the trailing metadata-length field of the bloom filter file (a little-endian u32), which is also the minimum valid file size.
const BLOOM_META_LEN_SIZE: u64 = 4;

/// Default prefetch size of bloom filter meta.
pub const DEFAULT_PREFETCH_SIZE: u64 = 1024; // 1KiB

/// `BloomFilterReader` reads the bloom filter from the file.
#[async_trait]
pub trait BloomFilterReader {
/// Reads a range of bytes from the file.
async fn range_read(&mut self, offset: u64, size: u32) -> Result<Vec<u8>>;

/// Reads a batch of ranges from the file.
async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>>;

/// Reads the meta information of the bloom filter.
async fn metadata(&mut self) -> Result<BloomFilterMeta>;

/// Reads a bloom filter with the given location.
async fn bloom_filter(&mut self, loc: &BloomFilterSegmentLocation) -> Result<BloomFilter> {
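// The segment bytes are the filter's underlying bit vector stored as
// little-endian u64 words; rebuild it with the same seed and element count
// used at creation time.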
let bytes = self.range_read(loc.offset, loc.size as _).await?;
let vec = bytes
.chunks_exact(std::mem::size_of::<u64>())
.map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
.collect();
let bm = BloomFilter::from_vec(vec)
.seed(&SEED)
.expected_items(loc.elem_count);
Ok(bm)
}
}

/// `BloomFilterReaderImpl` reads the bloom filter from the file.
pub struct BloomFilterReaderImpl<R: RangeReader> {
/// The underlying reader.
reader: R,
}

impl<R: RangeReader> BloomFilterReaderImpl<R> {
/// Creates a new `BloomFilterReaderImpl` with the given reader.
pub fn new(reader: R) -> Self {
Self { reader }
}
}

#[async_trait]
impl<R: RangeReader> BloomFilterReader for BloomFilterReaderImpl<R> {
async fn range_read(&mut self, offset: u64, size: u32) -> Result<Vec<u8>> {
let buf = self
.reader
.read(offset..offset + size as u64)
.await
.context(IoSnafu)?;
Ok(buf.into())
}

async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
self.reader.read_vec(ranges).await.context(IoSnafu)
}

async fn metadata(&mut self) -> Result<BloomFilterMeta> {
let metadata = self.reader.metadata().await.context(IoSnafu)?;
let file_size = metadata.content_length;

let mut meta_reader =
BloomFilterMetaReader::new(&mut self.reader, file_size, Some(DEFAULT_PREFETCH_SIZE));
meta_reader.metadata().await
}
}

/// `BloomFilterMetaReader` reads the metadata of the bloom filter.
struct BloomFilterMetaReader<R: RangeReader> {
reader: R,
file_size: u64,
prefetch_size: u64,
}

impl<R: RangeReader> BloomFilterMetaReader<R> {
pub fn new(reader: R, file_size: u64, prefetch_size: Option<u64>) -> Self {
Self {
reader,
file_size,
prefetch_size: prefetch_size
.unwrap_or(BLOOM_META_LEN_SIZE)
.max(BLOOM_META_LEN_SIZE),
}
}

/// Reads the metadata of the bloom filter.
///
/// It first prefetches some bytes from the end of the file,
/// then parses the metadata from the prefetched bytes.
pub async fn metadata(&mut self) -> Result<BloomFilterMeta> {
ensure!(
self.file_size >= BLOOM_META_LEN_SIZE,
FileSizeTooSmallSnafu {
size: self.file_size,
}
);

let footer_start = self.file_size.saturating_sub(self.prefetch_size);
let suffix = self
.reader
.read(footer_start..self.file_size)
.await
.context(IoSnafu)?;
let suffix_len = suffix.len();
let length = u32::from_le_bytes(Self::read_tailing_four_bytes(&suffix)?) as u64;
self.validate_meta_size(length)?;

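// If the prefetched suffix does not cover the whole metadata, issue a second
// read for exactly the metadata range; otherwise slice it out of the suffix.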
if length > suffix_len as u64 - BLOOM_META_LEN_SIZE {
let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE;
let meta = self
.reader
.read(metadata_start..self.file_size - BLOOM_META_LEN_SIZE)
.await
.context(IoSnafu)?;
serde_json::from_slice(&meta).context(DeserializeJsonSnafu)
} else {
let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE - footer_start;
let meta = &suffix[metadata_start as usize..suffix_len - BLOOM_META_LEN_SIZE as usize];
serde_json::from_slice(meta).context(DeserializeJsonSnafu)
}
}

fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
let suffix_len = suffix.len();
ensure!(
suffix_len >= 4,
FileSizeTooSmallSnafu {
size: suffix_len as u64
}
);
let mut bytes = [0; 4];
bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]);

Ok(bytes)
}

fn validate_meta_size(&self, length: u64) -> Result<()> {
let max_meta_size = self.file_size - BLOOM_META_LEN_SIZE;
ensure!(
length <= max_meta_size,
UnexpectedMetaSizeSnafu {
max_meta_size,
actual_meta_size: length,
}
);
Ok(())
}
}

#[cfg(test)]
mod tests {
use futures::io::Cursor;

use super::*;
use crate::bloom_filter::creator::BloomFilterCreator;

async fn mock_bloom_filter_bytes() -> Vec<u8> {
let mut writer = Cursor::new(vec![]);
let mut creator = BloomFilterCreator::new(2);

creator.push_row_elems(vec![b"a".to_vec(), b"b".to_vec()]);
creator.push_row_elems(vec![b"c".to_vec(), b"d".to_vec()]);
creator.push_row_elems(vec![b"e".to_vec(), b"f".to_vec()]);

creator.finish(&mut writer).await.unwrap();

writer.into_inner()
}

#[tokio::test]
async fn test_bloom_filter_meta_reader() {
let bytes = mock_bloom_filter_bytes().await;
let file_size = bytes.len() as u64;

for prefetch in [0u64, file_size / 2, file_size, file_size + 10] {
let mut reader =
BloomFilterMetaReader::new(bytes.clone(), file_size as _, Some(prefetch));
let meta = reader.metadata().await.unwrap();

assert_eq!(meta.rows_per_segment, 2);
assert_eq!(meta.seg_count, 2);
assert_eq!(meta.row_count, 3);
assert_eq!(meta.bloom_filter_segments.len(), 2);

assert_eq!(meta.bloom_filter_segments[0].offset, 0);
assert_eq!(meta.bloom_filter_segments[0].elem_count, 4);
assert_eq!(
meta.bloom_filter_segments[1].offset,
meta.bloom_filter_segments[0].size
);
assert_eq!(meta.bloom_filter_segments[1].elem_count, 2);
}
}

#[tokio::test]
async fn test_bloom_filter_reader() {
let bytes = mock_bloom_filter_bytes().await;

let mut reader = BloomFilterReaderImpl::new(bytes);
let meta = reader.metadata().await.unwrap();

assert_eq!(meta.bloom_filter_segments.len(), 2);
let bf = reader
.bloom_filter(&meta.bloom_filter_segments[0])
.await
.unwrap();
assert!(bf.contains(&b"a"));
assert!(bf.contains(&b"b"));
assert!(bf.contains(&b"c"));
assert!(bf.contains(&b"d"));

let bf = reader
.bloom_filter(&meta.bloom_filter_segments[1])
.await
.unwrap();
assert!(bf.contains(&b"e"));
assert!(bf.contains(&b"f"));
}
}
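
For reference, a minimal usage sketch of the reader added above (not part of this commit), written as if inside the `index` crate and reusing `Vec<u8>` as the `RangeReader` the way the tests do; the `probe` helper and its parameters are hypothetical:

use crate::bloom_filter::error::Result;
use crate::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};

// Reads the trailing metadata, then loads the first segment's bloom filter
// and checks whether it may contain the given element (assumes the file has
// at least one segment).
async fn probe(bytes: Vec<u8>, elem: &[u8]) -> Result<bool> {
    let mut reader = BloomFilterReaderImpl::new(bytes);
    let meta = reader.metadata().await?;
    let bf = reader.bloom_filter(&meta.bloom_filter_segments[0]).await?;
    Ok(bf.contains(elem))
}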
