Skip to content

Commit

Permalink
NextPage returns the exact minCzxid and offset
Browse files Browse the repository at this point in the history
  • Loading branch information
huizhilu committed Dec 4, 2020
1 parent c400bc1 commit 898d08a
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ public interface AddWatchModes {
@InterfaceAudience.Public
public interface GetChildrenPaginated {
    // Sentinel minCzxid value copied into PaginationNextPage when the returned page
    // is the last page (no more children to fetch); callers compare
    // PaginationNextPage#getMinCzxid() against this to detect the end of pagination.
    long lastPageMinCzxid = -1L;
    // Sentinel czxid offset set alongside lastPageMinCzxid on the last page.
    int lastPageCzxidOffset = -1;
}

// Human-readable names of ZooKeeper request types. NOTE(review): ordering appears
// significant (presumably aligned with opcode constants) — confirm against OpCode before reordering.
public static final String[] opNames = {"notification", "create", "delete", "exists", "getData", "setData", "getACL", "setACL", "getChildren", "getChildren2", "getMaxChildren", "setMaxChildren", "ping", "reconfig", "getConfig"};
Expand Down
49 changes: 36 additions & 13 deletions zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2781,9 +2781,14 @@ public List<String> getChildren(final String path, Watcher watcher) throws Keepe
* @param outputNextPage
* - if not null, {@link PaginationNextPage} info returned from server will be copied to
* {@code outputNextPage}. The info can be used for fetching the next page of remaining children,
* or checking whether the returned page is the last page.
* or checking whether the returned page is the last page by comparing
* {@link PaginationNextPage#getMinCzxid()} with
* {@link org.apache.zookeeper.ZooDefs.GetChildrenPaginated#lastPageMinCzxid}
* @return
* a list of children nodes, up to {@code maxReturned}
* a list of children nodes, up to {@code maxReturned}.
* <p>When {@code maxReturned} = {@code Integer.MAX_VALUE} and {@code minCzxid} <= 0:
* the children list is unordered if all the children can be returned in a single page;
* <p>otherwise, the children list is ordered by czxid.
* @throws KeeperException
* if the server signals an error with a non-zero error code.
* @throws IllegalArgumentException
Expand Down Expand Up @@ -2860,7 +2865,7 @@ public List<String> getChildren(final String path,
*
* @param path the path of the parent node
* @param watch whether or not leave a watch on the given node
* @return a list of all children of the given path
* @return an unordered list of all children of the given path
* @throws KeeperException if the server signals an error with a non-zero error code.
* @throws InterruptedException if the server transaction is interrupted.
*/
Expand All @@ -2874,21 +2879,39 @@ public List<String> getAllChildrenPaginated(String path, boolean watch)
}

Set<String> childrenSet = new HashSet<>(childrenPaginated);
getChildrenRemainingPages(path, watcher, nextPage, childrenSet);

return new ArrayList<>(childrenSet);
}

private void getChildrenRemainingPages(String path,
Watcher watcher,
PaginationNextPage nextPage,
Set<String> childrenSet) throws KeeperException, InterruptedException {
int retryCount = 0;
do {
List<String> childrenPaginated;
long minCzxid = nextPage.getMinCzxid();
// If a child with the same czxid is returned in the previous page, and then deleted
// on the server, the children not in the previous page but offset is less than czxidOffset
// would be missed in the next page. Use the last child in the previous page to determine whether or not
// the deletion case happens. If it happens, retry fetching the page starting from offset 0.
int czxidOffset = nextPage.getMinCzxidOffset() - 1;
childrenPaginated = getChildren(path, watcher, Integer.MAX_VALUE, minCzxid, czxidOffset, null, nextPage);
if (!childrenPaginated.isEmpty() && !childrenSet.contains(childrenPaginated.get(0))) {
childrenPaginated = getChildren(path, watcher, Integer.MAX_VALUE, minCzxid, 0, null, nextPage);
if (nextPage.getMinCzxidOffset() == 0) {
childrenPaginated = getChildren(path, watcher, Integer.MAX_VALUE, minCzxid,
nextPage.getMinCzxidOffset(), null, nextPage);
} else {
// If a child with the same czxid is returned in the previous page, and then deleted
// on the server, the children not in the previous page but offset is less than czxidOffset
// would be missed in the next page. Use the last child in the previous page to determine whether or not
// the deletion case happens. If it happens, retry fetching the page starting from offset 0.
childrenPaginated = getChildren(path, watcher, Integer.MAX_VALUE, minCzxid,
nextPage.getMinCzxidOffset() - 1, null, nextPage);
if (!childrenPaginated.isEmpty() && !childrenSet.contains(childrenPaginated.get(0))) {
if (retryCount++ >= 3) {
throw KeeperException.create(Code.OPERATIONTIMEOUT);
}
LOG.info("Retry fetching children for minZxid = {}, retries = {}", minCzxid, retryCount);
childrenPaginated = getChildren(path, watcher, Integer.MAX_VALUE, minCzxid, 0, null, nextPage);
}
}
childrenSet.addAll(childrenPaginated);
} while (nextPage.getMinCzxid() != ZooDefs.GetChildrenPaginated.lastPageMinCzxid);

return new ArrayList<>(childrenSet);
}

private void updateNextPage(PaginationNextPage nextPage, long minCzxId, int czxIdOffset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -878,18 +878,25 @@ public List<String> getPaginatedChildren(String path, Stat stat, Watcher watcher
updateReadStat(path, countReadChildrenBytes(allChildren));
if (outputNextPage != null) {
outputNextPage.setMinCzxid(ZooDefs.GetChildrenPaginated.lastPageMinCzxid);
outputNextPage.setMinCzxidOffset(ZooDefs.GetChildrenPaginated.lastPageCzxidOffset);
}
return allChildren;
}
}

return getSortedPaginatedChildren(n, path, stat, watcher, maxReturned, minCzxId, czxIdOffset, outputNextPage);
}

private List<String> getSortedPaginatedChildren(DataNode node, String path, Stat stat, Watcher watcher,
int maxReturned, long minCzxId, int czxIdOffset,
PaginationNextPage outputNextPage) {
int index = 0;
List<PathWithStat> targetChildren = new ArrayList<PathWithStat>();
List<String> paginatedChildren = new ArrayList<String>();

// Need to lock the parent node for the whole block between reading children list and adding watch
synchronized (n) {
buildChildrenPathWithStat(n, path, stat, minCzxId, targetChildren);
synchronized (node) {
buildChildrenPathWithStat(node, path, stat, minCzxId, targetChildren);

targetChildren.sort(staticNodeCreationComparator);

Expand Down Expand Up @@ -981,22 +988,30 @@ private void updateNextPage(PaginationNextPage nextPage, List<PathWithStat> chil
if (lastAddedIndex == children.size() - 1) {
// All children are added, so this is the last page
nextPage.setMinCzxid(ZooDefs.GetChildrenPaginated.lastPageMinCzxid);
nextPage.setMinCzxidOffset(ZooDefs.GetChildrenPaginated.lastPageCzxidOffset);
return;
}

// Find the minCzxidOffset for the next page by searching backwards for the index
// (startIndex) of the first czxid that is not equal to the current czxid.
// minCzxidOffset of next page = lastAddedIndex - startIndex
long lastCzxid = children.get(lastAddedIndex).getStat().getCzxid();
int startIndex = lastAddedIndex;
while (startIndex >= 0) {
if (children.get(startIndex).getStat().getCzxid() != lastCzxid) {
break;
long nextCzxid = children.get(lastAddedIndex + 1).getStat().getCzxid();
int nextCzixdOffset = 0;

if (nextCzxid == lastCzxid) {
// Find the minCzxidOffset for the next page by searching backwards for the index
// (startIndex) of the first czxid that is not equal to the current czxid.
// minCzxidOffset of next page = lastAddedIndex - startIndex
int startIndex = lastAddedIndex;
while (startIndex >= 0) {
if (children.get(startIndex).getStat().getCzxid() != lastCzxid) {
break;
}
startIndex--;
}
startIndex--;
nextCzixdOffset = lastAddedIndex - startIndex;
}
nextPage.setMinCzxid(lastCzxid);
nextPage.setMinCzxidOffset(lastAddedIndex - startIndex);

nextPage.setMinCzxid(nextCzxid);
nextPage.setMinCzxidOffset(nextCzixdOffset);
}

public Stat setACL(String path, List<ACL> acl, int version) throws KeeperException.NoNodeException {
Expand Down
Loading

0 comments on commit 898d08a

Please sign in to comment.