Skip to content

Commit

Permalink
Merge branch 'branch-24.12' into strings-urldecode-nvbench
Browse files Browse the repository at this point in the history
  • Loading branch information
davidwendt committed Nov 20, 2024
2 parents a0a0f4d + 3111aa4 commit 76cefdd
Show file tree
Hide file tree
Showing 18 changed files with 1,100 additions and 460 deletions.
2 changes: 2 additions & 0 deletions ci/run_cudf_polars_polars_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ DESELECTED_TESTS=(
"tests/unit/test_cpu_check.py::test_check_cpu_flags_skipped_no_flags" # Mock library error
"tests/docs/test_user_guide.py" # No dot binary in CI image
"tests/unit/test_polars_import.py::test_fork_safety" # test started to hang in polars-1.14
"tests/unit/operations/test_join.py::test_join_4_columns_with_validity" # fails in some systems, see https://github.com/pola-rs/polars/issues/19870
"tests/unit/io/test_csv.py::test_read_web_file" # fails in rockylinux8 due to SSL CA issues
)

if [[ $(arch) == "aarch64" ]]; then
Expand Down
56 changes: 46 additions & 10 deletions cpp/benchmarks/io/json/json_reader_input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,19 @@

#include <nvbench/nvbench.cuh>

// Size of the data in the benchmark dataframe; chosen to be low enough to allow benchmarks to
// run on most GPUs, but large enough to allow highest throughput
constexpr size_t data_size = 512 << 20;
// Default size of the data in the benchmark dataframe; chosen to be low enough to allow benchmarks
// to run on most GPUs, but large enough to allow highest throughput
constexpr size_t default_data_size = 512 << 20;
constexpr cudf::size_type num_cols = 64;

