From bb48c3e48854c36a43eb85b2e56c89e1c05597bf Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 22 Dec 2022 12:25:46 +0900 Subject: [PATCH] Refactoring to prepare for the addition of dynamic fast field (#1730) * Refactoring to prepare for the addition of dynamic fast field - Exposing insert_key / insert_value - Renamed SSTable::{Reader/Writer}-> SSTable::{ValueReader/ValueWriter} - Added a generic Dictionary object in the sstable crate - Removing the TermDictionary wrapper from tantivy, relying directly on an alias of the generic Dictionary object. - dropped the use of byteorder in sstable. - Stopped scanning / reading the entire dictionary when streaming a range. * Added a benchmark for streaming sstable ranges. * CR comments. Rename deserialize_u64 -> deserialize_vint_u64 * Removed needless allocation, split serialize into serialize and clear. --- src/core/inverted_index_reader.rs | 2 +- src/termdict/fst_termdict/term_info_store.rs | 2 +- src/termdict/fst_termdict/termdict.rs | 17 +- src/termdict/sstable_termdict/mod.rs | 88 +++--- src/termdict/sstable_termdict/termdict.rs | 255 ----------------- src/termdict/tests.rs | 4 +- sstable/Cargo.toml | 9 +- sstable/README.md | 28 ++ sstable/benches/stream_bench.rs | 87 ++++++ sstable/src/block_reader.rs | 21 +- sstable/src/delta.rs | 60 ++-- sstable/src/dictionary.rs | 261 +++++++++++++++++ sstable/src/lib.rs | 266 ++++++++++++++---- sstable/src/merge/heap_merge.rs | 14 +- sstable/src/merge/mod.rs | 16 +- sstable/src/sstable_index.rs | 4 +- .../src}/streamer.rs | 128 +++++---- sstable/src/value.rs | 95 ------- sstable/src/value/mod.rs | 82 ++++++ sstable/src/value/range.rs | 113 ++++++++ sstable/src/value/u64_monotonic.rs | 83 ++++++ sstable/src/value/void.rs | 48 ++++ 22 files changed, 1122 insertions(+), 561 deletions(-) delete mode 100644 src/termdict/sstable_termdict/termdict.rs create mode 100644 sstable/README.md create mode 100644 sstable/benches/stream_bench.rs create mode 100644 sstable/src/dictionary.rs rename {src/termdict/sstable_termdict => sstable/src}/streamer.rs (65%) delete mode 100644 sstable/src/value.rs create mode 100644 sstable/src/value/mod.rs create mode 100644 sstable/src/value/range.rs create mode 100644 sstable/src/value/u64_monotonic.rs create mode 100644 sstable/src/value/void.rs diff --git a/src/core/inverted_index_reader.rs b/src/core/inverted_index_reader.rs index e51421a16d..b986b74959 100644 --- a/src/core/inverted_index_reader.rs +++ b/src/core/inverted_index_reader.rs @@ -209,7 +209,7 @@ impl InvertedIndexReader { /// /// Most users should prefer using [`Self::read_postings()`] instead. 
pub async fn warm_postings(&self, term: &Term, with_positions: bool) -> io::Result<()> { - let term_info_opt = self.get_term_info_async(term).await?; + let term_info_opt: Option = self.get_term_info_async(term).await?; if let Some(term_info) = term_info_opt { self.postings_file_slice .read_bytes_slice_async(term_info.postings_range.clone()) diff --git a/src/termdict/fst_termdict/term_info_store.rs b/src/termdict/fst_termdict/term_info_store.rs index fabe47ed1b..799479e7d6 100644 --- a/src/termdict/fst_termdict/term_info_store.rs +++ b/src/termdict/fst_termdict/term_info_store.rs @@ -121,7 +121,7 @@ fn extract_bits(data: &[u8], addr_bits: usize, num_bits: u8) -> u64 { } impl TermInfoStore { - pub fn open(term_info_store_file: FileSlice) -> crate::Result { + pub fn open(term_info_store_file: FileSlice) -> io::Result { let (len_slice, main_slice) = term_info_store_file.split(16); let mut bytes = len_slice.read_bytes()?; let len = u64::deserialize(&mut bytes)? as usize; diff --git a/src/termdict/fst_termdict/termdict.rs b/src/termdict/fst_termdict/termdict.rs index 563c9a3bc4..01fb9a9181 100644 --- a/src/termdict/fst_termdict/termdict.rs +++ b/src/termdict/fst_termdict/termdict.rs @@ -8,7 +8,6 @@ use tantivy_fst::Automaton; use super::term_info_store::{TermInfoStore, TermInfoStoreWriter}; use super::{TermStreamer, TermStreamerBuilder}; use crate::directory::{FileSlice, OwnedBytes}; -use crate::error::DataCorruption; use crate::postings::TermInfo; use crate::termdict::TermOrdinal; @@ -55,7 +54,7 @@ where W: Write /// to insert_key and insert_value. /// /// Prefer using `.insert(key, value)` - pub(crate) fn insert_key(&mut self, key: &[u8]) -> io::Result<()> { + pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()> { self.fst_builder .insert(key, self.term_ord) .map_err(convert_fst_error)?; @@ -66,7 +65,7 @@ where W: Write /// # Warning /// /// Horribly dangerous internal API. See `.insert_key(...)`. - pub(crate) fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> { + pub fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> { self.term_info_store_writer.write_term_info(term_info)?; Ok(()) } @@ -86,10 +85,14 @@ where W: Write } } -fn open_fst_index(fst_file: FileSlice) -> crate::Result> { +fn open_fst_index(fst_file: FileSlice) -> io::Result> { let bytes = fst_file.read_bytes()?; - let fst = Fst::new(bytes) - .map_err(|err| DataCorruption::comment_only(format!("Fst data is corrupted: {:?}", err)))?; + let fst = Fst::new(bytes).map_err(|err| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Fst data is corrupted: {:?}", err), + ) + })?; Ok(tantivy_fst::Map::from(fst)) } @@ -114,7 +117,7 @@ pub struct TermDictionary { impl TermDictionary { /// Opens a `TermDictionary`. 
- pub fn open(file: FileSlice) -> crate::Result { + pub fn open(file: FileSlice) -> io::Result { let (main_slice, footer_len_slice) = file.split_from_end(8); let mut footer_len_bytes = footer_len_slice.read_bytes()?; let footer_size = u64::deserialize(&mut footer_len_bytes)?; diff --git a/src/termdict/sstable_termdict/mod.rs b/src/termdict/sstable_termdict/mod.rs index 0c08d88c19..ce329c916f 100644 --- a/src/termdict/sstable_termdict/mod.rs +++ b/src/termdict/sstable_termdict/mod.rs @@ -1,49 +1,64 @@ use std::io; mod merger; -mod streamer; -mod termdict; use std::iter::ExactSizeIterator; use common::VInt; use sstable::value::{ValueReader, ValueWriter}; -use sstable::{BlockReader, SSTable}; +use sstable::SSTable; +use tantivy_fst::automaton::AlwaysMatch; pub use self::merger::TermMerger; -pub use self::streamer::{TermStreamer, TermStreamerBuilder}; -pub use self::termdict::{TermDictionary, TermDictionaryBuilder}; use crate::postings::TermInfo; +/// The term dictionary contains all of the terms in +/// `tantivy index` in a sorted manner. +/// +/// The `Fst` crate is used to associate terms to their +/// respective `TermOrdinal`. The `TermInfoStore` then makes it +/// possible to fetch the associated `TermInfo`. +pub type TermDictionary = sstable::Dictionary; + +/// Builder for the new term dictionary. +pub type TermDictionaryBuilder = sstable::Writer; + +/// `TermStreamer` acts as a cursor over a range of terms of a segment. +/// Terms are guaranteed to be sorted. +pub type TermStreamer<'a, A = AlwaysMatch> = sstable::Streamer<'a, TermSSTable, A>; + +/// SSTable used to store TermInfo objects. pub struct TermSSTable; impl SSTable for TermSSTable { type Value = TermInfo; - type Reader = TermInfoReader; - type Writer = TermInfoWriter; + type ValueReader = TermInfoValueReader; + type ValueWriter = TermInfoValueWriter; } #[derive(Default)] -pub struct TermInfoReader { +pub struct TermInfoValueReader { term_infos: Vec, } -impl ValueReader for TermInfoReader { +impl ValueReader for TermInfoValueReader { type Value = TermInfo; + #[inline(always)] fn value(&self, idx: usize) -> &TermInfo { &self.term_infos[idx] } - fn read(&mut self, reader: &mut BlockReader) -> io::Result<()> { + fn load(&mut self, mut data: &[u8]) -> io::Result { + let len_before = data.len(); self.term_infos.clear(); - let num_els = VInt::deserialize_u64(reader)?; - let mut postings_start = VInt::deserialize_u64(reader)? as usize; - let mut positions_start = VInt::deserialize_u64(reader)? as usize; + let num_els = VInt::deserialize_u64(&mut data)?; + let mut postings_start = VInt::deserialize_u64(&mut data)? as usize; + let mut positions_start = VInt::deserialize_u64(&mut data)? as usize; for _ in 0..num_els { - let doc_freq = VInt::deserialize_u64(reader)? as u32; - let postings_num_bytes = VInt::deserialize_u64(reader)?; - let positions_num_bytes = VInt::deserialize_u64(reader)?; + let doc_freq = VInt::deserialize_u64(&mut data)? 
as u32; + let postings_num_bytes = VInt::deserialize_u64(&mut data)?; + let positions_num_bytes = VInt::deserialize_u64(&mut data)?; let postings_end = postings_start + postings_num_bytes as usize; let positions_end = positions_start + positions_num_bytes as usize; let term_info = TermInfo { @@ -55,23 +70,24 @@ impl ValueReader for TermInfoReader { postings_start = postings_end; positions_start = positions_end; } - Ok(()) + let consumed_len = len_before - data.len(); + Ok(consumed_len) } } #[derive(Default)] -pub struct TermInfoWriter { +pub struct TermInfoValueWriter { term_infos: Vec, } -impl ValueWriter for TermInfoWriter { +impl ValueWriter for TermInfoValueWriter { type Value = TermInfo; fn write(&mut self, term_info: &TermInfo) { self.term_infos.push(term_info.clone()); } - fn write_block(&mut self, buffer: &mut Vec) { + fn serialize_block(&self, buffer: &mut Vec) { VInt(self.term_infos.len() as u64).serialize_into_vec(buffer); if self.term_infos.is_empty() { return; @@ -83,24 +99,23 @@ impl ValueWriter for TermInfoWriter { VInt(term_info.postings_range.len() as u64).serialize_into_vec(buffer); VInt(term_info.positions_range.len() as u64).serialize_into_vec(buffer); } + } + + fn clear(&mut self) { self.term_infos.clear(); } } #[cfg(test)] mod tests { - use std::io; - use sstable::value::{ValueReader, ValueWriter}; - use super::BlockReader; - use crate::directory::OwnedBytes; use crate::postings::TermInfo; - use crate::termdict::sstable_termdict::TermInfoReader; + use crate::termdict::sstable_termdict::TermInfoValueReader; #[test] - fn test_block_terminfos() -> io::Result<()> { - let mut term_info_writer = super::TermInfoWriter::default(); + fn test_block_terminfos() { + let mut term_info_writer = super::TermInfoValueWriter::default(); term_info_writer.write(&TermInfo { doc_freq: 120u32, postings_range: 17..45, @@ -117,10 +132,9 @@ mod tests { positions_range: 1100..1302, }); let mut buffer = Vec::new(); - term_info_writer.write_block(&mut buffer); - let mut block_reader = make_block_reader(&buffer[..]); - let mut term_info_reader = TermInfoReader::default(); - term_info_reader.read(&mut block_reader)?; + term_info_writer.serialize_block(&mut buffer); + let mut term_info_reader = TermInfoValueReader::default(); + let num_bytes: usize = term_info_reader.load(&buffer[..]).unwrap(); assert_eq!( term_info_reader.value(0), &TermInfo { @@ -129,16 +143,6 @@ mod tests { positions_range: 10..122 } ); - assert!(block_reader.buffer().is_empty()); - Ok(()) - } - - fn make_block_reader(data: &[u8]) -> BlockReader { - let mut buffer = (data.len() as u32).to_le_bytes().to_vec(); - buffer.extend_from_slice(data); - let owned_bytes = OwnedBytes::new(buffer); - let mut block_reader = BlockReader::new(Box::new(owned_bytes)); - block_reader.read_block().unwrap(); - block_reader + assert_eq!(buffer.len(), num_bytes); } } diff --git a/src/termdict/sstable_termdict/termdict.rs b/src/termdict/sstable_termdict/termdict.rs deleted file mode 100644 index b1a5d4c4b4..0000000000 --- a/src/termdict/sstable_termdict/termdict.rs +++ /dev/null @@ -1,255 +0,0 @@ -use std::io; -use std::sync::Arc; - -use common::BinarySerializable; -use once_cell::sync::Lazy; -use sstable::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, Writer}; -use tantivy_fst::automaton::AlwaysMatch; -use tantivy_fst::Automaton; - -use crate::directory::{FileSlice, OwnedBytes}; -use crate::postings::TermInfo; -use crate::termdict::sstable_termdict::{ - TermInfoReader, TermInfoWriter, TermSSTable, TermStreamer, TermStreamerBuilder, -}; -use 
crate::termdict::TermOrdinal; - -pub struct TermInfoSSTable; -impl SSTable for TermInfoSSTable { - type Value = TermInfo; - type Reader = TermInfoReader; - type Writer = TermInfoWriter; -} - -/// Builder for the new term dictionary. -pub struct TermDictionaryBuilder { - sstable_writer: Writer, -} - -impl TermDictionaryBuilder { - /// Creates a new `TermDictionaryBuilder` - pub fn create(w: W) -> io::Result { - let sstable_writer = TermSSTable::writer(w); - Ok(TermDictionaryBuilder { sstable_writer }) - } - - /// Inserts a `(key, value)` pair in the term dictionary. - /// - /// *Keys have to be inserted in order.* - pub fn insert>(&mut self, key_ref: K, value: &TermInfo) -> io::Result<()> { - let key = key_ref.as_ref(); - self.insert_key(key)?; - self.insert_value(value)?; - Ok(()) - } - - /// # Warning - /// Horribly dangerous internal API - /// - /// If used, it must be used by systematically alternating calls - /// to insert_key and insert_value. - /// - /// Prefer using `.insert(key, value)` - #[allow(clippy::unnecessary_wraps)] - pub(crate) fn insert_key(&mut self, key: &[u8]) -> io::Result<()> { - self.sstable_writer.write_key(key); - Ok(()) - } - - /// # Warning - /// - /// Horribly dangerous internal API. See `.insert_key(...)`. - pub(crate) fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> { - self.sstable_writer.write_value(term_info) - } - - /// Finalize writing the builder, and returns the underlying - /// `Write` object. - pub fn finish(self) -> io::Result { - self.sstable_writer.finalize() - } -} - -static EMPTY_TERM_DICT_FILE: Lazy = Lazy::new(|| { - let term_dictionary_data: Vec = TermDictionaryBuilder::create(Vec::::new()) - .expect("Creating a TermDictionaryBuilder in a Vec should never fail") - .finish() - .expect("Writing in a Vec should never fail"); - FileSlice::from(term_dictionary_data) -}); - -/// The term dictionary contains all of the terms in -/// `tantivy index` in a sorted manner. -/// -/// The `Fst` crate is used to associate terms to their -/// respective `TermOrdinal`. The `TermInfoStore` then makes it -/// possible to fetch the associated `TermInfo`. -pub struct TermDictionary { - sstable_slice: FileSlice, - sstable_index: SSTableIndex, - num_terms: u64, -} - -impl TermDictionary { - pub(crate) fn sstable_reader(&self) -> io::Result> { - let data = self.sstable_slice.read_bytes()?; - Ok(TermInfoSSTable::reader(data)) - } - - pub(crate) fn sstable_reader_block( - &self, - block_addr: BlockAddr, - ) -> io::Result> { - let data = self.sstable_slice.read_bytes_slice(block_addr.byte_range)?; - Ok(TermInfoSSTable::reader(data)) - } - - pub(crate) async fn sstable_reader_block_async( - &self, - block_addr: BlockAddr, - ) -> io::Result> { - let data = self - .sstable_slice - .read_bytes_slice_async(block_addr.byte_range) - .await?; - Ok(TermInfoSSTable::reader(data)) - } - - pub(crate) fn sstable_delta_reader(&self) -> io::Result> { - let data = self.sstable_slice.read_bytes()?; - Ok(TermInfoSSTable::delta_reader(data)) - } - - /// Opens a `TermDictionary`. 
- pub fn open(term_dictionary_file: FileSlice) -> crate::Result { - let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(16); - let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?; - let index_offset = u64::deserialize(&mut footer_len_bytes)?; - let num_terms = u64::deserialize(&mut footer_len_bytes)?; - let (sstable_slice, index_slice) = main_slice.split(index_offset as usize); - let sstable_index_bytes = index_slice.read_bytes()?; - let sstable_index = SSTableIndex::load(sstable_index_bytes.as_slice()) - .map_err(|_| crate::error::DataCorruption::comment_only("SSTable corruption"))?; - Ok(TermDictionary { - sstable_slice, - sstable_index, - num_terms, - }) - } - - /// Creates a term dictionary from the supplied bytes. - pub fn from_bytes(owned_bytes: OwnedBytes) -> crate::Result { - TermDictionary::open(FileSlice::new(Arc::new(owned_bytes))) - } - - /// Creates an empty term dictionary which contains no terms. - pub fn empty() -> Self { - TermDictionary::open(EMPTY_TERM_DICT_FILE.clone()).unwrap() - } - - /// Returns the number of terms in the dictionary. - /// Term ordinals range from 0 to `num_terms() - 1`. - pub fn num_terms(&self) -> usize { - self.num_terms as usize - } - - /// Returns the ordinal associated with a given term. - pub fn term_ord>(&self, key: K) -> io::Result> { - let mut term_ord = 0u64; - let key_bytes = key.as_ref(); - let mut sstable_reader = self.sstable_reader()?; - while sstable_reader.advance().unwrap_or(false) { - if sstable_reader.key() == key_bytes { - return Ok(Some(term_ord)); - } - term_ord += 1; - } - Ok(None) - } - - /// Returns the term associated with a given term ordinal. - /// - /// Term ordinals are defined as the position of the term in - /// the sorted list of terms. - /// - /// Returns true if and only if the term has been found. - /// - /// Regardless of whether the term is found or not, - /// the buffer may be modified. - pub fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec) -> io::Result { - let mut sstable_reader = self.sstable_reader()?; - bytes.clear(); - for _ in 0..(ord + 1) { - if !sstable_reader.advance().unwrap_or(false) { - return Ok(false); - } - } - bytes.extend_from_slice(sstable_reader.key()); - Ok(true) - } - - /// Returns the number of terms in the dictionary. - pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> io::Result { - let mut sstable_reader = self.sstable_reader()?; - for _ in 0..(term_ord + 1) { - if !sstable_reader.advance().unwrap_or(false) { - return Ok(TermInfo::default()); - } - } - Ok(sstable_reader.value().clone()) - } - - /// Lookups the value corresponding to the key. - pub fn get>(&self, key: K) -> io::Result> { - if let Some(block_addr) = self.sstable_index.search(key.as_ref()) { - let mut sstable_reader = self.sstable_reader_block(block_addr)?; - let key_bytes = key.as_ref(); - while sstable_reader.advance().unwrap_or(false) { - if sstable_reader.key() == key_bytes { - let term_info = sstable_reader.value().clone(); - return Ok(Some(term_info)); - } - } - } - Ok(None) - } - - /// Lookups the value corresponding to the key. 
- pub async fn get_async>(&self, key: K) -> io::Result> { - if let Some(block_addr) = self.sstable_index.search(key.as_ref()) { - let mut sstable_reader = self.sstable_reader_block_async(block_addr).await?; - let key_bytes = key.as_ref(); - while sstable_reader.advance().unwrap_or(false) { - if sstable_reader.key() == key_bytes { - let term_info = sstable_reader.value().clone(); - return Ok(Some(term_info)); - } - } - } - Ok(None) - } - - /// Returns a range builder, to stream all of the terms - /// within an interval. - pub fn range(&self) -> TermStreamerBuilder<'_> { - TermStreamerBuilder::new(self, AlwaysMatch) - } - - /// A stream of all the sorted terms. - pub fn stream(&self) -> io::Result> { - self.range().into_stream() - } - - /// Returns a search builder, to stream all of the terms - /// within the Automaton - pub fn search<'a, A: Automaton + 'a>(&'a self, automaton: A) -> TermStreamerBuilder<'a, A> - where A::State: Clone { - TermStreamerBuilder::::new(self, automaton) - } - - #[doc(hidden)] - pub async fn warm_up_dictionary(&self) -> io::Result<()> { - self.sstable_slice.read_bytes_async().await?; - Ok(()) - } -} diff --git a/src/termdict/tests.rs b/src/termdict/tests.rs index 50d040998c..44f8dbe03a 100644 --- a/src/termdict/tests.rs +++ b/src/termdict/tests.rs @@ -1,5 +1,5 @@ use std::path::PathBuf; -use std::str; +use std::{io, str}; use super::{TermDictionary, TermDictionaryBuilder, TermStreamer}; use crate::directory::{Directory, FileSlice, RamDirectory, TerminatingWrite}; @@ -247,7 +247,7 @@ fn test_empty_string() -> crate::Result<()> { Ok(()) } -fn stream_range_test_dict() -> crate::Result { +fn stream_range_test_dict() -> io::Result { let buffer: Vec = { let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?; for i in 0u8..10u8 { diff --git a/sstable/Cargo.toml b/sstable/Cargo.toml index 1f50efa7de..3f14d9047d 100644 --- a/sstable/Cargo.toml +++ b/sstable/Cargo.toml @@ -6,8 +6,15 @@ edition = "2021" [dependencies] common = {path="../common", package="tantivy-common"} ciborium = "0.2" -byteorder = "1" serde = "1" +tantivy-fst = "0.4" [dev-dependencies] proptest = "1" +criterion = "0.4" +names = "0.14" +rand = "0.8" + +[[bench]] +name = "stream_bench" +harness = false diff --git a/sstable/README.md b/sstable/README.md new file mode 100644 index 0000000000..b13523618b --- /dev/null +++ b/sstable/README.md @@ -0,0 +1,28 @@ +# SSTable + +The `tantivy-sstable` crate is yet another sstable crate. + +It has been designed to be used in `quickwit`: +- as an alternative to the default tantivy fst dictionary. +- as a way to store the column index for dynamic fast fields. + +The benefit compared to the fst crate is locality. +Searching a key in the fst crate requires downloading the entire dictionary. + +Once the sstable index is downloaded, running a `get` in the sstable +crate only requires a single fetch. + +Right now, the block index and the default block size have been thought +for quickwit, and the performance of a get is very bad. + +# Sorted strings? + +SSTable stands for Sorted String Table. +Strings have to be insert in sorted order. + +That sorted order is used in different ways: +- it makes gets and streaming ranges of keys +possible. 
+- it allows incremental encoding of the keys +- the front compression is leveraged to optimize +the intersection with an automaton diff --git a/sstable/benches/stream_bench.rs b/sstable/benches/stream_bench.rs new file mode 100644 index 0000000000..2b29a5e993 --- /dev/null +++ b/sstable/benches/stream_bench.rs @@ -0,0 +1,87 @@ +use std::collections::BTreeSet; +use std::io; + +use common::file_slice::FileSlice; +use criterion::{criterion_group, criterion_main, Criterion}; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use tantivy_sstable::{self, Dictionary, MonotonicU64SSTable}; + +const CHARSET: &'static [u8] = b"abcdefghij"; + +fn generate_key(rng: &mut impl Rng) -> String { + let len = rng.gen_range(3..12); + std::iter::from_fn(|| { + let idx = rng.gen_range(0..CHARSET.len()); + Some(CHARSET[idx] as char) + }) + .take(len) + .collect() +} + +fn prepare_sstable() -> io::Result> { + let mut rng = StdRng::from_seed([3u8; 32]); + let mut els = BTreeSet::new(); + while els.len() < 100_000 { + els.insert(generate_key(&mut rng)); + } + let mut dictionary_builder = Dictionary::::builder(Vec::new())?; + for (ord, word) in els.iter().enumerate() { + dictionary_builder.insert(word, &(ord as u64))?; + } + let buffer = dictionary_builder.finish()?; + let dictionary = Dictionary::open(FileSlice::from(buffer))?; + Ok(dictionary) +} + +fn stream_bench( + dictionary: &Dictionary, + lower: &[u8], + upper: &[u8], + do_scan: bool, +) -> usize { + let mut stream = dictionary + .range() + .ge(lower) + .lt(upper) + .into_stream() + .unwrap(); + if !do_scan { + return 0; + } + let mut count = 0; + while stream.advance() { + count += 1; + } + count +} + +pub fn criterion_benchmark(c: &mut Criterion) { + let dict = prepare_sstable().unwrap(); + c.bench_function("short_scan_init", |b| { + b.iter(|| stream_bench(&dict, b"fa", b"fana", false)) + }); + c.bench_function("short_scan_init_and_scan", |b| { + b.iter(|| { + assert_eq!(stream_bench(&dict, b"fa", b"faz", true), 971); + }) + }); + c.bench_function("full_scan_init_and_scan_full_with_bound", |b| { + b.iter(|| { + assert_eq!(stream_bench(&dict, b"", b"z", true), 100_000); + }) + }); + c.bench_function("full_scan_init_and_scan_full_no_bounds", |b| { + b.iter(|| { + let mut stream = dict.stream().unwrap(); + let mut count = 0; + while stream.advance() { + count += 1; + } + count + }) + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/sstable/src/block_reader.rs b/sstable/src/block_reader.rs index 58ccf457a6..c16440afc1 100644 --- a/sstable/src/block_reader.rs +++ b/sstable/src/block_reader.rs @@ -1,6 +1,5 @@ -use std::io::{self, Read}; - -use byteorder::{LittleEndian, ReadBytesExt}; +use std::io; +use std::ops::Range; pub struct BlockReader<'a> { buffer: Vec, @@ -8,6 +7,13 @@ pub struct BlockReader<'a> { offset: usize, } +#[inline] +fn read_u32(read: &mut dyn io::Read) -> io::Result { + let mut buf = [0u8; 4]; + read.read_exact(&mut buf)?; + Ok(u32::from_le_bytes(buf)) +} + impl<'a> BlockReader<'a> { pub fn new(reader: Box) -> BlockReader<'a> { BlockReader { @@ -24,13 +30,13 @@ impl<'a> BlockReader<'a> { } #[inline(always)] - pub fn buffer_from_to(&self, start: usize, end: usize) -> &[u8] { - &self.buffer[start..end] + pub fn buffer_from_to(&self, range: Range) -> &[u8] { + &self.buffer[range] } pub fn read_block(&mut self) -> io::Result { self.offset = 0; - let block_len_res = self.reader.read_u32::(); + let block_len_res = read_u32(self.reader.as_mut()); if let Err(err) = &block_len_res { if err.kind() 
== io::ErrorKind::UnexpectedEof { return Ok(false); @@ -46,14 +52,17 @@ impl<'a> BlockReader<'a> { Ok(true) } + #[inline(always)] pub fn offset(&self) -> usize { self.offset } + #[inline(always)] pub fn advance(&mut self, num_bytes: usize) { self.offset += num_bytes; } + #[inline(always)] pub fn buffer(&self) -> &[u8] { &self.buffer[self.offset..] } diff --git a/sstable/src/delta.rs b/sstable/src/delta.rs index 775ebe09bd..ebcadf0a02 100644 --- a/sstable/src/delta.rs +++ b/sstable/src/delta.rs @@ -16,6 +16,8 @@ where W: io::Write block: Vec, write: CountingWriter>, value_writer: TValueWriter, + // Only here to avoid allocations. + stateless_buffer: Vec, } impl DeltaWriter @@ -28,6 +30,7 @@ where block: Vec::with_capacity(BLOCK_LEN * 2), write: CountingWriter::wrap(BufWriter::new(wrt)), value_writer: TValueWriter::default(), + stateless_buffer: Vec::new(), } } } @@ -42,15 +45,16 @@ where return Ok(None); } let start_offset = self.write.written_bytes() as usize; - // TODO avoid buffer allocation - let mut buffer = Vec::new(); - self.value_writer.write_block(&mut buffer); + let buffer: &mut Vec = &mut self.stateless_buffer; + self.value_writer.serialize_block(buffer); + self.value_writer.clear(); let block_len = buffer.len() + self.block.len(); self.write.write_all(&(block_len as u32).to_le_bytes())?; self.write.write_all(&buffer[..])?; self.write.write_all(&self.block[..])?; let end_offset = self.write.written_bytes() as usize; self.block.clear(); + buffer.clear(); Ok(Some(start_offset..end_offset)) } @@ -84,15 +88,14 @@ where Ok(None) } - pub fn finalize(self) -> CountingWriter> { + pub fn finish(self) -> CountingWriter> { self.write } } pub struct DeltaReader<'a, TValueReader> { common_prefix_len: usize, - suffix_start: usize, - suffix_end: usize, + suffix_range: Range, value_reader: TValueReader, block_reader: BlockReader<'a>, idx: usize, @@ -105,13 +108,16 @@ where TValueReader: value::ValueReader DeltaReader { idx: 0, common_prefix_len: 0, - suffix_start: 0, - suffix_end: 0, + suffix_range: 0..0, value_reader: TValueReader::default(), block_reader: BlockReader::new(Box::new(reader)), } } + pub fn empty() -> Self { + DeltaReader::new(&b""[..]) + } + fn deserialize_vint(&mut self) -> u64 { self.block_reader.deserialize_u64() } @@ -140,15 +146,14 @@ where TValueReader: value::ValueReader } fn read_delta_key(&mut self) -> bool { - if let Some((keep, add)) = self.read_keep_add() { - self.common_prefix_len = keep; - self.suffix_start = self.block_reader.offset(); - self.suffix_end = self.suffix_start + add; - self.block_reader.advance(add); - true - } else { - false - } + let Some((keep, add)) = self.read_keep_add() else { + return false; + }; + self.common_prefix_len = keep; + let suffix_start = self.block_reader.offset(); + self.suffix_range = suffix_start..(suffix_start + add); + self.block_reader.advance(add); + true } pub fn advance(&mut self) -> io::Result { @@ -156,7 +161,8 @@ where TValueReader: value::ValueReader if !self.block_reader.read_block()? 
{ return Ok(false); } - self.value_reader.read(&mut self.block_reader)?; + let consumed_len = self.value_reader.load(self.block_reader.buffer())?; + self.block_reader.advance(consumed_len); self.idx = 0; } else { self.idx += 1; @@ -167,16 +173,30 @@ where TValueReader: value::ValueReader Ok(true) } + #[inline(always)] pub fn common_prefix_len(&self) -> usize { self.common_prefix_len } + #[inline(always)] pub fn suffix(&self) -> &[u8] { - self.block_reader - .buffer_from_to(self.suffix_start, self.suffix_end) + self.block_reader.buffer_from_to(self.suffix_range.clone()) } + #[inline(always)] pub fn value(&self) -> &TValueReader::Value { self.value_reader.value(self.idx) } } + +#[cfg(test)] +mod tests { + use super::DeltaReader; + use crate::value::U64MonotonicValueReader; + + #[test] + fn test_empty() { + let mut delta_reader: DeltaReader = DeltaReader::empty(); + assert!(!delta_reader.advance().unwrap()); + } +} diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs new file mode 100644 index 0000000000..0ec3d6ab17 --- /dev/null +++ b/sstable/src/dictionary.rs @@ -0,0 +1,261 @@ +use std::io; +use std::marker::PhantomData; +use std::ops::{Bound, RangeBounds}; +use std::sync::Arc; + +use common::file_slice::FileSlice; +use common::{BinarySerializable, OwnedBytes}; +use tantivy_fst::automaton::AlwaysMatch; +use tantivy_fst::Automaton; + +use crate::streamer::{Streamer, StreamerBuilder}; +use crate::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, TermOrdinal}; + +/// An SSTable is a sorted map that associates sorted `&[u8]` keys +/// to any kind of typed values. +/// +/// The SSTable is organized in blocks. +/// In each block, keys and values are encoded separately. +/// +/// The keys are encoded using incremental encoding. +/// The values on the other hand, are encoded according to a value-specific +/// codec defined in the TSSTable generic argument. +/// +/// Finally, an index is joined to the Dictionary to make it possible, +/// given a key to identify which block contains this key. +/// +/// The codec was designed in such a way that the sstable +/// reader is not aware of block, and yet can read any sequence of blocks, +/// as long as the slice of bytes it is given starts and stops at +/// block boundary. +/// +/// (See also README.md) +pub struct Dictionary { + pub sstable_slice: FileSlice, + pub sstable_index: SSTableIndex, + num_terms: u64, + phantom_data: PhantomData, +} + +impl Dictionary { + pub fn builder(wrt: W) -> io::Result> { + Ok(TSSTable::writer(wrt)) + } + + pub(crate) fn sstable_reader(&self) -> io::Result> { + let data = self.sstable_slice.read_bytes()?; + Ok(TSSTable::reader(data)) + } + + pub(crate) fn sstable_reader_block( + &self, + block_addr: BlockAddr, + ) -> io::Result> { + let data = self.sstable_slice.read_bytes_slice(block_addr.byte_range)?; + Ok(TSSTable::reader(data)) + } + + pub(crate) async fn sstable_reader_block_async( + &self, + block_addr: BlockAddr, + ) -> io::Result> { + let data = self + .sstable_slice + .read_bytes_slice_async(block_addr.byte_range) + .await?; + Ok(TSSTable::reader(data)) + } + + pub(crate) fn sstable_delta_reader_for_key_range( + &self, + key_range: impl RangeBounds<[u8]>, + ) -> io::Result> { + let slice = self.file_slice_for_range(key_range); + let data = slice.read_bytes()?; + Ok(TSSTable::delta_reader(data)) + } + + /// This function returns a file slice covering a set of sstable blocks + /// that include the key range passed in arguments. 
+ /// + /// It works by identifying + /// - `first_block`: the block containing the start boudary key + /// - `last_block`: the block containing the end boundary key. + /// + /// And then returning the range that spans over all blocks between. + /// and including first_block and last_block, aka: + /// `[first_block.start_offset .. last_block.end_offset)` + /// + /// Technically this function does not provide the tightest fit, as + /// for simplification, it treats the start bound of the `key_range` + /// as if it was inclusive, even if it is exclusive. + /// On the rare edge case where a user asks for `(start_key, end_key]` + /// and `start_key` happens to be the last key of a block, we return a + /// slice that is the first block was not necessary. + fn file_slice_for_range(&self, key_range: impl RangeBounds<[u8]>) -> FileSlice { + let start_bound: Bound = match key_range.start_bound() { + Bound::Included(key) | Bound::Excluded(key) => { + let Some(first_block_addr) = self.sstable_index.search_block(key) else { + return FileSlice::empty(); + }; + Bound::Included(first_block_addr.byte_range.start) + } + Bound::Unbounded => Bound::Unbounded, + }; + let end_bound: Bound = match key_range.end_bound() { + Bound::Included(key) | Bound::Excluded(key) => { + if let Some(block_addr) = self.sstable_index.search_block(key) { + Bound::Excluded(block_addr.byte_range.end) + } else { + Bound::Unbounded + } + } + Bound::Unbounded => Bound::Unbounded, + }; + self.sstable_slice.slice((start_bound, end_bound)) + } + + /// Opens a `TermDictionary`. + pub fn open(term_dictionary_file: FileSlice) -> io::Result { + let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(16); + let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?; + let index_offset = u64::deserialize(&mut footer_len_bytes)?; + let num_terms = u64::deserialize(&mut footer_len_bytes)?; + let (sstable_slice, index_slice) = main_slice.split(index_offset as usize); + let sstable_index_bytes = index_slice.read_bytes()?; + let sstable_index = SSTableIndex::load(sstable_index_bytes.as_slice()) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"))?; + Ok(Dictionary { + sstable_slice, + sstable_index, + num_terms, + phantom_data: PhantomData, + }) + } + + /// Creates a term dictionary from the supplied bytes. + pub fn from_bytes(owned_bytes: OwnedBytes) -> io::Result { + Dictionary::open(FileSlice::new(Arc::new(owned_bytes))) + } + + /// Creates an empty term dictionary which contains no terms. + pub fn empty() -> Self { + let term_dictionary_data: Vec = Self::builder(Vec::::new()) + .expect("Creating a TermDictionaryBuilder in a Vec should never fail") + .finish() + .expect("Writing in a Vec should never fail"); + let empty_dict_file = FileSlice::from(term_dictionary_data); + Dictionary::open(empty_dict_file).unwrap() + } + + /// Returns the number of terms in the dictionary. + /// Term ordinals range from 0 to `num_terms() - 1`. + pub fn num_terms(&self) -> usize { + self.num_terms as usize + } + + /// Returns the ordinal associated with a given term. + pub fn term_ord>(&self, key: K) -> io::Result> { + let mut term_ord = 0u64; + let key_bytes = key.as_ref(); + let mut sstable_reader = self.sstable_reader()?; + while sstable_reader.advance().unwrap_or(false) { + if sstable_reader.key() == key_bytes { + return Ok(Some(term_ord)); + } + term_ord += 1; + } + Ok(None) + } + + /// Returns the term associated with a given term ordinal. 
+ /// + /// Term ordinals are defined as the position of the term in + /// the sorted list of terms. + /// + /// Returns true if and only if the term has been found. + /// + /// Regardless of whether the term is found or not, + /// the buffer may be modified. + pub fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec) -> io::Result { + let mut sstable_reader = self.sstable_reader()?; + bytes.clear(); + for _ in 0..(ord + 1) { + if !sstable_reader.advance().unwrap_or(false) { + return Ok(false); + } + } + bytes.extend_from_slice(sstable_reader.key()); + Ok(true) + } + + /// Returns the number of terms in the dictionary. + pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> io::Result> { + let mut sstable_reader = self.sstable_reader()?; + for _ in 0..(term_ord + 1) { + if !sstable_reader.advance().unwrap_or(false) { + return Ok(None); + } + } + Ok(Some(sstable_reader.value().clone())) + } + + /// Lookups the value corresponding to the key. + pub fn get>(&self, key: K) -> io::Result> { + if let Some(block_addr) = self.sstable_index.search_block(key.as_ref()) { + let mut sstable_reader = self.sstable_reader_block(block_addr)?; + let key_bytes = key.as_ref(); + while sstable_reader.advance().unwrap_or(false) { + if sstable_reader.key() == key_bytes { + let value = sstable_reader.value().clone(); + return Ok(Some(value)); + } + } + } + Ok(None) + } + + /// Lookups the value corresponding to the key. + pub async fn get_async>(&self, key: K) -> io::Result> { + if let Some(block_addr) = self.sstable_index.search_block(key.as_ref()) { + let mut sstable_reader = self.sstable_reader_block_async(block_addr).await?; + let key_bytes = key.as_ref(); + while sstable_reader.advance().unwrap_or(false) { + if sstable_reader.key() == key_bytes { + let value = sstable_reader.value().clone(); + return Ok(Some(value)); + } + } + } + Ok(None) + } + + /// Returns a range builder, to stream all of the terms + /// within an interval. + pub fn range(&self) -> StreamerBuilder<'_, TSSTable> { + StreamerBuilder::new(self, AlwaysMatch) + } + + /// A stream of all the sorted terms. 
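For orientation, here is a minimal end-to-end sketch of the new generic `Dictionary` API added by this patch: build a dictionary into a `Vec<u8>`, reopen it from a `FileSlice`, and look a key up with `get`. It is modeled on the benchmark and tests added elsewhere in the patch; the crate and module paths (`tantivy_sstable`, `common::file_slice::FileSlice`), the `main` wrapper, and the choice of `MonotonicU64SSTable` follow the imports used there and are otherwise illustrative.

```rust
use std::io;

use common::file_slice::FileSlice;
use tantivy_sstable::{Dictionary, MonotonicU64SSTable};

fn main() -> io::Result<()> {
    // Keys must be inserted in sorted byte order.
    let mut builder = Dictionary::<MonotonicU64SSTable>::builder(Vec::new())?;
    builder.insert(b"abalone", &1u64)?;
    builder.insert(b"abandon", &2u64)?;
    builder.insert(b"zebra", &3u64)?;
    let buffer: Vec<u8> = builder.finish()?;

    // `get` consults the sstable index and decodes only the block that
    // may contain the key, instead of scanning the whole dictionary.
    let dictionary = Dictionary::<MonotonicU64SSTable>::open(FileSlice::from(buffer))?;
    assert_eq!(dictionary.get(b"abandon")?, Some(2u64));
    assert_eq!(dictionary.get(b"unknown")?, None);
    Ok(())
}
```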
+ pub fn stream(&self) -> io::Result> { + self.range().into_stream() + } + + /// Returns a search builder, to stream all of the terms + /// within the Automaton + pub fn search<'a, A: Automaton + 'a>( + &'a self, + automaton: A, + ) -> StreamerBuilder<'a, TSSTable, A> + where + A::State: Clone, + { + StreamerBuilder::::new(self, automaton) + } + + #[doc(hidden)] + pub async fn warm_up_dictionary(&self) -> io::Result<()> { + self.sstable_slice.read_bytes_async().await?; + Ok(()) + } +} diff --git a/sstable/src/lib.rs b/sstable/src/lib.rs index 59288f165a..c01a58acb2 100644 --- a/sstable/src/lib.rs +++ b/sstable/src/lib.rs @@ -1,24 +1,34 @@ use std::io::{self, Write}; +use std::ops::Range; use std::usize; use merge::ValueMerger; mod delta; +mod dictionary; pub mod merge; +mod streamer; pub mod value; mod sstable_index; pub use sstable_index::{BlockAddr, SSTableIndex, SSTableIndexBuilder}; pub(crate) mod vint; +pub use dictionary::Dictionary; +pub use streamer::{Streamer, StreamerBuilder}; mod block_reader; pub use self::block_reader::BlockReader; pub use self::delta::{DeltaReader, DeltaWriter}; pub use self::merge::VoidMerge; -use self::value::{U64MonotonicReader, U64MonotonicWriter, ValueReader, ValueWriter}; +use self::value::{U64MonotonicValueReader, U64MonotonicValueWriter, ValueReader, ValueWriter}; +use crate::value::{RangeValueReader, RangeValueWriter}; + +pub type TermOrdinal = u64; const DEFAULT_KEY_CAPACITY: usize = 50; +/// Given two byte string returns the length of +/// the longest common prefix. fn common_prefix_len(left: &[u8], right: &[u8]) -> usize { left.iter() .cloned() @@ -30,36 +40,37 @@ fn common_prefix_len(left: &[u8], right: &[u8]) -> usize { #[derive(Debug, Copy, Clone)] pub struct SSTableDataCorruption; +/// SSTable makes it possible to read and write +/// sstables with typed values. pub trait SSTable: Sized { - type Value; - type Reader: ValueReader; - type Writer: ValueWriter; + type Value: Clone; + type ValueReader: ValueReader; + type ValueWriter: ValueWriter; - fn delta_writer(write: W) -> DeltaWriter { + fn delta_writer(write: W) -> DeltaWriter { DeltaWriter::new(write) } - fn writer(write: W) -> Writer { - Writer { - previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY), - num_terms: 0u64, - index_builder: SSTableIndexBuilder::default(), - delta_writer: Self::delta_writer(write), - first_ordinal_of_the_block: 0u64, - } + fn writer(wrt: W) -> Writer { + Writer::new(wrt) } - fn delta_reader<'a, R: io::Read + 'a>(reader: R) -> DeltaReader<'a, Self::Reader> { + fn delta_reader<'a, R: io::Read + 'a>(reader: R) -> DeltaReader<'a, Self::ValueReader> { DeltaReader::new(reader) } - fn reader<'a, R: io::Read + 'a>(reader: R) -> Reader<'a, Self::Reader> { + fn reader<'a, R: io::Read + 'a>(reader: R) -> Reader<'a, Self::ValueReader> { Reader { key: Vec::with_capacity(DEFAULT_KEY_CAPACITY), delta_reader: Self::delta_reader(reader), } } + /// Returns an empty static reader. + fn create_empty_reader() -> Reader<'static, Self::ValueReader> { + Self::reader(&b""[..]) + } + fn merge>( io_readers: Vec, w: W, @@ -76,21 +87,47 @@ pub struct VoidSSTable; impl SSTable for VoidSSTable { type Value = (); - type Reader = value::VoidReader; - type Writer = value::VoidWriter; + type ValueReader = value::VoidValueReader; + type ValueWriter = value::VoidValueWriter; } +/// SSTable associated keys to u64 +/// sorted in order. +/// +/// In other words, two keys `k1` and `k2` +/// such that `k1` <= `k2`, are required to observe +/// `range_sstable[k1] <= range_sstable[k2]`. 
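To make the monotonicity contract above concrete, here is a rough sketch of writing and reading a `MonotonicU64SSTable` directly through the `SSTable` trait, patterned after the `test_sstable_u64` test later in this patch; the standalone `main` wrapper and the crate path `tantivy_sstable` are assumptions made for the sake of a self-contained example.

```rust
use std::io;

use tantivy_sstable::{MonotonicU64SSTable, SSTable};

fn main() -> io::Result<()> {
    // Keys are inserted in sorted order and the u64 values never decrease,
    // which is what keeps the per-block delta encoding of values valid.
    let mut buffer: Vec<u8> = Vec::new();
    let mut writer = MonotonicU64SSTable::writer(&mut buffer);
    writer.insert(b"apple", &2u64)?;
    writer.insert(b"banana", &10u64)?;
    writer.insert(b"cherry", &12u64)?;
    writer.finish()?;

    // Decode the (key, value) pairs back sequentially.
    let mut reader = MonotonicU64SSTable::reader(&buffer[..]);
    while reader.advance()? {
        println!("{:?} => {}", reader.key(), reader.value());
    }
    Ok(())
}
```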
#[allow(dead_code)] -pub struct SSTableMonotonicU64; +pub struct MonotonicU64SSTable; -impl SSTable for SSTableMonotonicU64 { +impl SSTable for MonotonicU64SSTable { type Value = u64; - type Reader = U64MonotonicReader; + type ValueReader = U64MonotonicValueReader; - type Writer = U64MonotonicWriter; + type ValueWriter = U64MonotonicValueWriter; } +/// SSTable associating keys to ranges. +/// The range are required to partition the +/// space. +/// +/// In other words, two consecutive keys `k1` and `k2` +/// are required to observe +/// `range_sstable[k1].end == range_sstable[k2].start`. +/// +/// The first range is not required to start at `0`. +pub struct RangeSSTable; + +impl SSTable for RangeSSTable { + type Value = Range; + + type ValueReader = RangeValueReader; + + type ValueWriter = RangeValueWriter; +} + +/// SSTable reader. pub struct Reader<'a, TValueReader> { key: Vec, delta_reader: DeltaReader<'a, TValueReader>, @@ -111,16 +148,19 @@ where TValueReader: ValueReader Ok(true) } + #[inline(always)] pub fn key(&self) -> &[u8] { &self.key } + #[inline(always)] pub fn value(&self) -> &TValueReader::Value { self.delta_reader.value() } } impl<'a, TValueReader> AsRef<[u8]> for Reader<'a, TValueReader> { + #[inline(always)] fn as_ref(&self) -> &[u8] { &self.key } @@ -141,11 +181,53 @@ where W: io::Write, TValueWriter: value::ValueWriter, { - pub(crate) fn current_key(&self) -> &[u8] { + /// Use `Self::new`. This method only exists to match its + /// equivalent in fst. + /// TODO remove this function. (See Issue #1727) + #[doc(hidden)] + pub fn create(wrt: W) -> io::Result { + Ok(Self::new(wrt)) + } + + /// Creates a new `TermDictionaryBuilder`. + pub fn new(wrt: W) -> Self { + Writer { + previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY), + num_terms: 0u64, + index_builder: SSTableIndexBuilder::default(), + delta_writer: DeltaWriter::new(wrt), + first_ordinal_of_the_block: 0u64, + } + } + + /// Returns the last inserted key. + /// If no key has been inserted yet, or the block was just + /// flushed, this function returns "". + #[inline(always)] + pub(crate) fn last_inserted_key(&self) -> &[u8] { &self.previous_key[..] } - pub fn write_key(&mut self, key: &[u8]) { + /// Inserts a `(key, value)` pair in the term dictionary. + /// + /// *Keys have to be inserted in order.* + #[inline] + pub fn insert>( + &mut self, + key: K, + value: &TValueWriter::Value, + ) -> io::Result<()> { + self.insert_key(key.as_ref())?; + self.insert_value(value)?; + Ok(()) + } + + /// # Warning + /// + /// Horribly dangerous internal API. See `.insert(...)`. + #[doc(hidden)] + #[inline] + pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()> { // If this is the first key in the block, we use it to // shorten the last term in the last block. if self.first_ordinal_of_the_block == self.num_terms { @@ -165,16 +247,15 @@ where self.previous_key.resize(key.len(), 0u8); self.previous_key[keep_len..].copy_from_slice(&key[keep_len..]); self.delta_writer.write_suffix(keep_len, &key[keep_len..]); - } - - #[allow(dead_code)] - pub fn write(&mut self, key: &[u8], value: &TValueWriter::Value) -> io::Result<()> { - self.write_key(key); - self.write_value(value)?; Ok(()) } - pub fn write_value(&mut self, value: &TValueWriter::Value) -> io::Result<()> { + /// # Warning + /// + /// Horribly dangerous internal API. See `.insert(...)`. 
+ #[doc(hidden)] + #[inline] + pub fn insert_value(&mut self, value: &TValueWriter::Value) -> io::Result<()> { self.delta_writer.write_value(value); self.num_terms += 1u64; self.flush_block_if_required() @@ -193,7 +274,7 @@ where Ok(()) } - pub fn finalize(mut self) -> io::Result { + pub fn finish(mut self) -> io::Result { if let Some(byte_range) = self.delta_writer.flush_block()? { self.index_builder.add_block( &self.previous_key[..], @@ -202,7 +283,7 @@ where ); self.first_ordinal_of_the_block = self.num_terms; } - let mut wrt = self.delta_writer.finalize(); + let mut wrt = self.delta_writer.finish(); wrt.write_all(&0u32.to_le_bytes())?; let offset = wrt.written_bytes(); @@ -217,8 +298,9 @@ where #[cfg(test)] mod test { use std::io; + use std::ops::Bound; - use super::{common_prefix_len, SSTable, SSTableMonotonicU64, VoidMerge, VoidSSTable}; + use super::{common_prefix_len, MonotonicU64SSTable, SSTable, VoidMerge, VoidSSTable}; fn aux_test_common_prefix_len(left: &str, right: &str, expect_len: usize) { assert_eq!( @@ -246,10 +328,10 @@ mod test { let mut buffer = vec![]; { let mut sstable_writer = VoidSSTable::writer(&mut buffer); - assert!(sstable_writer.write(&long_key[..], &()).is_ok()); - assert!(sstable_writer.write(&[0, 3, 4], &()).is_ok()); - assert!(sstable_writer.write(&long_key2[..], &()).is_ok()); - assert!(sstable_writer.finalize().is_ok()); + assert!(sstable_writer.insert(&long_key[..], &()).is_ok()); + assert!(sstable_writer.insert(&[0, 3, 4], &()).is_ok()); + assert!(sstable_writer.insert(&long_key2[..], &()).is_ok()); + assert!(sstable_writer.finish().is_ok()); } let mut sstable_reader = VoidSSTable::reader(&buffer[..]); assert!(sstable_reader.advance().unwrap()); @@ -266,10 +348,10 @@ mod test { let mut buffer = vec![]; { let mut sstable_writer = VoidSSTable::writer(&mut buffer); - assert!(sstable_writer.write(&[17u8], &()).is_ok()); - assert!(sstable_writer.write(&[17u8, 18u8, 19u8], &()).is_ok()); - assert!(sstable_writer.write(&[17u8, 20u8], &()).is_ok()); - assert!(sstable_writer.finalize().is_ok()); + assert!(sstable_writer.insert(&[17u8], &()).is_ok()); + assert!(sstable_writer.insert(&[17u8, 18u8, 19u8], &()).is_ok()); + assert!(sstable_writer.insert(&[17u8, 20u8], &()).is_ok()); + assert!(sstable_writer.finish().is_ok()); } assert_eq!( &buffer, @@ -304,8 +386,8 @@ mod test { fn test_simple_sstable_non_increasing_key() { let mut buffer = vec![]; let mut sstable_writer = VoidSSTable::writer(&mut buffer); - assert!(sstable_writer.write(&[17u8], &()).is_ok()); - assert!(sstable_writer.write(&[16u8], &()).is_ok()); + assert!(sstable_writer.insert(&[17u8], &()).is_ok()); + assert!(sstable_writer.insert(&[16u8], &()).is_ok()); } #[test] @@ -313,9 +395,9 @@ mod test { let mut buffer = Vec::new(); { let mut writer = VoidSSTable::writer(&mut buffer); - writer.write(b"abcd", &()).unwrap(); - writer.write(b"abe", &()).unwrap(); - writer.finalize().unwrap(); + writer.insert(b"abcd", &()).unwrap(); + writer.insert(b"abe", &()).unwrap(); + writer.finish().unwrap(); } let mut output = Vec::new(); assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok()); @@ -327,9 +409,12 @@ mod test { let mut buffer = Vec::new(); { let mut writer = VoidSSTable::writer(&mut buffer); - writer.write(b"abcd", &()).unwrap(); - writer.write(b"abe", &()).unwrap(); - writer.finalize().unwrap(); + assert_eq!(writer.last_inserted_key(), b""); + writer.insert(b"abcd", &()).unwrap(); + assert_eq!(writer.last_inserted_key(), b"abcd"); + writer.insert(b"abe", &()).unwrap(); 
+ assert_eq!(writer.last_inserted_key(), b"abe"); + writer.finish().unwrap(); } let mut output = Vec::new(); assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok()); @@ -339,12 +424,12 @@ mod test { #[test] fn test_sstable_u64() -> io::Result<()> { let mut buffer = Vec::new(); - let mut writer = SSTableMonotonicU64::writer(&mut buffer); - writer.write(b"abcd", &1u64)?; - writer.write(b"abe", &4u64)?; - writer.write(b"gogo", &4324234234234234u64)?; - writer.finalize()?; - let mut reader = SSTableMonotonicU64::reader(&buffer[..]); + let mut writer = MonotonicU64SSTable::writer(&mut buffer); + writer.insert(b"abcd", &1u64)?; + writer.insert(b"abe", &4u64)?; + writer.insert(b"gogo", &4324234234234234u64)?; + writer.finish()?; + let mut reader = MonotonicU64SSTable::reader(&buffer[..]); assert!(reader.advance()?); assert_eq!(reader.key(), b"abcd"); assert_eq!(reader.value(), &1u64); @@ -357,4 +442,75 @@ mod test { assert!(!reader.advance()?); Ok(()) } + + #[test] + fn test_sstable_empty() { + let mut sstable_range_empty = crate::RangeSSTable::create_empty_reader(); + assert!(!sstable_range_empty.advance().unwrap()); + } + + use common::file_slice::FileSlice; + use proptest::prelude::*; + + use crate::Dictionary; + + fn bound_strategy() -> impl Strategy> { + prop_oneof![ + Just(Bound::::Unbounded), + "[a-d]*".prop_map(|key| Bound::Included(key)), + "[a-d]*".prop_map(|key| Bound::Excluded(key)), + ] + } + + fn extract_key(bound: Bound<&String>) -> Option<&str> { + match bound.as_ref() { + Bound::Included(key) => Some(key.as_str()), + Bound::Excluded(key) => Some(key.as_str()), + Bound::Unbounded => None, + } + } + + fn bounds_strategy() -> impl Strategy, Bound)> { + (bound_strategy(), bound_strategy()).prop_filter( + "Lower bound <= Upper bound", + |(left, right)| match (extract_key(left.as_ref()), extract_key(right.as_ref())) { + (None, _) => true, + (_, None) => true, + (left, right) => left <= right, + }, + ) + } + + proptest! { + #[test] + fn test_prop_test_ranges(words in prop::collection::btree_set("[a-d]*", 1..100), + (lower_bound, upper_bound) in bounds_strategy(), + ) { + // TODO tweak block size. 
+ let mut builder = Dictionary::::builder(Vec::new()).unwrap(); + for word in &words { + builder.insert(word.as_bytes(), &()).unwrap(); + } + let buffer: Vec = builder.finish().unwrap(); + let dictionary: Dictionary = Dictionary::open(FileSlice::from(buffer)).unwrap(); + let mut range_builder = dictionary.range(); + range_builder = match lower_bound.as_ref() { + Bound::Included(key) => range_builder.ge(key.as_bytes()), + Bound::Excluded(key) => range_builder.gt(key.as_bytes()), + Bound::Unbounded => range_builder, + }; + range_builder = match upper_bound.as_ref() { + Bound::Included(key) => range_builder.le(key.as_bytes()), + Bound::Excluded(key) => range_builder.lt(key.as_bytes()), + Bound::Unbounded => range_builder, + }; + let mut stream = range_builder.into_stream().unwrap(); + let mut btree_set_range = words.range((lower_bound, upper_bound)); + while stream.advance() { + let val = btree_set_range.next().unwrap(); + assert_eq!(val.as_bytes(), stream.key()); + } + assert!(btree_set_range.next().is_none()); + } + } } diff --git a/sstable/src/merge/heap_merge.rs b/sstable/src/merge/heap_merge.rs index 9e852918d6..39840b0881 100644 --- a/sstable/src/merge/heap_merge.rs +++ b/sstable/src/merge/heap_merge.rs @@ -28,11 +28,11 @@ impl> PartialEq for HeapItem { #[allow(dead_code)] pub fn merge_sstable>( - readers: Vec>, - mut writer: Writer, + readers: Vec>, + mut writer: Writer, mut merger: M, ) -> io::Result<()> { - let mut heap: BinaryHeap>> = + let mut heap: BinaryHeap>> = BinaryHeap::with_capacity(readers.len()); for mut reader in readers { if reader.advance()? { @@ -43,7 +43,7 @@ pub fn merge_sstable>( let len = heap.len(); let mut value_merger; if let Some(mut head) = heap.peek_mut() { - writer.write_key(head.0.key()); + writer.insert_key(head.0.key()).unwrap(); value_merger = merger.new_value(head.0.value()); if !head.0.advance()? { PeekMut::pop(head); @@ -53,7 +53,7 @@ pub fn merge_sstable>( } for _ in 0..len - 1 { if let Some(mut head) = heap.peek_mut() { - if head.0.key() == writer.current_key() { + if head.0.key() == writer.last_inserted_key() { value_merger.add(head.0.value()); if !head.0.advance()? 
{ PeekMut::pop(head); @@ -64,9 +64,9 @@ pub fn merge_sstable>( break; } let value = value_merger.finish(); - writer.write_value(&value)?; + writer.insert_value(&value)?; writer.flush_block_if_required()?; } - writer.finalize()?; + writer.finish()?; Ok(()) } diff --git a/sstable/src/merge/mod.rs b/sstable/src/merge/mod.rs index 3170500b4e..083efb9ad9 100644 --- a/sstable/src/merge/mod.rs +++ b/sstable/src/merge/mod.rs @@ -71,7 +71,7 @@ mod tests { use std::collections::{BTreeMap, BTreeSet}; use std::str; - use super::super::{SSTable, SSTableMonotonicU64, VoidSSTable}; + use super::super::{MonotonicU64SSTable, SSTable, VoidSSTable}; use super::{U64Merge, VoidMerge}; fn write_sstable(keys: &[&'static str]) -> Vec { @@ -79,9 +79,9 @@ mod tests { { let mut sstable_writer = VoidSSTable::writer(&mut buffer); for &key in keys { - assert!(sstable_writer.write(key.as_bytes(), &()).is_ok()); + assert!(sstable_writer.insert(key.as_bytes(), &()).is_ok()); } - assert!(sstable_writer.finalize().is_ok()); + assert!(sstable_writer.finish().is_ok()); } buffer } @@ -89,11 +89,11 @@ mod tests { fn write_sstable_u64(keys: &[(&'static str, u64)]) -> Vec { let mut buffer: Vec = vec![]; { - let mut sstable_writer = SSTableMonotonicU64::writer(&mut buffer); + let mut sstable_writer = MonotonicU64SSTable::writer(&mut buffer); for (key, val) in keys { - assert!(sstable_writer.write(key.as_bytes(), val).is_ok()); + assert!(sstable_writer.insert(key.as_bytes(), val).is_ok()); } - assert!(sstable_writer.finalize().is_ok()); + assert!(sstable_writer.finish().is_ok()); } buffer } @@ -132,8 +132,8 @@ mod tests { } } let mut w = Vec::new(); - assert!(SSTableMonotonicU64::merge(sstables_ref, &mut w, U64Merge).is_ok()); - let mut reader = SSTableMonotonicU64::reader(&w[..]); + assert!(MonotonicU64SSTable::merge(sstables_ref, &mut w, U64Merge).is_ok()); + let mut reader = MonotonicU64SSTable::reader(&w[..]); for (k, v) in merged { assert!(reader.advance().unwrap()); assert_eq!(reader.key(), k.as_bytes()); diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs index b283b961c8..8e73918f97 100644 --- a/sstable/src/sstable_index.rs +++ b/sstable/src/sstable_index.rs @@ -15,7 +15,7 @@ impl SSTableIndex { ciborium::de::from_reader(data).map_err(|_| SSTableDataCorruption) } - pub fn search(&self, key: &[u8]) -> Option { + pub fn search_block(&self, key: &[u8]) -> Option { self.blocks .iter() .find(|block| &block.last_key_or_greater[..] >= key) @@ -105,7 +105,7 @@ mod tests { sstable_builder.serialize(&mut buffer).unwrap(); let sstable_index = SSTableIndex::load(&buffer[..]).unwrap(); assert_eq!( - sstable_index.search(b"bbbde"), + sstable_index.search_block(b"bbbde"), Some(BlockAddr { first_ordinal: 10u64, byte_range: 30..40 diff --git a/src/termdict/sstable_termdict/streamer.rs b/sstable/src/streamer.rs similarity index 65% rename from src/termdict/sstable_termdict/streamer.rs rename to sstable/src/streamer.rs index 8289862ec7..752b39b645 100644 --- a/src/termdict/sstable_termdict/streamer.rs +++ b/sstable/src/streamer.rs @@ -4,31 +4,39 @@ use std::ops::Bound; use tantivy_fst::automaton::AlwaysMatch; use tantivy_fst::Automaton; -use super::TermDictionary; -use crate::postings::TermInfo; -use crate::termdict::sstable_termdict::TermInfoReader; -use crate::termdict::TermOrdinal; +use crate::dictionary::Dictionary; +use crate::{SSTable, TermOrdinal}; -/// `TermStreamerBuilder` is a helper object used to define +/// `StreamerBuilder` is a helper object used to define /// a range of terms that should be streamed. 
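Before the builder itself, a short usage sketch of the range API it exposes, modeled on the new `stream_bench.rs` benchmark: restrict a `Dictionary` to a byte range of keys and stream the matching entries. The helper name `count_range` and the `MonotonicU64SSTable` value type are illustrative choices, not part of the patch.

```rust
use std::io;

use tantivy_sstable::{Dictionary, MonotonicU64SSTable};

/// Counts the entries whose key falls in `[lower, upper)`.
fn count_range(
    dict: &Dictionary<MonotonicU64SSTable>,
    lower: &[u8],
    upper: &[u8],
) -> io::Result<usize> {
    // `ge`/`lt` set an inclusive lower and an exclusive upper bound.
    // Thanks to `sstable_delta_reader_for_key_range`, only the sstable
    // blocks overlapping this range are fetched and decoded.
    let mut stream = dict.range().ge(lower).lt(upper).into_stream()?;
    let mut count = 0;
    while stream.advance() {
        count += 1;
    }
    Ok(count)
}
```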
-pub struct TermStreamerBuilder<'a, A = AlwaysMatch> +pub struct StreamerBuilder<'a, TSSTable, A = AlwaysMatch> where A: Automaton, A::State: Clone, + TSSTable: SSTable, { - term_dict: &'a TermDictionary, + term_dict: &'a Dictionary, automaton: A, lower: Bound>, upper: Bound>, } -impl<'a, A> TermStreamerBuilder<'a, A> +fn bound_as_byte_slice(bound: &Bound>) -> Bound<&[u8]> { + match bound.as_ref() { + Bound::Included(key) => Bound::Included(key.as_slice()), + Bound::Excluded(key) => Bound::Excluded(key.as_slice()), + Bound::Unbounded => Bound::Unbounded, + } +} + +impl<'a, TSSTable, A> StreamerBuilder<'a, TSSTable, A> where A: Automaton, A::State: Clone, + TSSTable: SSTable, { - pub(crate) fn new(term_dict: &'a TermDictionary, automaton: A) -> Self { - TermStreamerBuilder { + pub(crate) fn new(term_dict: &'a Dictionary, automaton: A) -> Self { + StreamerBuilder { term_dict, automaton, lower: Bound::Unbounded, @@ -61,12 +69,18 @@ where } /// Creates the stream corresponding to the range - /// of terms defined using the `TermStreamerBuilder`. - pub fn into_stream(self) -> io::Result> { + /// of terms defined using the `StreamerBuilder`. + pub fn into_stream(self) -> io::Result> { // TODO Optimize by skipping to the right first block. let start_state = self.automaton.start(); - let delta_reader = self.term_dict.sstable_delta_reader()?; - Ok(TermStreamer { + let key_range = ( + bound_as_byte_slice(&self.lower), + bound_as_byte_slice(&self.upper), + ); + let delta_reader = self + .term_dict + .sstable_delta_reader_for_key_range(key_range)?; + Ok(Streamer { automaton: self.automaton, states: vec![start_state], delta_reader, @@ -78,26 +92,28 @@ where } } -/// `TermStreamer` acts as a cursor over a range of terms of a segment. +/// `Streamer` acts as a cursor over a range of terms of a segment. /// Terms are guaranteed to be sorted. -pub struct TermStreamer<'a, A = AlwaysMatch> +pub struct Streamer<'a, TSSTable, A = AlwaysMatch> where A: Automaton, A::State: Clone, + TSSTable: SSTable, { automaton: A, states: Vec, - delta_reader: sstable::DeltaReader<'a, TermInfoReader>, + delta_reader: crate::DeltaReader<'a, TSSTable::ValueReader>, key: Vec, term_ord: Option, lower_bound: Bound>, upper_bound: Bound>, } -impl<'a, A> TermStreamer<'a, A> +impl<'a, TSSTable, A> Streamer<'a, TSSTable, A> where A: Automaton, A::State: Clone, + TSSTable: SSTable, { /// Advance position the stream on the next item. /// Before the first call to `.advance()`, the stream @@ -174,13 +190,13 @@ where /// /// Calling `.value()` before the first call to `.advance()` returns /// `V::default()`. - pub fn value(&self) -> &TermInfo { + pub fn value(&self) -> &TSSTable::Value { self.delta_reader.value() } /// Return the next `(key, value)` pair. 
#[allow(clippy::should_implement_trait)] - pub fn next(&mut self) -> Option<(&[u8], &TermInfo)> { + pub fn next(&mut self) -> Option<(&[u8], &TSSTable::Value)> { if self.advance() { Some((self.key(), self.value())) } else { @@ -191,60 +207,54 @@ where #[cfg(test)] mod tests { - use super::super::TermDictionary; - use crate::directory::OwnedBytes; - use crate::postings::TermInfo; - - fn make_term_info(i: usize) -> TermInfo { - TermInfo { - doc_freq: 1000u32 + i as u32, - postings_range: (i + 10) * (i * 10)..((i + 1) + 10) * ((i + 1) * 10), - positions_range: i * 500..(i + 1) * 500, - } - } + use std::io; + + use common::OwnedBytes; + + use crate::{Dictionary, MonotonicU64SSTable}; - fn create_test_term_dictionary() -> crate::Result<TermDictionary> { - let mut term_dict_builder = super::super::TermDictionaryBuilder::create(Vec::new())?; - term_dict_builder.insert(b"abaisance", &make_term_info(0))?; - term_dict_builder.insert(b"abalation", &make_term_info(1))?; - term_dict_builder.insert(b"abalienate", &make_term_info(2))?; - term_dict_builder.insert(b"abandon", &make_term_info(3))?; - let buffer = term_dict_builder.finish()?; + fn create_test_dictionary() -> io::Result<Dictionary<MonotonicU64SSTable>> { + let mut dict_builder = Dictionary::<MonotonicU64SSTable>::builder(Vec::new())?; + dict_builder.insert(b"abaisance", &0)?; + dict_builder.insert(b"abalation", &1)?; + dict_builder.insert(b"abalienate", &2)?; + dict_builder.insert(b"abandon", &3)?; + let buffer = dict_builder.finish()?; let owned_bytes = OwnedBytes::new(buffer); - TermDictionary::from_bytes(owned_bytes) + Dictionary::from_bytes(owned_bytes) } #[test] - fn test_sstable_stream() -> crate::Result<()> { - let term_dict = create_test_term_dictionary()?; - let mut term_streamer = term_dict.stream()?; - assert!(term_streamer.advance()); - assert_eq!(term_streamer.key(), b"abaisance"); - assert_eq!(term_streamer.value().doc_freq, 1000u32); - assert!(term_streamer.advance()); - assert_eq!(term_streamer.key(), b"abalation"); - assert_eq!(term_streamer.value().doc_freq, 1001u32); - assert!(term_streamer.advance()); - assert_eq!(term_streamer.key(), b"abalienate"); - assert_eq!(term_streamer.value().doc_freq, 1002u32); - assert!(term_streamer.advance()); - assert_eq!(term_streamer.key(), b"abandon"); - assert_eq!(term_streamer.value().doc_freq, 1003u32); - assert!(!term_streamer.advance()); + fn test_sstable_stream() -> io::Result<()> { + let dict = create_test_dictionary()?; + let mut streamer = dict.stream()?; + assert!(streamer.advance()); + assert_eq!(streamer.key(), b"abaisance"); + assert_eq!(streamer.value(), &0); + assert!(streamer.advance()); + assert_eq!(streamer.key(), b"abalation"); + assert_eq!(streamer.value(), &1); + assert!(streamer.advance()); + assert_eq!(streamer.key(), b"abalienate"); + assert_eq!(streamer.value(), &2); + assert!(streamer.advance()); + assert_eq!(streamer.key(), b"abandon"); + assert_eq!(streamer.value(), &3); + assert!(!streamer.advance()); Ok(()) } #[test] - fn test_sstable_search() -> crate::Result<()> { - let term_dict = create_test_term_dictionary()?; + fn test_sstable_search() -> io::Result<()> { + let term_dict = create_test_dictionary()?; let ptn = tantivy_fst::Regex::new("ab.*t.*").unwrap(); let mut term_streamer = term_dict.search(ptn).into_stream()?; assert!(term_streamer.advance()); assert_eq!(term_streamer.key(), b"abalation"); - assert_eq!(term_streamer.value().doc_freq, 1001u32); + assert_eq!(term_streamer.value(), &1u64); assert!(term_streamer.advance()); assert_eq!(term_streamer.key(), b"abalienate"); - assert_eq!(term_streamer.value().doc_freq,
1002u32); + assert_eq!(term_streamer.value(), &2u64); assert!(!term_streamer.advance()); Ok(()) } } diff --git a/sstable/src/value.rs b/sstable/src/value.rs deleted file mode 100644 index 05d0d12dec..0000000000 --- a/sstable/src/value.rs +++ /dev/null @@ -1,95 +0,0 @@ -use std::io; - -use super::{vint, BlockReader}; - -pub trait ValueReader: Default { - type Value; - - fn value(&self, idx: usize) -> &Self::Value; - - fn read(&mut self, reader: &mut BlockReader) -> io::Result<()>; -} - -pub trait ValueWriter: Default { - type Value; - - fn write(&mut self, val: &Self::Value); - - fn write_block(&mut self, writer: &mut Vec<u8>); -} - -#[derive(Default)] -pub struct VoidReader; - -impl ValueReader for VoidReader { - type Value = (); - - fn value(&self, _idx: usize) -> &() { - &() - } - - fn read(&mut self, _reader: &mut BlockReader) -> io::Result<()> { - Ok(()) - } -} - -#[derive(Default)] -pub struct VoidWriter; - -impl ValueWriter for VoidWriter { - type Value = (); - - fn write(&mut self, _val: &()) {} - - fn write_block(&mut self, _writer: &mut Vec<u8>) {} } - -#[derive(Default)] -pub struct U64MonotonicWriter { - vals: Vec<u64>, -} - -impl ValueWriter for U64MonotonicWriter { - type Value = u64; - - fn write(&mut self, val: &Self::Value) { - self.vals.push(*val); - } - - fn write_block(&mut self, writer: &mut Vec<u8>) { - let mut prev_val = 0u64; - vint::serialize_into_vec(self.vals.len() as u64, writer); - for &val in &self.vals { - let delta = val - prev_val; - vint::serialize_into_vec(delta, writer); - prev_val = val; - } - self.vals.clear(); - } -} - -#[derive(Default)] -pub struct U64MonotonicReader { - vals: Vec<u64>, -} - -impl ValueReader for U64MonotonicReader { - type Value = u64; - - fn value(&self, idx: usize) -> &Self::Value { - &self.vals[idx] - } - - fn read(&mut self, reader: &mut BlockReader) -> io::Result<()> { - let len = reader.deserialize_u64() as usize; - self.vals.clear(); - let mut prev_val = 0u64; - for _ in 0..len { - let delta = reader.deserialize_u64(); - let val = prev_val + delta; - self.vals.push(val); - prev_val = val; - } - Ok(()) - } -} diff --git a/sstable/src/value/mod.rs b/sstable/src/value/mod.rs new file mode 100644 index 0000000000..54106ec92e --- /dev/null +++ b/sstable/src/value/mod.rs @@ -0,0 +1,82 @@ +mod range; +mod u64_monotonic; +mod void; + +use std::io; + +/// `ValueReader` is a trait describing the contract of something +/// reading blocks of values, and offering random access within these values. +pub trait ValueReader: Default { + /// Type of the value being read. + type Value; + + /// Access the value at index `idx`, in the last block that was read + /// via a call to `ValueReader::load`. + fn value(&self, idx: usize) -> &Self::Value; + + /// Loads a block. + /// + /// Returns the number of bytes that were read. + fn load(&mut self, data: &[u8]) -> io::Result<usize>; +} + +/// `ValueWriter` is a trait to make it possible to write blocks +/// of values. +pub trait ValueWriter: Default { + /// Type of the value being written. + type Value; + + /// Records a new value. + /// This method usually just accumulates data in a `Vec`, + /// only to be serialized on the call to `ValueWriter::serialize_block`. + fn write(&mut self, val: &Self::Value); + + /// Serializes the accumulated values into the output buffer. + fn serialize_block(&self, output: &mut Vec<u8>); + + /// Clears the `ValueWriter`. After a call to clear, the `ValueWriter` + /// should behave like a fresh `ValueWriter::default()`.
+ fn clear(&mut self); +} + +pub use range::{RangeValueReader, RangeValueWriter}; +pub use u64_monotonic::{U64MonotonicValueReader, U64MonotonicValueWriter}; +pub use void::{VoidValueReader, VoidValueWriter}; + +fn deserialize_vint_u64(data: &mut &[u8]) -> u64 { + let (num_bytes, val) = super::vint::deserialize_read(data); + *data = &data[num_bytes..]; + val +} + +#[cfg(test)] +pub(crate) mod tests { + use std::fmt; + + use super::{ValueReader, ValueWriter}; + + pub(crate) fn test_value_reader_writer< + V: Eq + fmt::Debug, + TReader: ValueReader<Value = V>, + TWriter: ValueWriter<Value = V>, + >( + value_block: &[V], + ) { + let mut buffer = Vec::new(); + { + let mut writer = TWriter::default(); + for value in value_block { + writer.write(value); + } + writer.serialize_block(&mut buffer); + writer.clear(); + } + let data_len = buffer.len(); + buffer.extend_from_slice(&b"extradata"[..]); + let mut reader = TReader::default(); + assert_eq!(reader.load(&buffer[..]).unwrap(), data_len); + for (i, val) in value_block.iter().enumerate() { + assert_eq!(reader.value(i), val); + } + } +} diff --git a/sstable/src/value/range.rs b/sstable/src/value/range.rs new file mode 100644 index 0000000000..f2591c54bb --- /dev/null +++ b/sstable/src/value/range.rs @@ -0,0 +1,113 @@ +use std::io; +use std::ops::Range; + +use crate::value::{deserialize_vint_u64, ValueReader, ValueWriter}; + +/// See module comment. +#[derive(Default)] +pub struct RangeValueReader { + vals: Vec<Range<u64>>, +} + +impl ValueReader for RangeValueReader { + type Value = Range<u64>; + + #[inline(always)] + fn value(&self, idx: usize) -> &Range<u64> { + &self.vals[idx] + } + + fn load(&mut self, mut data: &[u8]) -> io::Result<usize> { + self.vals.clear(); + let original_num_bytes = data.len(); + let len = deserialize_vint_u64(&mut data) as usize; + if len != 0 { + let mut prev_val = deserialize_vint_u64(&mut data); + for _ in 1..len { + let next_val = prev_val + deserialize_vint_u64(&mut data); + self.vals.push(prev_val..next_val); + prev_val = next_val; + } + } + Ok(original_num_bytes - data.len()) + } +} + +/// Range writer. The ranges are required to partition the +/// space. +/// +/// In other words, two consecutive keys `k1` and `k2` +/// are required to observe +/// `range_sstable[k1].end == range_sstable[k2].start`. +/// +/// The writer will panic if the inserted values do not follow +/// this property. +/// +/// The first range is not required to start at `0`.
+#[derive(Default)] +pub struct RangeValueWriter { + vals: Vec<u64>, +} + +impl ValueWriter for RangeValueWriter { + type Value = Range<u64>; + + fn write(&mut self, val: &Range<u64>) { + if let Some(previous_offset) = self.vals.last().copied() { + assert_eq!(previous_offset, val.start); + self.vals.push(val.end); + } else { + self.vals.push(val.start); + self.vals.push(val.end) + } + } + + fn serialize_block(&self, writer: &mut Vec<u8>) { + let mut prev_val = 0u64; + crate::vint::serialize_into_vec(self.vals.len() as u64, writer); + for &val in &self.vals { + let delta = val - prev_val; + crate::vint::serialize_into_vec(delta, writer); + prev_val = val; + } + } + + fn clear(&mut self) { + self.vals.clear(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_range_reader_writer() { + crate::value::tests::test_value_reader_writer::<_, RangeValueReader, RangeValueWriter>(&[]); + crate::value::tests::test_value_reader_writer::<_, RangeValueReader, RangeValueWriter>(&[ + 0..3, + ]); + crate::value::tests::test_value_reader_writer::<_, RangeValueReader, RangeValueWriter>(&[ + 0..3, + 3..10, + ]); + crate::value::tests::test_value_reader_writer::<_, RangeValueReader, RangeValueWriter>(&[ + 0..0, + 0..10, + ]); + crate::value::tests::test_value_reader_writer::<_, RangeValueReader, RangeValueWriter>(&[ + 100..110, + 110..121, + 121..1250, + ]); + } + + #[test] + #[should_panic] + fn test_range_reader_writer_panics() { + crate::value::tests::test_value_reader_writer::<_, RangeValueReader, RangeValueWriter>(&[ + 1..3, + 4..10, + ]); + } +} diff --git a/sstable/src/value/u64_monotonic.rs b/sstable/src/value/u64_monotonic.rs new file mode 100644 index 0000000000..7c660fb3b4 --- /dev/null +++ b/sstable/src/value/u64_monotonic.rs @@ -0,0 +1,83 @@ +use std::io; + +use crate::value::{deserialize_vint_u64, ValueReader, ValueWriter}; +use crate::vint; + +#[derive(Default)] +pub struct U64MonotonicValueReader { + vals: Vec<u64>, +} + +impl ValueReader for U64MonotonicValueReader { + type Value = u64; + + #[inline(always)] + fn value(&self, idx: usize) -> &Self::Value { + &self.vals[idx] + } + + fn load(&mut self, mut data: &[u8]) -> io::Result<usize> { + let original_num_bytes = data.len(); + let num_vals = deserialize_vint_u64(&mut data) as usize; + self.vals.clear(); + let mut prev_val = 0u64; + for _ in 0..num_vals { + let delta = deserialize_vint_u64(&mut data); + let val = prev_val + delta; + self.vals.push(val); + prev_val = val; + } + Ok(original_num_bytes - data.len()) + } +} + +#[derive(Default)] +pub struct U64MonotonicValueWriter { + vals: Vec<u64>, +} + +impl ValueWriter for U64MonotonicValueWriter { + type Value = u64; + + fn write(&mut self, val: &Self::Value) { + self.vals.push(*val); + } + + fn serialize_block(&self, output: &mut Vec<u8>) { + let mut prev_val = 0u64; + vint::serialize_into_vec(self.vals.len() as u64, output); + for &val in &self.vals { + let delta = val - prev_val; + vint::serialize_into_vec(delta, output); + prev_val = val; + } + } + + fn clear(&mut self) { + self.vals.clear(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_u64_monotonic_reader_writer() { + crate::value::tests::test_value_reader_writer::< + _, + U64MonotonicValueReader, + U64MonotonicValueWriter, + >(&[]); + crate::value::tests::test_value_reader_writer::< + _, + U64MonotonicValueReader, + U64MonotonicValueWriter, + >(&[5]); + crate::value::tests::test_value_reader_writer::< + _, + U64MonotonicValueReader, + U64MonotonicValueWriter, + >(&[1u64, 30u64]); + } +} diff --git a/sstable/src/value/void.rs
b/sstable/src/value/void.rs new file mode 100644 index 0000000000..15597a224f --- /dev/null +++ b/sstable/src/value/void.rs @@ -0,0 +1,48 @@ +use std::io; + +use crate::value::{ValueReader, ValueWriter}; + +#[derive(Default)] +pub struct VoidValueReader; + +impl ValueReader for VoidValueReader { + type Value = (); + + #[inline(always)] + fn value(&self, _idx: usize) -> &() { + &() + } + + fn load(&mut self, _data: &[u8]) -> io::Result<usize> { + Ok(0) + } +} + +#[derive(Default)] +pub struct VoidValueWriter; + +impl ValueWriter for VoidValueWriter { + type Value = (); + + fn write(&mut self, _val: &()) {} + + fn serialize_block(&self, _output: &mut Vec<u8>) {} + + fn clear(&mut self) {} +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_void_reader_writer() { + crate::value::tests::test_value_reader_writer::<_, VoidValueReader, VoidValueWriter>(&[]); + crate::value::tests::test_value_reader_writer::<_, VoidValueReader, VoidValueWriter>(&[()]); + crate::value::tests::test_value_reader_writer::<_, VoidValueReader, VoidValueWriter>(&[ + (), + (), + (), + ]); + } +}
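For reference, a minimal usage sketch of the generic `Dictionary` API introduced by this patch, mirroring the streamer tests above. It assumes the sstable crate is imported as `tantivy_sstable` (crate name not shown in this patch) and that `OwnedBytes` comes from the `common` crate, as in the test imports; error handling is kept minimal.

use std::io;

use common::OwnedBytes;
// Assumed external crate name for the sstable crate in this repository.
use tantivy_sstable::{Dictionary, MonotonicU64SSTable};

fn main() -> io::Result<()> {
    // Build an sstable-backed dictionary in memory.
    // Keys must be inserted in sorted order; MonotonicU64SSTable delta-encodes the u64 values per block.
    let mut builder = Dictionary::<MonotonicU64SSTable>::builder(Vec::new())?;
    builder.insert(b"apple", &1u64)?;
    builder.insert(b"banana", &2u64)?;
    builder.insert(b"cherry", &3u64)?;
    let buffer = builder.finish()?;

    // Reopen the dictionary from its serialized bytes and stream every (key, value) pair.
    let dict = Dictionary::<MonotonicU64SSTable>::from_bytes(OwnedBytes::new(buffer))?;
    let mut streamer = dict.stream()?;
    while streamer.advance() {
        println!("{:?} => {}", streamer.key(), streamer.value());
    }
    Ok(())
}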