diff --git a/cpp/include/cudf/contiguous_split.hpp b/cpp/include/cudf/contiguous_split.hpp
index 5fe4e738714..bf10f1fd489 100644
--- a/cpp/include/cudf/contiguous_split.hpp
+++ b/cpp/include/cudf/contiguous_split.hpp
@@ -28,7 +28,7 @@ namespace cudf {
  * @addtogroup column_copy
  * @{
  * @file
- * @brief Table APIs for contiguous_split, pack, unpack, and metadadata
+ * @brief Table APIs for contiguous_split, pack, unpack, and metadata
  */
 
 /**
diff --git a/cpp/include/cudf/detail/utilities/cuda.cuh b/cpp/include/cudf/detail/utilities/cuda.cuh
index c95189f1f94..264302df0e9 100644
--- a/cpp/include/cudf/detail/utilities/cuda.cuh
+++ b/cpp/include/cudf/detail/utilities/cuda.cuh
@@ -92,6 +92,29 @@ class grid_1d {
   {
     return global_thread_id(threadIdx.x, blockIdx.x, blockDim.x);
   }
+
+  /**
+   * @brief Returns the stride of a 1D grid.
+   *
+   * The returned stride is the total number of threads in the grid.
+   *
+   * @param num_threads_per_block The number of threads per block
+   * @param num_blocks_per_grid The number of blocks in the grid
+   * @return thread_index_type The total number of threads in the grid
+   */
+  static constexpr thread_index_type grid_stride(thread_index_type num_threads_per_block,
+                                                 thread_index_type num_blocks_per_grid)
+  {
+    return num_threads_per_block * num_blocks_per_grid;
+  }
+
+  /**
+   * @brief Returns the stride of the current 1D grid.
+   *
+   * @return thread_index_type The number of threads in the grid.
+   */
+  static __device__ thread_index_type grid_stride() { return grid_stride(blockDim.x, gridDim.x); }
 };
 
 /**
diff --git a/cpp/include/cudf/detail/valid_if.cuh b/cpp/include/cudf/detail/valid_if.cuh
index bed884a23eb..f3f95dad017 100644
--- a/cpp/include/cudf/detail/valid_if.cuh
+++ b/cpp/include/cudf/detail/valid_if.cuh
@@ -49,8 +49,8 @@ __global__ void valid_if_kernel(
 {
   constexpr size_type leader_lane{0};
   auto const lane_id{threadIdx.x % warp_size};
-  thread_index_type i = threadIdx.x + blockIdx.x * blockDim.x;
-  thread_index_type const stride = blockDim.x * gridDim.x;
+  auto i            = cudf::detail::grid_1d::global_thread_id();
+  auto const stride = cudf::detail::grid_1d::grid_stride();
   size_type warp_valid_count{0};
 
   auto active_mask = __ballot_sync(0xFFFF'FFFFu, i < size);
diff --git a/cpp/src/bitmask/null_mask.cu b/cpp/src/bitmask/null_mask.cu
index 33dc7e0556b..5a0d3e4f120 100644
--- a/cpp/src/bitmask/null_mask.cu
+++ b/cpp/src/bitmask/null_mask.cu
@@ -108,7 +108,7 @@ __global__ void set_null_mask_kernel(bitmask_type* __restrict__ destination,
   thread_index_type const last_word = word_index(end_bit) - word_index(begin_bit);
   bitmask_type fill_value = valid ? 0xffff'ffff : 0;
-  thread_index_type const stride = blockDim.x * gridDim.x;
+  auto const stride = cudf::detail::grid_1d::grid_stride();
 
   for (thread_index_type destination_word_index = grid_1d::global_thread_id();
        destination_word_index < number_of_mask_words;
@@ -191,7 +191,7 @@ __global__ void copy_offset_bitmask(bitmask_type* __restrict__ destination,
                                     size_type source_end_bit,
                                     size_type number_of_mask_words)
 {
-  thread_index_type const stride = blockDim.x * gridDim.x;
+  auto const stride = cudf::detail::grid_1d::grid_stride();
   for (thread_index_type destination_word_index = grid_1d::global_thread_id();
        destination_word_index < number_of_mask_words;
        destination_word_index += stride) {
@@ -265,7 +265,7 @@ __global__ void count_set_bits_kernel(bitmask_type const* bitmask,
   auto const first_word_index{word_index(first_bit_index)};
   auto const last_word_index{word_index(last_bit_index)};
   thread_index_type const tid    = grid_1d::global_thread_id();
-  thread_index_type const stride = blockDim.x * gridDim.x;
+  thread_index_type const stride = grid_1d::grid_stride();
   thread_index_type thread_word_index = tid + first_word_index;
   size_type thread_count{0};
 
diff --git a/cpp/src/copying/scatter.cu b/cpp/src/copying/scatter.cu
index 1cfde7e6d72..2b05c282261 100644
--- a/cpp/src/copying/scatter.cu
+++ b/cpp/src/copying/scatter.cu
@@ -52,8 +52,8 @@ __global__ void marking_bitmask_kernel(mutable_column_device_view destination,
                                        MapIterator scatter_map,
                                        size_type num_scatter_rows)
 {
-  thread_index_type row          = threadIdx.x + blockIdx.x * blockDim.x;
-  thread_index_type const stride = blockDim.x * gridDim.x;
+  auto row          = cudf::detail::grid_1d::global_thread_id();
+  auto const stride = cudf::detail::grid_1d::grid_stride();
 
   while (row < num_scatter_rows) {
     size_type const output_row = scatter_map[row];
diff --git a/cpp/src/partitioning/partitioning.cu b/cpp/src/partitioning/partitioning.cu
index ff9c4ea2f59..7b6676346c2 100644
--- a/cpp/src/partitioning/partitioning.cu
+++ b/cpp/src/partitioning/partitioning.cu
@@ -134,8 +134,8 @@ __global__ void compute_row_partition_numbers(row_hasher_t the_hasher,
   // Accumulate histogram of the size of each partition in shared memory
   extern __shared__ size_type shared_partition_sizes[];
 
-  auto tid = cudf::thread_index_type{threadIdx.x} +
-             cudf::thread_index_type{blockIdx.x} * cudf::thread_index_type{blockDim.x};
+  auto tid          = cudf::detail::grid_1d::global_thread_id();
+  auto const stride = cudf::detail::grid_1d::grid_stride();
 
   // Initialize local histogram
   size_type partition_number = threadIdx.x;
@@ -160,7 +160,7 @@ __global__ void compute_row_partition_numbers(row_hasher_t the_hasher,
     row_partition_offset[row_number] =
       atomicAdd(&(shared_partition_sizes[partition_number]), size_type(1));
 
-    tid += cudf::thread_index_type{blockDim.x} * cudf::thread_index_type{gridDim.x};
+    tid += stride;
   }
 
   __syncthreads();
@@ -215,8 +215,8 @@ __global__ void compute_row_output_locations(size_type* __restrict__ row_partiti
   }
   __syncthreads();
 
-  auto tid = cudf::thread_index_type{threadIdx.x} +
-             cudf::thread_index_type{blockIdx.x} * cudf::thread_index_type{blockDim.x};
+  auto tid          = cudf::detail::grid_1d::global_thread_id();
+  auto const stride = cudf::detail::grid_1d::grid_stride();
 
   // Get each row's partition number, and get it's output location by
   // incrementing block's offset counter for that partition number
@@ -234,7 +234,7 @@ __global__ void compute_row_output_locations(size_type* __restrict__ row_partiti
     // Store the row's output location in-place
     row_partition_numbers[row_number] = row_output_location;
 
-    tid += cudf::thread_index_type{blockDim.x} * cudf::thread_index_type{gridDim.x};
+    tid += stride;
   }
 }
@@ -311,10 +311,8 @@ __global__ void copy_block_partitions(InputIter input_iter,
   __syncthreads();
 
   // Fetch the input data to shared memory
-  for (auto tid = cudf::thread_index_type{threadIdx.x} +
-                  cudf::thread_index_type{blockIdx.x} * cudf::thread_index_type{blockDim.x};
-       tid < num_rows;
-       tid += cudf::thread_index_type{blockDim.x} * cudf::thread_index_type{gridDim.x}) {
+  for (auto tid = cudf::detail::grid_1d::global_thread_id(); tid < num_rows;
+       tid += cudf::detail::grid_1d::grid_stride()) {
     auto const row_number       = static_cast<size_type>(tid);
     size_type const ipartition = row_partition_numbers[row_number];
diff --git a/cpp/src/replace/nulls.cu b/cpp/src/replace/nulls.cu
index e033db0e52a..5b9fd3d9f0f 100644
--- a/cpp/src/replace/nulls.cu
+++ b/cpp/src/replace/nulls.cu
@@ -64,9 +64,9 @@ __global__ void replace_nulls_strings(cudf::column_device_view input,
                                       char* chars,
                                       cudf::size_type* valid_counter)
 {
-  cudf::size_type nrows = input.size();
-  cudf::thread_index_type i = blockIdx.x * blockDim.x + threadIdx.x;
-  cudf::thread_index_type const stride = blockDim.x * gridDim.x;
+  cudf::size_type nrows = input.size();
+  auto i            = cudf::detail::grid_1d::global_thread_id();
+  auto const stride = cudf::detail::grid_1d::grid_stride();
 
   uint32_t active_mask = 0xffff'ffff;
   active_mask          = __ballot_sync(active_mask, i < nrows);
@@ -117,9 +117,9 @@ __global__ void replace_nulls(cudf::column_device_view input,
                               cudf::mutable_column_device_view output,
                               cudf::size_type* output_valid_count)
 {
-  cudf::size_type nrows = input.size();
-  cudf::thread_index_type i = blockIdx.x * blockDim.x + threadIdx.x;
-  cudf::thread_index_type const stride = blockDim.x * gridDim.x;
+  cudf::size_type nrows = input.size();
+  auto i            = cudf::detail::grid_1d::global_thread_id();
+  auto const stride = cudf::detail::grid_1d::grid_stride();
 
   uint32_t active_mask = 0xffff'ffff;
   active_mask          = __ballot_sync(active_mask, i < nrows);
diff --git a/cpp/src/transform/compute_column.cu b/cpp/src/transform/compute_column.cu
index 61293d51ba2..224dd93b048 100644
--- a/cpp/src/transform/compute_column.cu
+++ b/cpp/src/transform/compute_column.cu
@@ -69,9 +69,8 @@ __launch_bounds__(max_block_size) __global__
   auto thread_intermediate_storage =
     &intermediate_storage[threadIdx.x * device_expression_data.num_intermediates];
-  auto const start_idx =
-    static_cast<cudf::thread_index_type>(threadIdx.x + blockIdx.x * blockDim.x);
-  auto const stride = static_cast<cudf::thread_index_type>(blockDim.x * gridDim.x);
+  auto start_idx    = cudf::detail::grid_1d::global_thread_id();
+  auto const stride = cudf::detail::grid_1d::grid_stride();
   auto evaluator =
     cudf::ast::detail::expression_evaluator<has_nulls>(table, device_expression_data);
diff --git a/java/src/main/java/ai/rapids/cudf/HostMemoryReservation.java b/java/src/main/java/ai/rapids/cudf/HostMemoryReservation.java
new file mode 100644
index 00000000000..72c2e659372
--- /dev/null
+++ b/java/src/main/java/ai/rapids/cudf/HostMemoryReservation.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package ai.rapids.cudf;
+
+/**
+ * Represents some amount of host memory that has been reserved. A reservation guarantees that one
+ * or more allocations, up to the reserved amount minus padding for alignment, will succeed. A
+ * reservation typically guarantees the amount can be allocated only once, meaning that when a
+ * buffer allocated from a reservation is freed it is not returned to the reservation, but to the
+ * pool of memory the reservation originally came from. If more memory is allocated from the
+ * reservation than was reserved, an OutOfMemoryError may be thrown, but it is not guaranteed to
+ * happen.
+ *
+ * When the reservation is closed any unused portion of the reservation will be returned to the
+ * pool of memory the reservation came from.
+ */
+public interface HostMemoryReservation extends HostMemoryAllocator, AutoCloseable {}
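
The reservation contract above is easier to see with a short usage sketch. The following is illustrative only and is not part of this change; it assumes the pinned pool has already been initialized elsewhere at application startup, and the class name and buffer sizes are arbitrary. It relies on the tryReserve and padToCpuAlignment methods added to PinnedMemoryPool further down in this diff.

    import ai.rapids.cudf.HostMemoryBuffer;
    import ai.rapids.cudf.HostMemoryReservation;
    import ai.rapids.cudf.PinnedMemoryPool;

    public class ReservationExample {  // hypothetical example class, not part of cuDF
      public static void main(String[] args) throws Exception {
        // Assumes the pinned pool was initialized elsewhere (e.g. at application startup).
        // Pad each buffer size so the reservation is large enough for both allocations.
        long needed = PinnedMemoryPool.padToCpuAlignment(1024)
            + PinnedMemoryPool.padToCpuAlignment(4096);
        try (HostMemoryReservation reservation = PinnedMemoryPool.tryReserve(needed)) {
          if (reservation == null) {
            // Not enough pinned memory was available to back the reservation.
            return;
          }
          // Allocations come out of the reserved section; when a buffer is closed its memory
          // goes back to the pool, not to the reservation.
          try (HostMemoryBuffer a = reservation.allocate(1024);
               HostMemoryBuffer b = reservation.allocate(4096)) {
            // ... fill the buffers and copy to the device ...
          }
        } // closing the reservation returns any unused portion to the pool
      }
    }

Because freed buffers go back to the pool rather than the reservation, the reservation is sized up front for every buffer it will hand out.
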
diff --git a/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java b/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java
index 969946a9533..9ce72ba237e 100644
--- a/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java
+++ b/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java
@@ -1,6 +1,6 @@
 /*
  *
- * Copyright (c) 2019-2022, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2023, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -37,13 +37,14 @@
  */
 public final class PinnedMemoryPool implements AutoCloseable {
   private static final Logger log = LoggerFactory.getLogger(PinnedMemoryPool.class);
-  private static final long ALIGNMENT = 8;
+  private static final long ALIGNMENT = ColumnView.hostPaddingSizeInBytes();
 
   // These static fields should only ever be accessed when class-synchronized.
   // Do NOT use singleton_ directly! Use the getSingleton accessor instead.
   private static volatile PinnedMemoryPool singleton_ = null;
   private static Future<PinnedMemoryPool> initFuture = null;
 
+  private final long totalPoolSize;
   private final long pinnedPoolBase;
   private final SortedSet<MemorySection> freeHeap = new TreeSet<>(new SortedByAddress());
   private int numAllocatedSections = 0;
@@ -164,6 +165,14 @@ private static void freeInternal(MemorySection section) {
     Objects.requireNonNull(getSingleton()).free(section);
   }
 
+  /**
+   * Used to indicate that memory was allocated from a reservation. This is primarily for
+   * keeping track of outstanding allocations.
+   */
+  private static void reserveAllocInternal(MemorySection section) {
+    Objects.requireNonNull(getSingleton()).reserveAllocHappened(section);
+  }
+
   /**
    * Initialize the pool.
    *
@@ -226,6 +235,21 @@ public static HostMemoryBuffer tryAllocate(long bytes) {
     return result;
   }
 
+  /**
+   * Factory method to create a pinned host memory reservation.
+   *
+   * @param bytes size in bytes to reserve
+   * @return newly created reservation or null if there is insufficient pinned memory to cover it.
+   */
+  public static HostMemoryReservation tryReserve(long bytes) {
+    HostMemoryReservation result = null;
+    PinnedMemoryPool pool = getSingleton();
+    if (pool != null) {
+      result = pool.tryReserveInternal(bytes);
+    }
+    return result;
+  }
+
   /**
    * Factory method to create a host buffer but preferably pointing to pinned memory.
   * It is not guaranteed that the returned buffer will be pointer to pinned memory.
   *
@@ -233,7 +257,7 @@ public static HostMemoryBuffer tryAllocate(long bytes) {
    * @param bytes size in bytes to allocate
    * @return newly created buffer
    */
-  public static HostMemoryBuffer allocate(long bytes, HostMemoryAllocator hostMemoryAllocator) {
+  public static HostMemoryBuffer allocate(long bytes, HostMemoryAllocator hostMemoryAllocator) {
     HostMemoryBuffer result = tryAllocate(bytes);
     if (result == null) {
       result = hostMemoryAllocator.allocate(bytes, false);
@@ -241,6 +265,13 @@ public static HostMemoryBuffer allocate(long bytes, HostMemoryAllocator hostMem
     return result;
   }
 
+  /**
+   * Factory method to create a host buffer but preferably pointing to pinned memory.
+   * It is not guaranteed that the returned buffer will point to pinned memory.
+   *
+   * @param bytes size in bytes to allocate
+   * @return newly created buffer
+   */
   public static HostMemoryBuffer allocate(long bytes) {
     return allocate(bytes, DefaultHostMemoryAllocator.get());
   }
@@ -258,12 +289,24 @@ public static long getAvailableBytes() {
     return 0;
   }
 
+  /**
+   * Get the number of bytes that the pinned memory pool was allocated with.
+   */
+  public static long getTotalPoolSizeBytes() {
+    PinnedMemoryPool pool = getSingleton();
+    if (pool != null) {
+      return pool.getTotalPoolSizeInternal();
+    }
+    return 0;
+  }
+
   private PinnedMemoryPool(long poolSize, int gpuId) {
     if (gpuId > -1) {
       // set the gpu device to use
       Cuda.setDevice(gpuId);
       Cuda.freeZero();
     }
+    this.totalPoolSize = poolSize;
     this.pinnedPoolBase = Cuda.hostAllocPinned(poolSize);
     freeHeap.add(new MemorySection(pinnedPoolBase, poolSize));
     this.availableBytes = poolSize;
@@ -271,32 +314,42 @@ private PinnedMemoryPool(long poolSize, int gpuId) {
 
   @Override
   public void close() {
-    assert numAllocatedSections == 0;
+    assert numAllocatedSections == 0 : "Leaked " + numAllocatedSections + " pinned allocations";
     Cuda.freePinned(pinnedPoolBase);
   }
 
-  private synchronized HostMemoryBuffer tryAllocateInternal(long bytes) {
+  /**
+   * Pads a length of bytes to the alignment the CPU wants in the worst case. This helps to
+   * calculate the size needed for a reservation if there are multiple buffers.
+   * @param bytes the size in bytes
+   * @return the new padded size in bytes.
+   */
+  public static long padToCpuAlignment(long bytes) {
+    return ((bytes + ALIGNMENT - 1) / ALIGNMENT) * ALIGNMENT;
+  }
+
+  private synchronized MemorySection tryGetInternal(long bytes, String what) {
     if (freeHeap.isEmpty()) {
       log.debug("No free pinned memory left");
       return null;
     }
     // Align the allocation
-    long alignedBytes = ((bytes + ALIGNMENT - 1) / ALIGNMENT) * ALIGNMENT;
+    long alignedBytes = padToCpuAlignment(bytes);
     Optional<MemorySection> firstFit = freeHeap.stream()
-        .filter(section -> section.size >= alignedBytes)
-        .findFirst();
+            .filter(section -> section.size >= alignedBytes)
+            .findFirst();
     if (!firstFit.isPresent()) {
       if (log.isDebugEnabled()) {
         MemorySection largest = freeHeap.stream()
-            .max(new SortedBySize())
-            .orElse(new MemorySection(0, 0));
+                .max(new SortedBySize())
+                .orElse(new MemorySection(0, 0));
         log.debug("Insufficient pinned memory. {} needed, {} found", alignedBytes, largest.size);
       }
       return null;
     }
     MemorySection first = firstFit.get();
-    log.debug("Allocating {}/{} bytes pinned from {} FREE COUNT {} OUTSTANDING COUNT {}",
-        bytes, alignedBytes, first, freeHeap.size(), numAllocatedSections);
+    log.debug("{} {}/{} bytes pinned from {} FREE COUNT {} OUTSTANDING COUNT {}",
+        what, bytes, alignedBytes, first, freeHeap.size(), numAllocatedSections);
     freeHeap.remove(first);
     MemorySection allocated;
     if (first.size == alignedBytes) {
@@ -307,9 +360,74 @@ private synchronized HostMemoryBuffer tryAllocateInternal(long bytes) {
     }
     numAllocatedSections++;
     availableBytes -= allocated.size;
-    log.debug("Allocated {} free {} outstanding {}", allocated, freeHeap, numAllocatedSections);
-    return new HostMemoryBuffer(allocated.baseAddress, bytes,
-        new PinnedHostBufferCleaner(allocated, bytes));
+    log.debug("{} {} free {} outstanding {}", what, allocated, freeHeap, numAllocatedSections);
+    return allocated;
+  }
+
+  private synchronized HostMemoryBuffer tryAllocateInternal(long bytes) {
+    MemorySection allocated = tryGetInternal(bytes, "allocate");
+    if (allocated == null) {
+      return null;
+    } else {
+      return new HostMemoryBuffer(allocated.baseAddress, bytes,
+          new PinnedHostBufferCleaner(allocated, bytes));
+    }
+  }
+
+  private class PinnedReservation implements HostMemoryReservation {
+    private MemorySection section = null;
+
+    public PinnedReservation(MemorySection section) {
+      this.section = section;
+    }
+
+    @Override
+    public synchronized HostMemoryBuffer allocate(long bytes, boolean preferPinned) {
+      return this.allocate(bytes);
+    }
+
+    @Override
+    public synchronized HostMemoryBuffer allocate(long bytes) {
+      if (section == null || section.size < bytes) {
+        throw new OutOfMemoryError("Reservation didn't have enough space " + bytes + " / " +
+            (section == null ? 0 : section.size));
+      }
+      long alignedSize = padToCpuAlignment(bytes);
+      MemorySection allocated;
+      if (section.size >= bytes && section.size <= alignedSize) {
+        allocated = section;
+        section = null;
+        // No need for reserveAllocInternal because the original section is already tracked
+      } else {
+        allocated = section.splitOff(alignedSize);
+        PinnedMemoryPool.reserveAllocInternal(allocated);
+      }
+      return new HostMemoryBuffer(allocated.baseAddress, bytes,
+          new PinnedHostBufferCleaner(allocated, bytes));
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+      if (section != null) {
+        try {
+          PinnedMemoryPool.freeInternal(section);
+        } finally {
+          // Always mark the resource as freed even if an exception is thrown.
+          // We cannot know how far it progressed before the exception, and
+          // therefore it is unsafe to retry.
+          section = null;
+        }
+      }
+    }
+  }
+
+  private HostMemoryReservation tryReserveInternal(long bytes) {
+    MemorySection allocated = tryGetInternal(bytes, "reserve");
+    if (allocated == null) {
+      return null;
+    } else {
+      return new PinnedReservation(allocated);
+    }
   }
 
   private synchronized void free(MemorySection section) {
@@ -328,7 +446,17 @@ private synchronized void free(MemorySection section) {
     log.debug("After freeing {} outstanding {}", freeHeap, numAllocatedSections);
   }
 
+  private synchronized void reserveAllocHappened(MemorySection section) {
+    if (section != null && section.size > 0) {
+      numAllocatedSections++;
+    }
+  }
+
   private synchronized long getAvailableBytesInternal() {
     return this.availableBytes;
   }
+
+  private long getTotalPoolSizeInternal() {
+    return this.totalPoolSize;
+  }
 }
diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py
index 38b07eca330..b300c55b537 100644
--- a/python/cudf/cudf/core/groupby/groupby.py
+++ b/python/cudf/cudf/core/groupby/groupby.py
@@ -313,9 +313,10 @@ def dtypes(self):
         3  object  int64
         """
         index = self.grouping.keys.unique().sort_values().to_pandas()
+        obj_dtypes = self.obj._dtypes
         return pd.DataFrame(
             {
-                name: [self.obj._dtypes[name]] * len(index)
+                name: [obj_dtypes[name]] * len(index)
                 for name in self.grouping.values._column_names
             },
             index=index,
diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py
index 4c6eb3a50e9..33ac97d7ef8 100644
--- a/python/cudf/cudf/core/indexed_frame.py
+++ b/python/cudf/cudf/core/indexed_frame.py
@@ -822,11 +822,7 @@ def replace(
         ) = _get_replacement_values_for_columns(
             to_replace=to_replace,
             value=value,
-            # TODO: This should be replaced with `DataFrame._dtypes` once
-            # that is moved up to `Frame`.
-            columns_dtype_map={
-                col: self._data[col].dtype for col in self._data
-            },
+            columns_dtype_map=self._dtypes,
         )
 
         for name, col in self._data.items():
diff --git a/python/cudf/cudf/io/csv.py b/python/cudf/cudf/io/csv.py
index 95e0aa18070..bacc0641639 100644
--- a/python/cudf/cudf/io/csv.py
+++ b/python/cudf/cudf/io/csv.py
@@ -123,11 +123,12 @@ def read_csv(
     if dtype is None or isinstance(dtype, abc.Mapping):
         # There exists some dtypes in the result columns that is inferred.
         # Find them and map them to the default dtypes.
-        dtype = {} if dtype is None else dtype
+        specified_dtypes = {} if dtype is None else dtype
+        df_dtypes = df._dtypes
         unspecified_dtypes = {
-            name: df._dtypes[name]
+            name: df_dtypes[name]
             for name in df._column_names
-            if name not in dtype
+            if name not in specified_dtypes
         }
         default_dtypes = {}
diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py
index 4de9a92a068..efac24aee17 100644
--- a/python/cudf/cudf/io/json.py
+++ b/python/cudf/cudf/io/json.py
@@ -158,11 +158,12 @@ def read_json(
     if dtype is True or isinstance(dtype, abc.Mapping):
         # There exists some dtypes in the result columns that is inferred.
         # Find them and map them to the default dtypes.
-        dtype = {} if dtype is True else dtype
+        specified_dtypes = {} if dtype is True else dtype
+        df_dtypes = df._dtypes
         unspecified_dtypes = {
-            name: df._dtypes[name]
+            name: df_dtypes[name]
             for name in df._column_names
-            if name not in dtype
+            if name not in specified_dtypes
         }
         default_dtypes = {}
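
As a final illustrative sketch (again, not part of this change), the new getTotalPoolSizeBytes accessor can be combined with tryReserve to size a reservation relative to the configured pinned pool. The helper name and the one-quarter fraction are arbitrary choices for the example.

    import ai.rapids.cudf.HostMemoryReservation;
    import ai.rapids.cudf.PinnedMemoryPool;

    public class PoolSizingExample {  // hypothetical example class, not part of cuDF
      // Reserve roughly a quarter of the pinned pool, if a pool exists and currently has room.
      // The 25% figure is an arbitrary illustration value, not a cuDF recommendation.
      public static HostMemoryReservation reserveQuarterOfPool() {
        long total = PinnedMemoryPool.getTotalPoolSizeBytes();  // 0 when no pool is configured
        if (total == 0) {
          return null;
        }
        long target = PinnedMemoryPool.padToCpuAlignment(total / 4);
        // Returns null when the pool cannot currently satisfy the reservation.
        return PinnedMemoryPool.tryReserve(target);
      }
    }
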