Skip to content

Commit

Permalink
Make BrokerRecoveryAction run in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
yisheng-zhou committed Dec 12, 2023
1 parent e40649f commit 829d852
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public void runAction() throws Exception {
}

getResult().appendOut(
"Node is currently stable after " + (time - now) + " seconds. Still waiting for " + (
duration - (time - now)) + " seconds.");
"Node is currently stable after " + (time - now) + " ms. Still waiting for " + (
duration - (time - now)) + " ms.");
Thread.sleep(30_000); // might want to make it configurable
}
markSucceeded();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class BrokerRecoveryAction extends NodeAction {
public static final String ATTR_NONEXISTENT_HOST_KEY = "nonexistent_host";
public static final String CONF_DRY_RUN_REPLACEMENT_KEY = "dry_run";
public static final String CONF_OVERRIDE_IMAGE_KEY = "override_image";
private boolean isDryRun = false;
private boolean isDryRun = false; // Enable dryrun to prevent EC2 actions being triggered. Useful for testing.
private String amiOverride = null;

@Override
Expand Down Expand Up @@ -209,14 +209,14 @@ protected boolean checkServiceStabilityAfterRestart(String nodeId) {
@Override
public String getName() {
// Different action names are required for BrokerRecoveryAction to be dispatched from same ClusterRecoveryAction.
String nodeId = "Unknown Node";
String name = "BrokerRecoveryAction - " + this.getUuid().toString(); // default to action uuid if nodeId is not set.
if (containsAttribute(OrionConstants.NODE_ID)) {
nodeId = getAttribute(OrionConstants.NODE_ID).getValue();
name = String.format(
"BrokerRecoveryAction for broker %s",
getAttribute(OrionConstants.NODE_ID).getValue().toString());
}
return String.format(
"BrokerRecoveryAction for %s %s",
nodeId,
(isDryRun ? " - Dry Run" : ""));
name = name + (isDryRun ? " - Dry Run" : "");
return name;
}

private boolean isHostReachable(String hostname, int port) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;

Expand Down Expand Up @@ -44,8 +45,7 @@ public void runAction() throws Exception {
logger.info(startNote);
getResult().appendOut(startNote);
try {
boolean isSucceed = healBrokers(candidates);
if (isSucceed) {
if (healBrokers(candidates) && isChildActionsSuccess()) {
markSucceeded();
} else {
markFailed(String.format(
Expand All @@ -58,51 +58,105 @@ public void runAction() throws Exception {
}
}

protected void healBroker(String deadBrokerId) throws Exception {
// This will trigger an action that will attempt to replace the broker.
// If agent is still online but Kafka process is down, it will try to restart the broker first.
/**
 * Reports whether every child action of this action ended successfully.
 *
 * <p>Each child returned by {@code getChildren()} is visited in order and {@code get()} is
 * called on it (per the original author's note, this waits for the child to finish). If
 * {@code get()} throws, the failure is written to the logger and to this action's result
 * output, and the overall outcome becomes {@code false}. The remaining children are still
 * visited.
 *
 * @return {@code true} when there are no child actions, or when no {@code get()} call threw
 *         and every child reported {@code isSuccess()}; {@code false} otherwise.
 */
private boolean isChildActionsSuccess() {
  List<Action> children = getChildren();
  if (children == null || children.isEmpty()) {
    // Nothing was dispatched, so there is nothing that could have failed.
    return true;
  }

  boolean allSucceeded = true;
  for (Action child : children) {
    try {
      child.get(); // wait for the child action to finish
    } catch (Exception e) {
      String failureNote =
          String.format("Child action %s failed: %s", child.getName(), e.getMessage());
      logger.warning(failureNote);
      getResult().appendOut(failureNote);
      allSucceeded = false;
    }
    // Only consult the child's status while the overall outcome is still positive.
    if (allSucceeded && !child.isSuccess()) {
      allSucceeded = false;
    }
  }
  return allSucceeded;
}

/**
* This method triggers a BrokerRecoveryAction for a broker.
* If the broker is non-existent or dead, it will replace the broker with a new broker.
* If the broker is still online but Kafka process is down, it will try to restart the broker first. If restart fails, it will replace the broker.
* @param brokerId the id of the broker to be recovered.
* @throws Exception if the child action is not triggered successfully.
*/
protected void triggerBrokerRecoveryAction(String brokerId) throws Exception {

Action brokerRecoveryAction = newBrokerRecoveryAction();
brokerRecoveryAction.setAttribute(OrionConstants.NODE_ID, deadBrokerId, sensorSet);
brokerRecoveryAction.setAttribute(OrionConstants.NODE_ID, brokerId, sensorSet);
brokerRecoveryAction.setOwner("ClusterRecoveryAction");

if (nonExistentBrokers.contains(deadBrokerId)) {
if (nonExistentBrokers.contains(brokerId)) {
Node existingNode = cluster.getNodeMap().values().iterator().next();
String extractedName = deriveNonexistentHostname(
existingNode.getCurrentNodeInfo().getHostname(),
existingNode.getCurrentNodeInfo().getNodeId(),
deadBrokerId
brokerId
);
// setting these attributes to indicate that the node doesn't exist in cluster map, and should skip any node-related checks
brokerRecoveryAction.setAttribute(BrokerRecoveryAction.ATTR_NODE_EXISTS_KEY, false);
brokerRecoveryAction.setAttribute(BrokerRecoveryAction.ATTR_NONEXISTENT_HOST_KEY, extractedName);
}
if (maybeDeadBrokers.contains(deadBrokerId)) {
if (maybeDeadBrokers.contains(brokerId)) {
// Setting this flag in the action will restart the broker before replacing the broker
brokerRecoveryAction.setAttribute(BrokerRecoveryAction.ATTR_TRY_TO_RESTART_KEY, true);
String restartNote = "Will try to restart node " + deadBrokerId + " before replacing it. ";
String restartNote =
"BrokerRecoveryAction will try to restart node " + brokerId + " before replacing it.";
logger.info(restartNote);
getResult().appendOut(restartNote);
}
String dispatchNote = String.format(
"Dispatching BrokerRecoveryAction for node %s in cluster %s. ",
deadBrokerId,
String note = String.format(
"Trigger BrokerRecoveryAction for node %s in cluster %s. Check the child action for details.",
brokerId,
cluster.getClusterId());
logger.info(dispatchNote);
getResult().appendOut(dispatchNote);
getEngine().dispatch(brokerRecoveryAction);
logger.info(note);
getResult().appendOut(note);
dispatchChildAction(brokerRecoveryAction);
}

/**
 * Registers {@code childAction} as a child of this action and hands it to the action engine.
 *
 * <p>The child is appended to the list returned by {@code getChildren()} before
 * {@code getEngine().dispatchChild(this, childAction)} is called, so it stays in that list
 * even if the dispatch call throws.
 *
 * @param childAction the child action to record and dispatch.
 * @throws Exception if recording the child or dispatching it through the engine fails.
 */
private void dispatchChildAction(Action childAction) throws Exception {
  List<Action> children = getChildren();
  children.add(childAction);
  getEngine().dispatchChild(this, childAction);
}

/**
* This method triggers a BrokerRecoveryAction for each broker in candidates.
* It checks if the number of candidates is more than one.
* If the number of candidates is more than one, it will alert and not trigger any action.
* @param candidates a set of broker ids that are in bad states.
* @return true if all the child actions are triggered successfully.
* @throws Exception if any of the child actions is not triggered successfully.
*/
protected boolean healBrokers(Set<String> candidates) throws Exception {
String output;
boolean isSucceed = false;
boolean isActionTriggered = false;
if (candidates.size() == 1) {
output = String.format(
"ClusterRecoveryAction trys to recover broker %s. ",
"ClusterRecoveryAction attempts to recover broker %s. ",
candidates.iterator().next());
String deadBrokerId = candidates.iterator().next();
healBroker(deadBrokerId);
String brokerId = candidates.iterator().next();
logger.info(output);
isSucceed = true;
triggerBrokerRecoveryAction(brokerId);
isActionTriggered = true;
} else if (candidates.size() > 1){
// More than one broker is dead; better to alert and leave recovery to human intervention.
output = String.format("More than one brokers are in bad state - dead: %s, service down: %s. " +
Expand All @@ -125,7 +179,7 @@ protected boolean healBrokers(Set<String> candidates) throws Exception {
logger.warning(output);
}
getResult().appendOut(output);
return isSucceed;
return isActionTriggered;
}

public void setCandidates(Set<String> candidates) {
Expand All @@ -152,6 +206,13 @@ public void setSensorSet(Set<String> sensorSet) {
this.sensorSet = sensorSet;
}

/**
* This method derives the hostname of a non-existent node from the hostname of an existing node.
* @param existingHostname the hostname of an existing node.
* @param existingId the id of the existing node.
* @param nonExistingId the id of the non-existent node.
* @return the hostname of the non-existent node.
*/
protected static String deriveNonexistentHostname(String existingHostname, String existingId, String nonExistingId) {
existingHostname = existingHostname.split("\\.", 2)[0]; // sanitize potential suffixes
int diff = nonExistingId.length() - existingId.length();
Expand All @@ -172,6 +233,13 @@ protected BrokerRecoveryAction newBrokerRecoveryAction() {
return new BrokerRecoveryAction();
}

/**
* This method removes all the nodes that are replaced within cooldownMilliseconds from candidates.
* It also adds the new set of recovering nodes to the recovering node set.
* If the recovering node set reaches TTL, it replaces the recovering node set with candidates.
* @param candidates a set of nodes that are in bad states.
* @param cluster the cluster that the nodes belong to.
*/
public static void removeRecoveringNodesFromCandidates(Set<String> candidates, Cluster cluster) {
// Remove all the nodes that are replaced within cooldownMilliseconds from candidates.
if (candidates == null || candidates.isEmpty()) {
Expand Down

0 comments on commit 829d852

Please sign in to comment.