diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index 3e715e368eb..15e6f76e788 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -276,7 +276,7 @@ private void startMonitorThread() { /** Dedicated Thread pool can be provided in configuration or by client. * This method is to check it and update the thread pool if necessary. */ - private void updateStateTransitionMessageThreadPool(Message message, HelixManager manager) { + public void updateStateTransitionMessageThreadPool(Message message, HelixManager manager) { if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.name())) { return; } @@ -488,11 +488,17 @@ public boolean scheduleTask(MessageTask task) { "Message handling task already sheduled for " + taskId, manager); } } - } catch (Exception e) { - LOG.error("Error while executing task. " + message, e); - _statusUpdateUtil - .logError(message, HelixTaskExecutor.class, e, "Error while executing task " + e, - manager); + } catch (Throwable e) { + String errorMessage = "Error while executing task. " + message; + LOG.error(errorMessage, e); + if (e instanceof Exception) { + _statusUpdateUtil + .logError(message, HelixTaskExecutor.class, new Exception(e), errorMessage, + manager); + } else { + _statusUpdateUtil + .logError(message, HelixTaskExecutor.class, errorMessage, manager); + } } return false; } diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java index 08885784ecf..38662824e6f 100644 --- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java +++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java @@ -23,6 +23,7 @@ import org.apache.helix.messaging.handling.AsyncCallbackService; import org.apache.helix.messaging.handling.HelixStateTransitionHandler; import org.apache.helix.messaging.handling.HelixTask; +import org.apache.helix.messaging.handling.HelixTaskExecutor; import org.apache.helix.mock.MockManager; import org.apache.helix.mock.participant.MockHelixTaskExecutor; import org.apache.helix.mock.statemodel.MockMasterSlaveStateModel; @@ -32,65 +33,102 @@ import org.apache.helix.model.StateModelDefinition; import org.apache.helix.participant.statemachine.StateModelFactory; import org.apache.helix.tools.StateModelConfigGenerator; +import org.mockito.MockitoAnnotations; +import org.mockito.Spy; import org.testng.AssertJUnit; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; + public class TestHelixTaskExecutor { - @Test() - public void testCMTaskExecutor() throws Exception { - System.out.println("START TestCMTaskExecutor"); - String msgId = "TestMessageId"; - Message message = new Message(MessageType.TASK_REPLY, msgId); - - message.setMsgId(msgId); - message.setSrcName("cm-instance-0"); - message.setTgtName("cm-instance-1"); - message.setTgtSessionId("1234"); - message.setFromState("Offline"); - message.setToState("Slave"); - message.setPartitionName("TestDB_0"); - message.setResourceName("TestDB"); - message.setStateModelDef("MasterSlave"); - - MockManager manager = new MockManager("clusterName"); - HelixDataAccessor accessor = manager.getHelixDataAccessor(); - StateModelDefinition stateModelDef = - new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()); - Builder keyBuilder = accessor.keyBuilder(); - accessor.setProperty(keyBuilder.stateModelDef("MasterSlave"), stateModelDef); - - MockHelixTaskExecutor executor = new MockHelixTaskExecutor(); - MockMasterSlaveStateModel stateModel = new MockMasterSlaveStateModel(); - executor.registerMessageHandlerFactory(MessageType.TASK_REPLY.name(), - new AsyncCallbackService()); - - NotificationContext context = new NotificationContext(manager); - CurrentState currentStateDelta = new CurrentState("TestDB"); - currentStateDelta.setState("TestDB_0", "OFFLINE"); - - StateModelFactory stateModelFactory = new StateModelFactory() { - - @Override - public MockMasterSlaveStateModel createNewStateModel(String resource, String partitionName) { - // TODO Auto-generated method stub - return new MockMasterSlaveStateModel(); - } - - }; - HelixStateTransitionHandler handler = - new HelixStateTransitionHandler(stateModelFactory, stateModel, message, context, - currentStateDelta); - - HelixTask task = new HelixTask(message, context, handler, executor); - executor.scheduleTask(task); - for (int i = 0; i < 10; i++) { - if (!executor.isDone(task.getTaskId())) { - Thread.sleep(500); - } + @Spy + private HelixTaskExecutor _helixTaskExecutor; + private MockHelixTaskExecutor executor; + private MockMasterSlaveStateModel stateModel; + private HelixTask task; + + @BeforeMethod + public void beforeMethod() { + MockitoAnnotations.initMocks(this); + task = getHelixTask(); + } + + public HelixTask getHelixTask() { + String msgId = "TestMessageId"; + Message message = new Message(MessageType.TASK_REPLY, msgId); + + message.setMsgId(msgId); + message.setSrcName("cm-instance-0"); + message.setTgtName("cm-instance-1"); + message.setTgtSessionId("1234"); + message.setFromState("Offline"); + message.setToState("Slave"); + message.setPartitionName("TestDB_0"); + message.setResourceName("TestDB"); + message.setStateModelDef("MasterSlave"); + + MockManager manager = new MockManager("clusterName"); + HelixDataAccessor accessor = manager.getHelixDataAccessor(); + StateModelDefinition stateModelDef = + new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()); + Builder keyBuilder = accessor.keyBuilder(); + accessor.setProperty(keyBuilder.stateModelDef("MasterSlave"), stateModelDef); + + executor = new MockHelixTaskExecutor(); + stateModel = new MockMasterSlaveStateModel(); + executor.registerMessageHandlerFactory(MessageType.TASK_REPLY.name(), + new AsyncCallbackService()); + + NotificationContext context = new NotificationContext(manager); + CurrentState currentStateDelta = new CurrentState("TestDB"); + currentStateDelta.setState("TestDB_0", "OFFLINE"); + + StateModelFactory stateModelFactory = new StateModelFactory() { + + @Override + public MockMasterSlaveStateModel createNewStateModel(String resource, String partitionName) { + // TODO Auto-generated method stub + return new MockMasterSlaveStateModel(); + } + + }; + HelixStateTransitionHandler handler = + new HelixStateTransitionHandler(stateModelFactory, stateModel, message, context, + currentStateDelta); + + return new HelixTask(message, context, handler, executor); + } + + @DataProvider(name = "throwableClass") + public static Object[][] throwableClass() { + return new Object[][]{ + {OutOfMemoryError.class}, + {IllegalStateException.class} + }; + } + + @Test(dataProvider = "throwableClass") + public void testThrowableHandlingOnMessageProcess(Class throwable) { + doThrow(throwable).when(_helixTaskExecutor).updateStateTransitionMessageThreadPool(any(), any()); + boolean isScheduled = _helixTaskExecutor.scheduleTask(task); + AssertJUnit.assertFalse(isScheduled); + } + + @Test() + public void testCMTaskExecutor() throws Exception { + System.out.println("START TestCMTaskExecutor"); + executor.scheduleTask(task); + for (int i = 0; i < 10; i++) { + if (!executor.isDone(task.getTaskId())) { + Thread.sleep(500); + } + } + AssertJUnit.assertTrue(stateModel.stateModelInvoked); + System.out.println("END TestCMTaskExecutor"); } - AssertJUnit.assertTrue(stateModel.stateModelInvoked); - System.out.println("END TestCMTaskExecutor"); - } }