diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 287f6872d3b7..c5988993e424 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -2112,16 +2112,19 @@ In addition to the normal cache metrics, the caffeine cache implementation also Uses memcached as cache backend. This allows all processes to share the same cache. -|Property|Description|Default| -|--------|-----------|-------| -|`druid.cache.expiration`|Memcached [expiration time](https://code.google.com/p/memcached/wiki/NewCommands#Standard_Protocol).|2592000 (30 days)| -|`druid.cache.timeout`|Maximum time in milliseconds to wait for a response from Memcached.|500| -|`druid.cache.hosts`|Comma separated list of Memcached hosts ``.|none| -|`druid.cache.maxObjectSize`|Maximum object size in bytes for a Memcached object.|52428800 (50 MiB)| -|`druid.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid| -|`druid.cache.numConnections`|Number of memcached connections to use.|1| -|`druid.cache.protocol`|Memcached communication protocol. Can be binary or text.|binary| -|`druid.cache.locator`|Memcached locator. Can be consistent or array_mod.|consistent| +| Property | Description | Default | +|-------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------| +| `druid.cache.expiration` | Memcached [expiration time](https://code.google.com/p/memcached/wiki/NewCommands#Standard_Protocol). | 2592000 (30 days) | +| `druid.cache.timeout` | Maximum time in milliseconds to wait for a response from Memcached. 
| 500 | +| `druid.cache.hosts` | Comma-separated list of Memcached hosts `<host:port>`. All nodes must be listed when `druid.cache.clientMode` is set to static. Dynamic mode [automatically identifies nodes in your cluster](https://docs.aws.amazon.com/AmazonElastiCache/latest/mem-ug/AutoDiscovery.html), so specifying only the configuration endpoint and port is sufficient. | none | +| `druid.cache.maxObjectSize` | Maximum object size in bytes for a Memcached object. | 52428800 (50 MiB) | +| `druid.cache.memcachedPrefix` | Key prefix for all keys in Memcached. | druid | +| `druid.cache.numConnections` | Number of memcached connections to use. | 1 | +| `druid.cache.protocol` | Memcached communication protocol. Can be binary or text. | binary | +| `druid.cache.locator` | Memcached locator. Can be consistent or array_mod. | consistent | +| `druid.cache.enableTls` | Enable TLS-based connections for the Memcached client. Boolean. | false | +| `druid.cache.clientMode` | Client mode. Static mode requires the user to specify individual cluster nodes. Dynamic mode uses the [AutoDiscovery](https://docs.aws.amazon.com/AmazonElastiCache/latest/mem-ug/AutoDiscovery.HowAutoDiscoveryWorks.html) feature of AWS Memcached. String: ["static"](https://docs.aws.amazon.com/AmazonElastiCache/latest/mem-ug/AutoDiscovery.Manual.html) or ["dynamic"](https://docs.aws.amazon.com/AmazonElastiCache/latest/mem-ug/AutoDiscovery.Using.ModifyApp.Java.html). | static | +| `druid.cache.skipTlsHostnameVerification` | Skip TLS hostname verification. Boolean. 
| true | #### Hybrid diff --git a/licenses.yaml b/licenses.yaml index 734e834f173e..6e931bbc208a 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -1658,13 +1658,13 @@ libraries: --- -name: Spymemcached +name: aws-elasticache-cluster-client-memcached-for-java license_category: binary module: java-core license_name: Apache License version 2.0 -version: 2.12.3 +version: 1.2.0 libraries: - - net.spy: spymemcached + - com.amazonaws: elasticache-java-cluster-client --- diff --git a/pom.xml b/pom.xml index 0064fa4fc50f..9f1a3e9360cd 100644 --- a/pom.xml +++ b/pom.xml @@ -773,9 +773,9 @@ 3.3.6 - net.spy - spymemcached - 2.12.3 + com.amazonaws + elasticache-java-cluster-client + 1.2.0 org.antlr diff --git a/server/pom.xml b/server/pom.xml index 5ba0b170a9ed..6154ecfa4c20 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -134,8 +134,8 @@ tesla-aether - net.spy - spymemcached + com.amazonaws + elasticache-java-cluster-client org.lz4 diff --git a/server/src/main/java/org/apache/druid/client/cache/MemcachedCache.java b/server/src/main/java/org/apache/druid/client/cache/MemcachedCache.java index 08ba9f8706c1..d67e01b110b3 100644 --- a/server/src/main/java/org/apache/druid/client/cache/MemcachedCache.java +++ b/server/src/main/java/org/apache/druid/client/cache/MemcachedCache.java @@ -30,6 +30,7 @@ import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import net.spy.memcached.AddrUtil; +import net.spy.memcached.ClientMode; import net.spy.memcached.ConnectionFactory; import net.spy.memcached.ConnectionFactoryBuilder; import net.spy.memcached.FailureMode; @@ -52,10 +53,16 @@ import org.apache.druid.java.util.metrics.AbstractMonitor; import javax.annotation.Nullable; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.security.KeyManagementException; +import java.security.KeyStore; 
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -339,25 +346,8 @@ public void updateHistogram(String name, int amount)
         }
       };
 
