Skip to content

Commit

Permalink
Merge branch 'main' into 13531-range-agg
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenlan-amzn committed Jun 18, 2024
2 parents 48a03a4 + f8213b8 commit cb8cbbf
Show file tree
Hide file tree
Showing 14 changed files with 205 additions and 64 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `com.gradle.develocity` from 3.17.4 to 3.17.5 ([#14397](https://github.com/opensearch-project/OpenSearch/pull/14397))

### Changed
- Updated the `indices.query.bool.max_clause_count` setting from being static to dynamically updateable ([#13568](https://github.com/opensearch-project/OpenSearch/pull/13568))

### Deprecated

### Removed

### Fixed
- Fix handling of Short and Byte data types in ScriptProcessor ingest pipeline ([#14379](https://github.com/opensearch-project/OpenSearch/issues/14379))
- Switch to iterative version of WKT format parser ([#14086](https://github.com/opensearch-project/OpenSearch/pull/14086))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.avast.gradle.dockercompose.ComposeExtension;
import com.avast.gradle.dockercompose.DockerComposePlugin;
import com.avast.gradle.dockercompose.ServiceInfo;
import com.avast.gradle.dockercompose.tasks.ComposeBuild;
import com.avast.gradle.dockercompose.tasks.ComposeDown;
import com.avast.gradle.dockercompose.tasks.ComposePull;
import com.avast.gradle.dockercompose.tasks.ComposeUp;
Expand Down Expand Up @@ -200,6 +201,7 @@ public void execute(Task task) {
maybeSkipTasks(tasks, dockerSupport, getTaskClass("org.opensearch.gradle.test.RestIntegTestTask"));
maybeSkipTasks(tasks, dockerSupport, TestingConventionsTasks.class);
maybeSkipTasks(tasks, dockerSupport, getTaskClass("org.opensearch.gradle.test.AntFixture"));
maybeSkipTasks(tasks, dockerSupport, ComposeBuild.class);
maybeSkipTasks(tasks, dockerSupport, ComposeUp.class);
maybeSkipTasks(tasks, dockerSupport, ComposePull.class);
maybeSkipTasks(tasks, dockerSupport, ComposeDown.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@
import java.io.StreamTokenizer;
import java.io.StringReader;
import java.text.ParseException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Locale;

Expand All @@ -67,6 +69,7 @@ public class WellKnownText {
public static final String RPAREN = ")";
public static final String COMMA = ",";
public static final String NAN = "NaN";
public static final int MAX_DEPTH_OF_GEO_COLLECTION = 1000;

private final String NUMBER = "<NUMBER>";
private final String EOF = "END-OF-STREAM";
Expand Down Expand Up @@ -278,6 +281,16 @@ public Geometry fromWKT(String wkt) throws IOException, ParseException {
*/
private Geometry parseGeometry(StreamTokenizer stream) throws IOException, ParseException {
final String type = nextWord(stream).toLowerCase(Locale.ROOT);
switch (type) {
case "geometrycollection":
return parseGeometryCollection(stream);
default:
return parseSimpleGeometry(stream, type);
}
}

private Geometry parseSimpleGeometry(StreamTokenizer stream, String type) throws IOException, ParseException {
assert "geometrycollection".equals(type) == false;
switch (type) {
case "point":
return parsePoint(stream);
Expand All @@ -294,7 +307,7 @@ private Geometry parseGeometry(StreamTokenizer stream) throws IOException, Parse
case "bbox":
return parseBBox(stream);
case "geometrycollection":
return parseGeometryCollection(stream);
throw new IllegalStateException("Unexpected type: geometrycollection");
case "circle": // Not part of the standard, but we need it for internal serialization
return parseCircle(stream);
}
Expand All @@ -305,12 +318,56 @@ private GeometryCollection<Geometry> parseGeometryCollection(StreamTokenizer str
if (nextEmptyOrOpen(stream).equals(EMPTY)) {
return GeometryCollection.EMPTY;
}
List<Geometry> shapes = new ArrayList<>();
shapes.add(parseGeometry(stream));
while (nextCloserOrComma(stream).equals(COMMA)) {
shapes.add(parseGeometry(stream));

List<Geometry> topLevelShapes = new ArrayList<>();
Deque<List<Geometry>> deque = new ArrayDeque<>();
deque.push(topLevelShapes);
boolean isFirstIteration = true;
List<Geometry> currentLevelShapes = null;
while (!deque.isEmpty()) {
List<Geometry> previousShapes = deque.pop();
if (currentLevelShapes != null) {
previousShapes.add(new GeometryCollection<>(currentLevelShapes));
}
currentLevelShapes = previousShapes;

if (isFirstIteration == true) {
isFirstIteration = false;
} else {
if (nextCloserOrComma(stream).equals(COMMA) == false) {
// Done with current level, continue with parent level
continue;
}
}
while (true) {
final String type = nextWord(stream).toLowerCase(Locale.ROOT);
if (type.equals("geometrycollection")) {
if (nextEmptyOrOpen(stream).equals(EMPTY) == false) {
// GEOMETRYCOLLECTION() -> 1 depth, GEOMETRYCOLLECTION(GEOMETRYCOLLECTION()) -> 2 depth
// When parsing the top level geometry collection, the queue size is zero.
// When max depth is 1, we don't want to push any sub geometry collection in the queue.
// Therefore, we subtract 2 from max depth.
if (deque.size() >= MAX_DEPTH_OF_GEO_COLLECTION - 2) {
throw new IllegalArgumentException(
"a geometry collection with a depth greater than " + MAX_DEPTH_OF_GEO_COLLECTION + " is not supported"
);
}
deque.push(currentLevelShapes);
currentLevelShapes = new ArrayList<>();
continue;
}
currentLevelShapes.add(GeometryCollection.EMPTY);
} else {
currentLevelShapes.add(parseSimpleGeometry(stream, type));
}

if (nextCloserOrComma(stream).equals(COMMA) == false) {
break;
}
}
}
return new GeometryCollection<>(shapes);

return new GeometryCollection<>(topLevelShapes);
}

private Point parsePoint(StreamTokenizer stream) throws IOException, ParseException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public void testBasicSerialization() throws IOException, ParseException {

assertEquals("GEOMETRYCOLLECTION EMPTY", wkt.toWKT(GeometryCollection.EMPTY));
assertEquals(GeometryCollection.EMPTY, wkt.fromWKT("GEOMETRYCOLLECTION EMPTY)"));

assertEquals(
new GeometryCollection<Geometry>(Arrays.asList(GeometryCollection.EMPTY)),
wkt.fromWKT("GEOMETRYCOLLECTION (GEOMETRYCOLLECTION EMPTY)")
);
}

@SuppressWarnings("ConstantConditions")
Expand All @@ -86,4 +91,29 @@ public void testInitValidation() {

new StandardValidator(true).validate(new GeometryCollection<Geometry>(Collections.singletonList(new Point(20, 10, 30))));
}

public void testDeeplyNestedGeometryCollection() throws IOException, ParseException {
WellKnownText wkt = new WellKnownText(true, new GeographyValidator(true));
StringBuilder validGeometryCollectionHead = new StringBuilder("GEOMETRYCOLLECTION");
StringBuilder validGeometryCollectionTail = new StringBuilder(" EMPTY");
for (int i = 0; i < WellKnownText.MAX_DEPTH_OF_GEO_COLLECTION - 1; i++) {
validGeometryCollectionHead.append(" (GEOMETRYCOLLECTION");
validGeometryCollectionTail.append(")");
}
// Expect no exception
wkt.fromWKT(validGeometryCollectionHead.append(validGeometryCollectionTail).toString());

StringBuilder invalidGeometryCollectionHead = new StringBuilder("GEOMETRYCOLLECTION");
StringBuilder invalidGeometryCollectionTail = new StringBuilder(" EMPTY");
for (int i = 0; i < WellKnownText.MAX_DEPTH_OF_GEO_COLLECTION; i++) {
invalidGeometryCollectionHead.append(" (GEOMETRYCOLLECTION");
invalidGeometryCollectionTail.append(")");
}

IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
() -> wkt.fromWKT(invalidGeometryCollectionHead.append(invalidGeometryCollectionTail).toString())
);
assertEquals("a geometry collection with a depth greater than 1000 is not supported", ex.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import org.apache.lucene.index.CorruptIndexException;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
Expand Down Expand Up @@ -101,6 +100,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -883,17 +884,20 @@ public void testMultipleReplicaShardAssignmentWithDelayedAllocationAndDifferentN
assertEquals(YELLOW, health.getStatus());
assertEquals(2, health.getUnassignedShards());
// shard should be unassigned because of Allocation_Delayed
ClusterAllocationExplainResponse allocationExplainResponse = client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex("test")
.setShard(0)
.setPrimary(false)
.get();
assertEquals(
AllocationDecision.ALLOCATION_DELAYED,
allocationExplainResponse.getExplanation().getShardAllocationDecision().getAllocateDecision().getAllocationDecision()
BooleanSupplier delayedShardAllocationStatusVerificationSupplier = () -> AllocationDecision.ALLOCATION_DELAYED.equals(
client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex("test")
.setShard(0)
.setPrimary(false)
.get()
.getExplanation()
.getShardAllocationDecision()
.getAllocateDecision()
.getAllocationDecision()
);
waitUntil(delayedShardAllocationStatusVerificationSupplier, 2, TimeUnit.MINUTES);

logger.info("--> restarting the node 1");
internalCluster().startDataOnlyNode(
Expand All @@ -903,26 +907,16 @@ public void testMultipleReplicaShardAssignmentWithDelayedAllocationAndDifferentN
assertTrue(clusterRerouteResponse.isAcknowledged());
ensureStableCluster(6);
waitUntil(
() -> client().admin().cluster().health(Requests.clusterHealthRequest().timeout("5m")).actionGet().getInitializingShards() == 0
() -> client().admin().cluster().health(Requests.clusterHealthRequest().timeout("5m")).actionGet().getActiveShards() == 3,
2,
TimeUnit.MINUTES
);

health = client().admin().cluster().health(Requests.clusterHealthRequest().timeout("5m")).actionGet();
assertFalse(health.isTimedOut());
assertEquals(YELLOW, health.getStatus());
assertEquals(1, health.getUnassignedShards());
assertEquals(1, health.getDelayedUnassignedShards());
allocationExplainResponse = client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex("test")
.setShard(0)
.setPrimary(false)
.get();
assertEquals(
AllocationDecision.ALLOCATION_DELAYED,
allocationExplainResponse.getExplanation().getShardAllocationDecision().getAllocateDecision().getAllocationDecision()
);

waitUntil(delayedShardAllocationStatusVerificationSupplier, 2, TimeUnit.MINUTES);
logger.info("--> restarting the node 0");
internalCluster().startDataOnlyNode(
Settings.builder().put("node.name", nodesWithReplicaShards.get(1)).put(replicaNode1DataPathSettings).build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ public void testCacheCleanupAfterIndexDeletion() throws Exception {
}, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS);
}

// when staleness threshold is lower than staleness, it should clean the cache from all indices having stale keys
// when staleness threshold is lower than staleness, it should clean cache from all indices having stale keys
public void testStaleKeysCleanupWithMultipleIndices() throws Exception {
int cacheCleanIntervalInMillis = 10;
String node = internalCluster().startNode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.opensearch.index.query.QueryStringQueryBuilder;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.SearchModule;
import org.opensearch.search.SearchService;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -101,7 +101,7 @@ public void setup() throws Exception {
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING.getKey(), CLUSTER_MAX_CLAUSE_COUNT)
.put(SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING.getKey(), CLUSTER_MAX_CLAUSE_COUNT)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import org.opensearch.plugins.Plugin;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.SearchModule;
import org.opensearch.search.SearchService;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.junit.BeforeClass;
Expand All @@ -79,6 +79,7 @@
import static org.opensearch.index.query.QueryBuilders.simpleQueryStringQuery;
import static org.opensearch.index.query.QueryBuilders.termQuery;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING;
import static org.opensearch.test.StreamsUtils.copyToStringFromClasspath;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertFailures;
Expand Down Expand Up @@ -122,7 +123,7 @@ public static void createRandomClusterSetting() {
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING.getKey(), CLUSTER_MAX_CLAUSE_COUNT)
.put(SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING.getKey(), CLUSTER_MAX_CLAUSE_COUNT)
.build();
}

Expand Down Expand Up @@ -720,6 +721,52 @@ public void testFieldAliasOnDisallowedFieldType() throws Exception {
assertHits(response.getHits(), "1");
}

public void testDynamicClauseCountUpdate() throws Exception {
client().prepareIndex("testdynamic").setId("1").setSource("field", "foo bar baz").get();
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(INDICES_MAX_CLAUSE_COUNT_SETTING.getKey(), CLUSTER_MAX_CLAUSE_COUNT - 1))
);
refresh();
StringBuilder sb = new StringBuilder("foo");

// create clause_count + 1 clauses to hit error
for (int i = 0; i <= CLUSTER_MAX_CLAUSE_COUNT; i++) {
sb.append(" OR foo" + i);
}

QueryStringQueryBuilder qb = queryStringQuery(sb.toString()).field("field");

SearchPhaseExecutionException e = expectThrows(SearchPhaseExecutionException.class, () -> {
client().prepareSearch("testdynamic").setQuery(qb).get();
});

assert (e.getDetailedMessage().contains("maxClauseCount is set to " + (CLUSTER_MAX_CLAUSE_COUNT - 1)));

// increase clause count by 2
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(INDICES_MAX_CLAUSE_COUNT_SETTING.getKey(), CLUSTER_MAX_CLAUSE_COUNT + 2))
);

Thread.sleep(1);

SearchResponse response = client().prepareSearch("testdynamic").setQuery(qb).get();
assertHitCount(response, 1);
assertHits(response.getHits(), "1");

assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().putNull(INDICES_MAX_CLAUSE_COUNT_SETTING.getKey()))
);
}

private void assertHits(SearchHits hits, String... ids) {
assertThat(hits.getTotalHits().value, equalTo((long) ids.length));
Set<String> hitIds = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.search.SearchModule;
import org.opensearch.search.SearchService;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
Expand Down Expand Up @@ -540,6 +539,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.MAX_OPEN_PIT_CONTEXT,
SearchService.MAX_PIT_KEEPALIVE_SETTING,
SearchService.MAX_AGGREGATION_REWRITE_FILTERS,
SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING,
SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD,
CreatePitController.PIT_INIT_KEEP_ALIVE,
Node.WRITE_PORTS_FILE_SETTING,
Expand Down Expand Up @@ -590,7 +590,6 @@ public void apply(Settings value, Settings current, Settings previous) {
ResourceWatcherService.RELOAD_INTERVAL_HIGH,
ResourceWatcherService.RELOAD_INTERVAL_MEDIUM,
ResourceWatcherService.RELOAD_INTERVAL_LOW,
SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING,
ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING,
FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE,
Node.BREAKER_TYPE_KEY,
Expand Down
Loading

0 comments on commit cb8cbbf

Please sign in to comment.