Skip to content

Commit

Permalink
Make logic to determine state of replicas on SWAP_IN instance simpler…
Browse files Browse the repository at this point in the history
… and more predictable during an in-flight node swap. (#2706)
  • Loading branch information
zpinto authored and Xiaoyuan Lu committed Dec 8, 2023
1 parent a24ed72 commit c650cf6
Show file tree
Hide file tree
Showing 18 changed files with 46 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,9 @@ public static int getStateCount(String state, StateModelDefinition stateModelDef
int preferenceListSize) {
String num = stateModelDef.getNumInstancesPerState(state);
int stateCount = -1;
if ("N".equals(num)) {
if (StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES.equals(num)) {
stateCount = liveAndEnabledSize;
} else if ("R".equals(num)) {
} else if (StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS.equals(num)) {
stateCount = preferenceListSize;
} else {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.controller.rebalancer.AbstractRebalancer;
import org.apache.helix.controller.rebalancer.CustomRebalancer;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.MaintenanceRebalancer;
Expand Down Expand Up @@ -150,8 +149,13 @@ private void addSwapInInstancesToBestPossibleState(Map<String, Resource> resourc

commonInstances.forEach(swapOutInstance -> {
if (stateMap.get(swapOutInstance).equals(stateModelDef.getTopState())) {
if (AbstractRebalancer.getStateCount(stateModelDef.getTopState(), stateModelDef,
stateMap.size() + 1, stateMap.size() + 1) > stateMap.size()) {

String topStateCount =
stateModelDef.getNumInstancesPerState(stateModelDef.getTopState());
if (topStateCount.equals(
StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)
|| topStateCount.equals(
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) {
// If the swap-out instance's replica is a topState and the StateModel allows for
// another replica with the topState to be added, set the swap-in instance's replica
// to the topState.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,9 @@ private Map<String, Bounds> computeStateConstraints(StateModelDefinition stateMo
for (String state : statePriorityList) {
String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state);
int max = -1;
if ("N".equals(numInstancesPerState)) {
if (StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES.equals(numInstancesPerState)) {
max = cache.getLiveInstances().size();
} else if ("R".equals(numInstancesPerState)) {
} else if (StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS.equals(numInstancesPerState)) {
// idealState is null when resource has been dropped,
// R can't be evaluated and ignore state constraints
//if (idealState != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ private static StateModelDefinition defineStateModel() {
builder.upperBound(LEADER, 1);
// dynamic constraint, R means it should be derived based on the replication
// factor.
builder.dynamicUpperBound(STANDBY, "R");
builder.dynamicUpperBound(STANDBY, StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);

StateModelDefinition statemodelDefinition = builder.build();
return statemodelDefinition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2103,12 +2103,13 @@ void rebalance(String clusterName, String resourceName, int replica, String keyP
throw new HelixException("Invalid or unsupported state model definition");
}
masterStateValue = state;
} else if (count.equalsIgnoreCase("R")) {
} else if (count.equalsIgnoreCase(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) {
if (slaveStateValue != null) {
throw new HelixException("Invalid or unsupported state model definition");
}
slaveStateValue = state;
} else if (count.equalsIgnoreCase("N")) {
} else if (count.equalsIgnoreCase(
StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)) {
if (!(masterStateValue == null && slaveStateValue == null)) {
throw new HelixException("Invalid or unsupported state model definition");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public static StateModelDefinition build() {

// bounds
builder.upperBound(States.LEADER.name(), 1);
builder.dynamicUpperBound(States.STANDBY.name(), "R");
builder.dynamicUpperBound(States.STANDBY.name(),
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);

return builder.build();
}
Expand Down Expand Up @@ -97,7 +98,7 @@ public static ZNRecord generateConfigForLeaderStandby() {
record.setMapField(key, metadata);
}
if (state.equals("STANDBY")) {
metadata.put("count", "R");
metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
record.setMapField(key, metadata);
}
if (state.equals("OFFLINE")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public static StateModelDefinition build() {

// bounds
builder.upperBound(States.MASTER.name(), 1);
builder.dynamicUpperBound(States.SLAVE.name(), "R");
builder.dynamicUpperBound(States.SLAVE.name(),
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);

return builder.build();
}
Expand Down Expand Up @@ -98,7 +99,7 @@ public static ZNRecord generateConfigForMasterSlave() {
metadata.put("count", "1");
record.setMapField(key, metadata);
} else if (state.equals("SLAVE")) {
metadata.put("count", "R");
metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
record.setMapField(key, metadata);
} else if (state.equals("OFFLINE")) {
metadata.put("count", "-1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public static StateModelDefinition build() {
builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name());

// bounds
builder.dynamicUpperBound(States.ONLINE.name(), "R");
builder.dynamicUpperBound(States.ONLINE.name(),
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);

return builder.build();
}
Expand All @@ -87,7 +88,7 @@ public static ZNRecord generateConfigForOnlineOffline() {
String key = state + ".meta";
Map<String, String> metadata = new HashMap<String, String>();
if (state.equals("ONLINE")) {
metadata.put("count", "R");
metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
record.setMapField(key, metadata);
}
if (state.equals("OFFLINE")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public static OnlineOfflineWithBootstrapSMD build() {
builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name());

// bounds
builder.dynamicUpperBound(States.ONLINE.name(), "R");
builder.dynamicUpperBound(States.ONLINE.name(),
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);

return new OnlineOfflineWithBootstrapSMD(builder.build().getRecord());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public enum StateModelDefinitionProperty {
}

public static final int TOP_STATE_PRIORITY = 1;
public static final String STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES = "N";
public static final String STATE_REPLICA_COUNT_ALL_REPLICAS = "R";

/**
* state model's initial state
Expand Down Expand Up @@ -200,7 +202,7 @@ public String getInitialState() {
/**
* Number of instances that can be in each state
* @param state the state name
* @return maximum instance count per state, can be "N" or "R"
* @return maximum instance count per state, can be STATE_REPLICA_COUNT_ALL_NODES or STATE_REPLICA_COUNT_ALL_REPLICAS
*/
public String getNumInstancesPerState(String state) {
return _statesCountMap.get(state);
Expand Down Expand Up @@ -449,11 +451,11 @@ public LinkedHashMap<String, Integer> getStateCountMap(int candidateNodeNum, int
if (candidateNodeNum <= 0) {
break;
}
if ("N".equals(num)) {
if (STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES.equals(num)) {
stateCountMap.put(state, candidateNodeNum);
replicas -= candidateNodeNum;
break;
} else if ("R".equals(num)) {
} else if (STATE_REPLICA_COUNT_ALL_REPLICAS.equals(num)) {
// wait until we get the counts for all other states
continue;
} else {
Expand All @@ -475,7 +477,7 @@ public LinkedHashMap<String, Integer> getStateCountMap(int candidateNodeNum, int
// get state count for R
for (String state : statesPriorityList) {
String num = getNumInstancesPerState(state);
if ("R".equals(num)) {
if (STATE_REPLICA_COUNT_ALL_REPLICAS.equals(num)) {
if (candidateNodeNum > 0 && replicas > 0) {
stateCountMap.put(state, replicas < candidateNodeNum ? replicas : candidateNodeNum);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public static StateModelDefinition build() {
builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name());

// bounds
builder.dynamicUpperBound(States.MASTER.name(), "N");
builder.dynamicUpperBound(States.MASTER.name(),
StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES);

return builder.build();
}
Expand All @@ -88,7 +89,7 @@ public static ZNRecord generateConfigForStorageSchemata() {
String key = state + ".meta";
Map<String, String> metadata = new HashMap<String, String>();
if (state.equals("MASTER")) {
metadata.put("count", "N");
metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES);
record.setMapField(key, metadata);
} else if (state.equals("OFFLINE")) {
metadata.put("count", "-1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ private boolean areStateCountsValid() {
try {
Integer.parseInt(count);
} catch (NumberFormatException e) {
if (!count.equals("N") && !count.equals("R")) {
if (!count.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)
&& !count.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) {
_logger.error("State " + state + " has invalid count " + count + ", state model: "
+ _stateModelDef.getId());
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,13 @@ public static String[] parseStates(String clusterName, StateModelDefinition stat
throw new HelixException("Invalid or unsupported state model definition");
}
masterStateValue = state;
} else if (count.equalsIgnoreCase("R")) {
} else if (count.equalsIgnoreCase(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) {
if (slaveStateValue != null) {
throw new HelixException("Invalid or unsupported state model definition");
}
slaveStateValue = state;
} else if (count.equalsIgnoreCase("N")) {
} else if (count.equalsIgnoreCase(
StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)) {
if (!(masterStateValue == null && slaveStateValue == null)) {
throw new HelixException("Invalid or unsupported state model definition");
}
Expand Down
2 changes: 1 addition & 1 deletion helix-core/src/test/java/org/apache/helix/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ public static StateModelDefinition generateStateModelDefForBootstrap() {
String key = state + ".meta";
Map<String, String> metadata = new HashMap<String, String>();
if (state.equals("ONLINE")) {
metadata.put("count", "R");
metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
record.setMapField(key, metadata);
} else if (state.equals("BOOTSTRAP")) {
metadata.put("count", "-1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ private static StateModelDefinition defineStateModel() {
// static constraint
builder.upperBound("MASTER", 1);
// dynamic constraint, R means it should be derived based on the replication factor.
builder.dynamicUpperBound("SLAVE", "R");
builder.dynamicUpperBound("SLAVE", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);

StateModelDefinition statemodelDefinition = builder.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ private StateModelDefinition createReprioritizedStateModelDef(String stateModelN
.addState("ONLINE", 1).addState("OFFLINE").addState("DROPPED").addState("ERROR")
.initialState("OFFLINE").addTransition("ERROR", "OFFLINE", 1)
.addTransition("ONLINE", "OFFLINE", 2).addTransition("OFFLINE", "DROPPED", 3)
.addTransition("OFFLINE", "ONLINE", 4).dynamicUpperBound("ONLINE", "R")
.addTransition("OFFLINE", "ONLINE", 4)
.dynamicUpperBound("ONLINE", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)
.upperBound("OFFLINE", -1).upperBound("DROPPED", -1).upperBound("ERROR", -1);
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private ZNRecord generateConfigForMasterSlave() {
record.setMapField(key, metadata);
break;
case "SLAVE":
metadata.put("count", "R");
metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
record.setMapField(key, metadata);
break;
case "OFFLINE":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void testBasic() {
.upperBound("MASTER", 1)

// R indicates an upper bound of number of replicas for each partition
.dynamicUpperBound("SLAVE", "R")
.dynamicUpperBound("SLAVE", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)

// Add some high-priority transitions
.addTransition("SLAVE", "MASTER", 1).addTransition("OFFLINE", "SLAVE", 2)
Expand Down

0 comments on commit c650cf6

Please sign in to comment.