From 1cea5600b605ffba9e329edfec2e1b065783935d Mon Sep 17 00:00:00 2001 From: mapeng Date: Thu, 5 Oct 2023 15:29:14 -0700 Subject: [PATCH] Children cache implementation --- .../api/MetaClientCacheInterface.java | 35 ++++- .../factories/MetaClientCacheConfig.java | 7 +- .../metaclient/impl/zk/ZkMetaClientCache.java | 53 ++++++-- .../impl/zk/TestZkMetaClientCache.java | 124 ++++++++++++++---- 4 files changed, 174 insertions(+), 45 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java index 6449970bb8..3630e1c397 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java @@ -30,10 +30,8 @@ public interface MetaClientCacheInterface extends MetaClientInterface { class TrieNode { // A mapping between trie key and children nodes. private Map _children; - // the complete path/prefix leading to the current node. private final String _path; - private final String _nodeKey; public TrieNode(String path, String nodeKey) { @@ -54,8 +52,39 @@ public String getNodeKey() { return _nodeKey; } - public void addChild(String key, TrieNode node) { + public void addChild(String key, TrieNode node) { _children.put(key, node); } + + public TrieNode processPath(String path, boolean isCreate) { + String[] pathComponents = path.split("/"); + TrieNode currentNode = this; + TrieNode previousNode = null; + + for (int i = 1; i < pathComponents.length; i++) { + String component = pathComponents[i]; + if (component.equals(_nodeKey)) { + // Skip the root node + } else if (!currentNode.getChildren().containsKey(component)) { + if (isCreate) { + TrieNode newNode = new TrieNode(currentNode.getPath() + "/" + component, component); + currentNode.addChild(component, newNode); + previousNode = currentNode; + currentNode = newNode; + } else { + return currentNode; + } + } else { + previousNode = currentNode; + currentNode = currentNode.getChildren().get(component); + } + } + + if (!isCreate && previousNode != null) { + previousNode.getChildren().remove(currentNode.getNodeKey()); + } + + return currentNode; + } } } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java index 07972945a2..2c23a0a686 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java @@ -23,10 +23,10 @@ public class MetaClientCacheConfig { private final String _rootEntry; - private boolean _cacheData = false; - private boolean _cacheChildren = false; + private final boolean _cacheData; + private final boolean _cacheChildren; - public MetaClientCacheConfig(String rootEntry, boolean cacheData, boolean cacheChildren, boolean lazyCaching) { + public MetaClientCacheConfig(String rootEntry, boolean cacheData, boolean cacheChildren) { _rootEntry = rootEntry; _cacheData = cacheData; _cacheChildren = cacheChildren; @@ -43,5 +43,4 @@ public boolean getCacheData() { public boolean getCacheChildren() { return _cacheChildren; } - } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java index 66eb6f91e0..45701063c5 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java @@ -70,7 +70,7 @@ public ZkMetaClientCache(ZkMetaClientConfig config, MetaClientCacheConfig cacheC _dataCacheMap = new ConcurrentHashMap<>(); } if (_cacheChildren) { - _childrenCacheTree = new TrieNode(_rootEntry, null); + _childrenCacheTree = new TrieNode(_rootEntry, _rootEntry.substring(1)); } } @@ -102,14 +102,46 @@ public List get(List keys) { return dataList; } + /** + * Get the direct children for a given key. + * @param key For metadata storage that has hierarchical key space (e.g. ZK), the key would be + * a parent key, + * For metadata storage that has non-hierarchical key space (e.g. etcd), the key would + * be a prefix key. + * @return list of direct children or null if key doesn't exist / cache is not populated yet. + */ @Override public List getDirectChildrenKeys(final String key) { - throw new MetaClientException("Not implemented yet."); + if (_cacheChildren) { + TrieNode node = _childrenCacheTree.processPath(key, true); + if (node == null) { + LOG.debug("Children not found in cache for key: {}. This could be because the cache is still being populated.", key); + return null; + } + return List.copyOf(node.getChildren().keySet()); + } + return super.getDirectChildrenKeys(key); } + /** + * Get the number of direct children for a given key. + * @param key For metadata storage that has hierarchical key space (e.g. ZK), the key would be + * a parent key, + * For metadata storage that has non-hierarchical key space (e.g. etcd), the key would + * be a prefix key. + * @return number of direct children or 0 if key doesn't exist / has no children / cache is not populated yet. + */ @Override public int countDirectChildren(final String key) { - throw new MetaClientException("Not implemented yet."); + if (_cacheChildren) { + TrieNode node = _childrenCacheTree.processPath(key, true); + if (node == null) { + LOG.debug("Children not found in cache for key: {}. This could be because the cache is still being populated.", key); + return 0; + } + return node.getChildren().size(); + } + return super.countDirectChildren(key); } private void populateAllCache() { @@ -130,7 +162,13 @@ private void populateAllCache() { T dataRecord = _cacheClient.readData(node, true); _dataCacheMap.put(node, dataRecord); } - queue.addAll(_cacheClient.getChildren(node)); + if (_cacheChildren) { + _childrenCacheTree.processPath(node, true); + } + List childNodes = _cacheClient.getChildren(node); + for (String child : childNodes) { + queue.add(node + "/" + child); // Add child nodes to the queue with their full path. + } } // Let the other threads know that the cache is populated. _initializedCache.countDown(); @@ -151,11 +189,11 @@ public void run() { // TODO: HANDLE DEDUP EVENT CHANGES switch (changeType) { case ENTRY_CREATED: - // Not implemented yet. + _childrenCacheTree.processPath(path, true); modifyDataInCache(path, false); break; case ENTRY_DELETED: - // Not implemented yet. + _childrenCacheTree.processPath(path, false); modifyDataInCache(path, true); break; case ENTRY_DATA_CHANGE: @@ -190,9 +228,6 @@ public ConcurrentHashMap getDataCacheMap() { return _dataCacheMap; } - public TrieNode getChildrenCacheTree() { - return _childrenCacheTree; - } /** * Connect to the underlying ZkClient. diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java index 30a0a729d0..2950a652e6 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java @@ -19,13 +19,13 @@ * under the License. */ +import org.apache.helix.metaclient.MetaClientTestUtil; import org.apache.helix.metaclient.factories.MetaClientCacheConfig; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; import org.testng.Assert; import org.testng.annotations.Test; -import java.util.ArrayList; -import java.util.List; +import java.util.*; public class TestZkMetaClientCache extends ZkMetaClientTestBase { private static final String DATA_PATH = "/data"; @@ -48,32 +48,55 @@ public void testCacheDataUpdates() { zkMetaClientCache.connect(); zkMetaClientCache.create(key, "test"); zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE); - // Get data for DATA_PATH and cache it String data = zkMetaClientCache.get(key + DATA_PATH); Assert.assertEquals(data, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH)); + Assert.assertTrue(MetaClientTestUtil.verify(() -> + (Objects.equals(zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH), data)), MetaClientTestUtil.WAIT_DURATION)); // Update data for DATA_PATH String newData = zkMetaClientCache.update(key + DATA_PATH, currentData -> currentData + "1"); - // Verify that cached data is updated. Might take some time - for (int i = 0; i < 10; i++) { - if (zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH).equals(newData)) { - break; - } - Thread.sleep(1000); - } - Assert.assertEquals(newData, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH)); + Assert.assertTrue(MetaClientTestUtil.verify(() -> + (Objects.equals(zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH), newData)), MetaClientTestUtil.WAIT_DURATION)); zkMetaClientCache.delete(key + DATA_PATH); - // Verify that cached data is updated. Might take some time - for (int i = 0; i < 10; i++) { - if (zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH) == null) { - break; - } - Thread.sleep(1000); - } - } catch (InterruptedException e) { + Assert.assertTrue(MetaClientTestUtil.verify(() -> + (Objects.equals(zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH), null)), MetaClientTestUtil.WAIT_DURATION)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + public void testGetDirectChildrenKeys() { + final String key = "/testGetDirectChildrenKeys"; + try (ZkMetaClientCache zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) { + zkMetaClientCache.connect(); + zkMetaClientCache.create(key, ENTRY_STRING_VALUE); + zkMetaClientCache.create(key + "/child1", ENTRY_STRING_VALUE); + zkMetaClientCache.create(key + "/child2", ENTRY_STRING_VALUE); + + Assert.assertTrue(MetaClientTestUtil.verify(() -> + (zkMetaClientCache.getDirectChildrenKeys(key).size() == 2), MetaClientTestUtil.WAIT_DURATION)); + + Assert.assertTrue(zkMetaClientCache.getDirectChildrenKeys(key).contains("child1")); + Assert.assertTrue(zkMetaClientCache.getDirectChildrenKeys(key).contains("child2")); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + public void testCountDirectChildren() { + final String key = "/testCountDirectChildren"; + try (ZkMetaClientCache zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) { + zkMetaClientCache.connect(); + zkMetaClientCache.create(key, ENTRY_STRING_VALUE); + zkMetaClientCache.create(key + "/child1", ENTRY_STRING_VALUE); + zkMetaClientCache.create(key + "/child2", ENTRY_STRING_VALUE); + Assert.assertTrue(MetaClientTestUtil.verify(() -> ( zkMetaClientCache.countDirectChildren(key) == 2), MetaClientTestUtil.WAIT_DURATION)); + } catch (Exception e) { throw new RuntimeException(e); } } @@ -94,25 +117,68 @@ public void testBatchGet() { values.add("test"); values.add(DATA_VALUE); - for (int i = 0; i < 10; i++) { - // Get data for DATA_PATH and cache it - List data = zkMetaClientCache.get(keys); - if (data.equals(values)) { - break; + Assert.assertTrue(MetaClientTestUtil.verify(() -> ( zkMetaClientCache.get(keys).equals(values)), MetaClientTestUtil.WAIT_DURATION)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + public void testLargeClusterLoading() { + final String key = "/testLargerNodes"; + try (ZkMetaClient zkMetaClient = createZkMetaClient()) { + zkMetaClient.connect(); + + int numLayers = 4; + int numNodesPerLayer = 20; + + // Create the root node + zkMetaClient.create(key, "test"); + + Queue queue = new LinkedList<>(); + queue.offer(key); + + for (int layer = 1; layer <= numLayers; layer++) { + int nodesAtThisLayer = Math.min(numNodesPerLayer, queue.size() * numNodesPerLayer); + + for (int i = 0; i < nodesAtThisLayer; i++) { + String parentKey = queue.poll(); + for (int j = 0; j < numNodesPerLayer; j++) { + String newNodeKey = parentKey + "/node" + j; + zkMetaClient.create(newNodeKey, "test"); + queue.offer(newNodeKey); + } } - Thread.sleep(1000); } - } catch (InterruptedException e) { - throw new RuntimeException(e); + + try (ZkMetaClientCache zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) { + zkMetaClientCache.connect(); + + // Assert Checks on a Random Path + Assert.assertTrue(MetaClientTestUtil.verify(() -> ( zkMetaClientCache.get(key + "/node4/node1").equals("test")), MetaClientTestUtil.WAIT_DURATION)); + Assert.assertTrue(MetaClientTestUtil.verify(() -> ( zkMetaClientCache.countDirectChildren(key) == numNodesPerLayer), MetaClientTestUtil.WAIT_DURATION)); + String newData = zkMetaClientCache.update(key + "/node4/node1", currentData -> currentData + "1"); + Assert.assertTrue(MetaClientTestUtil.verify(() -> ( zkMetaClientCache.get(key + "/node4/node1").equals(newData)), MetaClientTestUtil.WAIT_DURATION)); + Assert.assertTrue(MetaClientTestUtil.verify(() -> + (zkMetaClientCache.getDirectChildrenKeys(key + "/node4/node1") + .equals(zkMetaClient.getDirectChildrenKeys(key + "/node4/node1"))), MetaClientTestUtil.WAIT_DURATION)); + + zkMetaClientCache.delete(key + "/node4/node1"); + Assert.assertTrue(MetaClientTestUtil.verify(() -> (Objects.equals(zkMetaClientCache.get(key + "/node4/node1"), null)), MetaClientTestUtil.WAIT_DURATION)); + + } catch (Exception e) { + throw new RuntimeException(e); + } } } - protected static ZkMetaClientCache createZkMetaClientCacheLazyCaching(String rootPath) { + + public ZkMetaClientCache createZkMetaClientCacheLazyCaching(String rootPath) { ZkMetaClientConfig config = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR) //.setZkSerializer(new TestStringSerializer()) .build(); - MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(rootPath, true, true, true); + MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(rootPath, true, true); return new ZkMetaClientCache<>(config, cacheConfig); } }