Skip to content

Commit

Permalink
Expose addListener and support listening on single instance config (#…
Browse files Browse the repository at this point in the history
…2752)

Expose addListener method in HelixManager to allow users to register any listeners that implement one of the Listener interfaces. Add support in CallbackHandler for InstanceConfigChangeListener to be used on a single instance config.
  • Loading branch information
zpinto authored Feb 27, 2024
1 parent c0fa6dc commit 6bdc888
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 17 deletions.
14 changes: 13 additions & 1 deletion helix-core/src/main/java/org/apache/helix/HelixManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.apache.helix.spectator.RoutingTableProvider;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;

import org.apache.zookeeper.Watcher;

/**
* Class that represents the Helix Agent.
Expand Down Expand Up @@ -108,6 +108,18 @@ public interface HelixManager {
*/
void disconnect();

/**
* Add a change listener on the specified propertyKey for the specified
* changeType and eventTypes.
* @see org.apache.helix.api.listeners for the list of available listeners
* @param listener the listener to add
* @param propertyKey the property key to listen to
* @param changeType the type of change to listen to
* @param eventType the event type to listen for
*/
void addListener(Object listener, PropertyKey propertyKey, HelixConstants.ChangeType changeType,
Watcher.Event.EventType[] eventType);

/**
* @see IdealStateChangeListener#onIdealStateChange(List, NotificationContext)
* @param listener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,15 @@ public void invoke(NotificationContext changeContext) throws Exception {
configChangeListener.onConfigChange(configs, changeContext);
} else if (_listener instanceof InstanceConfigChangeListener) {
InstanceConfigChangeListener listener = (InstanceConfigChangeListener) _listener;
List<InstanceConfig> configs = preFetch(_propertyKey);
List<InstanceConfig> configs = Collections.emptyList();
if (_propertyKey.getParams().length > 2 && _preFetchEnabled) {
// If there are more than 2 params, that means the property key is for a specific instance
// and will not have children.
InstanceConfig config = _accessor.getProperty(_propertyKey);
configs = config != null ? Collections.singletonList(config) : Collections.emptyList();
} else {
configs = preFetch(_propertyKey);
}
listener.onInstanceConfigChange(configs, changeContext);
}
} else if (_changeType == RESOURCE_CONFIG) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,8 @@ void checkConnected(long timeout) {
}
}

void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType,
@Override
public void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType,
EventType[] eventType) {
checkConnected(_waitForConnectedTimeout);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixCloudProperty;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
Expand Down Expand Up @@ -59,6 +60,7 @@
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.zookeeper.Watcher;

public class MockCloudEventAwareHelixManager implements HelixManager {
private final HelixManagerProperty _helixManagerProperty;
Expand Down Expand Up @@ -110,6 +112,11 @@ public boolean isConnected() {
return false;
}

@Override
public void addListener(Object listener, PropertyKey propertyKey, HelixConstants.ChangeType changeType,
Watcher.Event.EventType[] eventType) {
}

@Override
public void addIdealStateChangeListener(IdealStateChangeListener listener) throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
Expand Down Expand Up @@ -53,6 +54,7 @@
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.zookeeper.Watcher;

public class DummyClusterManager implements HelixManager {
HelixDataAccessor _accessor;
Expand Down Expand Up @@ -303,6 +305,11 @@ public ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore() {
return null;
}

@Override
public void addListener(Object listener, PropertyKey propertyKey, HelixConstants.ChangeType changeType,
Watcher.Event.EventType[] eventType) {
}

@Override
public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
import com.google.common.collect.ImmutableSet;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.HelixRollbackException;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
Expand Down Expand Up @@ -54,6 +57,7 @@
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -546,12 +550,16 @@ public void testNodeSwap() throws Exception {
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Collections.emptySet(), Collections.emptySet());

CustomIndividualInstanceConfigChangeListener instanceToSwapInInstanceConfigListener =
new CustomIndividualInstanceConfigChangeListener();
// Add instance with InstanceOperation set to SWAP_IN
String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName);
addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
InstanceConstants.InstanceOperation.SWAP_IN, true, -1, instanceToSwapInInstanceConfigListener);

Assert.assertFalse(instanceToSwapInInstanceConfigListener.isThrottlesEnabled());

// Validate that partitions on SWAP_OUT instance does not change after setting the InstanceOperation to SWAP_OUT
// and adding the SWAP_IN instance to the cluster.
Expand Down Expand Up @@ -583,6 +591,9 @@ public void testNodeSwap() throws Exception {
Assert.assertFalse(_gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled());

// Check to make sure the throttle was enabled again after the swap was completed.
Assert.assertTrue(instanceToSwapInInstanceConfigListener.isThrottlesEnabled());

// Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before
// swap was completed.
verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Expand Down Expand Up @@ -1095,7 +1106,7 @@ public void testSwapEvacuateAdd() throws Exception {
}

@Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapWithSwapOutInstanceOffline")
public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() {
public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() throws Exception {
System.out.println(
"START TestInstanceOperation.testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() at "
+ new Date(System.currentTimeMillis()));
Expand All @@ -1113,7 +1124,7 @@ public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() {
}

@Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet")
public void testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() {
public void testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() throws Exception {
System.out.println(
"START TestInstanceOperation.testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() at "
+ new Date(System.currentTimeMillis()));
Expand All @@ -1136,7 +1147,7 @@ public void testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() {
}

@Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet")
public void testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() {
public void testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() throws Exception {
System.out.println(
"START TestInstanceOperation.testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() at "
+ new Date(System.currentTimeMillis()));
Expand Down Expand Up @@ -1380,7 +1391,7 @@ public void testEvacuationWithOfflineInstancesInCluster() throws Exception {
}

@Test(expectedExceptions = HelixException.class, dependsOnMethods = "testEvacuationWithOfflineInstancesInCluster")
public void testSwapEvacuateAddRemoveEvacuate() {
public void testSwapEvacuateAddRemoveEvacuate() throws Exception {
System.out.println("START TestInstanceOperation.testSwapEvacuateAddRemoveEvacuate() at " + new Date(
System.currentTimeMillis()));
removeOfflineOrDisabledOrSwapInInstances();
Expand Down Expand Up @@ -1426,18 +1437,54 @@ private static void verifier(TestHelper.Verifier verifier, long timeout) throws
}, timeout));
}

private MockParticipantManager createParticipant(String participantName) {
private static class CustomIndividualInstanceConfigChangeListener implements InstanceConfigChangeListener {
private boolean throttlesEnabled;

public CustomIndividualInstanceConfigChangeListener() {
throttlesEnabled = true;
}

public boolean isThrottlesEnabled() {
return throttlesEnabled;
}

@Override
public void onInstanceConfigChange(List<InstanceConfig> instanceConfig,
NotificationContext context) {
if (instanceConfig.get(0).getInstanceOperation()
.equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
throttlesEnabled = false;
} else if (instanceConfig.get(0).getInstanceOperation().isEmpty()) {
throttlesEnabled = true;
}
}
}

private MockParticipantManager createParticipant(String participantName) throws Exception {
// start dummy participants
MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName);
MockParticipantManager participant =
new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName, 10, null);
StateMachineEngine stateMachine = participant.getStateMachineEngine();
// Using a delayed state model
StDelayMSStateModelFactory delayFactory = new StDelayMSStateModelFactory();
stateMachine.registerStateModelFactory("MasterSlave", delayFactory);
return participant;
}

private void addParticipant(String participantName) throws Exception {
addParticipant(participantName, UUID.randomUUID().toString(),
"zone_" + _participants.size() % ZONE_COUNT, null, true, -1);
}

private void addParticipant(String participantName, String logicalId, String zone,
InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity) {
InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity)
throws Exception {
addParticipant(participantName, logicalId, zone, instanceOperation, enabled, capacity, null);
}

private void addParticipant(String participantName, String logicalId, String zone,
InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity,
InstanceConfigChangeListener listener) throws Exception {
InstanceConfig config = new InstanceConfig.Builder().setDomain(
String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, participantName, LOGICAL_ID,
logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation)
Expand All @@ -1451,16 +1498,17 @@ private void addParticipant(String participantName, String logicalId, String zon
MockParticipantManager participant = createParticipant(participantName);

participant.syncStart();
if (listener != null) {
participant.addListener(listener,
new PropertyKey.Builder(CLUSTER_NAME).instanceConfig(participantName),
HelixConstants.ChangeType.INSTANCE_CONFIG,
new Watcher.Event.EventType[]{Watcher.Event.EventType.NodeDataChanged});
}
_participants.add(participant);
_participantNames.add(participantName);
_nextStartPort++;
}

private void addParticipant(String participantName) {
addParticipant(participantName, UUID.randomUUID().toString(),
"zone_" + _participants.size() % ZONE_COUNT, null, true, -1);
}

private void createTestDBs(long delayTime) throws InterruptedException {
createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB0_CRUSHED",
BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1, -1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
Expand Down Expand Up @@ -56,7 +57,7 @@
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.store.zk.ZkHelixPropertyStore;

import org.apache.zookeeper.Watcher;

public class MockManager implements HelixManager {
MockAccessor accessor;
Expand Down Expand Up @@ -328,6 +329,11 @@ public ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore() {
return null;
}

@Override
public void addListener(Object listener, PropertyKey propertyKey, HelixConstants.ChangeType changeType,
Watcher.Event.EventType[] eventType) {
}

@Override
public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
Expand Down Expand Up @@ -58,6 +59,7 @@
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.task.TaskConstants;
import org.apache.zookeeper.Watcher;
import org.testng.collections.Lists;

public class MockZKHelixManager implements HelixManager {
Expand Down Expand Up @@ -324,6 +326,11 @@ public ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore() {
(ZkBaseDataAccessor<ZNRecord>) _accessor.getBaseDataAccessor(), TaskConstants.REBALANCER_CONTEXT_ROOT, Lists.<String>newArrayList());
}

@Override
public void addListener(Object listener, PropertyKey propertyKey, HelixConstants.ChangeType changeType,
Watcher.Event.EventType[] eventType) {
}

@Override
public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener)
throws Exception {
Expand Down

0 comments on commit 6bdc888

Please sign in to comment.