Skip to content

Commit

Permalink
Dynamic loading notification (#581)
Browse files Browse the repository at this point in the history
* Dynamic loading notification

We need more complete notifications for configuration reloading, specifically we need an event before and after loading to control fast property behavior.

* Switch travis to openjdk8

* Catch Exception
  • Loading branch information
elandau authored Sep 3, 2019
1 parent 2c4b000 commit 004cbf7
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: java
jdk:
- oraclejdk8
- openjdk8
sudo: false
install: ./installViaTravis.sh
script: ./buildViaTravis.sh
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,19 @@ public AbstractPollingScheduler() {
protected synchronized void initialLoad(final PolledConfigurationSource source, final Configuration config) {
PollResult result = null;
try {
result = source.poll(true, null);
fireEvent(EventType.POLL_BEGIN, null, null);
result = source.poll(true, null);
checkPoint = result.getCheckPoint();
try {
populateProperties(result, config);
} catch (Exception e) {
throw new RuntimeException("Unable to load Properties", e);
}
fireEvent(EventType.POLL_SUCCESS, result, null);
} catch (Throwable e) {
} catch (Exception e) {
fireEvent(EventType.POLL_FAILURE, null, e);
throw new RuntimeException("Unable to load Properties source from " + source, e);
}
try {
populateProperties(result, config);
} catch (Throwable e) {
throw new RuntimeException("Unable to load Properties", e);
}
}

/**
Expand Down Expand Up @@ -160,19 +162,20 @@ public void run() {
log.debug("Polling started");
PollResult result = null;
try {
fireEvent(EventType.POLL_BEGIN, null, null);
result = source.poll(false, getNextCheckPoint(checkPoint));
checkPoint = result.getCheckPoint();

try {
populateProperties(result, config);
} catch (Exception e) {
log.error("Error applying properties", e);
}
fireEvent(EventType.POLL_SUCCESS, result, null);
} catch (Throwable e) {
} catch (Exception e) {
log.error("Error getting result from polling source", e);
fireEvent(EventType.POLL_FAILURE, null, e);
return;
}
try {
populateProperties(result, config);
} catch (Throwable e) {
log.error("Error occured applying properties", e);
}
}

};
Expand Down Expand Up @@ -203,7 +206,7 @@ public void startPolling(final PolledConfigurationSource source, final Configura
}

/**
* Add the PollLisetner
* Add the PollListener
*
* @param l
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@
public class DynamicConfiguration extends ConcurrentMapConfiguration {
private AbstractPollingScheduler scheduler;
private PolledConfigurationSource source;


/**
* Constant for the add property event type.
*/
public static final int EVENT_RELOAD = 100;

/**
* Create an instance and start polling the configuration source.
*
Expand Down Expand Up @@ -57,7 +62,25 @@ public synchronized void startPolling(PolledConfigurationSource source, Abstract
this.scheduler = scheduler;
this.source = source;
init(source, scheduler);
scheduler.startPolling(source, this);

scheduler.addPollListener(new PollListener() {
@Override
public void handleEvent(EventType eventType, PollResult lastResult, Throwable exception) {
switch (eventType) {
case POLL_SUCCESS:
fireEvent(EVENT_RELOAD, null, null, false);
break;
case POLL_FAILURE:
fireError(EVENT_RELOAD, null, null, exception);
break;
case POLL_BEGIN:
fireEvent(EVENT_RELOAD, null, null, true);
break;
}
}
});

scheduler.startPolling(source, this);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
*/
public interface PollListener {

public enum EventType {
POLL_SUCCESS, POLL_FAILURE
enum EventType {
POLL_SUCCESS, POLL_FAILURE, POLL_BEGIN;
}
/**
* This method is called when the listener is invoked after a polling.
Expand All @@ -33,5 +33,5 @@ public enum EventType {
* @param lastResult the last poll result, null if the poll fails or there is no result
* @param exception any Throwable caught in the last poll, null if the poll is successful
*/
public void handleEvent(EventType eventType, PollResult lastResult, Throwable exception);
void handleEvent(EventType eventType, PollResult lastResult, Throwable exception);
}
106 changes: 90 additions & 16 deletions archaius-core/src/test/java/com/netflix/config/PollingSourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,27 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.event.ConfigurationErrorEvent;
import org.apache.commons.configuration.event.ConfigurationErrorListener;
import org.apache.commons.configuration.event.ConfigurationEvent;
import org.apache.commons.configuration.event.ConfigurationListener;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

public class PollingSourceTest {

static class DummyPollingSource implements PolledConfigurationSource {

volatile boolean incremental;
volatile Map<String, Object> full, added, deleted, changed;
private boolean incremental;
private Map<String, Object> full, added, deleted, changed;
private Exception exception;

public DummyPollingSource(boolean incremental) {
this.incremental = incremental;
}
Expand All @@ -38,42 +49,45 @@ public synchronized void setIncremental(boolean value) {
this.incremental = value;
}

public synchronized void setContent(String content, Map<String, Object> map) {
String[] pairs = content.split(",");
public synchronized Map<String, Object> parse(String content) {
final Map<String, Object> map = new ConcurrentHashMap<String, Object>();
final String[] pairs = content.split(",");
if (pairs != null) {
for (String pair: pairs) {
for (String pair : pairs) {
String[] nameValue = pair.trim().split("=");
if (nameValue.length == 2) {
map.put(nameValue[0], nameValue[1]);
map.put(nameValue[0], nameValue[1]);
}
}
}
return map;
}

public synchronized void setFull(String content) {
full = new ConcurrentHashMap<String, Object>();
setContent(content, full);
full = parse(content);
}

public synchronized void setAdded(String content) {
added = new ConcurrentHashMap<String, Object>();
setContent(content, added);
added = parse(content);
}

public synchronized void setDeleted(String content) {
deleted = new ConcurrentHashMap<String, Object>();
setContent(content, deleted);
deleted = parse(content);
}

public synchronized void setChanged(String content) {
changed = new ConcurrentHashMap<String, Object>();
setContent(content, changed);
changed = parse(content);
}

public synchronized void setException(Exception e) {
this.exception = e;
}


@Override
public synchronized PollResult poll(boolean initial, Object checkPoint) throws Exception {
if (incremental) {
if (exception != null) {
throw exception;
} else if (incremental) {
return PollResult.createIncremental(added, changed, deleted, null);
} else {
return PollResult.createFull(full);
Expand Down Expand Up @@ -159,5 +173,65 @@ public void testIncrementalPollingSource() throws Exception {
assertEquals("changed", prop2.get());
}

@Test
public void notificationOnSuccess() {
// Setup
DummyPollingSource source = new DummyPollingSource(true);
FixedDelayPollingScheduler scheduler = new FixedDelayPollingScheduler(0, 200, false);
DynamicConfiguration config = new DynamicConfiguration(source, scheduler);

ConfigurationListener configurationListener = Mockito.mock(ConfigurationListener.class);
ConfigurationErrorListener errorListener = Mockito.mock(ConfigurationErrorListener.class);

ArgumentCaptor<ConfigurationEvent> eventCapture = ArgumentCaptor.forClass(ConfigurationEvent.class);
ArgumentCaptor<ConfigurationErrorEvent> errorCapture = ArgumentCaptor.forClass(ConfigurationErrorEvent.class);

config.addConfigurationListener(configurationListener);
config.addErrorListener(errorListener);

// Run
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);

// Verify
Mockito.verify(configurationListener, Mockito.times(2)).configurationChanged(eventCapture.capture());
Mockito.verify(errorListener, Mockito.never()).configurationError(errorCapture.capture());

Assert.assertEquals(DynamicConfiguration.EVENT_RELOAD, eventCapture.getAllValues().get(0).getType());
Assert.assertTrue(eventCapture.getAllValues().get(0).isBeforeUpdate());
Assert.assertEquals(DynamicConfiguration.EVENT_RELOAD, eventCapture.getAllValues().get(1).getType());
Assert.assertFalse(eventCapture.getAllValues().get(1).isBeforeUpdate());
}

@Test
public void notificationOnFetchFailure() {
// Setup
DummyPollingSource source = new DummyPollingSource(true);

FixedDelayPollingScheduler scheduler = new FixedDelayPollingScheduler(0, 200, false);
DynamicConfiguration config = new DynamicConfiguration(source, scheduler);

ConfigurationListener configurationListener = Mockito.mock(ConfigurationListener.class);
ConfigurationErrorListener errorListener = Mockito.mock(ConfigurationErrorListener.class);

ArgumentCaptor<ConfigurationEvent> eventCapture = ArgumentCaptor.forClass(ConfigurationEvent.class);
ArgumentCaptor<ConfigurationErrorEvent> errorCapture = ArgumentCaptor.forClass(ConfigurationErrorEvent.class);

config.addConfigurationListener(configurationListener);
config.addErrorListener(errorListener);

// Run
source.setException(new Exception("Failed to load"));
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);

// Verify
Mockito.verify(configurationListener, Mockito.times(1)).configurationChanged(eventCapture.capture());
Mockito.verify(errorListener, Mockito.times(1)).configurationError(errorCapture.capture());

Assert.assertEquals(DynamicConfiguration.EVENT_RELOAD, eventCapture.getAllValues().get(0).getType());
Assert.assertTrue(eventCapture.getAllValues().get(0).isBeforeUpdate());

Assert.assertEquals(DynamicConfiguration.EVENT_RELOAD, errorCapture.getAllValues().get(0).getType());
Assert.assertTrue(eventCapture.getAllValues().get(0).isBeforeUpdate());
}
}

1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ project(':archaius-core') {
testCompile 'org.slf4j:slf4j-simple:1.7.5'
testCompile 'org.apache.derby:derby:10.8.2.2'
testCompile 'org.apache.commons:commons-io:1.3.2'
testCompile 'org.mockito:mockito-all:1.9.5'
testCompile files('src/test/resources/classpathTestResources.jar')

}
Expand Down

0 comments on commit 004cbf7

Please sign in to comment.