From 4e2b5c12690f425f40874a350972626c6b4e4194 Mon Sep 17 00:00:00 2001 From: samcgardner Date: Mon, 9 Oct 2017 17:27:15 +0100 Subject: [PATCH] Added handling for exceptions when scheduling sets using RxJava --- .../BaseObservableMemcachedCache.java | 42 +++++++++++++++---- .../TestObservableSettingCacheValues.java | 27 ++++++++++++ 2 files changed, 61 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/greencheek/caching/herdcache/memcached/BaseObservableMemcachedCache.java b/src/main/java/org/greencheek/caching/herdcache/memcached/BaseObservableMemcachedCache.java index 6d1cb4e..bad69c4 100644 --- a/src/main/java/org/greencheek/caching/herdcache/memcached/BaseObservableMemcachedCache.java +++ b/src/main/java/org/greencheek/caching/herdcache/memcached/BaseObservableMemcachedCache.java @@ -309,19 +309,45 @@ private void notifySubscriberAndWriteToCache(ReferencedClient client, CacheItem cacheItem = new CacheItem<>(keyString,results,false); if(waitForMemcachedSet) { - Single> write = writeToCache(client, cacheItem, keyString, - DurationToSeconds.getSeconds(timeToLive), - canCacheValueEvalutor); - write.subscribeOn(Schedulers.immediate()).subscribe(); + + try { + writeToCache(client, cacheItem, keyString, + DurationToSeconds.getSeconds(timeToLive), + canCacheValueEvalutor) + .subscribeOn(Schedulers.immediate()) + .subscribe(); + } + + catch (RejectedExecutionException e) { + logger.warn("Scheduler rejected execution", e); + } + + catch (Exception e) { + logger.error("Some wholly unforeseen error condition occurred," + + " chomping it so your cache can continue operating", e); + } notifySubscriberOnSuccess(keyString,cacheItem,singleSubscriber,store); } else { notifySubscriberOnSuccess(keyString,cacheItem,singleSubscriber,store); - Single> write = writeToCache(client, cacheItem, keyString, - DurationToSeconds.getSeconds(timeToLive), - canCacheValueEvalutor); - write.subscribeOn(config.getWaitForMemcachedSetRxScheduler()).subscribe(); + try { + writeToCache(client, cacheItem, keyString, + DurationToSeconds.getSeconds(timeToLive), + canCacheValueEvalutor) + .subscribeOn(config.getWaitForMemcachedSetRxScheduler()) + .subscribe(); + } + + catch (RejectedExecutionException e) { + logger.warn("Scheduler rejected execution", e); + } + + catch (Exception e) { + logger.error("Some wholly unforeseen error condition occurred," + + " chomping it so your cache can continue operating", e); + } + } } } diff --git a/src/test/java/org/greencheek/caching/herdcache/memcached/observable/TestObservableSettingCacheValues.java b/src/test/java/org/greencheek/caching/herdcache/memcached/observable/TestObservableSettingCacheValues.java index 62b120a..da315f9 100644 --- a/src/test/java/org/greencheek/caching/herdcache/memcached/observable/TestObservableSettingCacheValues.java +++ b/src/test/java/org/greencheek/caching/herdcache/memcached/observable/TestObservableSettingCacheValues.java @@ -14,6 +14,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import rx.Scheduler; import rx.Single; import rx.functions.Action1; import rx.schedulers.Schedulers; @@ -537,6 +538,32 @@ public void testErrorSubscriber() { assertTrue("should have errored",hasErrored.get()); } + @Test + public void testErrorScheduler() { + + cache = new SpyObservableMemcachedCache<>( + new ElastiCacheCacheConfigBuilder() + .setMemcachedHosts("localhost:" + memcached.getPort()) + .setWaitForMemcachedSetRxScheduler(new Scheduler() { + @Override + public Worker createWorker() { + throw new RuntimeException(); + } + }) + .setWaitForMemcachedSet(false) + .setTimeToLive(Duration.ofSeconds(10)) + .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT) + .setWaitForMemcachedSet(false) + .setKeyValidationType(KeyValidationType.ALWAYS) + .buildMemcachedConfig() + ); + + CacheItem val = cache.set("Key1", "value1",Duration.ZERO).toBlocking().value(); + + // No need to assert anything, an unhandled scheduling exception will fail this test for us :) + } + + @Test public void testFromCacheSetting() {