diff --git a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java index 7fe4ebdad2..eb25337ea5 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java +++ b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java @@ -1,17 +1,16 @@ package redis.clients.jedis; -import org.apache.commons.pool2.impl.GenericObjectPoolConfig; -import redis.clients.jedis.exceptions.JedisConnectionException; +import static redis.clients.jedis.JedisClusterInfoCache.getNodeKey; import java.util.Map; -import java.util.Random; import java.util.Set; -import static redis.clients.jedis.JedisClusterInfoCache.getNodeKey; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; + +import redis.clients.jedis.exceptions.JedisConnectionException; public abstract class JedisClusterConnectionHandler { protected final JedisClusterInfoCache cache; - private ThreadLocal random = new ThreadLocal(); abstract Jedis getConnection(); @@ -29,7 +28,6 @@ public void returnBrokenConnection(Jedis connection) { public JedisClusterConnectionHandler(Set nodes, final GenericObjectPoolConfig poolConfig) { this.cache = new JedisClusterInfoCache(poolConfig); - this.random.set(new Random()); initializeSlotsCache(nodes, poolConfig); } @@ -80,9 +78,4 @@ public void renewSlotCache() { } } - protected JedisPool getRandomConnection() { - Object[] nodeArray = cache.getNodes().values().toArray(); - return (JedisPool) (nodeArray[this.random.get().nextInt(nodeArray.length)]); - } - } diff --git a/src/main/java/redis/clients/jedis/JedisRandomConnectionHandler.java b/src/main/java/redis/clients/jedis/JedisRandomConnectionHandler.java deleted file mode 100644 index e13f227ca4..0000000000 --- a/src/main/java/redis/clients/jedis/JedisRandomConnectionHandler.java +++ /dev/null @@ -1,26 +0,0 @@ -package redis.clients.jedis; - -import org.apache.commons.pool2.impl.GenericObjectPoolConfig; - -import java.util.Set; - -public class JedisRandomConnectionHandler extends JedisClusterConnectionHandler { - - public JedisRandomConnectionHandler(Set nodes) { - super(nodes, new GenericObjectPoolConfig()); - } - - public JedisRandomConnectionHandler(Set nodes, - final GenericObjectPoolConfig poolConfig) { - super(nodes, poolConfig); - } - - public Jedis getConnection() { - return getRandomConnection().getResource(); - } - - @Override - Jedis getConnectionFromSlot(int slot) { - return getRandomConnection().getResource(); - } -} diff --git a/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java b/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java index 786eb050de..4bc4f9732e 100644 --- a/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java +++ b/src/test/java/redis/clients/jedis/tests/JedisClusterTest.java @@ -5,6 +5,13 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.junit.After; @@ -335,6 +342,33 @@ public void testCloseable() { } } + @Test + public void testJedisClusterRunsWithMultithreaded() throws InterruptedException, ExecutionException { + Set jedisClusterNode = new HashSet(); + jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379)); + final JedisCluster jc = new JedisCluster(jedisClusterNode); + jc.set("foo", "bar"); + + ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 100, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(10)); + List> futures = new ArrayList>(); + for (int i = 0 ; i < 50 ; i++) { + executor.submit(new Callable() { + @Override + public String call() throws Exception { + // FIXME : invalidate slot cache from JedisCluster to test random connection also does work + return jc.get("foo"); + } + }); + } + + for (Future future : futures) { + String value = future.get(); + assertEquals("bar", value); + } + + jc.close(); + } + private static String getNodeServingSlotRange(String infoOutput) { // f4f3dc4befda352a4e0beccf29f5e8828438705d 127.0.0.1:7380 master - 0 1394372400827 0 connected 5461-10922 for (String infoLine : infoOutput.split("\n")) {