Skip to content

Commit

Permalink
Upgrade NodeAnnouncer to use CuratorCache
Browse files Browse the repository at this point in the history
  • Loading branch information
GWphua committed Nov 27, 2024
1 parent 377e6f9 commit 648e740
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

package org.apache.druid.curator.announcement;

import com.google.common.annotations.VisibleForTesting;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.CuratorCacheStorage;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.java.util.common.IAE;
Expand All @@ -38,7 +38,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -61,9 +60,6 @@ public class NodeAnnouncer
private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> announcements = new ConcurrentHashMap<>();
private final List<String> parentsIBuilt = new CopyOnWriteArrayList<>();

// Used for testing
private Set<String> addedChildren;

private boolean started = false;

public NodeAnnouncer(
Expand All @@ -73,18 +69,6 @@ public NodeAnnouncer(
this.curator = curator;
}

@VisibleForTesting
void initializeAddedChildren()
{
addedChildren = new HashSet<>();
}

@VisibleForTesting
Set<String> getAddedChildren()
{
return addedChildren;
}

@LifecycleStart
public void start()
{
Expand Down Expand Up @@ -210,11 +194,10 @@ public void announce(String path, byte[] bytes, boolean removeParentIfCreated)

final CuratorCache cache = CuratorCache.builder(curator, parentPath)
.withOptions(
CuratorCache.Options.COMPRESSED_DATA,
CuratorCache.Options.SINGLE_NODE_CACHE
CuratorCache.Options.COMPRESSED_DATA
)
.withStorage(
CuratorCacheStorage.dataNotCached()
CuratorCacheStorage.standard()
)
.build();

Expand All @@ -238,13 +221,10 @@ public void announce(String path, byte[] bytes, boolean removeParentIfCreated)
}
}
break;
case NODE_CREATED:
if (addedChildren != null) {
final String pathOfCreatedNode = newData.getPath();
addedChildren.add(pathOfCreatedNode);
}
// fall through
case NODE_CREATED:
case NODE_CHANGED:
default:
// do nothing
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ public void testSanity() throws Exception
curator.start();
curator.blockUntilConnected();
NodeAnnouncer announcer = new NodeAnnouncer(curator);
announcer.initializeAddedChildren();

final byte[] billy = StringUtils.toUtf8("billy");
final String testPath1 = "/test1";
Expand All @@ -202,9 +201,6 @@ public void testSanity() throws Exception
Assert.assertNull("/somewhere/test2 does not exists", curator.checkExists().forPath(testPath2));

announcer.start();
while (!announcer.getAddedChildren().contains("/test1")) {
Thread.sleep(100);
}

try {
Assert.assertArrayEquals("/test1 has data", billy, curator.getData().decompressed().forPath(testPath1));
Expand Down Expand Up @@ -269,15 +265,15 @@ public void testSessionKilled() throws Exception
curator.blockUntilConnected();
NodeAnnouncer announcer = new NodeAnnouncer(curator);
try {
curator.inTransaction().create().forPath("/somewhere").and().commit();
curator.transaction().forOperations(curator.transactionOp().create().forPath("/somewhere"));
announcer.start();

final byte[] billy = StringUtils.toUtf8("billy");
final String testPath1 = "/test1";
final String testPath2 = "/somewhere/test2";
final Set<String> paths = Sets.newHashSet(testPath1, testPath2);
announcer.announce(testPath1, billy);
announcer.announce(testPath2, billy);
final Set< String> paths = Sets.newHashSet(testPath1, testPath2);
awaitAnnounce(announcer, testPath1, billy, true);
awaitAnnounce(announcer, testPath2, billy, true);

Assert.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath1));
Assert.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath2));
Expand Down

0 comments on commit 648e740

Please sign in to comment.