diff --git a/Cargo.lock b/Cargo.lock index 622a3fafd4cc..698e3138d806 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6549,8 +6549,11 @@ name = "puffin" version = "0.4.2" dependencies = [ "derive_builder 0.12.0", + "futures", + "pin-project", "serde", "serde_json", + "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index e00d8afad6ea..b178fd9b97ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,6 +98,7 @@ opentelemetry-proto = { git = "https://github.com/waynexia/opentelemetry-rust.gi ] } parquet = "47.0" paste = "1.0" +pin-project = "1.0" prometheus = { version = "0.13.3", features = ["process"] } prost = "0.12" raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "22dfb426cd994602b57725ef080287d3e53db479" } diff --git a/src/puffin/Cargo.toml b/src/puffin/Cargo.toml index 6c1520f3505d..a0c56c0f8136 100644 --- a/src/puffin/Cargo.toml +++ b/src/puffin/Cargo.toml @@ -6,5 +6,10 @@ license.workspace = true [dependencies] derive_builder.workspace = true +futures.workspace = true +pin-project.workspace = true serde.workspace = true serde_json.workspace = true + +[dev-dependencies] +tokio.workspace = true diff --git a/src/puffin/src/lib.rs b/src/puffin/src/lib.rs index 53ef9295145b..88a1d0254019 100644 --- a/src/puffin/src/lib.rs +++ b/src/puffin/src/lib.rs @@ -14,3 +14,4 @@ pub mod blob_metadata; pub mod file_metadata; +pub mod partial_reader; diff --git a/src/puffin/src/partial_reader.rs b/src/puffin/src/partial_reader.rs new file mode 100644 index 000000000000..ef4815679440 --- /dev/null +++ b/src/puffin/src/partial_reader.rs @@ -0,0 +1,95 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod r#async; +mod position; +mod sync; + +use pin_project::pin_project; + +/// `PartialReader` to perform synchronous or asynchronous reads on a portion of a resource. +#[pin_project] +pub struct PartialReader { + /// offset of the portion in the resource + offset: u64, + + /// size of the portion in the resource + size: u64, + + /// Resource for the portion. + /// The `offset` and `size` fields are used to determine the slice of `source` to read. + #[pin] + source: R, + + /// The current position within the portion. + /// + /// A `None` value indicates that no read operations have been performed yet on this portion. + /// Before a read operation can be performed, the resource must be positioned at the correct offset in the portion. + /// After the first read operation, this field will be set to `Some(_)`, representing the current read position in the portion. + position_in_portion: Option, +} + +impl PartialReader { + /// Creates a new `PartialReader` for the given resource. + pub fn new(source: R, offset: u64, size: u64) -> Self { + Self { + offset, + size, + source, + position_in_portion: None, + } + } + + /// Returns the current position in the portion. + pub fn position(&self) -> u64 { + self.position_in_portion.unwrap_or_default() + } + + /// Returns the size of the portion in portion. + pub fn size(&self) -> u64 { + self.size + } + + /// Returns whether the portion is empty. + pub fn is_empty(&self) -> bool { + self.size == 0 + } + + /// Returns whether the current position is at the end of the portion. 
+ pub fn is_eof(&self) -> bool { + self.position() == self.size + } +} + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use super::*; + + #[test] + fn is_empty_returns_true_for_zero_length_blob() { + let data: Vec = (0..100).collect(); + let reader = PartialReader::new(Cursor::new(data), 10, 0); + assert!(reader.is_empty()); + assert!(reader.is_eof()); + } + + #[test] + fn is_empty_returns_false_for_non_zero_length_blob() { + let data: Vec = (0..100).collect(); + let reader = PartialReader::new(Cursor::new(data), 10, 30); + assert!(!reader.is_empty()); + } +} diff --git a/src/puffin/src/partial_reader/async.rs b/src/puffin/src/partial_reader/async.rs new file mode 100644 index 000000000000..2cc9fae5236a --- /dev/null +++ b/src/puffin/src/partial_reader/async.rs @@ -0,0 +1,196 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::{ready, AsyncRead, AsyncSeek}; + +use crate::partial_reader::position::position_after_seek; +use crate::partial_reader::PartialReader; + +impl AsyncRead for PartialReader { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + // past end of portion + if self.position() > self.size() { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidInput, + "invalid read past the end of the portion", + ))); + } + + // end of portion + if self.is_eof() { + return Poll::Ready(Ok(0)); + } + + // first read, seek to the correct offset + if self.position_in_portion.is_none() { + // seek operation + let seek_from = io::SeekFrom::Start(self.offset); + ready!(self.as_mut().project().source.poll_seek(cx, seek_from))?; + + self.position_in_portion = Some(0); + } + + // prevent reading over the end + let max_len = (self.size() - self.position_in_portion.unwrap()) as usize; + let actual_len = max_len.min(buf.len()); + + // create a limited reader + let target_buf = &mut buf[..actual_len]; + + // read operation + let read_bytes = ready!(self.as_mut().project().source.poll_read(cx, target_buf))?; + self.position_in_portion = Some(self.position() + read_bytes as u64); + + Poll::Ready(Ok(read_bytes)) + } +} + +impl AsyncSeek for PartialReader { + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: io::SeekFrom, + ) -> Poll> { + let new_position = position_after_seek(pos, self.position(), self.size())?; + let pos = io::SeekFrom::Start(self.offset + new_position); + ready!(self.as_mut().project().source.poll_seek(cx, pos))?; + + self.position_in_portion = Some(new_position); + Poll::Ready(Ok(new_position)) + } +} + +#[cfg(test)] +mod tests { + use futures::io::Cursor; + use futures::{AsyncReadExt as _, AsyncSeekExt as _}; + + use super::*; + + #[tokio::test] + async fn read_all_data_in_portion() { + let data: Vec = 
(0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data.clone()), 0, 100); + let mut buf = vec![0; 100]; + assert_eq!(reader.read(&mut buf).await.unwrap(), 100); + assert_eq!(buf, data); + } + + #[tokio::test] + async fn read_part_of_data_in_portion() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + let mut buf = vec![0; 30]; + assert_eq!(reader.read(&mut buf).await.unwrap(), 30); + assert_eq!(buf, (10..40).collect::>()); + } + + #[tokio::test] + async fn seek_and_read_data_in_portion() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + assert_eq!(reader.seek(io::SeekFrom::Start(10)).await.unwrap(), 10); + let mut buf = vec![0; 10]; + assert_eq!(reader.read(&mut buf).await.unwrap(), 10); + assert_eq!(buf, (20..30).collect::>()); + } + + #[tokio::test] + async fn read_past_end_of_portion_is_eof() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + let mut buf = vec![0; 50]; + assert_eq!(reader.read(&mut buf).await.unwrap(), 30); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); // hit EOF + } + + #[tokio::test] + async fn seek_past_end_of_portion_returns_error() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + // seeking past the portion returns an error + assert!(reader.seek(io::SeekFrom::Start(31)).await.is_err()); + } + + #[tokio::test] + async fn seek_to_negative_position_returns_error() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + assert_eq!(reader.seek(io::SeekFrom::Start(10)).await.unwrap(), 10); + // seeking back to the start of the portion + assert_eq!(reader.seek(io::SeekFrom::Current(-10)).await.unwrap(), 0); + // seeking to a negative position returns an error + assert!(reader.seek(io::SeekFrom::Current(-1)).await.is_err()); + } + + 
#[tokio::test] + async fn seek_from_end_of_portion() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + let mut buf = vec![0; 10]; + // seek to 10 bytes before the end of the portion + assert_eq!(reader.seek(io::SeekFrom::End(-10)).await.unwrap(), 20); + assert_eq!(reader.read(&mut buf).await.unwrap(), 10); + // the final 10 bytes of the portion + assert_eq!(buf, (30..40).collect::>()); + assert!(reader.is_eof()); + } + + #[tokio::test] + async fn seek_from_end_to_negative_position_returns_error() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data.clone()), 10, 30); + // seeking to a negative position returns an error + assert!(reader.seek(io::SeekFrom::End(-31)).await.is_err()); + } + + #[tokio::test] + async fn zero_length_portion_returns_zero_on_read() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 0); + let mut buf = vec![0; 10]; + // reading a portion with zero length returns 0 bytes + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + } + + #[tokio::test] + async fn is_eof_returns_true_at_end_of_portion() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + // we are not at the end of the portion + assert!(!reader.is_eof()); + let mut buf = vec![0; 30]; + assert_eq!(reader.read(&mut buf).await.unwrap(), 30); + // we are at the end of the portion + assert!(reader.is_eof()); + } + + #[tokio::test] + async fn position_resets_after_seek_to_start() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + assert_eq!(reader.seek(io::SeekFrom::Start(10)).await.unwrap(), 10); + assert_eq!(reader.position(), 10); + assert_eq!(reader.seek(io::SeekFrom::Start(0)).await.unwrap(), 0); + assert_eq!(reader.position(), 0); + } +} diff --git a/src/puffin/src/partial_reader/position.rs 
// Patch header fragment carried over verbatim from the replaced span:
// b/src/puffin/src/partial_reader/position.rs new file mode 100644 index 000000000000..e57817c493af
// --- /dev/null +++ b/src/puffin/src/partial_reader/position.rs @@ -0,0 +1,102 @@
//
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io;