-      final ConnectionFactory connectionFactory = new MemcachedCustomConnectionFactoryBuilder()
-          // 1000 repetitions gives us good distribution with murmur3_128
-          // (approx < 5% difference in counts across nodes, with 5 cache nodes)
-          .setKetamaNodeRepetitions(1000)
-          .setHashAlg(MURMUR3_128)
-          .setProtocol(ConnectionFactoryBuilder.Protocol.valueOf(StringUtils.toUpperCase(config.getProtocol())))
-          .setLocatorType(ConnectionFactoryBuilder.Locator.valueOf(StringUtils.toUpperCase(config.getLocator())))
-          .setDaemon(true)
-          .setFailureMode(FailureMode.Cancel)
-          .setTranscoder(transcoder)
-          .setShouldOptimize(true)
-          .setOpQueueMaxBlockTime(config.getTimeout())
-          .setOpTimeout(config.getTimeout())
-          .setReadBufferSize(config.getReadBufferSize())
-          .setOpQueueFactory(opQueueFactory)
-          .setMetricCollector(metricCollector)
-          .setEnableMetrics(MetricType.DEBUG) // Not as scary as it sounds
-          .build();
-
+      final ConnectionFactory connectionFactory = createConnectionFactory(config, transcoder,
+          opQueueFactory, metricCollector);
       final List hosts = AddrUtil.getAddresses(config.getHosts());
 
 
@@ -389,11 +379,87 @@ public MemcachedClientIF get()
 
       return new MemcachedCache(clientSupplier, config, monitor);
     }
-    catch (IOException e) {
+    catch (IOException | NoSuchAlgorithmException | KeyStoreException | KeyManagementException e) {
       throw new RuntimeException(e);
     }
   }
 
+  /**
+   * Builds the {@link ConnectionFactory} for the Memcached client from the given cache configuration.
+   *
+   * When {@code config.enableTls()} is set, an {@link SSLContext} backed by the default trust managers is
+   * installed. {@code config.getClientMode()} must be {@code "static"} or {@code "dynamic"}; in dynamic mode the
+   * host name of the first configured host is used for TLS hostname verification.
+   *
+   * @param config          cache configuration to read the connection settings from
+   * @param transcoder      transcoder handed to the connection factory
+   * @param opQueueFactory  factory for the operation queues
+   * @param metricCollector collector receiving the client metrics
+   * @return the configured connection factory
+   * @throws IllegalArgumentException if the configured client mode is neither static nor dynamic
+   */
+  public static ConnectionFactory createConnectionFactory(
+      final MemcachedCacheConfig config,
+      final LZ4Transcoder transcoder,
+      final OperationQueueFactory opQueueFactory,
+      final MetricCollector metricCollector
+  ) throws KeyManagementException, KeyStoreException, NoSuchAlgorithmException
+  {
+    final MemcachedCustomConnectionFactoryBuilder connectionFactoryBuilder =
+        (MemcachedCustomConnectionFactoryBuilder) new MemcachedCustomConnectionFactoryBuilder()
+            // 1000 repetitions gives us good distribution with murmur3_128
+            // (approx < 5% difference in counts across nodes, with 5 cache nodes)
+            .setKetamaNodeRepetitions(1000)
+            .setHashAlg(MURMUR3_128)
+            .setProtocol(ConnectionFactoryBuilder.Protocol.valueOf(StringUtils.toUpperCase(config.getProtocol())))
+            .setLocatorType(ConnectionFactoryBuilder.Locator.valueOf(StringUtils.toUpperCase(config.getLocator())))
+            .setDaemon(true)
+            .setFailureMode(FailureMode.Cancel)
+            .setTranscoder(transcoder)
+            .setShouldOptimize(true)
+            .setOpQueueMaxBlockTime(config.getTimeout())
+            .setOpTimeout(config.getTimeout())
+            .setReadBufferSize(config.getReadBufferSize())
+            .setOpQueueFactory(opQueueFactory)
+            .setMetricCollector(metricCollector)
+            .setEnableMetrics(MetricType.DEBUG); // Not as scary as it sounds
+
+    if (config.enableTls()) {
+      // Build the SSLContext from the default trust managers (null KeyStore, no key managers)
+      final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+      tmf.init((KeyStore) null);
+      final SSLContext sslContext = SSLContext.getInstance("TLS");
+      sslContext.init(null, tmf.getTrustManagers(), null);
+      // Create the client in TLS mode
+      connectionFactoryBuilder.setSSLContext(sslContext);
+    }
+
+    final String clientMode = config.getClientMode();
+    if ("dynamic".equals(clientMode)) {
+      connectionFactoryBuilder.setClientMode(ClientMode.Dynamic);
+      // Hosts are configured as host:port, but hostname verification needs the bare host name.
+      connectionFactoryBuilder.setHostnameForTlsVerification(firstHostname(config.getHosts()));
+    } else if ("static".equals(clientMode)) {
+      connectionFactoryBuilder.setClientMode(ClientMode.Static);
+    } else {
+      throw new IllegalArgumentException(
+          "Invalid value provided for `druid.cache.clientMode`. Value must be 'static' or 'dynamic'."
+      );
+    }
+    connectionFactoryBuilder.setSkipTlsHostnameVerification(config.skipTlsHostnameVerification());
+    return connectionFactoryBuilder.build();
+  }
+
+  /**
+   * Returns the host name of the first entry of a comma separated {@code host:port} list, without the port.
+   */
+  private static String firstHostname(final String hosts)
+  {
+    final String firstEntry = hosts.split(",")[0].trim();
+    final int portSeparator = firstEntry.lastIndexOf(':');
+    return portSeparator < 0 ? firstEntry : firstEntry.substring(0, portSeparator);
+  }
+
   private final int timeout;
   private final int expiration;
   private final String memcachedPrefix;
