Skip to content

Commit

Permalink
Adding record reader config/context param to record transformer (apa…
Browse files Browse the repository at this point in the history
…che#12520)

* Adding record context param to record transformer

* Address comments
  • Loading branch information
swaminathanmanish authored Feb 29, 2024
1 parent 2249be3 commit f0fcbd8
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,11 @@ private Map<String, GenericRowFileManager> doMap()
int totalNumRecordReaders = _recordReaderFileConfigs.size();
GenericRow reuse = new GenericRow();
for (RecordReaderFileConfig recordReaderFileConfig : _recordReaderFileConfigs) {
RecordReader recordReader = recordReaderFileConfig.getRecordReader();

// Mapper can terminate midway through reading a file if the intermediate file size has crossed the configured
// threshold. Map phase will continue in the next iteration right where we are leaving off in the current
// iteration.
boolean shouldMapperTerminate =
!completeMapAndTransformRow(recordReader, reuse, observer, count, totalNumRecordReaders);
!completeMapAndTransformRow(recordReaderFileConfig, reuse, observer, count, totalNumRecordReaders);

// Terminate the map phase if intermediate file size has crossed the threshold.
if (shouldMapperTerminate) {
Expand All @@ -164,9 +162,10 @@ private Map<String, GenericRowFileManager> doMap()

// Returns true if the map phase can continue, false if it should terminate based on the configured threshold for
// intermediate file size during map phase.
private boolean completeMapAndTransformRow(RecordReader recordReader, GenericRow reuse,
private boolean completeMapAndTransformRow(RecordReaderFileConfig recordReaderFileConfig, GenericRow reuse,
Consumer<Object> observer, int count, int totalCount) throws Exception {
observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count, totalCount));
RecordReader recordReader = recordReaderFileConfig.getRecordReader();
while (recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())) {
reuse = recordReader.next(reuse);
_recordEnricherPipeline.run(reuse);
Expand All @@ -176,13 +175,13 @@ private boolean completeMapAndTransformRow(RecordReader recordReader, GenericRow
if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
//noinspection unchecked
for (GenericRow row : (Collection<GenericRow>) reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
GenericRow transformedRow = _recordTransformer.transform(row);
GenericRow transformedRow = _recordTransformer.transform(row, recordReaderFileConfig);
if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
writeRecord(transformedRow);
}
}
} else {
GenericRow transformedRow = _recordTransformer.transform(reuse);
GenericRow transformedRow = _recordTransformer.transform(reuse, recordReaderFileConfig);
if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
writeRecord(transformedRow);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;

import org.apache.pinot.spi.data.readers.RecordReaderConfig;

/**
* The {@code CompositeTransformer} class performs multiple transforms based on the inner {@link RecordTransformer}s.
Expand Down Expand Up @@ -115,12 +115,19 @@ public CompositeTransformer(List<RecordTransformer> transformers) {

@Nullable
@Override
public GenericRow transform(GenericRow record) {
public GenericRow transform(GenericRow genericRow) {
return transform(genericRow, null);
}

@Nullable
@Override
public GenericRow transform(GenericRow record,
RecordReaderConfig recordReaderConfig) {
for (RecordTransformer transformer : _transformers) {
if (!IngestionUtils.shouldIngestRow(record)) {
return record;
}
record = transformer.transform(record);
record = transformer.transform(record, recordReaderConfig);
if (record == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.pinot.spi.data.readers.GenericRow;

import org.apache.pinot.spi.data.readers.RecordReaderConfig;

/**
* The record transformer which takes a {@link GenericRow} and transform it based on some custom rules.
Expand All @@ -43,4 +43,14 @@ default boolean isNoOp() {
*/
@Nullable
GenericRow transform(GenericRow record);

/**
* Transforms a record based on some custom rules, with the record reader config supplied as additional context.
* <p>This default implementation ignores {@code recordReaderConfig} and simply delegates to
* {@link #transform(GenericRow)}, so existing transformers keep working unchanged. Implementations that need the
* reader context can override this method.
* @param record Record to transform
* @param recordReaderConfig Record reader config passed along as context for the transformation; not used by this
*                           default implementation
* @return Transformed record, or {@code null} if the record does not follow certain rules.
*/
@Nullable
default GenericRow transform(GenericRow record, RecordReaderConfig recordReaderConfig) {
return transform(record);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* RecordReader can be initialized just when it's about to be used, which avoids early/eager
* initialization/memory allocation.
*/
public class RecordReaderFileConfig {
public class RecordReaderFileConfig implements RecordReaderConfig {
public final FileFormat _fileFormat;
public final File _dataFile;
public final Set<String> _fieldsToRead;
Expand Down

0 comments on commit f0fcbd8

Please sign in to comment.