/// Calculates the new position after seeking. It checks if the new position
/// is valid (within the portion bounds) before returning it.
///
/// * `seek_from` - the requested seek, interpreted relative to the portion.
/// * `position_in_portion` - the current position, used by `SeekFrom::Current`.
/// * `size_of_portion` - the portion size, used by `SeekFrom::End` and as the upper bound.
///
/// # Errors
///
/// Returns an `InvalidInput` error when the target position would be negative,
/// would overflow a `u64`, or lies beyond the end of the portion. Seeking exactly
/// to the end of the portion is allowed.
pub fn position_after_seek(
    seek_from: io::SeekFrom,
    position_in_portion: u64,
    size_of_portion: u64,
) -> io::Result<u64> {
    // Checked signed addition: `None` means the target is below zero or above `u64::MAX`.
    // This avoids the lossy `u64 -> i64 -> u64` round trips that wrap on large values.
    let new_position = match seek_from {
        io::SeekFrom::Start(offset) => Some(offset),
        io::SeekFrom::Current(offset) => position_in_portion.checked_add_signed(offset),
        io::SeekFrom::End(offset) => size_of_portion.checked_add_signed(offset),
    }
    .ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "invalid seek to a negative or overflowing position",
        )
    })?;

    if new_position > size_of_portion {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "invalid seek to a position beyond the end of the portion",
        ));
    }

    Ok(new_position)
}

