Skip to content

Commit

Permalink
add option to delete partition reassignments not started by CruiseCon…
Browse files Browse the repository at this point in the history
…trol
  • Loading branch information
mavemuri committed Jun 7, 2023
1 parent 352c261 commit b5ed6fa
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,9 @@ public void sanityCheckDryRun(boolean dryRun, boolean stopOngoingExecution) {
LOG.info("External agent is reassigning partitions. "
+ "The request to stop it is submitted successfully: {}", partitionsBeingReassigned);
}
} else if (_config.getBoolean(ExecutorConfig.DELETE_STALE_PARTITIONS_REASSIGNMENTS)) {
LOG.info("Trying to resolve stuck partitions {}", partitionsBeingReassigned);
_executor.cancelStaleReassignments();
} else if (_config.getBoolean(ExecutorConfig.REMOVE_STUCK_PARTITIONS_REASSIGNMENTS)) {
LOG.info("Trying to resolve stuck partitions {}", partitionsBeingReassigned);
_executor.fixStuckPartitionReassignments();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import java.util.Arrays;
import java.util.Collections;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -35,6 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.closeAdminClientWithTimeout;
import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.createAdminClient;
import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.parseAdminClientConfigs;

Expand All @@ -48,31 +50,26 @@
public class LaggingReplicaReassignmentGoal extends AbstractGoal {

private static final Logger LOG = LoggerFactory.getLogger(LaggingReplicaReassignmentGoal.class);
private boolean _laggingRecoveryNeeded;

private List<PartitionInfo> _laggingPartitions;
protected static final ConcurrentHashMap<PartitionInfoWrapper, Long> LAGGING_PARTITIONS_MAP = new ConcurrentHashMap<PartitionInfoWrapper, Long>();
private static final Object UPDATE_LAGGING_PARTITIONS_MAP_LOCK = new Object();

protected ConcurrentHashMap<PartitionInfoWrapper, Long> _laggingPartitionsMap;
private static volatile boolean laggingRecoveryNeeded = false;

private ConcurrentHashMap<PartitionInfoWrapper, Long> _newLaggingPartitionsMap;

private long _maxReplicaLagMs;

private AdminClient _adminClient;
private static List<PartitionInfo> laggingPartitionsList = Collections.synchronizedList(new ArrayList<PartitionInfo>());

private KafkaCruiseControlConfig _parsedConfig;

private long _maxReplicaLagMs;

@Override
public void configure(Map<String, ?> configs) {
LOG.info("Configuring LaggingReplicaReassignmentGoal");
_parsedConfig = new KafkaCruiseControlConfig(configs, false);
_adminClient = createAdminClient(parseAdminClientConfigs(_parsedConfig));
_balancingConstraint = new BalancingConstraint(_parsedConfig);
_numWindows = _parsedConfig.getInt(MonitorConfig.NUM_PARTITION_METRICS_WINDOWS_CONFIG);
_minMonitoredPartitionPercentage = _parsedConfig.getDouble(MonitorConfig.MIN_VALID_PARTITION_RATIO_CONFIG);
_laggingPartitionsMap = new ConcurrentHashMap<PartitionInfoWrapper, Long>();
_maxReplicaLagMs = (long) configs.get(AnalyzerConfig.MAX_LAGGING_REPLICA_REASSIGN_MS);
_laggingPartitions = new ArrayList<PartitionInfo>();
_laggingRecoveryNeeded = false;
}

@Override
Expand Down Expand Up @@ -146,63 +143,71 @@ protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions op

LOG.info("updateGoalState");
checkIfReplicasLagging(clusterModel);
if (!_laggingRecoveryNeeded) {
if (!laggingRecoveryNeeded) {
finish();
}

}

void checkIfReplicasLagging(ClusterModel clusterModel) throws OptimizationFailureException {
long currentTimeMillis = System.currentTimeMillis();
_newLaggingPartitionsMap = new ConcurrentHashMap<PartitionInfoWrapper, Long>();
ConcurrentHashMap<PartitionInfoWrapper, Long> newLaggingPartitionsMap = new ConcurrentHashMap<PartitionInfoWrapper, Long>();
LOG.info("Checking for lagging replicas");
if (_laggingPartitionsMap == null) {
_laggingPartitionsMap = new ConcurrentHashMap<PartitionInfoWrapper, Long>();
}
//List<PartitionInfo> laggingPartitionInfos = clusterModel.getPartitionsWithLaggingReplicas();
//_laggingPartitionsMap.entrySet().removeIf(e -> !laggingPartitionInfos.contains(e.getKey()._pi));
//LAGGING_PARTITIONS_MAP.entrySet().removeIf(e -> !laggingPartitionInfos.contains(e.getKey()._pi));
for (PartitionInfo partition: clusterModel.getPartitionsWithLaggingReplicas()) {
LOG.info(partition.toString());
PartitionInfoWrapper piw = new PartitionInfoWrapper(partition);
long lastSeenTime = _laggingPartitionsMap.getOrDefault(piw, currentTimeMillis);
long lastSeenTime = LAGGING_PARTITIONS_MAP.getOrDefault(piw, currentTimeMillis);
if (currentTimeMillis - lastSeenTime >= _maxReplicaLagMs) {
LOG.info("Partition {} has been lagging for past {} minutes", partition.toString(),
(currentTimeMillis - lastSeenTime) / (60 * 1000));
_laggingRecoveryNeeded = true;
_laggingPartitions.add(partition);
laggingRecoveryNeeded = true;
laggingPartitionsList.add(partition);
}
_newLaggingPartitionsMap.put(piw, lastSeenTime);
newLaggingPartitionsMap.put(piw, lastSeenTime);
}
synchronized (UPDATE_LAGGING_PARTITIONS_MAP_LOCK) {
LAGGING_PARTITIONS_MAP.clear();
LAGGING_PARTITIONS_MAP.putAll(newLaggingPartitionsMap);
}
_laggingPartitionsMap = _newLaggingPartitionsMap;
LOG.info("Lagging partitions map: {} on thread after {}", _laggingPartitionsMap.toString(), Thread.currentThread().getName());
LOG.info("Lagging partitions map: {} on thread after {}", LAGGING_PARTITIONS_MAP.toString(), Thread.currentThread().getName());
}

List<PartitionInfo> getLaggingPartitions() {
return _laggingPartitions;
return laggingPartitionsList;
}

@Override
protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals,
OptimizationOptions optimizationOptions) throws OptimizationFailureException {

LOG.info("Current lagging partitions: {} ", _laggingPartitions.toString());
if (_laggingRecoveryNeeded) {
Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
// gather all the lagging partitions into a reassignments map
for (PartitionInfo laggingPartition: _laggingPartitions) {
List<Node> laggingReplicas = new LinkedList<Node>(Arrays.asList(laggingPartition.replicas()));
reassignments.put(new TopicPartition(laggingPartition.topic(), laggingPartition.partition()),
Optional.of(new NewPartitionReassignment(laggingReplicas.stream().map(node -> node.id()).collect(Collectors.toList()))));
_laggingPartitionsMap.remove(new PartitionInfoWrapper(laggingPartition));
}
// use admin client to move them to same brokers
try {
_adminClient.alterPartitionReassignments(reassignments).all().get();
reassignments.entrySet().stream().forEach(e -> LOG.info("Moved partition {}: {}", e.getKey(), e.getValue().get().targetReplicas()));
_laggingPartitions.clear();
_laggingRecoveryNeeded = false;
} catch (InterruptedException | ExecutionException e) {
LOG.error("Unable to move replicas onto same brokers");
LOG.info("Current lagging partitions: {} ", laggingPartitionsList.toString());
if (laggingRecoveryNeeded) {
synchronized (UPDATE_LAGGING_PARTITIONS_MAP_LOCK) {
Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
// gather all the lagging partitions into a reassignments map
for (PartitionInfo laggingPartition: laggingPartitionsList) {
List<Node> laggingReplicas = new LinkedList<Node>(Arrays.asList(laggingPartition.replicas()));
reassignments.put(new TopicPartition(laggingPartition.topic(), laggingPartition.partition()),
Optional.of(new NewPartitionReassignment(laggingReplicas.stream().map(node -> node.id()).collect(Collectors.toList()))));
//LAGGING_PARTITIONS_MAP.remove(new PartitionInfoWrapper(laggingPartition));
}
// use admin client to move them to same brokers
LOG.info("Creating adminClient for partition reassignment");
AdminClient adminClient = createAdminClient(parseAdminClientConfigs(_parsedConfig));
try {
adminClient.alterPartitionReassignments(reassignments).all().get();
reassignments.entrySet().stream().
forEach(e -> LOG.info("Moved partition {}: {}", e.getKey(), e.getValue().get().targetReplicas()));
laggingPartitionsList.clear();
laggingRecoveryNeeded = false;
LAGGING_PARTITIONS_MAP.clear();
} catch (InterruptedException | ExecutionException e) {
LOG.error("Unable to move replicas onto same brokers");
}
LOG.info("Closing adminClient");
closeAdminClientWithTimeout(adminClient);
}
}

Expand All @@ -226,7 +231,11 @@ public boolean equals(Object o) {
PartitionInfoWrapper partitionInfoWrapperObj = (PartitionInfoWrapper) o;
PartitionInfo p2 = partitionInfoWrapperObj._pi;
if (_pi.topic().equals(p2.topic()) && _pi.partition() == p2.partition()
&& _pi.leader().id() == p2.leader().id() && _pi.inSyncReplicas().length == p2.inSyncReplicas().length) {
&& (
(_pi.leader() == null && p2.leader() == null)
|| (_pi.leader() != null && p2.leader() != null && _pi.leader().id() == p2.leader().id())
)
&& _pi.inSyncReplicas().length == p2.inSyncReplicas().length) {
Set<Integer> p2ISRSet = Arrays.stream(p2.inSyncReplicas()).map(isr -> isr.id()).collect(Collectors.toSet());
if (Arrays.stream(_pi.inSyncReplicas()).allMatch(isr -> p2ISRSet.contains(isr.id()))) {
return true;
Expand All @@ -241,7 +250,7 @@ public int hashCode() {
int result = 1;
result = prime * result + ((_pi.topic() == null) ? 0 : _pi.topic().hashCode());
result = prime * result + _pi.partition();
result = prime * result + _pi.leader().id();
result = prime * result + ((_pi.leader() == null) ? 0 : _pi.leader().id());
for (Node n: _pi.inSyncReplicas()) {
result = prime * result + n.id();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,8 +551,17 @@ public final class ExecutorConfig {
+ " Required because there can be cases where the reassignment gets into a limbo state during reassignment where it cannot"
+ " finish the reassignment or revert back to original (Eg. dest & origin brokers both went down)";

/*
* <code>delete.stale.partition.reassignments</code>
*/
public static final String DELETE_STALE_PARTITIONS_REASSIGNMENTS = "delete.stale.partition.reassignments";
public static final boolean DEFAULT_DELETE_STALE_PARTITIONS_REASSIGNMENTS = false;
public static final String DELETE_STALE_PARTITIONS_REASSIGNMENTS_DOC = "Delete Stale partition reassignments- eg. from "
+ " an older instance of CruiseControl which died due t various reasons(node down/etc)";

private ExecutorConfig() {
}

/**
* Define configs for Executor.
*
Expand Down Expand Up @@ -894,6 +903,11 @@ public static ConfigDef define(ConfigDef configDef) {
ConfigDef.Type.BOOLEAN,
DEFAULT_REMOVE_STUCK_PARTITIONS_REASSIGNMENTS,
ConfigDef.Importance.HIGH,
REMOVE_STUCK_PARTITIONS_REASSIGNMENTS_DOC);
REMOVE_STUCK_PARTITIONS_REASSIGNMENTS_DOC)
.define(DELETE_STALE_PARTITIONS_REASSIGNMENTS,
ConfigDef.Type.BOOLEAN,
DEFAULT_DELETE_STALE_PARTITIONS_REASSIGNMENTS,
ConfigDef.Importance.HIGH,
DELETE_STALE_PARTITIONS_REASSIGNMENTS_DOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -986,14 +986,49 @@ private void sanityCheckOngoingMovement() throws OngoingExecutionException {
}
}

/***
* Cancel reassignments external to CruiseControl so as to not be blocked.
* Reassignments aborted while CC is still running are handled by CC itself.
* This will not affect reassignments started by CC itself as that is handled in {@code KafkaCruiseControl.sanityCheckDryRun()}
* Required for cases where CC dies while an execution is in progress
* Eg. node running CC goes down
*/
public void cancelStaleReassignments() {
if (!_stuckPartitionsBeingReassinedSemaphore.tryAcquire()) {
throw new IllegalStateException(String.format("Stuck/Stale partitions currently being reassigned"));
}
Set<TopicPartition> partitionsBeingReassigned;
try {
partitionsBeingReassigned = ExecutionUtils.partitionsBeingReassigned(_adminClient);
} catch (TimeoutException | InterruptedException | ExecutionException e) {
throw new IllegalStateException("Unable to retrieve current partition reassignments", e);
}
if (partitionsBeingReassigned.isEmpty()) {
LOG.info("No stale reassignments found");
return;
}
Map<TopicPartition, Optional<NewPartitionReassignment>> newReassignments = new HashMap<>();
for (TopicPartition tp : partitionsBeingReassigned) {
newReassignments.put(tp, ExecutionUtils.cancelReassignmentValue());
}
try {
_adminClient.alterPartitionReassignments(newReassignments).all().get();
LOG.info("Cancelled stale partition assignments {}", partitionsBeingReassigned.toString());
} catch (InterruptedException | ExecutionException e) {
LOG.error("Error cancelling ongoing partition reassignments {}", e);
} finally {
_stuckPartitionsBeingReassinedSemaphore.release();
}
}

/**
* Check and clear stuck partitionReassignments- Required to handle deadlocked conditions where reassignment is stuck
* in a limbo state due to the destination broker(s) being unavailable and one/more of the original brokers to revert to is also down
*/
public void fixStuckPartitionReassignments() {
// make sure only one stuckPartitionReassginement call happening at any time
if (!_stuckPartitionsBeingReassinedSemaphore.tryAcquire()) {
throw new IllegalStateException(String.format("Stuck partitions currently being reassigned"));
throw new IllegalStateException(String.format("Stuck/Stale partitions currently being reassigned"));
}
Map<TopicPartition, Optional<NewPartitionReassignment>> ongoingPartitionReassignmentsToBeChanged = new HashMap<>();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public void checkifLaggingPartitionReplicaMoved() throws Exception {
AdminClient adminClient = EasyMock.createMock(AdminClient.class);
PowerMock.mockStatic(KafkaCruiseControlUtils.class);
EasyMock.expect(KafkaCruiseControlUtils.createAdminClient(EasyMock.anyObject())).andReturn(adminClient);
KafkaCruiseControlUtils.closeAdminClientWithTimeout(EasyMock.anyObject());
EasyMock.expectLastCall();
EasyMock.expect(KafkaCruiseControlUtils.parseAdminClientConfigs(EasyMock.anyObject())).andReturn(configs);
PowerMock.replay(KafkaCruiseControlUtils.class);

Expand Down Expand Up @@ -101,6 +103,7 @@ public void checkifLaggingPartitionReplicaMoved() throws Exception {
}).anyTimes();
clusterModel.clearSortedReplicas();
clusterModel.clearSortedReplicas();
adminClient.close();
EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(new ArrayList<PartitionInfo>());
EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas1);
EasyMock.expect(clusterModel.getPartitionsWithLaggingReplicas()).andReturn(partitionsWithLaggingReplicas12);
Expand All @@ -121,9 +124,9 @@ public void checkifLaggingPartitionReplicaMoved() throws Exception {
// optimize again after sleep
goal.optimize(clusterModel, optimizedGoals, new OptimizationOptions(new HashSet<String>(), new HashSet<Integer>(), new HashSet<Integer>()));
// should have been moved as 1st seen before sleep
assertFalse(goal._laggingPartitionsMap.containsKey(new PartitionInfoWrapper(partitionInfo1)));
assertFalse(goal.LAGGING_PARTITIONS_MAP.containsKey(new PartitionInfoWrapper(partitionInfo1)));
// still hasnt reached maxLagTimeMS threshold
assertTrue(goal._laggingPartitionsMap.containsKey(new PartitionInfoWrapper(partitionInfo2Copy)));
assertTrue(goal.LAGGING_PARTITIONS_MAP.containsKey(new PartitionInfoWrapper(partitionInfo2Copy)));
EasyMock.verify(clusterModel);
}

Expand Down Expand Up @@ -193,17 +196,17 @@ public void checkNonLaggingPartitionNotMoved() throws OptimizationFailureExcepti
}
// optimize again after sleep
goal.optimize(clusterModel, optimizedGoals, new OptimizationOptions(new HashSet<String>(), new HashSet<Integer>(), new HashSet<Integer>()));
assertFalse(goal._laggingPartitionsMap.containsKey(new PartitionInfoWrapper(partitionInfo1)));
assertFalse(goal._laggingPartitionsMap.containsKey(new PartitionInfoWrapper(partitionInfo2)));
assertFalse(goal.LAGGING_PARTITIONS_MAP.containsKey(new PartitionInfoWrapper(partitionInfo1)));
assertFalse(goal.LAGGING_PARTITIONS_MAP.containsKey(new PartitionInfoWrapper(partitionInfo2)));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// optimize again after sleep
goal.optimize(clusterModel, optimizedGoals, new OptimizationOptions(new HashSet<String>(), new HashSet<Integer>(), new HashSet<Integer>()));
assertTrue(goal._laggingPartitionsMap.containsKey(new PartitionInfoWrapper(partitionInfo1)));
assertTrue(goal._laggingPartitionsMap.containsKey(new PartitionInfoWrapper(partitionInfo2Copy)));
assertTrue(goal.LAGGING_PARTITIONS_MAP.containsKey(new PartitionInfoWrapper(partitionInfo1)));
assertTrue(goal.LAGGING_PARTITIONS_MAP.containsKey(new PartitionInfoWrapper(partitionInfo2Copy)));
EasyMock.verify(clusterModel);
}
}

0 comments on commit b5ed6fa

Please sign in to comment.