Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
xyuanlu committed Sep 20, 2023
1 parent 94f6a8d commit 41bdfcf
Showing 1 changed file with 34 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixRollbackException;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.constants.InstanceConstants;
Expand Down Expand Up @@ -89,9 +90,11 @@ public void beforeClass() throws Exception {

ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
clusterConfig.stateTransitionCancelEnabled(true);
clusterConfig.setDelayRebalaceEnabled(true);
clusterConfig.setRebalanceDelayTime(1800000L);
_configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);

createTestDBs(200);
createTestDBs(1800000L);

setUpWagedBaseline();

Expand Down Expand Up @@ -199,7 +202,7 @@ public void testAddingNodeWithEvacuationTag() throws Exception {
public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
// add a resource where downward state transition is slow
createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB3_DELAYED_CRUSHED", "MasterSlave", PARTITIONS, REPLICA,
REPLICA - 1, 200, CrushEdRebalanceStrategy.class.getName());
REPLICA - 1, 200000, CrushEdRebalanceStrategy.class.getName());
_allDBs.add("TEST_DB3_DELAYED_CRUSHED");
// add a resource where downward state transition is slow
createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB4_DELAYED_WAGED", "MasterSlave",
Expand Down Expand Up @@ -338,21 +341,41 @@ public void testMarkEvacuationAfterEMM() throws Exception {

@Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
public void testEvacuationWithOfflineInstancesInCluster() throws Exception {
_participants.get(1).syncStop();
_participants.get(2).syncStop();
_participants.get(3).syncStop();
// wait for converge, and set evacuate on instance 0
Assert.assertTrue(_clusterVerifier.verifyByPolling());

String evacuateInstanceName = _participants.get(0).getInstanceName();
//_baseAccessor.remove(PropertyPathBuilder.liveInstance(CLUSTER_NAME, _participantNames.get(1)), 0);
//_baseAccessor.remove(PropertyPathBuilder.liveInstance(CLUSTER_NAME, _participantNames.get(2)), 0);

String evacuateInstanceName = _participants.get(_participants.size()-2).getInstanceName();
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, evacuateInstanceName, InstanceConstants.InstanceOperation.EVACUATE);

Map<String, IdealState> assignment;
List<String> currentActiveInstances =
_participantNames.stream().filter(n -> (!n.equals(evacuateInstanceName) && !n.equals(_participants.get(3).getInstanceName()))).collect(Collectors.toList());
TestHelper.verify( ()-> {return verifyIS(evacuateInstanceName);}, TestHelper.WAIT_DURATION);
Map<String, ExternalView> assignment;
// EV should contain all participants, check resources one by one
assignment = getEV();
for (String resource : _allDBs) {
ExternalView ev = assignment.get(resource);
for (String partition : ev.getPartitionSet()) {
AtomicInteger activeReplicaCount = new AtomicInteger();
ev.getStateMap(partition)
.values()
.stream()
.filter(
v -> v.equals("MASTER") || v.equals("LEADER") || v.equals("SLAVE") || v.equals("FOLLOWER") || v.equals(
"STANDBY"))
.forEach(v -> activeReplicaCount.getAndIncrement());
Assert.assertTrue(activeReplicaCount.get() >= REPLICA-1);
Assert.assertFalse(ev.getStateMap(partition).containsKey(evacuateInstanceName) && ev.getStateMap(partition)
.get(evacuateInstanceName)
.equals("MASTER") && ev.getStateMap(partition)
.get(evacuateInstanceName)
.equals("LEADER"));

}
}

_participants.get(3).syncStart();
_participants.get(1).syncStart();
_participants.get(2).syncStart();
}

Expand Down

0 comments on commit 41bdfcf

Please sign in to comment.