#[cfg(test)]
mod tests {
    use std::io::ErrorKind;

    use super::*;

    #[test]
    fn test_position_after_seek_from_start() {
        let result = position_after_seek(io::SeekFrom::Start(10), 0, 20).unwrap();
        assert_eq!(result, 10);
    }

    #[test]
    fn test_position_after_seek_from_start_out_of_bounds() {
        let result = position_after_seek(io::SeekFrom::Start(30), 0, 20);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), ErrorKind::InvalidInput);
    }

    #[test]
    fn test_position_after_seek_from_current() {
        let result = position_after_seek(io::SeekFrom::Current(10), 10, 30).unwrap();
        assert_eq!(result, 20);
    }

    #[test]
    fn test_position_after_seek_from_current_negative_position_within_bounds() {
        let result = position_after_seek(io::SeekFrom::Current(-10), 15, 20).unwrap();
        assert_eq!(result, 5);
    }

    #[test]
    fn test_position_after_seek_from_current_negative_position() {
        let result = position_after_seek(io::SeekFrom::Current(-10), 5, 20);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), ErrorKind::InvalidInput);
    }

    #[test]
    fn test_position_after_seek_from_end() {
        let result = position_after_seek(io::SeekFrom::End(-10), 0, 30).unwrap();
        assert_eq!(result, 20);
    }

    #[test]
    fn test_position_after_seek_from_end_out_of_bounds() {
        let result = position_after_seek(io::SeekFrom::End(10), 0, 20);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), ErrorKind::InvalidInput);
    }

    #[test]
    fn test_position_after_seek_from_end_negative_position() {
        let result = position_after_seek(io::SeekFrom::End(-31), 0, 30);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), ErrorKind::InvalidInput);
    }

    #[test]
    fn test_position_after_seek_extreme_values_do_not_wrap() {
        let result = position_after_seek(io::SeekFrom::Current(i64::MAX), u64::MAX, u64::MAX);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), ErrorKind::InvalidInput);

        let result = position_after_seek(io::SeekFrom::End(i64::MIN), 0, 10);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), ErrorKind::InvalidInput);
    }
}

