diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java index 636574a19ba9..68958480f955 100644 --- a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java +++ b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java @@ -20,16 +20,12 @@ import java.io.BufferedReader; import java.io.File; -import java.io.FileReader; import java.io.IOException; -import java.io.Reader; -import java.io.StringReader; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Optional; import java.util.Set; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -53,284 +49,199 @@ public class CSVRecordReader implements RecordReader { private static final Logger LOGGER = LoggerFactory.getLogger(CSVRecordReader.class); - private File _dataFile; - private CSVFormat _format; - private CSVParser _parser; - private Iterator _iterator; - private CSVRecordExtractor _recordExtractor; - private Map _headerMap = new HashMap<>(); + private static final Map CSV_FORMAT_MAP = new HashMap<>(); - private BufferedReader _bufferedReader; - private CSVRecordReaderConfig _config = null; + static { + for (CSVFormat.Predefined format : CSVFormat.Predefined.values()) { + CSV_FORMAT_MAP.put(canonicalize(format.name()), format.getFormat()); + } + } - public CSVRecordReader() { + private static String canonicalize(String format) { + return StringUtils.remove(format, '_').toUpperCase(); } - private static CSVFormat baseCsvFormat(CSVRecordReaderConfig config) { - if (config.getFileFormat() == null) { + private static CSVFormat 
getCSVFormat(@Nullable String format) { + if (format == null) { return CSVFormat.DEFAULT; } - switch (config.getFileFormat().toUpperCase()) { - case "EXCEL": - return CSVFormat.EXCEL; - case "MYSQL": - return CSVFormat.MYSQL; - case "RFC4180": - return CSVFormat.RFC4180; - case "TDF": - return CSVFormat.TDF; - default: - return CSVFormat.DEFAULT; - } - } - - private static Map parseHeaderMapFromLine(CSVFormat format, String line) { - try (StringReader stringReader = new StringReader(line)) { - try (CSVParser parser = format.parse(stringReader)) { - return parser.getHeaderMap(); - } - } catch (IOException e) { - throw new RuntimeException("Failed to parse header from line: " + line, e); + CSVFormat csvFormat = CSV_FORMAT_MAP.get(canonicalize(format)); + if (csvFormat != null) { + return csvFormat; + } else { + LOGGER.warn("Failed to find CSV format for: {}, using DEFAULT format", format); + return CSVFormat.DEFAULT; } } - private static Character getMultiValueDelimiter(CSVRecordReaderConfig config) { - if (config == null) { - return CSVRecordReaderConfig.DEFAULT_MULTI_VALUE_DELIMITER; - } else if (config.isMultiValueDelimiterEnabled()) { - return config.getMultiValueDelimiter(); - } - return null; - } + private File _dataFile; + private CSVRecordReaderConfig _config; + private CSVFormat _format; + private BufferedReader _reader; + private CSVParser _parser; + private List _columns; + private Iterator _iterator; + private CSVRecordExtractor _recordExtractor; - private static boolean useLineIterator(CSVRecordReaderConfig config) { - return config != null && config.isSkipUnParseableLines(); - } + // Following fields are used to handle exceptions in hasNext() method + private int _nextLineId; + private int _numSkippedLines; + private RuntimeException _exceptionInHasNext; + private CSVFormat _recoveryFormat; @Override public void init(File dataFile, @Nullable Set fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) throws IOException { _dataFile = dataFile; - 
_config = (CSVRecordReaderConfig) recordReaderConfig; - _format = createCSVFormat(); - - // If header is provided by the client, use it. Otherwise, parse the header from the first line of the file. - // Overwrite the format with the header information. - Optional.ofNullable(_config).map(CSVRecordReaderConfig::getHeader).ifPresent(header -> { - _headerMap = parseHeaderMapFromLine(_format, header); - _format = _format.builder().setHeader(_headerMap.keySet().toArray(new String[0])).build(); - }); - - validateHeaderWithDelimiter(); - initIterator(); - initRecordExtractor(fieldsToRead); - } - - private void initRecordExtractor(Set fieldsToRead) { - final CSVRecordExtractorConfig recordExtractorConfig = new CSVRecordExtractorConfig(); - recordExtractorConfig.setMultiValueDelimiter(getMultiValueDelimiter(_config)); - recordExtractorConfig.setColumnNames(_headerMap.keySet()); - _recordExtractor = new CSVRecordExtractor(); - _recordExtractor.init(fieldsToRead, recordExtractorConfig); - } - - private CSVFormat createCSVFormat() { - if (_config == null) { - return CSVFormat.DEFAULT.builder().setDelimiter(CSVRecordReaderConfig.DEFAULT_DELIMITER).setHeader().build(); + _config = recordReaderConfig != null ? 
(CSVRecordReaderConfig) recordReaderConfig : new CSVRecordReaderConfig(); + _format = getCSVFormat(); + _reader = RecordReaderUtils.getBufferedReader(_dataFile); + _parser = _format.parse(_reader); + _columns = _parser.getHeaderNames(); + _iterator = _parser.iterator(); + _recordExtractor = getRecordExtractor(fieldsToRead); + _nextLineId = (int) _parser.getCurrentLineNumber(); + + // Read the first record, and validate if the header uses the configured delimiter + // (address https://github.com/apache/pinot/issues/7187) + boolean hasNext; + try { + hasNext = _iterator.hasNext(); + } catch (RuntimeException e) { + throw new IOException("Failed to read first record from file: " + _dataFile, e); + } + if (hasNext) { + CSVRecord record = _iterator.next(); + if (record.size() > 1 && _columns.size() <= 1) { + throw new IllegalStateException("Header does not contain the configured delimiter"); + } + _reader.close(); + _reader = RecordReaderUtils.getBufferedReader(_dataFile); + _parser = _format.parse(_reader); + _iterator = _parser.iterator(); } + } - final CSVFormat.Builder builder = baseCsvFormat(_config).builder() + private CSVFormat getCSVFormat() { + CSVFormat.Builder builder = getCSVFormat(_config.getFileFormat()).builder() + .setHeader() // Parse header from the file .setDelimiter(_config.getDelimiter()) - .setHeader() - .setSkipHeaderRecord(_config.isSkipHeader()) - .setCommentMarker(_config.getCommentMarker()) - .setEscape(_config.getEscapeCharacter()) .setIgnoreEmptyLines(_config.isIgnoreEmptyLines()) .setIgnoreSurroundingSpaces(_config.isIgnoreSurroundingSpaces()) .setQuote(_config.getQuoteCharacter()); - - Optional.ofNullable(_config.getQuoteMode()).map(QuoteMode::valueOf).ifPresent(builder::setQuoteMode); - Optional.ofNullable(_config.getRecordSeparator()).ifPresent(builder::setRecordSeparator); - Optional.ofNullable(_config.getNullStringValue()).ifPresent(builder::setNullString); - - return builder.build(); - } - - private void initIterator() - throws 
IOException { - if (useLineIterator(_config)) { - _bufferedReader = new BufferedReader(new FileReader(_dataFile), 1024 * 32); // 32KB buffer size - _iterator = new LineIterator(); - } else { - _parser = _format.parse(RecordReaderUtils.getBufferedReader(_dataFile)); - _headerMap = _parser.getHeaderMap(); - _iterator = _parser.iterator(); + if (_config.getCommentMarker() != null) { + builder.setCommentMarker(_config.getCommentMarker()); } - } - - private void validateHeaderWithDelimiter() - throws IOException { - if (_config == null || _config.getHeader() == null || useLineIterator(_config)) { - return; + if (_config.getEscapeCharacter() != null) { + builder.setEscape(_config.getEscapeCharacter()); } - final CSVParser parser = _format.parse(RecordReaderUtils.getBufferedReader(_dataFile)); - final Iterator iterator = parser.iterator(); - if (iterator.hasNext() && recordHasMultipleValues(iterator.next()) && delimiterNotPresentInHeader( - _config.getDelimiter(), _config.getHeader())) { - throw new IllegalArgumentException("Configured header does not contain the configured delimiter"); + if (_config.getNullStringValue() != null) { + builder.setNullString(_config.getNullStringValue()); } + if (_config.getQuoteMode() != null) { + builder.setQuoteMode(QuoteMode.valueOf(_config.getQuoteMode())); + } + if (_config.getRecordSeparator() != null) { + builder.setRecordSeparator(_config.getRecordSeparator()); + } + CSVFormat format = builder.build(); + String header = _config.getHeader(); + if (header == null) { + return format; + } + // Parse header using the current format, and set it into the builder + try (CSVParser parser = CSVParser.parse(header, format)) { + format = builder.setHeader(parser.getHeaderNames().toArray(new String[0])) + .setSkipHeaderRecord(_config.isSkipHeader()).build(); + } catch (IOException e) { + throw new RuntimeException("Failed to parse header from line: " + header, e); + } + return format; } - private boolean recordHasMultipleValues(CSVRecord record) 
{ - return record.size() > 1; - } - - private boolean delimiterNotPresentInHeader(char delimiter, String csvHeader) { - return !StringUtils.contains(csvHeader, delimiter); + private CSVRecordExtractor getRecordExtractor(@Nullable Set fieldsToRead) { + CSVRecordExtractorConfig recordExtractorConfig = new CSVRecordExtractorConfig(); + if (_config.isMultiValueDelimiterEnabled()) { + recordExtractorConfig.setMultiValueDelimiter(_config.getMultiValueDelimiter()); + } + recordExtractorConfig.setColumnNames(new HashSet<>(_columns)); + CSVRecordExtractor recordExtractor = new CSVRecordExtractor(); + recordExtractor.init(fieldsToRead, recordExtractorConfig); + return recordExtractor; } - /** - * Returns a copy of the header map that iterates in column order. - *

- * The map keys are column names. The map values are 0-based indices. - *

- * @return a copy of the header map that iterates in column order. - */ - public Map getCSVHeaderMap() { - // if header row is not configured and input file doesn't contain a valid header record, the returned map would - // contain values from the first row in the input file. - return _headerMap; + public List getColumns() { + return _columns; } @Override public boolean hasNext() { - return _iterator.hasNext(); + try { + return _iterator.hasNext(); + } catch (RuntimeException e) { + if (_config.isStopOnError()) { + LOGGER.warn("Caught exception while reading CSV file: {}, stopping processing", _dataFile, e); + return false; + } else { + // Cache exception here and throw it in next() method + _exceptionInHasNext = e; + return true; + } + } } @Override public GenericRow next(GenericRow reuse) throws IOException { + if (_exceptionInHasNext != null) { + // When hasNext() throws an exception, recreate the reader and skip to the next line, then throw the exception + // TODO: This is very expensive. Consider marking the reader then reset it. The challenge here is that the reader + // offset is not the same as parsed offset, and we need to mark at the correct offset. + _reader.close(); + _reader = RecordReaderUtils.getBufferedReader(_dataFile); + _numSkippedLines = _nextLineId + 1; + for (int i = 0; i < _numSkippedLines; i++) { + _reader.readLine(); + } + _nextLineId = _numSkippedLines; + // Create recovery format if not created yet. Recovery format has header preset, and does not skip header record. 
+ if (_recoveryFormat == null) { + _recoveryFormat = + _format.builder().setHeader(_columns.toArray(new String[0])).setSkipHeaderRecord(false).build(); + } + _parser = _recoveryFormat.parse(_reader); + _iterator = _parser.iterator(); + + RuntimeException exception = _exceptionInHasNext; + _exceptionInHasNext = null; + LOGGER.warn("Caught exception while reading CSV file: {}, recovering from line: {}", _dataFile, _numSkippedLines, + exception); + + throw exception; + } + CSVRecord record = _iterator.next(); _recordExtractor.extract(record, reuse); + _nextLineId = _numSkippedLines + (int) _parser.getCurrentLineNumber(); return reuse; } @Override public void rewind() throws IOException { - if (_parser != null && !_parser.isClosed()) { - _parser.close(); - } - closeIterator(); - initIterator(); + _reader.close(); + _reader = RecordReaderUtils.getBufferedReader(_dataFile); + _parser = _format.parse(_reader); + _iterator = _parser.iterator(); + _nextLineId = (int) _parser.getCurrentLineNumber(); + _numSkippedLines = 0; } @Override public void close() throws IOException { - closeIterator(); - - if (_parser != null && !_parser.isClosed()) { - _parser.close(); - } - } - - private void closeIterator() - throws IOException { - // if header is not provided by the client it would be rebuilt. 
When it's provided by the client it's initialized - // once in the constructor - if (useLineIterator(_config) && _config.getHeader() == null) { - _headerMap.clear(); - } - - if (_bufferedReader != null) { - _bufferedReader.close(); - } - } - - class LineIterator implements Iterator { - private String _nextLine; - private CSVRecord _current; - - public LineIterator() { - init(); - } - - private void init() { - try { - if (_config.getHeader() != null) { - if (_config.isSkipHeader()) { - // When skip header config is set and header is supplied – skip the first line from the input file - _bufferedReader.readLine(); - // turn off the property so that it doesn't interfere with further parsing - _format = _format.builder().setSkipHeaderRecord(false).build(); - } - } else { - // read the first line - String headerLine = _bufferedReader.readLine(); - _headerMap = parseHeaderMapFromLine(_format, headerLine); - // If header isn't provided, the first line would be set as header and the 'skipHeader' property - // is set to false. 
- _format = _format.builder() - .setSkipHeaderRecord(false) - .setHeader(_headerMap.keySet().toArray(new String[0])) - .build(); - } - _nextLine = _bufferedReader.readLine(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private CSVRecord getNextRecord() { - while (_nextLine != null) { - try (Reader reader = new StringReader(_nextLine)) { - try (CSVParser csvParser = _format.parse(reader)) { - List csvRecords = csvParser.getRecords(); - if (csvRecords == null || csvRecords.isEmpty()) { - // Can be thrown on: 1) Empty lines 2) Commented lines - throw new NoSuchElementException("Failed to find any records"); - } - // There would be only one record as lines are read one after the other - CSVRecord csvRecord = csvRecords.get(0); - - // move the pointer to the next line - _nextLine = _bufferedReader.readLine(); - return csvRecord; - } catch (Exception e) { - // Find the next line that can be parsed - _nextLine = _bufferedReader.readLine(); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return null; - } - - @Override - public boolean hasNext() { - if (_current == null) { - _current = getNextRecord(); - } - - return _current != null; - } - - @Override - public CSVRecord next() { - CSVRecord next = _current; - _current = null; - - if (next == null) { - // hasNext() wasn't called before - next = getNextRecord(); - if (next == null) { - throw new NoSuchElementException("No more CSV records available"); - } - } - - return next; + if (_reader != null) { + _reader.close(); } } } diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java index bdc4ae2ed8ef..6d99f9712e1c 100644 --- a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java +++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java @@ -37,13 +37,14 @@ public class CSVRecordReaderConfig implements RecordReaderConfig { private Character _escapeCharacter; // Default is null private String _nullStringValue; private boolean _skipHeader; - private boolean _skipUnParseableLines = false; private boolean _ignoreEmptyLines = true; private boolean _ignoreSurroundingSpaces = true; private Character _quoteCharacter = '"'; private String _quoteMode; private String _recordSeparator; + // When set to true, the record reader will stop processing the file if it encounters an error. + private boolean _stopOnError; public String getFileFormat() { return _fileFormat; @@ -77,14 +78,6 @@ public void setMultiValueDelimiter(char multiValueDelimiter) { _multiValueDelimiter = multiValueDelimiter; } - public boolean isSkipUnParseableLines() { - return _skipUnParseableLines; - } - - public void setSkipUnParseableLines(boolean skipUnParseableLines) { - _skipUnParseableLines = skipUnParseableLines; - } - public boolean isMultiValueDelimiterEnabled() { return _multiValueDelimiterEnabled; } @@ -165,6 +158,14 @@ public void setRecordSeparator(String recordSeparator) { _recordSeparator = recordSeparator; } + public boolean isStopOnError() { + return _stopOnError; + } + + public void setStopOnError(boolean stopOnError) { + _stopOnError = stopOnError; + } + @Override public String toString() { return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java index 6a1d86a48db3..12303657b032 100644 --- a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java +++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java @@ -21,11 +21,12 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; +import java.io.UncheckedIOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; import org.apache.commons.lang3.StringUtils; @@ -33,43 +34,49 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.data.readers.PrimaryKey; import org.apache.pinot.spi.data.readers.RecordReader; -import org.testng.Assert; import org.testng.annotations.Test; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; + public class CSVRecordReaderTest extends AbstractRecordReaderTest { private static final char CSV_MULTI_VALUE_DELIMITER = '\t'; + private static final CSVRecordReaderConfig[] NULL_AND_EMPTY_CONFIGS = new CSVRecordReaderConfig[]{ + null, new CSVRecordReaderConfig() + }; @Override protected RecordReader createRecordReader(File file) throws Exception { - CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig(); - csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER); - CSVRecordReader csvRecordReader = new CSVRecordReader(); - csvRecordReader.init(file, _sourceFields, csvRecordReaderConfig); - return csvRecordReader; + CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); + readerConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER); + CSVRecordReader recordReader = new CSVRecordReader(); + recordReader.init(file, 
_sourceFields, readerConfig); + return recordReader; } @Override - protected void writeRecordsToFile(List> recordsToWrite) + protected void writeRecordsToFile(List> records) throws Exception { - Schema pinotSchema = getPinotSchema(); - String[] columns = pinotSchema.getColumnNames().toArray(new String[0]); - try (FileWriter fileWriter = new FileWriter(_dataFile); - CSVPrinter csvPrinter = new CSVPrinter(fileWriter, CSVFormat.DEFAULT.withHeader(columns))) { - - for (Map r : recordsToWrite) { - Object[] record = new Object[columns.length]; - for (int i = 0; i < columns.length; i++) { - if (pinotSchema.getFieldSpecFor(columns[i]).isSingleValueField()) { - record[i] = r.get(columns[i]); + Schema schema = getPinotSchema(); + String[] columns = schema.getColumnNames().toArray(new String[0]); + int numColumns = columns.length; + try (CSVPrinter csvPrinter = new CSVPrinter(new FileWriter(_dataFile), + CSVFormat.Builder.create().setHeader(columns).build())) { + for (Map record : records) { + Object[] values = new Object[numColumns]; + for (int i = 0; i < numColumns; i++) { + if (schema.getFieldSpecFor(columns[i]).isSingleValueField()) { + values[i] = record.get(columns[i]); } else { - record[i] = StringUtils.join(((List) r.get(columns[i])).toArray(), CSV_MULTI_VALUE_DELIMITER); + values[i] = StringUtils.join(((List) record.get(columns[i])).toArray(), CSV_MULTI_VALUE_DELIMITER); } } - csvPrinter.printRecord(record); + csvPrinter.printRecord(values); } } } @@ -80,590 +87,572 @@ protected String getDataFileName() { } @Override - protected void checkValue(RecordReader recordReader, List> expectedRecordsMap, + protected void checkValue(RecordReader recordReader, List> expectedRecords, List expectedPrimaryKeys) throws Exception { - for (int i = 0; i < expectedRecordsMap.size(); i++) { - Map expectedRecord = expectedRecordsMap.get(i); + int numRecords = expectedRecords.size(); + for (int i = 0; i < numRecords; i++) { + Map expectedRecord = expectedRecords.get(i); GenericRow 
actualRecord = recordReader.next(); for (FieldSpec fieldSpec : _pinotSchema.getAllFieldSpecs()) { - String fieldSpecName = fieldSpec.getName(); + String column = fieldSpec.getName(); if (fieldSpec.isSingleValueField()) { - Assert.assertEquals(actualRecord.getValue(fieldSpecName).toString(), - expectedRecord.get(fieldSpecName).toString()); + assertEquals(actualRecord.getValue(column).toString(), expectedRecord.get(column).toString()); } else { - List expectedRecords = (List) expectedRecord.get(fieldSpecName); - if (expectedRecords.size() == 1) { - Assert.assertEquals(actualRecord.getValue(fieldSpecName).toString(), expectedRecords.get(0).toString()); + List expectedValues = (List) expectedRecord.get(column); + if (expectedValues.size() == 1) { + assertEquals(actualRecord.getValue(column).toString(), expectedValues.get(0).toString()); } else { - Object[] actualRecords = (Object[]) actualRecord.getValue(fieldSpecName); - Assert.assertEquals(actualRecords.length, expectedRecords.size()); - for (int j = 0; j < actualRecords.length; j++) { - Assert.assertEquals(actualRecords[j].toString(), expectedRecords.get(j).toString()); + Object[] actualValues = (Object[]) actualRecord.getValue(column); + assertEquals(actualValues.length, expectedValues.size()); + for (int j = 0; j < actualValues.length; j++) { + assertEquals(actualValues[j].toString(), expectedValues.get(j).toString()); } } } - PrimaryKey primaryKey = actualRecord.getPrimaryKey(getPrimaryKeyColumns()); - for (int j = 0; j < primaryKey.getValues().length; j++) { - Assert.assertEquals(primaryKey.getValues()[j].toString(), expectedPrimaryKeys.get(i)[j].toString()); + Object[] expectedPrimaryKey = expectedPrimaryKeys.get(i); + Object[] actualPrimaryKey = actualRecord.getPrimaryKey(getPrimaryKeyColumns()).getValues(); + for (int j = 0; j < actualPrimaryKey.length; j++) { + assertEquals(actualPrimaryKey[j].toString(), expectedPrimaryKey[j].toString()); } } } - Assert.assertFalse(recordReader.hasNext()); + 
assertFalse(recordReader.hasNext()); } @Test - public void testInvalidDelimiterInHeader() { - // setup - CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig(); - csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER); - csvRecordReaderConfig.setHeader("col1;col2;col3;col4;col5;col6;col7;col8;col9;col10"); - csvRecordReaderConfig.setDelimiter(','); - CSVRecordReader csvRecordReader = new CSVRecordReader(); - - //execute and assert - Assert.assertThrows(IllegalArgumentException.class, - () -> csvRecordReader.init(_dataFile, null, csvRecordReaderConfig)); + public void testInvalidDelimiterInHeader() + throws IOException { + CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); + readerConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER); + readerConfig.setHeader("col1;col2;col3;col4;col5;col6;col7;col8;col9;col10"); + try (CSVRecordReader recordReader = new CSVRecordReader()) { + assertThrows(IllegalStateException.class, () -> recordReader.init(_dataFile, null, readerConfig)); + } } @Test public void testValidDelimiterInHeader() throws IOException { - //setup - CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig(); - csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER); - csvRecordReaderConfig.setHeader("col1,col2,col3,col4,col5,col6,col7,col8,col9,col10"); - csvRecordReaderConfig.setDelimiter(','); - CSVRecordReader csvRecordReader = new CSVRecordReader(); - - //read all fields - //execute and assert - csvRecordReader.init(_dataFile, null, csvRecordReaderConfig); - Assert.assertEquals(10, csvRecordReader.getCSVHeaderMap().size()); - Assert.assertTrue(csvRecordReader.getCSVHeaderMap().containsKey("col1")); - Assert.assertTrue(csvRecordReader.hasNext()); - } - - /** - * When CSV records contain a single value, then no exception should be throw while initialising. - * This test requires a different setup from the rest of the tests as it requires a single-column - * CSV. 
Therefore, we re-write already generated records into a new file, but only the first - * column. - * - * @throws IOException - */ - @Test - public void testHeaderDelimiterSingleColumn() - throws IOException { - //setup - - //create a single value CSV - Schema pinotSchema = getPinotSchema(); - //write only the first column in the schema - String column = pinotSchema.getColumnNames().toArray(new String[0])[0]; - //use a different file name so that other tests aren't affected - File file = new File(_tempDir, "data1.csv"); - try (FileWriter fileWriter = new FileWriter(file); - CSVPrinter csvPrinter = new CSVPrinter(fileWriter, CSVFormat.DEFAULT.withHeader(column))) { - for (Map r : _records) { - Object[] record = new Object[1]; - record[0] = r.get(column); - csvPrinter.printRecord(record); - } + CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); + readerConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER); + readerConfig.setHeader("col1,col2,col3,col4,col5,col6,col7,col8,col9,col10"); + try (CSVRecordReader recordReader = new CSVRecordReader()) { + recordReader.init(_dataFile, null, readerConfig); + assertEquals(recordReader.getColumns(), + List.of("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10")); + assertTrue(recordReader.hasNext()); } - - CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig(); - csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER); - csvRecordReaderConfig.setHeader("col1"); - CSVRecordReader csvRecordReader = new CSVRecordReader(); - - //execute and assert - csvRecordReader.init(file, null, csvRecordReaderConfig); - Assert.assertTrue(csvRecordReader.hasNext()); } @Test - public void testNullValueString() - throws IOException { - //setup - String nullString = "NULL"; - //create a single value CSV - Schema pinotSchema = getPinotSchema(); - //write only the first column in the schema - String column = pinotSchema.getColumnNames().toArray(new String[0])[0]; - //use 
a different file name so that other tests aren't affected - File file = new File(_tempDir, "data1.csv"); - try (FileWriter fileWriter = new FileWriter(file); - CSVPrinter csvPrinter = new CSVPrinter(fileWriter, - CSVFormat.DEFAULT.withHeader("col1", "col2", "col3").withNullString(nullString))) { - for (Map r : _records) { - Object[] record = new Object[3]; - record[0] = r.get(column); - csvPrinter.printRecord(record); - } + public void testReadingDataFileBasic() + throws IOException { + File dataFile = getDataFile("dataFileBasic.csv"); + for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) { + validate(dataFile, readerConfig, List.of( + createMap("id", "100", "name", "John"), + createMap("id", "101", "name", "Jane"), + createMap("id", "102", "name", "Alice"), + createMap("id", "103", "name", "Bob") + )); } - - CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig(); - csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER); - csvRecordReaderConfig.setHeader("col1,col2,col3"); - csvRecordReaderConfig.setNullStringValue(nullString); - CSVRecordReader csvRecordReader = new CSVRecordReader(); - - //execute and assert - csvRecordReader.init(file, null, csvRecordReaderConfig); - Assert.assertTrue(csvRecordReader.hasNext()); - csvRecordReader.next(); - - GenericRow row = csvRecordReader.next(); - Assert.assertNotNull(row.getValue("col1")); - Assert.assertNull(row.getValue("col2")); - Assert.assertNull(row.getValue("col3")); } @Test - public void testReadingDataFileWithCommentedLines() - throws IOException, URISyntaxException { - URI uri = ClassLoader.getSystemResource("dataFileWithCommentedLines.csv").toURI(); - File dataFile = new File(uri); + public void testReadingDataFileWithSingleColumn() + throws IOException { + File dataFile = getDataFile("dataFileWithSingleColumn.csv"); + for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) { + validate(dataFile, readerConfig, List.of( + createMap("name", "John"), + 
createMap("name", "Jane"), + createMap("name", "Jen") + )); + } - // test using line iterator CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readerConfig.setSkipUnParseableLines(true); - readerConfig.setCommentMarker('#'); - List genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(3, genericRows.size()); - - // test using default CSVRecordReader - readerConfig.setSkipUnParseableLines(false); - genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(3, genericRows.size()); + readerConfig.setHeader("firstName,lastName,id"); + readerConfig.setSkipHeader(true); + validate(dataFile, readerConfig, List.of( + createMap("firstName", "John", "lastName", null, "id", null), + createMap("firstName", "Jane", "lastName", null, "id", null), + createMap("firstName", "Jen", "lastName", null, "id", null) + )); } @Test - public void testReadingDataFileWithEmptyLines() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileWithEmptyLines.csv").toURI(); - File dataFile = new File(uri); + public void testReadingDataFileWithInvalidHeader() + throws IOException { + File dataFile = getDataFile("dataFileWithInvalidHeader.csv"); + for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) { + try (CSVRecordReader recordReader = new CSVRecordReader()) { + assertThrows(IllegalStateException.class, () -> recordReader.init(dataFile, null, readerConfig)); + } + } - // test using line iterator CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readerConfig.setSkipUnParseableLines(true); - List genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(5, genericRows.size()); - - // test using default CSVRecordReader - readerConfig.setSkipUnParseableLines(false); - genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(5, genericRows.size()); + 
readerConfig.setHeader("firstName,lastName,id"); + readerConfig.setSkipHeader(true); + validate(dataFile, readerConfig, List.of( + createMap("firstName", "John", "lastName", "Doe", "id", "100"), + createMap("firstName", "Jane", "lastName", "Doe", "id", "101"), + createMap("firstName", "Jen", "lastName", "Doe", "id", "102") + )); } @Test - public void testReadingDataFileWithEscapedQuotes() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileWithEscapedQuotes.csv").toURI(); - File dataFile = new File(uri); - - // test using line iterator + public void testReadingDataFileWithAlternateDelimiter() + throws IOException { + File dataFile = getDataFile("dataFileWithAlternateDelimiter.csv"); CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readerConfig.setSkipUnParseableLines(true); - List genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(2, genericRows.size()); - - // test using default CSVRecordReader - readerConfig.setSkipUnParseableLines(false); - genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(2, genericRows.size()); + readerConfig.setDelimiter('|'); + validate(dataFile, readerConfig, List.of( + createMap("id", "100", "firstName", "John", "lastName", "Doe"), + createMap("id", "101", "firstName", "Jane", "lastName", "Doe"), + createMap("id", "102", "firstName", "Jen", "lastName", "Doe") + )); } @Test - public void testReadingDataFileWithNoHeader() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileWithNoHeader.csv").toURI(); - File dataFile = new File(uri); + public void testReadingDataFileWithSurroundingSpaces() + throws IOException { + File dataFile = getDataFile("dataFileWithSurroundingSpaces.csv"); + for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) { + validate(dataFile, readerConfig, List.of( + createMap("firstName", "John", "lastName", "Doe", "id", "100"), + 
createMap("firstName", "Jane", "lastName", "Doe", "id", "101"), + createMap("firstName", "Jen", "lastName", "Doe", "id", "102") + )); + } - // test using line iterator CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readerConfig.setSkipUnParseableLines(true); - readerConfig.setHeader("id,name"); - List genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(3, genericRows.size()); - - // test using default CSVRecordReader - readerConfig.setSkipUnParseableLines(false); - genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(3, genericRows.size()); + readerConfig.setIgnoreSurroundingSpaces(false); + validate(dataFile, readerConfig, List.of( + createMap(" firstName ", "John ", " lastName ", " Doe", " id", "100"), + createMap(" firstName ", "Jane", " lastName ", " Doe", " id", " 101"), + createMap(" firstName ", "Jen", " lastName ", "Doe ", " id", "102") + )); } @Test - public void testReadingDataFileWithQuotedHeaders() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileWithQuotedHeaders.csv").toURI(); - File dataFile = new File(uri); - - // test using line iterator - CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readerConfig.setSkipUnParseableLines(true); - List genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(2, genericRows.size()); - - // test using default CSVRecordReader - readerConfig.setSkipUnParseableLines(false); - genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(2, genericRows.size()); + public void testReadingDataFileWithQuotes() + throws IOException { + File dataFile = getDataFile("dataFileWithQuotes.csv"); + for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) { + validate(dataFile, readerConfig, List.of( + createMap("key", "key00", "num0", "12.3", "num1", "8.42"), + createMap("key", "key01", "num0", null, "num1", 
"7.1"), + createMap("key", "key02", "num0", null, "num1", "16.81"), + createMap("key", "key03", "num0", null, "num1", "7.12") + )); + } } @Test - public void testLineIteratorReadingDataFileWithUnparseableLines() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileWithUnparseableLines.csv").toURI(); - File dataFile = new File(uri); - + public void testReadingDataFileWithCustomNull() + throws IOException { + File dataFile = getDataFile("dataFileWithCustomNull.csv"); CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readerConfig.setSkipUnParseableLines(true); - List genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(1, genericRows.size()); + readerConfig.setNullStringValue("NULL"); + validate(dataFile, readerConfig, List.of( + createMap("id", "100", "name", null), + createMap("id", null, "name", "Jane"), + createMap("id", null, "name", null), + createMap("id", null, "name", null) + )); } @Test - public void testLineIteratorReadingDataFileWithUnparseableLinesWithRewind() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileWithUnparseableLines2.csv").toURI(); - File dataFile = new File(uri); + public void testReadingDataFileWithCommentedLines() + throws IOException { + File dataFile = getDataFile("dataFileWithCommentedLines.csv"); + for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) { + // Verify first row + validate(dataFile, readerConfig, 5, List.of(createMap("id", "# ignore line#1", "name", null))); + } CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readerConfig.setSkipUnParseableLines(true); - final List genericRows1 = readCSVRecords(dataFile, readerConfig, null, true); - Assert.assertEquals(3, genericRows1.size()); - - // Start reading again; results should be same - final List genericRows2 = readCSVRecords(dataFile, readerConfig, null, true); - Assert.assertEquals(3, genericRows2.size()); - 
- // Check that the rows are the same - for (int i = 0; i < genericRows1.size(); i++) { - Assert.assertEquals(genericRows1.get(i), genericRows2.get(i)); - } + readerConfig.setCommentMarker('#'); + validate(dataFile, readerConfig, List.of( + createMap("id", "100", "name", "Jane"), + createMap("id", "101", "name", "John"), + createMap("id", "102", "name", "Sam") + )); } @Test - public void testReadingDataFileWithRewind() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileBasic.csv").toURI(); - File dataFile = new File(uri); - - CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - List genericRows1 = readCSVRecords(dataFile, readerConfig, null, true); - Assert.assertEquals(4, genericRows1.size()); - - // Start reading again; results should be same - List genericRows2 = readCSVRecords(dataFile, readerConfig, null, true); - Assert.assertEquals(4, genericRows2.size()); - - // Check that the rows are the same - for (int i = 0; i < genericRows1.size(); i++) { - Assert.assertEquals(genericRows1.get(i), genericRows2.get(i)); + public void testReadingDataFileWithEmptyLines() + throws IOException { + File dataFile = getDataFile("dataFileWithEmptyLines.csv"); + for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) { + validate(dataFile, readerConfig, 5); } - } - - @Test (expectedExceptions = RuntimeException.class) - public void testDefaultCsvReaderExceptionReadingDataFileWithUnparseableLines() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileWithUnparseableLines.csv").toURI(); - File dataFile = new File(uri); CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readCSVRecords(dataFile, readerConfig, null, false); + readerConfig.setIgnoreEmptyLines(false); + validate(dataFile, readerConfig, 8); } @Test - public void testLineIteratorReadingDataFileWithMultipleCombinations() - throws URISyntaxException, IOException { - URI uri = 
ClassLoader.getSystemResource("dataFileWithMultipleCombinations.csv").toURI(); - File dataFile = new File(uri); + public void testReadingDataFileWithEscapedQuotes() + throws IOException { + File dataFile = getDataFile("dataFileWithEscapedQuotes.csv"); + for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) { + validate(dataFile, readerConfig, List.of( + createMap("\\\"id\\\"", "\\\"100\\\"", "\\\"name\\\"", "\\\"Jane\\\""), + createMap("\\\"id\\\"", "\\\"101\\\"", "\\\"name\\\"", "\\\"John\\\"") + )); + } CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readerConfig.setSkipUnParseableLines(true); - readerConfig.setCommentMarker('#'); - readerConfig.setIgnoreEmptyLines(true); - - List genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(7, genericRows.size()); + readerConfig.setEscapeCharacter('\\'); + validate(dataFile, readerConfig, List.of( + createMap("\"id\"", "\"100\"", "\"name\"", "\"Jane\""), + createMap("\"id\"", "\"101\"", "\"name\"", "\"John\"") + )); } @Test - public void testDefaultCsvReaderReadingDataFileWithMultipleCombinations() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileWithMultipleCombinationsParseable.csv").toURI(); - File dataFile = new File(uri); + public void testReadingDataFileWithNoHeader() + throws IOException { + File dataFile = getDataFile("dataFileWithNoHeader.csv"); + for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) { + validate(dataFile, readerConfig, List.of( + createMap("100", "101", "Jane", "John"), + createMap("100", "102", "Jane", "Sam") + )); + } CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readerConfig.setCommentMarker('#'); - readerConfig.setIgnoreEmptyLines(true); - - List genericRows = readCSVRecords(dataFile, readerConfig, new GenericRow(), false); - Assert.assertEquals(7, genericRows.size()); + readerConfig.setHeader("id,name"); + validate(dataFile, readerConfig, 
List.of( + createMap("id", "100", "name", "Jane"), + createMap("id", "101", "name", "John"), + createMap("id", "102", "name", "Sam") + )); } @Test - public void testLineIteratorRewindMethod() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileWithMultipleCombinations.csv").toURI(); - File dataFile = new File(uri); + public void testReadingDataFileWithNoHeaderAndEmptyValues() + throws IOException { + File dataFile = getDataFile("dataFileWithNoHeaderAndEmptyValues.csv"); + for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) { + validate(dataFile, readerConfig, List.of( + createMap("key00", "key01", "12.3", null, "8.42", "7.1"), + createMap("key00", "key02", "12.3", null, "8.42", "16.81"), + createMap("key00", "key03", "12.3", null, "8.42", "7.12") + )); + } CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readerConfig.setSkipUnParseableLines(true); - readerConfig.setCommentMarker('#'); - readerConfig.setIgnoreEmptyLines(true); - readCSVRecords(dataFile, readerConfig, null, true); - - // Start reading again; results should be same - List genericRows = readCSVRecords(dataFile, readerConfig, new GenericRow(), false); - Assert.assertEquals(7, genericRows.size()); + readerConfig.setHeader("key,num0,num1"); + validate(dataFile, readerConfig, List.of( + createMap("key", "key00", "num0", "12.3", "num1", "8.42"), + createMap("key", "key01", "num0", null, "num1", "7.1"), + createMap("key", "key02", "num0", null, "num1", "16.81"), + createMap("key", "key03", "num0", null, "num1", "7.12") + )); } @Test - public void testDefaultCsvReaderRewindMethod() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileWithMultipleCombinationsParseable.csv").toURI(); - File dataFile = new File(uri); + public void testReadingDataFileWithNoRecords() + throws IOException { + File dataFile = getDataFile("dataFileWithNoRecords.csv"); + for (CSVRecordReaderConfig readerConfig : 
NULL_AND_EMPTY_CONFIGS) { + validate(dataFile, readerConfig, 0); + } CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readerConfig.setCommentMarker('#'); - readerConfig.setIgnoreEmptyLines(true); - readCSVRecords(dataFile, readerConfig, null, true); - - // Start reading again; results should be same - List genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(7, genericRows.size()); + readerConfig.setHeader("id,name"); + readerConfig.setSkipHeader(true); + validate(dataFile, readerConfig, 0); } @Test - public void testReadingDataFileWithInvalidHeader() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileWithInvalidHeader.csv").toURI(); - File dataFile = new File(uri); + public void testReadingDataFileEmpty() + throws IOException { + File dataFile = getDataFile("dataFileEmpty.csv"); + for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) { + validate(dataFile, readerConfig, 0); + } - // test using line iterator CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readerConfig.setHeader("firstName,lastName,id"); - readerConfig.setSkipHeader(true); - readerConfig.setSkipUnParseableLines(true); - List genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(3, genericRows.size()); - - // test using default CSVRecordReader - readerConfig.setSkipUnParseableLines(false); - genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(3, genericRows.size()); + readerConfig.setHeader("id,name"); + validate(dataFile, readerConfig, 0); } @Test - public void testReadingDataFileWithAlternateDelimiter() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileWithAlternateDelimiter.csv").toURI(); - File dataFile = new File(uri); - - // test using line iterator - CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readerConfig.setDelimiter('|'); - 
readerConfig.setSkipUnParseableLines(true); - List genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(3, genericRows.size()); - - // test using default CSVRecordReader - readerConfig.setSkipUnParseableLines(false); - genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(3, genericRows.size()); + public void testReadingDataFileWithMultiLineValues() + throws IOException { + File dataFile = getDataFile("dataFileWithMultiLineValues.csv"); + for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) { + validate(dataFile, readerConfig, List.of( + createMap("id", "100", "name", "John\n101,Jane"), + createMap("id", "102", "name", "Alice") + )); + } } @Test - public void testReadingDataFileWithSpaceAroundHeaderFields() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileWithSpaceAroundHeaders.csv").toURI(); - File dataFile = new File(uri); - - // test using line iterator - CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readerConfig.setSkipUnParseableLines(true); - readerConfig.setIgnoreSurroundingSpaces(true); - List genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(3, genericRows.size()); - validateSpaceAroundHeadersAreTrimmed(dataFile, readerConfig); - - // test using default CSVRecordReader - readerConfig.setSkipUnParseableLines(false); - genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(3, genericRows.size()); - validateSpaceAroundHeadersAreTrimmed(dataFile, readerConfig); + public void testReadingDataFileWithUnparseableFirstLine() + throws IOException { + File dataFile = getDataFile("dataFileWithUnparseableFirstLine.csv"); + for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) { + try (CSVRecordReader recordReader = new CSVRecordReader()) { + assertThrows(IOException.class, () -> recordReader.init(dataFile, null, readerConfig)); + } + } } 
@Test - public void testReadingDataFileWithSpaceAroundHeaderAreRetained() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileWithSpaceAroundHeaders.csv").toURI(); - File dataFile = new File(uri); + public void testLineIteratorReadingDataFileWithUnparseableLine() + throws IOException { + File dataFile = getDataFile("dataFileWithUnparseableLine.csv"); + for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) { + try (CSVRecordReader recordReader = new CSVRecordReader()) { + recordReader.init(dataFile, null, readerConfig); + testUnparseableLine(recordReader); + recordReader.rewind(); + testUnparseableLine(recordReader); + } + } - // test using line iterator CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readerConfig.setSkipUnParseableLines(true); - readerConfig.setIgnoreSurroundingSpaces(false); - List genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(3, genericRows.size()); - validateSpaceAroundHeadersAreRetained(dataFile, readerConfig); + readerConfig.setStopOnError(true); + try (CSVRecordReader recordReader = new CSVRecordReader()) { + recordReader.init(dataFile, null, readerConfig); + testUnparseableLineStopOnError(recordReader); + recordReader.rewind(); + testUnparseableLineStopOnError(recordReader); + } + } - // test using default CSVRecordReader - readerConfig.setSkipUnParseableLines(false); - genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(3, genericRows.size()); - validateSpaceAroundHeadersAreRetained(dataFile, readerConfig); + private void testUnparseableLine(CSVRecordReader recordReader) + throws IOException { + // First line is parseable + assertTrue(recordReader.hasNext()); + assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", "100", "name", "John")); + // Second line is unparseable, should throw exception when next() is called, and being skipped + assertTrue(recordReader.hasNext()); + 
assertThrows(UncheckedIOException.class, recordReader::next); + // Third line is parseable + assertTrue(recordReader.hasNext()); + assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", "102", "name", "Alice")); + // 3 lines in total + assertFalse(recordReader.hasNext()); + } + + private void testUnparseableLineStopOnError(CSVRecordReader recordReader) + throws IOException { + // First line is parseable + assertTrue(recordReader.hasNext()); + assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", "100", "name", "John")); + // Second line is unparseable, stop here + assertFalse(recordReader.hasNext()); } @Test - public void testRewindMethodAndSkipHeader() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileWithInvalidHeader.csv").toURI(); - File dataFile = new File(uri); + public void testLineIteratorReadingDataFileWithUnparseableLastLine() + throws IOException { + File dataFile = getDataFile("dataFileWithUnparseableLastLine.csv"); + for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) { + try (CSVRecordReader recordReader = new CSVRecordReader()) { + recordReader.init(dataFile, null, readerConfig); + testUnparseableLastLine(recordReader); + recordReader.rewind(); + testUnparseableLastLine(recordReader); + } + } CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readerConfig.setSkipUnParseableLines(true); - readerConfig.setHeader("id,name"); - readerConfig.setSkipHeader(true); - readCSVRecords(dataFile, readerConfig, new GenericRow(), true); - - // Start reading again; results should be same - List genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(3, genericRows.size()); + readerConfig.setStopOnError(true); + try (CSVRecordReader recordReader = new CSVRecordReader()) { + recordReader.init(dataFile, null, readerConfig); + testUnparseableLastLineStopOnError(recordReader); + recordReader.rewind(); + 
testUnparseableLastLineStopOnError(recordReader); + } + } - // test using default CSVRecordReader - readerConfig.setSkipUnParseableLines(false); - readCSVRecords(dataFile, readerConfig, null, true); + private void testUnparseableLastLine(CSVRecordReader recordReader) + throws IOException { + // First line is parseable + assertTrue(recordReader.hasNext()); + assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", "100", "name", "John")); + // Second line is unparseable, should throw exception when next() is called, and being skipped + assertTrue(recordReader.hasNext()); + assertThrows(UncheckedIOException.class, recordReader::next); + // 2 lines in total + assertFalse(recordReader.hasNext()); + } - // Start reading again; results should be same - genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(3, genericRows.size()); + private void testUnparseableLastLineStopOnError(CSVRecordReader recordReader) + throws IOException { + // First line is parseable + assertTrue(recordReader.hasNext()); + assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", "100", "name", "John")); + // Second line is unparseable, stop here + assertFalse(recordReader.hasNext()); } @Test public void testReadingDataFileWithPartialLastRow() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileWithPartialLastRow.csv").toURI(); - File dataFile = new File(uri); + throws IOException { + File dataFile = getDataFile("dataFileWithPartialLastRow.csv"); + for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) { + try (CSVRecordReader recordReader = new CSVRecordReader()) { + recordReader.init(dataFile, null, readerConfig); + testPartialLastRow(recordReader); + recordReader.rewind(); + testPartialLastRow(recordReader); + } + } - // test using line iterator CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readerConfig.setSkipUnParseableLines(true); - List genericRows = 
readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(2, genericRows.size()); - - // Note: The default CSVRecordReader cannot handle unparseable rows + readerConfig.setStopOnError(true); + try (CSVRecordReader recordReader = new CSVRecordReader()) { + recordReader.init(dataFile, null, readerConfig); + testPartialLastRowStopOnError(recordReader); + recordReader.rewind(); + testPartialLastRowStopOnError(recordReader); + } } - @Test - public void testReadingDataFileWithNoRecords() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileWithNoRecords.csv").toURI(); - File dataFile = new File(uri); - - // test using line iterator - CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readerConfig.setSkipUnParseableLines(true); - List genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(0, genericRows.size()); - - // Note: The default CSVRecordReader cannot handle unparseable rows + private void testPartialLastRow(CSVRecordReader recordReader) + throws IOException { + assertTrue(recordReader.hasNext()); + assertEquals(recordReader.next().getFieldToValueMap(), + createMap("id", "100", "firstName", "jane", "lastName", "doe", "appVersion", "1.0.0", "active", "yes")); + assertTrue(recordReader.hasNext()); + assertEquals(recordReader.next().getFieldToValueMap(), + createMap("id", "101", "firstName", "john", "lastName", "doe", "appVersion", "1.0.1", "active", "yes")); + assertTrue(recordReader.hasNext()); + assertThrows(UncheckedIOException.class, recordReader::next); + assertFalse(recordReader.hasNext()); + } + + private void testPartialLastRowStopOnError(CSVRecordReader recordReader) + throws IOException { + assertTrue(recordReader.hasNext()); + assertEquals(recordReader.next().getFieldToValueMap(), + createMap("id", "100", "firstName", "jane", "lastName", "doe", "appVersion", "1.0.0", "active", "yes")); + assertTrue(recordReader.hasNext()); + 
assertEquals(recordReader.next().getFieldToValueMap(), + createMap("id", "101", "firstName", "john", "lastName", "doe", "appVersion", "1.0.1", "active", "yes")); + assertFalse(recordReader.hasNext()); } @Test - public void testReadingDataFileWithNoHeaderAndDataRecordsWithEmptyValues() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileWithNoHeader2.csv").toURI(); - File dataFile = new File(uri); - - // test using line iterator + public void testLineIteratorReadingDataFileWithMultipleCombinations() + throws IOException { + File dataFile = getDataFile("dataFileWithMultipleCombinations.csv"); CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - readerConfig.setSkipUnParseableLines(true); - readerConfig.setHeader("key,num0,num1"); - List genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(4, genericRows.size()); + readerConfig.setCommentMarker('#'); + readerConfig.setEscapeCharacter('\\'); + try (CSVRecordReader recordReader = new CSVRecordReader()) { + recordReader.init(dataFile, null, readerConfig); + testCombinations(recordReader); + recordReader.rewind(); + testCombinations(recordReader); + } - // test using default CSVRecordReader - readerConfig.setSkipUnParseableLines(false); - genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(4, genericRows.size()); + readerConfig.setStopOnError(true); + try (CSVRecordReader recordReader = new CSVRecordReader()) { + recordReader.init(dataFile, null, readerConfig); + testCombinationsStopOnError(recordReader); + recordReader.rewind(); + testCombinationsStopOnError(recordReader); + } } - @Test - public void testReadingDataFileWithValidHeaders() - throws URISyntaxException, IOException { - URI uri = ClassLoader.getSystemResource("dataFileWithValidHeaders.csv").toURI(); - File dataFile = new File(uri); - - // test using line iterator - CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); - 
readerConfig.setSkipUnParseableLines(true); - // No explicit header is set and attempt to skip the header should be ignored. 1st line would be treated as the - // header line. - readerConfig.setSkipHeader(false); - List genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(4, genericRows.size()); + private void testCombinations(CSVRecordReader recordReader) + throws IOException { + assertTrue(recordReader.hasNext()); + assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", "100", "name", "John")); + assertTrue(recordReader.hasNext()); + assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", "101", "name", "Jane")); + assertTrue(recordReader.hasNext()); + assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", "102", "name", "Jerry")); + assertTrue(recordReader.hasNext()); + assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", "103", "name", "Suzanne")); + // NOTE: Here we need to skip twice because the first line is a comment line + assertTrue(recordReader.hasNext()); + assertThrows(UncheckedIOException.class, recordReader::next); + assertTrue(recordReader.hasNext()); + assertThrows(UncheckedIOException.class, recordReader::next); + assertTrue(recordReader.hasNext()); + assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", "105", "name", "Zack\nZack")); + assertTrue(recordReader.hasNext()); + assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", "\"106\"", "name", "\"Ze\"")); + assertTrue(recordReader.hasNext()); + assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", "107", "name", "Zu")); + assertFalse(recordReader.hasNext()); + } + + private void testCombinationsStopOnError(CSVRecordReader recordReader) + throws IOException { + assertTrue(recordReader.hasNext()); + assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", "100", "name", "John")); + assertTrue(recordReader.hasNext()); + 
assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", "101", "name", "Jane")); + assertTrue(recordReader.hasNext()); + assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", "102", "name", "Jerry")); + assertTrue(recordReader.hasNext()); + assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", "103", "name", "Suzanne")); + assertFalse(recordReader.hasNext()); + } - // test using default CSVRecordReader - readerConfig.setSkipUnParseableLines(false); - genericRows = readCSVRecords(dataFile, readerConfig, null, false); - Assert.assertEquals(4, genericRows.size()); + private File getDataFile(String fileName) { + return new File(ClassLoader.getSystemResource(fileName).getFile()); } - private List readCSVRecords(File dataFile, - CSVRecordReaderConfig readerConfig, GenericRow genericRow, boolean rewind) + private void validate(File dataFile, @Nullable CSVRecordReaderConfig readerConfig, int expectedNumRows, + @Nullable List> expectedRows) throws IOException { - List genericRows = new ArrayList<>(); + List genericRows = new ArrayList<>(expectedNumRows); try (CSVRecordReader recordReader = new CSVRecordReader()) { recordReader.init(dataFile, null, readerConfig); - GenericRow reuse = new GenericRow(); while (recordReader.hasNext()) { - if (genericRow != null) { - recordReader.next(reuse); - genericRows.add(reuse); - } else { - GenericRow nextRow = recordReader.next(); - genericRows.add(nextRow); - } + genericRows.add(recordReader.next()); } + assertEquals(genericRows.size(), expectedNumRows); - if (rewind) { - // rewind the reader after reading all the lines - recordReader.rewind(); + // Rewind the reader and read again + recordReader.rewind(); + for (GenericRow row : genericRows) { + GenericRow genericRow = recordReader.next(); + assertEquals(genericRow, row); } + assertFalse(recordReader.hasNext()); } - return genericRows; - } - private void validateSpaceAroundHeadersAreTrimmed(File dataFile, CSVRecordReaderConfig 
readerConfig) - throws IOException { - try (CSVRecordReader recordReader = new CSVRecordReader()) { - recordReader.init(dataFile, null, readerConfig); - Map headerMap = recordReader.getCSVHeaderMap(); - Assert.assertEquals(3, headerMap.size()); - List headers = List.of("firstName", "lastName", "id"); - for (String header : headers) { - // surrounding spaces in headers are trimmed - Assert.assertTrue(headerMap.containsKey(header)); + if (expectedRows != null) { + int rowId = 0; + for (Map expectedRow : expectedRows) { + GenericRow genericRow = genericRows.get(rowId++); + assertEquals(genericRow.getFieldToValueMap(), expectedRow); } } } - private void validateSpaceAroundHeadersAreRetained(File dataFile, CSVRecordReaderConfig readerConfig) + private void validate(File dataFile, @Nullable CSVRecordReaderConfig readerConfig, int expectedNumRows) throws IOException { - try (CSVRecordReader recordReader = new CSVRecordReader()) { - recordReader.init(dataFile, null, readerConfig); - Map headerMap = recordReader.getCSVHeaderMap(); - Assert.assertEquals(3, headerMap.size()); - List headers = List.of(" firstName ", " lastName ", " id"); - for (String header : headers) { - // surrounding spaces in headers are trimmed - Assert.assertTrue(headerMap.containsKey(header)); - } + validate(dataFile, readerConfig, expectedNumRows, null); + } + + private void validate(File dataFile, @Nullable CSVRecordReaderConfig readerConfig, + List> expectedRows) + throws IOException { + validate(dataFile, readerConfig, expectedRows.size(), expectedRows); + } + + private static Map createMap(String... 
keyValues) { + Map map = new HashMap<>(); + for (int i = 0; i < keyValues.length; i += 2) { + map.put(keyValues[i], keyValues[i + 1]); } + return map; } } diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv index c2b0fe32624e..c2b74454269c 100644 --- a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv +++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv @@ -1,5 +1,5 @@ id,name -"100","John" -"101","Jane" -"102","Alice" -"103","Bob" +100,John +101,Jane +102,Alice +103,Bob diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileEmpty.csv b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileEmpty.csv new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithAlternateDelimiter.csv b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithAlternateDelimiter.csv index 09d3a2ce2c61..4b4c2fffe946 100644 --- a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithAlternateDelimiter.csv +++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithAlternateDelimiter.csv @@ -1,4 +1,4 @@ -id|fisrtName|lastName +id|firstName|lastName 100|John|Doe 101|Jane|Doe 102|Jen|Doe diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithCustomNull.csv b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithCustomNull.csv new file mode 100644 index 000000000000..af119fc1a1cc --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithCustomNull.csv @@ -0,0 +1,5 @@ +id,name +100,NULL +,Jane +NULL,NULL +, diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultiLineValues.csv 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultiLineValues.csv new file mode 100644 index 000000000000..927983fa9893 --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultiLineValues.csv @@ -0,0 +1,4 @@ +id,name +100,"John +101,Jane" +102,Alice diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinations.csv b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinations.csv index 5f173f4b6ceb..8590793e1e99 100644 --- a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinations.csv +++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinations.csv @@ -10,8 +10,14 @@ id,name 102,Jerry 103,Suzanne -# below line is unparseable by the commons-csv library +# below line is unparseable by the commons-csv library "104","Yu"s" -"105","Zack" + +# below line is multi-line value +"105","Zack +Zack" + +# below line is escaped quotes \"106\",\"Ze\" + 107,Zu diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinationsParseable.csv b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinationsParseable.csv deleted file mode 100644 index d804f0410167..000000000000 --- a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinationsParseable.csv +++ /dev/null @@ -1,15 +0,0 @@ -id,name - - -100,John -# ignore line 1 - -# ignore line 2 - -101,Jane -102,Jerry - -103,Suzanne -"105","Zack" -\"106\",\"Ze\" -107,Zu diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithNoHeader2.csv b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithNoHeaderAndEmptyValues.csv similarity index 100% rename from pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithNoHeader2.csv rename to 
pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithNoHeaderAndEmptyValues.csv diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotedHeaders.csv b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotedHeaders.csv deleted file mode 100644 index ec7033d1d4c2..000000000000 --- a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotedHeaders.csv +++ /dev/null @@ -1,3 +0,0 @@ -"id","name" -"100","Jane" -"101","John" diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithValidHeaders.csv b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotes.csv similarity index 100% rename from pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithValidHeaders.csv rename to pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotes.csv diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSingleColumn.csv b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSingleColumn.csv new file mode 100644 index 000000000000..856771f62155 --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSingleColumn.csv @@ -0,0 +1,4 @@ +name +John +Jane +Jen diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSpaceAroundHeaders.csv b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSpaceAroundHeaders.csv deleted file mode 100644 index c0f140a69c7f..000000000000 --- a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSpaceAroundHeaders.csv +++ /dev/null @@ -1,4 +0,0 @@ - firstName , lastName , id -John,Doe,100 -Jane,Doe,101 -Jen,Doe,102 diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSurroundingSpaces.csv 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSurroundingSpaces.csv new file mode 100644 index 000000000000..40678051c391 --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSurroundingSpaces.csv @@ -0,0 +1,4 @@ + firstName , lastName , id +John , Doe,100 +Jane, Doe, 101 +Jen,Doe ,102 diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines.csv b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableFirstLine.csv similarity index 100% rename from pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines.csv rename to pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableFirstLine.csv diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLastLine.csv b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLastLine.csv new file mode 100644 index 000000000000..ef9b5942fb31 --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLastLine.csv @@ -0,0 +1,3 @@ +id,name +"100","John" +"101","Jane"s" diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLine.csv b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLine.csv new file mode 100644 index 000000000000..e0a711badf5c --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLine.csv @@ -0,0 +1,4 @@ +id,name +"100","John" +"101","Jane"s" +"102","Alice" diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines2.csv b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines2.csv deleted file mode 100644 index 80e9a736c3f7..000000000000 --- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines2.csv +++ /dev/null @@ -1,5 +0,0 @@ -id,name -"100","John"s" -"101","Jane" -"102","Alice" -"103","Bob" diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java index 020b2b313582..db3a455da346 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java @@ -46,11 +46,14 @@ void init(File dataFile, @Nullable Set fieldsToRead, @Nullable RecordRea /** * Return true if more records remain to be read. + *

This method should not throw exceptions. The caller is not responsible for handling exceptions from this method. */ boolean hasNext(); /** * Get the next record. + *

This method should be called only if {@link #hasNext()} returns true. The caller is responsible for + * handling exceptions from this method and skipping the row if the user wants to continue reading the remaining rows. */ default GenericRow next() throws IOException { @@ -60,6 +63,8 @@ default GenericRow next() /** * Get the next record. Re-use the given row to reduce garbage. *

The passed-in row should be cleared before calling this method. + *

This method should be called only if {@link #hasNext()} returns true. The caller is responsible for + * handling exceptions from this method and skipping the row if the user wants to continue reading the remaining rows. * * TODO: Consider clearing the row within the record reader to simplify the caller */ diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java index e7566cb0ff5b..cd8c60039989 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java @@ -99,10 +99,13 @@ public void closeRecordReader() // Return true if RecordReader is done processing. public boolean isRecordReaderDone() { - if (_isRecordReaderInitialized) { - return !_recordReader.hasNext(); + if (!_isRecordReaderInitialized) { + return false; + } + if (_isRecordReaderClosed) { + return true; } - return false; + return !_recordReader.hasNext(); } // For testing purposes only.