First implementation of weighted update for KllDoubles. #487

Merged
merged 4 commits into from
Jan 3, 2024
Changes from 2 commits
15 changes: 15 additions & 0 deletions src/main/java/org/apache/datasketches/common/Util.java
Expand Up @@ -729,6 +729,19 @@ public static boolean isOdd(final long n) {
return (n & 1L) == 1L;
}

//Other

/**
* Returns a one if the bit at bitPos is a one, otherwise zero.
* @param number the number to examine
* @param bitPos the given zero-based bit position, where the least significant
* bit is at position zero.
* @return a one if the bit at bitPos is a one, otherwise zero.
*/
public static final int bitAt(final long number, final int bitPos) {
return (number & (1L << bitPos)) > 0 ? 1 : 0;
}

/**
* Computes the number of decimal digits of the number n
* @param n the given number
Expand Down Expand Up @@ -756,6 +769,8 @@ public static String intToFixedLengthString(final int number, final int length)
return characterPad(num, length, ' ', false);
}

//Generic tests

/**
* Finds the minimum of two generic items
* @param <T> the type
Expand Down
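
A note on the `bitAt(..)` helper added above: because it tests the masked value with `> 0`, it reports 0 for bit position 63 of a `long`, where the mask `1L << 63` is negative. That does not affect the use described in this PR, where the input is a positive `int` weight and only low bit positions are examined. The sketch below copies the helper from the diff and compares it with a hypothetical variant, `bitAtAnySign`, that is not part of the PR and compares against zero instead.

```java
final class BitAtDemo {

  // Copy of the helper as it appears in this diff.
  static int bitAt(final long number, final int bitPos) {
    return (number & (1L << bitPos)) > 0 ? 1 : 0;
  }

  // Hypothetical variant (NOT part of the PR): also correct for the sign bit.
  static int bitAtAnySign(final long number, final int bitPos) {
    return (number & (1L << bitPos)) != 0 ? 1 : 0;
  }

  public static void main(final String[] args) {
    final int weight = 13; // binary 1101
    final StringBuilder sb = new StringBuilder();
    for (int pos = 3; pos >= 0; pos--) { sb.append(bitAt(weight, pos)); }
    System.out.println(sb);                     // prints 1101
    System.out.println(bitAt(-1L, 63));         // prints 0: the mask is negative, so "> 0" fails
    System.out.println(bitAtAnySign(-1L, 63));  // prints 1
  }
}
```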
66 changes: 42 additions & 24 deletions src/main/java/org/apache/datasketches/kll/KllDoublesHelper.java
Contributor


Other than createItemsArray() I don't know enough of what goes on in KLL to really get the reason for the other changes and if they're necessary for this PR. Consider those basically unreviewed.

Contributor


Also, I generally prefer comments about what happens inside a method to be above the method, rather than mixed in with the variables for the method. Is this inline-with-variables style a common Java approach?

Contributor Author


I will detail the changes by file:

  • Util.java:

    • Added bitAt(...) function used by KllHelper.createLevelsArray(), part of weighted update.
  • KllDoublesHelper.java

    • Added createItemsArray(..), part of weighted update
    • mergeDoubleImpl(..) is complex and it is difficult to follow what it is doing, so I added more comments to clarify its operation. I also found that its algorithm needed a subtle change to avoid a problem with our new weighted updates, because weighted updates can create new levels at higher indices than would normally occur with the original update call. I had to improve the calculation of the upper bound "ub" used to size "worklevels" to include this possibility. Otherwise, the merge could crash because the "worklevels" buffer would be too small. This also meant calculating "provisionalNumLevels" before "ub" and not the other way around.
    • The changes to populateDoubleWorkArrays(..) add a comment clarifying what the method does and rename some variables to follow camelCase notation, which is standard in Java.
  • KllDoublesSketch.java

    • Added the "public void weightedUpdate(..)" function.
  • KllHeapDoublesSketch.java

    • Added specialized constructor for the temporary sketch created to hold the specially created levels array and items array prior to its merge into the target sketch.
  • KllHelper.java

    • 3 methods had to be converted from "private" to "package-private" to allow for testing: "intCapAux(..)", "intCapAuxAux(..)", and "powersOfThree(..)".
    • The method "getLevelCapacityItems(..)" was redundant and removed.
    • Added "createLevelsArray(..)" which creates the special levels array for weighted updates.
  • KllSketch.java

    • Corrected one comment.
  • QuantilesDoublesAPI.java

    • Corrected one javadoc
  • KllMiscDoublesTest.java

    • Added tests specific to weighted updates
    • Added tests specific to the critical calculations of the merge and compaction processes.
    • For a number of these tests I added print methods that, when printing is enabled, reveal detailed information about the construction of the new weighted update methods, as well as more complete information about other critical methods used in the merging and compaction processes. For anyone attempting to understand the KLL code, these optional printing methods, which also serve as tests, are critical to understanding the KLL algorithm and how we implemented it.
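
To make the weighted-update pieces listed above concrete, here is a minimal standalone sketch of how a weight is laid out across levels. The two `create...` methods follow `KllHelper.createLevelsArray(..)` and `KllDoublesHelper.createItemsArray(..)` from this diff, with `bitAt` inlined as a shift and mask. The class name and the `totalWeight` helper are hypothetical and exist only for illustration. The idea is that each set bit of the weight contributes one copy of the item at the level whose index is that bit position, and an item at level L represents 2^L original items.

```java
import java.util.Arrays;

final class WeightedUpdateArraysDemo {

  // Follows KllHelper.createLevelsArray(..) in this diff, with bitAt inlined.
  static int[] createLevelsArray(final int weight) {
    final int numLevels = 32 - Integer.numberOfLeadingZeros(weight);
    final int[] levelsArr = new int[numLevels + 1]; // always one more than numLevels
    int itemsArrIndex = 0;
    levelsArr[0] = itemsArrIndex;
    for (int level = 0; level < numLevels; level++) {
      itemsArrIndex += (weight >>> level) & 1; // one item at this level if the bit is set
      levelsArr[level + 1] = itemsArrIndex;
    }
    return levelsArr;
  }

  // Follows KllDoublesHelper.createItemsArray(..) in this diff.
  static double[] createItemsArray(final double item, final int weight) {
    final double[] itemsArr = new double[Integer.bitCount(weight)];
    Arrays.fill(itemsArr, item);
    return itemsArr;
  }

  // Hypothetical helper: sums (items at level) * 2^level over all levels.
  static long totalWeight(final int[] levelsArr) {
    long total = 0;
    for (int level = 0; level < levelsArr.length - 1; level++) {
      total += (long) (levelsArr[level + 1] - levelsArr[level]) << level;
    }
    return total;
  }

  public static void main(final String[] args) {
    final int weight = 13; // binary 1101: one item each at levels 0, 2 and 3
    final int[] levels = createLevelsArray(weight);
    final double[] items = createItemsArray(7.5, weight);
    System.out.println(Arrays.toString(levels)); // prints [0, 1, 1, 2, 3]
    System.out.println(Arrays.toString(items));  // prints [7.5, 7.5, 7.5]
    System.out.println(totalWeight(levels));     // prints 13
  }
}
```

Three retained items therefore stand in for thirteen updates, and the weights summed over the levels reproduce the original weight.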

Contributor Author


My preference is to have code comments as close as possible to the relevant code. This is especially important in large, complex methods. These comments are distinctly different from Javadocs, which document what a method does at a high level. Code comments are addressed to the developer, who needs to understand the detailed implementation and might need to modify or translate the code. Javadocs are addressed to the user of the method and are generally not concerned with the details of its implementation, except when those details affect its use.

Expand Up @@ -39,6 +39,20 @@
//
final class KllDoublesHelper {

/**
* Create Items Array from given item and weight.
* Used with weighted update only.
* @param item the given item
* @param weight the given weight
* @return the Items Array.
*/
static double[] createItemsArray(final double item, final int weight) {
final int itemsArrLen = Integer.bitCount(weight);
final double[] itemsArr = new double[itemsArrLen];
Arrays.fill(itemsArr, item);
return itemsArr;
}

/**
* The following code is only valid in the special case of exactly reaching capacity while updating.
* It cannot be used while merging, while reducing k, or anything else.
Expand Down Expand Up @@ -135,12 +149,12 @@ static void mergeDoubleImpl(final KllDoublesSketch mySketch,
}
}

//After the level 0 update, we capture the intermediate state of levels and items arrays...
//After the level 0 update, we capture the intermediate state of my levels and items arrays...
final int myCurNumLevels = mySketch.getNumLevels();
final int[] myCurLevelsArr = mySketch.levelsArr;
final double[] myCurDoubleItemsArr = mySketch.getDoubleItemsArray();

// then rename them and initialize in case there are no higher levels
// create aliases in case there are no higher levels
int myNewNumLevels = myCurNumLevels;
int[] myNewLevelsArr = myCurLevelsArr;
double[] myNewDoubleItemsArr = myCurDoubleItemsArr;
Expand All @@ -150,12 +164,13 @@ static void mergeDoubleImpl(final KllDoublesSketch mySketch,
final int tmpSpaceNeeded = mySketch.getNumRetained()
+ KllHelper.getNumRetainedAboveLevelZero(otherNumLevels, otherLevelsArr);
final double[] workbuf = new double[tmpSpaceNeeded];
final int ub = KllHelper.ubOnNumLevels(finalN);
final int[] worklevels = new int[ub + 2]; // ub+1 does not work
final int[] outlevels = new int[ub + 2];

final int provisionalNumLevels = max(myCurNumLevels, otherNumLevels);

final int ub = max(KllHelper.ubOnNumLevels(finalN), provisionalNumLevels);
final int[] worklevels = new int[ub + 2]; // ub+1 does not work
final int[] outlevels = new int[ub + 2];

populateDoubleWorkArrays(workbuf, worklevels, provisionalNumLevels,
myCurNumLevels, myCurLevelsArr, myCurDoubleItemsArr,
otherNumLevels, otherLevelsArr, otherDoubleItemsArr);
Expand Down Expand Up @@ -199,7 +214,7 @@ static void mergeDoubleImpl(final KllDoublesSketch mySketch,
KllHelper.memorySpaceMgmt(mySketch, myNewLevelsArr.length, myNewDoubleItemsArr.length);
mySketch.setWritableMemory(wmem);
}
}
} //end of updating levels above level 0

//Update Preamble:
mySketch.setN(finalN);
Expand All @@ -225,7 +240,7 @@ static void mergeDoubleImpl(final KllDoublesSketch mySketch,
assert KllHelper.sumTheSampleWeights(mySketch.getNumLevels(), mySketch.levelsArr) == mySketch.getN();
}

private static void mergeSortedDoubleArrays(
private static void mergeSortedDoubleArrays( //only bufC is modified
final double[] bufA, final int startA, final int lenA,
final double[] bufB, final int startB, final int lenB,
final double[] bufC, final int startC) {
Expand Down Expand Up @@ -299,8 +314,7 @@ private static void randomlyHalveUpDoubles(final double[] buf, final int start,
}

//Called from KllDoublesSketch::update and this
static void updateDouble(final KllDoublesSketch dblSk,
final double item) {
static void updateDouble(final KllDoublesSketch dblSk, final double item) {
if (Double.isNaN(item)) { return; } //ignore
if (dblSk.isEmpty()) {
dblSk.setMinItem(item);
Expand Down Expand Up @@ -445,32 +459,36 @@ private static int[] generalDoublesCompress(
return new int[] {numLevels, targetItemCount, currentItemCount};
}

private static void populateDoubleWorkArrays(
final double[] workbuf, final int[] worklevels, final int provisionalNumLevels,
private static void populateDoubleWorkArrays( //workBuf and workLevels are modified
final double[] workBuf, final int[] workLevels, final int provisionalNumLevels,
final int myCurNumLevels, final int[] myCurLevelsArr, final double[] myCurDoubleItemsArr,
final int otherNumLevels, final int[] otherLevelsArr, final double[] otherDoubleItemsArr) {

worklevels[0] = 0;
workLevels[0] = 0;

// Note: the level zero data from "other" was already inserted into "self"
// Note: the level zero data from "other" was already inserted into "self",
// This copies into workbuf.
final int selfPopZero = KllHelper.currentLevelSizeItems(0, myCurNumLevels, myCurLevelsArr);
System.arraycopy(myCurDoubleItemsArr, myCurLevelsArr[0], workbuf, worklevels[0], selfPopZero);
worklevels[1] = worklevels[0] + selfPopZero;
System.arraycopy(myCurDoubleItemsArr, myCurLevelsArr[0], workBuf, workLevels[0], selfPopZero);
workLevels[1] = workLevels[0] + selfPopZero;

for (int lvl = 1; lvl < provisionalNumLevels; lvl++) {
final int selfPop = KllHelper.currentLevelSizeItems(lvl, myCurNumLevels, myCurLevelsArr);
final int otherPop = KllHelper.currentLevelSizeItems(lvl, otherNumLevels, otherLevelsArr);
worklevels[lvl + 1] = worklevels[lvl] + selfPop + otherPop;

if (selfPop > 0 && otherPop == 0) {
System.arraycopy(myCurDoubleItemsArr, myCurLevelsArr[lvl], workbuf, worklevels[lvl], selfPop);
} else if (selfPop == 0 && otherPop > 0) {
System.arraycopy(otherDoubleItemsArr, otherLevelsArr[lvl], workbuf, worklevels[lvl], otherPop);
} else if (selfPop > 0 && otherPop > 0) {
mergeSortedDoubleArrays(
workLevels[lvl + 1] = workLevels[lvl] + selfPop + otherPop;

if (selfPop == 0 && otherPop == 0) { continue; }
else if (selfPop > 0 && otherPop == 0) {
System.arraycopy(myCurDoubleItemsArr, myCurLevelsArr[lvl], workBuf, workLevels[lvl], selfPop);
}
else if (selfPop == 0 && otherPop > 0) {
System.arraycopy(otherDoubleItemsArr, otherLevelsArr[lvl], workBuf, workLevels[lvl], otherPop);
}
else if (selfPop > 0 && otherPop > 0) {
mergeSortedDoubleArrays( //only workbuf is modified
myCurDoubleItemsArr, myCurLevelsArr[lvl], selfPop,
otherDoubleItemsArr, otherLevelsArr[lvl], otherPop,
workbuf, worklevels[lvl]);
workBuf, workLevels[lvl]);
}
}
}
Expand Down
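
As a rough illustration of what populateDoubleWorkArrays(..) does, the sketch below rebuilds the same level-by-level logic in a standalone class with small hand-made inputs. The branch structure follows the diff above. The body of `mergeSorted` is an assumption: the real mergeSortedDoubleArrays(..) is not shown in this diff, so a standard two-pointer merge stands in for it. The class name, the `levelSize` helper name, and the sample data are all hypothetical.

```java
import java.util.Arrays;

final class WorkArraysDemo {

  // Mirrors KllHelper.currentLevelSizeItems(..): levels above numLevels are empty.
  static int levelSize(final int level, final int numLevels, final int[] levels) {
    if (level >= numLevels) { return 0; }
    return levels[level + 1] - levels[level];
  }

  // ASSUMED stand-in for mergeSortedDoubleArrays(..): a two-pointer merge; only c is modified.
  static void mergeSorted(final double[] a, final int startA, final int lenA,
      final double[] b, final int startB, final int lenB,
      final double[] c, final int startC) {
    int i = startA, j = startB, k = startC;
    final int endA = startA + lenA, endB = startB + lenB;
    while (i < endA && j < endB) { c[k++] = (a[i] <= b[j]) ? a[i++] : b[j++]; }
    while (i < endA) { c[k++] = a[i++]; }
    while (j < endB) { c[k++] = b[j++]; }
  }

  // workBuf and workLevels are modified.
  static void populate(final double[] workBuf, final int[] workLevels, final int provisionalNumLevels,
      final int myNumLevels, final int[] myLevels, final double[] myItems,
      final int otherNumLevels, final int[] otherLevels, final double[] otherItems) {
    workLevels[0] = 0;
    // Level 0 of "other" is assumed to have been inserted into "self" already.
    final int selfPopZero = levelSize(0, myNumLevels, myLevels);
    System.arraycopy(myItems, myLevels[0], workBuf, workLevels[0], selfPopZero);
    workLevels[1] = workLevels[0] + selfPopZero;

    for (int lvl = 1; lvl < provisionalNumLevels; lvl++) {
      final int selfPop = levelSize(lvl, myNumLevels, myLevels);
      final int otherPop = levelSize(lvl, otherNumLevels, otherLevels);
      workLevels[lvl + 1] = workLevels[lvl] + selfPop + otherPop;
      if (selfPop > 0 && otherPop == 0) {
        System.arraycopy(myItems, myLevels[lvl], workBuf, workLevels[lvl], selfPop);
      } else if (selfPop == 0 && otherPop > 0) {
        System.arraycopy(otherItems, otherLevels[lvl], workBuf, workLevels[lvl], otherPop);
      } else if (selfPop > 0 && otherPop > 0) {
        mergeSorted(myItems, myLevels[lvl], selfPop,
            otherItems, otherLevels[lvl], otherPop, workBuf, workLevels[lvl]);
      } // both empty: nothing to copy
    }
  }

  public static void main(final String[] args) {
    // "self": level 0 = {5, 1} (unsorted), level 1 = {2, 8} (sorted)
    final int[] myLevels = {0, 2, 4};
    final double[] myItems = {5.0, 1.0, 2.0, 8.0};
    // "other": level 0 empty, level 1 = {4}, level 2 = {6}
    final int[] otherLevels = {0, 0, 1, 2};
    final double[] otherItems = {4.0, 6.0};
    final int provisionalNumLevels = Math.max(2, 3);
    final double[] workBuf = new double[6];
    final int[] workLevels = new int[provisionalNumLevels + 1];
    populate(workBuf, workLevels, provisionalNumLevels, 2, myLevels, myItems, 3, otherLevels, otherItems);
    System.out.println(Arrays.toString(workLevels)); // prints [0, 2, 5, 6]
    System.out.println(Arrays.toString(workBuf));    // prints [5.0, 1.0, 2.0, 4.0, 8.0, 6.0]
  }
}
```

The explicit branches matter: when one sketch has fewer levels than the other, its levels array has no entry at the higher indices, so the code must not read from it unless the population at that level is nonzero.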
17 changes: 15 additions & 2 deletions src/main/java/org/apache/datasketches/kll/KllDoublesSketch.java
Expand Up @@ -276,8 +276,8 @@ public QuantilesDoublesSketchIterator iterator() {
@Override
public final void merge(final KllSketch other) {
if (readOnly || sketchStructure != UPDATABLE) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
final KllDoublesSketch othDblSk = (KllDoublesSketch)other;
if (othDblSk.isEmpty()) { return; }
final KllDoublesSketch othDblSk = (KllDoublesSketch)other; //check cast first
if (othDblSk.isEmpty()) { return; } //then check empty
KllDoublesHelper.mergeDoubleImpl(this, othDblSk);
kllDoublesSV = null;
}
Expand Down Expand Up @@ -324,6 +324,19 @@ public void update(final double item) {
kllDoublesSV = null;
}

/**
* Updates this sketch with the given item the number of times specified by the given weight.
* @param item the item to be repeated. NaNs are ignored.
* @param weight the number of times the update of item is to be repeated. It must be &ge; one.
*/
public void weightedUpdate(final double item, final int weight) {
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
if (weight < 1) { throw new SketchesArgumentException("Weight is less than one."); }
if (Double.isNaN(item)) { return; } //ignore
KllHeapDoublesSketch.weightedUpdateDouble(this, item, weight);
kllDoublesSV = null;
}

//restricted

/**
Expand Down
Expand Up @@ -81,6 +81,25 @@ final class KllHeapDoublesSketch extends KllDoublesSketch {
this.doubleItems = new double[k];
}

/**
* Used for creating a temporary sketch for use with weighted updates.
*/
KllHeapDoublesSketch(final int k, final int m, final double item, final int weight) {
super(UPDATABLE);
KllHelper.checkM(m);
KllHelper.checkK(k, m);
this.levelsArr = KllHelper.createLevelsArray(weight);
this.readOnly = false;
this.k = k;
this.m = m;
this.n = weight;
this.minK = k;
this.isLevelZeroSorted = false;
this.minDoubleItem = item;
this.maxDoubleItem = item;
this.doubleItems = KllDoublesHelper.createItemsArray(item, weight);
}

/**
* Heapify constructor.
* @param srcMem Memory object that contains data serialized by this sketch.
Expand Down Expand Up @@ -282,4 +301,13 @@ void setNumLevels(final int numLevels) {
@Override
void setWritableMemory(final WritableMemory wmem) { }

static void weightedUpdateDouble(final KllDoublesSketch dblSk, final double item, final int weight) {
if (weight < dblSk.getLevelsArray(UPDATABLE)[0]) {
for (int i = 0; i < weight; i++) { dblSk.update(item); }
} else {
final KllHeapDoublesSketch tmpSk = new KllHeapDoublesSketch(dblSk.getK(), DEFAULT_M, item, weight);
dblSk.merge(tmpSk);
}
}

}
46 changes: 24 additions & 22 deletions src/main/java/org/apache/datasketches/kll/KllHelper.java
Expand Up @@ -28,6 +28,7 @@
import static java.lang.Math.pow;
import static java.lang.Math.round;
import static org.apache.datasketches.common.Family.KLL;
import static org.apache.datasketches.common.Util.bitAt;
import static org.apache.datasketches.common.Util.floorPowerOf2;
import static org.apache.datasketches.kll.KllPreambleUtil.DATA_START_ADR;
import static org.apache.datasketches.kll.KllPreambleUtil.EMPTY_BIT_MASK;
Expand Down Expand Up @@ -93,7 +94,7 @@ static class LevelStats {
/**
* This is the exact powers of 3 from 3^0 to 3^30 where the exponent is the index
*/
private static long[] powersOfThree =
static long[] powersOfThree =
new long[] {1, 3, 9, 27, 81, 243, 729, 2187, 6561, 19683, 59049, 177147, 531441,
1594323, 4782969, 14348907, 43046721, 129140163, 387420489, 1162261467,
3486784401L, 10460353203L, 31381059609L, 94143178827L, 282429536481L,
Expand Down Expand Up @@ -150,6 +151,23 @@ public static long convertToCumulative(final long[] array) {
return subtotal;
}

/**
* Create the Levels Array from given weight
* Used with weighted update only.
* @param weight the given weight
* @return the Levels Array
*/
static int[] createLevelsArray(final int weight) {
final int numLevels = 32 - Integer.numberOfLeadingZeros(weight);
final int[] levelsArr = new int[numLevels + 1]; //always one more than numLevels
int itemsArrIndex = 0;
levelsArr[0] = itemsArrIndex;
for (int level = 0; level < numLevels; level++) {
levelsArr[level + 1] = itemsArrIndex += bitAt(weight, level);
}
return levelsArr;
}

static int currentLevelSizeItems(final int level, final int numLevels, final int[] levels) {
if (level >= numLevels) { return 0; }
return levels[level + 1] - levels[level];
Expand Down Expand Up @@ -180,7 +198,9 @@ static LevelStats getFinalSketchStatsAtNumLevels(
printf("%6s %8s %12s %18s %18s\n", "Level", "Items", "CumItems", "N at Level", "CumN");
}
for (int level = 0; level < numLevels; level++) {
final LevelStats lvlStats = getLevelCapacityItems(k, m, numLevels, level);
final int items = KllHelper.levelCapacity(k, numLevels, level, m);
final long n = (long)items << level;
final LevelStats lvlStats = new LevelStats(n, numLevels, items);
cumItems += lvlStats.numItems;
cumN += lvlStats.n;
if (printSketchStructure) {
Expand Down Expand Up @@ -257,24 +277,6 @@ static int getKFromEpsilon(final double epsilon, final boolean pmf) {
return max(KllSketch.MIN_M, min(KllSketch.MAX_K, k));
}

/**
* Given k, m, numLevels, this computes the item capacity of a single level.
* @param k the given user sketch configuration parameter
* @param m the given user sketch configuration parameter
* @param numLevels the given number of levels of the sketch
* @param level the specific level to compute its item capacity
* @return LevelStats with the computed N and items for the given level.
*/
static LevelStats getLevelCapacityItems(
final int k,
final int m,
final int numLevels,
final int level) {
final int items = KllHelper.levelCapacity(k, numLevels, level, m);
final long n = (long)items << level;
return new LevelStats(n, numLevels, items);
}

/**
* Gets the normalized rank error given k and pmf.
* Static method version of the <i>getNormalizedRankError(boolean)</i>.
Expand Down Expand Up @@ -696,7 +698,7 @@ static int findLevelToCompact(final int k, final int m, final int numLevels, fin
* @param depth the zero-based index of the level being computed.
* @return the actual capacity of a given level given its depth index.
*/
private static long intCapAux(final int k, final int depth) {
static long intCapAux(final int k, final int depth) {
if (depth <= 30) { return intCapAuxAux(k, depth); }
final int half = depth / 2;
final int rest = depth - half;
Expand All @@ -710,7 +712,7 @@ private static long intCapAux(final int k, final int depth) {
* @param depth the zero-based index of the level being computed. The max depth is 30!
* @return the actual capacity of a given level given its depth index.
*/
private static long intCapAuxAux(final long k, final int depth) {
static long intCapAuxAux(final long k, final int depth) {
final long twok = k << 1; // for rounding at the end, pre-multiply by 2 here, divide by 2 during rounding.
final long tmp = ((twok << depth) / powersOfThree[depth]); //2k* (2/3)^depth. 2k also keeps the fraction larger.
final long result = ((tmp + 1L) >>> 1); // (tmp + 1)/2. If odd, round up. This guarantees an integer.
Expand Down
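
The integer arithmetic in intCapAuxAux(..) computes k * (2/3)^depth rounded to the nearest integer without using floating point. The sketch below repeats the three lines visible in the diff and runs them for a few depths. The truncated powers-of-three table, the class name, and returning `result` directly are assumptions, since the remainder of the method is elided in this diff view.

```java
final class LevelCapacityDemo {

  // First few entries only; the table in KllHelper runs from 3^0 to 3^30.
  static final long[] POWERS_OF_THREE = {1, 3, 9, 27, 81};

  static long intCapAuxAux(final long k, final int depth) {
    final long twok = k << 1;                                    // pre-multiply by 2 for rounding
    final long tmp = (twok << depth) / POWERS_OF_THREE[depth];   // floor(2k * (2/3)^depth)
    return (tmp + 1L) >>> 1;                                     // (tmp + 1) / 2 rounds to nearest
  }

  public static void main(final String[] args) {
    final long k = 200;
    for (int depth = 0; depth <= 4; depth++) {
      // With k = 200 the capacities are 200, 133, 89, 59, 40 for depths 0 through 4.
      System.out.println("depth " + depth + ": " + intCapAuxAux(k, depth));
    }
  }
}
```

For example, at depth 2 the exact value is 200 * 4/9 = 88.89, and the integer path gives floor(1600 / 9) = 177, then (177 + 1) >>> 1 = 89.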
2 changes: 1 addition & 1 deletion src/main/java/org/apache/datasketches/kll/KllSketch.java
Expand Up @@ -113,7 +113,7 @@ public abstract class KllSketch implements QuantilesAPI {
final SketchType sketchType;
final SketchStructure sketchStructure;
boolean readOnly;
int[] levelsArr; //Always writable form
int[] levelsArr; //Always updatable form

/**
* Constructor for on-heap and off-heap.
Expand Down
Expand Up @@ -293,7 +293,7 @@ default double[] getRanks(double[] quantiles) {

/**
* Updates this sketch with the given item.
* @param item from a stream of quantiles. NaNs are ignored.
* @param item from a stream of items. NaNs are ignored.
*/
void update(double item);

Expand Down