diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java index 7cd08d86fa..bf6db29008 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java @@ -52,6 +52,8 @@ import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -59,9 +61,12 @@ public class TestInstanceOperation extends ZkTestBase { + private static final Logger LOG = LoggerFactory.getLogger(TestHelper.class); + public static final int TIMEOUT = 10000; private final int ZONE_COUNT = 4; - protected final int NUM_NODE = 10; + protected final int START_NUM_NODE = 10; protected static final int START_PORT = 12918; + private static int _nextStartPort = START_PORT; protected static final int PARTITIONS = 20; protected final String CLASS_NAME = getShortClassName(); @@ -85,7 +90,6 @@ public class TestInstanceOperation extends ZkTestBase { private RoutingTableProvider _routingTableProviderEV; private RoutingTableProvider _routingTableProviderCS; List _participants = new ArrayList<>(); - private List _originalParticipantNames = new ArrayList<>(); List _participantNames = new ArrayList<>(); private Set _allDBs = new HashSet<>(); private ZkHelixClusterVerifier _clusterVerifier; @@ -104,9 +108,8 @@ public void beforeClass() throws Exception { _gSetupTool.addCluster(CLUSTER_NAME, true); - for (int i = 0; i < NUM_NODE; i++) { - String participantName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); - _originalParticipantNames.add(participantName); + for (int i = 0; i < START_NUM_NODE; i++) { + String participantName = PARTICIPANT_PREFIX + "_" + _nextStartPort; addParticipant(participantName); } @@ -171,7 +174,6 @@ private void setupClusterConfig() { clusterConfig.setDelayRebalaceEnabled(true); clusterConfig.setRebalanceDelayTime(1800000L); _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); - enabledTopologyAwareRebalance(); Assert.assertTrue(_clusterVerifier.verifyByPolling()); } @@ -196,34 +198,23 @@ private void disableTopologyAwareRebalance() { Assert.assertTrue(_clusterVerifier.verifyByPolling()); } - private void resetInstances() { - // Disable and drop any participants that are not in the original participant list. - Set droppedParticipants = new HashSet<>(); + private void removeOfflineOrDisabledOrSwapInInstances() { + // Remove all instances that are not live, disabled, or in SWAP_IN state. for (int i = 0; i < _participants.size(); i++) { String participantName = _participantNames.get(i); - if (!_originalParticipantNames.contains(participantName)) { - _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, participantName, false); - _participants.get(i).syncStop(); - _gSetupTool.getClusterManagementTool() - .dropInstance(CLUSTER_NAME, _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, participantName)); - droppedParticipants.add(participantName); - } - } - - // Remove the dropped instance from _participants and _participantNames - _participantNames.removeIf(droppedParticipants::contains); - _participants.removeIf(p -> droppedParticipants.contains(p.getInstanceName())); - - for (int i = 0; i < _participants.size(); i++) { - // If instance is not connected to ZK, replace it - if (!_participants.get(i).isConnected()) { - // Replace the stopped participant with a new one and inherit the old instance config. - _participants.set(i, createParticipant(_participantNames.get(i))); - _participants.get(i).syncStart(); + InstanceConfig instanceConfig = + _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, participantName); + if (!_participants.get(i).isConnected() || !instanceConfig.getInstanceEnabled() + || instanceConfig.getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) { + if (_participants.get(i).isConnected()) { + _participants.get(i).syncStop(); + } + _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME, instanceConfig); + _participantNames.remove(i); + _participants.remove(i); + i--; } - _gSetupTool.getClusterManagementTool() - .setInstanceOperation(CLUSTER_NAME, _participantNames.get(i), null); - _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, _participantNames.get(i), true); } Assert.assertTrue(_clusterVerifier.verifyByPolling()); @@ -328,200 +319,36 @@ public void testAddingNodeWithEvacuationTag() throws Exception { } } - @Test(dependsOnMethods = "testAddingNodeWithEvacuationTag") - public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception { - System.out.println("START TestInstanceOperation.testEvacuateAndCancelBeforeBootstrapFinish() at " + new Date(System.currentTimeMillis())); - // add a resource where downward state transition is slow - createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB3_DELAYED_CRUSHED", "MasterSlave", PARTITIONS, REPLICA, - REPLICA - 1, 200000, CrushEdRebalanceStrategy.class.getName()); - _allDBs.add("TEST_DB3_DELAYED_CRUSHED"); - // add a resource where downward state transition is slow - createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB4_DELAYED_WAGED", "MasterSlave", - PARTITIONS, REPLICA, REPLICA - 1); - _allDBs.add("TEST_DB4_DELAYED_WAGED"); - // wait for assignment to finish - Assert.assertTrue(_clusterVerifier.verifyByPolling()); - - // set bootstrap ST delay to a large number - _stateModelDelay = -10000L; - // evacuate an instance - String instanceToEvacuate = _participants.get(0).getInstanceName(); - _gSetupTool.getClusterManagementTool() - .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE); - // Messages should be pending at all instances besides the evacuate one - for (String participant : _participantNames) { - if (participant.equals(instanceToEvacuate)) { - continue; - } - TestHelper.verify( - () -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(participant))).isEmpty()), 30000); - } - Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate)); - Assert.assertFalse(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate)); - - // sleep a bit so ST messages can start executing - Thread.sleep(Math.abs(_stateModelDelay / 100)); - // before we cancel, check current EV - Map assignment = getEVs(); - for (String resource : _allDBs) { - // check every replica has >= 3 partitions and a top state partition - validateAssignmentInEv(assignment.get(resource)); - } - - // cancel the evacuation - _gSetupTool.getClusterManagementTool() - .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null); - - assignment = getEVs(); - for (String resource : _allDBs) { - // check every replica has >= 3 active replicas, even before cluster converge - validateAssignmentInEv(assignment.get(resource)); - } - - // check cluster converge. We have longer delay for ST then verifier timeout. It will only converge if we cancel ST. - Assert.assertTrue(_clusterVerifier.verifyByPolling()); - - // EV should contain all participants, check resources one by one - assignment = getEVs(); - for (String resource : _allDBs) { - Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames)); - // check every replica has >= 3 active replicas again - validateAssignmentInEv(assignment.get(resource)); - } - } - - @Test(dependsOnMethods = "testEvacuateAndCancelBeforeBootstrapFinish") - public void testEvacuateAndCancelBeforeDropFinish() throws Exception { - System.out.println("START TestInstanceOperation.testEvacuateAndCancelBeforeDropFinish() at " + new Date(System.currentTimeMillis())); - - // set DROP ST delay to a large number - _stateModelDelay = 10000L; - - // evacuate an instance - String instanceToEvacuate = _participants.get(0).getInstanceName(); - _gSetupTool.getClusterManagementTool() - .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE); - - // message should be pending at the to evacuate participant - TestHelper.verify( - () -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(instanceToEvacuate))).isEmpty()), 30000); - Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate)); - - // cancel evacuation - _gSetupTool.getClusterManagementTool() - .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null); - // check every replica has >= 3 active replicas, even before cluster converge - Map assignment = getEVs(); - for (String resource : _allDBs) { - validateAssignmentInEv(assignment.get(resource)); - } - - Assert.assertTrue(_clusterVerifier.verifyByPolling()); - - // EV should contain all participants, check resources one by one - assignment = getEVs(); - for (String resource : _allDBs) { - Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames)); - // check every replica has >= 3 active replicas - validateAssignmentInEv(assignment.get(resource)); - } - } - - @Test(dependsOnMethods = "testEvacuateAndCancelBeforeDropFinish") - public void testMarkEvacuationAfterEMM() throws Exception { - System.out.println("START TestInstanceOperation.testMarkEvacuationAfterEMM() at " + new Date(System.currentTimeMillis())); - _stateModelDelay = 1000L; - Assert.assertFalse(_gSetupTool.getClusterManagementTool().isInMaintenanceMode(CLUSTER_NAME)); - _gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, - null); - addParticipant(PARTICIPANT_PREFIX + "_" + (START_PORT + NUM_NODE)); - - - Assert.assertTrue(_clusterVerifier.verifyByPolling()); - Map assignment = getEVs(); - for (String resource : _allDBs) { - Assert.assertFalse(getParticipantsInEv(assignment.get(resource)).contains(_participantNames.get(NUM_NODE))); - } - - // set evacuate operation - String instanceToEvacuate = _participants.get(0).getInstanceName(); - _gSetupTool.getClusterManagementTool() - .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE); - - // there should be no evacuation happening - for (String resource : _allDBs) { - Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).contains(instanceToEvacuate)); - } - - Assert.assertTrue(_clusterVerifier.verifyByPolling()); - - // exit MM - _gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, - null); - - Assert.assertTrue(_clusterVerifier.verifyByPolling()); - - assignment = getEVs(); - List currentActiveInstances = - _participantNames.stream().filter(n -> !n.equals(instanceToEvacuate)).collect(Collectors.toList()); - for (String resource : _allDBs) { - validateAssignmentInEv(assignment.get(resource)); - Set newPAssignedParticipants = getParticipantsInEv(assignment.get(resource)); - Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate)); - Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances)); - } - Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate)); - - _stateModelDelay = 3L; - } - - @Test(dependsOnMethods = "testMarkEvacuationAfterEMM") - public void testEvacuationWithOfflineInstancesInCluster() throws Exception { - System.out.println("START TestInstanceOperation.testEvacuationWithOfflineInstancesInCluster() at " + new Date(System.currentTimeMillis())); - _participants.get(1).syncStop(); - _participants.get(2).syncStop(); - - String evacuateInstanceName = _participants.get(_participants.size()-2).getInstanceName(); - _gSetupTool.getClusterManagementTool() - .setInstanceOperation(CLUSTER_NAME, evacuateInstanceName, InstanceConstants.InstanceOperation.EVACUATE); + @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testAddingNodeWithEvacuationTag") + public void testNodeSwapNoTopologySetup() throws Exception { + System.out.println("START TestInstanceOperation.testNodeSwapNoTopologySetup() at " + new Date( + System.currentTimeMillis())); + removeOfflineOrDisabledOrSwapInInstances(); - Map assignment; - // EV should contain all participants, check resources one by one - assignment = getEVs(); - for (String resource : _allDBs) { - TestHelper.verify(() -> { - ExternalView ev = assignment.get(resource); - for (String partition : ev.getPartitionSet()) { - AtomicInteger activeReplicaCount = new AtomicInteger(); - ev.getStateMap(partition) - .values() - .stream() - .filter(v -> v.equals("MASTER") || v.equals("LEADER") || v.equals("SLAVE") || v.equals("FOLLOWER") - || v.equals("STANDBY")) - .forEach(v -> activeReplicaCount.getAndIncrement()); - if (activeReplicaCount.get() < REPLICA - 1 || (ev.getStateMap(partition).containsKey(evacuateInstanceName) - && ev.getStateMap(partition).get(evacuateInstanceName).equals("MASTER") && ev.getStateMap(partition) - .get(evacuateInstanceName) - .equals("LEADER"))) { - return false; - } - } - return true; - }, 30000); - } + // Set instance's InstanceOperation to SWAP_OUT + String instanceToSwapOutName = _participants.get(0).getInstanceName(); + _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, + InstanceConstants.InstanceOperation.SWAP_OUT); - resetInstances(); - dropTestDBs(ImmutableSet.of("TEST_DB3_DELAYED_CRUSHED", "TEST_DB4_DELAYED_WAGED")); + // Add instance with InstanceOperation set to SWAP_IN + // There should be an error that the logicalId does not have SWAP_OUT instance because, + // helix can't determine what topology key to use to get the logicalId if TOPOLOGY is not set. + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; + InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); + addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), + instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), + InstanceConstants.InstanceOperation.SWAP_IN, true, -1); } - @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testEvacuationWithOfflineInstancesInCluster") + @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapNoTopologySetup") public void testAddingNodeWithSwapOutInstanceOperation() throws Exception { System.out.println( "START TestInstanceOperation.testAddingNodeWithSwapOutInstanceOperation() at " + new Date( System.currentTimeMillis())); enabledTopologyAwareRebalance(); - resetInstances(); + removeOfflineOrDisabledOrSwapInInstances(); // Set instance's InstanceOperation to SWAP_OUT String instanceToSwapOutName = _participants.get(0).getInstanceName(); @@ -531,7 +358,7 @@ public void testAddingNodeWithSwapOutInstanceOperation() throws Exception { InstanceConstants.InstanceOperation.SWAP_OUT); // Add instance with InstanceOperation set to SWAP_IN - String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), InstanceConstants.InstanceOperation.SWAP_OUT, true, -1); @@ -543,7 +370,7 @@ public void testAddingNodeWithSwapOutNodeInstanceOperationUnset() throws Excepti "START TestInstanceOperation.testAddingNodeWithSwapOutNodeInstanceOperationUnset() at " + new Date(System.currentTimeMillis())); - resetInstances(); + removeOfflineOrDisabledOrSwapInInstances(); // Set instance's InstanceOperation to null String instanceToSwapOutName = _participants.get(0).getInstanceName(); @@ -553,7 +380,7 @@ public void testAddingNodeWithSwapOutNodeInstanceOperationUnset() throws Excepti .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null); // Add instance with InstanceOperation set to SWAP_IN - String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), InstanceConstants.InstanceOperation.SWAP_IN, true, -1); @@ -564,10 +391,10 @@ public void testNodeSwapWithNoSwapOutNode() throws Exception { System.out.println("START TestInstanceOperation.testNodeSwapWithNoSwapOutNode() at " + new Date( System.currentTimeMillis())); - resetInstances(); + removeOfflineOrDisabledOrSwapInInstances(); // Add new instance with InstanceOperation set to SWAP_IN - String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; addParticipant(instanceToSwapInName, "1000", "zone_1000", InstanceConstants.InstanceOperation.SWAP_IN, true, -1); } @@ -578,7 +405,7 @@ public void testNodeSwapSwapInNodeNoInstanceOperationEnabled() throws Exception "START TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperationEnabled() at " + new Date(System.currentTimeMillis())); - resetInstances(); + removeOfflineOrDisabledOrSwapInInstances(); // Set instance's InstanceOperation to SWAP_OUT String instanceToSwapOutName = _participants.get(0).getInstanceName(); @@ -590,9 +417,14 @@ public void testNodeSwapSwapInNodeNoInstanceOperationEnabled() throws Exception // Add instance with same logicalId with InstanceOperation unset // This should work because adding instance with InstanceOperation unset will automatically // set the InstanceOperation to SWAP_IN. - String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, true, -1); + + Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); + Assert.assertTrue(_gSetupTool.getClusterManagementTool() + .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName)); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); } @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapSwapInNodeNoInstanceOperationEnabled") @@ -601,7 +433,7 @@ public void testNodeSwapSwapInNodeWithAlreadySwappingPair() throws Exception { "START TestInstanceOperation.testNodeSwapSwapInNodeWithAlreadySwappingPair() at " + new Date(System.currentTimeMillis())); - resetInstances(); + removeOfflineOrDisabledOrSwapInInstances(); // Set instance's InstanceOperation to SWAP_OUT String instanceToSwapOutName = _participants.get(0).getInstanceName(); @@ -611,15 +443,14 @@ public void testNodeSwapSwapInNodeWithAlreadySwappingPair() throws Exception { InstanceConstants.InstanceOperation.SWAP_OUT); // Add instance with InstanceOperation set to SWAP_IN - String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), InstanceConstants.InstanceOperation.SWAP_IN, true, -1); // Add another instance with InstanceOperation set to SWAP_IN with same logicalId as previously // added SWAP_IN instance. - String secondInstanceToSwapInName = - PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + String secondInstanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; addParticipant(secondInstanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), @@ -627,35 +458,10 @@ public void testNodeSwapSwapInNodeWithAlreadySwappingPair() throws Exception { } @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapSwapInNodeWithAlreadySwappingPair") - public void testNodeSwapNoTopologySetup() throws Exception { - System.out.println("START TestInstanceOperation.testNodeSwapNoTopologySetup() at " + new Date( - System.currentTimeMillis())); - disableTopologyAwareRebalance(); - resetInstances(); - - // Set instance's InstanceOperation to SWAP_OUT - String instanceToSwapOutName = _participants.get(0).getInstanceName(); - _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, - InstanceConstants.InstanceOperation.SWAP_OUT); - - // Add instance with InstanceOperation set to SWAP_IN - // There should be an error that the logicalId does not have SWAP_OUT instance because, - // helix can't determine what topology key to use to get the logicalId if TOPOLOGY is not set. - String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); - InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() - .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); - addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), - instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), - InstanceConstants.InstanceOperation.SWAP_IN, true, -1); - } - - @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapNoTopologySetup") public void testNodeSwapWrongFaultZone() throws Exception { System.out.println("START TestInstanceOperation.testNodeSwapWrongFaultZone() at " + new Date( System.currentTimeMillis())); - // Re-enable topology aware rebalancing and set TOPOLOGY. - enabledTopologyAwareRebalance(); - resetInstances(); + removeOfflineOrDisabledOrSwapInInstances(); // Set instance's InstanceOperation to SWAP_OUT String instanceToSwapOutName = _participants.get(0).getInstanceName(); @@ -664,7 +470,7 @@ public void testNodeSwapWrongFaultZone() throws Exception { // Add instance with InstanceOperation set to SWAP_IN // There should be an error because SWAP_IN instance must be in the same FAULT_ZONE as the SWAP_OUT instance. - String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), @@ -676,7 +482,7 @@ public void testNodeSwapWrongFaultZone() throws Exception { public void testNodeSwapWrongCapacity() throws Exception { System.out.println("START TestInstanceOperation.testNodeSwapWrongCapacity() at " + new Date( System.currentTimeMillis())); - resetInstances(); + removeOfflineOrDisabledOrSwapInInstances(); // Set instance's InstanceOperation to SWAP_OUT String instanceToSwapOutName = _participants.get(0).getInstanceName(); @@ -685,7 +491,7 @@ public void testNodeSwapWrongCapacity() throws Exception { // Add instance with InstanceOperation set to SWAP_IN // There should be an error because SWAP_IN instance must have same capacity as the SWAP_OUT node. - String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), @@ -697,7 +503,7 @@ public void testNodeSwapWrongCapacity() throws Exception { public void testNodeSwap() throws Exception { System.out.println( "START TestInstanceOperation.testNodeSwap() at " + new Date(System.currentTimeMillis())); - resetInstances(); + removeOfflineOrDisabledOrSwapInInstances(); // Store original EV Map originalEVs = getEVs(); @@ -717,7 +523,7 @@ public void testNodeSwap() throws Exception { Collections.emptySet(), Collections.emptySet()); // Add instance with InstanceOperation set to SWAP_IN - String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName); addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), @@ -752,13 +558,11 @@ public void testNodeSwap() throws Exception { // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it. Assert.assertFalse(_gSetupTool.getClusterManagementTool() .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); - Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapOutName).size(), - 0); // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before // swap was completed. - validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, - Collections.emptySet(), Set.of(instanceToSwapInName)); + verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Collections.emptySet(), Set.of(instanceToSwapInName))), TIMEOUT); } @Test(dependsOnMethods = "testNodeSwap") @@ -767,7 +571,7 @@ public void testNodeSwapSwapInNodeNoInstanceOperationDisabled() throws Exception "START TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperationDisabled() at " + new Date(System.currentTimeMillis())); - resetInstances(); + removeOfflineOrDisabledOrSwapInInstances(); // Store original EVs Map originalEVs = getEVs(); @@ -787,7 +591,7 @@ public void testNodeSwapSwapInNodeNoInstanceOperationDisabled() throws Exception Collections.emptySet(), Collections.emptySet()); // Add instance with InstanceOperation unset, should automatically be set to SWAP_IN - String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName); addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1); @@ -823,13 +627,11 @@ public void testNodeSwapSwapInNodeNoInstanceOperationDisabled() throws Exception // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it. Assert.assertFalse(_gSetupTool.getClusterManagementTool() .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); - Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapOutName).size(), - 0); // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before // swap was completed. - validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, - Collections.emptySet(), Set.of(instanceToSwapInName)); + verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Collections.emptySet(), Set.of(instanceToSwapInName))), TIMEOUT); } @Test(dependsOnMethods = "testNodeSwapSwapInNodeNoInstanceOperationDisabled") @@ -838,7 +640,7 @@ public void testNodeSwapCancelSwapWhenReadyToComplete() throws Exception { "START TestInstanceOperation.testNodeSwapCancelSwapWhenReadyToComplete() at " + new Date( System.currentTimeMillis())); - resetInstances(); + removeOfflineOrDisabledOrSwapInInstances(); // Store original EVs Map originalEVs = getEVs(); @@ -853,12 +655,12 @@ public void testNodeSwapCancelSwapWhenReadyToComplete() throws Exception { InstanceConstants.InstanceOperation.SWAP_OUT); // Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT - Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, Collections.emptySet(), Collections.emptySet()); // Add instance with InstanceOperation set to SWAP_IN - String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName); addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), @@ -903,12 +705,9 @@ public void testNodeSwapCancelSwapWhenReadyToComplete() throws Exception { Assert.assertTrue(_clusterVerifier.verifyByPolling()); - // Validate there are no partitions on the SWAP_IN instance. - Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName).size(), 0); - // Validate that the SWAP_OUT instance has the same partitions as it had before. - validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, - Collections.emptySet(), Collections.emptySet()); + verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Collections.emptySet(), Collections.emptySet())), TIMEOUT); } @Test(dependsOnMethods = "testNodeSwapCancelSwapWhenReadyToComplete") @@ -916,7 +715,7 @@ public void testNodeSwapAfterEMM() throws Exception { System.out.println("START TestInstanceOperation.testNodeSwapAfterEMM() at " + new Date( System.currentTimeMillis())); - resetInstances(); + removeOfflineOrDisabledOrSwapInInstances(); // Store original EVs Map originalEVs = getEVs(); @@ -940,7 +739,7 @@ public void testNodeSwapAfterEMM() throws Exception { Collections.emptySet(), Collections.emptySet()); // Add instance with InstanceOperation set to SWAP_IN - String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName); addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), @@ -983,13 +782,11 @@ public void testNodeSwapAfterEMM() throws Exception { // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it. Assert.assertFalse(_gSetupTool.getClusterManagementTool() .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); - Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapOutName).size(), - 0); // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before // swap was completed. - validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, - Collections.emptySet(), Set.of(instanceToSwapInName)); + verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Collections.emptySet(), Set.of(instanceToSwapInName))), TIMEOUT); } @Test(dependsOnMethods = "testNodeSwapAfterEMM") @@ -998,7 +795,7 @@ public void testNodeSwapWithSwapOutInstanceDisabled() throws Exception { "START TestInstanceOperation.testNodeSwapWithSwapOutInstanceDisabled() at " + new Date( System.currentTimeMillis())); - resetInstances(); + removeOfflineOrDisabledOrSwapInInstances(); // Store original EVs Map originalEVs = getEVs(); @@ -1026,7 +823,7 @@ public void testNodeSwapWithSwapOutInstanceDisabled() throws Exception { Assert.assertTrue(swapOutInstanceOfflineStates.contains("OFFLINE")); // Add instance with InstanceOperation set to SWAP_IN - String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), InstanceConstants.InstanceOperation.SWAP_IN, true, -1); @@ -1067,8 +864,10 @@ public void testNodeSwapWithSwapOutInstanceDisabled() throws Exception { // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it. Assert.assertFalse(_gSetupTool.getClusterManagementTool() .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); - Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapOutName).size(), - 0); + + verifier( + () -> (getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapOutName).isEmpty()), + TIMEOUT); } @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapWithSwapOutInstanceDisabled") @@ -1076,7 +875,7 @@ public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() { System.out.println( "START TestInstanceOperation.testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() at " + new Date(System.currentTimeMillis())); - resetInstances(); + removeOfflineOrDisabledOrSwapInInstances(); // Get the SWAP_OUT instance. String instanceToSwapOutName = _participants.get(0).getInstanceName(); @@ -1084,7 +883,7 @@ public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() { .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); // Add instance with InstanceOperation set to SWAP_IN enabled before setting SWAP_OUT instance. - String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, true, -1); } @@ -1094,7 +893,7 @@ public void testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() { System.out.println( "START TestInstanceOperation.testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() at " + new Date(System.currentTimeMillis())); - resetInstances(); + removeOfflineOrDisabledOrSwapInInstances(); // Get the SWAP_OUT instance. String instanceToSwapOutName = _participants.get(0).getInstanceName(); @@ -1102,7 +901,7 @@ public void testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() { .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); // Add instance with InstanceOperation set to SWAP_IN - String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1); @@ -1117,7 +916,7 @@ public void testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() { System.out.println( "START TestInstanceOperation.testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() at " + new Date(System.currentTimeMillis())); - resetInstances(); + removeOfflineOrDisabledOrSwapInInstances(); // Get the SWAP_OUT instance. String instanceToSwapOutName = _participants.get(0).getInstanceName(); @@ -1125,7 +924,7 @@ public void testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() { .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); // Add instance with InstanceOperation set to SWAP_IN - String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1); @@ -1139,10 +938,10 @@ public void testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() { } @Test(dependsOnMethods = "testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut") - public void testNodeSwapAddSwapInFirst() { + public void testNodeSwapAddSwapInFirst() throws Exception { System.out.println("START TestInstanceOperation.testNodeSwapAddSwapInFirst() at " + new Date( System.currentTimeMillis())); - resetInstances(); + removeOfflineOrDisabledOrSwapInInstances(); // Store original EV Map originalEVs = getEVs(); @@ -1155,13 +954,13 @@ public void testNodeSwapAddSwapInFirst() { .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); // Add instance with InstanceOperation set to SWAP_IN - String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName); addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1); // Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT - Assert.assertTrue(_clusterVerifier.verifyByPolling()); + Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, Collections.emptySet(), Collections.emptySet()); @@ -1201,13 +1000,234 @@ public void testNodeSwapAddSwapInFirst() { // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it. Assert.assertFalse(_gSetupTool.getClusterManagementTool() .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); - Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapOutName).size(), - 0); // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before // swap was completed. - validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, - Collections.emptySet(), Set.of(instanceToSwapInName)); + verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Collections.emptySet(), Set.of(instanceToSwapInName))), TIMEOUT); + } + + @Test(dependsOnMethods = "testNodeSwapAddSwapInFirst") + public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception { + System.out.println( + "START TestInstanceOperation.testEvacuateAndCancelBeforeBootstrapFinish() at " + new Date( + System.currentTimeMillis())); + removeOfflineOrDisabledOrSwapInInstances(); + + // add a resource where downward state transition is slow + createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB3_DELAYED_CRUSHED", "MasterSlave", + PARTITIONS, REPLICA, REPLICA - 1, 200000, CrushEdRebalanceStrategy.class.getName()); + _allDBs.add("TEST_DB3_DELAYED_CRUSHED"); + // add a resource where downward state transition is slow + createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB4_DELAYED_WAGED", "MasterSlave", + PARTITIONS, REPLICA, REPLICA - 1); + _allDBs.add("TEST_DB4_DELAYED_WAGED"); + // wait for assignment to finish + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // set bootstrap ST delay to a large number + _stateModelDelay = -10000L; + // evacuate an instance + String instanceToEvacuate = _participants.get(0).getInstanceName(); + _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, + InstanceConstants.InstanceOperation.EVACUATE); + // Messages should be pending at all instances besides the evacuate one + for (String participant : _participantNames) { + if (participant.equals(instanceToEvacuate)) { + continue; + } + verifier(() -> ((_dataAccessor.getChildNames( + _dataAccessor.keyBuilder().messages(participant))).isEmpty()), 30000); + } + Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate)); + Assert.assertFalse(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate)); + + // sleep a bit so ST messages can start executing + Thread.sleep(Math.abs(_stateModelDelay / 100)); + // before we cancel, check current EV + Map assignment = getEVs(); + for (String resource : _allDBs) { + // check every replica has >= 3 partitions and a top state partition + validateAssignmentInEv(assignment.get(resource)); + } + + // cancel the evacuation + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null); + + assignment = getEVs(); + for (String resource : _allDBs) { + // check every replica has >= 3 active replicas, even before cluster converge + validateAssignmentInEv(assignment.get(resource)); + } + + // check cluster converge. We have longer delay for ST then verifier timeout. It will only converge if we cancel ST. + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // EV should contain all participants, check resources one by one + assignment = getEVs(); + for (String resource : _allDBs) { + Assert.assertTrue( + getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames)); + // check every replica has >= 3 active replicas again + validateAssignmentInEv(assignment.get(resource)); + } + } + + @Test(dependsOnMethods = "testEvacuateAndCancelBeforeBootstrapFinish") + public void testEvacuateAndCancelBeforeDropFinish() throws Exception { + System.out.println( + "START TestInstanceOperation.testEvacuateAndCancelBeforeDropFinish() at " + new Date( + System.currentTimeMillis())); + + // set DROP ST delay to a large number + _stateModelDelay = 10000L; + + // evacuate an instance + String instanceToEvacuate = _participants.get(0).getInstanceName(); + _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, + InstanceConstants.InstanceOperation.EVACUATE); + + // message should be pending at the to evacuate participant + verifier(() -> ((_dataAccessor.getChildNames( + _dataAccessor.keyBuilder().messages(instanceToEvacuate))).isEmpty()), 30000); + Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate)); + + // cancel evacuation + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null); + // check every replica has >= 3 active replicas, even before cluster converge + Map assignment = getEVs(); + for (String resource : _allDBs) { + validateAssignmentInEv(assignment.get(resource)); + } + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // EV should contain all participants, check resources one by one + assignment = getEVs(); + for (String resource : _allDBs) { + Assert.assertTrue( + getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames)); + // check every replica has >= 3 active replicas + validateAssignmentInEv(assignment.get(resource)); + } + } + + @Test(dependsOnMethods = "testEvacuateAndCancelBeforeDropFinish") + public void testMarkEvacuationAfterEMM() throws Exception { + System.out.println("START TestInstanceOperation.testMarkEvacuationAfterEMM() at " + new Date( + System.currentTimeMillis())); + _stateModelDelay = 1000L; + Assert.assertFalse(_gSetupTool.getClusterManagementTool().isInMaintenanceMode(CLUSTER_NAME)); + _gSetupTool.getClusterManagementTool() + .manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, null); + String newParticipantName = PARTICIPANT_PREFIX + "_" + _nextStartPort; + addParticipant(newParticipantName); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + Map assignment = getEVs(); + for (String resource : _allDBs) { + Assert.assertFalse( + getParticipantsInEv(assignment.get(resource)).contains(newParticipantName)); + } + + // set evacuate operation + String instanceToEvacuate = _participants.get(0).getInstanceName(); + _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, + InstanceConstants.InstanceOperation.EVACUATE); + + // there should be no evacuation happening + for (String resource : _allDBs) { + Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).contains(instanceToEvacuate)); + } + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // exit MM + _gSetupTool.getClusterManagementTool() + .manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, null); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + assignment = getEVs(); + List currentActiveInstances = + _participantNames.stream().filter(n -> !n.equals(instanceToEvacuate)) + .collect(Collectors.toList()); + for (String resource : _allDBs) { + validateAssignmentInEv(assignment.get(resource)); + Set newPAssignedParticipants = getParticipantsInEv(assignment.get(resource)); + Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate)); + Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances)); + } + Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate)); + + _stateModelDelay = 3L; + } + + @Test(dependsOnMethods = "testMarkEvacuationAfterEMM") + public void testEvacuationWithOfflineInstancesInCluster() throws Exception { + System.out.println( + "START TestInstanceOperation.testEvacuationWithOfflineInstancesInCluster() at " + new Date( + System.currentTimeMillis())); + _participants.get(1).syncStop(); + _participants.get(2).syncStop(); + + String evacuateInstanceName = _participants.get(_participants.size() - 2).getInstanceName(); + _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, evacuateInstanceName, + InstanceConstants.InstanceOperation.EVACUATE); + + Map assignment; + // EV should contain all participants, check resources one by one + assignment = getEVs(); + for (String resource : _allDBs) { + verifier(() -> { + ExternalView ev = assignment.get(resource); + for (String partition : ev.getPartitionSet()) { + AtomicInteger activeReplicaCount = new AtomicInteger(); + ev.getStateMap(partition).values().stream().filter( + v -> v.equals("MASTER") || v.equals("LEADER") || v.equals("SLAVE") || v.equals( + "FOLLOWER") || v.equals("STANDBY")) + .forEach(v -> activeReplicaCount.getAndIncrement()); + if (activeReplicaCount.get() < REPLICA - 1 || ( + ev.getStateMap(partition).containsKey(evacuateInstanceName) && ev.getStateMap( + partition).get(evacuateInstanceName).equals("MASTER") && ev.getStateMap(partition) + .get(evacuateInstanceName).equals("LEADER"))) { + return false; + } + } + return true; + }, 30000); + } + + removeOfflineOrDisabledOrSwapInInstances(); + addParticipant(PARTICIPANT_PREFIX + "_" + _nextStartPort); + addParticipant(PARTICIPANT_PREFIX + "_" + _nextStartPort); + dropTestDBs(ImmutableSet.of("TEST_DB3_DELAYED_CRUSHED", "TEST_DB4_DELAYED_WAGED")); + } + + /** + * Verifies that the given verifier returns true within the given timeout. Handles AssertionError + * by returning false, which TestHelper.verify will not do. Asserts that return value from + * TestHelper.verify is true. + * + * @param verifier the verifier to run + * @param timeout the timeout to wait for the verifier to return true + * @throws Exception if TestHelper.verify throws an exception + */ + private static void verifier(TestHelper.Verifier verifier, long timeout) throws Exception { + Assert.assertTrue(TestHelper.verify(() -> { + try { + boolean result = verifier.verify(); + if (!result) { + LOG.error("Verifier returned false, retrying..."); + } + return result; + } catch (AssertionError e) { + LOG.error("Caught AssertionError on verifier attempt: ", e); + return false; + } + }, timeout)); } private MockParticipantManager createParticipant(String participantName) { @@ -1237,6 +1257,7 @@ private void addParticipant(String participantName, String logicalId, String zon participant.syncStart(); _participants.add(participant); _participantNames.add(participantName); + _nextStartPort++; } private void addParticipant(String participantName) { @@ -1257,10 +1278,10 @@ private void createTestDBs(long delayTime) throws InterruptedException { PARTITIONS, REPLICA, REPLICA - 1); _allDBs.add("TEST_DB2_WAGED"); - Assert.assertTrue(_clusterVerifier.verifyByPolling()); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); } - private void dropTestDBs(Set dbs) { + private void dropTestDBs(Set dbs) throws Exception { for (String db : dbs) { _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, db); _allDBs.remove(db); @@ -1394,7 +1415,7 @@ private void validateEVCorrect(ExternalView actual, ExternalView original, } } - private void validateEVsCorrect(Map actuals, + private boolean validateEVsCorrect(Map actuals, Map originals, Map swapOutInstancesToSwapInInstances, Set inFlightSwapInInstances, Set completedSwapInInstanceNames) { Assert.assertEquals(actuals.keySet(), originals.keySet()); @@ -1402,6 +1423,7 @@ private void validateEVsCorrect(Map actuals, validateEVCorrect(actuals.get(resource), originals.get(resource), swapOutInstancesToSwapInInstances, inFlightSwapInInstances, completedSwapInInstanceNames); } + return true; } private void validateAssignmentInEv(ExternalView ev) { @@ -1461,8 +1483,8 @@ public StDelayMSStateModel() { private void sleepWhileNotCanceled(long sleepTime) throws InterruptedException{ while(sleepTime >0 && !isCancelled()) { - Thread.sleep(5000); - sleepTime = sleepTime - 5000; + Thread.sleep(TIMEOUT); + sleepTime = sleepTime - TIMEOUT; } if (isCancelled()) { _cancelled = false;