Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caching policy #4

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.indices.IndicesRequestCacheTookTimePolicy;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.ShardLimitValidator;
import org.opensearch.indices.analysis.HunspellService;
Expand Down Expand Up @@ -671,7 +672,9 @@ public void apply(Settings value, Settings current, Settings previous) {
// Remote cluster state settings
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,

IndicesRequestCacheTookTimePolicy.INDICES_REQUEST_CACHE_DISK_TOOKTIME_THRESHOLD_SETTING
)
)
);
Expand Down
48 changes: 48 additions & 0 deletions server/src/main/java/org/opensearch/indices/CacheTierPolicy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.indices;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move these under suggested common/cache/tier package


import org.opensearch.core.common.bytes.BytesReference;

import java.io.IOException;

public interface CacheTierPolicy<T> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this generic T being used?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea was to make it explicit what type the BytesReference should be turned into. I could separate out the lines which transform BytesReference -> T as their own function in the interface, as all implementations should need that?

/**
* Determines whether this policy allows the data into its cache tier, based on the contents of the BytesReference
* which can be deserialized into class T.
* @param data A BytesReference which can be deserialized into class T
* @return A CheckDataResult object containing whether the data is admitted, and if it isn't, the reason.
* @throws IOException if the input can't be deserialized to the right class.
*/
CheckDataResult checkData(BytesReference data) throws IOException;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually now I think, this CheckDataResult is not really needed. Just returning a boolean should be fine as it avoids unnecessary allocation of new object which can add overhead to GC.

We can just do like cachingPolicy1 && cachingPolicy2....

}
64 changes: 64 additions & 0 deletions server/src/main/java/org/opensearch/indices/CheckDataResult.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.indices;

