Skip to content

Commit

Permalink
Integrated misc changes from other branch
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Alfonsi committed Dec 8, 2023
1 parent 64c339a commit 47453e1
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 102 deletions.
4 changes: 4 additions & 0 deletions distribution/src/config/opensearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,8 @@ ${path.logs}
# Settings related to tiered caching:
# indices.requests.cache.tiered.disk.keystore.size: 0.05%
# index.requests.cache.tiered.disk.stale_cleanup_threshold: 0.5
# indices.requests.cache.tiered.disk.ehcache.min_threads: 2
# indices.requests.cache.tiered.disk.ehcache.max_threads: 2
# indices.requests.cache.tiered.disk.ehcache.write_concurrency: 2
# indices.requests.cache.tiered.disk.ehcache.segments: 16
#
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@
public class IndicesRequestCacheDiskTierIT extends OpenSearchIntegTestCase {
@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings())
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true")
.put(FeatureFlags.TIERED_CACHING, "true")
.build();
}

public void testDiskTierStats() throws Exception {
int heapSizeBytes = 9876;
String node = internalCluster().startNode(
Expand Down Expand Up @@ -119,30 +121,6 @@ public void testDiskTierStats() throws Exception {
IndicesRequestCacheIT.assertCacheState(client, "index", 0, numRequests + 3, TierType.ON_HEAP, false);
IndicesRequestCacheIT.assertCacheState(client, "index", 2, numRequests + 1, TierType.DISK, false);
assertDiskTierSpecificStats(client, "index", 2, tookTimeSoFar, tookTimeSoFar);

}

// Sets up a node with a zero-byte on-heap request cache and a disabled took-time
// threshold, so every cacheable request should be eligible for the disk tier / RBM
// keystore. NOTE(review): this test only performs setup — `resp` is declared but
// never assigned, and no assertions follow; it appears to be incomplete
// work-in-progress and verifies nothing about the RBM size setting it is named for.
public void testRBMSizeSetting() throws Exception {
// Zero heap size forces all entries past the on-heap tier.
int heapSizeBytes = 0;
String node = internalCluster().startNode(
Settings.builder()
.put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes))
.put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.ZERO) // allow into disk cache regardless of
// took time
);
Client client = client(node);

// Single-shard, zero-replica index with request caching explicitly enabled.
Settings.Builder indicesSettingBuilder = Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);

assertAcked(
client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(indicesSettingBuilder).get()
);
indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
ensureSearchable("index");
// NOTE(review): unused local — the test body ends here without exercising the cache.
SearchResponse resp;
}

