The cache borrows heavily from the concepts and implementation of caching in spray-caching.
The idea is that the cache stores a future rather than a value of a given generic type. This has the advantage that it neatly takes care of the thundering herd (cache miss storm) problem, where many requests for a particular cache key (e.g. a resource URI) arrive before the first one has completed. Without this, the value for the key is requested (and the processing associated with calculating it is performed) multiple times, and as a result the backend resource is put under undue load.
Storing a future in the cache means that multiple requests for a missing cache value that hasn't been calculated yet all wait on the same future: the one executing for the first request for that cache key.
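To make the idea concrete, here is a minimal JDK-only sketch of a cache that stores futures instead of values. It uses `CompletableFuture` and `ConcurrentHashMap` in place of Guava's `ListenableFuture` and `ConcurrentLinkedHashMap`, has no size bound, and the class name `FutureCache` is hypothetical; it illustrates the technique and is not herdcache's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch: the map holds futures, so every caller for a key shares one computation.
final class FutureCache<V> {
    private final ConcurrentMap<String, CompletableFuture<V>> store = new ConcurrentHashMap<>();

    CompletableFuture<V> apply(String key, Supplier<V> computation, Executor executor) {
        CompletableFuture<V> promise = new CompletableFuture<>();
        // Atomically claim the key; if another caller already did, share their future.
        CompletableFuture<V> existing = store.putIfAbsent(key, promise);
        if (existing != null) {
            return existing;
        }
        executor.execute(() -> {
            try {
                promise.complete(computation.get());
            } catch (Throwable t) {
                store.remove(key, promise); // don't keep failures cached
                promise.completeExceptionally(t);
            }
        });
        return promise;
    }

    // Ten back-to-back requests for one key run the computation only once.
    static int demoComputationCount() {
        FutureCache<String> cache = new FutureCache<>();
        AtomicInteger runs = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            CompletableFuture<?>[] futures = new CompletableFuture<?>[10];
            for (int i = 0; i < futures.length; i++) {
                futures[i] = cache.apply("/resource/uri", () -> {
                    runs.incrementAndGet();
                    return "value";
                }, pool);
            }
            CompletableFuture.allOf(futures).join();
            return runs.get();
        } finally {
            pool.shutdown();
        }
    }
}
```

`putIfAbsent` is what makes the deduplication race-free: exactly one caller wins the slot and schedules the work, and everyone else receives that caller's future.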
The following describes how to use the caching implementations and the differences between them.
<dependency>
<groupId>org.greencheek.caching</groupId>
<artifactId>herdcache</artifactId>
<version>1.0.10</version>
</dependency>
Please note that 0.1.0 is not backwards compatible with 1.0.1: 1.0.1 extends the Cache interface to include a couple of get methods, introducing a breaking change to the old API.
There are currently two types of Cache that implement the Cache<V> interface:
- SimpleLastRecentlyUsedCache
- ExpiringLastRecentlyUsedCache
And two types of Cache that implement the CacheWithExpiry<V> interface, which extends the Cache<V> interface:
- SpyMemcachedCache<V>
- ElastiCacheMemcachedCache<V>
The cache used to have a single method, apply, that takes:
- the key to look for
- an implementation of the Supplier<V> functional interface
- the Guava ListeningExecutorService executor
The method is: ListenableFuture<V> apply(String key, Supplier<V> computation, ListeningExecutorService executorService)
The returned value is a Guava ListenableFuture, to which you can attach a callback, or on which you can wait for a value to be generated.
The cache now includes two additional methods:
- public ListenableFuture<V> get(String key)
- public ListenableFuture<V> get(String key,ListeningExecutorService executorService)
Both methods look up the value in the cache that is associated with the given key, always returning a Guava ListenableFuture. The difference between get and apply is that apply can generate the value, whilst get only looks it up in the cache.
The following shows a couple of examples of working with the returned ListenableFuture.
- Adding a callback:
// Runs on whichever thread completes the future (or on the calling thread if it has already completed)
Futures.addCallback(future, new FutureCallback<String>() {
@Override
public void onSuccess(String result) { }
@Override
public void onFailure(Throwable t) { }
}, MoreExecutors.directExecutor());
// Runs on a thread from the supplied executor's thread pool
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
Futures.addCallback(future, new FutureCallback<String>() {
@Override
public void onSuccess(String result) { }
@Override
public void onFailure(Throwable t) { }
}, executorService);
- Waiting for the value (or failure):
try {
String value = future.get();
} catch (InterruptedException e) {
// Restore the interrupt flag so callers can see the thread was interrupted
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
// The computation failed; the cause is available via e.getCause()
}
The SimpleLastRecentlyUsedCache uses a ConcurrentLinkedHashMap to store a bounded number of values. Once the cache reaches its maximum number of values, the least recently used key is removed.
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
Cache<String> cache = new SimpleLastRecentlyUsedCache<>();
ListenableFuture<String> val = cache.apply("Key1",
() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "key1";
},
executorService);
The SimpleLastRecentlyUsedCache has no expiry on the items in the cache. It is limited only by the number of items it holds, evicting the least recently used item when full.
This can be seen in the following example:
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
Cache<String> cache = new SimpleLastRecentlyUsedCache<>();
ListenableFuture<String> val = cache.apply("Key1", () -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "key1";
}, executorService);
ListenableFuture<String> val2 = cache.apply("key2", () -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "key2";
}, executorService);
ListenableFuture<String> val3 = cache.apply("key3", () -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "key3";
}, executorService);
ListenableFuture<String> val4 = cache.apply("key1", () -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "key_new";
}, executorService);
assertEquals("Value should be key1","key1",this.awaitForFutureOrElse(val, null));
assertEquals("Value should be key2","key2",this.awaitForFutureOrElse(val2, null));
assertEquals("Value should be key3","key3",this.awaitForFutureOrElse(val3, null));
// Key1 will no longer be in the cache, only key2 and key3
assertEquals("Value should be key_new","key_new",this.awaitForFutureOrElse(val4, null));
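The least-recently-used policy itself can be illustrated with the JDK's `LinkedHashMap` in access order, a common single-threaded stand-in for `ConcurrentLinkedHashMap`. The hypothetical `LruStore` below is only a sketch of the eviction rule (it is not thread-safe and is not herdcache's code).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, non-thread-safe sketch of least-recently-used eviction.
final class LruStore<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    LruStore(int maxEntries) {
        super(16, 0.75f, true); // accessOrder = true: get() moves an entry to the tail
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after each put; evicts the head (least recently used) once over capacity.
        return size() > maxEntries;
    }

    static String demo() {
        LruStore<String, String> store = new LruStore<>(2);
        store.put("key1", "v1");
        store.put("key2", "v2");
        store.get("key1");       // key1 is now more recently used than key2
        store.put("key3", "v3"); // over capacity: evicts key2, the least recently used
        return String.join(",", store.keySet());
    }
}
```

Reading `key1` before inserting `key3` is what saves it; without that `get`, `key1` would be the eviction victim.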
The difference between SimpleLastRecentlyUsedCache and ExpiringLastRecentlyUsedCache is that the latter has a default time to live for the elements that are put in the cache and also, if desired, a time to idle for the items.
The timeToLive and timeToIdle are supplied to the constructor of the cache.
For example, to create a cache whose items will live for 1 minute, regardless of when they were last used:
Cache<String> cache = new ExpiringLastRecentlyUsedCache<>(10,60,0, TimeUnit.SECONDS);
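The interplay between timeToLive and timeToIdle can be sketched as a single expiry check. This is an assumption about the semantics, not herdcache's code: an entry expires once it is older than the TTL, or has gone unread for longer than the TTI, and a value of 0 disables that check (matching the example above, where a TTI of 0 means "regardless of when they were last used"). `ExpiryRule` is a hypothetical name.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of TTL + time-to-idle expiry; a value of 0 disables that check.
final class ExpiryRule {
    private final long ttlNanos;
    private final long ttiNanos;

    ExpiryRule(long timeToLive, long timeToIdle, TimeUnit unit) {
        this.ttlNanos = unit.toNanos(timeToLive);
        this.ttiNanos = unit.toNanos(timeToIdle);
    }

    // All arguments are System.nanoTime()-style readings supplied by the caller.
    boolean isExpired(long createdAt, long lastAccessedAt, long now) {
        boolean tooOld = ttlNanos > 0 && now - createdAt >= ttlNanos;
        boolean idleTooLong = ttiNanos > 0 && now - lastAccessedAt >= ttiNanos;
        return tooOld || idleTooLong;
    }
}
```

Passing the clock readings in, rather than calling `System.nanoTime()` inside, keeps the rule deterministic and easy to test.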
The Cache<V> interface inherits a utility interface (AwaitOnFuture<V>) that gives you a couple of methods for waiting on futures for a value to be calculated:
- V awaitForFutureOrElse(ListenableFuture<V> future, V onExceptionValue)
- V awaitForFutureOrElse(ListenableFuture<V> future, V onExceptionValue, V onTimeoutValue, long duration, TimeUnit timeUnit)
The value returned from a cache apply is a ListenableFuture. You can, naturally, wait on the currently executing thread (blocking that thread) for a value to be returned, as follows:
try {
return future.get();
} catch (Exception e) {
return somefallback;
}
The method V awaitForFutureOrElse(ListenableFuture<V> future, V onExceptionValue) removes the ceremony of the try/catch block for you.
The other method, V awaitForFutureOrElse(ListenableFuture<V> future, V onExceptionValue, V onTimeoutValue, long duration, TimeUnit timeUnit), allows you to wait a finite amount of time for a value to be returned. If that amount of time elapses, the onTimeoutValue is returned. Any other exception results in the onExceptionValue being returned.
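A sketch of what the two helpers do, written against `java.util.concurrent.Future` so it runs without Guava (`ListenableFuture` extends `Future`). `AwaitSketch` is a hypothetical name and this is not the `AwaitOnFuture<V>` source; it shows the timeout value taking precedence over the exception value only for a `TimeoutException`.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AwaitSketch {
    // Blocks until the future completes; any failure yields onExceptionValue.
    static <V> V awaitForFutureOrElse(Future<V> future, V onExceptionValue) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return onExceptionValue;
        } catch (ExecutionException | CancellationException e) {
            return onExceptionValue;
        }
    }

    // Waits at most `duration`; a timeout yields onTimeoutValue, any other failure onExceptionValue.
    static <V> V awaitForFutureOrElse(Future<V> future, V onExceptionValue, V onTimeoutValue,
                                      long duration, TimeUnit timeUnit) {
        try {
            return future.get(duration, timeUnit);
        } catch (TimeoutException e) {
            return onTimeoutValue;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return onExceptionValue;
        } catch (ExecutionException | CancellationException e) {
            return onExceptionValue;
        }
    }
}
```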
There are two implementations of the CacheWithExpiry<V> interface:
- SpyMemcachedCache<V>
- ElastiCacheMemcachedCache<V>
The second implementation, ElastiCacheMemcachedCache<V>, is an extension of the SpyMemcachedCache<V> implementation for working with Amazon AWS's memcached support (known as ElastiCache).
CacheWithExpiry<V> contains the method:
public ListenableFuture<V> apply(String key, Supplier<V> computation, Duration timeToLive, ListeningExecutorService executorService);
The difference between this method and the apply available in the Cache<V> interface is the addition of the Duration parameter, which means that keys can have differing cache expiry times (memcached supports per-item expiry).
Both cache classes use the following defaults. The ElastiCacheCacheConfigBuilder extends the abstract class MemcachedCacheConfigBuilder, which contains the defaults with which the SpyMemcachedCache<V> will execute. The builder allows you to override the defaults.
The following defaults apply to both memcached and ElastiCache memcached:
Method | Default | Description
---|---|---
setTimeToLive | Duration.ofSeconds(60) | The default expiry time an item will be given if not specified
setMaxCapacity | 1000 | Max number of futures to hold in the internal cache whilst a value is being calculated
setMemcachedHosts | "localhost:11211" | Comma separated host list
setHashingType | ConnectionFactoryBuilder.Locator.CONSISTENT | Uses consistent hashing; don't change
setFailureMode | FailureMode.Redistribute | What should happen when an error occurs (FailureMode.Retry may suit you better)
setHashAlgorithm | DefaultHashAlgorithm.KETAMA_HASH | Type of consistent hashing used to calculate which memcached node to talk to; don't change
serializingTranscoder | new FastSerializingTranscoder() | The serializer to be used: the class responsible for serialising Java objects to a byte stream to store in memcached
protocol | ConnectionFactoryBuilder.Protocol.BINARY | The protocol used for talking to memcached
readBufferSize | DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE | Default socket buffer size when talking to memcached; do not change
memcachedGetTimeout | Duration.ofMillis(2500) | When looking in memcached for a matching key, the amount of time to wait before timing out
dnsConnectionTimeout | Duration.ofSeconds(3) | When resolving the memcachedHosts to IP addresses, the amount of time to wait for the DNS lookup before ignoring that node
waitForMemcachedSet | false | Wait for the write to memcached to occur before removing the future from the internal cache
setWaitDuration | Duration.ofSeconds(2) | Amount of time to wait for the memcached set
keyHashType | KeyHashingType.JAVA_XXHASH | How the cache key is hashed
keyPrefix | Optional.empty() | Whether the key used in lookups should be prefixed with a string, to avoid the unlikely event of a key clash
asciiOnlyKeys | false | Whether only ASCII keys will be stored in the cache
hostStringParser | new CommaSeparatedHostAndPortStringParser() | Do not change
hostResolver | new AddressByNameHostResolver() | Do not change
useStaleCache | false | Whether stale caching is enabled
staleCacheAdditionalTimeToLive | Duration.ZERO | The extra amount of time for which items will be stored in the stale cache
staleCachePrefix | "stale" | The prefix for stale keys, to avoid a clash
staleMaxCapacity | -1 | The size of the cache for futures for the stale cache is the same as the
staleCacheMemachedGetTimeout | Duration.ZERO | Time to wait for lookups against the stale cache
removeFutureFromInternalCacheBeforeSettingValue | false | When the
The following defaults apply only to ElastiCache memcached:
Method | Default | Description
---|---|---
setElastiCacheConfigHosts | "localhost:11211" | The ElastiCache memcached configuration host name, e.g. yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211
setConfigPollingTime | Duration.ofSeconds(60) | How often to contact the config host for potential updates to the memcached nodes
setInitialConfigPollingDelay | Duration.ZERO | The delay before the initial poll to the config host to obtain the memcached nodes
setConnectionTimeoutInMillis | Duration.ofMillis(3000) | The time allowed for establishing a connection to the config host before stopping and retrying
setIdleReadTimeout | Duration.ofSeconds(125) | If the client does not receive any data from the ElastiCache Configuration Endpoint within this idle period, a reconnection is made
setReconnectDelay | Duration.ofSeconds(5) | The delay before performing a reconnection attempt to the config host
setDelayBeforeClientClose | Duration.ofSeconds(300) | When the ElastiCache Configuration Endpoint outputs a configuration update, a new spy memcached client is created and the old client is closed. There is a delay before the old client is closed, as it may still be in use
setNumberOfConsecutiveInvalidConfigurationsBeforeReconnect | 3 | If the config host returns an invalid config this number of times in a row, a reconnection is made
setUpdateConfigVersionOnDnsTimeout | true | Set to false if you don't want a config update to be acknowledged when DNS resolution failed for any of the memcached nodes
The SpyMemcachedCache<V> implementation uses the spy memcached Java library to communicate with memcached.
The implementation is similar to that of SimpleLastRecentlyUsedCache in that it uses a ConcurrentLinkedHashMap to store the cache key against an executing future.
When a request comes in for a key, its future is stored in an internal ConcurrentLinkedHashMap:
store.putIfAbsent(keyString, future)
If a subsequent request comes in for the same key and the future has not completed yet, the existing future in the ConcurrentLinkedHashMap is returned to the caller. This way both requests wait on the same executing Supplier<V> computation.
When constructing the SpyMemcachedCache, you can specify the max size of the internal ConcurrentLinkedHashMap that is used to store the concurrently executing futures.
Unlike the SimpleLastRecentlyUsedCache implementation, which keeps completed futures in its ConcurrentLinkedHashMap so that subsequent cache hits can obtain the completed future's value, the SpyMemcachedCache<V> removes the key and associated future from the internal ConcurrentLinkedHashMap. The value of the completed future is instead stored in memcached for subsequent retrieval.
Before the Supplier<V> computation is submitted to the passed executor for execution, the memcached cluster is checked for the existence of a value for the given key. If a value is present in memcached, the returned future is set with the obtained value. This means that if two requests come in for the same key, for which a value is present in memcached, they will wait on the same future to have its value set to that of the memcached cache hit.
If a value does not exist in memcached, then the given Supplier<V> computation is submitted to the provided executor for execution. Once the value has been calculated, it is sent over the network to memcached for storage.
With this library the value is stored in memcached asynchronously: the future is completed with the computed value and subsequently removed from the ConcurrentLinkedHashMap. Therefore, there is a slim time period between the completion of the future and the value being saved in memcached, during which a subsequent request for the same key could be a cache miss.
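The flow described above (in-flight future map, lookup in the remote cache, compute on a miss, asynchronous write, then removal from the in-flight map) can be sketched with the JDK alone. Here a `ConcurrentHashMap` stands in for memcached and the lookup is synchronous; `RemoteBackedCache` is hypothetical, and real code would go through the spy memcached client. The ordering of steps 3 to 5 is what opens the small miss window just described.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical sketch: a future lives in inFlight only while its value is fetched or computed.
final class RemoteBackedCache<V> {
    private final ConcurrentMap<String, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();
    final Map<String, V> remote = new ConcurrentHashMap<>(); // stand-in for memcached

    CompletableFuture<V> apply(String key, Supplier<V> computation, Executor executor) {
        CompletableFuture<V> promise = new CompletableFuture<>();
        CompletableFuture<V> existing = inFlight.putIfAbsent(key, promise);
        if (existing != null) {
            return existing; // another request is already fetching or computing this key
        }
        V cached = remote.get(key); // 1. look in the remote cache first
        if (cached != null) {
            promise.complete(cached);
            inFlight.remove(key, promise);
            return promise;
        }
        executor.execute(() -> { // 2. miss: run the computation on the supplied executor
            try {
                V value = computation.get();
                if (value != null) {
                    // 3. fire-and-forget write; it may land after step 5 (the miss window)
                    CompletableFuture.runAsync(() -> remote.put(key, value));
                }
                promise.complete(value); // 4. wake every waiter
            } catch (Throwable t) {
                promise.completeExceptionally(t);
            } finally {
                inFlight.remove(key, promise); // 5. the value now lives (or soon will) remotely
            }
        });
        return promise;
    }
}
```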
When constructing the SpyMemcachedCache, it is possible to specify a period of time to wait for the set to occur (i.e. make the asynchronous set into memcached semi-synchronous).
The SpyMemcachedCache is created by passing a MemcachedCacheConfig. A MemcachedCacheConfig is created via an ElastiCacheCacheConfigBuilder, which builds the config for both the ElastiCacheMemcachedCache and the SpyMemcachedCache; for the SpyMemcachedCache, use its method public MemcachedCacheConfig buildMemcachedConfig().
The following shows various ways of configuring the cache:
cache = new SpyMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:11211")
.setTimeToLive(Duration.ofSeconds(60))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.buildMemcachedConfig()
);
ListenableFuture<String> val = cache.apply("Key1", () -> {
return "value1";
}, Duration.ofSeconds(3), executorService);
assertEquals("Value should be value1","value1", cache.awaitForFutureOrElse(val, null));
By default the host string is localhost:11211; however, you can connect to a number of hosts by specifying them as a comma separated string in the builder:
CacheWithExpiry<String> cache = new SpyMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:11211,localhost:11212,localhost:11213")
.buildMemcachedConfig()
);
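Parsing such a host string amounts to splitting on commas and then on the last colon. The sketch below is a hypothetical stand-in for what a parser such as CommaSeparatedHostAndPortStringParser has to do; its behaviour (trimming whitespace, skipping empty entries, defaulting to port 11211 when none is given) is an assumption, not documented herdcache behaviour, and IPv6 literals are not handled.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: "host1:11211,host2:11212" -> unresolved socket addresses.
final class HostListParser {
    static final int DEFAULT_PORT = 11211; // assumption: memcached's conventional port

    static List<InetSocketAddress> parse(String hosts) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String part : hosts.split(",")) {
            String entry = part.trim();
            if (entry.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = entry.lastIndexOf(':'); // IPv6 literals are not handled here
            String host = colon < 0 ? entry : entry.substring(0, colon);
            int port = colon < 0 ? DEFAULT_PORT : Integer.parseInt(entry.substring(colon + 1));
            // createUnresolved: no DNS lookup happens here; resolution is a separate step.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }
}
```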
When the SpyMemcachedCache is passed the list of memcached hosts, the IP address of each host needs to be resolved. By default, up to 3 seconds per host is spent waiting to obtain the IP address. This can be controlled as follows:
CacheWithExpiry<String> cache = new SpyMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:11211,localhost:11212,localhost:11213")
.setDnsConnectionTimeout(Duration.ofSeconds(2))
.buildMemcachedConfig()
);
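`InetAddress.getByName` has no timeout parameter of its own, so a common way to bound a lookup (which is what a per-host DNS timeout amounts to) is to run it on another thread and wait with `Future.get(timeout)`. This is a hypothetical sketch of that technique, not herdcache's AddressByNameHostResolver; hosts whose lookup fails or times out are simply skipped.

```java
import java.net.InetAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class TimedResolver {
    // Resolves each host, waiting at most `timeout` per host; failures are left out of the result.
    static List<InetAddress> resolveAll(List<String> hosts, Duration timeout) {
        List<InetAddress> resolved = new ArrayList<>();
        ExecutorService dns = Executors.newCachedThreadPool(r -> {
            Thread t = new Thread(r, "dns-lookup");
            t.setDaemon(true); // a hung lookup must not keep the JVM alive
            return t;
        });
        try {
            for (String host : hosts) {
                Future<InetAddress> lookup = dns.submit(() -> InetAddress.getByName(host));
                try {
                    resolved.add(lookup.get(timeout.toMillis(), TimeUnit.MILLISECONDS));
                } catch (Exception e) { // timeout, unknown host or interruption
                    lookup.cancel(true);
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        } finally {
            dns.shutdownNow();
        }
        return resolved;
    }
}
```

Note that cancelling the task does not abort a lookup already blocked in the OS resolver; the daemon thread simply gets abandoned, which is why it must not keep the JVM alive.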
There are two ways to specify the expiry of items that are stored in memcached:
- a global Time To Live for the items
- passing the Time To Live for the cached item in the apply method
The following example sets a default of 30 seconds for all items saved in the cache for which a TimeToLive has not been specified:
CacheWithExpiry<String> cache = new SpyMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:11211")
.setTimeToLive(Duration.ofSeconds(30))
.buildMemcachedConfig()
);
ListenableFuture<String> val = cache.apply("Key1", () -> {return "value1";}, executorService);
To specify the TTL on a per item basis, specify the Duration when calling the apply method:
ListenableFuture<String> val = cache.apply("Key1", () -> {return "value1";}, Duration.ofSeconds(10), executorService);
When an item is neither in the cache nor currently being calculated, the cache executes the Supplier<V> computation and stores the returned value in memcached. A future is created and stored in the internal future calculation cache, so that any requests for the same key wait on the completion of the same future.
With this library the computed cache value is stored asynchronously in memcached, and the future is completed with the same value. Once completed, the future is removed from the internal future calculation cache (ConcurrentLinkedHashMap). Therefore, there is a slim time period between the completion of the future and the value being saved in memcached, during which a subsequent request for the same key could be a cache miss.
As a result, you can request that the write to memcached be synchronous, waiting a finite period for the write to take place. This is configured at construction time; the following waits a maximum of 3 seconds for the set to occur:
cache = new SpyMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:11211")
.setTimeToLive(Duration.ofSeconds(60))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setWaitForMemcachedSet(true)
.setSetWaitDuration(Duration.ofSeconds(3))
.buildMemcachedConfig()
);
ListenableFuture<String> val = cache.apply("Key1", () -> {
return "value1";
}, Duration.ofSeconds(3), executorService);
assertEquals("Value should be value1","value1", cache.awaitForFutureOrElse(val, null));
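Conceptually, setWaitForMemcachedSet(true) with setSetWaitDuration(...) turns a fire-and-forget write into a bounded wait before the in-flight future is released. Below is a JDK-only sketch of that bounded wait; it relies only on the fact that spy memcached's `set` returns an `OperationFuture<Boolean>`, which is a `Future<Boolean>`. `BoundedSetWait` is hypothetical, not herdcache's code.

```java
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedSetWait {
    // Waits at most maxWait for an asynchronous set (e.g. the Future<Boolean> a memcached
    // client returns). Returns true only if the write finished successfully in time.
    static boolean awaitWrite(Future<Boolean> asyncSet, Duration maxWait) {
        try {
            return Boolean.TRUE.equals(asyncSet.get(maxWait.toMillis(), TimeUnit.MILLISECONDS));
        } catch (TimeoutException | ExecutionException | CancellationException e) {
            return false; // the caller carries on; the wait only narrows the miss window
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Either way the caller proceeds afterwards; a timed-out set is not an error, it just means the miss window was not fully closed.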
Items in the cache can be given no expiry (no TTL) by specifying the duration as Duration.ZERO:
CacheWithExpiry<String> cache = new SpyMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:11211")
.setTimeToLive(Duration.ofSeconds(60))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setWaitForMemcachedSet(true)
.setSetWaitDuration(Duration.ofSeconds(3))
.buildMemcachedConfig()
);
ListenableFuture<String> val = cache.apply("Key1", () -> {return "value1";}, Duration.ZERO, executorService);
assertEquals("Value should be value1","value1", cache.awaitForFutureOrElse(val, null));
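Memcached's protocols take an integer expiry ("exptime") in seconds, where 0 means the item never expires and any value above 30 days (2,592,000 seconds) is interpreted as an absolute Unix timestamp rather than a relative offset. The sketch below shows one way a `Duration` could be mapped onto that field; it is an assumption about what a client has to do, not herdcache's code, and `MemcachedExpiry` is a hypothetical name.

```java
import java.time.Duration;
import java.time.Instant;

final class MemcachedExpiry {
    static final long THIRTY_DAYS_SECONDS = 60L * 60 * 24 * 30; // 2,592,000

    // Maps a TTL onto memcached's exptime field: 0 = never expire; offsets longer than
    // 30 days must be sent as an absolute Unix timestamp (an int, so valid until 2038).
    static int toExptime(Duration ttl, Instant now) {
        if (ttl.isNegative()) {
            throw new IllegalArgumentException("negative TTL: " + ttl);
        }
        if (ttl.isZero()) {
            return 0;
        }
        long seconds = Math.max(1, ttl.getSeconds()); // TTLs under 1s become 1s; fractions truncate
        if (seconds <= THIRTY_DAYS_SECONDS) {
            return (int) seconds;
        }
        return Math.toIntExact(now.getEpochSecond() + seconds);
    }
}
```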
The cache key has to be a string. When using the TEXT protocol, memcached has requirements for the makeup of keys; your key must conform to the following:
- it needs to be a string
- it cannot contain ' ' (space), '\r' (return) or '\n' (linefeed)
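A check for these rules is straightforward. The sketch below also enforces memcached's 250-character key length limit (counted here in UTF-8 bytes) and rejects other control characters, which the text protocol also disallows. `TextKeyCheck` is a hypothetical helper, not part of herdcache.

```java
import java.nio.charset.StandardCharsets;

final class TextKeyCheck {
    static final int MAX_KEY_BYTES = 250; // memcached's documented key length limit

    static boolean isValidTextProtocolKey(String key) {
        if (key == null || key.isEmpty()) {
            return false;
        }
        if (key.getBytes(StandardCharsets.UTF_8).length > MAX_KEY_BYTES) {
            return false;
        }
        for (int i = 0; i < key.length(); i++) {
            char c = key.charAt(i);
            // Space, \r, \n and other control characters would break the line-based protocol.
            if (c == ' ' || Character.isISOControl(c)) {
                return false;
            }
        }
        return true;
    }
}
```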
If you are using the BINARY protocol these requirements do not apply. However, you may wish to hash the string representing the key to allow any character to be used. The cache supports the following hash representations of the key:
- NONE
- NATIVE_XXHASH
- JAVA_XXHASH
- MD5_UPPER
- SHA256_UPPER
- MD5_LOWER
- SHA256_LOWER
To use one of these, specify the hashing method at cache construction time. For the best performance, XXHash is recommended:
cache = new SpyMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:" + memcached.getPort())
.setTimeToLive(Duration.ofSeconds(60))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setWaitForMemcachedSet(true)
.setKeyHashType(KeyHashingType.MD5_LOWER)
.buildMemcachedConfig()
);
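To show what the MD5_* and SHA256_* representations look like, here is a JDK-only sketch using `MessageDigest`. It assumes the key is encoded as UTF-8 and rendered as hex, upper- or lower-case to match the suffix; the XXHash variants need a third-party library and are omitted. `KeyHashes` is a hypothetical name, not herdcache's implementation.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class KeyHashes {
    static String hex(String algorithm, String key, boolean upper) {
        try {
            byte[] digest = MessageDigest.getInstance(algorithm)
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                sb.append(String.format(upper ? "%02X" : "%02x", b)); // bytes format as unsigned
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 and SHA-256 ship with standard JDKs, so this is not expected to happen.
            throw new IllegalStateException(e);
        }
    }

    static String md5Lower(String key) { return hex("MD5", key, false); }
    static String md5Upper(String key) { return hex("MD5", key, true); }
    static String sha256Lower(String key) { return hex("SHA-256", key, false); }
    static String sha256Upper(String key) { return hex("SHA-256", key, true); }
}
```

Any of these outputs is plain hex, so a hashed key always satisfies the TEXT protocol rules regardless of the original key's characters.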
When hashing a key, there is the potential for two different strings to end up with the same hashed value. As a result, you can add a cache prefix to the cache at construction time.
The example below specifies a cache prefix of article. Calling setHashKeyPrefix(false) means that the prefix is prepended after the cache key has been hashed. Setting setHashKeyPrefix(true) means that the prefix is prepended to the cache key and then the hashing takes place. This is the default, as the prefix has the potential to break the TEXT protocol key requirements (hashing the key makes sure this does not occur).
cache = new SpyMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:" + memcached.getPort())
.setTimeToLive(Duration.ofSeconds(60))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setWaitForMemcachedSet(true)
.setKeyHashType(KeyHashingType.MD5_LOWER)
.setKeyPrefix(Optional.of("article"))
.setHashKeyPrefix(false)
.buildMemcachedConfig()
);
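The two orderings can be sketched as follows, using MD5_LOWER hashing as in the configuration above. How herdcache actually joins the prefix and the key (for instance, whether a separator is used) is not stated here, so plain concatenation is an assumption, and `PrefixedKey` is hypothetical.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class PrefixedKey {
    static String md5Lower(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, d)); // zero-padded lower-case hex
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // hashKeyPrefix == true : hash(prefix + key), always a safe 32-char hex key (the default).
    // hashKeyPrefix == false: prefix + hash(key), readable prefix, but the prefix itself must
    //                         then satisfy the TEXT protocol key rules.
    static String buildKey(String prefix, String key, boolean hashKeyPrefix) {
        return hashKeyPrefix ? md5Lower(prefix + key) : prefix + md5Lower(key);
    }
}
```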
Since 1.0.6 the client has the following method:
public ListenableFuture<V> apply(String key, Supplier<V> computation, ListeningExecutorService executorService,
Predicate<V> canCacheValueEvalutor);
And CacheWithExpiry<V> contains the method:
public ListenableFuture<V> apply(String key, Supplier<V> computation, Duration timeToLive,
ListeningExecutorService executorService,Predicate<V> canCacheValueEvalutor);
These methods allow you to pass a Predicate<V> that evaluates whether the value returned from the Supplier<V> (the function generating the value to cache) should actually be stored in memcached. This can be useful in situations where the Supplier<V> is, say, a HystrixCommand object whose value has on this occasion been generated by its fallback. The Predicate<V> could wrap the command object, evaluate whether the value came from the fallback, and choose not to cache it:
cache.apply("webservicecallx", () -> command.execute(), executorService,
(cachevalue) -> {
// Only cache values that did not come from the Hystrix fallback
return !command.isResponseFromFallback();
}
);
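The effect of the Predicate<V> can be sketched without Hystrix or memcached: the value is always returned to the caller, but only written to the store when the predicate approves it. For brevity this hypothetical `GatedStore` is synchronous (no futures), uses a map in place of memcached, and tests the value itself rather than inspecting a command object.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class GatedStore<V> {
    final Map<String, V> store = new ConcurrentHashMap<>(); // stand-in for memcached

    // Computes the value, always returns it, and caches it only if canCache approves.
    V apply(String key, Supplier<V> computation, Predicate<V> canCache) {
        V cached = store.get(key);
        if (cached != null) {
            return cached;
        }
        V value = computation.get();
        if (value != null && canCache.test(value)) {
            store.put(key, value);
        }
        return value;
    }
}
```

A rejected value (here, a fallback) is served once but not cached, so the next request tries the real backend again.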
Since 1.0.1 the client supports a stale caching mechanism. This is not enabled by default, as it requires an additional future (via composition) to perform the additional cache lookup. It also requires an additional lookup on the memcached server, and uses twice the memory (items are stored twice in the cache).
Enabling the stale caching feature is done via the .setUseStaleCache(true) method.
The stale caching function is a mini "stale-while-revalidate" mechanism. Without stale caching enabled, when a popular item expires in the cache, a lot of requests will be waiting on the item to be regenerated from the backend. This can mean a larger spike in requests than you would like.
With stale caching enabled, only one request will regenerate the item from the backend. The other requests will use the stale cache. The stale cache is ONLY checked if a future exists in the internal cache, meaning that a backend request is in progress to calculate the cache item.
With stale caching enabled, an item stored in memcached is stored twice. The second time it is stored under a different key, made up of the hashed cache key and the stale cache key prefix set via the builder method .setStaleCachePrefix("staleprefix"). The default prefix is stale.
By default, the stale copy is stored for an additional setTimeToLive period beyond the original cache item.
To provide a value of your own, say 10 minutes extra, you can specify this at construction time:
cache = new SpyMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:" + memcached.getPort())
.setTimeToLive(Duration.ofSeconds(1))
.setUseStaleCache(true)
.setStaleCacheAdditionalTimeToLive(Duration.ofMinutes(10))
.setStaleCachePrefix("staleprefix")
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setWaitForMemcachedSet(true)
.buildMemcachedConfig()
);
Stale caching is available in both SpyMemcachedCache and ElastiCacheMemcachedCache.
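The stale lookup rule (the stale copy is consulted only when a computation for the key is already in flight) can be sketched as below. Maps stand in for memcached, expiry is left out, and the class name `StaleAwareCache` and the plain prefix concatenation are illustrative assumptions; in herdcache the stale key is derived from the hashed cache key.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

final class StaleAwareCache<V> {
    private final Map<String, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();
    final Map<String, V> fresh = new ConcurrentHashMap<>(); // items with the normal TTL
    final Map<String, V> stale = new ConcurrentHashMap<>(); // copies kept for TTL + extra time
    private final String stalePrefix;

    StaleAwareCache(String stalePrefix) { this.stalePrefix = stalePrefix; }

    CompletableFuture<V> apply(String key, Supplier<V> computation, Executor executor) {
        V hit = fresh.get(key);
        if (hit != null) {
            return CompletableFuture.completedFuture(hit);
        }
        CompletableFuture<V> promise = new CompletableFuture<>();
        CompletableFuture<V> running = inFlight.putIfAbsent(key, promise);
        if (running != null) {
            // Another request is regenerating the value: serve the stale copy if one exists.
            V old = stale.get(stalePrefix + key);
            return old != null ? CompletableFuture.completedFuture(old) : running;
        }
        executor.execute(() -> {
            try {
                V value = computation.get();
                fresh.put(key, value);               // primary copy
                stale.put(stalePrefix + key, value); // second copy under the stale key
                promise.complete(value);
            } catch (Throwable t) {
                promise.completeExceptionally(t);
            } finally {
                inFlight.remove(key, promise);
            }
        });
        return promise;
    }
}
```

Only the request that wins the in-flight slot waits for the backend; later requests for the same key get the stale copy immediately, and fall back to waiting only when no stale copy exists.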
Metrics are available in both SpyMemcachedCache and ElastiCacheMemcachedCache as of version 1.0.11. The configuration builder has an option, .setMetricsRecorder(..), which takes an implementation of org.greencheek.caching.herdcache.memcached.metrics.MetricsRecorder. The default implementation is NoOpMetricRecorder. The other implementation is new YammerMetricsRecorder(registry), which uses the Yammer metrics library (https://dropwizard.github.io/metrics).
With the YammerMetricsRecorder, the following metrics are placed inside the Metrics library:
Metric | Description
---|---
value_calculation_cache_hitrate | The cache hits per second on the internal future cache
value_calculation_cache_missrate | The cache misses per second on the internal future cache
value_calculation_cache_hitcount | The cache hits in total on the internal future cache
value_calculation_cache_misscount | The cache misses in total on the internal future cache
value_calculation_success_count | The number of successful runs of the Supplier<T> function
value_calculation_failure_count | The number of failed runs of the Supplier<T> function
value_calculation_time_timer | The time it has taken to execute the Supplier<T> function
distributed_cache_hitrate | The cache hits per second on the distributed cache (i.e. memcached)
distributed_cache_missrate | The cache misses per second on the distributed cache (i.e. memcached)
distributed_cache_timer | The time it takes to look up a value from the distributed cache
distributed_cache_count | The number of lookups in the distributed cache that have been performed
distributed_cache_hitcount | The cache hits in total on the distributed cache
distributed_cache_misscount | The cache misses in total on the distributed cache
distributed_cache_writes_count | The writes performed on the distributed cache
stale_distributed_cache_timer | The time it takes to look up a stale value from the distributed cache
stale_distributed_cache_hitrate | The stale cache hits per second on the distributed cache (i.e. memcached)
stale_distributed_cache_missrate | The stale cache misses per second on the distributed cache (i.e. memcached)
stale_distributed_cache_count | The number of stale lookups performed on the distributed cache (i.e. memcached)
stale_distributed_cache_hitcount | The stale cache hits in total on the distributed cache
stale_distributed_cache_misscount | The stale cache misses in total on the distributed cache
stale_value_calculation_cache_misscount | The cache misses in total on the internal future cache for a stale value
stale_value_calculation_cache_hitcount | The cache hits in total on the internal future cache for a stale value
stale_value_calculation_cache_missrate | The cache misses per second on the internal future cache for a stale value
stale_value_calculation_cache_hitrate | The cache hits per second on the internal future cache for a stale value
Since release 1.0.1 there has been support for AWS's ElastiCache memcached clusters. This is done by creating an instance of ElastiCacheMemcachedCache<V> rather than SpyMemcachedCache<V>. An example is as follows:
CacheWithExpiry<String> cache = new ElastiCacheMemcachedCache<String>(
new ElastiCacheCacheConfigBuilder()
.setElastiCacheConfigHosts("yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211")
.setConfigPollingTime(Duration.ofSeconds(10))
.setInitialConfigPollingDelay(Duration.ofSeconds(0))
.setTimeToLive(Duration.ofSeconds(2))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setWaitForMemcachedSet(true)
.setDelayBeforeClientClose(Duration.ofSeconds(1))
.setDnsConnectionTimeout(Duration.ofSeconds(2))
.setUseStaleCache(true)
.setStaleCacheAdditionalTimeToLive(Duration.ofSeconds(4))
.setRemoveFutureFromInternalCacheBeforeSettingValue(true)
.buildElastiCacheMemcachedConfig()
);
The ElastiCache cache works by using the ElastiCache Auto Discovery mechanism, through an ElastiCache Configuration Endpoint.
You supply the ElastiCacheMemcachedCache<V> cache with the URL of the ElastiCache Configuration Endpoint.
The ElastiCache cache uses the netty library (http://netty.io/) to send the config get cluster command to the ElastiCache Configuration Endpoint. It keeps a persistent connection open to the Configuration Endpoint, sending the command periodically. The Configuration Endpoint returns a configuration similar to the following, which details the actual memcached instances that should be connected to:
CONFIG cluster 0 147
12
myCluster.pc4ldq.0001.use1.cache.amazonaws.com|10.82.235.120|11211 myCluster.pc4ldq.0002.use1.cache.amazonaws.com|10.80.249.27|11211
END
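Parsing that response is mostly string splitting: the line after the CONFIG header carries the version, and the next line lists space-separated hostname|ip|port entries, where the IP field may be empty. The hypothetical `ClusterConfig` sketch below handles just that shape (tolerating a blank line before END) and is not the parser herdcache uses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

final class ClusterConfig {
    static final class Node {
        final String hostname;
        final Optional<String> ip; // absent when the endpoint omits it; DNS is then needed
        final int port;

        Node(String hostname, Optional<String> ip, int port) {
            this.hostname = hostname;
            this.ip = ip;
            this.port = port;
        }
    }

    final long version;
    final List<Node> nodes;

    private ClusterConfig(long version, List<Node> nodes) {
        this.version = version;
        this.nodes = Collections.unmodifiableList(nodes);
    }

    // Parses a "config get cluster" response; throws IllegalArgumentException if malformed.
    static ClusterConfig parse(String response) {
        String[] lines = response.trim().split("\\r?\\n");
        if (lines.length < 4 || !lines[0].startsWith("CONFIG cluster")
                || !lines[lines.length - 1].trim().equals("END")) {
            throw new IllegalArgumentException("not a config get cluster response");
        }
        long version = Long.parseLong(lines[1].trim());
        List<Node> nodes = new ArrayList<>();
        for (String entry : lines[2].trim().split("\\s+")) {
            String[] parts = entry.split("\\|", -1); // -1 keeps an empty IP field
            if (parts.length != 3) {
                throw new IllegalArgumentException("bad node entry: " + entry);
            }
            Optional<String> ip = parts[1].isEmpty() ? Optional.empty() : Optional.of(parts[1]);
            nodes.add(new Node(parts[0], ip, Integer.parseInt(parts[2])));
        }
        return new ClusterConfig(version, nodes);
    }
}
```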
When the version number (the second line) increases, a new spy memcached instance is created and the old spy memcached instance is scheduled to be closed. The ElastiCacheMemcachedCache<V> continuously polls the ElastiCache Configuration Endpoint for any changes in the number of memcached hosts, or in the hosts that are available.
The ElastiCache Configuration Endpoint is specified via the setElastiCacheConfigHosts(String config) method on the ElastiCacheCacheConfigBuilder object:
CacheWithExpiry<String> cache = new ElastiCacheMemcachedCache<String>(
new ElastiCacheCacheConfigBuilder()
.setElastiCacheConfigHosts("yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211")
.buildElastiCacheMemcachedConfig()
);
For the moment you should only specify one configuration host. Currently a cache cluster exists in only one Availability Zone; in AWS, a cluster cannot at the moment span multiple Availability Zones. You can have three separate ElastiCache clusters, one in each Availability Zone, but the cache will only connect to one Availability Zone at any one time.
By default the ElastiCache cache polls the ElastiCache Configuration Endpoint for an update to the nodes that make up the cluster every 60 seconds. This can be configured via the following methods:
- .setConfigPollingTime(Duration.ofSeconds(10))
- .setInitialConfigPollingDelay(Duration.ofSeconds(0))
This can be seen in the following example:
private static final CacheWithExpiry<Integer> cache = new ElastiCacheMemcachedCache<Integer>(
new ElastiCacheCacheConfigBuilder()
.setElastiCacheConfigHosts(System.getProperty("hosts","localhost:11211"))
.setConfigPollingTime(Duration.ofSeconds(Integer.getInteger("pollingTime",60)))
.setInitialConfigPollingDelay(Duration.ofSeconds(0))
.setTimeToLive(Duration.ofSeconds(10))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setWaitForMemcachedSet(true)
.setDelayBeforeClientClose(Duration.ofSeconds(1))
.setDnsConnectionTimeout(Duration.ofSeconds(2))
.setUseStaleCache(true)
.setStaleCacheAdditionalTimeToLive(Duration.ofSeconds(4))
.setRemoveFutureFromInternalCacheBeforeSettingValue(true)
.buildElastiCacheMemcachedConfig());
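The timing controlled by setInitialConfigPollingDelay and setConfigPollingTime maps naturally onto a `ScheduledExecutorService`. The sketch below only schedules a caller-supplied poll task; herdcache itself sends the command over a persistent netty connection, and whether it uses a fixed delay or a fixed rate is not stated here (fixed delay is an assumption). `ConfigPoller` is hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the polling schedule only (no network I/O).
final class ConfigPoller implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "config-poller");
        t.setDaemon(true); // don't keep the JVM alive just for polling
        return t;
    });
    private final ScheduledFuture<?> task;

    ConfigPoller(Runnable pollConfig, Duration initialDelay, Duration pollingTime) {
        // Fixed delay: the next poll starts pollingTime after the previous one finishes,
        // so a slow endpoint never causes overlapping polls.
        task = scheduler.scheduleWithFixedDelay(() -> {
            try {
                pollConfig.run();
            } catch (RuntimeException e) {
                // Swallow: an escaping exception would silently cancel all later polls.
            }
        }, initialDelay.toMillis(), pollingTime.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        task.cancel(false);
        scheduler.shutdownNow();
    }

    // Demo helper: true once `count` polls (10 ms apart) have run within `timeout`.
    static boolean observedPolls(int count, Duration timeout) {
        CountDownLatch latch = new CountDownLatch(count);
        try (ConfigPoller poller = new ConfigPoller(latch::countDown, Duration.ZERO, Duration.ofMillis(10))) {
            return latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```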
The ElastiCache cache uses a persistent connection to the ElastiCache Configuration Endpoint. If the connection is lost, the client will automatically reconnect, waiting for a period (default 5 seconds) before doing so. This can be changed by specifying the delay via the method .setReconnectDelay(Duration.ofSeconds(10)).
If the client does not receive any data from the ElastiCache Configuration Endpoint, a reconnection will be made; this idle period is controlled by the idleReadTimeout setting, which is 125 seconds by default. If you modify this setting you SHOULD NOT set it lower than the polling duration, as the persistent connection will then end up not being persistent.
.setReconnectDelay(Duration.ofSeconds(5))
.setIdleReadTimeout(Duration.ofSeconds(125))
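The rule above can be written as a simple check. This is a hypothetical helper, not part of herdcache. It assumes the endpoint only sends data in response to a poll, so an idle read timeout shorter than the polling interval would drop the connection between polls.

```java
import java.time.Duration;

// Hypothetical helper, not part of herdcache: encodes the rule that the idle read
// timeout should not be lower than the config polling time.
final class ElastiCacheTimingCheck {
    static final Duration DEFAULT_POLLING_TIME = Duration.ofSeconds(60);
    static final Duration DEFAULT_IDLE_READ_TIMEOUT = Duration.ofSeconds(125);

    private ElastiCacheTimingCheck() {}

    // Assumes data only arrives when the endpoint is polled: an idle timeout shorter
    // than the polling interval would close the "persistent" connection between polls.
    static boolean idleTimeoutIsSafe(Duration idleReadTimeout, Duration configPollingTime) {
        return idleReadTimeout.compareTo(configPollingTime) >= 0;
    }
}
```

The defaults (125 seconds against 60 seconds) leave room for roughly two polls before the idle timeout fires.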
If the ElastiCache Configuration Endpoint returns invalid configurations,
the client reconnects to the Configuration Endpoint. By default the client reconnects after 3 consecutive invalid
configurations. This is controlled by the builder method:
.setNumberOfConsecutiveInvalidConfigurationsBeforeReconnect(int number)
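The "N consecutive invalid configurations" rule amounts to a counter that any valid configuration resets. The following is a minimal sketch under that reading. InvalidConfigTracker and record are hypothetical names and do not come from herdcache. The sketch also assumes the count starts again from zero after a reconnect.

```java
// Hypothetical sketch of the "N consecutive invalid configurations before
// reconnecting" rule; class and method names are illustrative only.
final class InvalidConfigTracker {
    private final int threshold;
    private int consecutiveInvalid;

    InvalidConfigTracker(int threshold) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be >= 1");
        }
        this.threshold = threshold;
    }

    // Records the outcome of one poll; returns true when the client should reconnect
    boolean record(boolean configValid) {
        if (configValid) {
            consecutiveInvalid = 0; // any valid configuration breaks the streak
            return false;
        }
        consecutiveInvalid++;
        if (consecutiveInvalid >= threshold) {
            consecutiveInvalid = 0; // start counting afresh after reconnecting
            return true;
        }
        return false;
    }
}
```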
When the ElastiCache Configuration Endpoint outputs a configuration update, a new spymemcached client is created
and the old client is closed. The old client is closed after a delay, as it may still be in use
(i.e. network requests may still be executing). By default the delay is 10 seconds; this can be changed with the
builder method .setDelayBeforeClientClose(Duration.ofSeconds(1))
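In this sketch, the new client is published straight away and closing the old client is scheduled for later. DelayedCloseSwapper is a hypothetical name, and the code is not herdcache's implementation. It only shows the grace-period idea using the standard library.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not herdcache's code: publish a new client immediately and close
// the previous one only after a grace period, so in-flight requests can complete.
final class DelayedCloseSwapper<C extends AutoCloseable> {
    private final AtomicReference<C> current;
    // Daemon thread so that pending closes do not keep the JVM alive on shutdown
    private final ScheduledExecutorService closer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "old-client-closer");
        t.setDaemon(true);
        return t;
    });

    DelayedCloseSwapper(C initial) {
        current = new AtomicReference<>(initial);
    }

    C current() {
        return current.get();
    }

    // New requests see newClient straight away; the old client is closed after the delay
    void swap(C newClient, long delayBeforeClose, TimeUnit unit) {
        C old = current.getAndSet(newClient);
        if (old != null) {
            closer.schedule(() -> {
                try {
                    old.close();
                } catch (Exception e) {
                    // A real implementation would log this; the new client is already in use
                }
            }, delayBeforeClose, unit);
        }
    }
}
```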
By default the client waits up to 3 seconds when connecting to the ElastiCache Configuration Endpoint.
This can be changed with the builder method .setConnectionTimeoutInMillis(Duration.ofSeconds(2))
When the ElastiCache Configuration Endpoint returns the configuration information, it returns each node's hostname and port, and it may also include the node's IP address. For example:
CONFIG cluster 0 147
12
myCluster.pc4ldq.0001.use1.cache.amazonaws.com|10.82.235.120|11211 myCluster.pc4ldq.0002.use1.cache.amazonaws.com|10.80.249.27|11211
END
If the IP address is not returned, the client performs a DNS lookup on the hostname.
By default the lookup times out after 3 seconds. This can be changed with the builder method .setDnsConnectionTimeout(Duration.ofSeconds(2))
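The sketch below parses the node line of a configuration response like the one above and marks which nodes need a DNS lookup. It is hypothetical and is not herdcache's parser. It assumes that a node without an IP address appears with an empty middle field (hostname||port). In the example response, the line containing 12 is the configuration version, and the parser only handles the node line that follows it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parser, not herdcache's: reads the node line of a config response, where
// each whitespace-separated entry is hostname|ip|port. Assumes a missing IP appears as
// an empty middle field (hostname||port).
final class ClusterConfigParser {
    static final class Node {
        final String hostname;
        final String ip; // null when the endpoint did not send an IP address
        final int port;

        Node(String hostname, String ip, int port) {
            this.hostname = hostname;
            this.ip = ip;
            this.port = port;
        }

        // Without an IP the client has to resolve the hostname via DNS itself
        boolean needsDnsLookup() {
            return ip == null;
        }
    }

    private ClusterConfigParser() {}

    static List<Node> parseNodes(String nodeLine) {
        List<Node> nodes = new ArrayList<>();
        for (String entry : nodeLine.trim().split("\\s+")) {
            if (entry.isEmpty()) {
                continue; // blank line: no nodes
            }
            // limit -1 keeps the empty ip field in "hostname||port"
            String[] parts = entry.split("\\|", -1);
            if (parts.length != 3) {
                throw new IllegalArgumentException("Malformed node entry: " + entry);
            }
            String ip = parts[1].isEmpty() ? null : parts[1];
            nodes.add(new Node(parts[0], ip, Integer.parseInt(parts[2])));
        }
        return nodes;
    }
}
```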
If a DNS lookup fails, whether due to a timeout, a temporary network issue or any other reason, then by default that host is excluded from the list of memcached hosts the cache client connects to. This does not change until you update the cluster configuration and the ElastiCache Configuration Endpoint supplies a new version of the config.
The builder property .setUpdateConfigVersionOnDnsTimeout(true)
lets you change this behaviour when a
host cannot be resolved to an IP. A failed DNS resolution may be a temporary issue, and the host may be resolvable on the next
config poll. If you set the builder property to false, .setUpdateConfigVersionOnDnsTimeout(false),
the memcached client is still updated to point to the hosts in the config, but if any host resolution fails, the client does not record the current configuration's version number. As a result, on the next poll of the cluster configuration the memcached client is recreated again, pointing to the hosts specified in the configuration.
Note that if DNS resolution constantly fails, the memcached client will be recreated every time the configuration is polled. No comparison between the previously resolved hosts and the currently resolved hosts is performed; the client is simply recreated.
This feature is available in 1.0.9
and above.
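The following sketch shows the decision described above. After the client has been recreated from a configuration, it decides whether that configuration's version number is recorded. ConfigVersionTracker and its methods are hypothetical names, not part of herdcache. Version numbers are the values shown on the second line of the configuration response (12 in the example).

```java
import java.util.List;

// Hypothetical sketch of the behaviour controlled by setUpdateConfigVersionOnDnsTimeout;
// the class and method names are illustrative and not part of herdcache.
final class ConfigVersionTracker {
    private final boolean updateConfigVersionOnDnsTimeout;
    private long appliedVersion = -1; // -1 means no configuration recorded yet

    ConfigVersionTracker(boolean updateConfigVersionOnDnsTimeout) {
        this.updateConfigVersionOnDnsTimeout = updateConfigVersionOnDnsTimeout;
    }

    // A poll only leads to a new memcached client when its version has not been recorded
    boolean shouldRecreateClient(long polledVersion) {
        return polledVersion != appliedVersion;
    }

    // Called after the client was recreated; unresolvedHosts are hosts whose DNS lookup failed
    void clientRecreated(long version, List<String> unresolvedHosts) {
        if (unresolvedHosts.isEmpty() || updateConfigVersionOnDnsTimeout) {
            // Record the version: unresolved hosts stay excluded until the config version changes
            appliedVersion = version;
        }
        // Otherwise leave the version unrecorded, so the next poll recreates the client
        // and retries the DNS lookups
    }
}
```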
As discussed above, when you create an ElastiCache cache you provide the URL of the configuration endpoint:
CacheWithExpiry<String> cache = new ElastiCacheMemcachedCache<String>(
new ElastiCacheCacheConfigBuilder()
.setElastiCacheConfigHosts("yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211")
.buildElastiCacheMemcachedConfig());
You might wish to create another cluster with a different machine type and switch the ElastiCache cache over to the new cluster at runtime; for example, when moving the cache to machines with faster CPUs.
You can do this by constructing a SimpleVolatileBasedElastiCacheConfigServerUpdater and passing it to the builder:
ElastiCacheConfigServerUpdater configUrlUpdater = new SimpleVolatileBasedElastiCacheConfigServerUpdater();
CacheWithExpiry<String> cache = new ElastiCacheMemcachedCache<String>(
new ElastiCacheCacheConfigBuilder()
.setElastiCacheConfigHosts("yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211")
.setConfigUrlUpdater(configUrlUpdater)
.buildElastiCacheMemcachedConfig());
You would then write a JMX hook, or an admin REST endpoint, in your application that calls the method connectionUpdated(String url)
to inform the cache that the configuration URL has changed and that it should connect to the new URL to
obtain the new list of cache cluster nodes:
configUrlUpdater.connectionUpdated("yourcluster.irujgk.0001.euw1.cache.amazonaws.com:11211");
The cache will then connect to the new configuration endpoint and obtain the cache cluster nodes. Before attempting the
connection to the new cluster's configuration endpoint, the cache waits for the reconnect delay (5 seconds by default).
You can reduce or increase this delay with .setReconnectDelay(Duration.ofSeconds(5))
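The updater's class name suggests that the new URL is handed over to the polling thread through a volatile field. The stdlib-only sketch below illustrates that idea under this assumption. VolatileConfigUrl and ConfigUrlWatcher are hypothetical classes. They are not herdcache's SimpleVolatileBasedElastiCacheConfigServerUpdater, and the real cache additionally waits for the reconnect delay before connecting.

```java
import java.util.Objects;

// Hypothetical illustration of a volatile-based config URL handover; it is NOT
// herdcache's SimpleVolatileBasedElastiCacheConfigServerUpdater.
final class VolatileConfigUrl {
    // volatile: a write from the admin thread is visible to the polling thread
    private volatile String url;

    VolatileConfigUrl(String initialUrl) {
        url = Objects.requireNonNull(initialUrl);
    }

    // Called from a JMX operation or an admin REST endpoint
    void connectionUpdated(String newUrl) {
        url = Objects.requireNonNull(newUrl);
    }

    String currentUrl() {
        return url;
    }
}

// Polling side: notices that the URL changed and reports where to reconnect
final class ConfigUrlWatcher {
    private final VolatileConfigUrl source;
    private String connectedTo; // only touched by the polling thread

    ConfigUrlWatcher(VolatileConfigUrl source) {
        this.source = source;
    }

    // Returns the URL to (re)connect to, or null if it has not changed
    String checkForChange() {
        String latest = source.currentUrl();
        if (!latest.equals(connectedTo)) {
            connectedTo = latest;
            return latest;
        }
        return null;
    }
}
```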
Below is a simple Java main class that can be run on the command line as follows. With the flags shown, it generates a random integer between 1 and 1000 (inclusive) and applies that value to the cache.
java -DmaxRand=1000 -Dmillis=500 -Dhosts=herdtesting.jgkygp.cfg.euw1.cache.amazonaws.com:11211 -jar herd-elastitest-0.1.0-SNAPSHOT-relocated-shade.jar
With the logback.xml given below, the console output shows whether a cache hit or a cache miss occurred:
71671 [pool-1-thread-1] DEBUG MemcachedCacheHitsLogger - { "cachehit" : "-778756949", "cachetype" : "distributed_cache"}
71671 [pool-1-thread-1] INFO ElastiCacheTest - Adding cache value : 633
71680 [pool-1-thread-1] DEBUG MemcachedCacheHitsLogger - { "cachehit" : "274176478", "cachetype" : "distributed_cache"}
71680 [pool-1-thread-1] INFO ElastiCacheTest - Adding cache value : 35
71690 [pool-1-thread-1] DEBUG MemcachedCacheHitsLogger - { "cachemiss" : "65783974", "cachetype" : "distributed_cache"}
71690 [pool-1-thread-1] DEBUG o.g.c.h.m.BaseMemcachedCache - set requested for 65783974
71691 [pool-1-thread-1] INFO ElastiCacheTest - Adding cache value : 107
package org.greencheek.caching.elasticache;
import com.google.common.util.concurrent.MoreExecutors;
import net.spy.memcached.ConnectionFactoryBuilder;
import org.greencheek.caching.herdcache.CacheWithExpiry;
import org.greencheek.caching.herdcache.memcached.ElastiCacheMemcachedCache;
import org.greencheek.caching.herdcache.memcached.config.builder.ElastiCacheCacheConfigBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
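/**
* Simple command-line test: on a fixed schedule, looks up a random integer key in the
* ElastiCache-backed cache, computing and storing the value on a cache miss.
*/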
public class ElastiCacheTest {
private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
private static final Logger logger = LoggerFactory.getLogger("ElastiCacheTest");
private static final CacheWithExpiry<Integer> cache = new ElastiCacheMemcachedCache<Integer>(
new ElastiCacheCacheConfigBuilder()
.setElastiCacheConfigHosts(System.getProperty("hosts","localhost:11211"))
.setConfigPollingTime(Duration.ofSeconds(Integer.getInteger("pollingTime",60)))
.setInitialConfigPollingDelay(Duration.ofSeconds(0))
.setTimeToLive(Duration.ofSeconds(10))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setWaitForMemcachedSet(true)
.setDelayBeforeClientClose(Duration.ofSeconds(1))
.setDnsConnectionTimeout(Duration.ofSeconds(2))
.setUseStaleCache(true)
.setStaleCacheAdditionalTimeToLive(Duration.ofSeconds(4))
.setRemoveFutureFromInternalCacheBeforeSettingValue(true)
.buildElastiCacheMemcachedConfig());
// Shared instance, so the generator is not re-created (and re-seeded) on every call
private static final Random rand = new Random();
public static void main(String[] args) {
// Every "millis" milliseconds look up a random key; on a miss the supplier computes the value
service.scheduleAtFixedRate(() -> {
int i = randInt(Integer.getInteger("minRand", 1), Integer.getInteger("maxRand", 2));
// Wait for the cached (or newly computed) value, with null as the fallback value
logger.info("Adding cache value : {}", cache.awaitForFutureOrElse(
cache.apply("" + i, () -> i, MoreExecutors.sameThreadExecutor()),
null));
}, 0, Integer.getInteger("millis", 1000), TimeUnit.MILLISECONDS);
}
public static int randInt(int min, int max) {
// nextInt is exclusive of the upper bound, so add 1 to make max inclusive
return rand.nextInt((max - min) + 1) + min;
}
}
<configuration scan="true" scanPeriod="120 seconds" >
<contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
<resetJUL>true</resetJUL>
</contextListener>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%-4relative [%thread] %-5level %logger{35} - %msg %n</pattern>
</encoder>
</appender>
<logger name="net.spy" level="WARN"/>
<root level="DEBUG">
<appender-ref ref="STDOUT" />
</root>
</configuration>
To compile and run perf tests do:
mvn clean test-compile assembly:single
$JAVA_HOME/bin/java -jar target/performancetests-test-jar-with-dependencies.jar
Example output:
Benchmark Mode Cnt Score Error Units
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyDefaultKetamaHashAlgoTest thrpt 40 48.359 ± 5.699 ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyDefaultKetamaHashAlgoTestLargeValue thrpt 40 47.423 ± 16.481 ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyFolsomTest thrpt 40 46.821 ± 8.525 ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyFolsomTestLargeValue thrpt 40 38.507 ± 10.402 ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyJenkinsHashAlgoTest thrpt 40 48.549 ± 10.593 ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyJenkinsHashAlgoTestLargeValue thrpt 40 32.091 ± 8.526 ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyNoKeyHashingJenkinsTest thrpt 40 54.154 ± 9.703 ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyNoKeyHashingJenkinsTestLargeValue thrpt 40 51.703 ± 24.600 ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applySHA256HashingJenkinsTest thrpt 40 42.654 ± 1.257 ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applySHA256HashingJenkinsTestLargeValue thrpt 40 31.011 ± 6.156 ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyXXHashAlgoTest thrpt 40 47.783 ± 7.174 ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyXXHashAlgoTestLargeValue thrpt 40 45.857 ± 13.562 ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.iq80Compresss thrpt 40 72.708 ± 2.179 ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.iq80Decompresss thrpt 40 162.476 ± 3.815 ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.xerialCompress thrpt 40 105.303 ± 2.542 ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.xerialDecompress thrpt 40 173.413 ± 6.217 ops/ms