Skip to content

Commit

Permalink
Pass explicit TypeRef when evaluating MV jsonPath (apache#12524)
Browse files Browse the repository at this point in the history
Co-authored-by: Saurabh Dubey <[email protected]>
  • Loading branch information
saurabhd336 and Saurabh Dubey authored Feb 29, 2024
1 parent 35c89c8 commit bddd361
Show file tree
Hide file tree
Showing 2 changed files with 215 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.ParseContext;
import com.jayway.jsonpath.TypeRef;
import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
import java.math.BigDecimal;
Expand Down Expand Up @@ -58,6 +59,16 @@ public final class DefaultJsonPathEvaluator implements JsonPathEvaluator {
private static final float[] EMPTY_FLOATS = new float[0];
private static final double[] EMPTY_DOUBLES = new double[0];
private static final String[] EMPTY_STRINGS = new String[0];
// Explicit list type references, one per multi-value element type handled by the evaluateBlock overloads.
// They are handed to the TypeRef-accepting extractFrom* helpers, which forward them to read(_jsonPath, ref).
// NOTE(review): presumably this lets the JSON mapping provider convert each element to the requested type
// (e.g. JSON strings or doubles read into an int[] buffer) - confirm against the JsonPath mapper configuration.
// Each constant is an anonymous subclass so that the generic element type is captured by the TypeRef.
private static final TypeRef<List<Integer>> INTEGER_LIST_TYPE = new TypeRef<List<Integer>>() {
};
private static final TypeRef<List<Long>> LONG_LIST_TYPE = new TypeRef<List<Long>>() {
};
private static final TypeRef<List<Float>> FLOAT_LIST_TYPE = new TypeRef<List<Float>>() {
};
private static final TypeRef<List<Double>> DOUBLE_LIST_TYPE = new TypeRef<List<Double>>() {
};
private static final TypeRef<List<String>> STRING_LIST_TYPE = new TypeRef<List<String>>() {
};

public static JsonPathEvaluator create(String jsonPath, @Nullable Object defaultValue) {
try {
Expand Down Expand Up @@ -274,23 +285,23 @@ public <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, in
reader.readDictIds(docIds, length, dictIdsBuffer, context);
if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
for (int i = 0; i < length; i++) {
processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer);
processList(i, extractFromBytes(dictionary, dictIdsBuffer[i], INTEGER_LIST_TYPE), valueBuffer);
}
} else {
for (int i = 0; i < length; i++) {
processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer);
processList(i, extractFromString(dictionary, dictIdsBuffer[i], INTEGER_LIST_TYPE), valueBuffer);
}
}
} else {
switch (reader.getStoredType()) {
case STRING:
for (int i = 0; i < length; i++) {
processList(i, extractFromString(reader, context, docIds[i]), valueBuffer);
processList(i, extractFromString(reader, context, docIds[i], INTEGER_LIST_TYPE), valueBuffer);
}
break;
case BYTES:
for (int i = 0; i < length; i++) {
processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer);
processList(i, extractFromBytes(reader, context, docIds[i], INTEGER_LIST_TYPE), valueBuffer);
}
break;
default:
Expand All @@ -305,23 +316,23 @@ public <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, in
reader.readDictIds(docIds, length, dictIdsBuffer, context);
if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
for (int i = 0; i < length; i++) {
processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer);
processList(i, extractFromBytes(dictionary, dictIdsBuffer[i], LONG_LIST_TYPE), valueBuffer);
}
} else {
for (int i = 0; i < length; i++) {
processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer);
processList(i, extractFromString(dictionary, dictIdsBuffer[i], LONG_LIST_TYPE), valueBuffer);
}
}
} else {
switch (reader.getStoredType()) {
case STRING:
for (int i = 0; i < length; i++) {
processList(i, extractFromString(reader, context, docIds[i]), valueBuffer);
processList(i, extractFromString(reader, context, docIds[i], LONG_LIST_TYPE), valueBuffer);
}
break;
case BYTES:
for (int i = 0; i < length; i++) {
processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer);
processList(i, extractFromBytes(reader, context, docIds[i], LONG_LIST_TYPE), valueBuffer);
}
break;
default:
Expand All @@ -336,23 +347,23 @@ public <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, in
reader.readDictIds(docIds, length, dictIdsBuffer, context);
if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
for (int i = 0; i < length; i++) {
processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer);
processList(i, extractFromBytes(dictionary, dictIdsBuffer[i], FLOAT_LIST_TYPE), valueBuffer);
}
} else {
for (int i = 0; i < length; i++) {
processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer);
processList(i, extractFromString(dictionary, dictIdsBuffer[i], FLOAT_LIST_TYPE), valueBuffer);
}
}
} else {
switch (reader.getStoredType()) {
case STRING:
for (int i = 0; i < length; i++) {
processList(i, extractFromString(reader, context, docIds[i]), valueBuffer);
processList(i, extractFromString(reader, context, docIds[i], FLOAT_LIST_TYPE), valueBuffer);
}
break;
case BYTES:
for (int i = 0; i < length; i++) {
processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer);
processList(i, extractFromBytes(reader, context, docIds[i], FLOAT_LIST_TYPE), valueBuffer);
}
break;
default:
Expand All @@ -367,23 +378,23 @@ public <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, in
reader.readDictIds(docIds, length, dictIdsBuffer, context);
if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
for (int i = 0; i < length; i++) {
processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer);
processList(i, extractFromBytes(dictionary, dictIdsBuffer[i], DOUBLE_LIST_TYPE), valueBuffer);
}
} else {
for (int i = 0; i < length; i++) {
processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer);
processList(i, extractFromString(dictionary, dictIdsBuffer[i], DOUBLE_LIST_TYPE), valueBuffer);
}
}
} else {
switch (reader.getStoredType()) {
case STRING:
for (int i = 0; i < length; i++) {
processList(i, extractFromString(reader, context, docIds[i]), valueBuffer);
processList(i, extractFromString(reader, context, docIds[i], DOUBLE_LIST_TYPE), valueBuffer);
}
break;
case BYTES:
for (int i = 0; i < length; i++) {
processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer);
processList(i, extractFromBytes(reader, context, docIds[i], DOUBLE_LIST_TYPE), valueBuffer);
}
break;
default:
Expand All @@ -398,23 +409,23 @@ public <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, in
reader.readDictIds(docIds, length, dictIdsBuffer, context);
if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
for (int i = 0; i < length; i++) {
processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer);
processList(i, extractFromBytes(dictionary, dictIdsBuffer[i], STRING_LIST_TYPE), valueBuffer);
}
} else {
for (int i = 0; i < length; i++) {
processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer);
processList(i, extractFromString(dictionary, dictIdsBuffer[i], STRING_LIST_TYPE), valueBuffer);
}
}
} else {
switch (reader.getStoredType()) {
case STRING:
for (int i = 0; i < length; i++) {
processList(i, extractFromString(reader, context, docIds[i]), valueBuffer);
processList(i, extractFromString(reader, context, docIds[i], STRING_LIST_TYPE), valueBuffer);
}
break;
case BYTES:
for (int i = 0; i < length; i++) {
processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer);
processList(i, extractFromBytes(reader, context, docIds[i], STRING_LIST_TYPE), valueBuffer);
}
break;
default:
Expand All @@ -432,6 +443,15 @@ private <T> T extractFromBytes(Dictionary dictionary, int dictId) {
}
}

