From 5dde148b3d375488589321ca1a098a9e50bafa93 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 25 Jun 2024 20:27:06 +0800 Subject: [PATCH] feat(puffin): implement CachedPuffinReader (#4209) * feat(puffin): implement CachedPuffinReader Signed-off-by: Zhenchi * chore: next PR to introduce CachedPuffinManager Signed-off-by: Zhenchi * chore: rename Signed-off-by: Zhenchi * address comments Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- src/puffin/src/error.rs | 26 +++ src/puffin/src/puffin_manager.rs | 1 + .../puffin_manager/cached_puffin_manager.rs | 24 +- .../cached_puffin_manager/dir_meta.rs | 35 +++ .../cached_puffin_manager/reader.rs | 206 ++++++++++++++++++ .../cached_puffin_manager/writer.rs | 18 +- .../src/puffin_manager/file_accessor.rs | 36 +++ 7 files changed, 324 insertions(+), 22 deletions(-) create mode 100644 src/puffin/src/puffin_manager/cached_puffin_manager/dir_meta.rs create mode 100644 src/puffin/src/puffin_manager/cached_puffin_manager/reader.rs create mode 100644 src/puffin/src/puffin_manager/file_accessor.rs diff --git a/src/puffin/src/error.rs b/src/puffin/src/error.rs index 86f08948f7ff..8dfb5f4575dc 100644 --- a/src/puffin/src/error.rs +++ b/src/puffin/src/error.rs @@ -197,6 +197,29 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Blob not found: {blob}"))] + BlobNotFound { + blob: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Blob index out of bound, index: {}, max index: {}", index, max_index))] + BlobIndexOutOfBound { + index: usize, + max_index: usize, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("File key not match, expected: {}, actual: {}", expected, actual))] + FileKeyNotMatch { + expected: String, + actual: String, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -221,6 +244,9 @@ impl ErrorExt for Error { | InvalidBlobAreaEnd { .. } | Lz4Compression { .. } | Lz4Decompression { .. } + | BlobNotFound { .. } + | BlobIndexOutOfBound { .. } + | FileKeyNotMatch { .. } | WalkDirError { .. } => StatusCode::Unexpected, UnsupportedCompression { .. } | UnsupportedDecompression { .. } => { diff --git a/src/puffin/src/puffin_manager.rs b/src/puffin/src/puffin_manager.rs index 933c974ee672..96d1dfd51928 100644 --- a/src/puffin/src/puffin_manager.rs +++ b/src/puffin/src/puffin_manager.rs @@ -14,6 +14,7 @@ pub mod cache_manager; pub mod cached_puffin_manager; +pub mod file_accessor; use std::path::PathBuf; diff --git a/src/puffin/src/puffin_manager/cached_puffin_manager.rs b/src/puffin/src/puffin_manager/cached_puffin_manager.rs index 984d787e4931..a9edb011698e 100644 --- a/src/puffin/src/puffin_manager/cached_puffin_manager.rs +++ b/src/puffin/src/puffin_manager/cached_puffin_manager.rs @@ -12,27 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod dir_meta; +mod reader; mod writer; -use serde::{Deserialize, Serialize}; +pub use reader::CachedPuffinReader; pub use writer::CachedPuffinWriter; - -/// Metadata for directory in puffin file. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct DirMetadata { - pub files: Vec, -} - -/// Metadata for file in directory in puffin file. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct DirFileMetadata { - /// The relative path of the file in the directory. - pub relative_path: String, - - /// The file is stored as a blob in the puffin file. - /// `blob_index` is the index of the blob in the puffin file. - pub blob_index: usize, - - /// The key of the blob in the puffin file. - pub key: String, -} diff --git a/src/puffin/src/puffin_manager/cached_puffin_manager/dir_meta.rs b/src/puffin/src/puffin_manager/cached_puffin_manager/dir_meta.rs new file mode 100644 index 000000000000..3e677739eff0 --- /dev/null +++ b/src/puffin/src/puffin_manager/cached_puffin_manager/dir_meta.rs @@ -0,0 +1,35 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use serde::{Deserialize, Serialize}; + +/// Metadata for directory in puffin file. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DirMetadata { + pub files: Vec, +} + +/// Metadata for file in directory in puffin file. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DirFileMetadata { + /// The relative path of the file in the directory. + pub relative_path: String, + + /// The file is stored as a blob in the puffin file. + /// `blob_index` is the index of the blob in the puffin file. + pub blob_index: usize, + + /// The key of the blob in the puffin file. + pub key: String, +} diff --git a/src/puffin/src/puffin_manager/cached_puffin_manager/reader.rs b/src/puffin/src/puffin_manager/cached_puffin_manager/reader.rs new file mode 100644 index 000000000000..323a0675620a --- /dev/null +++ b/src/puffin/src/puffin_manager/cached_puffin_manager/reader.rs @@ -0,0 +1,206 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::path::PathBuf; + +use async_compression::futures::bufread::ZstdDecoder; +use async_trait::async_trait; +use futures::future::BoxFuture; +use futures::io::BufReader; +use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite}; +use snafu::{ensure, OptionExt, ResultExt}; + +use crate::blob_metadata::CompressionCodec; +use crate::error::{ + BlobIndexOutOfBoundSnafu, BlobNotFoundSnafu, DeserializeJsonSnafu, FileKeyNotMatchSnafu, + ReadSnafu, Result, UnsupportedDecompressionSnafu, WriteSnafu, +}; +use crate::file_format::reader::{PuffinAsyncReader, PuffinFileReader}; +use crate::puffin_manager::cache_manager::{BoxWriter, CacheManagerRef, DirWriterProviderRef}; +use crate::puffin_manager::cached_puffin_manager::dir_meta::DirMetadata; +use crate::puffin_manager::file_accessor::PuffinFileAccessorRef; +use crate::puffin_manager::PuffinReader; + +/// `CachedPuffinReader` is a `PuffinReader` that provides cached readers for puffin files. +pub struct CachedPuffinReader { + /// The name of the puffin file. + puffin_file_name: String, + + /// The cache manager. + cache_manager: CacheManagerRef, + + /// The puffin file accessor. + puffin_file_accessor: PuffinFileAccessorRef, +} + +impl CachedPuffinReader { + #[allow(unused)] + pub(crate) fn new( + puffin_file_name: String, + cache_manager: CacheManagerRef, + puffin_file_accessor: PuffinFileAccessorRef, + ) -> Self { + Self { + puffin_file_name, + cache_manager, + puffin_file_accessor, + } + } +} + +#[async_trait] +impl PuffinReader for CachedPuffinReader +where + AR: AsyncRead + AsyncSeek + Send + Unpin + 'static, + AW: AsyncWrite + 'static, + CR: AsyncRead + AsyncSeek, +{ + type Reader = CR; + + async fn blob(&self, key: &str) -> Result { + self.cache_manager + .get_blob( + self.puffin_file_name.as_str(), + key, + Box::new(move |writer| { + let accessor = self.puffin_file_accessor.clone(); + let puffin_file_name = self.puffin_file_name.clone(); + let key = key.to_string(); + Self::init_blob_to_cache(puffin_file_name, key, writer, accessor) + }), + ) + .await + } + + async fn dir(&self, key: &str) -> Result { + self.cache_manager + .get_dir( + self.puffin_file_name.as_str(), + key, + Box::new(|writer_provider| { + let accessor = self.puffin_file_accessor.clone(); + let puffin_file_name = self.puffin_file_name.clone(); + let key = key.to_string(); + Self::init_dir_to_cache(puffin_file_name, key, writer_provider, accessor) + }), + ) + .await + } +} + +impl CachedPuffinReader +where + AR: AsyncRead + AsyncSeek + Send + Unpin + 'static, + AW: AsyncWrite + 'static, + CR: AsyncRead + AsyncSeek, +{ + fn init_blob_to_cache( + puffin_file_name: String, + key: String, + mut writer: BoxWriter, + accessor: PuffinFileAccessorRef, + ) -> BoxFuture<'static, Result> { + Box::pin(async move { + let reader = accessor.reader(&puffin_file_name).await?; + let mut file = PuffinFileReader::new(reader); + + let metadata = file.metadata().await?; + let blob_metadata = metadata + .blobs + .iter() + .find(|m| m.blob_type == key.as_str()) + .context(BlobNotFoundSnafu { blob: key })?; + let reader = file.blob_reader(blob_metadata)?; + + let compression = blob_metadata.compression_codec; + let size = Self::handle_decompress(reader, &mut writer, compression).await?; + + Ok(size) + }) + } + + fn init_dir_to_cache( + puffin_file_name: String, + key: String, + writer_provider: DirWriterProviderRef, + accessor: PuffinFileAccessorRef, + ) -> BoxFuture<'static, Result> { + Box::pin(async move { + let reader = accessor.reader(&puffin_file_name).await?; + let mut file = PuffinFileReader::new(reader); + + let puffin_metadata = file.metadata().await?; + let blob_metadata = puffin_metadata + .blobs + .iter() + .find(|m| m.blob_type == key.as_str()) + .context(BlobNotFoundSnafu { blob: key })?; + + let mut reader = file.blob_reader(blob_metadata)?; + let mut buf = vec![]; + reader.read_to_end(&mut buf).await.context(ReadSnafu)?; + let dir_meta: DirMetadata = + serde_json::from_slice(buf.as_slice()).context(DeserializeJsonSnafu)?; + + let mut size = 0; + for file_meta in dir_meta.files { + let blob_meta = puffin_metadata.blobs.get(file_meta.blob_index).context( + BlobIndexOutOfBoundSnafu { + index: file_meta.blob_index, + max_index: puffin_metadata.blobs.len(), + }, + )?; + ensure!( + blob_meta.blob_type == file_meta.key, + FileKeyNotMatchSnafu { + expected: file_meta.key, + actual: &blob_meta.blob_type, + } + ); + + let reader = file.blob_reader(blob_meta)?; + let writer = writer_provider.writer(&file_meta.relative_path).await?; + + let compression = blob_meta.compression_codec; + size += Self::handle_decompress(reader, writer, compression).await?; + } + + Ok(size) + }) + } + + /// Handles the decompression of the reader and writes the decompressed data to the writer. + /// Returns the number of bytes written. + async fn handle_decompress( + reader: impl AsyncRead, + mut writer: impl AsyncWrite + Unpin, + compression: Option, + ) -> Result { + match compression { + Some(CompressionCodec::Lz4) => UnsupportedDecompressionSnafu { + decompression: "lz4", + } + .fail(), + Some(CompressionCodec::Zstd) => { + let reader = ZstdDecoder::new(BufReader::new(reader)); + futures::io::copy(reader, &mut writer) + .await + .context(WriteSnafu) + } + None => futures::io::copy(reader, &mut writer) + .await + .context(WriteSnafu), + } + } +} diff --git a/src/puffin/src/puffin_manager/cached_puffin_manager/writer.rs b/src/puffin/src/puffin_manager/cached_puffin_manager/writer.rs index cacc0bad6c5b..b05e0e78d0d1 100644 --- a/src/puffin/src/puffin_manager/cached_puffin_manager/writer.rs +++ b/src/puffin/src/puffin_manager/cached_puffin_manager/writer.rs @@ -30,7 +30,7 @@ use crate::error::{ }; use crate::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter}; use crate::puffin_manager::cache_manager::CacheManagerRef; -use crate::puffin_manager::cached_puffin_manager::{DirFileMetadata, DirMetadata}; +use crate::puffin_manager::cached_puffin_manager::dir_meta::{DirFileMetadata, DirMetadata}; use crate::puffin_manager::{PuffinWriter, PutOptions}; /// `CachedPuffinWriter` is a `PuffinWriter` that writes blobs and directories to a puffin file. @@ -48,6 +48,22 @@ pub struct CachedPuffinWriter { blob_keys: HashSet, } +impl CachedPuffinWriter { + #[allow(unused)] + pub(crate) fn new( + puffin_file_name: String, + cache_manager: CacheManagerRef, + writer: W, + ) -> Self { + Self { + puffin_file_name, + cache_manager, + puffin_file_writer: PuffinFileWriter::new(writer), + blob_keys: HashSet::new(), + } + } +} + #[async_trait] impl PuffinWriter for CachedPuffinWriter where diff --git a/src/puffin/src/puffin_manager/file_accessor.rs b/src/puffin/src/puffin_manager/file_accessor.rs new file mode 100644 index 000000000000..43d2b248fe9c --- /dev/null +++ b/src/puffin/src/puffin_manager/file_accessor.rs @@ -0,0 +1,36 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use futures::{AsyncRead, AsyncSeek, AsyncWrite}; + +use crate::error::Result; + +/// `PuffinFileAccessor` is for opening readers and writers for puffin files. +#[async_trait] +pub trait PuffinFileAccessor { + type Reader: AsyncRead + AsyncSeek; + type Writer: AsyncWrite; + + /// Opens a reader for the given puffin file. + async fn reader(&self, puffin_file_name: &str) -> Result; + + /// Creates a writer for the given puffin file. + async fn writer(&self, puffin_file_name: &str) -> Result; +} + +pub type PuffinFileAccessorRef = + Arc + Send + Sync>;