Skip to content

Commit

Permalink
add support for the folsom client
Browse files Browse the repository at this point in the history
  • Loading branch information
tootedom committed Apr 5, 2015
1 parent 64af824 commit dc81c15
Show file tree
Hide file tree
Showing 30 changed files with 916 additions and 220 deletions.
63 changes: 38 additions & 25 deletions README.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -709,33 +709,34 @@ With the YammerMetricsRecorder the following metrics are placed inside the Metri

[width="25%",options="header"]
|=========================================
|Method | Description |
|value_calculation_cache_hitrate | The cache hits per second on the internal future cache |
|value_calculation_cache_missrate| The cache misses per second on the internal future cache |
|value_calculation_cache_hitcount| The cache hits in total on the internal future cache |
|value_calculation_cache_misscount| The cache misses in total on the internal future cache |
|value_calculation_success_count | The number of successful runs of the Supplier<T> function |
|value_calculation_failure_count | The number of failed runs of the Supplier<T> function |
|value_calculation_time_timer | The time it has taken to execute the Supplier<T> function |
|distributed_cache_hitrate| The cache hits per second on the distributed cache (i.e. memcached) |
|distributed_cache_missrate| The cache misses per second on the distributed cache (i.e. memcached) |
|distributed_cache_timer| The time it takes to lookup a value from the distributed cache |
|distributed_cache_count | The number of lookups in the distributed cache that have been performed|
|distributed_cache_hitcount | The cache hits in total on the distributed cache|
|distributed_cache_misscount | The cache misses in total on the distributed cache|
|distributed_cache_writes_count| The writes performed on the distributed cache|
|stale_distributed_cache_timer|The time it takes to lookup a stale value from the distributed cache |
|stale_distributed_cache_hitrate| The stale cache hits per second on the distributed cache (i.e. memcached) |
|stale_distributed_cache_missrate| The stale cache misses per second on the distributed cache (i.e. memcached) |
|stale_distributed_cache_count| The stale hits performed on the distributed cache (i.e. memcached) |
|stale_distributed_cache_hitcount| The stale cache hits in total on the distributed cache|
|stale_distributed_cache_misscount| The stale cache misses in total on the distributed cache|
|stale_value_calculation_cache_misscount | The cache misses in total on the internal future cache for a stale value|
|stale_value_calculation_cache_hitcount | The cache hits in total on the internal future cache for a stale value|
|stale_value_calculation_cache_missrate| The cache misses per second on the internal future cache for stale value |
|stale_value_calculation_cache_hitrate| The cache hits per second on the internal future cache for stale value |
|Method | Description
|value_calculation_cache_hitrate | The cache hits per second on the internal future cache
|value_calculation_cache_missrate | The cache misses per second on the internal future cache
|value_calculation_cache_hitcount | The cache hits in total on the internal future cache
|value_calculation_cache_misscount | The cache misses in total on the internal future cache
|value_calculation_success_count | The number of successful runs of the Supplier<T> function
|value_calculation_failure_count | The number of failed runs of the Supplier<T> function
|value_calculation_time_timer | The time it has taken to execute the Supplier<T> function
|distributed_cache_hitrate | The cache hits per second on the distributed cache (i.e. memcached)
|distributed_cache_missrate | The cache misses per second on the distributed cache (i.e. memcached)
|distributed_cache_timer | The time it takes to lookup a value from the distributed cache
|distributed_cache_count | The number of lookups in the distributed cache that have been performed
|distributed_cache_hitcount | The cache hits in total on the distributed cache
|distributed_cache_misscount | The cache misses in total on the distributed cache
|distributed_cache_writes_count | The writes performed on the distributed cache
|stale_distributed_cache_timer | The time it takes to lookup a stale value from the distributed cache
|stale_distributed_cache_hitrate | The stale cache hits per second on the distributed cache (i.e. memcached)
|stale_distributed_cache_missrate | The stale cache misses per second on the distributed cache (i.e. memcached)
|stale_distributed_cache_count | The stale hits performed on the distributed cache (i.e. memcached)
|stale_distributed_cache_hitcount | The stale cache hits in total on the distributed cache
|stale_distributed_cache_misscount | The stale cache misses in total on the distributed cache
|stale_value_calculation_cache_misscount | The cache misses in total on the internal future cache for a stale value
|stale_value_calculation_cache_hitcount | The cache hits in total on the internal future cache for a stale value
|stale_value_calculation_cache_missrate | The cache misses per second on the internal future cache for stale value
|stale_value_calculation_cache_hitrate | The cache hits per second on the internal future cache for stale value
|=========================================================

