diff --git a/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java b/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java index 522c08a2..4a59c9de 100644 --- a/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java +++ b/orion-server/src/main/java/com/pinterest/orion/core/actions/kafka/BrokerRecoveryAction.java @@ -44,7 +44,7 @@ public class BrokerRecoveryAction extends NodeAction { private static final int retryIntervalMilliseconds = 1000; public static final String ATTR_TRY_TO_RESTART_KEY = "try_restart"; - public static final String ATTR_STOP_SERVICE_BEFORE_ACTION = "stop_service_before_action"; + public static final String ATTR_STOP_SERVICE_BEFORE_REPLACEMENT = "stop_service_before_replacement"; public static final String ATTR_NODE_EXISTS_KEY = "node_exists"; public static final String ATTR_NONEXISTENT_HOST_KEY = "nonexistent_host"; public static final String CONF_DRY_RUN_REPLACEMENT_KEY = "dry_run"; @@ -90,36 +90,6 @@ public void runAction() { String hostname = node.getCurrentNodeInfo().getHostname(); int port = node.getCurrentNodeInfo().getServicePort(); - // Try to gracefully shutdown service before recovery actions. - // This can make sure the broker is not the leader of any partition. - boolean stopServiceBeforeAction = true; - if (containsAttribute(ATTR_STOP_SERVICE_BEFORE_ACTION)) { - stopServiceBeforeAction = getAttribute(ATTR_STOP_SERVICE_BEFORE_ACTION).getValue(); - } - if (stopServiceBeforeAction) { - OrionServer.METRICS.counter(metricPrefix.resolve("stop_service_before_action")).inc(); - try { - ServiceStopAction stopServiceAction = new ServiceStopAction(); - stopServiceAction.copyAttributeFrom(this, OrionConstants.NODE_ID); - getEngine().dispatchChild(this, stopServiceAction); - getResult().appendOut("Attempt to stop service before recovery action."); - try { - stopServiceAction.get(30, TimeUnit.SECONDS); - } catch (Exception e) { - getResult().appendErr( - "Unable to stop service before broker replacement. Will keep moving forward. Error message:" - + e.getMessage()); - } - } catch (Exception e) { - getResult().appendErr( - "Encounter unknown error when stopping service. Will keep moving forward. Error message:" - + e.getMessage()); - } - } else { - getResult().appendOut( - "Skip stopping service before recovery actions. The configuration is set to false."); - } - boolean tryRestart = false; if (containsAttribute(ATTR_TRY_TO_RESTART_KEY)) { tryRestart = getAttribute(ATTR_TRY_TO_RESTART_KEY).getValue(); @@ -160,6 +130,37 @@ public void runAction() { getResult().appendOut("Start replacing broker"); + // Try to gracefully shutdown service before replacing broker. + // This can make sure the broker is not the leader of any partition. + boolean stopServiceBeforeAction = true; + if (containsAttribute(ATTR_STOP_SERVICE_BEFORE_REPLACEMENT)) { + stopServiceBeforeAction = getAttribute(ATTR_STOP_SERVICE_BEFORE_REPLACEMENT).getValue(); + } + if (stopServiceBeforeAction) { + OrionServer.METRICS.counter(metricPrefix.resolve("stop_service_before_replacement")).inc(); + try { + ServiceStopAction stopServiceAction = new ServiceStopAction(); + stopServiceAction.copyAttributeFrom(this, OrionConstants.NODE_ID); + getEngine().dispatchChild(this, stopServiceAction); + getResult().appendOut("Attempt to stop service before broker replacement."); + try { + stopServiceAction.get(30, TimeUnit.SECONDS); + } catch (Exception e) { + getResult().appendErr( + "Unable to stop service before broker replacement. Will keep moving forward. Error message:" + + e.getMessage()); + } + } catch (Exception e) { + getResult().appendErr( + "Encounter unknown error when stopping service. Will keep moving forward. Error message:" + + e.getMessage()); + } + } else { + getResult().appendOut( + "Skip stopping service before broker replacement. " + + "The stop_service_before_replacement configuration is set to false."); + } + try { // if dry run skip dispatching the replacement action and report success, but still set the cooldown flag for the cluster if (!isDryRun) {