void json_read_common(cuio_source_sink_pair& source_sink,
cudf::size_type num_rows_to_read,
nvbench::state& state)
nvbench::state& state,
cudf::io::compression_type comptype = cudf::io::compression_type::NONE,
size_t data_size = default_data_size)
{
cudf::io::json_reader_options read_opts =
cudf::io::json_reader_options::builder(source_sink.make_source_info());
cudf::io::json_reader_options::builder(source_sink.make_source_info()).compression(comptype);

auto mem_stats_logger = cudf::memory_stats_logger();
state.set_cuda_stream(nvbench::make_cuda_stream_view(cudf::get_default_stream().value()));
Expand All @@ -57,15 +59,21 @@ void json_read_common(cuio_source_sink_pair& source_sink,
state.add_buffer_size(source_sink.size(), "encoded_file_size", "encoded_file_size");
}

cudf::size_type json_write_bm_data(cudf::io::sink_info sink,
std::vector<cudf::type_id> const& dtypes)
cudf::size_type json_write_bm_data(
cudf::io::sink_info sink,
std::vector<cudf::type_id> const& dtypes,
cudf::io::compression_type comptype = cudf::io::compression_type::NONE,
size_t data_size = default_data_size)
{
auto const tbl = create_random_table(
cycle_dtypes(dtypes, num_cols), table_size_bytes{data_size}, data_profile_builder());
auto const view = tbl->view();

cudf::io::json_writer_options const write_opts =
cudf::io::json_writer_options::builder(sink, view).na_rep("null").rows_per_chunk(100'000);
cudf::io::json_writer_options::builder(sink, view)
.na_rep("null")
.rows_per_chunk(100'000)
.compression(comptype);
cudf::io::write_json(write_opts);
return view.num_rows();
}
Expand All @@ -87,6 +95,26 @@ void BM_json_read_io(nvbench::state& state, nvbench::type_list<nvbench::enum_typ
json_read_common(source_sink, num_rows, state);
}

/**
 * @brief Benchmark body for reading JSON that was written with a given compression type.
 *
 * Generates a random table covering a fixed mix of column types, writes it as JSON through a
 * source/sink pair of kind `IO` using `comptype`, and then passes that same pair to
 * `json_read_common` together with the compression type and requested data size.
 *
 * @tparam comptype Compression type used for writing and forwarded to the reader
 * @tparam IO Kind of source/sink the data is round-tripped through
 * @param state nvbench state; its "data_size" int64 axis gives the table size in bytes
 */
template <cudf::io::compression_type comptype, io_type IO>
void BM_json_read_compressed_io(
  nvbench::state& state, nvbench::type_list<nvbench::enum_type<comptype>, nvbench::enum_type<IO>>)
{
  // The axis value is a signed 64-bit integer; convert it explicitly instead of relying on an
  // implicit signed-to-unsigned conversion.
  auto const data_size = static_cast<size_t>(state.get_int64("data_size"));

  cuio_source_sink_pair source_sink(IO);

  // Cover every supported type group so the generated table has a representative column mix.
  auto const d_type = get_type_or_group({static_cast<int32_t>(data_type::INTEGRAL),
                                         static_cast<int32_t>(data_type::FLOAT),
                                         static_cast<int32_t>(data_type::DECIMAL),
                                         static_cast<int32_t>(data_type::TIMESTAMP),
                                         static_cast<int32_t>(data_type::DURATION),
                                         static_cast<int32_t>(data_type::STRING),
                                         static_cast<int32_t>(data_type::LIST),
                                         static_cast<int32_t>(data_type::STRUCT)});

  // Write the benchmark input first; the returned row count tells the reader how much to read.
  auto const num_rows =
    json_write_bm_data(source_sink.make_sink_info(), d_type, comptype, data_size);

  json_read_common(source_sink, num_rows, state, comptype, data_size);
}

template <data_type DataType, io_type IO>
void BM_json_read_data_type(
nvbench::state& state, nvbench::type_list<nvbench::enum_type<DataType>, nvbench::enum_type<IO>>)
Expand All @@ -110,8 +138,9 @@ using d_type_list = nvbench::enum_type_list<data_type::INTEGRAL,
using io_list =
nvbench::enum_type_list<io_type::FILEPATH, io_type::HOST_BUFFER, io_type::DEVICE_BUFFER>;

using compression_list =
nvbench::enum_type_list<cudf::io::compression_type::SNAPPY, cudf::io::compression_type::NONE>;
using compression_list = nvbench::enum_type_list<cudf::io::compression_type::GZIP,
cudf::io::compression_type::SNAPPY,
cudf::io::compression_type::NONE>;

NVBENCH_BENCH_TYPES(BM_json_read_data_type,
NVBENCH_TYPE_AXES(d_type_list, nvbench::enum_type_list<io_type::DEVICE_BUFFER>))
Expand All @@ -123,3 +152,10 @@ NVBENCH_BENCH_TYPES(BM_json_read_io, NVBENCH_TYPE_AXES(io_list))
.set_name("json_read_io")
.set_type_axes_names({"io"})
.set_min_samples(4);

// Registers BM_json_read_compressed_io for each compression type in `compression_list`, using
// file-path I/O only. The "data_size" axis sweeps powers of two with exponents from 20 to 29 in
// steps of 1 (presumably inclusive of 29, i.e. 1 MiB up to 512 MiB — confirm against
// nvbench::range semantics).
NVBENCH_BENCH_TYPES(BM_json_read_compressed_io,
NVBENCH_TYPE_AXES(compression_list, nvbench::enum_type_list<io_type::FILEPATH>))
.set_name("json_read_compressed_io")
.set_type_axes_names({"compression_type", "io"})
.add_int64_power_of_two_axis("data_size", nvbench::range(20, 29, 1))
.set_min_samples(4);
1 change: 1 addition & 0 deletions cpp/benchmarks/io/nvbench_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ NVBENCH_DECLARE_ENUM_TYPE_STRINGS(
[](auto value) {
switch (value) {
case cudf::io::compression_type::SNAPPY: return "SNAPPY";
case cudf::io::compression_type::GZIP: return "GZIP";
case cudf::io::compression_type::NONE: return "NONE";
default: return "Unknown";
}
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/io/utilities/config_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ void set_up_kvikio()
{
static std::once_flag flag{};
std::call_once(flag, [] {
auto const compat_mode = kvikio::detail::getenv_or<bool>("KVIKIO_COMPAT_MODE", true);
auto const compat_mode =
kvikio::detail::getenv_or("KVIKIO_COMPAT_MODE", kvikio::CompatMode::ON);
kvikio::defaults::compat_mode_reset(compat_mode);

auto const nthreads = getenv_or<unsigned int>("KVIKIO_NTHREADS", 4u);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/utilities/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class file_sink : public data_sink {
cufile_integration::set_up_kvikio();
_kvikio_file = kvikio::FileHandle(filepath, "w");
CUDF_LOG_INFO("Writing a file using kvikIO, with compatibility mode {}.",
_kvikio_file.is_compat_mode_on() ? "on" : "off");
_kvikio_file.is_compat_mode_preferred() ? "on" : "off");
} else {
_cufile_out = detail::make_cufile_output(filepath);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/utilities/datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class file_source : public datasource {
cufile_integration::set_up_kvikio();
_kvikio_file = kvikio::FileHandle(filepath);
CUDF_LOG_INFO("Reading a file using kvikIO, with compatibility mode {}.",
_kvikio_file.is_compat_mode_on() ? "on" : "off");
_kvikio_file.is_compat_mode_preferred() ? "on" : "off");
} else {
_cufile_in = detail::make_cufile_input(filepath);
}
Expand Down
43 changes: 29 additions & 14 deletions cpp/src/join/distinct_hash_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
#include <cub/block/block_scan.cuh>
#include <cuco/static_set.cuh>
#include <thrust/fill.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_output_iterator.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/sequence.h>

#include <cstddef>
Expand Down Expand Up @@ -79,14 +81,9 @@ class build_keys_fn {

/**
* @brief Device output transform functor to construct `size_type` with `cuco::pair<hash_value_type,
* lhs_index_type>` or `cuco::pair<hash_value_type, rhs_index_type>`
* rhs_index_type>`
*/
struct output_fn {
__device__ constexpr cudf::size_type operator()(
cuco::pair<hash_value_type, lhs_index_type> const& x) const
{
return static_cast<cudf::size_type>(x.second);
}
__device__ constexpr cudf::size_type operator()(
cuco::pair<hash_value_type, rhs_index_type> const& x) const
{
Expand Down Expand Up @@ -176,15 +173,33 @@ distinct_hash_join<HasNested>::inner_join(rmm::cuda_stream_view stream,
auto const iter = cudf::detail::make_counting_transform_iterator(
0, build_keys_fn<decltype(d_probe_hasher), lhs_index_type>{d_probe_hasher});

auto const build_indices_begin =
thrust::make_transform_output_iterator(build_indices->begin(), output_fn{});
auto const probe_indices_begin =
thrust::make_transform_output_iterator(probe_indices->begin(), output_fn{});

auto const [probe_indices_end, _] = this->_hash_table.retrieve(
iter, iter + probe_table_num_rows, probe_indices_begin, build_indices_begin, {stream.value()});
auto found_indices = rmm::device_uvector<size_type>(probe_table_num_rows, stream);
auto const found_begin =
thrust::make_transform_output_iterator(found_indices.begin(), output_fn{});

// TODO conditional find for nulls once `cuco::static_set::find_if` is added
// If `idx` is within the range `[0, probe_table_num_rows)` and `found_indices[idx]` is not equal
// to `JoinNoneValue`, then `idx` has a match in the hash set.
this->_hash_table.find_async(iter, iter + probe_table_num_rows, found_begin, stream.value());

auto const tuple_iter = cudf::detail::make_counting_transform_iterator(
0,
cuda::proclaim_return_type<thrust::tuple<size_type, size_type>>(
[found_iter = found_indices.begin()] __device__(size_type idx) {
return thrust::tuple{*(found_iter + idx), idx};
}));
auto const output_begin =
thrust::make_zip_iterator(build_indices->begin(), probe_indices->begin());
auto const output_end =
thrust::copy_if(rmm::exec_policy_nosync(stream),
tuple_iter,
tuple_iter + probe_table_num_rows,
found_indices.begin(),
output_begin,
cuda::proclaim_return_type<bool>(
[] __device__(size_type idx) { return idx != JoinNoneValue; }));
auto const actual_size = std::distance(output_begin, output_end);

auto const actual_size = std::distance(probe_indices_begin, probe_indices_end);
build_indices->resize(actual_size, stream);
probe_indices->resize(actual_size, stream);

Expand Down
10 changes: 2 additions & 8 deletions python/cudf/cudf/_lib/transform.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
from numba.np import numpy_support

import cudf
from cudf.core._internals.expressions import parse_expression
from cudf.core.buffer import acquire_spill_lock, as_buffer
from cudf.utils import cudautils

from pylibcudf cimport transform as plc_transform
from pylibcudf.expressions cimport Expression
from pylibcudf.libcudf.types cimport size_type

from cudf._lib.column cimport Column
Expand Down Expand Up @@ -93,7 +91,7 @@ def one_hot_encode(Column input_column, Column categories):


@acquire_spill_lock()
def compute_column(list columns, tuple column_names, expr: str):
def compute_column(list columns, tuple column_names, str expr):
"""Compute a new column by evaluating an expression on a set of columns.
Parameters
Expand All @@ -108,12 +106,8 @@ def compute_column(list columns, tuple column_names, expr: str):
expr : str
The expression to evaluate.
"""
visitor = parse_expression(expr, column_names)

# At the end, all the stack contains is the expression to evaluate.
cdef Expression cudf_expr = visitor.expression
result = plc_transform.compute_column(
plc.Table([col.to_pylibcudf(mode="read") for col in columns]),
cudf_expr,
plc.expressions.to_expression(expr, column_names),
)
return Column.from_pylibcudf(result)
Loading

0 comments on commit 76cefdd

Please sign in to comment.