Skip to content

Commit

Permalink
Prevent the spectator routing table from containing SWAP_IN instances.(
Browse files Browse the repository at this point in the history
…#2710)

Prevent the spectator routing table from containing SWAP_IN instances.
  • Loading branch information
zpinto authored and Xiaoyuan Lu committed Dec 13, 2023
1 parent 70b05d5 commit 3775e3d
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,15 @@ private static ClusterModel generateClusterModel(ResourceControllerDataProvider

// Get the set of active logical ids.
Set<String> activeLogicalIds = activeInstances.stream().map(
instanceName -> assignableInstanceConfigMap.get(instanceName)
instanceName -> assignableInstanceConfigMap.getOrDefault(instanceName,
new InstanceConfig(instanceName))
.getLogicalId(clusterTopologyConfig.getEndNodeType())).collect(Collectors.toSet());

Set<String> assignableLiveInstanceNames = dataProvider.getAssignableLiveInstances().keySet();
Set<String> assignableLiveInstanceLogicalIds =
assignableLiveInstanceNames.stream().map(
instanceName -> assignableInstanceConfigMap.get(instanceName)
instanceName -> assignableInstanceConfigMap.getOrDefault(instanceName,
new InstanceConfig(instanceName))
.getLogicalId(clusterTopologyConfig.getEndNodeType())).collect(Collectors.toSet());

// Generate replica objects for all the resource partitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
Expand All @@ -34,9 +36,11 @@
import org.apache.helix.common.caches.CurrentStateSnapshot;
import org.apache.helix.common.caches.CustomizedViewCache;
import org.apache.helix.common.caches.TargetExternalViewCache;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -47,13 +51,19 @@
class RoutingDataCache extends BasicClusterDataCache {
private static Logger LOG = LoggerFactory.getLogger(RoutingDataCache.class.getName());

// When an instance has any of these instance operations, it should not be routable.
private static final ImmutableSet<String> NON_ROUTABLE_INSTANCE_OPERATIONS =
ImmutableSet.of(InstanceConstants.InstanceOperation.SWAP_IN.name());

private final Map<PropertyType, List<String>> _sourceDataTypeMap;

private CurrentStateCache _currentStateCache;
// TODO: CustomizedCache needs to be migrated to propertyCache. Once we migrate all cache to
// propertyCache, this hardcoded list of fields won't be necessary.
private Map<String, CustomizedViewCache> _customizedViewCaches;
private TargetExternalViewCache _targetExternalViewCache;
private Map<String, LiveInstance> _routableLiveInstanceMap;
private Map<String, InstanceConfig> _routableInstanceConfigMap;

public RoutingDataCache(String clusterName, PropertyType sourceDataType) {
this (clusterName, ImmutableMap.of(sourceDataType, Collections.emptyList()));
Expand All @@ -73,6 +83,8 @@ public RoutingDataCache(String clusterName, Map<PropertyType, List<String>> sour
.forEach(customizedStateType -> _customizedViewCaches.put(customizedStateType,
new CustomizedViewCache(clusterName, customizedStateType)));
_targetExternalViewCache = new TargetExternalViewCache(clusterName);
_routableInstanceConfigMap = new HashMap<>();
_routableLiveInstanceMap = new HashMap<>();
requireFullRefresh();
}

Expand All @@ -88,7 +100,26 @@ public synchronized void refresh(HelixDataAccessor accessor) {
LOG.info("START: RoutingDataCache.refresh() for cluster " + _clusterName);
long startTime = System.currentTimeMillis();

// Store whether a refresh for routable instances is necessary, as the super.refresh() call will
// set the _propertyDataChangedMap values for the instance config and live instance change types to false.
boolean refreshRoutableInstanceConfigs =
_propertyDataChangedMap.getOrDefault(HelixConstants.ChangeType.INSTANCE_CONFIG, false);
// If there is an InstanceConfig change, update the routable instance configs and live instances.
// Must also do live instances because whether and instance is routable is based off of the instance config.
boolean refreshRoutableLiveInstances =
_propertyDataChangedMap.getOrDefault(HelixConstants.ChangeType.LIVE_INSTANCE, false)
|| refreshRoutableInstanceConfigs;

super.refresh(accessor);

if (refreshRoutableInstanceConfigs) {
updateRoutableInstanceConfigMap(_instanceConfigPropertyCache.getPropertyMap());
}
if (refreshRoutableLiveInstances) {
updateRoutableLiveInstanceMap(getRoutableInstanceConfigMap(),
_liveInstancePropertyCache.getPropertyMap());
}

for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
long start = System.currentTimeMillis();
switch (propertyType) {
Expand All @@ -114,7 +145,9 @@ public synchronized void refresh(HelixDataAccessor accessor) {
* TODO: logic.
**/
_liveInstancePropertyCache.refresh(accessor);
Map<String, LiveInstance> liveInstanceMap = getLiveInstances();
updateRoutableLiveInstanceMap(getRoutableInstanceConfigMap(),
_liveInstancePropertyCache.getPropertyMap());
Map<String, LiveInstance> liveInstanceMap = getRoutableLiveInstances();
_currentStateCache.refresh(accessor, liveInstanceMap);
LOG.info("Reload CurrentStates. Takes " + (System.currentTimeMillis() - start) + " ms");
}
Expand Down Expand Up @@ -150,6 +183,41 @@ public synchronized void refresh(HelixDataAccessor accessor) {
}
}

private void updateRoutableInstanceConfigMap(Map<String, InstanceConfig> instanceConfigMap) {
_routableInstanceConfigMap = instanceConfigMap.entrySet().stream().filter(
(instanceConfigEntry) -> !NON_ROUTABLE_INSTANCE_OPERATIONS.contains(
instanceConfigEntry.getValue().getInstanceOperation()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

private void updateRoutableLiveInstanceMap(Map<String, InstanceConfig> instanceConfigMap,
Map<String, LiveInstance> liveInstanceMap) {
_routableLiveInstanceMap = liveInstanceMap.entrySet().stream().filter(
(liveInstanceEntry) -> instanceConfigMap.containsKey(liveInstanceEntry.getKey())
&& !NON_ROUTABLE_INSTANCE_OPERATIONS.contains(
instanceConfigMap.get(liveInstanceEntry.getKey()).getInstanceOperation()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

/**
* Returns the LiveInstances for each of the routable instances that are currently up and
* running.
*
* @return a map of LiveInstances
*/
public Map<String, LiveInstance> getRoutableLiveInstances() {
return Collections.unmodifiableMap(_routableLiveInstanceMap);
}

/**
* Returns the instance config map for all the routable instances that are in the cluster.
*
* @return a map of InstanceConfigs
*/
public Map<String, InstanceConfig> getRoutableInstanceConfigMap() {
return Collections.unmodifiableMap(_routableInstanceConfigMap);
}

/**
* Retrieves the TargetExternalView for all resources
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -923,28 +923,32 @@ protected void handleEvent(ClusterEvent event) {
case EXTERNALVIEW: {
String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);
refreshExternalView(_dataCache.getExternalViews().values(),
_dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values(),
_dataCache.getRoutableInstanceConfigMap().values(),
_dataCache.getRoutableLiveInstances().values(),
keyReference);
}
break;
case TARGETEXTERNALVIEW: {
String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);
refreshExternalView(_dataCache.getTargetExternalViews().values(),
_dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values(),
_dataCache.getRoutableInstanceConfigMap().values(),
_dataCache.getRoutableLiveInstances().values(),
keyReference);
}
break;
case CUSTOMIZEDVIEW:
for (String customizedStateType : _sourceDataTypeMap.getOrDefault(PropertyType.CUSTOMIZEDVIEW, Collections.emptyList())) {
String keyReference = generateReferenceKey(propertyType.name(), customizedStateType);
refreshCustomizedView(_dataCache.getCustomizedView(customizedStateType).values(),
_dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values(), keyReference);
_dataCache.getRoutableInstanceConfigMap().values(),
_dataCache.getRoutableLiveInstances().values(), keyReference);
}
break;
case CURRENTSTATES: {
String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);;
refreshCurrentState(_dataCache.getCurrentStatesMap(), _dataCache.getInstanceConfigMap().values(),
_dataCache.getLiveInstances().values(), keyReference);
refreshCurrentState(_dataCache.getCurrentStatesMap(),
_dataCache.getRoutableInstanceConfigMap().values(),
_dataCache.getRoutableLiveInstances().values(), keyReference);
recordPropagationLatency(System.currentTimeMillis(), _dataCache.getCurrentStateSnapshot());
}
break;
Expand Down
Loading

0 comments on commit 3775e3d

Please sign in to comment.