diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoal.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoal.java
index 168c6a982..9fe168834 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoal.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoal.java
@@ -6,7 +6,6 @@
 import java.util.Arrays;
 import java.util.ArrayList;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.HashMap;
 import java.util.List;
@@ -26,6 +25,7 @@
 import com.linkedin.kafka.cruisecontrol.model.Broker;
 import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
 import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
+import com.linkedin.kafka.cruisecontrol.model.PartitionInfoWrapper;
 import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
@@ -52,9 +52,9 @@ public class LaggingReplicaReassignmentGoal extends AbstractGoal {

   private List<PartitionInfo> _laggingPartitions;

-  protected ConcurrentHashMap<PartitionInfoWrapper, Long> _laggingPartitionsMap;
+  protected HashMap<PartitionInfoWrapper, Long> _laggingPartitionsMap;

-  private ConcurrentHashMap<PartitionInfoWrapper, Long> _newLaggingPartitionsMap;
+  private HashMap<PartitionInfoWrapper, Long> _newLaggingPartitionsMap;

   private long _maxReplicaLagMs;

@@ -64,15 +64,17 @@ public class LaggingReplicaReassignmentGoal extends AbstractGoal {

   @Override
   public void configure(Map<String, ?> configs) {
+    LOG.info("Configuring LaggingReplicaReassignmentGoal");
     _parsedConfig = new KafkaCruiseControlConfig(configs, false);
     _adminClient = createAdminClient(parseAdminClientConfigs(_parsedConfig));
     _balancingConstraint = new BalancingConstraint(_parsedConfig);
     _numWindows = _parsedConfig.getInt(MonitorConfig.NUM_PARTITION_METRICS_WINDOWS_CONFIG);
     _minMonitoredPartitionPercentage = _parsedConfig.getDouble(MonitorConfig.MIN_VALID_PARTITION_RATIO_CONFIG);
-    _laggingPartitionsMap = new ConcurrentHashMap<>();
+    // _laggingPartitionsMap = new HashMap<>();
     _maxReplicaLagMs = (long) configs.get(AnalyzerConfig.MAX_LAGGING_REPLICA_REASSIGN_MS);
     _laggingPartitions = new ArrayList<>();
     _laggingRecoveryNeeded = false;
+    LOG.info("Configured LaggingReplicaReassignmentGoal");
   }

   @Override
@@ -154,26 +156,22 @@ protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions)

   void checkIfReplicasLagging(ClusterModel clusterModel) throws OptimizationFailureException {
     long currentTimeMillis = System.currentTimeMillis();
-    _newLaggingPartitionsMap = new ConcurrentHashMap<>();
+    _newLaggingPartitionsMap = new HashMap<>();
     LOG.info("Checking for lagging replicas");
-    if (_laggingPartitionsMap == null) {
-      _laggingPartitionsMap = new ConcurrentHashMap<>();
-    }
-    //List<PartitionInfo> laggingPartitionInfos = clusterModel.getPartitionsWithLaggingReplicas();
-    //_laggingPartitionsMap.entrySet().removeIf(e -> !laggingPartitionInfos.contains(e.getKey()._pi));
+    LOG.info("Admin client details {}, {}", _adminClient.toString(), _adminClient.hashCode());
+    _laggingPartitionsMap = clusterModel.getLaggingPartitionsMap();
     for (PartitionInfo partition: clusterModel.getPartitionsWithLaggingReplicas()) {
       LOG.info(partition.toString());
       PartitionInfoWrapper piw = new PartitionInfoWrapper(partition);
       long lastSeenTime = _laggingPartitionsMap.getOrDefault(piw, currentTimeMillis);
       if (currentTimeMillis - lastSeenTime >= _maxReplicaLagMs) {
-        LOG.info("Partition {} has been lagging for past {} minutes", partition.toString(),
-            (currentTimeMillis - lastSeenTime) / (60 * 1000));
+        LOG.info("Partition {} has been lagging for past {} minutes", partition.toString(), (currentTimeMillis - lastSeenTime) / (60 * 1000));
         _laggingRecoveryNeeded = true;
         _laggingPartitions.add(partition);
       }
       _newLaggingPartitionsMap.put(piw, lastSeenTime);
     }
-    _laggingPartitionsMap = _newLaggingPartitionsMap;
+    clusterModel.setLaggingPartitionsMap(_newLaggingPartitionsMap);
     LOG.info("Lagging partitions map: {} on thread after {}", _laggingPartitionsMap.toString(), Thread.currentThread().getName());
   }

@@ -204,55 +202,6 @@ protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions)
       } catch (InterruptedException | ExecutionException e) {
         LOG.error("Unable to move replicas onto same brokers");
       }
-    }
-
-  }
-  protected static class PartitionInfoWrapper {
-
-    PartitionInfo _pi;
-
-    public PartitionInfoWrapper(PartitionInfo pi) {
-      this._pi = pi;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o == this) {
-        return true;
-      }
-      if (!(o instanceof PartitionInfoWrapper)) {
-        return false;
-      }
-      PartitionInfoWrapper partitionInfoWrapperObj = (PartitionInfoWrapper) o;
-      PartitionInfo p2 = partitionInfoWrapperObj._pi;
-      if (_pi.topic().equals(p2.topic()) && _pi.partition() == p2.partition()
-          && _pi.leader().id() == p2.leader().id() && _pi.inSyncReplicas().length == p2.inSyncReplicas().length) {
-        Set<Integer> p2ISRSet = Arrays.stream(p2.inSyncReplicas()).map(isr -> isr.id()).collect(Collectors.toSet());
-        if (Arrays.stream(_pi.inSyncReplicas()).allMatch(isr -> p2ISRSet.contains(isr.id()))) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + ((_pi.topic() == null) ? 0 : _pi.topic().hashCode());
-      result = prime * result + _pi.partition();
-      result = prime * result + _pi.leader().id();
-      for (Node n: _pi.inSyncReplicas()) {
-        result = prime * result + n.id();
-      }
-      return result;
-    }
-
-    @Override
-    public String toString() {
-      return _pi.toString();
-    }
-
+    }
   }
-
 }
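
Reviewer note on the file above: the goal's first-seen timestamps used to live in a goal field, and the commented-out initialization in configure() plus the new getLaggingPartitionsMap()/setLaggingPartitionsMap() round-trip suggest the intent is to park that state on the long-lived ClusterModel so it survives across goal instances. A minimal, self-contained sketch of the lag-window bookkeeping that checkIfReplicasLagging() performs; all names here (LagTracker, overdue, String keys in place of PartitionInfoWrapper) are invented for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LagTracker {
  private final long maxReplicaLagMs;
  // Owned by the long-lived cluster model in the patch; owned here for simplicity.
  private final Map<String, Long> firstSeenMs = new HashMap<>();

  LagTracker(long maxReplicaLagMs) {
    this.maxReplicaLagMs = maxReplicaLagMs;
  }

  /** Returns partitions that have stayed lagging for at least maxReplicaLagMs. */
  List<String> overdue(List<String> laggingNow, long nowMs) {
    Map<String, Long> next = new HashMap<>();
    List<String> result = new ArrayList<>();
    for (String partition : laggingNow) {
      // First sighting defaults to "now", mirroring getOrDefault(piw, currentTimeMillis).
      long firstSeen = firstSeenMs.getOrDefault(partition, nowMs);
      if (nowMs - firstSeen >= maxReplicaLagMs) {
        result.add(partition);
      }
      next.put(partition, firstSeen); // carry the original first-seen time forward
    }
    // Swap in the rebuilt map, like the _newLaggingPartitionsMap ->
    // setLaggingPartitionsMap handoff: recovered partitions drop out automatically.
    firstSeenMs.clear();
    firstSeenMs.putAll(next);
    return result;
  }
}
```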
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/model/ClusterModel.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/model/ClusterModel.java
index 40f4454a8..4275b53d2 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/model/ClusterModel.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/model/ClusterModel.java
@@ -73,6 +73,7 @@ public class ClusterModel implements Serializable {
   private int _unknownHostId;
   private final Map<Integer, String> _capacityEstimationInfoByBrokerId;
   private final Cluster _cluster;
+  private final HashMap<PartitionInfoWrapper, Long> _laggingPartitionsMap;

   public ClusterModel(ModelGeneration generation, double monitoredPartitionsRatio) {
     this(generation, monitoredPartitionsRatio, null);
@@ -113,9 +114,26 @@ public ClusterModel(ModelGeneration generation, double monitoredPartitionsRatio, Cluster cluster)
     _monitoredPartitionsRatio = monitoredPartitionsRatio;
     _unknownHostId = 0;
     _capacityEstimationInfoByBrokerId = new HashMap<>();
+    _laggingPartitionsMap = new HashMap<>();
     _cluster = cluster;
   }

+  /**
+   * Set the current laggingPartitionsMap.
+   */
+  public void setLaggingPartitionsMap(HashMap<PartitionInfoWrapper, Long> laggingPartitionsMap) {
+    this._laggingPartitionsMap.clear();
+    this._laggingPartitionsMap.putAll(laggingPartitionsMap);
+  }
+
+  /**
+   * Get the current laggingPartitionsMap.
+   * @return laggingPartitionsMap
+   */
+  public HashMap<PartitionInfoWrapper, Long> getLaggingPartitionsMap() {
+    return this._laggingPartitionsMap;
+  }
+
   /**
    * @return The partitions which have lagging replicas without any offline partitions.
    *         This would imply that the ReplicaFetcher is either unable to catch up or has stopped fetching for some reason.
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/model/PartitionInfoWrapper.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/model/PartitionInfoWrapper.java
new file mode 100644
index 000000000..e7f80aff7
--- /dev/null
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/model/PartitionInfoWrapper.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
+ */
+
+package com.linkedin.kafka.cruisecontrol.model;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+
+public class PartitionInfoWrapper {
+  PartitionInfo _pi;
+
+  public PartitionInfoWrapper(PartitionInfo pi) {
+    this._pi = pi;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (!(o instanceof PartitionInfoWrapper)) {
+      return false;
+    }
+    PartitionInfoWrapper partitionInfoWrapperObj = (PartitionInfoWrapper) o;
+    PartitionInfo p2 = partitionInfoWrapperObj._pi;
+    if (_pi.topic().equals(p2.topic()) && _pi.partition() == p2.partition()
+        && _pi.leader().id() == p2.leader().id() && _pi.inSyncReplicas().length == p2.inSyncReplicas().length) {
+      Set<Integer> p2ISRSet = Arrays.stream(p2.inSyncReplicas()).map(isr -> isr.id()).collect(Collectors.toSet());
+      if (Arrays.stream(_pi.inSyncReplicas()).allMatch(isr -> p2ISRSet.contains(isr.id()))) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((_pi.topic() == null) ? 0 : _pi.topic().hashCode());
+    result = prime * result + _pi.partition();
+    result = prime * result + ((_pi.leader() == null) ? 0 : _pi.leader().id());
+    for (Node n: _pi.inSyncReplicas()) {
+      result = prime * result + n.id();
+    }
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return _pi.toString();
+  }
+}
diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoalTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoalTest.java
index c09aa69b6..67760cd57 100644
--- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoalTest.java
+++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoalTest.java
@@ -6,6 +6,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -15,11 +16,11 @@
 import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
 import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
 import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal.ClusterModelStatsComparator;
-import com.linkedin.kafka.cruisecontrol.analyzer.goals.LaggingReplicaReassignmentGoal.PartitionInfoWrapper;
 import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
 import com.linkedin.kafka.cruisecontrol.model.Broker;
 import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
 import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
+import com.linkedin.kafka.cruisecontrol.model.PartitionInfoWrapper;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
 import org.apache.kafka.common.KafkaFuture;
@@ -87,37 +88,48 @@ public void checkifLaggingPartitionReplicaMoved() throws Exception {
     PartitionInfo partitionInfo2Copy = new PartitionInfo("topic2", 0, node1, replicas, isr);
     List<PartitionInfo> partitionsWithLaggingReplicas1 = new ArrayList<>();
     partitionsWithLaggingReplicas1.add(partitionInfo1);
+    HashMap<PartitionInfoWrapper, Long> partitionsWithLaggingReplicasMap1 = new HashMap<>();
+    partitionsWithLaggingReplicasMap1.put(new PartitionInfoWrapper(partitionInfo1), System.currentTimeMillis());
     List<PartitionInfo> partitionsWithLaggingReplicas2 = new ArrayList<>();
     partitionsWithLaggingReplicas2.add(partitionInfo2);
+    HashMap<PartitionInfoWrapper, Long> partitionsWithLaggingReplicasMap2 = new HashMap<>();
+    partitionsWithLaggingReplicasMap2.put(new PartitionInfoWrapper(partitionInfo2), System.currentTimeMillis());
     List<PartitionInfo> partitionsWithLaggingReplicas12 = new ArrayList<>(partitionsWithLaggingReplicas1);
     partitionsWithLaggingReplicas12.add(partitionInfo2);
+    HashMap<PartitionInfoWrapper, Long> partitionsWithLaggingReplicasMap12 = new HashMap<>();
+    partitionsWithLaggingReplicasMap12.put(new PartitionInfoWrapper(partitionInfo1), System.currentTimeMillis() - 1000);
+    partitionsWithLaggingReplicasMap12.put(new PartitionInfoWrapper(partitionInfo2), System.currentTimeMillis());
     // send 1 partition w/ lagging replica
     EasyMock.expect(clusterModelStats.numPartitionsWithLaggingReplicas()).andReturn(1).times(6);
     EasyMock.expect(clusterModel.getClusterStats(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(clusterModelStats).anyTimes();
     EasyMock.expect(clusterModelStatsComparator.compare(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(0);
-
+    clusterModel.clearSortedReplicas();
     EasyMock.expectLastCall().andAnswer(() -> {
       return null;
     }).anyTimes();
-    clusterModel.clearSortedReplicas();
-    clusterModel.clearSortedReplicas();
-    EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(new ArrayList<>());
-    EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas1);
-    EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas12);
-    EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas2);
+    clusterModel.setLaggingPartitionsMap(EasyMock.anyObject());
+    EasyMock.expectLastCall().andAnswer(() -> {
+      return null;
+    }).anyTimes();
+
+    // clusterModel.clearSortedReplicas();
     EasyMock.expect(clusterModel.brokenBrokers()).andReturn(new TreeSet<>()).anyTimes();
     EasyMock.expect(clusterModel.brokers()).andReturn(new TreeSet<>(Arrays.asList(replicas).stream()
         .map(node -> generateBroker(node.id(), 0)).collect(Collectors.toList()))).anyTimes();
     EasyMock.expect(clusterModel.aliveBrokers()).andReturn(new TreeSet<>()).anyTimes();
     EasyMock.expect(adminClient.alterPartitionReassignments(EasyMock.anyObject())).andReturn(aprResult);
+    EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(new HashMap<>());
+    EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(new ArrayList<>());
+    EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(partitionsWithLaggingReplicasMap1);
+    EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas1);
+    EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(partitionsWithLaggingReplicasMap12);
+    EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas12);
+    EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(partitionsWithLaggingReplicasMap2);
+    EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas2);
+
     EasyMock.replay(clusterModel, clusterModelStats, clusterModelStatsComparator, adminClient, aprResult);
     // optimize once before sleep
     goal.optimize(clusterModel, optimizedGoals, new OptimizationOptions(new HashSet<>(), new HashSet<>(), new HashSet<>()));
-    try {
-      Thread.sleep(1000);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
     // optimize again after sleep
     goal.optimize(clusterModel, optimizedGoals, new OptimizationOptions(new HashSet<>(), new HashSet<>(), new HashSet<>()));
     // should have been moved as 1st seen before sleep
@@ -165,20 +177,32 @@ public void checkNonLaggingPartitionNotMoved() throws OptimizationFailureException {
     List<PartitionInfo> partitionsWithLaggingReplicas12 = new ArrayList<>();
     partitionsWithLaggingReplicas12.add(partitionInfo1);
     partitionsWithLaggingReplicas12.add(partitionInfo2);
+    HashMap<PartitionInfoWrapper, Long> partitionsWithLaggingReplicasMap12 = new HashMap<>();
+    partitionsWithLaggingReplicasMap12.put(new PartitionInfoWrapper(partitionInfo1), System.currentTimeMillis());
+    partitionsWithLaggingReplicasMap12.put(new PartitionInfoWrapper(partitionInfo2), System.currentTimeMillis());
+    HashMap<PartitionInfoWrapper, Long> partitionsWithLaggingReplicasMap122 = new HashMap<>();
+    partitionsWithLaggingReplicasMap122.put(new PartitionInfoWrapper(partitionInfo1), System.currentTimeMillis() + 2000);
+    partitionsWithLaggingReplicasMap122.put(new PartitionInfoWrapper(partitionInfo2), System.currentTimeMillis() + 2000);
     // send 1 partition w/ lagging replica
     EasyMock.expect(clusterModelStats.numPartitionsWithLaggingReplicas()).andReturn(1).times(9);
     EasyMock.expect(clusterModel.getClusterStats(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(clusterModelStats).anyTimes();
     EasyMock.expect(clusterModelStatsComparator.compare(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(0);
+    clusterModel.clearSortedReplicas();
     EasyMock.expectLastCall().andAnswer(() -> {
       return null;
     }).anyTimes();
-    clusterModel.clearSortedReplicas();
-    clusterModel.clearSortedReplicas();
-    clusterModel.clearSortedReplicas();
+    clusterModel.setLaggingPartitionsMap(EasyMock.anyObject());
+    EasyMock.expectLastCall().andAnswer(() -> {
+      return null;
+    }).anyTimes();
+    EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas12).times(2);
+    EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(partitionsWithLaggingReplicasMap12).times(2);
     EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(new ArrayList<>()).times(2);
+    EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(new HashMap<>()).times(2);
     EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas12).times(2);
+    EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(partitionsWithLaggingReplicasMap122).times(2);
     EasyMock.expect(clusterModel.brokenBrokers()).andReturn(new TreeSet<>()).anyTimes();
     EasyMock.expect(clusterModel.brokers()).andReturn(new TreeSet<>(Arrays.asList(replicas).stream()
         .map(node -> generateBroker(node.id(), 0)).collect(Collectors.toList()))).anyTimes();
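
The test rewiring above relies on EasyMock's sequencing rule: repeated expect() calls on the same method are consumed in FIFO order, one per invocation, which is how each optimize() pass is fed the next paired getLaggingPartitionsMap()/getPartitionsWithLaggingReplicas() snapshot, and seemingly why the Thread.sleep(1000) could be replaced by pre-aged timestamps (e.g. System.currentTimeMillis() - 1000). A minimal sketch of that pattern; the Snapshot interface and its values are invented:

```java
import org.easymock.EasyMock;

public class SequencedStubDemo {
  // Hypothetical stand-in for the ClusterModel getters stubbed in the test.
  interface Snapshot {
    int laggingCount();
  }

  public static void main(String[] args) {
    Snapshot snapshot = EasyMock.createMock(Snapshot.class);
    EasyMock.expect(snapshot.laggingCount()).andReturn(0); // first pass: nothing lagging
    EasyMock.expect(snapshot.laggingCount()).andReturn(2); // second pass: two lagging
    EasyMock.replay(snapshot);
    System.out.println(snapshot.laggingCount()); // 0
    System.out.println(snapshot.laggingCount()); // 2
    EasyMock.verify(snapshot); // fails if any expectation went unconsumed
  }
}
```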