forked from hector-client/hector
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
confirm hector-client#647
- Loading branch information
Showing
1 changed file
with
72 additions
and
0 deletions.
There are no files selected for viewing
72 changes: 72 additions & 0 deletions
72
core/src/main/java/me/prettyprint/cassandra/connection/BlockedHealthBalncingPolicy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
package me.prettyprint.cassandra.connection; | ||
|
||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Set; | ||
|
||
import me.prettyprint.cassandra.connection.factory.HClientFactory; | ||
import me.prettyprint.cassandra.service.CassandraHost; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import com.google.common.collect.Lists; | ||
|
||
public class BlockedHealthBalncingPolicy implements LoadBalancingPolicy { | ||
|
||
private static final long serialVersionUID = 4216966823555634515L; | ||
private static final Logger log = LoggerFactory.getLogger(BlockedHealthBalncingPolicy.class); | ||
private static final int ZERO_VALUE = 0; | ||
|
||
private final class FilteringCompare implements Comparator<HClientPool> { | ||
|
||
public int compare(HClientPool o1, HClientPool o2) { | ||
|
||
if ( log.isDebugEnabled() ) { | ||
log.debug("comparing 1: {} / count {} / Blocked {} / availableClientQueue {} with 2: {} / count {} / Blocked {} / availableClientQueue {} ", | ||
new Object[]{o1.getCassandraHost(), o1.getNumActive(), o1.getNumBlockedThreads(), o1.getNumIdle(), | ||
o2.getCassandraHost(), o2.getNumActive(), o2.getNumBlockedThreads(), o2.getNumIdle()}); | ||
} | ||
|
||
if(o2.getNumIdle() == ZERO_VALUE || o2.getNumIdle() == ZERO_VALUE){ | ||
log.warn("finding 0 hector pool {}:{} /{}:{}", | ||
new Object[]{o1.getCassandraHost(), o1.getNumIdle(), o2.getCassandraHost(), o2.getNumIdle()}); | ||
} | ||
|
||
if(o1.getNumBlockedThreads() != o2.getNumBlockedThreads()){ | ||
return o1.getNumBlockedThreads() - o2.getNumBlockedThreads(); | ||
} else { | ||
return o2.getNumIdle() - o1.getNumIdle(); | ||
} | ||
} | ||
} | ||
|
||
public HClientPool getPool(Collection<HClientPool> pools, Set<CassandraHost> excludeHosts) { | ||
|
||
List<HClientPool> hClientPoolList = Lists.newArrayList(pools); | ||
Collections.shuffle(hClientPoolList); | ||
Collections.sort(hClientPoolList, new FilteringCompare()); | ||
|
||
Iterator<HClientPool> iterator = hClientPoolList.iterator(); | ||
HClientPool concurrentHClientPool = iterator.next(); | ||
|
||
if ( excludeHosts != null && excludeHosts.size() > 0 ) { | ||
while (iterator.hasNext()) { | ||
if ( !excludeHosts.contains(concurrentHClientPool.getCassandraHost()) ) { | ||
break; | ||
} | ||
concurrentHClientPool = (ConcurrentHClientPool) iterator.next(); | ||
} | ||
} | ||
|
||
return concurrentHClientPool; | ||
} | ||
|
||
public HClientPool createConnection(HClientFactory clientFactory, CassandraHost host) { | ||
return new ConcurrentHClientPool(clientFactory, host); | ||
} | ||
|
||
} |