Skip to content

Commit

Permalink
Separate thread for event changes. Hasmap computeifabsent
Browse files Browse the repository at this point in the history
  • Loading branch information
mapeng committed Sep 21, 2023
1 parent 0b47a24 commit bfeca3b
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientCacheInterface<T> {

Expand All @@ -45,6 +47,8 @@ public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientC
private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClientCache.class);
private ZkClient _cacheClient;

private ExecutorService executor;

/**
* Constructor for ZkMetaClientCache.
* @param config ZkMetaClientConfig
Expand All @@ -69,13 +73,7 @@ public ZkMetaClientCache(ZkMetaClientConfig config, MetaClientCacheConfig cacheC
@Override
public T get(final String key) {
if (_cacheData) {
if (!getDataCacheMap().containsKey(key)) {
T data = _cacheClient.readData(key, true);
if (data == null) {
return null;
}
getDataCacheMap().put(key, data);
}
getDataCacheMap().computeIfAbsent(key, k -> _cacheClient.readData(k, true));
return getDataCacheMap().get(key);
}
return _cacheClient.readData(key, true);
Expand All @@ -86,13 +84,7 @@ public List<T> get(List<String> keys) {
List<T> dataList = new ArrayList<>();
if (_cacheData) {
for (String key : keys) {
if (!getDataCacheMap().containsKey(key)) {
T data = _cacheClient.readData(key, true);
if (data == null) {
continue;
}
getDataCacheMap().put(key, data);
}
getDataCacheMap().computeIfAbsent(key, k -> _cacheClient.readData(k, true));
dataList.add(getDataCacheMap().get(key));
}
}
Expand Down Expand Up @@ -152,7 +144,9 @@ public TrieNode getChildrenCacheTree() {
public void connect() {
super.connect();
_eventListener = this::handleCacheUpdate;
_cacheClient.subscribePersistRecursiveListener(_rootEntry, new ChildListenerAdapter(_eventListener));
executor = Executors.newSingleThreadExecutor(); // Create a single-thread executor
executor.execute(() -> {
_cacheClient.subscribePersistRecursiveListener(_rootEntry, new ChildListenerAdapter(_eventListener));
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;

public class TestZkMetaClientCache extends ZkMetaClientTestBase {
private static final String DATA_PATH = "/data";
private static final String DATA_VALUE = "testData";
Expand Down Expand Up @@ -87,6 +90,28 @@ public void testCacheDataUpdates() {
}
}

@Test
public void testBatchGet() {
final String key = "/testBatchGet";
try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
zkMetaClientCache.connect();
zkMetaClientCache.create(key, "test");
zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE);

ArrayList<String> keys = new ArrayList<>();
keys.add(key);
keys.add(key + DATA_PATH);

ArrayList<String> values = new ArrayList<>();
values.add("test");
values.add(DATA_VALUE);

// Get data for DATA_PATH and cache it
List<String> data = zkMetaClientCache.get(keys);
Assert.assertEquals(data, values);
}
}

protected static ZkMetaClientCache<String> createZkMetaClientCacheLazyCaching(String rootPath) {
ZkMetaClientConfig config =
new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR)
Expand Down

0 comments on commit bfeca3b

Please sign in to comment.