diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java index 4dbbf6c07d28..27a31b5a038b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java @@ -39,6 +39,11 @@ * the excluded paths will also be excluded, e.g. "$.a.b.c" will be excluded when "$.a.b" is configured * to be excluded. * - excludeFields: Exclude the given fields, e.g. "b", "c", even if it is under the included paths. + * - indexPaths: Index the given paths, e.g. "*.*", "a.**". Paths matching the index paths will be indexed. + * This config can work together with other configs, e.g. includePaths, excludePaths, maxLevels, but + * usually does not have to because it should be flexible enough to cover any scenario. By default, it + * works as "**", which allows everything. + * Check {@link org.apache.pinot.spi.utils.JsonSchemaTreeNode} for more details. * - maxValueLength: Exclude field values which are longer than this length. A value of "0" disables this filter. * Excluded values will be replaced with JsonUtils.SKIPPED_VALUE_REPLACEMENT. 
* - skipInvalidJson: If the raw data is not a valid json string, then replace with {"":SKIPPED_VALUE_REPLACEMENT} @@ -54,6 +59,7 @@ public class JsonIndexConfig extends IndexConfig { private Set _includePaths; private Set _excludePaths; private Set _excludeFields; + private Set _indexPaths; private int _maxValueLength = 0; private boolean _skipInvalidJson = false; @@ -72,6 +78,7 @@ public JsonIndexConfig(@JsonProperty("disabled") Boolean disabled, @JsonProperty @JsonProperty("includePaths") @Nullable Set includePaths, @JsonProperty("excludePaths") @Nullable Set excludePaths, @JsonProperty("excludeFields") @Nullable Set excludeFields, + @JsonProperty("indexPaths") @Nullable Set indexPaths, @JsonProperty("maxValueLength") int maxValueLength, @JsonProperty("skipInvalidJson") boolean skipInvalidJson) { super(disabled); @@ -81,6 +88,7 @@ public JsonIndexConfig(@JsonProperty("disabled") Boolean disabled, @JsonProperty _includePaths = includePaths; _excludePaths = excludePaths; _excludeFields = excludeFields; + _indexPaths = indexPaths; _maxValueLength = maxValueLength; _skipInvalidJson = skipInvalidJson; } @@ -141,6 +149,15 @@ public void setExcludeFields(@Nullable Set excludeFields) { _excludeFields = excludeFields; } + @Nullable + public Set getIndexPaths() { + return _indexPaths; + } + + public void setIndexPaths(@Nullable Set indexPaths) { + _indexPaths = indexPaths; + } + public int getMaxValueLength() { return _maxValueLength; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java index 3429d2ef7d08..2dd3bedf39fc 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java @@ -53,8 +53,10 @@ import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import java.util.stream.Stream; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import 
org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.spi.config.table.JsonIndexConfig; import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig; @@ -73,6 +75,7 @@ private JsonUtils() { // For flattening public static final String VALUE_KEY = ""; public static final String KEY_SEPARATOR = "."; + public static final String GLOBAL_WILDCARD = "**"; // represent all the fields in the current or below levels public static final String ARRAY_PATH = "[*]"; public static final String ARRAY_INDEX_KEY = ".$index"; public static final String SKIPPED_VALUE_REPLACEMENT = "$SKIPPED$"; @@ -83,6 +86,7 @@ private JsonUtils() { // For querying public static final String WILDCARD = "*"; + // NOTE: Do not expose the ObjectMapper to prevent configuration change private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper(); public static final ObjectReader DEFAULT_READER = DEFAULT_MAPPER.reader(); @@ -366,7 +370,7 @@ private static Object extractSingleValue(JsonNode jsonValue, DataType dataType) */ protected static List> flatten(JsonNode node, JsonIndexConfig jsonIndexConfig) { try { - return flatten(node, jsonIndexConfig, 0, "$", false); + return flatten(node, jsonIndexConfig, 0, "$", false, createTree(jsonIndexConfig)); } catch (OutOfMemoryError oom) { throw new OutOfMemoryError( String.format("Flattening JSON node: %s with config: %s requires too much memory, please adjust the config", @@ -378,13 +382,9 @@ protected static List> flatten(JsonNode node, JsonIndexConfi } private static List> flatten(JsonNode node, JsonIndexConfig jsonIndexConfig, int level, - String path, boolean includePathMatched) { + String path, boolean includePathMatched, JsonSchemaTreeNode indexPathNode) { // Null - if (node.isNull()) { - return Collections.emptyList(); - } - - if (node.isMissingNode()) { + if (node.isNull() || node.isMissingNode() || indexPathNode == null) { return 
Collections.emptyList(); } @@ -426,7 +426,8 @@ private static List> flatten(JsonNode node, JsonIndexConfig JsonNode childNode = node.get(i); String arrayIndexValue = Integer.toString(i); List> childResults = - flatten(childNode, jsonIndexConfig, level + 1, childPath, includeResult._includePathMatched); + flatten(childNode, jsonIndexConfig, level + 1, childPath, includeResult._includePathMatched, + indexPathNode.getChild("")); for (Map childResult : childResults) { Map result = new TreeMap<>(); for (Map.Entry entry : childResult.entrySet()) { @@ -463,7 +464,8 @@ private static List> flatten(JsonNode node, JsonIndexConfig } JsonNode childNode = fieldEntry.getValue(); List> childResults = - flatten(childNode, jsonIndexConfig, level + 1, childPath, includeResult._includePathMatched); + flatten(childNode, jsonIndexConfig, level + 1, childPath, includeResult._includePathMatched, + indexPathNode.getChild(field)); int numChildResults = childResults.size(); // Empty list - skip @@ -746,4 +748,118 @@ public static List> flatten(String jsonString, JsonIndexConf } return JsonUtils.flatten(jsonNode, jsonIndexConfig); } + + /** + * Generates the JsonSchemaTreeNode tree from the given json index config indexPaths to represent which path we + * should flatten/index in the json record. 
+ * @param jsonIndexConfig + * @return the root node of the json index paths tree + * @throws IllegalArgumentException + */ + private static JsonSchemaTreeNode createTree(@Nonnull JsonIndexConfig jsonIndexConfig) + throws IllegalArgumentException { + Set indexPaths = jsonIndexConfig.getIndexPaths(); + JsonSchemaTreeNode rootNode = new JsonSchemaTreeNode(""); + // if no index paths are provided, return a global wildcard node + if (indexPaths == null || indexPaths.isEmpty()) { + rootNode.getAndCreateChild(GLOBAL_WILDCARD); + return rootNode; + } + + for (String indexPath : indexPaths) { + String[] paths = StringUtils.splitPreserveAllTokens(indexPath, KEY_SEPARATOR); + JsonSchemaTreeNode currentNode = rootNode; + for (String key : paths) { + currentNode = currentNode.getAndCreateChild(key); + if (GLOBAL_WILDCARD.equals(key)) { + break; + } + } + } + + return rootNode; + } +} + +/** + * JsonSchemaTreeNode represents the tree node when we construct the json schema tree. + * This tree is used to represent how we want to flatten/index the json according to the {@link JsonIndexConfig}. + * A node can be either a leaf node or a non-leaf node. Both types of node could hold the volume to indicate whether + * we should flatten/index the json at this node. + * For example, the config with *.*, a.b.*.*, a.b.x, a.b.c.**, a.b.d.*.*, e.f.g will have the following tree + * structure: + * root -- * -- * + * -- a -- b -- * -- * + * -- c -- ** + * -- d -- * -- * + * -- e -- f -- g + * The path structure is defined as: + * each key is separated by '.' + * a key without a wildcard represents a single value field + * the key "*" represents any type of single node + * the key "**" represents any type of leaf node OR a subtree with all its children + * When multiple conditions are matched, e.g. a.b.c, the match is chosen with this priority: + * 1. ** + * 2. exact match (either single value or array value) + * 3. 
* + */ +class JsonSchemaTreeNode { + private Map _children; + private JsonSchemaTreeNode _gloabalWildcardChild; + private JsonSchemaTreeNode _wildcardChild; + private String _key; + + public JsonSchemaTreeNode(String key) { + _key = key; + _children = new HashMap<>(); + } + + /** + * If does not have the child node, add a child node to the current node and return the child node. + * If the child node already exists, return the existing child node. + * @param key + * @return child + */ + public JsonSchemaTreeNode getAndCreateChild(String key) { + // if .** is already added, no need to add any child + if (_gloabalWildcardChild != null) { + return _gloabalWildcardChild; + } + switch (key) { + case JsonUtils.GLOBAL_WILDCARD: + if (_gloabalWildcardChild == null) { + _gloabalWildcardChild = new JsonSchemaTreeNode(key); + } + return _gloabalWildcardChild; + case JsonUtils.WILDCARD: + if (_wildcardChild == null) { + _wildcardChild = new JsonSchemaTreeNode(key); + } + return _wildcardChild; + default: + JsonSchemaTreeNode child = _children.get(key); + if (child == null) { + child = new JsonSchemaTreeNode(key); + _children.put(key, child); + } + return child; + } + } + + @Nullable + public JsonSchemaTreeNode getChild(String key) { + if (JsonUtils.GLOBAL_WILDCARD.equals(_key)) { + return this; + } + if (_gloabalWildcardChild != null) { + return _gloabalWildcardChild; + } + if (_children.containsKey(key)) { + return _children.get(key); + } + if (_wildcardChild != null) { + return _wildcardChild; + } + return null; + } } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JsonUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JsonUtilsTest.java index cf542926d396..fafa5f93a297 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JsonUtilsTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JsonUtilsTest.java @@ -641,4 +641,127 @@ public void testEmptyString() List> flattenedRecords = JsonUtils.flatten(jsonNode, 
jsonIndexConfig); assertTrue(flattenedRecords.isEmpty()); } + + @Test + public void testFlattenWithIndexPaths() + throws IOException { + { + /* input json + [ + { + "country": "us", + "street": "main st", + "number": 1 + }, + { + "country": "ca", + "street": "second st" + "number": 2 + } + ] + */ + JsonNode jsonNode = JsonUtils.stringToJsonNode( + "[{\"country\":\"us\",\"street\":\"main st\",\"number\":1},{\"country\":\"ca\",\"street\":\"second st\"," + + "\"number\":2}]"); + JsonIndexConfig jsonIndexConfig = new JsonIndexConfig(); + List> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig); + + // flatten everything within 2 layers + jsonIndexConfig.setIndexPaths(Collections.singleton("*.*")); + assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig), flattenedRecords); + + // flatten "a." prefix till 2 layers + jsonIndexConfig.setIndexPaths(Collections.singleton("a.*")); + flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig); + assertTrue(flattenedRecords.isEmpty()); + } + { + /* input json + { + "name": "adam", + "addresses": [ + { + "country": "us", + "street": "main st", + "number": 1 + }, + { + "country": "ca", + "street": "second st" + "number": 2 + } + ] + } + */ + JsonNode jsonNode = JsonUtils.stringToJsonNode( + "{\"name\":\"adam\",\"addresses\":[{\"country\":\"us\",\"street\":\"main st\",\"number\":1}," + + "{\"country\":\"ca\",\"street\":\"second st\",\"number\":2}]}"); + JsonIndexConfig jsonIndexConfig = new JsonIndexConfig(); + List> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig); + + // flatten everything + jsonIndexConfig.setIndexPaths(Collections.singleton("**")); + assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig), flattenedRecords); + + // flatten everything within 3 layers + jsonIndexConfig.setIndexPaths(Collections.singleton("*.*.*")); + assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig), flattenedRecords); + + // flatten "name" 1 layer and "addresses" infinite layers + 
jsonIndexConfig.setIndexPaths(ImmutableSet.of("name", "addresses.**")); + assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig), flattenedRecords); + + // flatten everything within 2 layers + jsonIndexConfig.setIndexPaths(Collections.singleton("*.*")); + flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig); + assertEquals(flattenedRecords.size(), 1); + assertEquals(flattenedRecords.get(0), Collections.singletonMap(".name", "adam")); + + // flatten "name." prefix with infinite layers + jsonIndexConfig.setIndexPaths(Collections.singleton("name.**")); + assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig), flattenedRecords); + } + { + /* input json + { + "name": "charles", + "addresses": [ + { + "country": "us", + "street": "main st", + "types": ["home", "office"] + }, + { + "country": "ca", + "street": "second st" + } + ] + } + */ + JsonNode jsonNode = JsonUtils.stringToJsonNode( + "{\"name\":\"charles\",\"addresses\":[{\"country\":\"us\",\"street\":\"main st\",\"types\":[\"home\"," + + "\"office\"]}," + "{\"country\":\"ca\",\"street\":\"second st\"}]}"); + JsonIndexConfig jsonIndexConfig = new JsonIndexConfig(); + List> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig); + + // flatten everything + jsonIndexConfig.setIndexPaths(Collections.singleton("*.*.**")); + assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig), flattenedRecords); + + // flatten addresses array with one more layer + jsonIndexConfig.setIndexPaths(Collections.singleton("addresses..*")); + flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig); + assertEquals(flattenedRecords.size(), 2); + Map flattenedRecord0 = flattenedRecords.get(0); + assertEquals(flattenedRecord0.size(), 3); + assertEquals(flattenedRecord0.get(".addresses.$index"), "0"); + assertEquals(flattenedRecord0.get(".addresses..country"), "us"); + assertEquals(flattenedRecord0.get(".addresses..street"), "main st"); + Map flattenedRecord1 = flattenedRecords.get(1); + 
assertEquals(flattenedRecord1.size(), 3); + assertEquals(flattenedRecord1.get(".addresses.$index"), "1"); + assertEquals(flattenedRecord1.get(".addresses..country"), "ca"); + assertEquals(flattenedRecord1.get(".addresses..street"), "second st"); + } + } }