diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java index a869a904ef..69fec9b2ca 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java @@ -209,13 +209,15 @@ private static ClusterModel generateClusterModel(ResourceControllerDataProvider // Get the set of active logical ids. Set activeLogicalIds = activeInstances.stream().map( - instanceName -> assignableInstanceConfigMap.get(instanceName) + instanceName -> assignableInstanceConfigMap.getOrDefault(instanceName, + new InstanceConfig(instanceName)) .getLogicalId(clusterTopologyConfig.getEndNodeType())).collect(Collectors.toSet()); Set assignableLiveInstanceNames = dataProvider.getAssignableLiveInstances().keySet(); Set assignableLiveInstanceLogicalIds = assignableLiveInstanceNames.stream().map( - instanceName -> assignableInstanceConfigMap.get(instanceName) + instanceName -> assignableInstanceConfigMap.getOrDefault(instanceName, + new InstanceConfig(instanceName)) .getLogicalId(clusterTopologyConfig.getEndNodeType())).collect(Collectors.toSet()); // Generate replica objects for all the resource partitions. 
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java index 727bd8df9a..8872e9edac 100644 --- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java @@ -23,8 +23,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.helix.HelixConstants; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; @@ -34,9 +36,11 @@ import org.apache.helix.common.caches.CurrentStateSnapshot; import org.apache.helix.common.caches.CustomizedViewCache; import org.apache.helix.common.caches.TargetExternalViewCache; +import org.apache.helix.constants.InstanceConstants; import org.apache.helix.model.CurrentState; import org.apache.helix.model.CustomizedView; import org.apache.helix.model.ExternalView; +import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +51,10 @@ class RoutingDataCache extends BasicClusterDataCache { private static Logger LOG = LoggerFactory.getLogger(RoutingDataCache.class.getName()); + // When an instance has any of these instance operations, it should not be routable. + private static final ImmutableSet NON_ROUTABLE_INSTANCE_OPERATIONS = + ImmutableSet.of(InstanceConstants.InstanceOperation.SWAP_IN.name()); + private final Map> _sourceDataTypeMap; private CurrentStateCache _currentStateCache; @@ -54,6 +62,8 @@ class RoutingDataCache extends BasicClusterDataCache { // propertyCache, this hardcoded list of fields won't be necessary. 
private Map _customizedViewCaches; private TargetExternalViewCache _targetExternalViewCache; + private Map _routableLiveInstanceMap; + private Map _routableInstanceConfigMap; public RoutingDataCache(String clusterName, PropertyType sourceDataType) { this (clusterName, ImmutableMap.of(sourceDataType, Collections.emptyList())); @@ -73,6 +83,8 @@ public RoutingDataCache(String clusterName, Map> sour .forEach(customizedStateType -> _customizedViewCaches.put(customizedStateType, new CustomizedViewCache(clusterName, customizedStateType))); _targetExternalViewCache = new TargetExternalViewCache(clusterName); + _routableInstanceConfigMap = new HashMap<>(); + _routableLiveInstanceMap = new HashMap<>(); requireFullRefresh(); } @@ -88,7 +100,26 @@ public synchronized void refresh(HelixDataAccessor accessor) { LOG.info("START: RoutingDataCache.refresh() for cluster " + _clusterName); long startTime = System.currentTimeMillis(); + // Store whether a refresh for routable instances is necessary, as the super.refresh() call will + // set the _propertyDataChangedMap values for the instance config and live instance change types to false. + boolean refreshRoutableInstanceConfigs = + _propertyDataChangedMap.getOrDefault(HelixConstants.ChangeType.INSTANCE_CONFIG, false); + // If there is an InstanceConfig change, update the routable instance configs and live instances. + // Must also do live instances because whether an instance is routable is based on the instance config. 
+ boolean refreshRoutableLiveInstances = + _propertyDataChangedMap.getOrDefault(HelixConstants.ChangeType.LIVE_INSTANCE, false) + || refreshRoutableInstanceConfigs; + super.refresh(accessor); + + if (refreshRoutableInstanceConfigs) { + updateRoutableInstanceConfigMap(_instanceConfigPropertyCache.getPropertyMap()); + } + if (refreshRoutableLiveInstances) { + updateRoutableLiveInstanceMap(getRoutableInstanceConfigMap(), + _liveInstancePropertyCache.getPropertyMap()); + } + for (PropertyType propertyType : _sourceDataTypeMap.keySet()) { long start = System.currentTimeMillis(); switch (propertyType) { @@ -114,7 +145,9 @@ public synchronized void refresh(HelixDataAccessor accessor) { * TODO: logic. **/ _liveInstancePropertyCache.refresh(accessor); - Map liveInstanceMap = getLiveInstances(); + updateRoutableLiveInstanceMap(getRoutableInstanceConfigMap(), + _liveInstancePropertyCache.getPropertyMap()); + Map liveInstanceMap = getRoutableLiveInstances(); _currentStateCache.refresh(accessor, liveInstanceMap); LOG.info("Reload CurrentStates. 
Takes " + (System.currentTimeMillis() - start) + " ms"); } @@ -150,6 +183,41 @@ public synchronized void refresh(HelixDataAccessor accessor) { } } + private void updateRoutableInstanceConfigMap(Map instanceConfigMap) { + _routableInstanceConfigMap = instanceConfigMap.entrySet().stream().filter( + (instanceConfigEntry) -> !NON_ROUTABLE_INSTANCE_OPERATIONS.contains( + instanceConfigEntry.getValue().getInstanceOperation())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private void updateRoutableLiveInstanceMap(Map instanceConfigMap, + Map liveInstanceMap) { + _routableLiveInstanceMap = liveInstanceMap.entrySet().stream().filter( + (liveInstanceEntry) -> instanceConfigMap.containsKey(liveInstanceEntry.getKey()) + && !NON_ROUTABLE_INSTANCE_OPERATIONS.contains( + instanceConfigMap.get(liveInstanceEntry.getKey()).getInstanceOperation())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + /** + * Returns the LiveInstances for each of the routable instances that are currently up and + * running. + * + * @return a map of LiveInstances + */ + public Map getRoutableLiveInstances() { + return Collections.unmodifiableMap(_routableLiveInstanceMap); + } + + /** + * Returns the instance config map for all the routable instances that are in the cluster. 
+ * + * @return a map of InstanceConfigs + */ + public Map getRoutableInstanceConfigMap() { + return Collections.unmodifiableMap(_routableInstanceConfigMap); + } + /** * Retrieves the TargetExternalView for all resources * diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java index 0d97c9fec3..c27f084627 100644 --- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java @@ -923,14 +923,16 @@ protected void handleEvent(ClusterEvent event) { case EXTERNALVIEW: { String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE); refreshExternalView(_dataCache.getExternalViews().values(), - _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values(), + _dataCache.getRoutableInstanceConfigMap().values(), + _dataCache.getRoutableLiveInstances().values(), keyReference); } break; case TARGETEXTERNALVIEW: { String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE); refreshExternalView(_dataCache.getTargetExternalViews().values(), - _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values(), + _dataCache.getRoutableInstanceConfigMap().values(), + _dataCache.getRoutableLiveInstances().values(), keyReference); } break; @@ -938,13 +940,15 @@ protected void handleEvent(ClusterEvent event) { for (String customizedStateType : _sourceDataTypeMap.getOrDefault(PropertyType.CUSTOMIZEDVIEW, Collections.emptyList())) { String keyReference = generateReferenceKey(propertyType.name(), customizedStateType); refreshCustomizedView(_dataCache.getCustomizedView(customizedStateType).values(), - _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values(), keyReference); + _dataCache.getRoutableInstanceConfigMap().values(), + 
_dataCache.getRoutableLiveInstances().values(), keyReference); } break; case CURRENTSTATES: { String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);; - refreshCurrentState(_dataCache.getCurrentStatesMap(), _dataCache.getInstanceConfigMap().values(), - _dataCache.getLiveInstances().values(), keyReference); + refreshCurrentState(_dataCache.getCurrentStatesMap(), + _dataCache.getRoutableInstanceConfigMap().values(), + _dataCache.getRoutableLiveInstances().values(), keyReference); recordPropagationLatency(System.currentTimeMillis(), _dataCache.getCurrentStateSnapshot()); } break; diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java index b7c90d8412..3f0aa5d9ec 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java @@ -1,6 +1,8 @@ package org.apache.helix.integration.rebalancer; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -16,8 +18,12 @@ import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; import org.apache.helix.HelixRollbackException; +import org.apache.helix.InstanceType; import org.apache.helix.NotificationContext; +import org.apache.helix.PropertyType; import org.apache.helix.TestHelper; import org.apache.helix.common.ZkTestBase; import org.apache.helix.constants.InstanceConstants; @@ -41,10 +47,12 @@ import org.apache.helix.participant.statemachine.StateModelFactory; import org.apache.helix.participant.statemachine.StateModelInfo; import 
org.apache.helix.participant.statemachine.Transition; +import org.apache.helix.spectator.RoutingTableProvider; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; import org.testng.Assert; +import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -70,6 +78,10 @@ public class TestInstanceOperation extends ZkTestBase { ImmutableSet.of("MASTER", "LEADER", "SLAVE", "STANDBY"); private int REPLICA = 3; protected ClusterControllerManager _controller; + private HelixManager _spectator; + private RoutingTableProvider _routingTableProviderDefault; + private RoutingTableProvider _routingTableProviderEV; + private RoutingTableProvider _routingTableProviderCS; List _participants = new ArrayList<>(); private List _originalParticipantNames = new ArrayList<>(); List _participantNames = new ArrayList<>(); @@ -113,6 +125,15 @@ public void beforeClass() throws Exception { _configAccessor = new ConfigAccessor(_gZkClient); _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); + // start spectator + _spectator = + HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "spectator", InstanceType.SPECTATOR, + ZK_ADDR); + _spectator.connect(); + _routingTableProviderDefault = new RoutingTableProvider(_spectator); + _routingTableProviderEV = new RoutingTableProvider(_spectator, PropertyType.EXTERNALVIEW); + _routingTableProviderCS = new RoutingTableProvider(_spectator, PropertyType.CURRENTSTATES); + setupClusterConfig(); createTestDBs(DEFAULT_RESOURCE_DELAY_TIME); @@ -122,6 +143,18 @@ public void beforeClass() throws Exception { _admin = new ZKHelixAdmin(_gZkClient); } + @AfterClass + public void afterClass() { + for (MockParticipantManager p : _participants) { + p.syncStop(); + } + _controller.syncStop(); + 
_routingTableProviderDefault.shutdown(); + _routingTableProviderEV.shutdown(); + _routingTableProviderCS.shutdown(); + _spectator.disconnect(); + } + private void setupClusterConfig() { _stateModelDelay = 3L; ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); @@ -696,12 +729,21 @@ public void testNodeSwap() throws Exception { // Assert canSwapBeCompleted is true Assert.assertTrue(_gSetupTool.getClusterManagementTool() .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName)); + + // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not. + validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true); + validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false); + // Assert completeSwapIfPossible is true Assert.assertTrue(_gSetupTool.getClusterManagementTool() .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName)); Assert.assertTrue(_clusterVerifier.verifyByPolling()); + // Validate that the SWAP_IN instance is now in the routing tables. + validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true); + + // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it. Assert.assertFalse(_gSetupTool.getClusterManagementTool() .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); @@ -760,6 +802,10 @@ public void testNodeSwapSwapInNodeNoInstanceOperationDisabled() throws Exception validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, Set.of(instanceToSwapInName), Collections.emptySet()); + // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not. 
+ validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true); + validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false); + // Assert canSwapBeCompleted is true Assert.assertTrue(_gSetupTool.getClusterManagementTool() .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName)); @@ -821,6 +867,10 @@ public void testNodeSwapCancelSwapWhenReadyToComplete() throws Exception { validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, Set.of(instanceToSwapInName), Collections.emptySet()); + // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not. + validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true); + validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false); + // Assert canSwapBeCompleted is true Assert.assertTrue(_gSetupTool.getClusterManagementTool() .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName)); @@ -832,6 +882,10 @@ public void testNodeSwapCancelSwapWhenReadyToComplete() throws Exception { // Wait for cluster to converge. Assert.assertTrue(_clusterVerifier.verifyByPolling()); + // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not. + validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true); + validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false); + // Validate there are no partitions on the SWAP_IN instance. Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName).size(), 0); @@ -905,6 +959,10 @@ public void testNodeSwapAfterEMM() throws Exception { validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, Set.of(instanceToSwapInName), Collections.emptySet()); + // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not. 
+ validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true); + validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false); + // Assert canSwapBeCompleted is true Assert.assertTrue(_gSetupTool.getClusterManagementTool() .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName)); @@ -914,6 +972,9 @@ public void testNodeSwapAfterEMM() throws Exception { Assert.assertTrue(_clusterVerifier.verifyByPolling()); + // Validate that the SWAP_IN instance is now in the routing tables. + validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true); + // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it. Assert.assertFalse(_gSetupTool.getClusterManagementTool() .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); @@ -1116,6 +1177,10 @@ public void testNodeSwapAddSwapInFirst() { validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, Set.of(instanceToSwapInName), Collections.emptySet()); + // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not. + validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true); + validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false); + // Assert canSwapBeCompleted is true Assert.assertTrue(_gSetupTool.getClusterManagementTool() .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName)); @@ -1125,6 +1190,9 @@ public void testNodeSwapAddSwapInFirst() { Assert.assertTrue(_clusterVerifier.verifyByPolling()); + // Validate that the SWAP_IN instance is now in the routing tables. + validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true); + // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it. 
Assert.assertFalse(_gSetupTool.getClusterManagementTool() .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); @@ -1246,6 +1314,47 @@ private Map getPartitionsAndStatesOnInstance(Map> getResourcePartitionStateOnInstance( + Map evs, String instanceName) { + Map> stateByPartitionByResource = new HashMap<>(); + for (String resourceEV : evs.keySet()) { + for (String partition : evs.get(resourceEV).getPartitionSet()) { + if (evs.get(resourceEV).getStateMap(partition).containsKey(instanceName)) { + if (!stateByPartitionByResource.containsKey(resourceEV)) { + stateByPartitionByResource.put(resourceEV, new HashMap<>()); + } + stateByPartitionByResource.get(resourceEV) + .put(partition, evs.get(resourceEV).getStateMap(partition).get(instanceName)); + } + } + } + + return stateByPartitionByResource; + } + + private Set getInstanceNames(Collection instanceConfigs) { + return instanceConfigs.stream().map(InstanceConfig::getInstanceName) + .collect(Collectors.toSet()); + } + + private void validateRoutingTablesInstance(Map evs, String instanceName, + boolean shouldContain) { + RoutingTableProvider[] routingTableProviders = + new RoutingTableProvider[]{_routingTableProviderDefault, _routingTableProviderEV, _routingTableProviderCS}; + getResourcePartitionStateOnInstance(evs, instanceName).forEach((resource, partitions) -> { + partitions.forEach((partition, state) -> { + Arrays.stream(routingTableProviders).forEach(rtp -> Assert.assertEquals( + getInstanceNames(rtp.getInstancesForResource(resource, partition, state)).contains( + instanceName), shouldContain)); + }); + }); + + Arrays.stream(routingTableProviders).forEach(rtp -> { + Assert.assertEquals(getInstanceNames(rtp.getInstanceConfigs()).contains(instanceName), + shouldContain); + }); + } + private void validateEVCorrect(ExternalView actual, ExternalView original, Map swapOutInstancesToSwapInInstances, Set inFlightSwapInInstances, Set completedSwapInInstanceNames) { diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java index e8f4f82b2b..cbf2998604 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java +++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java @@ -263,7 +263,7 @@ private void validatePropagationLatency(PropertyType type, final long upperBound } @Test(dependsOnMethods = "testRoutingTableWithCurrentStates") - public void TestInconsistentStateEventProcessing() throws Exception { + public void testInconsistentStateEventProcessing() throws Exception { // This test requires an additional HelixManager since one of the provider event processing will // be blocked. HelixManager helixManager = HelixManagerFactory @@ -305,10 +305,10 @@ public void TestInconsistentStateEventProcessing() throws Exception { IdealState idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); String targetPartitionName = idealState.getPartitionSet().iterator().next(); - // Wait until the routingtable is updated. + // Wait until the routing table is updated. 
BlockingCurrentStateRoutingTableProvider finalRoutingTableCS = routingTableCS; Assert.assertTrue(TestHelper.verify( - () -> finalRoutingTableCS.getInstances(db, targetPartitionName, "MASTER").size() > 0, + () -> !finalRoutingTableCS.getInstances(db, targetPartitionName, "MASTER").isEmpty(), 2000)); String targetNodeName = routingTableCS.getInstances(db, targetPartitionName, "MASTER").get(0).getInstanceName(); @@ -352,7 +352,7 @@ public void TestInconsistentStateEventProcessing() throws Exception { } } - @Test(dependsOnMethods = { "TestInconsistentStateEventProcessing" }) + @Test(dependsOnMethods = {"testInconsistentStateEventProcessing"}) public void testWithSupportSourceDataType() { new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW).shutdown(); new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW).shutdown();