Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
xyuanlu committed Oct 13, 2023
1 parent cca528a commit e6eb423
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,8 @@ class RealmAwareZkClientConfig {
protected String _monitorInstanceName = null;
protected boolean _monitorRootPathOnly = true;

protected boolean _usePersistWatcher = false;

public RealmAwareZkClientConfig setZkSerializer(PathBasedZkSerializer zkSerializer) {
this._zkSerializer = zkSerializer;
return this;
Expand Down Expand Up @@ -632,6 +634,11 @@ public RealmAwareZkClientConfig setConnectInitTimeout(long _connectInitTimeout)
return this;
}

public RealmAwareZkClientConfig setUsePersistWatcher(boolean usePersistWatcher) {
this._usePersistWatcher = usePersistWatcher;
return this;
}

public PathBasedZkSerializer getZkSerializer() {
if (_zkSerializer == null) {
_zkSerializer = new BasicZkSerializer(new SerializableSerializer());
Expand Down Expand Up @@ -663,6 +670,10 @@ public long getConnectInitTimeout() {
return _connectInitTimeout;
}

public boolean isUsePersistWatcher() {
return _usePersistWatcher;
}

/**
* Create HelixZkClient.ZkClientConfig based on RealmAwareZkClientConfig.
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,11 @@ public DedicatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connect
new ZkConnection(zkRealmAddress, connectionConfig.getSessionTimeout());

// Create a ZkClient
_rawZkClient = new ZkClient(zkConnection, (int) clientConfig.getConnectInitTimeout(),
clientConfig.getOperationRetryTimeout(), clientConfig.getZkSerializer(),
clientConfig.getMonitorType(), clientConfig.getMonitorKey(),
clientConfig.getMonitorInstanceName(), clientConfig.isMonitorRootPathOnly());
_rawZkClient =
new ZkClient(zkConnection, (int) clientConfig.getConnectInitTimeout(), clientConfig.getOperationRetryTimeout(),
clientConfig.getZkSerializer(), clientConfig.getMonitorType(), clientConfig.getMonitorKey(),
clientConfig.getMonitorInstanceName(), clientConfig.isMonitorRootPathOnly(), true,
clientConfig.isUsePersistWatcher());
}

@Override
Expand Down Expand Up @@ -159,12 +160,12 @@ public void unsubscribeStateChanges(IZkStateListener listener) {

@Override
public void subscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) {

_rawZkClient.subscribePersistRecursiveListener(path, recursivePersistListener);
}

@Override
public void unsubscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) {

_rawZkClient.unsubscribePersistRecursiveListener(path, recursivePersistListener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,12 @@ public void unsubscribeStateChanges(

@Override
public void subscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) {

getZkClient(path).subscribePersistRecursiveListener(path, recursivePersistListener);
}

@Override
public void unsubscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) {

getZkClient(path).unsubscribePersistRecursiveListener(path, recursivePersistListener);
}

@Override
Expand Down Expand Up @@ -711,9 +711,9 @@ private String updateRoutingDataOnCacheMiss(String path) throws InvalidRoutingDa
private ZkClient createZkClient(String zkAddress) {
LOG.debug("Creating ZkClient for realm: {}.", zkAddress);
return new ZkClient(new ZkConnection(zkAddress), (int) _clientConfig.getConnectInitTimeout(),
_clientConfig.getOperationRetryTimeout(), _pathBasedZkSerializer,
_clientConfig.getMonitorType(), _clientConfig.getMonitorKey(),
_clientConfig.getMonitorInstanceName(), _clientConfig.isMonitorRootPathOnly());
_clientConfig.getOperationRetryTimeout(), _pathBasedZkSerializer, _clientConfig.getMonitorType(),
_clientConfig.getMonitorKey(), _clientConfig.getMonitorInstanceName(), _clientConfig.isMonitorRootPathOnly(),
true, _clientConfig.isUsePersistWatcher());
}

private void checkClosedState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
Expand All @@ -39,7 +41,9 @@
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.routing.RoutingDataManager;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.RecursivePersistListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.testng.Assert;
Expand Down Expand Up @@ -648,6 +652,68 @@ public void testClose() {
}
}

@Test(dependsOnMethods = "testClose")
public void testFederatedZkClientWithPersistListener() throws InvalidRoutingDataException, InterruptedException {
RealmAwareZkClient realmAwareZkClient =
new FederatedZkClient(new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
new RealmAwareZkClient.RealmAwareZkClientConfig().setUsePersistWatcher(true));
int count = 100;
final AtomicInteger[] event_count = {new AtomicInteger(0)};
final AtomicInteger[] event_count2 = {new AtomicInteger(0)};
// for each iteration, we will edit a node, create a child, create a grand child, and
// delete child. Expect 4 event per iteration. -> total event should be count*4
CountDownLatch countDownLatch1 = new CountDownLatch(count * 4);
CountDownLatch countDownLatch2 = new CountDownLatch(count);
realmAwareZkClient.createPersistent(TEST_VALID_PATH, true);
String path = TEST_VALID_PATH + "/testFederatedZkClientWithPersistListener";
RecursivePersistListener rcListener = new RecursivePersistListener() {
@Override
public void handleZNodeChange(String dataPath, Watcher.Event.EventType eventType) throws Exception {
countDownLatch1.countDown();
event_count[0].incrementAndGet();
}
};
realmAwareZkClient.create(path, "datat", CreateMode.PERSISTENT);
realmAwareZkClient.subscribePersistRecursiveListener(path, rcListener);
for (int i = 0; i < count; ++i) {
realmAwareZkClient.writeData(path, "data7" + i, -1);
realmAwareZkClient.create(path + "/c1_" + i, "datat", CreateMode.PERSISTENT);
realmAwareZkClient.create(path + "/c1_" + i + "/c2", "datat", CreateMode.PERSISTENT);
realmAwareZkClient.delete(path + "/c1_" + i + "/c2");
}
Assert.assertTrue(countDownLatch1.await(50000000, TimeUnit.MILLISECONDS));

// subscribe a persist child watch, it should throw exception
IZkChildListener childListener2 = new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
countDownLatch2.countDown();
event_count2[0].incrementAndGet();
}
};
try {
realmAwareZkClient.subscribeChildChanges(path, childListener2, false);
} catch (Exception ex) {
Assert.assertEquals(ex.getClass().getName(), "java.lang.UnsupportedOperationException");
}

// unsubscribe recursive persist watcher, and subscribe persist watcher should success.
realmAwareZkClient.unsubscribePersistRecursiveListener(path, rcListener);
realmAwareZkClient.subscribeChildChanges(path, childListener2, false);
// we should only get 100 event since only 100 direct child change.
for (int i = 0; i < count; ++i) {
realmAwareZkClient.writeData(path, "data7" + i, -1);
realmAwareZkClient.create(path + "/c2_" + i, "datat", CreateMode.PERSISTENT);
realmAwareZkClient.create(path + "/c2_" + i + "/c3", "datat", CreateMode.PERSISTENT);
realmAwareZkClient.delete(path + "/c2_" + i + "/c3");
}
Assert.assertTrue(countDownLatch2.await(50000000, TimeUnit.MILLISECONDS));

realmAwareZkClient.deleteRecursively(TEST_VALID_PATH);
realmAwareZkClient.close();
}


@Override
public void testMultiSetup() throws InvalidRoutingDataException {
super.testMultiSetup();
Expand Down

0 comments on commit e6eb423

Please sign in to comment.