Skip to content

Commit

Permalink
Cache and log rejected execution when writing to cache
Browse files Browse the repository at this point in the history
  • Loading branch information
tootedom committed Oct 9, 2017
1 parent 9921122 commit c1f5563
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.slf4j.LoggerFactory;
import rx.Single;
import rx.SingleSubscriber;
import rx.functions.Action1;
import rx.functions.Actions;
import rx.schedulers.Schedulers;

import java.io.Serializable;
Expand Down Expand Up @@ -57,7 +59,6 @@ public static ReferencedClientFactory createReferenceClientFactory(ElastiCacheCa
private final CacheRead<V> cacheReader;
private static final Logger logger = LoggerFactory.getLogger(BaseObservableMemcachedCache.class);


private final MemcachedCacheConfig config;
private final MemcachedClientFactory clientFactory;
private final ConcurrentMap<String,Single<CacheItem<V>>> store;
Expand All @@ -72,8 +73,6 @@ public static ReferencedClientFactory createReferenceClientFactory(ElastiCacheCa
private final long millisToWaitForDelete;
private final boolean waitForMemcachedSet;



public BaseObservableMemcachedCache(
MemcachedClientFactory clientFactory,
MemcachedCacheConfig config) {
Expand Down Expand Up @@ -113,6 +112,14 @@ private ConcurrentMap createInternalCache(boolean createCache,
}


public static final void logMemcachedWriteError(Throwable t) {
if(t instanceof RejectedExecutionException) {
logger.warn("Scheduler rejected execution.",t);
}
else {
logger.warn("Unexpected Exception occurred during memcached write on Scheduler.",t);
}
}

private void warnCacheDisabled() {
logger.warn("Cache is disabled");
Expand Down Expand Up @@ -309,44 +316,21 @@ private void notifySubscriberAndWriteToCache(ReferencedClient client,
CacheItem<V> cacheItem = new CacheItem<>(keyString,results,false);

if(waitForMemcachedSet) {

try {
writeToCache(client, cacheItem, keyString,
DurationToSeconds.getSeconds(timeToLive),
canCacheValueEvalutor)
.subscribeOn(Schedulers.immediate())
.subscribe();
}

catch (RejectedExecutionException e) {
logger.warn("Scheduler rejected execution", e);
}

catch (Exception e) {
logger.error("Some wholly unforeseen error condition occurred," +
" chomping it so your cache can continue operating", e);
}
.subscribe(Actions.empty(),BaseObservableMemcachedCache::logMemcachedWriteError);

notifySubscriberOnSuccess(keyString,cacheItem,singleSubscriber,store);
} else {
notifySubscriberOnSuccess(keyString,cacheItem,singleSubscriber,store);

try {
writeToCache(client, cacheItem, keyString,
DurationToSeconds.getSeconds(timeToLive),
canCacheValueEvalutor)
.subscribeOn(config.getWaitForMemcachedSetRxScheduler())
.subscribe();
}

catch (RejectedExecutionException e) {
logger.warn("Scheduler rejected execution", e);
}

catch (Exception e) {
logger.error("Some wholly unforeseen error condition occurred," +
" chomping it so your cache can continue operating", e);
}
writeToCache(client, cacheItem, keyString,
DurationToSeconds.getSeconds(timeToLive),
canCacheValueEvalutor)
.subscribeOn(config.getWaitForMemcachedSetRxScheduler())
.subscribe(Actions.empty(),BaseObservableMemcachedCache::logMemcachedWriteError);

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,22 @@
import org.greencheek.caching.herdcache.memcached.config.builder.ElastiCacheCacheConfigBuilder;
import org.greencheek.caching.herdcache.memcached.util.MemcachedDaemonFactory;
import org.greencheek.caching.herdcache.memcached.util.MemcachedDaemonWrapper;
import org.greencheek.caching.herdcache.util.EmptyBlockingQueue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import rx.Scheduler;
import rx.Single;
import rx.functions.Action1;
import rx.plugins.RxJavaHooks;
import rx.schedulers.Schedulers;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.*;

Expand Down Expand Up @@ -539,30 +542,99 @@ public void testErrorSubscriber() {
}

@Test
public void testErrorScheduler() {
public void testErrorOnMemcachedWriteProvidedScheduler() {


ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new EmptyBlockingQueue(),
new ThreadPoolExecutor.AbortPolicy());

cache = new SpyObservableMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:" + memcached.getPort())
.setWaitForMemcachedSetRxScheduler(new Scheduler() {
@Override
public Worker createWorker() {
throw new RuntimeException();
}
})
.setWaitForMemcachedSetRxScheduler(Schedulers.from(p))
.setWaitForMemcachedSet(false)
.setTimeToLive(Duration.ofSeconds(10))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setWaitForMemcachedSet(false)
.setKeyValidationType(KeyValidationType.ALWAYS)
.buildMemcachedConfig()
);

CacheItem<String> val = cache.set("Key1", "value1",Duration.ZERO).toBlocking().value();
AtomicBoolean keepRunning = new AtomicBoolean(true);

p.execute(new Runnable() {
@Override
public void run() {
while(keepRunning.get()){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});

try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}

CacheItem<String> val = cache.set("Key1", "value1", Duration.ZERO).toBlocking().value();
keepRunning.set(false);

// No need to assert anything, an unhandled scheduling exception will fail this test for us :)
}

@Test
public void testErrorOnMemcachedWriteScheduler() {

cache = new SpyObservableMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:" + memcached.getPort())
.setWaitForMemcachedSet(false)
.setTimeToLive(Duration.ofSeconds(10))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setKeyValidationType(KeyValidationType.ALWAYS)
.buildMemcachedConfig()
);

ThreadPoolExecutor ex = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new EmptyBlockingQueue(),
new ThreadPoolExecutor.AbortPolicy());

Scheduler sh = Schedulers.from(ex);

CountDownLatch latch = new CountDownLatch(1);
AtomicInteger error = new AtomicInteger(0);

RxJavaHooks.setOnError((t) -> {
latch.countDown();
error.incrementAndGet();
});

cache.set("Key1", "value1",Duration.ZERO).subscribeOn(sh).observeOn(sh).subscribe((val) -> {
System.out.println(val);
latch.countDown();
},(t) -> {
latch.countDown();
error.incrementAndGet();
});


try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

assertEquals(1,error.get());

}



@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package org.greencheek.caching.herdcache.util;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* Created by dominictootell on 09/10/2017.
*/
public class EmptyBlockingQueue<T> implements BlockingQueue<T> {
public boolean add(T e) {
throw new IllegalStateException();
}

public boolean offer(T e) {
return false;
}

public void put(T e) throws InterruptedException {
return;
}

public boolean offer(T e, long l, TimeUnit tu) throws InterruptedException {
tu.sleep(l);
return false;
}

public T take() throws InterruptedException {
throw new InterruptedException();
}

public T poll(long l, TimeUnit tu) throws InterruptedException {
tu.sleep(l);
return null;
}

public int remainingCapacity() {
return 0;
}

public boolean remove(Object o) {
return false;
}

public boolean contains(Object o) {
return false;
}

public int drainTo(Collection<? super T> clctn) {
return 0;
}

public int drainTo(Collection<? super T> clctn, int i) {
return 0;
}

public T remove() {
return null;
}

public T poll() {
return null;
}

public T element() {
return null;
}

public T peek() {
return null;
}

public int size() {
return 0;
}

public boolean isEmpty() {
return false;
}

public Iterator<T> iterator() {
return null;
}

public Object[] toArray() {
return null;
}

public <T> T[] toArray(T[] ts) {
return null;
}

public boolean containsAll(Collection<?> clctn) {
return false;
}

public boolean addAll(Collection<? extends T> clctn) {
return false;
}

public boolean removeAll(Collection<?> clctn) {
return false;
}

public boolean retainAll(Collection<?> clctn) {
return false;
}

public void clear() {
}
}

0 comments on commit c1f5563

Please sign in to comment.