Skip to content

Commit

Permalink
Merge pull request #497 from apache/weighted_updates_for_kll_floats_s…
Browse files Browse the repository at this point in the history
…ketch

Implement weighted updates to KllFloatsSketch
  • Loading branch information
leerho authored Jan 24, 2024
2 parents 7c94738 + 8614d45 commit b078525
Show file tree
Hide file tree
Showing 8 changed files with 296 additions and 50 deletions.
73 changes: 48 additions & 25 deletions src/main/java/org/apache/datasketches/kll/KllFloatsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.datasketches.common.Util.isEven;
import static org.apache.datasketches.common.Util.isOdd;
import static org.apache.datasketches.kll.KllHelper.findLevelToCompact;
import static org.apache.datasketches.kll.KllSketch.DEFAULT_M;

import java.util.Arrays;
import java.util.Random;
Expand All @@ -39,6 +40,20 @@
//
final class KllFloatsHelper {

/**
* Create Items Array from given item and weight.
* Used with weighted update only.
* @param item the given item
* @param weight the given weight
* @return the Items Array.
*/
static float[] createItemsArray(final float item, final int weight) {
final int itemsArrLen = Integer.bitCount(weight);
final float[] itemsArr = new float[itemsArrLen];
Arrays.fill(itemsArr, item);
return itemsArr;
}

/**
* The following code is only valid in the special case of exactly reaching capacity while updating.
* It cannot be used while merging, while reducing k, or anything else.
Expand Down Expand Up @@ -107,7 +122,7 @@ private static void compressWhileUpdatingSketch(final KllFloatsSketch fltSk) {
fltSk.setFloatItemsArray(myFloatItemsArr);
}

//assumes readOnly = false, and UPDATABLE, called from KllFloatsSketch::merge
//assumes readOnly = false and UPDATABLE, called from KllFloatsSketch::merge
static void mergeFloatImpl(final KllFloatsSketch mySketch,
final KllFloatsSketch otherFltSk) {
if (otherFltSk.isEmpty()) { return; }
Expand Down Expand Up @@ -140,7 +155,7 @@ static void mergeFloatImpl(final KllFloatsSketch mySketch,
final int[] myCurLevelsArr = mySketch.levelsArr;
final float[] myCurFloatItemsArr = mySketch.getFloatItemsArray();

// then rename them and initialize in case there are no higher levels
// create aliases in case there are no higher levels
int myNewNumLevels = myCurNumLevels;
int[] myNewLevelsArr = myCurLevelsArr;
float[] myNewFloatItemsArr = myCurFloatItemsArr;
Expand All @@ -150,12 +165,13 @@ static void mergeFloatImpl(final KllFloatsSketch mySketch,
final int tmpSpaceNeeded = mySketch.getNumRetained()
+ KllHelper.getNumRetainedAboveLevelZero(otherNumLevels, otherLevelsArr);
final float[] workbuf = new float[tmpSpaceNeeded];
final int ub = KllHelper.ubOnNumLevels(finalN);
final int[] worklevels = new int[ub + 2]; // ub+1 does not work
final int[] outlevels = new int[ub + 2];

final int provisionalNumLevels = max(myCurNumLevels, otherNumLevels);

final int ub = max(KllHelper.ubOnNumLevels(finalN), provisionalNumLevels);
final int[] worklevels = new int[ub + 2]; // ub+1 does not work
final int[] outlevels = new int[ub + 2];

populateFloatWorkArrays(workbuf, worklevels, provisionalNumLevels,
myCurNumLevels, myCurLevelsArr, myCurFloatItemsArr,
otherNumLevels, otherLevelsArr, otherFloatItemsArr);
Expand Down Expand Up @@ -199,7 +215,7 @@ static void mergeFloatImpl(final KllFloatsSketch mySketch,
KllHelper.memorySpaceMgmt(mySketch, myNewLevelsArr.length, myNewFloatItemsArr.length);
mySketch.setWritableMemory(wmem);
}
}
} //end of updating levels above level 0

//Update Preamble:
mySketch.setN(finalN);
Expand Down Expand Up @@ -298,31 +314,34 @@ private static void randomlyHalveUpFloats(final float[] buf, final int start, fi
}
}

//Called from KllFloatsSketch::update and this
static void updateFloat(final KllFloatsSketch fltSk,
final float item) {
if (Float.isNaN(item)) { return; } //ignore
if (fltSk.isEmpty()) {
fltSk.setMinItem(item);
fltSk.setMaxItem(item);
} else {
fltSk.setMinItem(min(fltSk.getMinItem(), item));
fltSk.setMaxItem(max(fltSk.getMaxItem(), item));
}
int level0space = fltSk.levelsArr[0];
assert level0space >= 0;
if (level0space == 0) {
//Called from KllFloatsSketch::update and merge
static void updateFloat(final KllFloatsSketch fltSk, final float item) {
fltSk.updateMinMax(item);
int freeSpace = fltSk.levelsArr[0];
assert freeSpace >= 0;
if (freeSpace == 0) {
compressWhileUpdatingSketch(fltSk);
level0space = fltSk.levelsArr[0];
assert (level0space > 0);
freeSpace = fltSk.levelsArr[0];
assert (freeSpace > 0);
}
fltSk.incN();
fltSk.setLevelZeroSorted(false);
final int nextPos = level0space - 1;
final int nextPos = freeSpace - 1;
fltSk.setLevelsArrayAt(0, nextPos);
fltSk.setFloatItemsArrayAt(nextPos, item);
}

//Called from KllFloatsSketch::update with weight
static void updateFloat(final KllFloatsSketch fltSk, final float item, final int weight) {
if (weight < fltSk.levelsArr[0]) {
for (int i = 0; i < weight; i++) { updateFloat(fltSk, item); }
} else {
fltSk.updateMinMax(item);
final KllHeapFloatsSketch tmpSk = new KllHeapFloatsSketch(fltSk.getK(), DEFAULT_M, item, weight);
fltSk.merge(tmpSk);
}
}

/**
* Compression algorithm used to merge higher levels.
* <p>Here is what we do for each level:</p>
Expand Down Expand Up @@ -453,6 +472,7 @@ private static void populateFloatWorkArrays(
worklevels[0] = 0;

// Note: the level zero data from "other" was already inserted into "self"
// This copies into workbuf.
final int selfPopZero = KllHelper.currentLevelSizeItems(0, myCurNumLevels, myCurLevelsArr);
System.arraycopy( myCurFloatItemsArr, myCurLevelsArr[0], workbuf, worklevels[0], selfPopZero);
worklevels[1] = worklevels[0] + selfPopZero;
Expand All @@ -462,11 +482,14 @@ private static void populateFloatWorkArrays(
final int otherPop = KllHelper.currentLevelSizeItems(lvl, otherNumLevels, otherLevelsArr);
worklevels[lvl + 1] = worklevels[lvl] + selfPop + otherPop;

if (selfPop == 0 && otherPop == 0) { continue; }
if (selfPop > 0 && otherPop == 0) {
System.arraycopy(myCurFloatItemsArr, myCurLevelsArr[lvl], workbuf, worklevels[lvl], selfPop);
} else if (selfPop == 0 && otherPop > 0) {
}
else if (selfPop == 0 && otherPop > 0) {
System.arraycopy(otherFloatItemsArr, otherLevelsArr[lvl], workbuf, worklevels[lvl], otherPop);
} else if (selfPop > 0 && otherPop > 0) {
}
else if (selfPop > 0 && otherPop > 0) {
mergeSortedFloatArrays(
myCurFloatItemsArr, myCurLevelsArr[lvl], selfPop,
otherFloatItemsArr, otherLevelsArr[lvl], otherPop,
Expand Down
25 changes: 25 additions & 0 deletions src/main/java/org/apache/datasketches/kll/KllFloatsSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,26 @@ public String toString(final boolean withLevels, final boolean withLevelsAndItem

@Override
public void update(final float item) {
if (Float.isNaN(item)) { return; } //ignore
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
KllFloatsHelper.updateFloat(this, item);
kllFloatsSV = null;
}

/**
* Weighted update. Updates this sketch with the given item the number of times specified by the given integer weight.
* @param item the item to be repeated. NaNs are ignored.
* @param weight the number of times the update of item is to be repeated. It must be &ge; one.
*/
public void update(final float item, final int weight) {
if (Float.isNaN(item)) { return; } //ignore
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
if (weight < 1) { throw new SketchesArgumentException("Weight is less than one."); }
if (weight == 1) { KllFloatsHelper.updateFloat(this, item); }
else { KllFloatsHelper.updateFloat(this, item, weight); }
kllFloatsSV = null;
}

//restricted

/**
Expand Down Expand Up @@ -390,4 +405,14 @@ private final void refreshSortedView() {

abstract void setMinItem(float item);

void updateMinMax(final float item) {
if (isEmpty()) {
setMinItem(item);
setMaxItem(item);
} else {
setMinItem(min(getMinItem(), item));
setMaxItem(max(getMaxItem(), item));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ public KllFloatsSketchSortedView(final KllFloatsSketch sketch) {
populateFromSketch(srcQuantiles, srcLevels, srcNumLevels, numQuantiles);
}

//end of constructors

@Override
public long[] getCumulativeWeights() {
return cumWeights.clone();
Expand Down Expand Up @@ -114,7 +112,7 @@ public float getQuantile(final double rank, final QuantileSearchCriteria searchC
final InequalitySearch crit = (searchCrit == INCLUSIVE) ? InequalitySearch.GE : InequalitySearch.GT;
final int index = InequalitySearch.find(cumWeights, 0, len - 1, naturalRank, crit);
if (index == -1) {
return quantiles[quantiles.length - 1]; //EXCLUSIVE (GT) case: normRank == 1.0;
return quantiles[len - 1]; //EXCLUSIVE (GT) case: normRank == 1.0;
}
return quantiles[index];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,25 @@ final class KllHeapFloatsSketch extends KllFloatsSketch {
this.floatItems = new float[k];
}

/**
* Used for creating a temporary sketch for use with weighted updates.
*/
KllHeapFloatsSketch(final int k, final int m, final float item, final int weight) {
super(UPDATABLE);
KllHelper.checkM(m);
KllHelper.checkK(k, m);
this.levelsArr = KllHelper.createLevelsArray(weight);
this.readOnly = false;
this.k = k;
this.m = m;
this.n = weight;
this.minK = k;
this.isLevelZeroSorted = false;
this.minFloatItem = item;
this.maxFloatItem = item;
this.floatItems = KllFloatsHelper.createItemsArray(item, weight);
}

/**
* Heapify constructor.
* @param srcMem Memory object that contains data serialized by this sketch.
Expand Down Expand Up @@ -280,6 +299,6 @@ float[] getFloatRetainedItemsArray() {
}

@Override
void setWritableMemory(final WritableMemory wmem) { } //inheritance dummy
void setWritableMemory(final WritableMemory wmem) { }

}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public class KllDoublesValidationTest {

private static int[] makeInputArray(int n, int stride) {
assert isOdd(stride);
int mask = (1 << 23) - 1;
int mask = (1 << 23) - 1; // because library items are single-precision floats
int cur = 0;
int[] arr = new int[n];
for (int i = 0; i < n; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ public void manyValuesEstimationMode() {
assertEquals(pmf[0], 0.5, PMF_EPS_FOR_K_256);
assertEquals(pmf[1], 0.5, PMF_EPS_FOR_K_256);

assertEquals(sketch.getMinItem(), 0f);
assertEquals(sketch.getMaxItem(), n - 1f);
assertEquals(sketch.getMinItem(), 0f); // min value is exact
assertEquals(sketch.getMaxItem(), n - 1f); // max value is exact

// check at every 0.1 percentage point
final double[] fractions = new double[1001];
Expand Down
26 changes: 13 additions & 13 deletions src/test/java/org/apache/datasketches/kll/KllMiscDoublesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,32 +173,32 @@ public void visualCheckToString() {
public void viewHeapCompactions() {
int k = 20;
int n = 108;
boolean withSummary = false;
boolean withDetail = true;
boolean withLevels = false;
boolean withLevelsAndItems = true;
int compaction = 0;
KllDoublesSketch sk = KllDoublesSketch.newHeapInstance(k);
for (int i = 1; i <= n; i++) {
sk.update(i);
if (sk.levelsArr[0] == 0) {
println(LS + "#<<< BEFORE COMPACTION # " + (++compaction) + " >>>");
println(sk.toString(withSummary, withDetail));
println(sk.toString(withLevels, withLevelsAndItems));
sk.update(++i);
println(LS + "#<<< AFTER COMPACTION # " + (compaction) + " >>>");
println(sk.toString(withSummary, withDetail));
println(sk.toString(withLevels, withLevelsAndItems));
assertEquals(sk.getDoubleItemsArray()[sk.levelsArr[0]], i);
}
}
println(LS + "#<<< END STATE # >>>");
println(sk.toString(withSummary, withDetail));
println(sk.toString(withLevels, withLevelsAndItems));
println("");
}

@Test //set static enablePrinting = true for visual checking
public void viewDirectCompactions() {
int k = 20;
int n = 108;
boolean withSummary = false;
boolean withDetail = true;
boolean withLevels = false;
boolean withLevelsAndItems = true;
int compaction = 0;
int sizeBytes = KllSketch.getMaxSerializedSizeBytes(k, n, DOUBLES_SKETCH, true);
WritableMemory wmem = WritableMemory.allocate(sizeBytes);
Expand All @@ -207,15 +207,15 @@ public void viewDirectCompactions() {
sk.update(i);
if (sk.levelsArr[0] == 0) {
println(LS + "#<<< BEFORE COMPACTION # " + (++compaction) + " >>>");
println(sk.toString(withSummary, withDetail));
println(sk.toString(withLevels, withLevelsAndItems));
sk.update(++i);
println(LS + "#<<< AFTER COMPACTION # " + (compaction) + " >>>");
println(sk.toString(withSummary, withDetail));
println(sk.toString(withLevels, withLevelsAndItems));
assertEquals(sk.getDoubleItemsArray()[sk.levelsArr[0]], i);
}
}
println(LS + "#<<< END STATE # >>>");
println(sk.toString(withSummary, withDetail));
println(sk.toString(withLevels, withLevelsAndItems));
println("");
}

Expand Down Expand Up @@ -341,14 +341,14 @@ private static void outputLevels(int weight, int[] levelsArr) {
public void viewMemorySketchData() {
int k = 20;
int n = 109;
boolean withSummary = true;
boolean withDetail = true;
boolean withLevels = true;
boolean withLevelsAndItems = true;
KllDoublesSketch sk = KllDoublesSketch.newHeapInstance(k);
for (int i = 1; i <= n; i++) { sk.update(i); }
byte[] byteArr = sk.toByteArray();
Memory mem = Memory.wrap(byteArr);
KllDoublesSketch ddSk = KllDoublesSketch.wrap(mem);
println(ddSk.toString(withSummary, withDetail));
println(ddSk.toString(withLevels, withLevelsAndItems));
assertEquals(ddSk.getN(), n);
}

Expand Down
Loading

0 comments on commit b078525

Please sign in to comment.