Skip to content

Commit

Permalink
Merge branch 'branch-24.12' into new-read-parquet-api
Browse files Browse the repository at this point in the history
  • Loading branch information
madsbk authored Nov 20, 2024
2 parents fa03396 + 6f83b58 commit 1b79270
Show file tree
Hide file tree
Showing 14 changed files with 345 additions and 324 deletions.
5 changes: 2 additions & 3 deletions cpp/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,7 @@ ConfigureNVBench(

# ##################################################################################################
# * strings benchmark -------------------------------------------------------------------
ConfigureBench(
STRINGS_BENCH string/factory.cu string/repeat_strings.cpp string/replace.cpp string/url_decode.cu
)
ConfigureBench(STRINGS_BENCH string/factory.cu string/repeat_strings.cpp string/url_decode.cu)

ConfigureNVBench(
STRINGS_NVBENCH
Expand All @@ -380,6 +378,7 @@ ConfigureNVBench(
string/lengths.cpp
string/like.cpp
string/make_strings_column.cu
string/replace.cpp
string/replace_re.cpp
string/reverse.cpp
string/slice.cpp
Expand Down
87 changes: 38 additions & 49 deletions cpp/benchmarks/string/replace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@
* limitations under the License.
*/

#include "string_bench_args.hpp"

#include <benchmarks/common/generate_input.hpp>
#include <benchmarks/fixture/benchmark_fixture.hpp>
#include <benchmarks/synchronization/synchronization.hpp>

#include <cudf_test/column_wrapper.hpp>

Expand All @@ -27,59 +24,51 @@
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/utilities/default_stream.hpp>

#include <limits>

class StringReplace : public cudf::benchmark {};
#include <nvbench/nvbench.cuh>

enum replace_type { scalar, slice, multi };

static void BM_replace(benchmark::State& state, replace_type rt)
static void bench_replace(nvbench::state& state)
{
cudf::size_type const n_rows{static_cast<cudf::size_type>(state.range(0))};
cudf::size_type const max_str_length{static_cast<cudf::size_type>(state.range(1))};
auto const num_rows = static_cast<cudf::size_type>(state.get_int64("num_rows"));
auto const min_width = static_cast<cudf::size_type>(state.get_int64("min_width"));
auto const max_width = static_cast<cudf::size_type>(state.get_int64("max_width"));
auto const api = state.get_string("api");

data_profile const profile = data_profile_builder().distribution(
cudf::type_id::STRING, distribution_id::NORMAL, 0, max_str_length);
auto const column = create_random_column(cudf::type_id::STRING, row_count{n_rows}, profile);
cudf::strings_column_view input(column->view());
cudf::string_scalar target("+");
cudf::string_scalar repl("");
cudf::test::strings_column_wrapper targets({"+", "-"});
cudf::test::strings_column_wrapper repls({"", ""});
cudf::type_id::STRING, distribution_id::NORMAL, min_width, max_width);
auto const column = create_random_column(cudf::type_id::STRING, row_count{num_rows}, profile);

for (auto _ : state) {
cuda_event_timer raii(state, true, cudf::get_default_stream());
switch (rt) {
case scalar: cudf::strings::replace(input, target, repl); break;
case slice: cudf::strings::replace_slice(input, repl, 1, 10); break;
case multi:
cudf::strings::replace_multiple(
input, cudf::strings_column_view(targets), cudf::strings_column_view(repls));
break;
}
}
cudf::strings_column_view input(column->view());

state.SetBytesProcessed(state.iterations() * input.chars_size(cudf::get_default_stream()));
}
auto stream = cudf::get_default_stream();
state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value()));
auto const chars_size = input.chars_size(stream);
state.add_global_memory_reads<nvbench::int8_t>(chars_size);
state.add_global_memory_writes<nvbench::int8_t>(chars_size);