private long getCacheSizeBytes(Client client, String index, TierType tierType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public static Collection<Object[]> parameters() {

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings())
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true")
.put(FeatureFlags.TIERED_CACHING, "true")
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

import java.io.File;
Expand Down Expand Up @@ -46,7 +46,6 @@
import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration;
import org.ehcache.spi.serialization.SerializerException;


/**
* An ehcache-based disk tier implementation.
* @param <K> The key type of cache entries
Expand All @@ -63,14 +62,18 @@ public class EhCacheDiskCachingTier<K, V> implements DiskCachingTier<K, V> {
// Ehcache disk write minimum threads for its pool
public static final Setting<Integer> REQUEST_CACHE_DISK_MIN_THREADS = Setting.intSetting(
"indices.requests.cache.tiered.disk.ehcache.min_threads",
2, 1, 5,
2,
1,
5,
Property.NodeScope
);

// Ehcache disk write maximum threads for its pool
public static final Setting<Integer> REQUEST_CACHE_DISK_MAX_THREADS = Setting.intSetting(
"indices.requests.cache.tiered.disk.ehcache.max_threads",
2, 1, 20,
2,
1,
20,
Property.NodeScope
);

Expand All @@ -80,15 +83,19 @@ public class EhCacheDiskCachingTier<K, V> implements DiskCachingTier<K, V> {
// queue ie write concurrency is 1. Check OffHeapDiskStoreConfiguration and DiskWriteThreadPool.
public static final Setting<Integer> REQUEST_CACHE_DISK_WRITE_CONCURRENCY = Setting.intSetting(
"indices.requests.cache.tiered.disk.ehcache.write_concurrency",
2, 1, 3,
2,
1,
3,
Property.NodeScope
);

// Defines how many segments the disk cache is separated into. Higher number achieves greater concurrency but
// will hold that many file pointers.
public static final Setting<Integer> REQUEST_CACHE_DISK_SEGMENTS = Setting.intSetting(
"indices.requests.cache.tiered.disk.ehcache.segments",
16, 1, 32,
16,
1,
32,
Property.NodeScope
);

Expand Down Expand Up @@ -144,7 +151,6 @@ private EhCacheDiskCachingTier(Builder<K, V> builder) {
this.clusterSettings = Objects.requireNonNull(builder.clusterSettings, "ClusterSettings object shouldn't be null");
Objects.requireNonNull(builder.settingPrefix, "Setting prefix shouldn't be null");


// In test cases, there might be leftover cache managers and caches hanging around, from nodes created in the test case setup
// Destroy them before recreating them
close();
Expand All @@ -163,7 +169,11 @@ private PersistentCacheManager buildCacheManager() {
PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder()
.defaultPool(THREAD_POOL_ALIAS_PREFIX + "Default", 1, 3) // Default pool used for other tasks like
// event listeners
.pool(this.threadPoolAlias, clusterSettings.get(REQUEST_CACHE_DISK_MIN_THREADS), clusterSettings.get(REQUEST_CACHE_DISK_MAX_THREADS))
.pool(
this.threadPoolAlias,
clusterSettings.get(REQUEST_CACHE_DISK_MIN_THREADS),
clusterSettings.get(REQUEST_CACHE_DISK_MAX_THREADS)
)
.build()
)
.build(true);
Expand Down Expand Up @@ -303,7 +313,7 @@ public void close() {

/**
 * Placeholder for updating the threshold that controls stale-key cleanup.
 * Currently a no-op: {@code newThreshold} is ignored.
 */
private void setStaleKeyThreshold(double newThreshold) {
return;
// TODO: Fill in once Kiran's code is merged
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@
package org.opensearch.common.cache.tier.keystore;

import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.core.common.unit.ByteSizeValue;

import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.roaringbitmap.RoaringBitmap;

/**
Expand All @@ -54,7 +54,7 @@
*/
public class RBMIntKeyLookupStore implements KeyLookupStore<Integer> {
public static final Setting<ByteSizeValue> INDICES_CACHE_KEYSTORE_SIZE = Setting.memorySizeSetting(
"index.requests.cache.tiered.disk.keystore.size",
"indices.requests.cache.tiered.disk.keystore.size",
"0.05%", // 5% of INDICES_CACHE_QUERY_SIZE
Setting.Property.Dynamic,
Setting.Property.NodeScope
Expand Down Expand Up @@ -92,7 +92,7 @@ public int getValue() {
protected final Lock readLock = lock.readLock();
protected final Lock writeLock = lock.writeLock();
private long mostRecentByteEstimate;
protected static final int REFRESH_SIZE_EST_INTERVAL = 10000;
public static final int REFRESH_SIZE_EST_INTERVAL = 10000;
// Refresh size estimate every X new elements. Refreshes use the RBM's internal size estimator, which takes ~0.01 ms,
// so we don't want to do it on every get(), and it doesn't matter much if there are +- 10000 keys in this store
// in terms of storage impact
Expand Down Expand Up @@ -120,7 +120,7 @@ public RBMIntKeyLookupStore(KeystoreModuloValue moduloValue, long memSizeCapInBy
this.collidedIntCounters = new HashMap<>();
this.removalSets = new HashMap<>();
this.mostRecentByteEstimate = 0L;

clusterSettings.addSettingsUpdateConsumer(INDICES_CACHE_KEYSTORE_SIZE, this::setMemSizeCap);
}

private int transform(int value) {
Expand All @@ -147,7 +147,7 @@ public boolean add(Integer value) {
stats.numAddAttempts.inc();

if (getSize() % REFRESH_SIZE_EST_INTERVAL == 0) {
mostRecentByteEstimate = getMemorySizeInBytes();
mostRecentByteEstimate = computeMemorySizeInBytes();
}
if (getMemorySizeCapInBytes() > 0 && mostRecentByteEstimate > getMemorySizeCapInBytes()) {
stats.atCapacity.set(true);
Expand Down Expand Up @@ -321,6 +321,10 @@ static double getRBMSizeMultiplier(int numEntries, int modulo) {

/**
 * Returns the most recently cached estimate of this store's memory footprint in
 * bytes. The value is refreshed only every {@code REFRESH_SIZE_EST_INTERVAL}
 * additions (see {@code add}) rather than recomputed per call, because the RBM's
 * internal size estimator is comparatively expensive (~0.01 ms).
 */
@Override
public long getMemorySizeInBytes() {
return mostRecentByteEstimate;
}

// Computes a fresh memory-size estimate: the RBM's self-reported size scaled by a
// correction multiplier derived from the current entry count and the modulo in use.
private long computeMemorySizeInBytes() {
double multiplier = getRBMSizeMultiplier((int) stats.size.count(), modulo);
return (long) (rbm.getSizeInBytes() * multiplier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.common.cache.tier.TierType;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand Down Expand Up @@ -193,18 +194,20 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
// write on-heap stats outside of tiers object
getTierStats(TierType.ON_HEAP).toXContent(builder, params);
getTierSpecificStats(TierType.ON_HEAP).toXContent(builder, params);
builder.startObject(Fields.TIERS);
for (TierType tierType : TierType.values()) { // fixed order
if (tierType != TierType.ON_HEAP) {
String tier = tierType.getStringValue();
builder.startObject(tier);
defaultStatsMap.get(tier).toXContent(builder, params);
tierSpecificStatsMap.get(tier).toXContent(builder, params);
builder.endObject();
if (FeatureFlags.isEnabled(FeatureFlags.TIERED_CACHING)) {
builder.startObject(Fields.TIERS);
for (TierType tierType : TierType.values()) { // fixed order
if (tierType != TierType.ON_HEAP) {
String tier = tierType.getStringValue();
builder.startObject(tier);
defaultStatsMap.get(tier).toXContent(builder, params);
tierSpecificStatsMap.get(tier).toXContent(builder, params);
builder.endObject();
}
}
builder.endObject();
}
builder.endObject();
builder.endObject();
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.opensearch.common.cache.tier.BytesReferenceSerializer;
import org.opensearch.common.cache.tier.CachePolicyInfoWrapper;
import org.opensearch.common.cache.tier.CacheValue;
import org.opensearch.common.cache.tier.DiskTierTookTimePolicy;
import org.opensearch.common.cache.tier.EhCacheDiskCachingTier;
import org.opensearch.common.cache.tier.OnHeapCachingTier;
import org.opensearch.common.cache.tier.OpenSearchOnHeapCache;
Expand Down Expand Up @@ -143,18 +142,19 @@ public final class IndicesRequestCache implements TieredCacheEventListener<Indic
};

// Initialize tiered cache service. TODO: Enable Disk tier when tiered support is turned on.
TieredCacheSpilloverStrategyService.Builder<Key, BytesReference> tieredCacheServiceBuilder = new TieredCacheSpilloverStrategyService.Builder<Key, BytesReference>()
.setOnHeapCachingTier(openSearchOnHeapCache)
.setTieredCacheEventListener(this);
TieredCacheSpilloverStrategyService.Builder<Key, BytesReference> tieredCacheServiceBuilder =
new TieredCacheSpilloverStrategyService.Builder<Key, BytesReference>().setOnHeapCachingTier(openSearchOnHeapCache)
.setTieredCacheEventListener(this);

if (FeatureFlags.isEnabled(FeatureFlags.TIERED_CACHING)) {
// enabling this for testing purposes. Remove/tweak!!
long CACHE_SIZE_IN_BYTES = 1000000L;
String SETTING_PREFIX = "indices.request.cache";
String STORAGE_PATH = indicesService.getNodePaths()[0].indicesPath.toString() + "/request_cache";

EhCacheDiskCachingTier<Key, BytesReference> diskTier = new EhCacheDiskCachingTier.Builder<Key, BytesReference>()
.setKeyType(Key.class)
EhCacheDiskCachingTier<Key, BytesReference> diskTier = new EhCacheDiskCachingTier.Builder<Key, BytesReference>().setKeyType(
Key.class
)
.setValueType(BytesReference.class)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
.setSettings(settings)
Expand Down
Loading

0 comments on commit 47453e1

Please sign in to comment.