Skip to content

Commit

Permalink
addressed comments and added test for ensuring order of transformers.
Browse files Browse the repository at this point in the history
  • Loading branch information
aishikbh authored and snleee committed Nov 24, 2023
1 parent 5c8f43e commit 2c05f8d
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand All @@ -35,6 +37,8 @@
* {@link FieldSpec}.
*/
public class SpecialValueTransformer implements RecordTransformer {

// Logger bound to this transformer's own class so that emitted messages are attributed to SpecialValueTransformer
// (it was previously created with NullValueTransformer.class, which mislabelled every log line).
private static final Logger LOGGER = LoggerFactory.getLogger(SpecialValueTransformer.class);
// Names of the record fields that transform() looks up and inspects for special values (-0.0 and NaN).
private final HashSet<String> _specialValuesKeySet = new HashSet<>();

public SpecialValueTransformer(Schema schema) {
Expand All @@ -48,18 +52,22 @@ public SpecialValueTransformer(Schema schema) {

/**
 * Normalizes a negative zero to positive zero.
 *
 * <p>The comparison is done on the bit pattern because {@code -0.0 == 0.0} evaluates to {@code true} with the
 * regular floating point comparison.
 *
 * @param value the value to inspect
 * @return boxed {@code 0.0f} / {@code 0.0d} when {@code value} is a {@link Float} / {@link Double} negative zero,
 *         otherwise {@code value} itself
 */
private Object transformNegativeZero(Object value) {
  if (value instanceof Float) {
    boolean isNegativeZero = Float.floatToRawIntBits((float) value) == Float.floatToRawIntBits(-0.0f);
    if (!isNegativeZero) {
      return value;
    }
    LOGGER.info("-0.0f value detected, converting to 0.0.");
    return 0.0f;
  }
  if (value instanceof Double) {
    boolean isNegativeZero = Double.doubleToLongBits((double) value) == Double.doubleToLongBits(-0.0d);
    if (!isNegativeZero) {
      return value;
    }
    LOGGER.info("-0.0d value detected, converting to 0.0.");
    return 0.0d;
  }
  // Neither Float nor Double: hand the value back untouched.
  return value;
}

private Object transformNaN(Object value) {
if ((value instanceof Float) && ((Float) value).isNaN()) {
LOGGER.info("Float.NaN detected, converting to default null.");
value = FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT;
} else if ((value instanceof Double) && ((Double) value).isNaN()) {
LOGGER.info("Double.NaN detected, converting to default null.");
value = FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE;
}
return value;
Expand All @@ -74,21 +82,23 @@ public boolean isNoOp() {
public GenericRow transform(GenericRow record) {
for (String element : _specialValuesKeySet) {
Object value = record.getValue(element);
if (value instanceof Float || value instanceof Double) {
if (value instanceof Object[]) {
// Multi-valued column.
Object[] values = (Object[]) value;
int numValues = values.length;
for (int i = 0; i < numValues; i++) {
if (values[i] != null) {
values[i] = transformNegativeZero(values[i]);
values[i] = transformNaN(values[i]);
}
}
} else {
// Single-valued column.
Object zeroTransformedValue = transformNegativeZero(value);
Object nanTransformedValue = transformNaN(zeroTransformedValue);
if (nanTransformedValue != value) {
record.putValue(element, nanTransformedValue);
}
} else if (value instanceof Object[]) {
// Multi-valued column.
Object[] values = (Object[]) value;
int numValues = values.length;
for (int i = 0; i < numValues; i++) {
values[i] = transformNegativeZero(values[i]);
values[i] = transformNaN(values[i]);
}
}
}
return record;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@

import java.sql.Timestamp;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
Expand Down Expand Up @@ -69,6 +72,7 @@ public class RecordTransformerTest {

// Transform multiple times should return the same result
private static final int NUM_ROUNDS = 5;
// Number of transformers that testOrderForTransformers expects the default transformer list to contain.
private static final int NUMBER_OF_TRANSFORMERS = 8;

private static GenericRow getRecord() {
GenericRow record = new GenericRow();
Expand Down Expand Up @@ -291,6 +295,72 @@ record = transformer.transform(record);
}
}

/**
 * Verifies that the default transformers are applied in the expected order.
 *
 * <p>The schema and ingestion config are built so that every transformer is loaded. The same record is then pushed
 * step by step through the actual transformer list and through a hand-built list in the expected order; after each
 * step both results must be equal.
 */
@Test
public void testOrderForTransformers() {
  // This test checks that the specified order is maintained for different transformers.

  // Build Schema and ingestionConfig in such a way that all the transformers are loaded.
  Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("svInt", DataType.INT)
      .addSingleValueDimension("svDouble", DataType.DOUBLE)
      .addSingleValueDimension("expressionTestColumn", DataType.INT).addSingleValueDimension("svNaN", DataType.FLOAT)
      .addSingleValueDimension("emptyDimensionForNullValueTransformer", DataType.FLOAT)
      .addSingleValueDimension("svStringWithNullCharacters", DataType.STRING)
      .addSingleValueDimension("indexableExtras", DataType.JSON)
      .addDateTime("timeCol", DataType.TIMESTAMP, "1:MILLISECONDS:TIMESTAMP", "1:MILLISECONDS").build();

  IngestionConfig ingestionConfig = new IngestionConfig();
  TableConfig tableConfig =
      new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig)
          .setTimeColumnName("timeCol").build();
  ingestionConfig.setFilterConfig(new FilterConfig("svInt = 123 AND svDouble <= 200"));
  ingestionConfig.setTransformConfigs(List.of(new TransformConfig("expressionTestColumn", "plus(x,10)")));
  ingestionConfig.setSchemaConformingTransformerConfig(
      new SchemaConformingTransformerConfig("indexableExtras", null, null, null));
  ingestionConfig.setRowTimeValueCheck(true);
  ingestionConfig.setContinueOnError(false);

  // Get the list of transformers.
  List<RecordTransformer> currentListOfTransformers =
      CompositeTransformer.getDefaultTransformers(tableConfig, schema);

  // Create a list of transformers to compare.
  List<RecordTransformer> expectedListOfTransformers =
      List.of(new ExpressionTransformer(tableConfig, schema), new FilterTransformer(tableConfig),
          new SchemaConformingTransformer(tableConfig, schema), new DataTypeTransformer(tableConfig, schema),
          new TimeValidationTransformer(tableConfig, schema), new NullValueTransformer(tableConfig, schema),
          new SpecialValueTransformer(schema), new SanitizationTransformer(schema));

  // Check that the number of current transformers match the expected number of transformers.
  assertEquals(currentListOfTransformers.size(), NUMBER_OF_TRANSFORMERS);

  GenericRow record = new GenericRow();

  // Data for expression Transformer.
  record.putValue("expressionTestColumn", 100);

  // Data for filter transformer.
  record.putValue("svDouble", 123d);

  // Data for DataType Transformer.
  record.putValue("svInt", (byte) 123);

  // Data for TimeValidation transformer.
  record.putValue("timeCol", System.currentTimeMillis());

  // Data for SpecialValue Transformer.
  record.putValue("svNaN", Float.NaN);

  // Data for sanitization transformer.
  record.putValue("svStringWithNullCharacters", "1\0002\0003");

  for (int i = 0; i < NUMBER_OF_TRANSFORMERS; i++) {
    // A transformer may mutate and return the very row it is given (SpecialValueTransformer does). Feeding the same
    // instance to both transformers would make the two results the same object and the comparison vacuous, and
    // would apply each step twice. The expected transformer therefore works on an independent copy.
    GenericRow expectedInput = record.copy();
    GenericRow currentRecord = currentListOfTransformers.get(i).transform(record);
    GenericRow expectedRecord = expectedListOfTransformers.get(i).transform(expectedInput);
    assertEquals(currentRecord, expectedRecord);
    record = expectedRecord;
  }
}

@Test
public void testScalarOps() {
IngestionConfig ingestionConfig = new IngestionConfig();
Expand Down Expand Up @@ -566,7 +636,6 @@ record = transformer.transform(record);
assertEquals(record.getValue("mvString1"), new Object[]{"123", "123", "123", "123.0", "123.0"});
assertEquals(record.getValue("mvString2"), new Object[]{"123", "123", "123.0", "123.0", "123"});
assertNull(record.getValue("$virtual"));
assertTrue(record.getNullValueFields().isEmpty());
assertEquals(Float.floatToRawIntBits((float) record.getValue("svFloatNegativeZero")),
Float.floatToRawIntBits(0.0f));
assertEquals(Double.doubleToRawLongBits((double) record.getValue("svDoubleNegativeZero")),
Expand All @@ -579,6 +648,7 @@ record = transformer.transform(record);
new Float[]{0.0f, FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT, 2.0f});
assertEquals(record.getValue("mvDoubleNaN"),
new Double[]{0.0d, FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE, 2.0d});
assertTrue(record.getNullValueFields().isEmpty());
}

// Test empty record
Expand Down

0 comments on commit 2c05f8d

Please sign in to comment.