Skip to content

Commit

Permalink
Handling throwable during Helix message processing
Browse files Browse the repository at this point in the history
  • Loading branch information
csudharsanan committed Nov 8, 2023
1 parent 7f2a88d commit 87b6192
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ private void startMonitorThread() {
/** Dedicated Thread pool can be provided in configuration or by client.
* This method is to check it and update the thread pool if necessary.
*/
private void updateStateTransitionMessageThreadPool(Message message, HelixManager manager) {
public void updateStateTransitionMessageThreadPool(Message message, HelixManager manager) {
if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.name())) {
return;
}
Expand Down Expand Up @@ -488,11 +488,17 @@ public boolean scheduleTask(MessageTask task) {
"Message handling task already sheduled for " + taskId, manager);
}
}
} catch (Exception e) {
LOG.error("Error while executing task. " + message, e);
_statusUpdateUtil
.logError(message, HelixTaskExecutor.class, e, "Error while executing task " + e,
manager);
} catch (Throwable e) {
String errorMessage = "Error while executing task. " + message;
LOG.error(errorMessage, e);
if (e instanceof Exception) {
_statusUpdateUtil
.logError(message, HelixTaskExecutor.class, new Exception(e), errorMessage,
manager);
} else {
_statusUpdateUtil
.logError(message, HelixTaskExecutor.class, errorMessage, manager);
}
}
return false;
}
Expand Down
146 changes: 92 additions & 54 deletions helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.helix.messaging.handling.AsyncCallbackService;
import org.apache.helix.messaging.handling.HelixStateTransitionHandler;
import org.apache.helix.messaging.handling.HelixTask;
import org.apache.helix.messaging.handling.HelixTaskExecutor;
import org.apache.helix.mock.MockManager;
import org.apache.helix.mock.participant.MockHelixTaskExecutor;
import org.apache.helix.mock.statemodel.MockMasterSlaveStateModel;
Expand All @@ -32,65 +33,102 @@
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.tools.StateModelConfigGenerator;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import org.testng.AssertJUnit;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;

public class TestHelixTaskExecutor {

@Test()
public void testCMTaskExecutor() throws Exception {
System.out.println("START TestCMTaskExecutor");
String msgId = "TestMessageId";
Message message = new Message(MessageType.TASK_REPLY, msgId);

message.setMsgId(msgId);
message.setSrcName("cm-instance-0");
message.setTgtName("cm-instance-1");
message.setTgtSessionId("1234");
message.setFromState("Offline");
message.setToState("Slave");
message.setPartitionName("TestDB_0");
message.setResourceName("TestDB");
message.setStateModelDef("MasterSlave");

MockManager manager = new MockManager("clusterName");
HelixDataAccessor accessor = manager.getHelixDataAccessor();
StateModelDefinition stateModelDef =
new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.stateModelDef("MasterSlave"), stateModelDef);

MockHelixTaskExecutor executor = new MockHelixTaskExecutor();
MockMasterSlaveStateModel stateModel = new MockMasterSlaveStateModel();
executor.registerMessageHandlerFactory(MessageType.TASK_REPLY.name(),
new AsyncCallbackService());

NotificationContext context = new NotificationContext(manager);
CurrentState currentStateDelta = new CurrentState("TestDB");
currentStateDelta.setState("TestDB_0", "OFFLINE");

StateModelFactory<MockMasterSlaveStateModel> stateModelFactory = new StateModelFactory<MockMasterSlaveStateModel>() {

@Override
public MockMasterSlaveStateModel createNewStateModel(String resource, String partitionName) {
// TODO Auto-generated method stub
return new MockMasterSlaveStateModel();
}

};
HelixStateTransitionHandler handler =
new HelixStateTransitionHandler(stateModelFactory, stateModel, message, context,
currentStateDelta);

HelixTask task = new HelixTask(message, context, handler, executor);
executor.scheduleTask(task);
for (int i = 0; i < 10; i++) {
if (!executor.isDone(task.getTaskId())) {
Thread.sleep(500);
}
@Spy
private HelixTaskExecutor _helixTaskExecutor;
private MockHelixTaskExecutor executor;
private MockMasterSlaveStateModel stateModel;
private HelixTask task;

@BeforeMethod
public void beforeMethod() {
MockitoAnnotations.initMocks(this);
task = getHelixTask();
}

public HelixTask getHelixTask() {
String msgId = "TestMessageId";
Message message = new Message(MessageType.TASK_REPLY, msgId);

message.setMsgId(msgId);
message.setSrcName("cm-instance-0");
message.setTgtName("cm-instance-1");
message.setTgtSessionId("1234");
message.setFromState("Offline");
message.setToState("Slave");
message.setPartitionName("TestDB_0");
message.setResourceName("TestDB");
message.setStateModelDef("MasterSlave");

MockManager manager = new MockManager("clusterName");
HelixDataAccessor accessor = manager.getHelixDataAccessor();
StateModelDefinition stateModelDef =
new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.stateModelDef("MasterSlave"), stateModelDef);

executor = new MockHelixTaskExecutor();
stateModel = new MockMasterSlaveStateModel();
executor.registerMessageHandlerFactory(MessageType.TASK_REPLY.name(),
new AsyncCallbackService());

NotificationContext context = new NotificationContext(manager);
CurrentState currentStateDelta = new CurrentState("TestDB");
currentStateDelta.setState("TestDB_0", "OFFLINE");

StateModelFactory<MockMasterSlaveStateModel> stateModelFactory = new StateModelFactory<MockMasterSlaveStateModel>() {

@Override
public MockMasterSlaveStateModel createNewStateModel(String resource, String partitionName) {
// TODO Auto-generated method stub
return new MockMasterSlaveStateModel();
}

};
HelixStateTransitionHandler handler =
new HelixStateTransitionHandler(stateModelFactory, stateModel, message, context,
currentStateDelta);

return new HelixTask(message, context, handler, executor);
}

@DataProvider(name = "throwableClass")
public static Object[][] throwableClass() {
return new Object[][]{
{OutOfMemoryError.class},
{IllegalStateException.class}
};
}

@Test(dataProvider = "throwableClass")
public void testThrowableHandlingOnMessageProcess(Class throwable) {
doThrow(throwable).when(_helixTaskExecutor).updateStateTransitionMessageThreadPool(any(), any());
boolean isScheduled = _helixTaskExecutor.scheduleTask(task);
AssertJUnit.assertFalse(isScheduled);
}

@Test()
public void testCMTaskExecutor() throws Exception {
System.out.println("START TestCMTaskExecutor");
executor.scheduleTask(task);
for (int i = 0; i < 10; i++) {
if (!executor.isDone(task.getTaskId())) {
Thread.sleep(500);
}
}
AssertJUnit.assertTrue(stateModel.stateModelInvoked);
System.out.println("END TestCMTaskExecutor");
}
AssertJUnit.assertTrue(stateModel.stateModelInvoked);
System.out.println("END TestCMTaskExecutor");
}

}

0 comments on commit 87b6192

Please sign in to comment.