Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/branch-24.12' into cudf/_lib/p…
Browse files Browse the repository at this point in the history
…artitioning
  • Loading branch information
mroeschke committed Nov 19, 2024
2 parents 3f36a89 + 56061bd commit 2584830
Show file tree
Hide file tree
Showing 17 changed files with 291 additions and 196 deletions.
1 change: 1 addition & 0 deletions ci/run_cudf_polars_polars_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ DESELECTED_TESTS=(
"tests/unit/streaming/test_streaming_sort.py::test_streaming_sort[True]" # relies on polars built in debug mode
"tests/unit/test_cpu_check.py::test_check_cpu_flags_skipped_no_flags" # Mock library error
"tests/docs/test_user_guide.py" # No dot binary in CI image
"tests/unit/test_polars_import.py::test_fork_safety" # test started to hang in polars-1.14
)

if [[ $(arch) == "aarch64" ]]; then
Expand Down
2 changes: 1 addition & 1 deletion conda/environments/all_cuda-118_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ dependencies:
- pandas
- pandas>=2.0,<2.2.4dev0
- pandoc
- polars>=1.11,<1.14
- polars>=1.11,<1.15
- pre-commit
- ptxcompiler
- pyarrow>=14.0.0,<19.0.0a0
Expand Down
2 changes: 1 addition & 1 deletion conda/environments/all_cuda-125_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ dependencies:
- pandas
- pandas>=2.0,<2.2.4dev0
- pandoc
- polars>=1.11,<1.14
- polars>=1.11,<1.15
- pre-commit
- pyarrow>=14.0.0,<19.0.0a0
- pydata-sphinx-theme!=0.14.2
Expand Down
2 changes: 1 addition & 1 deletion conda/recipes/cudf-polars/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ requirements:
run:
- python
- pylibcudf ={{ version }}
- polars >=1.11,<1.14
- polars >=1.11,<1.15
- {{ pin_compatible('cuda-version', max_pin='x', min_pin='x') }}

test:
Expand Down
5 changes: 2 additions & 3 deletions cpp/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,7 @@ ConfigureNVBench(

# ##################################################################################################
# * strings benchmark -------------------------------------------------------------------
ConfigureBench(
STRINGS_BENCH string/factory.cu string/repeat_strings.cpp string/replace.cpp string/url_decode.cu
)
ConfigureBench(STRINGS_BENCH string/factory.cu string/repeat_strings.cpp string/url_decode.cu)

ConfigureNVBench(
STRINGS_NVBENCH
Expand All @@ -380,6 +378,7 @@ ConfigureNVBench(
string/lengths.cpp
string/like.cpp
string/make_strings_column.cu
string/replace.cpp
string/replace_re.cpp
string/reverse.cpp
string/slice.cpp
Expand Down
87 changes: 38 additions & 49 deletions cpp/benchmarks/string/replace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@
* limitations under the License.
*/

#include "string_bench_args.hpp"

#include <benchmarks/common/generate_input.hpp>
#include <benchmarks/fixture/benchmark_fixture.hpp>
#include <benchmarks/synchronization/synchronization.hpp>

#include <cudf_test/column_wrapper.hpp>

Expand All @@ -27,59 +24,51 @@
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/utilities/default_stream.hpp>

#include <limits>

class StringReplace : public cudf::benchmark {};
#include <nvbench/nvbench.cuh>

enum replace_type { scalar, slice, multi };

static void BM_replace(benchmark::State& state, replace_type rt)
static void bench_replace(nvbench::state& state)
{
cudf::size_type const n_rows{static_cast<cudf::size_type>(state.range(0))};
cudf::size_type const max_str_length{static_cast<cudf::size_type>(state.range(1))};
auto const num_rows = static_cast<cudf::size_type>(state.get_int64("num_rows"));
auto const min_width = static_cast<cudf::size_type>(state.get_int64("min_width"));
auto const max_width = static_cast<cudf::size_type>(state.get_int64("max_width"));
auto const api = state.get_string("api");

data_profile const profile = data_profile_builder().distribution(
cudf::type_id::STRING, distribution_id::NORMAL, 0, max_str_length);
auto const column = create_random_column(cudf::type_id::STRING, row_count{n_rows}, profile);
cudf::strings_column_view input(column->view());
cudf::string_scalar target("+");
cudf::string_scalar repl("");
cudf::test::strings_column_wrapper targets({"+", "-"});
cudf::test::strings_column_wrapper repls({"", ""});
cudf::type_id::STRING, distribution_id::NORMAL, min_width, max_width);
auto const column = create_random_column(cudf::type_id::STRING, row_count{num_rows}, profile);

for (auto _ : state) {
cuda_event_timer raii(state, true, cudf::get_default_stream());
switch (rt) {
case scalar: cudf::strings::replace(input, target, repl); break;
case slice: cudf::strings::replace_slice(input, repl, 1, 10); break;
case multi:
cudf::strings::replace_multiple(
input, cudf::strings_column_view(targets), cudf::strings_column_view(repls));
break;
}
}
cudf::strings_column_view input(column->view());

state.SetBytesProcessed(state.iterations() * input.chars_size(cudf::get_default_stream()));
}
auto stream = cudf::get_default_stream();
state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value()));
auto const chars_size = input.chars_size(stream);
state.add_global_memory_reads<nvbench::int8_t>(chars_size);
state.add_global_memory_writes<nvbench::int8_t>(chars_size);

