diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java index 7abbd23599..5961b96415 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java @@ -577,6 +577,8 @@ class RealmAwareZkClientConfig { protected String _monitorInstanceName = null; protected boolean _monitorRootPathOnly = true; + protected boolean _usePersistWatcher = false; + public RealmAwareZkClientConfig setZkSerializer(PathBasedZkSerializer zkSerializer) { this._zkSerializer = zkSerializer; return this; @@ -632,6 +634,11 @@ public RealmAwareZkClientConfig setConnectInitTimeout(long _connectInitTimeout) return this; } + public RealmAwareZkClientConfig setUsePersistWatcher(boolean usePersistWatcher) { + this._usePersistWatcher = usePersistWatcher; + return this; + } + public PathBasedZkSerializer getZkSerializer() { if (_zkSerializer == null) { _zkSerializer = new BasicZkSerializer(new SerializableSerializer()); @@ -663,6 +670,10 @@ public long getConnectInitTimeout() { return _connectInitTimeout; } + public boolean isUsePersistWatcher() { + return _usePersistWatcher; + } + /** * Create HelixZkClient.ZkClientConfig based on RealmAwareZkClientConfig. * @return diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java index 143300d3b1..ffa07ebd83 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java @@ -105,10 +105,11 @@ public DedicatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connect new ZkConnection(zkRealmAddress, connectionConfig.getSessionTimeout()); // Create a ZkClient - _rawZkClient = new ZkClient(zkConnection, (int) clientConfig.getConnectInitTimeout(), - clientConfig.getOperationRetryTimeout(), clientConfig.getZkSerializer(), - clientConfig.getMonitorType(), clientConfig.getMonitorKey(), - clientConfig.getMonitorInstanceName(), clientConfig.isMonitorRootPathOnly()); + _rawZkClient = + new ZkClient(zkConnection, (int) clientConfig.getConnectInitTimeout(), clientConfig.getOperationRetryTimeout(), + clientConfig.getZkSerializer(), clientConfig.getMonitorType(), clientConfig.getMonitorKey(), + clientConfig.getMonitorInstanceName(), clientConfig.isMonitorRootPathOnly(), true, + clientConfig.isUsePersistWatcher()); } @Override @@ -159,12 +160,12 @@ public void unsubscribeStateChanges(IZkStateListener listener) { @Override public void subscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) { - + _rawZkClient.subscribePersistRecursiveListener(path, recursivePersistListener); } @Override public void unsubscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) { - + _rawZkClient.unsubscribePersistRecursiveListener(path, recursivePersistListener); } @Override diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java index d0ddae18f4..ca4551f804 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java @@ -161,12 +161,12 @@ public void unsubscribeStateChanges( @Override public void subscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) { - + getZkClient(path).subscribePersistRecursiveListener(path, recursivePersistListener); } @Override public void unsubscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) { - + getZkClient(path).unsubscribePersistRecursiveListener(path, recursivePersistListener); } @Override @@ -711,9 +711,9 @@ private String updateRoutingDataOnCacheMiss(String path) throws InvalidRoutingDa private ZkClient createZkClient(String zkAddress) { LOG.debug("Creating ZkClient for realm: {}.", zkAddress); return new ZkClient(new ZkConnection(zkAddress), (int) _clientConfig.getConnectInitTimeout(), - _clientConfig.getOperationRetryTimeout(), _pathBasedZkSerializer, - _clientConfig.getMonitorType(), _clientConfig.getMonitorKey(), - _clientConfig.getMonitorInstanceName(), _clientConfig.isMonitorRootPathOnly()); + _clientConfig.getOperationRetryTimeout(), _pathBasedZkSerializer, _clientConfig.getMonitorType(), + _clientConfig.getMonitorKey(), _clientConfig.getMonitorInstanceName(), _clientConfig.isMonitorRootPathOnly(), + true, _clientConfig.isUsePersistWatcher()); } private void checkClosedState() { diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java index 30335efb13..65bc2a7f0e 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java @@ -26,8 +26,10 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants; import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData; import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; @@ -39,7 +41,9 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; import org.apache.helix.zookeeper.routing.RoutingDataManager; +import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkStateListener; +import org.apache.helix.zookeeper.zkclient.RecursivePersistListener; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.Watcher; import org.testng.Assert; @@ -648,6 +652,68 @@ public void testClose() { } } + @Test(dependsOnMethods = "testClose") + public void testFederatedZkClientWithPersistListener() throws InvalidRoutingDataException, InterruptedException { + RealmAwareZkClient realmAwareZkClient = + new FederatedZkClient(new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), + new RealmAwareZkClient.RealmAwareZkClientConfig().setUsePersistWatcher(true)); + int count = 100; + final AtomicInteger[] event_count = {new AtomicInteger(0)}; + final AtomicInteger[] event_count2 = {new AtomicInteger(0)}; + // for each iteration, we will edit a node, create a child, create a grand child, and + // delete child. Expect 4 event per iteration. -> total event should be count*4 + CountDownLatch countDownLatch1 = new CountDownLatch(count * 4); + CountDownLatch countDownLatch2 = new CountDownLatch(count); + realmAwareZkClient.createPersistent(TEST_VALID_PATH, true); + String path = TEST_VALID_PATH + "/testFederatedZkClientWithPersistListener"; + RecursivePersistListener rcListener = new RecursivePersistListener() { + @Override + public void handleZNodeChange(String dataPath, Watcher.Event.EventType eventType) throws Exception { + countDownLatch1.countDown(); + event_count[0].incrementAndGet(); + } + }; + realmAwareZkClient.create(path, "datat", CreateMode.PERSISTENT); + realmAwareZkClient.subscribePersistRecursiveListener(path, rcListener); + for (int i = 0; i < count; ++i) { + realmAwareZkClient.writeData(path, "data7" + i, -1); + realmAwareZkClient.create(path + "/c1_" + i, "datat", CreateMode.PERSISTENT); + realmAwareZkClient.create(path + "/c1_" + i + "/c2", "datat", CreateMode.PERSISTENT); + realmAwareZkClient.delete(path + "/c1_" + i + "/c2"); + } + Assert.assertTrue(countDownLatch1.await(50000000, TimeUnit.MILLISECONDS)); + + // subscribe a persist child watch, it should throw exception + IZkChildListener childListener2 = new IZkChildListener() { + @Override + public void handleChildChange(String parentPath, List currentChilds) throws Exception { + countDownLatch2.countDown(); + event_count2[0].incrementAndGet(); + } + }; + try { + realmAwareZkClient.subscribeChildChanges(path, childListener2, false); + } catch (Exception ex) { + Assert.assertEquals(ex.getClass().getName(), "java.lang.UnsupportedOperationException"); + } + + // unsubscribe recursive persist watcher, and subscribe persist watcher should success. + realmAwareZkClient.unsubscribePersistRecursiveListener(path, rcListener); + realmAwareZkClient.subscribeChildChanges(path, childListener2, false); + // we should only get 100 event since only 100 direct child change. + for (int i = 0; i < count; ++i) { + realmAwareZkClient.writeData(path, "data7" + i, -1); + realmAwareZkClient.create(path + "/c2_" + i, "datat", CreateMode.PERSISTENT); + realmAwareZkClient.create(path + "/c2_" + i + "/c3", "datat", CreateMode.PERSISTENT); + realmAwareZkClient.delete(path + "/c2_" + i + "/c3"); + } + Assert.assertTrue(countDownLatch2.await(50000000, TimeUnit.MILLISECONDS)); + + realmAwareZkClient.deleteRecursively(TEST_VALID_PATH); + realmAwareZkClient.close(); + } + + @Override public void testMultiSetup() throws InvalidRoutingDataException { super.testMultiSetup();