static void generate_bench_args(benchmark::internal::Benchmark* b)
{
int const min_rows = 1 << 12;
int const max_rows = 1 << 24;
int const row_mult = 8;
int const min_rowlen = 1 << 5;
int const max_rowlen = 1 << 13;
int const len_mult = 2;
generate_string_bench_args(b, min_rows, max_rows, row_mult, min_rowlen, max_rowlen, len_mult);
if (api == "scalar") {
cudf::string_scalar target("+");
cudf::string_scalar repl("-");
state.exec(nvbench::exec_tag::sync,
[&](nvbench::launch& launch) { cudf::strings::replace(input, target, repl); });
} else if (api == "multi") {
state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
cudf::test::strings_column_wrapper targets({"+", " "});
cudf::test::strings_column_wrapper repls({"-", "_"});
cudf::strings::replace_multiple(
input, cudf::strings_column_view(targets), cudf::strings_column_view(repls));
});
} else if (api == "slice") {
cudf::string_scalar repl("0123456789");
state.exec(nvbench::exec_tag::sync,
[&](nvbench::launch& launch) { cudf::strings::replace_slice(input, repl, 1, 10); });
}
}

#define STRINGS_BENCHMARK_DEFINE(name) \
BENCHMARK_DEFINE_F(StringReplace, name) \
(::benchmark::State & st) { BM_replace(st, replace_type::name); } \
BENCHMARK_REGISTER_F(StringReplace, name) \
->Apply(generate_bench_args) \
->UseManualTime() \
->Unit(benchmark::kMillisecond);

STRINGS_BENCHMARK_DEFINE(scalar)
STRINGS_BENCHMARK_DEFINE(slice)
STRINGS_BENCHMARK_DEFINE(multi)
// Registers bench_replace with NVBench under the name "replace".
// Axes declared here are read back inside bench_replace via state.get_int64 / state.get_string:
//   min_width / max_width - bounds passed to the string data profile's NORMAL distribution
//   num_rows              - number of rows in the generated strings column
//   api                   - selects which call is timed: "scalar" -> cudf::strings::replace,
//                           "multi" -> cudf::strings::replace_multiple,
//                           "slice" -> cudf::strings::replace_slice
NVBENCH_BENCH(bench_replace)
.set_name("replace")
.add_int64_axis("min_width", {0})
.add_int64_axis("max_width", {32, 64, 128, 256})
.add_int64_axis("num_rows", {32768, 262144, 2097152})
.add_string_axis("api", {"scalar", "multi", "slice"});
19 changes: 7 additions & 12 deletions cpp/src/binaryop/compiled/binary_ops.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_view.hpp>
#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/unary.hpp>

