Skip to content

Commit

Permalink
Optimize paginated getChildren
Browse files Browse the repository at this point in the history
  • Loading branch information
huizhilu committed Nov 16, 2020
1 parent 58e65ad commit e2ee4de
Show file tree
Hide file tree
Showing 11 changed files with 351 additions and 196 deletions.
6 changes: 4 additions & 2 deletions zookeeper-jute/src/main/resources/zookeeper.jute
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ module org.apache.zookeeper.proto {
ustring path;
int maxReturned;
long minCzxId;
long czxIdOffset;
int czxIdOffset;
boolean watch;
}
class CheckVersionRequest {
Expand Down Expand Up @@ -240,8 +240,10 @@ module org.apache.zookeeper.proto {
org.apache.zookeeper.data.Stat stat;
}
class GetChildrenPaginatedResponse {
vector<org.apache.zookeeper.data.PathWithStat> children;
vector<ustring> children;
org.apache.zookeeper.data.Stat stat;
long nextPageCzxid;
int nextPageCzxIdOffset;
}
class GetACLResponse {
vector<org.apache.zookeeper.data.ACL> acl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.zookeeper.data.PathWithStat;

/**
* Iterator over children nodes of a given path.
*/
class ChildrenBatchIterator implements RemoteIterator<PathWithStat> {
class ChildrenBatchIterator implements RemoteIterator<String> {

private final ZooKeeper zooKeeper;
private final String path;
private final Watcher watcher;
private final int batchSize;
private final LinkedList<PathWithStat> childrenQueue;
private final LinkedList<String> childrenQueue;
private final PaginationNextPage nextPage;
private long nextBatchMinZxid;
private int nextBatchZxidOffset;

Expand All @@ -48,54 +48,41 @@ class ChildrenBatchIterator implements RemoteIterator<PathWithStat> {

this.childrenQueue = new LinkedList<>();

List<PathWithStat> firstChildrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset);
this.nextPage = new PaginationNextPage();
List<String> firstChildrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset, nextPage);
childrenQueue.addAll(firstChildrenBatch);

updateOffsetsForNextBatch(firstChildrenBatch);
updateOffsetsForNextBatch();
}

@Override
public boolean hasNext() {

// next() never lets childrenQueue empty unless we iterated over all children
return !childrenQueue.isEmpty();
}

@Override
public PathWithStat next() throws KeeperException, InterruptedException, NoSuchElementException {

public String next() throws KeeperException, InterruptedException, NoSuchElementException {
if (!hasNext()) {
throw new NoSuchElementException("No more children");
}

// If we're down to the last element, backfill before returning it
if (childrenQueue.size() == 1) {

List<PathWithStat> childrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset);
if (childrenQueue.size() == 1 && nextBatchMinZxid >= 0) {
List<String> childrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset, nextPage);
childrenQueue.addAll(childrenBatch);

updateOffsetsForNextBatch(childrenBatch);
updateOffsetsForNextBatch();
}

PathWithStat returnChildren = childrenQueue.pop();

return returnChildren;
return childrenQueue.pop();
}

/**
* Prepare minZxid and zkidOffset for the next batch request based on the children returned in the current
* Prepare minZxid and zxidOffset for the next batch request
*/
private void updateOffsetsForNextBatch(List<PathWithStat> children) {

for (PathWithStat child : children) {
long childZxid = child.getStat().getCzxid();

if (nextBatchMinZxid == childZxid) {
++nextBatchZxidOffset;
} else {
nextBatchZxidOffset = 1;
nextBatchMinZxid = childZxid;
}
}
private void updateOffsetsForNextBatch() {
nextBatchMinZxid = nextPage.getMinCzxid();
nextBatchZxidOffset = nextPage.getMinCzxidOffset();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper;

import org.apache.yetus.audience.InterfaceAudience;

/**
* Represents the info used to fetch the next page of data for pagination.
*/
@InterfaceAudience.Public
public class PaginationNextPage {
private long minCzxid;
private int minCzxidOffset;

public PaginationNextPage() {
}

public PaginationNextPage(long minCzxid, int minCzxidOffset) {
this.minCzxid = minCzxid;
this.minCzxidOffset = minCzxidOffset;
}

public long getMinCzxid() {
return minCzxid;
}

public void setMinCzxid(long minCzxid) {
this.minCzxid = minCzxid;
}

public int getMinCzxidOffset() {
return minCzxidOffset;
}

public void setMinCzxidOffset(int minCzxidOffset) {
this.minCzxidOffset = minCzxidOffset;
}

@Override
public String toString() {
return "PaginationNextPage{" +
"minCzxid=" + minCzxid +
", minCzxidOffset=" + minCzxidOffset +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ public interface AddWatchModes {
int persistentRecursive = 1; // matches AddWatchMode.PERSISTENT_RECURSIVE
}

@InterfaceAudience.Public
public interface GetChildrenPaginated {
long lastPageMinCzxid = -1;
int lastPageMinCzxidOffset = -1;
}

public static final String[] opNames = {"notification", "create", "delete", "exists", "getData", "setData", "getACL", "setACL", "getChildren", "getChildren2", "getMaxChildren", "setMaxChildren", "ping", "reconfig", "getConfig"};

}
41 changes: 30 additions & 11 deletions zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.PathWithStat;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.AddWatchRequest;
import org.apache.zookeeper.proto.CheckWatchesRequest;
Expand Down Expand Up @@ -2788,7 +2787,7 @@ public List<String> getChildren(final String path, Watcher watcher) throws Keepe
*
* @since 3.6.2
*/
public List<PathWithStat> getChildren(final String path, Watcher watcher, final int maxReturned, final long minCzxId, final int czxIdOffset)
public List<String> getChildren(final String path, Watcher watcher, int maxReturned, long minCzxId, int czxIdOffset, final PaginationNextPage nextPage)
throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath);
Expand All @@ -2811,15 +2810,35 @@ public List<PathWithStat> getChildren(final String path, Watcher watcher, final
request.setPath(serverPath);
request.setWatch(watcher != null);
request.setMaxReturned(maxReturned);
request.setMinCzxId(minCzxId);
request.setCzxIdOffset(czxIdOffset);
GetChildrenPaginatedResponse response = new GetChildrenPaginatedResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);

List<String> children = new ArrayList<>();
GetChildrenPaginatedResponse response;
do {
request.setMinCzxId(minCzxId);
request.setCzxIdOffset(czxIdOffset);
response = new GetChildrenPaginatedResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
children.addAll(response.getChildren());
minCzxId = response.getNextPageCzxid();
czxIdOffset = response.getNextPageCzxIdOffset();
} while (needNextPage(maxReturned, minCzxId, czxIdOffset));

if (nextPage != null) {
nextPage.setMinCzxid(response.getNextPageCzxid());
nextPage.setMinCzxidOffset(response.getNextPageCzxIdOffset());
}
return response.getChildren();

return children;
}

private boolean needNextPage(int maxReturned, long minCzxId, int czxIdOffset) {
return maxReturned == Integer.MAX_VALUE
&& minCzxId != ZooDefs.GetChildrenPaginated.lastPageMinCzxid
&& czxIdOffset != ZooDefs.GetChildrenPaginated.lastPageMinCzxidOffset;
}

/**
Expand Down Expand Up @@ -2853,7 +2872,7 @@ public List<PathWithStat> getChildren(final String path, Watcher watcher, final
*
* @since 3.6.2
*/
public RemoteIterator<PathWithStat> getChildrenIterator(String path, Watcher watcher, int batchSize, long minCzxId)
public RemoteIterator<String> getChildrenIterator(String path, Watcher watcher, int batchSize, long minCzxId)
throws KeeperException, InterruptedException {
return new ChildrenBatchIterator(this, path, watcher, batchSize, minCzxId);
}
Expand Down
Loading

0 comments on commit e2ee4de

Please sign in to comment.