Skip to content

Commit

Permalink
Move the logic before replacement
Browse files Browse the repository at this point in the history
  • Loading branch information
yisheng-zhou committed Nov 26, 2024
1 parent dacaec3 commit b1e2ddc
Showing 1 changed file with 32 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class BrokerRecoveryAction extends NodeAction {
private static final int retryIntervalMilliseconds = 1000;

public static final String ATTR_TRY_TO_RESTART_KEY = "try_restart";
public static final String ATTR_STOP_SERVICE_BEFORE_ACTION = "stop_service_before_action";
public static final String ATTR_STOP_SERVICE_BEFORE_REPLACEMENT = "stop_service_before_replacement";
public static final String ATTR_NODE_EXISTS_KEY = "node_exists";
public static final String ATTR_NONEXISTENT_HOST_KEY = "nonexistent_host";
public static final String CONF_DRY_RUN_REPLACEMENT_KEY = "dry_run";
Expand Down Expand Up @@ -90,36 +90,6 @@ public void runAction() {
String hostname = node.getCurrentNodeInfo().getHostname();
int port = node.getCurrentNodeInfo().getServicePort();

// Try to gracefully shutdown service before recovery actions.
// This can make sure the broker is not the leader of any partition.
boolean stopServiceBeforeAction = true;
if (containsAttribute(ATTR_STOP_SERVICE_BEFORE_ACTION)) {
stopServiceBeforeAction = getAttribute(ATTR_STOP_SERVICE_BEFORE_ACTION).getValue();
}
if (stopServiceBeforeAction) {
OrionServer.METRICS.counter(metricPrefix.resolve("stop_service_before_action")).inc();
try {
ServiceStopAction stopServiceAction = new ServiceStopAction();
stopServiceAction.copyAttributeFrom(this, OrionConstants.NODE_ID);
getEngine().dispatchChild(this, stopServiceAction);
getResult().appendOut("Attempt to stop service before recovery action.");
try {
stopServiceAction.get(30, TimeUnit.SECONDS);
} catch (Exception e) {
getResult().appendErr(
"Unable to stop service before broker replacement. Will keep moving forward. Error message:"
+ e.getMessage());
}
} catch (Exception e) {
getResult().appendErr(
"Encounter unknown error when stopping service. Will keep moving forward. Error message:"
+ e.getMessage());
}
} else {
getResult().appendOut(
"Skip stopping service before recovery actions. The configuration is set to false.");
}

boolean tryRestart = false;
if (containsAttribute(ATTR_TRY_TO_RESTART_KEY)) {
tryRestart = getAttribute(ATTR_TRY_TO_RESTART_KEY).getValue();
Expand Down Expand Up @@ -160,6 +130,37 @@ public void runAction() {

getResult().appendOut("Start replacing broker");

// Try to gracefully shutdown service before replacing broker.
// This can make sure the broker is not the leader of any partition.
boolean stopServiceBeforeAction = true;
if (containsAttribute(ATTR_STOP_SERVICE_BEFORE_REPLACEMENT)) {
stopServiceBeforeAction = getAttribute(ATTR_STOP_SERVICE_BEFORE_REPLACEMENT).getValue();
}
if (stopServiceBeforeAction) {
OrionServer.METRICS.counter(metricPrefix.resolve("stop_service_before_replacement")).inc();
try {
ServiceStopAction stopServiceAction = new ServiceStopAction();
stopServiceAction.copyAttributeFrom(this, OrionConstants.NODE_ID);
getEngine().dispatchChild(this, stopServiceAction);
getResult().appendOut("Attempt to stop service before broker replacement.");
try {
stopServiceAction.get(30, TimeUnit.SECONDS);
} catch (Exception e) {
getResult().appendErr(
"Unable to stop service before broker replacement. Will keep moving forward. Error message:"
+ e.getMessage());
}
} catch (Exception e) {
getResult().appendErr(
"Encounter unknown error when stopping service. Will keep moving forward. Error message:"
+ e.getMessage());
}
} else {
getResult().appendOut(
"Skip stopping service before broker replacement. " +
"The stop_service_before_replacement configuration is set to false.");
}

try {
// if dry run skip dispatching the replacement action and report success, but still set the cooldown flag for the cluster
if (!isDryRun) {
Expand Down

0 comments on commit b1e2ddc

Please sign in to comment.