Skip to content

Commit

Permalink
Addressed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mapeng committed Sep 26, 2023
1 parent 276f3b1 commit 7355b82
Showing 1 changed file with 45 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientC
private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClientCache.class);
private ZkClient _cacheClient;
private ExecutorService executor;
private final CountDownLatch latch = new CountDownLatch(1);

// TODO: Look into using conditional variable instead of latch.
private final CountDownLatch _initializedCache = new CountDownLatch(1);

/**
* Constructor for ZkMetaClientCache.
Expand Down Expand Up @@ -79,9 +81,13 @@ public ZkMetaClientCache(ZkMetaClientConfig config, MetaClientCacheConfig cacheC
@Override
public T get(final String key) {
if (_cacheData) {
return getDataCacheMap().get(key);
T data = getDataCacheMap().get(key);
if (data == null) {
LOG.debug("Data not found in cache for key: {}. This could be because the cache is still being populated.", key);
}
return data;
}
return _cacheClient.readData(key, true);
return super.get(key);
}

@Override
Expand All @@ -104,6 +110,7 @@ public int countDirectChildren(final String key) {
}

private void populateAllCache() {
// TODO: Concurrently populate children and data cache.
if (_cacheData) {
try {
List<String> children = _cacheClient.getChildren(_rootEntry);
Expand All @@ -118,28 +125,40 @@ private void populateAllCache() {
}
}

private void handleCacheUpdate(String path, ChildChangeListener.ChangeType changeType) {
waitForPopulateAllCache();
switch (changeType) {
case ENTRY_CREATED:
// Not implemented yet.
modifyDataInCache(path, false);
break;
case ENTRY_DELETED:
// Not implemented yet.
modifyDataInCache(path, true);
break;
case ENTRY_DATA_CHANGE:
modifyDataInCache(path, false);
break;
default:
LOG.error("Unknown change type: " + changeType);
private class CacheUpdateRunnable implements Runnable {
private final String path;
private final ChildChangeListener.ChangeType changeType;

public CacheUpdateRunnable(String path, ChildChangeListener.ChangeType changeType) {
this.path = path;
this.changeType = changeType;
}

@Override
public void run() {
waitForPopulateAllCache();
// TODO: HANDLE DEDUP EVENT CHANGES
switch (changeType) {
case ENTRY_CREATED:
// Not implemented yet.
modifyDataInCache(path, false);
break;
case ENTRY_DELETED:
// Not implemented yet.
modifyDataInCache(path, true);
break;
case ENTRY_DATA_CHANGE:
modifyDataInCache(path, false);
break;
default:
LOG.error("Unknown change type: " + changeType);
}
}
}

private void waitForPopulateAllCache() {
try {
latch.await();
_initializedCache.await();
} catch (InterruptedException e) {
throw new MetaClientException("Interrupted while waiting for cache to populate.", e);
}
Expand Down Expand Up @@ -170,13 +189,14 @@ public TrieNode getChildrenCacheTree() {
@Override
public void connect() {
super.connect();
_eventListener = this::handleCacheUpdate;
_eventListener = (path, changeType) -> {
Runnable cacheUpdateRunnable = new CacheUpdateRunnable(path, changeType);
executor.execute(cacheUpdateRunnable);
};
executor = Executors.newSingleThreadExecutor();
executor.execute(() -> {
_cacheClient.subscribePersistRecursiveListener(_rootEntry, new ChildListenerAdapter(_eventListener));
});
_cacheClient.subscribePersistRecursiveListener(_rootEntry, new ChildListenerAdapter(_eventListener));
populateAllCache();
// Notify the latch that cache is populated.
latch.countDown();
_initializedCache.countDown();
}
}

0 comments on commit 7355b82

Please sign in to comment.