Skip to content

Commit

Permalink
shutdown executors
Browse files Browse the repository at this point in the history
  • Loading branch information
tootedom committed Sep 25, 2016
1 parent 63b9074 commit e019a59
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 51 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@
<concurrentlinkedhashmap.version>1.4.2</concurrentlinkedhashmap.version>

<hystrix.version>1.4.5</hystrix.version>
<spy.version>2.12.0</spy.version>
<spy.version>2.12.1</spy.version>
<jsr666e.version>1.1.0</jsr666e.version>
<slf4j.version>1.7.12</slf4j.version>
<memcachedtest.version>1.0.0</memcachedtest.version>
<xxhashing.version>1.3.0</xxhashing.version>
<jsnappy.version>1.1.1.7</jsnappy.version>
<logback.version>1.1.3</logback.version>
<junit.version>4.11</junit.version>
<netty.version>4.0.30.Final</netty.version>
<netty.version>4.0.41.Final</netty.version>
<fst.version>1.63</fst.version>
<jmh.version>1.10.4</jmh.version>
<metrics.version>3.1.0</metrics.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ private void testCaching(CacheWithExpiry cache) {

private void testHashAlgorithm(HashAlgorithm algo) {

ScheduledExecutorService sexec = Executors.newSingleThreadScheduledExecutor();

String[] configurationsMessage1 = new String[]{
"CONFIG cluster 0 147\r\n" + "1\r\n" + "localhost|127.0.0.1|" + memcached1.getPort() + "\r\n" + "\nEND\r\n"
Expand All @@ -171,24 +170,26 @@ private void testHashAlgorithm(HashAlgorithm algo) {
};

StringServer configServer1 = new StringServer(configurationsMessage1, 0, TimeUnit.SECONDS);
configServer1.before(configurationsMessage1, TimeUnit.SECONDS, -1, false);


StringServer configServer2 = new StringServer(configurationsMessage2, 0, TimeUnit.SECONDS);
configServer2.before(configurationsMessage2, TimeUnit.SECONDS, -1, false);

ElastiCacheConfigServerUpdater configServerUpdater = new SimpleVolatileBasedElastiCacheConfigServerUpdater();

String[] urls = new String[]{"localhost:"+configServer1.getPort(),"localhost:"+configServer2.getPort()};

ScheduledExecutorService sexec = Executors.newSingleThreadScheduledExecutor();

try {
configServer1.before(configurationsMessage1, TimeUnit.SECONDS, -1, false);
configServer2.before(configurationsMessage2, TimeUnit.SECONDS, -1, false);
String[] urls = new String[]{"localhost:"+configServer1.getPort(),"localhost:"+configServer2.getPort()};


cache = new ElastiCacheMemcachedCache<String>(
new ElastiCacheCacheConfigBuilder()
.setElastiCacheConfigHosts("localhost:" + configServer1.getPort())
.setConfigPollingTime(Duration.ofSeconds(10))
.setConfigPollingTime(Duration.ofSeconds(5))
.setInitialConfigPollingDelay(Duration.ofSeconds(0))
.setTimeToLive(Duration.ofSeconds(5))
.setTimeToLive(Duration.ofSeconds(10))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setWaitForMemcachedSet(true)
.setHashAlgorithm(algo)
Expand Down Expand Up @@ -231,7 +232,7 @@ private void testHashAlgorithm(HashAlgorithm algo) {
testCaching(cache);
}
assertTrue(memcached1.getDaemon().getCache().getCurrentItems()>1);
assertTrue(memcached2.getDaemon().getCache().getCurrentItems()>1);
assertTrue(memcached2.getDaemon().getCache().getCurrentItems()>=1);

}
finally {
Expand All @@ -240,6 +241,7 @@ private void testHashAlgorithm(HashAlgorithm algo) {
if(cache instanceof RequiresShutdown) {
((RequiresShutdown)cache).shutdown();
}
sexec.shutdownNow();
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,11 @@ private void testHashAlgorithm(HashAlgorithm algo) {
};

StringServer server = new StringServer(configurationsMessage, 0, TimeUnit.SECONDS);
server.before(configurationsMessage, TimeUnit.SECONDS, -1, false);


try {
server.before(configurationsMessage, TimeUnit.SECONDS, -1, false);

cache = new ElastiCacheMemcachedCache<String>(
new ElastiCacheCacheConfigBuilder()
.setElastiCacheConfigHosts("localhost:" + server.getPort())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,16 @@ private void testHashAlgorithm(HashAlgorithm algo) {
};

StringServer server = new StringServer(configurationsMessage, 0, TimeUnit.SECONDS);
server.before(configurationsMessage, TimeUnit.SECONDS, -1, false);

final AtomicInteger latch = new AtomicInteger(1);
try {
server.before(configurationsMessage, TimeUnit.SECONDS, -1, false);

ClientClusterUpdateObserver observer = (updated) -> {
latch.decrementAndGet();
};
final AtomicInteger latch = new AtomicInteger(1);

ClientClusterUpdateObserver observer = (updated) -> {
latch.decrementAndGet();
};

try {
cache = createCache(server.getPort(),algo,observer);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,43 +212,47 @@ public void processConfig(ConfigInfo info) {
public void testUpdateConfiguration() {

ScheduledExecutorService sexec = Executors.newSingleThreadScheduledExecutor();
ConfigRetrievalSettingsBuilder builder = new ConfigRetrievalSettingsBuilder();
final CountDownLatch latch = new CountDownLatch(7);
final AtomicInteger invalid = new AtomicInteger(0);
ConfigInfoProcessor processor = new ConfigInfoProcessor() {
@Override
public void processConfig(ConfigInfo info) {
System.out.println(info);
latch.countDown();
if(!info.isValid()) invalid.incrementAndGet();
try {
ConfigRetrievalSettingsBuilder builder = new ConfigRetrievalSettingsBuilder();
final CountDownLatch latch = new CountDownLatch(7);
final AtomicInteger invalid = new AtomicInteger(0);
ConfigInfoProcessor processor = new ConfigInfoProcessor() {
@Override
public void processConfig(ConfigInfo info) {
System.out.println(info);
latch.countDown();
if (!info.isValid()) invalid.incrementAndGet();
}
};

final ElastiCacheConfigServerUpdater configUpdator = new SimpleVolatileBasedElastiCacheConfigServerUpdater();

builder.setConfigInfoProcessor(processor);
builder.setConfigPollingTime(0, 2, TimeUnit.SECONDS);
builder.setIdleReadTimeout(70, TimeUnit.SECONDS);
builder.setReconnectDelay(1000, TimeUnit.MILLISECONDS);
builder.addElastiCacheHost(new ElastiCacheServerConnectionDetails("localhost", server.getPort()));
builder.setNumberOfInvalidConfigsBeforeReconnect(10);
builder.setConfigUrlUpdater(Optional.of(configUpdator));

client = new PeriodicConfigRetrievalClient(builder.build());
client.start();

boolean ok = false;
try {
sexec.scheduleWithFixedDelay(() -> {
configUpdator.connectionUpdated("localhost:" + server.getPort());
}, 0, 11, TimeUnit.SECONDS);
ok = latch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
fail("problem waiting for config retrieval");
}
};

final ElastiCacheConfigServerUpdater configUpdator = new SimpleVolatileBasedElastiCacheConfigServerUpdater();

builder.setConfigInfoProcessor(processor);
builder.setConfigPollingTime(0,2, TimeUnit.SECONDS);
builder.setIdleReadTimeout(70, TimeUnit.SECONDS);
builder.setReconnectDelay(1000,TimeUnit.MILLISECONDS);
builder.addElastiCacheHost(new ElastiCacheServerConnectionDetails("localhost",server.getPort()));
builder.setNumberOfInvalidConfigsBeforeReconnect(10);
builder.setConfigUrlUpdater(Optional.of(configUpdator));

client = new PeriodicConfigRetrievalClient(builder.build());
client.start();

boolean ok=false;
try {
sexec.scheduleWithFixedDelay(()->{
configUpdator.connectionUpdated("localhost:"+server.getPort());
},0,11,TimeUnit.SECONDS);
ok = latch.await(30, TimeUnit.SECONDS);
} catch(InterruptedException e) {
fail("problem waiting for config retrieval");
assertTrue(ok);
assertEquals(7, invalid.get());
} finally {
sexec.shutdownNow();
}

assertTrue(ok);
assertEquals(7,invalid.get());
}

@Test
Expand Down

0 comments on commit e019a59

Please sign in to comment.