Skip to content

Commit

Permalink
feat(puffin): add file writer (#2776)
Browse files Browse the repository at this point in the history
* feat(puffin): add file writer

Signed-off-by: Zhenchi <[email protected]>

* Update src/puffin/src/file_format/writer/file.rs

Co-authored-by: dennis zhuang <[email protected]>

* Update src/puffin/src/file_format/writer/file.rs

Co-authored-by: dennis zhuang <[email protected]>

* feat: footer bytes with capacity

Signed-off-by: Zhenchi <[email protected]>

* feat: footer bytes with capacity

Signed-off-by: Zhenchi <[email protected]>

* Update src/puffin/src/file_format/writer.rs

Co-authored-by: Yingwen <[email protected]>

* feat: add flush

Signed-off-by: Zhenchi <[email protected]>

* chore: specify default flags

Signed-off-by: Zhenchi <[email protected]>

* feat: close async writer

Signed-off-by: Zhenchi <[email protected]>

---------

Signed-off-by: Zhenchi <[email protected]>
Co-authored-by: dennis zhuang <[email protected]>
Co-authored-by: Yingwen <[email protected]>
  • Loading branch information
3 people authored Nov 21, 2023
1 parent efc5abf commit a7bbd61
Show file tree
Hide file tree
Showing 8 changed files with 337 additions and 11 deletions.
16 changes: 16 additions & 0 deletions src/puffin/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to flush"))]
Flush {
#[snafu(source)]
error: IoError,
location: Location,
},

#[snafu(display("Failed to close"))]
Close {
#[snafu(source)]
error: IoError,
location: Location,
},

#[snafu(display("Magic not matched"))]
MagicNotMatched { location: Location },

Expand Down Expand Up @@ -112,6 +126,8 @@ impl ErrorExt for Error {
| MagicNotMatched { .. }
| DeserializeJson { .. }
| Write { .. }
| Flush { .. }
| Close { .. }
| SerializeJson { .. }
| BytesToInteger { .. }
| ParseStageNotMatch { .. }
Expand Down
9 changes: 9 additions & 0 deletions src/puffin/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,23 @@
//! UTF-8 encoded JSON payload representing a single [`FileMetadata`] object.
pub mod reader;
pub mod writer;

use bitflags::bitflags;

pub const MAGIC: [u8; 4] = [0x50, 0x46, 0x41, 0x31];

pub const MAGIC_SIZE: u64 = MAGIC.len() as u64;
pub const MIN_FILE_SIZE: u64 = MAGIC_SIZE + MIN_FOOTER_SIZE;
pub const FLAGS_SIZE: u64 = 4;
pub const PAYLOAD_SIZE_SIZE: u64 = 4;
pub const MIN_FOOTER_SIZE: u64 = MAGIC_SIZE * 2 + FLAGS_SIZE + PAYLOAD_SIZE_SIZE;

bitflags! {
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Flags: u32 {
const DEFAULT = 0b00000000;

const FOOTER_PAYLOAD_COMPRESSED_LZ4 = 0b00000001;
}
}
7 changes: 2 additions & 5 deletions src/puffin/src/file_format/reader/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ use crate::error::{
MagicNotMatchedSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedPuffinFileSizeSnafu,
UnsupportedDecompressionSnafu,
};
use crate::file_format::reader::footer::{FooterParser, MIN_FOOTER_SIZE};
use crate::file_format::reader::footer::FooterParser;
use crate::file_format::reader::{PuffinAsyncReader, PuffinSyncReader};
use crate::file_format::MAGIC;
use crate::file_format::{MAGIC, MAGIC_SIZE, MIN_FILE_SIZE};
use crate::file_metadata::FileMetadata;
use crate::partial_reader::PartialReader;

Expand All @@ -43,9 +43,6 @@ pub struct PuffinFileReader<R> {
metadata: Option<FileMetadata>,
}

pub const MAGIC_SIZE: u64 = MAGIC.len() as u64;
pub const MIN_FILE_SIZE: u64 = MAGIC_SIZE + MIN_FOOTER_SIZE;

impl<R> PuffinFileReader<R> {
pub fn new(source: R) -> Self {
Self {
Expand Down
7 changes: 1 addition & 6 deletions src/puffin/src/file_format/reader/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ use crate::error::{
MagicNotMatchedSnafu, ParseStageNotMatchSnafu, ReadSnafu, Result, SeekSnafu,
UnexpectedFooterPayloadSizeSnafu, UnsupportedDecompressionSnafu,
};
use crate::file_format::reader::file::{MAGIC_SIZE, MIN_FILE_SIZE};
use crate::file_format::{Flags, MAGIC};
use crate::file_format::{Flags, FLAGS_SIZE, MAGIC, MAGIC_SIZE, MIN_FILE_SIZE, PAYLOAD_SIZE_SIZE};
use crate::file_metadata::FileMetadata;

/// Parser for the footer of a Puffin data file
Expand All @@ -43,10 +42,6 @@ pub struct FooterParser<R> {
file_size: u64,
}

pub const FLAGS_SIZE: u64 = 4;
pub const PAYLOAD_SIZE_SIZE: u64 = 4;
pub const MIN_FOOTER_SIZE: u64 = MAGIC_SIZE * 2 + FLAGS_SIZE + PAYLOAD_SIZE_SIZE;

impl<R> FooterParser<R> {
pub fn new(source: R, file_size: u64) -> Self {
Self { source, file_size }
Expand Down
62 changes: 62 additions & 0 deletions src/puffin/src/file_format/writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod file;
mod footer;

use std::collections::HashMap;

use async_trait::async_trait;

use crate::error::Result;
pub use crate::file_format::writer::file::PuffinFileWriter;

/// Blob ready to be written
pub struct Blob<R> {
// TODO(zhongzc): ignore `input_fields`, `snapshot_id`, `sequence_number`
// and `compression_codec` for now to keep thing simple
/// The type of the blob
pub blob_type: String,

/// The data of the blob
pub data: R,

/// The properties of the blob
pub properties: HashMap<String, String>,
}

/// The trait for writing Puffin files synchronously
pub trait PuffinSyncWriter {
/// Set the properties of the Puffin file
fn set_properties(&mut self, properties: HashMap<String, String>);

/// Add a blob to the Puffin file
fn add_blob<R: std::io::Read>(&mut self, blob: Blob<R>) -> Result<()>;

/// Finish writing the Puffin file
fn finish(&mut self) -> Result<()>;
}

/// The trait for writing Puffin files asynchronously
#[async_trait]
pub trait PuffinAsyncWriter {
/// Set the properties of the Puffin file
fn set_properties(&mut self, properties: HashMap<String, String>);

/// Add a blob to the Puffin file
async fn add_blob<R: futures::AsyncRead + Send>(&mut self, blob: Blob<R>) -> Result<()>;

/// Finish writing the Puffin file
async fn finish(&mut self) -> Result<()>;
}
159 changes: 159 additions & 0 deletions src/puffin/src/file_format/writer/file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::{io, mem};

use async_trait::async_trait;
use futures::{AsyncRead, AsyncWrite, AsyncWriteExt};
use snafu::ResultExt;

use crate::blob_metadata::{BlobMetadata, BlobMetadataBuilder};
use crate::error::{CloseSnafu, FlushSnafu, Result, WriteSnafu};
use crate::file_format::writer::footer::FooterWriter;
use crate::file_format::writer::{Blob, PuffinAsyncWriter, PuffinSyncWriter};
use crate::file_format::MAGIC;

/// Puffin file writer, implements both [`PuffinSyncWriter`] and [`PuffinAsyncWriter`]
pub struct PuffinFileWriter<W> {
/// The writer to write to
writer: W,

/// The properties of the file
properties: HashMap<String, String>,

/// The metadata of the blobs
blob_metadata: Vec<BlobMetadata>,

/// The offset of the next blob
next_blob_offset: u64,
}

impl<W> PuffinFileWriter<W> {
pub fn new(writer: W) -> Self {
Self {
writer,
properties: HashMap::new(),
blob_metadata: Vec::new(),
next_blob_offset: 0,
}
}

fn create_blob_metadata(
&self,
typ: String,
properties: HashMap<String, String>,
size: u64,
) -> BlobMetadata {
BlobMetadataBuilder::default()
.blob_type(typ)
.properties(properties)
.offset(self.next_blob_offset as _)
.length(size as _)
.build()
.expect("Required fields are not set")
}
}

impl<W: io::Write> PuffinSyncWriter for PuffinFileWriter<W> {
fn set_properties(&mut self, properties: HashMap<String, String>) {
self.properties = properties;
}

fn add_blob<R: io::Read>(&mut self, mut blob: Blob<R>) -> Result<()> {
self.write_header_if_needed_sync()?;

let size = io::copy(&mut blob.data, &mut self.writer).context(WriteSnafu)?;

let blob_metadata = self.create_blob_metadata(blob.blob_type, blob.properties, size);
self.blob_metadata.push(blob_metadata);

self.next_blob_offset += size;
Ok(())
}

fn finish(&mut self) -> Result<()> {
self.write_header_if_needed_sync()?;
self.write_footer_sync()?;
self.writer.flush().context(FlushSnafu)
}
}

#[async_trait]
impl<W: AsyncWrite + Unpin + Send> PuffinAsyncWriter for PuffinFileWriter<W> {
fn set_properties(&mut self, properties: HashMap<String, String>) {
self.properties = properties;
}

async fn add_blob<R: AsyncRead + Send>(&mut self, blob: Blob<R>) -> Result<()> {
self.write_header_if_needed_async().await?;

let size = futures::io::copy(blob.data, &mut self.writer)
.await
.context(WriteSnafu)?;

let blob_metadata = self.create_blob_metadata(blob.blob_type, blob.properties, size);
self.blob_metadata.push(blob_metadata);

self.next_blob_offset += size;
Ok(())
}

async fn finish(&mut self) -> Result<()> {
self.write_header_if_needed_async().await?;
self.write_footer_async().await?;
self.writer.flush().await.context(FlushSnafu)?;
self.writer.close().await.context(CloseSnafu)
}
}

impl<W: io::Write> PuffinFileWriter<W> {
fn write_header_if_needed_sync(&mut self) -> Result<()> {
if self.next_blob_offset == 0 {
self.writer.write_all(&MAGIC).context(WriteSnafu)?;
self.next_blob_offset += MAGIC.len() as u64;
}
Ok(())
}

fn write_footer_sync(&mut self) -> Result<()> {
let bytes = FooterWriter::new(
mem::take(&mut self.blob_metadata),
mem::take(&mut self.properties),
)
.into_footer_bytes()?;

self.writer.write_all(&bytes).context(WriteSnafu)
}
}

impl<W: AsyncWrite + Unpin> PuffinFileWriter<W> {
async fn write_header_if_needed_async(&mut self) -> Result<()> {
if self.next_blob_offset == 0 {
self.writer.write_all(&MAGIC).await.context(WriteSnafu)?;
self.next_blob_offset += MAGIC.len() as u64;
}
Ok(())
}

async fn write_footer_async(&mut self) -> Result<()> {
let bytes = FooterWriter::new(
mem::take(&mut self.blob_metadata),
mem::take(&mut self.properties),
)
.into_footer_bytes()?;

self.writer.write_all(&bytes).await.context(WriteSnafu)
}
}
Loading

0 comments on commit a7bbd61

Please sign in to comment.