Skip to content

Commit

Permalink
Merge pull request #546 from apache/methods_to_compute_partition_limits
Browse files Browse the repository at this point in the history
Methods to compute partition limits
  • Loading branch information
leerho authored Apr 14, 2024
2 parents 6be2938 + de4ef79 commit 30fa37e
Show file tree
Hide file tree
Showing 13 changed files with 358 additions and 197 deletions.
30 changes: 22 additions & 8 deletions src/main/java/org/apache/datasketches/kll/KllItemsSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,6 @@ public static <T> KllItemsSketch<T> wrap(

//END of Constructors

@Override
public Class<T> getClassOfT() { return serDe.getClassOfT(); }

@Override
public double[] getCDF(final T[] splitPoints, final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
Expand All @@ -156,11 +153,29 @@ public double[] getCDF(final T[] splitPoints, final QuantileSearchCriteria searc
}

@Override
public GenericPartitionBoundaries<T> getPartitionBoundaries(final int numEquallySized,
public Class<T> getClassOfT() { return serDe.getClassOfT(); }

/**
 * Returns the comparator this sketch holds in {@code comparator}.
 * @return the {@code comparator} reference, unchanged.
 */
@Override
public Comparator<? super T> getComparator() { return comparator; }

/**
 * Computes partition boundaries for a requested number of parts by delegating to the
 * sorted view, which is refreshed first.
 * @param numEquallySizedParts the requested number of parts, forwarded unchanged to the sorted view.
 * @param searchCrit the search criterion, forwarded unchanged to the sorted view.
 * @return the boundaries produced by the sorted view.
 * @throws IllegalArgumentException if this sketch is empty.
 */
@Override
public GenericPartitionBoundaries<T> getPartitionBoundariesFromNumParts(
    final int numEquallySizedParts,
    final QuantileSearchCriteria searchCrit) {
  final boolean noData = isEmpty();
  if (noData) {
    throw new IllegalArgumentException(EMPTY_MSG);
  }
  refreshSortedView(); //must run before itemsSV is queried
  final GenericPartitionBoundaries<T> bounds =
      itemsSV.getPartitionBoundariesFromNumParts(numEquallySizedParts, searchCrit);
  return bounds;
}

/**
 * Computes partition boundaries for a requested nominal partition size by delegating to the
 * sorted view, which is refreshed first.
 * @param nominalPartSizeItems the requested nominal partition size in items, forwarded unchanged.
 * @param searchCrit the search criterion, forwarded unchanged.
 * @return the boundaries produced by the sorted view.
 * @throws IllegalArgumentException if this sketch is empty.
 */
@Override
public GenericPartitionBoundaries<T> getPartitionBoundariesFromPartSize(
    final long nominalPartSizeItems,
    final QuantileSearchCriteria searchCrit) {
  if (isEmpty()) { throw new IllegalArgumentException(EMPTY_MSG); }
  refreshSortedView();
  //Single return: the earlier call to getPartitionBoundaries(numEquallySized, ...) belonged to the
  //previous signature and referenced a name that is not declared in this method.
  return itemsSV.getPartitionBoundariesFromPartSize(nominalPartSizeItems, searchCrit);
}

@Override
Expand Down Expand Up @@ -424,9 +439,8 @@ ItemsSketchSortedView<T> getSV() {
quantiles = (T[]) Array.newInstance(serDe.getClassOfT(), numQuantiles);
cumWeights = new long[numQuantiles];
populateFromSketch(srcQuantiles, srcLevels, srcNumLevels, numQuantiles);
final double normRankErr = getNormalizedRankError(getK(), true);
return new ItemsSketchSortedView(
quantiles, cumWeights, getN(), comparator, getMaxItem(), getMinItem(), normRankErr);
final QuantilesGenericAPI<T> sk = KllItemsSketch.this;
return new ItemsSketchSortedView(quantiles, cumWeights, sk);
}

private void populateFromSketch(final Object[] srcQuantiles, final int[] srcLevels,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public List<PartitionBoundsRow<T>> partition(final S sk) {
this.numLevels = (int)max(1, ceil(log(guessNumParts) / log(maxPartsPerSk)));
final int partsPerSk = (int)round(pow(guessNumParts, 1.0 / numLevels));
this.partitionsPerSk = min(partsPerSk, maxPartsPerSk);
final GenericPartitionBoundaries<T> gpb = sk.getPartitionBoundaries(partitionsPerSk, criteria);
final GenericPartitionBoundaries<T> gpb = sk.getPartitionBoundariesFromNumParts(partitionsPerSk, criteria);
final StackElement<T> se = new StackElement<>(gpb, 0, "1");
stack.push(se);
partitionSearch(stack);
Expand All @@ -144,7 +144,7 @@ private void partitionSearch(final ArrayDeque<StackElement<T>> stack) {
if (++se.part <= numParts) {
final PartitionBoundsRow<T> row = new PartitionBoundsRow<>(se);
final S sk = fillReq.getRange(row.lowerBound, row.upperBound, row.rule);
final GenericPartitionBoundaries<T> gpb2 = sk.getPartitionBoundaries(this.partitionsPerSk, criteria);
final GenericPartitionBoundaries<T> gpb2 = sk.getPartitionBoundariesFromNumParts(this.partitionsPerSk, criteria);
final int level = stack.size() + 1;
final String partId = se.levelPartId + "." + se.part + "," + level;
final StackElement<T> se2 = new StackElement<>(gpb2, 0, partId);
Expand Down
33 changes: 21 additions & 12 deletions src/main/java/org/apache/datasketches/quantiles/ItemsSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,16 +254,21 @@ static <T> ItemsSketch<T> copy(final ItemsSketch<T> sketch) {

//END of Constructors

@Override
public Class<T> getClassOfT() { return clazz; }

/**
 * Returns the CDF for the given split points by delegating to the sorted view,
 * which is refreshed first.
 * @param splitPoints the split points, forwarded unchanged to the sorted view.
 * @param searchCrit the search criterion, forwarded unchanged to the sorted view.
 * @return the array produced by the sorted view's {@code getCDF}.
 * @throws IllegalArgumentException if this sketch is empty.
 */
@Override
public double[] getCDF(final T[] splitPoints, final QuantileSearchCriteria searchCrit) {
  final boolean noData = isEmpty();
  if (noData) {
    throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG);
  }
  refreshSortedView(); //must run before classicQisSV is queried
  final double[] cdf = classicQisSV.getCDF(splitPoints, searchCrit);
  return cdf;
}

/**
 * Returns the item class held in {@code clazz}.
 * @return the {@code clazz} reference, unchanged.
 */
@Override
public Class<T> getClassOfT() {
  return clazz;
}

/**
 * Returns the comparator held in {@code comparator_}.
 * @return the {@code comparator_} reference, unchanged.
 */
@Override
public Comparator<? super T> getComparator() { return comparator_; }

@Override
public T getMaxItem() {
if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
Expand All @@ -277,11 +282,21 @@ public T getMinItem() {
}

@Override
public GenericPartitionBoundaries<T> getPartitionBoundaries(final int numEquallySized,
public GenericPartitionBoundaries<T> getPartitionBoundariesFromNumParts(
final int numEquallySizedParts,
final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
refreshSortedView();
return classicQisSV.getPartitionBoundaries(numEquallySized, searchCrit);
return classicQisSV.getPartitionBoundariesFromNumParts(numEquallySizedParts, searchCrit);
}

/**
 * Computes partition boundaries for a requested nominal partition size by delegating to the
 * sorted view, which is refreshed first.
 * @param nominalPartSizeItems the requested nominal partition size in items, forwarded unchanged.
 * @param searchCrit the search criterion, forwarded unchanged.
 * @return the boundaries produced by the sorted view.
 * @throws IllegalArgumentException if this sketch is empty.
 */
@Override
public GenericPartitionBoundaries<T> getPartitionBoundariesFromPartSize(
    final long nominalPartSizeItems,
    final QuantileSearchCriteria searchCrit) {
  final boolean noData = isEmpty();
  if (noData) {
    throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG);
  }
  refreshSortedView(); //must run before classicQisSV is queried
  final GenericPartitionBoundaries<T> bounds =
      classicQisSV.getPartitionBoundariesFromPartSize(nominalPartSizeItems, searchCrit);
  return bounds;
}

@Override
Expand Down Expand Up @@ -577,10 +592,6 @@ Object[] getCombinedBuffer() {
return combinedBuffer_;
}

Comparator<? super T> getComparator() {
return comparator_;
}

/**
* Loads the Combined Buffer, min and max from the given items array.
* The Combined Buffer is always in non-compact form and must be pre-allocated.
Expand Down Expand Up @@ -656,9 +667,7 @@ private static <T> ItemsSketchSortedView<T> getSV(final ItemsSketch<T> sk) {
throw new SketchesStateException("Sorted View is misconfigured. TotalN does not match cumWeights.");
}

final double normRankErr = getNormalizedRankError(sk.getK(), true);
return new ItemsSketchSortedView<>(
svQuantiles, svCumWeights, sk.getN(), comparator, sk.getMaxItem(), sk.getMinItem(), normRankErr);
return new ItemsSketchSortedView<>(svQuantiles, svCumWeights, sk);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
import org.apache.datasketches.common.SketchesStateException;

/**
* Implements PartitionBoundaries
* This defines the returned results of the getPartitionBoundaries() function and
* includes the basic methods needed to construct actual partitions.
*/
final public class GenericPartitionBoundaries<T> implements PartitionBoundaries {
public final class GenericPartitionBoundaries<T> {
private long totalN; //totalN of source sketch
private T[] boundaries; //quantiles at the boundaries
private long[] natRanks; //natural ranks at the boundaries
Expand All @@ -36,7 +37,7 @@ final public class GenericPartitionBoundaries<T> implements PartitionBoundaries
private T minItem; //of the source sketch
private QuantileSearchCriteria searchCrit; //of the source sketch query to getPartitionBoundaries.
//computed
private long[] numDeltaItems; //num of items in each part
private long[] numDeltaItems; //num of items in each partition
private int numPartitions; //num of partitions

public GenericPartitionBoundaries(
Expand All @@ -48,15 +49,15 @@ public GenericPartitionBoundaries(
final T minItem,
final QuantileSearchCriteria searchCrit) {
this.totalN = totalN;
this.boundaries = boundaries; //SpotBugs EI_EXPOSE_REP2 copying from sketch class to this "friend" class.
this.boundaries = boundaries; //SpotBugs EI_EXPOSE_REP2 OK: copying from sketch class to this "friend" class.
this.natRanks = natRanks; // "
this.normRanks = normRanks; // "
this.maxItem = maxItem;
this.minItem = minItem;
this.searchCrit = searchCrit;
//check and compute
final int len = boundaries.length;
if (len < 2) { throw new SketchesStateException("Source sketch is empty"); }
if (len < 2) { throw new SketchesStateException("Source sketch is empty"); } //class is final, this is ok
numDeltaItems = new long[len];
numDeltaItems[0] = 0; // index 0 is always 0
for (int i = 1; i < len; i++) {
Expand All @@ -67,7 +68,10 @@ public GenericPartitionBoundaries(
this.numPartitions = len - 1;
}

@Override
/**
* Gets the length of the input stream offered to the underlying sketch.
* @return the length of the input stream offered to the underlying sketch.
*/
public long getN() { return totalN; }

/**
Expand Down Expand Up @@ -100,16 +104,32 @@ public GenericPartitionBoundaries(
*/
public T[] getBoundaries() { return boundaries.clone(); }

@Override
/**
* Gets an ordered array of natural ranks of the associated array of partition boundaries utilizing
* a specified search criterion. Natural ranks are integral values on the interval [1, N].
* @return an array of natural ranks.
*/
public long[] getNaturalRanks() { return natRanks.clone(); }

@Override
/**
* Gets an ordered array of normalized ranks of the associated array of partition boundaries utilizing
* a specified search criterion. Normalized ranks are double values on the interval [0.0, 1.0].
* @return an array of normalized ranks.
*/
public double[] getNormalizedRanks() { return normRanks.clone(); }

@Override
/**
* Gets the number of items to be included for each partition as an array.
* The count at index 0 is 0. The number of items included in the first partition, defined by the boundaries at
* index 0 and index 1, is at index 1 in this array, etc.
* @return the number of items to be included for each partition as an array.
*/
public long[] getNumDeltaItems() { return numDeltaItems.clone(); }

@Override
/**
* Gets the number of partitions
* @return the number of partitions
*/
public int getNumPartitions() { return numPartitions; }

/**
Expand All @@ -130,7 +150,10 @@ public GenericPartitionBoundaries(
*/
public T getMinItem() { return minItem; }

@Override
/**
* Gets the search criteria specified for the source sketch
* @return The search criteria specified for the source sketch
*/
public QuantileSearchCriteria getSearchCriteria() { return searchCrit; }

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* @author Alexander Saydakov
* @author Lee Rhodes
*/
public interface GenericSortedView<T> extends PartitioningFeature<T>, SortedView {
public interface GenericSortedView<T> extends PartitioningFeature<T>, SketchPartitionLimits, SortedView {

/**
* Returns an approximation to the Cumulative Distribution Function (CDF) of the input stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.datasketches.quantilescommon;

import static java.lang.Math.min;
import static org.apache.datasketches.quantilescommon.GenericInequalitySearch.find;
import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.INCLUSIVE;
import static org.apache.datasketches.quantilescommon.QuantilesAPI.EMPTY_MSG;
Expand All @@ -38,43 +39,57 @@
* @author Lee Rhodes
*/
public class ItemsSketchSortedView<T> implements GenericSortedView<T> {
private static final double PARTITIONING_ERROR_FACTOR = 2.0;
private final T[] quantiles;
private final long[] cumWeights; //cumulative natural weights
private final long totalN;
private final Comparator<? super T> comparator;
private final T maxItem;
private final T minItem;
private final Class<T> clazz;
private final double normRankErr;//assumes PMF type error
private final double normRankError;
private final int numRetItems;

/**
 * Constructs a sorted view from the given arrays and the sketch they were derived from.
 * Both arrays are stored by reference; they are not copied.
 * @param quantiles sorted array of quantiles
 * @param cumWeights sorted, monotonically increasing cumulative weights.
 * @param sk the underlying quantile sketch. It supplies N, the comparator, the max and min items,
 * the item class, the normalized rank error (requested with argument {@code true}) and the
 * number of retained items.
 */
public ItemsSketchSortedView(
    final T[] quantiles,
    final long[] cumWeights, //or Natural Ranks
    final QuantilesGenericAPI<T> sk) {
  this.quantiles = quantiles;
  this.cumWeights = cumWeights;
  this.totalN = sk.getN();
  this.comparator = sk.getComparator();
  this.maxItem = sk.getMaxItem();
  this.minItem = sk.getMinItem();
  this.clazz = sk.getClassOfT(); //taken from the sketch, so no unchecked cast is needed here
  this.normRankError = sk.getNormalizedRankError(true);
  this.numRetItems = sk.getNumRetained();
}

/**
 * Package-private constructor used for testing: every field is supplied directly by the caller.
 * Both arrays are stored by reference; they are not copied.
 * @param quantiles value for the {@code quantiles} field.
 * @param cumWeights value for the {@code cumWeights} field.
 * @param totalN value for the {@code totalN} field.
 * @param comparator value for the {@code comparator} field.
 * @param maxItem value for the {@code maxItem} field.
 * @param minItem value for the {@code minItem} field.
 * @param clazz value for the {@code clazz} field.
 * @param normRankError value for the {@code normRankError} field.
 * @param numRetItems value for the {@code numRetItems} field.
 */
ItemsSketchSortedView(
    final T[] quantiles,
    final long[] cumWeights,
    final long totalN,
    final Comparator<? super T> comparator,
    final T maxItem,
    final T minItem,
    final Class<T> clazz,
    final double normRankError,
    final int numRetItems) {
  this.quantiles = quantiles;
  this.cumWeights = cumWeights;
  this.totalN = totalN;
  this.comparator = comparator;
  this.maxItem = maxItem;
  this.minItem = minItem;
  //clazz is assigned exactly once, from the parameter; it is a final field.
  this.clazz = clazz;
  this.normRankError = normRankError;
  this.numRetItems = numRetItems;
}

//end of constructors
Expand Down Expand Up @@ -118,29 +133,38 @@ public int getNumRetained() {
}

@Override
@SuppressWarnings("unchecked")
public GenericPartitionBoundaries<T> getPartitionBoundaries(final int numEquallySized,
public int getMaxPartitions() {
return (int) min(1.0 / normRankError, numRetItems / 2.0);
}

@Override
public GenericPartitionBoundaries<T> getPartitionBoundariesFromPartSize(
final long nominalPartitionSize,
final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new SketchesArgumentException(QuantilesAPI.EMPTY_MSG); }
final long totalN = this.totalN;
final int maxParts = (int) (totalN / Math.ceil(normRankErr * PARTITIONING_ERROR_FACTOR) );
final int svLen = cumWeights.length;

if (numEquallySized > maxParts) {
final long minPartSizeItems = getMinPartitionSizeItems();
if (nominalPartitionSize < minPartSizeItems) {
throw new SketchesArgumentException(QuantilesAPI.UNSUPPORTED_MSG
+ "The requested number of partitions is too large for the 'k' of this sketch "
+ "if it exceeds the maximum number of partitions allowed by the error threshold for the 'k' of this sketch."
+ "Requested Partitions: " + numEquallySized + " > " + maxParts);
+ " The requested nominal partition size is too small for this sketch.");
}
if (numEquallySized > svLen / 2.0) {
final long totalN = this.totalN;
final int numEquallySizedParts = (int) min(totalN / minPartSizeItems, getMaxPartitions());
return getPartitionBoundariesFromNumParts(numEquallySizedParts);
}

@Override
@SuppressWarnings("unchecked")
public GenericPartitionBoundaries<T> getPartitionBoundariesFromNumParts(
final int numEquallySizedParts,
final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new SketchesArgumentException(QuantilesAPI.EMPTY_MSG); }
final int maxParts = getMaxPartitions();
if (numEquallySizedParts > maxParts) {
throw new SketchesArgumentException(QuantilesAPI.UNSUPPORTED_MSG
+ "The requested number of partitions is too large for the number of retained items "
+ "if it exceeds maximum number of retained items divided by 2."
+ "Requested Partitions: " + numEquallySized + " > "
+ "Retained Items / 2: " + (svLen / 2));
+ " The requested number of partitions is too large for this sketch.");
}

final double[] searchNormRanks = evenlySpacedDoubles(0, 1.0, numEquallySized + 1);
final double[] searchNormRanks = evenlySpacedDoubles(0, 1.0, numEquallySizedParts + 1);
final int partArrLen = searchNormRanks.length;
final T[] partQuantiles = (T[]) Array.newInstance(clazz, partArrLen);
final long[] partNatRanks = new long[partArrLen];
Expand All @@ -150,6 +174,7 @@ public GenericPartitionBoundaries<T> getPartitionBoundaries(final int numEqually
// which are absolutely required when partitioning, especially inner partitions.

//Are the minItem and maxItem already in place?
final int svLen = cumWeights.length;
int adjLen = svLen; //this will be the length of the local copies of quantiles and cumWeights
final boolean adjLow = quantiles[0] != minItem; //if true, adjust the low end
final boolean adjHigh = quantiles[svLen - 1] != maxItem; //if true, adjust the high end
Expand Down
Loading

0 comments on commit 30fa37e

Please sign in to comment.