Skip to content

Commit

Permalink
Data cache implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mapeng committed Sep 19, 2023
1 parent 0555e93 commit e017b05
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,7 @@ public MetaClientInterface getMetaClient(MetaClientConfig config) {
throw new IllegalArgumentException("MetaClientConfig cannot be null.");
}
if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
setConnectionAddress(config.getConnectionAddress())
.setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
.setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
.setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
.build();
return new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
return new ZkMetaClientFactory().getMetaClient(createZkMetaClientConfig(config));
}
return null;
}
Expand All @@ -56,14 +50,17 @@ public MetaClientCacheInterface getMetaClientCache(MetaClientConfig config, Meta
throw new IllegalArgumentException("MetaClientConfig cannot be null.");
}
if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
setConnectionAddress(config.getConnectionAddress())
.setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
.setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
.setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
.build();
return new ZkMetaClientFactory().getMetaClientCache(zkMetaClientConfig, cacheConfig);
return new ZkMetaClientFactory().getMetaClientCache(createZkMetaClientConfig(config), cacheConfig);
}
return null;
}

private ZkMetaClientConfig createZkMetaClientConfig(MetaClientConfig config) {
return new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
setConnectionAddress(config.getConnectionAddress())
.setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
.setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
.setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,21 @@

import org.apache.helix.metaclient.api.ChildChangeListener;
import org.apache.helix.metaclient.api.MetaClientCacheInterface;
import org.apache.helix.metaclient.datamodel.DataRecord;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
import org.apache.helix.metaclient.factories.MetaClientConfig;
import org.apache.helix.metaclient.impl.zk.adapter.ChildListenerAdapter;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
import org.apache.helix.metaclient.recipes.lock.LockInfoSerializer;
import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientCacheInterface<T> {

private Map<String, DataRecord> _dataCacheMap;
private ConcurrentHashMap<String, T> _dataCacheMap;
private final String _rootEntry;
private TrieNode _childrenCacheTree;
private ChildChangeListener _eventListener;
Expand All @@ -59,16 +57,51 @@ public ZkMetaClientCache(ZkMetaClientConfig config, MetaClientCacheConfig cacheC
_lazyCaching = cacheConfig.getLazyCaching();
_cacheData = cacheConfig.getCacheData();
_cacheChildren = cacheConfig.getCacheChildren();

if (_cacheData) {
_dataCacheMap = new ConcurrentHashMap<>();
}
if (_cacheChildren) {
_childrenCacheTree = new TrieNode(_rootEntry, null);
}
}

@Override
public Stat exists(String key) {
throw new MetaClientException("Not implemented yet.");
public T get(final String key) {
if (_cacheData) {
if (!getDataCacheMap().containsKey(key)) {
T data = _cacheClient.readData(key, true);
if (data == null) {
return null;
}
getDataCacheMap().put(key, data);
}
return getDataCacheMap().get(key);
}
return _cacheClient.readData(key, true);
}

@Override
public T get(final String key) {
throw new MetaClientException("Not implemented yet.");
public List<T> get(List<String> keys) {
List<T> dataList = new ArrayList<>();
if (_cacheData) {
for (String key : keys) {
if (!getDataCacheMap().containsKey(key)) {
T data = _cacheClient.readData(key, true);
if (data == null) {
continue;
}
getDataCacheMap().put(key, data);
}
dataList.add(getDataCacheMap().get(key));
}
}
else {
for (String key : keys) {
dataList.add(_cacheClient.readData(key, true));
}
}
return dataList;
}

@Override
Expand All @@ -81,14 +114,45 @@ public int countDirectChildren(final String key) {
throw new MetaClientException("Not implemented yet.");
}

@Override
public List<T> get(List<String> keys) {
throw new MetaClientException("Not implemented yet.");
private void handleCacheUpdate(String path, ChildChangeListener.ChangeType changeType) {
switch (changeType) {
case ENTRY_CREATED:
// Not implemented yet.
break;
case ENTRY_DELETED:
// Not implemented yet.
break;
case ENTRY_DATA_CHANGE:
modifyDataInCache(path);
break;
default:
LOG.error("Unknown change type: " + changeType);
}
}

private void modifyDataInCache(String path) {
if (_cacheData) {
T dataRecord = _cacheClient.readData(path, true);
getDataCacheMap().put(path, dataRecord);
}
}

public ConcurrentHashMap<String, T> getDataCacheMap() {
return _dataCacheMap;
}

public TrieNode getChildrenCacheTree() {
return _childrenCacheTree;
}

/**
* Connect to the underlying ZkClient.
*/
@Override
public List<Stat> exists(List<String> keys) {
throw new MetaClientException("Not implemented yet.");
public void connect() {
super.connect();
_eventListener = this::handleCacheUpdate;
_cacheClient.subscribePersistRecursiveListener(_rootEntry, new ChildListenerAdapter(_eventListener));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,80 @@
* under the License.
*/


import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.testng.Assert;
import org.testng.annotations.Test;

public class TestZkMetaClientCache extends ZkMetaClientTestBase {
private static final String PATH = "/Cache";
private static final String DATA_PATH = "/data";
private static final String DATA_VALUE = "testData";

@Test
public void testCreateClient() {
final String key = "/TestZkMetaClientCache_testCreate";
try (ZkMetaClient<String> zkMetaClientCache = createZkMetaClientCache()) {
final String key = "/testCreate";
try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
zkMetaClientCache.connect();
// Perform some random non-read operation
zkMetaClientCache.create(key, ENTRY_STRING_VALUE);
}
}

@Test
public void testLazyDataCacheAndFetch() {
final String key = "/testLazyDataCacheAndFetch";
try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
zkMetaClientCache.connect();
zkMetaClientCache.create(key, "test");

// Verify that data is not cached initially
Assert.assertFalse(zkMetaClientCache.getDataCacheMap().containsKey(key + DATA_PATH));

zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE);

// Get data for DATA_PATH (should trigger lazy loading)
String data = zkMetaClientCache.get(key + DATA_PATH);

// Verify that data is now cached
Assert.assertTrue(zkMetaClientCache.getDataCacheMap().containsKey(key + DATA_PATH));
Assert.assertEquals(DATA_VALUE, data);
}
}

@Test
public void testCacheDataUpdates() {
final String key = "/testCacheDataUpdates";
try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
zkMetaClientCache.connect();
zkMetaClientCache.create(key, "test");
zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE);

// Get data for DATA_PATH and cache it
String data = zkMetaClientCache.get(key + DATA_PATH);
Assert.assertEquals(data, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH));

// Update data for DATA_PATH
String newData = zkMetaClientCache.update(key + DATA_PATH, currentData -> currentData + "1");

try {
//Perform some read operation - should fail.
// TODO: Remove this once implemented.
zkMetaClientCache.get(key);
Assert.fail("Should have failed with non implemented yet.");
} catch (Exception ignored) {
// Verify that cached data is updated. Might take some time
for (int i = 0; i < 10; i++) {
if (zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH).equals(newData)) {
break;
}
Thread.sleep(1000);
}
Assert.assertEquals(newData, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

protected static ZkMetaClientCache<String> createZkMetaClientCache() {
protected static ZkMetaClientCache<String> createZkMetaClientCacheLazyCaching(String rootPath) {
ZkMetaClientConfig config =
new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR)
//.setZkSerializer(new TestStringSerializer())
.build();
MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(PATH, true, true, true);
MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(rootPath, true, true, true);
return new ZkMetaClientCache<>(config, cacheConfig);
}
}

0 comments on commit e017b05

Please sign in to comment.