Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge branch ApplicationClusterManager #2620

Merged
merged 9 commits into from
Sep 19, 2023
2 changes: 1 addition & 1 deletion .github/workflows/Helix-PR-CI.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: Helix PR CI
on:
pull_request:
branches: [ master, metaclient ] # TODO: remove side branch
branches: [ master, metaclient, ApplicationClusterManager] # TODO: remove side branch
paths-ignore:
- '.github/**'
- 'helix-front/**'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,10 @@ public enum InstanceDisabledType {
USER_OPERATION,
DEFAULT_INSTANCE_DISABLE_TYPE
}

/**
 * Operations that can be set on an instance. Each constant describes the lifecycle step the
 * instance is going through.
 */
public enum InstanceOperation {
EVACUATE, // Node will be removed after a period of time
SWAP_IN, // New node joining for a swap operation
SWAP_OUT // Existing node to be removed for a swap operation
}
}
26 changes: 26 additions & 0 deletions helix-core/src/main/java/org/apache/helix/HelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,9 @@ void enableInstance(String clusterName, String instanceName, boolean enabled,
*/
void enableInstance(String clusterName, List<String> instances, boolean enabled);

/**
 * Set the instance operation for a single instance.
 * NOTE(review): the implementation is not visible here; presumably the operation is stored on the
 * instance's config so the rebalancer can read it back - confirm against the implementing class.
 * @param clusterName name of the cluster
 * @param instance name of the instance the operation applies to
 * @param instanceOperation the operation to set (EVACUATE, SWAP_IN or SWAP_OUT)
 */
void setInstanceOperation(String clusterName, String instance,
InstanceConstants.InstanceOperation instanceOperation);

/**
* Disable or enable a resource
* @param clusterName
Expand Down Expand Up @@ -550,6 +553,12 @@ CustomizedView getResourceCustomizedView(String clusterName, String resourceName
*/
void rebalance(String clusterName, String resourceName, int replica);

/**
 * Trigger a rebalance of a cluster that does not respect the configured rebalance delay.
 * @param clusterName name of the cluster to rebalance
 */
void onDemandRebalance(String clusterName);

/**
* Add ideal state using a json format file
* @param clusterName
Expand Down Expand Up @@ -729,4 +738,21 @@ Map<String, Boolean> validateResourcesForWagedRebalance(String clusterName,
*/
Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
List<String> instancesNames);

/**
 * Check whether the instance operation 'Evacuate' has finished for an instance.
 * @param clusterName name of the cluster
 * @param instancesNames name of the instance to check (a single instance name, despite the
 *        plural parameter name)
 * @return true if there is neither a current state nor a pending message on the instance.
 */
boolean isEvacuateFinished(String clusterName, String instancesNames);

/**
 * Check whether an instance is ready for preparing to join the cluster. The instance should have
 * no current state, no pending message, and be tagged with an operation that excludes the
 * instance from Helix assignment.
 * @param clusterName name of the cluster
 * @param instancesNames name of the instance to check (a single instance name, despite the
 *        plural parameter name)
 * @return true if the instance is ready for preparing to join the cluster.
 */
boolean isReadyForPreparingJoiningCluster(String clusterName, String instancesNames);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Optional;
import java.util.Set;

import java.util.stream.Collectors;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
Expand All @@ -39,6 +40,7 @@
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
Expand All @@ -53,6 +55,7 @@
*/
public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class);
public static final Set<String> INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT = Set.of("EVACUATE", "SWAP_IN");

