diff --git a/src/puffin/src/error.rs b/src/puffin/src/error.rs index 9de5f9a17c92..8aa5e9dfbfae 100644 --- a/src/puffin/src/error.rs +++ b/src/puffin/src/error.rs @@ -45,6 +45,20 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to flush"))] + Flush { + #[snafu(source)] + error: IoError, + location: Location, + }, + + #[snafu(display("Failed to close"))] + Close { + #[snafu(source)] + error: IoError, + location: Location, + }, + #[snafu(display("Magic not matched"))] MagicNotMatched { location: Location }, @@ -112,6 +126,8 @@ impl ErrorExt for Error { | MagicNotMatched { .. } | DeserializeJson { .. } | Write { .. } + | Flush { .. } + | Close { .. } | SerializeJson { .. } | BytesToInteger { .. } | ParseStageNotMatch { .. } diff --git a/src/puffin/src/file_format.rs b/src/puffin/src/file_format.rs index 0802c977e87e..075a06c96d50 100644 --- a/src/puffin/src/file_format.rs +++ b/src/puffin/src/file_format.rs @@ -42,14 +42,23 @@ //! UTF-8 encoded JSON payload representing a single [`FileMetadata`] object. pub mod reader; +pub mod writer; use bitflags::bitflags; pub const MAGIC: [u8; 4] = [0x50, 0x46, 0x41, 0x31]; +pub const MAGIC_SIZE: u64 = MAGIC.len() as u64; +pub const MIN_FILE_SIZE: u64 = MAGIC_SIZE + MIN_FOOTER_SIZE; +pub const FLAGS_SIZE: u64 = 4; +pub const PAYLOAD_SIZE_SIZE: u64 = 4; +pub const MIN_FOOTER_SIZE: u64 = MAGIC_SIZE * 2 + FLAGS_SIZE + PAYLOAD_SIZE_SIZE; + bitflags! { #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Flags: u32 { + const DEFAULT = 0b00000000; + const FOOTER_PAYLOAD_COMPRESSED_LZ4 = 0b00000001; } } diff --git a/src/puffin/src/file_format/reader/file.rs b/src/puffin/src/file_format/reader/file.rs index a7ca115b6cb2..b6b2df32f681 100644 --- a/src/puffin/src/file_format/reader/file.rs +++ b/src/puffin/src/file_format/reader/file.rs @@ -23,9 +23,9 @@ use crate::error::{ MagicNotMatchedSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedPuffinFileSizeSnafu, UnsupportedDecompressionSnafu, }; -use crate::file_format::reader::footer::{FooterParser, MIN_FOOTER_SIZE}; +use crate::file_format::reader::footer::FooterParser; use crate::file_format::reader::{PuffinAsyncReader, PuffinSyncReader}; -use crate::file_format::MAGIC; +use crate::file_format::{MAGIC, MAGIC_SIZE, MIN_FILE_SIZE}; use crate::file_metadata::FileMetadata; use crate::partial_reader::PartialReader; @@ -43,9 +43,6 @@ pub struct PuffinFileReader { metadata: Option, } -pub const MAGIC_SIZE: u64 = MAGIC.len() as u64; -pub const MIN_FILE_SIZE: u64 = MAGIC_SIZE + MIN_FOOTER_SIZE; - impl PuffinFileReader { pub fn new(source: R) -> Self { Self { diff --git a/src/puffin/src/file_format/reader/footer.rs b/src/puffin/src/file_format/reader/footer.rs index 987c70a7d7ef..0b7c67ccb3e6 100644 --- a/src/puffin/src/file_format/reader/footer.rs +++ b/src/puffin/src/file_format/reader/footer.rs @@ -22,8 +22,7 @@ use crate::error::{ MagicNotMatchedSnafu, ParseStageNotMatchSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedFooterPayloadSizeSnafu, UnsupportedDecompressionSnafu, }; -use crate::file_format::reader::file::{MAGIC_SIZE, MIN_FILE_SIZE}; -use crate::file_format::{Flags, MAGIC}; +use crate::file_format::{Flags, FLAGS_SIZE, MAGIC, MAGIC_SIZE, MIN_FILE_SIZE, PAYLOAD_SIZE_SIZE}; use crate::file_metadata::FileMetadata; /// Parser for the footer of a Puffin data file @@ -43,10 +42,6 @@ pub struct FooterParser { file_size: u64, } -pub const FLAGS_SIZE: u64 = 4; -pub const PAYLOAD_SIZE_SIZE: u64 = 4; -pub const MIN_FOOTER_SIZE: u64 = MAGIC_SIZE * 2 + FLAGS_SIZE + PAYLOAD_SIZE_SIZE; - impl FooterParser { pub fn new(source: R, file_size: u64) -> Self { Self { source, file_size } diff --git a/src/puffin/src/file_format/writer.rs b/src/puffin/src/file_format/writer.rs new file mode 100644 index 000000000000..95760df0fea3 --- /dev/null +++ b/src/puffin/src/file_format/writer.rs @@ -0,0 +1,62 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod file; +mod footer; + +use std::collections::HashMap; + +use async_trait::async_trait; + +use crate::error::Result; +pub use crate::file_format::writer::file::PuffinFileWriter; + +/// Blob ready to be written +pub struct Blob { + // TODO(zhongzc): ignore `input_fields`, `snapshot_id`, `sequence_number` + // and `compression_codec` for now to keep thing simple + /// The type of the blob + pub blob_type: String, + + /// The data of the blob + pub data: R, + + /// The properties of the blob + pub properties: HashMap, +} + +/// The trait for writing Puffin files synchronously +pub trait PuffinSyncWriter { + /// Set the properties of the Puffin file + fn set_properties(&mut self, properties: HashMap); + + /// Add a blob to the Puffin file + fn add_blob(&mut self, blob: Blob) -> Result<()>; + + /// Finish writing the Puffin file + fn finish(&mut self) -> Result<()>; +} + +/// The trait for writing Puffin files asynchronously +#[async_trait] +pub trait PuffinAsyncWriter { + /// Set the properties of the Puffin file + fn set_properties(&mut self, properties: HashMap); + + /// Add a blob to the Puffin file + async fn add_blob(&mut self, blob: Blob) -> Result<()>; + + /// Finish writing the Puffin file + async fn finish(&mut self) -> Result<()>; +} diff --git a/src/puffin/src/file_format/writer/file.rs b/src/puffin/src/file_format/writer/file.rs new file mode 100644 index 000000000000..3f65b9c89d1c --- /dev/null +++ b/src/puffin/src/file_format/writer/file.rs @@ -0,0 +1,159 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::{io, mem}; + +use async_trait::async_trait; +use futures::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use snafu::ResultExt; + +use crate::blob_metadata::{BlobMetadata, BlobMetadataBuilder}; +use crate::error::{CloseSnafu, FlushSnafu, Result, WriteSnafu}; +use crate::file_format::writer::footer::FooterWriter; +use crate::file_format::writer::{Blob, PuffinAsyncWriter, PuffinSyncWriter}; +use crate::file_format::MAGIC; + +/// Puffin file writer, implements both [`PuffinSyncWriter`] and [`PuffinAsyncWriter`] +pub struct PuffinFileWriter { + /// The writer to write to + writer: W, + + /// The properties of the file + properties: HashMap, + + /// The metadata of the blobs + blob_metadata: Vec, + + /// The offset of the next blob + next_blob_offset: u64, +} + +impl PuffinFileWriter { + pub fn new(writer: W) -> Self { + Self { + writer, + properties: HashMap::new(), + blob_metadata: Vec::new(), + next_blob_offset: 0, + } + } + + fn create_blob_metadata( + &self, + typ: String, + properties: HashMap, + size: u64, + ) -> BlobMetadata { + BlobMetadataBuilder::default() + .blob_type(typ) + .properties(properties) + .offset(self.next_blob_offset as _) + .length(size as _) + .build() + .expect("Required fields are not set") + } +} + +impl PuffinSyncWriter for PuffinFileWriter { + fn set_properties(&mut self, properties: HashMap) { + self.properties = properties; + } + + fn add_blob(&mut self, mut blob: Blob) -> Result<()> { + self.write_header_if_needed_sync()?; + + let size = io::copy(&mut blob.data, &mut self.writer).context(WriteSnafu)?; + + let blob_metadata = self.create_blob_metadata(blob.blob_type, blob.properties, size); + self.blob_metadata.push(blob_metadata); + + self.next_blob_offset += size; + Ok(()) + } + + fn finish(&mut self) -> Result<()> { + self.write_header_if_needed_sync()?; + self.write_footer_sync()?; + self.writer.flush().context(FlushSnafu) + } +} + +#[async_trait] +impl PuffinAsyncWriter for PuffinFileWriter { + fn set_properties(&mut self, properties: HashMap) { + self.properties = properties; + } + + async fn add_blob(&mut self, blob: Blob) -> Result<()> { + self.write_header_if_needed_async().await?; + + let size = futures::io::copy(blob.data, &mut self.writer) + .await + .context(WriteSnafu)?; + + let blob_metadata = self.create_blob_metadata(blob.blob_type, blob.properties, size); + self.blob_metadata.push(blob_metadata); + + self.next_blob_offset += size; + Ok(()) + } + + async fn finish(&mut self) -> Result<()> { + self.write_header_if_needed_async().await?; + self.write_footer_async().await?; + self.writer.flush().await.context(FlushSnafu)?; + self.writer.close().await.context(CloseSnafu) + } +} + +impl PuffinFileWriter { + fn write_header_if_needed_sync(&mut self) -> Result<()> { + if self.next_blob_offset == 0 { + self.writer.write_all(&MAGIC).context(WriteSnafu)?; + self.next_blob_offset += MAGIC.len() as u64; + } + Ok(()) + } + + fn write_footer_sync(&mut self) -> Result<()> { + let bytes = FooterWriter::new( + mem::take(&mut self.blob_metadata), + mem::take(&mut self.properties), + ) + .into_footer_bytes()?; + + self.writer.write_all(&bytes).context(WriteSnafu) + } +} + +impl PuffinFileWriter { + async fn write_header_if_needed_async(&mut self) -> Result<()> { + if self.next_blob_offset == 0 { + self.writer.write_all(&MAGIC).await.context(WriteSnafu)?; + self.next_blob_offset += MAGIC.len() as u64; + } + Ok(()) + } + + async fn write_footer_async(&mut self) -> Result<()> { + let bytes = FooterWriter::new( + mem::take(&mut self.blob_metadata), + mem::take(&mut self.properties), + ) + .into_footer_bytes()?; + + self.writer.write_all(&bytes).await.context(WriteSnafu) + } +} diff --git a/src/puffin/src/file_format/writer/footer.rs b/src/puffin/src/file_format/writer/footer.rs new file mode 100644 index 000000000000..b24a50e8ca7a --- /dev/null +++ b/src/puffin/src/file_format/writer/footer.rs @@ -0,0 +1,88 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::mem; + +use snafu::ResultExt; + +use crate::blob_metadata::BlobMetadata; +use crate::error::{Result, SerializeJsonSnafu}; +use crate::file_format::{Flags, MAGIC, MIN_FOOTER_SIZE}; +use crate::file_metadata::FileMetadataBuilder; + +/// Writer for the footer of a Puffin file. +/// +/// ```text +/// Footer layout: HeadMagic Payload PayloadSize Flags FootMagic +/// [4] [?] [4] [4] [4] +/// ``` +pub struct FooterWriter { + blob_metadata: Vec, + file_properties: HashMap, +} + +impl FooterWriter { + pub fn new(blob_metadata: Vec, file_properties: HashMap) -> Self { + Self { + blob_metadata, + file_properties, + } + } + + /// Serializes the footer to bytes + pub fn into_footer_bytes(mut self) -> Result> { + let payload = self.footer_payload()?; + let payload_size = payload.len(); + + let capacity = MIN_FOOTER_SIZE as usize + payload_size; + let mut buf = Vec::with_capacity(capacity); + + self.write_magic(&mut buf); // HeadMagic + self.write_payload(&mut buf, &payload); // Payload + self.write_footer_payload_size(payload_size as _, &mut buf); // PayloadSize + self.write_flags(&mut buf); // Flags + self.write_magic(&mut buf); // FootMagic + Ok(buf) + } + + fn write_magic(&self, buf: &mut Vec) { + buf.extend_from_slice(&MAGIC); + } + + fn write_payload(&self, buf: &mut Vec, payload: &[u8]) { + buf.extend_from_slice(payload); + } + + fn write_footer_payload_size(&self, payload_size: i32, buf: &mut Vec) { + buf.extend_from_slice(&payload_size.to_le_bytes()); + } + + /// Appends reserved flags (currently zero-initialized) to the given buffer. + /// + /// TODO(zhongzc): support compression + fn write_flags(&self, buf: &mut Vec) { + buf.extend_from_slice(&Flags::DEFAULT.bits().to_le_bytes()); + } + + fn footer_payload(&mut self) -> Result> { + let file_metadata = FileMetadataBuilder::default() + .blobs(mem::take(&mut self.blob_metadata)) + .properties(mem::take(&mut self.file_properties)) + .build() + .expect("Required fields are not set"); + + serde_json::to_vec(&file_metadata).context(SerializeJsonSnafu) + } +} diff --git a/src/puffin/src/tests.rs b/src/puffin/src/tests.rs index 1d48ecd5656e..ecf5f1d49b15 100644 Binary files a/src/puffin/src/tests.rs and b/src/puffin/src/tests.rs differ