Skip to content

Commit

Permalink
feat(puffin): add file reader (#2751)
Browse files Browse the repository at this point in the history
* feat(puffin): add file reader

Signed-off-by: Zhenchi <[email protected]>

* fix: toml format

Signed-off-by: Zhenchi <[email protected]>

* chore: rename PuffinParser to PuffinFileReader

Signed-off-by: Zhenchi <[email protected]>

* chore: polish comments

Signed-off-by: Zhenchi <[email protected]>

* Update src/puffin/src/file_format/reader/footer.rs

Co-authored-by: Yingwen <[email protected]>

* Update src/puffin/src/file_format/reader/file.rs

Co-authored-by: Yingwen <[email protected]>

* Update src/puffin/src/file_format/reader/footer.rs

Co-authored-by: Yingwen <[email protected]>

* Update src/puffin/src/file_format/reader/footer.rs

Co-authored-by: Yingwen <[email protected]>

* fix: check file size

Signed-off-by: Zhenchi <[email protected]>

* fix: redundant type cast

Signed-off-by: Zhenchi <[email protected]>

* fix: reuse read buffer

Signed-off-by: Zhenchi <[email protected]>

* fix: check payload size

Signed-off-by: Zhenchi <[email protected]>

* fix: check payload size

Signed-off-by: Zhenchi <[email protected]>

* fix: validate blob offset

Signed-off-by: Zhenchi <[email protected]>

* fix: validate blob offset

Signed-off-by: Zhenchi <[email protected]>

---------

Signed-off-by: Zhenchi <[email protected]>
Co-authored-by: Yingwen <[email protected]>
  • Loading branch information
zhongzc and evenyag authored Nov 20, 2023
1 parent 4fcda27 commit d9eeeee
Show file tree
Hide file tree
Showing 12 changed files with 742 additions and 0 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async-stream = "0.3"
async-trait = "0.1"
base64 = "0.21"
bigdecimal = "0.4.2"
bitflags = "2.4.1"
chrono = { version = "0.4", features = ["serde"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
Expand Down
6 changes: 6 additions & 0 deletions src/puffin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@ edition.workspace = true
license.workspace = true

[dependencies]
async-trait.workspace = true
bitflags.workspace = true
common-error.workspace = true
common-macro.workspace = true
derive_builder.workspace = true
futures.workspace = true
pin-project.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true

[dev-dependencies]
tokio-util.workspace = true
tokio.workspace = true
132 changes: 132 additions & 0 deletions src/puffin/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::io::Error as IoError;

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};

#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to seek"))]
Seek {
#[snafu(source)]
error: IoError,
location: Location,
},

#[snafu(display("Failed to read"))]
Read {
#[snafu(source)]
error: IoError,
location: Location,
},

#[snafu(display("Failed to write"))]
Write {
#[snafu(source)]
error: IoError,
location: Location,
},

#[snafu(display("Magic not matched"))]
MagicNotMatched { location: Location },

#[snafu(display("Failed to convert bytes to integer"))]
BytesToInteger {
#[snafu(source)]
error: std::array::TryFromSliceError,
location: Location,
},

#[snafu(display("Unsupported decompression: {}", decompression))]
UnsupportedDecompression {
decompression: String,
location: Location,
},

#[snafu(display("Failed to serialize json"))]
SerializeJson {
#[snafu(source)]
error: serde_json::Error,
location: Location,
},

#[snafu(display("Failed to deserialize json"))]
DeserializeJson {
#[snafu(source)]
error: serde_json::Error,
location: Location,
},

#[snafu(display("Parse stage not match, expected: {}, actual: {}", expected, actual))]
ParseStageNotMatch {
expected: String,
actual: String,
location: Location,
},

#[snafu(display("Unexpected footer payload size: {}", size))]
UnexpectedFooterPayloadSize { size: i32, location: Location },

#[snafu(display(
"Unexpected puffin file size, min: {}, actual: {}",
min_file_size,
actual_file_size
))]
UnexpectedPuffinFileSize {
min_file_size: u64,
actual_file_size: u64,
location: Location,
},

