From c1f55634a8c17a1cedc90ec37718cd6a2299bd7d Mon Sep 17 00:00:00 2001 From: dominictootell Date: Tue, 10 Oct 2017 00:15:12 +0100 Subject: [PATCH] Cache and log rejected execution when writing to cache --- .../BaseObservableMemcachedCache.java | 48 +++----- .../TestObservableSettingCacheValues.java | 94 +++++++++++++-- .../herdcache/util/EmptyBlockingQueue.java | 112 ++++++++++++++++++ 3 files changed, 211 insertions(+), 43 deletions(-) create mode 100644 src/test/java/org/greencheek/caching/herdcache/util/EmptyBlockingQueue.java diff --git a/src/main/java/org/greencheek/caching/herdcache/memcached/BaseObservableMemcachedCache.java b/src/main/java/org/greencheek/caching/herdcache/memcached/BaseObservableMemcachedCache.java index bad69c4..cb0ebab 100644 --- a/src/main/java/org/greencheek/caching/herdcache/memcached/BaseObservableMemcachedCache.java +++ b/src/main/java/org/greencheek/caching/herdcache/memcached/BaseObservableMemcachedCache.java @@ -23,6 +23,8 @@ import org.slf4j.LoggerFactory; import rx.Single; import rx.SingleSubscriber; +import rx.functions.Action1; +import rx.functions.Actions; import rx.schedulers.Schedulers; import java.io.Serializable; @@ -57,7 +59,6 @@ public static ReferencedClientFactory createReferenceClientFactory(ElastiCacheCa private final CacheRead cacheReader; private static final Logger logger = LoggerFactory.getLogger(BaseObservableMemcachedCache.class); - private final MemcachedCacheConfig config; private final MemcachedClientFactory clientFactory; private final ConcurrentMap>> store; @@ -72,8 +73,6 @@ public static ReferencedClientFactory createReferenceClientFactory(ElastiCacheCa private final long millisToWaitForDelete; private final boolean waitForMemcachedSet; - - public BaseObservableMemcachedCache( MemcachedClientFactory clientFactory, MemcachedCacheConfig config) { @@ -113,6 +112,14 @@ private ConcurrentMap createInternalCache(boolean createCache, } + public static final void logMemcachedWriteError(Throwable t) { + if(t instanceof RejectedExecutionException) { + logger.warn("Scheduler rejected execution.",t); + } + else { + logger.warn("Unexpected Exception occurred during memcached write on Scheduler.",t); + } + } private void warnCacheDisabled() { logger.warn("Cache is disabled"); @@ -309,44 +316,21 @@ private void notifySubscriberAndWriteToCache(ReferencedClient client, CacheItem cacheItem = new CacheItem<>(keyString,results,false); if(waitForMemcachedSet) { - - try { writeToCache(client, cacheItem, keyString, DurationToSeconds.getSeconds(timeToLive), canCacheValueEvalutor) .subscribeOn(Schedulers.immediate()) - .subscribe(); - } - - catch (RejectedExecutionException e) { - logger.warn("Scheduler rejected execution", e); - } - - catch (Exception e) { - logger.error("Some wholly unforeseen error condition occurred," + - " chomping it so your cache can continue operating", e); - } + .subscribe(Actions.empty(),BaseObservableMemcachedCache::logMemcachedWriteError); notifySubscriberOnSuccess(keyString,cacheItem,singleSubscriber,store); } else { notifySubscriberOnSuccess(keyString,cacheItem,singleSubscriber,store); - try { - writeToCache(client, cacheItem, keyString, - DurationToSeconds.getSeconds(timeToLive), - canCacheValueEvalutor) - .subscribeOn(config.getWaitForMemcachedSetRxScheduler()) - .subscribe(); - } - - catch (RejectedExecutionException e) { - logger.warn("Scheduler rejected execution", e); - } - - catch (Exception e) { - logger.error("Some wholly unforeseen error condition occurred," + - " chomping it so your cache can continue operating", e); - } + writeToCache(client, cacheItem, keyString, + DurationToSeconds.getSeconds(timeToLive), + canCacheValueEvalutor) + .subscribeOn(config.getWaitForMemcachedSetRxScheduler()) + .subscribe(Actions.empty(),BaseObservableMemcachedCache::logMemcachedWriteError); } } diff --git a/src/test/java/org/greencheek/caching/herdcache/memcached/observable/TestObservableSettingCacheValues.java b/src/test/java/org/greencheek/caching/herdcache/memcached/observable/TestObservableSettingCacheValues.java index da315f9..d0bab11 100644 --- a/src/test/java/org/greencheek/caching/herdcache/memcached/observable/TestObservableSettingCacheValues.java +++ b/src/test/java/org/greencheek/caching/herdcache/memcached/observable/TestObservableSettingCacheValues.java @@ -11,19 +11,22 @@ import org.greencheek.caching.herdcache.memcached.config.builder.ElastiCacheCacheConfigBuilder; import org.greencheek.caching.herdcache.memcached.util.MemcachedDaemonFactory; import org.greencheek.caching.herdcache.memcached.util.MemcachedDaemonWrapper; +import org.greencheek.caching.herdcache.util.EmptyBlockingQueue; import org.junit.After; import org.junit.Before; import org.junit.Test; import rx.Scheduler; import rx.Single; import rx.functions.Action1; +import rx.plugins.RxJavaHooks; import rx.schedulers.Schedulers; import java.time.Duration; import java.util.Optional; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; +import java.util.Queue; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.*; @@ -539,30 +542,99 @@ public void testErrorSubscriber() { } @Test - public void testErrorScheduler() { + public void testErrorOnMemcachedWriteProvidedScheduler() { + + + ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new EmptyBlockingQueue(), + new ThreadPoolExecutor.AbortPolicy()); cache = new SpyObservableMemcachedCache<>( new ElastiCacheCacheConfigBuilder() .setMemcachedHosts("localhost:" + memcached.getPort()) - .setWaitForMemcachedSetRxScheduler(new Scheduler() { - @Override - public Worker createWorker() { - throw new RuntimeException(); - } - }) + .setWaitForMemcachedSetRxScheduler(Schedulers.from(p)) .setWaitForMemcachedSet(false) .setTimeToLive(Duration.ofSeconds(10)) .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT) - .setWaitForMemcachedSet(false) .setKeyValidationType(KeyValidationType.ALWAYS) .buildMemcachedConfig() ); - CacheItem val = cache.set("Key1", "value1",Duration.ZERO).toBlocking().value(); + AtomicBoolean keepRunning = new AtomicBoolean(true); + + p.execute(new Runnable() { + @Override + public void run() { + while(keepRunning.get()){ + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + }); + + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + CacheItem val = cache.set("Key1", "value1", Duration.ZERO).toBlocking().value(); + keepRunning.set(false); // No need to assert anything, an unhandled scheduling exception will fail this test for us :) } + @Test + public void testErrorOnMemcachedWriteScheduler() { + + cache = new SpyObservableMemcachedCache<>( + new ElastiCacheCacheConfigBuilder() + .setMemcachedHosts("localhost:" + memcached.getPort()) + .setWaitForMemcachedSet(false) + .setTimeToLive(Duration.ofSeconds(10)) + .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT) + .setKeyValidationType(KeyValidationType.ALWAYS) + .buildMemcachedConfig() + ); + + ThreadPoolExecutor ex = new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new EmptyBlockingQueue(), + new ThreadPoolExecutor.AbortPolicy()); + + Scheduler sh = Schedulers.from(ex); + + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger error = new AtomicInteger(0); + + RxJavaHooks.setOnError((t) -> { + latch.countDown(); + error.incrementAndGet(); + }); + + cache.set("Key1", "value1",Duration.ZERO).subscribeOn(sh).observeOn(sh).subscribe((val) -> { + System.out.println(val); + latch.countDown(); + },(t) -> { + latch.countDown(); + error.incrementAndGet(); + }); + + + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + assertEquals(1,error.get()); + + } + @Test diff --git a/src/test/java/org/greencheek/caching/herdcache/util/EmptyBlockingQueue.java b/src/test/java/org/greencheek/caching/herdcache/util/EmptyBlockingQueue.java new file mode 100644 index 0000000..01f1112 --- /dev/null +++ b/src/test/java/org/greencheek/caching/herdcache/util/EmptyBlockingQueue.java @@ -0,0 +1,112 @@ +package org.greencheek.caching.herdcache.util; + +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Created by dominictootell on 09/10/2017. + */ +public class EmptyBlockingQueue implements BlockingQueue { + public boolean add(T e) { + throw new IllegalStateException(); + } + + public boolean offer(T e) { + return false; + } + + public void put(T e) throws InterruptedException { + return; + } + + public boolean offer(T e, long l, TimeUnit tu) throws InterruptedException { + tu.sleep(l); + return false; + } + + public T take() throws InterruptedException { + throw new InterruptedException(); + } + + public T poll(long l, TimeUnit tu) throws InterruptedException { + tu.sleep(l); + return null; + } + + public int remainingCapacity() { + return 0; + } + + public boolean remove(Object o) { + return false; + } + + public boolean contains(Object o) { + return false; + } + + public int drainTo(Collection clctn) { + return 0; + } + + public int drainTo(Collection clctn, int i) { + return 0; + } + + public T remove() { + return null; + } + + public T poll() { + return null; + } + + public T element() { + return null; + } + + public T peek() { + return null; + } + + public int size() { + return 0; + } + + public boolean isEmpty() { + return false; + } + + public Iterator iterator() { + return null; + } + + public Object[] toArray() { + return null; + } + + public T[] toArray(T[] ts) { + return null; + } + + public boolean containsAll(Collection clctn) { + return false; + } + + public boolean addAll(Collection clctn) { + return false; + } + + public boolean removeAll(Collection clctn) { + return false; + } + + public boolean retainAll(Collection clctn) { + return false; + } + + public void clear() { + } +}