static void generate_bench_args(benchmark::internal::Benchmark* b)
{
int const min_rows = 1 << 12;
int const max_rows = 1 << 24;
int const row_mult = 8;
int const min_rowlen = 1 << 5;
int const max_rowlen = 1 << 13;
int const len_mult = 2;
generate_string_bench_args(b, min_rows, max_rows, row_mult, min_rowlen, max_rowlen, len_mult);
if (api == "scalar") {
cudf::string_scalar target("+");
cudf::string_scalar repl("-");
state.exec(nvbench::exec_tag::sync,
[&](nvbench::launch& launch) { cudf::strings::replace(input, target, repl); });
} else if (api == "multi") {
state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
cudf::test::strings_column_wrapper targets({"+", " "});
cudf::test::strings_column_wrapper repls({"-", "_"});
cudf::strings::replace_multiple(
input, cudf::strings_column_view(targets), cudf::strings_column_view(repls));
});
} else if (api == "slice") {
cudf::string_scalar repl("0123456789");
state.exec(nvbench::exec_tag::sync,
[&](nvbench::launch& launch) { cudf::strings::replace_slice(input, repl, 1, 10); });
}
}

#define STRINGS_BENCHMARK_DEFINE(name) \
BENCHMARK_DEFINE_F(StringReplace, name) \
(::benchmark::State & st) { BM_replace(st, replace_type::name); } \
BENCHMARK_REGISTER_F(StringReplace, name) \
->Apply(generate_bench_args) \
->UseManualTime() \
->Unit(benchmark::kMillisecond);

