diff --git a/server/src/main/java/org/opensearch/common/cache/tier/CacheTierPolicy.java b/server/src/main/java/org/opensearch/common/cache/tier/CacheTierPolicy.java new file mode 100644 index 0000000000000..f242eeb6c53e0 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/CacheTierPolicy.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.indices; + +import org.opensearch.core.common.bytes.BytesReference; + +import java.io.IOException; + +public interface CacheTierPolicy { + /** + * Determines whether this policy allows the data into its cache tier, based on the contents of the BytesReference + * which can be deserialized into class T. + * @param data A BytesReference which can be deserialized into class T + * @return A CheckDataResult object containing whether the data is admitted, and if it isn't, the reason. + * @throws IOException if the input can't be deserialized to the right class. + */ + boolean checkData(BytesReference data) throws IOException; + + /** + * Convert the BytesReference into the type T that is used to check entry into the cache. + * @param data The BytesReference + * @return The BytesReference converted to type T + * @throws IOException if the input can't be deserialized to the right class. + */ + T convertFromBytesReference(BytesReference data) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/DiskTierTookTimePolicy.java b/server/src/main/java/org/opensearch/common/cache/tier/DiskTierTookTimePolicy.java new file mode 100644 index 0000000000000..de0730e4c4098 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/DiskTierTookTimePolicy.java @@ -0,0 +1,91 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.indices; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.search.query.QuerySearchResult; + +import java.io.IOException; + +/** + * A cache tier policy which accepts queries whose took time is greater than some threshold, + * which is specified as a dynamic cluster-level setting. The threshold should be set to approximately + * the time it takes to get a result from the cache tier. + */ +public class DiskTierTookTimePolicy implements CacheTierPolicy { + public static final Setting INDICES_REQUEST_CACHE_DISK_TOOKTIME_THRESHOLD_SETTING = Setting.positiveTimeSetting( + "index.requests.cache.disk.tooktime.threshold", + new TimeValue(10), + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private TimeValue threshold; + + public DiskTierTookTimePolicy(Settings settings, ClusterSettings clusterSettings) { + this.threshold = INDICES_REQUEST_CACHE_DISK_TOOKTIME_THRESHOLD_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(INDICES_REQUEST_CACHE_DISK_TOOKTIME_THRESHOLD_SETTING, this::setThreshold); + } + + protected void setThreshold(TimeValue threshold) { // public so that we can manually set value in unit test + this.threshold = threshold; + } + + @Override + public QuerySearchResult convertFromBytesReference(BytesReference data) throws IOException { + try { + return new QuerySearchResult(data.streamInput()); + } catch (IllegalStateException ise) { + throw new IOException(ise); + } + } + + @Override + public boolean checkData(BytesReference data) throws IOException { + QuerySearchResult qsr = convertFromBytesReference(data); + Long tookTimeNanos = qsr.getTookTimeNanos(); + if (tookTimeNanos == null) { + return true; + // Received a null took time -> this QSR is from an old version which does not have took time, we should accept it + } + TimeValue tookTime = TimeValue.timeValueNanos(qsr.getTookTimeNanos()); + if (tookTime.compareTo(threshold) < 0) { // negative -> tookTime is shorter than threshold + return false; + } + return true; + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 74224d66400da..9690c25cecf6e 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -112,6 +112,7 @@ import org.opensearch.indices.IndexingMemoryController; import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.IndicesRequestCache; +import org.opensearch.indices.IndicesRequestCacheTookTimePolicy; import org.opensearch.indices.IndicesService; import org.opensearch.indices.ShardLimitValidator; import org.opensearch.indices.analysis.HunspellService; @@ -671,7 +672,9 @@ public void apply(Settings value, Settings current, Settings previous) { // Remote cluster state settings RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING, RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING, - IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING + IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, + + IndicesRequestCacheTookTimePolicy.INDICES_REQUEST_CACHE_DISK_TOOKTIME_THRESHOLD_SETTING ) ) ); diff --git a/server/src/main/java/org/opensearch/search/query/QueryPhase.java b/server/src/main/java/org/opensearch/search/query/QueryPhase.java index f3cf2c13ecdef..14b239d3270d4 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/opensearch/search/query/QueryPhase.java @@ -131,6 +131,7 @@ public void preProcess(SearchContext context) { } public void execute(SearchContext searchContext) throws QueryPhaseExecutionException { + long startTime = System.nanoTime(); if (searchContext.hasOnlySuggest()) { suggestProcessor.process(searchContext); searchContext.queryResult() @@ -138,6 +139,7 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionExcep new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN), new DocValueFormat[0] ); + searchContext.queryResult().setTookTimeNanos(System.nanoTime() - startTime); return; } @@ -165,6 +167,7 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionExcep ); searchContext.queryResult().profileResults(shardResults); } + searchContext.queryResult().setTookTimeNanos(System.nanoTime() - startTime); } // making public for testing @@ -292,7 +295,6 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); } - return shouldRescore; } finally { // Search phase has finished, no longer need to check for timeout diff --git a/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java b/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java index 7de605a244d09..5a8cf06bd8b0a 100644 --- a/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java @@ -87,6 +87,7 @@ public final class QuerySearchResult extends SearchPhaseResult { private int nodeQueueSize = -1; private final boolean isNull; + private long tookTimeNanos; public QuerySearchResult() { this(false); @@ -364,6 +365,7 @@ public void readFromWithId(ShardSearchContextId id, StreamInput in) throws IOExc nodeQueueSize = in.readInt(); setShardSearchRequest(in.readOptionalWriteable(ShardSearchRequest::new)); setRescoreDocIds(new RescoreDocIds(in)); + tookTimeNanos = in.readVLong(); } @Override @@ -406,6 +408,7 @@ public void writeToNoId(StreamOutput out) throws IOException { out.writeInt(nodeQueueSize); out.writeOptionalWriteable(getShardSearchRequest()); getRescoreDocIds().writeTo(out); + out.writeVLong(tookTimeNanos); // VLong as took time should always be positive } public TotalHits getTotalHits() { @@ -415,4 +418,12 @@ public TotalHits getTotalHits() { public float getMaxScore() { return maxScore; } + + public long getTookTimeNanos() { + return tookTimeNanos; + } + + public void setTookTimeNanos(long tookTime) { + tookTimeNanos = tookTime; + } } diff --git a/server/src/test/java/org/opensearch/common/cache/tier/IndicesRequestCacheDiskTierPolicyTests.java b/server/src/test/java/org/opensearch/common/cache/tier/IndicesRequestCacheDiskTierPolicyTests.java new file mode 100644 index 0000000000000..0c1df13692079 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/cache/tier/IndicesRequestCacheDiskTierPolicyTests.java @@ -0,0 +1,132 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.indices; + +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TotalHits; +import org.opensearch.action.OriginalIndices; +import org.opensearch.action.OriginalIndicesTests; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.common.UUIDs; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.lucene.search.TopDocsAndMaxScore; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.Strings; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.search.DocValueFormat; +import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.internal.AliasFilter; +import org.opensearch.search.internal.ShardSearchContextId; +import org.opensearch.search.internal.ShardSearchRequest; +import org.opensearch.search.query.QuerySearchResult; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +public class IndicesRequestCacheDiskTierPolicyTests extends OpenSearchTestCase { + private DiskTierTookTimePolicy getTookTimePolicy() { + // dummy settings + Settings dummySettings = Settings.EMPTY; + ClusterSettings dummyClusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + return new DiskTierTookTimePolicy(dummySettings, dummyClusterSettings); + } + + public void testQSRSetupFunction() throws IOException { + long ttn = 100000000000L; + BytesReference qsrBytes = getQSRBytesReference(ttn); + QuerySearchResult qsr = new QuerySearchResult(qsrBytes.streamInput()); + assertEquals(ttn, qsr.getTookTimeNanos()); + } + + public void testBadBytesReference() throws Exception { + BytesReference badQSR = new BytesArray("I love bytes!!!"); + DiskTierTookTimePolicy tookTimePolicy = getTookTimePolicy(); + assertThrows(IOException.class, () -> tookTimePolicy.checkData(badQSR)); + } + + public void testTookTimePolicy() throws Exception { + DiskTierTookTimePolicy tookTimePolicy = getTookTimePolicy(); + + // manually set threshold for test + double threshMillis = 10; + long shortMillis = (long) (0.9 * threshMillis); + long longMillis = (long) (1.5 * threshMillis); + tookTimePolicy.setThreshold(new TimeValue((long) threshMillis)); + BytesReference shortQSR = getQSRBytesReference(shortMillis * 1000000); + BytesReference longQSR = getQSRBytesReference(longMillis * 1000000); + + boolean shortResult = tookTimePolicy.checkData(shortQSR); + assertFalse(shortResult); + boolean longResult = tookTimePolicy.checkData(longQSR); + assertTrue(longResult); + } + + private BytesReference getQSRBytesReference(long tookTimeNanos) throws IOException { + // setup from QuerySearchResultTests.java + ShardId shardId = new ShardId("index", "uuid", randomInt()); + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(randomBoolean()); + ShardSearchRequest shardSearchRequest = new ShardSearchRequest( + OriginalIndicesTests.randomOriginalIndices(), + searchRequest, + shardId, + 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), + 1.0f, + randomNonNegativeLong(), + null, + new String[0] + ); + ShardSearchContextId id = new ShardSearchContextId(UUIDs.base64UUID(), randomLong()); + QuerySearchResult result = new QuerySearchResult( + id, + new SearchShardTarget("node", shardId, null, OriginalIndices.NONE), + shardSearchRequest + ); + TopDocs topDocs = new TopDocs(new TotalHits(randomLongBetween(0, Long.MAX_VALUE), TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]); + result.topDocs(new TopDocsAndMaxScore(topDocs, randomBoolean() ? Float.NaN : randomFloat()), new DocValueFormat[0]); + + result.setTookTimeNanos(tookTimeNanos); + BytesStreamOutput out = new BytesStreamOutput(); + // it appears to need a boolean and then a ShardSearchContextId written to the stream before the QSR in order to deserialize? + out.writeBoolean(false); + id.writeTo(out); + + result.writeToNoId(out); + return out.bytes(); + } +} diff --git a/server/src/test/java/org/opensearch/search/SearchServiceTests.java b/server/src/test/java/org/opensearch/search/SearchServiceTests.java index 7c84078af080e..f4657e303fbd8 100644 --- a/server/src/test/java/org/opensearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/opensearch/search/SearchServiceTests.java @@ -121,6 +121,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; @@ -823,6 +824,118 @@ public Scroll scroll() { } } + public void testQuerySearchResultTookTime() throws Exception { + // I wasn't able to introduce a delay in these tests as everything between creation and usage of the QuerySearchResult object + // happen in a single line - we would have to modify QueryPhase.execute() to take a delay parameter + // However this was tested manually + createIndex("index"); + final SearchService service = getInstanceFromNode(SearchService.class); + final IndicesService indicesService = getInstanceFromNode(IndicesService.class); + final IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index")); + final IndexShard indexShard = indexService.getShard(0); + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); + searchRequest.source(new SearchSourceBuilder().query(new MatchAllQueryBuilder())); + + ShardSearchRequest request = new ShardSearchRequest( + OriginalIndices.NONE, + searchRequest, + indexShard.shardId(), + 2, // must have >1 shards for executeQueryPhase to return the QuerySearchResult + new AliasFilter(null, Strings.EMPTY_ARRAY), + 1.0f, + -1, + null, + null + ); + + SearchShardTask task = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()); + service.executeQueryPhase(request, randomBoolean(), task, new ActionListener() { + @Override + public void onResponse(SearchPhaseResult searchPhaseResult) { + assertEquals(QuerySearchResult.class, searchPhaseResult.getClass()); // 2+ shards -> QuerySearchResult returned + QuerySearchResult qsr = (QuerySearchResult) searchPhaseResult; + assertTrue(qsr.getTookTimeNanos() > 0); // Above zero means it's been set at some point + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }); + } + + public void testQuerySearchResultTookTimeCacheableRequest() throws Exception { + // Test 2 identical cacheable requests and assert both have the same tookTime + // Similarly, no delay could be added + createIndex("index"); + final SearchService service = getInstanceFromNode(SearchService.class); + final IndicesService indicesService = getInstanceFromNode(IndicesService.class); + final IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index")); + final IndexShard indexShard = indexService.getShard(0); + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + + searchRequest.source(searchSourceBuilder); + searchSourceBuilder.scriptField( + "field" + 0, + new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap()) + ); + searchSourceBuilder.size(0); // from testIgnoreScriptfieldIfSizeZero + + String[] dummyRoutings = new String[] {}; + OriginalIndices dummyOriginalIndices = new OriginalIndices(new String[] { "index'" }, IndicesOptions.LENIENT_EXPAND_OPEN); + + ShardSearchRequest request = new ShardSearchRequest( + dummyOriginalIndices, + searchRequest, + indexShard.shardId(), + 2, // must have >1 shards for executeQueryPhase to return the QuerySearchResult + new AliasFilter(null, Strings.EMPTY_ARRAY), + 1.0f, + 0L, + // if nowInMillis is negative, it fails when trying to write the shardSearchRequest to cache as it uses WriteVLong which only + // takes positive longs + null, + dummyRoutings // similar for routings + ); + + final CompletableFuture firstResult = new CompletableFuture<>(); + final CompletableFuture secondResult = new CompletableFuture<>(); + SearchShardTask task = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()); + service.executeQueryPhase(request, randomBoolean(), task, new ActionListener() { + @Override + public void onResponse(SearchPhaseResult searchPhaseResult) { + assertEquals(QuerySearchResult.class, searchPhaseResult.getClass()); // 2+ shards -> QuerySearchResult returned + QuerySearchResult qsr = (QuerySearchResult) searchPhaseResult; + firstResult.complete(qsr.getTookTimeNanos()); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }); + + service.executeQueryPhase(request, randomBoolean(), task, new ActionListener() { + @Override + public void onResponse(SearchPhaseResult searchPhaseResult) { + assertEquals(QuerySearchResult.class, searchPhaseResult.getClass()); // 2+ shards -> QuerySearchResult returned + QuerySearchResult qsr = (QuerySearchResult) searchPhaseResult; + secondResult.complete(qsr.getTookTimeNanos()); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }); + + long firstResultVal = firstResult.get(); + long secondResultVal = secondResult.get(); + assertEquals(firstResultVal, secondResultVal); + assertTrue(firstResultVal > 0); + } + public void testCanMatch() throws Exception { createIndex("index"); final SearchService service = getInstanceFromNode(SearchService.class); @@ -1010,6 +1123,7 @@ public void onFailure(Exception e) { } } }); + latch.await(); } diff --git a/server/src/test/java/org/opensearch/search/query/QuerySearchResultTests.java b/server/src/test/java/org/opensearch/search/query/QuerySearchResultTests.java index 41e4e1ae45a73..0d83777dbd4cf 100644 --- a/server/src/test/java/org/opensearch/search/query/QuerySearchResultTests.java +++ b/server/src/test/java/org/opensearch/search/query/QuerySearchResultTests.java @@ -99,6 +99,7 @@ private static QuerySearchResult createTestInstance() throws Exception { if (randomBoolean()) { result.aggregations(InternalAggregationsTests.createTestInstance()); } + assertEquals(0, result.getTookTimeNanos()); return result; } @@ -118,6 +119,7 @@ public void testSerialization() throws Exception { assertEquals(aggs.asList(), deserializedAggs.asList()); } assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly()); + assertEquals(querySearchResult.getTookTimeNanos(), deserialized.getTookTimeNanos()); } public void testNullResponse() throws Exception {