From c3e8e449c81de6f31604a17c8adcd3a8f06568c6 Mon Sep 17 00:00:00 2001 From: Nicholas Roberts Date: Tue, 30 Jul 2024 19:15:11 +1000 Subject: [PATCH 1/7] Parquet/async: Swap default MetadataLoader::load behaviour to suffix requests, swap get_bytes to take a GetRange (equivalent to object-store's) --- parquet/src/arrow/async_reader/metadata.rs | 109 +++++++++++---------- parquet/src/arrow/async_reader/mod.rs | 93 ++++++++++++++---- parquet/src/arrow/async_reader/store.rs | 55 +++++++---- 3 files changed, 165 insertions(+), 92 deletions(-) diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index 9224ea3f68a8..e202d3688b3c 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::arrow::async_reader::AsyncFileReader; +use crate::arrow::async_reader::{AsyncFileReader, GetRange}; use crate::errors::{ParquetError, Result}; use crate::file::footer::{decode_footer, decode_metadata}; use crate::file::metadata::ParquetMetaData; @@ -25,15 +25,14 @@ use bytes::Bytes; use futures::future::BoxFuture; use futures::FutureExt; use std::future::Future; -use std::ops::Range; /// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`] pub trait MetadataFetch { - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result>; + fn fetch(&mut self, range: GetRange) -> BoxFuture<'_, Result>; } impl<'a, T: AsyncFileReader> MetadataFetch for &'a mut T { - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn fetch(&mut self, range: GetRange) -> BoxFuture<'_, Result> { self.get_bytes(range) } } @@ -52,22 +51,9 @@ impl MetadataLoader { /// Create a new [`MetadataLoader`] by reading the footer information /// /// See [`fetch_parquet_metadata`] for the meaning of the individual parameters - pub async fn load(mut fetch: F, file_size: usize, prefetch: Option) -> Result { - if file_size < 8 { - return Err(ParquetError::EOF(format!( - "file size of {file_size} is less than footer" - ))); - } + pub async fn load(mut fetch: F, prefetch: Option) -> Result { - // If a size hint is provided, read more than the minimum size - // to try and avoid a second fetch. - let footer_start = if let Some(size_hint) = prefetch { - file_size.saturating_sub(size_hint) - } else { - file_size - 8 - }; - - let suffix = fetch.fetch(footer_start..file_size).await?; + let suffix = fetch.fetch(GetRange::Suffix(prefetch.unwrap_or(8))).await?; let suffix_len = suffix.len(); let mut footer = [0; 8]; @@ -75,26 +61,20 @@ impl MetadataLoader { let length = decode_footer(&footer)?; - if file_size < length + 8 { - return Err(ParquetError::EOF(format!( - "file size of {} is less than footer + metadata {}", - file_size, - length + 8 - ))); - } - // Did not fetch the entire file metadata in the initial read, need to make a second request let (metadata, remainder) = if length > suffix_len - 8 { - let metadata_start = file_size - length - 8; - let meta = fetch.fetch(metadata_start..file_size - 8).await?; - (decode_metadata(&meta)?, None) + let metadata_offset = length + 8; + let meta = fetch.fetch(GetRange::Suffix(metadata_offset)).await?; + let slice = &meta[0..length]; + (decode_metadata(&slice)?, None) } else { - let metadata_start = file_size - length - 8 - footer_start; + let metadata_offset = length + 8; + let metadata_start = suffix_len - metadata_offset; let slice = &suffix[metadata_start..suffix_len - 8]; ( decode_metadata(slice)?, - Some((footer_start, suffix.slice(..metadata_start))), + Some((0, suffix.slice(..metadata_start))), ) }; @@ -105,6 +85,10 @@ impl MetadataLoader { }) } + pub async fn load_absolute(mut fetch: F, file_size: usize, prefetch: Option) -> Result { + todo!() + } + /// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`] pub fn new(fetch: F, metadata: ParquetMetaData) -> Self { Self { @@ -133,13 +117,15 @@ impl MetadataLoader { Some(range) => range, }; + let page_index_len = range.end - range.start; + // TODO: determine if _remainder_start is needed even in the non-suffix request case let data = match &self.remainder { - Some((remainder_start, remainder)) if *remainder_start <= range.start => { - let offset = range.start - *remainder_start; - remainder.slice(offset..range.end - *remainder_start + offset) + Some((_remainder_start, remainder)) if remainder.len() >= page_index_len => { + let offset = remainder.len() - page_index_len; + remainder.slice(offset..) } // Note: this will potentially fetch data already in remainder, this keeps things simple - _ => self.fetch.fetch(range.start..range.end).await?, + _ => self.fetch.fetch((range.start..range.end).into()).await?, }; // Sanity check @@ -200,10 +186,10 @@ struct MetadataFetchFn(F); impl MetadataFetch for MetadataFetchFn where - F: FnMut(Range) -> Fut + Send, + F: FnMut(GetRange) -> Fut + Send, Fut: Future> + Send, { - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn fetch(&mut self, range: GetRange) -> BoxFuture<'_, Result> { async move { self.0(range).await }.boxed() } } @@ -226,15 +212,18 @@ where /// significantly reduce the number of `fetch` requests, and consequently latency pub async fn fetch_parquet_metadata( fetch: F, - file_size: usize, + file_size: Option, prefetch: Option, ) -> Result where - F: FnMut(Range) -> Fut + Send, + F: FnMut(GetRange) -> Fut + Send, Fut: Future> + Send, { let fetch = MetadataFetchFn(fetch); - let loader = MetadataLoader::load(fetch, file_size, prefetch).await?; + let loader = match file_size { + Some(file_size) => MetadataLoader::load_absolute(fetch, file_size, prefetch).await?, + None => MetadataLoader::load(fetch, prefetch).await? + }; Ok(loader.finish()) } @@ -247,7 +236,13 @@ mod tests { use std::io::{Read, Seek, SeekFrom}; use std::sync::atomic::{AtomicUsize, Ordering}; - fn read_range(file: &mut File, range: Range) -> Result { + fn read_range(file: &mut File, range: GetRange) -> Result { + let file_size = file.len().try_into().unwrap(); + let range = match range { + GetRange::Bounded(range) => range, + GetRange::Offset(offset) => offset..file_size, + GetRange::Suffix(end_offset) => (file_size.saturating_sub(end_offset.try_into().unwrap())..file_size) + }; file.seek(SeekFrom::Start(range.start as _))?; let len = range.end - range.start; let mut buf = Vec::with_capacity(len); @@ -269,13 +264,13 @@ mod tests { futures::future::ready(read_range(&mut file, range)) }; - let actual = fetch_parquet_metadata(&mut fetch, len, None).await.unwrap(); + let actual = fetch_parquet_metadata(&mut fetch, None, None).await.unwrap(); assert_eq!(actual.file_metadata().schema(), expected); assert_eq!(fetch_count.load(Ordering::SeqCst), 2); // Metadata hint too small fetch_count.store(0, Ordering::SeqCst); - let actual = fetch_parquet_metadata(&mut fetch, len, Some(10)) + let actual = fetch_parquet_metadata(&mut fetch, None, Some(10)) .await .unwrap(); assert_eq!(actual.file_metadata().schema(), expected); @@ -283,7 +278,7 @@ mod tests { // Metadata hint too large fetch_count.store(0, Ordering::SeqCst); - let actual = fetch_parquet_metadata(&mut fetch, len, Some(500)) + let actual = fetch_parquet_metadata(&mut fetch, None, Some(500)) .await .unwrap(); assert_eq!(actual.file_metadata().schema(), expected); @@ -291,19 +286,19 @@ mod tests { // Metadata hint exactly correct fetch_count.store(0, Ordering::SeqCst); - let actual = fetch_parquet_metadata(&mut fetch, len, Some(428)) + let actual = fetch_parquet_metadata(&mut fetch, None, Some(428)) .await .unwrap(); assert_eq!(actual.file_metadata().schema(), expected); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); - let err = fetch_parquet_metadata(&mut fetch, 4, None) + let err = fetch_parquet_metadata(&mut fetch, Some(4), None) .await .unwrap_err() .to_string(); assert_eq!(err, "EOF: file size of 4 is less than footer"); - let err = fetch_parquet_metadata(&mut fetch, 20, None) + let err = fetch_parquet_metadata(&mut fetch, Some(20), None) .await .unwrap_err() .to_string(); @@ -321,7 +316,7 @@ mod tests { }; let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load(f, len, None).await.unwrap(); + let mut loader = MetadataLoader::load(f, None).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 2); loader.load_page_index(true, true).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 3); @@ -331,7 +326,7 @@ mod tests { // Prefetch just footer exactly fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load(f, len, Some(1729)).await.unwrap(); + let mut loader = MetadataLoader::load(f, Some(1729)).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); loader.load_page_index(true, true).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 2); @@ -341,7 +336,7 @@ mod tests { // Prefetch more than footer but not enough fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load(f, len, Some(130649)).await.unwrap(); + let mut loader = MetadataLoader::load(f, Some(130649)).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); loader.load_page_index(true, true).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 2); @@ -351,7 +346,17 @@ mod tests { // Prefetch exactly enough fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load(f, len, Some(130650)).await.unwrap(); + let mut loader = MetadataLoader::load(f, Some(130650)).await.unwrap(); + assert_eq!(fetch_count.load(Ordering::SeqCst), 1); + loader.load_page_index(true, true).await.unwrap(); + assert_eq!(fetch_count.load(Ordering::SeqCst), 1); + let metadata = loader.finish(); + assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); + + // Prefetch more than enough + fetch_count.store(0, Ordering::SeqCst); + let f = MetadataFetchFn(&mut fetch); + let mut loader = MetadataLoader::load(f, Some(131651)).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); loader.load_page_index(true, true).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 5695dbc10fe1..0baf9b7ab5e9 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -79,6 +79,7 @@ use std::collections::VecDeque; use std::fmt::Formatter; use std::io::SeekFrom; use std::ops::Range; +use std::ops::RangeBounds; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -121,10 +122,41 @@ use crate::arrow::schema::ParquetField; #[cfg(feature = "object_store")] pub use store::*; +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum GetRange { + /// Request a specific range of bytes + /// + /// If the given range is zero-length or starts after the end of the object, + /// an error will be returned. Additionally, if the range ends after the end + /// of the object, the entire remainder of the object will be returned. + /// Otherwise, the exact requested range will be returned. + Bounded(Range), + /// Request all bytes starting from a given byte offset + Offset(usize), + /// Request up to the last n bytes + Suffix(usize), +} + +impl> From for GetRange { + fn from(value: T) -> Self { + use std::ops::Bound::*; + let first = match value.start_bound() { + Included(i) => *i, + Excluded(i) => i + 1, + Unbounded => 0, + }; + match value.end_bound() { + Included(i) => Self::Bounded(first..(i + 1)), + Excluded(i) => Self::Bounded(first..*i), + Unbounded => Self::Offset(first), + } + } +} + /// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files pub trait AsyncFileReader: Send { /// Retrieve the bytes in `range` - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result>; + fn get_bytes(&mut self, range: GetRange) -> BoxFuture<'_, Result>; /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> { @@ -132,7 +164,7 @@ pub trait AsyncFileReader: Send { let mut result = Vec::with_capacity(ranges.len()); for range in ranges.into_iter() { - let data = self.get_bytes(range).await?; + let data = self.get_bytes(range.into()).await?; result.push(data); } @@ -148,7 +180,7 @@ pub trait AsyncFileReader: Send { } impl AsyncFileReader for Box { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn get_bytes(&mut self, range: GetRange) -> BoxFuture<'_, Result> { self.as_mut().get_bytes(range) } @@ -162,15 +194,31 @@ impl AsyncFileReader for Box { } impl AsyncFileReader for T { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn get_bytes(&mut self, range: GetRange) -> BoxFuture<'_, Result> { async move { - self.seek(SeekFrom::Start(range.start as u64)).await?; - - let to_read = range.end - range.start; - let mut buffer = Vec::with_capacity(to_read); - let read = self.take(to_read as u64).read_to_end(&mut buffer).await?; - if read != to_read { - return Err(eof_err!("expected to read {} bytes, got {}", to_read, read)); + let to_read = match range { + GetRange::Suffix(end_offset) => { + self.seek(SeekFrom::End(-(end_offset as i64))).await?; + Some(end_offset) + }, + GetRange::Offset(offset) => { + self.seek(SeekFrom::Start(offset as u64)).await?; + None + }, + GetRange::Bounded(range) => { + self.seek(SeekFrom::Start(range.start as u64)).await?; + Some(range.end - range.start) + } + }; + // TODO: figure out a better alternative for Offset ranges + let mut buffer = Vec::with_capacity(to_read.unwrap_or(1_024usize)); + if let Some(to_read) = to_read { + let read = self.take(to_read as u64).read_to_end(&mut buffer).await?; + if read != to_read { + return Err(eof_err!("expected to read {} bytes, got {}", to_read, read)); + } + } else { + self.read_to_end(&mut buffer).await?; } Ok(buffer.into()) @@ -358,11 +406,11 @@ impl ParquetRecordBatchStreamBuilder { }; let buffer = match column_metadata.bloom_filter_length() { - Some(length) => self.input.0.get_bytes(offset..offset + length as usize), + Some(length) => self.input.0.get_bytes((offset..offset + length as usize).into()), None => self .input .0 - .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE), + .get_bytes((offset..offset + SBBF_HEADER_SIZE_ESTIMATE).into()), } .await?; @@ -393,7 +441,7 @@ impl ParquetRecordBatchStreamBuilder { })?; self.input .0 - .get_bytes(bitset_offset as usize..bitset_offset as usize + bitset_length) + .get_bytes((bitset_offset as usize..bitset_offset as usize + bitset_length).into()) .await? } }; @@ -932,12 +980,17 @@ mod tests { struct TestReader { data: Bytes, metadata: Arc, - requests: Arc>>>, + requests: Arc>>, } impl AsyncFileReader for TestReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn get_bytes(&mut self, range: GetRange) -> BoxFuture<'_, Result> { self.requests.lock().unwrap().push(range.clone()); + let range = match range { + GetRange::Bounded(range) => range, + GetRange::Offset(offset) => offset..self.data.len(), + GetRange::Suffix(end_offset) => self.data.len().saturating_sub(end_offset)..self.data.len() + }; futures::future::ready(Ok(self.data.slice(range))).boxed() } @@ -995,8 +1048,8 @@ mod tests { assert_eq!( &requests[..], &[ - offset_1 as usize..(offset_1 + length_1) as usize, - offset_2 as usize..(offset_2 + length_2) as usize + GetRange::from(offset_1 as usize..(offset_1 + length_1) as usize), + GetRange::from(offset_2 as usize..(offset_2 + length_2) as usize) ] ); } @@ -1581,7 +1634,7 @@ mod tests { // Setup `RowSelection` so that we can skip every other page, selecting the last page let mut selectors = vec![]; - let mut expected_page_requests: Vec> = vec![]; + let mut expected_page_requests: Vec = vec![]; while let Some(page) = pages.next() { let num_rows = if let Some(next_page) = pages.peek() { next_page.first_row_index - page.first_row_index @@ -1595,7 +1648,7 @@ mod tests { selectors.push(RowSelector::select(num_rows as usize)); let start = page.offset as usize; let end = start + page.compressed_page_size as usize; - expected_page_requests.push(start..end); + expected_page_requests.push((start..end).into()); } skip = !skip; } diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index 77c00e91a3aa..7d06251af4fd 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -22,12 +22,25 @@ use bytes::Bytes; use futures::future::BoxFuture; use futures::{FutureExt, TryFutureExt}; -use object_store::{ObjectMeta, ObjectStore}; +use object_store::GetOptions; +use object_store::{ObjectStore, path::Path}; use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader}; use crate::errors::Result; use crate::file::metadata::ParquetMetaData; +use super::GetRange; + +impl From for object_store::GetRange { + fn from(range: GetRange) -> Self { + match range { + GetRange::Bounded(range) => object_store::GetRange::Bounded(range), + GetRange::Offset(offset) => object_store::GetRange::Offset(offset), + GetRange::Suffix(end_offset) => object_store::GetRange::Suffix(end_offset), + } + } +} + /// Reads Parquet files in object storage using [`ObjectStore`]. /// /// ```no_run @@ -55,7 +68,7 @@ use crate::file::metadata::ParquetMetaData; #[derive(Clone, Debug)] pub struct ParquetObjectReader { store: Arc, - meta: ObjectMeta, + location: Path, metadata_size_hint: Option, preload_column_index: bool, preload_offset_index: bool, @@ -65,10 +78,10 @@ impl ParquetObjectReader { /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`ObjectMeta`] /// /// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`] - pub fn new(store: Arc, meta: ObjectMeta) -> Self { + pub fn new(store: Arc, location: Path) -> Self { Self { store, - meta, + location, metadata_size_hint: None, preload_column_index: false, preload_offset_index: false, @@ -102,11 +115,16 @@ impl ParquetObjectReader { } impl AsyncFileReader for ParquetObjectReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { - self.store - .get_range(&self.meta.location, range) - .map_err(|e| e.into()) - .boxed() + fn get_bytes(&mut self, range: GetRange) -> BoxFuture<'_, Result> { + async move { + self.store + .get_opts(&self.location, GetOptions { + range: Some(range.into()), + ..Default::default() + }).await?.bytes().await + } + .map_err(|e| e.into()) + .boxed() } fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> @@ -115,7 +133,7 @@ impl AsyncFileReader for ParquetObjectReader { { async move { self.store - .get_ranges(&self.meta.location, &ranges) + .get_ranges(&self.location, &ranges) .await .map_err(|e| e.into()) } @@ -126,9 +144,10 @@ impl AsyncFileReader for ParquetObjectReader { Box::pin(async move { let preload_column_index = self.preload_column_index; let preload_offset_index = self.preload_offset_index; - let file_size = self.meta.size; let prefetch = self.metadata_size_hint; - let mut loader = MetadataLoader::load(self, file_size, prefetch).await?; + // TODO: distinguish between suffix-supporting object stores, and those that don't + // Alternatively, surface in the form of a flag on this struct. + let mut loader = MetadataLoader::load(self, prefetch).await?; loader .load_page_index(preload_column_index, preload_offset_index) .await?; @@ -155,14 +174,10 @@ mod tests { async fn test_simple() { let res = parquet_test_data(); let store = LocalFileSystem::new_with_prefix(res).unwrap(); - - let mut meta = store - .head(&Path::from("alltypes_plain.parquet")) - .await - .unwrap(); + let mut location = Path::from("alltypes_plain.parquet"); let store = Arc::new(store) as Arc; - let object_reader = ParquetObjectReader::new(Arc::clone(&store), meta.clone()); + let object_reader = ParquetObjectReader::new(Arc::clone(&store), location.clone()); let builder = ParquetRecordBatchStreamBuilder::new(object_reader) .await .unwrap(); @@ -171,9 +186,9 @@ mod tests { assert_eq!(batches.len(), 1); assert_eq!(batches[0].num_rows(), 8); - meta.location = Path::from("I don't exist.parquet"); + location = Path::from("I don't exist.parquet"); - let object_reader = ParquetObjectReader::new(store, meta); + let object_reader = ParquetObjectReader::new(store, location); // Cannot use unwrap_err as ParquetRecordBatchStreamBuilder: !Debug match ParquetRecordBatchStreamBuilder::new(object_reader).await { Ok(_) => panic!("expected failure"), From 73cd820025660a66a67986ab9e2cbc35b9fb24ed Mon Sep 17 00:00:00 2001 From: Nicholas Roberts Date: Tue, 30 Jul 2024 19:42:59 +1000 Subject: [PATCH 2/7] cargo fmt --- parquet/src/arrow/async_reader/metadata.rs | 19 +++++++++++++------ parquet/src/arrow/async_reader/mod.rs | 17 ++++++++++++----- parquet/src/arrow/async_reader/store.rs | 16 +++++++++++----- 3 files changed, 36 insertions(+), 16 deletions(-) diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index e202d3688b3c..7bade9f0a750 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -52,7 +52,6 @@ impl MetadataLoader { /// /// See [`fetch_parquet_metadata`] for the meaning of the individual parameters pub async fn load(mut fetch: F, prefetch: Option) -> Result { - let suffix = fetch.fetch(GetRange::Suffix(prefetch.unwrap_or(8))).await?; let suffix_len = suffix.len(); @@ -85,7 +84,11 @@ impl MetadataLoader { }) } - pub async fn load_absolute(mut fetch: F, file_size: usize, prefetch: Option) -> Result { + pub async fn load_absolute( + mut fetch: F, + file_size: usize, + prefetch: Option, + ) -> Result { todo!() } @@ -222,7 +225,7 @@ where let fetch = MetadataFetchFn(fetch); let loader = match file_size { Some(file_size) => MetadataLoader::load_absolute(fetch, file_size, prefetch).await?, - None => MetadataLoader::load(fetch, prefetch).await? + None => MetadataLoader::load(fetch, prefetch).await?, }; Ok(loader.finish()) } @@ -241,7 +244,9 @@ mod tests { let range = match range { GetRange::Bounded(range) => range, GetRange::Offset(offset) => offset..file_size, - GetRange::Suffix(end_offset) => (file_size.saturating_sub(end_offset.try_into().unwrap())..file_size) + GetRange::Suffix(end_offset) => { + (file_size.saturating_sub(end_offset.try_into().unwrap())..file_size) + } }; file.seek(SeekFrom::Start(range.start as _))?; let len = range.end - range.start; @@ -264,7 +269,9 @@ mod tests { futures::future::ready(read_range(&mut file, range)) }; - let actual = fetch_parquet_metadata(&mut fetch, None, None).await.unwrap(); + let actual = fetch_parquet_metadata(&mut fetch, None, None) + .await + .unwrap(); assert_eq!(actual.file_metadata().schema(), expected); assert_eq!(fetch_count.load(Ordering::SeqCst), 2); @@ -352,7 +359,7 @@ mod tests { assert_eq!(fetch_count.load(Ordering::SeqCst), 1); let metadata = loader.finish(); assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); - + // Prefetch more than enough fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 0baf9b7ab5e9..6e2d3d747a6a 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -200,11 +200,11 @@ impl AsyncFileReader for T { GetRange::Suffix(end_offset) => { self.seek(SeekFrom::End(-(end_offset as i64))).await?; Some(end_offset) - }, + } GetRange::Offset(offset) => { self.seek(SeekFrom::Start(offset as u64)).await?; None - }, + } GetRange::Bounded(range) => { self.seek(SeekFrom::Start(range.start as u64)).await?; Some(range.end - range.start) @@ -406,7 +406,10 @@ impl ParquetRecordBatchStreamBuilder { }; let buffer = match column_metadata.bloom_filter_length() { - Some(length) => self.input.0.get_bytes((offset..offset + length as usize).into()), + Some(length) => self + .input + .0 + .get_bytes((offset..offset + length as usize).into()), None => self .input .0 @@ -441,7 +444,9 @@ impl ParquetRecordBatchStreamBuilder { })?; self.input .0 - .get_bytes((bitset_offset as usize..bitset_offset as usize + bitset_length).into()) + .get_bytes( + (bitset_offset as usize..bitset_offset as usize + bitset_length).into(), + ) .await? } }; @@ -989,7 +994,9 @@ mod tests { let range = match range { GetRange::Bounded(range) => range, GetRange::Offset(offset) => offset..self.data.len(), - GetRange::Suffix(end_offset) => self.data.len().saturating_sub(end_offset)..self.data.len() + GetRange::Suffix(end_offset) => { + self.data.len().saturating_sub(end_offset)..self.data.len() + } }; futures::future::ready(Ok(self.data.slice(range))).boxed() } diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index 7d06251af4fd..29a4f566b856 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -23,7 +23,7 @@ use futures::future::BoxFuture; use futures::{FutureExt, TryFutureExt}; use object_store::GetOptions; -use object_store::{ObjectStore, path::Path}; +use object_store::{path::Path, ObjectStore}; use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader}; use crate::errors::Result; @@ -118,10 +118,16 @@ impl AsyncFileReader for ParquetObjectReader { fn get_bytes(&mut self, range: GetRange) -> BoxFuture<'_, Result> { async move { self.store - .get_opts(&self.location, GetOptions { - range: Some(range.into()), - ..Default::default() - }).await?.bytes().await + .get_opts( + &self.location, + GetOptions { + range: Some(range.into()), + ..Default::default() + }, + ) + .await? + .bytes() + .await } .map_err(|e| e.into()) .boxed() From b671545d62130a849e5a22585bce02c3710b6505 Mon Sep 17 00:00:00 2001 From: Nicholas Roberts Date: Tue, 30 Jul 2024 19:51:28 +1000 Subject: [PATCH 3/7] Fix read_with_rowgroup test --- parquet/examples/read_with_rowgroup.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/examples/read_with_rowgroup.rs b/parquet/examples/read_with_rowgroup.rs index 8cccc7fe14ac..07e47eb96d6e 100644 --- a/parquet/examples/read_with_rowgroup.rs +++ b/parquet/examples/read_with_rowgroup.rs @@ -167,7 +167,7 @@ impl InMemoryRowGroup { if self.mask.leaf_included(leaf_idx) { let (start, len) = meta.byte_range(); let data = reader - .get_bytes(start as usize..(start + len) as usize) + .get_bytes((start as usize..(start + len) as usize).into()) .await?; vs[leaf_idx] = Some(Arc::new(ColumnChunkData { From 76a4633cbce6d2fed2358fcadf81ef9b0772b410 Mon Sep 17 00:00:00 2001 From: Nicholas Roberts Date: Thu, 1 Aug 2024 20:14:38 +1000 Subject: [PATCH 4/7] Add tests, implementation for MetadataLoader::load_absolute --- parquet/src/arrow/async_reader/metadata.rs | 118 ++++++++++++++++++++- parquet/src/arrow/async_reader/store.rs | 29 ++++- 2 files changed, 142 insertions(+), 5 deletions(-) diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index 7bade9f0a750..4c556c0be9aa 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -89,7 +89,56 @@ impl MetadataLoader { file_size: usize, prefetch: Option, ) -> Result { - todo!() + if file_size < 8 { + return Err(ParquetError::EOF(format!( + "file size of {file_size} is less than footer" + ))); + } + + // If a size hint is provided, read more than the minimum size + // to try and avoid a second fetch. + let footer_start = if let Some(size_hint) = prefetch { + file_size.saturating_sub(size_hint) + } else { + file_size - 8 + }; + + let suffix = fetch.fetch((footer_start..file_size).into()).await?; + let suffix_len = suffix.len(); + + let mut footer = [0; 8]; + footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]); + + let length = decode_footer(&footer)?; + + if file_size < length + 8 { + return Err(ParquetError::EOF(format!( + "file size of {} is less than footer + metadata {}", + file_size, + length + 8 + ))); + } + + // Did not fetch the entire file metadata in the initial read, need to make a second request + let (metadata, remainder) = if length > suffix_len - 8 { + let metadata_start = file_size - length - 8; + let meta = fetch.fetch((metadata_start..file_size - 8).into()).await?; + (decode_metadata(&meta)?, None) + } else { + let metadata_offset = length + 8; + let metadata_start = suffix_len - metadata_offset; + + let slice = &suffix[metadata_start..suffix_len - 8]; + ( + decode_metadata(slice)?, + Some((0, suffix.slice(..metadata_start))), + ) + }; + Ok(Self { + fetch, + metadata, + remainder, + }) } /// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`] @@ -245,7 +294,7 @@ mod tests { GetRange::Bounded(range) => range, GetRange::Offset(offset) => offset..file_size, GetRange::Suffix(end_offset) => { - (file_size.saturating_sub(end_offset.try_into().unwrap())..file_size) + file_size.saturating_sub(end_offset.try_into().unwrap())..file_size } }; file.seek(SeekFrom::Start(range.start as _))?; @@ -268,7 +317,14 @@ mod tests { fetch_count.fetch_add(1, Ordering::SeqCst); futures::future::ready(read_range(&mut file, range)) }; + // Known file size, unknown metadata size + let actual = fetch_parquet_metadata(&mut fetch, Some(len), None) + .await + .unwrap(); + assert_eq!(actual.file_metadata().schema(), expected); + assert_eq!(fetch_count.load(Ordering::SeqCst), 2); + fetch_count.store(0, Ordering::SeqCst); let actual = fetch_parquet_metadata(&mut fetch, None, None) .await .unwrap(); @@ -369,5 +425,63 @@ mod tests { assert_eq!(fetch_count.load(Ordering::SeqCst), 1); let metadata = loader.finish(); assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); + + // Known-size file + fetch_count.store(0, Ordering::SeqCst); + let f = MetadataFetchFn(&mut fetch); + let mut loader = MetadataLoader::load_absolute(f, len, None).await.unwrap(); + assert_eq!(fetch_count.load(Ordering::SeqCst), 2); + loader.load_page_index(true, true).await.unwrap(); + assert_eq!(fetch_count.load(Ordering::SeqCst), 3); + let metadata = loader.finish(); + assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); + + // Prefetch just footer exactly + fetch_count.store(0, Ordering::SeqCst); + let f = MetadataFetchFn(&mut fetch); + let mut loader = MetadataLoader::load_absolute(f, len, Some(1729)) + .await + .unwrap(); + assert_eq!(fetch_count.load(Ordering::SeqCst), 1); + loader.load_page_index(true, true).await.unwrap(); + assert_eq!(fetch_count.load(Ordering::SeqCst), 2); + let metadata = loader.finish(); + assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); + + // Prefetch more than footer but not enough + fetch_count.store(0, Ordering::SeqCst); + let f = MetadataFetchFn(&mut fetch); + let mut loader = MetadataLoader::load_absolute(f, len, Some(130649)) + .await + .unwrap(); + assert_eq!(fetch_count.load(Ordering::SeqCst), 1); + loader.load_page_index(true, true).await.unwrap(); + assert_eq!(fetch_count.load(Ordering::SeqCst), 2); + let metadata = loader.finish(); + assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); + + // Prefetch exactly enough + fetch_count.store(0, Ordering::SeqCst); + let f = MetadataFetchFn(&mut fetch); + let mut loader = MetadataLoader::load_absolute(f, len, Some(130650)) + .await + .unwrap(); + assert_eq!(fetch_count.load(Ordering::SeqCst), 1); + loader.load_page_index(true, true).await.unwrap(); + assert_eq!(fetch_count.load(Ordering::SeqCst), 1); + let metadata = loader.finish(); + assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); + + // Prefetch more than enough + fetch_count.store(0, Ordering::SeqCst); + let f = MetadataFetchFn(&mut fetch); + let mut loader = MetadataLoader::load_absolute(f, len, Some(131651)) + .await + .unwrap(); + assert_eq!(fetch_count.load(Ordering::SeqCst), 1); + loader.load_page_index(true, true).await.unwrap(); + assert_eq!(fetch_count.load(Ordering::SeqCst), 1); + let metadata = loader.finish(); + assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); } } diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index 29a4f566b856..13da0afe11d5 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -70,6 +70,7 @@ pub struct ParquetObjectReader { store: Arc, location: Path, metadata_size_hint: Option, + file_size: Option, preload_column_index: bool, preload_offset_index: bool, } @@ -82,12 +83,20 @@ impl ParquetObjectReader { Self { store, location, + file_size: None, metadata_size_hint: None, preload_column_index: false, preload_offset_index: false, } } + /// Provide the size of the file, for object stores that do not support suffix ranges (e.g. Azure) + pub fn with_file_size(self, file_size: usize) -> Self { + Self { + file_size: Some(file_size), + ..self + } + } /// Provide a hint as to the size of the parquet file's footer, /// see [fetch_parquet_metadata](crate::arrow::async_reader::fetch_parquet_metadata) pub fn with_footer_size_hint(self, hint: usize) -> Self { @@ -151,9 +160,10 @@ impl AsyncFileReader for ParquetObjectReader { let preload_column_index = self.preload_column_index; let preload_offset_index = self.preload_offset_index; let prefetch = self.metadata_size_hint; - // TODO: distinguish between suffix-supporting object stores, and those that don't - // Alternatively, surface in the form of a flag on this struct. - let mut loader = MetadataLoader::load(self, prefetch).await?; + let mut loader = match self.file_size { + Some(file_size) => MetadataLoader::load_absolute(self, file_size, prefetch).await, + None => MetadataLoader::load(self, prefetch).await, + }?; loader .load_page_index(preload_column_index, preload_offset_index) .await?; @@ -192,6 +202,19 @@ mod tests { assert_eq!(batches.len(), 1); assert_eq!(batches[0].num_rows(), 8); + let meta = store.head(&location).await.unwrap(); + let file_size = meta.size; + + let object_reader = ParquetObjectReader::new(Arc::clone(&store), location.clone()) + .with_file_size(file_size); + let builder = ParquetRecordBatchStreamBuilder::new(object_reader) + .await + .unwrap(); + let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap(); + + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 8); + location = Path::from("I don't exist.parquet"); let object_reader = ParquetObjectReader::new(store, location); From e9c28004cc7c31756de9e4a4bc1ae46de1db0bca Mon Sep 17 00:00:00 2001 From: Nicholas Roberts Date: Thu, 1 Aug 2024 20:31:58 +1000 Subject: [PATCH 5/7] Address MetadataLoader clippy errors, correct docs + add a suffix example --- parquet/Cargo.toml | 2 +- parquet/src/arrow/async_reader/metadata.rs | 6 ++---- parquet/src/arrow/async_reader/store.rs | 21 +++++++++++++++++---- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 7391d0964646..b5b5b04dbd0e 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -82,7 +82,7 @@ serde_json = { version = "1.0", features = ["std"], default-features = false } arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] } tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "io-util", "fs"] } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } -object_store = { version = "0.10.0", default-features = false, features = ["azure"] } +object_store = { version = "0.10.0", default-features = false, features = ["azure", "aws"] } # TODO: temporary to fix parquet wasm build # upstream issue: https://github.com/gyscos/zstd-rs/issues/269 diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index 4c556c0be9aa..29fa489168c5 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -65,7 +65,7 @@ impl MetadataLoader { let metadata_offset = length + 8; let meta = fetch.fetch(GetRange::Suffix(metadata_offset)).await?; let slice = &meta[0..length]; - (decode_metadata(&slice)?, None) + (decode_metadata(slice)?, None) } else { let metadata_offset = length + 8; let metadata_start = suffix_len - metadata_offset; @@ -293,9 +293,7 @@ mod tests { let range = match range { GetRange::Bounded(range) => range, GetRange::Offset(offset) => offset..file_size, - GetRange::Suffix(end_offset) => { - file_size.saturating_sub(end_offset.try_into().unwrap())..file_size - } + GetRange::Suffix(end_offset) => file_size.saturating_sub(end_offset)..file_size, }; file.seek(SeekFrom::Start(range.start as _))?; let len = range.end - range.start; diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index 13da0afe11d5..43ee83c66da6 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -47,12 +47,24 @@ impl From for object_store::GetRange { /// # use std::io::stdout; /// # use std::sync::Arc; /// # use object_store::azure::MicrosoftAzureBuilder; +/// # use object_store::aws::AmazonS3Builder; /// # use object_store::ObjectStore; /// # use object_store::path::Path; /// # use parquet::arrow::async_reader::ParquetObjectReader; /// # use parquet::arrow::ParquetRecordBatchStreamBuilder; /// # use parquet::schema::printer::print_parquet_metadata; /// # async fn run() { +/// // Object Stores that support suffix ranges: +/// // Populate configuration from environment +/// let storage_container = Arc::new(AmazonS3Builder::from_env().build().unwrap()); +/// let location = Path::from("path/to/blob.parquet"); +/// +/// // Show Parquet metadata +/// let reader = ParquetObjectReader::new(storage_container, location); +/// let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap(); +/// print_parquet_metadata(&mut stdout(), builder.metadata()); +/// # } +/// # async fn run_non_suffixed() { /// // Populate configuration from environment /// let storage_container = Arc::new(MicrosoftAzureBuilder::from_env().build().unwrap()); /// let location = Path::from("path/to/blob.parquet"); @@ -60,9 +72,10 @@ impl From for object_store::GetRange { /// println!("Found Blob with {}B at {}", meta.size, meta.location); /// /// // Show Parquet metadata -/// let reader = ParquetObjectReader::new(storage_container, meta); +/// let reader = ParquetObjectReader::new(storage_container, location).with_file_size(meta.size); /// let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap(); /// print_parquet_metadata(&mut stdout(), builder.metadata()); +/// /// # } /// ``` #[derive(Clone, Debug)] @@ -76,9 +89,7 @@ pub struct ParquetObjectReader { } impl ParquetObjectReader { - /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`ObjectMeta`] - /// - /// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`] + /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`Path`] pub fn new(store: Arc, location: Path) -> Self { Self { store, @@ -91,6 +102,8 @@ impl ParquetObjectReader { } /// Provide the size of the file, for object stores that do not support suffix ranges (e.g. Azure) + /// + /// file_size can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`] pub fn with_file_size(self, file_size: usize) -> Self { Self { file_size: Some(file_size), From 5f44af4d4bb88d31f320cf236f4c058068761f6b Mon Sep 17 00:00:00 2001 From: Nicholas Roberts Date: Tue, 27 Aug 2024 14:54:33 +1000 Subject: [PATCH 6/7] Swap precedence of file size agnostic and aware load methods --- parquet/src/arrow/async_reader/metadata.rs | 28 +++++++++++----------- parquet/src/arrow/async_reader/store.rs | 27 ++++++++++++++------- 2 files changed, 32 insertions(+), 23 deletions(-) diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index 29fa489168c5..a18fcd4145d6 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -51,7 +51,7 @@ impl MetadataLoader { /// Create a new [`MetadataLoader`] by reading the footer information /// /// See [`fetch_parquet_metadata`] for the meaning of the individual parameters - pub async fn load(mut fetch: F, prefetch: Option) -> Result { + pub async fn load_without_size(mut fetch: F, prefetch: Option) -> Result { let suffix = fetch.fetch(GetRange::Suffix(prefetch.unwrap_or(8))).await?; let suffix_len = suffix.len(); @@ -84,7 +84,7 @@ impl MetadataLoader { }) } - pub async fn load_absolute( + pub async fn load( mut fetch: F, file_size: usize, prefetch: Option, @@ -273,8 +273,8 @@ where { let fetch = MetadataFetchFn(fetch); let loader = match file_size { - Some(file_size) => MetadataLoader::load_absolute(fetch, file_size, prefetch).await?, - None => MetadataLoader::load(fetch, prefetch).await?, + Some(file_size) => MetadataLoader::load(fetch, file_size, prefetch).await?, + None => MetadataLoader::load_without_size(fetch, prefetch).await?, }; Ok(loader.finish()) } @@ -377,7 +377,7 @@ mod tests { }; let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load(f, None).await.unwrap(); + let mut loader = MetadataLoader::load_without_size(f, None).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 2); loader.load_page_index(true, true).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 3); @@ -387,7 +387,7 @@ mod tests { // Prefetch just footer exactly fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load(f, Some(1729)).await.unwrap(); + let mut loader = MetadataLoader::load_without_size(f, Some(1729)).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); loader.load_page_index(true, true).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 2); @@ -397,7 +397,7 @@ mod tests { // Prefetch more than footer but not enough fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load(f, Some(130649)).await.unwrap(); + let mut loader = MetadataLoader::load_without_size(f, Some(130649)).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); loader.load_page_index(true, true).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 2); @@ -407,7 +407,7 @@ mod tests { // Prefetch exactly enough fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load(f, Some(130650)).await.unwrap(); + let mut loader = MetadataLoader::load_without_size(f, Some(130650)).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); loader.load_page_index(true, true).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); @@ -417,7 +417,7 @@ mod tests { // Prefetch more than enough fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load(f, Some(131651)).await.unwrap(); + let mut loader = MetadataLoader::load_without_size(f, Some(131651)).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); loader.load_page_index(true, true).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); @@ -427,7 +427,7 @@ mod tests { // Known-size file fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load_absolute(f, len, None).await.unwrap(); + let mut loader = MetadataLoader::load(f, len, None).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 2); loader.load_page_index(true, true).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 3); @@ -437,7 +437,7 @@ mod tests { // Prefetch just footer exactly fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load_absolute(f, len, Some(1729)) + let mut loader = MetadataLoader::load(f, len, Some(1729)) .await .unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); @@ -449,7 +449,7 @@ mod tests { // Prefetch more than footer but not enough fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load_absolute(f, len, Some(130649)) + let mut loader = MetadataLoader::load(f, len, Some(130649)) .await .unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); @@ -461,7 +461,7 @@ mod tests { // Prefetch exactly enough fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load_absolute(f, len, Some(130650)) + let mut loader = MetadataLoader::load(f, len, Some(130650)) .await .unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); @@ -473,7 +473,7 @@ mod tests { // Prefetch more than enough fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load_absolute(f, len, Some(131651)) + let mut loader = MetadataLoader::load(f, len, Some(131651)) .await .unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index 43ee83c66da6..2a59d3f95dac 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -22,7 +22,7 @@ use bytes::Bytes; use futures::future::BoxFuture; use futures::{FutureExt, TryFutureExt}; -use object_store::GetOptions; +use object_store::{GetOptions, ObjectMeta}; use object_store::{path::Path, ObjectStore}; use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader}; @@ -90,7 +90,18 @@ pub struct ParquetObjectReader { impl ParquetObjectReader { /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`Path`] - pub fn new(store: Arc, location: Path) -> Self { + pub fn new(store: Arc, meta: ObjectMeta) -> Self { + Self { + store, + location: meta.location, + file_size: Some(meta.size), + metadata_size_hint: None, + preload_column_index: false, + preload_offset_index: false, + } + } + + pub fn new_without_size(store: Arc, location: Path) -> Self { Self { store, location, @@ -174,8 +185,8 @@ impl AsyncFileReader for ParquetObjectReader { let preload_offset_index = self.preload_offset_index; let prefetch = self.metadata_size_hint; let mut loader = match self.file_size { - Some(file_size) => MetadataLoader::load_absolute(self, file_size, prefetch).await, - None => MetadataLoader::load(self, prefetch).await, + Some(file_size) => MetadataLoader::load(self, file_size, prefetch).await, + None => MetadataLoader::load_without_size(self, prefetch).await, }?; loader .load_page_index(preload_column_index, preload_offset_index) @@ -206,7 +217,7 @@ mod tests { let mut location = Path::from("alltypes_plain.parquet"); let store = Arc::new(store) as Arc; - let object_reader = ParquetObjectReader::new(Arc::clone(&store), location.clone()); + let object_reader = ParquetObjectReader::new_without_size(Arc::clone(&store), location.clone()); let builder = ParquetRecordBatchStreamBuilder::new(object_reader) .await .unwrap(); @@ -216,10 +227,8 @@ mod tests { assert_eq!(batches[0].num_rows(), 8); let meta = store.head(&location).await.unwrap(); - let file_size = meta.size; - let object_reader = ParquetObjectReader::new(Arc::clone(&store), location.clone()) - .with_file_size(file_size); + let object_reader = ParquetObjectReader::new(Arc::clone(&store), meta); let builder = ParquetRecordBatchStreamBuilder::new(object_reader) .await .unwrap(); @@ -230,7 +239,7 @@ mod tests { location = Path::from("I don't exist.parquet"); - let object_reader = ParquetObjectReader::new(store, location); + let object_reader = ParquetObjectReader::new_without_size(store, location); // Cannot use unwrap_err as ParquetRecordBatchStreamBuilder: !Debug match ParquetRecordBatchStreamBuilder::new(object_reader).await { Ok(_) => panic!("expected failure"), From 3862419edb6828141691a227d42941c292af5839 Mon Sep 17 00:00:00 2001 From: Nicholas Roberts Date: Tue, 27 Aug 2024 19:41:18 +1000 Subject: [PATCH 7/7] cargo fmt --- parquet/src/arrow/async_reader/metadata.rs | 38 ++++++++++------------ parquet/src/arrow/async_reader/store.rs | 5 +-- 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index a18fcd4145d6..98b3f70b63c4 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -84,11 +84,7 @@ impl MetadataLoader { }) } - pub async fn load( - mut fetch: F, - file_size: usize, - prefetch: Option, - ) -> Result { + pub async fn load(mut fetch: F, file_size: usize, prefetch: Option) -> Result { if file_size < 8 { return Err(ParquetError::EOF(format!( "file size of {file_size} is less than footer" @@ -387,7 +383,9 @@ mod tests { // Prefetch just footer exactly fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load_without_size(f, Some(1729)).await.unwrap(); + let mut loader = MetadataLoader::load_without_size(f, Some(1729)) + .await + .unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); loader.load_page_index(true, true).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 2); @@ -397,7 +395,9 @@ mod tests { // Prefetch more than footer but not enough fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load_without_size(f, Some(130649)).await.unwrap(); + let mut loader = MetadataLoader::load_without_size(f, Some(130649)) + .await + .unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); loader.load_page_index(true, true).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 2); @@ -407,7 +407,9 @@ mod tests { // Prefetch exactly enough fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load_without_size(f, Some(130650)).await.unwrap(); + let mut loader = MetadataLoader::load_without_size(f, Some(130650)) + .await + .unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); loader.load_page_index(true, true).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); @@ -417,7 +419,9 @@ mod tests { // Prefetch more than enough fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load_without_size(f, Some(131651)).await.unwrap(); + let mut loader = MetadataLoader::load_without_size(f, Some(131651)) + .await + .unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); loader.load_page_index(true, true).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); @@ -437,9 +441,7 @@ mod tests { // Prefetch just footer exactly fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load(f, len, Some(1729)) - .await - .unwrap(); + let mut loader = MetadataLoader::load(f, len, Some(1729)).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); loader.load_page_index(true, true).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 2); @@ -449,9 +451,7 @@ mod tests { // Prefetch more than footer but not enough fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load(f, len, Some(130649)) - .await - .unwrap(); + let mut loader = MetadataLoader::load(f, len, Some(130649)).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); loader.load_page_index(true, true).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 2); @@ -461,9 +461,7 @@ mod tests { // Prefetch exactly enough fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load(f, len, Some(130650)) - .await - .unwrap(); + let mut loader = MetadataLoader::load(f, len, Some(130650)).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); loader.load_page_index(true, true).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); @@ -473,9 +471,7 @@ mod tests { // Prefetch more than enough fetch_count.store(0, Ordering::SeqCst); let f = MetadataFetchFn(&mut fetch); - let mut loader = MetadataLoader::load(f, len, Some(131651)) - .await - .unwrap(); + let mut loader = MetadataLoader::load(f, len, Some(131651)).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); loader.load_page_index(true, true).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index 2a59d3f95dac..dfe2d69ce8a7 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -22,8 +22,8 @@ use bytes::Bytes; use futures::future::BoxFuture; use futures::{FutureExt, TryFutureExt}; -use object_store::{GetOptions, ObjectMeta}; use object_store::{path::Path, ObjectStore}; +use object_store::{GetOptions, ObjectMeta}; use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader}; use crate::errors::Result; @@ -217,7 +217,8 @@ mod tests { let mut location = Path::from("alltypes_plain.parquet"); let store = Arc::new(store) as Arc; - let object_reader = ParquetObjectReader::new_without_size(Arc::clone(&store), location.clone()); + let object_reader = + ParquetObjectReader::new_without_size(Arc::clone(&store), location.clone()); let builder = ParquetRecordBatchStreamBuilder::new(object_reader) .await .unwrap();