From e017b0509d61f5d092f94efe841160d64f1af02c Mon Sep 17 00:00:00 2001 From: mapeng Date: Mon, 18 Sep 2023 21:58:19 -0700 Subject: [PATCH] Data cache implementation --- .../factories/MetaClientFactory.java | 25 +++-- .../metaclient/impl/zk/ZkMetaClientCache.java | 94 ++++++++++++++++--- .../impl/zk/TestZkMetaClientCache.java | 65 ++++++++++--- 3 files changed, 143 insertions(+), 41 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java index ebb4549dae..7cc86a8a98 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java @@ -40,13 +40,7 @@ public MetaClientInterface getMetaClient(MetaClientConfig config) { throw new IllegalArgumentException("MetaClientConfig cannot be null."); } if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) { - ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder(). - setConnectionAddress(config.getConnectionAddress()) - .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy()) - .setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis()) - .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis()) - .build(); - return new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig); + return new ZkMetaClientFactory().getMetaClient(createZkMetaClientConfig(config)); } return null; } @@ -56,14 +50,17 @@ public MetaClientCacheInterface getMetaClientCache(MetaClientConfig config, Meta throw new IllegalArgumentException("MetaClientConfig cannot be null."); } if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) { - ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder(). - setConnectionAddress(config.getConnectionAddress()) - .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy()) - .setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis()) - .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis()) - .build(); - return new ZkMetaClientFactory().getMetaClientCache(zkMetaClientConfig, cacheConfig); + return new ZkMetaClientFactory().getMetaClientCache(createZkMetaClientConfig(config), cacheConfig); } return null; } + + private ZkMetaClientConfig createZkMetaClientConfig(MetaClientConfig config) { + return new ZkMetaClientConfig.ZkMetaClientConfigBuilder(). + setConnectionAddress(config.getConnectionAddress()) + .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy()) + .setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis()) + .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis()) + .build(); + } } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java index af1c9d7915..c300a01f8f 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java @@ -21,23 +21,21 @@ import org.apache.helix.metaclient.api.ChildChangeListener; import org.apache.helix.metaclient.api.MetaClientCacheInterface; -import org.apache.helix.metaclient.datamodel.DataRecord; import org.apache.helix.metaclient.exception.MetaClientException; import org.apache.helix.metaclient.factories.MetaClientCacheConfig; -import org.apache.helix.metaclient.factories.MetaClientConfig; +import org.apache.helix.metaclient.impl.zk.adapter.ChildListenerAdapter; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; -import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory; -import org.apache.helix.metaclient.recipes.lock.LockInfoSerializer; import org.apache.helix.zookeeper.zkclient.ZkClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; -import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class ZkMetaClientCache extends ZkMetaClient implements MetaClientCacheInterface { - private Map _dataCacheMap; + private ConcurrentHashMap _dataCacheMap; private final String _rootEntry; private TrieNode _childrenCacheTree; private ChildChangeListener _eventListener; @@ -59,16 +57,51 @@ public ZkMetaClientCache(ZkMetaClientConfig config, MetaClientCacheConfig cacheC _lazyCaching = cacheConfig.getLazyCaching(); _cacheData = cacheConfig.getCacheData(); _cacheChildren = cacheConfig.getCacheChildren(); + + if (_cacheData) { + _dataCacheMap = new ConcurrentHashMap<>(); + } + if (_cacheChildren) { + _childrenCacheTree = new TrieNode(_rootEntry, null); + } } @Override - public Stat exists(String key) { - throw new MetaClientException("Not implemented yet."); + public T get(final String key) { + if (_cacheData) { + if (!getDataCacheMap().containsKey(key)) { + T data = _cacheClient.readData(key, true); + if (data == null) { + return null; + } + getDataCacheMap().put(key, data); + } + return getDataCacheMap().get(key); + } + return _cacheClient.readData(key, true); } @Override - public T get(final String key) { - throw new MetaClientException("Not implemented yet."); + public List get(List keys) { + List dataList = new ArrayList<>(); + if (_cacheData) { + for (String key : keys) { + if (!getDataCacheMap().containsKey(key)) { + T data = _cacheClient.readData(key, true); + if (data == null) { + continue; + } + getDataCacheMap().put(key, data); + } + dataList.add(getDataCacheMap().get(key)); + } + } + else { + for (String key : keys) { + dataList.add(_cacheClient.readData(key, true)); + } + } + return dataList; } @Override @@ -81,14 +114,45 @@ public int countDirectChildren(final String key) { throw new MetaClientException("Not implemented yet."); } - @Override - public List get(List keys) { - throw new MetaClientException("Not implemented yet."); + private void handleCacheUpdate(String path, ChildChangeListener.ChangeType changeType) { + switch (changeType) { + case ENTRY_CREATED: + // Not implemented yet. + break; + case ENTRY_DELETED: + // Not implemented yet. + break; + case ENTRY_DATA_CHANGE: + modifyDataInCache(path); + break; + default: + LOG.error("Unknown change type: " + changeType); + } } + private void modifyDataInCache(String path) { + if (_cacheData) { + T dataRecord = _cacheClient.readData(path, true); + getDataCacheMap().put(path, dataRecord); + } + } + + public ConcurrentHashMap getDataCacheMap() { + return _dataCacheMap; + } + + public TrieNode getChildrenCacheTree() { + return _childrenCacheTree; + } + + /** + * Connect to the underlying ZkClient. + */ @Override - public List exists(List keys) { - throw new MetaClientException("Not implemented yet."); + public void connect() { + super.connect(); + _eventListener = this::handleCacheUpdate; + _cacheClient.subscribePersistRecursiveListener(_rootEntry, new ChildListenerAdapter(_eventListener)); } } diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java index a3a5b4eee3..6deb487f14 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java @@ -19,39 +19,80 @@ * under the License. */ - import org.apache.helix.metaclient.factories.MetaClientCacheConfig; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; import org.testng.Assert; import org.testng.annotations.Test; public class TestZkMetaClientCache extends ZkMetaClientTestBase { - private static final String PATH = "/Cache"; + private static final String DATA_PATH = "/data"; + private static final String DATA_VALUE = "testData"; @Test public void testCreateClient() { - final String key = "/TestZkMetaClientCache_testCreate"; - try (ZkMetaClient zkMetaClientCache = createZkMetaClientCache()) { + final String key = "/testCreate"; + try (ZkMetaClientCache zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) { zkMetaClientCache.connect(); // Perform some random non-read operation zkMetaClientCache.create(key, ENTRY_STRING_VALUE); + } + } + + @Test + public void testLazyDataCacheAndFetch() { + final String key = "/testLazyDataCacheAndFetch"; + try (ZkMetaClientCache zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) { + zkMetaClientCache.connect(); + zkMetaClientCache.create(key, "test"); + + // Verify that data is not cached initially + Assert.assertFalse(zkMetaClientCache.getDataCacheMap().containsKey(key + DATA_PATH)); + + zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE); + + // Get data for DATA_PATH (should trigger lazy loading) + String data = zkMetaClientCache.get(key + DATA_PATH); + + // Verify that data is now cached + Assert.assertTrue(zkMetaClientCache.getDataCacheMap().containsKey(key + DATA_PATH)); + Assert.assertEquals(DATA_VALUE, data); + } + } + + @Test + public void testCacheDataUpdates() { + final String key = "/testCacheDataUpdates"; + try (ZkMetaClientCache zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) { + zkMetaClientCache.connect(); + zkMetaClientCache.create(key, "test"); + zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE); + + // Get data for DATA_PATH and cache it + String data = zkMetaClientCache.get(key + DATA_PATH); + Assert.assertEquals(data, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH)); + + // Update data for DATA_PATH + String newData = zkMetaClientCache.update(key + DATA_PATH, currentData -> currentData + "1"); - try { - //Perform some read operation - should fail. - // TODO: Remove this once implemented. - zkMetaClientCache.get(key); - Assert.fail("Should have failed with non implemented yet."); - } catch (Exception ignored) { + // Verify that cached data is updated. Might take some time + for (int i = 0; i < 10; i++) { + if (zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH).equals(newData)) { + break; + } + Thread.sleep(1000); } + Assert.assertEquals(newData, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH)); + } catch (InterruptedException e) { + throw new RuntimeException(e); } } - protected static ZkMetaClientCache createZkMetaClientCache() { + protected static ZkMetaClientCache createZkMetaClientCacheLazyCaching(String rootPath) { ZkMetaClientConfig config = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR) //.setZkSerializer(new TestStringSerializer()) .build(); - MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(PATH, true, true, true); + MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(rootPath, true, true, true); return new ZkMetaClientCache<>(config, cacheConfig); } }