Skip to content

Commit

Permalink
Merge pull request #4 from samcgardner/handle-rx-exceptions-in-set
Browse files Browse the repository at this point in the history
Added handling for exceptions when scheduling sets using RxJava
  • Loading branch information
tootedom authored Oct 9, 2017
2 parents d36fdfd + 4e2b5c1 commit 9921122
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,19 +309,45 @@ private void notifySubscriberAndWriteToCache(ReferencedClient client,
CacheItem<V> cacheItem = new CacheItem<>(keyString,results,false);

if(waitForMemcachedSet) {
Single<CacheItem<V>> write = writeToCache(client, cacheItem, keyString,
DurationToSeconds.getSeconds(timeToLive),
canCacheValueEvalutor);
write.subscribeOn(Schedulers.immediate()).subscribe();

try {
writeToCache(client, cacheItem, keyString,
DurationToSeconds.getSeconds(timeToLive),
canCacheValueEvalutor)
.subscribeOn(Schedulers.immediate())
.subscribe();
}

catch (RejectedExecutionException e) {
logger.warn("Scheduler rejected execution", e);
}

catch (Exception e) {
logger.error("Some wholly unforeseen error condition occurred," +
" chomping it so your cache can continue operating", e);
}

notifySubscriberOnSuccess(keyString,cacheItem,singleSubscriber,store);
} else {
notifySubscriberOnSuccess(keyString,cacheItem,singleSubscriber,store);

Single<CacheItem<V>> write = writeToCache(client, cacheItem, keyString,
DurationToSeconds.getSeconds(timeToLive),
canCacheValueEvalutor);
write.subscribeOn(config.getWaitForMemcachedSetRxScheduler()).subscribe();
try {
writeToCache(client, cacheItem, keyString,
DurationToSeconds.getSeconds(timeToLive),
canCacheValueEvalutor)
.subscribeOn(config.getWaitForMemcachedSetRxScheduler())
.subscribe();
}

catch (RejectedExecutionException e) {
logger.warn("Scheduler rejected execution", e);
}

catch (Exception e) {
logger.error("Some wholly unforeseen error condition occurred," +
" chomping it so your cache can continue operating", e);
}

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import rx.Scheduler;
import rx.Single;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
Expand Down Expand Up @@ -537,6 +538,32 @@ public void testErrorSubscriber() {
assertTrue("should have errored",hasErrored.get());
}

@Test
public void testErrorScheduler() {

cache = new SpyObservableMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:" + memcached.getPort())
.setWaitForMemcachedSetRxScheduler(new Scheduler() {
@Override
public Worker createWorker() {
throw new RuntimeException();
}
})
.setWaitForMemcachedSet(false)
.setTimeToLive(Duration.ofSeconds(10))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setWaitForMemcachedSet(false)
.setKeyValidationType(KeyValidationType.ALWAYS)
.buildMemcachedConfig()
);

CacheItem<String> val = cache.set("Key1", "value1",Duration.ZERO).toBlocking().value();

// No need to assert anything, an unhandled scheduling exception will fail this test for us :)
}



@Test
public void testFromCacheSetting() {
Expand Down

0 comments on commit 9921122

Please sign in to comment.