Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caching policy #4

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.indices;

import org.opensearch.core.common.bytes.BytesReference;

import java.io.IOException;

public interface CacheTierPolicy<T> {
/**
* Determines whether this policy allows the data into its cache tier, based on the contents of the BytesReference
* which can be deserialized into class T.
* @param data A BytesReference which can be deserialized into class T
* @return A CheckDataResult object containing whether the data is admitted, and if it isn't, the reason.
* @throws IOException if the input can't be deserialized to the right class.
*/
boolean checkData(BytesReference data) throws IOException;

/**
* Convert the BytesReference into the type T that is used to check entry into the cache.
* @param data The BytesReference
* @return The BytesReference converted to type T
* @throws IOException if the input can't be deserialized to the right class.
*/
T convertFromBytesReference(BytesReference data) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.indices;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.search.query.QuerySearchResult;

import java.io.IOException;

/**
* A cache tier policy which accepts queries whose took time is greater than some threshold,
* which is specified as a dynamic cluster-level setting. The threshold should be set to approximately
* the time it takes to get a result from the cache tier.
*/
public class DiskTierTookTimePolicy implements CacheTierPolicy<QuerySearchResult> {
public static final Setting<TimeValue> INDICES_REQUEST_CACHE_DISK_TOOKTIME_THRESHOLD_SETTING = Setting.positiveTimeSetting(
"index.requests.cache.disk.tooktime.threshold",
new TimeValue(10),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private TimeValue threshold;

public DiskTierTookTimePolicy(Settings settings, ClusterSettings clusterSettings) {
this.threshold = INDICES_REQUEST_CACHE_DISK_TOOKTIME_THRESHOLD_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(INDICES_REQUEST_CACHE_DISK_TOOKTIME_THRESHOLD_SETTING, this::setThreshold);
}

protected void setThreshold(TimeValue threshold) { // public so that we can manually set value in unit test
this.threshold = threshold;
}

@Override
public QuerySearchResult convertFromBytesReference(BytesReference data) throws IOException {
try {
return new QuerySearchResult(data.streamInput());
} catch (IllegalStateException ise) {
throw new IOException(ise);
}
}

@Override
public boolean checkData(BytesReference data) throws IOException {
QuerySearchResult qsr = convertFromBytesReference(data);
Long tookTimeNanos = qsr.getTookTimeNanos();
if (tookTimeNanos == null) {
return true;
// Received a null took time -> this QSR is from an old version which does not have took time, we should accept it
}
TimeValue tookTime = TimeValue.timeValueNanos(qsr.getTookTimeNanos());
if (tookTime.compareTo(threshold) < 0) { // negative -> tookTime is shorter than threshold
return false;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.indices.IndicesRequestCacheTookTimePolicy;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.ShardLimitValidator;
import org.opensearch.indices.analysis.HunspellService;
Expand Down Expand Up @@ -671,7 +672,9 @@ public void apply(Settings value, Settings current, Settings previous) {
// Remote cluster state settings
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,

IndicesRequestCacheTookTimePolicy.INDICES_REQUEST_CACHE_DISK_TOOKTIME_THRESHOLD_SETTING
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,15 @@ public void preProcess(SearchContext context) {
}

public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
long startTime = System.nanoTime();
if (searchContext.hasOnlySuggest()) {
suggestProcessor.process(searchContext);
searchContext.queryResult()
.topDocs(
new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),
new DocValueFormat[0]
);
searchContext.queryResult().setTookTimeNanos(System.nanoTime() - startTime);
return;
}

Expand Down Expand Up @@ -165,6 +167,7 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionExcep
);
searchContext.queryResult().profileResults(shardResults);
}
searchContext.queryResult().setTookTimeNanos(System.nanoTime() - startTime);
}

// making public for testing
Expand Down Expand Up @@ -292,7 +295,6 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
}

return shouldRescore;
} finally {
// Search phase has finished, no longer need to check for timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public final class QuerySearchResult extends SearchPhaseResult {
private int nodeQueueSize = -1;

private final boolean isNull;
private long tookTimeNanos;

public QuerySearchResult() {
this(false);
Expand Down Expand Up @@ -364,6 +365,7 @@ public void readFromWithId(ShardSearchContextId id, StreamInput in) throws IOExc
nodeQueueSize = in.readInt();
setShardSearchRequest(in.readOptionalWriteable(ShardSearchRequest::new));
setRescoreDocIds(new RescoreDocIds(in));
tookTimeNanos = in.readVLong();
}

@Override
Expand Down Expand Up @@ -406,6 +408,7 @@ public void writeToNoId(StreamOutput out) throws IOException {
out.writeInt(nodeQueueSize);
out.writeOptionalWriteable(getShardSearchRequest());
getRescoreDocIds().writeTo(out);
out.writeVLong(tookTimeNanos); // VLong as took time should always be positive
}

public TotalHits getTotalHits() {
Expand All @@ -415,4 +418,12 @@ public TotalHits getTotalHits() {
public float getMaxScore() {
return maxScore;
}

public long getTookTimeNanos() {
return tookTimeNanos;
}

public void setTookTimeNanos(long tookTime) {
tookTimeNanos = tookTime;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.indices;

import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.OriginalIndicesTests;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.common.UUIDs;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.SearchShardTarget;
import org.opensearch.search.internal.AliasFilter;
import org.opensearch.search.internal.ShardSearchContextId;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;

public class IndicesRequestCacheDiskTierPolicyTests extends OpenSearchTestCase {
private DiskTierTookTimePolicy getTookTimePolicy() {
// dummy settings
Settings dummySettings = Settings.EMPTY;
ClusterSettings dummyClusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
return new DiskTierTookTimePolicy(dummySettings, dummyClusterSettings);
}

public void testQSRSetupFunction() throws IOException {
long ttn = 100000000000L;
BytesReference qsrBytes = getQSRBytesReference(ttn);
QuerySearchResult qsr = new QuerySearchResult(qsrBytes.streamInput());
assertEquals(ttn, qsr.getTookTimeNanos());
}

public void testBadBytesReference() throws Exception {
BytesReference badQSR = new BytesArray("I love bytes!!!");
DiskTierTookTimePolicy tookTimePolicy = getTookTimePolicy();
assertThrows(IOException.class, () -> tookTimePolicy.checkData(badQSR));
}

public void testTookTimePolicy() throws Exception {
DiskTierTookTimePolicy tookTimePolicy = getTookTimePolicy();

// manually set threshold for test
double threshMillis = 10;
long shortMillis = (long) (0.9 * threshMillis);
long longMillis = (long) (1.5 * threshMillis);
tookTimePolicy.setThreshold(new TimeValue((long) threshMillis));
BytesReference shortQSR = getQSRBytesReference(shortMillis * 1000000);
BytesReference longQSR = getQSRBytesReference(longMillis * 1000000);

boolean shortResult = tookTimePolicy.checkData(shortQSR);
assertFalse(shortResult);
boolean longResult = tookTimePolicy.checkData(longQSR);
assertTrue(longResult);
}

private BytesReference getQSRBytesReference(long tookTimeNanos) throws IOException {
// setup from QuerySearchResultTests.java
ShardId shardId = new ShardId("index", "uuid", randomInt());
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(randomBoolean());
ShardSearchRequest shardSearchRequest = new ShardSearchRequest(
OriginalIndicesTests.randomOriginalIndices(),
searchRequest,
shardId,
1,
new AliasFilter(null, Strings.EMPTY_ARRAY),
1.0f,
randomNonNegativeLong(),
null,
new String[0]
);
ShardSearchContextId id = new ShardSearchContextId(UUIDs.base64UUID(), randomLong());
QuerySearchResult result = new QuerySearchResult(
id,
new SearchShardTarget("node", shardId, null, OriginalIndices.NONE),
shardSearchRequest
);
TopDocs topDocs = new TopDocs(new TotalHits(randomLongBetween(0, Long.MAX_VALUE), TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]);
result.topDocs(new TopDocsAndMaxScore(topDocs, randomBoolean() ? Float.NaN : randomFloat()), new DocValueFormat[0]);

result.setTookTimeNanos(tookTimeNanos);
BytesStreamOutput out = new BytesStreamOutput();
// it appears to need a boolean and then a ShardSearchContextId written to the stream before the QSR in order to deserialize?
out.writeBoolean(false);
id.writeTo(out);

result.writeToNoId(out);
return out.bytes();
}
}
Loading