diff --git a/src/index/src/inverted_index/create.rs b/src/index/src/inverted_index/create.rs index a57c2c36b7e6..db6bf1ad2595 100644 --- a/src/index/src/inverted_index/create.rs +++ b/src/index/src/inverted_index/create.rs @@ -13,3 +13,45 @@ // limitations under the License. mod sort; +mod sort_create; + +use async_trait::async_trait; + +use crate::inverted_index::error::Result; +use crate::inverted_index::format::writer::InvertedIndexWriter; +use crate::inverted_index::BytesRef; + +/// `InvertedIndexCreator` provides functionality to construct an inverted index +#[async_trait] +pub trait InvertedIndexCreator { + /// Adds a value to the named index. A `None` value represents an absence of data (null) + /// + /// - `index_name`: Identifier for the index being built + /// - `value`: The data to be indexed, or `None` for a null entry + /// + /// It should be equivalent to calling `push_with_name_n` with `n = 1` + async fn push_with_name( + &mut self, + index_name: &str, + value: Option>, + ) -> Result<()> { + self.push_with_name_n(index_name, value, 1).await + } + + /// Adds `n` identical values to the named index. `None` values represent absence of data (null) + /// + /// - `index_name`: Identifier for the index being built + /// - `value`: The data to be indexed, or `None` for a null entry + /// + /// It should be equivalent to calling `push_with_name` `n` times + async fn push_with_name_n( + &mut self, + index_name: &str, + value: Option>, + n: usize, + ) -> Result<()>; + + /// Finalizes the index creation process, ensuring all data is properly indexed and stored + /// in the provided writer + async fn finish(&mut self, writer: &mut dyn InvertedIndexWriter) -> Result<()>; +} diff --git a/src/index/src/inverted_index/create/sort/external_sort.rs b/src/index/src/inverted_index/create/sort/external_sort.rs index 2e530f3e45e4..e43b65bfcea5 100644 --- a/src/index/src/inverted_index/create/sort/external_sort.rs +++ b/src/index/src/inverted_index/create/sort/external_sort.rs @@ -29,6 +29,7 @@ use crate::inverted_index::create::sort::intermediate_rw::{ }; use crate::inverted_index::create::sort::merge_stream::MergeSortedStream; use crate::inverted_index::create::sort::{SortOutput, SortedStream, Sorter}; +use crate::inverted_index::create::sort_create::SorterFactory; use crate::inverted_index::error::Result; use crate::inverted_index::{Bytes, BytesRef}; @@ -137,6 +138,21 @@ impl ExternalSorter { } } + /// Generates a factory function that creates new `ExternalSorter` instances + pub fn factory( + temp_file_provider: Arc, + memory_usage_threshold: Option, + ) -> SorterFactory { + Box::new(move |index_name, segment_row_count| { + Box::new(Self::new( + index_name, + temp_file_provider.clone(), + segment_row_count, + memory_usage_threshold, + )) + }) + } + /// Pushes the non-null values to the values buffer and sets the bits within /// the specified range in the given BitVec to true. /// Returns the memory usage difference of the buffer after the operation. diff --git a/src/index/src/inverted_index/create/sort_create.rs b/src/index/src/inverted_index/create/sort_create.rs new file mode 100644 index 000000000000..b491c0a8b444 --- /dev/null +++ b/src/index/src/inverted_index/create/sort_create.rs @@ -0,0 +1,304 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::num::NonZeroUsize; + +use async_trait::async_trait; +use snafu::ensure; + +use crate::inverted_index::create::sort::{SortOutput, Sorter}; +use crate::inverted_index::create::InvertedIndexCreator; +use crate::inverted_index::error::{InconsistentRowCountSnafu, Result}; +use crate::inverted_index::format::writer::InvertedIndexWriter; +use crate::inverted_index::BytesRef; + +type IndexName = String; +type SegmentRowCount = NonZeroUsize; + +/// Factory type to produce `Sorter` instances associated with an index name and segment row count +pub type SorterFactory = Box Box + Send>; + +/// `SortIndexCreator` orchestrates indexing by sorting input data for each named index +/// and writing to an inverted index writer +pub struct SortIndexCreator { + /// Factory for producing `Sorter` instances + sorter_factory: SorterFactory, + + /// Map of index names to sorters + sorters: HashMap>, + + /// Number of rows in each segment, used to produce sorters + segment_row_count: NonZeroUsize, +} + +#[async_trait] +impl InvertedIndexCreator for SortIndexCreator { + /// Inserts `n` values or nulls into the sorter for the specified index. + /// + /// If the index does not exist, a new index is created even if `n` is 0. + /// Caller may leverage this behavior to create indexes with no data. + async fn push_with_name_n( + &mut self, + index_name: &str, + value: Option>, + n: usize, + ) -> Result<()> { + match self.sorters.get_mut(index_name) { + Some(sorter) => sorter.push_n(value, n).await, + None => { + let index_name = index_name.to_string(); + let mut sorter = (self.sorter_factory)(index_name.clone(), self.segment_row_count); + sorter.push_n(value, n).await?; + self.sorters.insert(index_name, sorter); + Ok(()) + } + } + } + + /// Finalizes the sorting for all indexes and writes them using the inverted index writer + async fn finish(&mut self, writer: &mut dyn InvertedIndexWriter) -> Result<()> { + let mut output_row_count = None; + for (index_name, mut sorter) in self.sorters.drain() { + let SortOutput { + segment_null_bitmap, + sorted_stream, + total_row_count, + } = sorter.output().await?; + + let expected_row_count = *output_row_count.get_or_insert(total_row_count); + ensure!( + expected_row_count == total_row_count, + InconsistentRowCountSnafu { + index_name, + total_row_count, + expected_row_count, + } + ); + + writer + .add_index(index_name, segment_null_bitmap, sorted_stream) + .await?; + } + + let total_row_count = output_row_count.unwrap_or_default() as _; + let segment_row_count = self.segment_row_count as _; + writer.finish(total_row_count, segment_row_count).await + } +} + +impl SortIndexCreator { + /// Creates a new `SortIndexCreator` with the given sorter factory and index writer + pub fn new(sorter_factory: SorterFactory, segment_row_count: NonZeroUsize) -> Self { + Self { + sorter_factory, + sorters: HashMap::new(), + segment_row_count, + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + + use common_base::BitVec; + use futures::{stream, StreamExt}; + + use super::*; + use crate::inverted_index::create::sort::SortedStream; + use crate::inverted_index::error::Error; + use crate::inverted_index::format::writer::MockInvertedIndexWriter; + use crate::inverted_index::Bytes; + + #[tokio::test] + async fn test_sort_index_creator_basic() { + let mut creator = + SortIndexCreator::new(NaiveSorter::factory(), NonZeroUsize::new(1).unwrap()); + + let index_values = vec![ + ("a", vec![b"3", b"2", b"1"]), + ("b", vec![b"6", b"5", b"4"]), + ("c", vec![b"1", b"2", b"3"]), + ]; + + for (index_name, values) in index_values { + for value in values { + creator + .push_with_name(index_name, Some(value)) + .await + .unwrap(); + } + } + + let mut mock_writer = MockInvertedIndexWriter::new(); + mock_writer + .expect_add_index() + .times(3) + .returning(|name, null_bitmap, stream| { + assert!(null_bitmap.is_empty()); + match name.as_str() { + "a" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]), + "b" => assert_eq!(stream_to_values(stream), vec![b"4", b"5", b"6"]), + "c" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]), + _ => panic!("unexpected index name: {}", name), + } + Ok(()) + }); + mock_writer + .expect_finish() + .times(1) + .returning(|total_row_count, segment_row_count| { + assert_eq!(total_row_count, 3); + assert_eq!(segment_row_count.get(), 1); + Ok(()) + }); + + creator.finish(&mut mock_writer).await.unwrap(); + } + + #[tokio::test] + async fn test_sort_index_creator_inconsistent_row_count() { + let mut creator = + SortIndexCreator::new(NaiveSorter::factory(), NonZeroUsize::new(1).unwrap()); + + let index_values = vec![ + ("a", vec![b"3", b"2", b"1"]), + ("b", vec![b"6", b"5", b"4"]), + ("c", vec![b"1", b"2"]), + ]; + + for (index_name, values) in index_values { + for value in values { + creator + .push_with_name(index_name, Some(value)) + .await + .unwrap(); + } + } + + let mut mock_writer = MockInvertedIndexWriter::new(); + mock_writer + .expect_add_index() + .returning(|name, null_bitmap, stream| { + assert!(null_bitmap.is_empty()); + match name.as_str() { + "a" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]), + "b" => assert_eq!(stream_to_values(stream), vec![b"4", b"5", b"6"]), + "c" => assert_eq!(stream_to_values(stream), vec![b"1", b"2"]), + _ => panic!("unexpected index name: {}", name), + } + Ok(()) + }); + mock_writer.expect_finish().never(); + + let res = creator.finish(&mut mock_writer).await; + assert!(matches!(res, Err(Error::InconsistentRowCount { .. }))); + } + + #[tokio::test] + async fn test_sort_index_creator_create_indexes_without_data() { + let mut creator = + SortIndexCreator::new(NaiveSorter::factory(), NonZeroUsize::new(1).unwrap()); + + creator.push_with_name_n("a", None, 0).await.unwrap(); + creator.push_with_name_n("b", None, 0).await.unwrap(); + creator.push_with_name_n("c", None, 0).await.unwrap(); + + let mut mock_writer = MockInvertedIndexWriter::new(); + mock_writer + .expect_add_index() + .returning(|name, null_bitmap, stream| { + assert!(null_bitmap.is_empty()); + assert!(matches!(name.as_str(), "a" | "b" | "c")); + assert!(stream_to_values(stream).is_empty()); + Ok(()) + }); + mock_writer + .expect_finish() + .times(1) + .returning(|total_row_count, segment_row_count| { + assert_eq!(total_row_count, 0); + assert_eq!(segment_row_count.get(), 1); + Ok(()) + }); + + creator.finish(&mut mock_writer).await.unwrap(); + } + + fn set_bit(bit_vec: &mut BitVec, index: usize) { + if index >= bit_vec.len() { + bit_vec.resize(index + 1, false); + } + bit_vec.set(index, true); + } + + struct NaiveSorter { + total_row_count: usize, + segment_row_count: NonZeroUsize, + values: BTreeMap, BitVec>, + } + + impl NaiveSorter { + fn factory() -> SorterFactory { + Box::new(|_index_name, segment_row_count| { + Box::new(NaiveSorter { + total_row_count: 0, + segment_row_count, + values: BTreeMap::new(), + }) + }) + } + } + + #[async_trait] + impl Sorter for NaiveSorter { + async fn push(&mut self, value: Option>) -> Result<()> { + let segment_index = self.total_row_count / self.segment_row_count; + self.total_row_count += 1; + + let bitmap = self.values.entry(value.map(Into::into)).or_default(); + set_bit(bitmap, segment_index); + + Ok(()) + } + + async fn push_n(&mut self, value: Option>, n: usize) -> Result<()> { + for _ in 0..n { + self.push(value).await?; + } + Ok(()) + } + + async fn output(&mut self) -> Result { + let segment_null_bitmap = self.values.remove(&None).unwrap_or_default(); + + Ok(SortOutput { + segment_null_bitmap, + sorted_stream: Box::new(stream::iter( + std::mem::take(&mut self.values) + .into_iter() + .map(|(v, b)| Ok((v.unwrap(), b))), + )), + total_row_count: self.total_row_count, + }) + } + } + + fn stream_to_values(stream: SortedStream) -> Vec { + futures::executor::block_on(async { + stream.map(|r| r.unwrap().0).collect::>().await + }) + } +} diff --git a/src/index/src/inverted_index/error.rs b/src/index/src/inverted_index/error.rs index afb8ae12838b..b795e33003b7 100644 --- a/src/index/src/inverted_index/error.rs +++ b/src/index/src/inverted_index/error.rs @@ -160,6 +160,13 @@ pub enum Error { #[snafu(display("Unknown intermediate codec magic: {magic:?}"))] UnknownIntermediateCodecMagic { magic: [u8; 4], location: Location }, + + #[snafu(display("Inconsistent row count, index_name: {index_name}, total_row_count: {total_row_count}, expected: {expected_row_count}"))] + InconsistentRowCount { + index_name: String, + total_row_count: usize, + expected_row_count: usize, + }, } impl ErrorExt for Error { @@ -188,6 +195,7 @@ impl ErrorExt for Error { | IntersectionApplierWithInList { .. } | EmptyPredicates { .. } | FstInsert { .. } + | InconsistentRowCount { .. } | IndexNotFound { .. } => StatusCode::InvalidArguments, } } diff --git a/src/index/src/inverted_index/format/writer.rs b/src/index/src/inverted_index/format/writer.rs index 8cb9e408df38..176b1f1561f1 100644 --- a/src/index/src/inverted_index/format/writer.rs +++ b/src/index/src/inverted_index/format/writer.rs @@ -15,6 +15,8 @@ mod blob; mod single; +use std::num::NonZeroUsize; + use async_trait::async_trait; use common_base::BitVec; use futures::Stream; @@ -23,19 +25,27 @@ use crate::inverted_index::error::Result; pub use crate::inverted_index::format::writer::blob::InvertedIndexBlobWriter; use crate::inverted_index::Bytes; +pub type ValueStream = Box> + Send + Unpin>; + /// Trait for writing inverted index data to underlying storage. +#[mockall::automock] #[async_trait] -pub trait InvertedIndexWriter { +pub trait InvertedIndexWriter: Send { /// Adds entries to an index. /// /// * `name` is the index identifier. /// * `null_bitmap` marks positions of null entries. /// * `values` is a stream of values and their locations, yielded lexicographically. /// Errors occur if the values are out of order. - async fn add_index(&mut self, name: String, null_bitmap: BitVec, values: S) -> Result<()> - where - S: Stream> + Send + Unpin; + async fn add_index( + &mut self, + name: String, + null_bitmap: BitVec, + values: ValueStream, + ) -> Result<()>; /// Finalizes the index writing process, ensuring all data is written. - async fn finish(&mut self) -> Result<()>; + /// `total_row_count` and `segment_row_count` is used to fill in the metadata. + async fn finish(&mut self, total_row_count: u64, segment_row_count: NonZeroUsize) + -> Result<()>; } diff --git a/src/index/src/inverted_index/format/writer/blob.rs b/src/index/src/inverted_index/format/writer/blob.rs index e38319f48a27..07f39af46ecc 100644 --- a/src/index/src/inverted_index/format/writer/blob.rs +++ b/src/index/src/inverted_index/format/writer/blob.rs @@ -12,17 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::num::NonZeroUsize; + use async_trait::async_trait; use common_base::BitVec; -use futures::{AsyncWrite, AsyncWriteExt, Stream}; +use futures::{AsyncWrite, AsyncWriteExt}; use greptime_proto::v1::index::InvertedIndexMetas; use prost::Message; use snafu::ResultExt; -use super::single::SingleIndexWriter; use crate::inverted_index::error::{CloseSnafu, FlushSnafu, Result, WriteSnafu}; -use crate::inverted_index::format::writer::InvertedIndexWriter; -use crate::inverted_index::Bytes; +use crate::inverted_index::format::writer::single::SingleIndexWriter; +use crate::inverted_index::format::writer::{InvertedIndexWriter, ValueStream}; /// `InvertedIndexBlobWriter`, implemented [`InvertedIndexWriter`], manages /// writing of an inverted index to a blob storage. @@ -39,10 +40,12 @@ pub struct InvertedIndexBlobWriter { #[async_trait] impl InvertedIndexWriter for InvertedIndexBlobWriter { - async fn add_index(&mut self, name: String, null_bitmap: BitVec, values: S) -> Result<()> - where - S: Stream> + Send + Unpin, - { + async fn add_index( + &mut self, + name: String, + null_bitmap: BitVec, + values: ValueStream, + ) -> Result<()> { let single_writer = SingleIndexWriter::new( name.clone(), self.written_size, @@ -58,7 +61,14 @@ impl InvertedIndexWriter for InvertedIndexBlobWrit Ok(()) } - async fn finish(&mut self) -> Result<()> { + async fn finish( + &mut self, + total_row_count: u64, + segment_row_count: NonZeroUsize, + ) -> Result<()> { + self.metas.segment_row_count = segment_row_count.get() as _; + self.metas.total_row_count = total_row_count; + let metas_bytes = self.metas.encode_to_vec(); self.blob_writer .write_all(&metas_bytes) @@ -78,19 +88,11 @@ impl InvertedIndexWriter for InvertedIndexBlobWrit } impl InvertedIndexBlobWriter { - pub fn new( - blob_writer: W, - total_row_count: u64, - segment_row_count: u64, - ) -> InvertedIndexBlobWriter { + pub fn new(blob_writer: W) -> InvertedIndexBlobWriter { InvertedIndexBlobWriter { blob_writer, written_size: 0, - metas: InvertedIndexMetas { - total_row_count, - segment_row_count, - ..Default::default() - }, + metas: InvertedIndexMetas::default(), } } } @@ -98,9 +100,11 @@ impl InvertedIndexBlobWriter { #[cfg(test)] mod tests { use futures::io::Cursor; + use futures::stream; use super::*; use crate::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader}; + use crate::inverted_index::Bytes; fn unpack(fst_value: u64) -> [u32; 2] { bytemuck::cast::(fst_value) @@ -109,8 +113,11 @@ mod tests { #[tokio::test] async fn test_inverted_index_blob_writer_write_empty() { let mut blob = Vec::new(); - let mut writer = InvertedIndexBlobWriter::new(&mut blob, 8, 1); - writer.finish().await.unwrap(); + let mut writer = InvertedIndexBlobWriter::new(&mut blob); + writer + .finish(8, NonZeroUsize::new(1).unwrap()) + .await + .unwrap(); let cursor = Cursor::new(blob); let mut reader = InvertedIndexBlobReader::new(cursor); @@ -123,16 +130,16 @@ mod tests { #[tokio::test] async fn test_inverted_index_blob_writer_write_basic() { let mut blob = Vec::new(); - let mut writer = InvertedIndexBlobWriter::new(&mut blob, 8, 1); + let mut writer = InvertedIndexBlobWriter::new(&mut blob); writer .add_index( "tag0".to_string(), BitVec::from_slice(&[0b0000_0001, 0b0000_0000]), - futures::stream::iter(vec![ + Box::new(stream::iter(vec![ Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))), Ok((Bytes::from("b"), BitVec::from_slice(&[0b0010_0000]))), Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))), - ]), + ])), ) .await .unwrap(); @@ -140,15 +147,18 @@ mod tests { .add_index( "tag1".to_string(), BitVec::from_slice(&[0b0000_0001, 0b0000_0000]), - futures::stream::iter(vec![ + Box::new(stream::iter(vec![ Ok((Bytes::from("x"), BitVec::from_slice(&[0b0000_0001]))), Ok((Bytes::from("y"), BitVec::from_slice(&[0b0010_0000]))), Ok((Bytes::from("z"), BitVec::from_slice(&[0b0000_0001]))), - ]), + ])), ) .await .unwrap(); - writer.finish().await.unwrap(); + writer + .finish(8, NonZeroUsize::new(1).unwrap()) + .await + .unwrap(); let cursor = Cursor::new(blob); let mut reader = InvertedIndexBlobReader::new(cursor);