fix/LaggingReplicaReassignmentGoal: store state in clusterModel
mavemuri committed Mar 17, 2022
1 parent 10d601f commit 615d581
Showing 4 changed files with 127 additions and 79 deletions.
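In outline, as can be read from the diffs below: the goal previously kept its lagging-replica first-seen timestamps in ConcurrentHashMap fields on the goal instance. After this change the map lives on the ClusterModel (as a plain HashMap behind a clear-and-copy setter), the goal reads it via getLaggingPartitionsMap() and writes the rebuilt map back via setLaggingPartitionsMap(), and the PartitionInfoWrapper key class moves from a nested class inside the goal to a top-level class in com.linkedin.kafka.cruisecontrol.model.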
LaggingReplicaReassignmentGoal.java (file names below are inferred from each diff's package and class declarations)
@@ -6,7 +6,6 @@

import java.util.Arrays;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.HashMap;
import java.util.List;
@@ -26,6 +25,7 @@
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
import com.linkedin.kafka.cruisecontrol.model.PartitionInfoWrapper;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
@@ -52,9 +52,9 @@ public class LaggingReplicaReassignmentGoal extends AbstractGoal {

private List<PartitionInfo> _laggingPartitions;

protected ConcurrentHashMap<PartitionInfoWrapper, Long> _laggingPartitionsMap;
protected HashMap<PartitionInfoWrapper, Long> _laggingPartitionsMap;

private ConcurrentHashMap<PartitionInfoWrapper, Long> _newLaggingPartitionsMap;
private HashMap<PartitionInfoWrapper, Long> _newLaggingPartitionsMap;

private long _maxReplicaLagMs;

@@ -64,15 +64,17 @@ public class LaggingReplicaReassignmentGoal extends AbstractGoal {

@Override
public void configure(Map<String, ?> configs) {
LOG.info("Configuring LaggingReplicaReassignmentGoal");
_parsedConfig = new KafkaCruiseControlConfig(configs, false);
_adminClient = createAdminClient(parseAdminClientConfigs(_parsedConfig));
_balancingConstraint = new BalancingConstraint(_parsedConfig);
_numWindows = _parsedConfig.getInt(MonitorConfig.NUM_PARTITION_METRICS_WINDOWS_CONFIG);
_minMonitoredPartitionPercentage = _parsedConfig.getDouble(MonitorConfig.MIN_VALID_PARTITION_RATIO_CONFIG);
_laggingPartitionsMap = new ConcurrentHashMap<PartitionInfoWrapper, Long>();
// _laggingPartitionsMap = new HashMap<PartitionInfoWrapper, Long>();
_maxReplicaLagMs = (long) configs.get(AnalyzerConfig.MAX_LAGGING_REPLICA_REASSIGN_MS);
_laggingPartitions = new ArrayList<PartitionInfo>();
_laggingRecoveryNeeded = false;
LOG.info("Configured LaggingReplicaReassignmentGoal");
}

@Override
@@ -154,26 +156,22 @@ protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions op

void checkIfReplicasLagging(ClusterModel clusterModel) throws OptimizationFailureException {
long currentTimeMillis = System.currentTimeMillis();
_newLaggingPartitionsMap = new ConcurrentHashMap<PartitionInfoWrapper, Long>();
_newLaggingPartitionsMap = new HashMap<PartitionInfoWrapper, Long>();
LOG.info("Checking for lagging replicas");
if (_laggingPartitionsMap == null) {
_laggingPartitionsMap = new ConcurrentHashMap<PartitionInfoWrapper, Long>();
}
//List<PartitionInfo> laggingPartitionInfos = clusterModel.getPartitionsWithLaggingReplicas();
//_laggingPartitionsMap.entrySet().removeIf(e -> !laggingPartitionInfos.contains(e.getKey()._pi));
LOG.info("Admin client details {}, {}", _adminClient.toString(), _adminClient.hashCode());
_laggingPartitionsMap = clusterModel.getLaggingPartitionsMap();
for (PartitionInfo partition: clusterModel.getPartitionsWithLaggingReplicas()) {
LOG.info(partition.toString());
PartitionInfoWrapper piw = new PartitionInfoWrapper(partition);
long lastSeenTime = _laggingPartitionsMap.getOrDefault(piw, currentTimeMillis);
if (currentTimeMillis - lastSeenTime >= _maxReplicaLagMs) {
LOG.info("Partition {} has been lagging for past {} minutes", partition.toString(),
(currentTimeMillis - lastSeenTime) / (60 * 1000));
LOG.info("Partition {} has been lagging for past {} minutes", partition.toString(), (currentTimeMillis - lastSeenTime) / (60 * 1000));
_laggingRecoveryNeeded = true;
_laggingPartitions.add(partition);
}
_newLaggingPartitionsMap.put(piw, lastSeenTime);
}
_laggingPartitionsMap = _newLaggingPartitionsMap;
clusterModel.setLaggingPartitionsMap(_newLaggingPartitionsMap);
LOG.info("Lagging partitions map: {} on thread after {}", _laggingPartitionsMap.toString(), Thread.currentThread().getName());
}
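For reference, the bookkeeping above boils down to a small pattern: a partition's timestamp is recorded the first time it is seen lagging, carried forward while it keeps lagging, dropped once it recovers, and only entries older than the threshold trigger recovery. A minimal, self-contained sketch of that pattern (illustrative names, not the project's API):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LagTracker {
    private final long maxLagMs;

    LagTracker(long maxLagMs) {
        this.maxLagMs = maxLagMs;
    }

    // firstSeen maps a partition key to the time it was first observed lagging.
    // Entries for partitions that are no longer lagging fall out of the rebuilt map.
    List<String> overdue(Map<String, Long> firstSeen, List<String> laggingNow, long now) {
        Map<String, Long> next = new HashMap<>();
        List<String> result = new ArrayList<>();
        for (String partition : laggingNow) {
            long seen = firstSeen.getOrDefault(partition, now); // first sighting counts from "now"
            if (now - seen >= maxLagMs) {
                result.add(partition); // lagging continuously for at least maxLagMs
            }
            next.put(partition, seen);
        }
        firstSeen.clear();
        firstSeen.putAll(next); // mirrors the goal's rebuild-and-replace of the map
        return result;
    }
}

Note that getOrDefault(piw, currentTimeMillis) means a partition seen lagging for the first time cannot become overdue in the same pass (unless _maxReplicaLagMs is zero); it only triggers recovery on a later pass once the elapsed time crosses the threshold.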

@@ -204,55 +202,6 @@ protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<
} catch (InterruptedException | ExecutionException e) {
LOG.error("Unable to move replicas onto same brokers");
}
}

}
protected static class PartitionInfoWrapper {

PartitionInfo _pi;

public PartitionInfoWrapper(PartitionInfo pi) {
this._pi = pi;
}

@Override
public boolean equals(Object o) {
if (o == this) {
return true;
}
if (!(o instanceof PartitionInfoWrapper)) {
return false;
}
PartitionInfoWrapper partitionInfoWrapperObj = (PartitionInfoWrapper) o;
PartitionInfo p2 = partitionInfoWrapperObj._pi;
if (_pi.topic().equals(p2.topic()) && _pi.partition() == p2.partition()
&& _pi.leader().id() == p2.leader().id() && _pi.inSyncReplicas().length == p2.inSyncReplicas().length) {
Set<Integer> p2ISRSet = Arrays.stream(p2.inSyncReplicas()).map(isr -> isr.id()).collect(Collectors.toSet());
if (Arrays.stream(_pi.inSyncReplicas()).allMatch(isr -> p2ISRSet.contains(isr.id()))) {
return true;
}
}
return false;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((_pi.topic() == null) ? 0 : _pi.topic().hashCode());
result = prime * result + _pi.partition();
result = prime * result + _pi.leader().id();
for (Node n: _pi.inSyncReplicas()) {
result = prime * result + n.id();
}
return result;
}

@Override
public String toString() {
return _pi.toString();
}

}
}

}
ClusterModel.java
@@ -73,6 +73,7 @@ public class ClusterModel implements Serializable {
private int _unknownHostId;
private final Map<Integer, String> _capacityEstimationInfoByBrokerId;
private final Cluster _cluster;
private final HashMap<PartitionInfoWrapper, Long> _laggingPartitionsMap;

public ClusterModel(ModelGeneration generation, double monitoredPartitionsRatio) {
this(generation, monitoredPartitionsRatio, null);
@@ -113,9 +114,26 @@ public ClusterModel(ModelGeneration generation, double monitoredPartitionsRatio,
_monitoredPartitionsRatio = monitoredPartitionsRatio;
_unknownHostId = 0;
_capacityEstimationInfoByBrokerId = new HashMap<>();
_laggingPartitionsMap = new HashMap<>();
_cluster = cluster;
}

/**
* set current LaggingPartitionsMap
*/
public void setLaggingPartitionsMap(HashMap<PartitionInfoWrapper, Long> laggingPartitionsMap) {
this._laggingPartitionsMap.clear();
this._laggingPartitionsMap.putAll(laggingPartitionsMap);
}

/**
* get current LaggingPartitionsMap
* @return laggingPartitionsMap
*/
public HashMap<PartitionInfoWrapper, Long> getLaggingPartitionsMap() {
return this._laggingPartitionsMap;
}
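A note on the design: the field is final, so the setter copies the caller's map into it (clear() then putAll()) rather than swapping references, while the getter returns the live internal map. A hedged sketch of how a caller might use the pair (recordSighting and its logic are illustrative, not part of this commit):

import java.util.HashMap;
import org.apache.kafka.common.PartitionInfo;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.PartitionInfoWrapper;

class LaggingStateUpdate {
    // Illustrative only: snapshot the live map, record a first sighting, write back.
    static void recordSighting(ClusterModel model, PartitionInfo partition, long nowMs) {
        HashMap<PartitionInfoWrapper, Long> update =
                new HashMap<>(model.getLaggingPartitionsMap()); // getter exposes the live map
        update.putIfAbsent(new PartitionInfoWrapper(partition), nowMs);
        model.setLaggingPartitionsMap(update); // setter does clear() + putAll() into the final field
    }
}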

/**
* @return The partitions which have lagging replicas without any offline partitions.
* This would imply that the ReplicaFetcher is either unable to catch up or has stopped fetching for some reason.
PartitionInfoWrapper.java (new file)
@@ -0,0 +1,57 @@
/*
* Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.model;

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

public class PartitionInfoWrapper {
PartitionInfo _pi;

public PartitionInfoWrapper(PartitionInfo pi) {
this._pi = pi;
}

@Override
public boolean equals(Object o) {
if (o == this) {
return true;
}
if (!(o instanceof PartitionInfoWrapper)) {
return false;
}
PartitionInfoWrapper partitionInfoWrapperObj = (PartitionInfoWrapper) o;
PartitionInfo p2 = partitionInfoWrapperObj._pi;
if (_pi.topic().equals(p2.topic()) && _pi.partition() == p2.partition()
&& _pi.leader().id() == p2.leader().id() && _pi.inSyncReplicas().length == p2.inSyncReplicas().length) {
Set<Integer> p2ISRSet = Arrays.stream(p2.inSyncReplicas()).map(isr -> isr.id()).collect(Collectors.toSet());
if (Arrays.stream(_pi.inSyncReplicas()).allMatch(isr -> p2ISRSet.contains(isr.id()))) {
return true;
}
}
return false;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((_pi.topic() == null) ? 0 : _pi.topic().hashCode());
result = prime * result + _pi.partition();
result = prime * result + ((_pi.leader() == null) ? 0 : _pi.leader().id());
for (Node n: _pi.inSyncReplicas()) {
result = prime * result + n.id();
}
return result;
}

@Override
public String toString() {
return _pi.toString();
}
}
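This wrapper exists because org.apache.kafka.common.PartitionInfo does not override equals() or hashCode(), so bare PartitionInfo objects compare by identity and make poor HashMap keys. A small, self-contained illustration (topic, host, and port values are made up):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import com.linkedin.kafka.cruisecontrol.model.PartitionInfoWrapper;

public class WrapperDemo {
    public static void main(String[] args) {
        Node leader = new Node(0, "host0", 9092);
        Node[] replicas = {leader, new Node(1, "host1", 9092)};
        // Two distinct objects describing the same partition state.
        PartitionInfo a = new PartitionInfo("topic1", 0, leader, replicas, replicas);
        PartitionInfo b = new PartitionInfo("topic1", 0, leader, replicas, replicas);

        Map<PartitionInfoWrapper, Long> seen = new HashMap<>();
        seen.put(new PartitionInfoWrapper(a), 1L);
        // The wrapper's value-based equals/hashCode makes the second object hit the entry.
        System.out.println(seen.containsKey(new PartitionInfoWrapper(b))); // true
    }
}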
LaggingReplicaReassignmentGoalTest.java
@@ -6,6 +6,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -15,11 +16,11 @@
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal.ClusterModelStatsComparator;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.LaggingReplicaReassignmentGoal.PartitionInfoWrapper;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
import com.linkedin.kafka.cruisecontrol.model.PartitionInfoWrapper;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
import org.apache.kafka.common.KafkaFuture;
@@ -87,37 +88,48 @@ public void checkifLaggingPartitionReplicaMoved() throws Exception {
PartitionInfo partitionInfo2Copy = new PartitionInfo("topic2", 0, node1, replicas, isr);
List<PartitionInfo> partitionsWithLaggingReplicas1 = new ArrayList<PartitionInfo>();
partitionsWithLaggingReplicas1.add(partitionInfo1);
HashMap<PartitionInfoWrapper, Long> partitionsWithLaggingReplicasMap1 = new HashMap<PartitionInfoWrapper, Long>();
partitionsWithLaggingReplicasMap1.put(new PartitionInfoWrapper(partitionInfo1), System.currentTimeMillis());
List<PartitionInfo> partitionsWithLaggingReplicas2 = new ArrayList<PartitionInfo>();
partitionsWithLaggingReplicas2.add(partitionInfo2);
HashMap<PartitionInfoWrapper, Long> partitionsWithLaggingReplicasMap2 = new HashMap<PartitionInfoWrapper, Long>();
partitionsWithLaggingReplicasMap2.put(new PartitionInfoWrapper(partitionInfo2), System.currentTimeMillis());
List<PartitionInfo> partitionsWithLaggingReplicas12 = new ArrayList<PartitionInfo>(partitionsWithLaggingReplicas1);
partitionsWithLaggingReplicas12.add(partitionInfo2);
HashMap<PartitionInfoWrapper, Long> partitionsWithLaggingReplicasMap12 = new HashMap<PartitionInfoWrapper, Long>();
partitionsWithLaggingReplicasMap12.put(new PartitionInfoWrapper(partitionInfo1), System.currentTimeMillis() - 1000);
partitionsWithLaggingReplicasMap12.put(new PartitionInfoWrapper(partitionInfo2), System.currentTimeMillis());
// send 1 partition w/ lagging replica
EasyMock.expect(clusterModelStats.numPartitionsWithLaggingReplicas()).andReturn(1).times(6);
EasyMock.expect(clusterModel.getClusterStats(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(clusterModelStats).anyTimes();
EasyMock.expect(clusterModelStatsComparator.compare(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(0);

clusterModel.clearSortedReplicas();
EasyMock.expectLastCall().andAnswer(() -> {
return null;
}).anyTimes();
clusterModel.clearSortedReplicas();
clusterModel.clearSortedReplicas();
EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(new ArrayList<PartitionInfo>());
EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas1);
EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas12);
EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas2);
clusterModel.setLaggingPartitionsMap(EasyMock.anyObject());
EasyMock.expectLastCall().andAnswer(() -> {
return null;
}).anyTimes();

// clusterModel.clearSortedReplicas();
EasyMock.expect(clusterModel.brokenBrokers()).andReturn(new TreeSet<Broker>()).anyTimes();
EasyMock.expect(clusterModel.brokers()).andReturn(new TreeSet<Broker>(Arrays.asList(replicas).stream()
.map(node -> generateBroker(node.id(), 0)).collect(Collectors.toList()))).anyTimes();
EasyMock.expect(clusterModel.aliveBrokers()).andReturn(new TreeSet<Broker>()).anyTimes();
EasyMock.expect(adminClient.alterPartitionReassignments(EasyMock.anyObject())).andReturn(aprResult);
EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(new HashMap<PartitionInfoWrapper, Long>());
EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(new ArrayList<PartitionInfo>());
EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(partitionsWithLaggingReplicasMap1);
EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas1);
EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(partitionsWithLaggingReplicasMap12);
EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas12);
EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(partitionsWithLaggingReplicasMap2);
EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas2);

EasyMock.replay(clusterModel, clusterModelStats, clusterModelStatsComparator, adminClient, aprResult);
// optimize once before sleep
goal.optimize(clusterModel, optimizedGoals, new OptimizationOptions(new HashSet<String>(), new HashSet<Integer>(), new HashSet<Integer>()));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// optimize again after sleep
goal.optimize(clusterModel, optimizedGoals, new OptimizationOptions(new HashSet<String>(), new HashSet<Integer>(), new HashSet<Integer>()));
// should have been moved as 1st seen before sleep
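The stacked expect(...) calls above rely on EasyMock consuming multiple expectations recorded for the same call in order, one per invocation, so each optimize() pass observes the next getLaggingPartitionsMap()/getPartitionsWithLaggingReplicas() pair. A standalone illustration of that mechanism (not project code):

import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;

import java.util.List;

public class SequentialStubDemo {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        List<String> feed = createMock(List.class);
        // Two expectations for the same call are consumed in recording order.
        expect(feed.get(0)).andReturn("first call");
        expect(feed.get(0)).andReturn("second call");
        replay(feed);
        System.out.println(feed.get(0)); // "first call"
        System.out.println(feed.get(0)); // "second call"
    }
}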
@@ -165,20 +177,32 @@ public void checkNonLaggingPartitionNotMoved() throws OptimizationFailureExcepti
List<PartitionInfo> partitionsWithLaggingReplicas12 = new ArrayList<PartitionInfo>();
partitionsWithLaggingReplicas12.add(partitionInfo1);
partitionsWithLaggingReplicas12.add(partitionInfo2);
HashMap<PartitionInfoWrapper, Long> partitionsWithLaggingReplicasMap12 = new HashMap<PartitionInfoWrapper, Long>();
partitionsWithLaggingReplicasMap12.put(new PartitionInfoWrapper(partitionInfo1), System.currentTimeMillis());
partitionsWithLaggingReplicasMap12.put(new PartitionInfoWrapper(partitionInfo2), System.currentTimeMillis());
HashMap<PartitionInfoWrapper, Long> partitionsWithLaggingReplicasMap122 = new HashMap<PartitionInfoWrapper, Long>();
partitionsWithLaggingReplicasMap122.put(new PartitionInfoWrapper(partitionInfo1), System.currentTimeMillis() + 2000);
partitionsWithLaggingReplicasMap122.put(new PartitionInfoWrapper(partitionInfo2), System.currentTimeMillis() + 2000);
// send 1 partition w/ lagging replica
EasyMock.expect(clusterModelStats.numPartitionsWithLaggingReplicas()).andReturn(1).times(9);
EasyMock.expect(clusterModel.getClusterStats(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(clusterModelStats).anyTimes();
EasyMock.expect(clusterModelStatsComparator.compare(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(0);

clusterModel.clearSortedReplicas();
EasyMock.expectLastCall().andAnswer(() -> {
return null;
}).anyTimes();
clusterModel.clearSortedReplicas();
clusterModel.clearSortedReplicas();
clusterModel.clearSortedReplicas();
clusterModel.setLaggingPartitionsMap(EasyMock.anyObject());
EasyMock.expectLastCall().andAnswer(() -> {
return null;
}).anyTimes();

EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas12).times(2);
EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(partitionsWithLaggingReplicasMap12).times(2);
EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(new ArrayList<PartitionInfo>()).times(2);
EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(new HashMap<PartitionInfoWrapper, Long>()).times(2);
EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas12).times(2);
EasyMock.expect(clusterModel.getLaggingPartitionsMap()).andReturn(partitionsWithLaggingReplicasMap122).times(2);
EasyMock.expect(clusterModel.brokenBrokers()).andReturn(new TreeSet<Broker>()).anyTimes();
EasyMock.expect(clusterModel.brokers()).andReturn(new TreeSet<Broker>(Arrays.asList(replicas).stream()
.map(node -> generateBroker(node.id(), 0)).collect(Collectors.toList()))).anyTimes();
