Skip to content

Commit

Permalink
misc stuff for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Alfonsi committed Oct 12, 2023
1 parent aa2d28d commit 354c311
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 38 deletions.
Empty file removed server/disk_cache_tier/.lock
Empty file.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#Key and value types
#Wed Oct 11 21:36:14 WGST 2023
#Thu Oct 12 16:18:26 EDT 2023
valueType=org.opensearch.core.common.bytes.BytesReference
keyType=org.opensearch.indices.IndicesRequestCache$Key
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,34 @@ public ShardRequestCache() {

public RequestCacheStats stats() {
// TODO: Change RequestCacheStats to support disk tier stats.
return stats(TierType.ON_HEAP);
}

public RequestCacheStats stats(TierType tierType) {
return new RequestCacheStats(
statsHolder.get(tierType).totalMetric.count(),
statsHolder.get(tierType).evictionsMetric.count(),
statsHolder.get(tierType).hitCount.count(),
statsHolder.get(tierType).missCount.count()
);
}

public RequestCacheStats overallStats() {
long totalSize = 0;
long totalEvictions = 0;
long totalHits = 0;
long totalMisses = 0;
for (TierType tierType : TierType.values()) {
totalSize += statsHolder.get(tierType).totalMetric.count();
totalEvictions += statsHolder.get(tierType).evictionsMetric.count();
totalHits += statsHolder.get(tierType).hitCount.count();
totalMisses += statsHolder.get(tierType).missCount.count();
}
return new RequestCacheStats(
statsHolder.get(TierType.ON_HEAP).totalMetric.count(),
statsHolder.get(TierType.ON_HEAP).evictionsMetric.count(),
statsHolder.get(TierType.ON_HEAP).hitCount.count(),
statsHolder.get(TierType.ON_HEAP).missCount.count()
totalSize,
totalEvictions,
totalHits,
totalMisses
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.indices;

import org.ehcache.PersistentCacheManager;
import org.ehcache.config.builders.CacheConfigurationBuilder;
import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder;
import org.ehcache.config.builders.CacheManagerBuilder;
Expand All @@ -16,7 +17,6 @@
import org.ehcache.config.units.MemoryUnit;
import org.ehcache.core.internal.statistics.DefaultStatisticsService;
import org.ehcache.core.spi.service.StatisticsService;
import org.ehcache.core.statistics.CacheStatistics;
import org.ehcache.core.statistics.TierStatistics;
import org.ehcache.event.CacheEvent;
import org.ehcache.event.CacheEventListener;
Expand All @@ -25,63 +25,64 @@
import org.opensearch.common.ExponentiallyWeightedMovingAverage;
import org.opensearch.common.cache.RemovalListener;
import org.ehcache.Cache;
import org.ehcache.CacheManager;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;

import java.util.Collections;

public class EhcacheDiskCachingTier<K, V> implements CachingTier<K, V>, RemovalListener<K, V> {

private CacheManager cacheManager;
private final Cache<K, V> cache;
private final PersistentCacheManager cacheManager;
public final Cache<K, V> cache; // make private after debug

private final Class<K> keyType; // I think these are needed to pass to newCacheConfigurationBuilder
private final Class<V> valueType;
private final String DISK_CACHE_FP = "disk_cache_tier"; // this should probably be defined somewhere else since we need to change security.policy based on its value
private RemovalListener<K, V> removalListener;
private final CacheStatistics cacheStats;
private StatisticsService statsService; // non functional
private ExponentiallyWeightedMovingAverage getTimeMillisEWMA;
private static final double GET_TIME_EWMA_ALPHA = 0.3; // This is the value used elsewhere in OpenSearch
private static final int MIN_WRITE_THREADS = 0;
private static final int MAX_WRITE_THREADS = 4; // Max number of threads for the PooledExecutionService which handles writes
private static final String cacheAlias = "diskTier";
private final boolean isPersistent;
private int count; // number of entries in cache
// private RBMIntKeyLookupStore keystore;
// private CacheTierPolicy[] policies;
// private IndicesRequestCacheDiskTierPolicy policy;

public EhcacheDiskCachingTier(long maxWeightInBytes, long maxKeystoreWeightInBytes, Class<K> keyType, Class<V> valueType) {
public EhcacheDiskCachingTier(boolean isPersistent, long maxWeightInBytes, long maxKeystoreWeightInBytes, Class<K> keyType, Class<V> valueType) {
this.keyType = keyType;
this.valueType = valueType;
String cacheAlias = "diskTier";
StatisticsService statsService = new DefaultStatisticsService();
this.isPersistent = isPersistent;
this.count = 0;
statsService = new DefaultStatisticsService();

// our EhcacheEventListener should receive events every time an entry is removed, but not when it's created
// our EhcacheEventListener should receive events every time an entry is changed
CacheEventListenerConfigurationBuilder listenerConfig = CacheEventListenerConfigurationBuilder
.newEventListenerConfiguration(new EhcacheEventListener(this),
.newEventListenerConfiguration(new EhcacheEventListener(this, this),
EventType.EVICTED,
EventType.EXPIRED,
EventType.REMOVED,
EventType.UPDATED)
.ordered().asynchronous(); // ordered() has some performance penalty as compared to unordered(), we can also use synchronous()
EventType.UPDATED,
EventType.CREATED);
//.ordered().asynchronous(); // ordered() has some performance penalty as compared to unordered(), we can also use synchronous()

PooledExecutionServiceConfiguration threadConfig = PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder()
.defaultPool("default", MIN_WRITE_THREADS, MAX_WRITE_THREADS)
.build();

this.cacheManager = CacheManagerBuilder.newCacheManagerBuilder()
.using(statsService)
.using(statsService) // https://stackoverflow.com/questions/40453859/how-to-get-ehcache-3-1-statistics
.using(threadConfig)
.with(CacheManagerBuilder.persistence(DISK_CACHE_FP))
.withCache(cacheAlias, CacheConfigurationBuilder.newCacheConfigurationBuilder(
keyType, valueType, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B, true))
keyType, valueType, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B, isPersistent))
.withService(listenerConfig) // stackoverflow shows .add(), but IDE says this is deprecated. idk
).build(true);
this.cache = cacheManager.getCache(cacheAlias, keyType, valueType);
this.cacheStats = statsService.getCacheStatistics(cacheAlias);
this.getTimeMillisEWMA = new ExponentiallyWeightedMovingAverage(GET_TIME_EWMA_ALPHA, 10);

// try and feed it an OpenSearch threadpool rather than its default ExecutionService?

// this.keystore = new RBMIntKeyLookupStore((int) Math.pow(2, 28), maxKeystoreWeightInBytes);
// this.policies = new CacheTierPolicy[]{ new IndicesRequestCacheTookTimePolicy(settings, clusterSettings) };
// this.policy = new IndicesRequestCacheDiskTierPolicy(this.policies, true);
Expand Down Expand Up @@ -109,6 +110,7 @@ public void put(K key, V value) {
// CheckDataResult policyResult = policy.checkData(value)
// if (policyResult.isAccepted()) {
cache.put(key, value);
//count++;
// keystore.add(key.hashCode());
// else { do something with policyResult.deniedReason()? }
// }
Expand All @@ -126,6 +128,7 @@ public void invalidate(K key) {

// if (keystore.contains(key.hashCode()) {
cache.remove(key);
//count--;
// keystore.remove(key.hashCode());
// }
}
Expand Down Expand Up @@ -154,8 +157,15 @@ public Iterable<K> keys() {

@Override
public int count() {
// this might be an expensive disk-seek call. Might be better to keep track ourselves?
return (int) getTierStats().getMappings();
return count;
//return (int) getTierStats().getMappings();
}

protected void countInc() {
count++;
}
protected void countDec() {
count--;
}

@Override
Expand All @@ -172,8 +182,11 @@ public double getTimeMillisEWMA() {
return getTimeMillisEWMA.getAverage();
}

private TierStatistics getTierStats() {
return cacheStats.getTierStatistics().get("Disk");
// these aren't really needed, ShardRequestCache handles it
// Also, it seems that ehcache doesn't have functioning statistics anyway!

/*public TierStatistics getTierStats() {
return statsService.getCacheStatistics(cacheAlias).getTierStatistics().get("Disk");
}
public long getHits() {
Expand All @@ -196,32 +209,53 @@ public double getHitRatio() {
TierStatistics ts = getTierStats();
long hits = ts.getHits();
return hits / (hits + ts.getMisses());
}*/

public boolean isPersistent() {
return isPersistent;
}

public void close() {
// Call this method after each test, otherwise the directory will stay locked and you won't be able to
// initialize another IndicesRequestCache
// initialize another IndicesRequestCache (for example in the next test that runs)
cacheManager.removeCache(cacheAlias);
cacheManager.close();
}


// See https://stackoverflow.com/questions/45827753/listenerobject-not-found-in-imports-for-ehcache-3 for API reference
// it's not actually documented by ehcache :(
// This class is used to get the old value from mutating calls to the cache, and it uses those to create a RemovalNotification
private class EhcacheEventListener implements CacheEventListener<Object, Object> { // try making these specific, but i dont think itll work
// It also handles incrementing and decrementing the count for the disk tier, since ehcache's statistics functionality
// does not seem to work
private class EhcacheEventListener implements CacheEventListener<K, V> { // try making these specific, but i dont think itll work
private RemovalListener<K, V> removalListener;
EhcacheEventListener(RemovalListener<K, V> removalListener) {
private EhcacheDiskCachingTier<K, V> tier;
EhcacheEventListener(RemovalListener<K, V> removalListener, EhcacheDiskCachingTier<K, V> tier) {
this.removalListener = removalListener;
this.tier = tier; // needed to handle count changes
}
@Override
public void onEvent(CacheEvent<?, ?> event) {
// send a RemovalNotification
K key = (K) event.getKey(); // I think these casts should be ok?
V oldValue = (V) event.getOldValue();
V newValue = (V) event.getNewValue();
public void onEvent(CacheEvent<? extends K, ? extends V> event) {
K key = event.getKey();
V oldValue = event.getOldValue();
V newValue = event.getNewValue();
EventType eventType = event.getType();

System.out.println("I am eventing!!");

// handle changing count for the disk tier
if (oldValue == null && newValue != null) {
tier.countInc();
} else if (oldValue != null && newValue == null) {
tier.countDec();
}

// handle creating a RemovalReason, unless eventType is CREATED
RemovalReason reason;
switch (eventType) {
case CREATED:
return;
case EVICTED:
reason = RemovalReason.EVICTED; // why is there both RemovalReason.EVICTED and RemovalReason.CAPACITY?
break;
Expand All @@ -236,7 +270,6 @@ public void onEvent(CacheEvent<?, ?> event) {
default:
reason = null;
}
// we don't subscribe to CREATED type, which is the only other option
removalListener.onRemoval(new RemovalNotification<K, V>(key, oldValue, reason));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ public final class IndicesRequestCache implements TieredCacheEventListener<Indic
private final TimeValue expire;
// private final Cache<Key, BytesReference> cache;

private final TieredCacheHandler<Key, BytesReference> tieredCacheHandler;

//private final TieredCacheHandler<Key, BytesReference> tieredCacheHandler; // made public TieredCacheSpilloverStrategyHandler for testing
public final TieredCacheSpilloverStrategyHandler<Key, BytesReference> tieredCacheHandler;
IndicesRequestCache(Settings settings) {
this.size = INDICES_CACHE_QUERY_SIZE.get(settings);
this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null;
Expand All @@ -130,7 +130,7 @@ public final class IndicesRequestCache implements TieredCacheEventListener<Indic

// changed to Integer for testing of bulk writes
EhcacheDiskCachingTier<Key, BytesReference> diskCachingTier;
diskCachingTier = new EhcacheDiskCachingTier<>(diskTierWeight, 0, Key.class, BytesReference.class);
diskCachingTier = new EhcacheDiskCachingTier<>(true, diskTierWeight, 0, Key.class, BytesReference.class);
tieredCacheHandler = new TieredCacheSpilloverStrategyHandler.Builder<Key, BytesReference>().setOnHeapCachingTier(
openSearchOnHeapCache
).setOnDiskCachingTier(diskCachingTier).setTieredCacheEventListener(this).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ public CachingTier<K, V> getOnHeapCachingTier() {
return this.onHeapCachingTier;
}

public EhcacheDiskCachingTier<K, V> getDiskCachingTier() { // change to CachingTier after debug
return this.diskCachingTier;
}

private void setRemovalListeners() {
for (CachingTier<K, V> cachingTier : cachingTierList) {
cachingTier.setRemovalListener(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.ehcache.core.statistics.TierStatistics;
import org.opensearch.common.CheckedSupplier;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
Expand Down Expand Up @@ -122,6 +123,74 @@ public void testBasicOperationsCache() throws Exception {
cache.closeDiskTier();
}

public void testAddDirectToEhcache() throws Exception {
ShardRequestCache requestCacheStats = new ShardRequestCache();
Settings.Builder settingsBuilder = Settings.builder();
long heapSizeBytes = 1000;
settingsBuilder.put("indices.requests.cache.size", new ByteSizeValue(heapSizeBytes));
IndicesRequestCache cache = new IndicesRequestCache(settingsBuilder.build());

// set up a key
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
writer.addDocument(newDoc(0, "foo"));
DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
AtomicBoolean indexShard = new AtomicBoolean(true);
TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
IndicesRequestCache.Key[] keys = new IndicesRequestCache.Key[9];
TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
IndicesRequestCache.Key key = new IndicesRequestCache.Key(entity, reader.getReaderCacheHelper().getKey(), termBytes);

TestBytesReference value = new TestBytesReference(124);
cache.tieredCacheHandler.getDiskCachingTier().cache.put(key, value);

IOUtils.close(reader, writer, dir, cache);
cache.closeDiskTier();
}

public void testSpillover() throws Exception {
// fill the on-heap cache until we spill over
ShardRequestCache requestCacheStats = new ShardRequestCache();
Settings.Builder settingsBuilder = Settings.builder();
long heapSizeBytes = 1000; // each of these queries is 115 bytes, so we can fit 8 in the heap cache
settingsBuilder.put("indices.requests.cache.size", new ByteSizeValue(heapSizeBytes));
IndicesRequestCache cache = new IndicesRequestCache(settingsBuilder.build());

Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
writer.addDocument(newDoc(0, "foo"));
DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
AtomicBoolean indexShard = new AtomicBoolean(true);

TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
System.out.println("On-heap cache size at start = " + requestCacheStats.stats().getMemorySizeInBytes());
IndicesRequestCache.Key[] keys = new IndicesRequestCache.Key[9];
for (int i = 0; i < 9; i++) {
TermQueryBuilder termQuery = new TermQueryBuilder("id", String.valueOf(i));
BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
keys[i] = new IndicesRequestCache.Key(entity, reader.getReaderCacheHelper().getKey(), termBytes);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes);
System.out.println("On-heap cache size after " + (i+1) + " queries = " + requestCacheStats.stats().getMemorySizeInBytes());
System.out.println("Disk cache size after " + (i+1) + " queries = " + requestCacheStats.stats(TierType.DISK).getMemorySizeInBytes());
}
// attempt to get value from disk cache, the first key should have been evicted
BytesReference firstValue = cache.tieredCacheHandler.get(keys[0]);
System.out.println("Final on-heap cache size = " + requestCacheStats.stats().getMemorySizeInBytes()); // is correctly 920
//System.out.println("Final self-reported disk size = " + cache.tieredCacheHandler.getDiskWeightBytes()); // is 0, should be 115
System.out.println("On-heap tier evictions = " + requestCacheStats.stats().getEvictions()); // is correctly 1
System.out.println("Disk tier hits = " + requestCacheStats.stats(TierType.DISK).getHitCount()); // should be 1, is 0 bc keys not serializable
System.out.println("Disk tier misses = " + requestCacheStats.stats(TierType.DISK).getMissCount()); // should be 9, is 10 bc keys not serializable
//System.out.println("Disk tier self-reported misses = " + cache.tieredCacheHandler.getDiskCachingTier().getMisses()); // should be same as other one
System.out.println("On-heap tier hits = " + requestCacheStats.stats().getHitCount()); // is correctly 0
System.out.println("On-heap tier misses = " + requestCacheStats.stats().getMissCount()); // is correctly 10
System.out.println("Disk count = " + cache.tieredCacheHandler.getDiskCachingTier().count()); // should be 1, is 0
IOUtils.close(reader, writer, dir, cache);
cache.closeDiskTier();
}

public void testCacheDifferentReaders() throws Exception {
IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY);
AtomicBoolean indexShard = new AtomicBoolean(true);
Expand Down

0 comments on commit 354c311

Please sign in to comment.