diff --git a/src/index/src/inverted_index/error.rs b/src/index/src/inverted_index/error.rs index 065e2d29991e..44fd77c413de 100644 --- a/src/index/src/inverted_index/error.rs +++ b/src/index/src/inverted_index/error.rs @@ -40,6 +40,27 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to write"))] + Write { + #[snafu(source)] + error: IoError, + location: Location, + }, + + #[snafu(display("Failed to flush"))] + Flush { + #[snafu(source)] + error: IoError, + location: Location, + }, + + #[snafu(display("Failed to close"))] + Close { + #[snafu(source)] + error: IoError, + location: Location, + }, + #[snafu(display( "Unexpected inverted index blob size, min: {min_blob_size}, actual: {actual_blob_size}" ))] @@ -115,6 +136,20 @@ pub enum Error { #[snafu(display("index not found, name: {name}"))] IndexNotFound { name: String, location: Location }, + + #[snafu(display("Failed to insert value to FST"))] + FstInsert { + #[snafu(source)] + error: fst::Error, + location: Location, + }, + + #[snafu(display("Failed to compile FST"))] + FstCompile { + #[snafu(source)] + error: fst::Error, + location: Location, + }, } impl ErrorExt for Error { @@ -123,19 +158,24 @@ impl ErrorExt for Error { match self { Seek { .. } | Read { .. } + | Write { .. } + | Flush { .. } + | Close { .. } | UnexpectedFooterPayloadSize { .. } | UnexpectedZeroSegmentRowCount { .. } | UnexpectedOffsetSize { .. } | UnexpectedBlobSize { .. } | DecodeProto { .. } | DecodeFst { .. } - | KeysApplierUnexpectedPredicates { .. } => StatusCode::Unexpected, + | KeysApplierUnexpectedPredicates { .. } + | FstCompile { .. } => StatusCode::Unexpected, ParseRegex { .. } | ParseDFA { .. } | KeysApplierWithoutInList { .. } | IntersectionApplierWithInList { .. } | EmptyPredicates { .. } + | FstInsert { .. } | IndexNotFound { .. 
} => StatusCode::InvalidArguments, } } diff --git a/src/index/src/inverted_index/format.rs b/src/index/src/inverted_index/format.rs index fe26d9eb0330..104092c729b9 100644 --- a/src/index/src/inverted_index/format.rs +++ b/src/index/src/inverted_index/format.rs @@ -27,10 +27,10 @@ //! //! An inverted index comprises a collection of bitmaps, a null bitmap, and a finite state transducer (FST) indicating tag values' positions: //! -//! `bitmap₀ bitmap₁ bitmap₂ ... bitmapₙ null_bitmap fst` +//! `null_bitmap bitmap₀ bitmap₁ bitmap₂ ... bitmapₙ fst` //! -//! - `bitmapᵢ`: Bitset indicating the presence of tag values within a row group. //! - `null_bitmap`: Bitset tracking the presence of null values within the tag column. +//! - `bitmapᵢ`: Bitset indicating the presence of tag values within a row group. //! - `fst`: Finite State Transducer providing an ordered map of bytes, representing the tag values. //! //! ## Footer Details @@ -51,6 +51,7 @@ //! [RFC]: https://github.com/GreptimeTeam/greptimedb/blob/develop/docs/rfcs/2023-11-03-inverted-index.md pub mod reader; +pub mod writer; const FOOTER_PAYLOAD_SIZE_SIZE: u64 = 4; const MIN_BLOB_SIZE: u64 = FOOTER_PAYLOAD_SIZE_SIZE; diff --git a/src/index/src/inverted_index/format/reader.rs b/src/index/src/inverted_index/format/reader.rs index 705f4b409844..de78800ca623 100644 --- a/src/index/src/inverted_index/format/reader.rs +++ b/src/index/src/inverted_index/format/reader.rs @@ -20,6 +20,7 @@ use common_base::BitVec; use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas}; use crate::inverted_index::error::Result; +pub use crate::inverted_index::format::reader::blob::InvertedIndexBlobReader; use crate::inverted_index::FstMap; /// InvertedIndexReader defines an asynchronous reader of inverted index data diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs index ba8231194055..b0a5e77db5fb 100644 --- a/src/index/src/inverted_index/format/reader/blob.rs 
+++ b/src/index/src/inverted_index/format/reader/blob.rs @@ -34,7 +34,6 @@ pub struct InvertedIndexBlobReader { } impl InvertedIndexBlobReader { - #[allow(dead_code)] pub fn new(source: R) -> Self { Self { source } } diff --git a/src/index/src/inverted_index/format/writer.rs b/src/index/src/inverted_index/format/writer.rs new file mode 100644 index 000000000000..8cb9e408df38 --- /dev/null +++ b/src/index/src/inverted_index/format/writer.rs @@ -0,0 +1,41 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod blob; +mod single; + +use async_trait::async_trait; +use common_base::BitVec; +use futures::Stream; + +use crate::inverted_index::error::Result; +pub use crate::inverted_index::format::writer::blob::InvertedIndexBlobWriter; +use crate::inverted_index::Bytes; + +/// Trait for writing inverted index data to underlying storage. +#[async_trait] +pub trait InvertedIndexWriter { + /// Adds entries to an index. + /// + /// * `name` is the index identifier. + /// * `null_bitmap` marks positions of null entries. + /// * `values` is a stream of values and their locations, yielded lexicographically. + /// Errors occur if the values are out of order. + async fn add_index(&mut self, name: String, null_bitmap: BitVec, values: S) -> Result<()> + where + S: Stream> + Send + Unpin; + + /// Finalizes the index writing process, ensuring all data is written. 
+ async fn finish(&mut self) -> Result<()>; +} diff --git a/src/index/src/inverted_index/format/writer/blob.rs b/src/index/src/inverted_index/format/writer/blob.rs new file mode 100644 index 000000000000..e38319f48a27 --- /dev/null +++ b/src/index/src/inverted_index/format/writer/blob.rs @@ -0,0 +1,198 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; +use common_base::BitVec; +use futures::{AsyncWrite, AsyncWriteExt, Stream}; +use greptime_proto::v1::index::InvertedIndexMetas; +use prost::Message; +use snafu::ResultExt; + +use super::single::SingleIndexWriter; +use crate::inverted_index::error::{CloseSnafu, FlushSnafu, Result, WriteSnafu}; +use crate::inverted_index::format::writer::InvertedIndexWriter; +use crate::inverted_index::Bytes; + +/// `InvertedIndexBlobWriter`, an implementation of [`InvertedIndexWriter`], manages +/// the writing of an inverted index to blob storage.
+pub struct InvertedIndexBlobWriter { + /// The underlying blob storage + blob_writer: W, + + /// Tracks the total number of bytes written to the storage so far + written_size: u64, + + /// Metadata about each index that has been written + metas: InvertedIndexMetas, +} + +#[async_trait] +impl InvertedIndexWriter for InvertedIndexBlobWriter { + async fn add_index(&mut self, name: String, null_bitmap: BitVec, values: S) -> Result<()> + where + S: Stream> + Send + Unpin, + { + let single_writer = SingleIndexWriter::new( + name.clone(), + self.written_size, + null_bitmap, + values, + &mut self.blob_writer, + ); + let metadata = single_writer.write().await?; + + self.written_size += metadata.inverted_index_size; + self.metas.metas.insert(name, metadata); + + Ok(()) + } + + async fn finish(&mut self) -> Result<()> { + let metas_bytes = self.metas.encode_to_vec(); + self.blob_writer + .write_all(&metas_bytes) + .await + .context(WriteSnafu)?; + + let footer_size = metas_bytes.len() as u32; + self.blob_writer + .write_all(&footer_size.to_le_bytes()) + .await + .context(WriteSnafu)?; + + self.blob_writer.flush().await.context(FlushSnafu)?; + self.blob_writer.close().await.context(CloseSnafu)?; + Ok(()) + } +} + +impl InvertedIndexBlobWriter { + pub fn new( + blob_writer: W, + total_row_count: u64, + segment_row_count: u64, + ) -> InvertedIndexBlobWriter { + InvertedIndexBlobWriter { + blob_writer, + written_size: 0, + metas: InvertedIndexMetas { + total_row_count, + segment_row_count, + ..Default::default() + }, + } + } +} + +#[cfg(test)] +mod tests { + use futures::io::Cursor; + + use super::*; + use crate::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader}; + + fn unpack(fst_value: u64) -> [u32; 2] { + bytemuck::cast::(fst_value) + } + + #[tokio::test] + async fn test_inverted_index_blob_writer_write_empty() { + let mut blob = Vec::new(); + let mut writer = InvertedIndexBlobWriter::new(&mut blob, 8, 1); + writer.finish().await.unwrap(); + + let 
cursor = Cursor::new(blob); + let mut reader = InvertedIndexBlobReader::new(cursor); + let metadata = reader.metadata().await.unwrap(); + assert_eq!(metadata.total_row_count, 8); + assert_eq!(metadata.segment_row_count, 1); + assert_eq!(metadata.metas.len(), 0); + } + + #[tokio::test] + async fn test_inverted_index_blob_writer_write_basic() { + let mut blob = Vec::new(); + let mut writer = InvertedIndexBlobWriter::new(&mut blob, 8, 1); + writer + .add_index( + "tag0".to_string(), + BitVec::from_slice(&[0b0000_0001, 0b0000_0000]), + futures::stream::iter(vec![ + Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))), + Ok((Bytes::from("b"), BitVec::from_slice(&[0b0010_0000]))), + Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))), + ]), + ) + .await + .unwrap(); + writer + .add_index( + "tag1".to_string(), + BitVec::from_slice(&[0b0000_0001, 0b0000_0000]), + futures::stream::iter(vec![ + Ok((Bytes::from("x"), BitVec::from_slice(&[0b0000_0001]))), + Ok((Bytes::from("y"), BitVec::from_slice(&[0b0010_0000]))), + Ok((Bytes::from("z"), BitVec::from_slice(&[0b0000_0001]))), + ]), + ) + .await + .unwrap(); + writer.finish().await.unwrap(); + + let cursor = Cursor::new(blob); + let mut reader = InvertedIndexBlobReader::new(cursor); + let metadata = reader.metadata().await.unwrap(); + assert_eq!(metadata.total_row_count, 8); + assert_eq!(metadata.segment_row_count, 1); + assert_eq!(metadata.metas.len(), 2); + + // tag0 + let tag0 = metadata.metas.get("tag0").unwrap(); + let stats0 = tag0.stats.as_ref().unwrap(); + assert_eq!(stats0.distinct_count, 3); + assert_eq!(stats0.null_count, 1); + assert_eq!(stats0.min_value, Bytes::from("a")); + assert_eq!(stats0.max_value, Bytes::from("c")); + let fst0 = reader.fst(tag0).await.unwrap(); + assert_eq!(fst0.len(), 3); + let [offset, size] = unpack(fst0.get(b"a").unwrap()); + let bitmap = reader.bitmap(tag0, offset, size).await.unwrap(); + assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); + let [offset, size] = 
unpack(fst0.get(b"b").unwrap()); + let bitmap = reader.bitmap(tag0, offset, size).await.unwrap(); + assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000])); + let [offset, size] = unpack(fst0.get(b"c").unwrap()); + let bitmap = reader.bitmap(tag0, offset, size).await.unwrap(); + assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); + + // tag1 + let tag1 = metadata.metas.get("tag1").unwrap(); + let stats1 = tag1.stats.as_ref().unwrap(); + assert_eq!(stats1.distinct_count, 3); + assert_eq!(stats1.null_count, 1); + assert_eq!(stats1.min_value, Bytes::from("x")); + assert_eq!(stats1.max_value, Bytes::from("z")); + let fst1 = reader.fst(tag1).await.unwrap(); + assert_eq!(fst1.len(), 3); + let [offset, size] = unpack(fst1.get(b"x").unwrap()); + let bitmap = reader.bitmap(tag1, offset, size).await.unwrap(); + assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); + let [offset, size] = unpack(fst1.get(b"y").unwrap()); + let bitmap = reader.bitmap(tag1, offset, size).await.unwrap(); + assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000])); + let [offset, size] = unpack(fst1.get(b"z").unwrap()); + let bitmap = reader.bitmap(tag1, offset, size).await.unwrap(); + assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); + } +} diff --git a/src/index/src/inverted_index/format/writer/single.rs b/src/index/src/inverted_index/format/writer/single.rs new file mode 100644 index 000000000000..c652c76390d4 --- /dev/null +++ b/src/index/src/inverted_index/format/writer/single.rs @@ -0,0 +1,213 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use common_base::BitVec; +use fst::MapBuilder; +use futures::{AsyncWrite, AsyncWriteExt, Stream, StreamExt}; +use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexStats}; +use snafu::ResultExt; + +use crate::inverted_index::error::{FstCompileSnafu, FstInsertSnafu, Result, WriteSnafu}; +use crate::inverted_index::Bytes; + +/// `SingleIndexWriter` writes values to the blob storage for an individual inverted index +pub struct SingleIndexWriter { + /// The underlying blob storage + blob_writer: W, + + /// The null bitmap to be written + null_bitmap: BitVec, + + /// The stream of values to be written, yielded lexicographically + values: S, + + /// Builder for constructing the FST + fst: MapBuilder>, + + /// Metadata about the index + meta: InvertedIndexMeta, +} + +impl SingleIndexWriter +where + W: AsyncWrite + Send + Unpin, + S: Stream> + Send + Unpin, +{ + /// Constructs a new `SingleIndexWriter` + pub fn new( + name: String, + base_offset: u64, + null_bitmap: BitVec, + values: S, + blob_writer: W, + ) -> SingleIndexWriter { + SingleIndexWriter { + blob_writer, + null_bitmap, + values, + fst: MapBuilder::memory(), + meta: InvertedIndexMeta { + name, + base_offset, + stats: Some(InvertedIndexStats::default()), + ..Default::default() + }, + } + } + + /// Writes the null bitmap, values with their bitmaps, and constructs the FST map. 
+ pub async fn write(mut self) -> Result { + self.write_null_bitmap().await?; + + while let Some(result) = self.values.next().await { + let (bytes, bitmap) = result?; + self.append_value(bytes, bitmap).await?; + } + + self.finish_fst_construction().await + } + + /// Writes the null bitmap to the blob and updates the metadata accordingly + async fn write_null_bitmap(&mut self) -> Result<()> { + let null_bitmap_bytes = self.null_bitmap.as_raw_slice(); + self.blob_writer + .write_all(null_bitmap_bytes) + .await + .context(WriteSnafu)?; + + self.meta.relative_null_bitmap_offset = self.meta.inverted_index_size as _; + self.meta.null_bitmap_size = null_bitmap_bytes.len() as _; + self.meta.inverted_index_size += self.meta.null_bitmap_size as u64; + + // update stats + if let Some(stats) = self.meta.stats.as_mut() { + let null_count = self.null_bitmap.count_ones(); + stats.null_count = null_count as u64; + } + + Ok(()) + } + + /// Appends a value and its bitmap to the blob, updates the FST, and the metadata + async fn append_value(&mut self, value: Bytes, bitmap: BitVec) -> Result<()> { + let bitmap_bytes = bitmap.into_vec(); + self.blob_writer + .write_all(&bitmap_bytes) + .await + .context(WriteSnafu)?; + + let offset = self.meta.inverted_index_size as u32; + let size = bitmap_bytes.len() as u32; + self.meta.inverted_index_size += size as u64; + + let packed = bytemuck::cast::<[u32; 2], u64>([offset, size]); + self.fst.insert(&value, packed).context(FstInsertSnafu)?; + + // update stats + if let Some(stats) = self.meta.stats.as_mut() { + stats.distinct_count += 1; + + // update min/max, assume values are appended in lexicographic order + if stats.distinct_count == 1 { + stats.min_value = value.clone(); + } + stats.max_value = value; + } + + Ok(()) + } + + /// Writes the compiled FST to the blob and finalizes the metadata + async fn finish_fst_construction(mut self) -> Result { + let fst_bytes = self.fst.into_inner().context(FstCompileSnafu)?; + self.blob_writer + 
.write_all(&fst_bytes) + .await + .context(WriteSnafu)?; + + self.meta.relative_fst_offset = self.meta.inverted_index_size as _; + self.meta.fst_size = fst_bytes.len() as _; + self.meta.inverted_index_size += self.meta.fst_size as u64; + Ok(self.meta) + } +} + +#[cfg(test)] +mod tests { + use futures::stream; + + use super::*; + use crate::inverted_index::error::Error; + use crate::inverted_index::Bytes; + + #[tokio::test] + async fn test_single_index_writer_write_empty() { + let mut blob = Vec::new(); + let writer = SingleIndexWriter::new( + "test".to_string(), + 0, + BitVec::new(), + stream::empty(), + &mut blob, + ); + + let meta = writer.write().await.unwrap(); + assert_eq!(meta.name, "test"); + assert_eq!(meta.base_offset, 0); + assert_eq!(meta.stats, Some(InvertedIndexStats::default())); + } + + #[tokio::test] + async fn test_single_index_writer_write_basic() { + let mut blob = Vec::new(); + let writer = SingleIndexWriter::new( + "test".to_string(), + 0, + BitVec::from_slice(&[0b0000_0001, 0b0000_0000]), + stream::iter(vec![ + Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))), + Ok((Bytes::from("b"), BitVec::from_slice(&[0b0000_0000]))), + Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))), + ]), + &mut blob, + ); + let meta = writer.write().await.unwrap(); + + assert_eq!(meta.name, "test"); + assert_eq!(meta.base_offset, 0); + let stats = meta.stats.as_ref().unwrap(); + assert_eq!(stats.distinct_count, 3); + assert_eq!(stats.null_count, 1); + assert_eq!(stats.min_value, Bytes::from("a")); + assert_eq!(stats.max_value, Bytes::from("c")); + } + + #[tokio::test] + async fn test_single_index_writer_write_out_of_order() { + let mut blob = Vec::new(); + let writer = SingleIndexWriter::new( + "test".to_string(), + 0, + BitVec::from_slice(&[0b0000_0001, 0b0000_0000]), + stream::iter(vec![ + Ok((Bytes::from("b"), BitVec::from_slice(&[0b0000_0000]))), + Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))), + Ok((Bytes::from("c"), 
BitVec::from_slice(&[0b0000_0001]))), + ]), + &mut blob, + ); + let res = writer.write().await; + assert!(matches!(res, Err(Error::FstInsert { .. }))); + } +}