Skip to content

Commit

Permalink
Optimize paginated getChildren
Browse files Browse the repository at this point in the history
  • Loading branch information
huizhilu committed Nov 20, 2020
1 parent 58e65ad commit 4ee53c4
Show file tree
Hide file tree
Showing 13 changed files with 522 additions and 209 deletions.
8 changes: 5 additions & 3 deletions zookeeper-jute/src/main/resources/zookeeper.jute
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ module org.apache.zookeeper.proto {
class GetChildrenPaginatedRequest {
ustring path;
int maxReturned;
long minCzxId;
long czxIdOffset;
long minCzxid;
int czxidOffset;
boolean watch;
}
class CheckVersionRequest {
Expand Down Expand Up @@ -240,8 +240,10 @@ module org.apache.zookeeper.proto {
org.apache.zookeeper.data.Stat stat;
}
class GetChildrenPaginatedResponse {
vector<org.apache.zookeeper.data.PathWithStat> children;
vector<ustring> children;
org.apache.zookeeper.data.Stat stat;
long nextPageCzxid;
int nextPageCzxidOffset;
}
class GetACLResponse {
vector<org.apache.zookeeper.data.ACL> acl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,24 @@
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.zookeeper.data.PathWithStat;

/**
* Iterator over children nodes of a given path.
* <p>
* Note: the final collection of children may not be strongly consistent with the server.
* If there are concurrent writes to the children during iteration, the final collection could
* miss some children or contain some duplicate children.
*
* @see ZooKeeper#getAllChildrenPaginated(String, boolean)
*/
class ChildrenBatchIterator implements RemoteIterator<PathWithStat> {
class ChildrenBatchIterator implements RemoteIterator<String> {

private final ZooKeeper zooKeeper;
private final String path;
private final Watcher watcher;
private final int batchSize;
private final LinkedList<PathWithStat> childrenQueue;
private final LinkedList<String> childrenQueue;
private final PaginationNextPage nextPage;
private long nextBatchMinZxid;
private int nextBatchZxidOffset;

Expand All @@ -47,55 +53,44 @@ class ChildrenBatchIterator implements RemoteIterator<PathWithStat> {
this.nextBatchMinZxid = minZxid;

this.childrenQueue = new LinkedList<>();
this.nextPage = new PaginationNextPage();

List<PathWithStat> firstChildrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset);
childrenQueue.addAll(firstChildrenBatch);

updateOffsetsForNextBatch(firstChildrenBatch);
batchGetChildren();
}

@Override
public boolean hasNext() {

// next() never lets childrenQueue empty unless we iterated over all children
return !childrenQueue.isEmpty();
}

@Override
public PathWithStat next() throws KeeperException, InterruptedException, NoSuchElementException {

public String next() throws KeeperException, InterruptedException {
if (!hasNext()) {
throw new NoSuchElementException("No more children");
}

// If we're down to the last element, backfill before returning it
if (childrenQueue.size() == 1) {

List<PathWithStat> childrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset);
childrenQueue.addAll(childrenBatch);

updateOffsetsForNextBatch(childrenBatch);
if (childrenQueue.size() == 1 && nextBatchMinZxid != ZooDefs.GetChildrenPaginated.lastPageMinCzxid
&& nextBatchZxidOffset != ZooDefs.GetChildrenPaginated.lastPageMinCzxidOffset) {
batchGetChildren();
}

PathWithStat returnChildren = childrenQueue.pop();

return returnChildren;
return childrenQueue.pop();
}

/**
* Prepare minZxid and zkidOffset for the next batch request based on the children returned in the current
* Prepare minZxid and zxidOffset for the next batch request
*/
private void updateOffsetsForNextBatch(List<PathWithStat> children) {

for (PathWithStat child : children) {
long childZxid = child.getStat().getCzxid();
private void updateOffsetsForNextBatch() {
nextBatchMinZxid = nextPage.getMinCzxid();
nextBatchZxidOffset = nextPage.getMinCzxidOffset();
}

if (nextBatchMinZxid == childZxid) {
++nextBatchZxidOffset;
} else {
nextBatchZxidOffset = 1;
nextBatchMinZxid = childZxid;
}
}
private void batchGetChildren() throws KeeperException, InterruptedException {
List<String> childrenBatch =
zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset, nextPage);
childrenQueue.addAll(childrenBatch);
updateOffsetsForNextBatch();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper;

import org.apache.yetus.audience.InterfaceAudience;

/**
* Represents the info used to fetch the next page of data for pagination.
*/
@InterfaceAudience.Public
public class PaginationNextPage {
private long minCzxid;
private int minCzxidOffset;

public PaginationNextPage() {
}

public PaginationNextPage(long minCzxid, int minCzxidOffset) {
this.minCzxid = minCzxid;
this.minCzxidOffset = minCzxidOffset;
}

public long getMinCzxid() {
return minCzxid;
}

public void setMinCzxid(long minCzxid) {
this.minCzxid = minCzxid;
}

public int getMinCzxidOffset() {
return minCzxidOffset;
}

public void setMinCzxidOffset(int minCzxidOffset) {
this.minCzxidOffset = minCzxidOffset;
}

@Override
public String toString() {
return "PaginationNextPage{" +
"minCzxid=" + minCzxid +
", minCzxidOffset=" + minCzxidOffset +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@ public interface RemoteIterator<E> {

/**
* Returns true if the iterator has more elements.
*
* @return true if the iterator has more elements, false otherwise.
*/
boolean hasNext();

/**
* Returns the next element in the iteration.
*
* @return the next element in the iteration.
* @throws InterruptedException if the thread is interrupted
* @throws KeeperException if an error is encountered server-side
* @throws NoSuchElementException if the iteration has no more elements
*/
E next() throws InterruptedException, KeeperException, NoSuchElementException;
E next() throws InterruptedException, KeeperException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ public interface AddWatchModes {
int persistentRecursive = 1; // matches AddWatchMode.PERSISTENT_RECURSIVE
}

@InterfaceAudience.Public
public interface GetChildrenPaginated {
long lastPageMinCzxid = -1L;
int lastPageMinCzxidOffset = -1;
}

public static final String[] opNames = {"notification", "create", "delete", "exists", "getData", "setData", "getACL", "setACL", "getChildren", "getChildren2", "getMaxChildren", "setMaxChildren", "ping", "reconfig", "getConfig"};

}
119 changes: 100 additions & 19 deletions zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.PathWithStat;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.AddWatchRequest;
import org.apache.zookeeper.proto.CheckWatchesRequest;
Expand Down Expand Up @@ -2769,11 +2768,15 @@ public List<String> getChildren(final String path, Watcher watcher) throws Keepe
* @param maxReturned
* - the maximum number of children returned
* @param minCzxId
* - only return children whose creation zkid is equal or greater than {@code minCzxId}
* @param czxIdOffset
* - how many children with zkid == minCzxId to skip server-side, as they were returned in previous pages
* - only return children whose creation zxid is equal or greater than {@code minCzxId}
* @param czxidOffset
* - how many children with zxid == minCzxId to skip server-side, as they were returned in previous pages
* @param nextPage
* - if not null, {@link PaginationNextPage} info returned from server will be copied to {@code nextPage}.
* The info can be used for fetching the next page of remaining children, or checking whether the
* returned page is the last one
* @return
* an ordered list of children nodes, up to {@code maxReturned}, ordered by czxid
* a list of children nodes, up to {@code maxReturned}
* @throws KeeperException
* if the server signals an error with a non-zero error code.
* @throws IllegalArgumentException
Expand All @@ -2788,10 +2791,14 @@ public List<String> getChildren(final String path, Watcher watcher) throws Keepe
*
* @since 3.6.2
*/
public List<PathWithStat> getChildren(final String path, Watcher watcher, final int maxReturned, final long minCzxId, final int czxIdOffset)
public List<String> getChildren(final String path,
Watcher watcher,
int maxReturned,
long minCzxId,
int czxidOffset,
final PaginationNextPage nextPage)
throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath);
PathUtils.validatePath(path);

if (maxReturned <= 0) {
throw new IllegalArgumentException("Cannot return less than 1 children");
Expand All @@ -2800,26 +2807,100 @@ public List<PathWithStat> getChildren(final String path, Watcher watcher, final
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ChildWatchRegistration(watcher, clientPath);
wcb = new ChildWatchRegistration(watcher, path);
}

final String serverPath = prependChroot(clientPath);
final String serverPath = prependChroot(path);

RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getChildrenPaginated);
GetChildrenPaginatedRequest request = new GetChildrenPaginatedRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
request.setMaxReturned(maxReturned);
request.setMinCzxId(minCzxId);
request.setCzxIdOffset(czxIdOffset);
GetChildrenPaginatedResponse response = new GetChildrenPaginatedResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);

Set<String> children = null;
GetChildrenPaginatedResponse response;
boolean isFirstPage = true;
boolean needNextPage = true;

while (needNextPage) {
request.setMinCzxid(minCzxId);
// If not the first page, always start from czxidOffset 0, to avoid the case:
// if a child with the same czxid is returned in the previous page, and then deleted
// on the server, the starting offset for the next page should be shifted smaller accordingly.
// If the next page still starts from czxidOffset, the children that not in the previous page
// but their offset is less than czxidOffset, they would be missed.
// HashSet is used to de-dup the duplicate children.
request.setCzxidOffset(isFirstPage ? czxidOffset : 0);
response = new GetChildrenPaginatedResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
path);
}
minCzxId = response.getNextPageCzxid();
czxidOffset = response.getNextPageCzxidOffset();
needNextPage = needNextPage(maxReturned, minCzxId, czxidOffset);

if (isFirstPage) {
// If all children are returned in the first page,
// no need to use hash set to de-dup children
if (!needNextPage) {
updateNextPage(nextPage, minCzxId, czxidOffset);
return response.getChildren();
}
children = new HashSet<>();
isFirstPage = false;
}

children.addAll(response.getChildren());
}

updateNextPage(nextPage, minCzxId, czxidOffset);

return new ArrayList<>(children);
}

/**
* Returns a list of all the children given the path.
* <p>
* The difference between this API and {@link #getChildren(String, boolean)} is:
* when there are lots of children and the network buffer exceeds {@code jute.maxbuffer},
* this API will fetch the children using pagination and be able to return all children;
* while {@link #getChildren(String, boolean)} will fail.
* <p>
* The final list of children returned is NOT strongly consistent with the server's data:
* the list might contain some deleted children if some children are deleted before the last page is fetched.
* <p>
* If the watch is true and the call is successful (no exception is thrown),
# a watch will be left on the node with the given path. The watch will be
* triggered by a successful operation that deletes the node of the given
* path or creates/deletes a child under the node.
* <p>
*
* @param path the path of the parent node
* @param watch whether or not leave a watch on the given node
* @return a list of all children of the given path
* @throws KeeperException if the server signals an error with a non-zero error code.
* @throws InterruptedException if the server transaction is interrupted.
*/
public List<String> getAllChildrenPaginated(String path, boolean watch)
throws KeeperException, InterruptedException {
return getChildren(path, watch ? watchManager.defaultWatcher : null, Integer.MAX_VALUE, 0L, 0, null);
}

private boolean needNextPage(int maxReturned, long minCzxId, int czxIdOffset) {
return maxReturned == Integer.MAX_VALUE
&& minCzxId != ZooDefs.GetChildrenPaginated.lastPageMinCzxid
&& czxIdOffset != ZooDefs.GetChildrenPaginated.lastPageMinCzxidOffset;
}

private void updateNextPage(PaginationNextPage nextPage, long minCzxId, int czxIdOffset) {
if (nextPage != null) {
nextPage.setMinCzxid(minCzxId);
nextPage.setMinCzxidOffset(czxIdOffset);
}
return response.getChildren();
}

/**
Expand Down Expand Up @@ -2853,7 +2934,7 @@ public List<PathWithStat> getChildren(final String path, Watcher watcher, final
*
* @since 3.6.2
*/
public RemoteIterator<PathWithStat> getChildrenIterator(String path, Watcher watcher, int batchSize, long minCzxId)
public RemoteIterator<String> getChildrenIterator(String path, Watcher watcher, int batchSize, long minCzxId)
throws KeeperException, InterruptedException {
return new ChildrenBatchIterator(this, path, watcher, batchSize, minCzxId);
}
Expand Down
Loading

0 comments on commit 4ee53c4

Please sign in to comment.