Skip to content

Commit

Permalink
[Backport 2.x] Change INDEX_SEARCHER threadpool to auto queue to supp…
Browse files Browse the repository at this point in the history
…ort task resource tracking (opensearch-project#7502) (opensearch-project#7765)

(cherry picked from commit 054cccd)

Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored and gaiksaya committed Jun 26, 2023
1 parent 7e304d5 commit 0513859
Show file tree
Hide file tree
Showing 7 changed files with 390 additions and 145 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Moved concurrent-search from sandbox plugin to server module behind feature flag ([#7203](https://github.com/opensearch-project/OpenSearch/pull/7203))
- Allow access to indices cache clear APIs for read only indexes ([#7303](https://github.com/opensearch-project/OpenSearch/pull/7303))
- Default search preference to _primary for searchable snapshot indices ([#7628](https://github.com/opensearch-project/OpenSearch/pull/7628))
- Changed concurrent-search threadpool type to be resizable and support task resource tracking ([#7502](https://github.com/opensearch-project/OpenSearch/pull/7502))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.node.tasks;

import org.opensearch.ExceptionsHelper;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Strings;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.plugins.Plugin;
import org.opensearch.tasks.TaskId;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.ThreadResourceInfo;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.tasks.MockTaskManager;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
* Base IT test class for Tasks ITs
*/
abstract class AbstractTasksIT extends OpenSearchIntegTestCase {

protected Map<Tuple<String, String>, RecordingTaskManagerListener> listeners = new HashMap<>();

@Override
protected Collection<Class<? extends Plugin>> getMockPlugins() {
Collection<Class<? extends Plugin>> mockPlugins = new ArrayList<>(super.getMockPlugins());
mockPlugins.remove(MockTransportService.TestPlugin.class);
return mockPlugins;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, TestTaskPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.getKey(), true)
.build();
}

@Override
public void tearDown() throws Exception {
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {
((MockTaskManager) internalCluster().getInstance(TransportService.class, entry.getKey().v1()).getTaskManager()).removeListener(
entry.getValue()
);
}
listeners.clear();
super.tearDown();
}

/**
* Registers recording task event listeners with the given action mask on all nodes
*/
protected void registerTaskManagerListeners(String actionMasks) {
for (String nodeName : internalCluster().getNodeNames()) {
DiscoveryNode node = internalCluster().getInstance(ClusterService.class, nodeName).localNode();
RecordingTaskManagerListener listener = new RecordingTaskManagerListener(node.getId(), actionMasks.split(","));
((MockTaskManager) internalCluster().getInstance(TransportService.class, nodeName).getTaskManager()).addListener(listener);
RecordingTaskManagerListener oldListener = listeners.put(new Tuple<>(node.getName(), actionMasks), listener);
assertNull(oldListener);
}
}

/**
* Resets all recording task event listeners with the given action mask on all nodes
*/
protected void resetTaskManagerListeners(String actionMasks) {
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {
if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) {
entry.getValue().reset();
}
}
}

/**
* Returns the number of events that satisfy the criteria across all nodes
*
* @param actionMasks action masks to match
* @return number of events that satisfy the criteria
*/
protected int numberOfEvents(String actionMasks, Function<Tuple<Boolean, TaskInfo>, Boolean> criteria) {
return findEvents(actionMasks, criteria).size();
}

/**
* Returns all events that satisfy the criteria across all nodes
*
* @param actionMasks action masks to match
* @return number of events that satisfy the criteria
*/
protected List<TaskInfo> findEvents(String actionMasks, Function<Tuple<Boolean, TaskInfo>, Boolean> criteria) {
List<TaskInfo> events = new ArrayList<>();
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {
if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) {
for (Tuple<Boolean, TaskInfo> taskEvent : entry.getValue().getEvents()) {
if (criteria.apply(taskEvent)) {
events.add(taskEvent.v2());
}
}
}
}
return events;
}

protected Map<Long, List<ThreadResourceInfo>> getThreadStats(String actionMasks, TaskId taskId) {
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {
if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) {
for (Tuple<TaskId, Map<Long, List<ThreadResourceInfo>>> threadStats : entry.getValue().getThreadStats()) {
if (taskId.equals(threadStats.v1())) {
return threadStats.v2();
}
}
}
}
return new HashMap<>();
}

/**
* Asserts that all tasks in the tasks list have the same parentTask
*/
protected void assertParentTask(List<TaskInfo> tasks, TaskInfo parentTask) {
for (TaskInfo task : tasks) {
assertParentTask(task, parentTask);
}
}

protected void assertParentTask(TaskInfo task, TaskInfo parentTask) {
assertTrue(task.getParentTaskId().isSet());
assertEquals(parentTask.getTaskId().getNodeId(), task.getParentTaskId().getNodeId());
assertTrue(Strings.hasLength(task.getParentTaskId().getNodeId()));
assertEquals(parentTask.getId(), task.getParentTaskId().getId());
}

protected void expectNotFound(ThrowingRunnable r) {
Exception e = expectThrows(Exception.class, r);
ResourceNotFoundException notFound = (ResourceNotFoundException) ExceptionsHelper.unwrap(e, ResourceNotFoundException.class);
if (notFound == null) {
throw new AssertionError("Expected " + ResourceNotFoundException.class.getSimpleName(), e);
}
}

/**
* Fetch the task status from the list tasks API using it's "fallback to get from the task index" behavior. Asserts some obvious stuff
* about the fetched task and returns a map of it's status.
*/
protected GetTaskResponse expectFinishedTask(TaskId taskId) throws IOException {
GetTaskResponse response = client().admin().cluster().prepareGetTask(taskId).get();
assertTrue("the task should have been completed before fetching", response.getTask().isCompleted());
TaskInfo info = response.getTask().getTask();
assertEquals(taskId, info.getTaskId());
assertNull(info.getStatus()); // The test task doesn't have any status
return response;
}

protected void indexDocumentsWithRefresh(String indexName, int numDocs) {
for (int i = 0; i < numDocs; i++) {
client().prepareIndex(indexName)
.setId("test_id_" + String.valueOf(i))
.setSource("{\"foo_" + String.valueOf(i) + "\": \"bar_" + String.valueOf(i) + "\"}", XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.node.tasks;

import org.hamcrest.MatcherAssert;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.search.SearchAction;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.FeatureFlagSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.ThreadResourceInfo;

import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;

/**
* Integration tests for task management API with Concurrent Segment Search
*
* The way the test framework bootstraps the test cluster makes it difficult to parameterize the feature flag.
* Once concurrent search is moved behind a cluster setting we can parameterize these tests behind the setting.
*/
public class ConcurrentSearchTasksIT extends AbstractTasksIT {

private static final int INDEX_SEARCHER_THREADS = 10;

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("thread_pool.index_searcher.size", INDEX_SEARCHER_THREADS)
.put("thread_pool.index_searcher.queue_size", INDEX_SEARCHER_THREADS)
.build();
}

private int getSegmentCount(String indexName) {
return client().admin()
.indices()
.segments(new IndicesSegmentsRequest(indexName))
.actionGet()
.getIndices()
.get(indexName)
.getShards()
.get(0)
.getShards()[0].getSegments()
.size();
}

@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
}
featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, true);
return featureSettings.build();
}

/**
* Tests the number of threads that worked on a search task.
*
* Currently, we try to control concurrency by creating an index with 7 segments and rely on
* the way concurrent search creates leaf slices from segments. Once more concurrency controls are introduced
* we should improve this test to use those methods.
*/
public void testConcurrentSearchTaskTracking() {
final String INDEX_NAME = "test";
final int NUM_SHARDS = 1;
final int NUM_DOCS = 7;

registerTaskManagerListeners(SearchAction.NAME); // coordinator task
registerTaskManagerListeners(SearchAction.NAME + "[*]"); // shard task
createIndex(
INDEX_NAME,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);
ensureGreen(INDEX_NAME); // Make sure all shards are allocated to catch replication tasks
indexDocumentsWithRefresh(INDEX_NAME, NUM_DOCS); // Concurrent search requires >5 segments or >250,000 docs to have concurrency, so
// we index 7 docs flushing between each to create new segments
assertSearchResponse(client().prepareSearch(INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get());

// the search operation should produce one coordinator task
List<TaskInfo> mainTask = findEvents(SearchAction.NAME, Tuple::v1);
assertEquals(1, mainTask.size());
TaskInfo mainTaskInfo = mainTask.get(0);

List<TaskInfo> shardTasks = findEvents(SearchAction.NAME + "[*]", Tuple::v1);
assertEquals(NUM_SHARDS, shardTasks.size()); // We should only have 1 shard search task per shard
for (TaskInfo taskInfo : shardTasks) {
MatcherAssert.assertThat(taskInfo.getParentTaskId(), notNullValue());
assertEquals(mainTaskInfo.getTaskId(), taskInfo.getParentTaskId());

Map<Long, List<ThreadResourceInfo>> threadStats = getThreadStats(SearchAction.NAME + "[*]", taskInfo.getTaskId());
// Concurrent search forks each slice of 5 segments to different thread
assertEquals((int) Math.ceil(getSegmentCount(INDEX_NAME) / 5.0), threadStats.size());

// assert that all task descriptions have non-zero length
MatcherAssert.assertThat(taskInfo.getDescription().length(), greaterThan(0));
}
}
}
Loading

0 comments on commit 0513859

Please sign in to comment.