Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(puffin): add partial reader #2741

Merged
merged 2 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ opentelemetry-proto = { git = "https://github.com/waynexia/opentelemetry-rust.gi
] }
parquet = "47.0"
paste = "1.0"
pin-project = "1.0"
prometheus = { version = "0.13.3", features = ["process"] }
prost = "0.12"
raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "22dfb426cd994602b57725ef080287d3e53db479" }
Expand Down
5 changes: 5 additions & 0 deletions src/puffin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,10 @@ license.workspace = true

[dependencies]
derive_builder.workspace = true
futures.workspace = true
pin-project.workspace = true
serde.workspace = true
serde_json.workspace = true

[dev-dependencies]
tokio.workspace = true
1 change: 1 addition & 0 deletions src/puffin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@

pub mod blob_metadata;
pub mod file_metadata;
pub mod partial_reader;
95 changes: 95 additions & 0 deletions src/puffin/src/partial_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod r#async;
mod position;
mod sync;

use pin_project::pin_project;

/// `PartialReader` to perform synchronous or asynchronous reads on a portion of a resource.
#[pin_project]
pub struct PartialReader<R> {
/// offset of the portion in the resource
offset: u64,

/// size of the portion in the resource
size: u64,

/// Resource for the portion.
/// The `offset` and `size` fields are used to determine the slice of `source` to read.
#[pin]
source: R,

/// The current position within the portion.
///
/// A `None` value indicates that no read operations have been performed yet on this portion.
/// Before a read operation can be performed, the resource must be positioned at the correct offset in the portion.
/// After the first read operation, this field will be set to `Some(_)`, representing the current read position in the portion.
position_in_portion: Option<u64>,
}

impl<R> PartialReader<R> {
/// Creates a new `PartialReader` for the given resource.
pub fn new(source: R, offset: u64, size: u64) -> Self {
Self {
offset,
size,
source,
position_in_portion: None,
}
}

/// Returns the current position in the portion.
pub fn position(&self) -> u64 {
self.position_in_portion.unwrap_or_default()
}

/// Returns the size of the portion in portion.
pub fn size(&self) -> u64 {
self.size
}

/// Returns whether the portion is empty.
pub fn is_empty(&self) -> bool {
self.size == 0
}

/// Returns whether the current position is at the end of the portion.
pub fn is_eof(&self) -> bool {
self.position() == self.size
}
}

#[cfg(test)]
mod tests {
use std::io::Cursor;

use super::*;

#[test]
fn is_empty_returns_true_for_zero_length_blob() {
let data: Vec<u8> = (0..100).collect();
let reader = PartialReader::new(Cursor::new(data), 10, 0);
assert!(reader.is_empty());
assert!(reader.is_eof());
}

#[test]
fn is_empty_returns_false_for_non_zero_length_blob() {
let data: Vec<u8> = (0..100).collect();
let reader = PartialReader::new(Cursor::new(data), 10, 30);
assert!(!reader.is_empty());
}
}
196 changes: 196 additions & 0 deletions src/puffin/src/partial_reader/async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{ready, AsyncRead, AsyncSeek};

use crate::partial_reader::position::position_after_seek;
use crate::partial_reader::PartialReader;

impl<R: AsyncRead + AsyncSeek + Unpin> AsyncRead for PartialReader<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
// past end of portion
if self.position() > self.size() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid read past the end of the portion",
)));
}

// end of portion
if self.is_eof() {
return Poll::Ready(Ok(0));
}

// first read, seek to the correct offset
if self.position_in_portion.is_none() {
// seek operation
let seek_from = io::SeekFrom::Start(self.offset);
ready!(self.as_mut().project().source.poll_seek(cx, seek_from))?;

self.position_in_portion = Some(0);
}

// prevent reading over the end
let max_len = (self.size() - self.position_in_portion.unwrap()) as usize;
let actual_len = max_len.min(buf.len());

// create a limited reader
let target_buf = &mut buf[..actual_len];

// read operation
let read_bytes = ready!(self.as_mut().project().source.poll_read(cx, target_buf))?;
self.position_in_portion = Some(self.position() + read_bytes as u64);

Poll::Ready(Ok(read_bytes))
}
}

