Skip to content

Commit

Permalink
Leader election client - Proactively send leader gone event when disconnect from ZK (#2585)
Browse files Browse the repository at this point in the history
  • Loading branch information
xyuanlu authored Aug 16, 2023
1 parent 9b4f33c commit 29337ef
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.api.Op;
import org.apache.helix.metaclient.api.OpResult;
import org.apache.helix.metaclient.exception.MetaClientBadVersionException;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
Expand Down Expand Up @@ -83,7 +84,7 @@ public class LeaderElectionClient implements AutoCloseable {
/**
* Construct a LeaderElectionClient using a user passed in leaderElectionConfig. It creates a MetaClient
* instance underneath.
* When MetaClient is auto closed be cause of being disconnected and auto retry connection timed out, A new
* When MetaClient is auto closed because of being disconnected and auto retry connection timed out, A new
* MetaClient instance will be created and keeps retry connection.
*
* @param metaClientConfig The config used to create an metaclient.
Expand Down Expand Up @@ -257,20 +258,24 @@ private void relinquishLeaderHelper(String leaderPath, Boolean exitLeaderElectio
}
// check if current participant is the leader
// read data and stats, check, and multi check + delete
ImmutablePair<LeaderInfo, MetaClientInterface.Stat> tup = _metaClient.getDataAndStat(key);
if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
int expectedVersion = tup.right.getVersion();
List<Op> ops = Arrays.asList(Op.check(key, expectedVersion), Op.delete(key, expectedVersion));
//Execute transactional support on operations
List<OpResult> opResults = _metaClient.transactionOP(ops);
if (opResults.get(0).getType() == ERRORRESULT) {
if (isLeader(leaderPath)) {
// Participant re-elected as leader.
throw new ConcurrentModificationException("Concurrent operation, please retry");
} else {
LOG.info("Someone else is already leader");
try {
ImmutablePair<LeaderInfo, MetaClientInterface.Stat> tup = _metaClient.getDataAndStat(key);
if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
int expectedVersion = tup.right.getVersion();
List<Op> ops = Arrays.asList(Op.check(key, expectedVersion), Op.delete(key, expectedVersion));
//Execute transactional support on operations
List<OpResult> opResults = _metaClient.transactionOP(ops);
if (opResults.get(0).getType() == ERRORRESULT) {
if (isLeader(leaderPath)) {
// Participant re-elected as leader.
throw new ConcurrentModificationException("Concurrent operation, please retry");
} else {
LOG.info("Someone else is already leader");
}
}
}
} catch (MetaClientNoNodeException ex) {
LOG.info("No Leader for participant pool {} when exit the pool", leaderPath);
}
}

Expand Down Expand Up @@ -334,8 +339,10 @@ public List<String> getParticipants(String leaderPath) {
* @return A boolean value indicating if registration is success.
*/
public boolean subscribeLeadershipChanges(String leaderPath, LeaderElectionListenerInterface listener) {
_metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, new LeaderElectionListenerInterfaceAdapter(listener),
false);
// The same adapter instance is handed to both subscribeDataChange (for the leader entry under
// leaderPath) and subscribeStateChanges (for connection state transitions) below.
LeaderElectionListenerInterfaceAdapter adapter = new LeaderElectionListenerInterfaceAdapter(leaderPath, listener);
_metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY,
adapter, false /*skipWatchingNonExistNode*/); // we need to subscribe event when path is not there
_metaClient.subscribeStateChanges(adapter);
// NOTE(review): false is returned unconditionally, while the @return tag of this method describes
// a success indicator. Confirm whether callers rely on this value before changing it.
return false;
}

Expand All @@ -344,7 +351,10 @@ public boolean subscribeLeadershipChanges(String leaderPath, LeaderElectionListe
* @param listener An implementation of LeaderElectionListenerInterface
*/
public void unsubscribeLeadershipChanges(String leaderPath, LeaderElectionListenerInterface listener) {
_metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, new LeaderElectionListenerInterfaceAdapter(listener));
// A fresh adapter is built around the listener rather than reusing the instance created at
// subscribe time; presumably it is matched to the registered one through the adapter's
// equals/hashCode (hashCode delegates to the wrapped listener) - TODO confirm against equals().
LeaderElectionListenerInterfaceAdapter adapter = new LeaderElectionListenerInterfaceAdapter(leaderPath, listener);
_metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, adapter
);
// Also drop the connection-state subscription made for this listener.
_metaClient.unsubscribeConnectStateChanges(adapter);
}

@Override
Expand Down Expand Up @@ -395,6 +405,9 @@ public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState
_metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT + _participant, _participantInfos.get(leaderPath),
MetaClientInterface.EntryMode.EPHEMERAL);
}
} else if (prevState == MetaClientInterface.ConnectState.DISCONNECTED
&& currentState == MetaClientInterface.ConnectState.CONNECTED) {
touchLeaderNode();
}
}

Expand All @@ -404,6 +417,25 @@ public void handleConnectionEstablishmentError(Throwable error) throws Exception
}
}

