Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kll weighted updates group2 #488

Merged
merged 4 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public long getN() {

//restricted

@Override //returns updatable, expanded array including empty/garbage space at bottom
@Override //returns updatable, expanded array including free space at bottom
double[] getDoubleItemsArray() {
final int k = getK();
if (sketchStructure == COMPACT_EMPTY) { return new double[k]; }
Expand All @@ -196,7 +196,7 @@ public long getN() {
return doubleItemsArr;
}

@Override //returns compact items array of retained items, no empty/garbage.
@Override //returns compact items array of retained items, no free space.
double[] getDoubleRetainedItemsArray() {
if (sketchStructure == COMPACT_EMPTY) { return new double[0]; }
if (sketchStructure == COMPACT_SINGLE) { return new double[] { getDoubleSingleItem() }; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public long getN() {

//restricted

@Override //returns updatable, expanded array including empty/garbage space at bottom
@Override //returns updatable, expanded array including free space at bottom
float[] getFloatItemsArray() {
final int k = getK();
if (sketchStructure == COMPACT_EMPTY) { return new float[k]; }
Expand All @@ -196,7 +196,7 @@ float[] getFloatItemsArray() {
return floatItemsArr;
}

@Override //returns compact items array of retained items, no empty/garbage.
@Override //returns compact items array of retained items, no free space.
float[] getFloatRetainedItemsArray() {
if (sketchStructure == COMPACT_EMPTY) { return new float[0]; }
if (sketchStructure == COMPACT_SINGLE) { return new float[] { getFloatSingleItem() }; }
Expand Down
33 changes: 18 additions & 15 deletions src/main/java/org/apache/datasketches/kll/KllDoublesHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static org.apache.datasketches.common.Util.isEven;
import static org.apache.datasketches.common.Util.isOdd;
import static org.apache.datasketches.kll.KllHelper.findLevelToCompact;
import static org.apache.datasketches.kll.KllSketch.DEFAULT_M;
import static org.apache.datasketches.kll.KllSketch.SketchStructure.UPDATABLE;

import java.util.Arrays;
import java.util.Random;
Expand Down Expand Up @@ -313,30 +315,31 @@ private static void randomlyHalveUpDoubles(final double[] buf, final int start,
}
}

//Called from KllDoublesSketch::update and this
//Called from KllDoublesSketch::update and merge
static void updateDouble(final KllDoublesSketch dblSk, final double item) {
if (Double.isNaN(item)) { return; } //ignore
if (dblSk.isEmpty()) {
dblSk.setMinItem(item);
dblSk.setMaxItem(item);
} else {
dblSk.setMinItem(min(dblSk.getMinItem(), item));
dblSk.setMaxItem(max(dblSk.getMaxItem(), item));
}
int level0space = dblSk.levelsArr[0];
assert (level0space >= 0);
if (level0space == 0) {
int freeSpace = dblSk.levelsArr[0];
assert (freeSpace >= 0);
if (freeSpace == 0) {
compressWhileUpdatingSketch(dblSk);
level0space = dblSk.levelsArr[0];
assert (level0space > 0);
freeSpace = dblSk.levelsArr[0];
assert (freeSpace > 0);
}
dblSk.incN();
dblSk.setLevelZeroSorted(false);
final int nextPos = level0space - 1;
final int nextPos = freeSpace - 1;
dblSk.setLevelsArrayAt(0, nextPos);
dblSk.setDoubleItemsArrayAt(nextPos, item);
}

static void updateDouble(final KllDoublesSketch dblSk, final double item, final int weight) {
if (weight < dblSk.getLevelsArray(UPDATABLE)[0]) {
for (int i = 0; i < weight; i++) { dblSk.update(item); }
} else {
final KllHeapDoublesSketch tmpSk = new KllHeapDoublesSketch(dblSk.getK(), DEFAULT_M, item, weight);
dblSk.merge(tmpSk);
}
}

/**
* Compression algorithm used to merge higher levels.
* <p>Here is what we do for each level:</p>
Expand Down
24 changes: 19 additions & 5 deletions src/main/java/org/apache/datasketches/kll/KllDoublesSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -307,19 +307,21 @@ public byte[] toByteArray() {
}

@Override
public String toString(final boolean withSummary, final boolean withData) {
public String toString(final boolean withSummary, final boolean withDetail) {
KllSketch sketch = this;
if (withData && sketchStructure != UPDATABLE) {
if (withDetail && sketchStructure != UPDATABLE) {
final Memory mem = getWritableMemory();
assert mem != null;
sketch = KllDoublesSketch.heapify(getWritableMemory());
}
return KllHelper.toStringImpl(sketch, withSummary, withData, getSerDe());
return KllHelper.toStringImpl(sketch, withSummary, withDetail, getSerDe());
}

@Override
public void update(final double item) {
if (Double.isNaN(item)) { return; } //ignore
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
updateMinMax(item);
KllDoublesHelper.updateDouble(this, item);
kllDoublesSV = null;
}
Expand All @@ -329,11 +331,13 @@ public void update(final double item) {
* @param item the item to be repeated. NaNs are ignored.
* @param weight the number of times the update of item is to be repeated. It must be &ge; one.
*/
public void weightedUpdate(final double item, final int weight) {
public void update(final double item, final int weight) {
if (Double.isNaN(item)) { return; } //ignore
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
if (weight < 1) { throw new SketchesArgumentException("Weight is less than one."); }
if (Double.isNaN(item)) { return; } //ignore
KllHeapDoublesSketch.weightedUpdateDouble(this, item, weight);
updateMinMax(item);
KllDoublesHelper.updateDouble(this, item, weight);
kllDoublesSV = null;
}

Expand Down Expand Up @@ -403,4 +407,14 @@ private final void refreshSortedView() {

abstract void setMinItem(double item);

private void updateMinMax(final double item) {
if (isEmpty()) {
setMinItem(item);
setMaxItem(item);
} else {
setMinItem(min(getMinItem(), item));
setMaxItem(max(getMaxItem(), item));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public KllDoublesSketchSortedView(final KllDoublesSketch sketch) {
if (!sketch.hasMemory()) { sketch.setLevelZeroSorted(true); }
}

final int numQuantiles = srcLevels[srcNumLevels] - srcLevels[0]; //remove garbage
final int numQuantiles = srcLevels[srcNumLevels] - srcLevels[0]; //remove free space
quantiles = new double[numQuantiles];
cumWeights = new long[numQuantiles];
populateFromSketch(srcQuantiles, srcLevels, srcNumLevels, numQuantiles);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public KllFloatsSketchSortedView(final KllFloatsSketch sketch) {
if (!sketch.hasMemory()) { sketch.setLevelZeroSorted(true); }
}

final int numQuantiles = srcLevels[srcNumLevels] - srcLevels[0]; //remove garbage
final int numQuantiles = srcLevels[srcNumLevels] - srcLevels[0]; //remove free space
quantiles = new float[numQuantiles];
cumWeights = new long[numQuantiles];
populateFromSketch(srcQuantiles, srcLevels, srcNumLevels, numQuantiles);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,10 @@ else if (memStructure == COMPACT_FULL) {
maxDoubleItem = srcMem.getDouble(offsetBytes);
offsetBytes += Double.BYTES;
final int capacityItems = levelsArr[getNumLevels()];
final int garbageItems = levelsArr[0];
final int retainedItems = capacityItems - garbageItems;
final int freeSpace = levelsArr[0];
final int retainedItems = capacityItems - freeSpace;
doubleItems = new double[capacityItems];
srcMem.getDoubleArray(offsetBytes, doubleItems, garbageItems, retainedItems);
srcMem.getDoubleArray(offsetBytes, doubleItems, freeSpace, retainedItems);
}
else { //(memStructure == UPDATABLE)
int offsetBytes = DATA_START_ADR;
Expand Down Expand Up @@ -301,13 +301,4 @@ void setNumLevels(final int numLevels) {
@Override
void setWritableMemory(final WritableMemory wmem) { }

static void weightedUpdateDouble(final KllDoublesSketch dblSk, final double item, final int weight) {
if (weight < dblSk.getLevelsArray(UPDATABLE)[0]) {
for (int i = 0; i < weight; i++) { dblSk.update(item); }
} else {
final KllHeapDoublesSketch tmpSk = new KllHeapDoublesSketch(dblSk.getK(), DEFAULT_M, item, weight);
dblSk.merge(tmpSk);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ else if (memStructure == COMPACT_FULL) {
maxFloatItem = srcMem.getFloat(offsetBytes);
offsetBytes += Float.BYTES;
final int capacityItems = levelsArr[getNumLevels()];
final int garbageItems = levelsArr[0];
final int retainedItems = capacityItems - garbageItems;
final int freeSpace = levelsArr[0];
final int retainedItems = capacityItems - freeSpace;
floatItems = new float[capacityItems];
srcMem.getFloatArray(offsetBytes, floatItems, garbageItems, retainedItems);
srcMem.getFloatArray(offsetBytes, floatItems, freeSpace, retainedItems);
}
else { //(memStructure == UPDATABLE)
int offsetBytes = DATA_START_ADR;
Expand Down
122 changes: 70 additions & 52 deletions src/main/java/org/apache/datasketches/kll/KllHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,10 @@ private static String outputData(final KllSketch sketch) {
final int k = sketch.getK();
final int m = sketch.getM();
final StringBuilder sb = new StringBuilder();
sb.append("### KllSketch itemsArray & levelsArray data:").append(LS);
sb.append(LS + "### KLL ItemsArray & LevelsArray Detail:").append(LS);
sb.append("Index, Value").append(LS);
if (levelsArr[0] > 0) {
final String gbg = " Empty or Garbage, size = " + levelsArr[0];
final String gbg = " Free Space, Size = " + levelsArr[0];
for (int i = 0; i < levelsArr[0]; i++) {
sb.append(" ").append(i + ", ").append(sketch.getItemAsString(i));
if (i == 0) { sb.append(gbg); }
Expand All @@ -381,10 +381,10 @@ private static String outputData(final KllSketch sketch) {
final int toIndex = levelsArr[level + 1]; // exclusive
String lvlData = "";
if (fromIndex < toIndex) {
lvlData = " level[" + level + "]=" + levelsArr[level]
+ ", cap=" + KllHelper.levelCapacity(k, numLevels, level, m)
+ ", size=" + KllHelper.currentLevelSizeItems(level, numLevels, levelsArr)
+ ", wt=" + (1 << level) + LS;
lvlData = " Level[" + level + "]=" + levelsArr[level]
+ ", Cap=" + KllHelper.levelCapacity(k, numLevels, level, m)
+ ", Size=" + KllHelper.currentLevelSizeItems(level, numLevels, levelsArr)
+ ", Wt=" + (1 << level) + LS;
}

for (int i = fromIndex; i < toIndex; i++) {
Expand All @@ -393,10 +393,25 @@ private static String outputData(final KllSketch sketch) {
}
level++;
}
sb.append(" ----------level[" + level + "]=" + levelsArr[level] + ": itemsArray[].length");
sb.append(" ----------Level[" + level + "]=" + levelsArr[level] + ": ItemsArray[].length");
sb.append(LS);
sb.append("### End data").append(LS);
sb.append("### End ItemsArray & LevelsArray Detail").append(LS);
return sb.toString();
}

static String outputLevels(final int k, final int m, final int numLevels, final int[] levelsArr) {
final StringBuilder sb = new StringBuilder();
sb.append(LS + "### KLL Levels Array:").append(LS)
.append(" Level, Offset: Nominal Capacity, Actual Capacity").append(LS);
int level = 0;
for ( ; level < numLevels; level++) {
sb.append(" ").append(level).append(", ").append(levelsArr[level]).append(": ")
.append(KllHelper.levelCapacity(k, numLevels, level, m))
.append(", ").append(KllHelper.currentLevelSizeItems(level, numLevels, levelsArr)).append(LS);
}
sb.append(" ").append(level).append(", ").append(levelsArr[level]).append(": ----ItemsArray[].length")
.append(LS);
sb.append("### End Levels Array").append(LS);
return sb.toString();
}

Expand Down Expand Up @@ -479,55 +494,58 @@ static byte[] toByteArray(final KllSketch srcSk, final boolean updatable) {

static <T> String toStringImpl(final KllSketch sketch, final boolean withSummary, final boolean withData,
final ArrayOfItemsSerDe<T> serDe) {
final SketchType sketchType = sketch.sketchType;
final boolean hasMemory = sketch.hasMemory();
final StringBuilder sb = new StringBuilder();
final int k = sketch.getK();
final int m = sketch.getM();
final long n = sketch.getN();
final int numLevels = sketch.getNumLevels();
final int[] fullLevelsArr = sketch.getLevelsArray(UPDATABLE);
//final int[] levelsArr = sketch.getLevelsArray(sketch.sketchStructure);
final String epsPct = String.format("%.3f%%", sketch.getNormalizedRankError(false) * 100);
final String epsPMFPct = String.format("%.3f%%", sketch.getNormalizedRankError(true) * 100);
final boolean compact = sketch.isCompactMemoryFormat();

final StringBuilder sb = new StringBuilder();
final String directStr = hasMemory ? "Direct" : "";
final String compactStr = compact ? "Compact" : "";
final String readOnlyStr = sketch.isReadOnly() ? "true" + ("(" + (compact ? "Format" : "Memory") + ")") : "false";
final String skTypeStr = sketchType.getName();
final String className = "Kll" + directStr + compactStr + skTypeStr;

sb.append(LS).append("### ").append(className).append(" Summary:").append(LS);
sb.append(" K : ").append(k).append(LS);
sb.append(" Dynamic min K : ").append(sketch.getMinK()).append(LS);
sb.append(" M : ").append(m).append(LS);
sb.append(" N : ").append(n).append(LS);
sb.append(" Epsilon : ").append(epsPct).append(LS);
sb.append(" Epsilon PMF : ").append(epsPMFPct).append(LS);
sb.append(" Empty : ").append(sketch.isEmpty()).append(LS);
sb.append(" Estimation Mode : ").append(sketch.isEstimationMode()).append(LS);
sb.append(" Levels : ").append(numLevels).append(LS);
sb.append(" Level 0 Sorted : ").append(sketch.isLevelZeroSorted()).append(LS);
sb.append(" Capacity Items : ").append(fullLevelsArr[numLevels]).append(LS);
sb.append(" Retained Items : ").append(sketch.getNumRetained()).append(LS);
sb.append(" Empty/Garbage Items : ").append(sketch.levelsArr[0]).append(LS);
sb.append(" ReadOnly : ").append(readOnlyStr).append(LS);
if (sketchType != ITEMS_SKETCH) {
sb.append(" Updatable Storage Bytes: ").append(sketch.currentSerializedSizeBytes(true)).append(LS);
}
sb.append(" Compact Storage Bytes : ").append(sketch.currentSerializedSizeBytes(false)).append(LS);

final String emptyStr = (sketchType == ITEMS_SKETCH) ? "Null" : "NaN";

sb.append(" Min Item : ").append(sketch.isEmpty() ? emptyStr : sketch.getMinItemAsString())
.append(LS);
sb.append(" Max Item : ").append(sketch.isEmpty() ? emptyStr : sketch.getMaxItemAsString())
.append(LS);
sb.append("### End sketch summary").append(LS);

if (! withSummary) { sb.setLength(0); }
if (withData) { sb.append(outputData(sketch)); }
if (withSummary) {
final SketchType sketchType = sketch.sketchType;
final boolean hasMemory = sketch.hasMemory();
final long n = sketch.getN();
final String epsPct = String.format("%.3f%%", sketch.getNormalizedRankError(false) * 100);
final String epsPMFPct = String.format("%.3f%%", sketch.getNormalizedRankError(true) * 100);
final boolean compact = sketch.isCompactMemoryFormat();

final String directStr = hasMemory ? "Direct" : "";
final String compactStr = compact ? "Compact" : "";
final String readOnlyStr = sketch.isReadOnly() ? "true" + ("(" + (compact ? "Format" : "Memory") + ")") : "false";
final String skTypeStr = sketchType.getName();
final String className = "Kll" + directStr + compactStr + skTypeStr;

sb.append(LS + "### ").append(className).append(" Summary:").append(LS);
sb.append(" K : ").append(k).append(LS);
sb.append(" Dynamic min K : ").append(sketch.getMinK()).append(LS);
sb.append(" M : ").append(m).append(LS);
sb.append(" N : ").append(n).append(LS);
sb.append(" Epsilon : ").append(epsPct).append(LS);
sb.append(" Epsilon PMF : ").append(epsPMFPct).append(LS);
sb.append(" Empty : ").append(sketch.isEmpty()).append(LS);
sb.append(" Estimation Mode : ").append(sketch.isEstimationMode()).append(LS);
sb.append(" Levels : ").append(numLevels).append(LS);
sb.append(" Level 0 Sorted : ").append(sketch.isLevelZeroSorted()).append(LS);
sb.append(" Capacity Items : ").append(fullLevelsArr[numLevels]).append(LS);
sb.append(" Retained Items : ").append(sketch.getNumRetained()).append(LS);
sb.append(" Free Space : ").append(sketch.levelsArr[0]).append(LS);
sb.append(" ReadOnly : ").append(readOnlyStr).append(LS);
if (sketchType != ITEMS_SKETCH) {
sb.append(" Updatable Storage Bytes: ").append(sketch.currentSerializedSizeBytes(true)).append(LS);
}
sb.append(" Compact Storage Bytes : ").append(sketch.currentSerializedSizeBytes(false)).append(LS);

final String emptyStr = (sketchType == ITEMS_SKETCH) ? "Null" : "NaN";

sb.append(" Min Item : ").append(sketch.isEmpty() ? emptyStr : sketch.getMinItemAsString())
.append(LS);
sb.append(" Max Item : ").append(sketch.isEmpty() ? emptyStr : sketch.getMaxItemAsString())
.append(LS);
sb.append("### End sketch summary").append(LS);
}
if (withData) {
sb.append(outputLevels(k, m, numLevels, fullLevelsArr));
sb.append(outputData(sketch));
}
return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ private final KllItemsSketchSortedView<T> refreshSortedView() {

/**
* @return a full array of items as if the sketch was in COMPACT_FULL or UPDATABLE format.
* This will include zeros and possibly some garbage items.
* This will include zeros and possibly some free space.
*/
abstract T[] getTotalItemsArray();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public class KllItemsSketchSortedView<T> implements GenericSortedView<T>, Partit
if (!sketch.hasMemory()) { sketch.setLevelZeroSorted(true); }
}

final int numQuantiles = srcLevels[srcNumLevels] - srcLevels[0]; //remove garbage
final int numQuantiles = srcLevels[srcNumLevels] - srcLevels[0]; //remove free space
quantiles = (T[]) Array.newInstance(sketch.serDe.getClassOfT(), numQuantiles);
cumWeights = new long[numQuantiles];
populateFromSketch(srcQuantiles, srcLevels, srcNumLevels, numQuantiles);
Expand Down
Loading