#include <rmm/cuda_stream_view.hpp>
Expand Down Expand Up @@ -253,16 +253,11 @@ struct binary_op_double_device_dispatcher {
template <typename Functor>
CUDF_KERNEL void for_each_kernel(cudf::size_type size, Functor f)
{
int tid = threadIdx.x;
int blkid = blockIdx.x;
int blksz = blockDim.x;
int gridsz = gridDim.x;

int start = tid + blkid * blksz;
int step = blksz * gridsz;
auto start = cudf::detail::grid_1d::global_thread_id();
auto const stride = cudf::detail::grid_1d::grid_stride();

#pragma unroll
for (cudf::size_type i = start; i < size; i += step) {
for (auto i = start; i < size; i += stride) {
f(i);
}
}
Expand All @@ -282,9 +277,9 @@ void for_each(rmm::cuda_stream_view stream, cudf::size_type size, Functor f)
int min_grid_size;
CUDF_CUDA_TRY(
cudaOccupancyMaxPotentialBlockSize(&min_grid_size, &block_size, for_each_kernel<decltype(f)>));
// 2 elements per thread.
int const grid_size = util::div_rounding_up_safe(size, 2 * block_size);
for_each_kernel<<<grid_size, block_size, 0, stream.value()>>>(size, std::forward<Functor&&>(f));
auto grid = cudf::detail::grid_1d(size, block_size, 2 /* elements_per_thread */);
for_each_kernel<<<grid.num_blocks, grid.num_threads_per_block, 0, stream.value()>>>(
size, std::forward<Functor&&>(f));
}

template <class BinaryOperator>
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/io/utilities/config_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ void set_up_kvikio()
{
static std::once_flag flag{};
std::call_once(flag, [] {
auto const compat_mode = kvikio::detail::getenv_or<bool>("KVIKIO_COMPAT_MODE", true);
auto const compat_mode =
kvikio::detail::getenv_or("KVIKIO_COMPAT_MODE", kvikio::CompatMode::ON);
kvikio::defaults::compat_mode_reset(compat_mode);

auto const nthreads = getenv_or<unsigned int>("KVIKIO_NTHREADS", 4u);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/utilities/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class file_sink : public data_sink {
cufile_integration::set_up_kvikio();
_kvikio_file = kvikio::FileHandle(filepath, "w");
CUDF_LOG_INFO("Writing a file using kvikIO, with compatibility mode {}.",
_kvikio_file.is_compat_mode_on() ? "on" : "off");
_kvikio_file.is_compat_mode_preferred() ? "on" : "off");
} else {
_cufile_out = detail::make_cufile_output(filepath);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/utilities/datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class file_source : public datasource {
cufile_integration::set_up_kvikio();
_kvikio_file = kvikio::FileHandle(filepath);
CUDF_LOG_INFO("Reading a file using kvikIO, with compatibility mode {}.",
_kvikio_file.is_compat_mode_on() ? "on" : "off");
_kvikio_file.is_compat_mode_preferred() ? "on" : "off");
} else {
_cufile_in = detail::make_cufile_input(filepath);
}
Expand Down
43 changes: 29 additions & 14 deletions cpp/src/join/distinct_hash_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
#include <cub/block/block_scan.cuh>
#include <cuco/static_set.cuh>
#include <thrust/fill.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_output_iterator.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/sequence.h>

#include <cstddef>
Expand Down Expand Up @@ -79,14 +81,9 @@ class build_keys_fn {

/**
* @brief Device output transform functor to construct `size_type` with `cuco::pair<hash_value_type,
* lhs_index_type>` or `cuco::pair<hash_value_type, rhs_index_type>`
* rhs_index_type>`
*/
struct output_fn {
__device__ constexpr cudf::size_type operator()(
cuco::pair<hash_value_type, lhs_index_type> const& x) const
{
return static_cast<cudf::size_type>(x.second);
}
__device__ constexpr cudf::size_type operator()(
cuco::pair<hash_value_type, rhs_index_type> const& x) const
{
Expand Down Expand Up @@ -176,15 +173,33 @@ distinct_hash_join<HasNested>::inner_join(rmm::cuda_stream_view stream,
auto const iter = cudf::detail::make_counting_transform_iterator(
0, build_keys_fn<decltype(d_probe_hasher), lhs_index_type>{d_probe_hasher});

auto const build_indices_begin =
thrust::make_transform_output_iterator(build_indices->begin(), output_fn{});
auto const probe_indices_begin =
thrust::make_transform_output_iterator(probe_indices->begin(), output_fn{});

auto const [probe_indices_end, _] = this->_hash_table.retrieve(
iter, iter + probe_table_num_rows, probe_indices_begin, build_indices_begin, {stream.value()});
auto found_indices = rmm::device_uvector<size_type>(probe_table_num_rows, stream);
auto const found_begin =
thrust::make_transform_output_iterator(found_indices.begin(), output_fn{});

// TODO conditional find for nulls once `cuco::static_set::find_if` is added
// If `idx` is within the range `[0, probe_table_num_rows)` and `found_indices[idx]` is not equal
// to `JoinNoneValue`, then `idx` has a match in the hash set.
this->_hash_table.find_async(iter, iter + probe_table_num_rows, found_begin, stream.value());

auto const tuple_iter = cudf::detail::make_counting_transform_iterator(
0,
cuda::proclaim_return_type<thrust::tuple<size_type, size_type>>(
[found_iter = found_indices.begin()] __device__(size_type idx) {
return thrust::tuple{*(found_iter + idx), idx};
}));
auto const output_begin =
thrust::make_zip_iterator(build_indices->begin(), probe_indices->begin());
auto const output_end =
thrust::copy_if(rmm::exec_policy_nosync(stream),
tuple_iter,
tuple_iter + probe_table_num_rows,
found_indices.begin(),
output_begin,
cuda::proclaim_return_type<bool>(
[] __device__(size_type idx) { return idx != JoinNoneValue; }));
auto const actual_size = std::distance(output_begin, output_end);

auto const actual_size = std::distance(probe_indices_begin, probe_indices_end);
build_indices->resize(actual_size, stream);
probe_indices->resize(actual_size, stream);

Expand Down
22 changes: 22 additions & 0 deletions cpp/tests/binaryop/binop-compiled-test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
#include <cudf_test/testing_main.hpp>
#include <cudf_test/type_lists.hpp>

#include <cudf/aggregation.hpp>
#include <cudf/binaryop.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/fixed_point/fixed_point.hpp>
#include <cudf/reduction.hpp>
#include <cudf/types.hpp>

#include <thrust/iterator/counting_iterator.h>
Expand Down Expand Up @@ -820,4 +822,24 @@ TEST_F(BinaryOperationCompiledTest_NullOpsString, NullMin_Vector_Vector)
CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected, result->view());
}

// Runs a compiled binary operation (GREATER) over two very large INT8 columns and checks that
// every row was evaluated. Each row compares 10 > 1, so every output element is expected to be
// true and the sum of the BOOL8 result must equal the row count.
// NOTE(review): presumably a regression test for index overflow in the binaryop for_each kernel
// on large inputs -- confirm against the originating issue.
TEST(BinaryOperationCompiledTest, LargeColumnNoOverflow)
{
  using int8_scalar = cudf::numeric_scalar<cudf::id_to_type<cudf::type_id::INT8>>;

  cudf::size_type num_rows{1'799'989'091};

  // Constant-valued operands: every element of `lhs` is 10, every element of `rhs` is 1.
  auto lhs = cudf::make_column_from_scalar(int8_scalar{10, true}, num_rows);
  auto rhs = cudf::make_column_from_scalar(int8_scalar{1, true}, num_rows);

  // Element-wise lhs > rhs, materialized as a BOOL8 column.
  auto const bool_type = cudf::data_type{cudf::type_id::BOOL8};
  auto is_greater =
    cudf::binary_operation(lhs->view(), rhs->view(), cudf::binary_operator::GREATER, bool_type);

  // Sum the boolean column into a size_type scalar; this counts the rows that compared true.
  auto const sum_type = cudf::data_type{cudf::type_to_id<cudf::size_type>()};
  auto sum_agg        = cudf::make_sum_aggregation<cudf::reduce_aggregation>();
  auto total          = cudf::reduce(is_greater->view(), *sum_agg, sum_type);

  auto* total_scalar = static_cast<cudf::numeric_scalar<cudf::size_type>*>(total.get());
  auto got           = total_scalar->value();
  EXPECT_EQ(num_rows, got);
}

CUDF_TEST_PROGRAM_MAIN()
10 changes: 2 additions & 8 deletions python/cudf/cudf/_lib/transform.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
from numba.np import numpy_support

import cudf
from cudf.core._internals.expressions import parse_expression
from cudf.core.buffer import acquire_spill_lock, as_buffer
from cudf.utils import cudautils

from pylibcudf cimport transform as plc_transform
from pylibcudf.expressions cimport Expression
from pylibcudf.libcudf.types cimport size_type

from cudf._lib.column cimport Column
Expand Down Expand Up @@ -93,7 +91,7 @@ def one_hot_encode(Column input_column, Column categories):


@acquire_spill_lock()
def compute_column(list columns, tuple column_names, expr: str):
def compute_column(list columns, tuple column_names, str expr):
"""Compute a new column by evaluating an expression on a set of columns.
Parameters
Expand All @@ -108,12 +106,8 @@ def compute_column(list columns, tuple column_names, expr: str):
expr : str
The expression to evaluate.
"""
visitor = parse_expression(expr, column_names)

# At the end, all the stack contains is the expression to evaluate.
cdef Expression cudf_expr = visitor.expression
result = plc_transform.compute_column(
plc.Table([col.to_pylibcudf(mode="read") for col in columns]),
cudf_expr,
plc.expressions.to_expression(expr, column_names),
)
return Column.from_pylibcudf(result)
Loading

0 comments on commit 1b79270

Please sign in to comment.