From 6a7c341484fdc240764ec136748a232cfbe96654 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Fri, 6 Dec 2024 07:28:43 +0200 Subject: [PATCH] fix: repartitioned reads of CSV with custom line terminator --- .../core/src/datasource/physical_plan/csv.rs | 4 ++- .../core/src/datasource/physical_plan/json.rs | 2 +- .../core/src/datasource/physical_plan/mod.rs | 11 +++--- .../sqllogictest/test_files/csv_files.slt | 36 ++++++++++++++----- 4 files changed, 38 insertions(+), 15 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 0c41f69c76916..797be637f26a3 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -612,11 +612,13 @@ impl FileOpener for CsvOpener { } let store = Arc::clone(&self.config.object_store); + let terminator = self.config.terminator.clone(); Ok(Box::pin(async move { // Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries) - let calculated_range = calculate_range(&file_meta, &store).await?; + let calculated_range = + calculate_range(&file_meta, &store, terminator).await?; let range = match calculated_range { RangeCalculation::Range(None) => None, diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index c07e8ca74543b..5c70968fbb423 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -273,7 +273,7 @@ impl FileOpener for JsonOpener { let file_compression_type = self.file_compression_type.to_owned(); Ok(Box::pin(async move { - let calculated_range = calculate_range(&file_meta, &store).await?; + let calculated_range = calculate_range(&file_meta, &store, None).await?; let range = match calculated_range { RangeCalculation::Range(None) => None, diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 449b7bb435195..3146d124d9f13 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -426,9 +426,11 @@ enum RangeCalculation { async fn calculate_range( file_meta: &FileMeta, store: &Arc, + terminator: Option, ) -> Result { let location = file_meta.location(); let file_size = file_meta.object_meta.size; + let newline = terminator.unwrap_or(b'\n'); match file_meta.range { None => Ok(RangeCalculation::Range(None)), @@ -436,13 +438,13 @@ async fn calculate_range( let (start, end) = (start as usize, end as usize); let start_delta = if start != 0 { - find_first_newline(store, location, start - 1, file_size).await? + find_first_newline(store, location, start - 1, file_size, newline).await? } else { 0 }; let end_delta = if end != file_size { - find_first_newline(store, location, end - 1, file_size).await? + find_first_newline(store, location, end - 1, file_size, newline).await? } else { 0 }; @@ -462,7 +464,7 @@ async fn calculate_range( /// within an object, such as a file, in an object store. /// /// This function scans the contents of the object starting from the specified `start` position -/// up to the `end` position, looking for the first occurrence of a newline (`'\n'`) character. +/// up to the `end` position, looking for the first occurrence of a newline character. /// It returns the position of the first newline relative to the start of the range. /// /// Returns a `Result` wrapping a `usize` that represents the position of the first newline character found within the specified range. If no newline is found, it returns the length of the scanned data, effectively indicating the end of the range. @@ -474,6 +476,7 @@ async fn find_first_newline( location: &Path, start: usize, end: usize, + newline: u8, ) -> Result { let options = GetOptions { range: Some(GetRange::Bounded(start..end)), @@ -486,7 +489,7 @@ async fn find_first_newline( let mut index = 0; while let Some(chunk) = result_stream.next().await.transpose()? { - if let Some(position) = chunk.iter().position(|&byte| byte == b'\n') { + if let Some(position) = chunk.iter().position(|&byte| byte == newline) { return Ok(index + position); } diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt index 01d0f4ac39bd4..5906c6a19bb81 100644 --- a/datafusion/sqllogictest/test_files/csv_files.slt +++ b/datafusion/sqllogictest/test_files/csv_files.slt @@ -350,15 +350,33 @@ col2 TEXT LOCATION '../core/tests/data/cr_terminator.csv' OPTIONS ('format.terminator' E'\r', 'format.has_header' 'true'); -# TODO: It should be passed but got the error: External error: query failed: DataFusion error: Object Store error: Generic LocalFileSystem error: Requested range was invalid -# See the issue: https://github.com/apache/datafusion/issues/12328 -# query TT -# select * from stored_table_with_cr_terminator; -# ---- -# id0 value0 -# id1 value1 -# id2 value2 -# id3 value3 +# Check single-thread reading of CSV with custom line terminator +statement ok +SET datafusion.optimizer.repartition_file_min_size = 10485760; + +query TT +select * from stored_table_with_cr_terminator; +---- +id0 value0 +id1 value1 +id2 value2 +id3 value3 + +# Check repartitioned reading of CSV with custom line terminator +statement ok +SET datafusion.optimizer.repartition_file_min_size = 1; + +query TT +select * from stored_table_with_cr_terminator order by col1; +---- +id0 value0 +id1 value1 +id2 value2 +id3 value3 + +# Reset repartition_file_min_size to default value +statement ok +SET datafusion.optimizer.repartition_file_min_size = 10485760; statement ok drop table stored_table_with_cr_terminator;