diff --git a/server/src/main/java/org/apache/druid/client/cache/MemcachedCacheConfig.java b/server/src/main/java/org/apache/druid/client/cache/MemcachedCacheConfig.java
index 6ad250396a04..338cdba33c1c 100644
--- a/server/src/main/java/org/apache/druid/client/cache/MemcachedCacheConfig.java
+++ b/server/src/main/java/org/apache/druid/client/cache/MemcachedCacheConfig.java
@@ -63,6 +63,15 @@ public class MemcachedCacheConfig
   @JsonProperty
   private String locator = "consistent";
 
+  @JsonProperty
+  private boolean enableTls = false;
+
+  @JsonProperty
+  private String clientMode = "static";
+
+  @JsonProperty
+  private boolean skipTlsHostnameVerification = true;
+
   public int getExpiration()
   {
     return expiration;
@@ -112,4 +121,19 @@ public String getLocator()
   {
     return locator;
   }
+
+  public boolean enableTls()
+  {
+    return enableTls;
+  }
+
+  public String getClientMode()
+  {
+    return clientMode;
+  }
+
+  public boolean skipTlsHostnameVerification()
+  {
+    return skipTlsHostnameVerification;
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/client/cache/MemcachedCustomConnectionFactoryBuilder.java b/server/src/main/java/org/apache/druid/client/cache/MemcachedCustomConnectionFactoryBuilder.java
index 5ca33e4bc7f3..d5b182a9adfe 100644
--- a/server/src/main/java/org/apache/druid/client/cache/MemcachedCustomConnectionFactoryBuilder.java
+++ b/server/src/main/java/org/apache/druid/client/cache/MemcachedCustomConnectionFactoryBuilder.java
@@ -20,6 +20,7 @@
 package org.apache.druid.client.cache;
 
 import net.spy.memcached.ArrayModNodeLocator;
+import net.spy.memcached.ClientMode;
 import net.spy.memcached.ConnectionFactory;
 import net.spy.memcached.ConnectionFactoryBuilder;
 import
net.spy.memcached.ConnectionObserver;
@@ -37,6 +38,7 @@
 import net.spy.memcached.transcoders.Transcoder;
 import net.spy.memcached.util.DefaultKetamaNodeLocatorConfiguration;
 
+import javax.net.ssl.SSLContext;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -56,7 +58,7 @@ public MemcachedCustomConnectionFactoryBuilder setKetamaNodeRepetitions(int repe
   @Override
   public ConnectionFactory build()
   {
-    return new DefaultConnectionFactory()
+    return new DefaultConnectionFactory(clientMode)
     {
       @Override
       public NodeLocator createLocator(List nodes)
@@ -213,6 +215,54 @@ public long getAuthWaitTime()
       {
         return authWaitTime;
       }
+
+      @Override
+      public SSLContext getSSLContext()
+      {
+        return sslContext == null ? super.getSSLContext() : sslContext;
+      }
+
+      @Override
+      public String getHostnameForTlsVerification()
+      {
+        return hostnameForTlsVerification == null ? super.getHostnameForTlsVerification() : hostnameForTlsVerification;
+      }
+
+      @Override
+      public ClientMode getClientMode()
+      {
+        return clientMode == null ? super.getClientMode() : clientMode;
+      }
+
+      @Override
+      public boolean skipTlsHostnameVerification()
+      {
+        return skipTlsHostnameVerification;
+      }
+
+      @Override
+      public String toString()
+      {
+        // MURMUR_128 cannot be cast to DefaultHashAlgorithm
+        // getSSLContext() is null when TLS is not enabled, so guard before reading its protocol.
+        final SSLContext tlsContext = getSSLContext();
+        return "Failure Mode: " + getFailureMode().name()
+               + ", Hash Algorithm: " + getHashAlg()
+               + ", Max Reconnect Delay: " + getMaxReconnectDelay()
+               + ", Max Op Timeout: " + getOperationTimeout()
+               + ", Op Queue Length: " + getOpQueueLen()
+               + ", Op Max Queue Block Time: " + getOpQueueMaxBlockTime()
+               + ", Max Timeout Exception Threshold: " + getTimeoutExceptionThreshold()
+               + ", Read Buffer Size: " + getReadBufSize()
+               + ", Transcoder: " + getDefaultTranscoder()
+               + ", Operation Factory: " + getOperationFactory()
+               + ", isDaemon: " + isDaemon()
+               + ", Optimized: " + shouldOptimize()
+               + ", Using Nagle: " + useNagleAlgorithm()
+               + ", KeepAlive: " + getKeepAlive()
+               + ", SSLContext: " + (tlsContext == null ? "none" : tlsContext.getProtocol())
+               + ", ConnectionFactory: " + getName();
+      }
     };
   }
 }
diff --git a/server/src/test/java/org/apache/druid/client/cache/MemcachedCacheTest.java b/server/src/test/java/org/apache/druid/client/cache/MemcachedCacheTest.java
index 4326e0e90988..831150fd6771 100644
--- a/server/src/test/java/org/apache/druid/client/cache/MemcachedCacheTest.java
+++ b/server/src/test/java/org/apache/druid/client/cache/MemcachedCacheTest.java
@@ -33,6 +33,8 @@
 import net.spy.memcached.CASResponse;
 import net.spy.memcached.CASValue;
 import net.spy.memcached.CachedData;
+import net.spy.memcached.ClientMode;
+import net.spy.memcached.ConnectionFactory;
 import net.spy.memcached.ConnectionObserver;
 import net.spy.memcached.MemcachedClientIF;
 import net.spy.memcached.MemcachedNode;
@@ -62,6 +64,9 @@
 import org.junit.Test;
 
 import java.net.SocketAddress;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Collection;
 import
java.util.HashMap;
@@ -224,6 +229,67 @@ public void emit(Event event)
     }
   }
 
