Skip to content

Commit

Permalink
added code to remove NaN from multivalued columns and modified order …
Browse files Browse the repository at this point in the history
…of transformers.
  • Loading branch information
aishikbh authored and snleee committed Nov 24, 2023
1 parent 2c05f8d commit 06e91c1
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,16 @@ public class CompositeTransformer implements RecordTransformer {
* </li>
* <li>
* {@link SpecialValueTransformer} after {@link DataTypeTransformer} so that we already have the values complying
* with the schema before handling special values
* with the schema before handling special values, and before {@link NullValueTransformer} so that the null values
* produced while handling special values (e.g. NaN) are transformed by {@link NullValueTransformer} as well
* </li>
* </ul>
*/
public static List<RecordTransformer> getDefaultTransformers(TableConfig tableConfig, Schema schema) {
return Stream.of(new ExpressionTransformer(tableConfig, schema), new FilterTransformer(tableConfig),
new SchemaConformingTransformer(tableConfig, schema), new DataTypeTransformer(tableConfig, schema),
new TimeValidationTransformer(tableConfig, schema), new NullValueTransformer(tableConfig, schema),
new SpecialValueTransformer(schema), new SanitizationTransformer(schema)).filter(t -> !t.isNoOp())
new TimeValidationTransformer(tableConfig, schema), new SpecialValueTransformer(schema),
new NullValueTransformer(tableConfig, schema), new SanitizationTransformer(schema)).filter(t -> !t.isNoOp())
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.pinot.segment.local.recordtransformer;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
Expand Down Expand Up @@ -65,10 +67,10 @@ private Object transformNegativeZero(Object value) {
/**
 * Replaces a floating-point NaN with {@code null}.
 *
 * @param value the value to inspect; any type is accepted
 * @return {@code null} when {@code value} is a {@link Float} or {@link Double} holding NaN, otherwise
 *     {@code value} itself, unchanged
 */
private Object transformNaN(Object value) {
  // instanceof is false for null, so a null input falls through and is returned as-is.
  if ((value instanceof Float) && ((Float) value).isNaN()) {
    LOGGER.info("Float.NaN detected, converting to default null.");
    return null;
  }
  if ((value instanceof Double) && ((Double) value).isNaN()) {
    LOGGER.info("Double.NaN detected, converting to default null.");
    return null;
  }
  return value;
}
Expand All @@ -86,12 +88,16 @@ public GenericRow transform(GenericRow record) {
// Multi-valued column.
Object[] values = (Object[]) value;
int numValues = values.length;
for (int i = 0; i < numValues; i++) {
if (values[i] != null) {
values[i] = transformNegativeZero(values[i]);
values[i] = transformNaN(values[i]);
List<Object> negativeZeroNanSanitizedValues = new ArrayList<>(numValues);
int numberOfElements = values.length;
for (Object o : values) {
Object zeroTransformedValue = transformNegativeZero(o);
Object nanTransformedValue = transformNaN(zeroTransformedValue);
if (nanTransformedValue != null) {
negativeZeroNanSanitizedValues.add(nanTransformedValue);
}
}
record.putValue(element,negativeZeroNanSanitizedValues.toArray());
} else {
// Single-valued column.
Object zeroTransformedValue = transformNegativeZero(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pinot.segment.local.recordtransformer;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -286,12 +288,12 @@ record = transformer.transform(record);
Double.doubleToRawLongBits(0.0d));
assertEquals(record.getValue("mvFloatNegativeZero"), new Float[]{0.0f, 1.0f, 0.0f, 3.0f});
assertEquals(record.getValue("mvDoubleNegativeZero"), new Double[]{0.0d, 1.0d, 0.0d, 3.0d});
assertEquals(record.getValue("svFloatNaN"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT);
assertEquals(record.getValue("svDoubleNaN"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE);
assertNull(record.getValue("svFloatNaN"));
assertNull(record.getValue("svDoubleNaN"));
assertEquals(record.getValue("mvFloatNaN"),
new Float[]{0.0f, FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT, 2.0f});
new Float[]{0.0f, 2.0f});
assertEquals(record.getValue("mvDoubleNaN"),
new Double[]{0.0d, FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE, 2.0d});
new Double[]{0.0d, 2.0d});
}
}

Expand All @@ -302,7 +304,8 @@ public void testOrderForTransformers() {
// Build Schema and ingestionConfig in such a way that all the transformers are loaded.
Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("svInt", DataType.INT)
.addSingleValueDimension("svDouble", DataType.DOUBLE)
.addSingleValueDimension("expressionTestColumn", DataType.INT).addSingleValueDimension("svNaN", DataType.FLOAT)
.addSingleValueDimension("expressionTestColumn", DataType.INT)
.addSingleValueDimension("svNaN", DataType.FLOAT).addMultiValueDimension("mvNaN",DataType.FLOAT)
.addSingleValueDimension("emptyDimensionForNullValueTransformer", DataType.FLOAT)
.addSingleValueDimension("svStringWithNullCharacters", DataType.STRING)
.addSingleValueDimension("indexableExtras", DataType.JSON)
Expand All @@ -327,8 +330,8 @@ public void testOrderForTransformers() {
List<RecordTransformer> expectedListOfTransformers =
List.of(new ExpressionTransformer(tableConfig, schema), new FilterTransformer(tableConfig),
new SchemaConformingTransformer(tableConfig, schema), new DataTypeTransformer(tableConfig, schema),
new TimeValidationTransformer(tableConfig, schema), new NullValueTransformer(tableConfig, schema),
new SpecialValueTransformer(schema), new SanitizationTransformer(schema));
new TimeValidationTransformer(tableConfig, schema), new SpecialValueTransformer(schema),
new NullValueTransformer(tableConfig, schema), new SanitizationTransformer(schema));

// Check that the number of current transformers match the expected number of transformers.
assertEquals(currentListOfTransformers.size(), NUMBER_OF_TRANSFORMERS);
Expand All @@ -349,6 +352,7 @@ public void testOrderForTransformers() {

// Data for SpecialValue Transformer.
record.putValue("svNaN", Float.NaN);
record.putValue("mvNaN",new Float[]{1.0f,Float.NaN,2.0f});

// Data for sanitization transformer.
record.putValue("svStringWithNullCharacters", "1\0002\0003");
Expand Down Expand Up @@ -645,10 +649,10 @@ record = transformer.transform(record);
assertEquals(record.getValue("svFloatNaN"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT);
assertEquals(record.getValue("svDoubleNaN"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE);
assertEquals(record.getValue("mvFloatNaN"),
new Float[]{0.0f, FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT, 2.0f});
new Float[]{0.0f, 2.0f});
assertEquals(record.getValue("mvDoubleNaN"),
new Double[]{0.0d, FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE, 2.0d});
assertTrue(record.getNullValueFields().isEmpty());
new Double[]{0.0d, 2.0d});
assertEquals(new ArrayList<>(record.getNullValueFields()), new ArrayList<>(Arrays.asList("svFloatNaN","svDoubleNaN")));
}

// Test empty record
Expand Down

0 comments on commit 06e91c1

Please sign in to comment.