diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoal.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoal.java
index 9fe168834..168c6a982 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoal.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoal.java
@@ -6,6 +6,7 @@
 import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.HashMap;
 import java.util.List;
@@ -25,7 +26,6 @@
 import com.linkedin.kafka.cruisecontrol.model.Broker;
 import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
 import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
-import com.linkedin.kafka.cruisecontrol.model.PartitionInfoWrapper;
 import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
@@ -52,9 +52,9 @@ public class LaggingReplicaReassignmentGoal extends AbstractGoal {

   private List<PartitionInfo> _laggingPartitions;

-  protected HashMap<PartitionInfoWrapper, Long> _laggingPartitionsMap;
+  protected ConcurrentHashMap<PartitionInfoWrapper, Long> _laggingPartitionsMap;

-  private HashMap<PartitionInfoWrapper, Long> _newLaggingPartitionsMap;
+  private ConcurrentHashMap<PartitionInfoWrapper, Long> _newLaggingPartitionsMap;

   private long _maxReplicaLagMs;

@@ -64,17 +64,15 @@ public class LaggingReplicaReassignmentGoal extends AbstractGoal {

   @Override
   public void configure(Map<String, ?> configs) {
-    LOG.info("Configuring LaggingReplicaReassignmentGoal");
     _parsedConfig = new KafkaCruiseControlConfig(configs, false);
     _adminClient = createAdminClient(parseAdminClientConfigs(_parsedConfig));
     _balancingConstraint = new BalancingConstraint(_parsedConfig);
     _numWindows = _parsedConfig.getInt(MonitorConfig.NUM_PARTITION_METRICS_WINDOWS_CONFIG);
     _minMonitoredPartitionPercentage = _parsedConfig.getDouble(MonitorConfig.MIN_VALID_PARTITION_RATIO_CONFIG);
-    // _laggingPartitionsMap = new HashMap();
+    _laggingPartitionsMap = new ConcurrentHashMap<>();
     _maxReplicaLagMs = (long) configs.get(AnalyzerConfig.MAX_LAGGING_REPLICA_REASSIGN_MS);
     _laggingPartitions = new ArrayList();
     _laggingRecoveryNeeded = false;
-    LOG.info("Configured LaggingReplicaReassignmentGoal");
   }

   @Override
@@ -156,22 +154,26 @@ protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions op
   void checkIfReplicasLagging(ClusterModel clusterModel) throws OptimizationFailureException {
     long currentTimeMillis = System.currentTimeMillis();
-    _newLaggingPartitionsMap = new HashMap();
+    _newLaggingPartitionsMap = new ConcurrentHashMap<>();
     LOG.info("Checking for lagging replicas");
-    LOG.info("Admin client details {}, {}", _adminClient.toString(), _adminClient.hashCode());
-    _laggingPartitionsMap = clusterModel.getLaggingPartitionsMap();
+    if (_laggingPartitionsMap == null) {
+      _laggingPartitionsMap = new ConcurrentHashMap<>();
+    }
+    //List<PartitionInfo> laggingPartitionInfos = clusterModel.getPartitionsWithLaggingReplicas();
+    //_laggingPartitionsMap.entrySet().removeIf(e -> !laggingPartitionInfos.contains(e.getKey()._pi));
     for (PartitionInfo partition: clusterModel.getPartitionsWithLaggingReplicas()) {
       LOG.info(partition.toString());
       PartitionInfoWrapper piw = new PartitionInfoWrapper(partition);
       long lastSeenTime = _laggingPartitionsMap.getOrDefault(piw, currentTimeMillis);
       if (currentTimeMillis - lastSeenTime >= _maxReplicaLagMs) {
-        LOG.info("Partition {} has been lagging for past {} minutes", partition.toString(), (currentTimeMillis - lastSeenTime) / (60 * 1000));
+        LOG.info("Partition {} has been lagging for past {} minutes", partition.toString(),
+            (currentTimeMillis - lastSeenTime) / (60 * 1000));
         _laggingRecoveryNeeded = true;
         _laggingPartitions.add(partition);
       }
       _newLaggingPartitionsMap.put(piw, lastSeenTime);
     }
-    clusterModel.setLaggingPartitionsMap(_newLaggingPartitionsMap);
+    _laggingPartitionsMap = _newLaggingPartitionsMap;
     LOG.info("Lagging partitions map: {} on thread after {}", _laggingPartitionsMap.toString(), Thread.currentThread().getName());
   }

@@ -202,6 +204,55 @@ protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<
       } catch (InterruptedException | ExecutionException e) {
         LOG.error("Unable to move replicas onto same brokers");
       }
-  }
+  }
+
+  }
+  protected static class PartitionInfoWrapper {
+
+    PartitionInfo _pi;
+
+    public PartitionInfoWrapper(PartitionInfo pi) {
+      this._pi = pi;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      if (!(o instanceof PartitionInfoWrapper)) {
+        return false;
+      }
+      PartitionInfoWrapper partitionInfoWrapperObj = (PartitionInfoWrapper) o;
+      PartitionInfo p2 = partitionInfoWrapperObj._pi;
+      if (_pi.topic().equals(p2.topic()) && _pi.partition() == p2.partition()
+          && _pi.leader().id() == p2.leader().id() && _pi.inSyncReplicas().length == p2.inSyncReplicas().length) {
+        Set<Integer> p2ISRSet = Arrays.stream(p2.inSyncReplicas()).map(isr -> isr.id()).collect(Collectors.toSet());
+        if (Arrays.stream(_pi.inSyncReplicas()).allMatch(isr -> p2ISRSet.contains(isr.id()))) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((_pi.topic() == null) ? 0 : _pi.topic().hashCode());
+      result = prime * result + _pi.partition();
+      result = prime * result + ((_pi.leader() == null) ? 0 : _pi.leader().id());
+      // Sum the ISR broker ids so the hash is insensitive to ISR ordering,
+      // matching the set-based equals above.
+      int isrIdSum = 0;
+      for (Node n: _pi.inSyncReplicas()) {
+        isrIdSum += n.id();
+      }
+      result = prime * result + isrIdSum;
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return _pi.toString();
+    }
+  }
+}
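The core of this change is the bookkeeping in `checkIfReplicasLagging`: the goal now owns a first-seen timestamp map keyed by `PartitionInfoWrapper`, rebuilds it on every pass, and flags any partition that has stayed lagging for at least `_maxReplicaLagMs`. Because the map lives on the goal rather than on `ClusterModel`, the timestamps persist across model rebuilds. A minimal, self-contained sketch of that loop, with `String` keys standing in for `PartitionInfoWrapper` (the class and method names here are illustrative, not part of the patch):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the first-seen tracking in checkIfReplicasLagging. Rebuilding the map
// each pass means partitions that stop lagging simply fall out of it, while
// partitions still lagging keep their original first-seen timestamp.
public class LagTrackingSketch {
  private ConcurrentHashMap<String, Long> _firstSeenMs = new ConcurrentHashMap<>();

  public List<String> laggingLongerThan(List<String> currentlyLagging, long maxLagMs) {
    long now = System.currentTimeMillis();
    ConcurrentHashMap<String, Long> next = new ConcurrentHashMap<>();
    List<String> overdue = new ArrayList<>();
    for (String partition : currentlyLagging) {
      long firstSeen = _firstSeenMs.getOrDefault(partition, now); // new entries start "now"
      if (now - firstSeen >= maxLagMs) {
        overdue.add(partition);
      }
      next.put(partition, firstSeen);
    }
    _firstSeenMs = next; // mirrors _laggingPartitionsMap = _newLaggingPartitionsMap above
    return overdue;
  }
}
```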
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/model/ClusterModel.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/model/ClusterModel.java
index 4275b53d2..40f4454a8 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/model/ClusterModel.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/model/ClusterModel.java
@@ -73,7 +73,6 @@ public class ClusterModel implements Serializable {
   private int _unknownHostId;
   private final Map _capacityEstimationInfoByBrokerId;
   private final Cluster _cluster;
-  private final HashMap<PartitionInfoWrapper, Long> _laggingPartitionsMap;

   public ClusterModel(ModelGeneration generation, double monitoredPartitionsRatio) {
     this(generation, monitoredPartitionsRatio, null);
   }
@@ -114,26 +113,9 @@ public ClusterModel(ModelGeneration generation, double monitoredPartitionsRatio,
     _monitoredPartitionsRatio = monitoredPartitionsRatio;
     _unknownHostId = 0;
     _capacityEstimationInfoByBrokerId = new HashMap<>();
-    _laggingPartitionsMap = new HashMap<>();
     _cluster = cluster;
   }

-  /**
-   * set current LaggingPartitionsMap
-   */
-  public void setLaggingPartitionsMap(HashMap<PartitionInfoWrapper, Long> laggingPartitionsMap) {
-    this._laggingPartitionsMap.clear();
-    this._laggingPartitionsMap.putAll(laggingPartitionsMap);
-  }
-
-  /**
-   * get current LaggingPartititonsMap
-   * @return laggingPartitionsMap
-   */
-  public HashMap<PartitionInfoWrapper, Long> getLaggingPartitionsMap() {
-    return this._laggingPartitionsMap;
-  }
-
   /**
    * @return The partitions which have lagging replicas without any offline partitions.
    *         This would imply that the ReplicaFetcher is either unable to catch up or has stopped fetching for some reason.
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/model/PartitionInfoWrapper.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/model/PartitionInfoWrapper.java
deleted file mode 100644
index e7f80aff7..000000000
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/model/PartitionInfoWrapper.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
- */
-
-package com.linkedin.kafka.cruisecontrol.model;
-
-import java.util.Arrays;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-
-public class PartitionInfoWrapper {
-  PartitionInfo _pi;
-
-  public PartitionInfoWrapper(PartitionInfo pi) {
-    this._pi = pi;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o == this) {
-      return true;
-    }
-    if (!(o instanceof PartitionInfoWrapper)) {
-      return false;
-    }
-    PartitionInfoWrapper partitionInfoWrapperObj = (PartitionInfoWrapper) o;
-    PartitionInfo p2 = partitionInfoWrapperObj._pi;
-    if (_pi.topic().equals(p2.topic()) && _pi.partition() == p2.partition()
-        && _pi.leader().id() == p2.leader().id() && _pi.inSyncReplicas().length == p2.inSyncReplicas().length) {
-      Set<Integer> p2ISRSet = Arrays.stream(p2.inSyncReplicas()).map(isr -> isr.id()).collect(Collectors.toSet());
-      if (Arrays.stream(_pi.inSyncReplicas()).allMatch(isr -> p2ISRSet.contains(isr.id()))) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((_pi.topic() == null) ? 0 : _pi.topic().hashCode());
-    result = prime * result + _pi.partition();
-    result = prime * result + ((_pi.leader() == null) ? 0 : _pi.leader().id());
-    for (Node n: _pi.inSyncReplicas()) {
-      result = prime * result + n.id();
-    }
-    return result;
-  }
-
-  @Override
-  public String toString() {
-    return _pi.toString();
-  }
-}
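For reference, the equality contract the (now inner) wrapper provides: two `PartitionInfo`s compare equal when topic, partition, leader id, and the set of ISR broker ids all match, regardless of the order brokers appear in the ISR array; with the order-insensitive ISR sum in the hashCode above, hash codes agree as well. A small demo (the demo class is hypothetical, placed in the goal's package so the protected nested class is visible, as the test below does via import; `Node` and `PartitionInfo` are the real Kafka classes):

```java
package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

import com.linkedin.kafka.cruisecontrol.analyzer.goals.LaggingReplicaReassignmentGoal.PartitionInfoWrapper;

// Demo: ISR ordering does not affect wrapper equality or hashing.
public class WrapperEqualityDemo {
  public static void main(String[] args) {
    Node n0 = new Node(0, "host0", 9092);
    Node n1 = new Node(1, "host1", 9092);
    Node[] replicas = {n0, n1};

    // Same topic, partition, leader, and ISR set; only the ISR order differs.
    PartitionInfo a = new PartitionInfo("topic1", 0, n0, replicas, new Node[]{n0, n1});
    PartitionInfo b = new PartitionInfo("topic1", 0, n0, replicas, new Node[]{n1, n0});

    PartitionInfoWrapper wa = new PartitionInfoWrapper(a);
    PartitionInfoWrapper wb = new PartitionInfoWrapper(b);
    System.out.println(wa.equals(wb));                  // true: same ISR id set
    System.out.println(wa.hashCode() == wb.hashCode()); // true: order-insensitive sum
  }
}
```

This matters for the goal's bookkeeping: the wrapper is the key of `_laggingPartitionsMap`, so a mere ISR reordering between passes must not reset a partition's first-seen timestamp.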
diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoalTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoalTest.java
index 67760cd57..c09aa69b6 100644
--- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoalTest.java
+++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoalTest.java
@@ -6,7 +6,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -16,11 +15,11 @@
 import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
 import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
 import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal.ClusterModelStatsComparator;
+import com.linkedin.kafka.cruisecontrol.analyzer.goals.LaggingReplicaReassignmentGoal.PartitionInfoWrapper;
 import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
 import com.linkedin.kafka.cruisecontrol.model.Broker;
 import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
 import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
-import com.linkedin.kafka.cruisecontrol.model.PartitionInfoWrapper;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
 import org.apache.kafka.common.KafkaFuture;
@@ -88,48 +87,37 @@ public void checkifLaggingPartitionReplicaMoved() throws Exception {
     PartitionInfo partitionInfo2Copy = new PartitionInfo("topic2", 0, node1, replicas, isr);
     List<PartitionInfo> partitionsWithLaggingReplicas1 = new ArrayList<>();
     partitionsWithLaggingReplicas1.add(partitionInfo1);
-    HashMap<PartitionInfoWrapper, Long> partitionsWithLaggingReplicasMap1 = new HashMap<>();
-    partitionsWithLaggingReplicasMap1.put(new PartitionInfoWrapper(partitionInfo1), System.currentTimeMillis());
     List<PartitionInfo> partitionsWithLaggingReplicas2 = new ArrayList<>();
     partitionsWithLaggingReplicas2.add(partitionInfo2);
-    HashMap<PartitionInfoWrapper, Long> partitionsWithLaggingReplicasMap2 = new HashMap<>();
-    partitionsWithLaggingReplicasMap2.put(new PartitionInfoWrapper(partitionInfo2), System.currentTimeMillis());
     List<PartitionInfo> partitionsWithLaggingReplicas12 = new ArrayList<>(partitionsWithLaggingReplicas1);
     partitionsWithLaggingReplicas12.add(partitionInfo2);
-    HashMap<PartitionInfoWrapper, Long> partitionsWithLaggingReplicasMap12 = new HashMap<>();
-    partitionsWithLaggingReplicasMap12.put(new PartitionInfoWrapper(partitionInfo1), System.currentTimeMillis() - 1000);
-    partitionsWithLaggingReplicasMap12.put(new PartitionInfoWrapper(partitionInfo2), System.currentTimeMillis());
     // send 1 partition w/ lagging replica
     EasyMock.expect(clusterModelStats.numPartitionsWithLaggingReplicas()).andReturn(1).times(6);
     EasyMock.expect(clusterModel.getClusterStats(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(clusterModelStats).anyTimes();
     EasyMock.expect(clusterModelStatsComparator.compare(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(0);
-    clusterModel.clearSortedReplicas();
-    EasyMock.expectLastCall().andAnswer(() -> {
-      return null;
-    }).anyTimes();
-    clusterModel.setLaggingPartitionsMap(EasyMock.anyObject());
+    EasyMock.expectLastCall().andAnswer(() -> { return null; }).anyTimes();
-
-    // clusterModel.clearSortedReplicas();
+    clusterModel.clearSortedReplicas();
+    clusterModel.clearSortedReplicas();
+    EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(new ArrayList());
+    EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas1);
+    EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas12);
+    EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas2);
     EasyMock.expect(clusterModel.brokenBrokers()).andReturn(new TreeSet()).anyTimes();
     EasyMock.expect(clusterModel.brokers()).andReturn(new TreeSet(Arrays.asList(replicas).stream()
         .map(node -> generateBroker(node.id(), 0)).collect(Collectors.toList()))).anyTimes();
     EasyMock.expect(clusterModel.aliveBrokers()).andReturn(new TreeSet()).anyTimes();
     EasyMock.expect(adminClient.alterPartitionReassignments(EasyMock.anyObject())).andReturn(aprResult);
-    EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(new HashMap());
-    EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(new ArrayList());
-    EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(partitionsWithLaggingReplicasMap1);
-    EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas1);
-    EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(partitionsWithLaggingReplicasMap12);
-    EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas12);
-    EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(partitionsWithLaggingReplicasMap2);
-    EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas2);
-
     EasyMock.replay(clusterModel, clusterModelStats, clusterModelStatsComparator, adminClient, aprResult);
     // optimize once before sleep
     goal.optimize(clusterModel, optimizedGoals, new OptimizationOptions(new HashSet(), new HashSet(), new HashSet()));
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
     // optimize again after sleep
     goal.optimize(clusterModel, optimizedGoals, new OptimizationOptions(new HashSet(), new HashSet(), new HashSet()));
     // should have been moved as 1st seen before sleep
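The expectation list above leans on EasyMock's consecutive-stub behavior: when the same call is expected several times, the queued `andReturn` values are handed back one per invocation, so each call the goal makes during the two `optimize()` passes sees the next canned list of lagging partitions. A self-contained illustration (the `Model` interface here is made up for the demo):

```java
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;

import java.util.List;

public class ConsecutiveStubDemo {
  // Hypothetical stand-in for ClusterModel.getPartitionsWithLaggingReplicas().
  interface Model {
    List<String> lagging();
  }

  public static void main(String[] args) {
    Model model = createMock(Model.class);
    expect(model.lagging()).andReturn(List.of());           // first invocation: nothing lagging
    expect(model.lagging()).andReturn(List.of("topic1-0")); // second invocation: one laggard
    replay(model);

    System.out.println(model.lagging()); // []
    System.out.println(model.lagging()); // [topic1-0]
  }
}
```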
@@ -177,32 +165,20 @@ public void checkNonLaggingPartitionNotMoved() throws OptimizationFailureExcepti
     List<PartitionInfo> partitionsWithLaggingReplicas12 = new ArrayList<>();
     partitionsWithLaggingReplicas12.add(partitionInfo1);
     partitionsWithLaggingReplicas12.add(partitionInfo2);
-    HashMap<PartitionInfoWrapper, Long> partitionsWithLaggingReplicasMap12 = new HashMap<>();
-    partitionsWithLaggingReplicasMap12.put(new PartitionInfoWrapper(partitionInfo1), System.currentTimeMillis());
-    partitionsWithLaggingReplicasMap12.put(new PartitionInfoWrapper(partitionInfo2), System.currentTimeMillis());
-    HashMap<PartitionInfoWrapper, Long> partitionsWithLaggingReplicasMap122 = new HashMap<>();
-    partitionsWithLaggingReplicasMap122.put(new PartitionInfoWrapper(partitionInfo1), System.currentTimeMillis() + 2000);
-    partitionsWithLaggingReplicasMap122.put(new PartitionInfoWrapper(partitionInfo2), System.currentTimeMillis() + 2000);
     // send 1 partition w/ lagging replica
     EasyMock.expect(clusterModelStats.numPartitionsWithLaggingReplicas()).andReturn(1).times(9);
     EasyMock.expect(clusterModel.getClusterStats(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(clusterModelStats).anyTimes();
     EasyMock.expect(clusterModelStatsComparator.compare(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(0);
-    clusterModel.clearSortedReplicas();
     EasyMock.expectLastCall().andAnswer(() -> {
       return null;
     }).anyTimes();
-    clusterModel.setLaggingPartitionsMap(EasyMock.anyObject());
-    EasyMock.expectLastCall().andAnswer(() -> {
-      return null;
-    }).anyTimes();
-
+    clusterModel.clearSortedReplicas();
+    clusterModel.clearSortedReplicas();
+    clusterModel.clearSortedReplicas();
     EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas12).times(2);
-    EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(partitionsWithLaggingReplicasMap12).times(2);
     EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(new ArrayList()).times(2);
-    EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(new HashMap()).times(2);
     EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas12).times(2);
-    EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(partitionsWithLaggingReplicasMap122).times(2);
     EasyMock.expect(clusterModel.brokenBrokers()).andReturn(new TreeSet()).anyTimes();
     EasyMock.expect(clusterModel.brokers()).andReturn(new TreeSet(Arrays.asList(replicas).stream()
         .map(node -> generateBroker(node.id(), 0)).collect(Collectors.toList()))).anyTimes();
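For context on what the mocked `alterPartitionReassignments` call (and the `aprResult` stub of `AlterPartitionReassignmentsResult`) stands in for: the goal's `rebalanceForBroker` ultimately asks the admin client to move a lagging partition's replicas, with the `InterruptedException | ExecutionException` handling shown earlier in the goal's diff. A minimal sketch of that call shape; the topic name, target broker ids, and the helper method are illustrative, while the Admin API methods are the real Kafka ones:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;

public class ReassignmentSketch {
  // Moves topic1-0 onto brokers 0, 1, 2; the real goal computes the target
  // replica list from the cluster model rather than hard-coding it.
  static void moveReplicas(AdminClient adminClient) {
    Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments =
        Map.of(new TopicPartition("topic1", 0),
               Optional.of(new NewPartitionReassignment(List.of(0, 1, 2))));
    try {
      adminClient.alterPartitionReassignments(reassignments).all().get();
    } catch (InterruptedException | ExecutionException e) {
      // Mirrors the goal's handling above: log and continue.
      System.err.println("Unable to move replicas: " + e);
    }
  }
}
```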