Skip to content

Commit

Permalink
Rework cudf::find_and_replace_all to use gather-based make_strings_co…
Browse files Browse the repository at this point in the history
…lumn (#15305)

Reworks `cudf::find_and_replace_all` for strings to work with long strings and enable it to support large strings.
The custom kernels were replaced with a gather-based `make_strings_column` already optimized for long and short strings.
Large strings will automatically be supported in `make_strings_column` in a future PR.

Authors:
  - David Wendt (https://github.com/davidwendt)

Approvers:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Paul Mattione (https://github.com/pmattione-nvidia)
  - Kyle Edwards (https://github.com/KyleFromNVIDIA)
  - Bradley Dice (https://github.com/bdice)

URL: #15305
  • Loading branch information
davidwendt authored Mar 22, 2024
1 parent 80a02c6 commit b29fc1d
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 215 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,7 @@ add_library(
src/strings/regex/regex_program.cpp
src/strings/repeat_strings.cu
src/strings/replace/backref_re.cu
src/strings/replace/find_replace.cu
src/strings/replace/multi.cu
src/strings/replace/multi_re.cu
src/strings/replace/replace.cu
Expand Down
18 changes: 18 additions & 0 deletions cpp/include/cudf/strings/detail/replace.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,24 @@ std::unique_ptr<column> replace_slice(strings_column_view const& strings,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Return a copy of `input` replacing any `values_to_replace[i]`
* found with `replacement_values[i]`
*
* @param input The column to find and replace values
* @param values_to_replace The values to find
* @param replacement_values The corresponding replacement values
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return Copy of `input` with specified values replaced
*/
std::unique_ptr<cudf::column> find_and_replace_all(
cudf::strings_column_view const& input,
cudf::strings_column_view const& values_to_replace,
cudf::strings_column_view const& replacement_values,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

} // namespace detail
} // namespace strings
} // namespace cudf
212 changes: 3 additions & 209 deletions cpp/src/replace/replace.cu
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/dictionary/dictionary_factories.hpp>
#include <cudf/replace.hpp>
#include <cudf/strings/detail/strings_children.cuh>
#include <cudf/strings/detail/replace.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/error.hpp>
#include <cudf/utilities/type_dispatcher.hpp>
Expand All @@ -57,7 +57,6 @@
#include <thrust/execution_policy.h>
#include <thrust/find.h>
#include <thrust/pair.h>
#include <thrust/tuple.h>

namespace { // anonymous

Expand Down Expand Up @@ -87,140 +86,6 @@ __device__ auto get_new_value(cudf::size_type idx,
return thrust::make_pair(new_value, output_is_valid);
}

__device__ int get_new_string_value(cudf::size_type idx,
cudf::column_device_view& input,
cudf::column_device_view& values_to_replace,
cudf::column_device_view&)
{
cudf::string_view input_string = input.element<cudf::string_view>(idx);
int match = -1;
for (int i = 0; i < values_to_replace.size(); i++) {
cudf::string_view value_string = values_to_replace.element<cudf::string_view>(i);
if (input_string == value_string) {
match = i;
break;
}
}
return match;
}

/**
* @brief Kernel which does the first pass of strings replace.
*
* It computes the output null_mask, null_count, and the offsets.
*
* @param input The input column to replace strings in.
* @param values_to_replace The string values to replace.
* @param replacement The replacement values.
* @param offsets The column which will contain the offsets of the new string column
* @param indices Temporary column used to store the replacement indices
* @param output_valid The output null_mask
* @param output_valid_count The output valid count
*/
template <bool input_has_nulls, bool replacement_has_nulls>
CUDF_KERNEL void replace_strings_first_pass(cudf::column_device_view input,
cudf::column_device_view values_to_replace,
cudf::column_device_view replacement,
cudf::mutable_column_device_view offsets,
cudf::mutable_column_device_view indices,
cudf::bitmask_type* output_valid,
cudf::size_type* __restrict__ output_valid_count)
{
cudf::size_type nrows = input.size();
auto tid = cudf::detail::grid_1d::global_thread_id();
auto const stride = cudf::detail::grid_1d::grid_stride();
uint32_t active_mask = 0xffff'ffffu;
active_mask = __ballot_sync(active_mask, tid < nrows);
auto const lane_id{threadIdx.x % cudf::detail::warp_size};
uint32_t valid_sum{0};

while (tid < nrows) {
auto const idx = static_cast<cudf::size_type>(tid);
bool input_is_valid = true;

if (input_has_nulls) input_is_valid = input.is_valid_nocheck(idx);
bool output_is_valid = input_is_valid;

if (input_is_valid) {
int result = get_new_string_value(idx, input, values_to_replace, replacement);
cudf::string_view output = (result == -1) ? input.element<cudf::string_view>(idx)
: replacement.element<cudf::string_view>(result);
offsets.data<cudf::size_type>()[idx] = output.size_bytes();
indices.data<cudf::size_type>()[idx] = result;
if (replacement_has_nulls && result != -1) {
output_is_valid = replacement.is_valid_nocheck(result);
}
} else {
offsets.data<cudf::size_type>()[idx] = 0;
indices.data<cudf::size_type>()[idx] = -1;
}

uint32_t bitmask = __ballot_sync(active_mask, output_is_valid);
if (0 == lane_id) {
output_valid[cudf::word_index(idx)] = bitmask;
valid_sum += __popc(bitmask);
}

tid += stride;
active_mask = __ballot_sync(active_mask, tid < nrows);
}

// Compute total valid count for this block and add it to global count
uint32_t block_valid_count = cudf::detail::single_lane_block_sum_reduce<BLOCK_SIZE, 0>(valid_sum);
// one thread computes and adds to output_valid_count
if (threadIdx.x == 0) {
atomicAdd(output_valid_count, static_cast<cudf::size_type>(block_valid_count));
}
}

/**
* @brief Kernel which does the second pass of strings replace.
*
* It copies the string data needed from input and replacement into the new strings column chars
* column.
*
* @param input The input column
* @param replacement The replacement values
* @param offsets The offsets column of the new strings column
* @param strings The chars column of the new strings column
* @param indices Temporary column used to store the replacement indices.
*/
template <bool input_has_nulls, bool replacement_has_nulls>
CUDF_KERNEL void replace_strings_second_pass(cudf::column_device_view input,
cudf::column_device_view replacement,
cudf::mutable_column_device_view offsets,
char* strings,
cudf::mutable_column_device_view indices)
{
cudf::size_type nrows = input.size();
auto tid = cudf::detail::grid_1d::global_thread_id();
auto const stride = cudf::detail::grid_1d::grid_stride();

while (tid < nrows) {
auto const idx = static_cast<cudf::size_type>(tid);
auto const replace_idx = indices.element<cudf::size_type>(idx);
bool output_is_valid = true;
bool input_is_valid = true;

if (input_has_nulls) {
input_is_valid = input.is_valid_nocheck(idx);
output_is_valid = input_is_valid;
}
if (replacement_has_nulls && replace_idx != -1) {
output_is_valid = replacement.is_valid_nocheck(replace_idx);
}
if (output_is_valid) {
cudf::string_view output = (replace_idx == -1)
? input.element<cudf::string_view>(idx)
: replacement.element<cudf::string_view>(replace_idx);
std::memcpy(
strings + offsets.data<cudf::size_type>()[idx], output.data(), output.size_bytes());
}

tid += stride;
}
}

/**
* @brief Kernel that replaces elements from `output_data` given the following
* rule: replace all `values_to_replace[i]` in [values_to_replace_begin`,
Expand Down Expand Up @@ -375,79 +240,8 @@ std::unique_ptr<cudf::column> replace_kernel_forwarder::operator()<cudf::string_
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
rmm::device_scalar<cudf::size_type> valid_counter(0, stream);
cudf::size_type* valid_count = valid_counter.data();

auto replace_first = replace_strings_first_pass<true, false>;
auto replace_second = replace_strings_second_pass<true, false>;
if (input_col.has_nulls()) {
if (replacement_values.has_nulls()) {
replace_first = replace_strings_first_pass<true, true>;
replace_second = replace_strings_second_pass<true, true>;
}
} else {
if (replacement_values.has_nulls()) {
replace_first = replace_strings_first_pass<false, true>;
replace_second = replace_strings_second_pass<false, true>;
} else {
replace_first = replace_strings_first_pass<false, false>;
replace_second = replace_strings_second_pass<false, false>;
}
}

// Create new offsets column to use in kernel
std::unique_ptr<cudf::column> sizes =
cudf::make_numeric_column(cudf::data_type{cudf::type_to_id<cudf::size_type>()},
input_col.size(),
cudf::mask_state::UNALLOCATED,
stream);
std::unique_ptr<cudf::column> indices =
cudf::make_numeric_column(cudf::data_type{cudf::type_to_id<cudf::size_type>()},
input_col.size(),
cudf::mask_state::UNALLOCATED,
stream);

auto sizes_view = sizes->mutable_view();
auto indices_view = indices->mutable_view();

auto device_in = cudf::column_device_view::create(input_col, stream);
auto device_values_to_replace = cudf::column_device_view::create(values_to_replace, stream);
auto device_replacement = cudf::column_device_view::create(replacement_values, stream);
auto device_sizes = cudf::mutable_column_device_view::create(sizes_view, stream);
auto device_indices = cudf::mutable_column_device_view::create(indices_view, stream);

rmm::device_buffer valid_bits =
cudf::detail::create_null_mask(input_col.size(), cudf::mask_state::UNINITIALIZED, stream, mr);

// Call first pass kernel to get sizes in offsets
cudf::detail::grid_1d grid{input_col.size(), BLOCK_SIZE, 1};
replace_first<<<grid.num_blocks, BLOCK_SIZE, 0, stream.value()>>>(
*device_in,
*device_values_to_replace,
*device_replacement,
*device_sizes,
*device_indices,
reinterpret_cast<cudf::bitmask_type*>(valid_bits.data()),
valid_count);

auto [offsets, bytes] = cudf::detail::make_offsets_child_column(
sizes_view.begin<cudf::size_type>(), sizes_view.end<cudf::size_type>(), stream, mr);
auto offsets_view = offsets->mutable_view();
auto device_offsets = cudf::mutable_column_device_view::create(offsets_view, stream);

// Allocate chars array and output null mask
cudf::size_type null_count = input_col.size() - valid_counter.value(stream);
rmm::device_uvector<char> output_chars(bytes, stream, mr);
auto d_chars = output_chars.data();

replace_second<<<grid.num_blocks, BLOCK_SIZE, 0, stream.value()>>>(
*device_in, *device_replacement, *device_offsets, d_chars, *device_indices);

return cudf::make_strings_column(input_col.size(),
std::move(offsets),
output_chars.release(),
null_count,
std::move(valid_bits));
return cudf::strings::detail::find_and_replace_all(
input_col, values_to_replace, replacement_values, stream, mr);
}

template <>
Expand Down
87 changes: 87 additions & 0 deletions cpp/src/strings/replace/find_replace.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cudf/column/column_device_view.cuh>
#include <cudf/detail/replace.hpp>
#include <cudf/strings/detail/strings_column_factories.cuh>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/error.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_scalar.hpp>

#include <thrust/execution_policy.h>
#include <thrust/transform.h>

namespace cudf {
namespace strings {
namespace detail {
namespace {
struct find_replace_fn {
column_device_view d_input;
column_device_view d_values;
column_device_view d_replacements;

__device__ string_index_pair get_replacement(size_type idx)
{
if (d_replacements.is_null(idx)) { return string_index_pair{nullptr, 0}; }
auto const d_str = d_replacements.element<string_view>(idx);
return string_index_pair{d_str.data(), d_str.size_bytes()};
}

__device__ string_index_pair operator()(size_type idx)
{
if (d_input.is_null(idx)) { return string_index_pair{nullptr, 0}; }
auto const d_str = d_input.element<string_view>(idx);
// find d_str in d_values
// if found return corresponding replacement
// if not found, return d_str
auto const begin = thrust::counting_iterator<size_type>(0);
auto const end = thrust::counting_iterator<size_type>(d_values.size());
auto const itr =
thrust::find_if(thrust::seq, begin, end, [d_values = d_values, d_str](size_type i) -> bool {
return d_str == d_values.element<string_view>(i);
});
return itr == end ? string_index_pair{d_str.data(), d_str.size_bytes()} : get_replacement(*itr);
}
};

} // namespace

std::unique_ptr<cudf::column> find_and_replace_all(
cudf::strings_column_view const& input,
cudf::strings_column_view const& values_to_replace,
cudf::strings_column_view const& replacement_values,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto d_input = cudf::column_device_view::create(input.parent(), stream);
auto d_values_to_replace = cudf::column_device_view::create(values_to_replace.parent(), stream);
auto d_replacements = cudf::column_device_view::create(replacement_values.parent(), stream);

auto indices = rmm::device_uvector<string_index_pair>(input.size(), stream);

thrust::transform(rmm::exec_policy_nosync(stream),
thrust::counting_iterator<size_type>(0),
thrust::counting_iterator<size_type>(input.size()),
indices.begin(),
find_replace_fn{*d_input, *d_values_to_replace, *d_replacements});

return make_strings_column(indices.begin(), indices.end(), stream, mr);
}

} // namespace detail
} // namespace strings
} // namespace cudf
8 changes: 2 additions & 6 deletions cpp/tests/replace/replace_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,7 @@ TEST_F(ReplaceStringsTest, Strings)
ASSERT_NO_THROW(result = cudf::find_and_replace_all(
input_wrapper, values_to_replace_wrapper, replacement_wrapper));
std::vector<std::string> expected{"z", "b", "c", "d", "e", "f", "g", "h"};
std::vector<cudf::valid_type> ex_valid{1, 1, 1, 1, 1, 1, 1, 1};
cudf::test::strings_column_wrapper expected_wrapper{
expected.begin(), expected.end(), ex_valid.begin()};
cudf::test::strings_column_wrapper expected_wrapper{expected.begin(), expected.end()};

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, expected_wrapper);
}
Expand Down Expand Up @@ -160,7 +158,6 @@ TEST_F(ReplaceStringsTest, StringsResultAllEmpty)
std::vector<std::string> replacement{"a", ""};
std::vector<cudf::valid_type> replacement_valid{1, 1};
std::vector<std::string> expected{"", "", "", "", "", "", "", ""};
std::vector<cudf::valid_type> ex_valid{1, 1, 1, 1, 1, 1, 1, 1};
cudf::test::strings_column_wrapper input_wrapper{input.begin(), input.end()};
cudf::test::strings_column_wrapper values_to_replace_wrapper{values_to_replace.begin(),
values_to_replace.end()};
Expand All @@ -170,8 +167,7 @@ TEST_F(ReplaceStringsTest, StringsResultAllEmpty)
std::unique_ptr<cudf::column> result;
ASSERT_NO_THROW(result = cudf::find_and_replace_all(
input_wrapper, values_to_replace_wrapper, replacement_wrapper));
cudf::test::strings_column_wrapper expected_wrapper{
expected.begin(), expected.end(), ex_valid.begin()};
cudf::test::strings_column_wrapper expected_wrapper{expected.begin(), expected.end()};

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, expected_wrapper);
}
Expand Down

0 comments on commit b29fc1d

Please sign in to comment.