diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index 3e715e368e..7b26230675 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.annotations.VisibleForTesting; import org.apache.helix.AccessOption; import org.apache.helix.ConfigAccessor; import org.apache.helix.Criteria; @@ -276,7 +277,8 @@ private void startMonitorThread() { /** Dedicated Thread pool can be provided in configuration or by client. * This method is to check it and update the thread pool if necessary. */ - private void updateStateTransitionMessageThreadPool(Message message, HelixManager manager) { + @VisibleForTesting + void updateStateTransitionMessageThreadPool(Message message, HelixManager manager) { if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.name())) { return; } @@ -488,10 +490,10 @@ public boolean scheduleTask(MessageTask task) { "Message handling task already sheduled for " + taskId, manager); } } - } catch (Exception e) { - LOG.error("Error while executing task. " + message, e); + } catch (Throwable t) { + LOG.error("Error while executing task. " + message, t); _statusUpdateUtil - .logError(message, HelixTaskExecutor.class, e, "Error while executing task " + e, + .logError(message, HelixTaskExecutor.class, "Error while executing task " + t, manager); } return false; diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java deleted file mode 100644 index 08885784ec..0000000000 --- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java +++ /dev/null @@ -1,96 +0,0 @@ -package org.apache.helix; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.helix.PropertyKey.Builder; -import org.apache.helix.messaging.handling.AsyncCallbackService; -import org.apache.helix.messaging.handling.HelixStateTransitionHandler; -import org.apache.helix.messaging.handling.HelixTask; -import org.apache.helix.mock.MockManager; -import org.apache.helix.mock.participant.MockHelixTaskExecutor; -import org.apache.helix.mock.statemodel.MockMasterSlaveStateModel; -import org.apache.helix.model.CurrentState; -import org.apache.helix.model.Message; -import org.apache.helix.model.Message.MessageType; -import org.apache.helix.model.StateModelDefinition; -import org.apache.helix.participant.statemachine.StateModelFactory; -import org.apache.helix.tools.StateModelConfigGenerator; -import org.testng.AssertJUnit; -import org.testng.annotations.Test; - -public class TestHelixTaskExecutor { - - @Test() - public void testCMTaskExecutor() throws Exception { - System.out.println("START TestCMTaskExecutor"); - String msgId = "TestMessageId"; - Message message = new Message(MessageType.TASK_REPLY, msgId); - - message.setMsgId(msgId); - message.setSrcName("cm-instance-0"); - message.setTgtName("cm-instance-1"); - message.setTgtSessionId("1234"); - message.setFromState("Offline"); - message.setToState("Slave"); - message.setPartitionName("TestDB_0"); - message.setResourceName("TestDB"); - message.setStateModelDef("MasterSlave"); - - MockManager manager = new MockManager("clusterName"); - HelixDataAccessor accessor = manager.getHelixDataAccessor(); - StateModelDefinition stateModelDef = - new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()); - Builder keyBuilder = accessor.keyBuilder(); - accessor.setProperty(keyBuilder.stateModelDef("MasterSlave"), stateModelDef); - - MockHelixTaskExecutor executor = new MockHelixTaskExecutor(); - MockMasterSlaveStateModel stateModel = new MockMasterSlaveStateModel(); - executor.registerMessageHandlerFactory(MessageType.TASK_REPLY.name(), - new AsyncCallbackService()); - - NotificationContext context = new NotificationContext(manager); - CurrentState currentStateDelta = new CurrentState("TestDB"); - currentStateDelta.setState("TestDB_0", "OFFLINE"); - - StateModelFactory stateModelFactory = new StateModelFactory() { - - @Override - public MockMasterSlaveStateModel createNewStateModel(String resource, String partitionName) { - // TODO Auto-generated method stub - return new MockMasterSlaveStateModel(); - } - - }; - HelixStateTransitionHandler handler = - new HelixStateTransitionHandler(stateModelFactory, stateModel, message, context, - currentStateDelta); - - HelixTask task = new HelixTask(message, context, handler, executor); - executor.scheduleTask(task); - for (int i = 0; i < 10; i++) { - if (!executor.isDone(task.getTaskId())) { - Thread.sleep(500); - } - } - AssertJUnit.assertTrue(stateModel.stateModelInvoked); - System.out.println("END TestCMTaskExecutor"); - } - -} diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestCMTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestCMTaskExecutor.java new file mode 100644 index 0000000000..2f1f731105 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestCMTaskExecutor.java @@ -0,0 +1,132 @@ +package org.apache.helix.messaging.handling; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.NotificationContext; +import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.mock.MockManager; +import org.apache.helix.mock.participant.MockHelixTaskExecutor; +import org.apache.helix.mock.statemodel.MockMasterSlaveStateModel; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.Message; +import org.apache.helix.model.Message.MessageType; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.participant.statemachine.StateModelFactory; +import org.apache.helix.tools.StateModelConfigGenerator; +import org.mockito.MockitoAnnotations; +import org.mockito.Spy; +import org.testng.AssertJUnit; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; + +public class TestCMTaskExecutor { + + @Spy + private HelixTaskExecutor _helixTaskExecutor; + private MockHelixTaskExecutor executor; + private MockMasterSlaveStateModel stateModel; + private HelixTask task; + + @BeforeMethod + public void beforeMethod() { + MockitoAnnotations.initMocks(this); + task = getHelixTask(); + } + + public HelixTask getHelixTask() { + String msgId = "TestMessageId"; + Message message = new Message(MessageType.TASK_REPLY, msgId); + + message.setMsgId(msgId); + message.setSrcName("cm-instance-0"); + message.setTgtName("cm-instance-1"); + message.setTgtSessionId("1234"); + message.setFromState("Offline"); + message.setToState("Slave"); + message.setPartitionName("TestDB_0"); + message.setResourceName("TestDB"); + message.setStateModelDef("MasterSlave"); + + MockManager manager = new MockManager("clusterName"); + HelixDataAccessor accessor = manager.getHelixDataAccessor(); + StateModelDefinition stateModelDef = + new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()); + Builder keyBuilder = accessor.keyBuilder(); + accessor.setProperty(keyBuilder.stateModelDef("MasterSlave"), stateModelDef); + + executor = new MockHelixTaskExecutor(); + stateModel = new MockMasterSlaveStateModel(); + executor.registerMessageHandlerFactory(MessageType.TASK_REPLY.name(), + new AsyncCallbackService()); + + NotificationContext context = new NotificationContext(manager); + CurrentState currentStateDelta = new CurrentState("TestDB"); + currentStateDelta.setState("TestDB_0", "OFFLINE"); + + StateModelFactory stateModelFactory = new StateModelFactory() { + + @Override + public MockMasterSlaveStateModel createNewStateModel(String resource, String partitionName) { + // TODO Auto-generated method stub + return new MockMasterSlaveStateModel(); + } + + }; + HelixStateTransitionHandler handler = + new HelixStateTransitionHandler(stateModelFactory, stateModel, message, context, + currentStateDelta); + + return new HelixTask(message, context, handler, executor); + } + + @DataProvider(name = "throwableClass") + public static Object[][] throwableClass() { + return new Object[][]{ + {OutOfMemoryError.class}, + {IllegalStateException.class} + }; + } + + @Test(dataProvider = "throwableClass") + public void testThrowableHandlingOnMessageProcess(Class throwable) { + doThrow(throwable).when(_helixTaskExecutor).updateStateTransitionMessageThreadPool(any(), any()); + boolean isScheduled = _helixTaskExecutor.scheduleTask(task); + AssertJUnit.assertFalse(isScheduled); + } + + @Test() + public void testCMTaskExecutor() throws Exception { + System.out.println("START TestCMTaskExecutor"); + executor.scheduleTask(task); + for (int i = 0; i < 10; i++) { + if (!executor.isDone(task.getTaskId())) { + Thread.sleep(500); + } + } + AssertJUnit.assertTrue(stateModel.stateModelInvoked); + System.out.println("END TestCMTaskExecutor"); + } + +}