diff --git a/config/cruisecontrol.properties b/config/cruisecontrol.properties
index 4f9329e99..4b69524e8 100644
--- a/config/cruisecontrol.properties
+++ b/config/cruisecontrol.properties
@@ -378,3 +378,5 @@ two.step.purgatory.retention.time.ms=1209600000
# The maximum number of requests in two-step (verification) purgatory.
two.step.purgatory.max.requests=25
+
+remove.stuck.partition.reassignments=false
\ No newline at end of file
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java
index a1097052d..6f263f87f 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java
@@ -274,8 +274,13 @@ public void sanityCheckDryRun(boolean dryRun, boolean stopOngoingExecution) {
+ "an already ongoing partition reassignment.", e);
}
if (!partitionsBeingReassigned.isEmpty()) {
- throw new IllegalStateException(String.format("Cannot execute new proposals while there are ongoing partition reassignments "
+ if (_config.getBoolean(ExecutorConfig.REMOVE_STUCK_PARTITIONS_REASSIGNMENTS)) {
+ LOG.info("Trying to resolve stuck partitions {}", partitionsBeingReassigned);
+ _executor.fixStuckPartitionReassignments();
+ } else {
+ throw new IllegalStateException(String.format("Cannot execute new proposals while there are ongoing partition reassignments "
+ "initiated by external agent: %s", partitionsBeingReassigned));
+ }
} else if (_executor.hasOngoingLeaderElection()) {
throw new IllegalStateException("Cannot execute new proposals while there are ongoing leadership reassignments initiated by "
+ "external agent.");
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java
index cc7a7ec54..eb38adcde 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java
@@ -439,6 +439,15 @@ private ExecutorConfig() {
public static final String CONCURRENCY_ADJUSTER_MIN_ISR_RETENTION_MS_DOC = "The maximum time in ms to cache min.insync.replicas of topics."
+ " Relevant only if concurrency adjuster is enabled based on (At/Under)MinISR status of partitions.";
+ /**
+ * remove.stuck.partition.reassignments
+ */
+ public static final String REMOVE_STUCK_PARTITIONS_REASSIGNMENTS = "remove.stuck.partition.reassignments";
+ public static final boolean DEFAULT_REMOVE_STUCK_PARTITIONS_REASSIGNMENTS = false;
+ public static final String REMOVE_STUCK_PARTITIONS_REASSIGNMENTS_DOC = "Remove stuck partitions from ongoing reassignment."
+ + " Required because there can be cases where the rassignment gets into a limbo state during reassignment where it cannot"
+ + " finish the reassignemnt or revert back to original (Eg. dest & origin brokers both went down)";
+
/**
* Define configs for Executor.
*
@@ -698,6 +707,11 @@ public static ConfigDef define(ConfigDef configDef) {
DEFAULT_CONCURRENCY_ADJUSTER_MIN_ISR_RETENTION_MS,
atLeast(1),
ConfigDef.Importance.LOW,
- CONCURRENCY_ADJUSTER_MIN_ISR_RETENTION_MS_DOC);
+ CONCURRENCY_ADJUSTER_MIN_ISR_RETENTION_MS_DOC)
+ .define(REMOVE_STUCK_PARTITIONS_REASSIGNMENTS,
+ ConfigDef.Type.BOOLEAN,
+ DEFAULT_REMOVE_STUCK_PARTITIONS_REASSIGNMENTS,
+ ConfigDef.Importance.HIGH,
+ REMOVE_STUCK_PARTITIONS_REASSIGNMENTS_DOC);
}
}
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
index cfeb57780..2f0d63ef0 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
@@ -22,9 +22,12 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -38,11 +41,14 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.TopicDescription;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.admin.NewPartitionReassignment;
+import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@@ -129,6 +135,7 @@ public class Executor {
private final long _minExecutionProgressCheckIntervalMs;
public final long _slowTaskAlertingBackoffTimeMs;
private final KafkaCruiseControlConfig _config;
+ private final Semaphore _stuckPartitionsBeingReassinedSemaphore;
/**
* The executor class that execute the proposals generated by optimizer.
@@ -194,6 +201,7 @@ public Executor(KafkaCruiseControlConfig config,
_hasOngoingExecution = false;
_flipOngoingExecutionMutex = new Semaphore(1);
_noOngoingExecutionSemaphore = new Semaphore(1);
+ _stuckPartitionsBeingReassinedSemaphore = new Semaphore(1);
_uuid = null;
_reasonSupplier = null;
_executorNotifier = executorNotifier != null ? executorNotifier
@@ -826,6 +834,8 @@ private void sanityCheckOngoingMovement() throws OngoingExecutionException {
}
// Note that in case there is an ongoing partition reassignment, we do not unpause metric sampling.
if (hasOngoingPartitionReassignments) {
+ // check for stuck partition movements
+ fixStuckPartitionReassignments();
throw new OngoingExecutionException("There are ongoing inter-broker partition movements.");
} else {
boolean hasOngoingIntraBrokerReplicaMovement;
@@ -845,6 +855,45 @@ private void sanityCheckOngoingMovement() throws OngoingExecutionException {
}
}
+ /**
+ * Check and clear stuck partitionReassignments- Required to handle deadlocked conditions where reassignment is stuck
+ * in a limbo state due to the destination broker(s) being unavailable and one/more of the original brokers to revert to is also down
+ */
+ public void fixStuckPartitionReassignments() {
+ // make sure only one stuckPartitionReassginement call happening at any time
+ if (!_stuckPartitionsBeingReassinedSemaphore.tryAcquire()) {
+ throw new IllegalStateException(String.format("Stuck partitions currently being reassigned"));
+ }
+ Map> ongoingPartitionReassignmentsToBeChanged = new HashMap<>();
+ try {
+ Map ongoingPartitionReassignments = ExecutionUtils.ongoingPartitionReassignments(_adminClient);
+ Map topicDetails = _adminClient.describeTopics(ongoingPartitionReassignments.keySet().stream()
+ .map(tp -> tp.topic()).distinct().collect(Collectors.toList()))
+ .all().get();
+
+ Collection availableBrokers = _adminClient.describeCluster().nodes().get();
+ Set availableBrokersSet = availableBrokers.stream().map(Node::id).collect(Collectors.toSet());
+ for (Entry entry: ongoingPartitionReassignments.entrySet()) {
+ TopicPartition tp = entry.getKey();
+ PartitionReassignment reassignment = entry.getValue();
+ // if reassingment replicas are not all available
+ // change the reassignment
+ if (!availableBrokersSet.containsAll(reassignment.replicas())) {
+ // just increase as much as possible
+ int newReplicasCount = Math.min(availableBrokersSet.size(), topicDetails.get(tp.topic()).partitions().get(0).replicas().size());
+ ongoingPartitionReassignmentsToBeChanged.put(tp, Optional.of(new NewPartitionReassignment(
+ availableBrokersSet.stream().limit(newReplicasCount).collect(Collectors.toList()))));
+ }
+ }
+ // try to change the partitionReassignments for the stuck partitions
+ _adminClient.alterPartitionReassignments(ongoingPartitionReassignmentsToBeChanged).all().get();
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ LOG.warn("Error fixing stuck partitionReassignments {}", e);
+ } finally {
+ _stuckPartitionsBeingReassinedSemaphore.release();
+ }
+ }
+
private void processExecuteProposalsFailure() {
_executionTaskManager.clear();
_uuid = null;