Skip to content

Commit

Permalink
feat(puffin): implement CachedPuffinReader (#4209)
Browse files Browse the repository at this point in the history
* feat(puffin): implement CachedPuffinReader

Signed-off-by: Zhenchi <[email protected]>

* chore: next PR to introduce CachedPuffinManager

Signed-off-by: Zhenchi <[email protected]>

* chore: rename

Signed-off-by: Zhenchi <[email protected]>

* address comments

Signed-off-by: Zhenchi <[email protected]>

---------

Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored Jun 25, 2024
1 parent 8cbe716 commit 5dde148
Show file tree
Hide file tree
Showing 7 changed files with 324 additions and 22 deletions.
26 changes: 26 additions & 0 deletions src/puffin/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,29 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Blob not found: {blob}"))]
BlobNotFound {
blob: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Blob index out of bound, index: {}, max index: {}", index, max_index))]
BlobIndexOutOfBound {
index: usize,
max_index: usize,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("File key not match, expected: {}, actual: {}", expected, actual))]
FileKeyNotMatch {
expected: String,
actual: String,
#[snafu(implicit)]
location: Location,
},
}

impl ErrorExt for Error {
Expand All @@ -221,6 +244,9 @@ impl ErrorExt for Error {
| InvalidBlobAreaEnd { .. }
| Lz4Compression { .. }
| Lz4Decompression { .. }
| BlobNotFound { .. }
| BlobIndexOutOfBound { .. }
| FileKeyNotMatch { .. }
| WalkDirError { .. } => StatusCode::Unexpected,

UnsupportedCompression { .. } | UnsupportedDecompression { .. } => {
Expand Down
1 change: 1 addition & 0 deletions src/puffin/src/puffin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

pub mod cache_manager;
pub mod cached_puffin_manager;
pub mod file_accessor;

use std::path::PathBuf;

Expand Down
24 changes: 3 additions & 21 deletions src/puffin/src/puffin_manager/cached_puffin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod dir_meta;
mod reader;
mod writer;

use serde::{Deserialize, Serialize};
pub use reader::CachedPuffinReader;
pub use writer::CachedPuffinWriter;

/// Metadata for directory in puffin file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DirMetadata {
pub files: Vec<DirFileMetadata>,
}

/// Metadata for file in directory in puffin file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DirFileMetadata {
/// The relative path of the file in the directory.
pub relative_path: String,

/// The file is stored as a blob in the puffin file.
/// `blob_index` is the index of the blob in the puffin file.
pub blob_index: usize,

/// The key of the blob in the puffin file.
pub key: String,
}
35 changes: 35 additions & 0 deletions src/puffin/src/puffin_manager/cached_puffin_manager/dir_meta.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use serde::{Deserialize, Serialize};

/// Metadata for directory in puffin file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DirMetadata {
pub files: Vec<DirFileMetadata>,
}

/// Metadata for file in directory in puffin file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DirFileMetadata {
/// The relative path of the file in the directory.
pub relative_path: String,

/// The file is stored as a blob in the puffin file.
/// `blob_index` is the index of the blob in the puffin file.
pub blob_index: usize,

/// The key of the blob in the puffin file.
pub key: String,
}
206 changes: 206 additions & 0 deletions src/puffin/src/puffin_manager/cached_puffin_manager/reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::path::PathBuf;

use async_compression::futures::bufread::ZstdDecoder;
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::io::BufReader;
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite};
use snafu::{ensure, OptionExt, ResultExt};

use crate::blob_metadata::CompressionCodec;
use crate::error::{
BlobIndexOutOfBoundSnafu, BlobNotFoundSnafu, DeserializeJsonSnafu, FileKeyNotMatchSnafu,
ReadSnafu, Result, UnsupportedDecompressionSnafu, WriteSnafu,
};
use crate::file_format::reader::{PuffinAsyncReader, PuffinFileReader};
use crate::puffin_manager::cache_manager::{BoxWriter, CacheManagerRef, DirWriterProviderRef};
use crate::puffin_manager::cached_puffin_manager::dir_meta::DirMetadata;
use crate::puffin_manager::file_accessor::PuffinFileAccessorRef;
use crate::puffin_manager::PuffinReader;

