From 29337efc893eeab1cced5fe3786db20d5ce141b4 Mon Sep 17 00:00:00 2001 From: xyuanlu Date: Tue, 15 Aug 2023 17:15:27 -0700 Subject: [PATCH] Leader election client - Proactively send leader gone event when disconnect from ZK (#2585) --- .../leaderelection/LeaderElectionClient.java | 64 +++-- ...eaderElectionListenerInterfaceAdapter.java | 29 +- ...estConnectStateChangeListenerAndRetry.java | 32 +-- .../impl/zk/TestStressZkClient.java | 3 +- .../helix/metaclient/impl/zk/TestUtil.java | 24 ++ .../leaderelection/LeaderElectionPuppy.java | 5 + .../leaderelection/TestLeaderElection.java | 247 ++++++++++++------ .../TestMultiClientLeaderElection.java | 3 + 8 files changed, 273 insertions(+), 134 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java index 7b13778c0d..3bcf09ceb3 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java @@ -33,6 +33,7 @@ import org.apache.helix.metaclient.api.MetaClientInterface; import org.apache.helix.metaclient.api.Op; import org.apache.helix.metaclient.api.OpResult; +import org.apache.helix.metaclient.exception.MetaClientBadVersionException; import org.apache.helix.metaclient.exception.MetaClientException; import org.apache.helix.metaclient.exception.MetaClientNoNodeException; import org.apache.helix.metaclient.exception.MetaClientNodeExistsException; @@ -83,7 +84,7 @@ public class LeaderElectionClient implements AutoCloseable { /** * Construct a LeaderElectionClient using a user passed in leaderElectionConfig. It creates a MetaClient * instance underneath. - * When MetaClient is auto closed be cause of being disconnected and auto retry connection timed out, A new + * When MetaClient is auto closed because of being disconnected and auto retry connection timed out, A new * MetaClient instance will be created and keeps retry connection. * * @param metaClientConfig The config used to create an metaclient. @@ -257,20 +258,24 @@ private void relinquishLeaderHelper(String leaderPath, Boolean exitLeaderElectio } // check if current participant is the leader // read data and stats, check, and multi check + delete - ImmutablePair tup = _metaClient.getDataAndStat(key); - if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) { - int expectedVersion = tup.right.getVersion(); - List ops = Arrays.asList(Op.check(key, expectedVersion), Op.delete(key, expectedVersion)); - //Execute transactional support on operations - List opResults = _metaClient.transactionOP(ops); - if (opResults.get(0).getType() == ERRORRESULT) { - if (isLeader(leaderPath)) { - // Participant re-elected as leader. - throw new ConcurrentModificationException("Concurrent operation, please retry"); - } else { - LOG.info("Someone else is already leader"); + try { + ImmutablePair tup = _metaClient.getDataAndStat(key); + if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) { + int expectedVersion = tup.right.getVersion(); + List ops = Arrays.asList(Op.check(key, expectedVersion), Op.delete(key, expectedVersion)); + //Execute transactional support on operations + List opResults = _metaClient.transactionOP(ops); + if (opResults.get(0).getType() == ERRORRESULT) { + if (isLeader(leaderPath)) { + // Participant re-elected as leader. + throw new ConcurrentModificationException("Concurrent operation, please retry"); + } else { + LOG.info("Someone else is already leader"); + } } } + } catch (MetaClientNoNodeException ex) { + LOG.info("No Leader for participant pool {} when exit the pool", leaderPath); } } @@ -334,8 +339,10 @@ public List getParticipants(String leaderPath) { * @return A boolean value indicating if registration is success. */ public boolean subscribeLeadershipChanges(String leaderPath, LeaderElectionListenerInterface listener) { - _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, new LeaderElectionListenerInterfaceAdapter(listener), - false); + LeaderElectionListenerInterfaceAdapter adapter = new LeaderElectionListenerInterfaceAdapter(leaderPath, listener); + _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, + adapter, false /*skipWatchingNonExistNode*/); // we need to subscribe event when path is not there + _metaClient.subscribeStateChanges(adapter); return false; } @@ -344,7 +351,10 @@ public boolean subscribeLeadershipChanges(String leaderPath, LeaderElectionListe * @param listener An implementation of LeaderElectionListenerInterface */ public void unsubscribeLeadershipChanges(String leaderPath, LeaderElectionListenerInterface listener) { - _metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, new LeaderElectionListenerInterfaceAdapter(listener)); + LeaderElectionListenerInterfaceAdapter adapter = new LeaderElectionListenerInterfaceAdapter(leaderPath, listener); + _metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, adapter + ); + _metaClient.unsubscribeConnectStateChanges(adapter); } @Override @@ -395,6 +405,9 @@ public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState _metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT + _participant, _participantInfos.get(leaderPath), MetaClientInterface.EntryMode.EPHEMERAL); } + } else if (prevState == MetaClientInterface.ConnectState.DISCONNECTED + && currentState == MetaClientInterface.ConnectState.CONNECTED) { + touchLeaderNode(); } } @@ -404,6 +417,25 @@ public void handleConnectionEstablishmentError(Throwable error) throws Exception } } + private void touchLeaderNode() { + for (String leaderPath : _leaderGroups) { + String key = leaderPath; + ImmutablePair tup = _metaClient.getDataAndStat(key); + if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) { + int expectedVersion = tup.right.getVersion(); + try { + _metaClient.set(key, tup.left, expectedVersion); + } catch (MetaClientNoNodeException ex) { + LOG.info("leaderPath {} gone when retouch leader node.", key); + } catch (MetaClientBadVersionException e) { + LOG.info("New leader for leaderPath {} when retouch leader node.", key); + } catch (MetaClientException ex) { + LOG.warn("Failed to touch {} when reconnected.", key, ex); + } + } + } + } + public MetaClientInterface getMetaClient() { return _metaClient; } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java index 5c64d67902..ee790ac823 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java @@ -1,14 +1,18 @@ package org.apache.helix.metaclient.recipes.leaderelection; +import org.apache.helix.metaclient.api.ConnectStateChangeListener; import org.apache.helix.metaclient.api.DataChangeListener; +import org.apache.helix.metaclient.api.MetaClientInterface; import static org.apache.helix.metaclient.recipes.leaderelection.LeaderElectionListenerInterface.ChangeType.*; -public class LeaderElectionListenerInterfaceAdapter implements DataChangeListener { +public class LeaderElectionListenerInterfaceAdapter implements DataChangeListener, ConnectStateChangeListener { + private String _leaderPath; private final LeaderElectionListenerInterface _leaderElectionListener; - public LeaderElectionListenerInterfaceAdapter(LeaderElectionListenerInterface leaderElectionListener) { + public LeaderElectionListenerInterfaceAdapter(String leaderPath, LeaderElectionListenerInterface leaderElectionListener) { + _leaderPath = leaderPath; _leaderElectionListener = leaderElectionListener; } @@ -16,11 +20,12 @@ public LeaderElectionListenerInterfaceAdapter(LeaderElectionListenerInterface le public void handleDataChange(String key, Object data, ChangeType changeType) throws Exception { switch (changeType) { case ENTRY_CREATED: + case ENTRY_UPDATE: String newLeader = ((LeaderInfo) data).getLeaderName(); - _leaderElectionListener.onLeadershipChange(key, LEADER_ACQUIRED, newLeader); + _leaderElectionListener.onLeadershipChange(_leaderPath, LEADER_ACQUIRED, newLeader); break; case ENTRY_DELETED: - _leaderElectionListener.onLeadershipChange(key, LEADER_LOST, ""); + _leaderElectionListener.onLeadershipChange(_leaderPath, LEADER_LOST, ""); } } @@ -40,4 +45,20 @@ public boolean equals(Object o) { public int hashCode() { return _leaderElectionListener.hashCode(); } + + @Override + public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState, + MetaClientInterface.ConnectState currentState) throws Exception { + if (currentState == MetaClientInterface.ConnectState.DISCONNECTED) { + // when disconnected, notify leader lost even though the ephmeral node is not gone until expire + // Leader election client will touch the node if reconnect before expire + _leaderElectionListener.onLeadershipChange(_leaderPath, LEADER_LOST, ""); + } + + } + + @Override + public void handleConnectionEstablishmentError(Throwable error) throws Exception { + + } } diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java index 086db51c7a..f088140c6e 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java @@ -48,32 +48,14 @@ import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS; import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS; +import static org.apache.helix.metaclient.impl.zk.TestUtil.*; public class TestConnectStateChangeListenerAndRetry { - protected static final String ZK_ADDR = "localhost:2181"; + protected static final String ZK_ADDR = "localhost:2184"; protected static ZkServer _zkServer; - private static final long AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST = 3 * 1000; - private static final long AUTO_RECONNECT_WAIT_TIME_WITHIN = 1 * 1000; - private static final long AUTO_RECONNECT_WAIT_TIME_EXD = 5 * 1000; - - /** - * Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly - * This need to be done in a separate thread to simulate ZkClient eventThread. - */ - private static void simulateZkStateReconnected(ZkClient zkClient) throws InterruptedException { - WatchedEvent event = - new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, - null); - zkClient.process(event); - - Thread.sleep(AUTO_RECONNECT_WAIT_TIME_WITHIN); - - event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.SyncConnected, - null); - zkClient.process(event); - } + @BeforeTest public void prepare() { @@ -82,11 +64,6 @@ public void prepare() { _zkServer = ZkMetaClientTestBase.startZkServer(ZK_ADDR); } - @AfterTest - public void cleanUp() { - System.out.println("END TestConnectStateChangeListenerAndRetry at " + new Date(System.currentTimeMillis())); - } - @Test public void testConnectState() { System.out.println("STARTING TestConnectStateChangeListenerAndRetry.testConnectState at " + new Date(System.currentTimeMillis())); @@ -115,7 +92,7 @@ public void testReConnectSucceed() throws InterruptedException { @Override public void run() { try { - simulateZkStateReconnected(zkMetaClient.getZkClient()); + simulateZkStateReconnected(zkMetaClient); } catch (InterruptedException e) { Assert.fail("Exception in simulateZkStateReconnected", e); } @@ -170,6 +147,7 @@ public void handleConnectionEstablishmentError(Throwable error) throws Exception } catch (Exception ex) { Assert.assertTrue(ex instanceof IllegalStateException); } + zkMetaClient.unsubscribeConnectStateChanges(listener); } System.out.println("END TestConnectStateChangeListenerAndRetry.testConnectStateChangeListener at " + new Date(System.currentTimeMillis())); } diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java index 8a3f9c1154..6f358f0e8a 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java @@ -48,7 +48,8 @@ private void setUp() { } @AfterTest - private void tearDown() { + @Override + public void cleanUp() { this._zkMetaClient.close(); } diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java index fbf1ab1f3a..067fe3eb5a 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java @@ -40,6 +40,10 @@ public class TestUtil { + public static final long AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST = 3 * 1000; + public static final long AUTO_RECONNECT_WAIT_TIME_WITHIN = 1 * 1000; + public static final long AUTO_RECONNECT_WAIT_TIME_EXD = 5 * 1000; + static java.lang.reflect.Field getField(Class clazz, String fieldName) throws NoSuchFieldException { try { @@ -161,4 +165,24 @@ public void process(WatchedEvent event) { "Fail to expire current session, zk: " + curZookeeper); } + + + /** + * Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly + * This need to be done in a separate thread to simulate ZkClient eventThread. + */ + public static void simulateZkStateReconnected(ZkMetaClient client) throws InterruptedException { + final ZkClient zkClient = client.getZkClient(); + WatchedEvent event = + new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, + null); + zkClient.process(event); + + Thread.sleep(AUTO_RECONNECT_WAIT_TIME_WITHIN); + + event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.SyncConnected, + null); + zkClient.process(event); + } + } \ No newline at end of file diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionPuppy.java b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionPuppy.java index 5f1fdf6315..3f123d3ac8 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionPuppy.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionPuppy.java @@ -82,6 +82,11 @@ protected void cleanup() { _leaderElectionClient.exitLeaderElectionParticipantPool(_leaderGroup); } catch (MetaClientException ignore) { // already leave the pool. OK to throw exception. + } finally { + try { + _leaderElectionClient.close(); + } catch (Exception e) { + } } } } diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java index 71d85fdb9f..75917623c7 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java @@ -5,9 +5,13 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.helix.metaclient.factories.MetaClientConfig; +import org.apache.helix.metaclient.impl.zk.TestUtil; import org.apache.helix.metaclient.impl.zk.ZkMetaClient; import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase; +import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; +import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.testng.Assert; +import org.testng.annotations.AfterTest; import org.testng.annotations.Test; import static org.apache.helix.metaclient.impl.zk.TestUtil.*; @@ -19,13 +23,24 @@ public class TestLeaderElection extends ZkMetaClientTestBase { private static final String PARTICIPANT_NAME2 = "participant_2"; private static final String LEADER_PATH = "/LEADER_ELECTION_GROUP_1"; - public static LeaderElectionClient createLeaderElectionClient(String participantName) { + public static LeaderElectionClient createLeaderElectionClient(String participantName) { MetaClientConfig.StoreType storeType = MetaClientConfig.StoreType.ZOOKEEPER; MetaClientConfig config = new MetaClientConfig.MetaClientConfigBuilder<>().setConnectionAddress(ZK_ADDR).setStoreType(storeType).build(); return new LeaderElectionClient(config, participantName); } + @AfterTest + @Override + public void cleanUp() { + ZkMetaClientConfig config = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR) + .build(); + try (ZkMetaClient client = new ZkMetaClient<>(config)) { + client.connect(); + client.recursiveDelete(LEADER_PATH); + } + } + @Test public void testAcquireLeadership() throws Exception { System.out.println("START TestLeaderElection.testAcquireLeadership"); @@ -47,7 +62,6 @@ public void testAcquireLeadership() throws Exception { Assert.assertEquals(clt1.getLeader(leaderPath), clt2.getLeader(leaderPath)); Assert.assertEquals(clt1.getLeader(leaderPath), PARTICIPANT_NAME1); - // client 1 exit leader election group, and client 2 should be current leader. clt1.exitLeaderElectionParticipantPool(leaderPath); @@ -75,7 +89,7 @@ public void testAcquireLeadership() throws Exception { System.out.println("END TestLeaderElection.testAcquireLeadership"); } - @Test + @Test(dependsOnMethods = "testAcquireLeadership") public void testElectionPoolMembership() throws Exception { System.out.println("START TestLeaderElection.testElectionPoolMembership"); String leaderPath = LEADER_PATH + "/_testElectionPoolMembership"; @@ -111,49 +125,12 @@ public void testElectionPoolMembership() throws Exception { clt2.exitLeaderElectionParticipantPool(leaderPath); Assert.assertNull(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME2)); + clt1.close(); + clt2.close(); System.out.println("END TestLeaderElection.testElectionPoolMembership"); } - @Test - public void testSessionExpire() throws Exception { - System.out.println("START TestLeaderElection.testSessionExpire"); - String leaderPath = LEADER_PATH + "/_testSessionExpire"; - LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1); - participantInfo.setSimpleField("Key1", "value1"); - LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2); - participantInfo2.setSimpleField("Key2", "value2"); - LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1); - LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2); - - clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); - try { - clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); // no op - } catch (ConcurrentModificationException ex) { - // expected - Assert.assertEquals(ex.getClass().getName(), "java.util.ConcurrentModificationException"); - } - clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2); - // a leader should be up - Assert.assertTrue(MetaClientTestUtil.verify(() -> { - return (clt1.getLeader(leaderPath) != null); - }, MetaClientTestUtil.WAIT_DURATION)); - - // session expire and reconnect - expireSession((ZkMetaClient) clt1.getMetaClient()); - - Assert.assertTrue(MetaClientTestUtil.verify(() -> { - return (clt1.getLeader(leaderPath) != null); - }, MetaClientTestUtil.WAIT_DURATION)); - Assert.assertNotNull(clt1.getLeaderEntryStat(leaderPath)); - Assert.assertNotNull(clt1.getLeader(leaderPath)); - // when session recreated, participant info node should maintain - Assert.assertEquals(clt1.getParticipantInfo(leaderPath, PARTICIPANT_NAME1).getSimpleField("Key1"), "value1"); - Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME1).getSimpleField("Key1"), "value1"); - Assert.assertEquals(clt1.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2"); - Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2"); - System.out.println("END TestLeaderElection.testSessionExpire"); - } - @Test (dependsOnMethods = "testAcquireLeadership") + @Test(dependsOnMethods = "testElectionPoolMembership") public void testLeadershipListener() throws Exception { System.out.println("START TestLeaderElection.testLeadershipListener"); String leaderPath = LEADER_PATH + "/testLeadershipListener"; @@ -165,45 +142,45 @@ public void testLeadershipListener() throws Exception { final int count = 10; final int[] numNewLeaderEvent = {0}; final int[] numLeaderGoneEvent = {0}; - CountDownLatch countDownLatchNewLeader = new CountDownLatch(count*2); - CountDownLatch countDownLatchLeaderGone = new CountDownLatch(count*2); - - LeaderElectionListenerInterface listener = new LeaderElectionListenerInterface() { - - @Override - public void onLeadershipChange(String leaderPath, ChangeType type, String curLeader) { - if (type == ChangeType.LEADER_LOST) { - countDownLatchLeaderGone.countDown(); - Assert.assertEquals(curLeader.length(), 0); - numLeaderGoneEvent[0]++; - } else if (type == ChangeType.LEADER_ACQUIRED) { - countDownLatchNewLeader.countDown(); - numNewLeaderEvent[0]++; - Assert.assertTrue(curLeader.length()!=0); - } else { - Assert.fail(); - } - } - }; + CountDownLatch countDownLatchNewLeader = new CountDownLatch(count * 2); + CountDownLatch countDownLatchLeaderGone = new CountDownLatch(count * 2); + + LeaderElectionListenerInterface listener = new LeaderElectionListenerInterface() { + + @Override + public void onLeadershipChange(String leaderPath, ChangeType type, String curLeader) { + if (type == ChangeType.LEADER_LOST) { + countDownLatchLeaderGone.countDown(); + Assert.assertEquals(curLeader.length(), 0); + numLeaderGoneEvent[0]++; + } else if (type == ChangeType.LEADER_ACQUIRED) { + countDownLatchNewLeader.countDown(); + numNewLeaderEvent[0]++; + Assert.assertTrue(curLeader.length() != 0); + } else { + Assert.fail(); + } + } + }; clt3.subscribeLeadershipChanges(leaderPath, listener); // each iteration will be participant_1 is new leader, leader gone, participant_2 is new leader, leader gone - for (int i=0; i { + return (clt1.getLeader(leaderPath) != null); + }, MetaClientTestUtil.WAIT_DURATION)); + Assert.assertTrue(countDownLatchLeaderGone.await(MetaClientTestUtil.WAIT_DURATION, TimeUnit.MILLISECONDS)); - // clt1 gone clt1.exitLeaderElectionParticipantPool(leaderPath); Assert.assertTrue(MetaClientTestUtil.verify(() -> { return (clt1.getLeader(leaderPath) != null); @@ -227,20 +237,63 @@ private void joinPoolTestHelper(String leaderPath, LeaderElectionClient clt1, Le }, MetaClientTestUtil.WAIT_DURATION)); clt2.exitLeaderElectionParticipantPool(leaderPath); - + clt1.close(); + clt2.close(); + clt3.close(); + System.out.println("END TestLeaderElection.testRelinquishLeadership"); } - @Test (dependsOnMethods = "testLeadershipListener") - public void testRelinquishLeadership() throws Exception { - System.out.println("START TestLeaderElection.testRelinquishLeadership"); - String leaderPath = LEADER_PATH + "/testRelinquishLeadership"; + @Test(dependsOnMethods = "testAcquireLeadership") + public void testSessionExpire() throws Exception { + System.out.println("START TestLeaderElection.testSessionExpire"); + String leaderPath = LEADER_PATH + "/_testSessionExpire"; + LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1); + participantInfo.setSimpleField("Key1", "value1"); + LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2); + participantInfo2.setSimpleField("Key2", "value2"); LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1); LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2); - LeaderElectionClient clt3 = createLeaderElectionClient(PARTICIPANT_NAME2); + clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); + try { + clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); // no op + } catch (ConcurrentModificationException ex) { + // expected + Assert.assertEquals(ex.getClass().getName(), "java.util.ConcurrentModificationException"); + } + clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2); + // a leader should be up + Assert.assertTrue(MetaClientTestUtil.verify(() -> { + return (clt1.getLeader(leaderPath) != null); + }, MetaClientTestUtil.WAIT_DURATION)); + + // session expire and reconnect + expireSession((ZkMetaClient) clt1.getMetaClient()); + + Assert.assertTrue(MetaClientTestUtil.verify(() -> { + return (clt1.getLeader(leaderPath) != null); + }, MetaClientTestUtil.WAIT_DURATION)); + Assert.assertNotNull(clt1.getLeaderEntryStat(leaderPath)); + Assert.assertNotNull(clt1.getLeader(leaderPath)); + // when session recreated, participant info node should maintain + Assert.assertEquals(clt1.getParticipantInfo(leaderPath, PARTICIPANT_NAME1).getSimpleField("Key1"), "value1"); + Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME1).getSimpleField("Key1"), "value1"); + Assert.assertEquals(clt1.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2"); + Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2"); + clt1.close(); + clt2.close(); + System.out.println("END TestLeaderElection.testSessionExpire"); + } + + @Test(dependsOnMethods = "testSessionExpire") + public void testClientDisconnectAndReconnectBeforeExpire() throws Exception { + System.out.println("START TestLeaderElection.testClientDisconnectAndReconnectBeforeExpire"); + String leaderPath = LEADER_PATH + "/testClientDisconnectAndReconnectBeforeExpire"; + LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1); + LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2); final int count = 1; - CountDownLatch countDownLatchNewLeader = new CountDownLatch(count); + CountDownLatch countDownLatchNewLeader = new CountDownLatch(count + 1); CountDownLatch countDownLatchLeaderGone = new CountDownLatch(count); LeaderElectionListenerInterface listener = new LeaderElectionListenerInterface() { @@ -250,27 +303,51 @@ public void onLeadershipChange(String leaderPath, ChangeType type, String curLea if (type == ChangeType.LEADER_LOST) { countDownLatchLeaderGone.countDown(); Assert.assertEquals(curLeader.length(), 0); + System.out.println("gone leader"); } else if (type == ChangeType.LEADER_ACQUIRED) { countDownLatchNewLeader.countDown(); - Assert.assertTrue(curLeader.length()!=0); + Assert.assertTrue(curLeader.length() != 0); + System.out.println("new leader"); } else { Assert.fail(); } } }; + clt1.subscribeLeadershipChanges(leaderPath, listener); clt1.joinLeaderElectionParticipantPool(leaderPath); clt2.joinLeaderElectionParticipantPool(leaderPath); - clt3.subscribeLeadershipChanges(leaderPath, listener); - // clt1 gone - clt1.relinquishLeader(leaderPath); - - // participant 1 should have gone, and a leader gone event is sent + // check leader node version before we simulate disconnect. Assert.assertTrue(MetaClientTestUtil.verify(() -> { return (clt1.getLeader(leaderPath) != null); }, MetaClientTestUtil.WAIT_DURATION)); + int leaderNodeVersion = ((ZkMetaClient) clt1.getMetaClient()).exists(leaderPath + "/LEADER").getVersion(); + System.out.println("version " + leaderNodeVersion); + + // clt1 disconnected and reconnected before session expire + simulateZkStateReconnected((ZkMetaClient) clt1.getMetaClient()); + + Assert.assertTrue(countDownLatchNewLeader.await(MetaClientTestUtil.WAIT_DURATION, TimeUnit.MILLISECONDS)); Assert.assertTrue(countDownLatchLeaderGone.await(MetaClientTestUtil.WAIT_DURATION, TimeUnit.MILLISECONDS)); + leaderNodeVersion = ((ZkMetaClient) clt2.getMetaClient()).exists(leaderPath + "/LEADER").getVersion(); + System.out.println("version " + leaderNodeVersion); + + clt1.exitLeaderElectionParticipantPool(leaderPath); + clt2.exitLeaderElectionParticipantPool(leaderPath); + clt1.close(); + clt2.close(); + System.out.println("END TestLeaderElection.testClientDisconnectAndReconnectBeforeExpire"); + } + + private void joinPoolTestHelper(String leaderPath, LeaderElectionClient clt1, LeaderElectionClient clt2) + throws Exception { + clt1.joinLeaderElectionParticipantPool(leaderPath); + clt2.joinLeaderElectionParticipantPool(leaderPath); + + Thread.sleep(2000); + + // clt1 gone clt1.exitLeaderElectionParticipantPool(leaderPath); Assert.assertTrue(MetaClientTestUtil.verify(() -> { return (clt1.getLeader(leaderPath) != null); @@ -280,7 +357,5 @@ public void onLeadershipChange(String leaderPath, ChangeType type, String curLea }, MetaClientTestUtil.WAIT_DURATION)); clt2.exitLeaderElectionParticipantPool(leaderPath); - System.out.println("END TestLeaderElection.testRelinquishLeadership"); } - } diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestMultiClientLeaderElection.java b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestMultiClientLeaderElection.java index 0fe4245012..4fb508790f 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestMultiClientLeaderElection.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestMultiClientLeaderElection.java @@ -51,11 +51,14 @@ private void setUp() { _zkMetaClient.create("/Parent/a", ""); } @AfterTest + @Override public void cleanUp() { try { _zkMetaClient.recursiveDelete(_leaderElectionGroup); } catch (MetaClientException ex) { _zkMetaClient.recursiveDelete(_leaderElectionGroup); + } finally { + _zkMetaClient.close(); } }