Skip to content

Commit

Permalink
Updates to Quantiles to allow down sampling.
Browse files Browse the repository at this point in the history
Refined the Memory interface for get/set MemoryRequest
  • Loading branch information
Lee Rhodes committed Feb 5, 2016
1 parent fb8c745 commit a900b7c
Show file tree
Hide file tree
Showing 11 changed files with 585 additions and 170 deletions.
31 changes: 19 additions & 12 deletions src/main/java/com/yahoo/sketches/memory/AllocMemory.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

import static com.yahoo.sketches.memory.UnsafeUtil.unsafe;

import java.util.Arrays;

/**
* The AllocMemory class is a subclass of NativeMemory and is used to allocate direct, off-heap
* native memory, which is then accessed by the NativeMemory methods.
Expand All @@ -26,7 +24,9 @@ public class AllocMemory extends NativeMemory {
* @param capacityBytes the size in bytes of the native memory
*/
public AllocMemory(long capacityBytes) {
super(0L, null, null, unsafe.allocateMemory(capacityBytes), capacityBytes);
super(0L, null, null);
super.nativeRawStartAddress_ = unsafe.allocateMemory(capacityBytes);
super.capacityBytes_ = capacityBytes;
super.memReq_ = null;
}

Expand All @@ -38,7 +38,9 @@ public AllocMemory(long capacityBytes) {
* @param memReq The MemoryRequest callback
*/
public AllocMemory(long capacityBytes, MemoryRequest memReq) {
super(0L, null, null, unsafe.allocateMemory(capacityBytes), capacityBytes);
super(0L, null, null);
super.nativeRawStartAddress_ = unsafe.allocateMemory(capacityBytes);
super.capacityBytes_ = capacityBytes;
super.memReq_ = memReq;
}

Expand All @@ -52,14 +54,15 @@ public AllocMemory(long capacityBytes, MemoryRequest memReq) {
* The OS is free to just expand the capacity of the current allocation at the same native
* address, or reassign a completely different native address in which case the origMem will be
* freed by the OS.
* The origMem internals will be reset to zero or nulls and must not be used again.
* The origMem capacity will be set to zero and must not be used again.
*
* @param newCapacityBytes the desired new capacity of the newly allocated memory in bytes
* @param memReq The MemoryRequest callback, which may be null.
*/
public AllocMemory(NativeMemory origMem, long newCapacityBytes, MemoryRequest memReq) {
super(0L, null, null,
unsafe.reallocateMemory(origMem.nativeRawStartAddress_, newCapacityBytes), newCapacityBytes);
super(0L, null, null);
super.nativeRawStartAddress_ = unsafe.reallocateMemory(origMem.nativeRawStartAddress_, newCapacityBytes);
super.capacityBytes_ = newCapacityBytes;
this.memReq_ = memReq;
origMem.nativeRawStartAddress_ = 0; //does not require freeMem
origMem.capacityBytes_ = 0; //Cannot be used again
Expand All @@ -71,17 +74,18 @@ public AllocMemory(NativeMemory origMem, long newCapacityBytes, MemoryRequest me
* zero to copyToBytes; clear the new memory from copyToBytes to capacityBytes.
* @param origMem The original NativeMemory, a portion of which will be copied to the
* newly allocated Memory.
* The reference must not be null.
* The reference must not be null.
* This origMem is not modified in any way, may be reused and must be freed appropriately.
* @param copyToBytes the upper limit of the region to be copied from origMem to the newly
* allocated memory.
* @param capacityBytes the desired new capacity of the newly allocated memory in bytes and the
* upper limit of the region to be cleared.
* @param memReq The MemoryRequest callback, which may be null.
*/
public AllocMemory(NativeMemory origMem, long copyToBytes, long capacityBytes,
MemoryRequest memReq) {
super(0L, null, null, unsafe.allocateMemory(capacityBytes), capacityBytes);
public AllocMemory(NativeMemory origMem, long copyToBytes, long capacityBytes, MemoryRequest memReq) {
super(0L, null, null);
super.nativeRawStartAddress_ = unsafe.allocateMemory(capacityBytes);
super.capacityBytes_ = capacityBytes;
this.memReq_ = memReq;
MemoryUtil.copy(origMem, 0, this, 0, copyToBytes);
this.clear(copyToBytes, capacityBytes-copyToBytes);
Expand All @@ -102,7 +106,10 @@ protected void finalize() {
System.err.println(
"ERROR: freeMemory() has not been called: Address: "+ nativeRawStartAddress_ +
", capacity: " + capacityBytes_);
System.err.println(Arrays.toString(Thread.currentThread().getStackTrace()));
java.lang.StackTraceElement[] arr = Thread.currentThread().getStackTrace();
for (int i=0; i<arr.length; i++) {
System.err.println(arr[i].toString());
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/yahoo/sketches/memory/Memory.java
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,12 @@ public interface Memory {
*/
Object getParent();

/**
* Sets a MemoryRequest
* @param memReq the MemoryRequest
*/
void setMemoryRequest(MemoryRequest memReq);

/**
* Returns a formatted hex string of an area of this Memory.
* Used primarily for testing.
Expand Down
17 changes: 13 additions & 4 deletions src/main/java/com/yahoo/sketches/memory/MemoryRegion.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class MemoryRegion implements Memory {
private final Memory mem_;
private volatile long memOffsetBytes_;
private volatile long capacityBytes_;
private MemoryRequest memReq_ = null;
private volatile MemoryRequest memReq_ = null;

/**
* Defines a region of the given parent Memory by defining an offset and capacity that are
Expand Down Expand Up @@ -431,14 +431,23 @@ public Object getParent() {
return mem_;
}

@Override
public void setMemoryRequest(MemoryRequest memReq) {
memReq_ = memReq;
}

@Override
public String toHexString(String header, long offsetBytes, int lengthBytes) {
assertBounds(offsetBytes, lengthBytes, capacityBytes_);
StringBuilder sb = new StringBuilder();
sb.append(header).append("\n");
String s1 = String.format("(%d, %d)", offsetBytes, lengthBytes);
sb.append(this.getClass().getName());
sb.append(".toHexString").append(s1).append(", hash: ").append(this.hashCode()).append(":");
String s1 = String.format("(..., %d, %d)", offsetBytes, lengthBytes);
sb.append(this.getClass().getSimpleName()).append(".toHexString").
append(s1).append(", hash: ").append(this.hashCode()).append("\n");
sb.append(" MemoryRequest: ");
if (memReq_ != null) {
sb.append(memReq_.getClass().getSimpleName()).append(", hash: ").append(memReq_.hashCode());
} else sb.append("null");
return mem_.toHexString(sb.toString(), getAddress(offsetBytes), lengthBytes);
}

Expand Down
102 changes: 65 additions & 37 deletions src/main/java/com/yahoo/sketches/memory/NativeMemory.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,71 +46,77 @@
*/
@SuppressWarnings("restriction")
public class NativeMemory implements Memory {
protected final long objectBaseOffset_; //only non-zero for on-heap objects. freeMemory sets to 0.
protected final Object memArray_; //null for off-heap, valid for on-heap.
protected MemoryRequest memReq_ = null; //set via AllocMemory
/*
Class Case ObjBaseOff MemArr byteBuf rawAdd CapacityBytes ReqFree Direct
NativeMemory byteArr >0 valid null 0 >0 FALSE FALSE
NativeMemory longArr >0 valid null 0 >0 FALSE FALSE
NativeMemory ByteBuffer Direct 0 null valid >0 >0 FALSE TRUE
NativeMemory ByteBuffer not Direct >0 valid valid 0 >0 FALSE FALSE
AllocMemory All cases 0 null null >0 >0 TRUE TRUE
*/
protected final long objectBaseOffset_;
protected final Object memArray_;
//holding on to this to make sure that it is not garbage collected before we are done with it.
protected final ByteBuffer byteBuf_;

protected long nativeRawStartAddress_; //only non-zero for native
protected long capacityBytes_; //only non-zero if allocated and class still valid

protected NativeMemory(long objectBaseOffset, Object memArray, ByteBuffer byteBuf,
long nativeRawStartAddress, long capacityBytes) {
this.objectBaseOffset_ = objectBaseOffset;
this.memArray_ = memArray;
this.byteBuf_ = byteBuf;
this.nativeRawStartAddress_ = nativeRawStartAddress;
this.capacityBytes_ = capacityBytes;
protected volatile long nativeRawStartAddress_;
protected volatile long capacityBytes_;
protected volatile MemoryRequest memReq_ = null; //set via AllocMemory

//only sets the finals
protected NativeMemory(long objectBaseOffset, Object memArray, ByteBuffer byteBuf) {
objectBaseOffset_ = objectBaseOffset;
memArray_ = memArray;
byteBuf_ = byteBuf;
}

/**
* Provides access to the given byteArray using Memory interface
* @param byteArray an on-heap byte array
*/
public NativeMemory(byte[] byteArray) {
int arrLen = byteArray.length;
memArray_ = byteArray;
objectBaseOffset_ = BYTE_ARRAY_BASE_OFFSET;
this(BYTE_ARRAY_BASE_OFFSET, byteArray, null);
if ((byteArray == null) || (byteArray.length == 0)) {
throw new IllegalArgumentException(
"Array must must not be null and have a length greater than zero.");
}
nativeRawStartAddress_ = 0L;
capacityBytes_ = arrLen;
byteBuf_ = null;
capacityBytes_ = byteArray.length;

}

/**
* Provides access to the given longArray using Memory interface
* @param longArray an on-heap long array
*/
public NativeMemory(long[] longArray) {
int arrLen = longArray.length;
if (arrLen <= 0) {
this(LONG_ARRAY_BASE_OFFSET, longArray, null);
if ((longArray == null) || (longArray.length == 0)) {
throw new IllegalArgumentException(
"longArray must have a length greater than zero.");
"Array must must not be null and have a length greater than zero.");
}
memArray_ = longArray;
objectBaseOffset_ = LONG_ARRAY_BASE_OFFSET;
nativeRawStartAddress_ = 0L;
capacityBytes_ = arrLen << LONG_SHIFT;
byteBuf_ = null;
capacityBytes_ = longArray.length << LONG_SHIFT;
}

/**
* Provides access to the backing store of the given ByteBuffer using Memory interface
* @param byteBuf the given ByteBuffer
*/
public NativeMemory(ByteBuffer byteBuf) {
capacityBytes_ = byteBuf.capacity();
byteBuf_ = byteBuf;
if (byteBuf_.isDirect()) {
memArray_ = null;
if (byteBuf.isDirect()) {
objectBaseOffset_ = 0L;
memArray_ = null;
byteBuf_ = byteBuf;
nativeRawStartAddress_ = ((sun.nio.ch.DirectBuffer)byteBuf).address();
}
else { //must have array
memArray_ = byteBuf_.array();
objectBaseOffset_ = BYTE_ARRAY_BASE_OFFSET;
memArray_ = byteBuf.array();
byteBuf_ = byteBuf;
nativeRawStartAddress_ = 0L;
}
capacityBytes_ = byteBuf.capacity();

}

@Override
Expand Down Expand Up @@ -557,13 +563,22 @@ public Object getParent() {
return memArray_;
}

@Override
public void setMemoryRequest(MemoryRequest memReq) {
memReq_ = memReq;
}

@Override
public String toHexString(String header, long offsetBytes, int lengthBytes) {
StringBuilder sb = new StringBuilder();
sb.append(header).append("\n");
String s1 = String.format("(%d, %d)", offsetBytes, lengthBytes);
sb.append(this.getClass().getName());
sb.append(".toHexString").append(s1).append(", hash: ").append(this.hashCode()).append(":");
String s1 = String.format("(..., %d, %d)", offsetBytes, lengthBytes);
sb.append(this.getClass().getSimpleName()).append(".toHexString").
append(s1).append(", hash: ").append(this.hashCode()).append("\n");
sb.append(" MemoryRequest: ");
if (memReq_ != null) {
sb.append(memReq_.getClass().getSimpleName()).append(", hash: ").append(memReq_.hashCode());
} else sb.append("null");
return toHex(sb.toString(), offsetBytes, lengthBytes);
}

Expand All @@ -586,15 +601,28 @@ public ByteBuffer byteBuffer() {
}

/**
* Frees this Memory. If direct, off-heap native memory is allocated via the AllocMemory
* sub-class this method must be called in either the NativeMemory class or the AllocMemory class.
* Returns true if this NativeMemory is accessing native (off-heap) memory directly.
* This includes the case of a Direct ByteBuffer.
* @return true if this NativeMemory is accessing native (off-heap) memory directly.
*/
public boolean isDirect() {
return nativeRawStartAddress_ > 0;
}

/**
* This frees this Memory only if it is required. This always sets the capacity to zero
* and the reference to MemoryRequest to null, which effectively disables this class.
* However,
*
* It is always safe to call this method when you are done with this class.
*/
public void freeMemory() {
if (requiresFree()) {
unsafe.freeMemory(nativeRawStartAddress_);
nativeRawStartAddress_ = 0L;
}
capacityBytes_ = 0L;
memReq_ = null;
}

/**
Expand Down Expand Up @@ -664,7 +692,7 @@ private String toHex(String header, long offsetBytes, int lengthBytes) {
* @return true if the object should be freed when it is no longer needed
*/
protected boolean requiresFree() {
return nativeRawStartAddress_ != 0L && (byteBuf_ == null);
return (nativeRawStartAddress_ != 0L) && (byteBuf_ == null);
}

}
Loading

0 comments on commit a900b7c

Please sign in to comment.