Skip to content

Commit

Permalink
Children cache implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mapeng committed Oct 5, 2023
1 parent 6bd97a9 commit 1cea560
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@ public interface MetaClientCacheInterface<T> extends MetaClientInterface<T> {
class TrieNode {
// A mapping between trie key and children nodes.
private Map<String, TrieNode> _children;

// the complete path/prefix leading to the current node.
private final String _path;

private final String _nodeKey;

public TrieNode(String path, String nodeKey) {
Expand All @@ -54,8 +52,39 @@ public String getNodeKey() {
return _nodeKey;
}

public void addChild(String key, TrieNode node) {
public void addChild(String key, TrieNode node) {
_children.put(key, node);
}

public TrieNode processPath(String path, boolean isCreate) {
String[] pathComponents = path.split("/");
TrieNode currentNode = this;
TrieNode previousNode = null;

for (int i = 1; i < pathComponents.length; i++) {
String component = pathComponents[i];
if (component.equals(_nodeKey)) {
// Skip the root node
} else if (!currentNode.getChildren().containsKey(component)) {
if (isCreate) {
TrieNode newNode = new TrieNode(currentNode.getPath() + "/" + component, component);
currentNode.addChild(component, newNode);
previousNode = currentNode;
currentNode = newNode;
} else {
return currentNode;
}
} else {
previousNode = currentNode;
currentNode = currentNode.getChildren().get(component);
}
}

if (!isCreate && previousNode != null) {
previousNode.getChildren().remove(currentNode.getNodeKey());
}

return currentNode;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@

public class MetaClientCacheConfig {
private final String _rootEntry;
private boolean _cacheData = false;
private boolean _cacheChildren = false;
private final boolean _cacheData;
private final boolean _cacheChildren;

public MetaClientCacheConfig(String rootEntry, boolean cacheData, boolean cacheChildren, boolean lazyCaching) {
public MetaClientCacheConfig(String rootEntry, boolean cacheData, boolean cacheChildren) {
_rootEntry = rootEntry;
_cacheData = cacheData;
_cacheChildren = cacheChildren;
Expand All @@ -43,5 +43,4 @@ public boolean getCacheData() {
public boolean getCacheChildren() {
return _cacheChildren;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public ZkMetaClientCache(ZkMetaClientConfig config, MetaClientCacheConfig cacheC
_dataCacheMap = new ConcurrentHashMap<>();
}
if (_cacheChildren) {
_childrenCacheTree = new TrieNode(_rootEntry, null);
_childrenCacheTree = new TrieNode(_rootEntry, _rootEntry.substring(1));
}
}

Expand Down Expand Up @@ -102,14 +102,46 @@ public List<T> get(List<String> keys) {
return dataList;
}

/**
* Get the direct children for a given key.
* @param key For metadata storage that has hierarchical key space (e.g. ZK), the key would be
* a parent key,
* For metadata storage that has non-hierarchical key space (e.g. etcd), the key would
* be a prefix key.
* @return list of direct children or null if key doesn't exist / cache is not populated yet.
*/
@Override
public List<String> getDirectChildrenKeys(final String key) {
throw new MetaClientException("Not implemented yet.");
if (_cacheChildren) {
TrieNode node = _childrenCacheTree.processPath(key, true);
if (node == null) {
LOG.debug("Children not found in cache for key: {}. This could be because the cache is still being populated.", key);
return null;
}
return List.copyOf(node.getChildren().keySet());
}
return super.getDirectChildrenKeys(key);
}

/**
* Get the number of direct children for a given key.
* @param key For metadata storage that has hierarchical key space (e.g. ZK), the key would be
* a parent key,
* For metadata storage that has non-hierarchical key space (e.g. etcd), the key would
* be a prefix key.
* @return number of direct children or 0 if key doesn't exist / has no children / cache is not populated yet.
*/
@Override
public int countDirectChildren(final String key) {
throw new MetaClientException("Not implemented yet.");
if (_cacheChildren) {
TrieNode node = _childrenCacheTree.processPath(key, true);
if (node == null) {
LOG.debug("Children not found in cache for key: {}. This could be because the cache is still being populated.", key);
return 0;
}
return node.getChildren().size();
}
return super.countDirectChildren(key);
}

private void populateAllCache() {
Expand All @@ -130,7 +162,13 @@ private void populateAllCache() {
T dataRecord = _cacheClient.readData(node, true);
_dataCacheMap.put(node, dataRecord);
}
queue.addAll(_cacheClient.getChildren(node));
if (_cacheChildren) {
_childrenCacheTree.processPath(node, true);
}
List<String> childNodes = _cacheClient.getChildren(node);
for (String child : childNodes) {
queue.add(node + "/" + child); // Add child nodes to the queue with their full path.
}
}
// Let the other threads know that the cache is populated.
_initializedCache.countDown();
Expand All @@ -151,11 +189,11 @@ public void run() {
// TODO: HANDLE DEDUP EVENT CHANGES
switch (changeType) {
case ENTRY_CREATED:
// Not implemented yet.
_childrenCacheTree.processPath(path, true);
modifyDataInCache(path, false);
break;
case ENTRY_DELETED:
// Not implemented yet.
_childrenCacheTree.processPath(path, false);
modifyDataInCache(path, true);
break;
case ENTRY_DATA_CHANGE:
Expand Down Expand Up @@ -190,9 +228,6 @@ public ConcurrentHashMap<String, T> getDataCacheMap() {
return _dataCacheMap;
}

public TrieNode getChildrenCacheTree() {
return _childrenCacheTree;
}

/**
* Connect to the underlying ZkClient.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
* under the License.
*/

import org.apache.helix.metaclient.MetaClientTestUtil;
import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.*;

public class TestZkMetaClientCache extends ZkMetaClientTestBase {
private static final String DATA_PATH = "/data";
Expand All @@ -48,32 +48,55 @@ public void testCacheDataUpdates() {
zkMetaClientCache.connect();
zkMetaClientCache.create(key, "test");
zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE);

// Get data for DATA_PATH and cache it
String data = zkMetaClientCache.get(key + DATA_PATH);
Assert.assertEquals(data, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH));
Assert.assertTrue(MetaClientTestUtil.verify(() ->
(Objects.equals(zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH), data)), MetaClientTestUtil.WAIT_DURATION));

// Update data for DATA_PATH
String newData = zkMetaClientCache.update(key + DATA_PATH, currentData -> currentData + "1");

// Verify that cached data is updated. Might take some time
for (int i = 0; i < 10; i++) {
if (zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH).equals(newData)) {
break;
}
Thread.sleep(1000);
}
Assert.assertEquals(newData, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH));
Assert.assertTrue(MetaClientTestUtil.verify(() ->
(Objects.equals(zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH), newData)), MetaClientTestUtil.WAIT_DURATION));

zkMetaClientCache.delete(key + DATA_PATH);
// Verify that cached data is updated. Might take some time
for (int i = 0; i < 10; i++) {
if (zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH) == null) {
break;
}
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Assert.assertTrue(MetaClientTestUtil.verify(() ->
(Objects.equals(zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH), null)), MetaClientTestUtil.WAIT_DURATION));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Test
public void testGetDirectChildrenKeys() {
final String key = "/testGetDirectChildrenKeys";
try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
zkMetaClientCache.connect();
zkMetaClientCache.create(key, ENTRY_STRING_VALUE);
zkMetaClientCache.create(key + "/child1", ENTRY_STRING_VALUE);
zkMetaClientCache.create(key + "/child2", ENTRY_STRING_VALUE);

Assert.assertTrue(MetaClientTestUtil.verify(() ->
(zkMetaClientCache.getDirectChildrenKeys(key).size() == 2), MetaClientTestUtil.WAIT_DURATION));

Assert.assertTrue(zkMetaClientCache.getDirectChildrenKeys(key).contains("child1"));
Assert.assertTrue(zkMetaClientCache.getDirectChildrenKeys(key).contains("child2"));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Test
public void testCountDirectChildren() {
final String key = "/testCountDirectChildren";
try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
zkMetaClientCache.connect();
zkMetaClientCache.create(key, ENTRY_STRING_VALUE);
zkMetaClientCache.create(key + "/child1", ENTRY_STRING_VALUE);
zkMetaClientCache.create(key + "/child2", ENTRY_STRING_VALUE);
Assert.assertTrue(MetaClientTestUtil.verify(() -> ( zkMetaClientCache.countDirectChildren(key) == 2), MetaClientTestUtil.WAIT_DURATION));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Expand All @@ -94,25 +117,68 @@ public void testBatchGet() {
values.add("test");
values.add(DATA_VALUE);

for (int i = 0; i < 10; i++) {
// Get data for DATA_PATH and cache it
List<String> data = zkMetaClientCache.get(keys);
if (data.equals(values)) {
break;
Assert.assertTrue(MetaClientTestUtil.verify(() -> ( zkMetaClientCache.get(keys).equals(values)), MetaClientTestUtil.WAIT_DURATION));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Test
public void testLargeClusterLoading() {
final String key = "/testLargerNodes";
try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
zkMetaClient.connect();

int numLayers = 4;
int numNodesPerLayer = 20;

// Create the root node
zkMetaClient.create(key, "test");

Queue<String> queue = new LinkedList<>();
queue.offer(key);

for (int layer = 1; layer <= numLayers; layer++) {
int nodesAtThisLayer = Math.min(numNodesPerLayer, queue.size() * numNodesPerLayer);

for (int i = 0; i < nodesAtThisLayer; i++) {
String parentKey = queue.poll();
for (int j = 0; j < numNodesPerLayer; j++) {
String newNodeKey = parentKey + "/node" + j;
zkMetaClient.create(newNodeKey, "test");
queue.offer(newNodeKey);
}
}
Thread.sleep(1000);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);

try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
zkMetaClientCache.connect();

// Assert Checks on a Random Path
Assert.assertTrue(MetaClientTestUtil.verify(() -> ( zkMetaClientCache.get(key + "/node4/node1").equals("test")), MetaClientTestUtil.WAIT_DURATION));
Assert.assertTrue(MetaClientTestUtil.verify(() -> ( zkMetaClientCache.countDirectChildren(key) == numNodesPerLayer), MetaClientTestUtil.WAIT_DURATION));
String newData = zkMetaClientCache.update(key + "/node4/node1", currentData -> currentData + "1");
Assert.assertTrue(MetaClientTestUtil.verify(() -> ( zkMetaClientCache.get(key + "/node4/node1").equals(newData)), MetaClientTestUtil.WAIT_DURATION));
Assert.assertTrue(MetaClientTestUtil.verify(() ->
(zkMetaClientCache.getDirectChildrenKeys(key + "/node4/node1")
.equals(zkMetaClient.getDirectChildrenKeys(key + "/node4/node1"))), MetaClientTestUtil.WAIT_DURATION));

zkMetaClientCache.delete(key + "/node4/node1");
Assert.assertTrue(MetaClientTestUtil.verify(() -> (Objects.equals(zkMetaClientCache.get(key + "/node4/node1"), null)), MetaClientTestUtil.WAIT_DURATION));

} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

protected static ZkMetaClientCache<String> createZkMetaClientCacheLazyCaching(String rootPath) {

public ZkMetaClientCache<String> createZkMetaClientCacheLazyCaching(String rootPath) {
ZkMetaClientConfig config =
new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR)
//.setZkSerializer(new TestStringSerializer())
.build();
MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(rootPath, true, true, true);
MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(rootPath, true, true);
return new ZkMetaClientCache<>(config, cacheConfig);
}
}

0 comments on commit 1cea560

Please sign in to comment.