Skip to content

Commit

Permalink
Merge branch 'branch-24.12' into plc/io/parquet_writer_only
Browse files Browse the repository at this point in the history
  • Loading branch information
wence- authored Nov 19, 2024
2 parents 010c1da + 384abae commit da6b730
Show file tree
Hide file tree
Showing 61 changed files with 1,487 additions and 917 deletions.
6 changes: 0 additions & 6 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ jobs:
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
with:
# This selects "ARCH=amd64 + the latest supported Python + CUDA".
matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))]))
build_type: nightly
branch: ${{ inputs.branch }}
date: ${{ inputs.date }}
Expand Down Expand Up @@ -153,8 +151,6 @@ jobs:
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
with:
# This selects "ARCH=amd64 + the latest supported Python + CUDA".
matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))]))
build_type: nightly
branch: ${{ inputs.branch }}
date: ${{ inputs.date }}
Expand All @@ -164,8 +160,6 @@ jobs:
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
with:
# This selects "ARCH=amd64 + the latest supported Python + CUDA".
matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))]))
build_type: nightly
branch: ${{ inputs.branch }}
date: ${{ inputs.date }}
Expand Down
19 changes: 19 additions & 0 deletions ci/run_cudf_polars_polars_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,27 @@ DESELECTED_TESTS=(
"tests/unit/streaming/test_streaming_sort.py::test_streaming_sort[True]" # relies on polars built in debug mode
"tests/unit/test_cpu_check.py::test_check_cpu_flags_skipped_no_flags" # Mock library error
"tests/docs/test_user_guide.py" # No dot binary in CI image
"tests/unit/test_polars_import.py::test_fork_safety" # test started to hang in polars-1.14
)

# Architecture-specific test deselection: some polars tests cannot run on
# aarch64 CI runners, and on x86_64 the TPC-H benchmark must be skipped when
# the bundled dbgen binary needs a newer glibc than the CI image provides.
if [[ $(arch) == "aarch64" ]]; then
  # The binary used for TPC-H generation is compiled for x86_64, not aarch64.
  DESELECTED_TESTS+=("tests/benchmark/test_pdsh.py::test_pdsh")
  # The connectorx package is not available on arm
  DESELECTED_TESTS+=("tests/unit/io/database/test_read.py::test_read_database")
  # The necessary timezone information cannot be found in our CI image.
  DESELECTED_TESTS+=("tests/unit/io/test_parquet.py::test_parametric_small_page_mask_filtering")
  DESELECTED_TESTS+=("tests/unit/testing/test_assert_series_equal.py::test_assert_series_equal_parametric")
  DESELECTED_TESTS+=("tests/unit/operations/test_join.py::test_join_4_columns_with_validity")
else
  # Ensure that we don't run dbgen when it uses newer symbols than supported by the glibc version in the CI image.
  # Extract the glibc minor version from e.g. "ldd (GNU libc) 2.31" -> "31".
  glibc_minor_version=$(ldd --version | head -1 | grep -o "[0-9]\.[0-9]\+" | tail -1 | cut -d '.' -f2)
  # Highest GLIBC_x.y symbol version referenced by the dbgen binary, minor part only.
  latest_glibc_symbol_found=$(nm py-polars/tests/benchmark/data/pdsh/dbgen/dbgen | grep GLIBC | grep -o "[0-9]\.[0-9]\+" | sort --version-sort | tail -1 | cut -d "." -f 2)
  if [[ ${glibc_minor_version} -lt ${latest_glibc_symbol_found} ]]; then
    DESELECTED_TESTS+=("tests/benchmark/test_pdsh.py::test_pdsh")
  fi
fi

DESELECTED_TESTS=$(printf -- " --deselect %s" "${DESELECTED_TESTS[@]}")
python -m pytest \
--import-mode=importlib \
Expand Down
2 changes: 1 addition & 1 deletion conda/environments/all_cuda-118_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ dependencies:
- pandas
- pandas>=2.0,<2.2.4dev0
- pandoc
- polars>=1.11,<1.14
- polars>=1.11,<1.15
- pre-commit
- ptxcompiler
- pyarrow>=14.0.0,<19.0.0a0
Expand Down
2 changes: 1 addition & 1 deletion conda/environments/all_cuda-125_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ dependencies:
- pandas
- pandas>=2.0,<2.2.4dev0
- pandoc
- polars>=1.11,<1.14
- polars>=1.11,<1.15
- pre-commit
- pyarrow>=14.0.0,<19.0.0a0
- pydata-sphinx-theme!=0.14.2
Expand Down
2 changes: 1 addition & 1 deletion conda/recipes/cudf-polars/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ requirements:
run:
- python
- pylibcudf ={{ version }}
- polars >=1.11,<1.14
- polars >=1.11,<1.15
- {{ pin_compatible('cuda-version', max_pin='x', min_pin='x') }}

