Skip to content

Commit

Permalink
Implemented weighted updates for KllItemsSketch.
Browse files Browse the repository at this point in the history
Made a few very minor adjustments to some of the parallel floats and
doubles classes.
  • Loading branch information
leerho committed Jan 25, 2024
1 parent e100634 commit 84c9cb2
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,5 @@ public int sizeOf(final T[] items) {
* Returns the concrete class of type T
* @return the concrete class of type T
*/
public abstract Class<?> getClassOfT();
public abstract Class<T> getClassOfT();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.apache.datasketches.memory.WritableMemory;

//
//
/**
* Static methods to support KllDoublesSketch
Expand Down Expand Up @@ -337,6 +338,7 @@ static void updateDouble(final KllDoublesSketch dblSk, final double item, final
} else {
dblSk.updateMinMax(item);
final KllHeapDoublesSketch tmpSk = new KllHeapDoublesSketch(dblSk.getK(), DEFAULT_M, item, weight);

dblSk.merge(tmpSk);
}
}
Expand Down Expand Up @@ -470,7 +472,7 @@ private static void populateDoubleWorkArrays( //workBuf and workLevels are modif

workLevels[0] = 0;

// Note: the level zero data from "other" was already inserted into "self",
// Note: the level zero data from "other" was already inserted into "self".
// This copies into workbuf.
final int selfPopZero = KllHelper.currentLevelSizeItems(0, myCurNumLevels, myCurLevelsArr);
System.arraycopy(myCurDoubleItemsArr, myCurLevelsArr[0], workBuf, workLevels[0], selfPopZero);
Expand All @@ -480,7 +482,7 @@ private static void populateDoubleWorkArrays( //workBuf and workLevels are modif
final int selfPop = KllHelper.currentLevelSizeItems(lvl, myCurNumLevels, myCurLevelsArr);
final int otherPop = KllHelper.currentLevelSizeItems(lvl, otherNumLevels, otherLevelsArr);
workLevels[lvl + 1] = workLevels[lvl] + selfPop + otherPop;

assert selfPop >= 0 && otherPop >= 0;
if (selfPop == 0 && otherPop == 0) { continue; }
else if (selfPop > 0 && otherPop == 0) {
System.arraycopy(myCurDoubleItemsArr, myCurLevelsArr[lvl], workBuf, workLevels[lvl], selfPop);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.apache.datasketches.memory.WritableMemory;

//
//
/**
* Static methods to support KllFloatsSketch
Expand Down Expand Up @@ -337,6 +338,7 @@ static void updateFloat(final KllFloatsSketch fltSk, final float item, final int
} else {
fltSk.updateMinMax(item);
final KllHeapFloatsSketch tmpSk = new KllHeapFloatsSketch(fltSk.getK(), DEFAULT_M, item, weight);

fltSk.merge(tmpSk);
}
}
Expand Down Expand Up @@ -470,7 +472,7 @@ private static void populateFloatWorkArrays(

worklevels[0] = 0;

// Note: the level zero data from "other" was already inserted into "self"
// Note: the level zero data from "other" was already inserted into "self".
// This copies into workbuf.
final int selfPopZero = KllHelper.currentLevelSizeItems(0, myCurNumLevels, myCurLevelsArr);
System.arraycopy( myCurFloatItemsArr, myCurLevelsArr[0], workbuf, worklevels[0], selfPopZero);
Expand All @@ -480,7 +482,7 @@ private static void populateFloatWorkArrays(
final int selfPop = KllHelper.currentLevelSizeItems(lvl, myCurNumLevels, myCurLevelsArr);
final int otherPop = KllHelper.currentLevelSizeItems(lvl, otherNumLevels, otherLevelsArr);
worklevels[lvl + 1] = worklevels[lvl] + selfPop + otherPop;

assert selfPop >= 0 && otherPop >= 0;
if (selfPop == 0 && otherPop == 0) { continue; }
if (selfPop > 0 && otherPop == 0) {
System.arraycopy(myCurFloatItemsArr, myCurLevelsArr[lvl], workbuf, worklevels[lvl], selfPop);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ final class KllHeapDoublesSketch extends KllDoublesSketch {
*
* @param k parameter that controls size of the sketch and accuracy of estimates.
* <em>k</em> can be between <em>m</em> and 65535, inclusive.
* The default <em>k</em> = 200 results in a normalized rank error of about 1.65%.
* Larger <em>k</em> will have smaller error but the sketch will be larger (and slower).
* @param m parameter controls the minimum level width in items. It can be 2, 4, 6 or 8.
* The DEFAULT_M, which is 8, is recommended. Other sizes of <em>m</em> should be considered
* experimental as they have not been as well characterized.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ final class KllHeapFloatsSketch extends KllFloatsSketch {
*
* @param k parameter that controls size of the sketch and accuracy of estimates.
* <em>k</em> can be between <em>m</em> and 65535, inclusive.
* The default <em>k</em> = 200 results in a normalized rank error of about 1.65%.
* Larger <em>k</em> will have smaller error but the sketch will be larger (and slower).
* @param m parameter controls the minimum level width in items. It can be 2, 4, 6 or 8.
* The DEFAULT_M, which is 8, is recommended. Other sizes of <em>m</em> should be considered
* experimental as they have not been as well characterized.
Expand Down
50 changes: 39 additions & 11 deletions src/main/java/org/apache/datasketches/kll/KllHeapItemsSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.datasketches.kll.KllSketch.SketchStructure.COMPACT_EMPTY;
import static org.apache.datasketches.kll.KllSketch.SketchStructure.COMPACT_FULL;
import static org.apache.datasketches.kll.KllSketch.SketchStructure.COMPACT_SINGLE;
import static org.apache.datasketches.kll.KllSketch.SketchStructure.UPDATABLE;

import java.lang.reflect.Array;
import java.util.Comparator;
Expand All @@ -34,6 +35,14 @@
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;

/**
* This class implements an on-heap items KllSketch.
*
* <p>Please refer to the documentation in the package-info:<br>
* {@link org.apache.datasketches.kll}</p>
*
* @author Lee Rhodes, Kevin Lang
*/
@SuppressWarnings("unchecked")
final class KllHeapItemsSketch<T> extends KllItemsSketch<T> {
private final int k; // configured size of K.
Expand All @@ -46,14 +55,17 @@ final class KllHeapItemsSketch<T> extends KllItemsSketch<T> {
private Object[] itemsArr;

/**
* Constructs a new empty instance of this sketch on the Java heap.
* New instance heap constructor.
* @param k parameter that controls size of the sketch and accuracy of estimates.
* @param m parameter controls the minimum level width in items. It can be 2, 4, 6 or 8.
* The DEFAULT_M, which is 8, is recommended. Other sizes of <em>m</em> should be considered
* experimental as they have not been as well characterized.
* @param comparator user specified comparator of type T.
* @param serDe serialization / deserialization class
*/
KllHeapItemsSketch(
final int k,
final int m,
final Comparator<? super T> comparator,
KllHeapItemsSketch(final int k, final int m, final Comparator<? super T> comparator,
final ArrayOfItemsSerDe<T> serDe) {
super(SketchStructure.UPDATABLE, comparator, serDe);
super(UPDATABLE, comparator, serDe);
KllHelper.checkM(m);
KllHelper.checkK(k, m);
this.levelsArr = new int[] {k, k};
Expand All @@ -69,11 +81,27 @@ final class KllHeapItemsSketch<T> extends KllItemsSketch<T> {
}

/**
* The Heapify constructor, which constructs an image of this sketch from
* a Memory (or WritableMemory) object that was created by this sketch
* and has a type T consistent with the given comparator and serDe.
* Once the data from the given Memory has been transferred into this heap sketch,
* the reference to the Memory object is no longer retained.
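* Used for creating a temporary sketch for use with weighted updates.
* The new sketch holds only the given item, carrying the given weight.
* @param k parameter that controls size of the sketch and accuracy of estimates.
* @param m parameter controls the minimum level width in items.
* @param item the single item held by this sketch. It becomes both the min item and the max item.
* @param weight the weight of the given item. It becomes <em>n</em> of this sketch.
* @param comparator user specified comparator of type T.
* @param serDe serialization / deserialization class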
*/
KllHeapItemsSketch(final int k, final int m, final T item, final int weight, final Comparator<? super T> comparator,
    final ArrayOfItemsSerDe<T> serDe) {
  super(UPDATABLE, comparator, serDe);

  //validate the configuration before any state is assigned
  KllHelper.checkM(m);
  KllHelper.checkK(k, m);

  //configuration
  this.k = k;
  this.m = m;
  this.minK = k;
  this.readOnly = false;

  //the one given item carries the entire weight, so it is both the min and the max
  this.n = weight;
  this.minItem = item;
  this.maxItem = item;
  this.isLevelZeroSorted = false;

  //both the levels array and the items array are built from the weight
  this.levelsArr = KllHelper.createLevelsArray(weight);
  this.itemsArr = KllItemsHelper.createItemsArray(serDe.getClassOfT(), item, weight);
}

/**
* The Heapify constructor
* @param srcMem the Source Memory image that contains data.
* @param comparator the comparator for this sketch and given Memory.
* @param serDe the serializer / deserializer for this sketch and the given Memory.
Expand Down
80 changes: 47 additions & 33 deletions src/main/java/org/apache/datasketches/kll/KllItemsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.lang.reflect.Array.newInstance;
import static org.apache.datasketches.common.Util.isEven;
import static org.apache.datasketches.common.Util.isOdd;
import static org.apache.datasketches.kll.KllHelper.findLevelToCompact;
import static org.apache.datasketches.kll.KllSketch.DEFAULT_M;

import java.util.Arrays;
import java.util.Comparator;
Expand All @@ -37,7 +39,7 @@
* @author Lee Rhodes
*/
@SuppressWarnings("unchecked")
final class KllItemsHelper<T> {
final class KllItemsHelper {

/**
* Create Items Array from given item and weight.
Expand All @@ -46,11 +48,11 @@ final class KllItemsHelper<T> {
* @param weight the given weight
* @return the Items Array.
*/
static <T> T[] createItemsArray(final T item, final int weight) {
static <T> T[] createItemsArray(final Class<T> clazz, final T item, final int weight) {
final int itemsArrLen = Integer.bitCount(weight);
final Object[] itemsArr = new Object[itemsArrLen];
final T[] itemsArr = (T[])newInstance(clazz, itemsArrLen);
Arrays.fill(itemsArr, item);
return (T[]) itemsArr;
return itemsArr;
}

/**
Expand Down Expand Up @@ -140,12 +142,12 @@ static <T> void mergeItemImpl(final KllItemsSketch<T> mySketch,

//MERGE: update this sketch with level0 items from the other sketch
if (otherItmSk.isCompactSingleItem()) {
updateItem(mySketch, otherItmSk.getSingleItem(), comp);
updateItem(mySketch, otherItmSk.getSingleItem());
otherItemsArr = new Object[0];
} else {
otherItemsArr = otherItmSk.getTotalItemsArray();
for (int i = otherLevelsArr[0]; i < otherLevelsArr[1]; i++) {
updateItem(mySketch, otherItemsArr[i], comp);
updateItem(mySketch, otherItemsArr[i]);
}
}

Expand All @@ -164,12 +166,13 @@ static <T> void mergeItemImpl(final KllItemsSketch<T> mySketch,
final int tmpSpaceNeeded = mySketch.getNumRetained()
+ KllHelper.getNumRetainedAboveLevelZero(otherNumLevels, otherLevelsArr);
final Object[] workbuf = new Object[tmpSpaceNeeded];
final int ub = KllHelper.ubOnNumLevels(finalN);
final int[] worklevels = new int[ub + 2]; // ub+1 does not work
final int[] outlevels = new int[ub + 2];

final int provisionalNumLevels = max(myCurNumLevels, otherNumLevels);

final int ub = max(KllHelper.ubOnNumLevels(finalN), provisionalNumLevels);
final int[] worklevels = new int[ub + 2]; // ub+1 does not work
final int[] outlevels = new int[ub + 2];

populateItemWorkArrays(workbuf, worklevels, provisionalNumLevels,
myCurNumLevels, myCurLevelsArr, myCurItemsArr,
otherNumLevels, otherLevelsArr, otherItemsArr, comp);
Expand Down Expand Up @@ -209,7 +212,11 @@ static <T> void mergeItemImpl(final KllItemsSketch<T> mySketch,

//MEMORY SPACE MANAGEMENT
//not used
}
//extra spaces to make comparison with other helpers easier
//
//
//
} //end of updating levels above level 0

//Update Preamble:
mySketch.setN(finalN);
Expand All @@ -235,7 +242,7 @@ static <T> void mergeItemImpl(final KllItemsSketch<T> mySketch,
assert KllHelper.sumTheSampleWeights(mySketch.getNumLevels(), mySketch.levelsArr) == mySketch.getN();
}

private static <T> void mergeSortedItemsArrays(
private static <T> void mergeSortedItemsArrays( //only bufC is modified
final Object[] bufA, final int startA, final int lenA,
final Object[] bufB, final int startB, final int lenB,
final Object[] bufC, final int startC, final Comparator<? super T> comp) {
Expand Down Expand Up @@ -309,30 +316,34 @@ private static void randomlyHalveUpItems(final Object[] buf, final int start, fi
}

//Called from KllItemsSketch::update and this
static <T> void updateItem(final KllItemsSketch<T> itmSk,
final Object item, final Comparator<? super T> comp) {
if (item == null) { return; } //ignore
if (itmSk.isEmpty()) {
itmSk.setMinItem(item);
itmSk.setMaxItem(item);
} else {
itmSk.setMinItem(Util.minT(itmSk.getMinItem(), item, comp));
itmSk.setMaxItem(Util.maxT(itmSk.getMaxItem(), item, comp));
}
int level0space = itmSk.levelsArr[0];
assert level0space >= 0;
if (level0space == 0) {
static <T> void updateItem(final KllItemsSketch<T> itmSk, final Object item) {
itmSk.updateMinMax((T)item);
int freeSpace = itmSk.levelsArr[0];
assert freeSpace >= 0;
if (freeSpace == 0) {
compressWhileUpdatingSketch(itmSk);
level0space = itmSk.levelsArr[0];
assert (level0space > 0);
freeSpace = itmSk.levelsArr[0];
assert (freeSpace > 0);
}
itmSk.incN();
itmSk.setLevelZeroSorted(false);
final int nextPos = level0space - 1;
final int nextPos = freeSpace - 1;
itmSk.setLevelsArrayAt(0, nextPos);
itmSk.setItemsArrayAt(nextPos, item);
}

/**
 * Updates the given sketch with the given item and weight.
 * Called from KllItemsSketch::update with weight.
 *
 * <p>If the weight is less than <em>levelsArr[0]</em> of the given sketch, the item is inserted
 * one at a time, <em>weight</em> times. Otherwise, a temporary heap sketch is constructed from the
 * item and weight and is then merged into the given sketch.</p>
 *
 * @param itmSk the sketch to be updated
 * @param item the item to be inserted
 * @param weight the weight of the given item
 * @param <T> the item type
 */
static <T> void updateItem(final KllItemsSketch<T> itmSk, final T item, final int weight) {
  if (weight >= itmSk.levelsArr[0]) {
    //weight is too large for simple repeated insertion: merge a temporary weighted sketch instead
    itmSk.updateMinMax(item);
    final KllHeapItemsSketch<T> weightedSk =
        new KllHeapItemsSketch<>(itmSk.getK(), DEFAULT_M, item, weight, itmSk.comparator, itmSk.serDe);
    itmSk.merge(weightedSk);
    return;
  }
  //small weight: insert the item individually, weight times
  for (int remaining = weight; remaining > 0; remaining--) {
    updateItem(itmSk, item);
  }
}

/**
* Compression algorithm used to merge higher levels.
* <p>Here is what we do for each level:</p>
Expand Down Expand Up @@ -462,7 +473,8 @@ private static <T> void populateItemWorkArrays(
final Comparator<? super T> comp) {
worklevels[0] = 0;

// Note: the level zero data from "other" was already inserted into "self"
// Note: the level zero data from "other" was already inserted into "self".
// This copies into workbuf.
final int selfPopZero = KllHelper.currentLevelSizeItems(0, myCurNumLevels, myCurLevelsArr);
System.arraycopy( myCurItemsArr, myCurLevelsArr[0], workbuf, worklevels[0], selfPopZero);
worklevels[1] = worklevels[0] + selfPopZero;
Expand All @@ -471,12 +483,15 @@ private static <T> void populateItemWorkArrays(
final int selfPop = KllHelper.currentLevelSizeItems(lvl, myCurNumLevels, myCurLevelsArr);
final int otherPop = KllHelper.currentLevelSizeItems(lvl, otherNumLevels, otherLevelsArr);
worklevels[lvl + 1] = worklevels[lvl] + selfPop + otherPop;

if (selfPop > 0 && otherPop == 0) {
assert selfPop >= 0 && otherPop >= 0;
if (selfPop == 0 && otherPop == 0) { continue; }
else if (selfPop > 0 && otherPop == 0) {
System.arraycopy(myCurItemsArr, myCurLevelsArr[lvl], workbuf, worklevels[lvl], selfPop);
} else if (selfPop == 0 && otherPop > 0) {
}
else if (selfPop == 0 && otherPop > 0) {
System.arraycopy(otherItemsArr, otherLevelsArr[lvl], workbuf, worklevels[lvl], otherPop);
} else if (selfPop > 0 && otherPop > 0) {
}
else if (selfPop > 0 && otherPop > 0) {
mergeSortedItemsArrays(
myCurItemsArr, myCurLevelsArr[lvl], selfPop,
otherItemsArr, otherLevelsArr[lvl], otherPop,
Expand All @@ -500,4 +515,3 @@ private static <T> void populateItemWorkArrays(
// }

}

23 changes: 22 additions & 1 deletion src/main/java/org/apache/datasketches/kll/KllItemsSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import org.apache.datasketches.common.ArrayOfItemsSerDe;
import org.apache.datasketches.common.SketchesArgumentException;
import org.apache.datasketches.common.Util;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.MemoryRequestServer;
import org.apache.datasketches.memory.WritableMemory;
Expand Down Expand Up @@ -288,8 +289,18 @@ public String toString(final boolean withLevels, final boolean withLevelsAndItem

@Override
public void update(final T item) {
if (item == null) { return; } //ignore
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
KllItemsHelper.updateItem(this, item, comparator);
KllItemsHelper.updateItem(this, item);
kllItemsSV = null;
}

/**
 * Updates this sketch with the given item and weight.
 * A null item is ignored.
 *
 * @param item the item to be inserted. If null, this method returns without changing the sketch.
 * @param weight the weight of the given item. It must be greater than or equal to one.
 * @throws SketchesArgumentException if this sketch is read-only or if the weight is less than one.
 */
public void update(final T item, final int weight) {
  if (item == null) { return; } //ignore
  if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
  if (weight < 1) { throw new SketchesArgumentException("Weight is less than one."); }
  if (weight > 1) {
    KllItemsHelper.updateItem(this, item, weight);
  } else { //weight == 1 is the same as the unweighted update
    KllItemsHelper.updateItem(this, item);
  }
  kllItemsSV = null; //reset, as is also done by update(item)
}

Expand Down Expand Up @@ -374,4 +385,14 @@ void setWritableMemory(final WritableMemory wmem) {
throw new SketchesArgumentException(UNSUPPORTED_MSG + "Sketch not writable.");
}

/**
 * Updates the min item and max item of this sketch using the given item.
 * If this sketch is empty, the given item becomes both the min and the max.
 * Otherwise, the comparator of this sketch decides whether the given item replaces either of them.
 *
 * @param item the given item
 */
void updateMinMax(final T item) {
  if (!isEmpty()) {
    setMinItem(Util.minT(getMinItem(), item, comparator));
    setMaxItem(Util.maxT(getMaxItem(), item, comparator));
    return;
  }
  //empty sketch: the given item is the only candidate for both
  setMinItem(item);
  setMaxItem(item);
}

}
Loading

0 comments on commit 84c9cb2

Please sign in to comment.