Skip to content

Commit

Permalink
[apache/helix] -- Add SetPartitionToError for participants to self an…
Browse files Browse the repository at this point in the history
…notate a node to ERROR state
  • Loading branch information
Charanya Sudharsanan committed May 7, 2024
1 parent 1bfed31 commit c3c2cee
Show file tree
Hide file tree
Showing 12 changed files with 458 additions and 126 deletions.
12 changes: 12 additions & 0 deletions helix-core/src/main/java/org/apache/helix/HelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,18 @@ void manuallyEnableMaintenanceMode(String clusterName, boolean enabled, String r
*/
ClusterManagementMode getClusterManagementMode(String clusterName);

/**
 * Set a list of partitions of a resource on an instance to ERROR state from any state.
 * The partitions could be in any state and setPartitionsToError will bring them to ERROR
 * state. An ANY ("*") to ERROR state transition is required for this.
 * @param clusterName name of the cluster that owns the instance and the resource
 * @param instanceName name of the instance whose partitions are set to ERROR
 * @param resourceName name of the resource the partitions belong to
 * @param partitionNames names of the partitions to set to ERROR
 */
void setPartitionsToError(String clusterName, String instanceName, String resourceName,
List<String> partitionNames);

/**
* Reset a list of partitions in error state for an instance
* The partitions are assume to be in error state and reset will bring them from error
Expand Down
256 changes: 145 additions & 111 deletions helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,136 @@ public ClusterManagementMode getClusterManagementMode(String clusterName) {
: new ClusterManagementMode(status.getManagementMode(), status.getManagementModeStatus());
}

/**
 * Logs the request and delegates to {@code sendStateTransitionMessage} with
 * {@code StateTransitionType.SET_TO_ERROR}.
 *
 * @param clusterName name of the cluster
 * @param instanceName name of the instance hosting the partitions
 * @param resourceName name of the resource the partitions belong to
 * @param partitionNames names of the partitions to set to ERROR; logged as "NULL" when null
 */
@Override
public void setPartitionsToError(String clusterName, String instanceName, String resourceName,
    List<String> partitionNames) {
  // Render the partition list once for logging; a null list is logged as the literal "NULL".
  final String partitionsForLog;
  if (partitionNames != null) {
    partitionsForLog = HelixUtil.serializeByComma(partitionNames);
  } else {
    partitionsForLog = "NULL";
  }
  logger.info("Set partitions {} for resource {} on instance {} in cluster {} to ERROR state.",
      partitionsForLog, resourceName, instanceName, clusterName);

  sendStateTransitionMessage(clusterName, instanceName, resourceName, partitionNames,
      StateTransitionType.SET_TO_ERROR);
}

/**
 * Validates a partition state-change request and writes one STATE_TRANSITION message per
 * requested partition to the target instance.
 *
 * <p>Checks performed, in order: the instance is live, the resource has an ideal state, every
 * requested partition exists in the ideal state, (for RESET only) every requested partition is
 * currently in ERROR state, the state model definition exists, the transition type is
 * supported, and no state transition message is already pending for any requested partition in
 * the instance's current session.
 *
 * @param clusterName name of the cluster
 * @param instanceName name of the target instance
 * @param resourceName name of the resource the partitions belong to
 * @param partitionNames names of the partitions to transition; duplicates are sent only once
 * @param stateTransitionType RESET (ERROR to the state model's initial state) or SET_TO_ERROR
 *        ("*" to ERROR)
 * @throws HelixException if any of the checks above fails
 */
private void sendStateTransitionMessage(String clusterName, String instanceName,
    String resourceName, List<String> partitionNames, StateTransitionType stateTransitionType) {
  HelixDataAccessor accessor =
      new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
  PropertyKey.Builder keyBuilder = accessor.keyBuilder();

  // NOTE: SetPartitionFailureReason.getMessage(...) already returns a fully formatted string.
  // It must not be passed through String.format again, otherwise a '%' in any name would be
  // interpreted as a format specifier and raise a formatter exception.

  // check the instance is alive
  LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
  if (liveInstance == null) {
    // distinguish "known but not alive" from "does not exist in the cluster"
    String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
    SetPartitionFailureReason reason = _zkClient.exists(instanceConfigPath)
        ? SetPartitionFailureReason.INSTANCE_NOT_ALIVE
        : SetPartitionFailureReason.INSTANCE_NON_EXISTENT;
    throw new HelixException(reason.getMessage(resourceName, partitionNames, instanceName,
        instanceName, clusterName, stateTransitionType));
  }

  // check resource exists in ideal state
  IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
  if (idealState == null) {
    throw new HelixException(SetPartitionFailureReason.RESOURCE_NON_EXISTENT.getMessage(
        resourceName, partitionNames, instanceName, resourceName, clusterName,
        stateTransitionType));
  }

  // callers log a null partition list as "NULL", so reject it explicitly instead of failing
  // with a NullPointerException below
  if (partitionNames == null) {
    throw new HelixException(String.format(
        "Can't %s state for resource %s on %s, because no partition names were provided",
        stateTransitionType, resourceName, instanceName));
  }

  // check partition exists in resource
  Set<String> requestedPartitions = new HashSet<String>(partitionNames);
  Set<String> partitions = (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED)
      ? idealState.getRecord().getMapFields().keySet()
      : idealState.getRecord().getListFields().keySet();
  if (!partitions.containsAll(requestedPartitions)) {
    throw new HelixException(SetPartitionFailureReason.PARTITION_NON_EXISTENT.getMessage(
        resourceName, partitionNames, instanceName, partitionNames.toString(), clusterName,
        stateTransitionType));
  }

  // for RESET only: every requested partition must currently be in ERROR state
  String sessionId = liveInstance.getEphemeralOwner();
  CurrentState curState =
      accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, resourceName));
  if (stateTransitionType == StateTransitionType.RESET) {
    String errorState = HelixDefinedState.ERROR.toString();
    for (String partitionName : requestedPartitions) {
      // a missing current state (or a missing entry for the partition) means "not in ERROR"
      if (curState == null || !errorState.equals(curState.getState(partitionName))) {
        throw new HelixException(SetPartitionFailureReason.PARTITION_NOT_ERROR.getMessage(
            resourceName, partitionNames, instanceName, partitionNames.toString(), clusterName,
            stateTransitionType));
      }
    }
  }

  // check stateModelDef exists
  String stateModelDef = idealState.getStateModelDefRef();
  StateModelDefinition stateModel = accessor.getProperty(keyBuilder.stateModelDef(stateModelDef));
  if (stateModel == null) {
    throw new HelixException(SetPartitionFailureReason.STATE_MODEL_NON_EXISTENT.getMessage(
        resourceName, partitionNames, instanceName, stateModelDef, clusterName,
        stateTransitionType));
  }

  // resolve the from/to states up front so an unsupported type never produces a message
  // without them: RESET sends ERROR -> initial state, SET_TO_ERROR sends * -> ERROR
  final String fromState;
  final String toState;
  if (stateTransitionType == StateTransitionType.RESET) {
    fromState = HelixDefinedState.ERROR.toString();
    toState = stateModel.getInitialState();
  } else if (stateTransitionType == StateTransitionType.SET_TO_ERROR) {
    fromState = "*";
    toState = HelixDefinedState.ERROR.toString();
  } else {
    throw new HelixException(String.format(
        "Can't change state for %s.%s on %s, because state transition type %s is not supported",
        resourceName, partitionNames, instanceName, stateTransitionType));
  }

  // check that no state transition message is already pending for the requested partitions
  List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName), true);
  for (Message message : messages) {
    if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())
        || !sessionId.equals(message.getTgtSessionId())
        || !resourceName.equals(message.getResourceName())
        || !requestedPartitions.contains(message.getPartitionName())) {
      continue;
    }

    throw new HelixException(String.format(
        "Can't %s state for %s.%s on %s, because a pending message %s exists for resource %s",
        stateTransitionType.name(), resourceName, partitionNames, instanceName, message,
        message.getResourceName()));
  }

  String adminName = null;
  try {
    adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
  } catch (UnknownHostException e) {
    logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
    adminName = "UNKNOWN";
  }

  List<Message> stateTransitionMessages = new ArrayList<Message>();
  List<PropertyKey> messageKeys = new ArrayList<PropertyKey>();
  // iterate the de-duplicated set so a partition listed twice gets exactly one message
  for (String partitionName : requestedPartitions) {
    String msgId = UUID.randomUUID().toString();
    Message message = new Message(MessageType.STATE_TRANSITION, msgId);
    message.setSrcName(adminName);
    message.setTgtName(instanceName);
    message.setMsgState(MessageState.NEW);
    message.setPartitionName(partitionName);
    message.setResourceName(resourceName);
    message.setTgtSessionId(sessionId);
    message.setStateModelDef(stateModelDef);
    message.setStateModelFactoryName(idealState.getStateModelFactoryName());
    message.setFromState(fromState);
    message.setToState(toState);
    if (idealState.getResourceGroupName() != null) {
      message.setResourceGroupName(idealState.getResourceGroupName());
    }
    if (idealState.getInstanceGroupTag() != null) {
      message.setResourceTag(idealState.getInstanceGroupTag());
    }

    stateTransitionMessages.add(message);
    messageKeys.add(keyBuilder.message(instanceName, message.getId()));
  }

  accessor.setChildren(messageKeys, stateTransitionMessages);
}

