diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java index 77b651d943a..c48966b9946 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java @@ -139,13 +139,11 @@ private Map doMap() int totalNumRecordReaders = _recordReaderFileConfigs.size(); GenericRow reuse = new GenericRow(); for (RecordReaderFileConfig recordReaderFileConfig : _recordReaderFileConfigs) { - RecordReader recordReader = recordReaderFileConfig.getRecordReader(); - // Mapper can terminate midway of reading a file if the intermediate file size has crossed the configured // threshold. Map phase will continue in the next iteration right where we are leaving off in the current // iteration. boolean shouldMapperTerminate = - !completeMapAndTransformRow(recordReader, reuse, observer, count, totalNumRecordReaders); + !completeMapAndTransformRow(recordReaderFileConfig, reuse, observer, count, totalNumRecordReaders); // Terminate the map phase if intermediate file size has crossed the threshold. if (shouldMapperTerminate) { @@ -164,9 +162,10 @@ private Map doMap() // Returns true if the map phase can continue, false if it should terminate based on the configured threshold for // intermediate file size during map phase. 
- private boolean completeMapAndTransformRow(RecordReader recordReader, GenericRow reuse, + private boolean completeMapAndTransformRow(RecordReaderFileConfig recordReaderFileConfig, GenericRow reuse, Consumer observer, int count, int totalCount) throws Exception { observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count, totalCount)); + RecordReader recordReader = recordReaderFileConfig.getRecordReader(); while (recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())) { reuse = recordReader.next(reuse); _recordEnricherPipeline.run(reuse); @@ -176,13 +175,13 @@ private boolean completeMapAndTransformRow(RecordReader recordReader, GenericRow if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) { //noinspection unchecked for (GenericRow row : (Collection) reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) { - GenericRow transformedRow = _recordTransformer.transform(row); + GenericRow transformedRow = _recordTransformer.transform(row, recordReaderFileConfig); if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) { writeRecord(transformedRow); } } } else { - GenericRow transformedRow = _recordTransformer.transform(reuse); + GenericRow transformedRow = _recordTransformer.transform(reuse, recordReaderFileConfig); if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) { writeRecord(transformedRow); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java index cf88629f106..21c98946a68 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java @@ -28,7 +28,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import 
org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; - +import org.apache.pinot.spi.data.readers.RecordReaderConfig; /** * The {@code CompositeTransformer} class performs multiple transforms based on the inner {@link RecordTransformer}s. @@ -115,12 +115,19 @@ public CompositeTransformer(List transformers) { @Nullable @Override - public GenericRow transform(GenericRow record) { + public GenericRow transform(GenericRow genericRow) { + return transform(genericRow, null); + } + + @Nullable + @Override + public GenericRow transform(GenericRow record, + RecordReaderConfig recordReaderConfig) { for (RecordTransformer transformer : _transformers) { if (!IngestionUtils.shouldIngestRow(record)) { return record; } - record = transformer.transform(record); + record = transformer.transform(record, recordReaderConfig); if (record == null) { return null; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java index 72065132ae4..45be363fc37 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java @@ -21,7 +21,7 @@ import java.io.Serializable; import javax.annotation.Nullable; import org.apache.pinot.spi.data.readers.GenericRow; - +import org.apache.pinot.spi.data.readers.RecordReaderConfig; /** * The record transformer which takes a {@link GenericRow} and transform it based on some custom rules. @@ -43,4 +43,15 @@ default boolean isNoOp() { */ @Nullable GenericRow transform(GenericRow record); + + /** + * Transforms a record based on some custom rules using record reader context.
+ * @param record Record to transform + * @param recordReaderConfig Config of the reader that produced the record, may be {@code null} + * @return Transformed record, or {@code null} if the record does not follow certain rules. + */ + @Nullable + default GenericRow transform(GenericRow record, RecordReaderConfig recordReaderConfig) { + return transform(record); + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java index 51e4ed0cfb1..805561e815a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java @@ -29,7 +29,7 @@ * RecordReader can be initialized just when its about to be used, which avoids early/eager * initialization/memory allocation. */ -public class RecordReaderFileConfig { +public class RecordReaderFileConfig implements RecordReaderConfig { public final FileFormat _fileFormat; public final File _dataFile; public final Set _fieldsToRead;