diff --git a/src/main/java/com/yahoo/sketches/memory/AllocMemory.java b/src/main/java/com/yahoo/sketches/memory/AllocMemory.java index fc0395013..019919430 100644 --- a/src/main/java/com/yahoo/sketches/memory/AllocMemory.java +++ b/src/main/java/com/yahoo/sketches/memory/AllocMemory.java @@ -6,8 +6,6 @@ import static com.yahoo.sketches.memory.UnsafeUtil.unsafe; -import java.util.Arrays; - /** * The AllocMemory class is a subclass of NativeMemory and is used to allocate direct, off-heap * native memory, which is then accessed by the NativeMemory methods. @@ -26,7 +24,9 @@ public class AllocMemory extends NativeMemory { * @param capacityBytes the size in bytes of the native memory */ public AllocMemory(long capacityBytes) { - super(0L, null, null, unsafe.allocateMemory(capacityBytes), capacityBytes); + super(0L, null, null); + super.nativeRawStartAddress_ = unsafe.allocateMemory(capacityBytes); + super.capacityBytes_ = capacityBytes; super.memReq_ = null; } @@ -38,7 +38,9 @@ public AllocMemory(long capacityBytes) { * @param memReq The MemoryRequest callback */ public AllocMemory(long capacityBytes, MemoryRequest memReq) { - super(0L, null, null, unsafe.allocateMemory(capacityBytes), capacityBytes); + super(0L, null, null); + super.nativeRawStartAddress_ = unsafe.allocateMemory(capacityBytes); + super.capacityBytes_ = capacityBytes; super.memReq_ = memReq; } @@ -52,14 +54,15 @@ public AllocMemory(long capacityBytes, MemoryRequest memReq) { * The OS is free to just expand the capacity of the current allocation at the same native * address, or reassign a completely different native address in which case the origMem will be * freed by the OS. - * The origMem internals will be reset to zero or nulls and must not be used again. + * The origMem capacity will be set to zero and must not be used again. * * @param newCapacityBytes the desired new capacity of the newly allocated memory in bytes * @param memReq The MemoryRequest callback, which may be null. */ public AllocMemory(NativeMemory origMem, long newCapacityBytes, MemoryRequest memReq) { - super(0L, null, null, - unsafe.reallocateMemory(origMem.nativeRawStartAddress_, newCapacityBytes), newCapacityBytes); + super(0L, null, null); + super.nativeRawStartAddress_ = unsafe.reallocateMemory(origMem.nativeRawStartAddress_, newCapacityBytes); + super.capacityBytes_ = newCapacityBytes; this.memReq_ = memReq; origMem.nativeRawStartAddress_ = 0; //does not require freeMem origMem.capacityBytes_ = 0; //Cannot be used again @@ -71,7 +74,7 @@ public AllocMemory(NativeMemory origMem, long newCapacityBytes, MemoryRequest me * zero to copyToBytes; clear the new memory from copyToBytes to capacityBytes. * @param origMem The original NativeMemory, a portion of which will be copied to the * newly allocated Memory. - * The reference must not be null. + * The reference must not be null. * This origMem is not modified in any way, may be reused and must be freed appropriately. * @param copyToBytes the upper limit of the region to be copied from origMem to the newly * allocated memory. @@ -79,9 +82,10 @@ public AllocMemory(NativeMemory origMem, long newCapacityBytes, MemoryRequest me * upper limit of the region to be cleared. * @param memReq The MemoryRequest callback, which may be null. */ - public AllocMemory(NativeMemory origMem, long copyToBytes, long capacityBytes, - MemoryRequest memReq) { - super(0L, null, null, unsafe.allocateMemory(capacityBytes), capacityBytes); + public AllocMemory(NativeMemory origMem, long copyToBytes, long capacityBytes, MemoryRequest memReq) { + super(0L, null, null); + super.nativeRawStartAddress_ = unsafe.allocateMemory(capacityBytes); + super.capacityBytes_ = capacityBytes; this.memReq_ = memReq; MemoryUtil.copy(origMem, 0, this, 0, copyToBytes); this.clear(copyToBytes, capacityBytes-copyToBytes); @@ -102,7 +106,10 @@ protected void finalize() { System.err.println( "ERROR: freeMemory() has not been called: Address: "+ nativeRawStartAddress_ + ", capacity: " + capacityBytes_); - System.err.println(Arrays.toString(Thread.currentThread().getStackTrace())); + java.lang.StackTraceElement[] arr = Thread.currentThread().getStackTrace(); + for (int i=0; i0 valid null 0 >0 FALSE FALSE + NativeMemory longArr >0 valid null 0 >0 FALSE FALSE + NativeMemory ByteBuffer Direct 0 null valid >0 >0 FALSE TRUE + NativeMemory ByteBuffer not Direct >0 valid valid 0 >0 FALSE FALSE + AllocMemory All cases 0 null null >0 >0 TRUE TRUE + */ + protected final long objectBaseOffset_; + protected final Object memArray_; //holding on to this to make sure that it is not garbage collected before we are done with it. protected final ByteBuffer byteBuf_; - - protected long nativeRawStartAddress_; //only non-zero for native - protected long capacityBytes_; //only non-zero if allocated and class still valid - - protected NativeMemory(long objectBaseOffset, Object memArray, ByteBuffer byteBuf, - long nativeRawStartAddress, long capacityBytes) { - this.objectBaseOffset_ = objectBaseOffset; - this.memArray_ = memArray; - this.byteBuf_ = byteBuf; - this.nativeRawStartAddress_ = nativeRawStartAddress; - this.capacityBytes_ = capacityBytes; + protected volatile long nativeRawStartAddress_; + protected volatile long capacityBytes_; + protected volatile MemoryRequest memReq_ = null; //set via AllocMemory + + //only sets the finals + protected NativeMemory(long objectBaseOffset, Object memArray, ByteBuffer byteBuf) { + objectBaseOffset_ = objectBaseOffset; + memArray_ = memArray; + byteBuf_ = byteBuf; } /** @@ -69,12 +74,14 @@ protected NativeMemory(long objectBaseOffset, Object memArray, ByteBuffer byteBu * @param byteArray an on-heap byte array */ public NativeMemory(byte[] byteArray) { - int arrLen = byteArray.length; - memArray_ = byteArray; - objectBaseOffset_ = BYTE_ARRAY_BASE_OFFSET; + this(BYTE_ARRAY_BASE_OFFSET, byteArray, null); + if ((byteArray == null) || (byteArray.length == 0)) { + throw new IllegalArgumentException( + "Array must must not be null and have a length greater than zero."); + } nativeRawStartAddress_ = 0L; - capacityBytes_ = arrLen; - byteBuf_ = null; + capacityBytes_ = byteArray.length; + } /** @@ -82,16 +89,13 @@ public NativeMemory(byte[] byteArray) { * @param longArray an on-heap long array */ public NativeMemory(long[] longArray) { - int arrLen = longArray.length; - if (arrLen <= 0) { + this(LONG_ARRAY_BASE_OFFSET, longArray, null); + if ((longArray == null) || (longArray.length == 0)) { throw new IllegalArgumentException( - "longArray must have a length greater than zero."); + "Array must must not be null and have a length greater than zero."); } - memArray_ = longArray; - objectBaseOffset_ = LONG_ARRAY_BASE_OFFSET; nativeRawStartAddress_ = 0L; - capacityBytes_ = arrLen << LONG_SHIFT; - byteBuf_ = null; + capacityBytes_ = longArray.length << LONG_SHIFT; } /** @@ -99,18 +103,20 @@ public NativeMemory(long[] longArray) { * @param byteBuf the given ByteBuffer */ public NativeMemory(ByteBuffer byteBuf) { - capacityBytes_ = byteBuf.capacity(); - byteBuf_ = byteBuf; - if (byteBuf_.isDirect()) { - memArray_ = null; + if (byteBuf.isDirect()) { objectBaseOffset_ = 0L; + memArray_ = null; + byteBuf_ = byteBuf; nativeRawStartAddress_ = ((sun.nio.ch.DirectBuffer)byteBuf).address(); } else { //must have array - memArray_ = byteBuf_.array(); objectBaseOffset_ = BYTE_ARRAY_BASE_OFFSET; + memArray_ = byteBuf.array(); + byteBuf_ = byteBuf; nativeRawStartAddress_ = 0L; } + capacityBytes_ = byteBuf.capacity(); + } @Override @@ -557,13 +563,22 @@ public Object getParent() { return memArray_; } + @Override + public void setMemoryRequest(MemoryRequest memReq) { + memReq_ = memReq; + } + @Override public String toHexString(String header, long offsetBytes, int lengthBytes) { StringBuilder sb = new StringBuilder(); sb.append(header).append("\n"); - String s1 = String.format("(%d, %d)", offsetBytes, lengthBytes); - sb.append(this.getClass().getName()); - sb.append(".toHexString").append(s1).append(", hash: ").append(this.hashCode()).append(":"); + String s1 = String.format("(..., %d, %d)", offsetBytes, lengthBytes); + sb.append(this.getClass().getSimpleName()).append(".toHexString"). + append(s1).append(", hash: ").append(this.hashCode()).append("\n"); + sb.append(" MemoryRequest: "); + if (memReq_ != null) { + sb.append(memReq_.getClass().getSimpleName()).append(", hash: ").append(memReq_.hashCode()); + } else sb.append("null"); return toHex(sb.toString(), offsetBytes, lengthBytes); } @@ -586,8 +601,20 @@ public ByteBuffer byteBuffer() { } /** - * Frees this Memory. If direct, off-heap native memory is allocated via the AllocMemory - * sub-class this method must be called in either the NativeMemory class or the AllocMemory class. + * Returns true if this NativeMemory is accessing native (off-heap) memory directly. + * This includes the case of a Direct ByteBuffer. + * @return true if this NativeMemory is accessing native (off-heap) memory directly. + */ + public boolean isDirect() { + return nativeRawStartAddress_ > 0; + } + + /** + * This frees this Memory only if it is required. This always sets the capacity to zero + * and the reference to MemoryRequest to null, which effectively disables this class. + * However, + * + * It is always safe to call this method when you are done with this class. */ public void freeMemory() { if (requiresFree()) { @@ -595,6 +622,7 @@ public void freeMemory() { nativeRawStartAddress_ = 0L; } capacityBytes_ = 0L; + memReq_ = null; } /** @@ -664,7 +692,7 @@ private String toHex(String header, long offsetBytes, int lengthBytes) { * @return true if the object should be freed when it is no longer needed */ protected boolean requiresFree() { - return nativeRawStartAddress_ != 0L && (byteBuf_ == null); + return (nativeRawStartAddress_ != 0L) && (byteBuf_ == null); } } diff --git a/src/main/java/com/yahoo/sketches/quantiles/HeapQuantilesSketch.java b/src/main/java/com/yahoo/sketches/quantiles/HeapQuantilesSketch.java index 9d70a3a4e..e3963b719 100644 --- a/src/main/java/com/yahoo/sketches/quantiles/HeapQuantilesSketch.java +++ b/src/main/java/com/yahoo/sketches/quantiles/HeapQuantilesSketch.java @@ -24,6 +24,7 @@ import static com.yahoo.sketches.quantiles.Util.computeNumLevelsNeeded; import static com.yahoo.sketches.quantiles.Util.linearTimeIncrementHistogramCounters; import static com.yahoo.sketches.quantiles.Util.positionOfLowestZeroBitStartingAt; +import static com.yahoo.sketches.Util.checkIfPowerOf2; import java.util.Arrays; import java.util.Random; @@ -212,6 +213,7 @@ public double getQuantile(double fraction) { @Override public double[] getQuantiles(double[] fractions) { + QuantilesSketch.validateSequential(fractions); Auxiliary aux = null; // double[] answers = new double[fractions.length]; for (int i = 0; i < fractions.length; i++) { @@ -275,7 +277,17 @@ public double getMinValue() { public double getMaxValue() { return maxValue_; } - + + @Override + public long getN() { + return n_; + } + + @Override + public short getSeed() { + return seed_; + } + @Override public boolean isDirect() { return false; @@ -291,7 +303,7 @@ public void reset() { minValue_ = java.lang.Double.POSITIVE_INFINITY; maxValue_ = java.lang.Double.NEGATIVE_INFINITY; } - + @Override public byte[] toByteArray() { int preLongs, arrLongs, flags; @@ -387,6 +399,7 @@ public String toString(boolean sketchSummary, boolean dataDetail) { sb.append(LS).append("### ").append(thisSimpleName).append(" SUMMARY: ").append(LS); sb.append(" K : ").append(getK()).append(LS); sb.append(" N : ").append(nStr).append(LS); + sb.append(" Seed : ").append(seed_).append(LS); sb.append(" BaseBufferCount : ").append(getBaseBufferCount()).append(LS); sb.append(" CombinedBufferAllocatedCount : ").append(bufCntStr).append(LS); sb.append(" Total Levels : ").append(numLevels).append(LS); @@ -403,11 +416,6 @@ public String toString(boolean sketchSummary, boolean dataDetail) { return sb.toString(); } - @Override - public void merge(QuantilesSketch qsSource) { - mergeInto(qsSource, this); - } - // It is easy to prove that the following simplified code which launches // multiple waves of carry propagation does exactly the same amount of merging work // (including the work of allocating fresh buffers) as the more complicated and @@ -427,17 +435,19 @@ public void merge(QuantilesSketch qsSource) { @Override - public void mergeInto(QuantilesSketch srcQS, QuantilesSketch tgtQS) { - if (srcQS.isDirect() || tgtQS.isDirect()) { + public void mergeInto(QuantilesSketch source, QuantilesSketch target) { + if (source.isDirect() || target.isDirect()) { throw new IllegalArgumentException("DirectQuantilesSketch not implemented."); } - HeapQuantilesSketch src = (HeapQuantilesSketch)srcQS; - HeapQuantilesSketch tgt = (HeapQuantilesSketch)tgtQS; - - if ( tgt.getK() != src.getK()) - throw new IllegalArgumentException("Given sketches must have the same value of k."); - + HeapQuantilesSketch src = (HeapQuantilesSketch)source; + HeapQuantilesSketch tgt = (HeapQuantilesSketch)target; + + if (src.getK() != tgt.getK()) { + downSamplingMergeInto(src, tgt); + return; + } + double[] srcLevels = src.getCombinedBuffer(); // aliasing is a bit dangerous double[] srcBaseBuffer = src.getCombinedBuffer(); // aliasing is a bit dangerous @@ -476,21 +486,20 @@ public void mergeInto(QuantilesSketch srcQS, QuantilesSketch tgtQS) { if (srcMin < tgtMin) { tgt.minValue_ = srcMin; } } + @Override - public long getN() { - return n_; + public QuantilesSketch downSample(int newK) { + HeapQuantilesSketch oldSketch = this; + HeapQuantilesSketch newSketch = HeapQuantilesSketch.getInstance(newK, Util.DEFAULT_SEED); + downSamplingMergeInto(oldSketch, newSketch); + return newSketch; } - //Restricted - - /** - * Returns the Auxiliary data structure which is only used for getQuantile() and getQuantiles() - * queries. - * @return the Auxiliary data structure - */ - Auxiliary constructAuxiliary() { - return new Auxiliary( this ); - //k_, n_, bitPattern_, combinedBuffer_, baseBufferCount_, numSamplesInSketch()); + //Restricted overrides + + @Override + int getBaseBufferCount() { + return baseBufferCount_; } @Override @@ -502,26 +511,82 @@ long getBitPattern() { double[] getCombinedBuffer() { return combinedBuffer_; } - - @Override - int getBaseBufferCount() { - return baseBufferCount_; - } + + //Other restricted /** - * Computes a checksum of all the samples in the sketch. Used in testing the Auxiliary - * @return a checksum of all the samples in the sketch - */ //Used by test - final double sumOfSamplesInSketch() { - double total = Util.sumOfDoublesInSubArray(combinedBuffer_, 0, baseBufferCount_); - long bits = bitPattern_; - assert bits == n_ / (2L * k_); // internal consistency check - for (int lvl = 0; bits != 0L; lvl++, bits >>>= 1) { - if ((bits & 1L) > 0L) { - total += Util.sumOfDoublesInSubArray(combinedBuffer_, ((2+lvl) * k_), k_); + * Merges the source sketch into the target sketch that can have a smaller value of K. + * However, it is required that the ratio of the two K values be a power of 2. + * I.e., source.getK() = target.getK() * 2^(nonnegative integer). + * The source is not modified. + * + * @param src The source sketch + * @param tgt The target sketch + */ + static void downSamplingMergeInto(HeapQuantilesSketch src, HeapQuantilesSketch tgt) { + + int targetK = tgt.getK(); + int sourceK = src.getK(); + + if ((sourceK % targetK) != 0) { + throw new IllegalArgumentException("source.getK() must equal target.getK() * 2^(nonnegative integer)."); + } + + int downFactor = sourceK / targetK; + checkIfPowerOf2(downFactor, "source.getK()/target.getK() ratio"); + int lgDownFactor = Integer.numberOfTrailingZeros(downFactor); + + double [] sourceLevels = src.getCombinedBuffer(); // aliasing is a bit dangerous + double [] sourceBaseBuffer = src.getCombinedBuffer(); // aliasing is a bit dangerous + + long nFinal = tgt.getN() + src.getN(); + + for (int i = 0; i < src.getBaseBufferCount(); i++) { + tgt.update (sourceBaseBuffer[i]); + } + + tgt.maybeGrowLevels (nFinal); + + double [] scratchBuf = new double [2*targetK]; + double [] downBuf = new double [targetK]; + + long srcBitPattern = src.getBitPattern(); + for (int srcLvl = 0; srcBitPattern != 0L; srcLvl++, srcBitPattern >>>= 1) { + if ((srcBitPattern & 1L) > 0L) { + justZipWithStride (sourceLevels, ((2+srcLvl) * sourceK), + downBuf, 0, + targetK, + downFactor); + tgt.inPlacePropagateCarry (srcLvl+lgDownFactor, + downBuf, 0, + scratchBuf, 0, + false); + // won't update target.n_ until the very end } } - return total; + + tgt.n_ = nFinal; + + assert tgt.getN() / (2*targetK) == tgt.bitPattern_; // internal consistency check + + double srcMax = src.getMaxValue(); + double srcMin = src.getMinValue(); + double tgtMax = tgt.getMaxValue(); + double tgtMin = tgt.getMinValue(); + + if (srcMax > tgtMax) { tgt.maxValue_ = srcMax; } + if (srcMin < tgtMin) { tgt.minValue_ = srcMin; } + + } + + /** + * Returns the Auxiliary data structure which is only used for getQuantile() and getQuantiles() + * queries. + * @return the Auxiliary data structure + */ + Auxiliary constructAuxiliary() { + return new Auxiliary( this ); + //k_, n_, bitPattern_, combinedBuffer_, baseBufferCount_, numSamplesInSketch()); } private void growBaseBuffer() { @@ -576,6 +641,18 @@ private static void zipSize2KBuffer(double[] bufA, int startA, // input } } + private static void justZipWithStride(double[] bufA, int startA, // input + double[] bufC, int startC, // output + int kC, // number of items that should be in the output + int stride) { + int randomOffset = (Util.rand.nextInt(stride)); + int limC = startC + kC; + + for (int a = startA + randomOffset, c = startC; c < limC; a += stride, c++ ) { + bufC[c] = bufA[a]; + } + } + private static void mergeTwoSizeKBuffers(double[] keySrc1, int arrStart1, double[] keySrc2, int arrStart2, double[] keyDst, int arrStart3, @@ -643,7 +720,7 @@ private void inPlacePropagateCarry(int startingLevel, // update bit pattern with binary-arithmetic ripple carry bitPattern_ = bitPattern_ + (((long) 1) << startingLevel); } - + /** * Called when the base buffer has just acquired 2*k elements. */ @@ -666,7 +743,7 @@ private void processFullBaseBuffer() { //Arrays.fill(baseBuffer, 0, 2*k_, DUMMY_VALUE); assert n_ / (2*k_) == bitPattern_; // internal consistency check } - + /** * Shared algorithm for both PMF and CDF functions. The splitPoints must be unique, monotonically * increasing values. @@ -678,7 +755,7 @@ private long[] internalBuildHistogram(double[] splitPoints) { double[] levelsArr = combinedBuffer_; // aliasing is a bit dangerous double[] baseBuffer = combinedBuffer_; // aliasing is a bit dangerous - QuantilesSketch.validateSplitPoints(splitPoints); + QuantilesSketch.validateSequential(splitPoints); int numSplitPoints = splitPoints.length; int numCounters = numSplitPoints + 1; @@ -712,5 +789,35 @@ private long[] internalBuildHistogram(double[] splitPoints) { } return counters; } - + + //Used for test + + /** + * Computes a checksum of all the samples in the sketch. Used in testing the Auxiliary + * @return a checksum of all the samples in the sketch + */ //Used by test + final double sumOfSamplesInSketch() { + double total = Util.sumOfDoublesInSubArray(combinedBuffer_, 0, baseBufferCount_); + long bits = bitPattern_; + assert bits == n_ / (2L * k_); // internal consistency check + for (int lvl = 0; bits != 0L; lvl++, bits >>>= 1) { + if ((bits & 1L) > 0L) { + total += Util.sumOfDoublesInSubArray(combinedBuffer_, ((2+lvl) * k_), k_); + } + } + return total; + } + + static boolean sameStructurePredicate( HeapQuantilesSketch mq1, HeapQuantilesSketch mq2) { + return ( + (mq1.k_ == mq2.k_) && + (mq1.n_ == mq2.n_) && + (mq1.combinedBufferAllocatedCount_ == mq2.combinedBufferAllocatedCount_) && + (mq1.baseBufferCount_ == mq2.baseBufferCount_) && + (mq1.bitPattern_ == mq2.bitPattern_) && + (mq1.minValue_ == mq2.minValue_) && + (mq1.maxValue_ == mq2.maxValue_) + ); + } + } // End of class HeapQuantilesSketch diff --git a/src/main/java/com/yahoo/sketches/quantiles/QuantilesSketch.java b/src/main/java/com/yahoo/sketches/quantiles/QuantilesSketch.java index 296a34ee9..ccb6546b5 100644 --- a/src/main/java/com/yahoo/sketches/quantiles/QuantilesSketch.java +++ b/src/main/java/com/yahoo/sketches/quantiles/QuantilesSketch.java @@ -151,43 +151,20 @@ public static final QuantilesSketchBuilder builder() { /** * This is a more efficent multiple-query version of getQuantile(). *

- * This returns an array that could have been generated by - * mapping getQuantile() over the given array of fractions. - * However, the computational overhead of getQuantile() is shared - * amongst the multiple queries. Therefore, we strongly recommend this method - * instead of multiple calls to getQuantile(). + * This returns an array that could have been generated by mapping getQuantile() over the given + * array of fractions. However, the computational overhead of getQuantile() is shared amongst + * the multiple queries. Therefore, we strongly recommend this method instead of multiple calls + * to getQuantile(). * * @param fractions given array of fractional positions in the hypothetical sorted stream. - * It is recommended that these be in increasing order. + * These fractions must be monotonic, in increasing order and in the interval + * [0.0, 1.0] inclusive. * - * @return array of approximations to the given fractions in the same order as given fractions array. + * @return array of approximations to the given fractions in the same order as given fractions + * array. */ public abstract double[] getQuantiles(double[] fractions); - /** - * Get the rank error normalized as a fraction between zero and one. - * The error of this sketch is specified as a fraction of the normalized rank of the hypothetical - * sorted stream of items presented to the sketch. - * - *

Suppose the sketch is presented with N values. The raw rank (0 to N-1) of an item - * would be its index position in the sorted version of the input stream. If we divide the - * raw rank by N, it becomes the normalized rank, which is between 0 and 1.0. - * - *

For example, choosing a K of 227 yields a normalized rank error of about 1%. - * The upper bound on the median value obtained by getQuantile(0.5) would be the value in the - * hypothetical ordered stream of values at the normalized rank of 0.51. - * The lower bound would be the value in the hypothetical ordered stream of values at the - * normalized rank of 0.49. - * - *

The error of this sketch cannot be translated into an error (relative or absolute) of the - * returned quantile values. - * - * @return the rank error normalized as a fraction between zero and one. - */ - public double getNormalizedRankError() { - return Util.EpsilonFromK.getAdjustedEpsilon(getK()); - } - /** * Returns an approximation to the Probability Mass Function (PMF) of the input stream * given a set of splitPoints (values). @@ -219,20 +196,12 @@ public double getNormalizedRankError() { */ public abstract double[] getCDF(double[] splitPoints); - //Internal parameters - /** * Returns the configured value of K * @return the configured value of K */ public abstract int getK(); - - /** - * Returns the length of the input stream so far. - * @return the length of the input stream so far - */ - public abstract long getN(); - + /** * Returns the min value of the stream * @return the min value of the stream @@ -246,13 +215,50 @@ public double getNormalizedRankError() { public abstract double getMaxValue(); /** - * Returns true if this sketch is empty - * @return true if this sketch is empty + * Returns the length of the input stream so far. + * @return the length of the input stream so far */ - public boolean isEmpty() { - return getN() == 0; + public abstract long getN(); + + /** + * Get the rank error normalized as a fraction between zero and one. + * The error of this sketch is specified as a fraction of the normalized rank of the hypothetical + * sorted stream of items presented to the sketch. + * + *

Suppose the sketch is presented with N values. The raw rank (0 to N-1) of an item + * would be its index position in the sorted version of the input stream. If we divide the + * raw rank by N, it becomes the normalized rank, which is between 0 and 1.0. + * + *

For example, choosing a K of 227 yields a normalized rank error of about 1%. + * The upper bound on the median value obtained by getQuantile(0.5) would be the value in the + * hypothetical ordered stream of values at the normalized rank of 0.51. + * The lower bound would be the value in the hypothetical ordered stream of values at the + * normalized rank of 0.49. + * + *

The error of this sketch cannot be translated into an error (relative or absolute) of the + * returned quantile values. + * + * @return the rank error normalized as a fraction between zero and one. + */ + public double getNormalizedRankError() { + return getNormalizedRankError(getK()); } + /** + * Static method version of {@link #getNormalizedRankError()} + * @param k the configuration parameter of a QuantilesSketch + * @return the rank error normalized as a fraction between zero and one. + */ + public static double getNormalizedRankError(int k) { + return Util.EpsilonFromK.getAdjustedEpsilon(k); + } + + /** + * Returns the seed + * @return the seed + */ + public abstract short getSeed(); + /** * Returns true if this sketch accesses its internal data using the Memory package. * (The Direct version of this sketch is not yet implemented.) @@ -261,7 +267,15 @@ public boolean isEmpty() { public abstract boolean isDirect(); /** - * Resets this sketch to a virgin state, but retains the original value of k. + * Returns true if this sketch is empty + * @return true if this sketch is empty + */ + public boolean isEmpty() { + return getN() == 0; + } + + /** + * Resets this sketch to a virgin state, but retains the original value of k and the seed. */ public abstract void reset(); @@ -288,18 +302,41 @@ public String toString() { */ public abstract String toString(boolean sketchSummary, boolean dataDetail); + //Merging etc + /** - * Merges the given sketch into this one - * @param qsSource the given source sketch + * Merges the given sketch into this one. + * Merges the source sketch into this sketch that can have a smaller value of K. + * However, it is required that the ratio of the two K values be a power of 2. + * I.e., source.getK() = this.getK() * 2^(nonnegative integer). + * The source is not modified. + * + * @param source the given source sketch */ - public abstract void merge(QuantilesSketch qsSource); + public void merge(QuantilesSketch source) { + mergeInto(source, this); + } /** - * Modifies the source sketch into the target sketch - * @param qsSource The source sketch - * @param qsTarget The target sketch + * Merges the source sketch into the target sketch that can have a smaller value of K. + * However, it is required that the ratio of the two K values be a power of 2. + * I.e., source.getK() = target.getK() * 2^(nonnegative integer). + * The source is not modified. + * + * @param source The source sketch + * @param target The target sketch */ - public abstract void mergeInto(QuantilesSketch qsSource, QuantilesSketch qsTarget); + public abstract void mergeInto(QuantilesSketch source, QuantilesSketch target); + + /** + * From an existing sketch, this creates a new sketch that can have a smaller value of K. + * The original sketch is not modified. + * + * @param smallerK the new sketch's value of K that must be smaller than this value of K. + * It is required that this.getK() = smallerK * 2^(nonnegative integer). + * @return the new sketch. + */ + public abstract QuantilesSketch downSample(int smallerK); /** * Heapify takes the sketch image in Memory and instantiates an on-heap Sketch. @@ -334,14 +371,8 @@ public int getStorageBytes() { return 40 + 8*Util.bufferElementCapacity(getK(), getN()); } - //restricted - - /** - * Returns the combined buffer reference - * @return the commbined buffer reference - */ - abstract double[] getCombinedBuffer(); - + //Restricted abstract + /** * Returns the base buffer count * @return the base buffer count @@ -354,6 +385,15 @@ public int getStorageBytes() { */ abstract long getBitPattern(); + /** + * Returns the combined buffer reference + * @return the commbined buffer reference + */ + abstract double[] getCombinedBuffer(); + + //Other restricted + + /** * Checks the validity of the given value k * @param k must be greater than or equal to 2 and less than 65536. @@ -447,15 +487,15 @@ static void checkFlags(int flags) { } /** - * Checks the validity of the split points. They must be unique, monotonically increasing and - * not NaN. - * @param splitPoints array + * Checks the sequential validity of the given array of values. + * They must be unique, monotonically increasing and not NaN. + * @param values array */ - static final void validateSplitPoints(double[] splitPoints) { - for (int j = 0; j < splitPoints.length - 1; j++) { - if (splitPoints[j] < splitPoints[j+1]) { continue; } + static final void validateSequential(double[] values) { + for (int j = 0; j < values.length - 1; j++) { + if (values[j] < values[j+1]) { continue; } throw new IllegalArgumentException( - "SplitPoints must be unique, monotonically increasing and not NaN."); + "Values must be unique, monotonically increasing and not NaN."); } } diff --git a/src/main/java/com/yahoo/sketches/quantiles/Util.java b/src/main/java/com/yahoo/sketches/quantiles/Util.java index 353f66f2f..daa1e314b 100644 --- a/src/main/java/com/yahoo/sketches/quantiles/Util.java +++ b/src/main/java/com/yahoo/sketches/quantiles/Util.java @@ -386,7 +386,7 @@ static class EpsilonFromK { * @return the resulting epsilon */ //used by HeapQS, so far static double getAdjustedEpsilon(int k) { - if (k == 1) return 1.0; //TODO IS THIS THE RIGHT VALUE? + if (k == 1) return 1.0; return getTheoreticalEpsilon(k, adjustKForEps); } diff --git a/src/test/java/com/yahoo/sketches/memory/MemoryRegionTest.java b/src/test/java/com/yahoo/sketches/memory/MemoryRegionTest.java index b7961997f..c099ebe4c 100644 --- a/src/test/java/com/yahoo/sketches/memory/MemoryRegionTest.java +++ b/src/test/java/com/yahoo/sketches/memory/MemoryRegionTest.java @@ -12,6 +12,7 @@ import static com.yahoo.sketches.memory.CommonTests.setGetTests; import static com.yahoo.sketches.memory.CommonTests.toHexStringAllMemTests; import static org.testng.Assert.assertEquals; +import static java.lang.Math.*; import org.testng.annotations.Test; @@ -271,6 +272,78 @@ public void checkReassign() { assertEquals(reg.getLong(0), -2L); } + ////////////////////////////////////////////////////// + ////////////////////////////////////////////////////// + //this one allocates what was asked from MemoryRegion + private class MemoryRegionManager implements MemoryRequest { + private Memory parent_ = null; + private long capUsed_ = 0; //a very simple memory management scheme! + + MemoryRegionManager(Memory parent) { + parent_ = parent; + } + + @Override + public Memory request(long capacityBytes) { + if (capacityBytes <= (parent_.getCapacity() - capUsed_)) { + Memory newMem = new MemoryRegion(parent_, capUsed_, capacityBytes, this); + capUsed_ += capacityBytes; + return newMem; + } + return null; //could not satisfy the request + } + + @Override + public Memory request(Memory origMem, long copyToBytes, long capacityBytes) { + if (capacityBytes <= (parent_.getCapacity() - capUsed_)) { + MemoryRegion newMem = new MemoryRegion(parent_, capUsed_, capacityBytes); + capUsed_ += capacityBytes; + long minCopyToBytes = min(min(origMem.getCapacity(), copyToBytes), capacityBytes); + MemoryUtil.copy(origMem, 0, newMem, 0, minCopyToBytes); + if (minCopyToBytes < capacityBytes) { + newMem.clear(minCopyToBytes, capacityBytes - minCopyToBytes); + } + return newMem; + } + return null; //could not satisfy the request + } + + @Override + public void free(Memory mem) { + //In a more sophisticated memory management scheme this would allow reallocation of + // memory regions. + } + + @Override + public void free(Memory memToFree, Memory newMem) { + //In a more sophisticated memory management scheme this would allow reallocation of + // memory regions. + } + } + ////////////////////////////////////////////////////// + + @Test + public void checkMemoryRegionRequest() { + int parentCap = 256; + byte[] memArr = new byte[parentCap]; + NativeMemory parent = new NativeMemory(memArr); + MemoryRequest mr = new MemoryRegionManager(parent); + //mark the memory so we can see it + for (int i=0; i= 1; i--) { + origSketch.update (i); + directSketch.update (i); + } + HeapQuantilesSketch downSketch = (HeapQuantilesSketch)origSketch.downSample(smallK); + println ("\nOrig\n"); + println(origSketch.toString(true, true)); + println ("\nDown\n"); + println(downSketch.toString(true, true)); + println("\nDirect\n"); + println(directSketch.toString(true, true)); + } + + @Test + public void checkDownSampling() { + testDownSampling(4,4); + testDownSampling(16,4); + testDownSampling(12,3); + } + + @Test + public void testDownSampling2 () { + HeapQuantilesSketch origSketch = HeapQuantilesSketch.getInstance(8, Util.DEFAULT_SEED); + HeapQuantilesSketch directSketch = HeapQuantilesSketch.getInstance(2, Util.DEFAULT_SEED); + HeapQuantilesSketch downSketch; + downSketch = (HeapQuantilesSketch)origSketch.downSample(2); + assertTrue(HeapQuantilesSketch.sameStructurePredicate (directSketch, downSketch)); + for (int i = 0; i < 50; i++) { + origSketch.update (i); + directSketch.update (i); + downSketch = (HeapQuantilesSketch)origSketch.downSample(2); + assertTrue (HeapQuantilesSketch.sameStructurePredicate (directSketch, downSketch)); + } + + } + + @Test + public void testDownSampling3() { + for (int n1 = 0; n1 < 50; n1++ ) { + HeapQuantilesSketch bigSketch = HeapQuantilesSketch.getInstance(8, Util.DEFAULT_SEED); + for (int i1 = 1; i1 <= n1; i1++ ) { + bigSketch.update(i1); + } + for (int n2 = 0; n2 < 50; n2++ ) { + HeapQuantilesSketch directSketch = HeapQuantilesSketch.getInstance(2, Util.DEFAULT_SEED); + for (int i1 = 1; i1 <= n1; i1++ ) { + directSketch.update(i1); + } + for (int i2 = 1; i2 <= n2; i2++ ) { + directSketch.update(i2); + } + HeapQuantilesSketch smlSketch = HeapQuantilesSketch.getInstance(2, Util.DEFAULT_SEED); + for (int i2 = 1; i2 <= n2; i2++ ) { + smlSketch.update(i2); + } + HeapQuantilesSketch.downSamplingMergeInto(bigSketch, smlSketch); + assertTrue (HeapQuantilesSketch.sameStructurePredicate(directSketch, smlSketch)); + } + } + } + + @Test + public void testDownSampling4() { + for (int n1 = 0; n1 < 50; n1++ ) { + HeapQuantilesSketch bigSketch = HeapQuantilesSketch.getInstance(8, Util.DEFAULT_SEED); + for (int i1 = 1; i1 <= n1; i1++ ) { + bigSketch.update(i1); + } + for (int n2 = 0; n2 < 50; n2++ ) { + HeapQuantilesSketch directSketch = HeapQuantilesSketch.getInstance(2, Util.DEFAULT_SEED); + for (int i1 = 1; i1 <= n1; i1++ ) { + directSketch.update(i1); + } + for (int i2 = 1; i2 <= n2; i2++ ) { + directSketch.update(i2); + } + HeapQuantilesSketch smlSketch = HeapQuantilesSketch.getInstance(2, Util.DEFAULT_SEED); + for (int i2 = 1; i2 <= n2; i2++ ) { + smlSketch.update(i2); + } + smlSketch.merge(bigSketch); + assertTrue (HeapQuantilesSketch.sameStructurePredicate(directSketch, smlSketch)); + } + } + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testDownSamplingExceptions1() { + QuantilesSketch qs1 = QuantilesSketch.builder().build(4); // not smaller + QuantilesSketch qs2 = QuantilesSketch.builder().build(3); + qs1.merge(qs2); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testDownSamplingExceptions2() { + QuantilesSketch qs1 = QuantilesSketch.builder().build(4); + QuantilesSketch qs2 = QuantilesSketch.builder().build(7); // 7/4 not pwr of 2 + qs1.merge(qs2); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testDownSamplingExceptions3() { + QuantilesSketch qs1 = QuantilesSketch.builder().build(4); + QuantilesSketch qs2 = QuantilesSketch.builder().build(12); // 12/4 not pwr of 2 + qs1.merge(qs2); + } + private static void checksForK(int k) { String s = "Did not catch improper k: "+k; try { @@ -585,6 +696,19 @@ static QuantilesSketch buildQS(int k, long n, int startV, short seed) { return qs; } + @Test + public void checkGetSeed() { + QuantilesSketch qs1 = QuantilesSketch.builder().build(4); + assertEquals(qs1.getSeed(), 0); + } + + @Test + public void checkKisOne() { + QuantilesSketch qs1 = QuantilesSketch.builder().build(1); + double err = qs1.getNormalizedRankError(); + assertEquals(err, 1.0, 0.0); + } + @Test public void printlnTest() { println("PRINTING: "+this.getClass().getName()); diff --git a/src/test/java/com/yahoo/sketches/theta/DirectQuickSelectSketchTest.java b/src/test/java/com/yahoo/sketches/theta/DirectQuickSelectSketchTest.java index b44032fff..ecd29ce82 100644 --- a/src/test/java/com/yahoo/sketches/theta/DirectQuickSelectSketchTest.java +++ b/src/test/java/com/yahoo/sketches/theta/DirectQuickSelectSketchTest.java @@ -732,7 +732,9 @@ private class MemoryManager2 implements MemoryRequest { public Memory request(long capacityBytes) { long newCap = capacityBytes*2; println("ReqCap: "+capacityBytes + ", Granted: "+newCap); - return new AllocMemory(newCap, this); + Memory newMem = new AllocMemory(newCap); + newMem.setMemoryRequest(this); + return newMem; } @Override @@ -774,6 +776,8 @@ public void checkLimitedMemoryScenarios2() { } assertEquals(usk1.getEstimate(), u, 0.05*u); NativeMemory nMem = (NativeMemory) usk1.getMemory(); + println(nMem.toHexString("TestMemory", 0, 128)); + println("Freed: " + nMem.getCapacity()); nMem.freeMemory(); }