/// `CachedPuffinReader` is a `PuffinReader` that provides cached readers for puffin files.
pub struct CachedPuffinReader<CR, AR, AW> {
/// The name of the puffin file.
puffin_file_name: String,

/// The cache manager.
cache_manager: CacheManagerRef<CR>,

/// The puffin file accessor.
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
}

impl<CR, AR, AW> CachedPuffinReader<CR, AR, AW> {
#[allow(unused)]
pub(crate) fn new(
puffin_file_name: String,
cache_manager: CacheManagerRef<CR>,
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
) -> Self {
Self {
puffin_file_name,
cache_manager,
puffin_file_accessor,
}
}
}

#[async_trait]
impl<CR, AR, AW> PuffinReader for CachedPuffinReader<CR, AR, AW>
where
AR: AsyncRead + AsyncSeek + Send + Unpin + 'static,
AW: AsyncWrite + 'static,
CR: AsyncRead + AsyncSeek,
{
type Reader = CR;

async fn blob(&self, key: &str) -> Result<Self::Reader> {
self.cache_manager
.get_blob(
self.puffin_file_name.as_str(),
key,
Box::new(move |writer| {
let accessor = self.puffin_file_accessor.clone();
let puffin_file_name = self.puffin_file_name.clone();
let key = key.to_string();
Self::init_blob_to_cache(puffin_file_name, key, writer, accessor)
}),
)
.await
}

async fn dir(&self, key: &str) -> Result<PathBuf> {
self.cache_manager
.get_dir(
self.puffin_file_name.as_str(),
key,
Box::new(|writer_provider| {
let accessor = self.puffin_file_accessor.clone();
let puffin_file_name = self.puffin_file_name.clone();
let key = key.to_string();
Self::init_dir_to_cache(puffin_file_name, key, writer_provider, accessor)
}),
)
.await
}
}

impl<CR, AR, AW> CachedPuffinReader<CR, AR, AW>
where
AR: AsyncRead + AsyncSeek + Send + Unpin + 'static,
AW: AsyncWrite + 'static,
CR: AsyncRead + AsyncSeek,
{
fn init_blob_to_cache(
puffin_file_name: String,
key: String,
mut writer: BoxWriter,
accessor: PuffinFileAccessorRef<AR, AW>,
) -> BoxFuture<'static, Result<u64>> {
Box::pin(async move {
let reader = accessor.reader(&puffin_file_name).await?;
let mut file = PuffinFileReader::new(reader);

let metadata = file.metadata().await?;
let blob_metadata = metadata
.blobs
.iter()
.find(|m| m.blob_type == key.as_str())
.context(BlobNotFoundSnafu { blob: key })?;
let reader = file.blob_reader(blob_metadata)?;

let compression = blob_metadata.compression_codec;
let size = Self::handle_decompress(reader, &mut writer, compression).await?;

Ok(size)
})
}