private void enableClusterPauseMode(String clusterName, boolean cancelPendingST, String reason) {
String hostname = NetworkUtil.getLocalhostName();
logger.info(
Expand Down Expand Up @@ -1180,7 +1310,7 @@ private void processMaintenanceMode(String clusterName, final boolean enabled,
}
}

private enum ResetPartitionFailureReason {
private enum SetPartitionFailureReason {
INSTANCE_NOT_ALIVE("%s is not alive in cluster %s"),
INSTANCE_NON_EXISTENT("%s does not exist in cluster %s"),
RESOURCE_NON_EXISTENT("resource %s is not added to cluster %s"),
Expand All @@ -1190,129 +1320,33 @@ private enum ResetPartitionFailureReason {

private String message;

ResetPartitionFailureReason(String message) {
SetPartitionFailureReason(String message) {
this.message = message;
}

public String getMessage(String resourceName, List<String> partitionNames, String instanceName,
String errorStateEntity, String clusterName) {
return String.format("Can't reset state for %s.%s on %s, because " + message, resourceName,
partitionNames, instanceName, errorStateEntity, clusterName);
String errorStateEntity, String clusterName, StateTransitionType stateTransitionType) {
return String.format("Can't %s state for %s.%s on %s, because " + message,
stateTransitionType.name(), resourceName, partitionNames, instanceName, errorStateEntity,
clusterName);
}
}

/**
 * Kind of admin-initiated state transition requested for a set of partitions.
 */
private enum StateTransitionType {
// Sets state from ERROR to the state model's initial state.
RESET,
// Sets state from ANY ("*") to ERROR.
SET_TO_ERROR,
// Unknown StateTransitionType; not associated with any from/to state pair.
UNDEFINED
}
/**
 * Logs the request and delegates to {@code sendStateTransitionMessage} with
 * {@code StateTransitionType.RESET}, which performs all validation and sends the
 * ERROR-to-initial-state messages.
 *
 * <p>The validation and message construction must not be repeated here: doing so would write
 * the reset messages first and then make the shared helper fail on those very messages as
 * "pending".
 *
 * @param clusterName name of the cluster
 * @param instanceName name of the instance hosting the partitions
 * @param resourceName name of the resource the partitions belong to
 * @param partitionNames names of the partitions to reset; logged as "NULL" when null
 */
@Override
public void resetPartition(String clusterName, String instanceName, String resourceName,
    List<String> partitionNames) {
  logger.info("Reset partitions {} for resource {} on instance {} in cluster {}.",
      partitionNames == null ? "NULL" : HelixUtil.serializeByComma(partitionNames), resourceName,
      instanceName, clusterName);
  sendStateTransitionMessage(clusterName, instanceName, resourceName, partitionNames,
      StateTransitionType.RESET);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ void postHandleMessage() {
deltaList.add(delta);
_currentStateDelta.setDeltaList(deltaList);
_stateModelFactory.removeStateModel(_message.getResourceName(), partitionKey);
} else if (_stateModel.getCurrentState().equals(_message.getFromState())) {
} else if (_message.getFromState().equals("*")
|| _stateModel.getCurrentState().equals(_message.getFromState())) {
// if the partition is not to be dropped, update _stateModel to the TO_STATE
// need this check because TaskRunner may change _stateModel before reach here.
_stateModel.updateState(toState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ private void reportMessageStat(HelixManager manager, Message message, HelixTaskR
String fromState = message.getFromState();
String toState = message.getToState();
String transition = fromState + "--" + toState;
transition = transition.replaceAll("\\*", "ANY");

StateTransitionContext cxt =
new StateTransitionContext(manager.getClusterName(), manager.getInstanceName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,15 @@ public void cancel() {
public boolean isCancelled() {
return _cancelled;
}

/**
 * Default transition to set a partition in any state ("*") to ERROR state.
 * This default implementation only logs that the transition was invoked.
 * @param message the state transition message being handled (not read by this default body)
 * @param context the notification context (not read by this default body)
 * @throws Exception declared by the signature; nothing in this default body throws explicitly
 */
@Transition(to = "ERROR", from = "*")
public void onBecomeErrorFromAny(Message message, NotificationContext context) throws Exception {
logger.info("Default *->ERROR transition invoked.");
}
}
21 changes: 21 additions & 0 deletions helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ public class ClusterSetup {
public static final String resetInstance = "resetInstance";
public static final String resetResource = "resetResource";

// set partitions to ERROR
public static final String setPartitionsToError = "setPartitionsToError";

// help
public static final String help = "help";

Expand Down Expand Up @@ -1114,6 +1117,13 @@ private static Options constructCommandLineOptions() {
removeCloudConfigOption.setRequired(false);
removeCloudConfigOption.setArgName("clusterName");

Option setPartitionsToErrorOption =
OptionBuilder.withLongOpt(setPartitionsToError)
.withDescription("Set a Partition to Error State").create();
setPartitionsToErrorOption.setArgs(4);
setPartitionsToErrorOption.setRequired(false);
setPartitionsToErrorOption.setArgName("clusterName instanceName resourceName partitionName");

OptionGroup group = new OptionGroup();
group.setRequired(true);
group.addOption(rebalanceOption);
Expand Down Expand Up @@ -1153,6 +1163,7 @@ private static Options constructCommandLineOptions() {
group.addOption(listStateModelOption);
group.addOption(addResourcePropertyOption);
group.addOption(removeResourcePropertyOption);
group.addOption(setPartitionsToErrorOption);

// set/get/remove config options
group.addOption(setConfOption);
Expand Down Expand Up @@ -1561,6 +1572,16 @@ public static int processCommandLineArgs(String[] cliArgs) throws Exception {
String newInstanceName = cmd.getOptionValues(swapInstance)[2];

setupTool.swapInstance(clusterName, oldInstanceName, newInstanceName);
} else if (cmd.hasOption(setPartitionsToError)) {
String[] args = cmd.getOptionValues(setPartitionsToError);

String clusterName = args[0];
String instanceName = args[1];
String resourceName = args[2];
List<String> partitionNames = Arrays.asList(Arrays.copyOfRange(args, 3, args.length));

setupTool.getClusterManagementTool().setPartitionsToError(clusterName, instanceName, resourceName, partitionNames);
return 0;
}
// set/get/remove config options
else if (cmd.hasOption(setConfig)) {
Expand Down
Loading

0 comments on commit c3c2cee

Please sign in to comment.