From b92be3d9969b2a238d8bba5baf922dea4cacd02b Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 24 Nov 2023 16:26:15 +0800 Subject: [PATCH] feat(inverted_index): add index reader (#2803) * feat(inverted_index): add reader Signed-off-by: Zhenchi * fix: toml format Signed-off-by: Zhenchi * chore: add prefix relative_ to the offset parameter Signed-off-by: Zhenchi * docs: add doc comment Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- Cargo.lock | 25 +- Cargo.toml | 7 +- src/index/Cargo.toml | 20 ++ src/index/src/inverted_index.rs | 16 ++ src/index/src/inverted_index/error.rs | 99 ++++++++ src/index/src/inverted_index/format.rs | 55 ++++ src/index/src/inverted_index/format/reader.rs | 43 ++++ .../src/inverted_index/format/reader/blob.rs | 235 ++++++++++++++++++ .../inverted_index/format/reader/footer.rs | 186 ++++++++++++++ src/index/src/lib.rs | 15 ++ src/puffin/src/file_format.rs | 2 + 11 files changed, 701 insertions(+), 2 deletions(-) create mode 100644 src/index/Cargo.toml create mode 100644 src/index/src/inverted_index.rs create mode 100644 src/index/src/inverted_index/error.rs create mode 100644 src/index/src/inverted_index/format.rs create mode 100644 src/index/src/inverted_index/format/reader.rs create mode 100644 src/index/src/inverted_index/format/reader/blob.rs create mode 100644 src/index/src/inverted_index/format/reader/footer.rs create mode 100644 src/index/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index da0b43f05d94..a8d1eac9e949 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3342,6 +3342,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "fst" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ab85b9b05e3978cc9a9cf8fea7f01b494e1a09ed3037e16ba39edc7a29eb61a" + [[package]] name = "funty" version = "2.0.0" @@ -3536,7 +3542,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = 
"git+https://github.com/GreptimeTeam/greptime-proto.git?rev=25429306d0379ad29211a062a81da2554a0208ab#25429306d0379ad29211a062a81da2554a0208ab" +source = "git+https://github.com/zhongzc/greptime-proto.git?branch=zhongzc/inverted-index-base-offset#3610e9ea16a3606c79962c6c497f3f1fb90fffc6" dependencies = [ "prost 0.12.2", "serde", @@ -3901,6 +3907,23 @@ dependencies = [ "quote", ] +[[package]] +name = "index" +version = "0.4.3" +dependencies = [ + "async-trait", + "common-base", + "common-error", + "common-macro", + "fst", + "futures", + "greptime-proto", + "prost 0.12.2", + "snafu", + "tokio", + "tokio-util", +] + [[package]] name = "indexmap" version = "1.9.3" diff --git a/Cargo.toml b/Cargo.toml index ba46247cf922..21269cff475a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ members = [ "src/sql", "src/store-api", "src/table", + "src/index", "tests-integration", "tests/runner", ] @@ -83,9 +84,10 @@ datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } derive_builder = "0.12" etcd-client = "0.12" +fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "25429306d0379ad29211a062a81da2554a0208ab" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2ffdcae9f9816e41f261b24502e6603d68442466" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" @@ -192,3 +194,6 @@ strip = true lto = "thin" debug = false incremental = false + +[patch."https://github.com/GreptimeTeam/greptime-proto.git"] +greptime-proto = { git = "https://github.com/zhongzc/greptime-proto.git", branch = "zhongzc/inverted-index-base-offset" } diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml new file mode 100644 index 000000000000..6d667336903d --- /dev/null +++ b/src/index/Cargo.toml @@ -0,0 +1,20 @@ 
+[package] +name = "index" +version.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +async-trait.workspace = true +common-base.workspace = true +common-error.workspace = true +common-macro.workspace = true +fst.workspace = true +futures.workspace = true +greptime-proto.workspace = true +prost.workspace = true +snafu.workspace = true + +[dev-dependencies] +tokio-util.workspace = true +tokio.workspace = true diff --git a/src/index/src/inverted_index.rs b/src/index/src/inverted_index.rs new file mode 100644 index 000000000000..43a2234fdde3 --- /dev/null +++ b/src/index/src/inverted_index.rs @@ -0,0 +1,16 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod error; +pub mod format; diff --git a/src/index/src/inverted_index/error.rs b/src/index/src/inverted_index/error.rs new file mode 100644 index 000000000000..e1c650a3637b --- /dev/null +++ b/src/index/src/inverted_index/error.rs @@ -0,0 +1,99 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+use std::io::Error as IoError;
+
+use common_error::ext::ErrorExt;
+use common_error::status_code::StatusCode;
+use common_macro::stack_trace_debug;
+use snafu::{Location, Snafu};
+
+#[derive(Snafu)]
+#[snafu(visibility(pub))]
+#[stack_trace_debug]
+pub enum Error {
+    #[snafu(display("Failed to seek"))]
+    Seek {
+        #[snafu(source)]
+        error: IoError,
+        location: Location,
+    },
+
+    #[snafu(display("Failed to read"))]
+    Read {
+        #[snafu(source)]
+        error: IoError,
+        location: Location,
+    },
+
+    #[snafu(display(
+        "Unexpected inverted index blob size, min: {min_blob_size}, actual: {actual_blob_size}"
+    ))]
+    UnexpectedBlobSize {
+        min_blob_size: u64,
+        actual_blob_size: u64,
+        location: Location,
+    },
+
+    #[snafu(display("Unexpected inverted index footer payload size, max: {max_payload_size}, actual: {actual_payload_size}"))]
+    UnexpectedFooterPayloadSize {
+        max_payload_size: u64,
+        actual_payload_size: u64,
+        location: Location,
+    },
+
+    #[snafu(display("Unexpected inverted index offset size, offset: {offset}, size: {size}, blob_size: {blob_size}, payload_size: {payload_size}"))]
+    UnexpectedOffsetSize {
+        offset: u64,
+        size: u64,
+        blob_size: u64,
+        payload_size: u64,
+    },
+
+    #[snafu(display("Failed to decode fst"))]
+    DecodeFst {
+        #[snafu(source)]
+        error: fst::Error,
+        location: Location,
+    },
+
+    #[snafu(display("Failed to decode protobuf"))]
+    DecodeProto {
+        #[snafu(source)]
+        error: prost::DecodeError,
+        location: Location,
+    },
+}
+
+impl ErrorExt for Error {
+    fn status_code(&self) -> StatusCode {
+        use Error::*;
+        match self {
+            Seek { .. }
+            | Read { .. }
+            | UnexpectedFooterPayloadSize { .. }
+            | UnexpectedOffsetSize { .. }
+            | UnexpectedBlobSize { .. }
+            | DecodeProto { .. }
+            | DecodeFst { .. } => StatusCode::Unexpected,
+        }
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+pub type Result<T> = std::result::Result<T, Error>;
diff --git a/src/index/src/inverted_index/format.rs b/src/index/src/inverted_index/format.rs
new file mode 100644
index 000000000000..07d68be61b22
--- /dev/null
+++ b/src/index/src/inverted_index/format.rs
@@ -0,0 +1,55 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! # SST Files with Inverted Index Format Specification
+//!
+//! ## File Structure
+//!
+//! Each SST file includes a series of inverted indices followed by a footer.
+//!
+//! `inverted_index₀ inverted_index₁ ... inverted_indexₙ footer`
+//!
+//! - Each `inverted_indexᵢ` represents an index entry corresponding to tag values and their locations within the file.
+//! - `footer`: Contains metadata about the inverted indices, encoded as a protobuf message.
+//!
+//! ## Inverted Index Internals
+//!
+//! An inverted index comprises a collection of bitmaps, a null bitmap, and a finite state transducer (FST) indicating tag values' positions:
+//!
+//! `bitmap₀ bitmap₁ bitmap₂ ... bitmapₙ null_bitmap fst`
+//!
+//! - `bitmapᵢ`: Bitset indicating the presence of tag values within a row group.
+//! - `null_bitmap`: Bitset tracking the presence of null values within the tag column.
+//! - `fst`: Finite State Transducer providing an ordered map of bytes, representing the tag values.
+//!
+//! ## Footer Details
+//!
+//! 
The footer encapsulates the metadata of the inverted indices:
+//!
+//! `footer_payload footer_payload_size`
+//!
+//! - `footer_payload`: Protobuf-encoded `InvertedIndexMetas` message describing the metadata of each inverted index.
+//! - `footer_payload_size`: Size in bytes of the `footer_payload`, encoded as a little-endian `u32` integer.
+//! - The footer aids in the interpretation of the inverted indices, providing necessary offset and count information.
+//!
+//! ## Reference
+//!
+//! More detailed information regarding the encoding of the inverted indices can be found in the [RFC].
+//!
+//! [RFC]: https://github.com/GreptimeTeam/greptimedb/blob/develop/docs/rfcs/2023-11-03-inverted-index.md
+
+pub mod reader;
+
+const FOOTER_PAYLOAD_SIZE_SIZE: u64 = 4;
+const MIN_BLOB_SIZE: u64 = FOOTER_PAYLOAD_SIZE_SIZE;
diff --git a/src/index/src/inverted_index/format/reader.rs b/src/index/src/inverted_index/format/reader.rs
new file mode 100644
index 000000000000..2fbe703798d4
--- /dev/null
+++ b/src/index/src/inverted_index/format/reader.rs
@@ -0,0 +1,43 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod blob;
+mod footer;
+
+use async_trait::async_trait;
+use common_base::BitVec;
+use fst::Map;
+use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
+
+use crate::inverted_index::error::Result;
+
+pub type FstMap = Map<Vec<u8>>;
+
+/// InvertedIndexReader defines an asynchronous reader of inverted index data.
+#[async_trait]
+pub trait InvertedIndexReader {
+    /// Retrieve metadata of all inverted indices stored within the blob.
+    async fn metadata(&mut self) -> Result<InvertedIndexMetas>;
+
+    /// Retrieve the finite state transducer (FST) map for a given inverted index metadata entry.
+    async fn fst(&mut self, meta: &InvertedIndexMeta) -> Result<FstMap>;
+
+    /// Retrieve the `size`-byte bitmap located `relative_offset` bytes past the base offset of the given metadata entry.
+    async fn bitmap(
+        &mut self,
+        meta: &InvertedIndexMeta,
+        relative_offset: u32,
+        size: u32,
+    ) -> Result<BitVec>;
+}
diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs
new file mode 100644
index 000000000000..40509e46bafb
--- /dev/null
+++ b/src/index/src/inverted_index/format/reader/blob.rs
@@ -0,0 +1,235 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::io::SeekFrom;
+
+use async_trait::async_trait;
+use common_base::BitVec;
+use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
+use snafu::{ensure, ResultExt};
+
+use crate::inverted_index::error::{
+    DecodeFstSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedBlobSizeSnafu,
+};
+use crate::inverted_index::format::reader::footer::InvertedIndeFooterReader;
+use crate::inverted_index::format::reader::{FstMap, InvertedIndexReader};
+use crate::inverted_index::format::MIN_BLOB_SIZE;
+
+/// Inverted index blob reader, implements [`InvertedIndexReader`].
+pub struct InvertedIndexBlobReader<R> {
+    /// The blob source; every read seeks to an absolute position in it first.
+    source: R,
+}
+
+impl<R> InvertedIndexBlobReader<R> {
+    #[allow(dead_code)]
+    pub fn new(source: R) -> Self {
+        Self { source }
+    }
+
+    fn validate_blob_size(blob_size: u64) -> Result<()> {
+        ensure!(
+            blob_size >= MIN_BLOB_SIZE,
+            UnexpectedBlobSizeSnafu {
+                min_blob_size: MIN_BLOB_SIZE,
+                actual_blob_size: blob_size,
+            }
+        );
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl<R: AsyncRead + AsyncSeek + Unpin + Send> InvertedIndexReader for InvertedIndexBlobReader<R> {
+    async fn metadata(&mut self) -> Result<InvertedIndexMetas> {
+        let end = SeekFrom::End(0);
+        let blob_size = self.source.seek(end).await.context(SeekSnafu)?;
+        Self::validate_blob_size(blob_size)?;
+
+        let mut footer_reader = InvertedIndeFooterReader::new(&mut self.source, blob_size);
+        footer_reader.metadata().await
+    }
+
+    async fn fst(&mut self, meta: &InvertedIndexMeta) -> Result<FstMap> {
+        let offset = SeekFrom::Start(meta.base_offset + meta.relative_fst_offset as u64);
+        self.source.seek(offset).await.context(SeekSnafu)?;
+        let mut buf = vec![0u8; meta.fst_size as usize];
+        self.source.read_exact(&mut buf).await.context(ReadSnafu)?;
+
+        FstMap::new(buf).context(DecodeFstSnafu)
+    }
+
+    async fn bitmap(
+        &mut self,
+        meta: &InvertedIndexMeta,
+        relative_offset: u32,
+        size: u32,
+    ) -> Result<BitVec> {
+        let offset = SeekFrom::Start(meta.base_offset + relative_offset as u64);
+        self.source.seek(offset).await.context(SeekSnafu)?;
+        let mut buf = vec![0u8; size as usize];
+        self.source.read_exact(&mut buf).await.context(ReadSnafu)?;
+
+        Ok(BitVec::from_vec(buf))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use common_base::bit_vec::prelude::*;
+    use fst::MapBuilder;
+    use futures::io::Cursor;
+    use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
+    use prost::Message;
+
+    use super::*;
+
+    fn create_fake_fst() -> Vec<u8> {
+        let mut fst_buf = Vec::new();
+        let mut build = MapBuilder::new(&mut fst_buf).unwrap();
+        build.insert("key1".as_bytes(), 1).unwrap();
+        build.insert("key2".as_bytes(), 2).unwrap();
+        build.finish().unwrap();
+        fst_buf
+    }
+
+    fn create_fake_bitmap() -> Vec<u8> {
+        bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0, 1, 0].into_vec()
+    }
+
+    fn create_inverted_index_blob() -> Vec<u8> {
+        let bitmap_size = create_fake_bitmap().len();
+        let fst_size = create_fake_fst().len();
+
+        // first index
+        let mut inverted_index = Vec::new();
+        inverted_index.extend_from_slice(&create_fake_bitmap()); // value bitmap
+        inverted_index.extend_from_slice(&create_fake_bitmap()); // null bitmap
+        inverted_index.extend_from_slice(&create_fake_fst()); // fst
+
+        let meta = InvertedIndexMeta {
+            name: "tag0".to_string(),
+            base_offset: 0,
+            inverted_index_size: inverted_index.len() as _,
+            relative_null_bitmap_offset: bitmap_size as _,
+            null_bitmap_size: bitmap_size as _,
+            relative_fst_offset: (bitmap_size * 2) as _,
+            fst_size: fst_size as _,
+            ..Default::default()
+        };
+
+        // second index
+        let meta1 = InvertedIndexMeta {
+            name: "tag1".to_string(),
+            base_offset: meta.inverted_index_size,
+            inverted_index_size: inverted_index.len() as _,
+            relative_null_bitmap_offset: bitmap_size as _,
+            null_bitmap_size: bitmap_size as _,
+            relative_fst_offset: (bitmap_size * 2) as _,
+            fst_size: fst_size as _,
+            ..Default::default()
+        };
+
+        // metas
+        let mut metas = InvertedIndexMetas::default();
+        metas.metas.insert(meta.name.clone(), meta);
+        metas.metas.insert(meta1.name.clone(), meta1);
+        let mut meta_buf = Vec::new();
+        metas.encode(&mut meta_buf).unwrap();
+
+        let mut blob = vec![];
+
+        // first index
+        blob.extend_from_slice(&inverted_index);
+
+        // second index
+        blob.extend_from_slice(&inverted_index);
+
+        // footer
+        blob.extend_from_slice(&meta_buf);
+        blob.extend_from_slice(&(meta_buf.len() as u32).to_le_bytes());
+
+        blob
+    }
+
+    #[tokio::test]
+    async fn test_inverted_index_blob_reader_metadata() {
+        let blob = create_inverted_index_blob();
+        let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob));
+
+        let metas = blob_reader.metadata().await.unwrap();
+        assert_eq!(metas.metas.len(), 2);
+
+        let meta0 = metas.metas.get("tag0").unwrap();
+        assert_eq!(meta0.name, "tag0");
+        assert_eq!(meta0.base_offset, 0);
+        assert_eq!(meta0.inverted_index_size, 54);
+        assert_eq!(meta0.relative_null_bitmap_offset, 2);
+        assert_eq!(meta0.null_bitmap_size, 2);
+        assert_eq!(meta0.relative_fst_offset, 4);
+        assert_eq!(meta0.fst_size, 50);
+
+        let meta1 = metas.metas.get("tag1").unwrap();
+        assert_eq!(meta1.name, "tag1");
+        assert_eq!(meta1.base_offset, 54);
+        assert_eq!(meta1.inverted_index_size, 54);
+        assert_eq!(meta1.relative_null_bitmap_offset, 2);
+        assert_eq!(meta1.null_bitmap_size, 2);
+        assert_eq!(meta1.relative_fst_offset, 4);
+        assert_eq!(meta1.fst_size, 50);
+    }
+
+    #[tokio::test]
+    async fn test_inverted_index_blob_reader_fst() {
+        let blob = create_inverted_index_blob();
+        let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob));
+
+        let metas = blob_reader.metadata().await.unwrap();
+        let meta = metas.metas.get("tag0").unwrap();
+
+        let fst_map = blob_reader.fst(meta).await.unwrap();
+        assert_eq!(fst_map.len(), 2);
+        assert_eq!(fst_map.get("key1".as_bytes()), Some(1));
+        assert_eq!(fst_map.get("key2".as_bytes()), Some(2));
+
+        let meta = metas.metas.get("tag1").unwrap();
+        let fst_map = blob_reader.fst(meta).await.unwrap();
+        assert_eq!(fst_map.len(), 2);
+        assert_eq!(fst_map.get("key1".as_bytes()), Some(1));
+        assert_eq!(fst_map.get("key2".as_bytes()), Some(2));
+    }
+
+    #[tokio::test]
+    async fn test_inverted_index_blob_reader_bitmap() {
+        let blob = create_inverted_index_blob();
+        let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob));
+
+        let metas = blob_reader.metadata().await.unwrap();
+        let meta = metas.metas.get("tag0").unwrap();
+
+        let bitmap = blob_reader.bitmap(meta, 0, 2).await.unwrap();
+        assert_eq!(bitmap.into_vec(), create_fake_bitmap());
+        let bitmap = blob_reader.bitmap(meta, 2, 2).await.unwrap();
+        assert_eq!(bitmap.into_vec(), create_fake_bitmap());
+
+        let metas = blob_reader.metadata().await.unwrap();
+        let meta = metas.metas.get("tag1").unwrap();
+
+        let bitmap = blob_reader.bitmap(meta, 0, 2).await.unwrap();
+        assert_eq!(bitmap.into_vec(), create_fake_bitmap());
+        let bitmap = blob_reader.bitmap(meta, 2, 2).await.unwrap();
+        assert_eq!(bitmap.into_vec(), create_fake_bitmap());
+    }
+}
diff --git a/src/index/src/inverted_index/format/reader/footer.rs b/src/index/src/inverted_index/format/reader/footer.rs
new file mode 100644
index 000000000000..77cc61d7c21e
--- /dev/null
+++ b/src/index/src/inverted_index/format/reader/footer.rs
@@ -0,0 +1,186 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::io::SeekFrom;
+
+use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
+use prost::Message;
+use snafu::{ensure, ResultExt};
+
+use crate::inverted_index::error::{
+    DecodeProtoSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedFooterPayloadSizeSnafu,
+    UnexpectedOffsetSizeSnafu,
+};
+use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE;
+
+/// InvertedIndeFooterReader is for reading the footer section of the blob.
+pub struct InvertedIndeFooterReader<R> {
+    source: R,
+    blob_size: u64,
+}
+
+impl<R> InvertedIndeFooterReader<R> {
+    pub fn new(source: R, blob_size: u64) -> Self {
+        Self { source, blob_size }
+    }
+}
+
+impl<R: AsyncRead + AsyncSeek + Unpin> InvertedIndeFooterReader<R> {
+    pub async fn metadata(&mut self) -> Result<InvertedIndexMetas> {
+        let payload_size = self.read_payload_size().await?;
+        let metas = self.read_payload(payload_size).await?;
+        Ok(metas)
+    }
+
+    async fn read_payload_size(&mut self) -> Result<u64> {
+        let size_offset = SeekFrom::Start(self.blob_size.saturating_sub(FOOTER_PAYLOAD_SIZE_SIZE));
+        self.source.seek(size_offset).await.context(SeekSnafu)?;
+        let size_buf = &mut [0u8; FOOTER_PAYLOAD_SIZE_SIZE as usize];
+        self.source.read_exact(size_buf).await.context(ReadSnafu)?;
+
+        let payload_size = u32::from_le_bytes(*size_buf) as u64;
+        self.validate_payload_size(payload_size)?;
+
+        Ok(payload_size)
+    }
+
+    async fn read_payload(&mut self, payload_size: u64) -> Result<InvertedIndexMetas> {
+        let payload_end = self.blob_size.saturating_sub(FOOTER_PAYLOAD_SIZE_SIZE);
+        let payload_offset = SeekFrom::Start(payload_end.saturating_sub(payload_size));
+        self.source.seek(payload_offset).await.context(SeekSnafu)?;
+
+        let payload = &mut vec![0u8; payload_size as usize];
+        self.source.read_exact(payload).await.context(ReadSnafu)?;
+
+        let metas = InvertedIndexMetas::decode(&payload[..]).context(DecodeProtoSnafu)?;
+        self.validate_metas(&metas, payload_size)?;
+
+        Ok(metas)
+    }
+
+    fn validate_payload_size(&self, payload_size: u64) -> Result<()> {
+        let max_payload_size = self.blob_size.saturating_sub(FOOTER_PAYLOAD_SIZE_SIZE);
+        ensure!(
+            payload_size <= max_payload_size,
+            UnexpectedFooterPayloadSizeSnafu {
+                max_payload_size,
+                actual_payload_size: payload_size,
+            }
+        );
+
+        Ok(())
+    }
+
+    /// Check that every index region (`base_offset` + `inverted_index_size`) ends before the footer payload.
+    fn validate_metas(&self, metas: &InvertedIndexMetas, payload_size: u64) -> Result<()> {
+        for meta in metas.metas.values() {
+            let InvertedIndexMeta {
+                base_offset,
+                inverted_index_size,
+                ..
+            } = meta;
+
+            let limit = self.blob_size.saturating_sub(FOOTER_PAYLOAD_SIZE_SIZE + payload_size);
+            ensure!(
+                *base_offset <= limit && *inverted_index_size <= limit - *base_offset,
+                UnexpectedOffsetSizeSnafu {
+                    offset: *base_offset,
+                    size: *inverted_index_size,
+                    blob_size: self.blob_size,
+                    payload_size,
+                }
+            );
+        }
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use futures::io::Cursor;
+    use prost::Message;
+
+    use super::*;
+
+    fn create_test_payload(meta: InvertedIndexMeta) -> Vec<u8> {
+        let mut metas = InvertedIndexMetas::default();
+        metas.metas.insert("test".to_string(), meta);
+
+        let mut payload_buf = vec![];
+        metas.encode(&mut payload_buf).unwrap();
+
+        let footer_payload_size = (payload_buf.len() as u32).to_le_bytes().to_vec();
+        payload_buf.extend_from_slice(&footer_payload_size);
+        payload_buf
+    }
+
+    #[tokio::test]
+    async fn test_read_payload() {
+        let meta = InvertedIndexMeta {
+            name: "test".to_string(),
+            segment_row_count: 4096,
+            ..Default::default()
+        };
+
+        let payload_buf = create_test_payload(meta);
+        let blob_size = payload_buf.len() as u64;
+        let mut reader = InvertedIndeFooterReader::new(Cursor::new(payload_buf), blob_size);
+
+        let payload_size = reader.read_payload_size().await.unwrap();
+        let metas = reader.read_payload(payload_size).await.unwrap();
+
+        assert_eq!(metas.metas.len(), 1);
+        let index_meta = &metas.metas.get("test").unwrap();
+        assert_eq!(index_meta.name, "test");
+        assert_eq!(index_meta.segment_row_count, 4096);
+    }
+
+    #[tokio::test]
+    async fn test_invalid_footer_payload_size() {
+        let meta = InvertedIndexMeta {
+            name: "test".to_string(),
+            segment_row_count: 4096,
+            ..Default::default()
+        };
+
+        let mut payload_buf = create_test_payload(meta);
+        payload_buf.push(0xff); // Add an extra byte to corrupt the footer
+        let blob_size = payload_buf.len() as u64;
+        let mut reader = InvertedIndeFooterReader::new(Cursor::new(payload_buf), blob_size);
+
+        let payload_size_result = reader.read_payload_size().await;
+        assert!(payload_size_result.is_err());
+    }
+
+    #[tokio::test]
+    async fn test_invalid_offset_size() {
+        let meta = InvertedIndexMeta {
+            name: "test".to_string(),
+            base_offset: 0,
+            inverted_index_size: 1, // Set size to 1 to make the index exceed the blob size
+            segment_row_count: 4096,
+            ..Default::default()
+        };
+
+        let payload_buf = create_test_payload(meta);
+        let blob_size = payload_buf.len() as u64;
+        let mut reader = InvertedIndeFooterReader::new(Cursor::new(payload_buf), blob_size);
+
+        let payload_size = reader.read_payload_size().await.unwrap();
+        let payload_result = reader.read_payload(payload_size).await;
+        assert!(payload_result.is_err());
+    }
+}
diff --git a/src/index/src/lib.rs b/src/index/src/lib.rs
new file mode 100644
index 000000000000..efed1e963bf0
--- /dev/null
+++ b/src/index/src/lib.rs
@@ -0,0 +1,15 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +pub mod inverted_index; diff --git a/src/puffin/src/file_format.rs b/src/puffin/src/file_format.rs index 075a06c96d50..2cb77c8c242d 100644 --- a/src/puffin/src/file_format.rs +++ b/src/puffin/src/file_format.rs @@ -40,6 +40,8 @@ //! //! Footer payload bytes is either uncompressed or LZ4-compressed (as a single LZ4 compression frame with content size present), //! UTF-8 encoded JSON payload representing a single [`FileMetadata`] object. +//! +//! [`FileMetadata`]: ../file_metadata/struct.FileMetadata.html pub mod reader; pub mod writer;