Skip to content

Commit

Permalink
Handling throwable during Helix message processing
Browse files Browse the repository at this point in the history
  • Loading branch information
csudharsanan committed Dec 8, 2023
1 parent 7f2a88d commit ded7aa5
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.annotations.VisibleForTesting;
import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.Criteria;
Expand Down Expand Up @@ -276,7 +277,8 @@ private void startMonitorThread() {
/** Dedicated Thread pool can be provided in configuration or by client.
* This method is to check it and update the thread pool if necessary.
*/
private void updateStateTransitionMessageThreadPool(Message message, HelixManager manager) {
@VisibleForTesting
void updateStateTransitionMessageThreadPool(Message message, HelixManager manager) {
if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.name())) {
return;
}
Expand Down Expand Up @@ -488,10 +490,10 @@ public boolean scheduleTask(MessageTask task) {
"Message handling task already sheduled for " + taskId, manager);
}
}
} catch (Exception e) {
LOG.error("Error while executing task. " + message, e);
} catch (Throwable t) {
LOG.error("Error while executing task. " + message, t);
_statusUpdateUtil
.logError(message, HelixTaskExecutor.class, e, "Error while executing task " + e,
.logError(message, HelixTaskExecutor.class, "Error while executing task " + t,
manager);
}
return false;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package org.apache.helix.messaging.handling;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.mock.MockManager;
import org.apache.helix.mock.participant.MockHelixTaskExecutor;
import org.apache.helix.mock.statemodel.MockMasterSlaveStateModel;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.tools.StateModelConfigGenerator;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import org.testng.AssertJUnit;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;

public class TestCMTaskExecutor {

@Spy
private HelixTaskExecutor _helixTaskExecutor;
private MockHelixTaskExecutor executor;
private MockMasterSlaveStateModel stateModel;
private HelixTask task;

@BeforeMethod
public void beforeMethod() {
MockitoAnnotations.initMocks(this);
task = getHelixTask();
}

public HelixTask getHelixTask() {
String msgId = "TestMessageId";
Message message = new Message(MessageType.TASK_REPLY, msgId);

message.setMsgId(msgId);
message.setSrcName("cm-instance-0");
message.setTgtName("cm-instance-1");
message.setTgtSessionId("1234");
message.setFromState("Offline");
message.setToState("Slave");
message.setPartitionName("TestDB_0");
message.setResourceName("TestDB");
message.setStateModelDef("MasterSlave");

MockManager manager = new MockManager("clusterName");
HelixDataAccessor accessor = manager.getHelixDataAccessor();
StateModelDefinition stateModelDef =
new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.stateModelDef("MasterSlave"), stateModelDef);

executor = new MockHelixTaskExecutor();
stateModel = new MockMasterSlaveStateModel();
executor.registerMessageHandlerFactory(MessageType.TASK_REPLY.name(),
new AsyncCallbackService());

NotificationContext context = new NotificationContext(manager);
CurrentState currentStateDelta = new CurrentState("TestDB");
currentStateDelta.setState("TestDB_0", "OFFLINE");

StateModelFactory<MockMasterSlaveStateModel> stateModelFactory = new StateModelFactory<MockMasterSlaveStateModel>() {

@Override
public MockMasterSlaveStateModel createNewStateModel(String resource, String partitionName) {
// TODO Auto-generated method stub
return new MockMasterSlaveStateModel();
}

};
HelixStateTransitionHandler handler =
new HelixStateTransitionHandler(stateModelFactory, stateModel, message, context,
currentStateDelta);

return new HelixTask(message, context, handler, executor);
}

@DataProvider(name = "throwableClass")
public static Object[][] throwableClass() {
return new Object[][]{
{OutOfMemoryError.class},
{IllegalStateException.class}
};
}

@Test(dataProvider = "throwableClass")
public void testThrowableHandlingOnMessageProcess(Class throwable) {
doThrow(throwable).when(_helixTaskExecutor).updateStateTransitionMessageThreadPool(any(), any());
boolean isScheduled = _helixTaskExecutor.scheduleTask(task);
AssertJUnit.assertFalse(isScheduled);
}

@Test()
public void testCMTaskExecutor() throws Exception {
System.out.println("START TestCMTaskExecutor");
executor.scheduleTask(task);
for (int i = 0; i < 10; i++) {
if (!executor.isDone(task.getTaskId())) {
Thread.sleep(500);
}
}
AssertJUnit.assertTrue(stateModel.stateModelInvoked);
System.out.println("END TestCMTaskExecutor");
}

}

0 comments on commit ded7aa5

Please sign in to comment.