diff --git a/zookeeper-jute/src/main/resources/zookeeper.jute b/zookeeper-jute/src/main/resources/zookeeper.jute index 502f2288d05..42fe61296a6 100644 --- a/zookeeper-jute/src/main/resources/zookeeper.jute +++ b/zookeeper-jute/src/main/resources/zookeeper.jute @@ -164,8 +164,8 @@ module org.apache.zookeeper.proto { class GetChildrenPaginatedRequest { ustring path; int maxReturned; - long minCzxId; - long czxIdOffset; + long minCzxid; + int czxidOffset; boolean watch; } class CheckVersionRequest { @@ -240,8 +240,10 @@ module org.apache.zookeeper.proto { org.apache.zookeeper.data.Stat stat; } class GetChildrenPaginatedResponse { - vector children; + vector children; org.apache.zookeeper.data.Stat stat; + long nextPageCzxid; + int nextPageCzxidOffset; } class GetACLResponse { vector acl; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ChildrenBatchIterator.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ChildrenBatchIterator.java index e5624c92975..f7a21d57134 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ChildrenBatchIterator.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ChildrenBatchIterator.java @@ -21,18 +21,24 @@ import java.util.LinkedList; import java.util.List; import java.util.NoSuchElementException; -import org.apache.zookeeper.data.PathWithStat; /** * Iterator over children nodes of a given path. + *

+ * Note: the final collection of children may not be strongly consistent with the server. + * If there are concurrent writes to the children during iteration, the final collection could + * miss some children or contain some duplicate children. + * + * @see ZooKeeper#getAllChildrenPaginated(String, boolean) */ -class ChildrenBatchIterator implements RemoteIterator { +class ChildrenBatchIterator implements RemoteIterator { private final ZooKeeper zooKeeper; private final String path; private final Watcher watcher; private final int batchSize; - private final LinkedList childrenQueue; + private final LinkedList childrenQueue; + private final PaginationNextPage nextPage; private long nextBatchMinZxid; private int nextBatchZxidOffset; @@ -47,55 +53,44 @@ class ChildrenBatchIterator implements RemoteIterator { this.nextBatchMinZxid = minZxid; this.childrenQueue = new LinkedList<>(); + this.nextPage = new PaginationNextPage(); - List firstChildrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset); - childrenQueue.addAll(firstChildrenBatch); - - updateOffsetsForNextBatch(firstChildrenBatch); + batchGetChildren(); } @Override public boolean hasNext() { - // next() never lets childrenQueue empty unless we iterated over all children return !childrenQueue.isEmpty(); } @Override - public PathWithStat next() throws KeeperException, InterruptedException, NoSuchElementException { - + public String next() throws KeeperException, InterruptedException { if (!hasNext()) { throw new NoSuchElementException("No more children"); } // If we're down to the last element, backfill before returning it - if (childrenQueue.size() == 1) { - - List childrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset); - childrenQueue.addAll(childrenBatch); - - updateOffsetsForNextBatch(childrenBatch); + if (childrenQueue.size() == 1 && nextBatchMinZxid != ZooDefs.GetChildrenPaginated.lastPageMinCzxid + && nextBatchZxidOffset != ZooDefs.GetChildrenPaginated.lastPageMinCzxidOffset) { + batchGetChildren(); } - PathWithStat returnChildren = childrenQueue.pop(); - - return returnChildren; + return childrenQueue.pop(); } /** - * Prepare minZxid and zkidOffset for the next batch request based on the children returned in the current + * Prepare minZxid and zxidOffset for the next batch request */ - private void updateOffsetsForNextBatch(List children) { - - for (PathWithStat child : children) { - long childZxid = child.getStat().getCzxid(); + private void updateOffsetsForNextBatch() { + nextBatchMinZxid = nextPage.getMinCzxid(); + nextBatchZxidOffset = nextPage.getMinCzxidOffset(); + } - if (nextBatchMinZxid == childZxid) { - ++nextBatchZxidOffset; - } else { - nextBatchZxidOffset = 1; - nextBatchMinZxid = childZxid; - } - } + private void batchGetChildren() throws KeeperException, InterruptedException { + List childrenBatch = + zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset, nextPage); + childrenQueue.addAll(childrenBatch); + updateOffsetsForNextBatch(); } } \ No newline at end of file diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/PaginationNextPage.java b/zookeeper-server/src/main/java/org/apache/zookeeper/PaginationNextPage.java new file mode 100644 index 00000000000..ab75ef9e8d4 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/PaginationNextPage.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Represents the info used to fetch the next page of data for pagination. + */ +@InterfaceAudience.Public +public class PaginationNextPage { + private long minCzxid; + private int minCzxidOffset; + + public PaginationNextPage() { + } + + public PaginationNextPage(long minCzxid, int minCzxidOffset) { + this.minCzxid = minCzxid; + this.minCzxidOffset = minCzxidOffset; + } + + public long getMinCzxid() { + return minCzxid; + } + + public void setMinCzxid(long minCzxid) { + this.minCzxid = minCzxid; + } + + public int getMinCzxidOffset() { + return minCzxidOffset; + } + + public void setMinCzxidOffset(int minCzxidOffset) { + this.minCzxidOffset = minCzxidOffset; + } + + @Override + public String toString() { + return "PaginationNextPage{" + + "minCzxid=" + minCzxid + + ", minCzxidOffset=" + minCzxidOffset + + '}'; + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/RemoteIterator.java b/zookeeper-server/src/main/java/org/apache/zookeeper/RemoteIterator.java index 2517786f92f..af25b71b748 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/RemoteIterator.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/RemoteIterator.java @@ -27,16 +27,18 @@ public interface RemoteIterator { /** * Returns true if the iterator has more elements. + * * @return true if the iterator has more elements, false otherwise. */ boolean hasNext(); /** * Returns the next element in the iteration. + * * @return the next element in the iteration. * @throws InterruptedException if the thread is interrupted * @throws KeeperException if an error is encountered server-side * @throws NoSuchElementException if the iteration has no more elements */ - E next() throws InterruptedException, KeeperException, NoSuchElementException; + E next() throws InterruptedException, KeeperException; } \ No newline at end of file diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java index 89009704d10..ff769079efb 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java @@ -161,6 +161,12 @@ public interface AddWatchModes { int persistentRecursive = 1; // matches AddWatchMode.PERSISTENT_RECURSIVE } + @InterfaceAudience.Public + public interface GetChildrenPaginated { + long lastPageMinCzxid = -1L; + int lastPageMinCzxidOffset = -1; + } + public static final String[] opNames = {"notification", "create", "delete", "exists", "getData", "setData", "getACL", "setACL", "getChildren", "getChildren2", "getMaxChildren", "setMaxChildren", "ping", "reconfig", "getConfig"}; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java index 86276522f8e..6a3e4ace6d1 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java @@ -53,7 +53,6 @@ import org.apache.zookeeper.client.ZooKeeperSaslClient; import org.apache.zookeeper.common.PathUtils; import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.PathWithStat; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.proto.AddWatchRequest; import org.apache.zookeeper.proto.CheckWatchesRequest; @@ -2769,11 +2768,15 @@ public List getChildren(final String path, Watcher watcher) throws Keepe * @param maxReturned * - the maximum number of children returned * @param minCzxId - * - only return children whose creation zkid is equal or greater than {@code minCzxId} - * @param czxIdOffset - * - how many children with zkid == minCzxId to skip server-side, as they were returned in previous pages + * - only return children whose creation zxid is equal or greater than {@code minCzxId} + * @param czxidOffset + * - how many children with zxid == minCzxId to skip server-side, as they were returned in previous pages + * @param nextPage + * - if not null, {@link PaginationNextPage} info returned from server will be copied to {@code nextPage}. + * The info can be used for fetching the next page of remaining children, or checking whether the + * returned page is the last one * @return - * an ordered list of children nodes, up to {@code maxReturned}, ordered by czxid + * a list of children nodes, up to {@code maxReturned} * @throws KeeperException * if the server signals an error with a non-zero error code. * @throws IllegalArgumentException @@ -2788,10 +2791,14 @@ public List getChildren(final String path, Watcher watcher) throws Keepe * * @since 3.6.2 */ - public List getChildren(final String path, Watcher watcher, final int maxReturned, final long minCzxId, final int czxIdOffset) + public List getChildren(final String path, + Watcher watcher, + int maxReturned, + long minCzxId, + int czxidOffset, + final PaginationNextPage nextPage) throws KeeperException, InterruptedException { - final String clientPath = path; - PathUtils.validatePath(clientPath); + PathUtils.validatePath(path); if (maxReturned <= 0) { throw new IllegalArgumentException("Cannot return less than 1 children"); @@ -2800,10 +2807,10 @@ public List getChildren(final String path, Watcher watcher, final // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { - wcb = new ChildWatchRegistration(watcher, clientPath); + wcb = new ChildWatchRegistration(watcher, path); } - final String serverPath = prependChroot(clientPath); + final String serverPath = prependChroot(path); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getChildrenPaginated); @@ -2811,15 +2818,89 @@ public List getChildren(final String path, Watcher watcher, final request.setPath(serverPath); request.setWatch(watcher != null); request.setMaxReturned(maxReturned); - request.setMinCzxId(minCzxId); - request.setCzxIdOffset(czxIdOffset); - GetChildrenPaginatedResponse response = new GetChildrenPaginatedResponse(); - ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); - if (r.getErr() != 0) { - throw KeeperException.create(KeeperException.Code.get(r.getErr()), - clientPath); + + Set children = null; + GetChildrenPaginatedResponse response; + boolean isFirstPage = true; + boolean needNextPage = true; + + while (needNextPage) { + request.setMinCzxid(minCzxId); + // If not the first page, always start from czxidOffset 0, to avoid the case: + // if a child with the same czxid is returned in the previous page, and then deleted + // on the server, the starting offset for the next page should be shifted smaller accordingly. + // If the next page still starts from czxidOffset, the children that not in the previous page + // but their offset is less than czxidOffset, they would be missed. + // HashSet is used to de-dup the duplicate children. + request.setCzxidOffset(isFirstPage ? czxidOffset : 0); + response = new GetChildrenPaginatedResponse(); + ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); + if (r.getErr() != 0) { + throw KeeperException.create(KeeperException.Code.get(r.getErr()), + path); + } + minCzxId = response.getNextPageCzxid(); + czxidOffset = response.getNextPageCzxidOffset(); + needNextPage = needNextPage(maxReturned, minCzxId, czxidOffset); + + if (isFirstPage) { + // If all children are returned in the first page, + // no need to use hash set to de-dup children + if (!needNextPage) { + updateNextPage(nextPage, minCzxId, czxidOffset); + return response.getChildren(); + } + children = new HashSet<>(); + isFirstPage = false; + } + + children.addAll(response.getChildren()); + } + + updateNextPage(nextPage, minCzxId, czxidOffset); + + return new ArrayList<>(children); + } + + /** + * Returns a list of all the children given the path. + *

+ * The difference between this API and {@link #getChildren(String, boolean)} is: + * when there are lots of children and the network buffer exceeds {@code jute.maxbuffer}, + * this API will fetch the children using pagination and be able to return all children; + * while {@link #getChildren(String, boolean)} will fail. + *

+ * The final list of children returned is NOT strongly consistent with the server's data: + * the list might contain some deleted children if some children are deleted before the last page is fetched. + *

+ * If the watch is true and the call is successful (no exception is thrown), + # a watch will be left on the node with the given path. The watch will be + * triggered by a successful operation that deletes the node of the given + * path or creates/deletes a child under the node. + *

+ * + * @param path the path of the parent node + * @param watch whether or not leave a watch on the given node + * @return a list of all children of the given path + * @throws KeeperException if the server signals an error with a non-zero error code. + * @throws InterruptedException if the server transaction is interrupted. + */ + public List getAllChildrenPaginated(String path, boolean watch) + throws KeeperException, InterruptedException { + return getChildren(path, watch ? watchManager.defaultWatcher : null, Integer.MAX_VALUE, 0L, 0, null); + } + + private boolean needNextPage(int maxReturned, long minCzxId, int czxIdOffset) { + return maxReturned == Integer.MAX_VALUE + && minCzxId != ZooDefs.GetChildrenPaginated.lastPageMinCzxid + && czxIdOffset != ZooDefs.GetChildrenPaginated.lastPageMinCzxidOffset; + } + + private void updateNextPage(PaginationNextPage nextPage, long minCzxId, int czxIdOffset) { + if (nextPage != null) { + nextPage.setMinCzxid(minCzxId); + nextPage.setMinCzxidOffset(czxIdOffset); } - return response.getChildren(); } /** @@ -2853,7 +2934,7 @@ public List getChildren(final String path, Watcher watcher, final * * @since 3.6.2 */ - public RemoteIterator getChildrenIterator(String path, Watcher watcher, int batchSize, long minCzxId) + public RemoteIterator getChildrenIterator(String path, Watcher watcher, int batchSize, long minCzxId) throws KeeperException, InterruptedException { return new ChildrenBatchIterator(this, path, watcher, batchSize, minCzxId); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index 3c3a8c05bd9..67ad9f56218 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -33,10 +33,10 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.jute.BinaryInputArchive; import org.apache.jute.InputArchive; import org.apache.jute.OutputArchive; import org.apache.jute.Record; @@ -45,6 +45,7 @@ import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.PaginationNextPage; import org.apache.zookeeper.Quotas; import org.apache.zookeeper.StatsTrack; import org.apache.zookeeper.WatchedEvent; @@ -63,6 +64,9 @@ import org.apache.zookeeper.data.PathWithStat; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.StatPersisted; +import org.apache.zookeeper.proto.GetChildrenPaginatedResponse; +import org.apache.zookeeper.proto.ReplyHeader; +import org.apache.zookeeper.server.util.SerializeUtils; import org.apache.zookeeper.server.watch.IWatchManager; import org.apache.zookeeper.server.watch.WatchManagerFactory; import org.apache.zookeeper.server.watch.WatcherMode; @@ -176,6 +180,18 @@ public class DataTree { // to align and compare between servers. public static final int DIGEST_LOG_INTERVAL = 128; + // Constants used to calculate response packet length for the paginated children list. + // packetLength = (childNameLength + PAGINATION_PACKET_CHILD_EXTRA_BYTES) * numChildren + // + PAGINATION_PACKET_BASE_BYTES + private static final int PAGINATION_PACKET_BASE_BYTES; + private static final int PAGINATION_PACKET_CHILD_EXTRA_BYTES; + + static { + PAGINATION_PACKET_BASE_BYTES = getPaginationPacketLength(Collections.emptyList()); + PAGINATION_PACKET_CHILD_EXTRA_BYTES = + getPaginationPacketLength(Collections.singletonList("")) - PAGINATION_PACKET_BASE_BYTES; + } + // If this is not null, we are actively looking for a target zxid that we // want to validate the digest for private ZxidDigest digestFromLoadedSnapshot; @@ -261,6 +277,19 @@ private static long getNodeSize(String path, byte[] data) { return (path == null ? 0 : path.length()) + (data == null ? 0 : data.length); } + private static int getPaginationPacketLength(List children) { + try { + Record record = new GetChildrenPaginatedResponse(children, new Stat(), 0, 0); + ReplyHeader header = new ReplyHeader(); + byte[] recordBytes = SerializeUtils.serializeRecord(record); + byte[] headerBytes = SerializeUtils.serializeRecord(header); + return recordBytes.length + headerBytes.length; + } catch (IOException e) { + LOG.warn("Unexpected exception. Destruction averted.", e); + return 0; + } + } + public long cachedApproximateDataSize() { return nodeDataSize.get(); } @@ -311,6 +340,9 @@ public DataTree() { LOG.error("Unexpected exception when creating WatchManager, exiting abnormally", e); ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue()); } + + LOG.info("Pagination packet length formula constants: child extra = {} bytes, base = {} bytes", + PAGINATION_PACKET_CHILD_EXTRA_BYTES, PAGINATION_PACKET_BASE_BYTES); } /** @@ -808,74 +840,166 @@ public int compare(PathWithStat left, PathWithStat right) { * Produces a paginated list of the children of a given path * @param path path of node node to list * @param stat stat of the node to list - * @param watcher an optional watcher to attach to the node. The watcher is added only once when reaching the end of pagination - * @param maxReturned maximum number of children to return. Return one more than this number to indicate truncation + * @param watcher an optional watcher to attach to the node. The watcher is added only once + * when reaching the end of pagination + * @param maxReturned maximum number of children to return. * @param minCzxId only return children whose creation zxid equal or greater than minCzxId * @param czxIdOffset how many children with zxid == minCzxId to skip (as returned in previous pages) - * @return A list of path with stats + * @param nextPage info to be used for the next page call + * @return A list of child names of the given path * @throws NoNodeException if the path does not exist */ - public List getPaginatedChildren(String path, Stat stat, Watcher watcher, int maxReturned, - long minCzxId, long czxIdOffset) + public List getPaginatedChildren(String path, Stat stat, Watcher watcher, int maxReturned, + long minCzxId, int czxIdOffset, PaginationNextPage nextPage) throws NoNodeException { DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } - synchronized (n) { - if (stat != null) { - n.copyStat(stat); - } - PriorityQueue childrenQueue; - Set actualChildren = n.getChildren(); - if (actualChildren == null) { - childrenQueue = new PriorityQueue(1); - } else { - childrenQueue = new PriorityQueue(maxReturned + 1, staticNodeCreationComparator); - for (String child : actualChildren) { - DataNode childNode = nodes.get(path + "/" + child); - if (null != childNode) { - final long czxId = childNode.stat.getCzxid(); - - if (czxId < minCzxId) { - // Filter out nodes that are below minCzxId - continue; - } - - Stat childStat = new Stat(); - childNode.copyStat(childStat); - - // Cannot discard before having sorted and removed offset - childrenQueue.add(new PathWithStat(child, childStat)); + + if (maxReturned == Integer.MAX_VALUE && minCzxId <= 0 && czxIdOffset <= 0) { + // This request is to fetch all children. Check if all children can be returned in a page. + Set allChildren; + boolean isBelowMaxBuffer = false; + + // Need to lock the parent node for the whole block between reading children list and adding watch + synchronized (n) { + if (stat != null) { + n.copyStat(stat); + } + allChildren = n.getChildren(); + if (isPacketLengthBelowMaxBuffer(computeChildrenPacketLength(allChildren))) { + isBelowMaxBuffer = true; + if (watcher != null) { + childWatches.addWatch(path, watcher); } } } - // Go over the ordered list of children and skip the first czxIdOffset that have czxid equal to minCzxId, if any - int skipped = 0; - while (!childrenQueue.isEmpty() && skipped < czxIdOffset) { - PathWithStat head = childrenQueue.peek(); - if (head.getStat().getCzxid() > minCzxId) { + if (isBelowMaxBuffer) { + // If all children can be returned in the first page, just return them without sorting. + int bytes = allChildren.stream().mapToInt(String::length).sum(); + updateReadStat(path, bytes); + if (nextPage != null) { + nextPage.setMinCzxid(ZooDefs.GetChildrenPaginated.lastPageMinCzxid); + nextPage.setMinCzxidOffset(ZooDefs.GetChildrenPaginated.lastPageMinCzxidOffset); + } + return new ArrayList<>(allChildren); + } + } + + int index = 0; + int bytes = 0; + List targetChildren = new ArrayList(); + List paginatedChildren = new ArrayList(); + + // Need to lock the parent node for the whole block between reading children list and adding watch + synchronized (n) { + buildChildrenPathWithStat(n, path, stat, minCzxId, targetChildren); + + targetChildren.sort(staticNodeCreationComparator); + + // Go over the ordered list of children and skip the first czxIdOffset + // that have czxid equal to minCzxId, if any + while (index < targetChildren.size() && index < czxIdOffset) { + if (targetChildren.get(index).getStat().getCzxid() > minCzxId) { // We moved past the minCzxId, no point in looking further break; - } else { - childrenQueue.poll(); - ++skipped; } + index++; } - // Return as list preserving newer-to-older order - LinkedList result = new LinkedList(); - while (!childrenQueue.isEmpty() && result.size() < maxReturned) { - result.addLast(childrenQueue.poll()); + // Return as list preserving older-to-newer order + // Add children up to maxReturned and just below the max network buffer + for (int packetLength = PAGINATION_PACKET_BASE_BYTES; + index < targetChildren.size() && paginatedChildren.size() < maxReturned; + index++) { + String child = targetChildren.get(index).getPath(); + packetLength += child.length() + PAGINATION_PACKET_CHILD_EXTRA_BYTES; + if (!isPacketLengthBelowMaxBuffer(packetLength)) { + // Stop adding more children to ensure packet is below max buffer + // Decrement index as the child is not added + index--; + break; + } + paginatedChildren.add(child); + bytes += child.length(); } - // This is the last page, set the watch - if (childrenQueue.isEmpty()) { + if (index >= targetChildren.size() && watcher != null) { + // All children are added so this is the last page, set the watch childWatches.addWatch(path, watcher); } - return result; } + + updateNextPage(nextPage, targetChildren, index - 1); + updateReadStat(path, bytes); + + return paginatedChildren; + } + + private boolean isPacketLengthBelowMaxBuffer(int packetLength) { + return packetLength < BinaryInputArchive.maxBuffer; + } + + private void buildChildrenPathWithStat(DataNode n, String path, Stat stat, long minCzxId, + List targetChildren) { + synchronized (n) { + if (stat != null) { + n.copyStat(stat); + } + for (String child : n.getChildren()) { + DataNode childNode = nodes.get(path + "/" + child); + if (null != childNode) { + final long czxId = childNode.stat.getCzxid(); + + if (czxId < minCzxId) { + // Filter out nodes that are below minCzxId + continue; + } + + Stat childStat = new Stat(); + childNode.copyStat(childStat); + + // Cannot discard before having sorted and removed offset + targetChildren.add(new PathWithStat(child, childStat)); + } + } + } + } + + /* + * totalLength = (PAGINATION_PACKET_CHILD_EXTRA_BYTES + childLength) * numChildren + PAGINATION_PACKET_BASE_BYTES + */ + private int computeChildrenPacketLength(Collection children) { + int length = children.stream().mapToInt(child -> PAGINATION_PACKET_CHILD_EXTRA_BYTES + child.length()).sum(); + return length + PAGINATION_PACKET_BASE_BYTES; + } + + private void updateNextPage(PaginationNextPage nextPage, List children, int lastAddedIndex) { + if (nextPage == null) { + return; + } + if (lastAddedIndex == children.size() - 1) { + // All children are added, so this is the last page + nextPage.setMinCzxid(ZooDefs.GetChildrenPaginated.lastPageMinCzxid); + nextPage.setMinCzxidOffset(ZooDefs.GetChildrenPaginated.lastPageMinCzxidOffset); + return; + } + + // Find the minCzxidOffset next next page by searching the index (startIndex) of czxid + // that is not equal to current czxid. + // minCzxidOffset of next page = lastAddedIndex - startIndex + long lastCzxid = children.get(lastAddedIndex).getStat().getCzxid(); + int startIndex = lastAddedIndex; + while (startIndex >= 0) { + if (children.get(startIndex).getStat().getCzxid() != lastCzxid) { + break; + } + startIndex--; + } + nextPage.setMinCzxid(lastCzxid); + nextPage.setMinCzxidOffset(lastAddedIndex - startIndex); } public Stat setACL(String path, List acl, int version) throws KeeperException.NoNodeException { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java index d3d2f2c8bd7..687b96928d0 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Locale; @@ -42,6 +43,7 @@ import org.apache.zookeeper.OpResult.GetChildrenResult; import org.apache.zookeeper.OpResult.GetDataResult; import org.apache.zookeeper.OpResult.SetDataResult; +import org.apache.zookeeper.PaginationNextPage; import org.apache.zookeeper.Watcher.WatcherType; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooDefs.OpCode; @@ -574,13 +576,15 @@ public void processRequest(Request request) { request.authInfo, path, null); final int maxReturned = getChildrenPaginatedRequest.getMaxReturned(); - List list = zks.getZKDatabase().getPaginatedChildren( + final PaginationNextPage nextPage = new PaginationNextPage(); + List list = zks.getZKDatabase().getPaginatedChildren( getChildrenPaginatedRequest.getPath(), stat, getChildrenPaginatedRequest.getWatch() ? cnxn : null, maxReturned, - getChildrenPaginatedRequest.getMinCzxId(), - getChildrenPaginatedRequest.getCzxIdOffset()); - rsp = new GetChildrenPaginatedResponse(list, stat); + getChildrenPaginatedRequest.getMinCzxid(), + getChildrenPaginatedRequest.getCzxidOffset(), + nextPage); + rsp = new GetChildrenPaginatedResponse(list, stat, nextPage.getMinCzxid(), nextPage.getMinCzxidOffset()); break; } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java index 399f94881c3..8790d362446 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java @@ -890,6 +890,7 @@ protected void pRequest(Request request) throws RequestProcessorException { case OpCode.getChildren: case OpCode.getAllChildrenNumber: case OpCode.getChildren2: + case OpCode.getChildrenPaginated: case OpCode.ping: case OpCode.setWatches: case OpCode.setWatches2: diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java index 30b7b7b03b5..bdfb0af60e3 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java @@ -40,12 +40,12 @@ import org.apache.jute.Record; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.PaginationNextPage; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.WatcherType; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.common.Time; import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.PathWithStat; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.DataTree.ProcessTxnResult; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; @@ -582,12 +582,12 @@ public int getAllChildrenNumber(String path) throws KeeperException.NoNodeExcept * @param maxReturned the maximum number of nodes to be returned * @param minCzxId only return children whose creation zxid greater than minCzxId * @param czxIdOffset how many children with zxid == minCzxId to skip (as returned in previous pages) - * @return A list of PathWithStat for the children. Size is bound to maxReturned (maxReturned+1 indicates truncation) + * @return A list of children. Size is bound to maxReturned (maxReturned+1 indicates truncation) * @throws NoNodeException if the given path does not exist */ - public List getPaginatedChildren(String path, Stat stat, Watcher watcher, int maxReturned, - long minCzxId, long czxIdOffset) throws NoNodeException { - return dataTree.getPaginatedChildren(path, stat, watcher, maxReturned, minCzxId, czxIdOffset); + public List getPaginatedChildren(String path, Stat stat, Watcher watcher, int maxReturned, + long minCzxId, int czxIdOffset, PaginationNextPage nextPage) throws NoNodeException { + return dataTree.getPaginatedChildren(path, stat, watcher, maxReturned, minCzxId, czxIdOffset, nextPage); } /** diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java index fcc5c8f4b70..85f294c44a1 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java @@ -19,12 +19,14 @@ package org.apache.zookeeper.server.util; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.EOFException; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import org.apache.jute.BinaryInputArchive; +import org.apache.jute.BinaryOutputArchive; import org.apache.jute.InputArchive; import org.apache.jute.OutputArchive; import org.apache.jute.Record; @@ -184,4 +186,17 @@ public static byte[] serializeRequest(Request request) { return data; } + /** + * Serializes a {@link Record} into a byte array. + * + * @param record the {@link Record} to be serialized + * @return a new byte array + * @throws IOException if there is an error during serialization + */ + public static byte[] serializeRecord(Record record) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(ZooKeeperServer.intBufferStartingSizeBytes); + BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); + bos.writeRecord(record, null); + return baos.toByteArray(); + } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java index f2d353db571..9e87c7a35b5 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java @@ -50,7 +50,6 @@ import org.apache.zookeeper.ZKTestCase; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.common.PathTrie; -import org.apache.zookeeper.data.PathWithStat; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.metrics.MetricsUtils; import org.apache.zookeeper.txn.CreateTxn; @@ -327,53 +326,50 @@ public void getChildrenPaginated() throws NodeExistsException, NoNodeException { // Asking from a negative for 5 nodes should return the 5, and not set the watch int curWatchCount = dt.getWatchCount(); - List result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 5, -1, 0); + List result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 5, -1, 0, null); assertEquals(5, result.size()); assertEquals("The watch not should have been set", curWatchCount, dt.getWatchCount()); // Verify that the list is sorted String before = ""; - for (final PathWithStat s : result) { - final String path = s.getPath(); - assertTrue(String.format("The next path (%s) should be > previons (%s)", path, before), + for (final String path : result) { + assertTrue(String.format("The next path (%s) should be > previous (%s)", path, before), path.compareTo(before) > 0); before = path; } // Asking from a negative would give me all children, and set the watch curWatchCount = dt.getWatchCount(); - result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), countNodes, -1, 0); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), countNodes, -1, 0, null); assertEquals(countNodes, result.size()); assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount()); // Verify that the list is sorted before = ""; - for (final PathWithStat s : result) { - final String path = s.getPath(); - assertTrue(String.format("The next path (%s) should be > previons (%s)", path, before), + for (final String path : result) { + assertTrue(String.format("The next path (%s) should be > previous (%s)", path, before), path.compareTo(before) > 0); before = path; } - // Asking from the last one should return only onde node + // Asking from the last one should return only one node curWatchCount = dt.getWatchCount(); - result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, 1000 + countNodes - 1, 0); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, 1000 + countNodes - 1, 0, null); assertEquals(1, result.size()); - assertEquals("test-" + (countNodes - 1), result.get(0).getPath()); - assertEquals(firstCzxId + countNodes - 1, result.get(0).getStat().getMzxid()); + assertEquals("test-" + (countNodes - 1), result.get(0)); assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount()); // Asking from the last created node+1 should return an empty list and set the watch curWatchCount = dt.getWatchCount(); - result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, 1000 + countNodes, 0); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, 1000 + countNodes, 0, null); assertTrue("The result should be an empty list", result.isEmpty()); assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount()); // Asking from -1 for one node should return two, and NOT set the watch curWatchCount = dt.getWatchCount(); - result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 1, -1, 0); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 1, -1, 0, null); assertEquals("No watch should be set", curWatchCount, dt.getWatchCount()); assertEquals("We only return up to ", 1, result.size()); // Check that we ordered correctly - assertEquals("test-0", result.get(0).getPath()); + assertEquals("test-0", result.get(0)); } @Test(timeout = 60000) @@ -402,59 +398,58 @@ public void getChildrenPaginatedWithOffset() throws NodeExistsException, NoNodeE // Asking from a negative would give me all children, and set the watch int curWatchCount = dt.getWatchCount(); - List result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 1000, -1, 0); + List result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 1000, -1, 0, null); assertEquals(allNodes, result.size()); assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount()); // Verify that the list is sorted String before = ""; - for (final PathWithStat s : result) { - final String path = s.getPath(); - assertTrue(String.format("The next path (%s) should be > previons (%s)", path, before), + for (final String path : result) { + assertTrue(String.format("The next path (%s) should be > previous (%s)", path, before), path.compareTo(before) > 0); before = path; } // Asking with offset minCzxId below childrenCzxId should not skip anything, regardless of offset curWatchCount = dt.getWatchCount(); - result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, childrenCzxId - 1, 3); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, childrenCzxId - 1, 3, null); assertEquals(2, result.size()); - assertEquals("test-1", result.get(0).getPath()); - assertEquals("test-2", result.get(1).getPath()); + assertEquals("test-1", result.get(0)); + assertEquals("test-2", result.get(1)); assertEquals("The watch should not have been set", curWatchCount, dt.getWatchCount()); // Asking with offset 5 should skip nodes 1, 2, 3, 4, 5 curWatchCount = dt.getWatchCount(); - result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, childrenCzxId, 5); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, childrenCzxId, 5, null); assertEquals(2, result.size()); - assertEquals("test-6", result.get(0).getPath()); - assertEquals("test-7", result.get(1).getPath()); + assertEquals("test-6", result.get(0)); + assertEquals("test-7", result.get(1)); assertEquals("The watch should not have been set", curWatchCount, dt.getWatchCount()); // Asking with offset 5 for more nodes than are there should skip nodes 1, 2, 3, 4, 5 (plus 0 due to zxid) curWatchCount = dt.getWatchCount(); - result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 10, childrenCzxId, 5); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 10, childrenCzxId, 5, null); assertEquals(5, result.size()); - assertEquals("test-6", result.get(0).getPath()); - assertEquals("test-7", result.get(1).getPath()); - assertEquals("test-8", result.get(2).getPath()); - assertEquals("test-9", result.get(3).getPath()); + assertEquals("test-6", result.get(0)); + assertEquals("test-7", result.get(1)); + assertEquals("test-8", result.get(2)); + assertEquals("test-9", result.get(3)); assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount()); // Asking with offset 5 for fewer nodes than are there should skip nodes 1, 2, 3, 4, 5 (plus 0 due to zxid) curWatchCount = dt.getWatchCount(); - result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 4, childrenCzxId, 5); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 4, childrenCzxId, 5, null); assertEquals(4, result.size()); - assertEquals("test-6", result.get(0).getPath()); - assertEquals("test-7", result.get(1).getPath()); - assertEquals("test-8", result.get(2).getPath()); - assertEquals("test-9", result.get(3).getPath()); + assertEquals("test-6", result.get(0)); + assertEquals("test-7", result.get(1)); + assertEquals("test-8", result.get(2)); + assertEquals("test-9", result.get(3)); assertEquals("The watch should not have been set", curWatchCount, dt.getWatchCount()); // Asking from the last created node+1 should return an empty list and set the watch curWatchCount = dt.getWatchCount(); - result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, 1000 + childrenCzxId, 0); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, 1000 + childrenCzxId, 0, null); assertTrue("The result should be an empty list", result.isEmpty()); assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount()); } @@ -469,7 +464,7 @@ public void getChildrenPaginatedEmpty() throws NodeExistsException, NoNodeExcept // Asking from a negative would give me all children, and set the watch int curWatchCount = dt.getWatchCount(); - List result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 100, -1, 0); + List result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 100, -1, 0, null); assertTrue("The result should be empty", result.isEmpty()); assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount()); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetChildrenPaginatedTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetChildrenPaginatedTest.java index b9edd72504f..06cf970cae5 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetChildrenPaginatedTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetChildrenPaginatedTest.java @@ -20,11 +20,14 @@ import static org.junit.Assert.fail; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Random; +import java.util.Set; import java.util.UUID; +import org.apache.jute.BinaryInputArchive; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.OpResult; @@ -34,7 +37,6 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.PathWithStat; import org.apache.zookeeper.data.Stat; import org.junit.Assert; import org.junit.Test; @@ -78,30 +80,22 @@ public void testPagination() throws Exception { } long minCzxId = -1; - Map readChildrenMetadata = new HashMap(); + Set readChildrenMetadata = new HashSet(); final int pageSize = 3; - RemoteIterator it = zk.getChildrenIterator(basePath, null, pageSize, minCzxId); + RemoteIterator it = zk.getChildrenIterator(basePath, null, pageSize, minCzxId); while (it.hasNext()) { - PathWithStat pathWithStat = it.next(); + final String nodePath = it.next(); - final String nodePath = pathWithStat.getPath(); - final Stat nodeStat = pathWithStat.getStat(); - - LOG.info("Read: " + nodePath + " czxid: " + nodeStat.getCzxid()); - readChildrenMetadata.put(nodePath, nodeStat); - - Assert.assertTrue(nodeStat.getCzxid() > minCzxId); - minCzxId = nodeStat.getCzxid(); + LOG.info("Read: " + nodePath); + readChildrenMetadata.add(nodePath); + Assert.assertTrue(createdChildrenMetadata.get(nodePath).getCzxid() > minCzxId); + minCzxId = createdChildrenMetadata.get(nodePath).getCzxid(); } - Assert.assertEquals(createdChildrenMetadata.keySet(), readChildrenMetadata.keySet()); - - for (String child : createdChildrenMetadata.keySet()) { - Assert.assertEquals(createdChildrenMetadata.get(child), readChildrenMetadata.get(child)); - } + Assert.assertEquals(createdChildrenMetadata.keySet(), readChildrenMetadata); } @Test(timeout = 30000) @@ -112,28 +106,21 @@ public void testPaginationIterator() throws Exception { Map createdChildrenMetadata = createChildren(basePath, random.nextInt(50) + 1, 0); - Map readChildrenMetadata = new HashMap(); + Set readChildrenMetadata = new HashSet(); final int batchSize = random.nextInt(3) + 1; - RemoteIterator childrenIterator = zk.getChildrenIterator(basePath, null, batchSize, -1); + RemoteIterator childrenIterator = zk.getChildrenIterator(basePath, null, batchSize, -1); while (childrenIterator.hasNext()) { - PathWithStat child = childrenIterator.next(); + String nodePath = childrenIterator.next(); - final String nodePath = child.getPath(); - final Stat nodeStat = child.getStat(); - - LOG.info("Read: " + nodePath + " czxid: " + nodeStat.getCzxid()); - readChildrenMetadata.put(nodePath, nodeStat); + LOG.info("Read: " + nodePath); + readChildrenMetadata.add(nodePath); } - Assert.assertEquals(createdChildrenMetadata.keySet(), readChildrenMetadata.keySet()); - - for (String child : createdChildrenMetadata.keySet()) { - Assert.assertEquals(createdChildrenMetadata.get(child), readChildrenMetadata.get(child)); - } + Assert.assertEquals(createdChildrenMetadata.keySet(), readChildrenMetadata); } /* @@ -153,11 +140,11 @@ public void testPaginationWithServerDown() throws Exception { Map createdChildrenMetadata = createChildren(basePath, random.nextInt(15) + 10, 0); - Map readChildrenMetadata = new HashMap(); + Set readChildrenMetadata = new HashSet(); final int batchSize = random.nextInt(3) + 1; - RemoteIterator childrenIterator = zk.getChildrenIterator(basePath, null, batchSize, -1); + RemoteIterator childrenIterator = zk.getChildrenIterator(basePath, null, batchSize, -1); boolean serverDown = false; @@ -177,7 +164,7 @@ public void testPaginationWithServerDown() throws Exception { } } - PathWithStat child = null; + String child = null; boolean exception = false; try { @@ -191,19 +178,12 @@ public void testPaginationWithServerDown() throws Exception { // next() returned (either more elements in current batch or server is up) Assert.assertNotNull(child); - final String nodePath = child.getPath(); - final Stat nodeStat = child.getStat(); - - LOG.info("Read: " + nodePath + " czxid: " + nodeStat.getCzxid()); - readChildrenMetadata.put(nodePath, nodeStat); + LOG.info("Read: " + child); + readChildrenMetadata.add(child); } } - Assert.assertEquals(createdChildrenMetadata.keySet(), readChildrenMetadata.keySet()); - - for (String child : createdChildrenMetadata.keySet()) { - Assert.assertEquals(createdChildrenMetadata.get(child), readChildrenMetadata.get(child)); - } + Assert.assertEquals(createdChildrenMetadata.keySet(), readChildrenMetadata); } @@ -233,7 +213,7 @@ public void testPaginationWatch() throws Exception { FireOnlyOnceWatcher fireOnlyOnceWatcher = new FireOnlyOnceWatcher(); - RemoteIterator it = zk.getChildrenIterator(basePath, fireOnlyOnceWatcher, pageSize, minCzxId); + RemoteIterator it = zk.getChildrenIterator(basePath, fireOnlyOnceWatcher, pageSize, minCzxId); int childrenIndex = 0; @@ -241,16 +221,9 @@ public void testPaginationWatch() throws Exception { ++childrenIndex; - PathWithStat pathWithStat = it.next(); - - final String nodePath = pathWithStat.getPath(); + final String nodePath = it.next(); LOG.info("Read: " + nodePath); - final Stat nodeStat = pathWithStat.getStat(); - - Assert.assertTrue(nodeStat.getCzxid() > minCzxId); - minCzxId = nodeStat.getCzxid(); - // Create more children before pagination is completed -- should NOT trigger watch if (childrenIndex < 6) { String childPath = basePath + "/" + "before-pagination-" + childrenIndex; @@ -260,7 +233,7 @@ public void testPaginationWatch() throws Exception { // Modify the first child of each page. // This should not trigger additional watches or create duplicates in the set of children returned if (childrenIndex % pageSize == 0) { - zk.setData(basePath + "/" + pathWithStat.getPath(), new byte[3], -1); + zk.setData(basePath + "/" + nodePath, new byte[3], -1); } synchronized (fireOnlyOnceWatcher) { @@ -305,7 +278,7 @@ public void testPaginationWithNoChildren() throws Exception { final int batchSize = 10; - RemoteIterator childrenIterator = zk.getChildrenIterator(basePath, null, batchSize, -1); + RemoteIterator childrenIterator = zk.getChildrenIterator(basePath, null, batchSize, -1); Assert.assertFalse(childrenIterator.hasNext()); @@ -360,24 +333,77 @@ public void testPaginationWithMulti() throws Exception { LOG.info("Created: " + childPath + " zkId: " + stat.getCzxid()); } - Map readChildrenMetadata = new HashMap(); + Set readChildrenMetadata = new HashSet(); - RemoteIterator childrenIterator = zk.getChildrenIterator(basePath, null, batchSize, -1); + RemoteIterator childrenIterator = zk.getChildrenIterator(basePath, null, batchSize, -1); while (childrenIterator.hasNext()) { - PathWithStat children = childrenIterator.next(); + String children = childrenIterator.next(); - LOG.info("Read: " + children.getPath() + " zkId: " + children.getStat().getCzxid()); - readChildrenMetadata.put(children.getPath(), children.getStat()); + LOG.info("Read: " + children); + readChildrenMetadata.add(children); } Assert.assertEquals(numChildren, readChildrenMetadata.size()); - Assert.assertEquals(createdChildrenMetadata.keySet(), readChildrenMetadata.keySet()); + Assert.assertEquals(createdChildrenMetadata.keySet(), readChildrenMetadata); + } - for (String child : createdChildrenMetadata.keySet()) { - Assert.assertEquals(createdChildrenMetadata.get(child), readChildrenMetadata.get(child)); + /* + * Tests if all children can be fetched in one page, the children are + * in the same order(no sorting by czxid) as the non-paginated result. + */ + @Test(timeout = 60000) + public void testGetAllChildrenPaginatedOnePage() throws KeeperException, InterruptedException { + final String basePath = "/testPagination-" + UUID.randomUUID().toString(); + createChildren(basePath, 100, 0); + + List expected = zk.getChildren(basePath, false); + List actual = zk.getAllChildrenPaginated(basePath, false); + + Assert.assertEquals(expected, actual); + } + + /* + * Tests if all children's packet exceeds jute.maxbuffer, it can still successfully fetch them. + * The packet length computation formula is also tested through this test. Otherwise, it'll + * fail to return all children with the paginated API. + */ + @Test(timeout = 60000) + public void testGetAllChildrenPaginatedMultiPages() throws InterruptedException, KeeperException { + // Get the number of children that would definitely exceed 1 MB. + int numChildren = BinaryInputArchive.maxBuffer / UUID.randomUUID().toString().length() + 1; + final String basePath = "/testPagination-" + UUID.randomUUID().toString(); + + zk.create(basePath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + Set expectedChildren = new HashSet<>(); + + for (int i = 0; i < numChildren; i += 1000) { + Transaction transaction = zk.transaction(); + for (int j = i; j < i + 1000 && j < numChildren; j++) { + String child = UUID.randomUUID().toString(); + String childPath = basePath + "/" + child; + transaction.create(childPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + expectedChildren.add(child); + } + transaction.commit(); } + + try { + zk.getChildren(basePath, false); + Assert.fail("Should not succeed to get children because packet length is out of range"); + } catch (KeeperException.ConnectionLossException expected) { + // ConnectionLossException is expected because packet length exceeds jute.maxbuffer + } + + // Paginated API can successfully fetch all the children with pagination. + // If ConnectionLossException is thrown from this method, it possibly means + // the packet length computing formula in DataTree#getPaginatedChildren needs modification. + List actualChildren = zk.getAllChildrenPaginated(basePath, false); + + Assert.assertEquals(numChildren, actualChildren.size()); + Assert.assertEquals(expectedChildren, new HashSet<>(actualChildren)); } } \ No newline at end of file