From 8a595cf0389e2c0df4fd6e83b186f20da0e140e2 Mon Sep 17 00:00:00 2001 From: mavemuri Date: Wed, 6 Apr 2022 10:43:06 -0700 Subject: [PATCH] fix/cruisecontrol: add partition movement timeout to executor There is an edge case wherein after the partition reassignment was submitted to kafka and before it finished, there was a partition leadership re-lection- this causes the reassignment to stall until there is another re-election. However, we do see cases where there is no re-election triggered leading to a partition reaissgnment being in IN_PROGRESS indefinitely and potentially missing new anomalies due to executor state being in INTER_BROKER_REPLICA_ACTION By adding a max timeout, we avoid this state by cancelling such reassignemnts and retrying them later includes minor cleanup --- .../cruisecontrol/KafkaCruiseControl.java | 2 +- .../config/constants/ExecutorConfig.java | 13 ++++++++++ .../cruisecontrol/executor/Executor.java | 24 ++++++++----------- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java index f24476f12..8013f243f 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java @@ -297,7 +297,7 @@ public void sanityCheckDryRun(boolean dryRun, boolean stopOngoingExecution) { } } else if (_config.getBoolean(ExecutorConfig.DELETE_STALE_PARTITIONS_REASSIGNMENTS)) { LOG.info("Trying to resolve stuck partitions {}", partitionsBeingReassigned); - _executor.cancelStaleReassignments(); + _executor.cancelStaleReassignments(partitionsBeingReassigned); } else if (_config.getBoolean(ExecutorConfig.REMOVE_STUCK_PARTITIONS_REASSIGNMENTS)) { LOG.info("Trying to resolve stuck partitions {}", partitionsBeingReassigned); _executor.fixStuckPartitionReassignments(); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java index caedd549f..a1761aad0 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java @@ -250,6 +250,14 @@ public final class ExecutorConfig { public static final String LEADER_MOVEMENT_TIMEOUT_MS_DOC = "The maximum time to wait for a leader movement to finish. " + "A leader movement will be marked as failed if it takes longer than this time to finish."; + /** + * partition.reassignment.timeout.ms + */ + public static final String PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG = "partition.reassignment.timeout.ms"; + public static final long DEFAULT_PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG = TimeUnit.MINUTES.toMillis(60); + public static final String PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG_DOC = "The maximum time to wait for a partition reassignment to finish. " + + "The reassignment will be marked as failed if it takes longer than this to finish."; + /** * task.execution.alerting.threshold.ms */ @@ -716,6 +724,11 @@ public static ConfigDef define(ConfigDef configDef) { DEFAULT_LEADER_MOVEMENT_TIMEOUT_MS, ConfigDef.Importance.LOW, LEADER_MOVEMENT_TIMEOUT_MS_DOC) + .define(PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG, + ConfigDef.Type.LONG, + DEFAULT_PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG, + ConfigDef.Importance.LOW, + PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG_DOC) .define(TASK_EXECUTION_ALERTING_THRESHOLD_MS_CONFIG, ConfigDef.Type.LONG, DEFAULT_TASK_EXECUTION_ALERTING_THRESHOLD_MS, diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index 7bd907908..32534991f 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -98,6 +98,7 @@ public class Executor { private final ExecutorService _proposalExecutor; private final AdminClient _adminClient; private final double _leaderMovementTimeoutMs; + private final double _partitionReassignmentTimeoutMs; private static final int NO_STOP_EXECUTION = 0; private static final int STOP_EXECUTION = 1; @@ -194,6 +195,7 @@ public Executor(KafkaCruiseControlConfig config, _defaultExecutionProgressCheckIntervalMs = config.getLong(ExecutorConfig.EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG); _executionProgressCheckIntervalMs = _defaultExecutionProgressCheckIntervalMs; _leaderMovementTimeoutMs = config.getLong(ExecutorConfig.LEADER_MOVEMENT_TIMEOUT_MS_CONFIG); + _partitionReassignmentTimeoutMs = config.getLong(ExecutorConfig.PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG); _requestedExecutionProgressCheckIntervalMs = null; _proposalExecutor = Executors.newSingleThreadExecutor(new KafkaCruiseControlThreadFactory("ProposalExecutor", false, LOG)); @@ -967,8 +969,6 @@ private void sanityCheckOngoingMovement() throws OngoingExecutionException { } // Note that in case there is an ongoing partition reassignment, we do not unpause metric sampling. if (hasOngoingPartitionReassignments) { - // check for stuck partition movements - fixStuckPartitionReassignments(); throw new OngoingExecutionException("There are ongoing inter-broker partition movements."); } else { boolean hasOngoingIntraBrokerReplicaMovement; @@ -992,28 +992,19 @@ private void sanityCheckOngoingMovement() throws OngoingExecutionException { * This will not affect reassignments started by CC itself as that is handled in {@code KafkaCruiseControl.sanityCheckDryRun()} * Required for cases where CC dies while an execution is in progress * Eg. node running CC goes down + * @param partitionsBeingReassigned partitions being reassigned */ - public void cancelStaleReassignments() { + public void cancelStaleReassignments(Set partitionsBeingReassigned) { if (!_stuckPartitionsBeingReassinedSemaphore.tryAcquire()) { throw new IllegalStateException(String.format("Stuck/Stale partitions currently being reassigned")); } - Set partitionsBeingReassigned; - try { - partitionsBeingReassigned = ExecutionUtils.partitionsBeingReassigned(_adminClient); - } catch (TimeoutException | InterruptedException | ExecutionException e) { - throw new IllegalStateException("Unable to retrieve current partition reassignments", e); - } - if (partitionsBeingReassigned.isEmpty()) { - LOG.info("No stale reassignments found"); - return; - } Map> newReassignments = new HashMap<>(); for (TopicPartition tp : partitionsBeingReassigned) { newReassignments.put(tp, ExecutionUtils.cancelReassignmentValue()); } try { _adminClient.alterPartitionReassignments(newReassignments).all().get(); - LOG.info("Cancelled stale partition assignments {}", partitionsBeingReassigned.toString()); + LOG.info("Cancelled stale partition reassignments {}", partitionsBeingReassigned.toString()); } catch (InterruptedException | ExecutionException e) { LOG.error("Error cancelling ongoing partition reassignments {}", e); } finally { @@ -2054,6 +2045,11 @@ private boolean maybeMarkTaskAsDead(Cluster cluster, break; case INTER_BROKER_REPLICA_ACTION: + if (_time.milliseconds() > task.startTimeMs() + _partitionReassignmentTimeoutMs) { + _executionTaskManager.markTaskDead(task); + LOG.warn("Killing execution for task {} as it has exceeded maximum partition movement timeout", task); + return true; + } for (ReplicaPlacementInfo broker : task.proposal().newReplicas()) { if (cluster.nodeById(broker.brokerId()) == null || deadInterBrokerReassignments.contains(task.proposal().topicPartition())) {