Skip to content

Commit

Permalink
fix/cruisecontrol: add partition movement timeout to executor
Browse files Browse the repository at this point in the history
There is an edge case wherein after the partition reassignment was submitted to kafka and before it finished, there was a partition leadership re-lection- this causes the reassignment to stall until there is another re-election. However, we do see cases where there is no re-election triggered leading to a partition reaissgnment being in IN_PROGRESS indefinitely and potentially missing new anomalies due to executor state being in INTER_BROKER_REPLICA_ACTION

By adding a max timeout, we avoid this state by cancelling such reassignemnts and retrying them later

includes minor cleanup
  • Loading branch information
mavemuri committed Jun 7, 2023
1 parent b5ed6fa commit 8a595cf
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public void sanityCheckDryRun(boolean dryRun, boolean stopOngoingExecution) {
}
} else if (_config.getBoolean(ExecutorConfig.DELETE_STALE_PARTITIONS_REASSIGNMENTS)) {
LOG.info("Trying to resolve stuck partitions {}", partitionsBeingReassigned);
_executor.cancelStaleReassignments();
_executor.cancelStaleReassignments(partitionsBeingReassigned);
} else if (_config.getBoolean(ExecutorConfig.REMOVE_STUCK_PARTITIONS_REASSIGNMENTS)) {
LOG.info("Trying to resolve stuck partitions {}", partitionsBeingReassigned);
_executor.fixStuckPartitionReassignments();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,14 @@ public final class ExecutorConfig {
public static final String LEADER_MOVEMENT_TIMEOUT_MS_DOC = "The maximum time to wait for a leader movement to finish. "
+ "A leader movement will be marked as failed if it takes longer than this time to finish.";

/**
* <code>partition.reassignment.timeout.ms</code>
*/
public static final String PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG = "partition.reassignment.timeout.ms";
public static final long DEFAULT_PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG = TimeUnit.MINUTES.toMillis(60);
public static final String PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG_DOC = "The maximum time to wait for a partition reassignment to finish. "
+ "The reassignment will be marked as failed if it takes longer than this to finish.";

/**
* <code>task.execution.alerting.threshold.ms</code>
*/
Expand Down Expand Up @@ -716,6 +724,11 @@ public static ConfigDef define(ConfigDef configDef) {
DEFAULT_LEADER_MOVEMENT_TIMEOUT_MS,
ConfigDef.Importance.LOW,
LEADER_MOVEMENT_TIMEOUT_MS_DOC)
.define(PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG,
ConfigDef.Type.LONG,
DEFAULT_PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG,
ConfigDef.Importance.LOW,
PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG_DOC)
.define(TASK_EXECUTION_ALERTING_THRESHOLD_MS_CONFIG,
ConfigDef.Type.LONG,
DEFAULT_TASK_EXECUTION_ALERTING_THRESHOLD_MS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public class Executor {
private final ExecutorService _proposalExecutor;
private final AdminClient _adminClient;
private final double _leaderMovementTimeoutMs;
private final double _partitionReassignmentTimeoutMs;

private static final int NO_STOP_EXECUTION = 0;
private static final int STOP_EXECUTION = 1;
Expand Down Expand Up @@ -194,6 +195,7 @@ public Executor(KafkaCruiseControlConfig config,
_defaultExecutionProgressCheckIntervalMs = config.getLong(ExecutorConfig.EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG);
_executionProgressCheckIntervalMs = _defaultExecutionProgressCheckIntervalMs;
_leaderMovementTimeoutMs = config.getLong(ExecutorConfig.LEADER_MOVEMENT_TIMEOUT_MS_CONFIG);
_partitionReassignmentTimeoutMs = config.getLong(ExecutorConfig.PARTITION_REASSIGNMENT_TIMEOUT_MS_CONFIG);
_requestedExecutionProgressCheckIntervalMs = null;
_proposalExecutor =
Executors.newSingleThreadExecutor(new KafkaCruiseControlThreadFactory("ProposalExecutor", false, LOG));
Expand Down Expand Up @@ -967,8 +969,6 @@ private void sanityCheckOngoingMovement() throws OngoingExecutionException {
}
// Note that in case there is an ongoing partition reassignment, we do not unpause metric sampling.
if (hasOngoingPartitionReassignments) {
// check for stuck partition movements
fixStuckPartitionReassignments();
throw new OngoingExecutionException("There are ongoing inter-broker partition movements.");
} else {
boolean hasOngoingIntraBrokerReplicaMovement;
Expand All @@ -992,28 +992,19 @@ private void sanityCheckOngoingMovement() throws OngoingExecutionException {
* This will not affect reassignments started by CC itself as that is handled in {@code KafkaCruiseControl.sanityCheckDryRun()}
* Required for cases where CC dies while an execution is in progress
* Eg. node running CC goes down
* @param partitionsBeingReassigned partitions being reassigned
*/
public void cancelStaleReassignments() {
public void cancelStaleReassignments(Set<TopicPartition> partitionsBeingReassigned) {
if (!_stuckPartitionsBeingReassinedSemaphore.tryAcquire()) {
throw new IllegalStateException(String.format("Stuck/Stale partitions currently being reassigned"));
}
Set<TopicPartition> partitionsBeingReassigned;
try {
partitionsBeingReassigned = ExecutionUtils.partitionsBeingReassigned(_adminClient);
} catch (TimeoutException | InterruptedException | ExecutionException e) {
throw new IllegalStateException("Unable to retrieve current partition reassignments", e);
}
if (partitionsBeingReassigned.isEmpty()) {
LOG.info("No stale reassignments found");
return;
}
Map<TopicPartition, Optional<NewPartitionReassignment>> newReassignments = new HashMap<>();
for (TopicPartition tp : partitionsBeingReassigned) {
newReassignments.put(tp, ExecutionUtils.cancelReassignmentValue());
}
try {
_adminClient.alterPartitionReassignments(newReassignments).all().get();
LOG.info("Cancelled stale partition assignments {}", partitionsBeingReassigned.toString());
LOG.info("Cancelled stale partition reassignments {}", partitionsBeingReassigned.toString());
} catch (InterruptedException | ExecutionException e) {
LOG.error("Error cancelling ongoing partition reassignments {}", e);
} finally {
Expand Down Expand Up @@ -2054,6 +2045,11 @@ private boolean maybeMarkTaskAsDead(Cluster cluster,
break;

case INTER_BROKER_REPLICA_ACTION:
if (_time.milliseconds() > task.startTimeMs() + _partitionReassignmentTimeoutMs) {
_executionTaskManager.markTaskDead(task);
LOG.warn("Killing execution for task {} as it has exceeded maximum partition movement timeout", task);
return true;
}
for (ReplicaPlacementInfo broker : task.proposal().newReplicas()) {
if (cluster.nodeById(broker.brokerId()) == null
|| deadInterBrokerReassignments.contains(task.proposal().topicPartition())) {
Expand Down

0 comments on commit 8a595cf

Please sign in to comment.