/**
 * Re-writes ("touches") the leader entry of every path in {@code _leaderGroups} whose recorded
 * leader is this participant. Invoked when the connection goes from DISCONNECTED back to CONNECTED.
 *
 * <p>The entry is written back with its current data and the version that was just read, so the
 * write fails with a bad-version error if another participant changed the entry in between.
 * Presumably the purpose is to make data-change subscribers observe an update after the
 * "leader lost" notification sent on disconnect - confirm against the listener adapter.
 *
 * <p>The touch is best effort per leader group: the read and the write are both guarded, so a
 * missing node or a failed read for one group is logged and the remaining groups are still
 * processed instead of the exception escaping the connect-state callback.
 */
private void touchLeaderNode() {
  for (String leaderPath : _leaderGroups) {
    try {
      // The read is inside the try on purpose: the entry may already be gone by the time we reconnect.
      ImmutablePair<LeaderInfo, MetaClientInterface.Stat> tup = _metaClient.getDataAndStat(leaderPath);
      if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
        int expectedVersion = tup.right.getVersion();
        // Conditional write: only succeeds if nobody modified the entry since the read above.
        _metaClient.set(leaderPath, tup.left, expectedVersion);
      }
    } catch (MetaClientNoNodeException ex) {
      LOG.info("leaderPath {} gone when retouch leader node.", leaderPath);
    } catch (MetaClientBadVersionException e) {
      LOG.info("New leader for leaderPath {} when retouch leader node.", leaderPath);
    } catch (MetaClientException ex) {
      LOG.warn("Failed to touch {} when reconnected.", leaderPath, ex);
    }
  }
}

/**
 * Exposes the MetaClient instance this leader election client operates on.
 *
 * @return the {@link MetaClientInterface} held in {@code _metaClient}
 */