impl<R: AsyncRead + AsyncSeek + Unpin> AsyncSeek for PartialReader<R> {
fn poll_seek(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: io::SeekFrom,
) -> Poll<io::Result<u64>> {
let new_position = position_after_seek(pos, self.position(), self.size())?;
let pos = io::SeekFrom::Start(self.offset + new_position);
ready!(self.as_mut().project().source.poll_seek(cx, pos))?;

self.position_in_portion = Some(new_position);
Poll::Ready(Ok(new_position))
}
}

#[cfg(test)]
mod tests {
use futures::io::Cursor;
use futures::{AsyncReadExt as _, AsyncSeekExt as _};

use super::*;

#[tokio::test]
async fn read_all_data_in_portion() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data.clone()), 0, 100);
let mut buf = vec![0; 100];
assert_eq!(reader.read(&mut buf).await.unwrap(), 100);
assert_eq!(buf, data);
}

#[tokio::test]
async fn read_part_of_data_in_portion() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
let mut buf = vec![0; 30];
assert_eq!(reader.read(&mut buf).await.unwrap(), 30);
assert_eq!(buf, (10..40).collect::<Vec<u8>>());
}

#[tokio::test]
async fn seek_and_read_data_in_portion() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
assert_eq!(reader.seek(io::SeekFrom::Start(10)).await.unwrap(), 10);
let mut buf = vec![0; 10];
assert_eq!(reader.read(&mut buf).await.unwrap(), 10);
assert_eq!(buf, (20..30).collect::<Vec<u8>>());
}

#[tokio::test]
async fn read_past_end_of_portion_is_eof() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
let mut buf = vec![0; 50];
assert_eq!(reader.read(&mut buf).await.unwrap(), 30);
assert_eq!(reader.read(&mut buf).await.unwrap(), 0); // hit EOF
}

#[tokio::test]
async fn seek_past_end_of_portion_returns_error() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
// seeking past the portion returns an error
assert!(reader.seek(io::SeekFrom::Start(31)).await.is_err());
}

#[tokio::test]
async fn seek_to_negative_position_returns_error() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
assert_eq!(reader.seek(io::SeekFrom::Start(10)).await.unwrap(), 10);
// seeking back to the start of the portion
assert_eq!(reader.seek(io::SeekFrom::Current(-10)).await.unwrap(), 0);
// seeking to a negative position returns an error
assert!(reader.seek(io::SeekFrom::Current(-1)).await.is_err());
}

#[tokio::test]
async fn seek_from_end_of_portion() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
let mut buf = vec![0; 10];
// seek to 10 bytes before the end of the portion
assert_eq!(reader.seek(io::SeekFrom::End(-10)).await.unwrap(), 20);
assert_eq!(reader.read(&mut buf).await.unwrap(), 10);
// the final 10 bytes of the portion
assert_eq!(buf, (30..40).collect::<Vec<u8>>());
assert!(reader.is_eof());
}

#[tokio::test]
async fn seek_from_end_to_negative_position_returns_error() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data.clone()), 10, 30);
// seeking to a negative position returns an error
assert!(reader.seek(io::SeekFrom::End(-31)).await.is_err());
}

#[tokio::test]
async fn zero_length_portion_returns_zero_on_read() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 0);
let mut buf = vec![0; 10];
// reading a portion with zero length returns 0 bytes
assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
}

#[tokio::test]
async fn is_eof_returns_true_at_end_of_portion() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
// we are not at the end of the portion
assert!(!reader.is_eof());
let mut buf = vec![0; 30];
assert_eq!(reader.read(&mut buf).await.unwrap(), 30);
// we are at the end of the portion
assert!(reader.is_eof());
}

#[tokio::test]
async fn position_resets_after_seek_to_start() {
let data: Vec<u8> = (0..100).collect();
let mut reader = PartialReader::new(Cursor::new(data), 10, 30);
assert_eq!(reader.seek(io::SeekFrom::Start(10)).await.unwrap(), 10);
assert_eq!(reader.position(), 10);
assert_eq!(reader.seek(io::SeekFrom::Start(0)).await.unwrap(), 0);
assert_eq!(reader.position(), 0);
}
}
Loading
Loading