diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java index b73fe50dbf..1d36b3e3ea 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java @@ -47,7 +47,9 @@ public class ZkMetaClientCache extends ZkMetaClient implements MetaClientC private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClientCache.class); private ZkClient _cacheClient; private ExecutorService executor; - private final CountDownLatch latch = new CountDownLatch(1); + + // TODO: Look into using conditional variable instead of latch. + private final CountDownLatch _initializedCache = new CountDownLatch(1); /** * Constructor for ZkMetaClientCache. @@ -79,9 +81,13 @@ public ZkMetaClientCache(ZkMetaClientConfig config, MetaClientCacheConfig cacheC @Override public T get(final String key) { if (_cacheData) { - return getDataCacheMap().get(key); + T data = getDataCacheMap().get(key); + if (data == null) { + LOG.debug("Data not found in cache for key: {}. This could be because the cache is still being populated.", key); + } + return data; } - return _cacheClient.readData(key, true); + return super.get(key); } @Override @@ -104,6 +110,7 @@ public int countDirectChildren(final String key) { } private void populateAllCache() { + // TODO: Concurrently populate children and data cache. if (_cacheData) { try { List children = _cacheClient.getChildren(_rootEntry); @@ -118,28 +125,40 @@ private void populateAllCache() { } } - private void handleCacheUpdate(String path, ChildChangeListener.ChangeType changeType) { - waitForPopulateAllCache(); - switch (changeType) { - case ENTRY_CREATED: - // Not implemented yet. - modifyDataInCache(path, false); - break; - case ENTRY_DELETED: - // Not implemented yet. - modifyDataInCache(path, true); - break; - case ENTRY_DATA_CHANGE: - modifyDataInCache(path, false); - break; - default: - LOG.error("Unknown change type: " + changeType); + private class CacheUpdateRunnable implements Runnable { + private final String path; + private final ChildChangeListener.ChangeType changeType; + + public CacheUpdateRunnable(String path, ChildChangeListener.ChangeType changeType) { + this.path = path; + this.changeType = changeType; + } + + @Override + public void run() { + waitForPopulateAllCache(); + // TODO: HANDLE DEDUP EVENT CHANGES + switch (changeType) { + case ENTRY_CREATED: + // Not implemented yet. + modifyDataInCache(path, false); + break; + case ENTRY_DELETED: + // Not implemented yet. + modifyDataInCache(path, true); + break; + case ENTRY_DATA_CHANGE: + modifyDataInCache(path, false); + break; + default: + LOG.error("Unknown change type: " + changeType); + } } } private void waitForPopulateAllCache() { try { - latch.await(); + _initializedCache.await(); } catch (InterruptedException e) { throw new MetaClientException("Interrupted while waiting for cache to populate.", e); } @@ -170,13 +189,14 @@ public TrieNode getChildrenCacheTree() { @Override public void connect() { super.connect(); - _eventListener = this::handleCacheUpdate; + _eventListener = (path, changeType) -> { + Runnable cacheUpdateRunnable = new CacheUpdateRunnable(path, changeType); + executor.execute(cacheUpdateRunnable); + }; executor = Executors.newSingleThreadExecutor(); - executor.execute(() -> { - _cacheClient.subscribePersistRecursiveListener(_rootEntry, new ChildListenerAdapter(_eventListener)); - }); + _cacheClient.subscribePersistRecursiveListener(_rootEntry, new ChildListenerAdapter(_eventListener)); populateAllCache(); // Notify the latch that cache is populated. - latch.countDown(); + _initializedCache.countDown(); } }