diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java index d2e0c2681a..84a7154b18 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java @@ -421,6 +421,18 @@ void manuallyEnableMaintenanceMode(String clusterName, boolean enabled, String r */ ClusterManagementMode getClusterManagementMode(String clusterName); + /** + * Set a list of partitions for an instance to ERROR state from any state. + * The partitions could be in any state and setPartitionsToError will bring them to ERROR + * state. ANY to ERROR state transition is required for this. + * @param clusterName + * @param instanceName + * @param resourceName + * @param partitionNames + */ + void setPartitionsToError(String clusterName, String instanceName, String resourceName, + List partitionNames); + /** * Reset a list of partitions in error state for an instance * The partitions are assume to be in error state and reset will bring them from error diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index c7fe0861ba..754d02d386 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -1035,6 +1035,135 @@ public ClusterManagementMode getClusterManagementMode(String clusterName) { : new ClusterManagementMode(status.getManagementMode(), status.getManagementModeStatus()); } + @Override + public void setPartitionsToError(String clusterName, String instanceName, String resourceName, + List partitionNames) { + logger.info("Set partitions {} for resource {} on instance {} in cluster {} to ERROR state.", + partitionNames == null ? "NULL" : HelixUtil.serializeByComma(partitionNames), resourceName, + instanceName, clusterName); + sendStateTransitionMessage(clusterName, instanceName, resourceName, partitionNames, + Boolean.FALSE); + } + + private void sendStateTransitionMessage(String clusterName, String instanceName, + String resourceName, List partitionNames, Boolean reset) { + HelixDataAccessor accessor = + new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + + // check the instance is alive + LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName)); + if (liveInstance == null) { + // check if the instance exists in the cluster + String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName); + throw new HelixException(String.format( + (_zkClient.exists(instanceConfigPath) ? SetPartitionFailureReason.INSTANCE_NOT_ALIVE + : SetPartitionFailureReason.INSTANCE_NON_EXISTENT).getMessage(resourceName, + partitionNames, instanceName, instanceName, clusterName, reset))); + } + + // check resource exists in ideal state + IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName)); + if (idealState == null) { + throw new HelixException( + String.format(SetPartitionFailureReason.RESOURCE_NON_EXISTENT.getMessage(resourceName, + partitionNames, instanceName, resourceName, clusterName, reset))); + } + + // check partition exists in resource + Set partitionsNames = new HashSet(partitionNames); + Set partitions = (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) + ? idealState.getRecord().getMapFields().keySet() + : idealState.getRecord().getListFields().keySet(); + if (!partitions.containsAll(partitionsNames)) { + throw new HelixException( + String.format(SetPartitionFailureReason.PARTITION_NON_EXISTENT.getMessage(resourceName, + partitionNames, instanceName, partitionNames.toString(), clusterName, reset))); + } + + // check partition is in ERROR state if reset is set to True + String sessionId = liveInstance.getEphemeralOwner(); + CurrentState curState = + accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, resourceName)); + if (reset.equals(Boolean.TRUE)) { + for (String partitionName : partitionNames) { + if (!curState.getState(partitionName).equals(HelixDefinedState.ERROR.toString())) { + throw new HelixException(String.format( + SetPartitionFailureReason.PARTITION_NOT_ERROR.getMessage(resourceName, partitionNames, + instanceName, partitionNames.toString(), clusterName, Boolean.TRUE))); + } + } + } + + // check stateModelDef exists + String stateModelDef = idealState.getStateModelDefRef(); + StateModelDefinition stateModel = accessor.getProperty(keyBuilder.stateModelDef(stateModelDef)); + if (stateModel == null) { + throw new HelixException( + String.format(SetPartitionFailureReason.STATE_MODEL_NON_EXISTENT.getMessage(resourceName, + partitionNames, instanceName, stateModelDef, clusterName, reset))); + } + + // check there is no pending messages for the partitions exist + List messages = accessor.getChildValues(keyBuilder.messages(instanceName), true); + for (Message message : messages) { + if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType()) + || !sessionId.equals(message.getTgtSessionId()) + || !resourceName.equals(message.getResourceName()) + || !partitionsNames.contains(message.getPartitionName())) { + continue; + } + + throw new HelixException(String.format( + "Can't %s state for %s.%s on %s, because a pending message %s exists for resource %s", + reset.equals(Boolean.TRUE) ? "reset" : "set to ERROR", resourceName, partitionNames, + instanceName, message, message.getResourceName())); + } + + String adminName = null; + try { + adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN"; + } catch (UnknownHostException e) { + logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e); + adminName = "UNKNOWN"; + } + + List stateTransitionMessages = new ArrayList(); + List messageKeys = new ArrayList(); + for (String partitionName : partitionNames) { + String msgId = UUID.randomUUID().toString(); + Message message = new Message(MessageType.STATE_TRANSITION, msgId); + message.setSrcName(adminName); + message.setTgtName(instanceName); + message.setMsgState(MessageState.NEW); + message.setPartitionName(partitionName); + message.setResourceName(resourceName); + message.setTgtSessionId(sessionId); + message.setStateModelDef(stateModelDef); + message.setStateModelFactoryName(idealState.getStateModelFactoryName()); + // if reset == TRUE, send ERROR to initialState message + // else, send * to ERROR state message + if (reset.equals(Boolean.TRUE)) { + message.setFromState(HelixDefinedState.ERROR.toString()); + message.setToState(stateModel.getInitialState()); + } else { + message.setFromState("*"); + message.setToState(HelixDefinedState.ERROR.toString()); + } + if (idealState.getResourceGroupName() != null) { + message.setResourceGroupName(idealState.getResourceGroupName()); + } + if (idealState.getInstanceGroupTag() != null) { + message.setResourceTag(idealState.getInstanceGroupTag()); + } + + stateTransitionMessages.add(message); + messageKeys.add(keyBuilder.message(instanceName, message.getId())); + } + + accessor.setChildren(messageKeys, stateTransitionMessages); + } + private void enableClusterPauseMode(String clusterName, boolean cancelPendingST, String reason) { String hostname = NetworkUtil.getLocalhostName(); logger.info( @@ -1180,7 +1309,7 @@ private void processMaintenanceMode(String clusterName, final boolean enabled, } } - private enum ResetPartitionFailureReason { + private enum SetPartitionFailureReason { INSTANCE_NOT_ALIVE("%s is not alive in cluster %s"), INSTANCE_NON_EXISTENT("%s does not exist in cluster %s"), RESOURCE_NON_EXISTENT("resource %s is not added to cluster %s"), @@ -1190,13 +1319,13 @@ private enum ResetPartitionFailureReason { private String message; - ResetPartitionFailureReason(String message) { + SetPartitionFailureReason(String message) { this.message = message; } public String getMessage(String resourceName, List partitionNames, String instanceName, - String errorStateEntity, String clusterName) { - return String.format("Can't reset state for %s.%s on %s, because " + message, resourceName, + String errorStateEntity, String clusterName, Boolean reset) { + return String.format("Can't %s state for %s.%s on %s, because " + message, reset == Boolean.TRUE ? "reset" : "set to Error", resourceName, partitionNames, instanceName, errorStateEntity, clusterName); } } @@ -1207,112 +1336,7 @@ public void resetPartition(String clusterName, String instanceName, String resou logger.info("Reset partitions {} for resource {} on instance {} in cluster {}.", partitionNames == null ? "NULL" : HelixUtil.serializeByComma(partitionNames), resourceName, instanceName, clusterName); - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); - PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - - // check the instance is alive - LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName)); - if (liveInstance == null) { - // check if the instance exists in the cluster - String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName); - throw new HelixException(String.format( - (_zkClient.exists(instanceConfigPath) ? ResetPartitionFailureReason.INSTANCE_NOT_ALIVE - : ResetPartitionFailureReason.INSTANCE_NON_EXISTENT) - .getMessage(resourceName, partitionNames, instanceName, instanceName, clusterName))); - } - - // check resource group exists - IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName)); - if (idealState == null) { - throw new HelixException(String.format(ResetPartitionFailureReason.RESOURCE_NON_EXISTENT - .getMessage(resourceName, partitionNames, instanceName, resourceName, clusterName))); - } - - // check partition exists in resource group - Set resetPartitionNames = new HashSet(partitionNames); - Set partitions = - (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) ? idealState.getRecord() - .getMapFields().keySet() : idealState.getRecord().getListFields().keySet(); - if (!partitions.containsAll(resetPartitionNames)) { - throw new HelixException(String.format(ResetPartitionFailureReason.PARTITION_NON_EXISTENT - .getMessage(resourceName, partitionNames, instanceName, partitionNames.toString(), - clusterName))); - } - - // check partition is in ERROR state - String sessionId = liveInstance.getEphemeralOwner(); - CurrentState curState = - accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, resourceName)); - for (String partitionName : resetPartitionNames) { - if (!curState.getState(partitionName).equals(HelixDefinedState.ERROR.toString())) { - throw new HelixException(String.format(ResetPartitionFailureReason.PARTITION_NOT_ERROR - .getMessage(resourceName, partitionNames, instanceName, partitionNames.toString(), - clusterName))); - } - } - - // check stateModelDef exists and get initial state - String stateModelDef = idealState.getStateModelDefRef(); - StateModelDefinition stateModel = accessor.getProperty(keyBuilder.stateModelDef(stateModelDef)); - if (stateModel == null) { - throw new HelixException(String.format(ResetPartitionFailureReason.STATE_MODEL_NON_EXISTENT - .getMessage(resourceName, partitionNames, instanceName, stateModelDef, clusterName))); - } - - // check there is no pending messages for the partitions exist - List messages = accessor.getChildValues(keyBuilder.messages(instanceName), true); - for (Message message : messages) { - if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType()) || !sessionId - .equals(message.getTgtSessionId()) || !resourceName.equals(message.getResourceName()) - || !resetPartitionNames.contains(message.getPartitionName())) { - continue; - } - - throw new HelixException(String.format( - "Can't reset state for %s.%s on %s, because a pending message %s exists for resource %s", - resourceName, partitionNames, instanceName, message.toString(), - message.getResourceName())); - } - - String adminName = null; - try { - adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN"; - } catch (UnknownHostException e) { - // can ignore it - logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e); - adminName = "UNKNOWN"; - } - - List resetMessages = new ArrayList(); - List messageKeys = new ArrayList(); - for (String partitionName : resetPartitionNames) { - // send ERROR to initialState message - String msgId = UUID.randomUUID().toString(); - Message message = new Message(MessageType.STATE_TRANSITION, msgId); - message.setSrcName(adminName); - message.setTgtName(instanceName); - message.setMsgState(MessageState.NEW); - message.setPartitionName(partitionName); - message.setResourceName(resourceName); - message.setTgtSessionId(sessionId); - message.setStateModelDef(stateModelDef); - message.setFromState(HelixDefinedState.ERROR.toString()); - message.setToState(stateModel.getInitialState()); - message.setStateModelFactoryName(idealState.getStateModelFactoryName()); - - if (idealState.getResourceGroupName() != null) { - message.setResourceGroupName(idealState.getResourceGroupName()); - } - if (idealState.getInstanceGroupTag() != null) { - message.setResourceTag(idealState.getInstanceGroupTag()); - } - - resetMessages.add(message); - messageKeys.add(keyBuilder.message(instanceName, message.getId())); - } - - accessor.setChildren(messageKeys, resetMessages); + sendStateTransitionMessage(clusterName, instanceName, resourceName, partitionNames, Boolean.TRUE); } @Override diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java index 0d67ced4b3..0a91370b07 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java @@ -176,7 +176,8 @@ void postHandleMessage() { deltaList.add(delta); _currentStateDelta.setDeltaList(deltaList); _stateModelFactory.removeStateModel(_message.getResourceName(), partitionKey); - } else if (_stateModel.getCurrentState().equals(_message.getFromState())) { + } else if (_message.getFromState().equals("*") + || _stateModel.getCurrentState().equals(_message.getFromState())) { // if the partition is not to be dropped, update _stateModel to the TO_STATE // need this check because TaskRunner may change _stateModel before reach here. _stateModel.updateState(toState); diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java index 143c14adef..5bb2a19c86 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java +++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java @@ -115,4 +115,15 @@ public void cancel() { public boolean isCancelled() { return _cancelled; } + + /* + * default transition to set partition in any state to error state + * @param message + * @param context + * @throws InterruptedException + */ + @Transition(to = "ERROR", from = "*") + public void onBecomeErrorFromAny(Message message, NotificationContext context) throws Exception { + logger.info("Default *->ERROR transition invoked."); + } } diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java index 633ce0341f..a9578632b0 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java @@ -134,6 +134,9 @@ public class ClusterSetup { public static final String resetInstance = "resetInstance"; public static final String resetResource = "resetResource"; + // set partitions to ERROR + public static final String setPartitionsToError = "setPartitionsToError"; + // help public static final String help = "help"; @@ -1114,6 +1117,13 @@ private static Options constructCommandLineOptions() { removeCloudConfigOption.setRequired(false); removeCloudConfigOption.setArgName("clusterName"); + Option setPartitionsToErrorOption = + OptionBuilder.withLongOpt(setPartitionsToError) + .withDescription("Set a Partition to Error State").create(); + setPartitionsToErrorOption.setArgs(4); + setPartitionsToErrorOption.setRequired(false); + setPartitionsToErrorOption.setArgName("clusterName instanceName resourceName partitionName"); + OptionGroup group = new OptionGroup(); group.setRequired(true); group.addOption(rebalanceOption); @@ -1153,6 +1163,7 @@ private static Options constructCommandLineOptions() { group.addOption(listStateModelOption); group.addOption(addResourcePropertyOption); group.addOption(removeResourcePropertyOption); + group.addOption(setPartitionsToErrorOption); // set/get/remove config options group.addOption(setConfOption); @@ -1561,6 +1572,16 @@ public static int processCommandLineArgs(String[] cliArgs) throws Exception { String newInstanceName = cmd.getOptionValues(swapInstance)[2]; setupTool.swapInstance(clusterName, oldInstanceName, newInstanceName); + } else if (cmd.hasOption(setPartitionsToError)) { + String[] args = cmd.getOptionValues(setPartitionsToError); + + String clusterName = args[0]; + String instanceName = args[1]; + String resourceName = args[2]; + List partitionNames = Arrays.asList(Arrays.copyOfRange(args, 3, args.length)); + + setupTool.getClusterManagementTool().setPartitionsToError(clusterName, instanceName, resourceName, partitionNames); + return 0; } // set/get/remove config options else if (cmd.hasOption(setConfig)) { diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSetPartitionsToErrorState.java b/helix-core/src/test/java/org/apache/helix/integration/TestSetPartitionsToErrorState.java new file mode 100644 index 0000000000..5b13703b6f --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestSetPartitionsToErrorState.java @@ -0,0 +1,99 @@ +package org.apache.helix.integration; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import org.apache.helix.TestHelper; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterStateVerifier; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestSetPartitionsToErrorState extends ZkTestBase { + + @Test() + public void testSetPartitionsToErrorState() throws Exception { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + final int n = 5; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 1, // resources + 10, // partitions per resource + n, // number of nodes + 3, // replicas + "MasterSlave", true); // do rebalance + + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); + controller.syncStart(); + + // start mock participants + MockParticipantManager[] participants = new MockParticipantManager[n]; + for (int i = 0; i < n; i++) { + String instanceName = "localhost_" + (12918 + i); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].syncStart(); + } + + // verify cluster + HashMap> errStateMap = new HashMap<>(); + errStateMap.put("TestDB0", new HashMap<>()); + boolean result = ClusterStateVerifier.verifyByZkCallback( + (new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName, errStateMap))); + Assert.assertTrue(result, "Cluster verification fails"); + + // set a non exist partition to ERROR, should throw exception + try { + String command = "--zkSvr " + ZK_ADDR + " --setPartitionsToError " + clusterName + + " localhost_12918 TestDB0 TestDB0_nonExist"; + ClusterSetup.processCommandLineArgs(command.split("\\s+")); + Assert.fail("Should throw exception on setting a non-exist partition to error"); + } catch (Exception e) { + // OK + } + + // set one partition not in ERROR state to ERROR + String command = "--zkSvr " + ZK_ADDR + " --setPartitionsToError " + clusterName + + " localhost_12918 TestDB0 TestDB0_4"; + ClusterSetup.processCommandLineArgs(command.split("\\s+")); + errStateMap.get("TestDB0").put("TestDB0_4", "localhost_12918"); + result = ClusterStateVerifier.verifyByZkCallback( + (new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName, errStateMap))); + Assert.assertTrue(result, "Cluster verification fails"); + + // set another partition not in ERROR state to ERROR + command = "--zkSvr " + ZK_ADDR + " --setPartitionsToError " + clusterName + + " localhost_12918 TestDB0 TestDB0_7"; + ClusterSetup.processCommandLineArgs(command.split("\\s+")); + errStateMap.get("TestDB0").put("TestDB0_7", "localhost_12918"); + result = ClusterStateVerifier.verifyByZkCallback( + (new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName, errStateMap))); + Assert.assertTrue(result, "Cluster verification fails"); + + // setting a partition already in ERROR state to ERROR - message does not get processed + command = "--zkSvr " + ZK_ADDR + " --setPartitionsToError " + clusterName + + " localhost_12918 TestDB0 TestDB0_7"; + ClusterSetup.processCommandLineArgs(command.split("\\s+")); + result = ClusterStateVerifier.verifyByZkCallback( + (new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName, errStateMap))); + Assert.assertTrue(result, "Cluster verification fails"); + + // clean up + controller.syncStop(); + for (int i = 0; i < 5; i++) { + participants[i].syncStop(); + } + deleteCluster(clusterName); + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java index 59decd98e5..a695f69e65 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java @@ -589,6 +589,117 @@ public void testLegacyEnableDisablePartition() { 2); } + @Test(description = "Unit test for sanity check in setPartitionsToError()") + public void testSetPartitionsToError() throws Exception { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + String instanceName = "TestInstance"; + String testResource = "TestResource"; + String wrongTestInstance = "WrongTestInstance"; + String wrongTestResource = "WrongTestResource"; + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + HelixAdmin admin = new ZKHelixAdmin(_gZkClient); + admin.addCluster(clusterName, true); + admin.addInstance(clusterName, new InstanceConfig(instanceName)); + admin.enableInstance(clusterName, instanceName, true); + InstanceConfig instanceConfig = admin.getInstanceConfig(clusterName, instanceName); + + IdealState idealState = new IdealState(testResource); + idealState.setNumPartitions(3); + admin.addStateModelDef(clusterName, "MasterSlave", new MasterSlaveSMD()); + idealState.setStateModelDefRef("MasterSlave"); + idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO); + admin.addResource(clusterName, testResource, idealState); + admin.enableResource(clusterName, testResource, true); + + /* + * This is a unit test for sanity check in setPartitionsToError(). + * There is no running controller in this test. We have end-to-end tests for + * setPartitionsToError() + * under integration/TestSetPartitionsToError. + */ + // setPartitionsToError is expected to throw an exception when provided with a nonexistent + // instance. + try { + admin.setPartitionsToError(clusterName, wrongTestInstance, testResource, + Arrays.asList("1", "2")); + Assert.fail("Should throw HelixException"); + } catch (HelixException expected) { + // This exception is expected because the instance name is made up. + Assert.assertEquals(expected.getMessage(), String.format( + "Can't set to Error State for %s.[1, 2] on WrongTestInstance, because %s does not exist in cluster %s", + testResource, wrongTestInstance, clusterName)); + } + + // setPartitionsToError is expected to throw an exception when provided with a non-live + // instance. + try { + admin.setPartitionsToError(clusterName, instanceName, testResource, Arrays.asList("1", "2")); + Assert.fail("Should throw HelixException"); + } catch (HelixException expected) { + // This exception is expected because the instance is not alive. + Assert.assertEquals(expected.getMessage(), + String.format( + "Can't set to Error State for %s.[1, 2] on %s, because %s is not alive in cluster %s", + testResource, instanceName, instanceName, clusterName)); + } + + HelixManager manager = initializeHelixManager(clusterName, instanceConfig.getInstanceName()); + manager.connect(); + + // setPartitionsToError is expected to throw an exception when provided with a nonexistent + // resource. + try { + admin.setPartitionsToError(clusterName, instanceName, wrongTestResource, + Arrays.asList("1", "2")); + Assert.fail("Should throw HelixException"); + } catch (HelixException expected) { + // This exception is expected because the resource is not added. + Assert.assertEquals(expected.getMessage(), String.format( + "Can't set to Error State for %s.[1, 2] on %s, because resource %s is not added to cluster %s", + wrongTestResource, instanceName, wrongTestResource, clusterName)); + } + + // setPartitionsToError is expected to throw an exception when partition does not exist. + try { + admin.setPartitionsToError(clusterName, instanceName, testResource, Arrays.asList("1", "2")); + Assert.fail("Should throw HelixException"); + } catch (HelixException expected) { + // This exception is expected because partitions do not exist. + Assert.assertEquals(expected.getMessage(), String.format( + "Can't set to Error State for %s.[1, 2] on %s, because not all [1, 2] exist in cluster %s", + testResource, instanceName, clusterName)); + } + + // clean up + manager.disconnect(); + admin.dropCluster(clusterName); + + // verify the cluster has been removed successfully + HelixDataAccessor dataAccessor = + new ZKHelixDataAccessor(className, new ZkBaseDataAccessor<>(_gZkClient)); + try { + Assert.assertTrue(TestHelper.verify( + () -> dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances()).isEmpty(), + 1000)); + } catch (Exception e) { + e.printStackTrace(); + System.out.println("There're live instances not cleaned up yet"); + assert false; + } + + try { + Assert.assertTrue(TestHelper.verify( + () -> dataAccessor.getChildNames(dataAccessor.keyBuilder().clusterConfig()).isEmpty(), + 1000)); + } catch (Exception e) { + e.printStackTrace(); + System.out.println("The cluster is not cleaned up yet"); + assert false; + } + } + @Test public void testResetPartition() throws Exception { String className = TestHelper.getTestClassName(); diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java index 9a1311b1c3..d9bc5d7fe6 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java @@ -364,6 +364,12 @@ public ClusterManagementMode getClusterManagementMode(String clusterName) { return null; } + @Override + public void setPartitionsToError(String clusterName, String instanceName, String resourceName, + List partitionNames) { + + } + @Override public void resetPartition(String clusterName, String instanceName, String resourceName, List partitionNames) { diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java index ce3d27273e..fdad634afd 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java @@ -89,7 +89,8 @@ public enum Command { canCompleteSwap, completeSwapIfPossible, onDemandRebalance, - isEvacuateFinished + isEvacuateFinished, + setPartitionsToError } @Context diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java index efeeee7f7e..ea98f66371 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java @@ -434,6 +434,16 @@ public Response updateInstance(@PathParam("clusterId") String clusterId, OBJECT_MAPPER.getTypeFactory() .constructCollectionType(List.class, String.class))); break; + case setPartitionsToError: + if (!validInstance(node, instanceName)) { + return badRequest("Instance names are not a match!"); + } + admin.setPartitionsToError(clusterId, instanceName, + node.get(PerInstanceProperties.resource.name()).textValue(), + (List) OBJECT_MAPPER.readValue( + node.get(PerInstanceProperties.partitions.name()).toString(), OBJECT_MAPPER + .getTypeFactory().constructCollectionType(List.class, String.class))); + break; case setInstanceOperation: admin.setInstanceOperation(clusterId, instanceName, state); break; diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java index 943444cad1..395f9bf858 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java @@ -37,11 +37,13 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixDefinedState; import org.apache.helix.HelixException; import org.apache.helix.TestHelper; import org.apache.helix.constants.InstanceConstants; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Message; @@ -377,7 +379,7 @@ public void testDeleteInstance() { } @Test(dependsOnMethods = "testDeleteInstance") - public void updateInstance() throws IOException { + public void updateInstance() throws Exception { System.out.println("Start test :" + TestHelper.getTestMethodName()); // Disable instance Entity entity = Entity.entity("", MediaType.APPLICATION_JSON_TYPE); @@ -461,11 +463,11 @@ public void updateInstance() throws IOException { String dbName = "_db_0_"; List partitionsToDisable = Arrays.asList(CLUSTER_NAME + dbName + "0", CLUSTER_NAME + dbName + "1", CLUSTER_NAME + dbName + "3"); + String RESOURCE_NAME = CLUSTER_NAME + dbName.substring(0, dbName.length() - 1); entity = Entity.entity( OBJECT_MAPPER.writeValueAsString(ImmutableMap.of(AbstractResource.Properties.id.name(), - INSTANCE_NAME, PerInstanceAccessor.PerInstanceProperties.resource.name(), - CLUSTER_NAME + dbName.substring(0, dbName.length() - 1), + INSTANCE_NAME, PerInstanceAccessor.PerInstanceProperties.resource.name(), RESOURCE_NAME, PerInstanceAccessor.PerInstanceProperties.partitions.name(), partitionsToDisable)), MediaType.APPLICATION_JSON_TYPE); @@ -474,13 +476,11 @@ public void updateInstance() throws IOException { InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME); Assert.assertEquals( - new HashSet<>(instanceConfig.getDisabledPartitionsMap() - .get(CLUSTER_NAME + dbName.substring(0, dbName.length() - 1))), + new HashSet<>(instanceConfig.getDisabledPartitionsMap().get(RESOURCE_NAME)), new HashSet<>(partitionsToDisable)); entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(ImmutableMap .of(AbstractResource.Properties.id.name(), INSTANCE_NAME, - PerInstanceAccessor.PerInstanceProperties.resource.name(), - CLUSTER_NAME + dbName.substring(0, dbName.length() - 1), + PerInstanceAccessor.PerInstanceProperties.resource.name(), RESOURCE_NAME, PerInstanceAccessor.PerInstanceProperties.partitions.name(), ImmutableList.of(CLUSTER_NAME + dbName + "1"))), MediaType.APPLICATION_JSON_TYPE); @@ -488,8 +488,7 @@ public void updateInstance() throws IOException { .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity); instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME); - Assert.assertEquals(new HashSet<>(instanceConfig.getDisabledPartitionsMap() - .get(CLUSTER_NAME + dbName.substring(0, dbName.length() - 1))), + Assert.assertEquals(new HashSet<>(instanceConfig.getDisabledPartitionsMap().get(RESOURCE_NAME)), new HashSet<>(Arrays.asList(CLUSTER_NAME + dbName + "0", CLUSTER_NAME + dbName + "3"))); // test set instance operation @@ -595,6 +594,32 @@ public void updateInstance() throws IOException { evacuateFinishedResult = OBJECT_MAPPER.readValue(response.readEntity(String.class), Map.class); Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode()); Assert.assertTrue(evacuateFinishedResult.get("successful")); + + // test setPartitionsToError + List partitionsToSetToError = Arrays.asList(CLUSTER_NAME + dbName + "7"); + + entity = Entity.entity( + OBJECT_MAPPER.writeValueAsString(ImmutableMap.of(AbstractResource.Properties.id.name(), + INSTANCE_NAME, PerInstanceAccessor.PerInstanceProperties.resource.name(), RESOURCE_NAME, + PerInstanceAccessor.PerInstanceProperties.partitions.name(), partitionsToSetToError)), + MediaType.APPLICATION_JSON_TYPE); + + response = new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setPartitionsToError") + .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity); + + Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode()); + + TestHelper.verify(() -> { + ExternalView externalView = _gSetupTool.getClusterManagementTool() + .getResourceExternalView(CLUSTER_NAME, RESOURCE_NAME); + Set responseForAllPartitions = new HashSet(); + for (String partition : partitionsToSetToError) { + responseForAllPartitions.add(externalView.getStateMap(partition) + .get(INSTANCE_NAME) == HelixDefinedState.ERROR.toString()); + } + return !responseForAllPartitions.contains(Boolean.FALSE); + }, TestHelper.WAIT_DURATION); + System.out.println("End test :" + TestHelper.getTestMethodName()); }