Skip to content

Commit

Permalink
Refactoring to prepare for the addition of dynamic fast field (quickw…
Browse files Browse the repository at this point in the history
…it-oss#1730)

* Refactoring to prepare for the addition of dynamic fast field

- Exposing insert_key / insert_value
- Renamed SSTable::{Reader/Writer}-> SSTable::{ValueReader/ValueWriter}
- Added a generic Dictionary object in the sstable crate
- Removing the TermDictionary wrapper from tantivy, relying directly on
  an alias of the generic Dictionary object.
- dropped the use of byteorder in sstable.
- Stopped scanning / reading the entire dictionary when streaming a range.

* Added a benchmark for streaming sstable ranges.

* CR comments.

Rename deserialize_u64 -> deserialize_vint_u64

* Removed needless allocation, split serialize into serialize and clear.
  • Loading branch information
fulmicoton authored Dec 22, 2022
1 parent 3339a3e commit bb48c3e
Show file tree
Hide file tree
Showing 22 changed files with 1,122 additions and 561 deletions.
2 changes: 1 addition & 1 deletion src/core/inverted_index_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl InvertedIndexReader {
///
/// Most users should prefer using [`Self::read_postings()`] instead.
pub async fn warm_postings(&self, term: &Term, with_positions: bool) -> io::Result<()> {
let term_info_opt = self.get_term_info_async(term).await?;
let term_info_opt: Option<TermInfo> = self.get_term_info_async(term).await?;
if let Some(term_info) = term_info_opt {
self.postings_file_slice
.read_bytes_slice_async(term_info.postings_range.clone())
Expand Down
2 changes: 1 addition & 1 deletion src/termdict/fst_termdict/term_info_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ fn extract_bits(data: &[u8], addr_bits: usize, num_bits: u8) -> u64 {
}

impl TermInfoStore {
pub fn open(term_info_store_file: FileSlice) -> crate::Result<TermInfoStore> {
pub fn open(term_info_store_file: FileSlice) -> io::Result<TermInfoStore> {
let (len_slice, main_slice) = term_info_store_file.split(16);
let mut bytes = len_slice.read_bytes()?;
let len = u64::deserialize(&mut bytes)? as usize;
Expand Down
17 changes: 10 additions & 7 deletions src/termdict/fst_termdict/termdict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use tantivy_fst::Automaton;
use super::term_info_store::{TermInfoStore, TermInfoStoreWriter};
use super::{TermStreamer, TermStreamerBuilder};
use crate::directory::{FileSlice, OwnedBytes};
use crate::error::DataCorruption;
use crate::postings::TermInfo;
use crate::termdict::TermOrdinal;

Expand Down Expand Up @@ -55,7 +54,7 @@ where W: Write
/// to insert_key and insert_value.
///
/// Prefer using `.insert(key, value)`
pub(crate) fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
self.fst_builder
.insert(key, self.term_ord)
.map_err(convert_fst_error)?;
Expand All @@ -66,7 +65,7 @@ where W: Write
/// # Warning
///
/// Horribly dangerous internal API. See `.insert_key(...)`.
pub(crate) fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> {
pub fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> {
self.term_info_store_writer.write_term_info(term_info)?;
Ok(())
}
Expand All @@ -86,10 +85,14 @@ where W: Write
}
}

fn open_fst_index(fst_file: FileSlice) -> crate::Result<tantivy_fst::Map<OwnedBytes>> {
fn open_fst_index(fst_file: FileSlice) -> io::Result<tantivy_fst::Map<OwnedBytes>> {
let bytes = fst_file.read_bytes()?;
let fst = Fst::new(bytes)
.map_err(|err| DataCorruption::comment_only(format!("Fst data is corrupted: {:?}", err)))?;
let fst = Fst::new(bytes).map_err(|err| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Fst data is corrupted: {:?}", err),
)
})?;
Ok(tantivy_fst::Map::from(fst))
}

Expand All @@ -114,7 +117,7 @@ pub struct TermDictionary {

impl TermDictionary {
/// Opens a `TermDictionary`.
pub fn open(file: FileSlice) -> crate::Result<Self> {
pub fn open(file: FileSlice) -> io::Result<Self> {
let (main_slice, footer_len_slice) = file.split_from_end(8);
let mut footer_len_bytes = footer_len_slice.read_bytes()?;
let footer_size = u64::deserialize(&mut footer_len_bytes)?;
Expand Down
88 changes: 46 additions & 42 deletions src/termdict/sstable_termdict/mod.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,64 @@
use std::io;

mod merger;
mod streamer;
mod termdict;

use std::iter::ExactSizeIterator;

use common::VInt;
use sstable::value::{ValueReader, ValueWriter};
use sstable::{BlockReader, SSTable};
use sstable::SSTable;
use tantivy_fst::automaton::AlwaysMatch;

pub use self::merger::TermMerger;
pub use self::streamer::{TermStreamer, TermStreamerBuilder};
pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
use crate::postings::TermInfo;

/// The term dictionary contains all of the terms in
/// `tantivy index` in a sorted manner.
///
/// The `Fst` crate is used to associate terms to their
/// respective `TermOrdinal`. The `TermInfoStore` then makes it
/// possible to fetch the associated `TermInfo`.
pub type TermDictionary = sstable::Dictionary<TermSSTable>;

/// Builder for the new term dictionary.
pub type TermDictionaryBuilder<W> = sstable::Writer<W, TermInfoValueWriter>;

/// `TermStreamer` acts as a cursor over a range of terms of a segment.
/// Terms are guaranteed to be sorted.
pub type TermStreamer<'a, A = AlwaysMatch> = sstable::Streamer<'a, TermSSTable, A>;

/// SSTable used to store TermInfo objects.
pub struct TermSSTable;

impl SSTable for TermSSTable {
type Value = TermInfo;
type Reader = TermInfoReader;
type Writer = TermInfoWriter;
type ValueReader = TermInfoValueReader;
type ValueWriter = TermInfoValueWriter;
}

#[derive(Default)]
pub struct TermInfoReader {
pub struct TermInfoValueReader {
term_infos: Vec<TermInfo>,
}

impl ValueReader for TermInfoReader {
impl ValueReader for TermInfoValueReader {
type Value = TermInfo;

#[inline(always)]
fn value(&self, idx: usize) -> &TermInfo {
&self.term_infos[idx]
}

fn read(&mut self, reader: &mut BlockReader) -> io::Result<()> {
fn load(&mut self, mut data: &[u8]) -> io::Result<usize> {
let len_before = data.len();
self.term_infos.clear();
let num_els = VInt::deserialize_u64(reader)?;
let mut postings_start = VInt::deserialize_u64(reader)? as usize;
let mut positions_start = VInt::deserialize_u64(reader)? as usize;
let num_els = VInt::deserialize_u64(&mut data)?;
let mut postings_start = VInt::deserialize_u64(&mut data)? as usize;
let mut positions_start = VInt::deserialize_u64(&mut data)? as usize;
for _ in 0..num_els {
let doc_freq = VInt::deserialize_u64(reader)? as u32;
let postings_num_bytes = VInt::deserialize_u64(reader)?;
let positions_num_bytes = VInt::deserialize_u64(reader)?;
let doc_freq = VInt::deserialize_u64(&mut data)? as u32;
let postings_num_bytes = VInt::deserialize_u64(&mut data)?;
let positions_num_bytes = VInt::deserialize_u64(&mut data)?;
let postings_end = postings_start + postings_num_bytes as usize;
let positions_end = positions_start + positions_num_bytes as usize;
let term_info = TermInfo {
Expand All @@ -55,23 +70,24 @@ impl ValueReader for TermInfoReader {
postings_start = postings_end;
positions_start = positions_end;
}
Ok(())
let consumed_len = len_before - data.len();
Ok(consumed_len)
}
}

#[derive(Default)]
pub struct TermInfoWriter {
pub struct TermInfoValueWriter {
term_infos: Vec<TermInfo>,
}

impl ValueWriter for TermInfoWriter {
impl ValueWriter for TermInfoValueWriter {
type Value = TermInfo;

fn write(&mut self, term_info: &TermInfo) {
self.term_infos.push(term_info.clone());
}

fn write_block(&mut self, buffer: &mut Vec<u8>) {
fn serialize_block(&self, buffer: &mut Vec<u8>) {
VInt(self.term_infos.len() as u64).serialize_into_vec(buffer);
if self.term_infos.is_empty() {
return;
Expand All @@ -83,24 +99,23 @@ impl ValueWriter for TermInfoWriter {
VInt(term_info.postings_range.len() as u64).serialize_into_vec(buffer);
VInt(term_info.positions_range.len() as u64).serialize_into_vec(buffer);
}
}

fn clear(&mut self) {
self.term_infos.clear();
}
}

#[cfg(test)]
mod tests {
use std::io;

use sstable::value::{ValueReader, ValueWriter};

use super::BlockReader;
use crate::directory::OwnedBytes;
use crate::postings::TermInfo;
use crate::termdict::sstable_termdict::TermInfoReader;
use crate::termdict::sstable_termdict::TermInfoValueReader;

#[test]
fn test_block_terminfos() -> io::Result<()> {
let mut term_info_writer = super::TermInfoWriter::default();
fn test_block_terminfos() {
let mut term_info_writer = super::TermInfoValueWriter::default();
term_info_writer.write(&TermInfo {
doc_freq: 120u32,
postings_range: 17..45,
Expand All @@ -117,10 +132,9 @@ mod tests {
positions_range: 1100..1302,
});
let mut buffer = Vec::new();
term_info_writer.write_block(&mut buffer);
let mut block_reader = make_block_reader(&buffer[..]);
let mut term_info_reader = TermInfoReader::default();
term_info_reader.read(&mut block_reader)?;
term_info_writer.serialize_block(&mut buffer);
let mut term_info_reader = TermInfoValueReader::default();
let num_bytes: usize = term_info_reader.load(&buffer[..]).unwrap();
assert_eq!(
term_info_reader.value(0),
&TermInfo {
Expand All @@ -129,16 +143,6 @@ mod tests {
positions_range: 10..122
}
);
assert!(block_reader.buffer().is_empty());
Ok(())
}

fn make_block_reader(data: &[u8]) -> BlockReader {
let mut buffer = (data.len() as u32).to_le_bytes().to_vec();
buffer.extend_from_slice(data);
let owned_bytes = OwnedBytes::new(buffer);
let mut block_reader = BlockReader::new(Box::new(owned_bytes));
block_reader.read_block().unwrap();
block_reader
assert_eq!(buffer.len(), num_bytes);
}
}
Loading

0 comments on commit bb48c3e

Please sign in to comment.