diff --git a/pom.xml b/pom.xml index 9de7d10..c77adb4 100644 --- a/pom.xml +++ b/pom.xml @@ -67,9 +67,16 @@ 4.0.49.Final 1.63 1.19 - 3.1.0 - - 1.3.0 + + + + + 4.0.0 + 1.3.4 + + + + 2.1.8 diff --git a/src/main/java/org/greencheek/caching/herdcache/memcached/BaseObservableMemcachedCache.java b/src/main/java/org/greencheek/caching/herdcache/memcached/BaseObservableMemcachedCache.java index cb0ebab..55b9e94 100644 --- a/src/main/java/org/greencheek/caching/herdcache/memcached/BaseObservableMemcachedCache.java +++ b/src/main/java/org/greencheek/caching/herdcache/memcached/BaseObservableMemcachedCache.java @@ -37,7 +37,7 @@ /** * */ -class BaseObservableMemcachedCache implements ObservableCache +abstract class BaseObservableMemcachedCache implements ObservableCache { @@ -55,6 +55,7 @@ public static ReferencedClientFactory createReferenceClientFactory(ElastiCacheCa + private final CacheWrite cacheWriter; private final CacheRead cacheReader; private static final Logger logger = LoggerFactory.getLogger(BaseObservableMemcachedCache.class); @@ -73,12 +74,20 @@ public static ReferencedClientFactory createReferenceClientFactory(ElastiCacheCa private final long millisToWaitForDelete; private final boolean waitForMemcachedSet; + public BaseObservableMemcachedCache(MemcachedCacheConfig config) { + this(null,config); + } + public BaseObservableMemcachedCache( MemcachedClientFactory clientFactory, MemcachedCacheConfig config) { this.config = config; cacheKeyCreator = CacheKeyCreatorFactory.DEFAULT_INSTANCE.create(config); - this.clientFactory = clientFactory; + if(clientFactory == null) { + this.clientFactory = buildClientFactory(config); + } else { + this.clientFactory = clientFactory; + } int maxCapacity = config.getMaxCapacity(); @@ -97,6 +106,8 @@ public BaseObservableMemcachedCache( waitForMemcachedSet = config.isWaitForMemcachedSet(); } + public abstract MemcachedClientFactory buildClientFactory(Object cfg); + private ConcurrentMap createInternalCache(boolean createCache, int initialCapacity, int maxCapacity) { diff --git a/src/main/java/org/greencheek/caching/herdcache/memcached/ElastiCacheObservableMemcachedCache.java b/src/main/java/org/greencheek/caching/herdcache/memcached/ElastiCacheObservableMemcachedCache.java index 527b76c..6bcebd6 100644 --- a/src/main/java/org/greencheek/caching/herdcache/memcached/ElastiCacheObservableMemcachedCache.java +++ b/src/main/java/org/greencheek/caching/herdcache/memcached/ElastiCacheObservableMemcachedCache.java @@ -4,6 +4,7 @@ import org.greencheek.caching.herdcache.memcached.config.MemcachedCacheConfig; import org.greencheek.caching.herdcache.memcached.elasticacheconfig.client.ElastiCacheConfigHostsParser; import org.greencheek.caching.herdcache.memcached.factory.ElastiCacheClientFactory; +import org.greencheek.caching.herdcache.memcached.factory.MemcachedClientFactory; import java.io.Serializable; @@ -38,6 +39,26 @@ private ElastiCacheObservableMemcachedCache(MemcachedCacheConfig mConfig, Elasti } + @Override + public MemcachedClientFactory buildClientFactory(Object cfg) { + ElastiCacheCacheConfig config = (ElastiCacheCacheConfig) cfg; + MemcachedCacheConfig mConfig = config.getMemcachedCacheConfig(); + return new ElastiCacheClientFactory( + createReferenceClientFactory(config), + ElastiCacheConfigHostsParser.parseElastiCacheConfigHosts(config.getElastiCacheConfigHosts()), + config.getConfigPollingTime(), + config.getInitialConfigPollingDelay(), + config.getIdleReadTimeout(), + config.getReconnectDelay(), + config.getDelayBeforeClientClose(), + mConfig.getHostResolver(), + mConfig.getDnsConnectionTimeout(), + config.isUpdateConfigVersionOnDnsTimeout(), + config.getNumberOfConsecutiveInvalidConfigurationsBeforeReconnect(), + config.getConnectionTimeoutInMillis(), + config.getClusterUpdatedObservers(), + config.getConfigUrlUpdater(), + config.isUpdateConfigOnlyOnVersionChange()); - + } } diff --git a/src/main/java/org/greencheek/caching/herdcache/memcached/SpyObservableMemcachedCache.java b/src/main/java/org/greencheek/caching/herdcache/memcached/SpyObservableMemcachedCache.java index 9b05218..ec37155 100644 --- a/src/main/java/org/greencheek/caching/herdcache/memcached/SpyObservableMemcachedCache.java +++ b/src/main/java/org/greencheek/caching/herdcache/memcached/SpyObservableMemcachedCache.java @@ -1,18 +1,35 @@ package org.greencheek.caching.herdcache.memcached; import org.greencheek.caching.herdcache.memcached.config.MemcachedCacheConfig; +import org.greencheek.caching.herdcache.memcached.factory.DynamicSpyMemcachedClientFactory; +import org.greencheek.caching.herdcache.memcached.factory.MemcachedClientFactory; import org.greencheek.caching.herdcache.memcached.factory.SpyMemcachedClientFactory; import org.greencheek.caching.herdcache.memcached.factory.SpyMemcachedReferencedClientFactory; import java.io.Serializable; +import java.time.Duration; /** * Created by dominictootell on 26/08/2014. */ public class SpyObservableMemcachedCache extends BaseObservableMemcachedCache { + MemcachedCacheConfig config; + public SpyObservableMemcachedCache(MemcachedCacheConfig config) { - super(new SpyMemcachedClientFactory(config.getMemcachedHosts(), - config.getDnsConnectionTimeout(),config.getHostStringParser(), - config.getHostResolver(),new SpyMemcachedReferencedClientFactory(createMemcachedConnectionFactory(config))), config); + super(config); } + + + public MemcachedClientFactory buildClientFactory(Object cfg) { + MemcachedCacheConfig config = (MemcachedCacheConfig)cfg; + if (config.resolveHostsFromDns()) { + return new DynamicSpyMemcachedClientFactory(config.getMemcachedHosts(), + config.getDurationForResolvingHostsFromDns(),config.getHostStringParser(),new SpyMemcachedReferencedClientFactory(createMemcachedConnectionFactory(config))); + } else { + return new SpyMemcachedClientFactory(config.getMemcachedHosts(), + config.getDnsConnectionTimeout(),config.getHostStringParser(), + config.getHostResolver(),new SpyMemcachedReferencedClientFactory(createMemcachedConnectionFactory(config))); + } + } + } diff --git a/src/main/java/org/greencheek/caching/herdcache/memcached/config/MemcachedCacheConfig.java b/src/main/java/org/greencheek/caching/herdcache/memcached/config/MemcachedCacheConfig.java index 867b2a5..8eec610 100644 --- a/src/main/java/org/greencheek/caching/herdcache/memcached/config/MemcachedCacheConfig.java +++ b/src/main/java/org/greencheek/caching/herdcache/memcached/config/MemcachedCacheConfig.java @@ -48,6 +48,8 @@ public class MemcachedCacheConfig { private final boolean herdProtectionEnabled; private final Scheduler waitForMemcachedSetRxScheduler; private final KeyValidationType keyValidationType; + private final boolean resolveHostsFromDns; + private final Duration resolveHostsFromDnsEvery; public MemcachedCacheConfig(Duration timeToLive, @@ -79,7 +81,9 @@ public MemcachedCacheConfig(Duration timeToLive, LocatorFactory locatorFactory, boolean herdProtectionEnabled, Scheduler waitForMemcachedSetRxScheduler, - KeyValidationType keyValidationType) { + KeyValidationType keyValidationType, + boolean resolveHostsFromDns, + Duration resolveHostsFromDnsEvery) { this.timeToLive = timeToLive; this.maxCapacity = maxCapacity; this.memcachedHosts = hosts; @@ -111,6 +115,8 @@ public MemcachedCacheConfig(Duration timeToLive, this.herdProtectionEnabled = herdProtectionEnabled; this.waitForMemcachedSetRxScheduler = waitForMemcachedSetRxScheduler; this.keyValidationType = keyValidationType; + this.resolveHostsFromDns = resolveHostsFromDns; + this.resolveHostsFromDnsEvery = resolveHostsFromDnsEvery; } public Duration getTimeToLive() { @@ -205,6 +211,7 @@ public boolean hasKeyPrefix() { return hasKeyPrefix; } + public boolean isRemoveFutureFromInternalCacheBeforeSettingValue() { return removeFutureFromInternalCacheBeforeSettingValue; } @@ -252,4 +259,12 @@ public Scheduler getWaitForMemcachedSetRxScheduler() { public KeyValidationType getKeyValidationType() { return keyValidationType; } + + public boolean resolveHostsFromDns() { + return resolveHostsFromDns; + } + + public Duration getDurationForResolvingHostsFromDns() { + return resolveHostsFromDnsEvery; + } } diff --git a/src/main/java/org/greencheek/caching/herdcache/memcached/config/builder/MemcachedCacheConfigBuilder.java b/src/main/java/org/greencheek/caching/herdcache/memcached/config/builder/MemcachedCacheConfigBuilder.java index d6c9237..8a26498 100644 --- a/src/main/java/org/greencheek/caching/herdcache/memcached/config/builder/MemcachedCacheConfigBuilder.java +++ b/src/main/java/org/greencheek/caching/herdcache/memcached/config/builder/MemcachedCacheConfigBuilder.java @@ -59,6 +59,8 @@ public abstract class MemcachedCacheConfigBuilder returnSocketAddressesForHostNames(List node @Override public List returnSocketAddressesForHostNames(List nodes, Duration dnsLookupTimeout) { LookupService addressLookupService = LookupService.create(); - List workingNodes = new ArrayList(nodes.size()); - for (Host hostAndPort : nodes) { - Future future = null; - String host = hostAndPort.getHost(); - int port = hostAndPort.getPort(); - try { - future = addressLookupService.getByName(host); - InetAddress ia = future.get(dnsLookupTimeout.getSeconds(), TimeUnit.SECONDS); - if (ia == null) { - logger.error("Unable to resolve dns entry for the host: {}", host); - } - else - { - try { - workingNodes.add(new InetSocketAddress(ia,port)); - } - catch (IllegalArgumentException e) { - logger.error("Invalid port number has been provided for the memcached node: host({}),port({})", host, port); + try { + + for (Host hostAndPort : nodes) { + Future future = null; + String host = hostAndPort.getHost(); + int port = hostAndPort.getPort(); + try { + future = addressLookupService.getByName(host); + InetAddress ia = future.get(dnsLookupTimeout.getSeconds(), TimeUnit.SECONDS); + if (ia == null) { + logger.error("Unable to resolve dns entry for the host: {}", host); + } else { + try { + workingNodes.add(new InetSocketAddress(ia, port)); + } catch (IllegalArgumentException e) { + logger.error("Invalid port number has been provided for the memcached node: host({}),port({})", host, port); + } } + } catch (TimeoutException e) { + logger.error("Problem resolving host name ({}) to an ip address in fixed number of seconds: {}", host, dnsLookupTimeout, e); + } catch (Exception e) { + logger.error("Problem resolving host name to ip address: {}", host, e); + } finally { + if (future != null) future.cancel(true); } } - catch(TimeoutException e) { - logger.error("Problem resolving host name ({}) to an ip address in fixed number of seconds: {}", host, dnsLookupTimeout, e); - } - catch(Exception e) { - logger.error("Problem resolving host name to ip address: {}", host,e); - } - finally { - if (future != null) future.cancel(true); - } + } finally { + addressLookupService.shutdown(); } - addressLookupService.shutdown(); - return workingNodes; + } } diff --git a/src/main/java/org/greencheek/caching/herdcache/memcached/dns/lookup/HostResolver.java b/src/main/java/org/greencheek/caching/herdcache/memcached/dns/lookup/HostResolver.java index 77cbd7e..208bd93 100644 --- a/src/main/java/org/greencheek/caching/herdcache/memcached/dns/lookup/HostResolver.java +++ b/src/main/java/org/greencheek/caching/herdcache/memcached/dns/lookup/HostResolver.java @@ -1,6 +1,7 @@ package org.greencheek.caching.herdcache.memcached.dns.lookup; import org.greencheek.caching.herdcache.memcached.config.Host; + import java.net.InetSocketAddress; import java.time.Duration; import java.util.List; @@ -9,8 +10,10 @@ * Created by dominictootell on 06/06/2014. */ public interface HostResolver { - public static Duration DEFAULT_DNS_TIMEOUT = Duration.ofSeconds(3); + public static Duration DEFAULT_DNS_TIMEOUT = Duration.ofSeconds(3); + + public List returnSocketAddressesForHostNames(List nodes); + + public List returnSocketAddressesForHostNames(List nodes, Duration dnsLookupTimeout); - public List returnSocketAddressesForHostNames(List nodes); - public List returnSocketAddressesForHostNames(List nodes, Duration dnsLookupTimeout); } diff --git a/src/main/java/org/greencheek/caching/herdcache/memcached/dns/resolver/Foreground.java b/src/main/java/org/greencheek/caching/herdcache/memcached/dns/resolver/Foreground.java new file mode 100644 index 0000000..4b95376 --- /dev/null +++ b/src/main/java/org/greencheek/caching/herdcache/memcached/dns/resolver/Foreground.java @@ -0,0 +1,9 @@ +package org.greencheek.caching.herdcache.memcached.dns.resolver; + +/** + * Created by dominictootell on 17/05/2018. + */ +public class Foreground { + + +} diff --git a/src/main/java/org/greencheek/caching/herdcache/memcached/dns/resolver/IPSort.java b/src/main/java/org/greencheek/caching/herdcache/memcached/dns/resolver/IPSort.java new file mode 100644 index 0000000..5814f33 --- /dev/null +++ b/src/main/java/org/greencheek/caching/herdcache/memcached/dns/resolver/IPSort.java @@ -0,0 +1,76 @@ +package org.greencheek.caching.herdcache.memcached.dns.resolver; + +import java.net.InetAddress; +import java.net.Inet4Address; +import java.net.Inet6Address; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.Optional; +import java.util.stream.Stream; + +public class IPSort { + private static String[] TESTS = {"0:0:0:0:0:0:fff:ffff","::FFFF:222.1.41.90",":8:","::::5:6::8","::::5:6::7","::::5:6::8","123..245.23","1...","..1.","123...23",".1..","123..245.23", "123..245.23", "104.244.253.29", "1.198.3.93", "32.183.93.40", "32.183.93.40", "104.30.244.2", "104.244.4.1","0.0.0.1",":a:","::5:3:4:5:6:78","1::2:3","1::2:3:4","1::5:256.2.3.4","1:1:3000.30.30.30","ae80::217:f2ff:254:7:237:98","::2:3:4:5:6:7","2:3:4:5:6:7","::5:3:4:5:6:7:8","::5:3:4:5:6:7:8:9:0","1::8","1::2:3","1::2:3:4","1::5:256.2.3.4","1:1:3000.30.30.30","ae80::217:f2ff:254.7.237.98","1:2:3:4::5:1.2.3.4","2001:0000:1234:0000:0000:C1C0:ABCD:0876","12345::6:7:8","1::1.2.900.4","fe80::","::ffff:0:0"}; + + public static class InetAddressComparator implements Comparator { + @Override + public int compare(InetAddress a, InetAddress b) { + byte[] aOctets = a.getAddress(), + bOctets = b.getAddress(); + int len = Math.max(aOctets.length, bOctets.length); + for (int i = 0; i < len; i++) { + byte aOctet = (i >= len - aOctets.length) ? + aOctets[i - (len - aOctets.length)] : 0; + byte bOctet = (i >= len - bOctets.length) ? + bOctets[i - (len - bOctets.length)] : 0; + if (aOctet != bOctet) return (0xff & aOctet) - (0xff & bOctet); + } + return 0; + } + } + + public static Optional toInetAddress(String s) { + try { + return Optional.of(InetAddress.getByName(s)); + } catch (UnknownHostException badAddress) { + return Optional.empty(); + } + } + + public static void main(String[] args) throws Exception { + System.out.println("Valid 32-bit addresses"); + Arrays.stream(TESTS) + .map(IPSort::toInetAddress) + .filter(Optional::isPresent) + .map(Optional::get) + .filter((addr) -> addr instanceof Inet4Address) + .map(InetAddress::getHostAddress) + .forEach(System.out::println); + + System.out.println("\nValid 128-bit addresses"); + Arrays.stream(TESTS) + .map(IPSort::toInetAddress) + .filter(Optional::isPresent) + .map(Optional::get) + .filter((addr) -> addr instanceof Inet6Address) + .map(InetAddress::getHostAddress) + .forEach(System.out::println); + + System.out.println("\nInvalid addresses"); + Arrays.stream(TESTS) + .filter((s) -> !toInetAddress(s).isPresent()) + .forEach(System.out::println); + + System.out.println("\nSorted addresses"); + Arrays.stream(TESTS) + .map(IPSort::toInetAddress) + .filter(Optional::isPresent) + .map(Optional::get) + .sorted(new InetAddressComparator()) + .map(InetAddress::getHostAddress) + .forEach(System.out::println); + } +} + + diff --git a/src/main/java/org/greencheek/caching/herdcache/memcached/dns/resolver/InetAddressComparator.java b/src/main/java/org/greencheek/caching/herdcache/memcached/dns/resolver/InetAddressComparator.java new file mode 100644 index 0000000..38231c3 --- /dev/null +++ b/src/main/java/org/greencheek/caching/herdcache/memcached/dns/resolver/InetAddressComparator.java @@ -0,0 +1,21 @@ +package org.greencheek.caching.herdcache.memcached.dns.resolver; + +import java.net.InetAddress; +import java.util.Comparator; + +public class InetAddressComparator implements Comparator { + @Override + public int compare(InetAddress a, InetAddress b) { + byte[] aOctets = a.getAddress(), + bOctets = b.getAddress(); + int len = Math.max(aOctets.length, bOctets.length); + for (int i = 0; i < len; i++) { + byte aOctet = (i >= len - aOctets.length) ? + aOctets[i - (len - aOctets.length)] : 0; + byte bOctet = (i >= len - bOctets.length) ? + bOctets[i - (len - bOctets.length)] : 0; + if (aOctet != bOctet) return (0xff & aOctet) - (0xff & bOctet); + } + return 0; + } +} diff --git a/src/main/java/org/greencheek/caching/herdcache/memcached/dns/resolver/domain/ResolvedAddresses.java b/src/main/java/org/greencheek/caching/herdcache/memcached/dns/resolver/domain/ResolvedAddresses.java new file mode 100644 index 0000000..30f2017 --- /dev/null +++ b/src/main/java/org/greencheek/caching/herdcache/memcached/dns/resolver/domain/ResolvedAddresses.java @@ -0,0 +1,34 @@ +package org.greencheek.caching.herdcache.memcached.dns.resolver.domain; + +import java.net.InetAddress; + +/** + * {@link ResolvedAddresses} holds a collection of resolved {@link InetAddress} details for a single host. + */ +public interface ResolvedAddresses { + + /** + * Fetch the number of addresses available that can be used to connect to a host. + * + * @return The number of available {@link InetAddress} entries for a host. + */ + int available(); + + /** + * Check whether or not this {@link ResolvedAddresses} has {@link InetAddress} details available. + * + * @return {@code true} if the this instance has at least one {@link InetAddress} available to be used, or + * {@code false} if there are no {@link InetAddress} details available. + */ + boolean isEmpty(); + + /** + * Fetch the next available {@link InetAddress}(es) to use for a host. + * + *

The implementation is free to decide whether to return a single address, or multiple addresses if they are + * available.

+ * + * @return The next available {@link InetAddress}(es) in the pool, or an empty array if none are available. + */ + InetAddress[] next(); +} diff --git a/src/main/java/org/greencheek/caching/herdcache/memcached/factory/BackgroundDnsResolver.java b/src/main/java/org/greencheek/caching/herdcache/memcached/factory/BackgroundDnsResolver.java new file mode 100644 index 0000000..9595fc8 --- /dev/null +++ b/src/main/java/org/greencheek/caching/herdcache/memcached/factory/BackgroundDnsResolver.java @@ -0,0 +1,164 @@ +package org.greencheek.caching.herdcache.memcached.factory; + +import org.greencheek.caching.herdcache.memcached.config.Host; +import org.greencheek.caching.herdcache.memcached.dns.AddressResolver; +import org.greencheek.caching.herdcache.memcached.dns.DefaultAddressResolver; +import org.greencheek.caching.herdcache.memcached.dns.resolver.InetAddressComparator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * Created by dominictootell on 17/05/2018. + */ +public class BackgroundDnsResolver implements ReferencedClientHolder { + private static final Logger LOG = LoggerFactory.getLogger(BackgroundDnsResolver.class); + private static Holder EMPTY = new Holder(SpyReferencedClient.UNAVAILABLE_REFERENCE_CLIENT,new InetAddress[0]); + + private final AtomicReference client = new AtomicReference(EMPTY); + + private final ScheduledExecutorService scheduledExecutorService; + private final AddressResolver resolver; + private final Host host; + private final ReferencedClientFactory connnectionFactory; + private final Comparator sortingComparator = new InetAddressComparator(); + + private static final ThreadFactory DEFAULT_THREAD_FACTORY = (r) -> { + final Thread t = new Thread(r); + t.setDaemon(true); + return t; + }; + + + public BackgroundDnsResolver(Host host,long backgroundPollingTimeInMillis, + ReferencedClientFactory connnectionFactory) { + + this(host,backgroundPollingTimeInMillis,connnectionFactory,new DefaultAddressResolver()); + } + + public BackgroundDnsResolver(Host host,long backgroundPollingTimeInMillis, + ReferencedClientFactory connnectionFactory, AddressResolver resolver) { + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(DEFAULT_THREAD_FACTORY); + this.host = host; + + + scheduledExecutorService.scheduleWithFixedDelay( + backgroundResolver(), + 0,backgroundPollingTimeInMillis, + TimeUnit.MILLISECONDS); + + + this.connnectionFactory = connnectionFactory; + this.resolver = resolver; + } + + private Runnable backgroundResolver() { + return () -> { + String hostName = host.getHost(); + Holder currentResolvedAddresses = client.get(); + InetAddress[] addresses = resolver.resolve(hostName); + + addresses = checkForCollisionResponses(addresses, hostName); + + InetAddress[] existingAddresses = currentResolvedAddresses.addresses; + + if(addresses.length == 0) { + + if(existingAddresses.length==0) { + LOG.error("Failed to resolve address for '{}', no pre-cached addresses to re-use", host); + } + } else { + if (haveAddressesChanged(addresses,existingAddresses)) { + ReferencedClient referencedClient = connnectionFactory.createClient(toSocketAddresses(addresses)); + if (referencedClient != SpyReferencedClient.UNAVAILABLE_REFERENCE_CLIENT) { + client.set(new Holder(referencedClient,addresses)); + } + LOG.debug("[{}] Addresses available: {}", host, toCommaSeparated(addresses)); + } else { + LOG.debug("[{}] Has Same Addresses: {}", host, toCommaSeparated(addresses)); + } + } + }; + } + + private InetAddress[] checkForCollisionResponses(final InetAddress[] addresses, String host) { + List okResponses = new ArrayList<>(0); + for (final InetAddress address : addresses) { + if ((address instanceof Inet4Address) && isCollision(address)) { + LOG.warn("DNS collision response found for {}", host); + continue; + } + okResponses.add(address); + } + return okResponses.toArray(new InetAddress[okResponses.size()]); + } + + private boolean isCollision(final InetAddress inetAddress) { + final byte[] addressBytes = inetAddress.getAddress(); + + return (addressBytes[0] == 127) && (addressBytes[1] == 0) && (addressBytes[2] == 53) && (addressBytes[3] == 53); + } + + private List toSocketAddresses(InetAddress[] addresses) { + List socketAddresses = new ArrayList<>(addresses.length); + for(InetAddress addy : addresses) { + socketAddresses.add(new InetSocketAddress(addy,host.getPort())); + } + return socketAddresses; + } + + private String toCommaSeparated(InetAddress[] addresses) { + return Arrays.stream(addresses) + .map(x -> x.getHostAddress()) + .collect(Collectors.joining(",")); + } + + private boolean haveAddressesChanged(InetAddress[] newaddresses, InetAddress[] oldaddresses) { + + Arrays.sort(newaddresses, sortingComparator); + if (newaddresses.length != oldaddresses.length) { + return true; + } + + for (int i =0;i implements MemcachedClientFactory { + + private final ReferencedClientHolder memcached; + private final HostStringParser hostStringParser; + private final ReferencedClientFactory connectionFactory; + + public DynamicSpyMemcachedClientFactory(String memcachedHosts, + Duration pollingForAddressesTime, + HostStringParser hostStringParser, + ReferencedClientFactory connnectionFactory) { + this.hostStringParser = hostStringParser; + this.connectionFactory = connnectionFactory; + + List parsedHosts = hostStringParser.parseMemcachedNodeList(memcachedHosts); + + if(parsedHosts==null || parsedHosts.size()==0) { + throw new InstantiationError("Error Parsing Host String:"+memcachedHosts); + } + + if (parsedHosts.size()>1){ + throw new InstantiationError("Only one host expected in Host String:"+memcachedHosts); + } + + memcached = new BackgroundDnsResolver(parsedHosts.get(0),pollingForAddressesTime.toMillis(),connnectionFactory); + + } + + + @Override + public ReferencedClient getClient() { + return memcached.getClient(); + } + + @Override + public boolean isEnabled() { + return memcached.getClient().isAvailable(); + } + + @Override + public void shutdown() { + if(isEnabled()) { + memcached.shutdown(); + } + } +} diff --git a/src/main/java/org/greencheek/caching/herdcache/memcached/factory/ReferenceClientResolver.java b/src/main/java/org/greencheek/caching/herdcache/memcached/factory/ReferenceClientResolver.java new file mode 100644 index 0000000..9ff47f7 --- /dev/null +++ b/src/main/java/org/greencheek/caching/herdcache/memcached/factory/ReferenceClientResolver.java @@ -0,0 +1,13 @@ +package org.greencheek.caching.herdcache.memcached.factory; + +import org.greencheek.caching.herdcache.memcached.config.Host; + +import java.time.Duration; +import java.util.List; + +/** + * Created by dominictootell on 17/05/2018. + */ +public interface ReferenceClientResolver { + ReferencedClientHolder getClientHolder(List parsedHosts, Duration lookupTimeout); +} diff --git a/src/main/java/org/greencheek/caching/herdcache/memcached/factory/ReferencedClientHolder.java b/src/main/java/org/greencheek/caching/herdcache/memcached/factory/ReferencedClientHolder.java new file mode 100644 index 0000000..39b93c6 --- /dev/null +++ b/src/main/java/org/greencheek/caching/herdcache/memcached/factory/ReferencedClientHolder.java @@ -0,0 +1,9 @@ +package org.greencheek.caching.herdcache.memcached.factory; + +/** + * Created by dominictootell on 17/05/2018. + */ +public interface ReferencedClientHolder { + ReferencedClient getClient(); + default void shutdown() {}; +} diff --git a/src/main/java/org/greencheek/caching/herdcache/memcached/factory/SpyMemcachedClientFactory.java b/src/main/java/org/greencheek/caching/herdcache/memcached/factory/SpyMemcachedClientFactory.java index 7214717..b0c296d 100644 --- a/src/main/java/org/greencheek/caching/herdcache/memcached/factory/SpyMemcachedClientFactory.java +++ b/src/main/java/org/greencheek/caching/herdcache/memcached/factory/SpyMemcachedClientFactory.java @@ -1,12 +1,9 @@ package org.greencheek.caching.herdcache.memcached.factory; -import net.spy.memcached.ConnectionFactory; -import net.spy.memcached.MemcachedClient; import org.greencheek.caching.herdcache.memcached.config.Host; import org.greencheek.caching.herdcache.memcached.config.hostparsing.HostStringParser; import org.greencheek.caching.herdcache.memcached.dns.lookup.HostResolver; -import java.io.IOException; import java.net.InetSocketAddress; import java.time.Duration; import java.util.List; diff --git a/src/main/java/org/greencheek/caching/herdcache/memcached/factory/SpyMemcachedReferencedClientFactory.java b/src/main/java/org/greencheek/caching/herdcache/memcached/factory/SpyMemcachedReferencedClientFactory.java index edf202f..5b53036 100644 --- a/src/main/java/org/greencheek/caching/herdcache/memcached/factory/SpyMemcachedReferencedClientFactory.java +++ b/src/main/java/org/greencheek/caching/herdcache/memcached/factory/SpyMemcachedReferencedClientFactory.java @@ -21,7 +21,7 @@ public SpyMemcachedReferencedClientFactory(ConnectionFactory factory) { @Override public ReferencedClient createClient(List resolvedHosts) { try { - return new SpyReferencedClient(true,resolvedHosts,new MemcachedClient(factory,resolvedHosts)); + return new SpyReferencedClient(true,resolvedHosts,new MemcachedClient(factory,resolvedHosts)); } catch (IOException e) { return SpyReferencedClient.UNAVAILABLE_REFERENCE_CLIENT; } diff --git a/src/main/java/org/greencheek/caching/herdcache/memcached/factory/SpyReferencedClient.java b/src/main/java/org/greencheek/caching/herdcache/memcached/factory/SpyReferencedClient.java index 08fcc4e..bbee5f1 100644 --- a/src/main/java/org/greencheek/caching/herdcache/memcached/factory/SpyReferencedClient.java +++ b/src/main/java/org/greencheek/caching/herdcache/memcached/factory/SpyReferencedClient.java @@ -79,11 +79,9 @@ public Future flush() { @Override public void shutdown() { - client.shutdown(); + if(client!=null) { + client.shutdown(); + } } -// @Override -// public MemcachedClientIF getClient() { -// return client; -// } } diff --git a/src/main/java/org/greencheek/caching/herdcache/memcached/factory/StaticReferencedClientHolder.java b/src/main/java/org/greencheek/caching/herdcache/memcached/factory/StaticReferencedClientHolder.java new file mode 100644 index 0000000..1bd29dc --- /dev/null +++ b/src/main/java/org/greencheek/caching/herdcache/memcached/factory/StaticReferencedClientHolder.java @@ -0,0 +1,17 @@ +package org.greencheek.caching.herdcache.memcached.factory; + +/** + * Created by dominictootell on 17/05/2018. + */ +public class StaticReferencedClientHolder implements ReferencedClientHolder { + + private final ReferencedClient client; + + public StaticReferencedClientHolder(ReferencedClient client) { + this.client = client; + } + @Override + public ReferencedClient getClient() { + return null; + } +} diff --git a/src/test/java/org/greencheek/caching/herdcache/memcached/TestSimpleMemcachedCaching.java b/src/test/java/org/greencheek/caching/herdcache/memcached/TestSimpleMemcachedCaching.java index 759e17f..d3f3177 100644 --- a/src/test/java/org/greencheek/caching/herdcache/memcached/TestSimpleMemcachedCaching.java +++ b/src/test/java/org/greencheek/caching/herdcache/memcached/TestSimpleMemcachedCaching.java @@ -565,20 +565,62 @@ private void testHashAlgorithm(HashAlgorithm algo) { } + private void testHashAlgorithmViaDynamicCache(HashAlgorithm algo) { + cache = new SpyMemcachedCache( + new ElastiCacheCacheConfigBuilder() + .setMemcachedHosts("localhost:" + memcached.getPort()) + .setTimeToLive(Duration.ofSeconds(60)) + .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT) + .setWaitForMemcachedSet(true) + .setHashAlgorithm(algo) + .setKeyPrefix(Optional.of("elastic")) + .setResolveHostsFromDns(true) + .buildMemcachedConfig() + ); + + ListenableFuture val = cache.apply("Key1", () -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "value1"; + }, executorService); + + ListenableFuture val2 = cache.apply("Key1", () -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "value2"; + }, executorService); + + + assertEquals("Value should be key1", "value1", cache.awaitForFutureOrElse(val, null)); + assertEquals("Value should be key1", "value1", cache.awaitForFutureOrElse(val2, null)); + + assertEquals(1, memcached.getDaemon().getCache().getCurrentItems()); + + } + @Test public void testJenkinsHashAlgorithm() { testHashAlgorithm(new JenkinsHash()); + testHashAlgorithmViaDynamicCache(new JenkinsHash()); } @Test public void testXXHashAlgorithm() { testHashAlgorithm(new XXHashAlogrithm()); + testHashAlgorithmViaDynamicCache(new XXHashAlogrithm()); } @Test public void testAsciiXXHashAlgorithm() { testHashAlgorithm(new AsciiXXHashAlogrithm()); + testHashAlgorithmViaDynamicCache(new AsciiXXHashAlogrithm()); } @Test diff --git a/src/test/java/org/greencheek/caching/herdcache/memcached/dns/resolver/InetAddressComparatorTest.java b/src/test/java/org/greencheek/caching/herdcache/memcached/dns/resolver/InetAddressComparatorTest.java new file mode 100644 index 0000000..bb08f2e --- /dev/null +++ b/src/test/java/org/greencheek/caching/herdcache/memcached/dns/resolver/InetAddressComparatorTest.java @@ -0,0 +1,32 @@ +package org.greencheek.caching.herdcache.memcached.dns.resolver; + + +import org.junit.Test; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Comparator; + +import static org.junit.Assert.assertEquals; + +public class InetAddressComparatorTest { + + Comparator comparator = new InetAddressComparator(); + + + @Test + public void testSorting() throws UnknownHostException { + InetAddress[] addresses = new InetAddress[3]; + + addresses[0] = InetAddress.getByName("127.0.0.100"); + addresses[1] = InetAddress.getByName("127.0.0.2"); + addresses[2] = InetAddress.getByName("127.0.0.1"); + + Arrays.sort(addresses, comparator); + assertEquals("127.0.0.1",addresses[0].getHostAddress()); + assertEquals("127.0.0.2",addresses[1].getHostAddress()); + assertEquals("127.0.0.100",addresses[2].getHostAddress()); + } + +} \ No newline at end of file diff --git a/src/test/java/org/greencheek/caching/herdcache/memcached/factory/BackgroundDnsResolverTest.java b/src/test/java/org/greencheek/caching/herdcache/memcached/factory/BackgroundDnsResolverTest.java new file mode 100644 index 0000000..562a397 --- /dev/null +++ b/src/test/java/org/greencheek/caching/herdcache/memcached/factory/BackgroundDnsResolverTest.java @@ -0,0 +1,175 @@ +package org.greencheek.caching.herdcache.memcached.factory; + +import net.spy.memcached.ConnectionFactory; +import org.greencheek.caching.herdcache.memcached.config.Host; +import org.greencheek.caching.herdcache.memcached.config.MemcachedCacheConfig; +import org.greencheek.caching.herdcache.memcached.config.builder.ElastiCacheCacheConfigBuilder; +import org.greencheek.caching.herdcache.memcached.dns.AddressResolver; +import org.greencheek.caching.herdcache.memcached.spyconnectionfactory.SpyConnectionFactoryBuilder; +import org.junit.Before; +import org.junit.Test; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; + +public class BackgroundDnsResolverTest { + + + + class SimpleCircularAddressResolver implements AddressResolver{ + private AtomicInteger called = new AtomicInteger(0); + private final InetAddress[][] addresses; + + public SimpleCircularAddressResolver(InetAddress[][] addressesToReturn) { + this.addresses = addressesToReturn; + } + + @Override + public InetAddress[] resolve(String host) { + if(called.get()==addresses.length) { + called.set(0); + } + + return addresses[called.getAndIncrement()]; + } + } + + ReferencedClientFactory reffactory; + + @Before + public void setUp() { + MemcachedCacheConfig config = new ElastiCacheCacheConfigBuilder().buildMemcachedConfig(); + ConnectionFactory factory = SpyConnectionFactoryBuilder.createConnectionFactory( + config.getFailureMode(), + config.getHashAlgorithm(), config.getSerializingTranscoder(), + config.getProtocol(), config.getReadBufferSize(), config.getKeyHashType(), + config.getLocatorFactory(), config.getKeyValidationType()); + + reffactory = new SpyMemcachedReferencedClientFactory(factory); + } + + @Test + public void updateOfDnsIsCaptured() throws UnknownHostException, InterruptedException { + + InetAddress[] addresses1 = new InetAddress[3]; + + addresses1[0] = InetAddress.getByName("127.0.0.100"); + addresses1[1] = InetAddress.getByName("127.0.0.2"); + addresses1[2] = InetAddress.getByName("127.0.0.1"); + + InetAddress[] addresses2 = new InetAddress[3]; + addresses2[0] = InetAddress.getByName("127.0.0.101"); + addresses2[1] = InetAddress.getByName("127.0.0.2"); + addresses2[2] = InetAddress.getByName("127.0.0.3"); + + InetAddress[] addresses3 = new InetAddress[3]; + addresses3[0] = InetAddress.getByName("127.0.0.101"); + addresses3[1] = InetAddress.getByName("127.0.0.2"); + addresses3[2] = InetAddress.getByName("127.0.0.3"); + + BackgroundDnsResolver resolver = new BackgroundDnsResolver(new Host("bob.com",11211),10000,reffactory,new SimpleCircularAddressResolver(new InetAddress[][]{addresses1,addresses2,addresses3})); + try { + Thread.sleep(2000); + ReferencedClient client = resolver.getClient(); + assertNotSame(client, SpyReferencedClient.UNAVAILABLE_REFERENCE_CLIENT); + Thread.sleep(13000); + ReferencedClient client2 = resolver.getClient(); + assertNotSame(client, SpyReferencedClient.UNAVAILABLE_REFERENCE_CLIENT); + assertNotSame(client, client2); + Thread.sleep(13000); + ReferencedClient client3 = resolver.getClient(); + assertSame(client2, client3); + } + finally { + resolver.shutdown(); + } + + } + + @Test + public void updateOfDnsIsNotChangedWhenInvalidAddressesReturned() throws UnknownHostException, InterruptedException { + + InetAddress[] addresses1 = new InetAddress[3]; + + addresses1[0] = InetAddress.getByName("127.0.0.100"); + addresses1[1] = InetAddress.getByName("127.0.0.2"); + addresses1[2] = InetAddress.getByName("127.0.0.1"); + + InetAddress[] addresses2 = new InetAddress[3]; + addresses2[0] = InetAddress.getByName("127.0.53.53"); + addresses2[1] = InetAddress.getByName("127.0.53.53"); + addresses2[2] = InetAddress.getByName("127.0.53.53"); + + InetAddress[] addresses3 = new InetAddress[3]; + addresses3[0] = InetAddress.getByName("127.0.53.53"); + addresses3[1] = InetAddress.getByName("127.0.53.53"); + addresses3[2] = InetAddress.getByName("127.0.53.53"); + + BackgroundDnsResolver resolver = new BackgroundDnsResolver(new Host("bob.com",11211),10000,reffactory,new SimpleCircularAddressResolver(new InetAddress[][]{addresses1,addresses2,addresses3})); + try { + Thread.sleep(2000); + ReferencedClient client = resolver.getClient(); + assertNotSame(client, SpyReferencedClient.UNAVAILABLE_REFERENCE_CLIENT); + Thread.sleep(13000); + ReferencedClient client2 = resolver.getClient(); + assertNotSame(client, SpyReferencedClient.UNAVAILABLE_REFERENCE_CLIENT); + assertSame(client, client2); + Thread.sleep(13000); + ReferencedClient client3 = resolver.getClient(); + assertSame(client2, client3); + assertSame(client, client3); + } + finally { + resolver.shutdown(); + } + + } + + @Test + public void updateOfDnsIsChangedAndFaultyAddressRemoved() throws UnknownHostException, InterruptedException { + + InetAddress[] addresses1 = new InetAddress[3]; + + addresses1[0] = InetAddress.getByName("127.0.0.100"); + addresses1[1] = InetAddress.getByName("127.0.0.2"); + addresses1[2] = InetAddress.getByName("127.0.0.1"); + + InetAddress[] addresses2 = new InetAddress[3]; + addresses2[0] = InetAddress.getByName("127.0.53.53"); + addresses2[1] = InetAddress.getByName("127.0.0.4"); + addresses2[2] = InetAddress.getByName("127.0.0.5"); + + + + BackgroundDnsResolver resolver = new BackgroundDnsResolver(new Host("bob.com",11211),10000,reffactory,new SimpleCircularAddressResolver(new InetAddress[][]{addresses1,addresses2})); + try { + Thread.sleep(5000); + ReferencedClient client = resolver.getClient(); + assertNotSame(client, SpyReferencedClient.UNAVAILABLE_REFERENCE_CLIENT); + Thread.sleep(13000); + ReferencedClient client2 = resolver.getClient(); + assertNotSame(client, SpyReferencedClient.UNAVAILABLE_REFERENCE_CLIENT); + assertNotSame(client, client2); + + List sockAddys = client2.getResolvedHosts(); + + assertEquals(2, sockAddys.size()); + for(InetSocketAddress addy : sockAddys) { + System.out.println(addy.getAddress().getHostAddress()); + } + + } + finally { + resolver.shutdown(); + } + + } + +} \ No newline at end of file