From 0451d411253c900b8b4f800b581c4b82be8a67ec Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Thu, 23 Nov 2023 11:18:03 +0000 Subject: [PATCH 1/6] feat(inverted_index): add reader Signed-off-by: Zhenchi --- Cargo.lock | 25 +- Cargo.toml | 7 +- src/index/Cargo.toml | 20 ++ src/index/src/inverted_index.rs | 16 ++ src/index/src/inverted_index/error.rs | 99 ++++++++ src/index/src/inverted_index/format.rs | 55 +++++ src/index/src/inverted_index/format/reader.rs | 38 +++ .../src/inverted_index/format/reader/blob.rs | 225 ++++++++++++++++++ .../inverted_index/format/reader/footer.rs | 182 ++++++++++++++ src/index/src/lib.rs | 15 ++ src/puffin/src/file_format.rs | 2 + 11 files changed, 682 insertions(+), 2 deletions(-) create mode 100644 src/index/Cargo.toml create mode 100644 src/index/src/inverted_index.rs create mode 100644 src/index/src/inverted_index/error.rs create mode 100644 src/index/src/inverted_index/format.rs create mode 100644 src/index/src/inverted_index/format/reader.rs create mode 100644 src/index/src/inverted_index/format/reader/blob.rs create mode 100644 src/index/src/inverted_index/format/reader/footer.rs create mode 100644 src/index/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 5da1245210c1..4c3410b95092 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3344,6 +3344,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "fst" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ab85b9b05e3978cc9a9cf8fea7f01b494e1a09ed3037e16ba39edc7a29eb61a" + [[package]] name = "funty" version = "2.0.0" @@ -3538,7 +3544,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=25429306d0379ad29211a062a81da2554a0208ab#25429306d0379ad29211a062a81da2554a0208ab" +source = "git+https://github.com/zhongzc/greptime-proto.git?branch=zhongzc/inverted-index-base-offset#3610e9ea16a3606c79962c6c497f3f1fb90fffc6" dependencies = [ "prost 0.12.2", "serde", @@ -3903,6 +3909,23 @@ dependencies = [ "quote", ] +[[package]] +name = "index" +version = "0.4.3" +dependencies = [ + "async-trait", + "common-base", + "common-error", + "common-macro", + "fst", + "futures", + "greptime-proto", + "prost 0.12.2", + "snafu", + "tokio", + "tokio-util", +] + [[package]] name = "indexmap" version = "1.9.3" diff --git a/Cargo.toml b/Cargo.toml index ba46247cf922..21269cff475a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ members = [ "src/sql", "src/store-api", "src/table", + "src/index", "tests-integration", "tests/runner", ] @@ -83,9 +84,10 @@ datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } derive_builder = "0.12" etcd-client = "0.12" +fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "25429306d0379ad29211a062a81da2554a0208ab" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2ffdcae9f9816e41f261b24502e6603d68442466" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" @@ -192,3 +194,6 @@ strip = true lto = "thin" debug = false incremental = false + +[patch."https://github.com/GreptimeTeam/greptime-proto.git"] +greptime-proto = { git = "https://github.com/zhongzc/greptime-proto.git", branch = "zhongzc/inverted-index-base-offset" } diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml new file mode 100644 index 000000000000..9eb2583ec3e5 --- /dev/null +++ b/src/index/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "index" +version.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +fst.workspace = true +greptime-proto.workspace = true +snafu.workspace = true +common-error.workspace = true +common-macro.workspace = true +async-trait.workspace = true +common-base.workspace = true +futures.workspace = true +prost.workspace = true + +[dev-dependencies] +tokio-util.workspace = true +tokio.workspace = true diff --git a/src/index/src/inverted_index.rs b/src/index/src/inverted_index.rs new file mode 100644 index 000000000000..43a2234fdde3 --- /dev/null +++ b/src/index/src/inverted_index.rs @@ -0,0 +1,16 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod error; +pub mod format; diff --git a/src/index/src/inverted_index/error.rs b/src/index/src/inverted_index/error.rs new file mode 100644 index 000000000000..e1c650a3637b --- /dev/null +++ b/src/index/src/inverted_index/error.rs @@ -0,0 +1,99 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::io::Error as IoError; + +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use snafu::{Location, Snafu}; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("Failed to seek"))] + Seek { + #[snafu(source)] + error: IoError, + location: Location, + }, + + #[snafu(display("Failed to read"))] + Read { + #[snafu(source)] + error: IoError, + location: Location, + }, + + #[snafu(display( + "Unexpected inverted index blob size, min: {min_blob_size}, actual: {actual_blob_size}" + ))] + UnexpectedBlobSize { + min_blob_size: u64, + actual_blob_size: u64, + location: Location, + }, + + #[snafu(display("Unexpected inverted index footer payload size, max: {max_payload_size}, actual: {actual_payload_size}"))] + UnexpectedFooterPayloadSize { + max_payload_size: u64, + actual_payload_size: u64, + location: Location, + }, + + #[snafu(display("Unexpected inverted index offset size, offset: {offset}, size: {size}, blob_size: {blob_size}, payload_size: {payload_size}"))] + UnexpectedOffsetSize { + offset: u64, + size: u64, + blob_size: u64, + payload_size: u64, + }, + + #[snafu(display("Failed to decode fst"))] + DecodeFst { + #[snafu(source)] + error: fst::Error, + location: Location, + }, + + #[snafu(display("Failed to decode protobuf"))] + DecodeProto { + #[snafu(source)] + error: prost::DecodeError, + location: Location, + }, +} + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + use Error::*; + match self { + Seek { .. } + | Read { .. } + | UnexpectedFooterPayloadSize { .. } + | UnexpectedOffsetSize { .. } + | UnexpectedBlobSize { .. } + | DecodeProto { .. } + | DecodeFst { .. } => StatusCode::Unexpected, + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +pub type Result = std::result::Result; diff --git a/src/index/src/inverted_index/format.rs b/src/index/src/inverted_index/format.rs new file mode 100644 index 000000000000..07d68be61b22 --- /dev/null +++ b/src/index/src/inverted_index/format.rs @@ -0,0 +1,55 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! # SST Files with Inverted Index Format Specification +//! +//! ## File Structure +//! +//! Each SST file includes a series of inverted indices followed by a footer. +//! +//! `inverted_index₀ inverted_index₁ ... inverted_indexₙ footer` +//! +//! - Each `inverted_indexᵢ` represents an index entry corresponding to tag values and their locations within the file. +//! - `footer`: Contains metadata about the inverted indices, encoded as a protobuf message. +//! +//! ## Inverted Index Internals +//! +//! An inverted index comprises a collection of bitmaps, a null bitmap, and a finite state transducer (FST) indicating tag values' positions: +//! +//! `bitmap₀ bitmap₁ bitmap₂ ... bitmapₙ null_bitmap fst` +//! +//! - `bitmapᵢ`: Bitset indicating the presence of tag values within a row group. +//! - `null_bitmap`: Bitset tracking the presence of null values within the tag column. +//! - `fst`: Finite State Transducer providing an ordered map of bytes, representing the tag values. +//! +//! ## Footer Details +//! +//! The footer encapsulates the metadata for inversion mappings: +//! +//! `footer_payload footer_payload_size` +//! +//! - `footer_payload`: Protobuf-encoded `InvertedIndexFooter` information describing the metadata of each inverted index. +//! - `footer_payload_size`: Size in bytes of the `footer_payload`, displayed as a `u32` integer. +//! - The footer aids in the interpretation of the inverted indices, providing necessary offset and count information. +//! +//! ## Reference +//! +//! More detailed information regarding the encoding of the inverted indices can be found in the [RFC]. +//! +//! [RFC]: https://github.com/GreptimeTeam/greptimedb/blob/develop/docs/rfcs/2023-11-03-inverted-index.md + +pub mod reader; + +const FOOTER_PAYLOAD_SIZE_SIZE: u64 = 4; +const MIN_BLOB_SIZE: u64 = FOOTER_PAYLOAD_SIZE_SIZE; diff --git a/src/index/src/inverted_index/format/reader.rs b/src/index/src/inverted_index/format/reader.rs new file mode 100644 index 000000000000..2d515444f51c --- /dev/null +++ b/src/index/src/inverted_index/format/reader.rs @@ -0,0 +1,38 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod blob; +mod footer; + +use async_trait::async_trait; +use common_base::BitVec; +use fst::Map; +use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas}; + +use crate::inverted_index::error::Result; + +pub type FstMap = Map>; + +/// InvertedIndexReader defines an asynchronous reader of inverted index data +#[async_trait] +pub trait InvertedIndexReader { + /// Retrieve metadata of all inverted indices stored within the blob. + async fn metadata(&mut self) -> Result; + + /// Retrieve the finite state transducer (FST) map for a given inverted index metadata entry. + async fn fst(&mut self, meta: &InvertedIndexMeta) -> Result; + + /// Retrieve the bitmap for a given inverted index metadata entry at the specified offset and size. + async fn bitmap(&mut self, meta: &InvertedIndexMeta, offset: u32, size: u32) -> Result; +} diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs new file mode 100644 index 000000000000..302bf7d41d59 --- /dev/null +++ b/src/index/src/inverted_index/format/reader/blob.rs @@ -0,0 +1,225 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::SeekFrom; + +use async_trait::async_trait; +use common_base::BitVec; +use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; +use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas}; +use snafu::{ensure, ResultExt}; + +use crate::inverted_index::error::{ + DecodeFstSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedBlobSizeSnafu, +}; +use crate::inverted_index::format::reader::footer::InvertedIndeFooterReader; +use crate::inverted_index::format::reader::{FstMap, InvertedIndexReader}; +use crate::inverted_index::format::MIN_BLOB_SIZE; + +pub struct InvertedIndexBlobReader { + source: R, +} + +impl InvertedIndexBlobReader { + #[allow(dead_code)] + pub fn new(source: R) -> Self { + Self { source } + } +} + +#[async_trait] +impl InvertedIndexReader for InvertedIndexBlobReader { + async fn metadata(&mut self) -> Result { + let end = SeekFrom::End(0); + let blob_size = self.source.seek(end).await.context(SeekSnafu)?; + + ensure!( + blob_size >= MIN_BLOB_SIZE, + UnexpectedBlobSizeSnafu { + min_blob_size: MIN_BLOB_SIZE, + actual_blob_size: blob_size, + } + ); + + InvertedIndeFooterReader::new(&mut self.source, blob_size) + .metadata() + .await + } + + async fn fst(&mut self, meta: &InvertedIndexMeta) -> Result { + let offset = SeekFrom::Start(meta.base_offset + meta.relative_fst_offset as u64); + self.source.seek(offset).await.context(SeekSnafu)?; + let mut buf = vec![0u8; meta.fst_size as usize]; + self.source.read_exact(&mut buf).await.context(ReadSnafu)?; + + FstMap::new(buf).context(DecodeFstSnafu) + } + + async fn bitmap(&mut self, meta: &InvertedIndexMeta, offset: u32, size: u32) -> Result { + let offset = SeekFrom::Start(meta.base_offset + offset as u64); + self.source.seek(offset).await.context(SeekSnafu)?; + let mut buf = vec![0u8; size as usize]; + self.source.read_exact(&mut buf).await.context(ReadSnafu)?; + + Ok(BitVec::from_vec(buf)) + } +} + +#[cfg(test)] +mod tests { + use common_base::bit_vec::prelude::*; + use fst::MapBuilder; + use futures::io::Cursor; + use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas}; + use prost::Message; + + use super::*; + + fn create_fake_fst() -> Vec { + let mut fst_buf = Vec::new(); + let mut build = MapBuilder::new(&mut fst_buf).unwrap(); + build.insert("key1".as_bytes(), 1).unwrap(); + build.insert("key2".as_bytes(), 2).unwrap(); + build.finish().unwrap(); + fst_buf + } + + fn create_fake_bitmap() -> Vec { + bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0, 1, 0].into_vec() + } + + fn create_inverted_index_blob() -> Vec { + let bitmap_size = create_fake_bitmap().len(); + let fst_size = create_fake_fst().len(); + + // first index + let mut inverted_index = Vec::new(); + inverted_index.extend_from_slice(&create_fake_bitmap()); // value bitmap + inverted_index.extend_from_slice(&create_fake_bitmap()); // null bitmap + inverted_index.extend_from_slice(&create_fake_fst()); // fst + + let meta = InvertedIndexMeta { + name: "index0".to_string(), + base_offset: 0, + inverted_index_size: inverted_index.len() as _, + relative_null_bitmap_offset: bitmap_size as _, + null_bitmap_size: bitmap_size as _, + relative_fst_offset: (bitmap_size * 2) as _, + fst_size: fst_size as _, + ..Default::default() + }; + + // second index + let meta1 = InvertedIndexMeta { + name: "index1".to_string(), + base_offset: meta.inverted_index_size, + inverted_index_size: inverted_index.len() as _, + relative_null_bitmap_offset: bitmap_size as _, + null_bitmap_size: bitmap_size as _, + relative_fst_offset: (bitmap_size * 2) as _, + fst_size: fst_size as _, + ..Default::default() + }; + + // metas + let mut metas = InvertedIndexMetas::default(); + metas.metas.insert(meta.name.clone(), meta); + metas.metas.insert(meta1.name.clone(), meta1); + let mut meta_buf = Vec::new(); + metas.encode(&mut meta_buf).unwrap(); + + let mut blob = vec![]; + + // first index + blob.extend_from_slice(&inverted_index); + + // second index + blob.extend_from_slice(&inverted_index); + + // footer + blob.extend_from_slice(&meta_buf); + blob.extend_from_slice(&(meta_buf.len() as u32).to_le_bytes()); + + blob + } + + #[tokio::test] + async fn test_inverted_index_blob_reader_metadata() { + let blob = create_inverted_index_blob(); + let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob)); + + let metas = blob_reader.metadata().await.unwrap(); + assert_eq!(metas.metas.len(), 2); + + let meta0 = metas.metas.get("index0").unwrap(); + assert_eq!(meta0.name, "index0"); + assert_eq!(meta0.base_offset, 0); + assert_eq!(meta0.inverted_index_size, 54); + assert_eq!(meta0.relative_null_bitmap_offset, 2); + assert_eq!(meta0.null_bitmap_size, 2); + assert_eq!(meta0.relative_fst_offset, 4); + assert_eq!(meta0.fst_size, 50); + + let meta1 = metas.metas.get("index1").unwrap(); + assert_eq!(meta1.name, "index1"); + assert_eq!(meta1.base_offset, 54); + assert_eq!(meta1.inverted_index_size, 54); + assert_eq!(meta1.relative_null_bitmap_offset, 2); + assert_eq!(meta1.null_bitmap_size, 2); + assert_eq!(meta1.relative_fst_offset, 4); + assert_eq!(meta1.fst_size, 50); + } + + #[tokio::test] + async fn test_inverted_index_blob_reader_fst() { + let blob = create_inverted_index_blob(); + let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob)); + + let metas = blob_reader.metadata().await.unwrap(); + let meta = metas.metas.get("index0").unwrap(); + + let fst_map = blob_reader.fst(meta).await.unwrap(); + assert_eq!(fst_map.len(), 2); + assert_eq!(fst_map.get("key1".as_bytes()), Some(1)); + assert_eq!(fst_map.get("key2".as_bytes()), Some(2)); + + let meta = metas.metas.get("index1").unwrap(); + let fst_map = blob_reader.fst(meta).await.unwrap(); + assert_eq!(fst_map.len(), 2); + assert_eq!(fst_map.get("key1".as_bytes()), Some(1)); + assert_eq!(fst_map.get("key2".as_bytes()), Some(2)); + } + + #[tokio::test] + async fn test_inverted_index_blob_reader_bitmap() { + let blob = create_inverted_index_blob(); + let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob)); + + let metas = blob_reader.metadata().await.unwrap(); + let meta = metas.metas.get("index0").unwrap(); + + let bitmap = blob_reader.bitmap(meta, 0, 2).await.unwrap(); + assert_eq!(bitmap.into_vec(), create_fake_bitmap()); + let bitmap = blob_reader.bitmap(meta, 2, 2).await.unwrap(); + assert_eq!(bitmap.into_vec(), create_fake_bitmap()); + + let metas = blob_reader.metadata().await.unwrap(); + let meta = metas.metas.get("index1").unwrap(); + + let bitmap = blob_reader.bitmap(meta, 0, 2).await.unwrap(); + assert_eq!(bitmap.into_vec(), create_fake_bitmap()); + let bitmap = blob_reader.bitmap(meta, 2, 2).await.unwrap(); + assert_eq!(bitmap.into_vec(), create_fake_bitmap()); + } +} diff --git a/src/index/src/inverted_index/format/reader/footer.rs b/src/index/src/inverted_index/format/reader/footer.rs new file mode 100644 index 000000000000..e1e4547bb79c --- /dev/null +++ b/src/index/src/inverted_index/format/reader/footer.rs @@ -0,0 +1,182 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::SeekFrom; + +use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; +use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas}; +use prost::Message; +use snafu::{ensure, ResultExt}; + +use crate::inverted_index::error::{ + DecodeProtoSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedFooterPayloadSizeSnafu, + UnexpectedOffsetSizeSnafu, +}; +use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE; + +/// InvertedIndeFooterReader is for reading the footer section of the blob. +pub struct InvertedIndeFooterReader { + source: R, + blob_size: u64, +} + +impl InvertedIndeFooterReader { + pub fn new(source: R, blob_size: u64) -> Self { + Self { source, blob_size } + } +} + +impl InvertedIndeFooterReader { + pub async fn metadata(&mut self) -> Result { + let payload_size = self.read_payload_size().await?; + let metas = self.read_payload(payload_size).await?; + Ok(metas) + } + + async fn read_payload_size(&mut self) -> Result { + let size_offset = SeekFrom::Start(self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE); + self.source.seek(size_offset).await.context(SeekSnafu)?; + let size_buf = &mut [0u8; FOOTER_PAYLOAD_SIZE_SIZE as usize]; + self.source.read_exact(size_buf).await.context(ReadSnafu)?; + + let payload_size = u32::from_le_bytes(*size_buf) as u64; + + let max_payload_size = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE; + ensure!( + payload_size <= max_payload_size, + UnexpectedFooterPayloadSizeSnafu { + max_payload_size, + actual_payload_size: payload_size, + } + ); + + Ok(payload_size) + } + + async fn read_payload(&mut self, payload_size: u64) -> Result { + let payload_offset = + SeekFrom::Start(self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE - payload_size); + self.source.seek(payload_offset).await.context(SeekSnafu)?; + + let payload = &mut vec![0u8; payload_size as usize]; + self.source.read_exact(payload).await.context(ReadSnafu)?; + + let metas = InvertedIndexMetas::decode(&payload[..]).context(DecodeProtoSnafu)?; + self.validate_metas(&metas, payload_size)?; + + Ok(metas) + } + + /// Check if the read metadata is consistent with expected sizes and offsets. + fn validate_metas(&self, metas: &InvertedIndexMetas, payload_size: u64) -> Result<()> { + for meta in metas.metas.values() { + let InvertedIndexMeta { + base_offset, + inverted_index_size, + .. + } = meta; + + let limit = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE - payload_size; + ensure!( + *base_offset + *inverted_index_size <= limit, + UnexpectedOffsetSizeSnafu { + offset: *base_offset, + size: *inverted_index_size, + blob_size: self.blob_size, + payload_size, + } + ); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use futures::io::Cursor; + use prost::Message; + + use super::*; + + // Helper function to construct the test payload + fn create_test_payload(meta: InvertedIndexMeta) -> Vec { + let mut metas = InvertedIndexMetas::default(); + metas.metas.insert("test".to_string(), meta); + + let mut payload_buf = vec![]; + metas.encode(&mut payload_buf).unwrap(); + + let footer_payload_size = (payload_buf.len() as u32).to_le_bytes().to_vec(); + payload_buf.extend_from_slice(&footer_payload_size); + payload_buf + } + + #[tokio::test] + async fn test_read_payload() { + let meta = InvertedIndexMeta { + name: "test".to_string(), + segment_row_count: 4096, + ..Default::default() + }; + + let payload_buf = create_test_payload(meta); + let blob_size = payload_buf.len() as u64; + let mut reader = InvertedIndeFooterReader::new(Cursor::new(payload_buf), blob_size); + + let payload_size = reader.read_payload_size().await.unwrap(); + let metas = reader.read_payload(payload_size).await.unwrap(); + + assert_eq!(metas.metas.len(), 1); + let index_meta = &metas.metas.get("test").unwrap(); + assert_eq!(index_meta.name, "test"); + assert_eq!(index_meta.segment_row_count, 4096); + } + + #[tokio::test] + async fn test_invalid_footer_payload_size() { + let meta = InvertedIndexMeta { + name: "test".to_string(), + segment_row_count: 4096, + ..Default::default() + }; + + let mut payload_buf = create_test_payload(meta); + payload_buf.push(0xff); // Add an extra byte to corrupt the footer + let blob_size = payload_buf.len() as u64; + let mut reader = InvertedIndeFooterReader::new(Cursor::new(payload_buf), blob_size); + + let payload_size_result = reader.read_payload_size().await; + assert!(payload_size_result.is_err()); + } + + #[tokio::test] + async fn test_invalid_offset_size() { + let meta = InvertedIndexMeta { + name: "test".to_string(), + base_offset: 0, + inverted_index_size: 1, // Set size to 1 to make ecceed the blob size + segment_row_count: 4096, + ..Default::default() + }; + + let payload_buf = create_test_payload(meta); + let blob_size = payload_buf.len() as u64; + let mut reader = InvertedIndeFooterReader::new(Cursor::new(payload_buf), blob_size); + + let payload_size = reader.read_payload_size().await.unwrap(); + let payload_result = reader.read_payload(payload_size).await; + assert!(payload_result.is_err()); + } +} diff --git a/src/index/src/lib.rs b/src/index/src/lib.rs new file mode 100644 index 000000000000..efed1e963bf0 --- /dev/null +++ b/src/index/src/lib.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod inverted_index; diff --git a/src/puffin/src/file_format.rs b/src/puffin/src/file_format.rs index 075a06c96d50..2cb77c8c242d 100644 --- a/src/puffin/src/file_format.rs +++ b/src/puffin/src/file_format.rs @@ -40,6 +40,8 @@ //! //! Footer payload bytes is either uncompressed or LZ4-compressed (as a single LZ4 compression frame with content size present), //! UTF-8 encoded JSON payload representing a single [`FileMetadata`] object. +//! +//! [`FileMetadata`]: ../file_metadata/struct.FileMetadata.html pub mod reader; pub mod writer; From ac59189cd20542b2f9aa2676f1799bd498c6f8bf Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Thu, 23 Nov 2023 11:27:51 +0000 Subject: [PATCH 2/6] fix: toml format Signed-off-by: Zhenchi --- src/index/Cargo.toml | 10 +++++----- src/index/src/inverted_index/format/reader/footer.rs | 1 - 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml index 9eb2583ec3e5..6d667336903d 100644 --- a/src/index/Cargo.toml +++ b/src/index/Cargo.toml @@ -5,15 +5,15 @@ edition.workspace = true license.workspace = true [dependencies] -fst.workspace = true -greptime-proto.workspace = true -snafu.workspace = true -common-error.workspace = true -common-macro.workspace = true async-trait.workspace = true common-base.workspace = true +common-error.workspace = true +common-macro.workspace = true +fst.workspace = true futures.workspace = true +greptime-proto.workspace = true prost.workspace = true +snafu.workspace = true [dev-dependencies] tokio-util.workspace = true diff --git a/src/index/src/inverted_index/format/reader/footer.rs b/src/index/src/inverted_index/format/reader/footer.rs index e1e4547bb79c..da267cc8ee97 100644 --- a/src/index/src/inverted_index/format/reader/footer.rs +++ b/src/index/src/inverted_index/format/reader/footer.rs @@ -110,7 +110,6 @@ mod tests { use super::*; - // Helper function to construct the test payload fn create_test_payload(meta: InvertedIndexMeta) -> Vec { let mut metas = InvertedIndexMetas::default(); metas.metas.insert("test".to_string(), meta); From ea5f46d507c27b4e67f0854ebc9db2be3c9c4bb5 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Thu, 23 Nov 2023 11:31:08 +0000 Subject: [PATCH 3/6] chore: add prefix relative_ to the offset parameter Signed-off-by: Zhenchi --- src/index/src/inverted_index/format/reader.rs | 7 ++++++- src/index/src/inverted_index/format/reader/blob.rs | 9 +++++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/index/src/inverted_index/format/reader.rs b/src/index/src/inverted_index/format/reader.rs index 2d515444f51c..2fbe703798d4 100644 --- a/src/index/src/inverted_index/format/reader.rs +++ b/src/index/src/inverted_index/format/reader.rs @@ -34,5 +34,10 @@ pub trait InvertedIndexReader { async fn fst(&mut self, meta: &InvertedIndexMeta) -> Result; /// Retrieve the bitmap for a given inverted index metadata entry at the specified offset and size. - async fn bitmap(&mut self, meta: &InvertedIndexMeta, offset: u32, size: u32) -> Result; + async fn bitmap( + &mut self, + meta: &InvertedIndexMeta, + relative_offset: u32, + size: u32, + ) -> Result; } diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs index 302bf7d41d59..d1001cdaad29 100644 --- a/src/index/src/inverted_index/format/reader/blob.rs +++ b/src/index/src/inverted_index/format/reader/blob.rs @@ -66,8 +66,13 @@ impl InvertedIndexReader for InvertedIn FstMap::new(buf).context(DecodeFstSnafu) } - async fn bitmap(&mut self, meta: &InvertedIndexMeta, offset: u32, size: u32) -> Result { - let offset = SeekFrom::Start(meta.base_offset + offset as u64); + async fn bitmap( + &mut self, + meta: &InvertedIndexMeta, + relative_offset: u32, + size: u32, + ) -> Result { + let offset = SeekFrom::Start(meta.base_offset + relative_offset as u64); self.source.seek(offset).await.context(SeekSnafu)?; let mut buf = vec![0u8; size as usize]; self.source.read_exact(&mut buf).await.context(ReadSnafu)?; From 9eae701626a9e46a5f008e7617c3bc1b8244b94c Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 24 Nov 2023 07:51:29 +0000 Subject: [PATCH 4/6] docs: add doc comment Signed-off-by: Zhenchi --- .../src/inverted_index/format/reader/blob.rs | 45 ++++++++++--------- .../inverted_index/format/reader/footer.rs | 23 ++++++---- 2 files changed, 39 insertions(+), 29 deletions(-) diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs index d1001cdaad29..40509e46bafb 100644 --- a/src/index/src/inverted_index/format/reader/blob.rs +++ b/src/index/src/inverted_index/format/reader/blob.rs @@ -27,7 +27,9 @@ use crate::inverted_index::format::reader::footer::InvertedIndeFooterReader; use crate::inverted_index::format::reader::{FstMap, InvertedIndexReader}; use crate::inverted_index::format::MIN_BLOB_SIZE; +/// Inverted index blob reader, implements [`InvertedIndexReader`] pub struct InvertedIndexBlobReader { + /// The blob source: R, } @@ -36,14 +38,8 @@ impl InvertedIndexBlobReader { pub fn new(source: R) -> Self { Self { source } } -} - -#[async_trait] -impl InvertedIndexReader for InvertedIndexBlobReader { - async fn metadata(&mut self) -> Result { - let end = SeekFrom::End(0); - let blob_size = self.source.seek(end).await.context(SeekSnafu)?; + fn validate_blob_size(blob_size: u64) -> Result<()> { ensure!( blob_size >= MIN_BLOB_SIZE, UnexpectedBlobSizeSnafu { @@ -51,10 +47,19 @@ impl InvertedIndexReader for InvertedIn actual_blob_size: blob_size, } ); + Ok(()) + } +} + +#[async_trait] +impl InvertedIndexReader for InvertedIndexBlobReader { + async fn metadata(&mut self) -> Result { + let end = SeekFrom::End(0); + let blob_size = self.source.seek(end).await.context(SeekSnafu)?; + Self::validate_blob_size(blob_size)?; - InvertedIndeFooterReader::new(&mut self.source, blob_size) - .metadata() - .await + let mut footer_reader = InvertedIndeFooterReader::new(&mut self.source, blob_size); + footer_reader.metadata().await } async fn fst(&mut self, meta: &InvertedIndexMeta) -> Result { @@ -115,7 +120,7 @@ mod tests { inverted_index.extend_from_slice(&create_fake_fst()); // fst let meta = InvertedIndexMeta { - name: "index0".to_string(), + name: "tag0".to_string(), base_offset: 0, inverted_index_size: inverted_index.len() as _, relative_null_bitmap_offset: bitmap_size as _, @@ -127,7 +132,7 @@ mod tests { // second index let meta1 = InvertedIndexMeta { - name: "index1".to_string(), + name: "tag1".to_string(), base_offset: meta.inverted_index_size, inverted_index_size: inverted_index.len() as _, relative_null_bitmap_offset: bitmap_size as _, @@ -167,8 +172,8 @@ mod tests { let metas = blob_reader.metadata().await.unwrap(); assert_eq!(metas.metas.len(), 2); - let meta0 = metas.metas.get("index0").unwrap(); - assert_eq!(meta0.name, "index0"); + let meta0 = metas.metas.get("tag0").unwrap(); + assert_eq!(meta0.name, "tag0"); assert_eq!(meta0.base_offset, 0); assert_eq!(meta0.inverted_index_size, 54); assert_eq!(meta0.relative_null_bitmap_offset, 2); @@ -176,8 +181,8 @@ mod tests { assert_eq!(meta0.relative_fst_offset, 4); assert_eq!(meta0.fst_size, 50); - let meta1 = metas.metas.get("index1").unwrap(); - assert_eq!(meta1.name, "index1"); + let meta1 = metas.metas.get("tag1").unwrap(); + assert_eq!(meta1.name, "tag1"); assert_eq!(meta1.base_offset, 54); assert_eq!(meta1.inverted_index_size, 54); assert_eq!(meta1.relative_null_bitmap_offset, 2); @@ -192,14 +197,14 @@ mod tests { let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob)); let metas = blob_reader.metadata().await.unwrap(); - let meta = metas.metas.get("index0").unwrap(); + let meta = metas.metas.get("tag0").unwrap(); let fst_map = blob_reader.fst(meta).await.unwrap(); assert_eq!(fst_map.len(), 2); assert_eq!(fst_map.get("key1".as_bytes()), Some(1)); assert_eq!(fst_map.get("key2".as_bytes()), Some(2)); - let meta = metas.metas.get("index1").unwrap(); + let meta = metas.metas.get("tag1").unwrap(); let fst_map = blob_reader.fst(meta).await.unwrap(); assert_eq!(fst_map.len(), 2); assert_eq!(fst_map.get("key1".as_bytes()), Some(1)); @@ -212,7 +217,7 @@ mod tests { let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob)); let metas = blob_reader.metadata().await.unwrap(); - let meta = metas.metas.get("index0").unwrap(); + let meta = metas.metas.get("tag0").unwrap(); let bitmap = blob_reader.bitmap(meta, 0, 2).await.unwrap(); assert_eq!(bitmap.into_vec(), create_fake_bitmap()); @@ -220,7 +225,7 @@ mod tests { assert_eq!(bitmap.into_vec(), create_fake_bitmap()); let metas = blob_reader.metadata().await.unwrap(); - let meta = metas.metas.get("index1").unwrap(); + let meta = metas.metas.get("tag1").unwrap(); let bitmap = blob_reader.bitmap(meta, 0, 2).await.unwrap(); assert_eq!(bitmap.into_vec(), create_fake_bitmap()); diff --git a/src/index/src/inverted_index/format/reader/footer.rs b/src/index/src/inverted_index/format/reader/footer.rs index da267cc8ee97..77cc61d7c21e 100644 --- a/src/index/src/inverted_index/format/reader/footer.rs +++ b/src/index/src/inverted_index/format/reader/footer.rs @@ -51,15 +51,7 @@ impl InvertedIndeFooterReader { self.source.read_exact(size_buf).await.context(ReadSnafu)?; let payload_size = u32::from_le_bytes(*size_buf) as u64; - - let max_payload_size = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE; - ensure!( - payload_size <= max_payload_size, - UnexpectedFooterPayloadSizeSnafu { - max_payload_size, - actual_payload_size: payload_size, - } - ); + self.validate_payload_size(payload_size)?; Ok(payload_size) } @@ -78,6 +70,19 @@ impl InvertedIndeFooterReader { Ok(metas) } + fn validate_payload_size(&self, payload_size: u64) -> Result<()> { + let max_payload_size = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE; + ensure!( + payload_size <= max_payload_size, + UnexpectedFooterPayloadSizeSnafu { + max_payload_size, + actual_payload_size: payload_size, + } + ); + + Ok(()) + } + /// Check if the read metadata is consistent with expected sizes and offsets. fn validate_metas(&self, metas: &InvertedIndexMetas, payload_size: u64) -> Result<()> { for meta in metas.metas.values() { From 935931d1e354a5740a989db4d6faae754b38f27b Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 24 Nov 2023 08:27:46 +0000 Subject: [PATCH 5/6] chore: update proto Signed-off-by: Zhenchi --- Cargo.lock | 2 +- Cargo.toml | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4c3410b95092..01234897c5d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3544,7 +3544,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/zhongzc/greptime-proto.git?branch=zhongzc/inverted-index-base-offset#3610e9ea16a3606c79962c6c497f3f1fb90fffc6" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2b3ae45740a49ec6a0830d71fc09c3093aeb5fe7#2b3ae45740a49ec6a0830d71fc09c3093aeb5fe7" dependencies = [ "prost 0.12.2", "serde", diff --git a/Cargo.toml b/Cargo.toml index 21269cff475a..070f7fc186b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,7 +87,7 @@ etcd-client = "0.12" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2ffdcae9f9816e41f261b24502e6603d68442466" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2b3ae45740a49ec6a0830d71fc09c3093aeb5fe7" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" @@ -194,6 +194,3 @@ strip = true lto = "thin" debug = false incremental = false - -[patch."https://github.com/GreptimeTeam/greptime-proto.git"] -greptime-proto = { git = "https://github.com/zhongzc/greptime-proto.git", branch = "zhongzc/inverted-index-base-offset" } From 9ca5bc43e676014632faa0a6419d453c8f54f67d Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 24 Nov 2023 09:20:55 +0000 Subject: [PATCH 6/6] fix: outdated docs Signed-off-by: Zhenchi --- src/index/src/inverted_index/format.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/index/src/inverted_index/format.rs b/src/index/src/inverted_index/format.rs index 07d68be61b22..fe26d9eb0330 100644 --- a/src/index/src/inverted_index/format.rs +++ b/src/index/src/inverted_index/format.rs @@ -39,7 +39,7 @@ //! //! `footer_payload footer_payload_size` //! -//! - `footer_payload`: Protobuf-encoded `InvertedIndexFooter` information describing the metadata of each inverted index. +//! - `footer_payload`: Protobuf-encoded [`InvertedIndexMetas`] describing the metadata of each inverted index. //! - `footer_payload_size`: Size in bytes of the `footer_payload`, displayed as a `u32` integer. //! - The footer aids in the interpretation of the inverted indices, providing necessary offset and count information. //! @@ -47,6 +47,7 @@ //! //! More detailed information regarding the encoding of the inverted indices can be found in the [RFC]. //! +//! [`InvertedIndexMetas`]: https://github.com/GreptimeTeam/greptime-proto/blob/2aaee38de81047537dfa42af9df63bcfb866e06c/proto/greptime/v1/index/inverted_index.proto#L32-L64 //! [RFC]: https://github.com/GreptimeTeam/greptimedb/blob/develop/docs/rfcs/2023-11-03-inverted-index.md pub mod reader;