Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tiered stats to request cache response #8

Draft
wants to merge 10 commits into
base: framework-serialized
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.indices;

import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.test.OpenSearchIntegTestCase;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;

// This is a separate file from IndicesRequestCacheIT because we only want to run our test
// on a node with a maximum request cache size that we set.

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class IndicesRequestCacheDiskTierIT extends OpenSearchIntegTestCase {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should ideally add these tests as part of IndicesRequestCacheIT. Lets check what is needed to do that.

public void testDiskTierStats() throws Exception {
int heapSizeBytes = 1800; // enough to fit 2 queries, as each is 687 B
int requestSize = 687; // each request is 687 B
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did we calculate this? I guess manually?
Possible to create a request and after which we can estimate the size? Doing this we can dynamically generate this value.

String node = internalCluster().startNode(
Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes))
);
Client client = client(node);

Settings.Builder indicesSettingBuilder = Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);

assertAcked(
client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(indicesSettingBuilder).get()
);
indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
ensureSearchable("index");
SearchResponse resp;

int numOnDisk = 5;
int numRequests = heapSizeBytes / requestSize + numOnDisk;
for (int i = 0; i < numRequests; i++) {
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
assertSearchResponse(resp);
IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false);
IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false);
System.out.println("request number " + i);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

}

System.out.println("Num requests = " + numRequests);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove


// the first request, for "hello0", should have been evicted to the disk tier
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello0")).get();
IndicesRequestCacheIT.assertCacheState(client, "index", 0, numRequests + 1, TierType.ON_HEAP, false);
IndicesRequestCacheIT.assertCacheState(client, "index", 1, numRequests, TierType.DISK, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.time.DateFormatter;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
Expand Down Expand Up @@ -636,19 +637,13 @@ public void testProfileDisableCache() throws Exception {

public void testCacheWithInvalidation() throws Exception {
Client client = client();
assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("k", "type=keyword")
.setSettings(
Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
)
.get()
);

Settings.Builder builder = Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);

assertAcked(client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(builder).get());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this change is not required? Remove?

indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
ensureSearchable("index");
SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
Expand All @@ -662,18 +657,31 @@ public void testCacheWithInvalidation() throws Exception {
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
// Should expect hit as here as refresh didn't happen
assertCacheState(client, "index", 1, 1);
assertCacheState(client, "index", 1, 1, TierType.ON_HEAP, false);
assertCacheState(client, "index", 0, 1, TierType.DISK, false);
assertNumCacheEntries(client, "index", 1, TierType.ON_HEAP);

// Explicit refresh would invalidate cache
refresh();
// Hit same query again
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
// Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh)
assertCacheState(client, "index", 1, 2);
assertCacheState(client, "index", 1, 2, TierType.ON_HEAP, false);
assertCacheState(client, "index", 0, 2, TierType.DISK, false);
assertNumCacheEntries(client, "index", 1, TierType.ON_HEAP); // Shouldn't it just be the most recent query, since the first one was
// invalidated? (prob invalidation isnt in yet)
// yeah - evictions = 0, its not in yet
}

private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) {
protected static void assertCacheState(
Client client,
String index,
long expectedHits,
long expectedMisses,
TierType tierType,
boolean enforceZeroEvictions
) {
RequestCacheStats requestCacheStats = client.admin()
.indices()
.prepareStats(index)
Expand All @@ -683,11 +691,36 @@ private static void assertCacheState(Client client, String index, long expectedH
.getRequestCache();
// Check the hit count and miss count together so if they are not
// correct we can see both values
assertEquals(
Arrays.asList(expectedHits, expectedMisses, 0L),
Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions())
);
if (enforceZeroEvictions) {
assertEquals(
Arrays.asList(expectedHits, expectedMisses, 0L),
Arrays.asList(
requestCacheStats.getHitCount(tierType),
requestCacheStats.getMissCount(tierType),
requestCacheStats.getEvictions(tierType)
)
);
} else {
assertEquals(
Arrays.asList(expectedHits, expectedMisses),
Arrays.asList(requestCacheStats.getHitCount(tierType), requestCacheStats.getMissCount(tierType))
);
}
}

protected static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) {
assertCacheState(client, index, expectedHits, expectedMisses, TierType.ON_HEAP, true);
}

