Skip to content

Commit

Permalink
Add stress test for Metaclient leader election (#2574)
Browse files Browse the repository at this point in the history
  • Loading branch information
xyuanlu authored and junkaixue committed Aug 28, 2023
1 parent a1fa502 commit 1f9cae8
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
* under the License.
*/

import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
Expand All @@ -27,17 +29,21 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.io.FileUtils;
import org.apache.helix.metaclient.api.ConnectStateChangeListener;
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.policy.ExponentialBackoffReconnectPolicy;
import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.apache.helix.zookeeper.zkclient.ZkServer;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.testng.Assert;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS;
Expand Down Expand Up @@ -69,14 +75,14 @@ private static void simulateZkStateReconnected(ZkClient zkClient) throws Interru
zkClient.process(event);
}

@BeforeSuite
@BeforeTest
public void prepare() {
System.out.println("START TestConnectStateChangeListenerAndRetry at " + new Date(System.currentTimeMillis()));
// start local zookeeper server
_zkServer = ZkMetaClientTestBase.startZkServer(ZK_ADDR);
}

@AfterSuite
@AfterTest
public void cleanUp() {
System.out.println("END TestConnectStateChangeListenerAndRetry at " + new Date(System.currentTimeMillis()));
}
Expand Down Expand Up @@ -162,7 +168,6 @@ public void handleConnectionEstablishmentError(Throwable error) throws Exception
zkMetaClient.create("/key", "value");
Assert.fail("Create call after close should throw IllegalStateException");
} catch (Exception ex) {
System.out.println("ex " + ex);
Assert.assertTrue(ex instanceof IllegalStateException);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public abstract class ZkMetaClientTestBase {
*/
@BeforeSuite
public void prepare() {
System.out.println("ZkMetaClientTestBase start ");
// Enable extended types and create a ZkClient
System.setProperty("zookeeper.extendedTypesEnabled", "true");
// start local zookeeper server
Expand All @@ -57,6 +58,7 @@ public void prepare() {

@AfterSuite
public void cleanUp() {
System.out.println("ZkMetaClientTestBase shut down");
_zkServer.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.helix.metaclient.api.MetaClientInterface;
import java.util.HashMap;


/**
* AbstractPuppy object contains interfaces to implement puppy and main logics to manage puppy life cycle
*/
Expand All @@ -32,7 +33,6 @@ public abstract class AbstractPuppy implements Runnable {
public final HashMap<String, Integer> _eventChangeCounterMap;
public int _unhandledErrorCounter;


public AbstractPuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) {
_metaclient = metaclient;
_puppySpec = puppySpec;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package org.apache.helix.metaclient.recipes.leaderelection;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.helix.metaclient.MetaClientTestUtil;
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.puppy.AbstractPuppy;
import org.apache.helix.metaclient.puppy.PuppySpec;
import org.testng.Assert;


public class LeaderElectionPuppy extends AbstractPuppy {
String _leaderGroup;
String _participant;
private final Random _random;
LeaderElectionClient _leaderElectionClient;

public LeaderElectionPuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) {
super(metaclient, puppySpec);
_random = new Random();
}

public LeaderElectionPuppy(LeaderElectionClient leaderElectionClient, PuppySpec puppySpec, String leaderGroup,
String participant) {
super(leaderElectionClient.getMetaClient(), puppySpec);
_leaderElectionClient = leaderElectionClient;
_leaderGroup = leaderGroup;
_random = new Random();
_participant = participant;
}

@Override
protected void bark() throws Exception {
int randomNumber = _random.nextInt((int) TimeUnit.SECONDS.toMillis(5));
System.out.println("LeaderElectionPuppy " + _participant + " Joining");
_leaderElectionClient.joinLeaderElectionParticipantPool(_leaderGroup);

Assert.assertTrue(MetaClientTestUtil.verify(() -> {
return (_leaderElectionClient.getLeader(_leaderGroup) != null);
}, MetaClientTestUtil.WAIT_DURATION));
if (_random.nextBoolean()) {
_leaderElectionClient.relinquishLeader(_leaderGroup);
}
Assert.assertTrue(MetaClientTestUtil.verify(() -> {
return (_leaderElectionClient.getParticipantInfo(_leaderGroup, _participant) != null);
}, MetaClientTestUtil.WAIT_DURATION));

Thread.sleep(randomNumber);
System.out.println("LeaderElectionPuppy " + _participant + " Leaving");
_leaderElectionClient.exitLeaderElectionParticipantPool(_leaderGroup);
Assert.assertTrue(MetaClientTestUtil.verify(() -> {
return (_leaderElectionClient.getParticipantInfo(_leaderGroup, _participant) == null);
}, MetaClientTestUtil.WAIT_DURATION));

Thread.sleep(randomNumber);
}

@Override
protected void cleanup() {
try {
System.out.println("Cleaning - LeaderElectionPuppy " + _participant + " Leaving");
_leaderElectionClient.exitLeaderElectionParticipantPool(_leaderGroup);
} catch (MetaClientException ignore) {
// already leave the pool. OK to throw exception.
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class TestLeaderElection extends ZkMetaClientTestBase {
private static final String PARTICIPANT_NAME2 = "participant_2";
private static final String LEADER_PATH = "/LEADER_ELECTION_GROUP_1";

public LeaderElectionClient createLeaderElectionClient(String participantName) {
public static LeaderElectionClient createLeaderElectionClient(String participantName) {
MetaClientConfig.StoreType storeType = MetaClientConfig.StoreType.ZOOKEEPER;
MetaClientConfig config =
new MetaClientConfig.MetaClientConfigBuilder<>().setConnectionAddress(ZK_ADDR).setStoreType(storeType).build();
Expand All @@ -28,50 +28,57 @@ public LeaderElectionClient createLeaderElectionClient(String participantName) {

@Test
public void testAcquireLeadership() throws Exception {
String leaderPath = LEADER_PATH + "testAcquireLeadership";
System.out.println("START TestLeaderElection.testAcquireLeadership");
String leaderPath = LEADER_PATH + "/testAcquireLeadership";

// create 2 clients representing 2 participants
LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);

clt1.getMetaClient().create(LEADER_PATH, new LeaderInfo(LEADER_PATH));

clt1.joinLeaderElectionParticipantPool(leaderPath);
clt2.joinLeaderElectionParticipantPool(leaderPath);
// First client joining the leader election group should be current leader
Assert.assertTrue(MetaClientTestUtil.verify(() -> {
return (clt1.getLeader(LEADER_PATH) != null);
return (clt1.getLeader(leaderPath) != null);
}, MetaClientTestUtil.WAIT_DURATION));
Assert.assertNotNull(clt1.getLeader(LEADER_PATH));
Assert.assertEquals(clt1.getLeader(LEADER_PATH), clt2.getLeader(LEADER_PATH));
Assert.assertEquals(clt1.getLeader(LEADER_PATH), PARTICIPANT_NAME1);
Assert.assertNotNull(clt1.getLeader(leaderPath));
Assert.assertEquals(clt1.getLeader(leaderPath), clt2.getLeader(leaderPath));
Assert.assertEquals(clt1.getLeader(leaderPath), PARTICIPANT_NAME1);


// client 1 exit leader election group, and client 2 should be current leader.
clt1.exitLeaderElectionParticipantPool(LEADER_PATH);
clt1.exitLeaderElectionParticipantPool(leaderPath);

Assert.assertTrue(MetaClientTestUtil.verify(() -> {
return (clt1.getLeader(LEADER_PATH) != null);
return (clt1.getLeader(leaderPath) != null);
}, MetaClientTestUtil.WAIT_DURATION));
Assert.assertTrue(MetaClientTestUtil.verify(() -> {
return (clt1.getLeader(LEADER_PATH).equals(PARTICIPANT_NAME2));
return (clt1.getLeader(leaderPath).equals(PARTICIPANT_NAME2));
}, MetaClientTestUtil.WAIT_DURATION));

// client1 join and client2 leave. client 1 should be leader.
clt1.joinLeaderElectionParticipantPool(LEADER_PATH);
clt2.exitLeaderElectionParticipantPool(LEADER_PATH);
clt1.joinLeaderElectionParticipantPool(leaderPath);
clt2.exitLeaderElectionParticipantPool(leaderPath);
Assert.assertTrue(MetaClientTestUtil.verify(() -> {
return (clt1.getLeader(LEADER_PATH) != null);
return (clt1.getLeader(leaderPath) != null);
}, MetaClientTestUtil.WAIT_DURATION));
Assert.assertTrue(MetaClientTestUtil.verify(() -> {
return (clt1.getLeader(LEADER_PATH).equals(PARTICIPANT_NAME1));
return (clt1.getLeader(leaderPath).equals(PARTICIPANT_NAME1));
}, MetaClientTestUtil.WAIT_DURATION));
Assert.assertTrue(clt1.isLeader(LEADER_PATH));
Assert.assertFalse(clt2.isLeader(LEADER_PATH));
Assert.assertTrue(clt1.isLeader(leaderPath));
Assert.assertFalse(clt2.isLeader(leaderPath));

clt1.close();
clt2.close();
System.out.println("END TestLeaderElection.testAcquireLeadership");
}

@Test
public void testElectionPoolMembership() throws Exception {
String leaderPath = LEADER_PATH + "/testElectionPoolMembership";
System.out.println("START TestLeaderElection.testElectionPoolMembership");
String leaderPath = LEADER_PATH + "/_testElectionPoolMembership";
LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1);
participantInfo.setSimpleField("Key1", "value1");
LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2);
Expand Down Expand Up @@ -103,12 +110,14 @@ public void testElectionPoolMembership() throws Exception {
clt1.exitLeaderElectionParticipantPool(leaderPath);
clt2.exitLeaderElectionParticipantPool(leaderPath);

Assert.assertNull(clt2.getParticipantInfo(LEADER_PATH, PARTICIPANT_NAME2));
Assert.assertNull(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME2));
System.out.println("END TestLeaderElection.testElectionPoolMembership");
}

@Test
public void testSessionExpire() throws Exception {
String leaderPath = LEADER_PATH + "/testSessionExpire";
System.out.println("START TestLeaderElection.testSessionExpire");
String leaderPath = LEADER_PATH + "/_testSessionExpire";
LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1);
participantInfo.setSimpleField("Key1", "value1");
LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2);
Expand Down Expand Up @@ -142,10 +151,12 @@ public void testSessionExpire() throws Exception {
Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
Assert.assertEquals(clt1.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
System.out.println("END TestLeaderElection.testSessionExpire");
}
@Test (dependsOnMethods = "testAcquireLeadership")
public void testLeadershipListener() throws Exception {
String leaderPath = LEADER_PATH + "testLeadershipListener";
System.out.println("START TestLeaderElection.testLeadershipListener");
String leaderPath = LEADER_PATH + "/testLeadershipListener";
// create 2 clients representing 2 participants
LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
Expand Down Expand Up @@ -189,7 +200,6 @@ public void onLeadershipChange(String leaderPath, ChangeType type, String curLea
Assert.assertEquals(numNewLeaderEvent[0], count*2);

clt3.unsubscribeLeadershipChanges(leaderPath, listener);

// listener shouldn't receive any event after unsubscribe
joinPoolTestHelper(leaderPath, clt1, clt2);
Assert.assertEquals(numLeaderGoneEvent[0], count*2);
Expand All @@ -198,6 +208,7 @@ public void onLeadershipChange(String leaderPath, ChangeType type, String curLea
clt1.close();
clt2.close();
clt3.close();
System.out.println("END TestLeaderElection.testLeadershipListener");
}

private void joinPoolTestHelper(String leaderPath, LeaderElectionClient clt1, LeaderElectionClient clt2) throws Exception {
Expand All @@ -221,7 +232,8 @@ private void joinPoolTestHelper(String leaderPath, LeaderElectionClient clt1, Le

@Test (dependsOnMethods = "testLeadershipListener")
public void testRelinquishLeadership() throws Exception {
String leaderPath = LEADER_PATH + "testRelinquishLeadership";
System.out.println("START TestLeaderElection.testRelinquishLeadership");
String leaderPath = LEADER_PATH + "/testRelinquishLeadership";
LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
LeaderElectionClient clt3 = createLeaderElectionClient(PARTICIPANT_NAME2);
Expand Down Expand Up @@ -268,6 +280,7 @@ public void onLeadershipChange(String leaderPath, ChangeType type, String curLea
}, MetaClientTestUtil.WAIT_DURATION));

clt2.exitLeaderElectionParticipantPool(leaderPath);
System.out.println("END TestLeaderElection.testRelinquishLeadership");
}

}
Loading

0 comments on commit 1f9cae8

Please sign in to comment.