STRINGS_BENCHMARK_DEFINE(scalar)
STRINGS_BENCHMARK_DEFINE(slice)
STRINGS_BENCHMARK_DEFINE(multi)
NVBENCH_BENCH(bench_replace)
.set_name("replace")
.add_int64_axis("min_width", {0})
.add_int64_axis("max_width", {32, 64, 128, 256})
.add_int64_axis("num_rows", {32768, 262144, 2097152})
.add_string_axis("api", {"scalar", "multi", "slice"});
28 changes: 28 additions & 0 deletions cpp/include/cudf/io/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,8 @@ class json_writer_options_builder;
class json_writer_options {
// Specify the sink to use for writer output
sink_info _sink;
// Specify the compression format of the sink
compression_type _compression = compression_type::NONE;
// maximum number of rows to write in each chunk (limits memory use)
size_type _rows_per_chunk = std::numeric_limits<size_type>::max();
// Set of columns to output
Expand Down Expand Up @@ -1022,6 +1024,13 @@ class json_writer_options {
*/
[[nodiscard]] std::string const& get_na_rep() const { return _na_rep; }

/**
 * @brief Returns the compression type configured for the sink.
 *
 * The stored value is initialized to `compression_type::NONE` and is changed
 * through `set_compression()` or the options builder.
 *
 * @return Compression type for the sink
 */
[[nodiscard]] compression_type get_compression() const { return _compression; }

/**
* @brief Whether to output nulls as 'null'.
*
Expand Down Expand Up @@ -1066,6 +1075,13 @@ class json_writer_options {
*/
void set_table(table_view tbl) { _table = tbl; }

/**
 * @brief Sets the compression type to be used for the sink.
 *
 * Overwrites the previously stored value; the value read back by
 * `get_compression()` is the one passed here.
 *
 * @param comptype Compression type for the sink
 */
void set_compression(compression_type comptype) { _compression = comptype; }

/**
* @brief Sets metadata.
*
Expand Down Expand Up @@ -1153,6 +1169,18 @@ class json_writer_options_builder {
return *this;
}

/**
 * @brief Sets the compression type of the output sink.
 *
 * Stores the value directly in the options object being built.
 *
 * @param comptype Compression type to use for the sink
 * @return this for chaining
 */
json_writer_options_builder& compression(compression_type comptype)
{
options._compression = comptype;
return *this;
}

/**
* @brief Sets optional metadata (with column names).
*
Expand Down
19 changes: 7 additions & 12 deletions cpp/src/binaryop/compiled/binary_ops.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_view.hpp>
#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/unary.hpp>

#include <rmm/cuda_stream_view.hpp>
Expand Down Expand Up @@ -253,16 +253,11 @@ struct binary_op_double_device_dispatcher {
template <typename Functor>
CUDF_KERNEL void for_each_kernel(cudf::size_type size, Functor f)
{
int tid = threadIdx.x;
int blkid = blockIdx.x;
int blksz = blockDim.x;
int gridsz = gridDim.x;

int start = tid + blkid * blksz;
int step = blksz * gridsz;
auto start = cudf::detail::grid_1d::global_thread_id();
auto const stride = cudf::detail::grid_1d::grid_stride();

#pragma unroll
for (cudf::size_type i = start; i < size; i += step) {
for (auto i = start; i < size; i += stride) {
f(i);
}
}
Expand All @@ -282,9 +277,9 @@ void for_each(rmm::cuda_stream_view stream, cudf::size_type size, Functor f)
int min_grid_size;
CUDF_CUDA_TRY(
cudaOccupancyMaxPotentialBlockSize(&min_grid_size, &block_size, for_each_kernel<decltype(f)>));
// 2 elements per thread.
int const grid_size = util::div_rounding_up_safe(size, 2 * block_size);
for_each_kernel<<<grid_size, block_size, 0, stream.value()>>>(size, std::forward<Functor&&>(f));
auto grid = cudf::detail::grid_1d(size, block_size, 2 /* elements_per_thread */);
for_each_kernel<<<grid.num_blocks, grid.num_threads_per_block, 0, stream.value()>>>(
size, std::forward<Functor&&>(f));
}

template <class BinaryOperator>
Expand Down
11 changes: 4 additions & 7 deletions cpp/src/io/comp/comp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <zlib.h> // compress
#include <zlib.h> // GZIP compression