+  @Test
+  public void testDefaultClientMode() throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException
+  {
+    ConnectionFactory connectionFactory = MemcachedCache.createConnectionFactory(memcachedCacheConfig, null, null, null);
+    // Ensure that clientMode is set to Static by default
+    Assert.assertEquals(ClientMode.Static, connectionFactory.getClientMode());
+    Assert.assertNull(connectionFactory.getSSLContext());
+  }
+
+  @Test
+  public void testConnectionFactory() throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException
+  {
+    final MemcachedCacheConfig config = new MemcachedCacheConfig()
+    {
+      @Override
+      public boolean enableTls()
+      {
+        return true;
+      }
+
+      @Override
+      public String getClientMode()
+      {
+        return "dynamic";
+      }
+
+      @Override
+      public String getHosts()
+      {
+        return "localhost:9999";
+      }
+    };
+    // Dynamic mode
+    ConnectionFactory connectionFactoryDynamic = MemcachedCache.createConnectionFactory(config, null, null, null);
+    // Ensure client mode is set to the value passed in config.
+    Assert.assertEquals(ClientMode.Dynamic, connectionFactoryDynamic.getClientMode());
+    // enableTls is true so sslContext is not null
+    Assert.assertNotNull(connectionFactoryDynamic.getSSLContext());
+  }
+
+  @Test
+  public void testInvalidClientMode()
+  {
+    final MemcachedCacheConfig config = new MemcachedCacheConfig()
+    {
+      @Override
+      public String getClientMode()
+      {
+        return "invalid-name";
+      }
+    };
+    RuntimeException exception = Assert.assertThrows(
+        RuntimeException.class,
+        () -> MemcachedCache.createConnectionFactory(config, null, null, null)
+    );
+    Assert.assertEquals(
+        "Invalid value provided for `druid.cache.clientMode`. Value must be 'static' or 'dynamic'.",
+        exception.getMessage()
+    );
+  }
+
   @Test
   public void testSanity()
   {
@@ -316,6 +382,12 @@ public Collection getAvailableServers()
     throw new UnsupportedOperationException("not implemented");
   }
 
+  @Override
+  public boolean refreshCertificate()
+  {
+    return true;
+  }
+
   @Override
   public Collection getUnavailableServers()
   {