-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Backport 2.x] Change INDEX_SEARCHER threadpool to auto queue to supp…
…ort task resource tracking (#7502) (cherry picked from commit 054cccd) Signed-off-by: Jay Deng <[email protected]>
- Loading branch information
Showing
7 changed files
with
388 additions
and
145 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
190 changes: 190 additions & 0 deletions
190
...ernalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/AbstractTasksIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.action.admin.cluster.node.tasks; | ||
|
||
import org.opensearch.ExceptionsHelper; | ||
import org.opensearch.ResourceNotFoundException; | ||
import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskResponse; | ||
import org.opensearch.action.support.WriteRequest; | ||
import org.opensearch.cluster.node.DiscoveryNode; | ||
import org.opensearch.cluster.service.ClusterService; | ||
import org.opensearch.common.Strings; | ||
import org.opensearch.common.collect.Tuple; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.xcontent.XContentType; | ||
import org.opensearch.plugins.Plugin; | ||
import org.opensearch.tasks.TaskId; | ||
import org.opensearch.tasks.TaskInfo; | ||
import org.opensearch.tasks.ThreadResourceInfo; | ||
import org.opensearch.test.OpenSearchIntegTestCase; | ||
import org.opensearch.test.tasks.MockTaskManager; | ||
import org.opensearch.test.transport.MockTransportService; | ||
import org.opensearch.transport.TransportService; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.function.Function; | ||
|
||
/** | ||
* Base IT test class for Tasks ITs | ||
*/ | ||
abstract class AbstractTasksIT extends OpenSearchIntegTestCase { | ||
|
||
protected Map<Tuple<String, String>, RecordingTaskManagerListener> listeners = new HashMap<>(); | ||
|
||
@Override | ||
protected Collection<Class<? extends Plugin>> getMockPlugins() { | ||
Collection<Class<? extends Plugin>> mockPlugins = new ArrayList<>(super.getMockPlugins()); | ||
mockPlugins.remove(MockTransportService.TestPlugin.class); | ||
return mockPlugins; | ||
} | ||
|
||
@Override | ||
protected Collection<Class<? extends Plugin>> nodePlugins() { | ||
return Arrays.asList(MockTransportService.TestPlugin.class, TestTaskPlugin.class); | ||
} | ||
|
||
@Override | ||
protected Settings nodeSettings(int nodeOrdinal) { | ||
return Settings.builder() | ||
.put(super.nodeSettings(nodeOrdinal)) | ||
.put(MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.getKey(), true) | ||
.build(); | ||
} | ||
|
||
@Override | ||
public void tearDown() throws Exception { | ||
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) { | ||
((MockTaskManager) internalCluster().getInstance(TransportService.class, entry.getKey().v1()).getTaskManager()).removeListener( | ||
entry.getValue() | ||
); | ||
} | ||
listeners.clear(); | ||
super.tearDown(); | ||
} | ||
|
||
/** | ||
* Registers recording task event listeners with the given action mask on all nodes | ||
*/ | ||
protected void registerTaskManagerListeners(String actionMasks) { | ||
for (String nodeName : internalCluster().getNodeNames()) { | ||
DiscoveryNode node = internalCluster().getInstance(ClusterService.class, nodeName).localNode(); | ||
RecordingTaskManagerListener listener = new RecordingTaskManagerListener(node.getId(), actionMasks.split(",")); | ||
((MockTaskManager) internalCluster().getInstance(TransportService.class, nodeName).getTaskManager()).addListener(listener); | ||
RecordingTaskManagerListener oldListener = listeners.put(new Tuple<>(node.getName(), actionMasks), listener); | ||
assertNull(oldListener); | ||
} | ||
} | ||
|
||
/** | ||
* Resets all recording task event listeners with the given action mask on all nodes | ||
*/ | ||
protected void resetTaskManagerListeners(String actionMasks) { | ||
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) { | ||
if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) { | ||
entry.getValue().reset(); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Returns the number of events that satisfy the criteria across all nodes | ||
* | ||
* @param actionMasks action masks to match | ||
* @return number of events that satisfy the criteria | ||
*/ | ||
protected int numberOfEvents(String actionMasks, Function<Tuple<Boolean, TaskInfo>, Boolean> criteria) { | ||
return findEvents(actionMasks, criteria).size(); | ||
} | ||
|
||
/** | ||
* Returns all events that satisfy the criteria across all nodes | ||
* | ||
* @param actionMasks action masks to match | ||
* @return number of events that satisfy the criteria | ||
*/ | ||
protected List<TaskInfo> findEvents(String actionMasks, Function<Tuple<Boolean, TaskInfo>, Boolean> criteria) { | ||
List<TaskInfo> events = new ArrayList<>(); | ||
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) { | ||
if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) { | ||
for (Tuple<Boolean, TaskInfo> taskEvent : entry.getValue().getEvents()) { | ||
if (criteria.apply(taskEvent)) { | ||
events.add(taskEvent.v2()); | ||
} | ||
} | ||
} | ||
} | ||
return events; | ||
} | ||
|
||
protected Map<Long, List<ThreadResourceInfo>> getThreadStats(String actionMasks, TaskId taskId) { | ||
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) { | ||
if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) { | ||
for (Tuple<TaskId, Map<Long, List<ThreadResourceInfo>>> threadStats : entry.getValue().getThreadStats()) { | ||
if (taskId.equals(threadStats.v1())) { | ||
return threadStats.v2(); | ||
} | ||
} | ||
} | ||
} | ||
return new HashMap<>(); | ||
} | ||
|
||
/** | ||
* Asserts that all tasks in the tasks list have the same parentTask | ||
*/ | ||
protected void assertParentTask(List<TaskInfo> tasks, TaskInfo parentTask) { | ||
for (TaskInfo task : tasks) { | ||
assertParentTask(task, parentTask); | ||
} | ||
} | ||
|
||
protected void assertParentTask(TaskInfo task, TaskInfo parentTask) { | ||
assertTrue(task.getParentTaskId().isSet()); | ||
assertEquals(parentTask.getTaskId().getNodeId(), task.getParentTaskId().getNodeId()); | ||
assertTrue(Strings.hasLength(task.getParentTaskId().getNodeId())); | ||
assertEquals(parentTask.getId(), task.getParentTaskId().getId()); | ||
} | ||
|
||
protected void expectNotFound(ThrowingRunnable r) { | ||
Exception e = expectThrows(Exception.class, r); | ||
ResourceNotFoundException notFound = (ResourceNotFoundException) ExceptionsHelper.unwrap(e, ResourceNotFoundException.class); | ||
if (notFound == null) { | ||
throw new AssertionError("Expected " + ResourceNotFoundException.class.getSimpleName(), e); | ||
} | ||
} | ||
|
||
/** | ||
* Fetch the task status from the list tasks API using it's "fallback to get from the task index" behavior. Asserts some obvious stuff | ||
* about the fetched task and returns a map of it's status. | ||
*/ | ||
protected GetTaskResponse expectFinishedTask(TaskId taskId) throws IOException { | ||
GetTaskResponse response = client().admin().cluster().prepareGetTask(taskId).get(); | ||
assertTrue("the task should have been completed before fetching", response.getTask().isCompleted()); | ||
TaskInfo info = response.getTask().getTask(); | ||
assertEquals(taskId, info.getTaskId()); | ||
assertNull(info.getStatus()); // The test task doesn't have any status | ||
return response; | ||
} | ||
|
||
protected void indexDocumentsWithRefresh(String indexName, int numDocs) { | ||
for (int i = 0; i < numDocs; i++) { | ||
client().prepareIndex(indexName) | ||
.setId("test_id_" + String.valueOf(i)) | ||
.setSource("{\"foo_" + String.valueOf(i) + "\": \"bar_" + String.valueOf(i) + "\"}", XContentType.JSON) | ||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
.get(); | ||
} | ||
} | ||
} |
118 changes: 118 additions & 0 deletions
118
...sterTest/java/org/opensearch/action/admin/cluster/node/tasks/ConcurrentSearchTasksIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.action.admin.cluster.node.tasks; | ||
|
||
import org.hamcrest.MatcherAssert; | ||
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; | ||
import org.opensearch.action.search.SearchAction; | ||
import org.opensearch.cluster.metadata.IndexMetadata; | ||
import org.opensearch.common.collect.Tuple; | ||
import org.opensearch.common.settings.FeatureFlagSettings; | ||
import org.opensearch.common.settings.Setting; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.util.FeatureFlags; | ||
import org.opensearch.index.query.QueryBuilders; | ||
import org.opensearch.tasks.TaskInfo; | ||
import org.opensearch.tasks.ThreadResourceInfo; | ||
|
||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import static org.hamcrest.Matchers.greaterThan; | ||
import static org.hamcrest.Matchers.notNullValue; | ||
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; | ||
|
||
/** | ||
* Integration tests for task management API with Concurrent Segment Search | ||
* | ||
* The way the test framework bootstraps the test cluster makes it difficult to parameterize the feature flag. | ||
* Once concurrent search is moved behind a cluster setting we can parameterize these tests behind the setting. | ||
*/ | ||
public class ConcurrentSearchTasksIT extends AbstractTasksIT { | ||
|
||
private static final int INDEX_SEARCHER_THREADS = 10; | ||
|
||
@Override | ||
protected Settings nodeSettings(int nodeOrdinal) { | ||
return Settings.builder() | ||
.put(super.nodeSettings(nodeOrdinal)) | ||
.put("thread_pool.index_searcher.size", INDEX_SEARCHER_THREADS) | ||
.put("thread_pool.index_searcher.queue_size", INDEX_SEARCHER_THREADS) | ||
.build(); | ||
} | ||
|
||
private int getSegmentCount(String indexName) { | ||
return client().admin() | ||
.indices() | ||
.segments(new IndicesSegmentsRequest(indexName)) | ||
.actionGet() | ||
.getIndices() | ||
.get(indexName) | ||
.getShards() | ||
.get(0) | ||
.getShards()[0].getSegments() | ||
.size(); | ||
} | ||
|
||
@Override | ||
protected Settings featureFlagSettings() { | ||
Settings.Builder featureSettings = Settings.builder(); | ||
for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) { | ||
featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY)); | ||
} | ||
featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, true); | ||
return featureSettings.build(); | ||
} | ||
|
||
/** | ||
* Tests the number of threads that worked on a search task. | ||
* | ||
* Currently, we try to control concurrency by creating an index with 7 segments and rely on | ||
* the way concurrent search creates leaf slices from segments. Once more concurrency controls are introduced | ||
* we should improve this test to use those methods. | ||
*/ | ||
public void testConcurrentSearchTaskTracking() { | ||
final String INDEX_NAME = "test"; | ||
final int NUM_SHARDS = 1; | ||
final int NUM_DOCS = 7; | ||
|
||
registerTaskManagerListeners(SearchAction.NAME); // coordinator task | ||
registerTaskManagerListeners(SearchAction.NAME + "[*]"); // shard task | ||
createIndex( | ||
INDEX_NAME, | ||
Settings.builder() | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) | ||
.build() | ||
); | ||
ensureGreen(INDEX_NAME); // Make sure all shards are allocated to catch replication tasks | ||
indexDocumentsWithRefresh(INDEX_NAME, NUM_DOCS); // Concurrent search requires >5 segments or >250,000 docs to have concurrency, so | ||
// we index 7 docs flushing between each to create new segments | ||
assertSearchResponse(client().prepareSearch(INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get()); | ||
|
||
// the search operation should produce one coordinator task | ||
List<TaskInfo> mainTask = findEvents(SearchAction.NAME, Tuple::v1); | ||
assertEquals(1, mainTask.size()); | ||
TaskInfo mainTaskInfo = mainTask.get(0); | ||
|
||
List<TaskInfo> shardTasks = findEvents(SearchAction.NAME + "[*]", Tuple::v1); | ||
assertEquals(NUM_SHARDS, shardTasks.size()); // We should only have 1 shard search task per shard | ||
for (TaskInfo taskInfo : shardTasks) { | ||
MatcherAssert.assertThat(taskInfo.getParentTaskId(), notNullValue()); | ||
assertEquals(mainTaskInfo.getTaskId(), taskInfo.getParentTaskId()); | ||
|
||
Map<Long, List<ThreadResourceInfo>> threadStats = getThreadStats(SearchAction.NAME + "[*]", taskInfo.getTaskId()); | ||
// Concurrent search forks each slice of 5 segments to different thread | ||
assertEquals((int) Math.ceil(getSegmentCount(INDEX_NAME) / 5.0), threadStats.size()); | ||
|
||
// assert that all task descriptions have non-zero length | ||
MatcherAssert.assertThat(taskInfo.getDescription().length(), greaterThan(0)); | ||
} | ||
} | ||
} |
Oops, something went wrong.