/**
 * Evaluates the JSON path against the bytes value stored in the dictionary for the given dict id, mapping the
 * result to the type described by {@code ref}.
 *
 * @param dictionary dictionary holding the serialized JSON as bytes
 * @param dictId id of the dictionary entry to read
 * @param ref explicit target type for the extracted value
 * @return the extracted value, or {@code null} if reading, parsing or path evaluation throws
 */
@Nullable
private <T> T extractFromBytes(Dictionary dictionary, int dictId, TypeRef<T> ref) {
  try {
    byte[] jsonBytes = dictionary.getBytesValue(dictId);
    return JSON_PARSER_CONTEXT.parseUtf8(jsonBytes).read(_jsonPath, ref);
  } catch (Exception ignored) {
    // Best effort: any failure is reported to the caller as a missing value.
    return null;
  }
}

@Nullable
private <T, R extends ForwardIndexReaderContext> T extractFromBytes(ForwardIndexReader<R> reader, R context,
int docId) {
Expand All @@ -442,6 +462,16 @@ private <T, R extends ForwardIndexReaderContext> T extractFromBytes(ForwardIndex
}
}

/**
 * Evaluates the JSON path against the bytes read from the forward index for the given document, mapping the
 * result to the type described by {@code ref}.
 *
 * @param reader forward index reader supplying the serialized JSON bytes
 * @param context reader context passed through to {@code reader.getBytes}
 * @param docId document whose value is read
 * @param ref explicit target type for the extracted value
 * @return the extracted value, or {@code null} if reading, parsing or path evaluation throws
 */
