Skip to content

Commit

Permalink
Support the custom terminator for the CSV file format (apache#12263)
Browse files Browse the repository at this point in the history
* add terminator config to CsvConfig

* add test and fix missing builder

* remove the debug message and fix the doc

* support EscapedStringLiteral

* add create external table tests

* refactor the error assertion

* add issue reference
  • Loading branch information
goldmedal authored Sep 5, 2024
1 parent ab1e3e2 commit 008c942
Show file tree
Hide file tree
Showing 16 changed files with 228 additions and 3 deletions.
1 change: 1 addition & 0 deletions datafusion-examples/examples/csv_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ async fn main() -> Result<()> {
true,
b',',
b'"',
None,
object_store,
Some(b'#'),
);
Expand Down
13 changes: 13 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1604,6 +1604,7 @@ config_namespace! {
pub has_header: Option<bool>, default = None
pub delimiter: u8, default = b','
pub quote: u8, default = b'"'
pub terminator: Option<u8>, default = None
pub escape: Option<u8>, default = None
pub double_quote: Option<bool>, default = None
/// Specifies whether newlines in (quoted) values are supported.
Expand Down Expand Up @@ -1672,6 +1673,13 @@ impl CsvOptions {
self
}

/// The character that terminates a row.
/// - default to None (CRLF)
pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
self.terminator = terminator;
self
}

/// The escape character in a row.
/// - default is None
pub fn with_escape(mut self, escape: Option<u8>) -> Self {
Expand Down Expand Up @@ -1718,6 +1726,11 @@ impl CsvOptions {
self.quote
}

/// The terminator character.
pub fn terminator(&self) -> Option<u8> {
self.terminator
}

/// The escape character.
pub fn escape(&self) -> Option<u8> {
self.escape
Expand Down
8 changes: 8 additions & 0 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,13 @@ impl CsvFormat {
self
}

/// The character used to indicate the end of a row.
/// - default to None (CRLF)
pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
self.options.terminator = terminator;
self
}

/// Specifies whether newlines in (quoted) values are supported.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
Expand Down Expand Up @@ -359,6 +366,7 @@ impl FileFormat for CsvFormat {
.with_has_header(has_header)
.with_delimeter(self.options.delimiter)
.with_quote(self.options.quote)
.with_terminator(self.options.terminator)
.with_escape(self.options.escape)
.with_comment(self.options.comment)
.with_newlines_in_values(newlines_in_values)
Expand Down
10 changes: 10 additions & 0 deletions datafusion/core/src/datasource/file_format/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub struct CsvReadOptions<'a> {
pub delimiter: u8,
/// An optional quote character. Defaults to `b'"'`.
pub quote: u8,
/// An optional terminator character. Defaults to None (CRLF).
pub terminator: Option<u8>,
/// An optional escape character. Defaults to None.
pub escape: Option<u8>,
/// If enabled, lines beginning with this byte are ignored.
Expand Down Expand Up @@ -102,6 +104,7 @@ impl<'a> CsvReadOptions<'a> {
schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
delimiter: b',',
quote: b'"',
terminator: None,
escape: None,
newlines_in_values: false,
file_extension: DEFAULT_CSV_EXTENSION,
Expand Down Expand Up @@ -136,6 +139,12 @@ impl<'a> CsvReadOptions<'a> {
self
}

/// Specify terminator to use for CSV read
pub fn terminator(mut self, terminator: Option<u8>) -> Self {
self.terminator = terminator;
self
}

/// Specify delimiter to use for CSV read
pub fn escape(mut self, escape: u8) -> Self {
self.escape = Some(escape);
Expand Down Expand Up @@ -511,6 +520,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_delimiter(self.delimiter)
.with_quote(self.quote)
.with_escape(self.escape)
.with_terminator(self.terminator)
.with_newlines_in_values(self.newlines_in_values)
.with_schema_infer_max_rec(self.schema_infer_max_records)
.with_file_compression_type(self.file_compression_type.to_owned());
Expand Down
Loading

0 comments on commit 008c942

Please sign in to comment.