test:
Expand Down
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ add_library(
src/io/avro/avro_gpu.cu
src/io/avro/reader_impl.cu
src/io/comp/brotli_dict.cpp
src/io/comp/comp.cpp
src/io/comp/cpu_unbz2.cpp
src/io/comp/debrotli.cu
src/io/comp/gpuinflate.cu
Expand Down
4 changes: 2 additions & 2 deletions cpp/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,7 @@ ConfigureNVBench(
# ##################################################################################################
# * strings benchmark -------------------------------------------------------------------
ConfigureBench(
STRINGS_BENCH string/factory.cu string/repeat_strings.cpp string/replace.cpp string/translate.cpp
string/url_decode.cu
STRINGS_BENCH string/factory.cu string/repeat_strings.cpp string/replace.cpp string/url_decode.cu
)

ConfigureNVBench(
Expand Down Expand Up @@ -386,6 +385,7 @@ ConfigureNVBench(
string/slice.cpp
string/split.cpp
string/split_re.cpp
string/translate.cpp
)

# ##################################################################################################
Expand Down
66 changes: 25 additions & 41 deletions cpp/benchmarks/string/translate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,32 @@
* limitations under the License.
*/

#include "string_bench_args.hpp"

#include <benchmarks/common/generate_input.hpp>
#include <benchmarks/fixture/benchmark_fixture.hpp>
#include <benchmarks/synchronization/synchronization.hpp>

#include <cudf_test/column_wrapper.hpp>

#include <cudf/strings/strings_column_view.hpp>
#include <cudf/strings/translate.hpp>
#include <cudf/utilities/default_stream.hpp>

#include <thrust/iterator/counting_iterator.h>

#include <algorithm>
#include <nvbench/nvbench.cuh>

class StringTranslate : public cudf::benchmark {};
#include <algorithm>
#include <vector>

using entry_type = std::pair<cudf::char_utf8, cudf::char_utf8>;

static void BM_translate(benchmark::State& state, int entry_count)
static void bench_translate(nvbench::state& state)
{
cudf::size_type const n_rows{static_cast<cudf::size_type>(state.range(0))};
cudf::size_type const max_str_length{static_cast<cudf::size_type>(state.range(1))};
auto const num_rows = static_cast<cudf::size_type>(state.get_int64("num_rows"));
auto const min_width = static_cast<cudf::size_type>(state.get_int64("min_width"));
auto const max_width = static_cast<cudf::size_type>(state.get_int64("max_width"));
auto const entry_count = static_cast<cudf::size_type>(state.get_int64("entries"));

data_profile const profile = data_profile_builder().distribution(
cudf::type_id::STRING, distribution_id::NORMAL, 0, max_str_length);
auto const column = create_random_column(cudf::type_id::STRING, row_count{n_rows}, profile);
cudf::strings_column_view input(column->view());
cudf::type_id::STRING, distribution_id::NORMAL, min_width, max_width);
auto const column = create_random_column(cudf::type_id::STRING, row_count{num_rows}, profile);
auto const input = cudf::strings_column_view(column->view());

std::vector<entry_type> entries(entry_count);
std::transform(thrust::counting_iterator<int>(0),
Expand All @@ -51,33 +49,19 @@ static void BM_translate(benchmark::State& state, int entry_count)
return entry_type{'!' + idx, '~' - idx};
});

for (auto _ : state) {
cuda_event_timer raii(state, true, cudf::get_default_stream());
cudf::strings::translate(input, entries);
}
auto stream = cudf::get_default_stream();
state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value()));
auto chars_size = input.chars_size(stream);
state.add_global_memory_reads<nvbench::int8_t>(chars_size);
state.add_global_memory_writes<nvbench::int8_t>(chars_size);

state.SetBytesProcessed(state.iterations() * input.chars_size(cudf::get_default_stream()));
state.exec(nvbench::exec_tag::sync,
[&](nvbench::launch& launch) { cudf::strings::translate(input, entries); });
}

