diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 7391d0964646..b5b5b04dbd0e 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -82,7 +82,7 @@ serde_json = { version = "1.0", features = ["std"], default-features = false }
 arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] }
 tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "io-util", "fs"] }
 rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }
-object_store = { version = "0.10.0", default-features = false, features = ["azure"] }
+object_store = { version = "0.10.0", default-features = false, features = ["azure", "aws"] }
 
 # TODO: temporary to fix parquet wasm build
 # upstream issue: https://github.com/gyscos/zstd-rs/issues/269
diff --git a/parquet/examples/read_with_rowgroup.rs b/parquet/examples/read_with_rowgroup.rs
index 8cccc7fe14ac..07e47eb96d6e 100644
--- a/parquet/examples/read_with_rowgroup.rs
+++ b/parquet/examples/read_with_rowgroup.rs
@@ -167,7 +167,7 @@ impl InMemoryRowGroup {
             if self.mask.leaf_included(leaf_idx) {
                 let (start, len) = meta.byte_range();
                 let data = reader
-                    .get_bytes(start as usize..(start + len) as usize)
+                    .get_bytes((start as usize..(start + len) as usize).into())
                     .await?;
 
                 vs[leaf_idx] = Some(Arc::new(ColumnChunkData {
diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs
index 9224ea3f68a8..98b3f70b63c4 100644
--- a/parquet/src/arrow/async_reader/metadata.rs
+++ b/parquet/src/arrow/async_reader/metadata.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::arrow::async_reader::AsyncFileReader;
+use crate::arrow::async_reader::{AsyncFileReader, GetRange};
 use crate::errors::{ParquetError, Result};
 use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::ParquetMetaData;
@@ -25,15 +25,14 @@ use bytes::Bytes;
 use futures::future::BoxFuture;
 use futures::FutureExt;
 use std::future::Future;
-use std::ops::Range;
 
 /// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`]
 pub trait MetadataFetch {
-    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+    fn fetch(&mut self, range: GetRange) -> BoxFuture<'_, Result<Bytes>>;
 }
 
 impl<'a, T: AsyncFileReader> MetadataFetch for &'a mut T {
-    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+    fn fetch(&mut self, range: GetRange) -> BoxFuture<'_, Result<Bytes>> {
         self.get_bytes(range)
     }
 }
@@ -52,6 +51,39 @@ impl<F: MetadataFetch> MetadataLoader<F> {
     /// Create a new [`MetadataLoader`] by reading the footer information
     ///
     /// See [`fetch_parquet_metadata`] for the meaning of the individual parameters
+    pub async fn load_without_size(mut fetch: F, prefetch: Option<usize>) -> Result<Self> {
+        // The footer is always 8 bytes, so never request a shorter suffix
+        let suffix_length = prefetch.unwrap_or(8).max(8);
+        let suffix = fetch.fetch(GetRange::Suffix(suffix_length)).await?;
+        let suffix_len = suffix.len();
+        if suffix_len < 8 {
+            return Err(ParquetError::EOF(format!(
+                "file size of {} is less than footer",
+                suffix_len
+            )));
+        }
+
+        let mut footer = [0; 8];
+        footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
+
+        let length = decode_footer(&footer)?;
+
+        // Did not fetch the entire file metadata in the initial read, need to make a second request
+        let (metadata, remainder) = if length > suffix_len - 8 {
+            let metadata_offset = length + 8;
+            let meta = fetch.fetch(GetRange::Suffix(metadata_offset)).await?;
+            let slice = &meta[0..length];
+            (decode_metadata(slice)?, None)
+        } else {
+            let metadata_offset = length + 8;
+            let metadata_start = suffix_len - metadata_offset;
+
+            let slice = &suffix[metadata_start..suffix_len - 8];
+            (
+                decode_metadata(slice)?,
+                Some((0, suffix.slice(..metadata_start))),
+            )
+        };
+
+        Ok(Self {
+            fetch,
+            metadata,
+            remainder,
+        })
+    }
+
+    /// Create a new [`MetadataLoader`] by reading the footer information
+    ///
+    /// See [`fetch_parquet_metadata`] for the meaning of the individual parameters
     pub async fn load(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
         if file_size < 8 {
             return Err(ParquetError::EOF(format!(
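For context on the footer layout `load_without_size` relies on: the last 8 bytes of a Parquet file are a 4-byte little-endian metadata length followed by the `PAR1` magic, which is why an 8-byte suffix request is always enough to size the second fetch. A minimal sketch of that decoding (it mirrors what `decode_footer` checks; it is not the crate's implementation):

```rust
// Sketch: decode the 8-byte Parquet footer (4-byte little-endian
// metadata length + 4-byte "PAR1" magic).
fn decode_footer_sketch(footer: &[u8; 8]) -> Result<usize, String> {
    if footer[4..] != *b"PAR1" {
        return Err("invalid Parquet file: corrupt footer".to_owned());
    }
    let metadata_len = u32::from_le_bytes(footer[..4].try_into().unwrap());
    Ok(metadata_len as usize)
}
```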
@@ -67,7 +99,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
             file_size - 8
         };
 
-        let suffix = fetch.fetch(footer_start..file_size).await?;
+        let suffix = fetch.fetch((footer_start..file_size).into()).await?;
         let suffix_len = suffix.len();
 
         let mut footer = [0; 8];
@@ -86,18 +118,18 @@ impl<F: MetadataFetch> MetadataLoader<F> {
         // Did not fetch the entire file metadata in the initial read, need to make a second request
         let (metadata, remainder) = if length > suffix_len - 8 {
             let metadata_start = file_size - length - 8;
-            let meta = fetch.fetch(metadata_start..file_size - 8).await?;
+            let meta = fetch.fetch((metadata_start..file_size - 8).into()).await?;
             (decode_metadata(&meta)?, None)
         } else {
-            let metadata_start = file_size - length - 8 - footer_start;
+            let metadata_offset = length + 8;
+            let metadata_start = suffix_len - metadata_offset;
 
             let slice = &suffix[metadata_start..suffix_len - 8];
             (
                 decode_metadata(slice)?,
-                Some((footer_start, suffix.slice(..metadata_start))),
+                Some((0, suffix.slice(..metadata_start))),
             )
         };
-
         Ok(Self {
             fetch,
             metadata,
@@ -133,13 +165,15 @@ impl<F: MetadataFetch> MetadataLoader<F> {
             Some(range) => range,
         };
 
+        let page_index_len = range.end - range.start;
+
+        // TODO: determine if _remainder_start is needed even in the non-suffix request case
         let data = match &self.remainder {
-            Some((remainder_start, remainder)) if *remainder_start <= range.start => {
-                let offset = range.start - *remainder_start;
-                remainder.slice(offset..range.end - *remainder_start + offset)
+            Some((_remainder_start, remainder)) if remainder.len() >= page_index_len => {
+                let offset = remainder.len() - page_index_len;
+                remainder.slice(offset..)
             }
             // Note: this will potentially fetch data already in remainder, this keeps things simple
-            _ => self.fetch.fetch(range.start..range.end).await?,
+            _ => self.fetch.fetch((range.start..range.end).into()).await?,
         };
 
         // Sanity check
@@ -200,10 +234,10 @@ struct MetadataFetchFn<F>(F);
 
 impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
 where
-    F: FnMut(Range<usize>) -> Fut + Send,
+    F: FnMut(GetRange) -> Fut + Send,
     Fut: Future<Output = Result<Bytes>> + Send,
 {
-    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+    fn fetch(&mut self, range: GetRange) -> BoxFuture<'_, Result<Bytes>> {
         async move { self.0(range).await }.boxed()
     }
 }
@@ -226,15 +260,18 @@ where
 /// significantly reduce the number of `fetch` requests, and consequently latency
 pub async fn fetch_parquet_metadata<F, Fut>(
     fetch: F,
-    file_size: usize,
+    file_size: Option<usize>,
    prefetch: Option<usize>,
 ) -> Result<ParquetMetaData>
 where
-    F: FnMut(Range<usize>) -> Fut + Send,
+    F: FnMut(GetRange) -> Fut + Send,
     Fut: Future<Output = Result<Bytes>> + Send,
 {
     let fetch = MetadataFetchFn(fetch);
-    let loader = MetadataLoader::load(fetch, file_size, prefetch).await?;
+    let loader = match file_size {
+        Some(file_size) => MetadataLoader::load(fetch, file_size, prefetch).await?,
+        None => MetadataLoader::load_without_size(fetch, prefetch).await?,
+    };
     Ok(loader.finish())
 }
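A sketch of how a caller might drive the new `Option<usize>` signature, resolving each `GetRange` against an in-memory file image; the helper name is hypothetical, and any closure satisfying the bounds works:

```rust
use bytes::Bytes;
use parquet::arrow::async_reader::{fetch_parquet_metadata, GetRange};
use parquet::errors::{ParquetError, Result};
use parquet::file::metadata::ParquetMetaData;

// Hypothetical helper: load metadata from a buffer without passing a
// file size, exercising the suffix-request path.
async fn metadata_from_buffer(data: Bytes) -> Result<ParquetMetaData> {
    let fetch = |range: GetRange| {
        let data = data.clone();
        async move {
            // Resolve the requested range against the buffer's length
            let len = data.len();
            let r = match range {
                GetRange::Bounded(r) => r,
                GetRange::Offset(o) => o..len,
                GetRange::Suffix(n) => len.saturating_sub(n)..len,
            };
            Ok::<_, ParquetError>(data.slice(r))
        }
    };
    fetch_parquet_metadata(fetch, None, None).await
}
```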
@@ -247,7 +284,13 @@ mod tests {
     use std::io::{Read, Seek, SeekFrom};
     use std::sync::atomic::{AtomicUsize, Ordering};
 
-    fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
+    fn read_range(file: &mut File, range: GetRange) -> Result<Bytes> {
+        let file_size: usize = file.metadata()?.len().try_into().unwrap();
+        let range = match range {
+            GetRange::Bounded(range) => range,
+            GetRange::Offset(offset) => offset..file_size,
+            GetRange::Suffix(end_offset) => file_size.saturating_sub(end_offset)..file_size,
+        };
         file.seek(SeekFrom::Start(range.start as _))?;
         let len = range.end - range.start;
         let mut buf = Vec::with_capacity(len);
@@ -268,14 +311,23 @@ mod tests {
             fetch_count.fetch_add(1, Ordering::SeqCst);
             futures::future::ready(read_range(&mut file, range))
         };
 
+        // Known file size, unknown metadata size
+        let actual = fetch_parquet_metadata(&mut fetch, Some(len), None)
+            .await
+            .unwrap();
+        assert_eq!(actual.file_metadata().schema(), expected);
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
 
-        let actual = fetch_parquet_metadata(&mut fetch, len, None).await.unwrap();
+        // Unknown file size, unknown metadata size
+        fetch_count.store(0, Ordering::SeqCst);
+        let actual = fetch_parquet_metadata(&mut fetch, None, None)
+            .await
+            .unwrap();
         assert_eq!(actual.file_metadata().schema(), expected);
         assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
 
         // Metadata hint too small
         fetch_count.store(0, Ordering::SeqCst);
-        let actual = fetch_parquet_metadata(&mut fetch, len, Some(10))
+        let actual = fetch_parquet_metadata(&mut fetch, None, Some(10))
             .await
             .unwrap();
         assert_eq!(actual.file_metadata().schema(), expected);
@@ -283,7 +335,7 @@ mod tests {
 
         // Metadata hint too large
         fetch_count.store(0, Ordering::SeqCst);
-        let actual = fetch_parquet_metadata(&mut fetch, len, Some(500))
+        let actual = fetch_parquet_metadata(&mut fetch, None, Some(500))
             .await
             .unwrap();
         assert_eq!(actual.file_metadata().schema(), expected);
@@ -291,19 +343,19 @@ mod tests {
 
         // Metadata hint exactly correct
         fetch_count.store(0, Ordering::SeqCst);
-        let actual = fetch_parquet_metadata(&mut fetch, len, Some(428))
+        let actual = fetch_parquet_metadata(&mut fetch, None, Some(428))
             .await
             .unwrap();
         assert_eq!(actual.file_metadata().schema(), expected);
         assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
 
-        let err = fetch_parquet_metadata(&mut fetch, 4, None)
+        let err = fetch_parquet_metadata(&mut fetch, Some(4), None)
             .await
             .unwrap_err()
             .to_string();
         assert_eq!(err, "EOF: file size of 4 is less than footer");
 
-        let err = fetch_parquet_metadata(&mut fetch, 20, None)
+        let err = fetch_parquet_metadata(&mut fetch, Some(20), None)
             .await
             .unwrap_err()
             .to_string();
@@ -320,6 +372,64 @@ mod tests {
             futures::future::ready(read_range(&mut file, range))
         };
 
+        // Unknown file size, no prefetch hint
+        let f = MetadataFetchFn(&mut fetch);
+        let mut loader = MetadataLoader::load_without_size(f, None).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
+        loader.load_page_index(true, true).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
+        let metadata = loader.finish();
+        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
+
+        // Prefetch just footer exactly
+        fetch_count.store(0, Ordering::SeqCst);
+        let f = MetadataFetchFn(&mut fetch);
+        let mut loader = MetadataLoader::load_without_size(f, Some(1729))
+            .await
+            .unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
+        loader.load_page_index(true, true).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
+        let metadata = loader.finish();
+        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
+
+        // Prefetch more than footer but not enough
+        fetch_count.store(0, Ordering::SeqCst);
+        let f = MetadataFetchFn(&mut fetch);
+        let mut loader = MetadataLoader::load_without_size(f, Some(130649))
+            .await
+            .unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
+        loader.load_page_index(true, true).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
+        let metadata = loader.finish();
+        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
+
+        // Prefetch exactly enough
+        fetch_count.store(0, Ordering::SeqCst);
+        let f = MetadataFetchFn(&mut fetch);
+        let mut loader = MetadataLoader::load_without_size(f, Some(130650))
+            .await
+            .unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
+        loader.load_page_index(true, true).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
+        let metadata = loader.finish();
+        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
+
+        // Prefetch more than enough
+        fetch_count.store(0, Ordering::SeqCst);
+        let f = MetadataFetchFn(&mut fetch);
+        let mut loader = MetadataLoader::load_without_size(f, Some(131651))
+            .await
+            .unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
+        loader.load_page_index(true, true).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
+        let metadata = loader.finish();
+        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
+
+        // Known-size file
+        fetch_count.store(0, Ordering::SeqCst);
         let f = MetadataFetchFn(&mut fetch);
         let mut loader = MetadataLoader::load(f, len, None).await.unwrap();
         assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
@@ -357,5 +467,15 @@ mod tests {
         assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
         let metadata = loader.finish();
         assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
+
+        // Prefetch more than enough
+        fetch_count.store(0, Ordering::SeqCst);
+        let f = MetadataFetchFn(&mut fetch);
+        let mut loader = MetadataLoader::load(f, len, Some(131651)).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
+        loader.load_page_index(true, true).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
+        let metadata = loader.finish();
+        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
     }
 }
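The fetch-count assertions in these tests follow a single rule: the initial suffix read satisfies the metadata read only when the prefetch hint covers the metadata plus the 8-byte footer. A hypothetical helper expressing that rule (not part of the crate):

```rust
// Hypothetical: fetches MetadataLoader needs before page indexes, given
// the metadata length and an optional prefetch hint.
fn expected_fetches(metadata_len: usize, prefetch: Option<usize>) -> usize {
    let initial = prefetch.unwrap_or(8).max(8); // the footer itself is 8 bytes
    if initial >= metadata_len + 8 {
        1 // the prefetch covered footer + metadata
    } else {
        2 // a second request is needed for the metadata
    }
}

// e.g. with the 428-byte hint above: expected_fetches(420, Some(428)) == 1,
// while expected_fetches(420, None) == 2.
```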
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 5695dbc10fe1..6e2d3d747a6a 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -79,6 +79,7 @@ use std::collections::VecDeque;
 use std::fmt::Formatter;
 use std::io::SeekFrom;
 use std::ops::Range;
+use std::ops::RangeBounds;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
@@ -121,10 +122,41 @@ use crate::arrow::schema::ParquetField;
 #[cfg(feature = "object_store")]
 pub use store::*;
 
+/// A byte range to fetch from a file whose size may be unknown
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum GetRange {
+    /// Request a specific range of bytes
+    ///
+    /// If the given range is zero-length or starts after the end of the object,
+    /// an error will be returned. Additionally, if the range ends after the end
+    /// of the object, the entire remainder of the object will be returned.
+    /// Otherwise, the exact requested range will be returned.
+    Bounded(Range<usize>),
+    /// Request all bytes starting from a given byte offset
+    Offset(usize),
+    /// Request up to the last n bytes
+    Suffix(usize),
+}
+
+impl<T: RangeBounds<usize>> From<T> for GetRange {
+    fn from(value: T) -> Self {
+        use std::ops::Bound::*;
+        let first = match value.start_bound() {
+            Included(i) => *i,
+            Excluded(i) => i + 1,
+            Unbounded => 0,
+        };
+        match value.end_bound() {
+            Included(i) => Self::Bounded(first..(i + 1)),
+            Excluded(i) => Self::Bounded(first..*i),
+            Unbounded => Self::Offset(first),
+        }
+    }
+}
+
 /// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
 pub trait AsyncFileReader: Send {
     /// Retrieve the bytes in `range`
-    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+    fn get_bytes(&mut self, range: GetRange) -> BoxFuture<'_, Result<Bytes>>;
 
     /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
     fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
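Because of the blanket `From<impl RangeBounds<usize>>` impl introduced in this hunk, existing `Range`-based call sites keep working via `.into()`. A few illustrative conversions:

```rust
use parquet::arrow::async_reader::GetRange;

fn main() {
    // Conversions provided by the blanket From impl above
    assert_eq!(GetRange::from(10..20), GetRange::Bounded(10..20));
    assert_eq!(GetRange::from(10..=20), GetRange::Bounded(10..21));
    assert_eq!(GetRange::from(..20), GetRange::Bounded(0..20));
    assert_eq!(GetRange::from(10..), GetRange::Offset(10));
    // A suffix has no std range counterpart and is constructed directly
    let _last_8_bytes = GetRange::Suffix(8);
}
```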
@@ -132,7 +164,7 @@ pub trait AsyncFileReader: Send {
             let mut result = Vec::with_capacity(ranges.len());
 
             for range in ranges.into_iter() {
-                let data = self.get_bytes(range).await?;
+                let data = self.get_bytes(range.into()).await?;
                 result.push(data);
             }
 
@@ -148,7 +180,7 @@ pub trait AsyncFileReader: Send {
 }
 
 impl AsyncFileReader for Box<dyn AsyncFileReader> {
-    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+    fn get_bytes(&mut self, range: GetRange) -> BoxFuture<'_, Result<Bytes>> {
         self.as_mut().get_bytes(range)
     }
 
@@ -162,15 +194,31 @@ impl AsyncFileReader for Box<dyn AsyncFileReader> {
 }
 
 impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
-    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+    fn get_bytes(&mut self, range: GetRange) -> BoxFuture<'_, Result<Bytes>> {
         async move {
-            self.seek(SeekFrom::Start(range.start as u64)).await?;
-
-            let to_read = range.end - range.start;
-            let mut buffer = Vec::with_capacity(to_read);
-            let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
-            if read != to_read {
-                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
+            let to_read = match range {
+                GetRange::Suffix(end_offset) => {
+                    self.seek(SeekFrom::End(-(end_offset as i64))).await?;
+                    Some(end_offset)
+                }
+                GetRange::Offset(offset) => {
+                    self.seek(SeekFrom::Start(offset as u64)).await?;
+                    None
+                }
+                GetRange::Bounded(range) => {
+                    self.seek(SeekFrom::Start(range.start as u64)).await?;
+                    Some(range.end - range.start)
+                }
+            };
+            // TODO: figure out a better alternative for Offset ranges
+            let mut buffer = Vec::with_capacity(to_read.unwrap_or(1_024usize));
+            if let Some(to_read) = to_read {
+                let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
+                if read != to_read {
+                    return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
+                }
+            } else {
+                self.read_to_end(&mut buffer).await?;
             }
 
             Ok(buffer.into())
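A short usage sketch of the seek-based impl above with `tokio::fs::File`; note that the `SeekFrom::End` path returns an I/O error for files shorter than the requested suffix, since the negative seek would move past the start:

```rust
use parquet::arrow::async_reader::{AsyncFileReader, GetRange};

// Sketch: read the 8-byte Parquet footer from a local file without
// knowing its length (path is illustrative).
async fn read_footer(path: &str) -> parquet::errors::Result<bytes::Bytes> {
    let mut file = tokio::fs::File::open(path).await?;
    file.get_bytes(GetRange::Suffix(8)).await
}
```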
@@ -358,11 +406,14 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
         };
 
         let buffer = match column_metadata.bloom_filter_length() {
-            Some(length) => self.input.0.get_bytes(offset..offset + length as usize),
+            Some(length) => self
+                .input
+                .0
+                .get_bytes((offset..offset + length as usize).into()),
             None => self
                 .input
                 .0
-                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE),
+                .get_bytes((offset..offset + SBBF_HEADER_SIZE_ESTIMATE).into()),
         }
         .await?;
 
@@ -393,7 +444,9 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
             })?;
             self.input
                 .0
-                .get_bytes(bitset_offset as usize..bitset_offset as usize + bitset_length)
+                .get_bytes(
+                    (bitset_offset as usize..bitset_offset as usize + bitset_length).into(),
+                )
                 .await?
         }
     };
@@ -932,12 +985,19 @@ mod tests {
     struct TestReader {
         data: Bytes,
         metadata: Arc<ParquetMetaData>,
-        requests: Arc<Mutex<Vec<Range<usize>>>>,
+        requests: Arc<Mutex<Vec<GetRange>>>,
     }
 
     impl AsyncFileReader for TestReader {
-        fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        fn get_bytes(&mut self, range: GetRange) -> BoxFuture<'_, Result<Bytes>> {
             self.requests.lock().unwrap().push(range.clone());
+            let range = match range {
+                GetRange::Bounded(range) => range,
+                GetRange::Offset(offset) => offset..self.data.len(),
+                GetRange::Suffix(end_offset) => {
+                    self.data.len().saturating_sub(end_offset)..self.data.len()
+                }
+            };
             futures::future::ready(Ok(self.data.slice(range))).boxed()
         }
 
@@ -995,8 +1055,8 @@ mod tests {
         assert_eq!(
             &requests[..],
             &[
-                offset_1 as usize..(offset_1 + length_1) as usize,
-                offset_2 as usize..(offset_2 + length_2) as usize
+                GetRange::from(offset_1 as usize..(offset_1 + length_1) as usize),
+                GetRange::from(offset_2 as usize..(offset_2 + length_2) as usize)
             ]
         );
     }
@@ -1581,7 +1641,7 @@ mod tests {
         // Setup `RowSelection` so that we can skip every other page, selecting the last page
         let mut selectors = vec![];
-        let mut expected_page_requests: Vec<Range<usize>> = vec![];
+        let mut expected_page_requests: Vec<GetRange> = vec![];
         while let Some(page) = pages.next() {
             let num_rows = if let Some(next_page) = pages.peek() {
                 next_page.first_row_index - page.first_row_index
@@ -1595,7 +1655,7 @@ mod tests {
                 selectors.push(RowSelector::select(num_rows as usize));
                 let start = page.offset as usize;
                 let end = start + page.compressed_page_size as usize;
-                expected_page_requests.push(start..end);
+                expected_page_requests.push((start..end).into());
             }
             skip = !skip;
         }
diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs
index 77c00e91a3aa..dfe2d69ce8a7 100644
--- a/parquet/src/arrow/async_reader/store.rs
+++ b/parquet/src/arrow/async_reader/store.rs
@@ -22,24 +22,49 @@ use bytes::Bytes;
 use futures::future::BoxFuture;
 use futures::{FutureExt, TryFutureExt};
 
-use object_store::{ObjectMeta, ObjectStore};
+use object_store::{path::Path, ObjectStore};
+use object_store::{GetOptions, ObjectMeta};
 
 use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader};
 use crate::errors::Result;
 use crate::file::metadata::ParquetMetaData;
 
+use super::GetRange;
+
+impl From<GetRange> for object_store::GetRange {
+    fn from(range: GetRange) -> Self {
+        match range {
+            GetRange::Bounded(range) => object_store::GetRange::Bounded(range),
+            GetRange::Offset(offset) => object_store::GetRange::Offset(offset),
+            GetRange::Suffix(end_offset) => object_store::GetRange::Suffix(end_offset),
+        }
+    }
+}
+
 /// Reads Parquet files in object storage using [`ObjectStore`].
 ///
 /// ```no_run
 /// # use std::io::stdout;
 /// # use std::sync::Arc;
 /// # use object_store::azure::MicrosoftAzureBuilder;
+/// # use object_store::aws::AmazonS3Builder;
 /// # use object_store::ObjectStore;
 /// # use object_store::path::Path;
 /// # use parquet::arrow::async_reader::ParquetObjectReader;
 /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
 /// # use parquet::schema::printer::print_parquet_metadata;
 /// # async fn run() {
+/// // Object stores that support suffix ranges do not need the file size up front:
+/// // populate configuration from environment
+/// let storage_container = Arc::new(AmazonS3Builder::from_env().build().unwrap());
+/// let location = Path::from("path/to/blob.parquet");
+///
+/// // Show Parquet metadata
+/// let reader = ParquetObjectReader::new_without_size(storage_container, location);
+/// let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
+/// print_parquet_metadata(&mut stdout(), builder.metadata());
+/// # }
+/// # async fn run_non_suffixed() {
 /// // Populate configuration from environment
 /// let storage_container = Arc::new(MicrosoftAzureBuilder::from_env().build().unwrap());
 /// let location = Path::from("path/to/blob.parquet");
@@ -47,34 +72,55 @@ use crate::file::metadata::ParquetMetaData;
 /// println!("Found Blob with {}B at {}", meta.size, meta.location);
 ///
 /// // Show Parquet metadata
-/// let reader = ParquetObjectReader::new(storage_container, meta);
+/// let reader = ParquetObjectReader::new_without_size(storage_container, location).with_file_size(meta.size);
 /// let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
 /// print_parquet_metadata(&mut stdout(), builder.metadata());
+///
 /// # }
 /// ```
 #[derive(Clone, Debug)]
 pub struct ParquetObjectReader {
     store: Arc<dyn ObjectStore>,
-    meta: ObjectMeta,
+    location: Path,
     metadata_size_hint: Option<usize>,
+    file_size: Option<usize>,
     preload_column_index: bool,
     preload_offset_index: bool,
 }
 
 impl ParquetObjectReader {
     /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`ObjectMeta`]
     ///
     /// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`]
     pub fn new(store: Arc<dyn ObjectStore>, meta: ObjectMeta) -> Self {
         Self {
             store,
-            meta,
+            location: meta.location,
+            file_size: Some(meta.size),
             metadata_size_hint: None,
             preload_column_index: false,
             preload_offset_index: false,
         }
     }
 
+    /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`Path`],
+    /// without requiring the file size up front
+    ///
+    /// The footer will be located with suffix range requests; for stores that do not
+    /// support them, provide the size via [`Self::with_file_size`]
+    pub fn new_without_size(store: Arc<dyn ObjectStore>, location: Path) -> Self {
+        Self {
+            store,
+            location,
+            file_size: None,
+            metadata_size_hint: None,
+            preload_column_index: false,
+            preload_offset_index: false,
+        }
+    }
+
+    /// Provide the size of the file, for object stores that do not support suffix ranges (e.g. Azure)
+    ///
+    /// The file size can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`]
+    pub fn with_file_size(self, file_size: usize) -> Self {
+        Self {
+            file_size: Some(file_size),
+            ..self
+        }
+    }
+
     /// Provide a hint as to the size of the parquet file's footer,
     /// see [fetch_parquet_metadata](crate::arrow::async_reader::fetch_parquet_metadata)
     pub fn with_footer_size_hint(self, hint: usize) -> Self {
@@ -102,11 +148,22 @@ impl ParquetObjectReader {
 }
 
 impl AsyncFileReader for ParquetObjectReader {
-    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
-        self.store
-            .get_range(&self.meta.location, range)
-            .map_err(|e| e.into())
-            .boxed()
+    fn get_bytes(&mut self, range: GetRange) -> BoxFuture<'_, Result<Bytes>> {
+        async move {
+            self.store
+                .get_opts(
+                    &self.location,
+                    GetOptions {
+                        range: Some(range.into()),
+                        ..Default::default()
+                    },
+                )
+                .await?
+                .bytes()
+                .await
+        }
+        .map_err(|e| e.into())
+        .boxed()
     }
 
     fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
     where
         Self: Send,
@@ -115,7 +172,7 @@ impl AsyncFileReader for ParquetObjectReader {
     {
         async move {
             self.store
-                .get_ranges(&self.meta.location, &ranges)
+                .get_ranges(&self.location, &ranges)
                 .await
                 .map_err(|e| e.into())
         }
@@ -126,9 +183,11 @@ impl AsyncFileReader for ParquetObjectReader {
         Box::pin(async move {
             let preload_column_index = self.preload_column_index;
             let preload_offset_index = self.preload_offset_index;
-            let file_size = self.meta.size;
             let prefetch = self.metadata_size_hint;
-            let mut loader = MetadataLoader::load(self, file_size, prefetch).await?;
+            let mut loader = match self.file_size {
+                Some(file_size) => MetadataLoader::load(self, file_size, prefetch).await,
+                None => MetadataLoader::load_without_size(self, prefetch).await,
+            }?;
             loader
                 .load_page_index(preload_column_index, preload_offset_index)
                 .await?;
@@ -155,14 +214,22 @@ mod tests {
     async fn test_simple() {
         let res = parquet_test_data();
         let store = LocalFileSystem::new_with_prefix(res).unwrap();
+        let mut location = Path::from("alltypes_plain.parquet");
 
-        let mut meta = store
-            .head(&Path::from("alltypes_plain.parquet"))
+        let store = Arc::new(store) as Arc<dyn ObjectStore>;
+        let object_reader =
+            ParquetObjectReader::new_without_size(Arc::clone(&store), location.clone());
+        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
             .await
             .unwrap();
+        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();
 
-        let store = Arc::new(store) as Arc<dyn ObjectStore>;
-        let object_reader = ParquetObjectReader::new(Arc::clone(&store), meta.clone());
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 8);
+
+        let meta = store.head(&location).await.unwrap();
+
+        let object_reader = ParquetObjectReader::new(Arc::clone(&store), meta);
         let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
             .await
             .unwrap();
@@ -171,9 +238,9 @@ mod tests {
         assert_eq!(batches.len(), 1);
         assert_eq!(batches[0].num_rows(), 8);
 
-        meta.location = Path::from("I don't exist.parquet");
+        location = Path::from("I don't exist.parquet");
 
-        let object_reader = ParquetObjectReader::new(store, meta);
+        let object_reader = ParquetObjectReader::new_without_size(store, location);
         // Cannot use unwrap_err as ParquetRecordBatchStreamBuilder: !Debug
         match ParquetRecordBatchStreamBuilder::new(object_reader).await {
            Ok(_) => panic!("expected failure"),