Skip to content

Commit

Permalink
feat(source): Add semicolon delimiter for CSV (#17356)
Browse files Browse the repository at this point in the history
Signed-off-by: Marcelo Henrique Neppel <[email protected]>
  • Loading branch information
marceloneppel authored Jun 22, 2024
1 parent 70eb3d0 commit 785cf7f
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
create source s with(connector='kafka') FORMAT PLAIN ENCODE JSON;
expected_outputs:
- planner_error
- id: csv_delimiter_tab
- id: csv_delimiter_comma
sql: |
explain create table s0 (v1 int, v2 varchar) with (
connector = 'kafka',
Expand All @@ -23,6 +23,16 @@
) FORMAT PLAIN ENCODE CSV (delimiter = ',', without_header = true);
expected_outputs:
- explain_output
# Planner test: a Kafka-backed table whose CSV encoding uses ';' as the delimiter; expects explain output.
- id: csv_delimiter_semicolon
sql: |
explain create table s0 (v1 int, v2 varchar) with (
connector = 'kafka',
topic = 'kafka_1_csv_topic',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE CSV (delimiter = ';', without_header = true);
expected_outputs:
- explain_output
- id: csv_delimiter_tab
sql: |
explain create table s0 (v1 int, v2 varchar) with (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
sql: |
create source s with(connector='kafka') FORMAT PLAIN ENCODE JSON;
planner_error: 'Protocol error: Schema definition is required, either from SQL or schema registry.'
- id: csv_delimiter_tab
- id: csv_delimiter_comma
sql: |
explain create table s0 (v1 int, v2 varchar) with (
connector = 'kafka',
Expand All @@ -28,6 +28,23 @@
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamDml { columns: [v1, v2, _row_id] }
└─StreamSource
# Recorded explain output for the ';'-delimited CSV table created by the sql below.
- id: csv_delimiter_semicolon
sql: |
explain create table s0 (v1 int, v2 varchar) with (
connector = 'kafka',
topic = 'kafka_1_csv_topic',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE CSV (delimiter = ';', without_header = true);
explain_output: |
StreamMaterialize { columns: [v1, v2, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
└─StreamRowIdGen { row_id_index: 2 }
└─StreamUnion { all: true }
├─StreamExchange [no_shuffle] { dist: SomeShard }
│ └─StreamSource { source: s0, columns: [v1, v2, _row_id] }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamDml { columns: [v1, v2, _row_id] }
└─StreamSource
- id: csv_delimiter_tab
sql: |
explain create table s0 (v1 int, v2 varchar) with (
Expand Down
3 changes: 2 additions & 1 deletion src/sqlparser/src/ast/legacy_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,9 +398,10 @@ pub struct CsvInfo {
/// Converts a CSV delimiter given as a string into its single-byte form.
///
/// Accepted inputs are `","`, `";"` and a literal tab character; each maps to
/// the corresponding byte. Any other string yields a `StrError` whose message
/// lists the accepted delimiters and echoes the rejected input via `{other:?}`.
pub fn get_delimiter(chars: &str) -> Result<u8, StrError> {
match chars {
"," => Ok(b','), // comma
";" => Ok(b';'), // semicolon
"\t" => Ok(b'\t'), // tab
other => Err(StrError(format!(
// NOTE(review): this listing is a diff hunk (2 additions, 1 deletion), so the
// next two literals are the removed and the added message. Only the second
// one mentions ';' and agrees with the match arms above.
"The delimiter should be one of ',', E'\\t', but got {other:?}",
"The delimiter should be one of ',', ';', E'\\t', but got {other:?}",
))),
}
}
Expand Down

0 comments on commit 785cf7f

Please sign in to comment.