static void generate_bench_args(benchmark::internal::Benchmark* b)
{
int const min_rows = 1 << 12;
int const max_rows = 1 << 24;
int const row_mult = 8;
int const min_rowlen = 1 << 5;
int const max_rowlen = 1 << 13;
int const len_mult = 4;
generate_string_bench_args(b, min_rows, max_rows, row_mult, min_rowlen, max_rowlen, len_mult);
}

#define STRINGS_BENCHMARK_DEFINE(name, entries) \
BENCHMARK_DEFINE_F(StringTranslate, name) \
(::benchmark::State & st) { BM_translate(st, entries); } \
BENCHMARK_REGISTER_F(StringTranslate, name) \
->Apply(generate_bench_args) \
->UseManualTime() \
->Unit(benchmark::kMillisecond);

STRINGS_BENCHMARK_DEFINE(translate_small, 5)
STRINGS_BENCHMARK_DEFINE(translate_medium, 25)
STRINGS_BENCHMARK_DEFINE(translate_large, 50)
NVBENCH_BENCH(bench_translate)
.set_name("translate")
.add_int64_axis("min_width", {0})
.add_int64_axis("max_width", {32, 64, 128, 256})
.add_int64_axis("num_rows", {32768, 262144, 2097152})
.add_int64_axis("entries", {5, 25, 50});
28 changes: 28 additions & 0 deletions cpp/include/cudf/io/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,8 @@ class json_writer_options_builder;
class json_writer_options {
// Specify the sink to use for writer output
sink_info _sink;
// Specify the compression format of the sink
compression_type _compression = compression_type::NONE;
// maximum number of rows to write in each chunk (limits memory use)
size_type _rows_per_chunk = std::numeric_limits<size_type>::max();
// Set of columns to output
Expand Down Expand Up @@ -1022,6 +1024,13 @@ class json_writer_options {
*/
[[nodiscard]] std::string const& get_na_rep() const { return _na_rep; }

/**
 * @brief Returns compression type used for sink
 *
 * Defaults to `compression_type::NONE` when not explicitly set.
 *
 * @return compression type for sink
 */
[[nodiscard]] compression_type get_compression() const { return _compression; }

/**
* @brief Whether to output nulls as 'null'.
*
Expand Down Expand Up @@ -1066,6 +1075,13 @@ class json_writer_options {
*/
void set_table(table_view tbl) { _table = tbl; }

/**
 * @brief Sets compression type to be used
 *
 * The writer applies this compression to the output sink; the default is
 * `compression_type::NONE` (uncompressed output).
 *
 * @param comptype Compression type for sink
 */
void set_compression(compression_type comptype) { _compression = comptype; }

/**
* @brief Sets metadata.
*
Expand Down Expand Up @@ -1153,6 +1169,18 @@ class json_writer_options_builder {
return *this;
}

/**
 * @brief Sets compression type of output sink
 *
 * Overrides the default of `compression_type::NONE`.
 *
 * @param comptype Compression type used
 * @return this for chaining
 */
json_writer_options_builder& compression(compression_type comptype)
{
  options._compression = comptype;
  return *this;
}

/**
* @brief Sets optional metadata (with column names).
*
Expand Down
116 changes: 116 additions & 0 deletions cpp/src/io/comp/comp.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "comp.hpp"

#include "io/utilities/hostdevice_vector.hpp"
#include "nvcomp_adapter.hpp"

#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/utilities/error.hpp>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <zlib.h> // GZIP compression

namespace cudf::io::detail {

namespace {

/**
 * @brief GZIP host compressor (includes header)
 *
 * Single-shot host-side DEFLATE of the whole input buffer using zlib, framed
 * with the GZIP header/trailer.
 *
 * @param src Host buffer to compress
 * @return GZIP-compressed bytes
 * @throws cudf::logic_error on any zlib failure
 */
std::vector<std::uint8_t> compress_gzip(host_span<uint8_t const> src)
{
  z_stream zs{};  // value-init: zalloc/zfree/opaque are Z_NULL, all counters zeroed
  zs.avail_in = static_cast<uInt>(src.size());
  // zlib's next_in is not const-qualified, but deflate() does not modify the input
  zs.next_in = const_cast<Bytef*>(reinterpret_cast<Bytef const*>(src.data()));

  // windowbits 15 (maximum window) ORed with 16 selects the GZIP wrapper
  // instead of raw DEFLATE or zlib framing
  constexpr int windowbits    = 15;
  constexpr int gzip_encoding = 16;
  int ret = deflateInit2(
    &zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, windowbits | gzip_encoding, 8, Z_DEFAULT_STRATEGY);
  CUDF_EXPECTS(ret == Z_OK, "GZIP DEFLATE compression initialization failed.");

  // deflateBound() returns an upper bound (uLong) on the compressed size for a
  // single deflate() call; sizing the output to it guarantees Z_STREAM_END below
  auto const estcomplen = deflateBound(&zs, zs.avail_in);
  std::vector<uint8_t> dst(estcomplen);
  zs.avail_out = static_cast<uInt>(estcomplen);
  zs.next_out  = dst.data();

  ret = deflate(&zs, Z_FINISH);
  if (ret != Z_STREAM_END) {
    deflateEnd(&zs);  // release zlib's internal state before throwing to avoid a leak
    CUDF_FAIL("GZIP DEFLATE compression failed due to insufficient space!");
  }
  // next_out points one past the last written byte; shrink to the actual size
  dst.resize(std::distance(dst.data(), zs.next_out));

  ret = deflateEnd(&zs);
  CUDF_EXPECTS(ret == Z_OK, "GZIP DEFLATE compression failed at deallocation");

  return dst;
}

/**
 * @brief SNAPPY device compressor
 *
 * Copies the host buffer to the device, compresses it as a single chunk via
 * nvCOMP's batched SNAPPY API, and copies the result back to a host vector.
 * All device work is ordered on @p stream; the only synchronization points are
 * the status read-back and the final result copy.
 *
 * @param src Host buffer to compress
 * @param stream CUDA stream on which to order device work
 * @return SNAPPY-compressed bytes on the host
 */
std::vector<std::uint8_t> compress_snappy(host_span<uint8_t const> src,
                                          rmm::cuda_stream_view stream)
{
  // Stage the input on the device (async; ordered on `stream`)
  auto const d_src =
    cudf::detail::make_device_uvector_async(src, stream, cudf::get_current_device_resource_ref());
  // Batched API takes spans of chunks; here the batch is a single chunk
  cudf::detail::hostdevice_vector<device_span<uint8_t const>> inputs(1, stream);
  inputs[0] = d_src;
  inputs.host_to_device_async(stream);

  // Worst-case output size for one SNAPPY chunk of src.size() bytes
  auto dst_size = compress_max_output_chunk_size(nvcomp::compression_type::SNAPPY, src.size());
  rmm::device_uvector<uint8_t> d_dst(dst_size, stream);
  cudf::detail::hostdevice_vector<device_span<uint8_t>> outputs(1, stream);
  outputs[0] = d_dst;
  outputs.host_to_device_async(stream);

  // Per-chunk status, zero-initialized before the kernel fills it in
  cudf::detail::hostdevice_vector<cudf::io::compression_result> hd_status(1, stream);
  hd_status[0] = {};
  hd_status.host_to_device_async(stream);

  nvcomp::batched_compress(nvcomp::compression_type::SNAPPY, inputs, outputs, hd_status, stream);

  // Sync to inspect the per-chunk result before touching the output
  hd_status.device_to_host_sync(stream);
  CUDF_EXPECTS(hd_status[0].status == cudf::io::compression_status::SUCCESS,
               "snappy compression failed");
  // NOTE(review): returns the full worst-case-sized buffer, not just the
  // compressed prefix — presumably the caller relies on the SNAPPY framing to
  // find the payload end; confirm against the sink writer.
  return cudf::detail::make_std_vector_sync<uint8_t>(d_dst, stream);
}

} // namespace

/**
 * @brief Compresses a host buffer with the requested codec.
 *
 * Dispatches to the host GZIP path or the device SNAPPY path.
 *
 * @param compression Codec to apply
 * @param src Host buffer to compress
 * @param stream CUDA stream used by device-side codecs
 * @return Compressed bytes on the host
 * @throws cudf::logic_error for unsupported codecs
 */
std::vector<std::uint8_t> compress(compression_type compression,
                                   host_span<uint8_t const> src,
                                   rmm::cuda_stream_view stream)
{
  CUDF_FUNC_RANGE();
  if (compression == compression_type::GZIP) { return compress_gzip(src); }
  if (compression == compression_type::SNAPPY) { return compress_snappy(src, stream); }
  CUDF_FAIL("Unsupported compression type");
}

} // namespace cudf::io::detail
Loading

0 comments on commit da6b730

Please sign in to comment.