[Backport 2.x] Change INDEX_SEARCHER threadpool to auto queue to support task resource tracking #7765

Merged: 1 commit, May 25, 2023
CHANGELOG.md: 1 addition, 0 deletions
@@ -60,6 +60,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Moved concurrent-search from sandbox plugin to server module behind feature flag ([#7203](https://github.com/opensearch-project/OpenSearch/pull/7203))
- Allow access to indices cache clear APIs for read only indexes ([#7303](https://github.com/opensearch-project/OpenSearch/pull/7303))
- Default search preference to _primary for searchable snapshot indices ([#7628](https://github.com/opensearch-project/OpenSearch/pull/7628))
- Changed concurrent-search threadpool type to be resizable and support task resource tracking ([#7502](https://github.com/opensearch-project/OpenSearch/pull/7502))

### Deprecated

New file: AbstractTasksIT.java
@@ -0,0 +1,190 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.node.tasks;

import org.opensearch.ExceptionsHelper;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Strings;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.plugins.Plugin;
import org.opensearch.tasks.TaskId;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.ThreadResourceInfo;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.tasks.MockTaskManager;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
* Base class for task management integration tests
*/
abstract class AbstractTasksIT extends OpenSearchIntegTestCase {

protected Map<Tuple<String, String>, RecordingTaskManagerListener> listeners = new HashMap<>();

@Override
protected Collection<Class<? extends Plugin>> getMockPlugins() {
Collection<Class<? extends Plugin>> mockPlugins = new ArrayList<>(super.getMockPlugins());
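// MockTransportService is excluded from the default mock plugins because it is registered explicitly via nodePlugins() below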
mockPlugins.remove(MockTransportService.TestPlugin.class);
return mockPlugins;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, TestTaskPlugin.class);
}

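// Run nodes with the MockTaskManager enabled so that task registration/unregistration events can be recorded by listeners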
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.getKey(), true)
.build();
}

@Override
public void tearDown() throws Exception {
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {
((MockTaskManager) internalCluster().getInstance(TransportService.class, entry.getKey().v1()).getTaskManager()).removeListener(
entry.getValue()
);
}
listeners.clear();
super.tearDown();
}

/**
* Registers recording task event listeners with the given action mask on all nodes
*/
protected void registerTaskManagerListeners(String actionMasks) {
for (String nodeName : internalCluster().getNodeNames()) {
DiscoveryNode node = internalCluster().getInstance(ClusterService.class, nodeName).localNode();
RecordingTaskManagerListener listener = new RecordingTaskManagerListener(node.getId(), actionMasks.split(","));
((MockTaskManager) internalCluster().getInstance(TransportService.class, nodeName).getTaskManager()).addListener(listener);
RecordingTaskManagerListener oldListener = listeners.put(new Tuple<>(node.getName(), actionMasks), listener);
assertNull(oldListener);
}
}

/**
* Resets all recording task event listeners with the given action mask on all nodes
*/
protected void resetTaskManagerListeners(String actionMasks) {
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {
if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) {
entry.getValue().reset();
}
}
}

/**
* Returns the number of events that satisfy the criteria across all nodes
*
* @param actionMasks action masks to match
* @param criteria predicate applied to each recorded (event flag, task info) tuple
* @return number of events that satisfy the criteria
*/
protected int numberOfEvents(String actionMasks, Function<Tuple<Boolean, TaskInfo>, Boolean> criteria) {
return findEvents(actionMasks, criteria).size();
}

/**
* Returns all events that satisfy the criteria across all nodes
*
* @param actionMasks action masks to match
* @param criteria predicate applied to each recorded (event flag, task info) tuple
* @return all events that satisfy the criteria
*/
protected List<TaskInfo> findEvents(String actionMasks, Function<Tuple<Boolean, TaskInfo>, Boolean> criteria) {
List<TaskInfo> events = new ArrayList<>();
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {
if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) {
for (Tuple<Boolean, TaskInfo> taskEvent : entry.getValue().getEvents()) {
if (criteria.apply(taskEvent)) {
events.add(taskEvent.v2());
}
}
}
}
return events;
}

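/**
* Returns the per-thread resource usage recorded for the given task, or an empty map if no stats were captured
*/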
protected Map<Long, List<ThreadResourceInfo>> getThreadStats(String actionMasks, TaskId taskId) {
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {
if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) {
for (Tuple<TaskId, Map<Long, List<ThreadResourceInfo>>> threadStats : entry.getValue().getThreadStats()) {
if (taskId.equals(threadStats.v1())) {
return threadStats.v2();
}
}
}
}
return new HashMap<>();
}

/**
* Asserts that all tasks in the tasks list have the same parentTask
*/
protected void assertParentTask(List<TaskInfo> tasks, TaskInfo parentTask) {
for (TaskInfo task : tasks) {
assertParentTask(task, parentTask);
}
}

protected void assertParentTask(TaskInfo task, TaskInfo parentTask) {
assertTrue(task.getParentTaskId().isSet());
assertEquals(parentTask.getTaskId().getNodeId(), task.getParentTaskId().getNodeId());
assertTrue(Strings.hasLength(task.getParentTaskId().getNodeId()));
assertEquals(parentTask.getId(), task.getParentTaskId().getId());
}

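/**
* Asserts that running the given code throws a {@link ResourceNotFoundException}, possibly wrapped in another exception
*/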
protected void expectNotFound(ThrowingRunnable r) {
Exception e = expectThrows(Exception.class, r);
ResourceNotFoundException notFound = (ResourceNotFoundException) ExceptionsHelper.unwrap(e, ResourceNotFoundException.class);
if (notFound == null) {
throw new AssertionError("Expected " + ResourceNotFoundException.class.getSimpleName(), e);
}
}

/**
* Fetches the task status from the list tasks API using its "fallback to get from the task index" behavior. Asserts some basic
* properties of the fetched task and returns the response.
*/
protected GetTaskResponse expectFinishedTask(TaskId taskId) throws IOException {
GetTaskResponse response = client().admin().cluster().prepareGetTask(taskId).get();
assertTrue("the task should have been completed before fetching", response.getTask().isCompleted());
TaskInfo info = response.getTask().getTask();
assertEquals(taskId, info.getTaskId());
assertNull(info.getStatus()); // The test task doesn't have any status
return response;
}

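/**
* Indexes the given number of documents one at a time, refreshing after each write so every document is immediately
* searchable and typically lands in its own segment
*/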
protected void indexDocumentsWithRefresh(String indexName, int numDocs) {
for (int i = 0; i < numDocs; i++) {
client().prepareIndex(indexName)
.setId("test_id_" + String.valueOf(i))
.setSource("{\"foo_" + String.valueOf(i) + "\": \"bar_" + String.valueOf(i) + "\"}", XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
}
}
}
New file: ConcurrentSearchTasksIT.java
@@ -0,0 +1,118 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.node.tasks;

import org.hamcrest.MatcherAssert;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.search.SearchAction;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.FeatureFlagSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.ThreadResourceInfo;

import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;

/**
* Integration tests for task management API with Concurrent Segment Search
*
* The way the test framework bootstraps the test cluster makes it difficult to parameterize the feature flag.
* Once concurrent search is moved behind a cluster setting we can parameterize these tests behind the setting.
*/
public class ConcurrentSearchTasksIT extends AbstractTasksIT {

private static final int INDEX_SEARCHER_THREADS = 10;

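// Pin the index_searcher thread pool size and queue length so the number of threads available for concurrent segment search is fixed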
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("thread_pool.index_searcher.size", INDEX_SEARCHER_THREADS)
.put("thread_pool.index_searcher.queue_size", INDEX_SEARCHER_THREADS)
.build();
}

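/**
* Returns the number of segments in the first shard of the given index
*/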
private int getSegmentCount(String indexName) {
return client().admin()
.indices()
.segments(new IndicesSegmentsRequest(indexName))
.actionGet()
.getIndices()
.get(indexName)
.getShards()
.get(0)
.getShards()[0].getSegments()
.size();
}

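// Keep all built-in feature flags at their defaults and explicitly enable concurrent segment search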
@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
}
featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, true);
return featureSettings.build();
}

/**
* Tests the number of threads that worked on a search task.
*
* Currently, we control concurrency by creating an index with 7 segments and relying on the way concurrent search
* creates leaf slices from segments. Once more concurrency controls are introduced, this test should be updated to use them.
*/
public void testConcurrentSearchTaskTracking() {
final String INDEX_NAME = "test";
final int NUM_SHARDS = 1;
final int NUM_DOCS = 7;

registerTaskManagerListeners(SearchAction.NAME); // coordinator task
registerTaskManagerListeners(SearchAction.NAME + "[*]"); // shard task
createIndex(
INDEX_NAME,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);
ensureGreen(INDEX_NAME); // Make sure all shards are allocated to catch replication tasks
indexDocumentsWithRefresh(INDEX_NAME, NUM_DOCS); // Concurrent search requires >5 segments or >250,000 docs to have concurrency, so
// we index 7 docs, refreshing after each one to create new segments
assertSearchResponse(client().prepareSearch(INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get());

// the search operation should produce one coordinator task
List<TaskInfo> mainTask = findEvents(SearchAction.NAME, Tuple::v1);
assertEquals(1, mainTask.size());
TaskInfo mainTaskInfo = mainTask.get(0);

List<TaskInfo> shardTasks = findEvents(SearchAction.NAME + "[*]", Tuple::v1);
assertEquals(NUM_SHARDS, shardTasks.size()); // We should only have 1 shard search task per shard
for (TaskInfo taskInfo : shardTasks) {
MatcherAssert.assertThat(taskInfo.getParentTaskId(), notNullValue());
assertEquals(mainTaskInfo.getTaskId(), taskInfo.getParentTaskId());

Map<Long, List<ThreadResourceInfo>> threadStats = getThreadStats(SearchAction.NAME + "[*]", taskInfo.getTaskId());
// Concurrent search forks each slice of 5 segments to a different thread
assertEquals((int) Math.ceil(getSegmentCount(INDEX_NAME) / 5.0), threadStats.size());

// assert that all task descriptions have non-zero length
MatcherAssert.assertThat(taskInfo.getDescription().length(), greaterThan(0));
}
}
}