Skip to content

Commit

Permalink
Add Java APIs to copy column data to host asynchronously (#16429)
Browse files Browse the repository at this point in the history
Adds Java methods to ColumnView to allow copying of column data to host memory asynchronously. This can be used to avoid many unnecessary stream synchronizations when copying many columns to the host.

Authors:
  - Jason Lowe (https://github.com/jlowe)

Approvers:
  - Nghia Truong (https://github.com/ttnghia)
  - Robert (Bobby) Evans (https://github.com/revans2)

URL: #16429
  • Loading branch information
jlowe authored Jul 30, 2024
1 parent 7b3e73a commit 8def2ec
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 20 deletions.
52 changes: 35 additions & 17 deletions java/src/main/java/ai/rapids/cudf/ColumnView.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -5034,8 +5034,8 @@ private static NestedColumnVector createNestedColumnVector(DType type, long rows
// DATA MOVEMENT
/////////////////////////////////////////////////////////////////////////////

private static HostColumnVectorCore copyToHostNestedHelper(
ColumnView deviceCvPointer, HostMemoryAllocator hostMemoryAllocator) {
private static HostColumnVectorCore copyToHostAsyncNestedHelper(
Cuda.Stream stream, ColumnView deviceCvPointer, HostMemoryAllocator hostMemoryAllocator) {
if (deviceCvPointer == null) {
return null;
}
Expand All @@ -5056,20 +5056,20 @@ private static HostColumnVectorCore copyToHostNestedHelper(
currValidity = deviceCvPointer.getValid();
if (currData != null) {
hostData = hostMemoryAllocator.allocate(currData.length);
hostData.copyFromDeviceBuffer(currData);
hostData.copyFromDeviceBufferAsync(currData, stream);
}
if (currValidity != null) {
hostValid = hostMemoryAllocator.allocate(currValidity.length);
hostValid.copyFromDeviceBuffer(currValidity);
hostValid.copyFromDeviceBufferAsync(currValidity, stream);
}
if (currOffsets != null) {
hostOffsets = hostMemoryAllocator.allocate(currOffsets.length);
hostOffsets.copyFromDeviceBuffer(currOffsets);
hostOffsets.copyFromDeviceBufferAsync(currOffsets, stream);
}
int numChildren = deviceCvPointer.getNumChildren();
for (int i = 0; i < numChildren; i++) {
try(ColumnView childDevPtr = deviceCvPointer.getChildColumnView(i)) {
children.add(copyToHostNestedHelper(childDevPtr, hostMemoryAllocator));
children.add(copyToHostAsyncNestedHelper(stream, childDevPtr, hostMemoryAllocator));
}
}
currNullCount = deviceCvPointer.getNullCount();
Expand Down Expand Up @@ -5103,11 +5103,20 @@ private static HostColumnVectorCore copyToHostNestedHelper(
}
}

/**
 * Copy the data to the host synchronously.
 *
 * <p>The copy is issued on {@code Cuda.DEFAULT_STREAM} via
 * {@link #copyToHostAsync(Cuda.Stream, HostMemoryAllocator)} and that stream is synchronized
 * before this method returns, so the result can be examined immediately.
 *
 * <p>If synchronizing the stream fails, the host column that was already built is closed
 * before the failure propagates so that its host buffers are not leaked.
 *
 * @param hostMemoryAllocator allocator used to obtain the host buffers for the copy
 * @return the host column holding the copied data
 */
public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) {
  HostColumnVector result = copyToHostAsync(Cuda.DEFAULT_STREAM, hostMemoryAllocator);
  boolean synced = false;
  try {
    Cuda.DEFAULT_STREAM.sync();
    synced = true;
    return result;
  } finally {
    // Nobody else holds a reference to result yet, so it must be released here when
    // the sync throws; otherwise the host buffers it owns would be leaked.
    if (!synced) {
      result.close();
    }
  }
}

/**
* Copy the data to the host.
* Copy the data to the host asynchronously. The caller MUST synchronize on the stream
* before examining the result.
*/
public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) {
try (NvtxRange toHost = new NvtxRange("ensureOnHost", NvtxColor.BLUE)) {
public HostColumnVector copyToHostAsync(Cuda.Stream stream,
HostMemoryAllocator hostMemoryAllocator) {
try (NvtxRange toHost = new NvtxRange("toHostAsync", NvtxColor.BLUE)) {
HostMemoryBuffer hostDataBuffer = null;
HostMemoryBuffer hostValidityBuffer = null;
HostMemoryBuffer hostOffsetsBuffer = null;
Expand All @@ -5127,16 +5136,16 @@ public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) {
if (!type.isNestedType()) {
if (valid != null) {
hostValidityBuffer = hostMemoryAllocator.allocate(valid.getLength());
hostValidityBuffer.copyFromDeviceBuffer(valid);
hostValidityBuffer.copyFromDeviceBufferAsync(valid, stream);
}
if (offsets != null) {
hostOffsetsBuffer = hostMemoryAllocator.allocate(offsets.length);
hostOffsetsBuffer.copyFromDeviceBuffer(offsets);
hostOffsetsBuffer.copyFromDeviceBufferAsync(offsets, stream);
}
// If a strings column is all null values there is no data buffer allocated
if (data != null) {
hostDataBuffer = hostMemoryAllocator.allocate(data.length);
hostDataBuffer.copyFromDeviceBuffer(data);
hostDataBuffer.copyFromDeviceBufferAsync(data, stream);
}
HostColumnVector ret = new HostColumnVector(type, rows, Optional.of(nullCount),
hostDataBuffer, hostValidityBuffer, hostOffsetsBuffer);
Expand All @@ -5145,21 +5154,21 @@ public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) {
} else {
if (data != null) {
hostDataBuffer = hostMemoryAllocator.allocate(data.length);
hostDataBuffer.copyFromDeviceBuffer(data);
hostDataBuffer.copyFromDeviceBufferAsync(data, stream);
}

if (valid != null) {
hostValidityBuffer = hostMemoryAllocator.allocate(valid.getLength());
hostValidityBuffer.copyFromDeviceBuffer(valid);
hostValidityBuffer.copyFromDeviceBufferAsync(valid, stream);
}
if (offsets != null) {
hostOffsetsBuffer = hostMemoryAllocator.allocate(offsets.getLength());
hostOffsetsBuffer.copyFromDeviceBuffer(offsets);
hostOffsetsBuffer.copyFromDeviceBufferAsync(offsets, stream);
}
List<HostColumnVectorCore> children = new ArrayList<>();
for (int i = 0; i < getNumChildren(); i++) {
try (ColumnView childDevPtr = getChildColumnView(i)) {
children.add(copyToHostNestedHelper(childDevPtr, hostMemoryAllocator));
children.add(copyToHostAsyncNestedHelper(stream, childDevPtr, hostMemoryAllocator));
}
}
HostColumnVector ret = new HostColumnVector(type, rows, Optional.of(nullCount),
Expand Down Expand Up @@ -5192,10 +5201,19 @@ public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) {
}
}

/**
 * Copy the data to host memory synchronously, taking the host buffers from the
 * allocator returned by {@code DefaultHostMemoryAllocator.get()}.
 */
public HostColumnVector copyToHost() {
  HostMemoryAllocator defaultAllocator = DefaultHostMemoryAllocator.get();
  return copyToHost(defaultAllocator);
}

/**
 * Copy the data to the host asynchronously on the given stream, taking the host buffers
 * from the allocator returned by {@code DefaultHostMemoryAllocator.get()}.
 *
 * <p>The caller MUST synchronize on the stream before examining the result.
 */
public HostColumnVector copyToHostAsync(Cuda.Stream stream) {
  HostMemoryAllocator defaultAllocator = DefaultHostMemoryAllocator.get();
  return copyToHostAsync(stream, defaultAllocator);
}

/**
* Calculate the total space required to copy the data to the host. This should be padded to
* the alignment that the CPU requires.
Expand Down
4 changes: 4 additions & 0 deletions java/src/main/java/ai/rapids/cudf/HostColumnVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public interface EventHandler {
public HostColumnVector(DType type, long rows, Optional<Long> nullCount,
HostMemoryBuffer hostDataBuffer, HostMemoryBuffer hostValidityBuffer,
HostMemoryBuffer offsetBuffer, List<HostColumnVectorCore> nestedHcv) {
// NOTE: This constructor MUST NOT examine the contents of any host buffers, as they may be
// asynchronously written by the device.
super(type, rows, nullCount, hostDataBuffer, hostValidityBuffer, offsetBuffer, nestedHcv);
refCount = 0;
incRefCountInternal(true);
Expand All @@ -100,6 +102,8 @@ public HostColumnVector(DType type, long rows, Optional<Long> nullCount,
HostColumnVector(DType type, long rows, Optional<Long> nullCount,
HostMemoryBuffer hostDataBuffer, HostMemoryBuffer hostValidityBuffer,
HostMemoryBuffer offsetBuffer) {
// NOTE: This constructor MUST NOT examine the contents of any host buffers, as they may be
// asynchronously written by the device.
super(type, rows, nullCount, hostDataBuffer, hostValidityBuffer, offsetBuffer, new ArrayList<>());
assert !type.equals(DType.LIST) : "This constructor should not be used for list type";
if (nullCount.isPresent() && nullCount.get() > 0 && hostValidityBuffer == null) {
Expand Down
4 changes: 3 additions & 1 deletion java/src/main/java/ai/rapids/cudf/HostColumnVectorCore.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,6 +47,8 @@ public class HostColumnVectorCore implements AutoCloseable {
public HostColumnVectorCore(DType type, long rows,
Optional<Long> nullCount, HostMemoryBuffer data, HostMemoryBuffer validity,
HostMemoryBuffer offsets, List<HostColumnVectorCore> nestedChildren) {
// NOTE: This constructor MUST NOT examine the contents of any host buffers, as they may be
// asynchronously written by the device.
this.offHeap = new OffHeapState(data, validity, offsets);
MemoryCleaner.register(this, offHeap);
this.type = type;
Expand Down
5 changes: 3 additions & 2 deletions java/src/main/java/ai/rapids/cudf/JCudfSerialization.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -907,8 +907,9 @@ private static ColumnBufferProvider[] providersFrom(ColumnVector[] columns) {
boolean success = false;
try {
for (int i = 0; i < columns.length; i++) {
onHost[i] = columns[i].copyToHost();
onHost[i] = columns[i].copyToHostAsync(Cuda.DEFAULT_STREAM);
}
Cuda.DEFAULT_STREAM.sync();
ColumnBufferProvider[] ret = providersFrom(onHost, true);
success = true;
return ret;
Expand Down

0 comments on commit 8def2ec

Please sign in to comment.