Skip to content

Commit

Permalink
Fix flaky test TestP2PNoDuplicatedMessage (#2587)
Browse files Browse the repository at this point in the history
Co-authored-by: Xiaxuan Gao
  • Loading branch information
MarkGaox authored Sep 7, 2023
1 parent d00a37c commit 0e82bd3
Showing 1 changed file with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,31 +159,36 @@ public void testP2PStateTransitionDisabled() {
}

@Test (dependsOnMethods = {"testP2PStateTransitionDisabled"})
public void testP2PStateTransitionEnabled() {
public void testP2PStateTransitionEnabled() throws Exception {
enableP2PInCluster(CLUSTER_NAME, _configAccessor, true);
long startTime = System.currentTimeMillis();
MockHelixTaskExecutor.resetStats();
// rolling upgrade the cluster
for (String ins : _instances) {
_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, false);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
verifyP2PEnabled(startTime);

_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, true);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
verifyP2PEnabled(startTime);
verifyP2P(startTime, ins, false);
verifyP2P(startTime, ins, true);
}

// The success rate really depends on how quick participant act in relationship with controller.
// For now, we set 90% threshold.
long threshold = Math.round(total * 0.9);
Assert.assertTrue( p2pTrigged > Math.round(total * 0.9));
Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessagesInProgress, 0,
"There are duplicated transition messages sent while participant is handling the state-transition!");
Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessages, 0,
"There are duplicated transition messages sent at same time!");
}

private void verifyP2P(long startTime, String instance, boolean enabled) throws Exception {
_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instance, enabled);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
Assert.assertTrue(TestHelper.verify(() -> {
total = 0;
p2pTriggered = 0;
verifyP2PEnabled(startTime);
return total == p2pTriggered;
}, TestHelper.WAIT_DURATION),
"Number of successful p2p transitions when disable instance " + instance + ": "
+ p2pTriggered + " , expect: " + total);
Thread.sleep(5000);
}

private void verifyP2PDisabled() {
ResourceControllerDataProvider dataCache = new ResourceControllerDataProvider(CLUSTER_NAME);
dataCache.refresh(_accessor);
Expand All @@ -208,7 +213,7 @@ private void verifyP2PDisabled() {
}

static int total = 0;
static int p2pTrigged = 0;
static int p2pTriggered = 0;

private void verifyP2PEnabled(long startTime) {
ResourceControllerDataProvider dataCache = new ResourceControllerDataProvider(CLUSTER_NAME);
Expand All @@ -226,7 +231,7 @@ private void verifyP2PEnabled(long startTime) {
if (state.equalsIgnoreCase("MASTER") && start > startTime) {
String triggerHost = currentState.getTriggerHost(partition);
if (!triggerHost.equals(_controllerName)) {
p2pTrigged ++;
p2pTriggered ++;
}
total ++;
}
Expand Down

0 comments on commit 0e82bd3

Please sign in to comment.