diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d4325cd3c2cf..82b9b32baaeaf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Added - Add server version as REST response header [#6583](https://github.com/opensearch-project/OpenSearch/issues/6583) - Start replication checkpointTimers on primary before segments upload to remote store. ([#8221]()https://github.com/opensearch-project/OpenSearch/pull/8221) +- Introduce new static cluster setting to control slice computation for concurrent segment search. ([#8847](https://github.com/opensearch-project/OpenSearch/pull/8847)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307)) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 46a43842451d9..360aa1efc20b0 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -44,6 +44,7 @@ import org.opensearch.index.ShardIndexingPressureMemoryManager; import org.opensearch.index.ShardIndexingPressureSettings; import org.opensearch.index.ShardIndexingPressureStore; +import org.opensearch.search.SearchBootstrapSettings; import org.opensearch.search.backpressure.settings.NodeDuressSettings; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; @@ -493,6 +494,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.MAX_OPEN_SCROLL_CONTEXT, SearchService.MAX_OPEN_PIT_CONTEXT, SearchService.MAX_PIT_KEEPALIVE_SETTING, + SearchBootstrapSettings.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING, CreatePitController.PIT_INIT_KEEP_ALIVE, Node.WRITE_PORTS_FILE_SETTING, Node.NODE_NAME_SETTING, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index d768165451a5a..c145843bd1901 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -56,6 +56,7 @@ import org.opensearch.monitor.fs.FsProbe; import org.opensearch.plugins.ExtensionAwarePlugin; import org.opensearch.plugins.SearchPipelinePlugin; +import org.opensearch.search.SearchBootstrapSettings; import org.opensearch.telemetry.tracing.NoopTracerFactory; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.telemetry.tracing.TracerFactory; @@ -466,6 +467,7 @@ protected Node( // Ensure to initialize Feature Flags via the settings from opensearch.yml FeatureFlags.initializeFeatureFlags(settings); + SearchBootstrapSettings.initialize(settings); final List identityPlugins = new ArrayList<>(); if (FeatureFlags.isEnabled(FeatureFlags.IDENTITY)) { diff --git a/server/src/main/java/org/opensearch/search/SearchBootstrapSettings.java b/server/src/main/java/org/opensearch/search/SearchBootstrapSettings.java new file mode 100644 index 0000000000000..711c4b9b15acd --- /dev/null +++ b/server/src/main/java/org/opensearch/search/SearchBootstrapSettings.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search; + +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; + +/** + * Keeps track of all the search related node level settings which can be accessed via static methods + */ +public class SearchBootstrapSettings { + // settings to configure maximum slice created per search request using OS custom slice computation mechanism. Default lucene + // mechanism will not be used if this setting is set with value > 0 + public static final String CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY = "search.concurrent.max_slice"; + public static final int CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE = -1; + + // value <= 0 means lucene slice computation will be used + public static final Setting CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING = Setting.intSetting( + CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY, + CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE, + Setting.Property.NodeScope + ); + private static Settings settings; + + public static void initialize(Settings openSearchSettings) { + settings = openSearchSettings; + } + + public static int getValueAsInt(String settingName, int defaultValue) { + return (settings != null) ? settings.getAsInt(settingName, defaultValue) : defaultValue; + } +} diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index e3ca932eb4699..9068e06d2ef95 100644 --- a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -32,6 +32,8 @@ package org.opensearch.search.internal; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; @@ -66,6 +68,7 @@ import org.opensearch.common.lucene.search.TopDocsAndMaxScore; import org.opensearch.common.lease.Releasable; import org.opensearch.search.DocValueFormat; +import org.opensearch.search.SearchBootstrapSettings; import org.opensearch.search.SearchService; import org.opensearch.search.dfs.AggregatedDfs; import org.opensearch.search.profile.ContextualProfileBreakdown; @@ -93,11 +96,13 @@ * @opensearch.internal */ public class ContextIndexSearcher extends IndexSearcher implements Releasable { + + private static final Logger logger = LogManager.getLogger(ContextIndexSearcher.class); /** * The interval at which we check for search cancellation when we cannot use * a {@link CancellableBulkScorer}. See {@link #intersectScorerAndBitSet}. */ - private static int CHECK_CANCELLED_SCORER_INTERVAL = 1 << 11; + private static final int CHECK_CANCELLED_SCORER_INTERVAL = 1 << 11; private AggregatedDfs aggregatedDfs; private QueryProfiler profiler; @@ -439,6 +444,20 @@ public CollectionStatistics collectionStatistics(String field) throws IOExceptio return collectionStatistics; } + /** + * Compute the leaf slices that will be used by concurrent segment search to spread work across threads + * @param leaves all the segments + * @return leafSlice group to be executed by different threads + */ + @Override + public LeafSlice[] slices(List leaves) { + final int target_max_slices = SearchBootstrapSettings.getValueAsInt( + SearchBootstrapSettings.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY, + SearchBootstrapSettings.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE + ); + return slicesInternal(leaves, target_max_slices); + } + public DirectoryReader getDirectoryReader() { final IndexReader reader = getIndexReader(); assert reader instanceof DirectoryReader : "expected an instance of DirectoryReader, got " + reader.getClass(); @@ -518,4 +537,19 @@ private boolean shouldReverseLeafReaderContexts() { } return false; } + + // package-private for testing + LeafSlice[] slicesInternal(List leaves, int target_max_slices) { + LeafSlice[] leafSlices; + if (target_max_slices <= 0) { + // use the default lucene slice calculation + leafSlices = super.slices(leaves); + logger.debug("Slice count using lucene default [{}]", leafSlices.length); + } else { + // use the custom slice calculation based on target_max_slices. It will sort + leafSlices = MaxTargetSliceSupplier.getSlices(leaves, target_max_slices); + logger.debug("Slice count using max target slice supplier [{}]", leafSlices.length); + } + return leafSlices; + } } diff --git a/server/src/main/java/org/opensearch/search/internal/MaxTargetSliceSupplier.java b/server/src/main/java/org/opensearch/search/internal/MaxTargetSliceSupplier.java new file mode 100644 index 0000000000000..0ee20c02deefc --- /dev/null +++ b/server/src/main/java/org/opensearch/search/internal/MaxTargetSliceSupplier.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.internal; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.IndexSearcher; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +/** + * Supplier to compute leaf slices based on passed in leaves and max target slice count to limit the number of computed slices. It sorts + * all the leaves based on document count and then assign each leaf in round-robin fashion to the target slice count slices. Based on + * experiment results as shared in issue-7358 + * we can see this mechanism helps to achieve better tail/median latency over default lucene slice computation. + */ +public class MaxTargetSliceSupplier { + + public static IndexSearcher.LeafSlice[] getSlices(List leaves, int target_max_slice) { + if (target_max_slice <= 0) { + throw new IllegalArgumentException("MaxTargetSliceSupplier called with unexpected slice count of " + target_max_slice); + } + + // slice count should not exceed the segment count + int target_slice_count = Math.min(target_max_slice, leaves.size()); + + // Make a copy so we can sort: + List sortedLeaves = new ArrayList<>(leaves); + + // Sort by maxDoc, descending: + sortedLeaves.sort(Collections.reverseOrder(Comparator.comparingInt(l -> l.reader().maxDoc()))); + + final List> groupedLeaves = new ArrayList<>(); + for (int i = 0; i < target_slice_count; ++i) { + groupedLeaves.add(new ArrayList<>()); + } + // distribute the slices in round-robin fashion + List group; + for (int idx = 0; idx < sortedLeaves.size(); ++idx) { + int currentGroup = idx % target_slice_count; + group = groupedLeaves.get(currentGroup); + group.add(sortedLeaves.get(idx)); + } + + IndexSearcher.LeafSlice[] slices = new IndexSearcher.LeafSlice[target_slice_count]; + int upto = 0; + for (List currentLeaf : groupedLeaves) { + slices[upto] = new IndexSearcher.LeafSlice(currentLeaf); + ++upto; + } + return slices; + } +} diff --git a/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java b/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java index f3907355ac6ec..439afcbb2eef1 100644 --- a/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java +++ b/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java @@ -90,6 +90,7 @@ import java.io.UncheckedIOException; import java.util.Collections; import java.util.IdentityHashMap; +import java.util.List; import java.util.Set; import static org.mockito.Mockito.mock; @@ -100,6 +101,7 @@ import static org.opensearch.search.internal.ExitableDirectoryReader.ExitableTerms; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.opensearch.search.internal.IndexReaderUtils.getLeaves; public class ContextIndexSearcherTests extends OpenSearchTestCase { public void testIntersectScorerAndRoleBits() throws Exception { @@ -304,6 +306,56 @@ public void onRemoval(ShardId shardId, Accountable accountable) { IOUtils.close(reader, w, dir); } + public void testSlicesInternal() throws Exception { + final List leaves = getLeaves(10); + + final Directory directory = newDirectory(); + IndexWriter iw = new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(NoMergePolicy.INSTANCE)); + Document document = new Document(); + document.add(new StringField("field1", "value1", Field.Store.NO)); + document.add(new StringField("field2", "value1", Field.Store.NO)); + iw.addDocument(document); + iw.commit(); + DirectoryReader directoryReader = DirectoryReader.open(directory); + + SearchContext searchContext = mock(SearchContext.class); + IndexShard indexShard = mock(IndexShard.class); + when(searchContext.indexShard()).thenReturn(indexShard); + when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); + ContextIndexSearcher searcher = new ContextIndexSearcher( + directoryReader, + IndexSearcher.getDefaultSimilarity(), + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + true, + null, + searchContext + ); + // Case 1: Verify the slice count when lucene default slice computation is used + IndexSearcher.LeafSlice[] slices = searcher.slicesInternal(leaves, -1); + int expectedSliceCount = 2; + // 2 slices will be created since max segment per slice of 5 will be reached + assertEquals(expectedSliceCount, slices.length); + for (int i=0; i getLeaves(int leafCount) throws Exception { + final Directory directory = newDirectory(); + IndexWriter iw = new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(NoMergePolicy.INSTANCE)); + for (int i=0; i leaves = directoryReader.leaves(); + directoryReader.close(); + directory.close(); + return leaves; + } +} diff --git a/server/src/test/java/org/opensearch/search/internal/MaxTargetSliceSupplierTests.java b/server/src/test/java/org/opensearch/search/internal/MaxTargetSliceSupplierTests.java new file mode 100644 index 0000000000000..8577da192db90 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/internal/MaxTargetSliceSupplierTests.java @@ -0,0 +1,78 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.internal; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.IndexSearcher; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.List; + +import static org.opensearch.search.internal.IndexReaderUtils.getLeaves; + +public class MaxTargetSliceSupplierTests extends OpenSearchTestCase { + + public void testSliceCountGreaterThanLeafCount() throws Exception { + int expectedSliceCount = 2; + IndexSearcher.LeafSlice[] slices = MaxTargetSliceSupplier.getSlices(getLeaves(expectedSliceCount), 5); + // verify slice count is same as leaf count + assertEquals(expectedSliceCount, slices.length); + for (int i=0; i MaxTargetSliceSupplier.getSlices(new ArrayList<>(), randomIntBetween(-3, 0))); + } + + public void testSingleSliceWithMultipleLeaves() throws Exception { + int leafCount = randomIntBetween(1, 10); + IndexSearcher.LeafSlice[] slices = MaxTargetSliceSupplier.getSlices(getLeaves(leafCount), 1); + assertEquals(1, slices.length); + assertEquals(leafCount, slices[0].leaves.length); + } + + public void testSliceCountLessThanLeafCount() throws Exception { + int leafCount = 12; + List leaves = getLeaves(leafCount); + + // Case 1: test with equal number of leaves per slice + int expectedSliceCount = 3; + IndexSearcher.LeafSlice[] slices = MaxTargetSliceSupplier.getSlices(leaves, expectedSliceCount); + int expectedLeavesPerSlice = leafCount / expectedSliceCount; + + assertEquals(expectedSliceCount, slices.length); + for (int i=0; i(), 2); + assertEquals(0, slices.length); + } +} diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 3564bd667ee2b..e09b69d1e5b1a 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -146,6 +146,7 @@ import org.opensearch.rest.action.RestCancellableNodeClient; import org.opensearch.script.MockScriptService; import org.opensearch.search.MockSearchService; +import org.opensearch.search.SearchBootstrapSettings; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchService; import org.opensearch.test.client.RandomizingClient; @@ -1904,7 +1905,12 @@ protected Settings nodeSettings(int nodeOrdinal) { .put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), randomBoolean()) .putList(DISCOVERY_SEED_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes .putList(DISCOVERY_SEED_PROVIDERS_SETTING.getKey(), "file") - .put(featureFlagSettings()); + .put(featureFlagSettings()) + // By default, for tests we will put the target slice count of 2. This will increase the probability of having multiple slices + // when tests are run with concurrent segment search enabled. When concurrent segment search is disabled then it's a no-op as + // slices are not used + .put(SearchBootstrapSettings.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY, 2); + return builder.build(); } diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchSingleNodeTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchSingleNodeTestCase.java index 1d7c04227b208..769afb1bdf59c 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchSingleNodeTestCase.java @@ -66,6 +66,7 @@ import org.opensearch.node.NodeValidationException; import org.opensearch.plugins.Plugin; import org.opensearch.script.MockScriptService; +import org.opensearch.search.SearchBootstrapSettings; import org.opensearch.search.internal.SearchContext; import org.opensearch.telemetry.TelemetrySettings; import org.opensearch.test.telemetry.MockTelemetryPlugin; @@ -211,7 +212,10 @@ protected final Collection> pluginList(Class