/**
* A class used by a CacheTierPolicy to return a result for some data input.
* The results can be chained together in a short-circuiting way.
*/
public class CheckDataResult {
private boolean isAccepted;
private String deniedReason; // null if the data was accepted, has an explanation if data was rejected
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be Optional ?


public CheckDataResult(boolean isAccepted, String deniedReason) {
this.isAccepted = isAccepted;
this.deniedReason = deniedReason;
}

public CheckDataResult composeWith(CheckDataResult other) {
if (!this.isAccepted) {
return this;
} else {
return other;
}
}

public boolean isAccepted() {
return isAccepted;
}

public String getDeniedReason() {
return deniedReason;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.indices;

import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.search.query.QuerySearchResult;

import java.io.IOException;

/**
* This policy takes in an array containing an instance of all policies we want to apply to the IRC disk tier.
* It short circuits these policies' checks together, in the provided order, to get one overall check.
*/
public class IndicesRequestCacheDiskTierPolicy implements CacheTierPolicy<QuerySearchResult> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might not really need this class. As this logic can be fit inside DiskCachingTier itself, where we can maintain a list of policies object, and do these all checks which seems minimal.

private final CacheTierPolicy<QuerySearchResult>[] policies;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to use List<>

private final int numPolicies;
private final boolean allowedByDefault;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this? If there are no policies mentioned ie list is empty, it should be allowedByDefault?

public static String DEFAULT_DENIED_REASON =
"No policies were supplied to IndicesRequestCacheDiskTierPolicy and allowedByDefault = false";
// available here for testing purposes

public IndicesRequestCacheDiskTierPolicy(CacheTierPolicy<QuerySearchResult>[] policies, boolean allowedByDefault) {
this.policies = policies;
this.numPolicies = policies.length;
this.allowedByDefault = allowedByDefault; // default behavior if no other policies are supplied
}

@Override
public CheckDataResult checkData(BytesReference data) throws IOException {
if (numPolicies == 0) {
// still need to check the data can be deserialized into QuerySearchResult
QuerySearchResult qsr;
try {
qsr = new QuerySearchResult(data.streamInput());
} catch (IllegalStateException ise) {
throw new IOException(ise);
}

if (allowedByDefault) {
return new CheckDataResult(true, null);
} else {
return new CheckDataResult(false, DEFAULT_DENIED_REASON);
}
}
CheckDataResult result = policies[0].checkData(data);
if (numPolicies > 1) {
for (int i = 1; i < numPolicies; i++) {
result = result.composeWith(policies[i].checkData(data));
if (!result.isAccepted()) {
break;
}
}
}
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.indices;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.search.query.QuerySearchResult;

import java.io.IOException;

/**
* A cache tier policy which accepts queries whose took time is greater than some threshold,
* which is specified as a dynamic cluster-level setting. The threshold should be set to approximately
* the time it takes to get a result from the cache tier.
*/
public class IndicesRequestCacheTookTimePolicy implements CacheTierPolicy<QuerySearchResult> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets rename this to DiskTierTookTimePolicy

public static final Setting<TimeValue> INDICES_REQUEST_CACHE_DISK_TOOKTIME_THRESHOLD_SETTING = Setting.positiveTimeSetting(
"index.requests.cache.disk.tooktime.threshold",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we might rename this class to generic DiskTierTookTimePolicy, you will have to take setting prefix "indices.request.cache" as a parameter in constructor. It is done in a similar way in framework PR.

new TimeValue(10),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private TimeValue threshold;

public IndicesRequestCacheTookTimePolicy(Settings settings, ClusterSettings clusterSettings) {
this.threshold = INDICES_REQUEST_CACHE_DISK_TOOKTIME_THRESHOLD_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(INDICES_REQUEST_CACHE_DISK_TOOKTIME_THRESHOLD_SETTING, this::setThreshold);
}

public void setThreshold(TimeValue threshold) { // public so that we can manually set value in unit test
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Unit test, making it protected should suffice?

this.threshold = threshold;
}

protected String buildDeniedString(TimeValue tookTime, TimeValue threshold) {
// separating out for use in testing
return "Query took time " + tookTime.getMillis() + " ms is less than threshold value " + threshold.getMillis() + " ms";
}

@Override
public CheckDataResult checkData(BytesReference data) throws IOException {
QuerySearchResult qsr;
try {
qsr = new QuerySearchResult(data.streamInput());
} catch (IllegalStateException ise) {
throw new IOException(ise);
}
TimeValue tookTime = TimeValue.timeValueNanos(qsr.getTookTimeNanos());
boolean isAccepted = true;
String deniedReason = null;
if (tookTime.compareTo(threshold) < 0) { // negative -> tookTime is shorter than threshold
isAccepted = false;
deniedReason = buildDeniedString(tookTime, threshold);
}
return new CheckDataResult(isAccepted, deniedReason);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,15 @@ public void preProcess(SearchContext context) {
}

public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
long startTime = System.nanoTime();
if (searchContext.hasOnlySuggest()) {
suggestProcessor.process(searchContext);
searchContext.queryResult()
.topDocs(
new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),
new DocValueFormat[0]
);
searchContext.queryResult().setTookTimeNanos(System.nanoTime() - startTime);
return;
}

Expand Down Expand Up @@ -165,6 +167,7 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionExcep
);
searchContext.queryResult().profileResults(shardResults);
}
searchContext.queryResult().setTookTimeNanos(System.nanoTime() - startTime);
}

// making public for testing
Expand Down Expand Up @@ -292,7 +295,6 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
}

return shouldRescore;
} finally {
// Search phase has finished, no longer need to check for timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public final class QuerySearchResult extends SearchPhaseResult {
private int nodeQueueSize = -1;

private final boolean isNull;
private long tookTimeNanos;

public QuerySearchResult() {
this(false);
Expand Down Expand Up @@ -364,6 +365,7 @@ public void readFromWithId(ShardSearchContextId id, StreamInput in) throws IOExc
nodeQueueSize = in.readInt();
setShardSearchRequest(in.readOptionalWriteable(ShardSearchRequest::new));
setRescoreDocIds(new RescoreDocIds(in));
tookTimeNanos = in.readVLong();
}

@Override
Expand Down Expand Up @@ -406,6 +408,7 @@ public void writeToNoId(StreamOutput out) throws IOException {
out.writeInt(nodeQueueSize);
out.writeOptionalWriteable(getShardSearchRequest());
getRescoreDocIds().writeTo(out);
out.writeVLong(tookTimeNanos); // VLong as took time should always be positive
}

public TotalHits getTotalHits() {
Expand All @@ -415,4 +418,12 @@ public TotalHits getTotalHits() {
public float getMaxScore() {
return maxScore;
}

public long getTookTimeNanos() {
return tookTimeNanos;
}

public void setTookTimeNanos(long tookTime) {
tookTimeNanos = tookTime;
}
}
Loading