diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java
index 6f263f87f..45178422a 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java
@@ -274,7 +274,10 @@ public void sanityCheckDryRun(boolean dryRun, boolean stopOngoingExecution) {
                                       + "an already ongoing partition reassignment.", e);
     }
     if (!partitionsBeingReassigned.isEmpty()) {
-      if (_config.getBoolean(ExecutorConfig.REMOVE_STUCK_PARTITIONS_REASSIGNMENTS)) {
+      if (_config.getBoolean(ExecutorConfig.DELETE_STALE_PARTITIONS_REASSIGNMENTS)) {
+        LOG.info("Trying to resolve stale partition reassignments {}", partitionsBeingReassigned);
+        _executor.cancelStaleReassignments();
+      } else if (_config.getBoolean(ExecutorConfig.REMOVE_STUCK_PARTITIONS_REASSIGNMENTS)) {
         LOG.info("Trying to resolve stuck partitions {}", partitionsBeingReassigned);
         _executor.fixStuckPartitionReassignments();
       } else {
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoal.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoal.java
index 168c6a982..17b9290bf 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoal.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoal.java
@@ -5,6 +5,7 @@ package com.linkedin.kafka.cruisecontrol.analyzer.goals;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.ArrayList;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -35,6 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.closeAdminClientWithTimeout;
 import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.createAdminClient;
 import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.parseAdminClientConfigs;
 
@@ -48,31 +50,26 @@ public class LaggingReplicaReassignmentGoal extends AbstractGoal {
   private static final Logger LOG = LoggerFactory.getLogger(LaggingReplicaReassignmentGoal.class);
 
-  private boolean _laggingRecoveryNeeded;
-  private List<PartitionInfo> _laggingPartitions;
+  protected static final ConcurrentHashMap<PartitionInfoWrapper, Long> LAGGING_PARTITIONS_MAP = new ConcurrentHashMap<>();
+  private static final Object UPDATE_LAGGING_PARTITIONS_MAP_LOCK = new Object();
 
-  protected ConcurrentHashMap<PartitionInfoWrapper, Long> _laggingPartitionsMap;
+  private static volatile boolean LAGGING_RECOVERY_NEEDED = false;
 
-  private ConcurrentHashMap<PartitionInfoWrapper, Long> _newLaggingPartitionsMap;
-
-  private long _maxReplicaLagMs;
-
-  private AdminClient _adminClient;
+  private static List<PartitionInfo> LAGGING_PARTITIONS_LIST = Collections.synchronizedList(new ArrayList<>());
 
   private KafkaCruiseControlConfig _parsedConfig;
 
+  private long _maxReplicaLagMs;
+
   @Override
   public void configure(Map<String, ?> configs) {
+    LOG.info("Configuring LaggingReplicaReassignmentGoal");
     _parsedConfig = new KafkaCruiseControlConfig(configs, false);
-    _adminClient = createAdminClient(parseAdminClientConfigs(_parsedConfig));
     _balancingConstraint = new BalancingConstraint(_parsedConfig);
     _numWindows = _parsedConfig.getInt(MonitorConfig.NUM_PARTITION_METRICS_WINDOWS_CONFIG);
     _minMonitoredPartitionPercentage = _parsedConfig.getDouble(MonitorConfig.MIN_VALID_PARTITION_RATIO_CONFIG);
-    _laggingPartitionsMap = new ConcurrentHashMap<>();
     _maxReplicaLagMs = (long) configs.get(AnalyzerConfig.MAX_LAGGING_REPLICA_REASSIGN_MS);
-    _laggingPartitions = new ArrayList<>();
-    _laggingRecoveryNeeded = false;
   }
 
   @Override
@@ -146,7 +143,7 @@ protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions op
     LOG.info("updateGoalState");
     checkIfReplicasLagging(clusterModel);
-    if (!_laggingRecoveryNeeded) {
+    if (!LAGGING_RECOVERY_NEEDED) {
       finish();
     }
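Note on the hunk below: checkIfReplicasLagging() keeps one timestamp per lagging partition, recording when the partition was first observed lagging (the local is named lastSeenTime, but it holds the first-seen time), and flags recovery once a partition has stayed lagging for at least _maxReplicaLagMs; rebuilding the map from scratch on each pass is what drops entries for partitions that have recovered. A stripped-down illustration of that bookkeeping, using placeholder names rather than the goal's actual API:

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative sketch only: track when each partition key was first seen lagging and
    // report the ones that have been lagging for longer than a threshold.
    public class LaggingTrackerSketch {
      private final ConcurrentHashMap<String, Long> firstSeenMs = new ConcurrentHashMap<>();
      private final long maxLagMs;

      public LaggingTrackerSketch(long maxLagMs) {
        this.maxLagMs = maxLagMs;
      }

      /** Records {@code partitionKey} as lagging now; returns true once it has lagged for at least maxLagMs. */
      public boolean recordAndCheck(String partitionKey, long nowMs) {
        long firstSeen = firstSeenMs.getOrDefault(partitionKey, nowMs);
        firstSeenMs.put(partitionKey, firstSeen);
        return nowMs - firstSeen >= maxLagMs;
      }

      /** Drops bookkeeping for partitions that are no longer lagging, mirroring the map rebuild in the goal. */
      public void retainOnly(Set<String> stillLagging) {
        firstSeenMs.keySet().retainAll(stillLagging);
      }
    }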
@@ -154,55 +151,63 @@ protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions op
   void checkIfReplicasLagging(ClusterModel clusterModel) throws OptimizationFailureException {
     long currentTimeMillis = System.currentTimeMillis();
-    _newLaggingPartitionsMap = new ConcurrentHashMap<>();
+    ConcurrentHashMap<PartitionInfoWrapper, Long> newLaggingPartitionsMap = new ConcurrentHashMap<>();
     LOG.info("Checking for lagging replicas");
-    if (_laggingPartitionsMap == null) {
-      _laggingPartitionsMap = new ConcurrentHashMap<>();
-    }
     //List<PartitionInfo> laggingPartitionInfos = clusterModel.getPartitionsWithLaggingReplicas();
-    //_laggingPartitionsMap.entrySet().removeIf(e -> !laggingPartitionInfos.contains(e.getKey()._pi));
+    //LAGGING_PARTITIONS_MAP.entrySet().removeIf(e -> !laggingPartitionInfos.contains(e.getKey()._pi));
     for (PartitionInfo partition: clusterModel.getPartitionsWithLaggingReplicas()) {
       LOG.info(partition.toString());
       PartitionInfoWrapper piw = new PartitionInfoWrapper(partition);
-      long lastSeenTime = _laggingPartitionsMap.getOrDefault(piw, currentTimeMillis);
+      long lastSeenTime = LAGGING_PARTITIONS_MAP.getOrDefault(piw, currentTimeMillis);
       if (currentTimeMillis - lastSeenTime >= _maxReplicaLagMs) {
         LOG.info("Partition {} has been lagging for past {} minutes", partition.toString(),
             (currentTimeMillis - lastSeenTime) / (60 * 1000));
-        _laggingRecoveryNeeded = true;
-        _laggingPartitions.add(partition);
+        LAGGING_RECOVERY_NEEDED = true;
+        LAGGING_PARTITIONS_LIST.add(partition);
       }
-      _newLaggingPartitionsMap.put(piw, lastSeenTime);
+      newLaggingPartitionsMap.put(piw, lastSeenTime);
+    }
+    synchronized (UPDATE_LAGGING_PARTITIONS_MAP_LOCK) {
+      LAGGING_PARTITIONS_MAP.clear();
+      LAGGING_PARTITIONS_MAP.putAll(newLaggingPartitionsMap);
     }
-    _laggingPartitionsMap = _newLaggingPartitionsMap;
+    LOG.info("Lagging partitions map after update: {} on thread {}", LAGGING_PARTITIONS_MAP, Thread.currentThread().getName());
   }
 
   List<PartitionInfo> getLaggingPartitions() {
-    return _laggingPartitions;
+    return LAGGING_PARTITIONS_LIST;
   }
 
   @Override
   protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals,
                                     OptimizationOptions optimizationOptions) throws OptimizationFailureException {
-    LOG.info("Current lagging partitions: {} ", _laggingPartitions.toString());
-    if (_laggingRecoveryNeeded) {
-      Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
-      // gather all the lagging partitions into a reassignments map
-      for (PartitionInfo laggingPartition: _laggingPartitions) {
-        List<Node> laggingReplicas = new LinkedList<>(Arrays.asList(laggingPartition.replicas()));
-        reassignments.put(new TopicPartition(laggingPartition.topic(), laggingPartition.partition()),
-            Optional.of(new NewPartitionReassignment(laggingReplicas.stream().map(node -> node.id()).collect(Collectors.toList()))));
-        _laggingPartitionsMap.remove(new PartitionInfoWrapper(laggingPartition));
-      }
-      // use admin client to move them to same brokers
-      try {
-        _adminClient.alterPartitionReassignments(reassignments).all().get();
-        reassignments.entrySet().stream().forEach(e -> LOG.info("Moved partition {}: {}", e.getKey(), e.getValue().get().targetReplicas()));
-        _laggingPartitions.clear();
-        _laggingRecoveryNeeded = false;
-      } catch (InterruptedException | ExecutionException e) {
-        LOG.error("Unable to move replicas onto same brokers");
+    LOG.info("Current lagging partitions: {} ", LAGGING_PARTITIONS_LIST.toString());
+    if (LAGGING_RECOVERY_NEEDED) {
+      synchronized (UPDATE_LAGGING_PARTITIONS_MAP_LOCK) {
+        Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
+        // gather all the lagging partitions into a reassignments map
+        for (PartitionInfo laggingPartition: LAGGING_PARTITIONS_LIST) {
+          List<Node> laggingReplicas = new LinkedList<>(Arrays.asList(laggingPartition.replicas()));
+          reassignments.put(new TopicPartition(laggingPartition.topic(), laggingPartition.partition()),
+              Optional.of(new NewPartitionReassignment(laggingReplicas.stream().map(node -> node.id()).collect(Collectors.toList()))));
+          //LAGGING_PARTITIONS_MAP.remove(new PartitionInfoWrapper(laggingPartition));
+        }
+        // use admin client to move them to same brokers
+        LOG.info("Creating adminClient for partition reassignment");
+        AdminClient adminClient = createAdminClient(parseAdminClientConfigs(_parsedConfig));
+        try {
+          adminClient.alterPartitionReassignments(reassignments).all().get();
+          reassignments.entrySet().stream()
+              .forEach(e -> LOG.info("Moved partition {}: {}", e.getKey(), e.getValue().get().targetReplicas()));
+          LAGGING_PARTITIONS_LIST.clear();
+          LAGGING_RECOVERY_NEEDED = false;
+          LAGGING_PARTITIONS_MAP.clear();
+        } catch (InterruptedException | ExecutionException e) {
+          LOG.error("Unable to move replicas onto same brokers");
+        }
+        LOG.info("Closing adminClient");
+        closeAdminClientWithTimeout(adminClient);
+      }
     }
   }
@@ -226,7 +231,11 @@ public boolean equals(Object o) {
       PartitionInfoWrapper partitionInfoWrapperObj = (PartitionInfoWrapper) o;
       PartitionInfo p2 = partitionInfoWrapperObj._pi;
       if (_pi.topic().equals(p2.topic()) && _pi.partition() == p2.partition()
-          && _pi.leader().id() == p2.leader().id() && _pi.inSyncReplicas().length == p2.inSyncReplicas().length) {
+          && (
+              (_pi.leader() == null && p2.leader() == null)
+              || ((_pi.leader() != null && p2.leader() != null) && (_pi.leader().id() == p2.leader().id()))
+             )
+          && _pi.inSyncReplicas().length == p2.inSyncReplicas().length) {
         Set<Integer> p2ISRSet = Arrays.stream(p2.inSyncReplicas()).map(isr -> isr.id()).collect(Collectors.toSet());
         if (Arrays.stream(_pi.inSyncReplicas()).allMatch(isr -> p2ISRSet.contains(isr.id()))) {
           return true;
@@ -241,7 +250,7 @@ public int hashCode() {
       int result = 1;
       result = prime * result + ((_pi.topic() == null) ? 0 : _pi.topic().hashCode());
       result = prime * result + _pi.partition();
-      result = prime * result + _pi.leader().id();
+      result = prime * result + ((_pi.leader() == null) ? 0 : _pi.leader().id());
       for (Node n: _pi.inSyncReplicas()) {
         result = prime * result + n.id();
       }
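For reference, the recovery action in rebalanceForBroker() above does not compute a new placement: it re-submits each lagging partition's existing replica list through the Admin API, so the NewPartitionReassignment target is simply the partition's current replicas. A self-contained sketch of that single call; the bootstrap address, topic, and replica ids are illustrative and not taken from this patch:

    import java.util.List;
    import java.util.Map;
    import java.util.Optional;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewPartitionReassignment;
    import org.apache.kafka.common.TopicPartition;

    public class RepinLaggingReplicasSketch {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
        try (Admin admin = Admin.create(props)) {
          TopicPartition tp = new TopicPartition("my-topic", 0); // illustrative partition
          List<Integer> currentReplicas = List.of(1, 2, 3);      // in the goal this comes from PartitionInfo.replicas()
          Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments =
              Map.of(tp, Optional.of(new NewPartitionReassignment(currentReplicas)));
          // Reassign the partition onto the replica set it already has; this is the
          // "move them to same brokers" step performed for every lagging partition.
          admin.alterPartitionReassignments(reassignments).all().get();
        }
      }
    }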
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java
index eb38adcde..0a34e161b 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java
@@ -448,6 +448,14 @@ private ExecutorConfig() {
       + " Required because there can be cases where the rassignment gets into a limbo state during reassignment where it cannot"
       + " finish the reassignemnt or revert back to original (Eg. dest & origin brokers both went down)";
 
+  /**
+   * <code>delete.stale.partition.reassignments</code>
+   */
+  public static final String DELETE_STALE_PARTITIONS_REASSIGNMENTS = "delete.stale.partition.reassignments";
+  public static final boolean DEFAULT_DELETE_STALE_PARTITIONS_REASSIGNMENTS = false;
+  public static final String DELETE_STALE_PARTITIONS_REASSIGNMENTS_DOC = "Delete stale partition reassignments, e.g. reassignments"
+      + " left behind by an older Cruise Control instance that died for various reasons (node down, etc.).";
+
   /**
    * Define configs for Executor.
    *
@@ -712,6 +720,11 @@ public static ConfigDef define(ConfigDef configDef) {
                             ConfigDef.Type.BOOLEAN,
                             DEFAULT_REMOVE_STUCK_PARTITIONS_REASSIGNMENTS,
                             ConfigDef.Importance.HIGH,
-                            REMOVE_STUCK_PARTITIONS_REASSIGNMENTS_DOC);
+                            REMOVE_STUCK_PARTITIONS_REASSIGNMENTS_DOC)
+                    .define(DELETE_STALE_PARTITIONS_REASSIGNMENTS,
+                            ConfigDef.Type.BOOLEAN,
+                            DEFAULT_DELETE_STALE_PARTITIONS_REASSIGNMENTS,
+                            ConfigDef.Importance.HIGH,
+                            DELETE_STALE_PARTITIONS_REASSIGNMENTS_DOC);
   }
 }
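The new flag defaults to off (DEFAULT_DELETE_STALE_PARTITIONS_REASSIGNMENTS is false), so deployments that want the stale-reassignment cleanup must enable it explicitly. A hypothetical way to pass the override when assembling the Cruise Control configuration, purely for illustration (the surrounding wiring is not part of this patch):

    import java.util.HashMap;
    import java.util.Map;

    import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;

    public final class StaleReassignmentCleanupOverride {
      // Hypothetical helper: returns an override map to merge into the properties backing
      // KafkaCruiseControlConfig; the constant resolves to the "delete.stale.partition.reassignments" key defined above.
      public static Map<String, Object> overrides() {
        Map<String, Object> overrides = new HashMap<>();
        overrides.put(ExecutorConfig.DELETE_STALE_PARTITIONS_REASSIGNMENTS, true);
        return overrides;
      }
    }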
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
index 2f0d63ef0..016d2069e 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
@@ -855,6 +855,41 @@ private void sanityCheckOngoingMovement() throws OngoingExecutionException {
     }
   }
 
+  /**
+   * Cancel reassignments that were started outside of Cruise Control so that the executor is not blocked by them.
+   * Reassignments aborted while Cruise Control is still running are handled by Cruise Control itself, and reassignments
+   * started by Cruise Control are unaffected, as those are handled in {@code KafkaCruiseControl.sanityCheckDryRun()}.
+   * Required for cases where Cruise Control dies while an execution is in progress, e.g. the node running Cruise Control goes down.
+   */
+  public void cancelStaleReassignments() {
+    if (!_stuckPartitionsBeingReassinedSemaphore.tryAcquire()) {
+      throw new IllegalStateException("Stuck/Stale partitions currently being reassigned");
+    }
+    try {
+      Set<TopicPartition> partitionsBeingReassigned;
+      try {
+        partitionsBeingReassigned = ExecutionUtils.partitionsBeingReassigned(_adminClient);
+      } catch (TimeoutException | InterruptedException | ExecutionException e) {
+        throw new IllegalStateException("Unable to retrieve current partition reassignments", e);
+      }
+      if (partitionsBeingReassigned.isEmpty()) {
+        LOG.info("No stale reassignments found");
+        return;
+      }
+      Map<TopicPartition, Optional<NewPartitionReassignment>> newReassignments = new HashMap<>();
+      for (TopicPartition tp : partitionsBeingReassigned) {
+        newReassignments.put(tp, ExecutionUtils.cancelReassignmentValue());
+      }
+      _adminClient.alterPartitionReassignments(newReassignments).all().get();
+      LOG.info("Cancelled stale partition reassignments {}", partitionsBeingReassigned);
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.error("Error cancelling ongoing partition reassignments", e);
+    } finally {
+      // Release in all cases, including the early return and the failure to list reassignments above.
+      _stuckPartitionsBeingReassinedSemaphore.release();
+    }
+  }
+
   /**
    * Check and clear stuck partitionReassignments- Required to handle deadlocked conditions where reassignment is stuck
    * in a limbo state due to the destination broker(s) being unavailable and one/more of the original brokers to revert to is also down
@@ -862,7 +897,7 @@ private void sanityCheckOngoingMovement() throws OngoingExecutionException {
   public void fixStuckPartitionReassignments() {
     // make sure only one stuckPartitionReassginement call happening at any time
     if (!_stuckPartitionsBeingReassinedSemaphore.tryAcquire()) {
-      throw new IllegalStateException(String.format("Stuck partitions currently being reassigned"));
+      throw new IllegalStateException(String.format("Stuck/Stale partitions currently being reassigned"));
     }
     Map<TopicPartition, Optional<NewPartitionReassignment>> ongoingPartitionReassignmentsToBeChanged = new HashMap<>();
     try {
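ExecutionUtils.cancelReassignmentValue() is not shown in this patch; in the Kafka Admin API an in-flight reassignment is cancelled by mapping the partition to an empty Optional (KIP-455 semantics), so the helper presumably wraps Optional.empty(). A standalone sketch of the same cancellation flow against a plain Admin client, with an illustrative bootstrap address:

    import java.util.Map;
    import java.util.Optional;
    import java.util.Properties;
    import java.util.stream.Collectors;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewPartitionReassignment;
    import org.apache.kafka.common.TopicPartition;

    public class CancelInFlightReassignmentsSketch {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
        try (Admin admin = Admin.create(props)) {
          // List whatever reassignments are currently in flight, then ask the controller to
          // cancel each one by mapping it to Optional.empty().
          Map<TopicPartition, Optional<NewPartitionReassignment>> cancellations =
              admin.listPartitionReassignments().reassignments().get().keySet().stream()
                   .collect(Collectors.toMap(tp -> tp, tp -> Optional.<NewPartitionReassignment>empty()));
          if (!cancellations.isEmpty()) {
            admin.alterPartitionReassignments(cancellations).all().get();
          }
        }
      }
    }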
diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoalTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoalTest.java
index c09aa69b6..d5a0bdba2 100644
--- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoalTest.java
+++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LaggingReplicaReassignmentGoalTest.java
@@ -101,6 +101,7 @@ public void checkifLaggingPartitionReplicaMoved() throws Exception {
     }).anyTimes();
     clusterModel.clearSortedReplicas();
     clusterModel.clearSortedReplicas();
+    adminClient.close();
     EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(new ArrayList<>());
     EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas1);
     EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas12);
@@ -121,9 +122,9 @@ public void checkifLaggingPartitionReplicaMoved() throws Exception {
     // optimize again after sleep
     goal.optimize(clusterModel, optimizedGoals, new OptimizationOptions(new HashSet<>(), new HashSet<>(), new HashSet<>()));
     // should have been moved as 1st seen before sleep
-    assertFalse(goal._laggingPartitionsMap.containsKey(new PartitionInfoWrapper(partitionInfo1)));
+    assertFalse(LaggingReplicaReassignmentGoal.LAGGING_PARTITIONS_MAP.containsKey(new PartitionInfoWrapper(partitionInfo1)));
     // still hasnt reached maxLagTimeMS threshold
-    assertTrue(goal._laggingPartitionsMap.containsKey(new PartitionInfoWrapper(partitionInfo2Copy)));
+    assertTrue(LaggingReplicaReassignmentGoal.LAGGING_PARTITIONS_MAP.containsKey(new PartitionInfoWrapper(partitionInfo2Copy)));
     EasyMock.verify(clusterModel);
   }
 
@@ -193,8 +194,8 @@ public void checkNonLaggingPartitionNotMoved() throws OptimizationFailureExcepti
     }
     // optimize again after sleep
     goal.optimize(clusterModel, optimizedGoals, new OptimizationOptions(new HashSet<>(), new HashSet<>(), new HashSet<>()));
-    assertFalse(goal._laggingPartitionsMap.containsKey(new PartitionInfoWrapper(partitionInfo1)));
-    assertFalse(goal._laggingPartitionsMap.containsKey(new PartitionInfoWrapper(partitionInfo2)));
+    assertFalse(LaggingReplicaReassignmentGoal.LAGGING_PARTITIONS_MAP.containsKey(new PartitionInfoWrapper(partitionInfo1)));
+    assertFalse(LaggingReplicaReassignmentGoal.LAGGING_PARTITIONS_MAP.containsKey(new PartitionInfoWrapper(partitionInfo2)));
     try {
       Thread.sleep(1000);
     } catch (InterruptedException e) {
@@ -202,8 +203,8 @@ public void checkNonLaggingPartitionNotMoved() throws OptimizationFailureExcepti
     }
     // optimize again after sleep
     goal.optimize(clusterModel, optimizedGoals, new OptimizationOptions(new HashSet<>(), new HashSet<>(), new HashSet<>()));
-    assertTrue(goal._laggingPartitionsMap.containsKey(new PartitionInfoWrapper(partitionInfo1)));
-    assertTrue(goal._laggingPartitionsMap.containsKey(new PartitionInfoWrapper(partitionInfo2Copy)));
+    assertTrue(LaggingReplicaReassignmentGoal.LAGGING_PARTITIONS_MAP.containsKey(new PartitionInfoWrapper(partitionInfo1)));
+    assertTrue(LaggingReplicaReassignmentGoal.LAGGING_PARTITIONS_MAP.containsKey(new PartitionInfoWrapper(partitionInfo2Copy)));
     EasyMock.verify(clusterModel);
   }
 }
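Worth noting for the tests above: because the lagging-partition bookkeeping now lives in static fields, entries left in LAGGING_PARTITIONS_MAP by one test can leak into the next. Assuming JUnit 4 (which these tests appear to use), a hypothetical @Before hook placed in the same package as the goal could reset the shared state between cases:

    package com.linkedin.kafka.cruisecontrol.analyzer.goals;

    import org.junit.Before;

    public class LaggingReplicaReassignmentGoalSharedStateReset {
      // Hypothetical guard: clear the goal's static map so tests stay independent of each other.
      @Before
      public void resetSharedGoalState() {
        LaggingReplicaReassignmentGoal.LAGGING_PARTITIONS_MAP.clear();
      }
    }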