Skip to content

Commit

Permalink
refactor(puffin): adjust generic parameters (#4279)
Browse files Browse the repository at this point in the history
* refactor(puffin): adjust generic parameters

Signed-off-by: Zhenchi <[email protected]>

* fix: address comments

Signed-off-by: Zhenchi <[email protected]>

* fix: remove Box impl

Signed-off-by: Zhenchi <[email protected]>

---------

Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored Jul 4, 2024
1 parent 70f7baf commit 6e2c21d
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 86 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 7 additions & 13 deletions src/mito2/src/sst/index/puffin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use object_store::{FuturesAsyncReader, FuturesAsyncWriter, ObjectStore};
use puffin::error::{self as puffin_error, Result as PuffinResult};
use puffin::puffin_manager::file_accessor::PuffinFileAccessor;
use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
use puffin::puffin_manager::stager::{BoundedStager, FsBlobGuard, FsDirGuard};
use puffin::puffin_manager::stager::{BoundedStager, FsBlobGuard};
use puffin::puffin_manager::BlobGuard;
use snafu::ResultExt;

Expand All @@ -36,12 +36,8 @@ type InstrumentedAsyncRead = store::InstrumentedAsyncRead<'static, FuturesAsyncR
type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;

pub(crate) type BlobReader = <Arc<FsBlobGuard> as BlobGuard>::Reader;
pub(crate) type SstPuffinManager = FsPuffinManager<
Arc<FsBlobGuard>,
Arc<FsDirGuard>,
InstrumentedAsyncRead,
InstrumentedAsyncWrite,
>;
pub(crate) type SstPuffinManager =
FsPuffinManager<Arc<BoundedStager>, ObjectStorePuffinFileAccessor>;

const STAGING_DIR: &str = "staging";

Expand Down Expand Up @@ -75,12 +71,12 @@ impl PuffinManagerFactory {
pub(crate) fn build(&self, store: ObjectStore) -> SstPuffinManager {
let store = InstrumentedStore::new(store).with_write_buffer_size(self.write_buffer_size);
let puffin_file_accessor = ObjectStorePuffinFileAccessor::new(store);
SstPuffinManager::new(self.stager.clone(), Arc::new(puffin_file_accessor))
SstPuffinManager::new(self.stager.clone(), puffin_file_accessor)
}
}

#[cfg(test)]
impl PuffinManagerFactory {
#[cfg(test)]
pub(crate) async fn new_for_test_async(
prefix: &str,
) -> (common_test_util::temp_dir::TempDir, Self) {
Expand All @@ -91,7 +87,6 @@ impl PuffinManagerFactory {
(tempdir, factory)
}

#[cfg(test)]
pub(crate) fn new_for_test_block(prefix: &str) -> (common_test_util::temp_dir::TempDir, Self) {
let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);

Expand All @@ -103,6 +98,7 @@ impl PuffinManagerFactory {
}

/// A `PuffinFileAccessor` implementation that uses an object store as the underlying storage.
#[derive(Clone)]
pub(crate) struct ObjectStorePuffinFileAccessor {
object_store: InstrumentedStore,
}
Expand Down Expand Up @@ -152,9 +148,7 @@ mod tests {
use futures::AsyncReadExt;
use object_store::services::Memory;
use puffin::blob_metadata::CompressionCodec;
use puffin::puffin_manager::{
BlobGuard, DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions,
};
use puffin::puffin_manager::{DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions};

use super::*;

Expand Down
1 change: 1 addition & 0 deletions src/puffin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ workspace = true
async-compression = { version = "0.4", features = ["futures-io", "zstd"] }
async-trait.workspace = true
async-walkdir = "2.0.0"
auto_impl = "1.2.0"
base64.workspace = true
bitflags.workspace = true
common-error.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions src/puffin/src/puffin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub struct PutOptions {

/// The `PuffinReader` trait provides methods for reading blobs and directories from a Puffin file.
#[async_trait]
#[auto_impl::auto_impl(Arc)]
pub trait PuffinReader {
type Blob: BlobGuard;
type Dir: DirGuard;
Expand All @@ -91,13 +92,15 @@ pub trait PuffinReader {

/// `BlobGuard` is provided by the `PuffinReader` to access the blob data.
/// Users should hold the `BlobGuard` until they are done with the blob data.
#[auto_impl::auto_impl(Arc)]
pub trait BlobGuard {
type Reader: AsyncRead + AsyncSeek + Unpin;
fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>>;
}

/// `DirGuard` is provided by the `PuffinReader` to access the directory in the filesystem.
/// Users should hold the `DirGuard` until they are done with the directory.
#[auto_impl::auto_impl(Arc)]
pub trait DirGuard {
fn path(&self) -> &PathBuf;
}
12 changes: 4 additions & 8 deletions src/puffin/src/puffin_manager/file_accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_trait::async_trait;
use futures::{AsyncRead, AsyncSeek, AsyncWrite};

use crate::error::Result;

/// `PuffinFileAccessor` is for opening readers and writers for puffin files.
#[async_trait]
pub trait PuffinFileAccessor {
type Reader: AsyncRead + AsyncSeek;
type Writer: AsyncWrite;
#[auto_impl::auto_impl(Arc)]
pub trait PuffinFileAccessor: Send + Sync + 'static {
type Reader: AsyncRead + AsyncSeek + Unpin + Send;
type Writer: AsyncWrite + Unpin + Send;

/// Opens a reader for the given puffin file.
async fn reader(&self, puffin_file_name: &str) -> Result<Self::Reader>;

/// Creates a writer for the given puffin file.
async fn writer(&self, puffin_file_name: &str) -> Result<Self::Writer>;
}

pub type PuffinFileAccessorRef<R, W> =
Arc<dyn PuffinFileAccessor<Reader = R, Writer = W> + Send + Sync>;
33 changes: 13 additions & 20 deletions src/puffin/src/puffin_manager/fs_puffin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,25 @@ mod reader;
mod writer;

use async_trait::async_trait;
use futures::{AsyncRead, AsyncSeek, AsyncWrite};
pub use reader::FsPuffinReader;
pub use writer::FsPuffinWriter;

use super::file_accessor::PuffinFileAccessor;
use crate::error::Result;
use crate::puffin_manager::file_accessor::PuffinFileAccessorRef;
use crate::puffin_manager::stager::StagerRef;
use crate::puffin_manager::{BlobGuard, DirGuard, PuffinManager};
use crate::puffin_manager::stager::Stager;
use crate::puffin_manager::PuffinManager;

/// `FsPuffinManager` is a `PuffinManager` that provides readers and writers for puffin data in filesystem.
pub struct FsPuffinManager<B, D, AR, AW> {
pub struct FsPuffinManager<S, F> {
/// The stager.
stager: StagerRef<B, D>,

stager: S,
/// The puffin file accessor.
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
puffin_file_accessor: F,
}

impl<B, D, AR, AW> FsPuffinManager<B, D, AR, AW> {
impl<S, F> FsPuffinManager<S, F> {
/// Creates a new `FsPuffinManager` with the specified `stager` and `puffin_file_accessor`.
pub fn new(
stager: StagerRef<B, D>,
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
) -> Self {
pub fn new(stager: S, puffin_file_accessor: F) -> Self {
Self {
stager,
puffin_file_accessor,
Expand All @@ -49,15 +44,13 @@ impl<B, D, AR, AW> FsPuffinManager<B, D, AR, AW> {
}

#[async_trait]
impl<B, D, AR, AW> PuffinManager for FsPuffinManager<B, D, AR, AW>
impl<S, F> PuffinManager for FsPuffinManager<S, F>
where
B: BlobGuard,
D: DirGuard,
AR: AsyncRead + AsyncSeek + Send + Unpin + 'static,
AW: AsyncWrite + Send + Unpin + 'static,
S: Stager + Clone,
F: PuffinFileAccessor + Clone,
{
type Reader = FsPuffinReader<B, D, AR, AW>;
type Writer = FsPuffinWriter<B, D, AW>;
type Reader = FsPuffinReader<S, F>;
type Writer = FsPuffinWriter<S, F::Writer>;

async fn reader(&self, puffin_file_name: &str) -> Result<Self::Reader> {
Ok(FsPuffinReader::new(
Expand Down
46 changes: 19 additions & 27 deletions src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use async_compression::futures::bufread::ZstdDecoder;
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::io::BufReader;
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite};
use futures::{AsyncRead, AsyncReadExt, AsyncWrite};
use snafu::{ensure, OptionExt, ResultExt};

use crate::blob_metadata::CompressionCodec;
Expand All @@ -25,29 +25,25 @@ use crate::error::{
ReadSnafu, Result, UnsupportedDecompressionSnafu, WriteSnafu,
};
use crate::file_format::reader::{AsyncReader, PuffinFileReader};
use crate::puffin_manager::file_accessor::PuffinFileAccessorRef;
use crate::puffin_manager::file_accessor::PuffinFileAccessor;
use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata;
use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, StagerRef};
use crate::puffin_manager::{BlobGuard, DirGuard, PuffinReader};
use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager};
use crate::puffin_manager::PuffinReader;

/// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files.
pub struct FsPuffinReader<B, G, AR, AW> {
pub struct FsPuffinReader<S, F> {
/// The name of the puffin file.
puffin_file_name: String,

/// The stager.
stager: StagerRef<B, G>,
stager: S,

/// The puffin file accessor.
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
puffin_file_accessor: F,
}

impl<B, D, AR, AW> FsPuffinReader<B, D, AR, AW> {
pub(crate) fn new(
puffin_file_name: String,
stager: StagerRef<B, D>,
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
) -> Self {
impl<S, F> FsPuffinReader<S, F> {
pub(crate) fn new(puffin_file_name: String, stager: S, puffin_file_accessor: F) -> Self {
Self {
puffin_file_name,
stager,
Expand All @@ -57,15 +53,13 @@ impl<B, D, AR, AW> FsPuffinReader<B, D, AR, AW> {
}

#[async_trait]
impl<B, D, AR, AW> PuffinReader for FsPuffinReader<B, D, AR, AW>
impl<S, F> PuffinReader for FsPuffinReader<S, F>
where
B: BlobGuard,
D: DirGuard,
AR: AsyncRead + AsyncSeek + Send + Unpin + 'static,
AW: AsyncWrite + 'static,
S: Stager,
F: PuffinFileAccessor + Clone,
{
type Blob = B;
type Dir = D;
type Blob = S::Blob;
type Dir = S::Dir;

async fn blob(&self, key: &str) -> Result<Self::Blob> {
self.stager
Expand Down Expand Up @@ -98,18 +92,16 @@ where
}
}

impl<B, G, AR, AW> FsPuffinReader<B, G, AR, AW>
impl<S, F> FsPuffinReader<S, F>
where
B: BlobGuard,
G: DirGuard,
AR: AsyncRead + AsyncSeek + Send + Unpin + 'static,
AW: AsyncWrite + 'static,
S: Stager,
F: PuffinFileAccessor,
{
fn init_blob_to_cache(
puffin_file_name: String,
key: String,
mut writer: BoxWriter,
accessor: PuffinFileAccessorRef<AR, AW>,
accessor: F,
) -> BoxFuture<'static, Result<u64>> {
Box::pin(async move {
let reader = accessor.reader(&puffin_file_name).await?;
Expand All @@ -134,7 +126,7 @@ where
puffin_file_name: String,
key: String,
writer_provider: DirWriterProviderRef,
accessor: PuffinFileAccessorRef<AR, AW>,
accessor: F,
) -> BoxFuture<'static, Result<u64>> {
Box::pin(async move {
let reader = accessor.reader(&puffin_file_name).await?;
Expand Down
22 changes: 10 additions & 12 deletions src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ use crate::error::{
};
use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter};
use crate::puffin_manager::fs_puffin_manager::dir_meta::{DirFileMetadata, DirMetadata};
use crate::puffin_manager::stager::StagerRef;
use crate::puffin_manager::{BlobGuard, DirGuard, PuffinWriter, PutOptions};
use crate::puffin_manager::stager::Stager;
use crate::puffin_manager::{PuffinWriter, PutOptions};

/// `FsPuffinWriter` is a `PuffinWriter` that writes blobs and directories to a puffin file.
pub struct FsPuffinWriter<B, D, W> {
pub struct FsPuffinWriter<S, W> {
/// The name of the puffin file.
puffin_file_name: String,

/// The stager.
stager: StagerRef<B, D>,
stager: S,

/// The underlying `PuffinFileWriter`.
puffin_file_writer: PuffinFileWriter<W>,
Expand All @@ -48,8 +48,8 @@ pub struct FsPuffinWriter<B, D, W> {
blob_keys: HashSet<String>,
}

impl<B, D, W> FsPuffinWriter<B, D, W> {
pub(crate) fn new(puffin_file_name: String, stager: StagerRef<B, D>, writer: W) -> Self {
impl<S, W> FsPuffinWriter<S, W> {
pub(crate) fn new(puffin_file_name: String, stager: S, writer: W) -> Self {
Self {
puffin_file_name,
stager,
Expand All @@ -60,10 +60,9 @@ impl<B, D, W> FsPuffinWriter<B, D, W> {
}

#[async_trait]
impl<B, D, W> PuffinWriter for FsPuffinWriter<B, D, W>
impl<S, W> PuffinWriter for FsPuffinWriter<S, W>
where
B: BlobGuard,
D: DirGuard,
S: Stager,
W: AsyncWrite + Unpin + Send,
{
async fn put_blob<R>(&mut self, key: &str, raw_data: R, options: PutOptions) -> Result<u64>
Expand Down Expand Up @@ -164,10 +163,9 @@ where
}
}

impl<B, G, W> FsPuffinWriter<B, G, W>
impl<S, W> FsPuffinWriter<S, W>
where
B: BlobGuard,
G: DirGuard,
S: Stager,
W: AsyncWrite + Unpin + Send,
{
/// Compresses the raw data and writes it to the puffin file.
Expand Down
6 changes: 2 additions & 4 deletions src/puffin/src/puffin_manager/stager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
mod bounded_stager;

use std::path::PathBuf;
use std::sync::Arc;

use async_trait::async_trait;
pub use bounded_stager::{BoundedStager, FsBlobGuard, FsDirGuard};
Expand Down Expand Up @@ -53,7 +52,8 @@ pub trait InitDirFn = Fn(DirWriterProviderRef) -> WriteResult;

/// `Stager` manages the staging area for the puffin files.
#[async_trait]
pub trait Stager {
#[auto_impl::auto_impl(Arc)]
pub trait Stager: Send + Sync {
type Blob: BlobGuard;
type Dir: DirGuard;

Expand Down Expand Up @@ -88,5 +88,3 @@ pub trait Stager {
dir_size: u64,
) -> Result<()>;
}

pub type StagerRef<B, D> = Arc<dyn Stager<Blob = B, Dir = D> + Send + Sync>;
4 changes: 2 additions & 2 deletions src/puffin/src/puffin_manager/stager/bounded_stager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ pub struct FsBlobGuard {
delete_queue: Sender<DeleteTask>,
}

impl BlobGuard for Arc<FsBlobGuard> {
impl BlobGuard for FsBlobGuard {
type Reader = Compat<fs::File>;

fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>> {
Expand Down Expand Up @@ -460,7 +460,7 @@ pub struct FsDirGuard {
delete_queue: Sender<DeleteTask>,
}

impl DirGuard for Arc<FsDirGuard> {
impl DirGuard for FsDirGuard {
fn path(&self) -> &PathBuf {
&self.path
}
Expand Down

0 comments on commit 6e2c21d

Please sign in to comment.