Skip to content

Commit

Permalink
fix prefetch of page index (#6999)
Browse files Browse the repository at this point in the history
* fix prefetch of page index

* move to assertion

* fmt

* less invasive version

* typo

* fmt
  • Loading branch information
adriangb authored Jan 22, 2025
1 parent 1664214 commit ffeda12
Showing 1 changed file with 39 additions and 1 deletion.
40 changes: 39 additions & 1 deletion parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,9 @@ impl ParquetMetaDataReader {
let bytes = match &remainder {
Some((remainder_start, remainder)) if *remainder_start <= range.start => {
let offset = range.start - *remainder_start;
remainder.slice(offset..range.end - *remainder_start + offset)
let end = offset + range.end - range.start;
assert!(end <= remainder.len());
remainder.slice(offset..end)
}
// Note: this may re-fetch data that is already in remainder; accepting that keeps things simple
_ => fetch.fetch(range.start..range.end).await?,
Expand Down Expand Up @@ -1052,5 +1054,41 @@ mod async_tests {
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch more than enough but less than the entire file
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(len - 1000)) // 1000 bytes less than the entire file
.load_and_finish(f, len)
.await
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch the entire file
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(len)) // prefetch entire file
.load_and_finish(f, len)
.await
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch more than the entire file
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(len + 1000)) // 1000 bytes more than the entire file
.load_and_finish(f, len)
.await
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
}
}

0 comments on commit ffeda12

Please sign in to comment.