From 534774fa41f4f1f7eeb486f1f21c2db7f8394ff5 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 19 Dec 2023 09:22:17 +0000 Subject: [PATCH] feat: add push_with_name_n Signed-off-by: Zhenchi --- src/index/src/inverted_index/create.rs | 24 +++++++- .../src/inverted_index/create/sort_create.rs | 60 +++++++++++++++---- src/index/src/inverted_index/format/writer.rs | 4 +- .../src/inverted_index/format/writer/blob.rs | 10 ++-- 4 files changed, 76 insertions(+), 22 deletions(-) diff --git a/src/index/src/inverted_index/create.rs b/src/index/src/inverted_index/create.rs index 98996765da16..db6bf1ad2595 100644 --- a/src/index/src/inverted_index/create.rs +++ b/src/index/src/inverted_index/create.rs @@ -29,9 +29,27 @@ pub trait InvertedIndexCreator { /// - `index_name`: Identifier for the index being built /// - `value`: The data to be indexed, or `None` for a null entry /// - /// Note: Caller should call this method for each row in the dataset - async fn push_with_name(&mut self, index_name: &str, value: Option>) - -> Result<()>; + /// It should be equivalent to calling `push_with_name_n` with `n = 1` + async fn push_with_name( + &mut self, + index_name: &str, + value: Option>, + ) -> Result<()> { + self.push_with_name_n(index_name, value, 1).await + } + + /// Adds `n` identical values to the named index. `None` values represent absence of data (null) + /// + /// - `index_name`: Identifier for the index being built + /// - `value`: The data to be indexed, or `None` for a null entry + /// + /// It should be equivalent to calling `push_with_name` `n` times + async fn push_with_name_n( + &mut self, + index_name: &str, + value: Option>, + n: usize, + ) -> Result<()>; /// Finalizes the index creation process, ensuring all data is properly indexed and stored /// in the provided writer diff --git a/src/index/src/inverted_index/create/sort_create.rs b/src/index/src/inverted_index/create/sort_create.rs index 54274f4a7ac3..b491c0a8b444 100644 --- a/src/index/src/inverted_index/create/sort_create.rs +++ b/src/index/src/inverted_index/create/sort_create.rs @@ -45,19 +45,23 @@ pub struct SortIndexCreator { #[async_trait] impl InvertedIndexCreator for SortIndexCreator { - /// Inserts a value or null into the sorter for the specified index - async fn push_with_name( + /// Inserts `n` values or nulls into the sorter for the specified index. + /// + /// If the index does not exist, a new index is created even if `n` is 0. + /// Caller may leverage this behavior to create indexes with no data. + async fn push_with_name_n( &mut self, index_name: &str, value: Option>, + n: usize, ) -> Result<()> { match self.sorters.get_mut(index_name) { - Some(sorter) => sorter.push(value).await, + Some(sorter) => sorter.push_n(value, n).await, None => { - let mut sorter = - (self.sorter_factory)(index_name.to_owned(), self.segment_row_count); - sorter.push(value).await?; - self.sorters.insert(index_name.to_owned(), sorter); + let index_name = index_name.to_string(); + let mut sorter = (self.sorter_factory)(index_name.clone(), self.segment_row_count); + sorter.push_n(value, n).await?; + self.sorters.insert(index_name, sorter); Ok(()) } } @@ -118,12 +122,6 @@ mod tests { use crate::inverted_index::format::writer::MockInvertedIndexWriter; use crate::inverted_index::Bytes; - fn stream_to_values(stream: SortedStream) -> Vec { - futures::executor::block_on(async { - stream.map(|r| r.unwrap().0).collect::>().await - }) - } - #[tokio::test] async fn test_sort_index_creator_basic() { let mut creator = @@ -209,6 +207,36 @@ mod tests { assert!(matches!(res, Err(Error::InconsistentRowCount { .. }))); } + #[tokio::test] + async fn test_sort_index_creator_create_indexes_without_data() { + let mut creator = + SortIndexCreator::new(NaiveSorter::factory(), NonZeroUsize::new(1).unwrap()); + + creator.push_with_name_n("a", None, 0).await.unwrap(); + creator.push_with_name_n("b", None, 0).await.unwrap(); + creator.push_with_name_n("c", None, 0).await.unwrap(); + + let mut mock_writer = MockInvertedIndexWriter::new(); + mock_writer + .expect_add_index() + .returning(|name, null_bitmap, stream| { + assert!(null_bitmap.is_empty()); + assert!(matches!(name.as_str(), "a" | "b" | "c")); + assert!(stream_to_values(stream).is_empty()); + Ok(()) + }); + mock_writer + .expect_finish() + .times(1) + .returning(|total_row_count, segment_row_count| { + assert_eq!(total_row_count, 0); + assert_eq!(segment_row_count.get(), 1); + Ok(()) + }); + + creator.finish(&mut mock_writer).await.unwrap(); + } + fn set_bit(bit_vec: &mut BitVec, index: usize) { if index >= bit_vec.len() { bit_vec.resize(index + 1, false); @@ -267,4 +295,10 @@ mod tests { }) } } + + fn stream_to_values(stream: SortedStream) -> Vec { + futures::executor::block_on(async { + stream.map(|r| r.unwrap().0).collect::>().await + }) + } } diff --git a/src/index/src/inverted_index/format/writer.rs b/src/index/src/inverted_index/format/writer.rs index 32115ec1816a..176b1f1561f1 100644 --- a/src/index/src/inverted_index/format/writer.rs +++ b/src/index/src/inverted_index/format/writer.rs @@ -25,6 +25,8 @@ use crate::inverted_index::error::Result; pub use crate::inverted_index::format::writer::blob::InvertedIndexBlobWriter; use crate::inverted_index::Bytes; +pub type ValueStream = Box> + Send + Unpin>; + /// Trait for writing inverted index data to underlying storage. #[mockall::automock] #[async_trait] @@ -39,7 +41,7 @@ pub trait InvertedIndexWriter: Send { &mut self, name: String, null_bitmap: BitVec, - values: Box> + Send + Unpin>, + values: ValueStream, ) -> Result<()>; /// Finalizes the index writing process, ensuring all data is written. diff --git a/src/index/src/inverted_index/format/writer/blob.rs b/src/index/src/inverted_index/format/writer/blob.rs index 21f45b687827..07f39af46ecc 100644 --- a/src/index/src/inverted_index/format/writer/blob.rs +++ b/src/index/src/inverted_index/format/writer/blob.rs @@ -16,15 +16,14 @@ use std::num::NonZeroUsize; use async_trait::async_trait; use common_base::BitVec; -use futures::{AsyncWrite, AsyncWriteExt, Stream}; +use futures::{AsyncWrite, AsyncWriteExt}; use greptime_proto::v1::index::InvertedIndexMetas; use prost::Message; use snafu::ResultExt; -use super::single::SingleIndexWriter; use crate::inverted_index::error::{CloseSnafu, FlushSnafu, Result, WriteSnafu}; -use crate::inverted_index::format::writer::InvertedIndexWriter; -use crate::inverted_index::Bytes; +use crate::inverted_index::format::writer::single::SingleIndexWriter; +use crate::inverted_index::format::writer::{InvertedIndexWriter, ValueStream}; /// `InvertedIndexBlobWriter`, implemented [`InvertedIndexWriter`], manages /// writing of an inverted index to a blob storage. @@ -45,7 +44,7 @@ impl InvertedIndexWriter for InvertedIndexBlobWrit &mut self, name: String, null_bitmap: BitVec, - values: Box> + Send + Unpin>, + values: ValueStream, ) -> Result<()> { let single_writer = SingleIndexWriter::new( name.clone(), @@ -105,6 +104,7 @@ mod tests { use super::*; use crate::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader}; + use crate::inverted_index::Bytes; fn unpack(fst_value: u64) -> [u32; 2] { bytemuck::cast::(fst_value)