diff --git a/Cargo.lock b/Cargo.lock index d96eb267599f..4aa54819191e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6558,6 +6558,24 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "puffin" +version = "0.4.2" +dependencies = [ + "async-trait", + "bitflags 2.4.1", + "common-error", + "common-macro", + "derive_builder 0.12.0", + "futures", + "pin-project", + "serde", + "serde_json", + "snafu", + "tokio", + "tokio-util", +] + [[package]] name = "pulldown-cmark" version = "0.9.3" diff --git a/Cargo.toml b/Cargo.toml index 0659941d9c51..0122f5a755e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ members = [ "src/partition", "src/plugins", "src/promql", + "src/puffin", "src/query", "src/script", "src/servers", @@ -113,6 +114,8 @@ serde_json = "1.0" smallvec = "1" snafu = "0.7" # on branch v0.38.x +bitflags = "2.4.1" +pin-project = "1.0" sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "0fbae07d0c46dc18e3381c406d8b9b8abef6b1fd", features = [ "visitor", ] } @@ -123,7 +126,6 @@ tokio-util = { version = "0.7", features = ["io-util", "compat"] } toml = "0.7" tonic = { version = "0.10", features = ["tls"] } uuid = { version = "1", features = ["serde", "v4", "fast-rng"] } - ## workspaces members api = { path = "src/api" } auth = { path = "src/auth" } diff --git a/src/puffin/Cargo.toml b/src/puffin/Cargo.toml new file mode 100644 index 000000000000..80f7d264ce03 --- /dev/null +++ b/src/puffin/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "puffin" +version.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +async-trait.workspace = true +bitflags.workspace = true +common-error.workspace = true +common-macro.workspace = true +derive_builder.workspace = true +futures.workspace = true +pin-project.workspace = true +serde.workspace = true +serde_json.workspace = true +snafu.workspace = true +tokio-util.workspace = true +tokio.workspace = true diff --git a/src/puffin/src/blob_metadata.rs b/src/puffin/src/blob_metadata.rs new file mode 100644 index 000000000000..34e886c2c19e --- /dev/null +++ b/src/puffin/src/blob_metadata.rs @@ -0,0 +1,335 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use core::fmt; +use std::collections::HashMap; + +use derive_builder::Builder; +use serde::{Deserialize, Serialize}; + +/// Blob metadata of Puffin +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Builder)] +#[serde(rename_all = "kebab-case")] +pub struct BlobMetadata { + /// Blob type + #[serde(rename = "type")] + pub blob_type: String, + + /// For Iceberg, it' list of field IDs the blob was computed for; + /// the order of items is used to compute sketches stored in the blob. + /// + /// For usage outside the context of Iceberg, it can be ignored. + #[builder(default)] + #[serde(default)] + #[serde(rename = "fields")] + pub input_fields: Vec, + + /// For Iceberg, it's ID of the Iceberg table’s snapshot the blob was computed from. 
+ /// + /// For usage outside the context of Iceberg, it can be ignored. + #[builder(default)] + #[serde(default)] + pub snapshot_id: i64, + + /// For Iceberg, it's sequence number of the Iceberg table’s snapshot the blob was computed from. + /// + /// For usage outside the context of Iceberg, it can be ignored. + #[builder(default)] + #[serde(default)] + pub sequence_number: i64, + + /// The offset in the file where the blob contents start + pub offset: i64, + + /// The length of the blob stored in the file (after compression, if compressed) + pub length: i64, + + /// See [`CompressionCodec`]. If omitted, the data is assumed to be uncompressed. + #[builder(default, setter(strip_option))] + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub compression_codec: Option, + + /// Storage for arbitrary meta-information about the blob + #[builder(default)] + #[serde(default)] + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub properties: HashMap, +} + +/// Compression codec used to compress the blob +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum CompressionCodec { + /// Single [LZ4 compression frame](https://github.com/lz4/lz4/blob/77d1b93f72628af7bbde0243b4bba9205c3138d9/doc/lz4_Frame_format.md), + /// with content size present + LZ4, + + /// Single [Zstandard compression frame](https://github.com/facebook/zstd/blob/8af64f41161f6c2e0ba842006fe238c664a6a437/doc/zstd_compression_format.md#zstandard-frames), + /// with content size present + ZSTD, +} + +impl fmt::Display for CompressionCodec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + CompressionCodec::LZ4 => write!(f, "lz4"), + CompressionCodec::ZSTD => write!(f, "zstd"), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_blob_metadata_builder() { + let mut properties = HashMap::new(); + properties.insert("property1".to_string(), "value1".to_string()); + properties.insert("property2".to_string(), "value2".to_string()); + + let blob_metadata = BlobMetadataBuilder::default() + .blob_type("type1".to_string()) + .input_fields(vec![1, 2, 3]) + .snapshot_id(100) + .sequence_number(200) + .offset(300) + .length(400) + .compression_codec(CompressionCodec::LZ4) + .properties(properties) + .build() + .unwrap(); + + assert_eq!("type1", blob_metadata.blob_type); + assert_eq!(vec![1, 2, 3], blob_metadata.input_fields); + assert_eq!(100, blob_metadata.snapshot_id); + assert_eq!(200, blob_metadata.sequence_number); + assert_eq!(300, blob_metadata.offset); + assert_eq!(400, blob_metadata.length); + assert_eq!(Some(CompressionCodec::LZ4), blob_metadata.compression_codec); + assert_eq!( + "value1", + blob_metadata.properties.get("property1").unwrap().as_str() + ); + assert_eq!( + "value2", + blob_metadata.properties.get("property2").unwrap().as_str() + ); + } + + #[test] + fn test_blob_metadata_minimal_builder() { + let blob_metadata = BlobMetadataBuilder::default() + .blob_type("type1".to_string()) + .offset(300) + .length(400) + .build() + .unwrap(); + + assert_eq!("type1", blob_metadata.blob_type); + assert_eq!(300, blob_metadata.offset); + assert_eq!(400, blob_metadata.length); + assert_eq!(None, blob_metadata.compression_codec); + assert_eq!(0, blob_metadata.properties.len()); + } + + #[test] + fn test_blob_metadata_missing_field() { + let blob_metadata = BlobMetadataBuilder::default() + .blob_type("type1".to_string()) + .offset(300) + .build(); + assert_eq!( + blob_metadata.unwrap_err().to_string(), + 
"`length` must be initialized" + ); + + let blob_metadata = BlobMetadataBuilder::default() + .blob_type("type1".to_string()) + .length(400) + .build(); + assert_eq!( + blob_metadata.unwrap_err().to_string(), + "`offset` must be initialized" + ); + + let blob_metadata = BlobMetadataBuilder::default() + .offset(300) + .length(400) + .build(); + assert_eq!( + blob_metadata.unwrap_err().to_string(), + "`blob_type` must be initialized" + ); + } + + #[test] + fn test_serialize_deserialize_blob_metadata_with_properties() { + let mut properties = HashMap::new(); + properties.insert(String::from("key1"), String::from("value1")); + properties.insert(String::from("key2"), String::from("value2")); + + let metadata = BlobMetadata { + blob_type: String::from("test"), + input_fields: vec![1, 2, 3], + snapshot_id: 12345, + sequence_number: 67890, + offset: 100, + length: 200, + compression_codec: Some(CompressionCodec::LZ4), + properties: properties.clone(), + }; + + let json = serde_json::to_string(&metadata).unwrap(); + let deserialized: BlobMetadata = serde_json::from_str(&json).unwrap(); + + assert_eq!(metadata, deserialized); + assert_eq!(properties, deserialized.properties); + } + + #[test] + fn test_serialize_deserialize_blob_metadata_without_compression_codec() { + let metadata = BlobMetadata { + blob_type: String::from("test"), + input_fields: vec![1, 2, 3], + snapshot_id: 12345, + sequence_number: 67890, + offset: 100, + length: 200, + compression_codec: None, + properties: HashMap::new(), + }; + + let expected_json = r#"{"type":"test","fields":[1,2,3],"snapshot-id":12345,"sequence-number":67890,"offset":100,"length":200}"#; + + let json = serde_json::to_string(&metadata).unwrap(); + let deserialized: BlobMetadata = serde_json::from_str(&json).unwrap(); + + assert_eq!(expected_json, json); + assert_eq!(metadata, deserialized); + } + + #[test] + fn test_deserialize_blob_metadata_with_properties() { + let json = r#"{ + "type": "test", + "fields": [1, 2, 3], + "snapshot-id": 12345, + "sequence-number": 67890, + "offset": 100, + "length": 200, + "compression-codec": "lz4", + "properties": { + "key1": "value1", + "key2": "value2" + } + }"#; + + let mut expected_properties = HashMap::new(); + expected_properties.insert(String::from("key1"), String::from("value1")); + expected_properties.insert(String::from("key2"), String::from("value2")); + + let expected = BlobMetadata { + blob_type: String::from("test"), + input_fields: vec![1, 2, 3], + snapshot_id: 12345, + sequence_number: 67890, + offset: 100, + length: 200, + compression_codec: Some(CompressionCodec::LZ4), + properties: expected_properties.clone(), + }; + + let deserialized: BlobMetadata = serde_json::from_str(json).unwrap(); + + assert_eq!(expected, deserialized); + assert_eq!(expected_properties, deserialized.properties); + } + + #[test] + fn test_deserialize_blob_metadata_without_properties() { + let json = r#"{ + "type": "test", + "fields": [1, 2, 3], + "snapshot-id": 12345, + "sequence-number": 67890, + "offset": 100, + "length": 200, + "compression-codec": "lz4" + }"#; + + let expected = BlobMetadata { + blob_type: String::from("test"), + input_fields: vec![1, 2, 3], + snapshot_id: 12345, + sequence_number: 67890, + offset: 100, + length: 200, + compression_codec: Some(CompressionCodec::LZ4), + properties: HashMap::new(), + }; + + let deserialized: BlobMetadata = serde_json::from_str(json).unwrap(); + + assert_eq!(expected, deserialized); + } + + #[test] + fn test_deserialize_blob_metadata_with_empty_properties() { + let json = r#"{ + 
"type": "test", + "fields": [1, 2, 3], + "snapshot-id": 12345, + "sequence-number": 67890, + "offset": 100, + "length": 200, + "compression-codec": "lz4", + "properties": {} + }"#; + + let expected_properties = HashMap::new(); + let expected = BlobMetadata { + blob_type: String::from("test"), + input_fields: vec![1, 2, 3], + snapshot_id: 12345, + sequence_number: 67890, + offset: 100, + length: 200, + compression_codec: Some(CompressionCodec::LZ4), + properties: expected_properties.clone(), + }; + + let deserialized: BlobMetadata = serde_json::from_str(json).unwrap(); + + assert_eq!(expected, deserialized); + assert_eq!(expected_properties, deserialized.properties); + } + + #[test] + fn test_deserialize_invalid_blob_metadata() { + let invalid_json = r#"{ + "type": "test", + "input-fields": [1, 2, 3], + "snapshot-id": "12345", + "sequence-number": 67890, + "offset": 100, + "length": 200, + "compression-codec": "Invalid", + "properties": {} + }"#; + + assert!(serde_json::from_str::(invalid_json).is_err()); + } +} diff --git a/src/puffin/src/error.rs b/src/puffin/src/error.rs new file mode 100644 index 000000000000..750efc55b434 --- /dev/null +++ b/src/puffin/src/error.rs @@ -0,0 +1,92 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::io::Error as IoError; + +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use snafu::{Location, Snafu}; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("Failed to seek"))] + Seek { + #[snafu(source)] + error: IoError, + location: Location, + }, + + #[snafu(display("Failed to read"))] + Read { + #[snafu(source)] + error: IoError, + location: Location, + }, + + #[snafu(display("Failed to write"))] + Write { + #[snafu(source)] + error: IoError, + location: Location, + }, + + #[snafu(display("Magic not matched"))] + MagicNotMatched { location: Location }, + + #[snafu(display("Unsupported decompression: {}", decompression))] + UnsupportedDecompression { + decompression: String, + location: Location, + }, + + #[snafu(display("Failed to serialize json"))] + SerializeJson { + #[snafu(source)] + error: serde_json::Error, + location: Location, + }, + + #[snafu(display("Failed to deserialize json"))] + DeserializeJson { + #[snafu(source)] + error: serde_json::Error, + location: Location, + }, +} + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + use Error::*; + match self { + Seek { .. } + | Read { .. } + | MagicNotMatched { .. } + | DeserializeJson { .. } + | Write { .. } + | SerializeJson { .. } => StatusCode::Unexpected, + + UnsupportedDecompression { .. 
} => StatusCode::Unsupported, + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +pub type Result = std::result::Result; diff --git a/src/puffin/src/file_format/mod.rs b/src/puffin/src/file_format/mod.rs new file mode 100644 index 000000000000..555c81c96483 --- /dev/null +++ b/src/puffin/src/file_format/mod.rs @@ -0,0 +1,56 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! # Format specification for Puffin files +//! +//! ## File structure +//! +//! Magic Blob₁ Blob₂ ... Blobₙ Footer +//! +//! - `Magic` is four bytes 0x50, 0x46, 0x41, 0x31 (short for: Puffin Fratercula arctica, version 1), +//! - `Blobᵢ` is i-th blob contained in the file, to be interpreted by application according to the footer, +//! - `Footer` is defined below. +//! +//! ## Footer structure +//! +//! Magic FooterPayload FooterPayloadSize Flags Magic +//! +//! - `Magic`: four bytes, same as at the beginning of the file +//! - `FooterPayload`: optionally compressed, UTF-8 encoded JSON payload describing the blobs in the file, with the structure described below +//! - `FooterPayloadSize`: a length in bytes of the `FooterPayload` (after compression, if compressed), stored as 4 byte integer +//! - `Flags`: 4 bytes for boolean flags +//! * byte 0 (first) +//! - bit 0 (lowest bit): whether `FooterPayload` is compressed +//! - all other bits are reserved for future use and should be set to 0 on write +//! * all other bytes are reserved for future use and should be set to 0 on write +//! A 4 byte integer is always signed, in a two’s complement representation, stored little-endian. +//! +//! ## Footer Payload +//! +//! Footer payload bytes is either uncompressed or LZ4-compressed (as a single LZ4 compression frame with content size present), +//! UTF-8 encoded JSON payload representing a single [`FileMetadata`] object. + +pub mod reader; +pub mod writer; + +use bitflags::bitflags; + +pub const MAGIC: [u8; 4] = [0x50, 0x46, 0x41, 0x31]; + +bitflags! { + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] + pub struct Flags: u32 { + const FOOTER_PAYLOAD_COMPRESSED_LZ4 = 0b00000001; + } +} diff --git a/src/puffin/src/file_format/reader/file.rs b/src/puffin/src/file_format/reader/file.rs new file mode 100644 index 000000000000..9ee7934202a5 --- /dev/null +++ b/src/puffin/src/file_format/reader/file.rs @@ -0,0 +1,142 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
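+
+// `PuffinParser` implements both `PuffinSyncReader` and `PuffinAsyncReader`,
+// so the same parser works over `io::Read + io::Seek` sources as well as
+// `AsyncRead + AsyncSeek` ones.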
+ +use std::io; + +use async_trait::async_trait; +use futures::{AsyncRead, AsyncReadExt, AsyncSeek}; +use snafu::{ensure, ResultExt}; + +use crate::blob_metadata::BlobMetadata; +use crate::error::{MagicNotMatchedSnafu, ReadSnafu, Result, UnsupportedDecompressionSnafu}; +use crate::file_format::reader::footer::FooterParser; +use crate::file_format::reader::{PuffinAsyncReader, PuffinSyncReader}; +use crate::file_format::MAGIC; +use crate::file_metadata::FileMetadata; +use crate::partial_reader::PartialReader; + +/// Puffin file parser +/// +/// File structure: Magic Blob₁ Blob₂ ... Blobₙ Footer +/// [4] [?] [?] [?] [?] +pub struct PuffinParser { + /// The source of the puffin file + source: R, + + /// The metadata of the puffin file, which is parsed from the footer + metadata: Option, +} + +impl PuffinParser { + pub fn new(source: R) -> Self { + Self { + source, + metadata: None, + } + } +} + +impl<'a, R: io::Read + io::Seek + 'a> PuffinSyncReader<'a> for PuffinParser { + type Reader = PartialReader<&'a mut R>; + + fn metadata(&mut self) -> Result { + if self.metadata.is_some() { + return Ok(self.metadata.clone().unwrap()); + } + + // check the magic + self.check_magic_sync()?; + + // parse the footer + let metadata = FooterParser::new(&mut self.source).parse_sync()?; + self.metadata = Some(metadata); + Ok(self.metadata.clone().unwrap()) + } + + fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result { + // TODO(zhongzc): support decompression + let compression = blob_metadata.compression_codec.as_ref(); + ensure!( + compression.is_none(), + UnsupportedDecompressionSnafu { + decompression: compression.unwrap().to_string() + } + ); + + Ok(PartialReader::new( + &mut self.source, + blob_metadata.offset as _, + blob_metadata.length as _, + )) + } +} + +#[async_trait] +impl<'a, R: AsyncRead + AsyncSeek + Unpin + Send + 'a> PuffinAsyncReader<'a> for PuffinParser { + type Reader = PartialReader<&'a mut R>; + + async fn metadata(&'a mut self) -> Result { + if self.metadata.is_some() { + return Ok(self.metadata.clone().unwrap()); + } + + // check the magic + self.check_magic_async().await?; + + // parse the footer + let metadata = FooterParser::new(&mut self.source).parse_async().await?; + self.metadata = Some(metadata); + Ok(self.metadata.clone().unwrap()) + } + + fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result { + // TODO(zhongzc): support decompression + let compression = blob_metadata.compression_codec.as_ref(); + ensure!( + compression.is_none(), + UnsupportedDecompressionSnafu { + decompression: compression.unwrap().to_string() + } + ); + + Ok(PartialReader::new( + &mut self.source, + blob_metadata.offset as _, + blob_metadata.length as _, + )) + } +} + +impl PuffinParser { + fn check_magic_sync(&mut self) -> Result<()> { + let mut magic = [0; 4]; + self.source.read_exact(&mut magic).context(ReadSnafu)?; + + ensure!(magic == MAGIC, MagicNotMatchedSnafu); + Ok(()) + } +} + +impl PuffinParser { + async fn check_magic_async(&mut self) -> Result<()> { + let mut magic = [0; 4]; + self.source + .read_exact(&mut magic) + .await + .context(ReadSnafu)?; + + ensure!(magic == MAGIC, MagicNotMatchedSnafu); + Ok(()) + } +} diff --git a/src/puffin/src/file_format/reader/footer.rs b/src/puffin/src/file_format/reader/footer.rs new file mode 100644 index 000000000000..642d10b3b48b --- /dev/null +++ b/src/puffin/src/file_format/reader/footer.rs @@ -0,0 +1,243 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may 
not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::{self, Read, SeekFrom}; + +use futures::{AsyncRead, AsyncReadExt as _, AsyncSeek, AsyncSeekExt as _}; +use snafu::{ensure, ResultExt}; + +use crate::error::{ + DeserializeJsonSnafu, MagicNotMatchedSnafu, ReadSnafu, Result, SeekSnafu, + UnsupportedDecompressionSnafu, +}; +use crate::file_format::{Flags, MAGIC}; +use crate::file_metadata::FileMetadata; +use crate::partial_reader::PartialReader; + +/// Parser of the footer in Puffin +/// +/// Footer structure: Magic FooterPayload FooterPayloadSize Flags Magic +/// [4] [?] [4] [4] [4] +pub struct FooterParser { + source: R, +} + +impl FooterParser { + pub fn new(source: R) -> Self { + Self { source } + } +} + +impl FooterParser { + pub fn parse_sync(&mut self) -> Result { + // seek to the end of the file + let file_size = self.source.seek(SeekFrom::End(0)).context(SeekSnafu)?; + + // check the magic + self.check_magic_sync(last_magic_offset(file_size))?; + + // read the flags + let flags = self.read_flags_sync(flags_offset(file_size))?; + + // read the footer payload size + let payload_size = self.read_payload_size_sync(payload_size_offset(file_size))? as u64; + + // check the magic + self.check_magic_sync(first_magic_offset(file_size, payload_size))?; + + // read the footer payload + let payload = + self.read_payload_sync(payload_offset(file_size, payload_size), payload_size, flags)?; + + // deserialize the footer payload + let metadata: FileMetadata = + serde_json::from_slice(payload.as_slice()).context(DeserializeJsonSnafu)?; + + Ok(metadata) + } + + fn check_magic_sync(&mut self, offset: u64) -> Result<()> { + let mut magic = [0; 4]; + self.source + .seek(SeekFrom::Start(offset)) + .context(SeekSnafu)?; + self.source.read_exact(&mut magic).context(ReadSnafu)?; + + ensure!(magic == MAGIC, MagicNotMatchedSnafu); + Ok(()) + } + + fn read_flags_sync(&mut self, offset: u64) -> Result { + let mut flags = [0; 4]; + self.source + .seek(SeekFrom::Start(offset)) + .context(SeekSnafu)?; + self.source.read_exact(&mut flags).context(ReadSnafu)?; + let flags = Flags::from_bits_truncate(u32::from_le_bytes(flags)); + + Ok(flags) + } + + fn read_payload_size_sync(&mut self, offset: u64) -> Result { + let mut payload_size = [0; 4]; + self.source + .seek(SeekFrom::Start(offset)) + .context(SeekSnafu)?; + self.source + .read_exact(&mut payload_size) + .context(ReadSnafu)?; + let payload_size = i32::from_le_bytes(payload_size); + + Ok(payload_size) + } + + fn read_payload_sync(&mut self, offset: u64, size: u64, flags: Flags) -> Result> { + // TODO(zhongzc): support lz4 + ensure!( + !flags.contains(Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4), + UnsupportedDecompressionSnafu { + decompression: "lz4" + } + ); + + let mut reader = PartialReader::new(&mut self.source, offset, size); + let mut payload = vec![]; + reader.read_to_end(&mut payload).context(ReadSnafu)?; + + Ok(payload) + } +} + +impl FooterParser { + pub async fn parse_async(&mut self) -> Result { + // seek to the end of the file + let file_size = self + .source + .seek(SeekFrom::End(0)) + .await + .context(SeekSnafu)?; 
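+
+        // The footer is parsed back to front: the trailing magic sits at
+        // `file_size - 4`, the flags at `file_size - 8`, the payload size at
+        // `file_size - 12`, and from that size the payload and the leading
+        // magic are located.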
+ + // check the magic + self.check_magic_async(last_magic_offset(file_size)).await?; + + // read the flags + let flags = self.read_flags_async(flags_offset(file_size)).await?; + + // read the footer payload size + let payload_size = self + .read_payload_size_async(payload_size_offset(file_size)) + .await? as u64; + + // check the magic + self.check_magic_async(first_magic_offset(file_size, payload_size)) + .await?; + + // read the footer payload + let payload = self + .read_payload_async(payload_offset(file_size, payload_size), payload_size, flags) + .await?; + + // deserialize the footer payload + let metadata: FileMetadata = + serde_json::from_slice(payload.as_slice()).context(DeserializeJsonSnafu)?; + + Ok(metadata) + } + + pub async fn check_magic_async(&mut self, offset: u64) -> Result<()> { + let mut magic = [0; 4]; + self.source + .seek(SeekFrom::Start(offset)) + .await + .context(SeekSnafu)?; + self.source + .read_exact(&mut magic) + .await + .context(ReadSnafu)?; + + ensure!(magic == MAGIC, MagicNotMatchedSnafu); + Ok(()) + } + + pub async fn read_flags_async(&mut self, offset: u64) -> Result { + let mut flags = [0; 4]; + self.source + .seek(SeekFrom::Start(offset)) + .await + .context(SeekSnafu)?; + self.source + .read_exact(&mut flags) + .await + .context(ReadSnafu)?; + let flags = Flags::from_bits_truncate(u32::from_le_bytes(flags)); + + Ok(flags) + } + + pub async fn read_payload_size_async(&mut self, offset: u64) -> Result { + let mut payload_size = [0; 4]; + self.source + .seek(SeekFrom::Start(offset)) + .await + .context(SeekSnafu)?; + self.source + .read_exact(&mut payload_size) + .await + .context(ReadSnafu)?; + let payload_size = i32::from_le_bytes(payload_size); + + Ok(payload_size) + } + + pub async fn read_payload_async( + &mut self, + offset: u64, + size: u64, + flags: Flags, + ) -> Result> { + // TODO(zhongzc): support lz4 + ensure!( + !flags.contains(Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4), + UnsupportedDecompressionSnafu { + decompression: "lz4" + } + ); + + let mut reader = PartialReader::new(&mut self.source, offset, size); + let mut payload = vec![]; + reader.read_to_end(&mut payload).await.context(ReadSnafu)?; + + Ok(payload) + } +} + +fn last_magic_offset(file_size: u64) -> u64 { + file_size - 4 +} + +fn flags_offset(file_size: u64) -> u64 { + file_size - 8 +} + +fn payload_size_offset(file_size: u64) -> u64 { + file_size - 12 +} + +fn payload_offset(file_size: u64, payload_size: u64) -> u64 { + file_size - 12 - payload_size +} + +fn first_magic_offset(file_size: u64, payload_size: u64) -> u64 { + file_size - 12 - payload_size - 4 +} diff --git a/src/puffin/src/file_format/reader/mod.rs b/src/puffin/src/file_format/reader/mod.rs new file mode 100644 index 000000000000..7652bb339b15 --- /dev/null +++ b/src/puffin/src/file_format/reader/mod.rs @@ -0,0 +1,48 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
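+
+// Both traits hand out a blob reader that borrows the underlying source,
+// hence the explicit `'a` lifetime on the `&'a mut self` receivers.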
+ +pub mod file; +pub mod footer; + +use std::io; + +use async_trait::async_trait; +use futures::{AsyncRead, AsyncSeek}; + +use crate::blob_metadata::BlobMetadata; +use crate::error::Result; +use crate::file_metadata::FileMetadata; + +/// `PuffinSyncReader` defines a synchronous reader for puffin data. +pub trait PuffinSyncReader<'a> { + type Reader: io::Read + io::Seek; + + /// fetch the FileMetadata + fn metadata(&'a mut self) -> Result; + + /// read particular blob data based on given metadata + fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result; +} + +/// `PuffinAsyncReader` defines an asynchronous reader for puffin data. +#[async_trait] +pub trait PuffinAsyncReader<'a> { + type Reader: AsyncRead + AsyncSeek + Unpin; + + /// fetch the FileMetadata + async fn metadata(&'a mut self) -> Result; + + /// read particular blob data based on given metadata + fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result; +} diff --git a/src/puffin/src/file_format/writer/file.rs b/src/puffin/src/file_format/writer/file.rs new file mode 100644 index 000000000000..ff9f3e6ffe46 --- /dev/null +++ b/src/puffin/src/file_format/writer/file.rs @@ -0,0 +1,232 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
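+
+// `PuffinWriter` emits the on-disk layout described in `file_format`: the
+// header magic, each blob back to back, and finally the footer (magic,
+// JSON payload, payload size, flags, magic).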
+ +use std::collections::HashMap; +use std::{io, mem}; + +use async_trait::async_trait; +use futures::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use snafu::ResultExt; + +use crate::blob_metadata::{BlobMetadata, BlobMetadataBuilder}; +use crate::error::{Result, SerializeJsonSnafu, WriteSnafu}; +use crate::file_format::writer::{Blob, PuffinAsyncWriter, PuffinSyncWriter}; +use crate::file_format::MAGIC; +use crate::file_metadata::FileMetadataBuilder; + +pub struct PuffinWriter { + /// The writer to write to + writer: W, + + /// The properties of the file + properties: HashMap, + + /// The metadata of the blobs + blob_metadata: Vec, + + /// The offset of the next blob + next_blob_offset: u64, +} + +impl PuffinWriter { + pub fn new(writer: W) -> Self { + Self { + writer, + properties: HashMap::new(), + blob_metadata: Vec::new(), + next_blob_offset: 0, + } + } +} + +impl PuffinSyncWriter for PuffinWriter { + fn set_properties(&mut self, properties: HashMap) { + self.properties = properties; + } + + fn add_blob(&mut self, blob: Blob) -> Result<()> { + self.write_header_magic_if_needed_sync()?; + let size = self.write_blob_sync(blob.data)?; + let blob_metadata = BlobMetadataBuilder::default() + .blob_type(blob.blob_type) + .properties(blob.properties) + .offset(self.next_blob_offset as _) + .length(size as _) + .build() + .expect("Required fields are not set"); + + self.next_blob_offset += size; + self.blob_metadata.push(blob_metadata); + Ok(()) + } + + fn finish(&mut self) -> Result<()> { + self.write_header_magic_if_needed_sync()?; + self.write_footer_sync() + } +} + +#[async_trait] +impl PuffinAsyncWriter for PuffinWriter { + fn set_properties(&mut self, properties: HashMap) { + self.properties = properties; + } + + async fn add_blob(&mut self, blob: Blob) -> Result<()> { + self.write_header_magic_if_needed_async().await?; + let size = self.write_blob_async(blob.data).await?; + let blob_metadata = BlobMetadataBuilder::default() + .blob_type(blob.blob_type) + .properties(blob.properties) + .offset(self.next_blob_offset as _) + .length(size as _) + .build() + .expect("Required fields are not set"); + + self.next_blob_offset += size; + self.blob_metadata.push(blob_metadata); + Ok(()) + } + + async fn finish(&mut self) -> Result<()> { + self.write_header_magic_if_needed_async().await?; + self.write_footer_async().await + } +} + +impl PuffinWriter { + fn write_header_magic_if_needed_sync(&mut self) -> Result<()> { + if self.next_blob_offset == 0 { + self.writer.write_all(&MAGIC).context(WriteSnafu)?; + self.next_blob_offset += MAGIC.len() as u64; + } + Ok(()) + } + + fn write_magic_sync(&mut self) -> Result<()> { + self.writer.write_all(&MAGIC).context(WriteSnafu) + } + + fn write_blob_sync(&mut self, mut blob_data: R) -> Result { + io::copy(&mut blob_data, &mut self.writer).context(WriteSnafu) + } + + // Magic FooterPayload FooterPayloadSize Flags Magic + fn write_footer_sync(&mut self) -> Result<()> { + // Magic + self.write_magic_sync()?; + + // FooterPayload + let size = self.write_footer_payload_sync()?; + + // FooterPayloadSize + self.write_footer_payload_size_sync(size as _)?; + + // Flags + self.write_flags_sync()?; + + // Magic + self.write_magic_sync()?; + + Ok(()) + } + + fn write_footer_payload_sync(&mut self) -> Result { + let file_metadata = FileMetadataBuilder::default() + .blob_metadata(mem::take(&mut self.blob_metadata)) + .properties(mem::take(&mut self.properties)) + .build() + .expect("Required fields are not set"); + + let json_data = 
serde_json::to_vec(&file_metadata).context(SerializeJsonSnafu)?; + self.writer.write_all(&json_data).context(WriteSnafu)?; + Ok(json_data.len() as u64) + } + + fn write_footer_payload_size_sync(&mut self, size: i32) -> Result<()> { + self.writer + .write_all(&size.to_le_bytes()) + .context(WriteSnafu) + } + + fn write_flags_sync(&mut self) -> Result<()> { + self.writer.write_all(&[0; 4]).context(WriteSnafu) + } +} + +impl PuffinWriter { + async fn write_header_magic_if_needed_async(&mut self) -> Result<()> { + if self.next_blob_offset == 0 { + self.writer.write_all(&MAGIC).await.context(WriteSnafu)?; + self.next_blob_offset += MAGIC.len() as u64; + } + Ok(()) + } + + async fn write_magic_async(&mut self) -> Result<()> { + self.writer.write_all(&MAGIC).await.context(WriteSnafu) + } + + async fn write_blob_async(&mut self, blob_data: R) -> Result { + futures::io::copy(blob_data, &mut self.writer) + .await + .context(WriteSnafu) + } + + // Magic FooterPayload FooterPayloadSize Flags Magic + async fn write_footer_async(&mut self) -> Result<()> { + // Magic + self.write_magic_async().await?; + + // FooterPayload + let size = self.write_footer_payload_async().await?; + + // FooterPayloadSize + self.write_footer_payload_size_async(size as _).await?; + + // Flags + self.write_flags_async().await?; + + // Magic + self.write_magic_async().await?; + + Ok(()) + } + + async fn write_footer_payload_async(&mut self) -> Result { + let file_metadata = FileMetadataBuilder::default() + .blob_metadata(mem::take(&mut self.blob_metadata)) + .properties(mem::take(&mut self.properties)) + .build() + .expect("Required fields are not set"); + + let json_data = serde_json::to_vec(&file_metadata).context(SerializeJsonSnafu)?; + self.writer + .write_all(&json_data) + .await + .context(WriteSnafu)?; + Ok(json_data.len() as u64) + } + + async fn write_footer_payload_size_async(&mut self, size: i32) -> Result<()> { + self.writer + .write_all(&size.to_le_bytes()) + .await + .context(WriteSnafu) + } + + async fn write_flags_async(&mut self) -> Result<()> { + self.writer.write_all(&[0; 4]).await.context(WriteSnafu) + } +} diff --git a/src/puffin/src/file_format/writer/mod.rs b/src/puffin/src/file_format/writer/mod.rs new file mode 100644 index 000000000000..a728e8c64262 --- /dev/null +++ b/src/puffin/src/file_format/writer/mod.rs @@ -0,0 +1,63 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +pub mod file; + +use std::collections::HashMap; +use std::io; + +use async_trait::async_trait; +use futures::AsyncRead; + +use crate::error::Result; + +// Blob ready to be written +pub struct Blob { + // TODO(zhongzc): ignore `input_fields`, `snapshot_id`, `sequence_number` + // and `compression_codec` for now to keep thing simple + + // The data of the blob + pub data: R, + + // The type of the blob + pub blob_type: String, + + // The properties of the blob + pub properties: HashMap, +} + +/// The trait for writing Puffin files synchronously +pub trait PuffinSyncWriter { + /// Set the properties of the Puffin file + fn set_properties(&mut self, properties: HashMap); + + /// Add a blob to the Puffin file + fn add_blob(&mut self, blob: Blob) -> Result<()>; + + /// Finish writing the Puffin file + fn finish(&mut self) -> Result<()>; +} + +/// The trait for writing Puffin files asynchronously +#[async_trait] +pub trait PuffinAsyncWriter { + /// Set the properties of the Puffin file + fn set_properties(&mut self, properties: HashMap); + + /// Add a blob to the Puffin file + async fn add_blob(&mut self, blob: Blob) -> Result<()>; + + /// Finish writing the Puffin file + async fn finish(&mut self) -> Result<()>; +} diff --git a/src/puffin/src/file_metadata.rs b/src/puffin/src/file_metadata.rs new file mode 100644 index 000000000000..ff16556613bd --- /dev/null +++ b/src/puffin/src/file_metadata.rs @@ -0,0 +1,144 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
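+
+// `FileMetadata` is the JSON object carried in the footer payload; on
+// deserialization `blobs` is required while `properties` may be omitted.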
+ +use std::collections::HashMap; + +use derive_builder::Builder; +use serde::{Deserialize, Serialize}; + +use crate::blob_metadata::BlobMetadata; + +/// Metadata of a Puffin file +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Builder)] +pub struct FileMetadata { + /// Metadata for each blob in the file + #[builder(default)] + #[serde(rename = "blobs")] + pub blob_metadata: Vec, + + /// Storage for arbitrary meta-information, like writer identification/version + #[builder(default)] + #[serde(default)] + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub properties: HashMap, +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use super::*; + use crate::blob_metadata::BlobMetadataBuilder; + + #[test] + fn test_file_metadata_builder() { + let mut properties = HashMap::new(); + properties.insert(String::from("key1"), String::from("value1")); + + let blob_metadata = BlobMetadataBuilder::default() + .blob_type("type1".to_string()) + .offset(10) + .length(30) + .build() + .unwrap(); + + let metadata = FileMetadataBuilder::default() + .blob_metadata(vec![blob_metadata.clone()]) + .properties(properties.clone()) + .build() + .unwrap(); + + assert_eq!(properties, metadata.properties); + assert_eq!(vec![blob_metadata], metadata.blob_metadata); + } + + #[test] + fn test_file_metadata_serialization() { + let mut properties = HashMap::new(); + properties.insert(String::from("key1"), String::from("value1")); + + let blob_metadata = BlobMetadataBuilder::default() + .blob_type("type1".to_string()) + .offset(10) + .length(30) + .build() + .unwrap(); + + let metadata = FileMetadataBuilder::default() + .blob_metadata(vec![blob_metadata.clone()]) + .properties(properties.clone()) + .build() + .unwrap(); + + let serialized = serde_json::to_string(&metadata).unwrap(); + assert_eq!( + serialized, + r#"{"blobs":[{"type":"type1","fields":[],"snapshot-id":0,"sequence-number":0,"offset":10,"length":30}],"properties":{"key1":"value1"}}"# + ); + } + + #[test] + fn test_file_metadata_deserialization() { + let data = r#"{"blobs":[{"type":"type1","fields":[],"snapshot-id":0,"sequence-number":0,"offset":10,"length":30}],"properties":{"key1":"value1"}}"#; + let deserialized: FileMetadata = serde_json::from_str(data).unwrap(); + + assert_eq!(deserialized.blob_metadata[0].blob_type, "type1"); + assert_eq!(deserialized.blob_metadata[0].offset, 10); + assert_eq!(deserialized.blob_metadata[0].length, 30); + assert_eq!(deserialized.properties.get("key1").unwrap(), "value1"); + } + + #[test] + fn test_empty_properties_not_serialized() { + let blob_metadata = BlobMetadataBuilder::default() + .blob_type("type1".to_string()) + .offset(10) + .length(30) + .build() + .unwrap(); + + let metadata = FileMetadataBuilder::default() + .blob_metadata(vec![blob_metadata.clone()]) + .build() + .unwrap(); + + let serialized = serde_json::to_string(&metadata).unwrap(); + assert_eq!( + serialized, + r#"{"blobs":[{"type":"type1","fields":[],"snapshot-id":0,"sequence-number":0,"offset":10,"length":30}]}"# + ); + } + + #[test] + fn test_empty_blobs_serialization() { + let metadata = FileMetadataBuilder::default() + .blob_metadata(vec![]) + .build() + .unwrap(); + + let serialized = serde_json::to_string(&metadata).unwrap(); + assert_eq!(serialized, r#"{"blobs":[]}"#); + } + + #[test] + fn test_missing_blobs_deserialization() { + let data = r#"{"properties":{"key1":"value1"}}"#; + let deserialized = serde_json::from_str::(data); + + assert!(deserialized + .unwrap_err() + .to_string() + .contains("missing field 
`blobs`")); + } +} diff --git a/src/puffin/src/lib.rs b/src/puffin/src/lib.rs new file mode 100644 index 000000000000..eaf29642066d --- /dev/null +++ b/src/puffin/src/lib.rs @@ -0,0 +1,25 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod blob_metadata; +pub mod error; +pub mod file_format; +pub mod file_metadata; +pub mod partial_reader; + +pub use file_format::reader::file::PuffinParser; +pub use file_format::writer::file::PuffinWriter; + +#[cfg(test)] +mod tests; diff --git a/src/puffin/src/partial_reader/async.rs b/src/puffin/src/partial_reader/async.rs new file mode 100644 index 000000000000..2cc9fae5236a --- /dev/null +++ b/src/puffin/src/partial_reader/async.rs @@ -0,0 +1,196 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
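+
+// `AsyncRead`/`AsyncSeek` for `PartialReader`: the first `poll_read` lazily
+// seeks the source to `offset`, and reads are clamped so they never run past
+// the end of the portion.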
+ +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::{ready, AsyncRead, AsyncSeek}; + +use crate::partial_reader::position::position_after_seek; +use crate::partial_reader::PartialReader; + +impl AsyncRead for PartialReader { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + // past end of portion + if self.position() > self.size() { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidInput, + "invalid read past the end of the portion", + ))); + } + + // end of portion + if self.is_eof() { + return Poll::Ready(Ok(0)); + } + + // first read, seek to the correct offset + if self.position_in_portion.is_none() { + // seek operation + let seek_from = io::SeekFrom::Start(self.offset); + ready!(self.as_mut().project().source.poll_seek(cx, seek_from))?; + + self.position_in_portion = Some(0); + } + + // prevent reading over the end + let max_len = (self.size() - self.position_in_portion.unwrap()) as usize; + let actual_len = max_len.min(buf.len()); + + // create a limited reader + let target_buf = &mut buf[..actual_len]; + + // read operation + let read_bytes = ready!(self.as_mut().project().source.poll_read(cx, target_buf))?; + self.position_in_portion = Some(self.position() + read_bytes as u64); + + Poll::Ready(Ok(read_bytes)) + } +} + +impl AsyncSeek for PartialReader { + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: io::SeekFrom, + ) -> Poll> { + let new_position = position_after_seek(pos, self.position(), self.size())?; + let pos = io::SeekFrom::Start(self.offset + new_position); + ready!(self.as_mut().project().source.poll_seek(cx, pos))?; + + self.position_in_portion = Some(new_position); + Poll::Ready(Ok(new_position)) + } +} + +#[cfg(test)] +mod tests { + use futures::io::Cursor; + use futures::{AsyncReadExt as _, AsyncSeekExt as _}; + + use super::*; + + #[tokio::test] + async fn read_all_data_in_portion() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data.clone()), 0, 100); + let mut buf = vec![0; 100]; + assert_eq!(reader.read(&mut buf).await.unwrap(), 100); + assert_eq!(buf, data); + } + + #[tokio::test] + async fn read_part_of_data_in_portion() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + let mut buf = vec![0; 30]; + assert_eq!(reader.read(&mut buf).await.unwrap(), 30); + assert_eq!(buf, (10..40).collect::>()); + } + + #[tokio::test] + async fn seek_and_read_data_in_portion() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + assert_eq!(reader.seek(io::SeekFrom::Start(10)).await.unwrap(), 10); + let mut buf = vec![0; 10]; + assert_eq!(reader.read(&mut buf).await.unwrap(), 10); + assert_eq!(buf, (20..30).collect::>()); + } + + #[tokio::test] + async fn read_past_end_of_portion_is_eof() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + let mut buf = vec![0; 50]; + assert_eq!(reader.read(&mut buf).await.unwrap(), 30); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); // hit EOF + } + + #[tokio::test] + async fn seek_past_end_of_portion_returns_error() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + // seeking past the portion returns an error + assert!(reader.seek(io::SeekFrom::Start(31)).await.is_err()); + } + + #[tokio::test] + async fn seek_to_negative_position_returns_error() { 
+ let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + assert_eq!(reader.seek(io::SeekFrom::Start(10)).await.unwrap(), 10); + // seeking back to the start of the portion + assert_eq!(reader.seek(io::SeekFrom::Current(-10)).await.unwrap(), 0); + // seeking to a negative position returns an error + assert!(reader.seek(io::SeekFrom::Current(-1)).await.is_err()); + } + + #[tokio::test] + async fn seek_from_end_of_portion() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + let mut buf = vec![0; 10]; + // seek to 10 bytes before the end of the portion + assert_eq!(reader.seek(io::SeekFrom::End(-10)).await.unwrap(), 20); + assert_eq!(reader.read(&mut buf).await.unwrap(), 10); + // the final 10 bytes of the portion + assert_eq!(buf, (30..40).collect::>()); + assert!(reader.is_eof()); + } + + #[tokio::test] + async fn seek_from_end_to_negative_position_returns_error() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data.clone()), 10, 30); + // seeking to a negative position returns an error + assert!(reader.seek(io::SeekFrom::End(-31)).await.is_err()); + } + + #[tokio::test] + async fn zero_length_portion_returns_zero_on_read() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 0); + let mut buf = vec![0; 10]; + // reading a portion with zero length returns 0 bytes + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + } + + #[tokio::test] + async fn is_eof_returns_true_at_end_of_portion() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + // we are not at the end of the portion + assert!(!reader.is_eof()); + let mut buf = vec![0; 30]; + assert_eq!(reader.read(&mut buf).await.unwrap(), 30); + // we are at the end of the portion + assert!(reader.is_eof()); + } + + #[tokio::test] + async fn position_resets_after_seek_to_start() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + assert_eq!(reader.seek(io::SeekFrom::Start(10)).await.unwrap(), 10); + assert_eq!(reader.position(), 10); + assert_eq!(reader.seek(io::SeekFrom::Start(0)).await.unwrap(), 0); + assert_eq!(reader.position(), 0); + } +} diff --git a/src/puffin/src/partial_reader/mod.rs b/src/puffin/src/partial_reader/mod.rs new file mode 100644 index 000000000000..ef4815679440 --- /dev/null +++ b/src/puffin/src/partial_reader/mod.rs @@ -0,0 +1,95 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod r#async; +mod position; +mod sync; + +use pin_project::pin_project; + +/// `PartialReader` to perform synchronous or asynchronous reads on a portion of a resource. +#[pin_project] +pub struct PartialReader { + /// offset of the portion in the resource + offset: u64, + + /// size of the portion in the resource + size: u64, + + /// Resource for the portion. 
+ /// The `offset` and `size` fields are used to determine the slice of `source` to read. + #[pin] + source: R, + + /// The current position within the portion. + /// + /// A `None` value indicates that no read operations have been performed yet on this portion. + /// Before a read operation can be performed, the resource must be positioned at the correct offset in the portion. + /// After the first read operation, this field will be set to `Some(_)`, representing the current read position in the portion. + position_in_portion: Option, +} + +impl PartialReader { + /// Creates a new `PartialReader` for the given resource. + pub fn new(source: R, offset: u64, size: u64) -> Self { + Self { + offset, + size, + source, + position_in_portion: None, + } + } + + /// Returns the current position in the portion. + pub fn position(&self) -> u64 { + self.position_in_portion.unwrap_or_default() + } + + /// Returns the size of the portion in portion. + pub fn size(&self) -> u64 { + self.size + } + + /// Returns whether the portion is empty. + pub fn is_empty(&self) -> bool { + self.size == 0 + } + + /// Returns whether the current position is at the end of the portion. + pub fn is_eof(&self) -> bool { + self.position() == self.size + } +} + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use super::*; + + #[test] + fn is_empty_returns_true_for_zero_length_blob() { + let data: Vec = (0..100).collect(); + let reader = PartialReader::new(Cursor::new(data), 10, 0); + assert!(reader.is_empty()); + assert!(reader.is_eof()); + } + + #[test] + fn is_empty_returns_false_for_non_zero_length_blob() { + let data: Vec = (0..100).collect(); + let reader = PartialReader::new(Cursor::new(data), 10, 30); + assert!(!reader.is_empty()); + } +} diff --git a/src/puffin/src/partial_reader/position.rs b/src/puffin/src/partial_reader/position.rs new file mode 100644 index 000000000000..e57817c493af --- /dev/null +++ b/src/puffin/src/partial_reader/position.rs @@ -0,0 +1,102 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io; + +/// Calculates the new position after seeking. It checks if the new position +/// is valid (within the portion bounds) before returning it. 
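+///
+/// For example, with a portion of size 20 at current position 15,
+/// `SeekFrom::Current(-10)` resolves to 5 and `SeekFrom::End(0)` to 20
+/// (the end itself is a valid position), while `SeekFrom::Start(25)` is
+/// rejected as out of bounds.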
+pub fn position_after_seek( + seek_from: io::SeekFrom, + position_in_portion: u64, + size_of_portion: u64, +) -> io::Result { + let new_position = match seek_from { + io::SeekFrom::Start(offset) => offset, + io::SeekFrom::Current(offset) => { + let next = (position_in_portion as i64) + offset; + if next < 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "invalid seek to a negative or overflowing position", + )); + } + next as u64 + } + io::SeekFrom::End(offset) => { + let end = size_of_portion as i64; + (end + offset) as u64 + } + }; + + if new_position > size_of_portion { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "invalid seek to a position beyond the end of the portion", + )); + } + + Ok(new_position) +} + +#[cfg(test)] +mod tests { + use std::io::ErrorKind; + + use super::*; + + #[test] + fn test_position_after_seek_from_start() { + let result = position_after_seek(io::SeekFrom::Start(10), 0, 20).unwrap(); + assert_eq!(result, 10); + } + + #[test] + fn test_position_after_seek_from_start_out_of_bounds() { + let result = position_after_seek(io::SeekFrom::Start(30), 0, 20); + assert!(result.is_err()); + assert_eq!(result.unwrap_err().kind(), ErrorKind::InvalidInput); + } + + #[test] + fn test_position_after_seek_from_current() { + let result = position_after_seek(io::SeekFrom::Current(10), 10, 30).unwrap(); + assert_eq!(result, 20); + } + + #[test] + fn test_position_after_seek_from_current_negative_position_within_bounds() { + let result = position_after_seek(io::SeekFrom::Current(-10), 15, 20).unwrap(); + assert_eq!(result, 5); + } + + #[test] + fn test_position_after_seek_from_current_negative_position() { + let result = position_after_seek(io::SeekFrom::Current(-10), 5, 20); + assert!(result.is_err()); + assert_eq!(result.unwrap_err().kind(), ErrorKind::InvalidInput); + } + + #[test] + fn test_position_after_seek_from_end() { + let result = position_after_seek(io::SeekFrom::End(-10), 0, 30).unwrap(); + assert_eq!(result, 20); + } + + #[test] + fn test_position_after_seek_from_end_out_of_bounds() { + let result = position_after_seek(io::SeekFrom::End(10), 0, 20); + assert!(result.is_err()); + assert_eq!(result.unwrap_err().kind(), ErrorKind::InvalidInput); + } +} diff --git a/src/puffin/src/partial_reader/sync.rs b/src/puffin/src/partial_reader/sync.rs new file mode 100644 index 000000000000..1b7781543973 --- /dev/null +++ b/src/puffin/src/partial_reader/sync.rs @@ -0,0 +1,180 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
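+
+// The synchronous mirror of the `async` module: the first `read` lazily
+// seeks the source to `offset`, and each read is clamped to the bytes
+// remaining in the portion.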
+ +use std::io; + +use crate::partial_reader::position::position_after_seek; +use crate::partial_reader::PartialReader; + +impl io::Read for PartialReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + // past end of portion + if self.position() > self.size() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "invalid read past the end of the portion", + )); + } + + // end of portion + if self.is_eof() { + return Ok(0); + } + + // haven't read from the portion yet, need to seek to the start of it. + if self.position_in_portion.is_none() { + self.source.seek(io::SeekFrom::Start(self.offset))?; + self.position_in_portion = Some(0); + } + + // prevent reading over the end + let max_len = (self.size() - self.position_in_portion.unwrap()) as usize; + let actual_len = max_len.min(buf.len()); + + // create a limited reader + let target_buf = &mut buf[..actual_len]; + + // perform the actual read from the source and update the position. + let read_bytes = self.source.read(target_buf)?; + self.position_in_portion = Some(self.position_in_portion.unwrap() + read_bytes as u64); + + Ok(read_bytes) + } +} + +impl io::Seek for PartialReader { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + let new_position = position_after_seek(pos, self.position(), self.size())?; + let pos = io::SeekFrom::Start(self.offset + new_position); + self.source.seek(pos)?; + + self.position_in_portion = Some(new_position); + Ok(new_position) + } +} + +#[cfg(test)] +mod tests { + use std::io::{Cursor, Read, Seek, SeekFrom}; + + use super::*; + + #[test] + fn read_all_data_in_portion() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data.clone()), 0, 100); + let mut buf = vec![0; 100]; + assert_eq!(reader.read(&mut buf).unwrap(), 100); + assert_eq!(buf, data); + } + + #[test] + fn read_part_of_data_in_portion() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + let mut buf = vec![0; 30]; + assert_eq!(reader.read(&mut buf).unwrap(), 30); + assert_eq!(buf, (10..40).collect::>()); + } + + #[test] + fn seek_and_read_data_in_portion() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + assert_eq!(reader.seek(SeekFrom::Start(10)).unwrap(), 10); + let mut buf = vec![0; 10]; + assert_eq!(reader.read(&mut buf).unwrap(), 10); + assert_eq!(buf, (20..30).collect::>()); + } + + #[test] + fn read_past_end_of_portion_is_eof() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + let mut buf = vec![0; 50]; + assert_eq!(reader.read(&mut buf).unwrap(), 30); + assert_eq!(reader.read(&mut buf).unwrap(), 0); // hit EOF + } + + #[test] + fn seek_past_end_of_portion_returns_error() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + // seeking past the portion returns an error + assert!(reader.seek(SeekFrom::Start(31)).is_err()); + } + + #[test] + fn seek_to_negative_position_returns_error() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + assert_eq!(reader.seek(SeekFrom::Start(10)).unwrap(), 10); + // seeking back to the start of the portion + assert_eq!(reader.seek(SeekFrom::Current(-10)).unwrap(), 0); + // seeking to a negative position returns an error + assert!(reader.seek(SeekFrom::Current(-1)).is_err()); + } + + #[test] + fn seek_from_end_of_portion() { + let data: Vec = (0..100).collect(); + 
let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + let mut buf = vec![0; 10]; + // seek to 10 bytes before the end of the portion + assert_eq!(reader.seek(SeekFrom::End(-10)).unwrap(), 20); + assert_eq!(reader.read(&mut buf).unwrap(), 10); + // the final 10 bytes of the portion + assert_eq!(buf, (30..40).collect::>()); + assert!(reader.is_eof()); + } + + #[test] + fn seek_from_end_to_negative_position_returns_error() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data.clone()), 10, 30); + // seeking to a negative position returns an error + assert!(reader.seek(SeekFrom::End(-31)).is_err()); + } + + #[test] + fn zero_length_portion_returns_zero_on_read() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 0); + let mut buf = vec![0; 10]; + // reading a portion with zero length returns 0 bytes + assert_eq!(reader.read(&mut buf).unwrap(), 0); + } + + #[test] + fn is_eof_returns_true_at_end_of_portion() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + // we are not at the end of the portion + assert!(!reader.is_eof()); + let mut buf = vec![0; 30]; + assert_eq!(reader.read(&mut buf).unwrap(), 30); + // we are at the end of the portion + assert!(reader.is_eof()); + } + + #[test] + fn position_resets_after_seek_to_start() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + assert_eq!(reader.seek(SeekFrom::Start(10)).unwrap(), 10); + assert_eq!(reader.position(), 10); + assert_eq!(reader.seek(SeekFrom::Start(0)).unwrap(), 0); + assert_eq!(reader.position(), 0); + } +} diff --git a/src/puffin/src/tests/mod.rs b/src/puffin/src/tests/mod.rs new file mode 100644 index 000000000000..910ecfeda743 Binary files /dev/null and b/src/puffin/src/tests/mod.rs differ diff --git a/src/puffin/src/tests/resources/empty-puffin-uncompressed.puffin b/src/puffin/src/tests/resources/empty-puffin-uncompressed.puffin new file mode 100644 index 000000000000..142b45bd4ebe Binary files /dev/null and b/src/puffin/src/tests/resources/empty-puffin-uncompressed.puffin differ diff --git a/src/puffin/src/tests/resources/sample-metric-data-uncompressed.puffin b/src/puffin/src/tests/resources/sample-metric-data-uncompressed.puffin new file mode 100644 index 000000000000..ab8da13822c5 Binary files /dev/null and b/src/puffin/src/tests/resources/sample-metric-data-uncompressed.puffin differ diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 8141a278ea69..c5863577f47c 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -59,7 +59,7 @@ opensrv-mysql = "0.4" opentelemetry-proto.workspace = true parking_lot = "0.12" pgwire = "0.16" -pin-project = "1.0" +pin-project.workspace = true postgres-types = { version = "0.2", features = ["with-chrono-0_4"] } pprof = { version = "0.11", features = [ "flamegraph", diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 467bad1b1354..2b574ae2c071 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -40,4 +40,4 @@ tokio.workspace = true common-test-util.workspace = true parquet = { workspace = true, features = ["async"] } serde_json.workspace = true -tokio-util = { version = "0.7", features = ["compat"] } +tokio-util.workspace = true
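Taken together, the writer and reader halves round-trip a Puffin file. Below is a minimal usage sketch of the synchronous API introduced by this change; the in-memory buffer, blob type string, and `main` wrapper are illustrative only, and the exact generic signatures (elided here) follow the traits above:

```rust
use std::collections::HashMap;
use std::io::{Cursor, Read as _};

use puffin::file_format::reader::PuffinSyncReader as _;
use puffin::file_format::writer::{Blob, PuffinSyncWriter as _};
use puffin::{PuffinParser, PuffinWriter};

fn main() {
    // Write: header magic, one blob, then the footer.
    let mut buf = Cursor::new(Vec::new());
    let mut writer = PuffinWriter::new(&mut buf);
    writer
        .add_blob(Blob {
            data: Cursor::new(b"hello".to_vec()),
            blob_type: "greptime-test-v1".to_string(), // illustrative type tag
            properties: HashMap::new(),
        })
        .unwrap();
    writer.finish().unwrap();

    // Read: parse the footer metadata, then read the blob back.
    let mut parser = PuffinParser::new(Cursor::new(buf.into_inner()));
    let metadata = parser.metadata().unwrap();
    let mut blob_reader = parser.blob_reader(&metadata.blob_metadata[0]).unwrap();
    let mut contents = Vec::new();
    blob_reader.read_to_end(&mut contents).unwrap();
    assert_eq!(contents, b"hello");
}
```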