Skip to content

Commit

Permalink
Children cache implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mapeng committed Sep 20, 2023
1 parent 0555e93 commit ef3589d
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,29 @@ public interface MetaClientCacheInterface<T> extends MetaClientInterface<T> {
/**
* TrieNode class to store the children of the entries to be cached.
*/
class TrieNode {
public class TrieNode {
// A mapping between trie key and children nodes.
private Map<String, TrieNode> _children;

// the complete path/prefix leading to the current node.
private final String _path;

private final String _nodeKey;
private boolean _childrenCached;

TrieNode(String path, String nodeKey) {
public TrieNode(String path, String nodeKey) {
_path = path;
_nodeKey = nodeKey;
_children = new HashMap<>();
_childrenCached = false;
}

public boolean isChildrenCached() {
return _childrenCached;
}

public void setChildrenCached(boolean status) {
_childrenCached = status;
}

public Map<String, TrieNode> getChildren() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,24 @@

import org.apache.helix.metaclient.api.ChildChangeListener;
import org.apache.helix.metaclient.api.MetaClientCacheInterface;
import org.apache.helix.metaclient.datamodel.DataRecord;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
import org.apache.helix.metaclient.factories.MetaClientConfig;
import org.apache.helix.metaclient.impl.zk.adapter.ChildListenerAdapter;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
import org.apache.helix.metaclient.recipes.lock.LockInfoSerializer;
import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;

public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientCacheInterface<T> {

private Map<String, DataRecord> _dataCacheMap;
private ConcurrentHashMap<String, T> _dataCacheMap;
private final String _rootEntry;
private TrieNode _childrenCacheTree;
private ChildChangeListener _eventListener;
Expand All @@ -59,36 +60,141 @@ public ZkMetaClientCache(ZkMetaClientConfig config, MetaClientCacheConfig cacheC
_lazyCaching = cacheConfig.getLazyCaching();
_cacheData = cacheConfig.getCacheData();
_cacheChildren = cacheConfig.getCacheChildren();

if (_cacheData) {
_dataCacheMap = new ConcurrentHashMap<>();
}
if (_cacheChildren) {
_childrenCacheTree = new TrieNode(_rootEntry, _rootEntry.substring(1));
}
}

@Override
public Stat exists(String key) {
public T get(final String key) {
throw new MetaClientException("Not implemented yet.");
}

@Override
public T get(final String key) {
public List<T> get(List<String> keys) {
throw new MetaClientException("Not implemented yet.");
}

@Override
public List<String> getDirectChildrenKeys(final String key) {
throw new MetaClientException("Not implemented yet.");
TrieNode node = getTree(key);
List<String> children;
if (node == null || !node.isChildrenCached()) {
try {
children = _cacheClient.getChildren(key);
populateChildrenCache(node, children);
return children;
} catch (ZkException e) {
throw translateZkExceptionToMetaclientException(e);
}
}
return new ArrayList<>(node.getChildren().keySet());
}

private void populateChildrenCache(TrieNode root, List<String> children) {
if (root == null) {
return;
}
for (String child : children) {
updateCache("/" + child, true);
}
root.setChildrenCached(true);
}

@Override
public int countDirectChildren(final String key) {
throw new MetaClientException("Not implemented yet.");
TrieNode node = getTree(key);
if (node == null || !node.isChildrenCached()) {
try {
List<String> children = _cacheClient.getChildren(key);
populateChildrenCache(node, children);
return _cacheClient.countChildren(key);
} catch (ZkException e) {
throw translateZkExceptionToMetaclientException(e);
}
}
return node.getChildren().size();
}

@Override
public List<T> get(List<String> keys) {
throw new MetaClientException("Not implemented yet.");
private void handleCacheUpdate(String path, ChildChangeListener.ChangeType changeType) {
switch (changeType) {
case ENTRY_CREATED:
updateCache(path, true);
break;
case ENTRY_DELETED:
updateCache(path, false);
break;
case ENTRY_DATA_CHANGE:
break;
default:
LOG.error("Unknown change type: " + changeType);
}
}

private void updateCache(String path, boolean isCreate) {
if (_cacheChildren) {
String[] pathComponents = path.split("/");
TrieNode currentNode = _childrenCacheTree;
TrieNode previousNode = null;
for (int i = 1; i < pathComponents.length; i++) {
String component = pathComponents[i];
if (component.equals(_childrenCacheTree.getNodeKey())) {
// Skip the root node
}
else if (!currentNode.getChildren().containsKey(component)) {
if (isCreate) {
TrieNode newNode = new TrieNode(currentNode.getPath() + "/" + component, component);
currentNode.addChild(component, newNode);
previousNode = currentNode;
currentNode = newNode;
} else {
return;
}
} else {
previousNode = currentNode;
currentNode = currentNode.getChildren().get(component);
}
}
if (!isCreate && previousNode != null) {
previousNode.getChildren().remove(currentNode.getNodeKey());
}
}
}

private TrieNode getTree(String path) {
String[] pathComponents = path.split("/");
TrieNode currentNode = _childrenCacheTree;
for (int i = 1; i < pathComponents.length; i++) {
String component = pathComponents[i];
if (!currentNode.getChildren().containsKey(component)) {
return currentNode;
} else {
currentNode = currentNode.getChildren().get(component);
}
}
return currentNode;
}

public ConcurrentHashMap<String, T> getDataCacheMap() {
return _dataCacheMap;
}

public TrieNode getChildrenCacheTree() {
return _childrenCacheTree;
}

/**
* Connect to the underlying ZkClient.
*/
@Override
public List<Stat> exists(List<String> keys) {
throw new MetaClientException("Not implemented yet.");
public void connect() {
super.connect();
_eventListener = this::handleCacheUpdate;
_cacheClient.subscribePersistRecursiveListener(_rootEntry, new ChildListenerAdapter(_eventListener));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,88 @@
* under the License.
*/


import org.apache.helix.metaclient.api.MetaClientCacheInterface;
import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.List;

public class TestZkMetaClientCache extends ZkMetaClientTestBase {
private static final String PATH = "/Cache";
private static final String DATA_PATH = "/data";
private static final String DATA_VALUE = "testData";

@Test
public void testCreateClient() {
final String key = "/TestZkMetaClientCache_testCreate";
try (ZkMetaClient<String> zkMetaClientCache = createZkMetaClientCache()) {
final String key = "/testCreate";
try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
zkMetaClientCache.connect();
// Perform some random non-read operation
zkMetaClientCache.create(key, ENTRY_STRING_VALUE);
}
}


@Test
public void testGetDirectChildrenKeys() {
final String key = "/testGetDirectChildrenKeys";
try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
zkMetaClientCache.connect();
zkMetaClientCache.create(key, ENTRY_STRING_VALUE);
zkMetaClientCache.create(key + "/child1", ENTRY_STRING_VALUE);
zkMetaClientCache.create(key + "/child2", ENTRY_STRING_VALUE);

// Retrieve the direct children keys
List<String> children = zkMetaClientCache.getDirectChildrenKeys(key);

// Assert that the retrieved children keys match the expected ones
Assert.assertEquals(2, children.size());
Assert.assertTrue(children.contains("child1"));
Assert.assertTrue(children.contains("child2"));
}
}

@Test
public void testCountDirectChildren() {
final String key = "/testCountDirectChildren";
try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
zkMetaClientCache.connect();
zkMetaClientCache.create(key, ENTRY_STRING_VALUE);
zkMetaClientCache.create(key + "/child1", ENTRY_STRING_VALUE);
zkMetaClientCache.create(key + "/child2", ENTRY_STRING_VALUE);

int count = zkMetaClientCache.countDirectChildren(key);

Assert.assertEquals(2, count);
}
}

@Test
public void testChildCacheUpdate() {
final String key = "/testChildCacheUpdate";
try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
zkMetaClientCache.connect();
zkMetaClientCache.create(key, ENTRY_STRING_VALUE);
zkMetaClientCache.create(key + "/child1", ENTRY_STRING_VALUE);
zkMetaClientCache.create(key + "/child2", ENTRY_STRING_VALUE);
Thread.sleep(5000); // Wait for the cache update to happen
Assert.assertEquals(zkMetaClientCache.getChildrenCacheTree().getChildren().size(), 2);
zkMetaClientCache.delete(key + "/child1");
Thread.sleep(5000); // Wait for the cache update to happen
Assert.assertEquals(zkMetaClientCache.getChildrenCacheTree().getChildren().size(), 1);

try {
//Perform some read operation - should fail.
// TODO: Remove this once implemented.
zkMetaClientCache.get(key);
Assert.fail("Should have failed with non implemented yet.");
} catch (Exception ignored) {
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

protected static ZkMetaClientCache<String> createZkMetaClientCache() {
protected static ZkMetaClientCache<String> createZkMetaClientCacheLazyCaching(String rootPath) {
ZkMetaClientConfig config =
new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR)
//.setZkSerializer(new TestStringSerializer())
.build();
MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(PATH, true, true, true);
MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(rootPath, true, true, true);
return new ZkMetaClientCache<>(config, cacheConfig);
}
}

0 comments on commit ef3589d

Please sign in to comment.