Skip to content

Commit

Permalink
allow resolution of multiple hosts from dns
Browse files Browse the repository at this point in the history
  • Loading branch information
tootedom committed May 17, 2018
1 parent 6b26f66 commit aca2bd9
Show file tree
Hide file tree
Showing 25 changed files with 845 additions and 52 deletions.
13 changes: 10 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,16 @@
<netty.version>4.0.49.Final</netty.version>
<fst.version>1.63</fst.version>
<jmh.version>1.19</jmh.version>
<metrics.version>3.1.0</metrics.version>
<!--<rx.version>2.1.0</rx.version>-->
<rx.version>1.3.0</rx.version>



<!-- java9 amn not available yet -->
<metrics.version>4.0.0</metrics.version>
<rx.version>1.3.4</rx.version>


<!-- java9 amn available -->
<rxjava2.version>2.1.8</rxjava2.version>
</properties>
<dependencies>
<!--<dependency>-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
/**
*
*/
class BaseObservableMemcachedCache<V extends Serializable> implements ObservableCache<V>
abstract class BaseObservableMemcachedCache<V extends Serializable> implements ObservableCache<V>
{


Expand All @@ -55,6 +55,7 @@ public static ReferencedClientFactory createReferenceClientFactory(ElastiCacheCa




private final CacheWrite cacheWriter;
private final CacheRead<V> cacheReader;
private static final Logger logger = LoggerFactory.getLogger(BaseObservableMemcachedCache.class);
Expand All @@ -73,12 +74,20 @@ public static ReferencedClientFactory createReferenceClientFactory(ElastiCacheCa
private final long millisToWaitForDelete;
private final boolean waitForMemcachedSet;

public BaseObservableMemcachedCache(MemcachedCacheConfig config) {
this(null,config);
}

public BaseObservableMemcachedCache(
MemcachedClientFactory clientFactory,
MemcachedCacheConfig config) {
this.config = config;
cacheKeyCreator = CacheKeyCreatorFactory.DEFAULT_INSTANCE.create(config);
this.clientFactory = clientFactory;
if(clientFactory == null) {
this.clientFactory = buildClientFactory(config);
} else {
this.clientFactory = clientFactory;
}

int maxCapacity = config.getMaxCapacity();

Expand All @@ -97,6 +106,8 @@ public BaseObservableMemcachedCache(
waitForMemcachedSet = config.isWaitForMemcachedSet();
}

public abstract MemcachedClientFactory buildClientFactory(Object cfg);

private ConcurrentMap createInternalCache(boolean createCache,
int initialCapacity,
int maxCapacity) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.greencheek.caching.herdcache.memcached.config.MemcachedCacheConfig;
import org.greencheek.caching.herdcache.memcached.elasticacheconfig.client.ElastiCacheConfigHostsParser;
import org.greencheek.caching.herdcache.memcached.factory.ElastiCacheClientFactory;
import org.greencheek.caching.herdcache.memcached.factory.MemcachedClientFactory;

import java.io.Serializable;

Expand Down Expand Up @@ -38,6 +39,26 @@ private ElastiCacheObservableMemcachedCache(MemcachedCacheConfig mConfig, Elasti
}


@Override
public MemcachedClientFactory buildClientFactory(Object cfg) {
ElastiCacheCacheConfig config = (ElastiCacheCacheConfig) cfg;
MemcachedCacheConfig mConfig = config.getMemcachedCacheConfig();
return new ElastiCacheClientFactory(
createReferenceClientFactory(config),
ElastiCacheConfigHostsParser.parseElastiCacheConfigHosts(config.getElastiCacheConfigHosts()),
config.getConfigPollingTime(),
config.getInitialConfigPollingDelay(),
config.getIdleReadTimeout(),
config.getReconnectDelay(),
config.getDelayBeforeClientClose(),
mConfig.getHostResolver(),
mConfig.getDnsConnectionTimeout(),
config.isUpdateConfigVersionOnDnsTimeout(),
config.getNumberOfConsecutiveInvalidConfigurationsBeforeReconnect(),
config.getConnectionTimeoutInMillis(),
config.getClusterUpdatedObservers(),
config.getConfigUrlUpdater(),
config.isUpdateConfigOnlyOnVersionChange());


}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,35 @@
package org.greencheek.caching.herdcache.memcached;

import org.greencheek.caching.herdcache.memcached.config.MemcachedCacheConfig;
import org.greencheek.caching.herdcache.memcached.factory.DynamicSpyMemcachedClientFactory;
import org.greencheek.caching.herdcache.memcached.factory.MemcachedClientFactory;
import org.greencheek.caching.herdcache.memcached.factory.SpyMemcachedClientFactory;
import org.greencheek.caching.herdcache.memcached.factory.SpyMemcachedReferencedClientFactory;

import java.io.Serializable;
import java.time.Duration;

/**
* Created by dominictootell on 26/08/2014.
*/
public class SpyObservableMemcachedCache<V extends Serializable> extends BaseObservableMemcachedCache<V> {
MemcachedCacheConfig config;

public SpyObservableMemcachedCache(MemcachedCacheConfig config) {
super(new SpyMemcachedClientFactory<V>(config.getMemcachedHosts(),
config.getDnsConnectionTimeout(),config.getHostStringParser(),
config.getHostResolver(),new SpyMemcachedReferencedClientFactory<V>(createMemcachedConnectionFactory(config))), config);
super(config);
}


public MemcachedClientFactory buildClientFactory(Object cfg) {
MemcachedCacheConfig config = (MemcachedCacheConfig)cfg;
if (config.resolveHostsFromDns()) {
return new DynamicSpyMemcachedClientFactory<V>(config.getMemcachedHosts(),
config.getDurationForResolvingHostsFromDns(),config.getHostStringParser(),new SpyMemcachedReferencedClientFactory<V>(createMemcachedConnectionFactory(config)));
} else {
return new SpyMemcachedClientFactory<V>(config.getMemcachedHosts(),
config.getDnsConnectionTimeout(),config.getHostStringParser(),
config.getHostResolver(),new SpyMemcachedReferencedClientFactory<V>(createMemcachedConnectionFactory(config)));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class MemcachedCacheConfig {
private final boolean herdProtectionEnabled;
private final Scheduler waitForMemcachedSetRxScheduler;
private final KeyValidationType keyValidationType;
private final boolean resolveHostsFromDns;
private final Duration resolveHostsFromDnsEvery;


public MemcachedCacheConfig(Duration timeToLive,
Expand Down Expand Up @@ -79,7 +81,9 @@ public MemcachedCacheConfig(Duration timeToLive,
LocatorFactory locatorFactory,
boolean herdProtectionEnabled,
Scheduler waitForMemcachedSetRxScheduler,
KeyValidationType keyValidationType) {
KeyValidationType keyValidationType,
boolean resolveHostsFromDns,
Duration resolveHostsFromDnsEvery) {
this.timeToLive = timeToLive;
this.maxCapacity = maxCapacity;
this.memcachedHosts = hosts;
Expand Down Expand Up @@ -111,6 +115,8 @@ public MemcachedCacheConfig(Duration timeToLive,
this.herdProtectionEnabled = herdProtectionEnabled;
this.waitForMemcachedSetRxScheduler = waitForMemcachedSetRxScheduler;
this.keyValidationType = keyValidationType;
this.resolveHostsFromDns = resolveHostsFromDns;
this.resolveHostsFromDnsEvery = resolveHostsFromDnsEvery;
}

public Duration getTimeToLive() {
Expand Down Expand Up @@ -205,6 +211,7 @@ public boolean hasKeyPrefix() {
return hasKeyPrefix;
}


public boolean isRemoveFutureFromInternalCacheBeforeSettingValue() {
return removeFutureFromInternalCacheBeforeSettingValue;
}
Expand Down Expand Up @@ -252,4 +259,12 @@ public Scheduler getWaitForMemcachedSetRxScheduler() {
public KeyValidationType getKeyValidationType() {
return keyValidationType;
}

public boolean resolveHostsFromDns() {
return resolveHostsFromDns;
}

public Duration getDurationForResolvingHostsFromDns() {
return resolveHostsFromDnsEvery;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public abstract class MemcachedCacheConfigBuilder<T extends MemcachedCacheConfig
private LocatorFactory locatorFactory = LocatorFactory.KETAMA_CEILING_ARRAY;
private CompressionAlgorithm compressionAlgorithm = CompressionAlgorithm.SNAPPY;
private boolean herdProtectionEnabled = true;
private boolean resolveHostsFromDns = false;
private Duration resolveHostsFromDnsEvery = Duration.ofMillis((30000)); // 30 seconds

private Scheduler waitForMemcachedSetRxScheduler = Schedulers.io();
private KeyValidationType keyValidationType = KeyValidationType.BY_HASHING_TYPE;
Expand All @@ -84,7 +86,9 @@ public MemcachedCacheConfig buildMemcachedConfig()
locatorFactory,
herdProtectionEnabled,
waitForMemcachedSetRxScheduler,
keyValidationType);
keyValidationType,
resolveHostsFromDns,
resolveHostsFromDnsEvery);
}

public T setCompressionAlgorithm(CompressionAlgorithm algorithm) {
Expand Down Expand Up @@ -279,4 +283,14 @@ public T setKeyValidationType(KeyValidationType type) {
this.keyValidationType = type;
return self();
}

public T setResolveHostsFromDns(boolean resolveHostsFromDns) {
this.resolveHostsFromDns = resolveHostsFromDns;
return self();
}

public T setResolveHostsFromDnsEvery(Duration resolveHostsFromDnsEvery) {
this.resolveHostsFromDnsEvery = resolveHostsFromDnsEvery;
return self();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.greencheek.caching.herdcache.memcached.dns;

import java.net.InetAddress;

/**
* TODO Document
*/
public interface AddressResolver {

/**
* Resolve a host name into one or more IPvX address types.
*
* @param host The host to resolve the IP addresses for.
*
* @return An array of one or more {@link InetAddress} details for a given host. If an error occurred (such as an
* unknown host, or a DNS collision) then an empty result should be returned.
*/
InetAddress[] resolve(String host);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.greencheek.caching.herdcache.memcached.dns;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

/**
* Default {@link AddressResolver} that defers to the {@link InetAddress#getAllByName(String)} method for address
* resolution.
*
* If the DNS resolver returns 127.0.53.53 (https://www.icann.org/resources/pages/name-collision-2013-12-06-en),
* then it is ignored from the list of returned addresses.
*
* If there are no addresses, then an Array with 0 elements is returned.
*/
public class DefaultAddressResolver implements AddressResolver {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAddressResolver.class);

public static final InetAddress[] EMPTY = new InetAddress[0];

@Override
public InetAddress[] resolve(final String host) {

try {
InetAddress[] addresses = InetAddress.getAllByName(host);

if (addresses.length == 0) {
return EMPTY;
}

return addresses;
} catch (UnknownHostException e) {
LOGGER.warn("Failed to resolve address for '{}'", host, e);
return EMPTY;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,40 +29,37 @@ public List<InetSocketAddress> returnSocketAddressesForHostNames(List<Host> node
@Override
public List<InetSocketAddress> returnSocketAddressesForHostNames(List<Host> nodes, Duration dnsLookupTimeout) {
LookupService addressLookupService = LookupService.create();

List<InetSocketAddress> workingNodes = new ArrayList<InetSocketAddress>(nodes.size());
for (Host hostAndPort : nodes) {
Future<InetAddress> future = null;
String host = hostAndPort.getHost();
int port = hostAndPort.getPort();
try {
future = addressLookupService.getByName(host);
InetAddress ia = future.get(dnsLookupTimeout.getSeconds(), TimeUnit.SECONDS);
if (ia == null) {
logger.error("Unable to resolve dns entry for the host: {}", host);
}
else
{
try {
workingNodes.add(new InetSocketAddress(ia,port));
}
catch (IllegalArgumentException e) {
logger.error("Invalid port number has been provided for the memcached node: host({}),port({})", host, port);
try {

for (Host hostAndPort : nodes) {
Future<InetAddress> future = null;
String host = hostAndPort.getHost();
int port = hostAndPort.getPort();
try {
future = addressLookupService.getByName(host);
InetAddress ia = future.get(dnsLookupTimeout.getSeconds(), TimeUnit.SECONDS);
if (ia == null) {
logger.error("Unable to resolve dns entry for the host: {}", host);
} else {
try {
workingNodes.add(new InetSocketAddress(ia, port));
} catch (IllegalArgumentException e) {
logger.error("Invalid port number has been provided for the memcached node: host({}),port({})", host, port);
}
}
} catch (TimeoutException e) {
logger.error("Problem resolving host name ({}) to an ip address in fixed number of seconds: {}", host, dnsLookupTimeout, e);
} catch (Exception e) {
logger.error("Problem resolving host name to ip address: {}", host, e);
} finally {
if (future != null) future.cancel(true);
}
}
catch(TimeoutException e) {
logger.error("Problem resolving host name ({}) to an ip address in fixed number of seconds: {}", host, dnsLookupTimeout, e);
}
catch(Exception e) {
logger.error("Problem resolving host name to ip address: {}", host,e);
}
finally {
if (future != null) future.cancel(true);
}
} finally {
addressLookupService.shutdown();
}
addressLookupService.shutdown();

return workingNodes;

}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.greencheek.caching.herdcache.memcached.dns.lookup;

import org.greencheek.caching.herdcache.memcached.config.Host;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
Expand All @@ -9,8 +10,10 @@
* Created by dominictootell on 06/06/2014.
*/
public interface HostResolver {
public static Duration DEFAULT_DNS_TIMEOUT = Duration.ofSeconds(3);
public static Duration DEFAULT_DNS_TIMEOUT = Duration.ofSeconds(3);

public List<InetSocketAddress> returnSocketAddressesForHostNames(List<Host> nodes);

public List<InetSocketAddress> returnSocketAddressesForHostNames(List<Host> nodes, Duration dnsLookupTimeout);

public List<InetSocketAddress> returnSocketAddressesForHostNames(List<Host> nodes);
public List<InetSocketAddress> returnSocketAddressesForHostNames(List<Host> nodes, Duration dnsLookupTimeout);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.greencheek.caching.herdcache.memcached.dns.resolver;

/**
* Created by dominictootell on 17/05/2018.
*/
public class Foreground {


}
Loading

0 comments on commit aca2bd9

Please sign in to comment.