Skip to content

Commit

Permalink
Simplify code for Zookeeper getChildren
Browse files Browse the repository at this point in the history
  • Loading branch information
huizhilu committed Nov 30, 2020
1 parent 956968e commit c400bc1
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private void updateOffsetsForNextBatch() {

private void batchGetChildren() throws KeeperException, InterruptedException {
List<String> childrenBatch =
zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset, nextPage);
zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset, null, nextPage);
childrenQueue.addAll(childrenBatch);
updateOffsetsForNextBatch();
}
Expand Down
99 changes: 48 additions & 51 deletions zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2754,7 +2754,10 @@ public List<String> getChildren(final String path, Watcher watcher) throws Keepe
* Returns the a subset (a "page") of the children node of the given path.
*
* <p>
* The returned list contains up to {@code maxReturned} ordered by czxid.
* The returned list contains children names up to {@code maxReturned}. The max number of returned children
* is limited by {@code jute.maxbuffer}. If the packet length of {@code maxReturned} children exceeds the limit,
* there will be less children returned, within the limit range. {@code outputNextPage} can be used in the next
* page to fetch the remaining children.
* </p>
*
* <p>
Expand All @@ -2771,10 +2774,14 @@ public List<String> getChildren(final String path, Watcher watcher) throws Keepe
* - only return children whose creation zxid is equal or greater than {@code minCzxId}
* @param czxidOffset
* - how many children with zxid == minCzxId to skip server-side, as they were returned in previous pages
* @param nextPage
* - if not null, {@link PaginationNextPage} info returned from server will be copied to {@code nextPage}.
* The info can be used for fetching the next page of remaining children, or checking whether the
* returned page is the last one
* @param outputStat
* - output stat of the znode designated by path. It is the stat of the znode when the znode's single
* page of children are returned. The {@code cversion} in stat could be used to validate data consistency
* for multiple pages of children fetched.
* @param outputNextPage
* - if not null, {@link PaginationNextPage} info returned from server will be copied to
* {@code outputNextPage}. The info can be used for fetching the next page of remaining children,
* or checking whether the returned page is the last page.
* @return
* a list of children nodes, up to {@code maxReturned}
* @throws KeeperException
Expand All @@ -2796,10 +2803,10 @@ public List<String> getChildren(final String path,
int maxReturned,
long minCzxId,
int czxidOffset,
final PaginationNextPage nextPage)
Stat outputStat,
PaginationNextPage outputNextPage)
throws KeeperException, InterruptedException {
PathUtils.validatePath(path);

if (maxReturned <= 0) {
throw new IllegalArgumentException("Cannot return less than 1 children");
}
Expand All @@ -2818,48 +2825,21 @@ public List<String> getChildren(final String path,
request.setPath(serverPath);
request.setWatch(watcher != null);
request.setMaxReturned(maxReturned);
request.setMinCzxid(minCzxId);
request.setCzxidOffset(czxidOffset);

Set<String> children = null;
GetChildrenPaginatedResponse response;
boolean isFirstPage = true;
boolean needNextPage = true;

while (needNextPage) {
request.setMinCzxid(minCzxId);
// If not the first page, always start from czxidOffset 0, to avoid the case:
// if a child with the same czxid is returned in the previous page, and then deleted
// on the server, the starting offset for the next page should be shifted smaller accordingly.
// If the next page still starts from czxidOffset, the children that not in the previous page
// but their offset is less than czxidOffset, they would be missed.
// HashSet is used to de-dup the duplicate children.
request.setCzxidOffset(isFirstPage ? czxidOffset : 0);
response = new GetChildrenPaginatedResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
path);
}
minCzxId = response.getNextPageCzxid();
czxidOffset = response.getNextPageCzxidOffset();
needNextPage = needNextPage(maxReturned, minCzxId);

if (isFirstPage) {
// If all children are returned in the first page,
// no need to use hash set to de-dup children
if (!needNextPage) {
updateNextPage(nextPage, minCzxId, czxidOffset);
return response.getChildren();
}
children = new HashSet<>();
isFirstPage = false;
}

children.addAll(response.getChildren());
GetChildrenPaginatedResponse response = new GetChildrenPaginatedResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
path);
}
if (outputStat != null) {
DataTree.copyStat(response.getStat(), outputStat);
}
updateNextPage(outputNextPage, response.getNextPageCzxid(), response.getNextPageCzxidOffset());

updateNextPage(nextPage, minCzxId, czxidOffset);

return new ArrayList<>(children);
return response.getChildren();
}

/**
Expand All @@ -2886,12 +2866,29 @@ public List<String> getChildren(final String path,
*/
public List<String> getAllChildrenPaginated(String path, boolean watch)
throws KeeperException, InterruptedException {
return getChildren(path, watch ? watchManager.defaultWatcher : null, Integer.MAX_VALUE, 0L, 0, null);
}
PaginationNextPage nextPage = new PaginationNextPage();
Watcher watcher = watch ? watchManager.defaultWatcher : null;
List<String> childrenPaginated = getChildren(path, watcher, Integer.MAX_VALUE, 0L, 0, null, nextPage);
if (nextPage.getMinCzxid() == ZooDefs.GetChildrenPaginated.lastPageMinCzxid) {
return childrenPaginated;
}