public MetaClientInterface getMetaClient() {
  return this._metaClient;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
package org.apache.helix.metaclient.recipes.leaderelection;

import org.apache.helix.metaclient.api.ConnectStateChangeListener;
import org.apache.helix.metaclient.api.DataChangeListener;
import org.apache.helix.metaclient.api.MetaClientInterface;

import static org.apache.helix.metaclient.recipes.leaderelection.LeaderElectionListenerInterface.ChangeType.*;


public class LeaderElectionListenerInterfaceAdapter implements DataChangeListener {
public class LeaderElectionListenerInterfaceAdapter implements DataChangeListener, ConnectStateChangeListener {
private String _leaderPath;
private final LeaderElectionListenerInterface _leaderElectionListener;

public LeaderElectionListenerInterfaceAdapter(LeaderElectionListenerInterface leaderElectionListener) {
public LeaderElectionListenerInterfaceAdapter(String leaderPath, LeaderElectionListenerInterface leaderElectionListener) {
_leaderPath = leaderPath;
_leaderElectionListener = leaderElectionListener;
}

@Override
public void handleDataChange(String key, Object data, ChangeType changeType) throws Exception {
// Translates a data change on the watched entry into a leadership notification for the wrapped listener.
switch (changeType) {
// Creation and update are both reported as LEADER_ACQUIRED, with the leader name taken from the payload.
case ENTRY_CREATED:
case ENTRY_UPDATE:
// data is cast without a type check - assumes the entry payload is a LeaderInfo.
String newLeader = ((LeaderInfo) data).getLeaderName();
_leaderElectionListener.onLeadershipChange(key, LEADER_ACQUIRED, newLeader);
_leaderElectionListener.onLeadershipChange(_leaderPath, LEADER_ACQUIRED, newLeader);
break;
// Deletion of the entry is reported as LEADER_LOST with an empty leader name.
case ENTRY_DELETED:
_leaderElectionListener.onLeadershipChange(key, LEADER_LOST, "");
_leaderElectionListener.onLeadershipChange(_leaderPath, LEADER_LOST, "");
}
}

Expand All @@ -40,4 +45,20 @@ public boolean equals(Object o) {
public int hashCode() {
return _leaderElectionListener.hashCode();
}

/**
 * Notifies the wrapped listener that leadership is lost as soon as the connection state becomes
 * DISCONNECTED. Every other state is ignored, and the previous state is not consulted.
 *
 * @param prevState    the state before the transition (unused)
 * @param currentState the state after the transition
 */
@Override
public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState,
    MetaClientInterface.ConnectState currentState) throws Exception {
  final boolean disconnected = MetaClientInterface.ConnectState.DISCONNECTED == currentState;
  if (!disconnected) {
    return;
  }
  // Report leader lost right away, even though the ephemeral node only disappears once the
  // session expires. The leader election client touches the node again if it reconnects
  // before expiry.
  _leaderElectionListener.onLeadershipChange(_leaderPath, LEADER_LOST, "");
}

@Override
public void handleConnectionEstablishmentError(Throwable error) throws Exception {
// Empty body: connection establishment errors are not forwarded to the wrapped
// leader election listener and no other action is taken here.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,32 +48,14 @@

import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS;
import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS;
import static org.apache.helix.metaclient.impl.zk.TestUtil.*;


public class TestConnectStateChangeListenerAndRetry {
protected static final String ZK_ADDR = "localhost:2181";
protected static final String ZK_ADDR = "localhost:2184";
protected static ZkServer _zkServer;

private static final long AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST = 3 * 1000;
private static final long AUTO_RECONNECT_WAIT_TIME_WITHIN = 1 * 1000;
private static final long AUTO_RECONNECT_WAIT_TIME_EXD = 5 * 1000;

/**
* Simulates a ZooKeeper disconnect followed by a reconnect by calling
* {@link ZkClient#process(WatchedEvent)} directly with hand-built state events.
* This needs to be done in a separate thread to simulate the ZkClient event thread.
*
* @param zkClient the client that receives the simulated events
* @throws InterruptedException if the thread is interrupted while sleeping between the two events
*/
private static void simulateZkStateReconnected(ZkClient zkClient) throws InterruptedException {
// Step 1: deliver a connection-level (EventType.None) Disconnected event.
WatchedEvent event =
new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected,
null);
zkClient.process(event);

// Stay "disconnected" for AUTO_RECONNECT_WAIT_TIME_WITHIN (1s), which is shorter than
// AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST (3s).
Thread.sleep(AUTO_RECONNECT_WAIT_TIME_WITHIN);

// Step 2: deliver SyncConnected to simulate the connection coming back.
event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.SyncConnected,
null);
zkClient.process(event);
}


@BeforeTest
public void prepare() {
Expand All @@ -82,11 +64,6 @@ public void prepare() {
_zkServer = ZkMetaClientTestBase.startZkServer(ZK_ADDR);
}

/**
 * Prints an end-of-test marker with the current wall-clock time.
 */
@AfterTest
public void cleanUp() {
  final Date finishedAt = new Date(System.currentTimeMillis());
  final String banner = "END TestConnectStateChangeListenerAndRetry at " + finishedAt;
  System.out.println(banner);
}

@Test
public void testConnectState() {
System.out.println("STARTING TestConnectStateChangeListenerAndRetry.testConnectState at " + new Date(System.currentTimeMillis()));
Expand Down Expand Up @@ -115,7 +92,7 @@ public void testReConnectSucceed() throws InterruptedException {
@Override
public void run() {
try {
simulateZkStateReconnected(zkMetaClient.getZkClient());
simulateZkStateReconnected(zkMetaClient);
} catch (InterruptedException e) {
Assert.fail("Exception in simulateZkStateReconnected", e);
}
Expand Down Expand Up @@ -170,6 +147,7 @@ public void handleConnectionEstablishmentError(Throwable error) throws Exception
} catch (Exception ex) {
Assert.assertTrue(ex instanceof IllegalStateException);
}
zkMetaClient.unsubscribeConnectStateChanges(listener);
}
System.out.println("END TestConnectStateChangeListenerAndRetry.testConnectStateChangeListener at " + new Date(System.currentTimeMillis()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ private void setUp() {
}

@AfterTest
private void tearDown() {
@Override
public void cleanUp() {
this._zkMetaClient.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@

public class TestUtil {

public static final long AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST = 3 * 1000;
public static final long AUTO_RECONNECT_WAIT_TIME_WITHIN = 1 * 1000;
public static final long AUTO_RECONNECT_WAIT_TIME_EXD = 5 * 1000;

static java.lang.reflect.Field getField(Class clazz, String fieldName)
throws NoSuchFieldException {
try {
Expand Down Expand Up @@ -161,4 +165,24 @@ public void process(WatchedEvent event) {
"Fail to expire current session, zk: " + curZookeeper);
}



/**
 * Fakes a ZooKeeper disconnect followed by a reconnect on the ZkClient behind the given
 * meta client, by feeding hand-built state events straight into
 * {@link ZkClient#process(WatchedEvent)}.
 * Callers need to run this on a separate thread to simulate the ZkClient event thread.
 *
 * @param client the meta client whose underlying ZkClient receives the simulated events
 * @throws InterruptedException if interrupted while pausing between the two events
 */
public static void simulateZkStateReconnected(ZkMetaClient client) throws InterruptedException {
  final ZkClient target = client.getZkClient();

  // Connection-level events carry EventType.None and no path.
  final WatchedEvent disconnected = new WatchedEvent(
      Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
  final WatchedEvent reconnected = new WatchedEvent(
      Watcher.Event.EventType.None, Watcher.Event.KeeperState.SyncConnected, null);

  target.process(disconnected);
  // Remain "disconnected" for AUTO_RECONNECT_WAIT_TIME_WITHIN before reporting the reconnect.
  Thread.sleep(AUTO_RECONNECT_WAIT_TIME_WITHIN);
  target.process(reconnected);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ protected void cleanup() {
_leaderElectionClient.exitLeaderElectionParticipantPool(_leaderGroup);
} catch (MetaClientException ignore) {
// already leave the pool. OK to throw exception.
} finally {
try {
_leaderElectionClient.close();
} catch (Exception e) {
}
}
}
}
Loading

0 comments on commit 29337ef

Please sign in to comment.