@Override
public IdealState computeNewIdealState(String resourceName,
Expand Down Expand Up @@ -109,14 +112,12 @@ public IdealState computeNewIdealState(String resourceName,
allNodes = clusterData.getAllInstances();
}

Set<String> activeNodes = liveEnabledNodes;
long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
Set<String> activeNodes = DelayedRebalanceUtil
.getActiveNodes(allNodes, currentIdealState, liveEnabledNodes,
clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(), delay, clusterConfig);
if (delayRebalanceEnabled) {
long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
activeNodes = DelayedRebalanceUtil
.getActiveNodes(allNodes, currentIdealState, liveEnabledNodes,
clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(), delay, clusterConfig);

Set<String> offlineOrDisabledInstances = new HashSet<>(activeNodes);
offlineOrDisabledInstances.removeAll(liveEnabledNodes);
DelayedRebalanceUtil.setRebalanceScheduler(currentIdealState.getResourceName(), true,
Expand Down Expand Up @@ -157,15 +158,20 @@ public IdealState computeNewIdealState(String resourceName,

// sort node lists to ensure consistent preferred assignments
List<String> allNodeList = new ArrayList<>(allNodes);
List<String> liveEnabledNodeList = new ArrayList<>(liveEnabledNodes);
// We will not assign partitions to instances tagged with the EVACUATE or SWAP_IN operation.
// TODO: Currently we have 2 groups of instances and compute preference list twice and merge.
// Eventually we want to have exclusive groups of instance for different instance tag.
List<String> liveEnabledAssignableNodeList = filterOutOnOperationInstances(clusterData.getInstanceConfigMap(),
liveEnabledNodes);
Collections.sort(allNodeList);
Collections.sort(liveEnabledNodeList);
Collections.sort(liveEnabledAssignableNodeList);

ZNRecord newIdealMapping = _rebalanceStrategy
.computePartitionAssignment(allNodeList, liveEnabledNodeList, currentMapping, clusterData);
.computePartitionAssignment(allNodeList, liveEnabledAssignableNodeList, currentMapping, clusterData);
ZNRecord finalMapping = newIdealMapping;

if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig)) {
if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig)
|| liveEnabledAssignableNodeList.size()!= activeNodes.size()) {
List<String> activeNodeList = new ArrayList<>(activeNodes);
Collections.sort(activeNodeList);
int minActiveReplicas = DelayedRebalanceUtil.getMinActiveReplica(
Expand Down Expand Up @@ -194,6 +200,14 @@ public IdealState computeNewIdealState(String resourceName,
return idealState;
}

/**
 * Returns the given nodes minus those whose instance operation is listed in
 * {@link #INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT}.
 *
 * A node that has no entry in {@code instanceConfigMap}, or whose config reports no operation,
 * is kept: with no operation recorded there is nothing that excludes it from assignment.
 *
 * @param instanceConfigMap instance name to instance config
 * @param nodes candidate instance names
 * @return a new list with the instances that may receive partition assignments
 */
private static List<String> filterOutOnOperationInstances(Map<String, InstanceConfig> instanceConfigMap,
    Set<String> nodes) {
  return nodes.stream()
      .filter(instance -> {
        InstanceConfig config = instanceConfigMap.get(instance);
        if (config == null) {
          return true;
        }
        String operation = config.getInstanceOperation();
        // Set.of(...) rejects null lookups with a NullPointerException, so check for null first.
        return operation == null
            || !INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(operation);
      })
      .collect(Collectors.toList());
}

private IdealState generateNewIdealState(String resourceName, IdealState currentIdealState,
ZNRecord newMapping) {
IdealState newIdealState = new IdealState(resourceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.stream.Collectors;

import org.apache.helix.HelixManager;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
Expand Down Expand Up @@ -82,37 +83,37 @@ public static long getRebalanceDelay(IdealState idealState, ClusterConfig cluste
}

/**
* @return all active nodes (live nodes plus offline-yet-active nodes) while considering cluster
* delay rebalance configurations.
* @return all active nodes (live nodes not marked as evacuate plus offline-yet-active nodes)
* while considering cluster delay rebalance configurations.
*/
public static Set<String> getActiveNodes(Set<String> allNodes, Set<String> liveEnabledNodes,
Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
if (!isDelayRebalanceEnabled(clusterConfig)) {
return new HashSet<>(liveEnabledNodes);
return filterOutEvacuatingInstances(instanceConfigMap, liveEnabledNodes);
}
return getActiveNodes(allNodes, liveEnabledNodes, instanceOfflineTimeMap, liveNodes,
instanceConfigMap, clusterConfig.getRebalanceDelayTime(), clusterConfig);
}

/**
* @return all active nodes (live nodes plus offline-yet-active nodes) while considering cluster
* and the resource delay rebalance configurations.
* @return all active nodes (live nodes not marked as evacuate plus offline-yet-active nodes)
* while considering cluster and the resource delay rebalance configurations.
*/
public static Set<String> getActiveNodes(Set<String> allNodes, IdealState idealState,
Set<String> liveEnabledNodes, Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig clusterConfig) {
if (!isDelayRebalanceEnabled(idealState, clusterConfig)) {
return new HashSet<>(liveEnabledNodes);
return filterOutEvacuatingInstances(instanceConfigMap, liveEnabledNodes);
}
return getActiveNodes(allNodes, liveEnabledNodes, instanceOfflineTimeMap, liveNodes,
instanceConfigMap, delay, clusterConfig);
}

private static Set<String> getActiveNodes(Set<String> allNodes, Set<String> liveEnabledNodes,
Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig clusterConfig) {
Set<String> activeNodes = new HashSet<>(liveEnabledNodes);
Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes, Map<String, InstanceConfig> instanceConfigMap,
long delay, ClusterConfig clusterConfig) {
Set<String> activeNodes = new HashSet<>(liveEnabledNodes);
Set<String> offlineOrDisabledInstances = new HashSet<>(allNodes);
offlineOrDisabledInstances.removeAll(liveEnabledNodes);
long currentTime = System.currentTimeMillis();
Expand All @@ -125,19 +126,38 @@ private static Set<String> getActiveNodes(Set<String> allNodes, Set<String> live
activeNodes.add(ins);
}
}
return activeNodes;
// TODO: change this after merging operation and helix-enable field.
return filterOutEvacuatingInstances(instanceConfigMap, activeNodes);
}

/**
 * Returns the given nodes minus those whose instance operation is EVACUATE.
 *
 * A node that has no entry in {@code instanceConfigMap}, or whose config reports no operation,
 * is kept instead of triggering a NullPointerException.
 *
 * @param instanceConfigMap instance name to instance config
 * @param nodes candidate instance names
 * @return a new set containing the instances that are not evacuating
 */
public static Set<String> filterOutEvacuatingInstances(Map<String, InstanceConfig> instanceConfigMap,
    Set<String> nodes) {
  final String evacuate = InstanceConstants.InstanceOperation.EVACUATE.name();
  return nodes.stream()
      .filter(instance -> {
        InstanceConfig config = instanceConfigMap.get(instance);
        // Constant-first equals stays safe when the operation is null.
        return config == null || !evacuate.equals(config.getInstanceOperation());
      })
      .collect(Collectors.toSet());
}

/**
* @return The time when an offline or disabled instance should be treated as inactive.
* Return -1 if it is inactive now.
* Return the time when an offline or disabled instance should be treated as inactive. Return -1
* if it is inactive now or forced to be rebalanced by an on-demand rebalance.
*
* @return A timestamp that represents the expected inactive time of a node.
*/
private static long getInactiveTime(String instance, Set<String> liveInstances, Long offlineTime,
long delay, InstanceConfig instanceConfig, ClusterConfig clusterConfig) {
long inactiveTime = Long.MAX_VALUE;
long lastOnDemandRebalanceTime = clusterConfig.getLastOnDemandRebalanceTimestamp();

// check the time instance went offline.
// Check if the given instance is offline
if (!liveInstances.contains(instance)) {
// Check if the offline instance is forced to be rebalanced by an on-demand rebalance.
// If so, return it as an inactive instance.
if (isInstanceForcedToBeRebalanced(offlineTime, delay, lastOnDemandRebalanceTime)) {
return -1L;
}

// Check the time instance went offline.
if (offlineTime != null && offlineTime > 0 && offlineTime + delay < inactiveTime) {
inactiveTime = offlineTime + delay;
}
Expand All @@ -154,6 +174,13 @@ private static long getInactiveTime(String instance, Set<String> liveInstances,
disabledTime = batchDisableTime;
}
}

// Check if the disabled instance is forced to be rebalanced by an on-demand rebalance.
// If so, return it as an inactive instance.
if (isInstanceForcedToBeRebalanced(disabledTime, delay, lastOnDemandRebalanceTime)) {
return -1L;
}

if (disabledTime > 0 && disabledTime + delay < inactiveTime) {
inactiveTime = disabledTime + delay;
}
Expand Down Expand Up @@ -417,6 +444,33 @@ private static int getMinActiveReplica(ResourceControllerDataProvider clusterDat
currentIdealState), currentIdealState, numReplica);
}

/**
 * Decides whether an on-demand rebalance forces a node to be rebalanced even though it is still
 * inside its delay window.
 *
 * The node is NOT forced when any of the following holds:
 * 1. no on-demand rebalance has been recorded (the timestamp is -1);
 * 2. the offline/disabled time is unavailable (null, zero or negative);
 * 3. the current time is already past the offline/disabled time plus the delay.
 * Otherwise the node is forced exactly when it went offline / was disabled strictly before the
 * last on-demand rebalance.
 *
 * @param offlineOrDisabledTime A unix timestamp of the most recent time the node went offline or
 *                              was disabled; may be null.
 * @param delay The delay window configuration of the current cluster
 * @param lastOnDemandRebalanceTime A unix timestamp of the most recent on-demand rebalance, or -1
 *                                  if there was none.
 * @return true if the node is forced to be rebalanced, false otherwise
 */
private static boolean isInstanceForcedToBeRebalanced(Long offlineOrDisabledTime, long delay,
    long lastOnDemandRebalanceTime) {
  if (lastOnDemandRebalanceTime == -1) {
    return false;
  }
  if (offlineOrDisabledTime == null || offlineOrDisabledTime <= 0) {
    return false;
  }

  long eventTime = offlineOrDisabledTime;
  boolean delayWindowElapsed = System.currentTimeMillis() > eventTime + delay;
  if (delayWindowElapsed) {
    return false;
  }

  return lastOnDemandRebalanceTime > eventTime;
}

/**
* For the resource in the cluster, find additional AssignableReplica to close the gap on minActiveReplica.
* @param clusterData Cluster data cache.
Expand Down
Loading
Loading