fn init_dir_to_cache(
puffin_file_name: String,
key: String,
writer_provider: DirWriterProviderRef,
accessor: PuffinFileAccessorRef<AR, AW>,
) -> BoxFuture<'static, Result<u64>> {
Box::pin(async move {
let reader = accessor.reader(&puffin_file_name).await?;
let mut file = PuffinFileReader::new(reader);

let puffin_metadata = file.metadata().await?;
let blob_metadata = puffin_metadata
.blobs
.iter()
.find(|m| m.blob_type == key.as_str())
.context(BlobNotFoundSnafu { blob: key })?;

let mut reader = file.blob_reader(blob_metadata)?;
let mut buf = vec![];
reader.read_to_end(&mut buf).await.context(ReadSnafu)?;
let dir_meta: DirMetadata =
serde_json::from_slice(buf.as_slice()).context(DeserializeJsonSnafu)?;

let mut size = 0;
for file_meta in dir_meta.files {
let blob_meta = puffin_metadata.blobs.get(file_meta.blob_index).context(
BlobIndexOutOfBoundSnafu {
index: file_meta.blob_index,
max_index: puffin_metadata.blobs.len(),
},
)?;
ensure!(
blob_meta.blob_type == file_meta.key,
FileKeyNotMatchSnafu {
expected: file_meta.key,
actual: &blob_meta.blob_type,
}
);

let reader = file.blob_reader(blob_meta)?;
let writer = writer_provider.writer(&file_meta.relative_path).await?;

let compression = blob_meta.compression_codec;
size += Self::handle_decompress(reader, writer, compression).await?;
}

Ok(size)
})
}

/// Handles the decompression of the reader and writes the decompressed data to the writer.
/// Returns the number of bytes written.
async fn handle_decompress(
reader: impl AsyncRead,
mut writer: impl AsyncWrite + Unpin,
compression: Option<CompressionCodec>,
) -> Result<u64> {
match compression {
Some(CompressionCodec::Lz4) => UnsupportedDecompressionSnafu {
decompression: "lz4",
}
.fail(),
Some(CompressionCodec::Zstd) => {
let reader = ZstdDecoder::new(BufReader::new(reader));
futures::io::copy(reader, &mut writer)
.await
.context(WriteSnafu)
}
None => futures::io::copy(reader, &mut writer)
.await
.context(WriteSnafu),
}
}
}
18 changes: 17 additions & 1 deletion src/puffin/src/puffin_manager/cached_puffin_manager/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::error::{
};
use crate::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter};
use crate::puffin_manager::cache_manager::CacheManagerRef;
use crate::puffin_manager::cached_puffin_manager::{DirFileMetadata, DirMetadata};
use crate::puffin_manager::cached_puffin_manager::dir_meta::{DirFileMetadata, DirMetadata};
use crate::puffin_manager::{PuffinWriter, PutOptions};

/// `CachedPuffinWriter` is a `PuffinWriter` that writes blobs and directories to a puffin file.
Expand All @@ -48,6 +48,22 @@ pub struct CachedPuffinWriter<CR, W> {
blob_keys: HashSet<String>,
}

impl<CR, W> CachedPuffinWriter<CR, W> {
#[allow(unused)]
pub(crate) fn new(
puffin_file_name: String,
cache_manager: CacheManagerRef<CR>,
writer: W,
) -> Self {
Self {
puffin_file_name,
cache_manager,
puffin_file_writer: PuffinFileWriter::new(writer),
blob_keys: HashSet::new(),
}
}
}

#[async_trait]
impl<CR, W> PuffinWriter for CachedPuffinWriter<CR, W>
where
Expand Down
36 changes: 36 additions & 0 deletions src/puffin/src/puffin_manager/file_accessor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_trait::async_trait;
use futures::{AsyncRead, AsyncSeek, AsyncWrite};

use crate::error::Result;

/// `PuffinFileAccessor` is for opening readers and writers for puffin files.
#[async_trait]
pub trait PuffinFileAccessor {
type Reader: AsyncRead + AsyncSeek;
type Writer: AsyncWrite;

/// Opens a reader for the given puffin file.
async fn reader(&self, puffin_file_name: &str) -> Result<Self::Reader>;

/// Creates a writer for the given puffin file.
async fn writer(&self, puffin_file_name: &str) -> Result<Self::Writer>;
}

pub type PuffinFileAccessorRef<R, W> =
Arc<dyn PuffinFileAccessor<Reader = R, Writer = W> + Send + Sync>;

0 comments on commit 5dde148

Please sign in to comment.