#[snafu(display("Invalid blob offset: {}, location: {:?}", offset, location))]
InvalidBlobOffset { offset: i64, location: Location },

#[snafu(display("Invalid blob area end: {}, location: {:?}", offset, location))]
InvalidBlobAreaEnd { offset: u64, location: Location },
}

impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
Seek { .. }
| Read { .. }
| MagicNotMatched { .. }
| DeserializeJson { .. }
| Write { .. }
| SerializeJson { .. }
| BytesToInteger { .. }
| ParseStageNotMatch { .. }
| UnexpectedFooterPayloadSize { .. }
| UnexpectedPuffinFileSize { .. }
| InvalidBlobOffset { .. }
| InvalidBlobAreaEnd { .. } => StatusCode::Unexpected,

UnsupportedDecompression { .. } => StatusCode::Unsupported,
}
}

fn as_any(&self) -> &dyn Any {
self
}
}

pub type Result<T> = std::result::Result<T, Error>;
55 changes: 55 additions & 0 deletions src/puffin/src/file_format.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! # Format specification for Puffin files
//!
//! ## File structure
//!
//! Magic Blob₁ Blob₂ ... Blobₙ Footer
//!
//! - `Magic` is four bytes 0x50, 0x46, 0x41, 0x31 (short for: Puffin Fratercula arctica, version 1),
//! - `Blobᵢ` is i-th blob contained in the file, to be interpreted by application according to the footer,
//! - `Footer` is defined below.
//!
//! ## Footer structure
//!
//! Magic FooterPayload FooterPayloadSize Flags Magic
//!
//! - `Magic`: four bytes, same as at the beginning of the file
//! - `FooterPayload`: optionally compressed, UTF-8 encoded JSON payload describing the blobs in the file, with the structure described below
//! - `FooterPayloadSize`: a length in bytes of the `FooterPayload` (after compression, if compressed), stored as 4 byte integer
//! - `Flags`: 4 bytes for boolean flags
//! * byte 0 (first)
//! - bit 0 (lowest bit): whether `FooterPayload` is compressed
//! - all other bits are reserved for future use and should be set to 0 on write
//! * all other bytes are reserved for future use and should be set to 0 on write
//! A 4 byte integer is always signed, in a two’s complement representation, stored little-endian.
//!
//! ## Footer Payload
//!
//! Footer payload bytes is either uncompressed or LZ4-compressed (as a single LZ4 compression frame with content size present),
//! UTF-8 encoded JSON payload representing a single [`FileMetadata`] object.
pub mod reader;

use bitflags::bitflags;

pub const MAGIC: [u8; 4] = [0x50, 0x46, 0x41, 0x31];

bitflags! {
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Flags: u32 {
const FOOTER_PAYLOAD_COMPRESSED_LZ4 = 0b00000001;
}
}
46 changes: 46 additions & 0 deletions src/puffin/src/file_format/reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod file;
mod footer;

use async_trait::async_trait;

use crate::blob_metadata::BlobMetadata;
use crate::error::Result;
pub use crate::file_format::reader::file::PuffinFileReader;
use crate::file_metadata::FileMetadata;

/// `PuffinSyncReader` defines a synchronous reader for puffin data.
pub trait PuffinSyncReader<'a> {
type Reader: std::io::Read + std::io::Seek;

/// fetch the FileMetadata
fn metadata(&'a mut self) -> Result<FileMetadata>;

/// read particular blob data based on given metadata
fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result<Self::Reader>;
}

/// `PuffinAsyncReader` defines an asynchronous reader for puffin data.
#[async_trait]
pub trait PuffinAsyncReader<'a> {
type Reader: futures::AsyncRead + futures::AsyncSeek;

/// fetch the FileMetadata
async fn metadata(&'a mut self) -> Result<FileMetadata>;

/// read particular blob data based on given metadata
fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result<Self::Reader>;
}
Loading

0 comments on commit d9eeeee

Please sign in to comment.