diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java
index f24476f12..8013f243f 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java
@@ -297,7 +297,7 @@ public void sanityCheckDryRun(boolean dryRun, boolean stopOngoingExecution) {
}
} else if (_config.getBoolean(ExecutorConfig.DELETE_STALE_PARTITIONS_REASSIGNMENTS)) {
LOG.info("Trying to resolve stuck partitions {}", partitionsBeingReassigned);
- _executor.cancelStaleReassignments();
+ _executor.cancelStaleReassignments(partitionsBeingReassigned);
} else if (_config.getBoolean(ExecutorConfig.REMOVE_STUCK_PARTITIONS_REASSIGNMENTS)) {
LOG.info("Trying to resolve stuck partitions {}", partitionsBeingReassigned);
_executor.fixStuckPartitionReassignments();
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java
index caedd549f..a1761aad0 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java
@@ -250,6 +250,14 @@ public final class ExecutorConfig {
public static final String LEADER_MOVEMENT_TIMEOUT_MS_DOC = "The maximum time to wait for a leader movement to finish. "
+ "A leader movement will be marked as failed if it takes longer than this time to finish.";
+ /**
+ * partition.reassignment.timeout.ms
+ */
+ public static final String PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG = "partition.reassignment.timeout.ms";
+ public static final long DEFAULT_PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG = TimeUnit.MINUTES.toMillis(60);
+ public static final String PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG_DOC = "The maximum time to wait for a partition reassignment to finish. "
+ + "The reassignment will be marked as failed if it takes longer than this to finish.";
+
/**
* task.execution.alerting.threshold.ms
*/
@@ -716,6 +724,11 @@ public static ConfigDef define(ConfigDef configDef) {
DEFAULT_LEADER_MOVEMENT_TIMEOUT_MS,
ConfigDef.Importance.LOW,
LEADER_MOVEMENT_TIMEOUT_MS_DOC)
+ .define(PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG,
+ ConfigDef.Type.LONG,
+ DEFAULT_PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG,
+ ConfigDef.Importance.LOW,
+ PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG_DOC)
.define(TASK_EXECUTION_ALERTING_THRESHOLD_MS_CONFIG,
ConfigDef.Type.LONG,
DEFAULT_TASK_EXECUTION_ALERTING_THRESHOLD_MS,
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
index 7bd907908..32534991f 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
@@ -98,6 +98,7 @@ public class Executor {
private final ExecutorService _proposalExecutor;
private final AdminClient _adminClient;
private final double _leaderMovementTimeoutMs;
+ private final double _partitionReassignmentTimeoutMs;
private static final int NO_STOP_EXECUTION = 0;
private static final int STOP_EXECUTION = 1;
@@ -194,6 +195,7 @@ public Executor(KafkaCruiseControlConfig config,
_defaultExecutionProgressCheckIntervalMs = config.getLong(ExecutorConfig.EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG);
_executionProgressCheckIntervalMs = _defaultExecutionProgressCheckIntervalMs;
_leaderMovementTimeoutMs = config.getLong(ExecutorConfig.LEADER_MOVEMENT_TIMEOUT_MS_CONFIG);
+ _partitionReassignmentTimeoutMs = config.getLong(ExecutorConfig.PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG);
_requestedExecutionProgressCheckIntervalMs = null;
_proposalExecutor =
Executors.newSingleThreadExecutor(new KafkaCruiseControlThreadFactory("ProposalExecutor", false, LOG));
@@ -967,8 +969,6 @@ private void sanityCheckOngoingMovement() throws OngoingExecutionException {
}
// Note that in case there is an ongoing partition reassignment, we do not unpause metric sampling.
if (hasOngoingPartitionReassignments) {
- // check for stuck partition movements
- fixStuckPartitionReassignments();
throw new OngoingExecutionException("There are ongoing inter-broker partition movements.");
} else {
boolean hasOngoingIntraBrokerReplicaMovement;
@@ -992,28 +992,19 @@ private void sanityCheckOngoingMovement() throws OngoingExecutionException {
* This will not affect reassignments started by CC itself as that is handled in {@code KafkaCruiseControl.sanityCheckDryRun()}
* Required for cases where CC dies while an execution is in progress
* Eg. node running CC goes down
+ * @param partitionsBeingReassigned the partitions whose in-progress reassignments will be cancelled
*/
- public void cancelStaleReassignments() {
+ public void cancelStaleReassignments(Set<TopicPartition> partitionsBeingReassigned) {
if (!_stuckPartitionsBeingReassinedSemaphore.tryAcquire()) {
throw new IllegalStateException(String.format("Stuck/Stale partitions currently being reassigned"));
}
- Set<TopicPartition> partitionsBeingReassigned;
- try {
- partitionsBeingReassigned = ExecutionUtils.partitionsBeingReassigned(_adminClient);
- } catch (TimeoutException | InterruptedException | ExecutionException e) {
- throw new IllegalStateException("Unable to retrieve current partition reassignments", e);
- }
- if (partitionsBeingReassigned.isEmpty()) {
- LOG.info("No stale reassignments found");
- return;
- }
Map<TopicPartition, Optional<NewPartitionReassignment>> newReassignments = new HashMap<>();
for (TopicPartition tp : partitionsBeingReassigned) {
newReassignments.put(tp, ExecutionUtils.cancelReassignmentValue());
}
try {
_adminClient.alterPartitionReassignments(newReassignments).all().get();
- LOG.info("Cancelled stale partition assignments {}", partitionsBeingReassigned.toString());
+ LOG.info("Cancelled stale partition reassignments {}", partitionsBeingReassigned.toString());
} catch (InterruptedException | ExecutionException e) {
LOG.error("Error cancelling ongoing partition reassignments {}", e);
} finally {
@@ -2054,6 +2045,11 @@ private boolean maybeMarkTaskAsDead(Cluster cluster,
break;
case INTER_BROKER_REPLICA_ACTION:
+ if (_time.milliseconds() > task.startTimeMs() + _partitionReassignmentTimeoutMs) {
+ _executionTaskManager.markTaskDead(task);
+ LOG.warn("Killing execution for task {} as it has exceeded maximum partition movement timeout", task);
+ return true;
+ }
for (ReplicaPlacementInfo broker : task.proposal().newReplicas()) {
if (cluster.nodeById(broker.brokerId()) == null
|| deadInterBrokerReassignments.contains(task.proposal().topicPartition())) {