diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java b/pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java index f22474354c5f..cae1e3c17bc0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java @@ -25,6 +25,7 @@ import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.Option; import com.jayway.jsonpath.ParseContext; +import com.jayway.jsonpath.TypeRef; import com.jayway.jsonpath.spi.json.JacksonJsonProvider; import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider; import java.math.BigDecimal; @@ -58,6 +59,16 @@ public final class DefaultJsonPathEvaluator implements JsonPathEvaluator { private static final float[] EMPTY_FLOATS = new float[0]; private static final double[] EMPTY_DOUBLES = new double[0]; private static final String[] EMPTY_STRINGS = new String[0]; + private static final TypeRef> INTEGER_LIST_TYPE = new TypeRef>() { + }; + private static final TypeRef> LONG_LIST_TYPE = new TypeRef>() { + }; + private static final TypeRef> FLOAT_LIST_TYPE = new TypeRef>() { + }; + private static final TypeRef> DOUBLE_LIST_TYPE = new TypeRef>() { + }; + private static final TypeRef> STRING_LIST_TYPE = new TypeRef>() { + }; public static JsonPathEvaluator create(String jsonPath, @Nullable Object defaultValue) { try { @@ -274,23 +285,23 @@ public void evaluateBlock(int[] docIds, in reader.readDictIds(docIds, length, dictIdsBuffer, context); if (dictionary.getValueType() == FieldSpec.DataType.BYTES) { for (int i = 0; i < length; i++) { - processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer); + processList(i, extractFromBytes(dictionary, dictIdsBuffer[i], INTEGER_LIST_TYPE), valueBuffer); } } else { for (int i = 0; i < length; i++) { - processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer); + processList(i, extractFromString(dictionary, dictIdsBuffer[i], INTEGER_LIST_TYPE), valueBuffer); } } } else { switch (reader.getStoredType()) { case STRING: for (int i = 0; i < length; i++) { - processList(i, extractFromString(reader, context, docIds[i]), valueBuffer); + processList(i, extractFromString(reader, context, docIds[i], INTEGER_LIST_TYPE), valueBuffer); } break; case BYTES: for (int i = 0; i < length; i++) { - processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer); + processList(i, extractFromBytes(reader, context, docIds[i], INTEGER_LIST_TYPE), valueBuffer); } break; default: @@ -305,23 +316,23 @@ public void evaluateBlock(int[] docIds, in reader.readDictIds(docIds, length, dictIdsBuffer, context); if (dictionary.getValueType() == FieldSpec.DataType.BYTES) { for (int i = 0; i < length; i++) { - processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer); + processList(i, extractFromBytes(dictionary, dictIdsBuffer[i], LONG_LIST_TYPE), valueBuffer); } } else { for (int i = 0; i < length; i++) { - processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer); + processList(i, extractFromString(dictionary, dictIdsBuffer[i], LONG_LIST_TYPE), valueBuffer); } } } else { switch (reader.getStoredType()) { case STRING: for (int i = 0; i < length; i++) { - processList(i, extractFromString(reader, context, docIds[i]), valueBuffer); + processList(i, extractFromString(reader, context, docIds[i], LONG_LIST_TYPE), valueBuffer); } break; case BYTES: for (int i = 0; i < length; i++) { - processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer); + processList(i, extractFromBytes(reader, context, docIds[i], LONG_LIST_TYPE), valueBuffer); } break; default: @@ -336,23 +347,23 @@ public void evaluateBlock(int[] docIds, in reader.readDictIds(docIds, length, dictIdsBuffer, context); if (dictionary.getValueType() == FieldSpec.DataType.BYTES) { for (int i = 0; i < length; i++) { - processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer); + processList(i, extractFromBytes(dictionary, dictIdsBuffer[i], FLOAT_LIST_TYPE), valueBuffer); } } else { for (int i = 0; i < length; i++) { - processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer); + processList(i, extractFromString(dictionary, dictIdsBuffer[i], FLOAT_LIST_TYPE), valueBuffer); } } } else { switch (reader.getStoredType()) { case STRING: for (int i = 0; i < length; i++) { - processList(i, extractFromString(reader, context, docIds[i]), valueBuffer); + processList(i, extractFromString(reader, context, docIds[i], FLOAT_LIST_TYPE), valueBuffer); } break; case BYTES: for (int i = 0; i < length; i++) { - processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer); + processList(i, extractFromBytes(reader, context, docIds[i], FLOAT_LIST_TYPE), valueBuffer); } break; default: @@ -367,23 +378,23 @@ public void evaluateBlock(int[] docIds, in reader.readDictIds(docIds, length, dictIdsBuffer, context); if (dictionary.getValueType() == FieldSpec.DataType.BYTES) { for (int i = 0; i < length; i++) { - processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer); + processList(i, extractFromBytes(dictionary, dictIdsBuffer[i], DOUBLE_LIST_TYPE), valueBuffer); } } else { for (int i = 0; i < length; i++) { - processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer); + processList(i, extractFromString(dictionary, dictIdsBuffer[i], DOUBLE_LIST_TYPE), valueBuffer); } } } else { switch (reader.getStoredType()) { case STRING: for (int i = 0; i < length; i++) { - processList(i, extractFromString(reader, context, docIds[i]), valueBuffer); + processList(i, extractFromString(reader, context, docIds[i], DOUBLE_LIST_TYPE), valueBuffer); } break; case BYTES: for (int i = 0; i < length; i++) { - processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer); + processList(i, extractFromBytes(reader, context, docIds[i], DOUBLE_LIST_TYPE), valueBuffer); } break; default: @@ -398,23 +409,23 @@ public void evaluateBlock(int[] docIds, in reader.readDictIds(docIds, length, dictIdsBuffer, context); if (dictionary.getValueType() == FieldSpec.DataType.BYTES) { for (int i = 0; i < length; i++) { - processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer); + processList(i, extractFromBytes(dictionary, dictIdsBuffer[i], STRING_LIST_TYPE), valueBuffer); } } else { for (int i = 0; i < length; i++) { - processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer); + processList(i, extractFromString(dictionary, dictIdsBuffer[i], STRING_LIST_TYPE), valueBuffer); } } } else { switch (reader.getStoredType()) { case STRING: for (int i = 0; i < length; i++) { - processList(i, extractFromString(reader, context, docIds[i]), valueBuffer); + processList(i, extractFromString(reader, context, docIds[i], STRING_LIST_TYPE), valueBuffer); } break; case BYTES: for (int i = 0; i < length; i++) { - processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer); + processList(i, extractFromBytes(reader, context, docIds[i], STRING_LIST_TYPE), valueBuffer); } break; default: @@ -432,6 +443,15 @@ private T extractFromBytes(Dictionary dictionary, int dictId) { } } + @Nullable + private T extractFromBytes(Dictionary dictionary, int dictId, TypeRef ref) { + try { + return JSON_PARSER_CONTEXT.parseUtf8(dictionary.getBytesValue(dictId)).read(_jsonPath, ref); + } catch (Exception e) { + return null; + } + } + @Nullable private T extractFromBytes(ForwardIndexReader reader, R context, int docId) { @@ -442,6 +462,16 @@ private T extractFromBytes(ForwardIndex } } + @Nullable + private T extractFromBytes(ForwardIndexReader reader, R context, + int docId, TypeRef ref) { + try { + return JSON_PARSER_CONTEXT.parseUtf8(reader.getBytes(docId, context)).read(_jsonPath, ref); + } catch (Exception e) { + return null; + } + } + @Nullable private T extractFromBytesWithExactBigDecimal(Dictionary dictionary, int dictId) { try { @@ -470,6 +500,15 @@ private T extractFromString(Dictionary dictionary, int dictId) { } } + @Nullable + private T extractFromString(Dictionary dictionary, int dictId, TypeRef ref) { + try { + return JSON_PARSER_CONTEXT.parse(dictionary.getStringValue(dictId)).read(_jsonPath, ref); + } catch (Exception e) { + return null; + } + } + @Nullable private T extractFromString(ForwardIndexReader reader, R context, int docId) { @@ -480,6 +519,16 @@ private T extractFromString(ForwardInde } } + @Nullable + private T extractFromString(ForwardIndexReader reader, R context, + int docId, TypeRef ref) { + try { + return JSON_PARSER_CONTEXT.parseUtf8(reader.getBytes(docId, context)).read(_jsonPath, ref); + } catch (Exception e) { + return null; + } + } + @Nullable private T extractFromStringWithExactBigDecimal(Dictionary dictionary, int dictId) { try { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluatorTest.java new file mode 100644 index 000000000000..c7b304667c3b --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluatorTest.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.common.evaluators; + +import java.nio.charset.StandardCharsets; +import org.apache.pinot.segment.spi.evaluator.json.JsonPathEvaluator; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; +import org.apache.pinot.spi.data.FieldSpec; +import org.testng.annotations.Test; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals; + + +public class DefaultJsonPathEvaluatorTest { + @Test + public void testNonDictIntegerArray() { + String json = "{\"values\": [1, 2, 3, 4, 5]}"; + String path = "$.values[0:3]"; + JsonPathEvaluator evaluator = DefaultJsonPathEvaluator.create(path, new int[]{}); + ForwardIndexReader reader = mock(ForwardIndexReader.class); + when(reader.isDictionaryEncoded()).thenReturn(false); + when(reader.getBytes(eq(0), any())).thenReturn(json.getBytes(StandardCharsets.UTF_8)); + when(reader.getStoredType()).thenReturn(FieldSpec.DataType.STRING); + when(reader.createContext()).thenReturn(null); + + // Read as ints + int[][] buffer = new int[1][3]; + evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, buffer); + assertArrayEquals(buffer, new int[][]{{1, 2, 3}}); + + // Read as longs + long[][] longBuffer = new long[1][3]; + evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, longBuffer); + assertArrayEquals(longBuffer, new long[][]{{1, 2, 3}}); + + // Read as floats + float[][] floatBuffer = new float[1][3]; + evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, floatBuffer); + assertArrayEquals(floatBuffer, new float[][]{{1.0f, 2.0f, 3.0f}}); + + // Read as doubles + double[][] doubleBuffer = new double[1][3]; + evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, doubleBuffer); + assertArrayEquals(doubleBuffer, new double[][]{{1.0, 2.0, 3.0}}); + + // Read as strings + String[][] stringBuffer = new String[1][3]; + evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, stringBuffer); + assertArrayEquals(stringBuffer, new String[][]{{"1", "2", "3"}}); + } + + @Test + public void testNonDictStringArray() { + String json = "{\"values\": [\"1\", \"2\", \"3\", \"4\", \"5\"]}"; + String path = "$.values[0:3]"; + JsonPathEvaluator evaluator = DefaultJsonPathEvaluator.create(path, new int[]{}); + ForwardIndexReader reader = mock(ForwardIndexReader.class); + when(reader.isDictionaryEncoded()).thenReturn(false); + when(reader.getBytes(eq(0), any())).thenReturn(json.getBytes(StandardCharsets.UTF_8)); + when(reader.getStoredType()).thenReturn(FieldSpec.DataType.STRING); + when(reader.createContext()).thenReturn(null); + + // Read as ints + int[][] buffer = new int[1][3]; + evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, buffer); + assertArrayEquals(buffer, new int[][]{{1, 2, 3}}); + + // Read as longs + long[][] longBuffer = new long[1][3]; + evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, longBuffer); + assertArrayEquals(longBuffer, new long[][]{{1, 2, 3}}); + + // Read as floats + float[][] floatBuffer = new float[1][3]; + evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, floatBuffer); + assertArrayEquals(floatBuffer, new float[][]{{1.0f, 2.0f, 3.0f}}); + + // Read as doubles + double[][] doubleBuffer = new double[1][3]; + evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, doubleBuffer); + assertArrayEquals(doubleBuffer, new double[][]{{1.0, 2.0, 3.0}}); + + // Read as strings + String[][] stringBuffer = new String[1][3]; + evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, stringBuffer); + assertArrayEquals(stringBuffer, new String[][]{{"1", "2", "3"}}); + } + + @Test + public void testNonDictDoubleArray() { + String json = "{\"values\": [1.0, 2.0, 3.0, 4.0, 5.0]}"; + String path = "$.values[0:3]"; + JsonPathEvaluator evaluator = DefaultJsonPathEvaluator.create(path, new int[]{}); + ForwardIndexReader reader = mock(ForwardIndexReader.class); + when(reader.isDictionaryEncoded()).thenReturn(false); + when(reader.getBytes(eq(0), any())).thenReturn(json.getBytes(StandardCharsets.UTF_8)); + when(reader.getStoredType()).thenReturn(FieldSpec.DataType.STRING); + when(reader.createContext()).thenReturn(null); + + // Read as ints + int[][] buffer = new int[1][3]; + evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, buffer); + assertArrayEquals(buffer, new int[][]{{1, 2, 3}}); + + // Read as longs + long[][] longBuffer = new long[1][3]; + evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, longBuffer); + assertArrayEquals(longBuffer, new long[][]{{1, 2, 3}}); + + // Read as floats + float[][] floatBuffer = new float[1][3]; + evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, floatBuffer); + assertArrayEquals(floatBuffer, new float[][]{{1.0f, 2.0f, 3.0f}}); + + // Read as doubles + double[][] doubleBuffer = new double[1][3]; + evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, doubleBuffer); + assertArrayEquals(doubleBuffer, new double[][]{{1.0, 2.0, 3.0}}); + + // Read as strings + String[][] stringBuffer = new String[1][3]; + evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, stringBuffer); + assertArrayEquals(stringBuffer, new String[][]{{"1.0", "2.0", "3.0"}}); + } +}