From 6bdc888042daefa7e75d93df44270db6d4a26849 Mon Sep 17 00:00:00 2001 From: Zachary Pinto Date: Mon, 26 Feb 2024 17:47:22 -0800 Subject: [PATCH] Expose addListener and support listening on single instance config (#2752) Expose addListener method in HelixManager to allow users to register any listeners that implement one of the Listener interfaces. Add support in CallbackHandler for InstanceConfigChangeListener to be used on a single instance config. --- .../java/org/apache/helix/HelixManager.java | 14 +++- .../helix/manager/zk/CallbackHandler.java | 10 ++- .../helix/manager/zk/ZKHelixManager.java | 3 +- .../MockCloudEventAwareHelixManager.java | 7 ++ .../stages/DummyClusterManager.java | 7 ++ .../rebalancer/TestInstanceOperation.java | 74 +++++++++++++++---- .../org/apache/helix/mock/MockManager.java | 8 +- .../helix/participant/MockZKHelixManager.java | 7 ++ 8 files changed, 113 insertions(+), 17 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java index b5c2a64481..c1d2ad18c5 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java @@ -51,7 +51,7 @@ import org.apache.helix.spectator.RoutingTableProvider; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; - +import org.apache.zookeeper.Watcher; /** * Class that represents the Helix Agent. @@ -108,6 +108,18 @@ public interface HelixManager { */ void disconnect(); + /** + * Add a change listener on the specified propertyKey for the specified + * changeType and eventTypes. + * @see org.apache.helix.api.listeners for the list of available listeners + * @param listener the listener to add + * @param propertyKey the property key to listen to + * @param changeType the type of change to listen to + * @param eventType the event type to listen for + */ + void addListener(Object listener, PropertyKey propertyKey, HelixConstants.ChangeType changeType, + Watcher.Event.EventType[] eventType); + /** * @see IdealStateChangeListener#onIdealStateChange(List, NotificationContext) * @param listener diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java index 2a19a7a4e9..c2809620e3 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java @@ -358,7 +358,15 @@ public void invoke(NotificationContext changeContext) throws Exception { configChangeListener.onConfigChange(configs, changeContext); } else if (_listener instanceof InstanceConfigChangeListener) { InstanceConfigChangeListener listener = (InstanceConfigChangeListener) _listener; - List configs = preFetch(_propertyKey); + List configs = Collections.emptyList(); + if (_propertyKey.getParams().length > 2 && _preFetchEnabled) { + // If there are more than 2 params, that means the property key is for a specific instance + // and will not have children. + InstanceConfig config = _accessor.getProperty(_propertyKey); + configs = config != null ? Collections.singletonList(config) : Collections.emptyList(); + } else { + configs = preFetch(_propertyKey); + } listener.onInstanceConfigChange(configs, changeContext); } } else if (_changeType == RESOURCE_CONFIG) { diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index 7a62d19230..3473bed08a 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -419,7 +419,8 @@ void checkConnected(long timeout) { } } - void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType, + @Override + public void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType, EventType[] eventType) { checkConnected(_waitForConnectedTimeout); diff --git a/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java b/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java index de46257d39..740c932fc2 100644 --- a/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java +++ b/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java @@ -26,6 +26,7 @@ import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixCloudProperty; +import org.apache.helix.HelixConstants; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerProperties; @@ -59,6 +60,7 @@ import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.zookeeper.Watcher; public class MockCloudEventAwareHelixManager implements HelixManager { private final HelixManagerProperty _helixManagerProperty; @@ -110,6 +112,11 @@ public boolean isConnected() { return false; } + @Override + public void addListener(Object listener, PropertyKey propertyKey, HelixConstants.ChangeType changeType, + Watcher.Event.EventType[] eventType) { + } + @Override public void addIdealStateChangeListener(IdealStateChangeListener listener) throws Exception { diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java index 5d23a321cd..4903a3d8bd 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java @@ -24,6 +24,7 @@ import org.apache.helix.ClusterMessagingService; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixConstants; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerProperties; @@ -53,6 +54,7 @@ import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.zookeeper.Watcher; public class DummyClusterManager implements HelixManager { HelixDataAccessor _accessor; @@ -303,6 +305,11 @@ public ZkHelixPropertyStore getHelixPropertyStore() { return null; } + @Override + public void addListener(Object listener, PropertyKey propertyKey, HelixConstants.ChangeType changeType, + Watcher.Event.EventType[] eventType) { + } + @Override public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener) throws Exception { diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java index 2767577415..010153e64e 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixConstants; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; @@ -25,8 +26,10 @@ import org.apache.helix.HelixRollbackException; import org.apache.helix.InstanceType; import org.apache.helix.NotificationContext; +import org.apache.helix.PropertyKey; import org.apache.helix.PropertyType; import org.apache.helix.TestHelper; +import org.apache.helix.api.listeners.InstanceConfigChangeListener; import org.apache.helix.common.ZkTestBase; import org.apache.helix.constants.InstanceConstants; import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; @@ -54,6 +57,7 @@ import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -546,12 +550,16 @@ public void testNodeSwap() throws Exception { validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, Collections.emptySet(), Collections.emptySet()); + CustomIndividualInstanceConfigChangeListener instanceToSwapInInstanceConfigListener = + new CustomIndividualInstanceConfigChangeListener(); // Add instance with InstanceOperation set to SWAP_IN String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName); addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), - InstanceConstants.InstanceOperation.SWAP_IN, true, -1); + InstanceConstants.InstanceOperation.SWAP_IN, true, -1, instanceToSwapInInstanceConfigListener); + + Assert.assertFalse(instanceToSwapInInstanceConfigListener.isThrottlesEnabled()); // Validate that partitions on SWAP_OUT instance does not change after setting the InstanceOperation to SWAP_OUT // and adding the SWAP_IN instance to the cluster. @@ -583,6 +591,9 @@ public void testNodeSwap() throws Exception { Assert.assertFalse(_gSetupTool.getClusterManagementTool() .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); + // Check to make sure the throttle was enabled again after the swap was completed. + Assert.assertTrue(instanceToSwapInInstanceConfigListener.isThrottlesEnabled()); + // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before // swap was completed. verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, @@ -1095,7 +1106,7 @@ public void testSwapEvacuateAdd() throws Exception { } @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapWithSwapOutInstanceOffline") - public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() { + public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() throws Exception { System.out.println( "START TestInstanceOperation.testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() at " + new Date(System.currentTimeMillis())); @@ -1113,7 +1124,7 @@ public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() { } @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet") - public void testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() { + public void testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() throws Exception { System.out.println( "START TestInstanceOperation.testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() at " + new Date(System.currentTimeMillis())); @@ -1136,7 +1147,7 @@ public void testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() { } @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet") - public void testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() { + public void testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() throws Exception { System.out.println( "START TestInstanceOperation.testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() at " + new Date(System.currentTimeMillis())); @@ -1380,7 +1391,7 @@ public void testEvacuationWithOfflineInstancesInCluster() throws Exception { } @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testEvacuationWithOfflineInstancesInCluster") - public void testSwapEvacuateAddRemoveEvacuate() { + public void testSwapEvacuateAddRemoveEvacuate() throws Exception { System.out.println("START TestInstanceOperation.testSwapEvacuateAddRemoveEvacuate() at " + new Date( System.currentTimeMillis())); removeOfflineOrDisabledOrSwapInInstances(); @@ -1426,9 +1437,33 @@ private static void verifier(TestHelper.Verifier verifier, long timeout) throws }, timeout)); } - private MockParticipantManager createParticipant(String participantName) { + private static class CustomIndividualInstanceConfigChangeListener implements InstanceConfigChangeListener { + private boolean throttlesEnabled; + + public CustomIndividualInstanceConfigChangeListener() { + throttlesEnabled = true; + } + + public boolean isThrottlesEnabled() { + return throttlesEnabled; + } + + @Override + public void onInstanceConfigChange(List instanceConfig, + NotificationContext context) { + if (instanceConfig.get(0).getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) { + throttlesEnabled = false; + } else if (instanceConfig.get(0).getInstanceOperation().isEmpty()) { + throttlesEnabled = true; + } + } + } + + private MockParticipantManager createParticipant(String participantName) throws Exception { // start dummy participants - MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName); + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName, 10, null); StateMachineEngine stateMachine = participant.getStateMachineEngine(); // Using a delayed state model StDelayMSStateModelFactory delayFactory = new StDelayMSStateModelFactory(); @@ -1436,8 +1471,20 @@ private MockParticipantManager createParticipant(String participantName) { return participant; } + private void addParticipant(String participantName) throws Exception { + addParticipant(participantName, UUID.randomUUID().toString(), + "zone_" + _participants.size() % ZONE_COUNT, null, true, -1); + } + private void addParticipant(String participantName, String logicalId, String zone, - InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity) { + InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity) + throws Exception { + addParticipant(participantName, logicalId, zone, instanceOperation, enabled, capacity, null); + } + + private void addParticipant(String participantName, String logicalId, String zone, + InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity, + InstanceConfigChangeListener listener) throws Exception { InstanceConfig config = new InstanceConfig.Builder().setDomain( String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, participantName, LOGICAL_ID, logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation) @@ -1451,16 +1498,17 @@ private void addParticipant(String participantName, String logicalId, String zon MockParticipantManager participant = createParticipant(participantName); participant.syncStart(); + if (listener != null) { + participant.addListener(listener, + new PropertyKey.Builder(CLUSTER_NAME).instanceConfig(participantName), + HelixConstants.ChangeType.INSTANCE_CONFIG, + new Watcher.Event.EventType[]{Watcher.Event.EventType.NodeDataChanged}); + } _participants.add(participant); _participantNames.add(participantName); _nextStartPort++; } - private void addParticipant(String participantName) { - addParticipant(participantName, UUID.randomUUID().toString(), - "zone_" + _participants.size() % ZONE_COUNT, null, true, -1); - } - private void createTestDBs(long delayTime) throws InterruptedException { createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB0_CRUSHED", BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1, -1, diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockManager.java b/helix-core/src/test/java/org/apache/helix/mock/MockManager.java index afa8d98807..b2d474a52e 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockManager.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockManager.java @@ -25,6 +25,7 @@ import org.apache.helix.ClusterMessagingService; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixConstants; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerProperties; @@ -56,7 +57,7 @@ import org.apache.helix.participant.HelixStateMachineEngine; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.store.zk.ZkHelixPropertyStore; - +import org.apache.zookeeper.Watcher; public class MockManager implements HelixManager { MockAccessor accessor; @@ -328,6 +329,11 @@ public ZkHelixPropertyStore getHelixPropertyStore() { return null; } + @Override + public void addListener(Object listener, PropertyKey propertyKey, HelixConstants.ChangeType changeType, + Watcher.Event.EventType[] eventType) { + } + @Override public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener) throws Exception { diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java index 897d99dac4..9972165185 100644 --- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java +++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java @@ -25,6 +25,7 @@ import org.apache.helix.ClusterMessagingService; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixConstants; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerProperties; @@ -58,6 +59,7 @@ import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.task.TaskConstants; +import org.apache.zookeeper.Watcher; import org.testng.collections.Lists; public class MockZKHelixManager implements HelixManager { @@ -324,6 +326,11 @@ public ZkHelixPropertyStore getHelixPropertyStore() { (ZkBaseDataAccessor) _accessor.getBaseDataAccessor(), TaskConstants.REBALANCER_CONTEXT_ROOT, Lists.newArrayList()); } + @Override + public void addListener(Object listener, PropertyKey propertyKey, HelixConstants.ChangeType changeType, + Watcher.Event.EventType[] eventType) { + } + @Override public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener) throws Exception {