Skip to content

Commit

Permalink
[apache/helix] -- Updated code to be build compatible with JDK-8. (ap…
Browse files Browse the repository at this point in the history
…ache#2650)

We will release codebase to be build with JDK-8. We will continue to release the Open source Helix on Java 11 and this PR is for backward compatibility.
  • Loading branch information
himanshukandwal committed Oct 13, 2023
1 parent 56b0698 commit 171435c
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* under the License.
*/

import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -55,7 +56,7 @@
*/
public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class);
public static final Set<String> INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT = Set.of("EVACUATE", "SWAP_IN");
public static final Set<String> INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT = ImmutableSet.of("EVACUATE", "SWAP_IN");

@Override
public IdealState computeNewIdealState(String resourceName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ private static Map<String, List<String>> findPartitionsMissingMinActiveReplica(
Map<String, ResourceAssignment> currentAssignment) {
return currentAssignment.entrySet()
.parallelStream()
.map(e -> Map.entry(e.getKey(), findPartitionsMissingMinActiveReplica(clusterData, e.getValue())))
.map(e -> new HashMap.SimpleEntry<>(e.getKey(), findPartitionsMissingMinActiveReplica(clusterData, e.getValue())))
.filter(e -> !e.getValue().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* under the License.
*/

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -80,7 +81,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
.getInstance(ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
// These failure types should be propagated to caller of computeNewIdealStates()
private static final List<HelixRebalanceException.Type> FAILURE_TYPES_TO_PROPAGATE =
List.of(HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, HelixRebalanceException.Type.UNKNOWN_FAILURE);
ImmutableList.of(HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, HelixRebalanceException.Type.UNKNOWN_FAILURE);

private final HelixManager _manager;
private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public OptimalAssignment calculate(ClusterModel clusterModel) throws HelixRebala
getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), busyInstances,
optimalAssignment);
// stop immediately if any replica cannot find best assignable node
if (maybeBestNode.isEmpty() || optimalAssignment.hasAnyFailure()) {
if (!maybeBestNode.isPresent() || optimalAssignment.hasAnyFailure()) {
String errorMessage = String.format(
"Unable to find any available candidate node for partition %s; Fail reasons: %s",
replica.getPartitionName(), optimalAssignment.getFailures());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ private static Map<String, Set<AssignableReplica>> getAllAssignableReplicas(
}
}
}
return Map.entry(resourceName, replicas);
return new HashMap.SimpleEntry<>(resourceName, replicas);
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -737,7 +738,7 @@ public void testRebalanceOverwrite() throws HelixRebalanceException, IOException
instances.add(offlineInstance);
when(clusterData.getAllInstances()).thenReturn(instances);
when(clusterData.getEnabledInstances()).thenReturn(instances);
when(clusterData.getEnabledLiveInstances()).thenReturn(Set.of(instance0, instance1, instance2));
when(clusterData.getEnabledLiveInstances()).thenReturn(ImmutableSet.of(instance0, instance1, instance2));
Map<String, Long> instanceOfflineTimeMap = new HashMap<>();
instanceOfflineTimeMap.put(offlineInstance, System.currentTimeMillis() + Integer.MAX_VALUE);
when(clusterData.getInstanceOfflineTimeMap()).thenReturn(instanceOfflineTimeMap);
Expand Down Expand Up @@ -862,10 +863,10 @@ private void validateRebalanceResult(Map<String, Resource> resourceMap,
public void testResourceWeightProvider() throws IOException {
ResourceControllerDataProvider testCache = setupClusterDataCache();
WagedResourceWeightsProvider dataProvider = new WagedResourceWeightsProvider(testCache);
Map<String, Integer> weights1 = Map.of("item1", 3, "item2", 6, "item3", 0);
Map<String, Integer> weights1 = ImmutableMap.of("item1", 3, "item2", 6, "item3", 0);
Assert.assertEquals(dataProvider.getPartitionWeights("Resource1", "Partition1"), weights1);
Assert.assertEquals(dataProvider.getPartitionWeights("Resource1", "Partition2"), weights1);
Map<String, Integer> weights2 = Map.of("item1", 5, "item2", 10, "item3", 0);
Map<String, Integer> weights2 = ImmutableMap.of("item1", 5, "item2", 10, "item3", 0);
Assert.assertEquals(dataProvider.getPartitionWeights("Resource2", "Partition2"), weights2);
}

Expand Down Expand Up @@ -898,7 +899,7 @@ public void testInstanceCapacityProvider() throws IOException, HelixRebalanceExc
}));
WagedInstanceCapacity provider = new WagedInstanceCapacity(clusterData);

Map<String, Integer> weights1 = Map.of("item1", 20, "item2", 40, "item3", 30);
Map<String, Integer> weights1 = ImmutableMap.of("item1", 20, "item2", 40, "item3", 30);
Map<String, Integer> capacity = provider.getInstanceAvailableCapacity("testInstanceId");
Assert.assertEquals(provider.getInstanceAvailableCapacity("testInstanceId"), weights1);
Assert.assertEquals(provider.getInstanceAvailableCapacity("testInstanceId1"), weights1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
* under the License.
*/

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
Expand Down Expand Up @@ -50,9 +52,9 @@ public void testEmptyCS() {
CurrentStateComputationStage stage = new CurrentStateComputationStage();
runStage(event, new ReadClusterDataStage());
ClusterConfig clsCfg = dataCache.getClusterConfig();
clsCfg.setInstanceCapacityKeys(List.of("s1", "s2", "s3"));
clsCfg.setInstanceCapacityKeys(ImmutableList.of("s1", "s2", "s3"));
dataCache.setClusterConfig(clsCfg);
dataCache.setInstanceConfigMap(Map.of(
dataCache.setInstanceConfigMap(ImmutableMap.of(
"a", new InstanceConfig("a")
));
runStage(event, stage);
Expand Down

0 comments on commit 171435c

Please sign in to comment.