
HerdCache

Overview

The cache borrows heavily from the concepts and implementation of caching in spray-caching.

The idea is that the cache stores a future rather than a value of a given generic type. This approach nicely takes care of the thundering herd (cache miss storm) problem. That problem occurs when many requests for a particular cache key (e.g. a resource URI) arrive before the first one has completed. Without protection, the value for the key is calculated (and the associated processing performed) multiple times, and the backend resource is put under undue load.

Storing a future in the cache means that multiple requests for a cache value that has not yet been calculated all wait on the same future: the one executing for the first request for that cache key.
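The idea can be sketched in a few lines of plain Java. This is a minimal illustration, not HerdCache's implementation. It uses java.util.concurrent's CompletableFuture and ConcurrentHashMap in place of Guava's ListenableFuture and ConcurrentLinkedHashMap, and the FutureCache class name is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical sketch: the map holds futures, not values, so concurrent
// callers for the same key share one in-flight computation.
class FutureCache<V> {
    private final ConcurrentMap<String, CompletableFuture<V>> store = new ConcurrentHashMap<>();

    CompletableFuture<V> apply(String key, Supplier<V> computation, Executor executor) {
        // computeIfAbsent creates at most one future per absent key; the mapping
        // function only submits the computation, which then runs on the executor.
        return store.computeIfAbsent(key, k -> CompletableFuture.supplyAsync(computation, executor));
    }
}
```

Because the map holds the future itself, a second caller receives the first caller's in-flight future instead of starting another computation. The sketch never evicts entries, and a failed future would stay cached too. Bounding the map and expiring entries address exactly these gaps, as the implementations below do.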


Usage

The following details how to use the caching implementations, and the differences between them.

Dependency

<dependency>
  <groupId>org.greencheek.caching</groupId>
  <artifactId>herdcache</artifactId>
  <version>1.0.10</version>
</dependency>

Please note that 1.0.1 is not backwards compatible with 0.1.0. Version 1.0.1 extends the Cache interface with a couple of get methods, which introduces a breaking change to the old API.

Cache Types

There are currently two types of Cache that implement the Cache<V> interface:

  • SimpleLastRecentlyUsedCache

  • ExpiringLastRecentlyUsedCache

And two types of Cache that implement CacheWithExpiry<V>, which extends the Cache<V> interface:

  • SpyMemcachedCache<V>

  • ElastiCacheMemcachedCache<V>

Cache Interface

The cache originally had a single method, apply:

ListenableFuture<V> apply(String key, Supplier<V> computation, ListeningExecutorService executorService)

The returned value is a Guava ListenableFuture. You can attach a callback to it, or wait on it for the value to be generated.

The cache now includes two additional methods:

  • public ListenableFuture<V> get(String key)

  • public ListenableFuture<V> get(String key,ListeningExecutorService executorService)

Both methods look up the value associated with a key in the cache, always returning a Guava ListenableFuture. The difference between get and apply is that apply can generate the value, whilst get only looks it up in the cache.

The below shows a couple of examples of working with the returned ListenableFuture.

  • Adding a callback:

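// With MoreExecutors.directExecutor() the callback runs on the thread that
// completes the future (or on the calling thread if it has already completed)
Futures.addCallback(future, new FutureCallback<String>() {
    @Override
    public void onSuccess(String result) {
        // use the cached or newly computed value
    }

    @Override
    public void onFailure(Throwable t) {
        // the computation or cache lookup failed
    }
}, MoreExecutors.directExecutor());


// Runs the callback on a thread from the supplied executor's pool
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

Futures.addCallback(future, new FutureCallback<String>() {
    @Override
    public void onSuccess(String result) {
        // use the cached or newly computed value
    }

    @Override
    public void onFailure(Throwable t) {
        // the computation or cache lookup failed
    }
}, executorService);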
  • Waiting for the value (or failure)

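        try {
            String value = future.get(); // blocks until the value (or failure) is available
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        } catch (ExecutionException e) {
            // e.getCause() holds the exception thrown by the computation
        }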

SimpleLastRecentlyUsedCache

This cache uses a ConcurrentLinkedHashMap to store up to a maximum number of values. Once the cache reaches that maximum, the least recently used key is evicted.
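The eviction policy itself can be illustrated with java.util.LinkedHashMap in access order. This is a single-threaded stand-in, assumed here only to show the policy: ConcurrentLinkedHashMap is a separate concurrent library, and the LruMap name is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical single-threaded stand-in for a bounded, LRU-evicting map.
// Unlike ConcurrentLinkedHashMap, this is NOT safe for concurrent use.
class LruMap<K, V> extends LinkedHashMap<K, V> {
    private final int maxCapacity;

    LruMap(int maxCapacity) {
        super(16, 0.75f, true); // accessOrder = true: get() moves an entry to the tail
        this.maxCapacity = maxCapacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after each insertion; the eldest entry is the least recently used one.
        return size() > maxCapacity;
    }
}
```

With a capacity of 2, reading key1 before inserting key3 makes key2 the least recently used entry, so key2 is the one evicted.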

Examples

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
Cache<String> cache = new SimpleLastRecentlyUsedCache<>();

ListenableFuture<String> val = cache.apply("Key1",
                                           () -> {
                                                try {
                                                    Thread.sleep(1000);
                                                } catch (InterruptedException e) {
                                                    e.printStackTrace();
                                                }
                                                return "key1";
                                           },
                                           executorService);

The SimpleLastRecentlyUsedCache has no expiry on the items in the cache. It is limited only by the number of items it holds: when it is full, the least recently used item is evicted.

This can be seen in the following example:

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
// For key1 to be evicted below, this cache must be limited to a maximum of 2 items
Cache<String> cache = new SimpleLastRecentlyUsedCache<>();

ListenableFuture<String> val = cache.apply("key1", () -> {
  try {
     Thread.sleep(1000);
  } catch (InterruptedException e) {
     e.printStackTrace();
  }
  return "key1";
}, executorService);


ListenableFuture<String> val2 = cache.apply("key2", () -> {
  try {
     Thread.sleep(500);
  } catch (InterruptedException e) {
     e.printStackTrace();
  }
  return "key2";
}, executorService);


ListenableFuture<String> val3 = cache.apply("key3", () -> {
  try {
     Thread.sleep(500);
  } catch (InterruptedException e) {
     e.printStackTrace();
  }
  return "key3";
}, executorService);

ListenableFuture<String> val4 = cache.apply("key1", () -> {
  try {
    Thread.sleep(500);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return "key_new";
}, executorService);


assertEquals("Value should be key1","key1",cache.awaitForFutureOrElse(val, null));
assertEquals("Value should be key2","key2",cache.awaitForFutureOrElse(val2, null));
assertEquals("Value should be key3","key3",cache.awaitForFutureOrElse(val3, null));

// key1 was evicted when key3 was added (only key2 and key3 remained), so its computation ran again
assertEquals("Value should be key_new","key_new",cache.awaitForFutureOrElse(val4, null));

ExpiringLastRecentlyUsedCache

The difference between SimpleLastRecentlyUsedCache and ExpiringLastRecentlyUsedCache is that the latter has a default time to live for the elements put in the cache and, if desired, a time to idle for the items.

The timeToLive and timeToIdle are supplied to the constructor of the cache:


Using Only Time to Live

Example: creating a cache whose items live for 1 minute, regardless of when they were last used:

Cache<String> cache = new ExpiringLastRecentlyUsedCache<>(10, 60, 0, TimeUnit.SECONDS); // 10 items, 60s time to live, no time to idle

Using Time to Live, and Time to Idle

Example: creating a cache whose items live for 1 minute, but must also have been used within the last 30 seconds:

Cache<String> cache = new ExpiringLastRecentlyUsedCache<>(10, 60, 30, TimeUnit.SECONDS); // 10 items, 60s time to live, 30s time to idle
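This sketch shows the expiry check implied by these two settings. It assumes an entry expires when it is older than timeToLive. It also expires when timeToIdle is non-zero and the entry has not been accessed within timeToIdle. The ExpiryRule class is hypothetical, and the library's actual bookkeeping is not shown here.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical expiry rule: a time to live, plus an optional time to idle
// (Duration.ZERO disables the idle check). Not the library's code.
final class ExpiryRule {
    private final Duration timeToLive;
    private final Duration timeToIdle;

    ExpiryRule(Duration timeToLive, Duration timeToIdle) {
        this.timeToLive = timeToLive;
        this.timeToIdle = timeToIdle;
    }

    boolean isExpired(Instant created, Instant lastAccessed, Instant now) {
        // Lived longer than the time to live, regardless of use
        boolean liveExceeded = Duration.between(created, now).compareTo(timeToLive) > 0;
        // Not used within the time to idle (only if one is configured)
        boolean idleExceeded = !timeToIdle.isZero()
                && Duration.between(lastAccessed, now).compareTo(timeToIdle) > 0;
        return liveExceeded || idleExceeded;
    }
}
```

For the 60/30 configuration above, an item created 40 seconds ago and last used 10 seconds ago is still live. An item that has gone unused for 31 seconds has expired, even though its 60 second time to live has not elapsed.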

Waiting on futures

The Cache<V> interface inherits a utility interface (AwaitOnFuture<V>) that provides a couple of methods for waiting on futures for a value to be calculated:

  • V awaitForFutureOrElse(ListenableFuture<V> future, V onExceptionValue)

  • V awaitForFutureOrElse(ListenableFuture<V> future, V onExceptionValue, V onTimeoutValue, long duration, TimeUnit timeUnit)

Wait on future, with fallback value in case of exception

The value returned from a cache apply is a ListenableFuture. You can, of course, block the current thread while waiting for a value to be returned, as follows:

try {
   return future.get();
} catch (Exception e) {
   return somefallback;
}

The method V awaitForFutureOrElse(ListenableFuture<V> future, V onExceptionValue) removes the ceremony of the try/catch block for you.

The other method, V awaitForFutureOrElse(ListenableFuture<V> future, V onExceptionValue, V onTimeoutValue, long duration, TimeUnit timeUnit), lets you wait a finite amount of time for a value to be returned. If that time elapses, onTimeoutValue is returned. Any other exception results in onExceptionValue being returned.
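The documented behaviour of these two helpers can be sketched against plain java.util.concurrent.Future. Guava's ListenableFuture extends Future, so the same logic applies to it. The Await class is hypothetical and is not the library's AwaitOnFuture implementation.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of the two awaitForFutureOrElse behaviours described above.
final class Await {
    static <V> V awaitForFutureOrElse(Future<V> future, V onExceptionValue) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return onExceptionValue;
        } catch (ExecutionException | CancellationException e) {
            return onExceptionValue;            // computation failed or was cancelled
        }
    }

    static <V> V awaitForFutureOrElse(Future<V> future, V onExceptionValue, V onTimeoutValue,
                                      long duration, TimeUnit timeUnit) {
        try {
            return future.get(duration, timeUnit);
        } catch (TimeoutException e) {
            return onTimeoutValue;              // waited the full duration without a value
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return onExceptionValue;
        } catch (ExecutionException | CancellationException e) {
            return onExceptionValue;
        }
    }
}
```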


CacheWithExpiry

There are two implementations of the CacheWithExpiry<V> interface:

  • SpyMemcachedCache<V>

  • ElastiCacheMemcachedCache<V>

The second implementation ElastiCacheMemcachedCache<V> is an extension of the SpyMemcachedCache<V> implementation for working with Amazon AWS’s memcached support (known as ElastiCache).

The CacheWithExpiry<V> contains the method:

public ListenableFuture<V> apply(String key, Supplier<V> computation, Duration timeToLive, ListeningExecutorService executorService);

The difference between this method and the apply in the Cache<V> interface is the additional Duration parameter. This means keys can have different cache expiry times, since memcached supports per-item expiry.


Defaults

Both cache classes share a set of defaults.

The ElastiCacheCacheConfigBuilder extends the abstract class MemcachedCacheConfigBuilder, which holds the defaults that SpyMemcachedCache<V> runs with. The builder lets you override these defaults.

The following defaults apply to both memcached and ElastiCache memcached:

Method | Default | Description
setTimeToLive | Duration.ofSeconds(60) | The default expiry time an item will be given if none is specified
setMaxCapacity | 1000 | Maximum number of futures held in the internal cache while values are being calculated
setMemcachedHosts | "localhost:11211" | Comma-separated host list
setHashingType | ConnectionFactoryBuilder.Locator.CONSISTENT | Uses consistent hashing; don't change
setFailureMode | FailureMode.Redistribute | What should happen when an error occurs (FailureMode.Retry may suit you better)
setHashAlgorithm | DefaultHashAlgorithm.KETAMA_HASH | The consistent hashing algorithm used to choose the memcached node to talk to; don't change
serializingTranscoder | new FastSerializingTranscoder() | The serializer: the class responsible for serializing Java objects to a byte stream for storage in memcached
protocol | ConnectionFactoryBuilder.Protocol.BINARY | The protocol used to talk to memcached
readBufferSize | DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE | The socket buffer size used when talking to memcached; do not change
memcachedGetTimeout | Duration.ofMillis(2500) | How long to wait when looking up a key in memcached before timing out
dnsConnectionTimeout | Duration.ofSeconds(3) | How long to wait for the DNS lookup when resolving memcachedHosts to IP addresses, before ignoring that node
waitForMemcachedSet | false | Wait for the write to memcached to complete before removing the future from the internal cache
setWaitDuration | Duration.ofSeconds(2) | How long to wait for the memcached set
keyHashType | KeyHashingType.JAVA_XXHASH | How the cache key is hashed
keyPrefix | Optional.empty() | An optional string prefixed to the lookup key, to avoid the unlikely event of a key clash
asciiOnlyKeys | false | Set to true if only ASCII keys will be stored in the cache
hostStringParser | new CommaSeparatedHostAndPortStringParser() | Do not change
hostResolver | new AddressByNameHostResolver() | Do not change
useStaleCache | false | Whether stale caching is enabled
staleCacheAdditionalTimeToLive | Duration.ZERO | The extra time that items are kept in the stale cache
staleCachePrefix | "stale" | The prefix for stale keys, to avoid clashes
staleMaxCapacity | -1 | The size of the internal future cache for the stale cache; -1 means the same as maxCapacity
staleCacheMemachedGetTimeout | Duration.ZERO | Time to wait for lookups against the stale cache
removeFutureFromInternalCacheBeforeSettingValue | false | When the Supplier<V> computation completes, the future is set with the computed value and removed from the internal cache. false: complete the future first, then remove it from the internal future cache. true: remove the future from the map first, then set its value

The following defaults apply only to ElastiCache memcached:

Method | Default | Description
setElastiCacheConfigHosts | "localhost:11211" | The ElastiCache configuration endpoint host, e.g. yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211
setConfigPollingTime | Duration.ofSeconds(60) | How often to contact the config host for potential updates to the memcached nodes
setInitialConfigPollingDelay | Duration.ZERO | The delay before the initial poll of the config host to obtain the memcached nodes
setConnectionTimeoutInMillis | Duration.ofMillis(3000) | The time allowed for establishing a connection to the config host before giving up and retrying
setIdleReadTimeout | Duration.ofSeconds(125) | If the client receives no data from the ElastiCache Configuration Endpoint for this idle period, a reconnection is made
setReconnectDelay | Duration.ofSeconds(5) | The delay between reconnection attempts to the config host
setDelayBeforeClientClose | Duration.ofSeconds(300) | When the ElastiCache Configuration Endpoint outputs a configuration update, a new spy memcached client is created and the old client is closed. The old client is closed only after this delay, as it may still be in use
setNumberOfConsecutiveInvalidConfigurationsBeforeReconnect | 3 | If the config host returns invalid config this many times in a row, a reconnection is made
setUpdateConfigVersionOnDnsTimeout | true | Set to false if you don't want a config update to be acknowledged when DNS resolution fails for any of the memcached nodes


SpyMemcachedCache

The SpyMemcachedCache<V> implementation uses the spy memcached java library to communicate with memcached. The implementation is similar to that of SimpleLastRecentlyUsedCache in that it uses a ConcurrentLinkedHashMap to store the cache key against an executing future.

When a request arrives for a key that is not already being computed, a future is stored in an internal ConcurrentLinkedHashMap:

store.putIfAbsent(keyString, future)

If a subsequent request comes in for the same key before that future has completed, the existing future in the ConcurrentLinkedHashMap is returned to the caller. This way, both requests wait on the same executing Supplier<V> computation.

When constructing the SpyMemcachedCache, you can specify the maximum size of the internal ConcurrentLinkedHashMap that is used to store the concurrently executing futures.

SimpleLastRecentlyUsedCache keeps completed futures in its ConcurrentLinkedHashMap, so subsequent cache hits obtain the completed future's value. SpyMemcachedCache<V> does not: it removes the key and its associated future from the internal ConcurrentLinkedHashMap once complete. The value of the completed future is instead stored in memcached for subsequent retrieval.

Before the Supplier<V> computation is submitted to the supplied executor for execution, the memcached cluster is checked for a value for the given key. If a value is present in memcached, the returned future is set with that value. This means that if two requests come in for the same key while a value is present in memcached, both wait on the same future to have its value set from the memcached cache hit.

If no value exists in memcached, the given Supplier<V> computation is submitted to the provided executor for execution. Once the value has been calculated, it is sent over the network to memcached for storage.
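The flow described above can be sketched as follows. The sketch is hypothetical. A ConcurrentMap stands in for memcached, CompletableFuture stands in for Guava's futures, and DistributedFutureCache is an illustrative name only; the real cache uses the spymemcached client. It follows the default ordering (complete the future, then remove it from the internal map), which leaves the window described in the next paragraph.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical sketch: in-flight futures live in a local map; completed values
// live in a remote store (a ConcurrentMap standing in for memcached).
class DistributedFutureCache<V> {
    private final ConcurrentMap<String, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, V> remote;

    DistributedFutureCache(ConcurrentMap<String, V> remote) {
        this.remote = remote;
    }

    // The computation must return a non-null value (ConcurrentMap rejects nulls).
    CompletableFuture<V> apply(String key, Supplier<V> computation, Executor executor) {
        CompletableFuture<V> future = new CompletableFuture<>();
        CompletableFuture<V> existing = inFlight.putIfAbsent(key, future);
        if (existing != null) {
            return existing; // another request is already resolving this key
        }
        V cached = remote.get(key); // 1. check the distributed cache first
        if (cached != null) {
            inFlight.remove(key, future);
            future.complete(cached);
            return future;
        }
        // 2. cache miss: run the computation on the caller-supplied executor
        CompletableFuture.supplyAsync(computation, executor).whenComplete((value, error) -> {
            if (error != null) {
                inFlight.remove(key, future);
                future.completeExceptionally(error);
                return;
            }
            // 3. write asynchronously, complete the future, then drop the in-flight entry;
            //    the write may not have landed yet when the entry is removed
            CompletableFuture.runAsync(() -> remote.put(key, value), executor);
            future.complete(value);
            inFlight.remove(key, future);
        });
        return future;
    }
}
```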

With this library the value is stored asynchronously in memcached. The future is completed with the computed value and subsequently removed from the ConcurrentLinkedHashMap. There is therefore a small window between the completion of the future and the value being saved in memcached. During this window, a subsequent request for the same key can be a cache miss.

When constructing the SpyMemcachedCache, you can specify a period of time to wait for the set to occur (i.e. make the asynchronous set into memcached semi-synchronous).

The SpyMemcachedCache is created by passing it a MemcachedCacheConfig. A MemcachedCacheConfig is created via an ElastiCacheCacheConfigBuilder, whose method public MemcachedCacheConfig buildMemcachedConfig() builds the config for both the ElastiCacheMemcachedCache and the SpyMemcachedCache.

The following examples show various ways of configuring the cache:


Constructing the SpyMemcachedCache

        cache = new SpyMemcachedCache<>(
                new ElastiCacheCacheConfigBuilder()
                        .setMemcachedHosts("localhost:11211")
                        .setTimeToLive(Duration.ofSeconds(60))
                        .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
                        .buildMemcachedConfig()
        );

        ListenableFuture<String> val = cache.apply("Key1", () -> {
            return "value1";
        }, Duration.ofSeconds(3), executorService);

        assertEquals("Value should be value1","value1", cache.awaitForFutureOrElse(val, null));

Specifying the Memcached hosts

By default the host string is localhost:11211. However, you can connect to a number of hosts by specifying them as a comma-separated string in the builder:

         CacheWithExpiry<String> cache = new SpyMemcachedCache<>(
                 new ElastiCacheCacheConfigBuilder()
                         .setMemcachedHosts("localhost:11211,localhost:11212,localhost:11213")
                         .buildMemcachedConfig()
                 );

When the SpyMemcachedCache is given the list of memcached hosts, each host's IP address needs to be resolved. By default, up to 3 seconds per host is spent waiting to obtain the IP address. This can be controlled as follows:

         CacheWithExpiry<String> cache = new SpyMemcachedCache<>(
                 new ElastiCacheCacheConfigBuilder()
                         .setMemcachedHosts("localhost:11211,localhost:11212,localhost:11213")
                         .setDnsConnectionTimeout(Duration.ofSeconds(2))
                         .buildMemcachedConfig()
                 );

Specifying the Expiry of Items in memcached

There are two ways to specify the Expiry of items that are stored in memcached:

  • A global Time To Live for the items

  • Passing the Time To Live for cached item in the apply method

The example below sets a default of 30 seconds for all items saved in the cache for which a time to live has not been specified:

         CacheWithExpiry<String> cache = new SpyMemcachedCache<>(
                 new ElastiCacheCacheConfigBuilder()
                         .setMemcachedHosts("localhost:11211")
                         .setTimeToLive(Duration.ofSeconds(30))
                         .buildMemcachedConfig()
                 );

         // No Duration is passed, so the 30 second default applies
         ListenableFuture<String> val = cache.apply("Key1", () -> {return "value1";}, executorService);

To specify the TTL on a per-item basis, pass the Duration when calling the apply method:

ListenableFuture<String> val = cache.apply("Key1", () -> {return "value1";}, Duration.ofSeconds(10), executorService);


Setting Wait for memcached Set

When an item is neither in the cache nor currently being calculated, the cache executes the Supplier<V> computation and stores the returned value in memcached. A future is created and stored in the internal future calculation cache, so that any requests for the same key wait on the completion of that same future.

With this library the computed cache value is stored asynchronously in memcached, and the future is completed with the same value. The future is then removed from the internal future calculation cache (ConcurrentLinkedHashMap). There is therefore a small window between the completion of the future and the value being saved in memcached. During this window, a subsequent request for the same key can be a cache miss.

As a result, you can request that the write to memcached be synchronous, waiting a finite period for the write to take place. This is done at construction time. The following example waits a maximum of 3 seconds for the set to occur.

         cache = new SpyMemcachedCache<>(
                 new ElastiCacheCacheConfigBuilder()
                         .setMemcachedHosts("localhost:11211")
                         .setTimeToLive(Duration.ofSeconds(60))
                         .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
                         .setWaitForMemcachedSet(true)
                         .setSetWaitDuration(Duration.ofSeconds(3))
                         .buildMemcachedConfig()
         );

         ListenableFuture<String> val = cache.apply("Key1", () -> {
             return "value1";
         }, Duration.ofSeconds(3), executorService);

         assertEquals("Value should be value1","value1", cache.awaitForFutureOrElse(val, null));
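A bounded wait on the write could look like the following hypothetical sketch. With spymemcached, MemcachedClient.set(...) returns an OperationFuture<Boolean>, which is a Future<Boolean>. The sketch therefore accepts any Future<Boolean>, and the SetWaiter name is illustrative only.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of "wait for memcached set": block for at most
// setWaitDuration on the write's future before moving on.
final class SetWaiter {
    static boolean awaitSet(Future<Boolean> setFuture, Duration setWaitDuration) {
        try {
            return Boolean.TRUE.equals(setFuture.get(setWaitDuration.toMillis(), TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return false; // stop waiting; the write may still complete later
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false; // the write itself failed
        }
    }
}
```

The trade-off is latency for consistency. The first request pays up to setWaitDuration extra, in exchange for later requests being far less likely to miss while the write is in transit.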

No Expiry

Items can be stored with no expiry (no TTL applied) by specifying the duration as Duration.ZERO:

         CacheWithExpiry<String> cache = new SpyMemcachedCache<>(
                 new ElastiCacheCacheConfigBuilder()
                         .setMemcachedHosts("localhost:11211")
                         .setTimeToLive(Duration.ofSeconds(60))
                         .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
                         .setWaitForMemcachedSet(true)
                         .setSetWaitDuration(Duration.ofSeconds(3))
                         .buildMemcachedConfig()
         );

         ListenableFuture<String> val = cache.apply("Key1", () -> {return "value1";}, Duration.ZERO, executorService);

         assertEquals("Value should be value1","value1", cache.awaitForFutureOrElse(val, null));
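For context, this is how a Duration plausibly maps onto memcached's expiry field. Memcached treats an expiry of 0 as "never expire." It treats values above 30 days (2,592,000 seconds) as absolute Unix timestamps rather than relative seconds. The helper below is a hypothetical sketch of that mapping, not the library's code. Rounding sub-second durations up to 1 second is an assumption made so they do not silently become "never expire".

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper mapping a Duration onto memcached's expiry field.
final class MemcachedExpiry {
    static final long THIRTY_DAYS_SECONDS = 60L * 60 * 24 * 30;

    static int toExpiry(Duration ttl, Instant now) {
        if (ttl.isNegative()) {
            throw new IllegalArgumentException("TTL must not be negative: " + ttl);
        }
        if (ttl.isZero()) {
            return 0; // memcached: 0 means the item never expires
        }
        long seconds = Math.max(1L, ttl.getSeconds()); // keep sub-second TTLs from becoming 0
        if (seconds <= THIRTY_DAYS_SECONDS) {
            return (int) seconds; // relative expiry in seconds
        }
        return (int) (now.getEpochSecond() + seconds); // beyond 30 days: absolute Unix time
    }
}
```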

Cache Key

The cache key has to be a string. When using the TEXT protocol, memcached places restrictions on the makeup of keys, so your key must conform to the following requirements:

  • Needs to be a string

  • cannot contain ' '(space), '\r'(return), '\n'(linefeed)
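A check for these TEXT protocol rules might look like the hypothetical sketch below. It also enforces memcached's 250-byte maximum key length. The TextProtocolKeys name is illustrative, and the sketch does not reproduce the library's validation.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical validator for the TEXT-protocol key rules listed above,
// plus memcached's 250-byte key length limit.
final class TextProtocolKeys {
    static final int MAX_KEY_BYTES = 250;

    static boolean isValid(String key) {
        if (key == null || key.isEmpty()) {
            return false;
        }
        if (key.getBytes(StandardCharsets.UTF_8).length > MAX_KEY_BYTES) {
            return false;
        }
        for (int i = 0; i < key.length(); i++) {
            char c = key.charAt(i);
            if (c == ' ' || c == '\r' || c == '\n') {
                return false; // these characters break the line-based TEXT protocol
            }
        }
        return true;
    }
}
```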

If you are using the BINARY protocol these requirements do not apply. However, you may wish to perform hashing of the string representing the key to allow for any character to be used. The cache has the ability for a couple of hash representations of the key:

  • NONE,

  • NATIVE_XXHASH,

  • JAVA_XXHASH,

  • MD5_UPPER,

  • SHA256_UPPER,

  • MD5_LOWER,

  • SHA256_LOWER

To use one of these, specify the hashing method at cache construction time. XXHash is recommended for the best performance; the example below uses MD5_LOWER:

cache = new SpyMemcachedCache<>(
                new ElastiCacheCacheConfigBuilder()
                        .setMemcachedHosts("localhost:" + memcached.getPort())
                        .setTimeToLive(Duration.ofSeconds(60))
                        .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
                        .setWaitForMemcachedSet(true)
                        .setKeyHashType(KeyHashingType.MD5_LOWER)
                        .buildMemcachedConfig()
        );
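The MD5 and SHA-256 options can be illustrated with the JDK's MessageDigest, hex-encoded in lower or upper case. The exact encoding the library uses is an assumption here, read off the MD5_LOWER / SHA256_UPPER names. The XXHash variants need a third-party library and are omitted. KeyHasher is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch of digest-based key hashing to hex. Hex output contains
// no spaces or line breaks, so it always satisfies the TEXT protocol rules.
final class KeyHasher {
    static String hexDigest(String algorithm, String key, boolean upperCase) {
        try {
            byte[] digest = MessageDigest.getInstance(algorithm)
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                // %02x formats a Byte as two unsigned hex digits
                hex.append(String.format(upperCase ? "%02X" : "%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(algorithm + " not available", e);
        }
    }
}
```

For example, hexDigest("MD5", "abc", false) yields the standard test vector 900150983cd24fb0d6963f7d28e17f72.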

Cache Key Prefix

When hashing a key, there is a potential for two different strings to end up with the same hashed value. As a result, you can give the cache a key prefix at construction time.

The example below specifies a cache prefix of article. The method setHashKeyPrefix(false) means the prefix is added after the cache key has been hashed. Setting setHashKeyPrefix(true) means the prefix is prepended to the cache key before hashing takes place. true is the default, because the prefix has the potential to break the TEXT protocol key requirements (hashing the key makes sure this does not occur).

  cache = new SpyMemcachedCache<>(
                new ElastiCacheCacheConfigBuilder()
                        .setMemcachedHosts("localhost:" + memcached.getPort())
                        .setTimeToLive(Duration.ofSeconds(60))
                        .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
                        .setWaitForMemcachedSet(true)
                        .setKeyHashType(KeyHashingType.MD5_LOWER)
                        .setKeyPrefix(Optional.of("article"))
                        .setHashKeyPrefix(false)
                        .buildMemcachedConfig()
        );
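The two orderings controlled by setHashKeyPrefix can be sketched as follows, with the hash function passed in. Plain concatenation of prefix and key is an assumption, since whether the library adds a separator is not stated here. PrefixedKeys is a hypothetical name.

```java
import java.util.Optional;
import java.util.function.UnaryOperator;

// Hypothetical sketch of the two prefix orderings.
final class PrefixedKeys {
    static String cacheKey(String key, Optional<String> prefix, boolean hashKeyPrefix,
                           UnaryOperator<String> hash) {
        if (!prefix.isPresent()) {
            return hash.apply(key);
        }
        return hashKeyPrefix
                ? hash.apply(prefix.get() + key)   // true: prefix is hashed along with the key
                : prefix.get() + hash.apply(key);  // false: prefix prepended to the hashed key
    }
}
```

With hashKeyPrefix set to false, the prefix stays readable in memcached but must itself satisfy the TEXT protocol rules. With true, only the hash output reaches memcached.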

Choosing Not To Cache

Since 1.0.6 the client has the following method:

    public ListenableFuture<V> apply(String key, Supplier<V> computation, ListeningExecutorService executorService,
                                     Predicate<V> canCacheValueEvalutor);

And the CacheWithExpiry<V> contains the method:

    public ListenableFuture<V> apply(String key, Supplier<V> computation, Duration timeToLive,
                                     ListeningExecutorService executorService,Predicate<V> canCacheValueEvalutor);

These methods let you pass a Predicate<V> that evaluates whether the value returned from the Supplier<V> (the function generating the value to cache) should actually be stored in memcached. This can be useful when the Supplier<V> is, say, a HystrixCommand whose value has on this occasion been generated by its fallback. The Predicate<V> can wrap the command object, evaluate whether the value came from the fallback, and choose not to cache it:

    cache.apply("webservicecallx", () -> command.execute(), executorService,
          (cachevalue) -> {
                    // only cache values that did not come from the Hystrix fallback
                    return !command.isResponseFromFallback();
          }
         );
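This hypothetical sketch shows the idea behind canCacheValueEvalutor: the caller always receives the computed value, but the value is written to the backing store only when the predicate accepts it. A Map stands in for memcached, and ConditionalWrite is an illustrative name, not part of the library.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch: compute, then store only values the predicate accepts.
final class ConditionalWrite {
    static <V> CompletableFuture<V> computeAndMaybeStore(String key, Supplier<V> computation,
                                                         Executor executor, Predicate<V> canCacheValue,
                                                         Map<String, V> store) {
        return CompletableFuture.supplyAsync(computation, executor).thenApply(value -> {
            if (canCacheValue.test(value)) {
                store.put(key, value); // e.g. skip values produced by a fallback
            }
            return value; // the caller gets the value either way
        });
    }
}
```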

Stale Caching

Since 1.0.1 the client supports a stale caching mechanism. It is not enabled by default, because it requires an additional future (via composition) to perform the extra cache lookup. It also means an additional lookup on the memcached server, and roughly twice the memory use (items are stored twice in the cache). Stale caching is enabled via the .setUseStaleCache(true) method.

The stale caching function is a mini "stale-while-revalidate" mechanism. Without stale caching, when a popular item expires, many requests wait on the item being regenerated from the backend. This means you can see a larger spike of waiting requests than you would like.

With stale caching enabled, only one request regenerates the item from the backend, and the other requests use the stale cache. The stale cache is ONLY checked if a future already exists in the internal cache, meaning that a backend request to calculate the cache item is in progress.

With stale caching enabled, an item is stored twice in memcached. The second copy is stored under a different key, made up of the hashed cache key and the stale cache key prefix. The prefix is set via the builder method .setStaleCachePrefix("staleprefix"), and defaults to stale.

By default, the stale item is stored for setTimeToLive longer than the original cache item. To provide a value of your own, say 10 minutes extra, specify it at construction time:

        cache = new SpyMemcachedCache<>(
                new ElastiCacheCacheConfigBuilder()
                        .setMemcachedHosts("localhost:" + memcached.getPort())
                        .setTimeToLive(Duration.ofSeconds(1))
                        .setUseStaleCache(true)
                        .setStaleCacheAdditionalTimeToLive(Duration.ofMinutes(10))
                        .setStaleCachePrefix("staleprefix")
                        .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
                        .setWaitForMemcachedSet(true)
                        .buildMemcachedConfig()
        );

Stale Caching is available in both SpyMemcachedCache and ElastiCacheMemcachedCache
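The bookkeeping described above can be sketched as follows. The sketch is hypothetical and rests on several assumptions. The stale key is taken to be prefix + hashed key. A zero additional TTL is read, per the text above, as "one extra timeToLive". The writer and reader are abstracted as functional interfaces. None of these names come from the library.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch of stale-cache writes and reads.
final class StaleCachePlan {
    interface Writer { void set(String key, Object value, Duration ttl); }

    static void writeBoth(Writer writer, String hashedKey, Object value, Duration ttl,
                          String stalePrefix, Duration staleAdditionalTtl) {
        // Assumption: with no explicit extra TTL, the stale copy lives one more timeToLive
        Duration extra = staleAdditionalTtl.isZero() ? ttl : staleAdditionalTtl;
        writer.set(hashedKey, value, ttl);                           // normal copy
        writer.set(stalePrefix + hashedKey, value, ttl.plus(extra)); // longer-lived stale copy
    }

    static <V> Optional<V> readWhileRecomputing(boolean recomputationInFlight, String hashedKey,
                                                String stalePrefix, Function<String, Optional<V>> reader) {
        // Only callers that find an in-flight future consult the stale copy;
        // the first caller performs the recomputation instead.
        return recomputationInFlight ? reader.apply(stalePrefix + hashedKey) : Optional.empty();
    }
}
```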


Metrics

Metrics are available in both SpyMemcachedCache and ElastiCacheMemcachedCache as of version 1.0.11. The configuration builder takes an option, .setMetricsRecorder(..), which accepts an implementation of org.greencheek.caching.herdcache.memcached.metrics.MetricsRecorder. The default implementation is NoOpMetricRecorder. The other implementation is new YammerMetricsRecorder(registry), which uses the Yammer metrics library (https://dropwizard.github.io/metrics).

With the YammerMetricsRecorder, the following metrics are registered in the Metrics library:

Metric | Description
value_calculation_cache_hitrate | The cache hits per second on the internal future cache
value_calculation_cache_missrate | The cache misses per second on the internal future cache
value_calculation_cache_hitcount | The total cache hits on the internal future cache
value_calculation_cache_misscount | The total cache misses on the internal future cache
value_calculation_success_count | The number of successful runs of the Supplier<T> function
value_calculation_failure_count | The number of failed runs of the Supplier<T> function
value_calculation_time_timer | The time taken to execute the Supplier<T> function
distributed_cache_hitrate | The cache hits per second on the distributed cache (i.e. memcached)
distributed_cache_missrate | The cache misses per second on the distributed cache (i.e. memcached)
distributed_cache_timer | The time taken to look up a value in the distributed cache
distributed_cache_count | The number of lookups performed in the distributed cache
distributed_cache_hitcount | The total cache hits on the distributed cache
distributed_cache_misscount | The total cache misses on the distributed cache
distributed_cache_writes_count | The writes performed on the distributed cache
stale_distributed_cache_timer | The time taken to look up a stale value in the distributed cache
stale_distributed_cache_hitrate | The stale cache hits per second on the distributed cache (i.e. memcached)
stale_distributed_cache_missrate | The stale cache misses per second on the distributed cache (i.e. memcached)
stale_distributed_cache_count | The stale hits performed on the distributed cache (i.e. memcached)
stale_distributed_cache_hitcount | The total stale cache hits on the distributed cache
stale_distributed_cache_misscount | The total stale cache misses on the distributed cache
stale_value_calculation_cache_misscount | The total cache misses on the internal future cache for a stale value
stale_value_calculation_cache_hitcount | The total cache hits on the internal future cache for a stale value
stale_value_calculation_cache_missrate | The cache misses per second on the internal future cache for a stale value
stale_value_calculation_cache_hitrate | The cache hits per second on the internal future cache for a stale value


ElastiCacheMemcachedCache (AWS ElastiCache Support)

Since release 1.0.1 there has been support for AWS ElastiCache memcached clusters.

This is done by creating an instance of ElastiCacheMemcachedCache<V> rather than SpyMemcachedCache<V>. An example is as follows:

CacheWithExpiry<String> cache = new ElastiCacheMemcachedCache<String>(
                    new ElastiCacheCacheConfigBuilder()
                            .setElastiCacheConfigHosts("yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211")
                            .setConfigPollingTime(Duration.ofSeconds(10))
                            .setInitialConfigPollingDelay(Duration.ofSeconds(0))
                            .setTimeToLive(Duration.ofSeconds(2))
                            .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
                            .setWaitForMemcachedSet(true)
                            .setDelayBeforeClientClose(Duration.ofSeconds(1))
                            .setDnsConnectionTimeout(Duration.ofSeconds(2))
                            .setUseStaleCache(true)
                            .setStaleCacheAdditionalTimeToLive(Duration.ofSeconds(4))
                            .setRemoveFutureFromInternalCacheBeforeSettingValue(true)
                            .buildElastiCacheMemcachedConfig()
            );

Setting ElastiCache Hosts

The ElastiCache cache works by using the ElastiCache Auto Discovery mechanism, which is driven by an ElastiCache Configuration Endpoint.

You supply the ElastiCacheMemcachedCache<V> cache with the URL of the ElastiCache Configuration Endpoint. The cache uses the netty library (http://netty.io/) to keep a persistent connection open to that endpoint, and periodically sends the config get cluster command over it. The ElastiCache Configuration Endpoint returns a configuration similar to the following, which details the actual memcached instances that should be connected to:

    CONFIG cluster 0 147
    12
    myCluster.pc4ldq.0001.use1.cache.amazonaws.com|10.82.235.120|11211 myCluster.pc4ldq.0002.use1.cache.amazonaws.com|10.80.249.27|11211

    END

When the version number (the second line) increases, a new spy memcached client is created and the old spy memcached client is scheduled to be closed. The ElastiCacheMemcachedCache<V> periodically polls the ElastiCache Configuration Endpoint for any changes in the number of memcached hosts, or in which hosts are available.
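As an illustration only, the version check described above can be sketched in Java as follows. ClusterConfigParser, Node, ClusterConfig and requiresNewClient are hypothetical names, not part of HerdCache, and the sketch assumes the response layout shown above: a header line, a version line, then space-separated hostname|ip|port entries.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parser for the "config get cluster" response shown above.
// Illustration only; this is not HerdCache's own parsing code.
final class ClusterConfigParser {

    // One memcached node; ip is empty when the endpoint does not supply one.
    record Node(String hostname, String ip, int port) {}

    // The version number (second line) plus the list of nodes.
    record ClusterConfig(long version, List<Node> nodes) {}

    static ClusterConfig parse(String response) {
        String[] lines = response.strip().split("\\r?\\n");
        // lines[0] is the "CONFIG cluster 0 <length>" header
        if (lines.length < 3 || !lines[0].startsWith("CONFIG cluster")) {
            throw new IllegalArgumentException("Not a config get cluster response");
        }
        long version = Long.parseLong(lines[1].trim());
        List<Node> nodes = new ArrayList<>();
        // lines[2] holds space-separated hostname|ip|port entries
        for (String entry : lines[2].trim().split("\\s+")) {
            String[] parts = entry.split("\\|", -1); // -1 keeps an empty ip field
            if (parts.length != 3) {
                throw new IllegalArgumentException("Malformed node entry: " + entry);
            }
            nodes.add(new Node(parts[0], parts[1], Integer.parseInt(parts[2])));
        }
        return new ClusterConfig(version, List.copyOf(nodes));
    }

    // A new memcached client is only needed when the version has increased.
    static boolean requiresNewClient(long lastSeenVersion, ClusterConfig config) {
        return config.version() > lastSeenVersion;
    }
}
```

Keying the rebuild on the version number alone means the node list does not have to be compared entry by entry.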

The ElastiCache Configuration Endpoint is specified via the setElastiCacheConfigHosts(String config) method on the ElastiCacheCacheConfigBuilder object:

CacheWithExpiry<String> cache = new ElastiCacheMemcachedCache<String>(
                    new ElastiCacheCacheConfigBuilder()
                            .setElastiCacheConfigHosts("yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211")
                            .buildElastiCacheMemcachedConfig());

For the moment you should only specify one configuration host. At the time of writing, an ElastiCache cluster is located in a single Availability Zone and cannot span multiple Availability Zones. You can have three separate ElastiCache clusters, one in each Availability Zone, but the cache will only connect to one Availability Zone at any one time.


Specifying the polling time

By default the ElastiCache cache polls the ElastiCache Configuration Endpoint for an update to the nodes that make up the cluster every 60 seconds. This can be configured via the following methods:

  • .setConfigPollingTime(Duration.ofSeconds(10)): the interval between polls

  • .setInitialConfigPollingDelay(Duration.ofSeconds(0)): the delay before the first poll

This can be seen in the following example:

private static final CacheWithExpiry cache = new ElastiCacheMemcachedCache<Integer>(
            new ElastiCacheCacheConfigBuilder()
            .setElastiCacheConfigHosts(System.getProperty("hosts","localhost:11211"))
            .setConfigPollingTime(Duration.ofSeconds(Integer.getInteger("pollingTime",60)))
            .setInitialConfigPollingDelay(Duration.ofSeconds(0))
            .setTimeToLive(Duration.ofSeconds(10))
            .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
            .setWaitForMemcachedSet(true)
            .setDelayBeforeClientClose(Duration.ofSeconds(1))
            .setDnsConnectionTimeout(Duration.ofSeconds(2))
            .setUseStaleCache(true)
            .setStaleCacheAdditionalTimeToLive(Duration.ofSeconds(4))
            .setRemoveFutureFromInternalCacheBeforeSettingValue(true)
            .buildElastiCacheMemcachedConfig());
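A minimal sketch of how an initial delay and a polling interval combine, using a JDK ScheduledExecutorService. ConfigPoller is a hypothetical class, and whether HerdCache schedules its polls at a fixed delay or at a fixed rate is an assumption here (this sketch uses a fixed delay).

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical poller: runs pollTask after initialDelay, then every pollingTime.
final class ConfigPoller {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "config-poller");
            t.setDaemon(true); // do not keep the JVM alive just for polling
            return t;
        });

    ScheduledFuture<?> start(Runnable pollTask, Duration initialDelay, Duration pollingTime) {
        // Fixed delay: each poll starts pollingTime after the previous one finished.
        return scheduler.scheduleWithFixedDelay(
            pollTask, initialDelay.toMillis(), pollingTime.toMillis(), TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

With a fixed delay a slow poll never overlaps the next one; a fixed rate would keep a steadier cadence at the cost of possible back-to-back polls.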

Persistent Connection to ElastiCache Configuration Endpoint

The ElastiCache cache uses a persistent connection to the ElastiCache Configuration Endpoint. If the connection is lost, the client automatically reconnects after waiting for a reconnect delay (5 seconds by default). The delay can be changed via the method .setReconnectDelay(Duration.ofSeconds(10)).

If the client does not receive any data from the ElastiCache Configuration Endpoint for a given idle period, it reconnects; this idle period is controlled by the idleReadTimeout setting, which defaults to 125 seconds. If you modify this setting you SHOULD NOT set it lower than the polling interval, as the connection would then be dropped and re-established between polls, and so would not be persistent.

.setReconnectDelay(Duration.ofSeconds(5))
.setIdleReadTimeout(Duration.ofSeconds(125))

If the ElastiCache Configuration Endpoint returns invalid configurations, the client reconnects to the Configuration Endpoint. By default it takes 3 consecutive invalid configurations to trigger a reconnect. This is controlled by the method setNumberOfConsecutiveInvalidConfigurationsBeforeReconnect(int number).
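The two reconnect triggers described above (an idle read timeout and a run of consecutive invalid configurations) can be sketched as a small policy object. ReconnectPolicy and its methods are hypothetical; the caller would pass in values such as the documented defaults (125 seconds, 3 configurations), and rejecting an idle timeout lower than the polling interval is this sketch's own choice, not documented library behaviour.

```java
import java.time.Duration;

// Hypothetical sketch of the two reconnect triggers; not HerdCache's implementation.
final class ReconnectPolicy {
    private final Duration idleReadTimeout;
    private final int maxConsecutiveInvalid;
    private int consecutiveInvalid = 0;

    ReconnectPolicy(Duration idleReadTimeout, Duration pollingTime, int maxConsecutiveInvalid) {
        // An idle timeout lower than the polling interval would drop the
        // connection between polls, so it would never stay persistent.
        if (idleReadTimeout.compareTo(pollingTime) < 0) {
            throw new IllegalArgumentException("idleReadTimeout is lower than the polling time");
        }
        this.idleReadTimeout = idleReadTimeout;
        this.maxConsecutiveInvalid = maxConsecutiveInvalid;
    }

    // True if nothing has been read for longer than idleReadTimeout.
    boolean isIdle(Duration sinceLastRead) {
        return sinceLastRead.compareTo(idleReadTimeout) > 0;
    }

    // Records one polled configuration; returns true when a reconnect should happen.
    boolean onConfiguration(boolean valid) {
        if (valid) {
            consecutiveInvalid = 0; // any valid configuration resets the run
            return false;
        }
        consecutiveInvalid++;
        if (consecutiveInvalid >= maxConsecutiveInvalid) {
            consecutiveInvalid = 0; // start counting afresh after the reconnect
            return true;
        }
        return false;
    }
}
```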

Cluster nodes update and closing old SpyMemcached client

When the ElastiCache Configuration Endpoint outputs a configuration update, a new spy memcached client is created and the old client is closed. There is a delay before the old client is closed, as it may still be in use (i.e. network requests may still be executing). By default the delay is 10 seconds; this can be changed with the config builder method .setDelayBeforeClientClose(Duration.ofSeconds(1)).
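The delayed close can be sketched with an AtomicReference and a ScheduledExecutorService. ClientSwapper is a hypothetical class that works with any Closeable client; it only illustrates the idea and is not how HerdCache is implemented internally.

```java
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder: swaps in a new client and closes the old one later,
// so requests already running on the old client have time to finish.
final class ClientSwapper<C extends Closeable> {
    private final AtomicReference<C> current;
    private final Duration delayBeforeClose;
    private final ScheduledExecutorService closer =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "client-closer");
            t.setDaemon(true);
            return t;
        });

    ClientSwapper(C initial, Duration delayBeforeClose) {
        this.current = new AtomicReference<>(initial);
        this.delayBeforeClose = delayBeforeClose;
    }

    C get() {
        return current.get();
    }

    void swap(C newClient) {
        C old = current.getAndSet(newClient); // new requests now see newClient
        if (old != null) {
            closer.schedule(() -> {
                try {
                    old.close();
                } catch (IOException e) {
                    // closing is best effort; a real client would log this
                }
            }, delayBeforeClose.toMillis(), TimeUnit.MILLISECONDS);
        }
    }
}
```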

ElastiCache Configuration Endpoint timeout

By default the client waits 3 seconds for a connection to the ElastiCache Configuration Endpoint to be established. This can be changed with the config builder method .setConnectionTimeoutInMillis(Duration.ofSeconds(2)).

Host lookup

When the ElastiCache Configuration Endpoint returns the configuration information, it returns each node's hostname and may also include its IP address:

    CONFIG cluster 0 147
    12
    myCluster.pc4ldq.0001.use1.cache.amazonaws.com|10.82.235.120|11211 myCluster.pc4ldq.0002.use1.cache.amazonaws.com|10.80.249.27|11211

    END

If the IP address is not returned, the client performs a DNS lookup on the hostname. The lookup timeout defaults to 3 seconds, and can be changed with the builder method .setDnsConnectionTimeout(Duration.ofSeconds(2)).

If a DNS lookup fails, whether due to a connection timeout, a temporary network issue or any other reason, then by default that host is excluded from the list of memcached hosts the cache client connects to. The host stays excluded until you update the cluster configuration and the ElastiCache Configuration Endpoint supplies a new version of the config.
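A sketch of this lookup rule, assuming the IP is used when the configuration supplies one and a bounded DNS lookup is used otherwise. NodeAddressResolver is hypothetical; because InetAddress.getByName takes no timeout argument, the sketch runs the lookup on another thread and waits on the Future for at most the timeout. An empty result marks a host the caller would exclude.

```java
import java.net.InetAddress;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical resolver for one node entry (hostname|ip|port).
final class NodeAddressResolver {
    private static final ExecutorService LOOKUPS = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "dns-lookup");
        t.setDaemon(true);
        return t;
    });

    static Optional<InetAddress> resolve(String hostname, String ipFromConfig, Duration timeout) {
        boolean hasIp = ipFromConfig != null && !ipFromConfig.isEmpty();
        // An IP literal is parsed without DNS; a hostname triggers a lookup.
        String target = hasIp ? ipFromConfig : hostname;
        Future<InetAddress> lookup = LOOKUPS.submit(() -> InetAddress.getByName(target));
        try {
            return Optional.of(lookup.get(timeout.toMillis(), TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            lookup.cancel(true); // stop waiting; the lookup thread may still finish later
            return Optional.empty();
        } catch (ExecutionException e) {
            return Optional.empty(); // e.g. UnknownHostException
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }
}
```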

The builder property .setUpdateConfigVersionOnDnsTimeout(boolean) lets you change this behaviour when a host cannot be resolved to an IP address. A failure to resolve a host's DNS may be temporary, and the host may be resolvable on the next configuration poll. If you set the builder property to false, .setUpdateConfigVersionOnDnsTimeout(false), the memcached client is still updated to point to the hosts in the config, but if any host resolution fails the client does not record the current configuration's version number. On the next poll of the cluster configuration, the memcached client is therefore recreated again, pointing to the hosts specified in the configuration.

Note that if DNS resolution keeps failing, the memcached client will be re-created every time the configuration is polled. No comparison is made between the previously resolved hosts and the currently resolved hosts; the client is simply recreated.
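The interaction between the polled version number and setUpdateConfigVersionOnDnsTimeout can be sketched as follows. ConfigVersionTracker and its methods are hypothetical names; only the meaning of the flag comes from the text above.

```java
// Hypothetical tracker for the last configuration version the client recorded.
final class ConfigVersionTracker {
    private final boolean updateConfigVersionOnDnsTimeout;
    private long lastRecordedVersion = -1; // nothing recorded yet

    ConfigVersionTracker(boolean updateConfigVersionOnDnsTimeout) {
        this.updateConfigVersionOnDnsTimeout = updateConfigVersionOnDnsTimeout;
    }

    // A client is (re)built whenever the polled version is newer than the recorded one.
    boolean shouldRecreateClient(long polledVersion) {
        return polledVersion > lastRecordedVersion;
    }

    // Called after rebuilding the client; allHostsResolved is false if any DNS lookup failed.
    void onClientRebuilt(long polledVersion, boolean allHostsResolved) {
        if (allHostsResolved || updateConfigVersionOnDnsTimeout) {
            lastRecordedVersion = polledVersion;
        }
        // Otherwise the version is not recorded, so the next poll of the
        // same version rebuilds the client again.
    }
}
```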


ElastiCache Configuration Url Endpoint update

This feature is available in 1.0.9 and above.

As discussed above, when you create an ElastiCache cache you provide the URL of the configuration endpoint:

CacheWithExpiry<String> cache = new ElastiCacheMemcachedCache<String>(
                    new ElastiCacheCacheConfigBuilder()
                            .setElastiCacheConfigHosts("yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211")
                            .buildElastiCacheMemcachedConfig());

You might wish to create another cluster with a different machine type, and switch the ElastiCache cache over to the new cluster at runtime; for example, when moving the cache to machines with faster CPUs.

It is possible to do this by constructing a SimpleVolatileBasedElastiCacheConfigServerUpdater and passing that to the builder:

ElastiCacheConfigServerUpdater configUrlUpdater = new SimpleVolatileBasedElastiCacheConfigServerUpdater();

CacheWithExpiry<String> cache = new ElastiCacheMemcachedCache<String>(
                    new ElastiCacheCacheConfigBuilder()
                            .setElastiCacheConfigHosts("yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211")
                            .setConfigUrlUpdater(configUrlUpdater)
                            .buildElastiCacheMemcachedConfig());

You would then write a JMX hook or admin REST endpoint in your application that calls the updater's connectionUpdated(String url) method, informing the cache that the configuration URL has changed and that it should connect to the new URL to obtain the new list of cache cluster nodes:

configUrlUpdater.connectionUpdated("yourcluster.irujgk.0001.euw1.cache.amazonaws.com:11211");

The cache then connects to the new configuration endpoint and obtains the cache cluster nodes. Before attempting the connection to the new endpoint, it waits for the reconnect delay set by setReconnectDelay(Duration.ofSeconds(5)) (5 seconds is the default); you can reduce or increase this delay.
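A sketch of such a hook, assuming it is wired to the updater with a method reference, for example new ConfigUrlSwitchHook(configUrlUpdater::connectionUpdated). ConfigUrlSwitchHook is hypothetical and the host:port validation is an addition of this sketch; a JMX operation or REST handler would simply delegate to switchTo.

```java
import java.util.function.Consumer;

// Hypothetical hook that a JMX operation or admin REST handler could delegate to.
final class ConfigUrlSwitchHook {
    private final Consumer<String> connectionUpdated; // e.g. configUrlUpdater::connectionUpdated

    ConfigUrlSwitchHook(Consumer<String> connectionUpdated) {
        this.connectionUpdated = connectionUpdated;
    }

    // Checks that the endpoint looks like "host:port" before handing it to the cache.
    boolean switchTo(String endpoint) {
        if (endpoint == null) {
            return false;
        }
        String trimmed = endpoint.trim();
        int colon = trimmed.lastIndexOf(':');
        if (colon <= 0 || colon == trimmed.length() - 1) {
            return false; // missing host or port
        }
        int port;
        try {
            port = Integer.parseInt(trimmed.substring(colon + 1));
        } catch (NumberFormatException e) {
            return false;
        }
        if (port < 1 || port > 65535) {
            return false;
        }
        connectionUpdated.accept(trimmed);
        return true;
    }
}
```

Rejecting a malformed endpoint before calling connectionUpdated keeps an operator typo from pointing the cache at an unreachable configuration endpoint.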


Example Simple ElastiCache Test Class

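The following is a simple Java main class that can be run on the command line as shown below. Every millis milliseconds (500 in the command below) it generates a random integer between minRand (default 1) and maxRand (1000 in the command below), and applies that value to the cache, using the integer as the key.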

java -DmaxRand=1000 -Dmillis=500 -Dhosts=herdtesting.jgkygp.cfg.euw1.cache.amazonaws.com:11211 -jar herd-elastitest-0.1.0-SNAPSHOT-relocated-shade.jar

With the logback.xml given below, the console output shows whether a cache hit or a cache miss occurred:

71671 [pool-1-thread-1] DEBUG MemcachedCacheHitsLogger - { "cachehit" : "-778756949", "cachetype" : "distributed_cache"}
71671 [pool-1-thread-1] INFO  ElastiCacheTest - Adding cache value : 633
71680 [pool-1-thread-1] DEBUG MemcachedCacheHitsLogger - { "cachehit" : "274176478", "cachetype" : "distributed_cache"}
71680 [pool-1-thread-1] INFO  ElastiCacheTest - Adding cache value : 35
71690 [pool-1-thread-1] DEBUG MemcachedCacheHitsLogger - { "cachemiss" : "65783974", "cachetype" : "distributed_cache"}
71690 [pool-1-thread-1] DEBUG o.g.c.h.m.BaseMemcachedCache - set requested for 65783974
71691 [pool-1-thread-1] INFO  ElastiCacheTest - Adding cache value : 107

package org.greencheek.caching.elasticache;


import com.google.common.util.concurrent.MoreExecutors;
import net.spy.memcached.ConnectionFactoryBuilder;
import org.greencheek.caching.herdcache.CacheWithExpiry;
import org.greencheek.caching.herdcache.memcached.ElastiCacheMemcachedCache;
import org.greencheek.caching.herdcache.memcached.config.builder.ElastiCacheCacheConfigBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


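/**
 * Simple command line test that periodically applies a random integer to an ElastiCache backed cache.
 */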
public class ElastiCacheTest {

  private static final ScheduledExecutorService service  = Executors.newSingleThreadScheduledExecutor();
  private static final Logger logger = LoggerFactory.getLogger("ElastiCacheTest");


  private static final CacheWithExpiry cache = new ElastiCacheMemcachedCache<Integer>(
            new ElastiCacheCacheConfigBuilder()
            .setElastiCacheConfigHosts(System.getProperty("hosts","localhost:11211"))
            .setConfigPollingTime(Duration.ofSeconds(Integer.getInteger("pollingTime",60)))
            .setInitialConfigPollingDelay(Duration.ofSeconds(0))
            .setTimeToLive(Duration.ofSeconds(10))
            .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
            .setWaitForMemcachedSet(true)
            .setDelayBeforeClientClose(Duration.ofSeconds(1))
            .setDnsConnectionTimeout(Duration.ofSeconds(2))
            .setUseStaleCache(true)
            .setStaleCacheAdditionalTimeToLive(Duration.ofSeconds(4))
            .setRemoveFutureFromInternalCacheBeforeSettingValue(true)
            .buildElastiCacheMemcachedConfig());

  public static void main(String[] args) {
      service.scheduleAtFixedRate(()-> {
              int i = randInt(Integer.getInteger("minRand",1),Integer.getInteger("maxRand",2));
              logger.info("Adding cache value : {}",cache.awaitForFutureOrElse(
                      cache.apply(""+i,() -> { return i; },
                        MoreExecutors.sameThreadExecutor()
                      ),
                      MoreExecutors.sameThreadExecutor()),"null");
      }
      ,0,Integer.getInteger("millis",1000),TimeUnit.MILLISECONDS);

  }



  // A single shared Random, so it is not re-created and re-seeded on every call.
  private static final Random rand = new Random();

  public static int randInt(int min,int max) {
    // nextInt is exclusive of the top value,
    // so add 1 to make it inclusive
    return rand.nextInt((max - min) + 1) + min;
  }
}


<configuration scan="true" scanPeriod="120 seconds" >
    <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
        <resetJUL>true</resetJUL>
    </contextListener>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!-- encoders are assigned the type
             ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
        <encoder>
            <pattern>%-4relative [%thread] %-5level %logger{35} - %msg %n</pattern>
        </encoder>
    </appender>


    <logger name="net.spy" level="WARN"/>

    <root level="DEBUG">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>

Internal Notes

To compile and run the performance tests:

mvn clean test-compile assembly:single
$JAVA_HOME/bin/java -jar target/performancetests-test-jar-with-dependencies.jar

Example output:

Benchmark                                                                                  Mode  Cnt    Score    Error   Units
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyDefaultKetamaHashAlgoTest            thrpt   40   48.359 ±  5.699  ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyDefaultKetamaHashAlgoTestLargeValue  thrpt   40   47.423 ± 16.481  ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyFolsomTest                           thrpt   40   46.821 ±  8.525  ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyFolsomTestLargeValue                 thrpt   40   38.507 ± 10.402  ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyJenkinsHashAlgoTest                  thrpt   40   48.549 ± 10.593  ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyJenkinsHashAlgoTestLargeValue        thrpt   40   32.091 ±  8.526  ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyNoKeyHashingJenkinsTest              thrpt   40   54.154 ±  9.703  ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyNoKeyHashingJenkinsTestLargeValue    thrpt   40   51.703 ± 24.600  ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applySHA256HashingJenkinsTest             thrpt   40   42.654 ±  1.257  ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applySHA256HashingJenkinsTestLargeValue   thrpt   40   31.011 ±  6.156  ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyXXHashAlgoTest                       thrpt   40   47.783 ±  7.174  ops/ms
o.g.c.h.p.b.cachetests.get.PerfTestApplyCommand.applyXXHashAlgoTestLargeValue             thrpt   40   45.857 ± 13.562  ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.iq80Compresss                                      thrpt   40   72.708 ±  2.179  ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.iq80Decompresss                                    thrpt   40  162.476 ±  3.815  ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.xerialCompress                                     thrpt   40  105.303 ±  2.542  ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.xerialDecompress                                   thrpt   40  173.413 ±  6.217  ops/ms