diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java index c48966b9946..77b651d943a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java @@ -139,11 +139,13 @@ private Map doMap() int totalNumRecordReaders = _recordReaderFileConfigs.size(); GenericRow reuse = new GenericRow(); for (RecordReaderFileConfig recordReaderFileConfig : _recordReaderFileConfigs) { + RecordReader recordReader = recordReaderFileConfig.getRecordReader(); + // Mapper can terminate midway of reading a file if the intermediate file size has crossed the configured // threshold. Map phase will continue in the next iteration right where we are leaving off in the current // iteration. boolean shouldMapperTerminate = - !completeMapAndTransformRow(recordReaderFileConfig, reuse, observer, count, totalNumRecordReaders); + !completeMapAndTransformRow(recordReader, reuse, observer, count, totalNumRecordReaders); // Terminate the map phase if intermediate file size has crossed the threshold. if (shouldMapperTerminate) { @@ -162,10 +164,9 @@ private Map doMap() // Returns true if the map phase can continue, false if it should terminate based on the configured threshold for // intermediate file size during map phase. - private boolean completeMapAndTransformRow(RecordReaderFileConfig recordReaderFileConfig, GenericRow reuse, + private boolean completeMapAndTransformRow(RecordReader recordReader, GenericRow reuse, Consumer observer, int count, int totalCount) throws Exception { observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count, totalCount)); - RecordReader recordReader = recordReaderFileConfig.getRecordReader(); while (recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())) { reuse = recordReader.next(reuse); _recordEnricherPipeline.run(reuse); @@ -175,13 +176,13 @@ private boolean completeMapAndTransformRow(RecordReaderFileConfig recordReaderFi if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) { //noinspection unchecked for (GenericRow row : (Collection) reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) { - GenericRow transformedRow = _recordTransformer.transform(row, recordReaderFileConfig); + GenericRow transformedRow = _recordTransformer.transform(row); if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) { writeRecord(transformedRow); } } } else { - GenericRow transformedRow = _recordTransformer.transform(reuse, recordReaderFileConfig); + GenericRow transformedRow = _recordTransformer.transform(reuse); if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) { writeRecord(transformedRow); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java index 21c98946a68..cf88629f106 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java @@ -28,7 +28,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.data.readers.RecordReaderConfig; + /** * The {@code CompositeTransformer} class performs multiple transforms based on the inner {@link RecordTransformer}s. @@ -115,19 +115,12 @@ public CompositeTransformer(List transformers) { @Nullable @Override - public GenericRow transform(GenericRow genericRow) { - return transform(genericRow, null); - } - - @Nullable - @Override - public GenericRow transform(GenericRow record, - RecordReaderConfig recordReaderConfig) { + public GenericRow transform(GenericRow record) { for (RecordTransformer transformer : _transformers) { if (!IngestionUtils.shouldIngestRow(record)) { return record; } - record = transformer.transform(record, recordReaderConfig); + record = transformer.transform(record); if (record == null) { return null; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java index 45be363fc37..72065132ae4 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java @@ -21,7 +21,7 @@ import java.io.Serializable; import javax.annotation.Nullable; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.data.readers.RecordReaderConfig; + /** * The record transformer which takes a {@link GenericRow} and transform it based on some custom rules. @@ -43,14 +43,4 @@ default boolean isNoOp() { */ @Nullable GenericRow transform(GenericRow record); - - /** - * Transforms a record based on some custom rules using record reader context. - * @param record Record to transform - * @return Transformed record, or {@code null} if the record does not follow certain rules. - */ - @Nullable - default GenericRow transform(GenericRow record, RecordReaderConfig recordReaderConfig) { - return transform(record); - } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java index 805561e815a..51e4ed0cfb1 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java @@ -29,7 +29,7 @@ * RecordReader can be initialized just when its about to be used, which avoids early/eager * initialization/memory allocation. */ -public class RecordReaderFileConfig implements RecordReaderConfig { +public class RecordReaderFileConfig { public final FileFormat _fileFormat; public final File _dataFile; public final Set _fieldsToRead;