Set<String> childrenSet = new HashSet<>(childrenPaginated);
do {
long minCzxid = nextPage.getMinCzxid();
// If a child with the same czxid is returned in the previous page, and then deleted
// on the server, the children not in the previous page but offset is less than czxidOffset
// would be missed in the next page. Use the last child in the previous page to determine whether or not
// the deletion case happens. If it happens, retry fetching the page starting from offset 0.
int czxidOffset = nextPage.getMinCzxidOffset() - 1;
childrenPaginated = getChildren(path, watcher, Integer.MAX_VALUE, minCzxid, czxidOffset, null, nextPage);
if (!childrenPaginated.isEmpty() && !childrenSet.contains(childrenPaginated.get(0))) {
childrenPaginated = getChildren(path, watcher, Integer.MAX_VALUE, minCzxid, 0, null, nextPage);
}
childrenSet.addAll(childrenPaginated);
} while (nextPage.getMinCzxid() != ZooDefs.GetChildrenPaginated.lastPageMinCzxid);

private boolean needNextPage(int maxReturned, long minCzxId) {
return maxReturned == Integer.MAX_VALUE
&& minCzxId != ZooDefs.GetChildrenPaginated.lastPageMinCzxid;
return new ArrayList<>(childrenSet);
}

private void updateNextPage(PaginationNextPage nextPage, long minCzxId, int czxIdOffset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -845,12 +845,12 @@ public int compare(PathWithStat left, PathWithStat right) {
* @param maxReturned maximum number of children to return.
* @param minCzxId only return children whose creation zxid equal or greater than minCzxId
* @param czxIdOffset how many children with zxid == minCzxId to skip (as returned in previous pages)
* @param nextPage info to be used for the next page call
* @param outputNextPage info to be used for the next page call
* @return A list of child names of the given path
* @throws NoNodeException if the path does not exist
*/
public List<String> getPaginatedChildren(String path, Stat stat, Watcher watcher, int maxReturned,
long minCzxId, int czxIdOffset, PaginationNextPage nextPage)
long minCzxId, int czxIdOffset, PaginationNextPage outputNextPage)
throws NoNodeException {
DataNode n = nodes.get(path);
if (n == null) {
Expand All @@ -876,8 +876,8 @@ public List<String> getPaginatedChildren(String path, Stat stat, Watcher watcher
}
if (canFitInMaxBuffer) {
updateReadStat(path, countReadChildrenBytes(allChildren));
if (nextPage != null) {
nextPage.setMinCzxid(ZooDefs.GetChildrenPaginated.lastPageMinCzxid);
if (outputNextPage != null) {
outputNextPage.setMinCzxid(ZooDefs.GetChildrenPaginated.lastPageMinCzxid);
}
return allChildren;
}
Expand Down Expand Up @@ -926,7 +926,7 @@ public List<String> getPaginatedChildren(String path, Stat stat, Watcher watcher
}
}

updateNextPage(nextPage, targetChildren, index);
updateNextPage(outputNextPage, targetChildren, index);
updateReadStat(path, countReadChildrenBytes(paginatedChildren));

return paginatedChildren;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ public void testPacketLengthFormula() throws KeeperException, InterruptedExcepti
PaginationNextPage nextPage = new PaginationNextPage();

// First page
List<String> firstPage = zk.getChildren(basePath, null, numChildren, 0, 0, nextPage);
List<String> firstPage = zk.getChildren(basePath, null, numChildren, 0, 0, null, nextPage);
BufferStats clientResponseStats = serverFactory.getZooKeeperServer().serverStats().getClientResponseStats();
int expectedPacketLength =
(DataTree.PAGINATION_PACKET_CHILD_EXTRA_BYTES + UUID.randomUUID().toString().length())
Expand Down Expand Up @@ -487,7 +487,7 @@ public void testMaxNumChildrenPageWatch() throws KeeperException, InterruptedExc
FireOnlyOnceWatcher fireOnlyOnceWatcher = new FireOnlyOnceWatcher();

// First page
List<String> firstPage = zk.getChildren(basePath, fireOnlyOnceWatcher, numChildren, 0, 0, nextPage);
List<String> firstPage = zk.getChildren(basePath, fireOnlyOnceWatcher, numChildren, 0, 0, null, nextPage);
BufferStats clientResponseStats = serverFactory.getZooKeeperServer().serverStats().getClientResponseStats();
int expectedPacketLength =
(DataTree.PAGINATION_PACKET_CHILD_EXTRA_BYTES + UUID.randomUUID().toString().length())
Expand All @@ -513,7 +513,7 @@ public void testMaxNumChildrenPageWatch() throws KeeperException, InterruptedExc

// Second page
List<String> secondPage = zk.getChildren(basePath, fireOnlyOnceWatcher, numChildren, nextPage.getMinCzxid(),
nextPage.getMinCzxidOffset(), nextPage);
nextPage.getMinCzxidOffset(), null, nextPage);

// Verify the logic of adding children to page in DataTree is correct.
Assert.assertEquals(ZooDefs.GetChildrenPaginated.lastPageMinCzxid, nextPage.getMinCzxid());
Expand Down

0 comments on commit c400bc1

Please sign in to comment.