diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 69fec9b2ca..dc825f2f78 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -213,7 +213,10 @@ private static ClusterModel generateClusterModel(ResourceControllerDataProvider
             new InstanceConfig(instanceName))
         .getLogicalId(clusterTopologyConfig.getEndNodeType())).collect(Collectors.toSet());
 
-    Set<String> assignableLiveInstanceNames = dataProvider.getAssignableLiveInstances().keySet();
+    // TODO: Figure out why streaming the keySet directly causes a ConcurrentModificationException in rare cases.
+    // In theory, this should not happen since the cache refresh is at the beginning of the pipeline, so there could be some other cause.
+    // For now, we just copy the keySet to a new HashSet to avoid the exception.
+    Set<String> assignableLiveInstanceNames = new HashSet<>(dataProvider.getAssignableLiveInstances().keySet());
     Set<String> assignableLiveInstanceLogicalIds =
         assignableLiveInstanceNames.stream().map(
             instanceName -> assignableInstanceConfigMap.getOrDefault(instanceName,
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 018a4af71d..7cd08d86fa 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -147,6 +147,13 @@ public void beforeClass() throws Exception {
 
   @AfterClass
   public void afterClass() {
+    // Drop all DBs
+    for (String db : _allDBs) {
+      _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db);
+    }
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
     for (MockParticipantManager p : _participants) {
       p.syncStop();
     }
@@ -210,16 +217,9 @@ private void resetInstances() {
     for (int i = 0; i < _participants.size(); i++) {
       // If instance is not connected to ZK, replace it
       if (!_participants.get(i).isConnected()) {
-        // Drop bad instance from the cluster.
-        InstanceConfig toDropInstanceConfig = _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, _participantNames.get(i));
-        _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, _participantNames.get(i), false);
-        _gSetupTool.getClusterManagementTool()
-            .dropInstance(CLUSTER_NAME, toDropInstanceConfig);
-        _participants.set(i,
-            createParticipant(_participantNames.get(i), toDropInstanceConfig.getLogicalId(LOGICAL_ID),
-                toDropInstanceConfig.getDomainAsMap().get(ZONE), null, true, -1));
+        // Replace the stopped participant with a new one that inherits the old instance config.
+        _participants.set(i, createParticipant(_participantNames.get(i)));
         _participants.get(i).syncStart();
-        continue;
       }
       _gSetupTool.getClusterManagementTool()
           .setInstanceOperation(CLUSTER_NAME, _participantNames.get(i), null);
@@ -1210,17 +1210,7 @@ public void testNodeSwapAddSwapInFirst() {
         Collections.emptySet(), Set.of(instanceToSwapInName));
   }
 
-  private MockParticipantManager createParticipant(String participantName, String logicalId, String zone,
-      InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity) {
-    InstanceConfig config = new InstanceConfig.Builder().setDomain(
-        String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, participantName, LOGICAL_ID,
-            logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation)
-        .build(participantName);
-    if (capacity >= 0) {
-      config.setInstanceCapacityMap(Map.of(TEST_CAPACITY_KEY, capacity));
-    }
-    _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, config);
-
+  private MockParticipantManager createParticipant(String participantName) {
     // start dummy participants
     MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName);
     StateMachineEngine stateMachine = participant.getStateMachineEngine();
@@ -1232,8 +1222,17 @@ private MockParticipantManager createParticipant(String participantName, String
 
   private void addParticipant(String participantName, String logicalId, String zone,
       InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity) {
-    MockParticipantManager participant = createParticipant(participantName, logicalId, zone,
-        instanceOperation, enabled, capacity);
+    InstanceConfig config = new InstanceConfig.Builder().setDomain(
+        String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, participantName, LOGICAL_ID,
+            logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation)
+        .build(participantName);
+
+    if (capacity >= 0) {
+      config.setInstanceCapacityMap(Map.of(TEST_CAPACITY_KEY, capacity));
+    }
+    _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, config);
+
+    MockParticipantManager participant = createParticipant(participantName);
     participant.syncStart();
     _participants.add(participant);
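
Note on the ClusterModelProvider change above: the keySet() view returned by a HashMap is backed by the map, so a fail-fast ConcurrentModificationException can surface when the map is structurally modified while a stream over that view is being consumed; copying the keys into a new HashSet detaches the iteration from the cache. The sketch below is a minimal, single-threaded illustration of that difference, not part of the Helix code; class and variable names such as KeySetCopyDemo and liveInstances are made up for the example, and in the controller the suspected modification would come from another thread, which the TODO above leaves unexplained.

import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class KeySetCopyDemo {
  public static void main(String[] args) {
    Map<String, String> liveInstances = new HashMap<>();
    liveInstances.put("instance_0", "session_0");
    liveInstances.put("instance_1", "session_1");

    // Streaming the keySet() view directly: the stream is backed by the map, so a
    // structural change to the map while the stream is consumed trips the fail-fast
    // modCount check and throws ConcurrentModificationException.
    try {
      liveInstances.keySet().stream()
          .forEach(name -> liveInstances.remove("instance_1"));
      System.out.println("live view: no exception");
    } catch (ConcurrentModificationException e) {
      System.out.println("live view: " + e);
    }

    liveInstances.put("instance_1", "session_1");

    // Defensive copy: the new HashSet holds its own copy of the keys, so later changes
    // to the map cannot invalidate an iteration that is already in progress.
    Set<String> copiedNames = new HashSet<>(liveInstances.keySet());
    copiedNames.stream().forEach(name -> liveInstances.remove("instance_1"));
    System.out.println("copied set: iteration completed over " + copiedNames);
  }
}

The copy costs one extra pass over the key set per rebalance, which is cheap relative to leaving the rare failure unhandled; the TODO keeps the root-cause investigation open.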