Skip to content

Commit

Permalink
Add a flexible json config way (apache#14229)
Browse files Browse the repository at this point in the history
* Add a flexible json config way

* fix style

* Fix import

* rerun

* rerun

* Add more comments in UT

* rerun

* rerun
  • Loading branch information
lnbest0707-uber authored Nov 23, 2024
1 parent f0c6bba commit ca78729
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
* the excluded paths will also be excluded, e.g. "$.a.b.c" will be excluded when "$.a.b" is configured
* to be excluded.
* - excludeFields: Exclude the given fields, e.g. "b", "c", even if it is under the included paths.
* - indexPaths: Index the given paths, e.g. "*.*", "a.**". Paths matches the indexed paths will be indexed.
* This config could work together with other configs, e.g. includePaths, excludePaths, maxLevels but
* usually does not have to because it should be flexible enough to config any scenarios. By default, it
* is working as "**" this is to allow everything.
* Check {@link org.apache.pinot.spi.utils.JsonSchemaTreeNode} for more details.
* - maxValueLength: Exclude field values which are longer than this length. A value of "0" disables this filter.
* Excluded values will be replaced with JsonUtils.SKIPPED_VALUE_REPLACEMENT.
* - skipInvalidJson: If the raw data is not a valid json string, then replace with {"":SKIPPED_VALUE_REPLACEMENT}
Expand All @@ -54,6 +59,7 @@ public class JsonIndexConfig extends IndexConfig {
private Set<String> _includePaths;
private Set<String> _excludePaths;
private Set<String> _excludeFields;
private Set<String> _indexPaths;
private int _maxValueLength = 0;
private boolean _skipInvalidJson = false;

Expand All @@ -72,6 +78,7 @@ public JsonIndexConfig(@JsonProperty("disabled") Boolean disabled, @JsonProperty
@JsonProperty("includePaths") @Nullable Set<String> includePaths,
@JsonProperty("excludePaths") @Nullable Set<String> excludePaths,
@JsonProperty("excludeFields") @Nullable Set<String> excludeFields,
@JsonProperty("indexPaths") @Nullable Set<String> indexPaths,
@JsonProperty("maxValueLength") int maxValueLength,
@JsonProperty("skipInvalidJson") boolean skipInvalidJson) {
super(disabled);
Expand All @@ -81,6 +88,7 @@ public JsonIndexConfig(@JsonProperty("disabled") Boolean disabled, @JsonProperty
_includePaths = includePaths;
_excludePaths = excludePaths;
_excludeFields = excludeFields;
_indexPaths = indexPaths;
_maxValueLength = maxValueLength;
_skipInvalidJson = skipInvalidJson;
}
Expand Down Expand Up @@ -141,6 +149,15 @@ public void setExcludeFields(@Nullable Set<String> excludeFields) {
_excludeFields = excludeFields;
}

@Nullable
public Set<String> getIndexPaths() {
return _indexPaths;
}

public void setIndexPaths(@Nullable Set<String> indexPaths) {
_indexPaths = indexPaths;
}

public int getMaxValueLength() {
return _maxValueLength;
}
Expand Down
134 changes: 125 additions & 9 deletions pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.spi.config.table.JsonIndexConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
Expand All @@ -73,6 +75,7 @@ private JsonUtils() {
// For flattening
public static final String VALUE_KEY = "";
public static final String KEY_SEPARATOR = ".";
public static final String GLOBAL_WILDCARD = "**"; // represent all the fields in the current or below levels
public static final String ARRAY_PATH = "[*]";
public static final String ARRAY_INDEX_KEY = ".$index";
public static final String SKIPPED_VALUE_REPLACEMENT = "$SKIPPED$";
Expand All @@ -83,6 +86,7 @@ private JsonUtils() {
// For querying
public static final String WILDCARD = "*";


// NOTE: Do not expose the ObjectMapper to prevent configuration change
private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper();
public static final ObjectReader DEFAULT_READER = DEFAULT_MAPPER.reader();
Expand Down Expand Up @@ -366,7 +370,7 @@ private static Object extractSingleValue(JsonNode jsonValue, DataType dataType)
*/
protected static List<Map<String, String>> flatten(JsonNode node, JsonIndexConfig jsonIndexConfig) {
try {
return flatten(node, jsonIndexConfig, 0, "$", false);
return flatten(node, jsonIndexConfig, 0, "$", false, createTree(jsonIndexConfig));
} catch (OutOfMemoryError oom) {
throw new OutOfMemoryError(
String.format("Flattening JSON node: %s with config: %s requires too much memory, please adjust the config",
Expand All @@ -378,13 +382,9 @@ protected static List<Map<String, String>> flatten(JsonNode node, JsonIndexConfi
}

private static List<Map<String, String>> flatten(JsonNode node, JsonIndexConfig jsonIndexConfig, int level,
String path, boolean includePathMatched) {
String path, boolean includePathMatched, JsonSchemaTreeNode indexPathNode) {
// Null
if (node.isNull()) {
return Collections.emptyList();
}

if (node.isMissingNode()) {
if (node.isNull() || node.isMissingNode() || indexPathNode == null) {
return Collections.emptyList();
}

Expand Down Expand Up @@ -426,7 +426,8 @@ private static List<Map<String, String>> flatten(JsonNode node, JsonIndexConfig
JsonNode childNode = node.get(i);
String arrayIndexValue = Integer.toString(i);
List<Map<String, String>> childResults =
flatten(childNode, jsonIndexConfig, level + 1, childPath, includeResult._includePathMatched);
flatten(childNode, jsonIndexConfig, level + 1, childPath, includeResult._includePathMatched,
indexPathNode.getChild(""));
for (Map<String, String> childResult : childResults) {
Map<String, String> result = new TreeMap<>();
for (Map.Entry<String, String> entry : childResult.entrySet()) {
Expand Down Expand Up @@ -463,7 +464,8 @@ private static List<Map<String, String>> flatten(JsonNode node, JsonIndexConfig
}
JsonNode childNode = fieldEntry.getValue();
List<Map<String, String>> childResults =
flatten(childNode, jsonIndexConfig, level + 1, childPath, includeResult._includePathMatched);
flatten(childNode, jsonIndexConfig, level + 1, childPath, includeResult._includePathMatched,
indexPathNode.getChild(field));
int numChildResults = childResults.size();

// Empty list - skip
Expand Down Expand Up @@ -746,4 +748,118 @@ public static List<Map<String, String>> flatten(String jsonString, JsonIndexConf
}
return JsonUtils.flatten(jsonNode, jsonIndexConfig);
}

/**
* Generates the JsonSchemaTreeNode tree from the given json index config indexPaths to represent which path we
* should flatten/index in the json record.
* @param jsonIndexConfig
* @return the root node of the json index paths tree
* @throws IllegalArgumentException
*/
private static JsonSchemaTreeNode createTree(@Nonnull JsonIndexConfig jsonIndexConfig)
throws IllegalArgumentException {
Set<String> indexPaths = jsonIndexConfig.getIndexPaths();
JsonSchemaTreeNode rootNode = new JsonSchemaTreeNode("");
// if no index paths are provided, return a global wildcard node
if (indexPaths == null || indexPaths.isEmpty()) {
rootNode.getAndCreateChild(GLOBAL_WILDCARD);
return rootNode;
}

for (String indexPath : indexPaths) {
String[] paths = StringUtils.splitPreserveAllTokens(indexPath, KEY_SEPARATOR);
JsonSchemaTreeNode currentNode = rootNode;
for (String key : paths) {
currentNode = currentNode.getAndCreateChild(key);
if (GLOBAL_WILDCARD.equals(key)) {
break;
}
}
}

return rootNode;
}
}

/**
* JsonSchemaTreeNode represents the tree node when we construct the json schema tree.
* This tree is used to represent how we want to flatten/index the json according to the {@link JsonIndexConfig}.
* The node could be either leaf node or non-leaf node. Both types of node could hold the volume to indicate whether
* we should flatten/index the json at this node.
* For example, the config with *.*, a.b.*.*, a.b.x, a.b.c.**, a.b.d.*.*, e.f.g will have the following tree
* structure:
* root -- * -- *
* -- a -- b -- * -- *
* -- c -- **
* -- d -- * -- *
* -- e -- f -- g
* The path structure is defined as:
* each key is separated by '.'
* the key without wildcard represents single value field
* the key "*" represents any types of single node
* the key "**" represents any types of leaf node OR a subtree with all its children
* When multiple conditions are matched, e.g. a.b.c, it would match with priority:
* 1. **
* 2. exact match (either single value or array value)
* 3. *
*/
class JsonSchemaTreeNode {
private Map<String, JsonSchemaTreeNode> _children;
private JsonSchemaTreeNode _gloabalWildcardChild;
private JsonSchemaTreeNode _wildcardChild;
private String _key;

public JsonSchemaTreeNode(String key) {
_key = key;
_children = new HashMap<>();
}

/**
* If does not have the child node, add a child node to the current node and return the child node.
* If the child node already exists, return the existing child node.
* @param key
* @return child
*/
public JsonSchemaTreeNode getAndCreateChild(String key) {
// if .** is already added, no need to add any child
if (_gloabalWildcardChild != null) {
return _gloabalWildcardChild;
}
switch (key) {
case JsonUtils.GLOBAL_WILDCARD:
if (_gloabalWildcardChild == null) {
_gloabalWildcardChild = new JsonSchemaTreeNode(key);
}
return _gloabalWildcardChild;
case JsonUtils.WILDCARD:
if (_wildcardChild == null) {
_wildcardChild = new JsonSchemaTreeNode(key);
}
return _wildcardChild;
default:
JsonSchemaTreeNode child = _children.get(key);
if (child == null) {
child = new JsonSchemaTreeNode(key);
_children.put(key, child);
}
return child;
}
}

@Nullable
public JsonSchemaTreeNode getChild(String key) {
if (JsonUtils.GLOBAL_WILDCARD.equals(_key)) {
return this;
}
if (_gloabalWildcardChild != null) {
return _gloabalWildcardChild;
}
if (_children.containsKey(key)) {
return _children.get(key);
}
if (_wildcardChild != null) {
return _wildcardChild;
}
return null;
}
}
123 changes: 123 additions & 0 deletions pinot-spi/src/test/java/org/apache/pinot/spi/utils/JsonUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -641,4 +641,127 @@ public void testEmptyString()
List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
assertTrue(flattenedRecords.isEmpty());
}

@Test
public void testFlattenWithIndexPaths()
throws IOException {
{
/* input json
[
{
"country": "us",
"street": "main st",
"number": 1
},
{
"country": "ca",
"street": "second st"
"number": 2
}
]
*/
JsonNode jsonNode = JsonUtils.stringToJsonNode(
"[{\"country\":\"us\",\"street\":\"main st\",\"number\":1},{\"country\":\"ca\",\"street\":\"second st\","
+ "\"number\":2}]");
JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);

// flatten everything within 2 layers
jsonIndexConfig.setIndexPaths(Collections.singleton("*.*"));
assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig), flattenedRecords);

// flatten "a." prefix till 2 layers
jsonIndexConfig.setIndexPaths(Collections.singleton("a.*"));
flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
assertTrue(flattenedRecords.isEmpty());
}
{
/* input json
{
"name": "adam",
"addresses": [
{
"country": "us",
"street": "main st",
"number": 1
},
{
"country": "ca",
"street": "second st"
"number": 2
}
]
}
*/
JsonNode jsonNode = JsonUtils.stringToJsonNode(
"{\"name\":\"adam\",\"addresses\":[{\"country\":\"us\",\"street\":\"main st\",\"number\":1},"
+ "{\"country\":\"ca\",\"street\":\"second st\",\"number\":2}]}");
JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);

// flatten everything
jsonIndexConfig.setIndexPaths(Collections.singleton("**"));
assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig), flattenedRecords);

// flatten everything within 3 layers
jsonIndexConfig.setIndexPaths(Collections.singleton("*.*.*"));
assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig), flattenedRecords);

// flatten "name" 1 layer and "addresses" infinite layers
jsonIndexConfig.setIndexPaths(ImmutableSet.of("name", "addresses.**"));
assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig), flattenedRecords);

// flatten everything within 2 layers
jsonIndexConfig.setIndexPaths(Collections.singleton("*.*"));
flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
assertEquals(flattenedRecords.size(), 1);
assertEquals(flattenedRecords.get(0), Collections.singletonMap(".name", "adam"));

// flatten "name." prefix with infinite layers
jsonIndexConfig.setIndexPaths(Collections.singleton("name.**"));
assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig), flattenedRecords);
}
{
/* input json
{
"name": "charles",
"addresses": [
{
"country": "us",
"street": "main st",
"types": ["home", "office"]
},
{
"country": "ca",
"street": "second st"
}
]
}
*/
JsonNode jsonNode = JsonUtils.stringToJsonNode(
"{\"name\":\"charles\",\"addresses\":[{\"country\":\"us\",\"street\":\"main st\",\"types\":[\"home\","
+ "\"office\"]}," + "{\"country\":\"ca\",\"street\":\"second st\"}]}");
JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);

// flatten everything
jsonIndexConfig.setIndexPaths(Collections.singleton("*.*.**"));
assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig), flattenedRecords);

// flatten addresses array with one more layer
jsonIndexConfig.setIndexPaths(Collections.singleton("addresses..*"));
flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
assertEquals(flattenedRecords.size(), 2);
Map<String, String> flattenedRecord0 = flattenedRecords.get(0);
assertEquals(flattenedRecord0.size(), 3);
assertEquals(flattenedRecord0.get(".addresses.$index"), "0");
assertEquals(flattenedRecord0.get(".addresses..country"), "us");
assertEquals(flattenedRecord0.get(".addresses..street"), "main st");
Map<String, String> flattenedRecord1 = flattenedRecords.get(1);
assertEquals(flattenedRecord1.size(), 3);
assertEquals(flattenedRecord1.get(".addresses.$index"), "1");
assertEquals(flattenedRecord1.get(".addresses..country"), "ca");
assertEquals(flattenedRecord1.get(".addresses..street"), "second st");
}
}
}

0 comments on commit ca78729

Please sign in to comment.