// Patch header and license carried over verbatim from the replaced span:
// diff --git a/src/puffin/src/partial_reader/sync.rs b/src/puffin/src/partial_reader/sync.rs
// new file mode 100644 index 000000000000..1b7781543973
// --- /dev/null +++ b/src/puffin/src/partial_reader/sync.rs @@ -0,0 +1,180 @@
//
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+ +use std::io; + +use crate::partial_reader::position::position_after_seek; +use crate::partial_reader::PartialReader; + +impl io::Read for PartialReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + // past end of portion + if self.position() > self.size() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "invalid read past the end of the portion", + )); + } + + // end of portion + if self.is_eof() { + return Ok(0); + } + + // haven't read from the portion yet, need to seek to the start of it. + if self.position_in_portion.is_none() { + self.source.seek(io::SeekFrom::Start(self.offset))?; + self.position_in_portion = Some(0); + } + + // prevent reading over the end + let max_len = (self.size() - self.position_in_portion.unwrap()) as usize; + let actual_len = max_len.min(buf.len()); + + // create a limited reader + let target_buf = &mut buf[..actual_len]; + + // perform the actual read from the source and update the position. + let read_bytes = self.source.read(target_buf)?; + self.position_in_portion = Some(self.position_in_portion.unwrap() + read_bytes as u64); + + Ok(read_bytes) + } +} + +impl io::Seek for PartialReader { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + let new_position = position_after_seek(pos, self.position(), self.size())?; + let pos = io::SeekFrom::Start(self.offset + new_position); + self.source.seek(pos)?; + + self.position_in_portion = Some(new_position); + Ok(new_position) + } +} + +#[cfg(test)] +mod tests { + use std::io::{Cursor, Read, Seek, SeekFrom}; + + use super::*; + + #[test] + fn read_all_data_in_portion() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data.clone()), 0, 100); + let mut buf = vec![0; 100]; + assert_eq!(reader.read(&mut buf).unwrap(), 100); + assert_eq!(buf, data); + } + + #[test] + fn read_part_of_data_in_portion() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + let mut buf = 
vec![0; 30]; + assert_eq!(reader.read(&mut buf).unwrap(), 30); + assert_eq!(buf, (10..40).collect::>()); + } + + #[test] + fn seek_and_read_data_in_portion() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + assert_eq!(reader.seek(SeekFrom::Start(10)).unwrap(), 10); + let mut buf = vec![0; 10]; + assert_eq!(reader.read(&mut buf).unwrap(), 10); + assert_eq!(buf, (20..30).collect::>()); + } + + #[test] + fn read_past_end_of_portion_is_eof() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + let mut buf = vec![0; 50]; + assert_eq!(reader.read(&mut buf).unwrap(), 30); + assert_eq!(reader.read(&mut buf).unwrap(), 0); // hit EOF + } + + #[test] + fn seek_past_end_of_portion_returns_error() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + // seeking past the portion returns an error + assert!(reader.seek(SeekFrom::Start(31)).is_err()); + } + + #[test] + fn seek_to_negative_position_returns_error() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + assert_eq!(reader.seek(SeekFrom::Start(10)).unwrap(), 10); + // seeking back to the start of the portion + assert_eq!(reader.seek(SeekFrom::Current(-10)).unwrap(), 0); + // seeking to a negative position returns an error + assert!(reader.seek(SeekFrom::Current(-1)).is_err()); + } + + #[test] + fn seek_from_end_of_portion() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + let mut buf = vec![0; 10]; + // seek to 10 bytes before the end of the portion + assert_eq!(reader.seek(SeekFrom::End(-10)).unwrap(), 20); + assert_eq!(reader.read(&mut buf).unwrap(), 10); + // the final 10 bytes of the portion + assert_eq!(buf, (30..40).collect::>()); + assert!(reader.is_eof()); + } + + #[test] + fn seek_from_end_to_negative_position_returns_error() { + let 
data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data.clone()), 10, 30); + // seeking to a negative position returns an error + assert!(reader.seek(SeekFrom::End(-31)).is_err()); + } + + #[test] + fn zero_length_portion_returns_zero_on_read() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 0); + let mut buf = vec![0; 10]; + // reading a portion with zero length returns 0 bytes + assert_eq!(reader.read(&mut buf).unwrap(), 0); + } + + #[test] + fn is_eof_returns_true_at_end_of_portion() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + // we are not at the end of the portion + assert!(!reader.is_eof()); + let mut buf = vec![0; 30]; + assert_eq!(reader.read(&mut buf).unwrap(), 30); + // we are at the end of the portion + assert!(reader.is_eof()); + } + + #[test] + fn position_resets_after_seek_to_start() { + let data: Vec = (0..100).collect(); + let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + assert_eq!(reader.seek(SeekFrom::Start(10)).unwrap(), 10); + assert_eq!(reader.position(), 10); + assert_eq!(reader.seek(SeekFrom::Start(0)).unwrap(), 0); + assert_eq!(reader.position(), 0); + } +}