Skip to content

Commit

Permalink
fix: repartitioned reads of CSV with custom line terminator
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Dec 6, 2024
1 parent 2464703 commit 6a7c341
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 15 deletions.
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,11 +612,13 @@ impl FileOpener for CsvOpener {
}

let store = Arc::clone(&self.config.object_store);
let terminator = self.config.terminator.clone();

Ok(Box::pin(async move {
// Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries)

let calculated_range = calculate_range(&file_meta, &store).await?;
let calculated_range =
calculate_range(&file_meta, &store, terminator).await?;

let range = match calculated_range {
RangeCalculation::Range(None) => None,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl FileOpener for JsonOpener {
let file_compression_type = self.file_compression_type.to_owned();

Ok(Box::pin(async move {
let calculated_range = calculate_range(&file_meta, &store).await?;
let calculated_range = calculate_range(&file_meta, &store, None).await?;

let range = match calculated_range {
RangeCalculation::Range(None) => None,
Expand Down
11 changes: 7 additions & 4 deletions datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,23 +426,25 @@ enum RangeCalculation {
async fn calculate_range(
file_meta: &FileMeta,
store: &Arc<dyn ObjectStore>,
terminator: Option<u8>,
) -> Result<RangeCalculation> {
let location = file_meta.location();
let file_size = file_meta.object_meta.size;
let newline = terminator.unwrap_or(b'\n');

match file_meta.range {
None => Ok(RangeCalculation::Range(None)),
Some(FileRange { start, end }) => {
let (start, end) = (start as usize, end as usize);

let start_delta = if start != 0 {
find_first_newline(store, location, start - 1, file_size).await?
find_first_newline(store, location, start - 1, file_size, newline).await?
} else {
0
};

let end_delta = if end != file_size {
find_first_newline(store, location, end - 1, file_size).await?
find_first_newline(store, location, end - 1, file_size, newline).await?
} else {
0
};
Expand All @@ -462,7 +464,7 @@ async fn calculate_range(
/// within an object, such as a file, in an object store.
///
/// This function scans the contents of the object starting from the specified `start` position
/// up to the `end` position, looking for the first occurrence of a newline (`'\n'`) character.
/// up to the `end` position, looking for the first occurrence of a newline character.
/// It returns the position of the first newline relative to the start of the range.
///
/// Returns a `Result` wrapping a `usize` that represents the position of the first newline character found within the specified range. If no newline is found, it returns the length of the scanned data, effectively indicating the end of the range.
Expand All @@ -474,6 +476,7 @@ async fn find_first_newline(
location: &Path,
start: usize,
end: usize,
newline: u8,
) -> Result<usize> {
let options = GetOptions {
range: Some(GetRange::Bounded(start..end)),
Expand All @@ -486,7 +489,7 @@ async fn find_first_newline(
let mut index = 0;

while let Some(chunk) = result_stream.next().await.transpose()? {
if let Some(position) = chunk.iter().position(|&byte| byte == b'\n') {
if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
return Ok(index + position);
}

Expand Down
36 changes: 27 additions & 9 deletions datafusion/sqllogictest/test_files/csv_files.slt
Original file line number Diff line number Diff line change
Expand Up @@ -350,15 +350,33 @@ col2 TEXT
LOCATION '../core/tests/data/cr_terminator.csv'
OPTIONS ('format.terminator' E'\r', 'format.has_header' 'true');

# TODO: It should be passed but got the error: External error: query failed: DataFusion error: Object Store error: Generic LocalFileSystem error: Requested range was invalid
# See the issue: https://github.com/apache/datafusion/issues/12328
# query TT
# select * from stored_table_with_cr_terminator;
# ----
# id0 value0
# id1 value1
# id2 value2
# id3 value3
# Check single-thread reading of CSV with custom line terminator
statement ok
SET datafusion.optimizer.repartition_file_min_size = 10485760;

query TT
select * from stored_table_with_cr_terminator;
----
id0 value0
id1 value1
id2 value2
id3 value3

# Check repartitioned reading of CSV with custom line terminator
statement ok
SET datafusion.optimizer.repartition_file_min_size = 1;

query TT
select * from stored_table_with_cr_terminator order by col1;
----
id0 value0
id1 value1
id2 value2
id3 value3

# Reset repartition_file_min_size to default value
statement ok
SET datafusion.optimizer.repartition_file_min_size = 10485760;

statement ok
drop table stored_table_with_cr_terminator;

0 comments on commit 6a7c341

Please sign in to comment.