Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

State assignment for SWAP_IN replicas to only assign top state if state model has "R" or "N" #2706

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,9 @@ public static int getStateCount(String state, StateModelDefinition stateModelDef
int preferenceListSize) {
String num = stateModelDef.getNumInstancesPerState(state);
int stateCount = -1;
if ("N".equals(num)) {
if (StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES.equals(num)) {
stateCount = liveAndEnabledSize;
} else if ("R".equals(num)) {
} else if (StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS.equals(num)) {
stateCount = preferenceListSize;
} else {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.controller.rebalancer.AbstractRebalancer;
import org.apache.helix.controller.rebalancer.CustomRebalancer;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.MaintenanceRebalancer;
Expand Down Expand Up @@ -150,8 +149,13 @@ private void addSwapInInstancesToBestPossibleState(Map<String, Resource> resourc

commonInstances.forEach(swapOutInstance -> {
if (stateMap.get(swapOutInstance).equals(stateModelDef.getTopState())) {
if (AbstractRebalancer.getStateCount(stateModelDef.getTopState(), stateModelDef,
stateMap.size() + 1, stateMap.size() + 1) > stateMap.size()) {

String topStateCount =
stateModelDef.getNumInstancesPerState(stateModelDef.getTopState());
if (topStateCount.equals(
StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)
|| topStateCount.equals(
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) {
// If the swap-out instance's replica is a topState and the StateModel allows for
// another replica with the topState to be added, set the swap-in instance's replica
// to the topState.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,9 @@ private Map<String, Bounds> computeStateConstraints(StateModelDefinition stateMo
for (String state : statePriorityList) {
String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state);
int max = -1;
if ("N".equals(numInstancesPerState)) {
if (StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES.equals(numInstancesPerState)) {
max = cache.getLiveInstances().size();
} else if ("R".equals(numInstancesPerState)) {
} else if (StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS.equals(numInstancesPerState)) {
// idealState is null when resource has been dropped,
// R can't be evaluated and ignore state constraints
//if (idealState != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ private static StateModelDefinition defineStateModel() {
builder.upperBound(LEADER, 1);
// dynamic constraint, R means it should be derived based on the replication
// factor.
builder.dynamicUpperBound(STANDBY, "R");
builder.dynamicUpperBound(STANDBY, StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);

StateModelDefinition statemodelDefinition = builder.build();
return statemodelDefinition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2103,12 +2103,13 @@ void rebalance(String clusterName, String resourceName, int replica, String keyP
throw new HelixException("Invalid or unsupported state model definition");
}
masterStateValue = state;
} else if (count.equalsIgnoreCase("R")) {
} else if (count.equalsIgnoreCase(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) {
if (slaveStateValue != null) {
throw new HelixException("Invalid or unsupported state model definition");
}
slaveStateValue = state;
} else if (count.equalsIgnoreCase("N")) {
} else if (count.equalsIgnoreCase(
StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)) {
if (!(masterStateValue == null && slaveStateValue == null)) {
throw new HelixException("Invalid or unsupported state model definition");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public static StateModelDefinition build() {

// bounds
builder.upperBound(States.LEADER.name(), 1);
builder.dynamicUpperBound(States.STANDBY.name(), "R");
builder.dynamicUpperBound(States.STANDBY.name(),
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);

return builder.build();
}
Expand Down Expand Up @@ -97,7 +98,7 @@ public static ZNRecord generateConfigForLeaderStandby() {
record.setMapField(key, metadata);
}
if (state.equals("STANDBY")) {
metadata.put("count", "R");
metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
record.setMapField(key, metadata);
}
if (state.equals("OFFLINE")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public static StateModelDefinition build() {

// bounds
builder.upperBound(States.MASTER.name(), 1);
builder.dynamicUpperBound(States.SLAVE.name(), "R");
builder.dynamicUpperBound(States.SLAVE.name(),
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);

return builder.build();
}
Expand Down Expand Up @@ -98,7 +99,7 @@ public static ZNRecord generateConfigForMasterSlave() {
metadata.put("count", "1");
record.setMapField(key, metadata);
} else if (state.equals("SLAVE")) {
metadata.put("count", "R");
metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
record.setMapField(key, metadata);
} else if (state.equals("OFFLINE")) {
metadata.put("count", "-1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public static StateModelDefinition build() {
builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name());

// bounds
builder.dynamicUpperBound(States.ONLINE.name(), "R");
builder.dynamicUpperBound(States.ONLINE.name(),
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);

return builder.build();
}
Expand All @@ -87,7 +88,7 @@ public static ZNRecord generateConfigForOnlineOffline() {
String key = state + ".meta";
Map<String, String> metadata = new HashMap<String, String>();
if (state.equals("ONLINE")) {
metadata.put("count", "R");
metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
record.setMapField(key, metadata);
}
if (state.equals("OFFLINE")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public static OnlineOfflineWithBootstrapSMD build() {
builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name());

// bounds
builder.dynamicUpperBound(States.ONLINE.name(), "R");
builder.dynamicUpperBound(States.ONLINE.name(),
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);

return new OnlineOfflineWithBootstrapSMD(builder.build().getRecord());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public enum StateModelDefinitionProperty {
}

public static final int TOP_STATE_PRIORITY = 1;
public static final String STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES = "N";
public static final String STATE_REPLICA_COUNT_ALL_REPLICAS = "R";

/**
* state model's initial state
Expand Down Expand Up @@ -200,7 +202,7 @@ public String getInitialState() {
/**
* Number of instances that can be in each state
* @param state the state name
* @return maximum instance count per state, can be "N" or "R"
* @return maximum instance count per state, can be STATE_REPLICA_COUNT_ALL_NODES or STATE_REPLICA_COUNT_ALL_REPLICAS
*/
public String getNumInstancesPerState(String state) {
return _statesCountMap.get(state);
Expand Down Expand Up @@ -449,11 +451,11 @@ public LinkedHashMap<String, Integer> getStateCountMap(int candidateNodeNum, int
if (candidateNodeNum <= 0) {
break;
}
if ("N".equals(num)) {
if (STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES.equals(num)) {
stateCountMap.put(state, candidateNodeNum);
replicas -= candidateNodeNum;
break;
} else if ("R".equals(num)) {
} else if (STATE_REPLICA_COUNT_ALL_REPLICAS.equals(num)) {
// wait until we get the counts for all other states
continue;
} else {
Expand All @@ -475,7 +477,7 @@ public LinkedHashMap<String, Integer> getStateCountMap(int candidateNodeNum, int
// get state count for R
for (String state : statesPriorityList) {
String num = getNumInstancesPerState(state);
if ("R".equals(num)) {
if (STATE_REPLICA_COUNT_ALL_REPLICAS.equals(num)) {
if (candidateNodeNum > 0 && replicas > 0) {
stateCountMap.put(state, replicas < candidateNodeNum ? replicas : candidateNodeNum);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public static StateModelDefinition build() {
builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name());

// bounds
builder.dynamicUpperBound(States.MASTER.name(), "N");
builder.dynamicUpperBound(States.MASTER.name(),
StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES);

return builder.build();
}
Expand All @@ -88,7 +89,7 @@ public static ZNRecord generateConfigForStorageSchemata() {
String key = state + ".meta";
Map<String, String> metadata = new HashMap<String, String>();
if (state.equals("MASTER")) {
metadata.put("count", "N");
metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES);
record.setMapField(key, metadata);
} else if (state.equals("OFFLINE")) {
metadata.put("count", "-1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ private boolean areStateCountsValid() {
try {
Integer.parseInt(count);
} catch (NumberFormatException e) {
if (!count.equals("N") && !count.equals("R")) {
if (!count.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)
&& !count.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) {
_logger.error("State " + state + " has invalid count " + count + ", state model: "
+ _stateModelDef.getId());
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,13 @@ public static String[] parseStates(String clusterName, StateModelDefinition stat
throw new HelixException("Invalid or unsupported state model definition");
}
masterStateValue = state;
} else if (count.equalsIgnoreCase("R")) {
} else if (count.equalsIgnoreCase(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) {
if (slaveStateValue != null) {
throw new HelixException("Invalid or unsupported state model definition");
}
slaveStateValue = state;
} else if (count.equalsIgnoreCase("N")) {
} else if (count.equalsIgnoreCase(
StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)) {
if (!(masterStateValue == null && slaveStateValue == null)) {
throw new HelixException("Invalid or unsupported state model definition");
}
Expand Down
2 changes: 1 addition & 1 deletion helix-core/src/test/java/org/apache/helix/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ public static StateModelDefinition generateStateModelDefForBootstrap() {
String key = state + ".meta";
Map<String, String> metadata = new HashMap<String, String>();
if (state.equals("ONLINE")) {
metadata.put("count", "R");
metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
record.setMapField(key, metadata);
} else if (state.equals("BOOTSTRAP")) {
metadata.put("count", "-1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ private static StateModelDefinition defineStateModel() {
// static constraint
builder.upperBound("MASTER", 1);
// dynamic constraint, R means it should be derived based on the replication factor.
builder.dynamicUpperBound("SLAVE", "R");
builder.dynamicUpperBound("SLAVE", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);

StateModelDefinition statemodelDefinition = builder.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ private StateModelDefinition createReprioritizedStateModelDef(String stateModelN
.addState("ONLINE", 1).addState("OFFLINE").addState("DROPPED").addState("ERROR")
.initialState("OFFLINE").addTransition("ERROR", "OFFLINE", 1)
.addTransition("ONLINE", "OFFLINE", 2).addTransition("OFFLINE", "DROPPED", 3)
.addTransition("OFFLINE", "ONLINE", 4).dynamicUpperBound("ONLINE", "R")
.addTransition("OFFLINE", "ONLINE", 4)
.dynamicUpperBound("ONLINE", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)
.upperBound("OFFLINE", -1).upperBound("DROPPED", -1).upperBound("ERROR", -1);
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private ZNRecord generateConfigForMasterSlave() {
record.setMapField(key, metadata);
break;
case "SLAVE":
metadata.put("count", "R");
metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
record.setMapField(key, metadata);
break;
case "OFFLINE":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void testBasic() {
.upperBound("MASTER", 1)

// R indicates an upper bound of number of replicas for each partition
.dynamicUpperBound("SLAVE", "R")
.dynamicUpperBound("SLAVE", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)

// Add some high-priority transitions
.addTransition("SLAVE", "MASTER", 1).addTransition("OFFLINE", "SLAVE", 2)
Expand Down
Loading