@Nullable
private <T, R extends ForwardIndexReaderContext> T extractFromBytes(ForwardIndexReader<R> reader, R context,
    int docId, TypeRef<T> ref) {
  try {
    byte[] jsonBytes = reader.getBytes(docId, context);
    return JSON_PARSER_CONTEXT.parseUtf8(jsonBytes).read(_jsonPath, ref);
  } catch (Exception ignored) {
    // Best effort: any failure is reported to the caller as a missing value.
    return null;
  }
}

@Nullable
private <T> T extractFromBytesWithExactBigDecimal(Dictionary dictionary, int dictId) {
try {
Expand Down Expand Up @@ -470,6 +500,15 @@ private <T> T extractFromString(Dictionary dictionary, int dictId) {
}
}

/**
 * Evaluates the JSON path against the string value stored in the dictionary for the given dict id, mapping the
 * result to the type described by {@code ref}.
 *
 * @param dictionary dictionary holding the serialized JSON as strings
 * @param dictId id of the dictionary entry to read
 * @param ref explicit target type for the extracted value
 * @return the extracted value, or {@code null} if reading, parsing or path evaluation throws
 */
@Nullable
private <T> T extractFromString(Dictionary dictionary, int dictId, TypeRef<T> ref) {
  try {
    String json = dictionary.getStringValue(dictId);
    return JSON_PARSER_CONTEXT.parse(json).read(_jsonPath, ref);
  } catch (Exception ignored) {
    // Best effort: any failure is reported to the caller as a missing value.
    return null;
  }
}

@Nullable
private <T, R extends ForwardIndexReaderContext> T extractFromString(ForwardIndexReader<R> reader, R context,
int docId) {
Expand All @@ -480,6 +519,16 @@ private <T, R extends ForwardIndexReaderContext> T extractFromString(ForwardInde
}
}

/**
 * Evaluates the JSON path against the value of a string-typed forward index for the given document, mapping the
 * result to the type described by {@code ref}.
 *
 * <p>The value is fetched through {@code reader.getBytes} and parsed as UTF-8 bytes rather than being read as a
 * {@code String} first. NOTE(review): presumably this avoids an intermediate String decode - confirm.
 *
 * @param reader forward index reader supplying the serialized JSON
 * @param context reader context passed through to {@code reader.getBytes}
 * @param docId document whose value is read
 * @param ref explicit target type for the extracted value
 * @return the extracted value, or {@code null} if reading, parsing or path evaluation throws
 */
@Nullable
private <T, R extends ForwardIndexReaderContext> T extractFromString(ForwardIndexReader<R> reader, R context,
    int docId, TypeRef<T> ref) {
  try {
    byte[] utf8Json = reader.getBytes(docId, context);
    return JSON_PARSER_CONTEXT.parseUtf8(utf8Json).read(_jsonPath, ref);
  } catch (Exception ignored) {
    // Best effort: any failure is reported to the caller as a missing value.
    return null;
  }
}

@Nullable
private <T> T extractFromStringWithExactBigDecimal(Dictionary dictionary, int dictId) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.common.evaluators;

import java.nio.charset.StandardCharsets;
import org.apache.pinot.segment.spi.evaluator.json.JsonPathEvaluator;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.spi.data.FieldSpec;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;


/**
 * Tests multi-value extraction of {@link DefaultJsonPathEvaluator} from a raw (non dictionary-encoded) forward
 * index whose stored type is STRING.
 *
 * <p>Each test stores a JSON document with an array of a different element type (integers, strings, doubles) and
 * verifies that the first three elements can be read into int, long, float, double and String value buffers.
 */
public class DefaultJsonPathEvaluatorTest {
  /** Selects the first three elements of the {@code values} array. */
  private static final String PATH = "$.values[0:3]";

  @Test
  public void testNonDictIntegerArray() {
    JsonPathEvaluator evaluator = DefaultJsonPathEvaluator.create(PATH, new int[]{});
    ForwardIndexReader<ForwardIndexReaderContext> reader = mockRawStringReader("{\"values\": [1, 2, 3, 4, 5]}");

    assertNumericReads(evaluator, reader);
    assertStringReads(evaluator, reader, new String[]{"1", "2", "3"});
  }

  @Test
  public void testNonDictStringArray() {
    JsonPathEvaluator evaluator = DefaultJsonPathEvaluator.create(PATH, new int[]{});
    ForwardIndexReader<ForwardIndexReaderContext> reader =
        mockRawStringReader("{\"values\": [\"1\", \"2\", \"3\", \"4\", \"5\"]}");

    assertNumericReads(evaluator, reader);
    assertStringReads(evaluator, reader, new String[]{"1", "2", "3"});
  }

  @Test
  public void testNonDictDoubleArray() {
    JsonPathEvaluator evaluator = DefaultJsonPathEvaluator.create(PATH, new int[]{});
    ForwardIndexReader<ForwardIndexReaderContext> reader =
        mockRawStringReader("{\"values\": [1.0, 2.0, 3.0, 4.0, 5.0]}");

    assertNumericReads(evaluator, reader);
    assertStringReads(evaluator, reader, new String[]{"1.0", "2.0", "3.0"});
  }

  /**
   * Builds a mocked raw forward index reader of stored type STRING that returns the UTF-8 bytes of {@code json}
   * for doc id 0.
   */
  // Mockito can only hand back a raw ForwardIndexReader mock; the cast to the parameterized type is safe because
  // the mock carries no type-specific state.
  @SuppressWarnings("unchecked")
  private static ForwardIndexReader<ForwardIndexReaderContext> mockRawStringReader(String json) {
    ForwardIndexReader<ForwardIndexReaderContext> reader = mock(ForwardIndexReader.class);
    when(reader.isDictionaryEncoded()).thenReturn(false);
    when(reader.getBytes(eq(0), any())).thenReturn(json.getBytes(StandardCharsets.UTF_8));
    when(reader.getStoredType()).thenReturn(FieldSpec.DataType.STRING);
    when(reader.createContext()).thenReturn(null);
    return reader;
  }

  /**
   * Reads doc 0 into int, long, float and double buffers and checks that each holds the values 1, 2, 3.
   * ArrayAsserts follows the JUnit argument order, so the expected array is passed first.
   */
  private static void assertNumericReads(JsonPathEvaluator evaluator,
      ForwardIndexReader<ForwardIndexReaderContext> reader) {
    // Read as ints
    int[][] intBuffer = new int[1][3];
    evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, intBuffer);
    assertArrayEquals(new int[][]{{1, 2, 3}}, intBuffer);

    // Read as longs
    long[][] longBuffer = new long[1][3];
    evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, longBuffer);
    assertArrayEquals(new long[][]{{1, 2, 3}}, longBuffer);

    // Read as floats
    float[][] floatBuffer = new float[1][3];
    evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, floatBuffer);
    assertArrayEquals(new float[][]{{1.0f, 2.0f, 3.0f}}, floatBuffer);

    // Read as doubles
    double[][] doubleBuffer = new double[1][3];
    evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, doubleBuffer);
    assertArrayEquals(new double[][]{{1.0, 2.0, 3.0}}, doubleBuffer);
  }

  /**
   * Reads doc 0 into a String buffer and checks it against {@code expected}.
   */
  private static void assertStringReads(JsonPathEvaluator evaluator,
      ForwardIndexReader<ForwardIndexReaderContext> reader, String[] expected) {
    String[][] stringBuffer = new String[1][3];
    evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, stringBuffer);
    assertArrayEquals(new String[][]{expected}, stringBuffer);
  }
}

0 comments on commit bddd361

Please sign in to comment.