protected static void assertNumCacheEntries(Client client, String index, long expectedEntries, TierType tierType) {
RequestCacheStats requestCacheStats = client.admin()
.indices()
.prepareStats(index)
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();
assertEquals(expectedEntries, requestCacheStats.getEntries(tierType));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

package org.opensearch.common.metrics;

import java.io.Serializable;
import java.util.concurrent.atomic.LongAdder;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,18 @@

package org.opensearch.index.cache.request;

import org.opensearch.Version;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.indices.TierType;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
* Request for the query cache statistics
Expand All @@ -48,69 +52,127 @@
*/
public class RequestCacheStats implements Writeable, ToXContentFragment {

private long memorySize;
private long evictions;
private long hitCount;
private long missCount;
private Map<String, StatsHolder> map;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe initialize this map inline here. This way you don't need to worry about this not being intialized.
Like

map = new HashMap<>(){{
       for(TierType tierType: TierType.values()) {
           put(tierType.getStringValue(), new StatsHolder());
       }
    }};


public RequestCacheStats() {}
public RequestCacheStats() {
this.map = new HashMap<>();
for (TierType tierType : TierType.values()) {
map.put(tierType.getStringValue(), new StatsHolder());
// Every possible tier type must have counters, even if they are disabled. Then the counters report 0
}
}

public RequestCacheStats(StreamInput in) throws IOException {
memorySize = in.readVLong();
evictions = in.readVLong();
hitCount = in.readVLong();
missCount = in.readVLong();
this();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are calling to initialize this map but looks error prone and wrong logically. As logically all variables inside this constructor should be initialized with StreamInput values. You can initialize the map inline as suggested above and we don't need this.

if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.map = in.readMap(StreamInput::readString, StatsHolder::new);
} else {
// objects from earlier versions only contain on-heap info, and do not have entries info
long memorySize = in.readVLong();
long evictions = in.readVLong();
long hitCount = in.readVLong();
long missCount = in.readVLong();
this.map.put(TierType.ON_HEAP.getStringValue(), new StatsHolder(memorySize, evictions, hitCount, missCount, 0));
}
}

public RequestCacheStats(long memorySize, long evictions, long hitCount, long missCount) {
this.memorySize = memorySize;
this.evictions = evictions;
this.hitCount = hitCount;
this.missCount = missCount;
public RequestCacheStats(Map<TierType, StatsHolder> inputMap) {
// Create a RequestCacheStats with multiple tiers' statistics
this();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again remove this.

for (TierType tierType : inputMap.keySet()) {
map.put(tierType.getStringValue(), inputMap.get(tierType));
}
}

public void add(RequestCacheStats stats) {
this.memorySize += stats.memorySize;
this.evictions += stats.evictions;
this.hitCount += stats.hitCount;
this.missCount += stats.missCount;
for (String tier : stats.map.keySet()) {
map.get(tier).add(stats.map.get(tier));
}
}

private StatsHolder getTierStats(TierType tierType) {
return map.get(tierType.getStringValue());
}

public long getMemorySizeInBytes(TierType tierType) {
return getTierStats(tierType).totalMetric.count();
}

public ByteSizeValue getMemorySize(TierType tierType) {
return new ByteSizeValue(getMemorySizeInBytes(tierType));
}

public long getEvictions(TierType tierType) {
return getTierStats(tierType).evictionsMetric.count();
}

public long getHitCount(TierType tierType) {
return getTierStats(tierType).hitCount.count();
}

public long getMissCount(TierType tierType) {
return getTierStats(tierType).missCount.count();
}

public long getEntries(TierType tierType) {
return getTierStats(tierType).entries.count();
}

// By default, return on-heap stats if no tier is specified

public long getMemorySizeInBytes() {
return this.memorySize;
return getMemorySizeInBytes(TierType.ON_HEAP);
}

public ByteSizeValue getMemorySize() {
return new ByteSizeValue(memorySize);
return getMemorySize(TierType.ON_HEAP);
}

public long getEvictions() {
return this.evictions;
return getEvictions(TierType.ON_HEAP);
}

public long getHitCount() {
return this.hitCount;
return getHitCount(TierType.ON_HEAP);
}

public long getMissCount() {
return this.missCount;
return getMissCount(TierType.ON_HEAP);
}

public long getEntries() {
return getEntries(TierType.ON_HEAP);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(memorySize);
out.writeVLong(evictions);
out.writeVLong(hitCount);
out.writeVLong(missCount);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeMap(this.map, StreamOutput::writeString, (o, v) -> v.writeTo(o)); // ?
} else {
// Write only on-heap values, and don't write entries metric
StatsHolder heapStats = map.get(TierType.ON_HEAP.getStringValue());
out.writeVLong(heapStats.getMemorySize());
out.writeVLong(heapStats.getEvictions());
out.writeVLong(heapStats.getHitCount());
out.writeVLong(heapStats.getMissCount());
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.REQUEST_CACHE_STATS);
builder.humanReadableField(Fields.MEMORY_SIZE_IN_BYTES, Fields.MEMORY_SIZE, getMemorySize());
builder.field(Fields.EVICTIONS, getEvictions());
builder.field(Fields.HIT_COUNT, getHitCount());
builder.field(Fields.MISS_COUNT, getMissCount());
// write on-heap stats outside of tiers object
getTierStats(TierType.ON_HEAP).toXContent(builder, params);
builder.startObject(Fields.TIERS);
for (TierType tierType : TierType.values()) { // fixed order
if (tierType != TierType.ON_HEAP) {
String tier = tierType.getStringValue();
builder.startObject(tier);
map.get(tier).toXContent(builder, params);
builder.endObject();
}
}
builder.endObject();
builder.endObject();
return builder;
}
Expand All @@ -122,10 +184,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
*/
static final class Fields {
static final String REQUEST_CACHE_STATS = "request_cache";
static final String TIERS = "tiers";
static final String MEMORY_SIZE = "memory_size";
static final String MEMORY_SIZE_IN_BYTES = "memory_size_in_bytes";
static final String EVICTIONS = "evictions";
static final String HIT_COUNT = "hit_count";
static final String MISS_COUNT = "miss_count";
static final String ENTRIES = "entries";
}
}
Loading