'''



Expand Down Expand Up @@ -1085,4 +1086,16 @@ To compile and run perf tests do:
----
mvn clean test-compile assembly:single
$JAVA_HOME/bin/java -jar target/performancetests-test-jar-with-dependencies.jar
Example output:
Benchmark Mode Cnt Score Error Units
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyDefaultKetamaHashAlgoTest thrpt 60 21.335 ± 1.396 ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyFolsomTest thrpt 60 21.315 ± 0.706 ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyJenkinsHashAlgoTest thrpt 60 21.194 ± 0.873 ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyXXHashAlgoTest thrpt 60 20.755 ± 0.340 ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.iq80Compresss thrpt 60 57.897 ± 1.095 ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.iq80Decompresss thrpt 60 123.656 ± 2.166 ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.xerialCompress thrpt 60 81.231 ± 1.841 ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.xerialDecompress thrpt 60 136.641 ± 3.344 ops/ms
----
9 changes: 9 additions & 0 deletions jmh-result.text
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Benchmark Mode Cnt Score Error Units
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyDefaultKetamaHashAlgoTest thrpt 60 21.335 ± 1.396 ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyFolsomTest thrpt 60 21.315 ± 0.706 ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyJenkinsHashAlgoTest thrpt 60 21.194 ± 0.873 ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyXXHashAlgoTest thrpt 60 20.755 ± 0.340 ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.iq80Compresss thrpt 60 57.897 ± 1.095 ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.iq80Decompresss thrpt 60 123.656 ± 2.166 ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.xerialCompress thrpt 60 81.231 ± 1.841 ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.xerialDecompress thrpt 60 136.641 ± 3.344 ops/ms
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@
<version>${netty.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.spotify</groupId>
<artifactId>folsom</artifactId>
<version>0.6.1</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@
import com.google.common.util.concurrent.*;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.OperationTimeoutException;
import net.spy.memcached.internal.CheckedOperationTimeoutException;
import org.greencheek.caching.herdcache.CacheWithExpiry;
import org.greencheek.caching.herdcache.RequiresShutdown;
import org.greencheek.caching.herdcache.lru.CacheRequestFutureComputationCompleteNotifier;
import org.greencheek.caching.herdcache.lru.CacheValueComputationFailureHandler;
import org.greencheek.caching.herdcache.memcached.config.ElastiCacheCacheConfig;
import org.greencheek.caching.herdcache.memcached.config.MemcachedCacheConfig;
import org.greencheek.caching.herdcache.memcached.factory.MemcachedClientFactory;
import org.greencheek.caching.herdcache.memcached.factory.ReferencedClient;
import org.greencheek.caching.herdcache.memcached.factory.*;
import org.greencheek.caching.herdcache.memcached.keyhashing.*;
import org.greencheek.caching.herdcache.memcached.metrics.MetricRecorder;
import org.greencheek.caching.herdcache.memcached.spyconnectionfactory.SpyConnectionFactoryBuilder;
Expand Down Expand Up @@ -40,6 +37,17 @@ public static ConnectionFactory createMemcachedConnectionFactory(MemcachedCacheC
config.getProtocol(),config.getReadBufferSize(),config.getKeyHashType());
}

public static ReferencedClientFactory createReferenceClientFactory(ElastiCacheCacheConfig config) {
switch(config.getClientType()) {
case SPY:
return new SpyMemcachedReferencedClientFactory<>(createMemcachedConnectionFactory(config.getMemcachedCacheConfig()));
case FOLSOM:
return new FolsomReferencedClientFactory<>(config);
default:
return new SpyMemcachedReferencedClientFactory<>(createMemcachedConnectionFactory(config.getMemcachedCacheConfig()));
}
}

public static final String CACHE_TYPE_VALUE_CALCULATION = "value_calculation_cache";
public static final String CACHE_TYPE_STALE_VALUE_CALCULATION = "stale_value_calculation_cache";
public static final String CACHE_TYPE_CACHE_DISABLED = "disabled_cache";
Expand Down Expand Up @@ -191,15 +199,15 @@ private void writeToDistributedCache(ReferencedClient client,
int entryTTLInSeconds = (int)getDuration(timeToLive);

if( waitForMemcachedSet ) {
Future<Boolean> futureSet = client.getClient().set(key, entryTTLInSeconds, value);
Future<Boolean> futureSet = client.set(key, entryTTLInSeconds, value);
try {
futureSet.get(waitForSetDurationInMillis, TimeUnit.MILLISECONDS);
} catch (Exception e) {
logger.warn("Exception waiting for memcached set to occur",e);
}
} else {
try {
client.getClient().set(key, entryTTLInSeconds, value);
client.set(key, entryTTLInSeconds, value);
} catch (Exception e) {
logger.warn("Exception waiting for memcached set to occur",e);
}
Expand Down Expand Up @@ -523,18 +531,13 @@ private V getFromDistributedCache(ReferencedClient client,String key, long timeo
Object serialisedObj = null;
long nanos = System.nanoTime();
try {
Future<Object> future = client.getClient().asyncGet(key);
Object cacheVal = future.get(timeoutInMillis,TimeUnit.MILLISECONDS);
Object cacheVal = client.get(key,timeoutInMillis, TimeUnit.MILLISECONDS);
if(cacheVal==null){
logCacheMiss(key,cacheType);
} else {
logCacheHit(key,cacheType);
serialisedObj = cacheVal;
}
} catch ( OperationTimeoutException | CheckedOperationTimeoutException e) {
logger.warn("timeout when retrieving key {} from memcached",key);
} catch (TimeoutException e) {
logger.warn("timeout when retrieving key {} from memcached", key);
} catch(Exception e) {
logger.warn("Unable to contact memcached for get({}): {}", key, e.getMessage());
} catch(Throwable e) {
Expand Down Expand Up @@ -586,25 +589,23 @@ public void clear() {
public void clear(boolean waitForClear) {
clearInternalCaches();
ReferencedClient client = clientFactory.getClient();
if(client.isAvailable()) {
MemcachedClientIF cli = client.getClient();
if(client!=null) {
Future<Boolean> future = cli.flush();
if (client.isAvailable()) {
Future<Boolean> future = client.flush();
if(future!=null) {
long millisToWait = config.getWaitForRemove().toMillis();
if(waitForClear || millisToWait>0) {
if (waitForClear || millisToWait > 0) {
try {
if(millisToWait>0) {
future.get(millisToWait,TimeUnit.MILLISECONDS);
}
else {
if (millisToWait > 0) {
future.get(millisToWait, TimeUnit.MILLISECONDS);
} else {
future.get();
}
} catch (InterruptedException e) {
logger.warn("Interrupted whilst waiting for cache clear to occur",e);
logger.warn("Interrupted whilst waiting for cache clear to occur", e);
} catch (ExecutionException e) {
logger.warn("Exception whilst waiting for cache clear to occur",e);
logger.warn("Exception whilst waiting for cache clear to occur", e);
} catch (TimeoutException e) {
logger.warn("Timeout whilst waiting for cache clear to occur",e);
logger.warn("Timeout whilst waiting for cache clear to occur", e);
}
}
}
Expand Down Expand Up @@ -637,15 +638,17 @@ private void waitForDelete(Future<Boolean> future,long millisToWait,
@Override
public void clear(String key) {
ReferencedClient client = clientFactory.getClient();
if(client.isAvailable()) {
MemcachedClientIF cli = client.getClient();
if(client!=null) {
long millisToWait = config.getWaitForRemove().toMillis();
if(config.isUseStaleCache()) {
Future<Boolean> staleCacheFuture = cli.delete(createStaleCacheKey(key));
if (client.isAvailable()) {
key = getHashedKey(key);
long millisToWait = config.getWaitForRemove().toMillis();
if (config.isUseStaleCache()) {
Future<Boolean> staleCacheFuture = client.delete(createStaleCacheKey(key));
if (staleCacheFuture != null) {
waitForDelete(staleCacheFuture, millisToWait, key, "stale cache");
}
Future<Boolean> future = cli.delete(key);
}
Future<Boolean> future = client.delete(key);
if (future != null) {
waitForDelete(future, millisToWait, key, "cache");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.greencheek.caching.herdcache.memcached.elasticacheconfig.client.ElastiCacheServerConnectionDetails;
import org.greencheek.caching.herdcache.memcached.elasticacheconfig.client.LocalhostElastiCacheServerConnectionDetails;
import org.greencheek.caching.herdcache.memcached.factory.ElastiCacheClientFactory;
import org.greencheek.caching.herdcache.memcached.factory.ReferencedClientFactory;

import java.util.List;

Expand All @@ -24,7 +25,7 @@ public ElastiCacheMemcachedCache(ElastiCacheCacheConfig config) {

private ElastiCacheMemcachedCache(MemcachedCacheConfig mConfig,ElastiCacheCacheConfig config) {
super(new ElastiCacheClientFactory(
createMemcachedConnectionFactory(mConfig),
createReferenceClientFactory(config),
ElastiCacheConfigHostsParser.parseElastiCacheConfigHosts(config.getElastiCacheConfigHosts()),
config.getConfigPollingTime(),
config.getInitialConfigPollingDelay(),
Expand All @@ -43,4 +44,6 @@ private ElastiCacheMemcachedCache(MemcachedCacheConfig mConfig,ElastiCacheCacheC
}




}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.greencheek.caching.herdcache.memcached;

import org.greencheek.caching.herdcache.memcached.config.ElastiCacheCacheConfig;
import org.greencheek.caching.herdcache.memcached.factory.FolsomReferencedClientFactory;
import org.greencheek.caching.herdcache.memcached.factory.SpyMemcachedClientFactory;

/**
*
*/
public class FolsomMemcachedCache<V> extends BaseMemcachedCache<V> {
public FolsomMemcachedCache(ElastiCacheCacheConfig config) {
super(new SpyMemcachedClientFactory<V>(config.getMemcachedCacheConfig().getMemcachedHosts(),
config.getMemcachedCacheConfig().getDnsConnectionTimeout(), config.getMemcachedCacheConfig().getHostStringParser(),
config.getMemcachedCacheConfig().getHostResolver(), new FolsomReferencedClientFactory<V>(config)), config.getMemcachedCacheConfig());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

import org.greencheek.caching.herdcache.memcached.config.MemcachedCacheConfig;
import org.greencheek.caching.herdcache.memcached.factory.SpyMemcachedClientFactory;
import org.greencheek.caching.herdcache.memcached.factory.SpyMemcachedReferencedClientFactory;

/**
* Created by dominictootell on 26/08/2014.
*/
public class SpyMemcachedCache<V> extends BaseMemcachedCache<V> {
public SpyMemcachedCache(MemcachedCacheConfig config) {
super(new SpyMemcachedClientFactory(config.getMemcachedHosts(),
super(new SpyMemcachedClientFactory<V>(config.getMemcachedHosts(),
config.getDnsConnectionTimeout(),config.getHostStringParser(),
config.getHostResolver(),createMemcachedConnectionFactory(config)), config);
config.getHostResolver(),new SpyMemcachedReferencedClientFactory<V>(createMemcachedConnectionFactory(config))), config);
}
}
Loading

0 comments on commit dc81c15

Please sign in to comment.