Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lattice Cache - Caching Just Data Implementation #2619

Merged
merged 6 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class TrieNode {

private final String _nodeKey;

TrieNode(String path, String nodeKey) {
public TrieNode(String path, String nodeKey) {
_path = path;
_nodeKey = nodeKey;
_children = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@ public class MetaClientCacheConfig {
private final String _rootEntry;
private boolean _cacheData = false;
private boolean _cacheChildren = false;
private boolean _lazyCaching = true;

public MetaClientCacheConfig(String rootEntry, boolean cacheData, boolean cacheChildren, boolean lazyCaching) {
_rootEntry = rootEntry;
_cacheData = cacheData;
_cacheChildren = cacheChildren;
_lazyCaching = lazyCaching;
}

public String getRootEntry() {
Expand All @@ -46,7 +44,4 @@ public boolean getCacheChildren() {
return _cacheChildren;
}

public boolean getLazyCaching() {
return _lazyCaching;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,7 @@ public MetaClientInterface getMetaClient(MetaClientConfig config) {
throw new IllegalArgumentException("MetaClientConfig cannot be null.");
}
if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
setConnectionAddress(config.getConnectionAddress())
.setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
.setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
.setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
.build();
return new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
return new ZkMetaClientFactory().getMetaClient(createZkMetaClientConfig(config));
}
return null;
}
Expand All @@ -56,14 +50,17 @@ public MetaClientCacheInterface getMetaClientCache(MetaClientConfig config, Meta
throw new IllegalArgumentException("MetaClientConfig cannot be null.");
}
if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
setConnectionAddress(config.getConnectionAddress())
.setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
.setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
.setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
.build();
return new ZkMetaClientFactory().getMetaClientCache(zkMetaClientConfig, cacheConfig);
return new ZkMetaClientFactory().getMetaClientCache(createZkMetaClientConfig(config), cacheConfig);
}
return null;
}

private ZkMetaClientConfig createZkMetaClientConfig(MetaClientConfig config) {
return new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
setConnectionAddress(config.getConnectionAddress())
.setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
.setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
.setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,35 @@

import org.apache.helix.metaclient.api.ChildChangeListener;
import org.apache.helix.metaclient.api.MetaClientCacheInterface;
import org.apache.helix.metaclient.datamodel.DataRecord;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
import org.apache.helix.metaclient.factories.MetaClientConfig;
import org.apache.helix.metaclient.impl.zk.adapter.ChildListenerAdapter;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
import org.apache.helix.metaclient.recipes.lock.LockInfoSerializer;
import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientCacheInterface<T> {

private Map<String, DataRecord> _dataCacheMap;
private ConcurrentHashMap<String, T> _dataCacheMap;
private final String _rootEntry;
private TrieNode _childrenCacheTree;
private ChildChangeListener _eventListener;
private boolean _cacheData;
private boolean _cacheChildren;
private boolean _lazyCaching;
private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClientCache.class);
private ZkClient _cacheClient;
private ExecutorService executor;

// TODO: Look into using conditional variable instead of latch.
private final CountDownLatch _initializedCache = new CountDownLatch(1);

/**
* Constructor for ZkMetaClientCache.
Expand All @@ -56,19 +60,43 @@ public ZkMetaClientCache(ZkMetaClientConfig config, MetaClientCacheConfig cacheC
super(config);
_cacheClient = getZkClient();
_rootEntry = cacheConfig.getRootEntry();
_lazyCaching = cacheConfig.getLazyCaching();
_cacheData = cacheConfig.getCacheData();
_cacheChildren = cacheConfig.getCacheChildren();

if (_cacheData) {
_dataCacheMap = new ConcurrentHashMap<>();
}
if (_cacheChildren) {
_childrenCacheTree = new TrieNode(_rootEntry, null);
}
}

/**
* Get data for a given key.
* If datacache is enabled, will fetch for cache. If it doesn't exist
* returns null (for when initial populating cache is in progress).
* @param key key to identify the entry
* @return data for the key
*/
@Override
public Stat exists(String key) {
throw new MetaClientException("Not implemented yet.");
public T get(final String key) {
if (_cacheData) {
T data = getDataCacheMap().get(key);
if (data == null) {
LOG.debug("Data not found in cache for key: {}. This could be because the cache is still being populated.", key);
}
return data;
}
return super.get(key);
}

@Override
public T get(final String key) {
throw new MetaClientException("Not implemented yet.");
public List<T> get(List<String> keys) {
List<T> dataList = new ArrayList<>();
for (String key : keys) {
dataList.add(get(key));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above.

}
return dataList;
}

@Override
Expand All @@ -81,14 +109,94 @@ public int countDirectChildren(final String key) {
throw new MetaClientException("Not implemented yet.");
}

@Override
public List<T> get(List<String> keys) {
throw new MetaClientException("Not implemented yet.");
private void populateAllCache() {
// TODO: Concurrently populate children and data cache.
if (_cacheData) {
Marcosrico marked this conversation as resolved.
Show resolved Hide resolved
try {
List<String> children = _cacheClient.getChildren(_rootEntry);
for (String child : children) {
String childPath = _rootEntry + "/" + child;
T dataRecord = _cacheClient.readData(childPath, true);
xyuanlu marked this conversation as resolved.
Show resolved Hide resolved
getDataCacheMap().put(childPath, dataRecord);
}
} catch (Exception e) {
// Ignore
}
}
}

@Override
public List<Stat> exists(List<String> keys) {
throw new MetaClientException("Not implemented yet.");
private class CacheUpdateRunnable implements Runnable {
private final String path;
private final ChildChangeListener.ChangeType changeType;

public CacheUpdateRunnable(String path, ChildChangeListener.ChangeType changeType) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Add todo for dedup.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added todo a little lower :)

this.path = path;
this.changeType = changeType;
}

@Override
public void run() {
waitForPopulateAllCache();
// TODO: HANDLE DEDUP EVENT CHANGES
switch (changeType) {
case ENTRY_CREATED:
// Not implemented yet.
modifyDataInCache(path, false);
break;
case ENTRY_DELETED:
// Not implemented yet.
modifyDataInCache(path, true);
break;
case ENTRY_DATA_CHANGE:
modifyDataInCache(path, false);
break;
default:
LOG.error("Unknown change type: " + changeType);
}
}
}

private void waitForPopulateAllCache() {
try {
_initializedCache.await();
} catch (InterruptedException e) {
throw new MetaClientException("Interrupted while waiting for cache to populate.", e);
}
}

private void modifyDataInCache(String path, Boolean isDelete) {
if (_cacheData) {
if (isDelete) {
getDataCacheMap().remove(path);
} else {
T dataRecord = _cacheClient.readData(path, true);
getDataCacheMap().put(path, dataRecord);
}
}
}

public ConcurrentHashMap<String, T> getDataCacheMap() {
return _dataCacheMap;
}

public TrieNode getChildrenCacheTree() {
return _childrenCacheTree;
}

/**
* Connect to the underlying ZkClient.
*/
@Override
public void connect() {
super.connect();
_eventListener = (path, changeType) -> {
Runnable cacheUpdateRunnable = new CacheUpdateRunnable(path, changeType);
executor.execute(cacheUpdateRunnable);
};
executor = Executors.newSingleThreadExecutor();
_cacheClient.subscribePersistRecursiveListener(_rootEntry, new ChildListenerAdapter(_eventListener));
populateAllCache();
// Notify the latch that cache is populated.
_initializedCache.countDown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,100 @@
* under the License.
*/


import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;

public class TestZkMetaClientCache extends ZkMetaClientTestBase {
private static final String PATH = "/Cache";
private static final String DATA_PATH = "/data";
private static final String DATA_VALUE = "testData";

@Test
public void testCreateClient() {
final String key = "/TestZkMetaClientCache_testCreate";
try (ZkMetaClient<String> zkMetaClientCache = createZkMetaClientCache()) {
final String key = "/testCreate";
try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
zkMetaClientCache.connect();
// Perform some random non-read operation
zkMetaClientCache.create(key, ENTRY_STRING_VALUE);
}
}

@Test
public void testCacheDataUpdates() {
final String key = "/testCacheDataUpdates";
try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
zkMetaClientCache.connect();
zkMetaClientCache.create(key, "test");
zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE);

// Get data for DATA_PATH and cache it
String data = zkMetaClientCache.get(key + DATA_PATH);
Assert.assertEquals(data, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH));

// Update data for DATA_PATH
String newData = zkMetaClientCache.update(key + DATA_PATH, currentData -> currentData + "1");

// Verify that cached data is updated. Might take some time
for (int i = 0; i < 10; i++) {
if (zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH).equals(newData)) {
break;
}
Thread.sleep(1000);
}
Assert.assertEquals(newData, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH));

zkMetaClientCache.delete(key + DATA_PATH);
// Verify that cached data is updated. Might take some time
for (int i = 0; i < 10; i++) {
if (zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH) == null) {
break;
}
Thread.sleep(1000);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Test
public void testBatchGet() {
final String key = "/testBatchGet";
try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
zkMetaClientCache.connect();
zkMetaClientCache.create(key, "test");
zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE);

ArrayList<String> keys = new ArrayList<>();
keys.add(key);
keys.add(key + DATA_PATH);

ArrayList<String> values = new ArrayList<>();
values.add("test");
values.add(DATA_VALUE);

try {
//Perform some read operation - should fail.
// TODO: Remove this once implemented.
zkMetaClientCache.get(key);
Assert.fail("Should have failed with non implemented yet.");
} catch (Exception ignored) {
for (int i = 0; i < 10; i++) {
// Get data for DATA_PATH and cache it
List<String> data = zkMetaClientCache.get(keys);
if (data.equals(values)) {
break;
}
Thread.sleep(1000);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

protected static ZkMetaClientCache<String> createZkMetaClientCache() {
protected static ZkMetaClientCache<String> createZkMetaClientCacheLazyCaching(String rootPath) {
ZkMetaClientConfig config =
new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR)
//.setZkSerializer(new TestStringSerializer())
.build();
MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(PATH, true, true, true);
MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(rootPath, true, true, true);
return new ZkMetaClientCache<>(config, cacheConfig);
}
}