namespace cudf::io::detail {

Expand Down Expand Up @@ -77,12 +77,12 @@ std::vector<std::uint8_t> compress_snappy(host_span<uint8_t const> src,
{
auto const d_src =
cudf::detail::make_device_uvector_async(src, stream, cudf::get_current_device_resource_ref());
rmm::device_uvector<uint8_t> d_dst(src.size(), stream);

cudf::detail::hostdevice_vector<device_span<uint8_t const>> inputs(1, stream);
inputs[0] = d_src;
inputs.host_to_device_async(stream);

auto dst_size = compress_max_output_chunk_size(nvcomp::compression_type::SNAPPY, src.size());
rmm::device_uvector<uint8_t> d_dst(dst_size, stream);
cudf::detail::hostdevice_vector<device_span<uint8_t>> outputs(1, stream);
outputs[0] = d_dst;
outputs.host_to_device_async(stream);
Expand All @@ -93,13 +93,10 @@ std::vector<std::uint8_t> compress_snappy(host_span<uint8_t const> src,

nvcomp::batched_compress(nvcomp::compression_type::SNAPPY, inputs, outputs, hd_status, stream);

stream.synchronize();
hd_status.device_to_host_sync(stream);
CUDF_EXPECTS(hd_status[0].status == cudf::io::compression_status::SUCCESS,
"snappy compression failed");
std::vector<uint8_t> dst(d_dst.size());
cudf::detail::cuda_memcpy(host_span<uint8_t>{dst}, device_span<uint8_t const>{d_dst}, stream);
return dst;
return cudf::detail::make_std_vector_sync<uint8_t>(d_dst, stream);
}

} // namespace
Expand Down
29 changes: 25 additions & 4 deletions cpp/src/io/json/write_json.cu
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* @brief cuDF-IO JSON writer implementation
*/

#include "io/comp/comp.hpp"
#include "io/csv/durations.hpp"
#include "io/utilities/parsing_utils.cuh"
#include "lists/utilities.hpp"
Expand Down Expand Up @@ -828,10 +829,10 @@ void write_chunked(data_sink* out_sink,
}
}

void write_json(data_sink* out_sink,
table_view const& table,
json_writer_options const& options,
rmm::cuda_stream_view stream)
void write_json_uncompressed(data_sink* out_sink,
table_view const& table,
json_writer_options const& options,
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
std::vector<column_name_info> user_column_names = [&]() {
Expand Down Expand Up @@ -934,4 +935,24 @@ void write_json(data_sink* out_sink,
}
}

/**
 * @brief Writes `table` as JSON to `out_sink`, compressing the output when the
 * options request a compression type other than `compression_type::NONE`.
 *
 * Without compression the JSON is written straight to `out_sink`. With
 * compression the JSON is first written in full to a temporary host buffer,
 * that buffer is compressed in one call, and the compressed bytes are written
 * to `out_sink` with a single `host_write`.
 *
 * @param out_sink Sink that receives the (possibly compressed) output
 * @param table Table to serialize
 * @param options Writer options, including the compression type
 * @param stream CUDA stream passed to the writer and to the compressor
 */
void write_json(data_sink* out_sink,
                table_view const& table,
                json_writer_options const& options,
                rmm::cuda_stream_view stream)
{
  // No compression requested: write directly into the caller's sink.
  if (options.get_compression() == compression_type::NONE) {
    write_json_uncompressed(out_sink, table, options, stream);
    return;
  }

  // Collect the complete uncompressed JSON text in a host-side vector.
  std::vector<char> json_text;
  auto staging_sink = data_sink::create(&json_text);
  write_json_uncompressed(staging_sink.get(), table, options, stream);

  // Wait for work queued on `stream` before handing the buffer to the compressor.
  stream.synchronize();

  // The compressor takes a byte span, so view the char buffer as uint8_t.
  auto* const text_bytes = reinterpret_cast<uint8_t*>(json_text.data());
  auto compressed        = cudf::io::detail::compress(
    options.get_compression(), host_span<uint8_t>(text_bytes, json_text.size()), stream);

  out_sink->host_write(compressed.data(), compressed.size());
}

} // namespace cudf::io::json::detail
Loading

0 comments on commit 2584830

Please sign in to comment.