Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge feature branch - Application cluster manager #2714

Merged
merged 11 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions helix-core/src/main/java/org/apache/helix/HelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.List;
import java.util.Map;

import javax.annotation.Nullable;

import org.apache.helix.api.status.ClusterManagementMode;
import org.apache.helix.api.status.ClusterManagementModeRequest;
import org.apache.helix.api.topology.ClusterTopology;
Expand Down Expand Up @@ -302,8 +304,15 @@ void enableInstance(String clusterName, String instanceName, boolean enabled,
*/
void enableInstance(String clusterName, List<String> instances, boolean enabled);

void setInstanceOperation(String clusterName, String instance,
InstanceConstants.InstanceOperation instanceOperation);
/**
* Set the instanceOperation field.
*
* @param clusterName The cluster name
* @param instanceName The instance name
* @param instanceOperation The instance operation
*/
void setInstanceOperation(String clusterName, String instanceName,
@Nullable InstanceConstants.InstanceOperation instanceOperation);

/**
* Disable or enable a resource
Expand Down Expand Up @@ -747,6 +756,26 @@ Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
*/
boolean isEvacuateFinished(String clusterName, String instancesNames);

/**
* Check to see if swapping between two instances can be completed. Either the swapOut or
* swapIn instance can be passed in.
* @param clusterName The cluster name
* @param instanceName The instance that is being swapped out or swapped in
* @return True if the swap is ready to be completed, false otherwise.
*/
boolean canCompleteSwap(String clusterName, String instanceName);

/**
* Check to see if swapping between two instances is ready to be completed and complete it if
* possible. Either the swapOut or swapIn instance can be passed in.
*
* @param clusterName The cluster name
* @param instanceName The instance that is being swapped out or swapped in
* @return True if the swap is ready to be completed and was completed successfully, false
* otherwise.
*/
boolean completeSwapIfPossible(String clusterName, String instanceName);

/**
* Return if instance is ready for preparing joining cluster. The instance should have no current state,
* no pending message and tagged with operation that exclude the instance from Helix assignment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ private void clearCachedComputation() {
HelixConstants.ChangeType changeType, ResourceChangeSnapshot snapshot) {
switch (changeType) {
case INSTANCE_CONFIG:
return snapshot.getInstanceConfigMap();
return snapshot.getAssignableInstanceConfigMap();
case IDEAL_STATE:
return snapshot.getIdealStateMap();
case RESOURCE_CONFIG:
return snapshot.getResourceConfigMap();
case LIVE_INSTANCE:
return snapshot.getLiveInstances();
return snapshot.getAssignableLiveInstances();
case CLUSTER_CONFIG:
ClusterConfig config = snapshot.getClusterConfig();
if (config == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,25 @@
class ResourceChangeSnapshot {

private Set<HelixConstants.ChangeType> _changedTypes;
private Map<String, InstanceConfig> _instanceConfigMap;
private Map<String, InstanceConfig> _allInstanceConfigMap;
private Map<String, InstanceConfig> _assignableInstanceConfigMap;
private Map<String, IdealState> _idealStateMap;
private Map<String, ResourceConfig> _resourceConfigMap;
private Map<String, LiveInstance> _liveInstances;
private Map<String, LiveInstance> _allLiveInstances;
private Map<String, LiveInstance> _assignableLiveInstances;
private ClusterConfig _clusterConfig;

/**
* Default constructor that constructs an empty snapshot.
*/
ResourceChangeSnapshot() {
_changedTypes = new HashSet<>();
_instanceConfigMap = new HashMap<>();
_allInstanceConfigMap = new HashMap<>();
_assignableInstanceConfigMap = new HashMap<>();
_idealStateMap = new HashMap<>();
_resourceConfigMap = new HashMap<>();
_liveInstances = new HashMap<>();
_allLiveInstances = new HashMap<>();
_assignableLiveInstances = new HashMap<>();
_clusterConfig = null;
}

Expand All @@ -80,12 +84,16 @@ class ResourceChangeSnapshot {
ResourceChangeSnapshot(ResourceControllerDataProvider dataProvider,
boolean ignoreNonTopologyChange) {
_changedTypes = new HashSet<>(dataProvider.getRefreshedChangeTypes());

_instanceConfigMap = ignoreNonTopologyChange ?
_allInstanceConfigMap = ignoreNonTopologyChange ?
dataProvider.getInstanceConfigMap().entrySet().parallelStream().collect(Collectors
.toMap(e -> e.getKey(),
e -> InstanceConfigTrimmer.getInstance().trimProperty(e.getValue()))) :
new HashMap<>(dataProvider.getInstanceConfigMap());
_assignableInstanceConfigMap = ignoreNonTopologyChange ?
dataProvider.getAssignableInstanceConfigMap().entrySet().parallelStream().collect(Collectors
.toMap(e -> e.getKey(),
e -> InstanceConfigTrimmer.getInstance().trimProperty(e.getValue()))) :
new HashMap<>(dataProvider.getAssignableInstanceConfigMap());
_idealStateMap = ignoreNonTopologyChange ?
dataProvider.getIdealStates().entrySet().parallelStream().collect(Collectors
.toMap(e -> e.getKey(),
Expand All @@ -99,7 +107,8 @@ class ResourceChangeSnapshot {
_clusterConfig = ignoreNonTopologyChange ?
ClusterConfigTrimmer.getInstance().trimProperty(dataProvider.getClusterConfig()) :
dataProvider.getClusterConfig();
_liveInstances = new HashMap<>(dataProvider.getLiveInstances());
_allLiveInstances = new HashMap<>(dataProvider.getLiveInstances());
_assignableLiveInstances = new HashMap<>(dataProvider.getAssignableLiveInstances());
}

/**
Expand All @@ -108,10 +117,12 @@ class ResourceChangeSnapshot {
*/
ResourceChangeSnapshot(ResourceChangeSnapshot snapshot) {
_changedTypes = new HashSet<>(snapshot._changedTypes);
_instanceConfigMap = new HashMap<>(snapshot._instanceConfigMap);
_allInstanceConfigMap = new HashMap<>(snapshot._allInstanceConfigMap);
_assignableInstanceConfigMap = new HashMap<>(snapshot._assignableInstanceConfigMap);
_idealStateMap = new HashMap<>(snapshot._idealStateMap);
_resourceConfigMap = new HashMap<>(snapshot._resourceConfigMap);
_liveInstances = new HashMap<>(snapshot._liveInstances);
_allLiveInstances = new HashMap<>(snapshot._allLiveInstances);
_assignableLiveInstances = new HashMap<>(snapshot._assignableLiveInstances);
_clusterConfig = snapshot._clusterConfig;
}

Expand All @@ -120,7 +131,11 @@ Set<HelixConstants.ChangeType> getChangedTypes() {
}

Map<String, InstanceConfig> getInstanceConfigMap() {
return _instanceConfigMap;
return _allInstanceConfigMap;
}

Map<String, InstanceConfig> getAssignableInstanceConfigMap() {
return _assignableInstanceConfigMap;
}

Map<String, IdealState> getIdealStateMap() {
Expand All @@ -132,7 +147,11 @@ Map<String, ResourceConfig> getResourceConfigMap() {
}

Map<String, LiveInstance> getLiveInstances() {
return _liveInstances;
return _allLiveInstances;
}

Map<String, LiveInstance> getAssignableLiveInstances() {
return _assignableLiveInstances;
}

ClusterConfig getClusterConfig() {
Expand Down
Loading
Loading