Skip to content

Commit

Permalink
Json extract index mv (apache#12532)
Browse files Browse the repository at this point in the history
* Json extract index mv
---------

Co-authored-by: Saurabh Dubey <[email protected]>
Co-authored-by: Saurabh Dubey <[email protected]>
  • Loading branch information
3 people authored Mar 21, 2024
1 parent 3cca82c commit ddc3d0b
Show file tree
Hide file tree
Showing 7 changed files with 568 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class JsonExtractIndexTransformFunction extends BaseTransformFunction {
private TransformResultMetadata _resultMetadata;
private JsonIndexReader _jsonIndexReader;
private Object _defaultValue;
private Map<String, RoaringBitmap> _matchingDocsMap;
private Map<String, RoaringBitmap> _valueToMatchingDocsMap;

@Override
public String getName() {
Expand Down Expand Up @@ -90,15 +90,12 @@ public void init(List<TransformFunction> arguments, Map<String, ColumnContext> c
}
String resultsType = ((LiteralTransformFunction) thirdArgument).getStringLiteral().toUpperCase();
boolean isSingleValue = !resultsType.endsWith("_ARRAY");
// TODO: will support array type; the underlying jsonIndexReader.getMatchingDocsMap supports the json path [*]
if (!isSingleValue) {
throw new IllegalArgumentException("jsonExtractIndex only supports single value type");
}
if (isSingleValue && inputJsonPath.contains("[*]")) {
throw new IllegalArgumentException("[*] syntax in json path is unsupported as json_extract_index"
+ "currently does not support returning array types");
throw new IllegalArgumentException(
"[*] syntax in json path is unsupported for singleValue field json_extract_index");
}
DataType dataType = DataType.valueOf(resultsType);
DataType dataType = isSingleValue ? DataType.valueOf(resultsType)
: DataType.valueOf(resultsType.substring(0, resultsType.length() - 6));

if (arguments.size() == 4) {
TransformFunction fourthArgument = arguments.get(3);
Expand All @@ -108,8 +105,12 @@ public void init(List<TransformFunction> arguments, Map<String, ColumnContext> c
_defaultValue = dataType.convert(((LiteralTransformFunction) fourthArgument).getStringLiteral());
}

_resultMetadata = new TransformResultMetadata(dataType, true, false);
_matchingDocsMap = _jsonIndexReader.getMatchingDocsMap(_jsonPathString);
_resultMetadata = new TransformResultMetadata(dataType, isSingleValue, false);
_valueToMatchingDocsMap = _jsonIndexReader.getMatchingFlattenedDocsMap(_jsonPathString);
if (isSingleValue) {
// For single value result type, it's more efficient to use original docIDs map
_jsonIndexReader.convertFlattenedDocIdsToDocIds(_valueToMatchingDocsMap);
}
}

@Override
Expand All @@ -122,8 +123,8 @@ public int[] transformToIntValuesSV(ValueBlock valueBlock) {
int numDocs = valueBlock.getNumDocs();
int[] inputDocIds = valueBlock.getDocIds();
initIntValuesSV(numDocs);
String[] valuesFromIndex =
_jsonIndexReader.getValuesForKeyAndDocs(valueBlock.getDocIds(), _matchingDocsMap);
String[] valuesFromIndex = _jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap, false);
for (int i = 0; i < numDocs; i++) {
// valuesFromIndex is read by block position (0..numDocs-1), matching the other SV/MV transforms in this class;
// inputDocIds[i] is only used for reporting the offending doc id, never as an index into the lookup result.
String value = valuesFromIndex[i];
if (value == null) {
Expand All @@ -144,8 +145,8 @@ public long[] transformToLongValuesSV(ValueBlock valueBlock) {
int numDocs = valueBlock.getNumDocs();
int[] inputDocIds = valueBlock.getDocIds();
initLongValuesSV(numDocs);
String[] valuesFromIndex =
_jsonIndexReader.getValuesForKeyAndDocs(valueBlock.getDocIds(), _matchingDocsMap);
String[] valuesFromIndex = _jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap, false);
for (int i = 0; i < numDocs; i++) {
String value = valuesFromIndex[i];
if (value == null) {
Expand All @@ -166,8 +167,8 @@ public float[] transformToFloatValuesSV(ValueBlock valueBlock) {
int numDocs = valueBlock.getNumDocs();
int[] inputDocIds = valueBlock.getDocIds();
initFloatValuesSV(numDocs);
String[] valuesFromIndex =
_jsonIndexReader.getValuesForKeyAndDocs(valueBlock.getDocIds(), _matchingDocsMap);
String[] valuesFromIndex = _jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap, false);
for (int i = 0; i < numDocs; i++) {
String value = valuesFromIndex[i];
if (value == null) {
Expand All @@ -188,8 +189,8 @@ public double[] transformToDoubleValuesSV(ValueBlock valueBlock) {
int numDocs = valueBlock.getNumDocs();
int[] inputDocIds = valueBlock.getDocIds();
initDoubleValuesSV(numDocs);
String[] valuesFromIndex =
_jsonIndexReader.getValuesForKeyAndDocs(valueBlock.getDocIds(), _matchingDocsMap);
String[] valuesFromIndex = _jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap, false);
for (int i = 0; i < numDocs; i++) {
String value = valuesFromIndex[i];
if (value == null) {
Expand All @@ -210,8 +211,8 @@ public BigDecimal[] transformToBigDecimalValuesSV(ValueBlock valueBlock) {
int numDocs = valueBlock.getNumDocs();
int[] inputDocIds = valueBlock.getDocIds();
initBigDecimalValuesSV(numDocs);
String[] valuesFromIndex =
_jsonIndexReader.getValuesForKeyAndDocs(valueBlock.getDocIds(), _matchingDocsMap);
String[] valuesFromIndex = _jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap, false);
for (int i = 0; i < numDocs; i++) {
String value = valuesFromIndex[i];
if (value == null) {
Expand All @@ -232,8 +233,8 @@ public String[] transformToStringValuesSV(ValueBlock valueBlock) {
int numDocs = valueBlock.getNumDocs();
int[] inputDocIds = valueBlock.getDocIds();
initStringValuesSV(numDocs);
String[] valuesFromIndex =
_jsonIndexReader.getValuesForKeyAndDocs(valueBlock.getDocIds(), _matchingDocsMap);
String[] valuesFromIndex = _jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap, false);
for (int i = 0; i < numDocs; i++) {
String value = valuesFromIndex[i];
if (value == null) {
Expand All @@ -251,26 +252,80 @@ public String[] transformToStringValuesSV(ValueBlock valueBlock) {

/**
 * Produces multi-value INT results for the given value block.
 *
 * <p>Looks up the string values through {@code _jsonIndexReader.getValuesMV(...)} using the block's doc ids, the
 * block's doc count and {@code _valueToMatchingDocsMap}, then parses each string with
 * {@link Integer#parseInt(String)} into a freshly allocated row of {@code _intValuesMV}.
 *
 * @param valueBlock block supplying the doc ids and the number of docs to process
 * @return {@code _intValuesMV}; row {@code i} holds the parsed values of {@code valuesFromIndex[i]}
 *         (presumably aligned with the order of {@code valueBlock.getDocIds()} - confirm against the reader)
 * @throws NumberFormatException if a looked-up value is not a parsable int
 */
@Override
public int[][] transformToIntValuesMV(ValueBlock valueBlock) {
// NOTE(review): this page renders a diff without +/- markers; the throw below looks like the removed
// pre-change body and the statements after it like its replacement - confirm against the repository.
throw new UnsupportedOperationException("jsonExtractIndex does not support transforming to multi-value columns");
int numDocs = valueBlock.getNumDocs();
initIntValuesMV(numDocs);
String[][] valuesFromIndex = _jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap);

for (int i = 0; i < numDocs; i++) {
String[] value = valuesFromIndex[i];
// Each row is sized to the number of values found for this position.
_intValuesMV[i] = new int[value.length];
for (int j = 0; j < value.length; j++) {
_intValuesMV[i][j] = Integer.parseInt(value[j]);
}
}
return _intValuesMV;
}

/**
 * Produces multi-value LONG results for the given value block.
 *
 * <p>Looks up the string values through {@code _jsonIndexReader.getValuesMV(...)} using the block's doc ids, the
 * block's doc count and {@code _valueToMatchingDocsMap}, then parses each string with
 * {@link Long#parseLong(String)} into a freshly allocated row of {@code _longValuesMV}.
 *
 * @param valueBlock block supplying the doc ids and the number of docs to process
 * @return {@code _longValuesMV}; row {@code i} holds the parsed values of {@code valuesFromIndex[i]}
 *         (presumably aligned with the order of {@code valueBlock.getDocIds()} - confirm against the reader)
 * @throws NumberFormatException if a looked-up value is not a parsable long
 */
@Override
public long[][] transformToLongValuesMV(ValueBlock valueBlock) {
// NOTE(review): this page renders a diff without +/- markers; the throw below looks like the removed
// pre-change body and the statements after it like its replacement - confirm against the repository.
throw new UnsupportedOperationException("jsonExtractIndex does not support transforming to multi-value columns");
int numDocs = valueBlock.getNumDocs();
initLongValuesMV(numDocs);
String[][] valuesFromIndex = _jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap);
for (int i = 0; i < numDocs; i++) {
String[] value = valuesFromIndex[i];
// Each row is sized to the number of values found for this position.
_longValuesMV[i] = new long[value.length];
for (int j = 0; j < value.length; j++) {
_longValuesMV[i][j] = Long.parseLong(value[j]);
}
}
return _longValuesMV;
}

/**
 * Produces multi-value FLOAT results for the given value block.
 *
 * <p>Looks up the string values through {@code _jsonIndexReader.getValuesMV(...)} using the block's doc ids, the
 * block's doc count and {@code _valueToMatchingDocsMap}, then parses each string with
 * {@link Float#parseFloat(String)} into a freshly allocated row of {@code _floatValuesMV}.
 *
 * @param valueBlock block supplying the doc ids and the number of docs to process
 * @return {@code _floatValuesMV}; row {@code i} holds the parsed values of {@code valuesFromIndex[i]}
 *         (presumably aligned with the order of {@code valueBlock.getDocIds()} - confirm against the reader)
 * @throws NumberFormatException if a looked-up value is not a parsable float
 */
@Override
public float[][] transformToFloatValuesMV(ValueBlock valueBlock) {
// NOTE(review): this page renders a diff without +/- markers; the throw below looks like the removed
// pre-change body and the statements after it like its replacement - confirm against the repository.
throw new UnsupportedOperationException("jsonExtractIndex does not support transforming to multi-value columns");
int numDocs = valueBlock.getNumDocs();
initFloatValuesMV(numDocs);
String[][] valuesFromIndex = _jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap);
for (int i = 0; i < numDocs; i++) {
String[] value = valuesFromIndex[i];
// Each row is sized to the number of values found for this position.
_floatValuesMV[i] = new float[value.length];
for (int j = 0; j < value.length; j++) {
_floatValuesMV[i][j] = Float.parseFloat(value[j]);
}
}
return _floatValuesMV;
}

/**
 * Produces multi-value DOUBLE results for the given value block.
 *
 * <p>Looks up the string values through {@code _jsonIndexReader.getValuesMV(...)} using the block's doc ids, the
 * block's doc count and {@code _valueToMatchingDocsMap}, then parses each string with
 * {@link Double#parseDouble(String)} into a freshly allocated row of {@code _doubleValuesMV}.
 *
 * @param valueBlock block supplying the doc ids and the number of docs to process
 * @return {@code _doubleValuesMV}; row {@code i} holds the parsed values of {@code valuesFromIndex[i]}
 *         (presumably aligned with the order of {@code valueBlock.getDocIds()} - confirm against the reader)
 * @throws NumberFormatException if a looked-up value is not a parsable double
 */
@Override
public double[][] transformToDoubleValuesMV(ValueBlock valueBlock) {
// NOTE(review): this page renders a diff without +/- markers; the throw below looks like the removed
// pre-change body and the statements after it like its replacement - confirm against the repository.
throw new UnsupportedOperationException("jsonExtractIndex does not support transforming to multi-value columns");
int numDocs = valueBlock.getNumDocs();
initDoubleValuesMV(numDocs);
String[][] valuesFromIndex = _jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap);
for (int i = 0; i < numDocs; i++) {
String[] value = valuesFromIndex[i];
// Each row is sized to the number of values found for this position.
_doubleValuesMV[i] = new double[value.length];
for (int j = 0; j < value.length; j++) {
_doubleValuesMV[i][j] = Double.parseDouble(value[j]);
}
}
return _doubleValuesMV;
}

/**
 * Produces multi-value STRING results for the given value block.
 *
 * <p>Looks up the string values through {@code _jsonIndexReader.getValuesMV(...)} using the block's doc ids, the
 * block's doc count and {@code _valueToMatchingDocsMap}, then copies each returned row into a freshly allocated
 * row of {@code _stringValuesMV}; no parsing or conversion is applied.
 *
 * @param valueBlock block supplying the doc ids and the number of docs to process
 * @return {@code _stringValuesMV}; row {@code i} is a copy of {@code valuesFromIndex[i]}
 *         (presumably aligned with the order of {@code valueBlock.getDocIds()} - confirm against the reader)
 */
@Override
public String[][] transformToStringValuesMV(ValueBlock valueBlock) {
// NOTE(review): this page renders a diff without +/- markers; the throw below looks like the removed
// pre-change body and the statements after it like its replacement - confirm against the repository.
throw new UnsupportedOperationException("jsonExtractIndex does not support transforming to multi-value columns");
int numDocs = valueBlock.getNumDocs();
initStringValuesMV(numDocs);
String[][] valuesFromIndex = _jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap);
for (int i = 0; i < numDocs; i++) {
String[] value = valuesFromIndex[i];
// Copy into a new array so the result row does not alias the array returned by the reader.
_stringValuesMV[i] = new String[value.length];
System.arraycopy(value, 0, _stringValuesMV[i], 0, value.length);
}
return _stringValuesMV;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pinot.core.operator.transform.function;

import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.File;
import java.math.BigDecimal;
import java.sql.Timestamp;
Expand Down Expand Up @@ -46,6 +48,7 @@
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
Expand Down Expand Up @@ -155,10 +158,13 @@ public void setUp()
_stringSVValues[i] = df.format(_intSVValues[i] * RANDOM.nextDouble());
_jsonSVValues[i] = String.format(
"{\"intVal\":%s, \"longVal\":%s, \"floatVal\":%s, \"doubleVal\":%s, \"bigDecimalVal\":%s, "
+ "\"stringVal\":\"%s\", "
+ "\"stringVal\":\"%s\", \"arrayField\": [{\"arrIntField\": 1, \"arrStringField\": \"abc\"}, "
+ "{\"arrIntField\": 2, \"arrStringField\": \"xyz\"},"
+ "{\"arrIntField\": 5, \"arrStringField\": \"wxy\"},"
+ "{\"arrIntField\": 0}], "
+ "\"intVals\":[0,1], \"longVals\":[0,1], \"floatVals\":[0.0,1.0], \"doubleVals\":[0.0,1.0], "
+ "\"bigDecimalVals\":[0.0,1.0], \"stringVals\":[\"0\",\"1\"]}",
RANDOM.nextInt(), RANDOM.nextLong(), RANDOM.nextFloat(), RANDOM.nextDouble(),
RANDOM.nextInt(), RANDOM.nextLong(), RANDOM.nextFloat(), RANDOM.nextDouble(),
BigDecimal.valueOf(RANDOM.nextDouble()).multiply(BigDecimal.valueOf(RANDOM.nextInt())),
df.format(RANDOM.nextInt() * RANDOM.nextDouble()));
_stringAlphaNumericSVValues[i] = RandomStringUtils.randomAlphanumeric(26);
Expand Down Expand Up @@ -276,7 +282,7 @@ public void setUp()
.addSingleValueDimension(DOUBLE_SV_COLUMN, FieldSpec.DataType.DOUBLE)
.addMetric(BIG_DECIMAL_SV_COLUMN, FieldSpec.DataType.BIG_DECIMAL)
.addSingleValueDimension(STRING_SV_COLUMN, FieldSpec.DataType.STRING)
.addSingleValueDimension(JSON_STRING_SV_COLUMN, FieldSpec.DataType.STRING)
.addSingleValueDimension(JSON_STRING_SV_COLUMN, FieldSpec.DataType.STRING, 5000, "{}")
.addSingleValueDimension(STRING_SV_NULL_COLUMN, FieldSpec.DataType.STRING)
.addSingleValueDimension(STRING_ALPHANUM_SV_COLUMN, FieldSpec.DataType.STRING)
.addSingleValueDimension(STRING_ALPHANUM_NULL_SV_COLUMN, FieldSpec.DataType.STRING)
Expand All @@ -303,10 +309,19 @@ public void setUp()
.addDateTime(TIMESTAMP_COLUMN, FieldSpec.DataType.TIMESTAMP, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
.addDateTime(TIMESTAMP_COLUMN_NULL, FieldSpec.DataType.TIMESTAMP, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
.addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, TIME_COLUMN), null).build();

List<FieldConfig> fieldConfigList = new ArrayList<>();
ObjectNode jsonIndexProps = JsonNodeFactory.instance.objectNode();
jsonIndexProps.put("disableCrossArrayUnnest", true);
ObjectNode indexNode = JsonNodeFactory.instance.objectNode();
indexNode.put("json", jsonIndexProps);
FieldConfig jsonFieldConfig =
new FieldConfig(JSON_STRING_SV_COLUMN, FieldConfig.EncodingType.DICTIONARY, null, null, null, null, indexNode,
null, null);
fieldConfigList.add(jsonFieldConfig);
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName(TIME_COLUMN)
.setJsonIndexColumns(List.of(JSON_STRING_SV_COLUMN))
.setNullHandlingEnabled(true).build();
.setFieldConfigList(fieldConfigList).setNullHandlingEnabled(true).build();

SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
config.setOutDir(INDEX_DIR_PATH);
Expand Down
Loading

0 comments on commit ddc3d0b

Please sign in to comment.