diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index 3bf9d02b384..6c5f4a68a4c 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -337,7 +337,7 @@ ConfigureBench(TEXT_BENCH text/ngrams.cpp text/subword.cpp) ConfigureNVBench( TEXT_NVBENCH text/edit_distance.cpp text/hash_ngrams.cpp text/jaccard.cpp text/minhash.cpp - text/normalize.cpp text/replace.cpp text/tokenize.cpp text/vocab.cpp + text/normalize.cpp text/replace.cpp text/tokenize.cpp text/vocab.cpp text/word_minhash.cpp ) # ################################################################################################## diff --git a/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp index 3abd4280081..7121cb9f034 100644 --- a/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp +++ b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp @@ -50,7 +50,7 @@ std::string get_label(std::string const& test_name, nvbench::state const& state) } std::tuple, size_t, size_t> write_file_data( - nvbench::state& state, std::vector const& d_types) + nvbench::state& state, std::vector const& d_types, io_type io_source_type) { cudf::size_type const cardinality = state.get_int64("cardinality"); cudf::size_type const run_length = state.get_int64("run_length"); @@ -63,7 +63,7 @@ std::tuple, size_t, size_t> write_file_data( size_t total_file_size = 0; for (size_t i = 0; i < num_files; ++i) { - cuio_source_sink_pair source_sink{io_type::HOST_BUFFER}; + cuio_source_sink_pair source_sink{io_source_type}; auto const tbl = create_random_table( cycle_dtypes(d_types, num_cols), @@ -92,11 +92,13 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state, { size_t const data_size = state.get_int64("total_data_size"); auto const num_threads = state.get_int64("num_threads"); + auto const source_type = retrieve_io_type_enum(state.get_string("io_type")); auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads); BS::thread_pool threads(num_threads); - auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types); + auto [source_sink_vector, total_file_size, num_files] = + write_file_data(state, d_types, source_type); std::vector source_info_vector; std::transform(source_sink_vector.begin(), source_sink_vector.end(), @@ -173,10 +175,12 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state, auto const num_threads = state.get_int64("num_threads"); size_t const input_limit = state.get_int64("input_limit"); size_t const output_limit = state.get_int64("output_limit"); + auto const source_type = retrieve_io_type_enum(state.get_string("io_type")); auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads); BS::thread_pool threads(num_threads); - auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types); + auto [source_sink_vector, total_file_size, num_files] = + write_file_data(state, d_types, source_type); std::vector source_info_vector; std::transform(source_sink_vector.begin(), source_sink_vector.end(), @@ -264,7 +268,8 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_mixed) .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024}) .add_int64_axis("num_threads", {1, 2, 4, 8}) .add_int64_axis("num_cols", {4}) - .add_int64_axis("run_length", {8}); + .add_int64_axis("run_length", {8}) + .add_string_axis("io_type", {"PINNED_BUFFER"}); NVBENCH_BENCH(BM_parquet_multithreaded_read_fixed_width) .set_name("parquet_multithreaded_read_decode_fixed_width") @@ -273,7 +278,8 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_fixed_width) .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024}) .add_int64_axis("num_threads", {1, 2, 4, 8}) .add_int64_axis("num_cols", {4}) - .add_int64_axis("run_length", {8}); + .add_int64_axis("run_length", {8}) + .add_string_axis("io_type", {"PINNED_BUFFER"}); NVBENCH_BENCH(BM_parquet_multithreaded_read_string) .set_name("parquet_multithreaded_read_decode_string") @@ -282,7 +288,8 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_string) .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024}) .add_int64_axis("num_threads", {1, 2, 4, 8}) .add_int64_axis("num_cols", {4}) - .add_int64_axis("run_length", {8}); + .add_int64_axis("run_length", {8}) + .add_string_axis("io_type", {"PINNED_BUFFER"}); NVBENCH_BENCH(BM_parquet_multithreaded_read_list) .set_name("parquet_multithreaded_read_decode_list") @@ -291,7 +298,8 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_list) .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024}) .add_int64_axis("num_threads", {1, 2, 4, 8}) .add_int64_axis("num_cols", {4}) - .add_int64_axis("run_length", {8}); + .add_int64_axis("run_length", {8}) + .add_string_axis("io_type", {"PINNED_BUFFER"}); // mixed data types: fixed width, strings NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_mixed) @@ -303,7 +311,8 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_mixed) .add_int64_axis("num_cols", {4}) .add_int64_axis("run_length", {8}) .add_int64_axis("input_limit", {640 * 1024 * 1024}) - .add_int64_axis("output_limit", {640 * 1024 * 1024}); + .add_int64_axis("output_limit", {640 * 1024 * 1024}) + .add_string_axis("io_type", {"PINNED_BUFFER"}); NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_fixed_width) .set_name("parquet_multithreaded_read_decode_chunked_fixed_width") @@ -314,7 +323,8 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_fixed_width) .add_int64_axis("num_cols", {4}) .add_int64_axis("run_length", {8}) .add_int64_axis("input_limit", {640 * 1024 * 1024}) - .add_int64_axis("output_limit", {640 * 1024 * 1024}); + .add_int64_axis("output_limit", {640 * 1024 * 1024}) + .add_string_axis("io_type", {"PINNED_BUFFER"}); NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_string) .set_name("parquet_multithreaded_read_decode_chunked_string") @@ -325,7 +335,8 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_string) .add_int64_axis("num_cols", {4}) .add_int64_axis("run_length", {8}) .add_int64_axis("input_limit", {640 * 1024 * 1024}) - .add_int64_axis("output_limit", {640 * 1024 * 1024}); + .add_int64_axis("output_limit", {640 * 1024 * 1024}) + .add_string_axis("io_type", {"PINNED_BUFFER"}); NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_list) .set_name("parquet_multithreaded_read_decode_chunked_list") @@ -336,4 +347,5 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_list) .add_int64_axis("num_cols", {4}) .add_int64_axis("run_length", {8}) .add_int64_axis("input_limit", {640 * 1024 * 1024}) - .add_int64_axis("output_limit", {640 * 1024 * 1024}); + .add_int64_axis("output_limit", {640 * 1024 * 1024}) + .add_string_axis("io_type", {"PINNED_BUFFER"}); diff --git a/cpp/benchmarks/text/word_minhash.cpp b/cpp/benchmarks/text/word_minhash.cpp new file mode 100644 index 00000000000..adc3dddc59c --- /dev/null +++ b/cpp/benchmarks/text/word_minhash.cpp @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include +#include + +#include + +#include + +#include + +static void bench_word_minhash(nvbench::state& state) +{ + auto const num_rows = static_cast(state.get_int64("num_rows")); + auto const row_width = static_cast(state.get_int64("row_width")); + auto const seed_count = static_cast(state.get_int64("seed_count")); + auto const base64 = state.get_int64("hash_type") == 64; + + data_profile const strings_profile = + data_profile_builder().distribution(cudf::type_id::STRING, distribution_id::NORMAL, 0, 5); + auto strings_table = + create_random_table({cudf::type_id::STRING}, row_count{num_rows}, strings_profile); + + auto const num_offsets = (num_rows / row_width) + 1; + auto offsets = cudf::sequence(num_offsets, + cudf::numeric_scalar(0), + cudf::numeric_scalar(row_width)); + + auto source = cudf::make_lists_column(num_offsets - 1, + std::move(offsets), + std::move(strings_table->release().front()), + 0, + rmm::device_buffer{}); + + data_profile const seeds_profile = data_profile_builder().no_validity().distribution( + cudf::type_to_id(), distribution_id::NORMAL, 0, 256); + auto const seed_type = base64 ? cudf::type_id::UINT64 : cudf::type_id::UINT32; + auto const seeds_table = create_random_table({seed_type}, row_count{seed_count}, seeds_profile); + auto seeds = seeds_table->get_column(0); + + state.set_cuda_stream(nvbench::make_cuda_stream_view(cudf::get_default_stream().value())); + + cudf::strings_column_view input(cudf::lists_column_view(source->view()).child()); + auto chars_size = input.chars_size(cudf::get_default_stream()); + state.add_global_memory_reads(chars_size); + state.add_global_memory_writes(num_rows); // output are hashes + + state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) { + auto result = base64 ? nvtext::word_minhash64(source->view(), seeds.view()) + : nvtext::word_minhash(source->view(), seeds.view()); + }); +} + +NVBENCH_BENCH(bench_word_minhash) + .set_name("word_minhash") + .add_int64_axis("num_rows", {131072, 262144, 524288, 1048576, 2097152}) + .add_int64_axis("row_width", {10, 100, 1000}) + .add_int64_axis("seed_count", {2, 25}) + .add_int64_axis("hash_type", {32, 64}); diff --git a/cpp/doxygen/regex.md b/cpp/doxygen/regex.md index 8d206f245dc..6d1c91a5752 100644 --- a/cpp/doxygen/regex.md +++ b/cpp/doxygen/regex.md @@ -17,6 +17,12 @@ The details are based on features documented at https://www.regular-expressions. **Note:** The alternation character is the pipe character `|` and not the character included in the tables on this page. There is an issue including the pipe character inside the table markdown that is rendered by doxygen. +By default, only the `\n` character is recognized as a line break. The [cudf::strings::regex_flags::EXT_NEWLINE](@ref cudf::strings::regex_flags) increases the set of line break characters to include: +- Paragraph separator (Unicode: `2029`, UTF-8: `E280A9`) +- Line separator (Unicode: `2028`, UTF-8: `E280A8`) +- Next line (Unicode: `0085`, UTF-8: `C285`) +- Carriage return (Unicode: `000D`, UTF-8: `0D`) + **Invalid regex patterns will result in undefined behavior**. This includes but is not limited to the following: - Unescaped special characters (listed in the third row of the Characters table below) when they are intended to match as literals. - Unmatched paired special characters like `()`, `[]`, and `{}`. diff --git a/cpp/include/cudf/strings/regex/flags.hpp b/cpp/include/cudf/strings/regex/flags.hpp index f7108129dee..4f3fc7086f2 100644 --- a/cpp/include/cudf/strings/regex/flags.hpp +++ b/cpp/include/cudf/strings/regex/flags.hpp @@ -35,10 +35,11 @@ namespace strings { * and to match the Python flag values. */ enum regex_flags : uint32_t { - DEFAULT = 0, ///< default - MULTILINE = 8, ///< the '^' and '$' honor new-line characters - DOTALL = 16, ///< the '.' matching includes new-line characters - ASCII = 256 ///< use only ASCII when matching built-in character classes + DEFAULT = 0, ///< default + MULTILINE = 8, ///< the '^' and '$' honor new-line characters + DOTALL = 16, ///< the '.' matching includes new-line characters + ASCII = 256, ///< use only ASCII when matching built-in character classes + EXT_NEWLINE = 512 ///< new-line matches extended characters }; /** @@ -74,6 +75,17 @@ constexpr bool is_ascii(regex_flags const f) return (f & regex_flags::ASCII) == regex_flags::ASCII; } +/** + * @brief Returns true if the given flags contain EXT_NEWLINE + * + * @param f Regex flags to check + * @return true if `f` includes EXT_NEWLINE + */ +constexpr bool is_ext_newline(regex_flags const f) +{ + return (f & regex_flags::EXT_NEWLINE) == regex_flags::EXT_NEWLINE; +} + /** * @brief Capture groups setting * diff --git a/cpp/include/cudf/strings/string_view.cuh b/cpp/include/cudf/strings/string_view.cuh index abb26d7ccb4..14695c3bb27 100644 --- a/cpp/include/cudf/strings/string_view.cuh +++ b/cpp/include/cudf/strings/string_view.cuh @@ -191,9 +191,14 @@ __device__ inline string_view::const_iterator& string_view::const_iterator::oper __device__ inline string_view::const_iterator& string_view::const_iterator::operator--() { - if (byte_pos > 0) - while (strings::detail::bytes_in_utf8_byte(static_cast(p[--byte_pos])) == 0) - ; + if (byte_pos > 0) { + if (byte_pos == char_pos) { + --byte_pos; + } else { + while (strings::detail::bytes_in_utf8_byte(static_cast(p[--byte_pos])) == 0) + ; + } + } --char_pos; return *this; } diff --git a/cpp/include/nvtext/minhash.hpp b/cpp/include/nvtext/minhash.hpp index c83a4260c19..7c909f1a948 100644 --- a/cpp/include/nvtext/minhash.hpp +++ b/cpp/include/nvtext/minhash.hpp @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -72,7 +73,7 @@ std::unique_ptr minhash( * * @throw std::invalid_argument if the width < 2 * @throw std::invalid_argument if seeds is empty - * @throw std::overflow_error if `seeds * input.size()` exceeds the column size limit + * @throw std::overflow_error if `seeds.size() * input.size()` exceeds the column size limit * * @param input Strings column to compute minhash * @param seeds Seed values used for the hash algorithm @@ -133,7 +134,7 @@ std::unique_ptr minhash64( * * @throw std::invalid_argument if the width < 2 * @throw std::invalid_argument if seeds is empty - * @throw std::overflow_error if `seeds * input.size()` exceeds the column size limit + * @throw std::overflow_error if `seeds.size() * input.size()` exceeds the column size limit * * @param input Strings column to compute minhash * @param seeds Seed values used for the hash algorithm @@ -150,5 +151,61 @@ std::unique_ptr minhash64( rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); +/** + * @brief Returns the minhash values for each row of strings per seed + * + * Hash values are computed from each string in each row and the + * minimum hash value is returned for each row for each seed. + * Each row of the output list column are seed results for the corresponding + * input row. The order of the elements in each row match the order of + * the seeds provided in the `seeds` parameter. + * + * This function uses MurmurHash3_x86_32 for the hash algorithm. + * + * Any null row entries result in corresponding null output rows. + * + * @throw std::invalid_argument if seeds is empty + * @throw std::overflow_error if `seeds.size() * input.size()` exceeds the column size limit + * + * @param input Lists column of strings to compute minhash + * @param seeds Seed values used for the hash algorithm + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the returned column's device memory + * @return List column of minhash values for each string per seed + */ +std::unique_ptr word_minhash( + cudf::lists_column_view const& input, + cudf::device_span seeds, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); + +/** + * @brief Returns the minhash values for each row of strings per seed + * + * Hash values are computed from each string in each row and the + * minimum hash value is returned for each row for each seed. + * Each row of the output list column are seed results for the corresponding + * input row. The order of the elements in each row match the order of + * the seeds provided in the `seeds` parameter. + * + * This function uses MurmurHash3_x64_128 for the hash algorithm though + * only the first 64-bits of the hash are used in computing the output. + * + * Any null row entries result in corresponding null output rows. + * + * @throw std::invalid_argument if seeds is empty + * @throw std::overflow_error if `seeds.size() * input.size()` exceeds the column size limit + * + * @param input Lists column of strings to compute minhash + * @param seeds Seed values used for the hash algorithm + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the returned column's device memory + * @return List column of minhash values for each string per seed + */ +std::unique_ptr word_minhash64( + cudf::lists_column_view const& input, + cudf::device_span seeds, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @} */ // end of group } // namespace CUDF_EXPORT nvtext diff --git a/cpp/src/strings/regex/regcomp.cpp b/cpp/src/strings/regex/regcomp.cpp index adf650a4f27..7c4c89bd3fb 100644 --- a/cpp/src/strings/regex/regcomp.cpp +++ b/cpp/src/strings/regex/regcomp.cpp @@ -539,15 +539,26 @@ class regex_parser { : static_cast(LBRA); case ')': return RBRA; case '^': { - _chr = is_multiline(_flags) ? chr : '\n'; + if (is_ext_newline(_flags)) { + _chr = is_multiline(_flags) ? 'S' : 'N'; + } else { + _chr = is_multiline(_flags) ? chr : '\n'; + } return BOL; } case '$': { - _chr = is_multiline(_flags) ? chr : '\n'; + if (is_ext_newline(_flags)) { + _chr = is_multiline(_flags) ? 'S' : 'N'; + } else { + _chr = is_multiline(_flags) ? chr : '\n'; + } return EOL; } case '[': return build_cclass(); - case '.': return dot_type; + case '.': { + _chr = is_ext_newline(_flags) ? 'N' : chr; + return dot_type; + } } if (std::find(quantifiers.begin(), quantifiers.end(), static_cast(chr)) == @@ -959,7 +970,7 @@ class regex_compiler { _prog.inst_at(inst_id).u1.cls_id = class_id; } else if (token == CHAR) { _prog.inst_at(inst_id).u1.c = yy; - } else if (token == BOL || token == EOL) { + } else if (token == BOL || token == EOL || token == ANY) { _prog.inst_at(inst_id).u1.c = yy; } push_and(inst_id, inst_id); @@ -1194,7 +1205,7 @@ void reprog::print(regex_flags const flags) case STAR: printf(" STAR next=%d", inst.u2.next_id); break; case PLUS: printf(" PLUS next=%d", inst.u2.next_id); break; case QUEST: printf(" QUEST next=%d", inst.u2.next_id); break; - case ANY: printf(" ANY next=%d", inst.u2.next_id); break; + case ANY: printf(" ANY '%c', next=%d", inst.u1.c, inst.u2.next_id); break; case ANYNL: printf(" ANYNL next=%d", inst.u2.next_id); break; case NOP: printf(" NOP next=%d", inst.u2.next_id); break; case BOL: { diff --git a/cpp/src/strings/regex/regex.inl b/cpp/src/strings/regex/regex.inl index 3b899e4edc1..e34a1e12015 100644 --- a/cpp/src/strings/regex/regex.inl +++ b/cpp/src/strings/regex/regex.inl @@ -126,6 +126,16 @@ __device__ __forceinline__ void reprog_device::reljunk::swaplist() list2 = tmp; } +/** + * @brief Check for supported new-line characters + * + * '\n, \r, \u0085, \u2028, or \u2029' + */ +constexpr bool is_newline(char32_t const ch) +{ + return (ch == '\n' || ch == '\r' || ch == 0x00c285 || ch == 0x00e280a8 || ch == 0x00e280a9); +} + /** * @brief Utility to check a specific character against this class instance. * @@ -258,11 +268,14 @@ __device__ __forceinline__ match_result reprog_device::regexec(string_view const if (checkstart) { auto startchar = static_cast(jnk.startchar); switch (jnk.starttype) { - case BOL: - if (pos == 0) break; - if (jnk.startchar != '^') { return cuda::std::nullopt; } + case BOL: { + if (pos == 0) { break; } + if (startchar != '^' && startchar != 'S') { return cuda::std::nullopt; } + if (startchar != '\n') { break; } --itr; startchar = static_cast('\n'); + [[fallthrough]]; + } case CHAR: { auto const find_itr = find_char(startchar, dstr, itr); if (find_itr.byte_offset() >= dstr.size_bytes()) { return cuda::std::nullopt; } @@ -312,26 +325,34 @@ __device__ __forceinline__ match_result reprog_device::regexec(string_view const id_activate = inst.u2.next_id; expanded = true; break; - case BOL: - if ((pos == 0) || ((inst.u1.c == '^') && (dstr[pos - 1] == '\n'))) { + case BOL: { + auto titr = itr; + auto const prev_c = pos > 0 ? *(--titr) : 0; + if ((pos == 0) || ((inst.u1.c == '^') && (prev_c == '\n')) || + ((inst.u1.c == 'S') && (is_newline(prev_c)))) { id_activate = inst.u2.next_id; expanded = true; } break; - case EOL: + } + case EOL: { // after the last character OR: // - for MULTILINE, if current character is new-line // - for non-MULTILINE, the very last character of the string can also be a new-line + bool const nl = (inst.u1.c == 'S' || inst.u1.c == 'N') ? is_newline(c) : (c == '\n'); if (last_character || - ((c == '\n') && (inst.u1.c != 'Z') && - ((inst.u1.c == '$') || (itr.byte_offset() + 1 == dstr.size_bytes())))) { + (nl && (inst.u1.c != 'Z') && + ((inst.u1.c == '$' || inst.u1.c == 'S') || + (itr.byte_offset() + bytes_in_char_utf8(c) == dstr.size_bytes())))) { id_activate = inst.u2.next_id; expanded = true; } break; + } case BOW: case NBOW: { - auto const prev_c = pos > 0 ? dstr[pos - 1] : 0; + auto titr = itr; + auto const prev_c = pos > 0 ? *(--titr) : 0; auto const word_class = reclass_device{CCLASS_W}; bool const curr_is_word = word_class.is_match(c, _codepoint_flags); bool const prev_is_word = word_class.is_match(prev_c, _codepoint_flags); @@ -366,9 +387,10 @@ __device__ __forceinline__ match_result reprog_device::regexec(string_view const case CHAR: if (inst.u1.c == c) id_activate = inst.u2.next_id; break; - case ANY: - if (c != '\n') id_activate = inst.u2.next_id; - break; + case ANY: { + if ((c == '\n') || ((inst.u1.c == 'N') && is_newline(c))) { break; } + [[fallthrough]]; + } case ANYNL: id_activate = inst.u2.next_id; break; case NCCLASS: case CCLASS: { diff --git a/cpp/src/text/minhash.cu b/cpp/src/text/minhash.cu index 605582f28a6..a03a34f5fa7 100644 --- a/cpp/src/text/minhash.cu +++ b/cpp/src/text/minhash.cu @@ -25,6 +25,8 @@ #include #include #include +#include +#include #include #include #include @@ -151,15 +153,111 @@ std::unique_ptr minhash_fn(cudf::strings_column_view const& input, mr); auto d_hashes = hashes->mutable_view().data(); - constexpr int block_size = 256; - cudf::detail::grid_1d grid{input.size() * cudf::detail::warp_size, block_size}; + constexpr cudf::thread_index_type block_size = 256; + cudf::detail::grid_1d grid{ + static_cast(input.size()) * cudf::detail::warp_size, block_size}; minhash_kernel<<>>( *d_strings, seeds, width, d_hashes); return hashes; } -std::unique_ptr build_list_result(cudf::strings_column_view const& input, +/** + * @brief Compute the minhash of each list row of strings for each seed + * + * This is a warp-per-row algorithm where parallel threads within a warp + * work on strings in a single list row. + * + * @tparam HashFunction hash function to use on each string + * + * @param d_input List of strings to process + * @param seeds Seeds for hashing each string + * @param d_hashes Minhash output values (one per row) + */ +template < + typename HashFunction, + typename hash_value_type = std:: + conditional_t, uint32_t, uint64_t>> +CUDF_KERNEL void minhash_word_kernel(cudf::detail::lists_column_device_view const d_input, + cudf::device_span seeds, + hash_value_type* d_hashes) +{ + auto const idx = cudf::detail::grid_1d::global_thread_id(); + auto const row_idx = idx / cudf::detail::warp_size; + + if (row_idx >= d_input.size()) { return; } + if (d_input.is_null(row_idx)) { return; } + + auto const d_row = cudf::list_device_view(d_input, row_idx); + auto const d_output = d_hashes + (row_idx * seeds.size()); + + // initialize hashes output for this row + auto const lane_idx = static_cast(idx % cudf::detail::warp_size); + if (lane_idx == 0) { + auto const init = d_row.size() == 0 ? 0 : std::numeric_limits::max(); + thrust::fill(thrust::seq, d_output, d_output + seeds.size(), init); + } + __syncwarp(); + + // each lane hashes a string from the input row + for (auto str_idx = lane_idx; str_idx < d_row.size(); str_idx += cudf::detail::warp_size) { + auto const hash_str = + d_row.is_null(str_idx) ? cudf::string_view{} : d_row.element(str_idx); + for (std::size_t seed_idx = 0; seed_idx < seeds.size(); ++seed_idx) { + auto const hasher = HashFunction(seeds[seed_idx]); + // hash string and store the min value + hash_value_type hv; + if constexpr (std::is_same_v) { + hv = hasher(hash_str); + } else { + // This code path assumes the use of MurmurHash3_x64_128 which produces 2 uint64 values + // but only uses the first uint64 value as requested by the LLM team. + hv = thrust::get<0>(hasher(hash_str)); + } + cuda::atomic_ref ref{*(d_output + seed_idx)}; + ref.fetch_min(hv, cuda::std::memory_order_relaxed); + } + } +} + +template < + typename HashFunction, + typename hash_value_type = std:: + conditional_t, uint32_t, uint64_t>> +std::unique_ptr word_minhash_fn(cudf::lists_column_view const& input, + cudf::device_span seeds, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_EXPECTS(!seeds.empty(), "Parameter seeds cannot be empty", std::invalid_argument); + CUDF_EXPECTS((static_cast(input.size()) * seeds.size()) < + static_cast(std::numeric_limits::max()), + "The number of seeds times the number of input rows exceeds the column size limit", + std::overflow_error); + + auto const output_type = cudf::data_type{cudf::type_to_id()}; + if (input.is_empty()) { return cudf::make_empty_column(output_type); } + + auto const d_input = cudf::column_device_view::create(input.parent(), stream); + + auto hashes = cudf::make_numeric_column(output_type, + input.size() * static_cast(seeds.size()), + cudf::mask_state::UNALLOCATED, + stream, + mr); + auto d_hashes = hashes->mutable_view().data(); + auto lcdv = cudf::detail::lists_column_device_view(*d_input); + + constexpr cudf::thread_index_type block_size = 256; + cudf::detail::grid_1d grid{ + static_cast(input.size()) * cudf::detail::warp_size, block_size}; + minhash_word_kernel + <<>>(lcdv, seeds, d_hashes); + + return hashes; +} + +std::unique_ptr build_list_result(cudf::column_view const& input, std::unique_ptr&& hashes, cudf::size_type seeds_size, rmm::cuda_stream_view stream, @@ -176,7 +274,7 @@ std::unique_ptr build_list_result(cudf::strings_column_view const& std::move(offsets), std::move(hashes), input.null_count(), - cudf::detail::copy_bitmask(input.parent(), stream, mr), + cudf::detail::copy_bitmask(input, stream, mr), stream, mr); // expect this condition to be very rare @@ -208,7 +306,7 @@ std::unique_ptr minhash(cudf::strings_column_view const& input, { using HashFunction = cudf::hashing::detail::MurmurHash3_x86_32; auto hashes = detail::minhash_fn(input, seeds, width, stream, mr); - return build_list_result(input, std::move(hashes), seeds.size(), stream, mr); + return build_list_result(input.parent(), std::move(hashes), seeds.size(), stream, mr); } std::unique_ptr minhash64(cudf::strings_column_view const& input, @@ -232,7 +330,27 @@ std::unique_ptr minhash64(cudf::strings_column_view const& input, { using HashFunction = cudf::hashing::detail::MurmurHash3_x64_128; auto hashes = detail::minhash_fn(input, seeds, width, stream, mr); - return build_list_result(input, std::move(hashes), seeds.size(), stream, mr); + return build_list_result(input.parent(), std::move(hashes), seeds.size(), stream, mr); +} + +std::unique_ptr word_minhash(cudf::lists_column_view const& input, + cudf::device_span seeds, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + using HashFunction = cudf::hashing::detail::MurmurHash3_x86_32; + auto hashes = detail::word_minhash_fn(input, seeds, stream, mr); + return build_list_result(input.parent(), std::move(hashes), seeds.size(), stream, mr); +} + +std::unique_ptr word_minhash64(cudf::lists_column_view const& input, + cudf::device_span seeds, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + using HashFunction = cudf::hashing::detail::MurmurHash3_x64_128; + auto hashes = detail::word_minhash_fn(input, seeds, stream, mr); + return build_list_result(input.parent(), std::move(hashes), seeds.size(), stream, mr); } } // namespace detail @@ -276,4 +394,21 @@ std::unique_ptr minhash64(cudf::strings_column_view const& input, return detail::minhash64(input, seeds, width, stream, mr); } +std::unique_ptr word_minhash(cudf::lists_column_view const& input, + cudf::device_span seeds, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + return detail::word_minhash(input, seeds, stream, mr); +} + +std::unique_ptr word_minhash64(cudf::lists_column_view const& input, + cudf::device_span seeds, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + return detail::word_minhash64(input, seeds, stream, mr); +} } // namespace nvtext diff --git a/cpp/tests/strings/contains_tests.cpp b/cpp/tests/strings/contains_tests.cpp index c816316d0ff..acf850c7a66 100644 --- a/cpp/tests/strings/contains_tests.cpp +++ b/cpp/tests/strings/contains_tests.cpp @@ -14,6 +14,8 @@ * limitations under the License. */ +#include "special_chars.h" + #include #include #include @@ -613,6 +615,63 @@ TEST_F(StringsContainsTests, MultiLine) CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*results, expected_count); } +TEST_F(StringsContainsTests, SpecialNewLines) +{ + auto input = cudf::test::strings_column_wrapper({"zzé" LINE_SEPARATOR "qqq" NEXT_LINE "zzé", + "qqq\rzzé" LINE_SEPARATOR "lll", + "zzé", + "", + "zzé" PARAGRAPH_SEPARATOR, + "abc\nzzé" NEXT_LINE}); + auto view = cudf::strings_column_view(input); + + auto pattern = std::string("^zzé$"); + auto prog = + cudf::strings::regex_program::create(pattern, cudf::strings::regex_flags::EXT_NEWLINE); + auto ml_flags = static_cast(cudf::strings::regex_flags::EXT_NEWLINE | + cudf::strings::regex_flags::MULTILINE); + auto prog_ml = cudf::strings::regex_program::create(pattern, ml_flags); + + auto expected = cudf::test::fixed_width_column_wrapper({0, 0, 1, 0, 1, 0}); + auto results = cudf::strings::contains_re(view, *prog); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected); + expected = cudf::test::fixed_width_column_wrapper({1, 1, 1, 0, 1, 1}); + results = cudf::strings::contains_re(view, *prog_ml); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected); + + expected = cudf::test::fixed_width_column_wrapper({0, 0, 1, 0, 1, 0}); + results = cudf::strings::matches_re(view, *prog); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected); + expected = cudf::test::fixed_width_column_wrapper({1, 0, 1, 0, 1, 0}); + results = cudf::strings::matches_re(view, *prog_ml); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected); + + auto counts = cudf::test::fixed_width_column_wrapper({0, 0, 1, 0, 1, 0}); + results = cudf::strings::count_re(view, *prog); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, counts); + counts = cudf::test::fixed_width_column_wrapper({2, 1, 1, 0, 1, 1}); + results = cudf::strings::count_re(view, *prog_ml); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, counts); + + pattern = std::string("q.*l"); + prog = cudf::strings::regex_program::create(pattern); + expected = cudf::test::fixed_width_column_wrapper({0, 1, 0, 0, 0, 0}); + results = cudf::strings::contains_re(view, *prog); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected); + // inst ANY will stop matching on first 'newline' and so should not match anything here + prog = cudf::strings::regex_program::create(pattern, cudf::strings::regex_flags::EXT_NEWLINE); + expected = cudf::test::fixed_width_column_wrapper({0, 0, 0, 0, 0, 0}); + results = cudf::strings::contains_re(view, *prog); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected); + // including the DOTALL flag accepts the newline characters + auto dot_flags = static_cast(cudf::strings::regex_flags::EXT_NEWLINE | + cudf::strings::regex_flags::DOTALL); + prog = cudf::strings::regex_program::create(pattern, dot_flags); + expected = cudf::test::fixed_width_column_wrapper({0, 1, 0, 0, 0, 0}); + results = cudf::strings::contains_re(view, *prog); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected); +} + TEST_F(StringsContainsTests, EndOfString) { auto input = cudf::test::strings_column_wrapper( diff --git a/cpp/tests/strings/extract_tests.cpp b/cpp/tests/strings/extract_tests.cpp index b26cbd5a549..1491da758d5 100644 --- a/cpp/tests/strings/extract_tests.cpp +++ b/cpp/tests/strings/extract_tests.cpp @@ -14,9 +14,12 @@ * limitations under the License. */ +#include "special_chars.h" + #include #include #include +#include #include #include @@ -200,6 +203,43 @@ TEST_F(StringsExtractTests, DotAll) CUDF_TEST_EXPECT_TABLES_EQUAL(*results, expected); } +TEST_F(StringsExtractTests, SpecialNewLines) +{ + auto input = cudf::test::strings_column_wrapper({"zzé" NEXT_LINE "qqq" LINE_SEPARATOR "zzé", + "qqq" LINE_SEPARATOR "zzé\rlll", + "zzé", + "", + "zzé" NEXT_LINE, + "abc" PARAGRAPH_SEPARATOR "zzé\n"}); + auto view = cudf::strings_column_view(input); + + auto prog = + cudf::strings::regex_program::create("(^zzé$)", cudf::strings::regex_flags::EXT_NEWLINE); + auto results = cudf::strings::extract(view, *prog); + auto expected = + cudf::test::strings_column_wrapper({"", "", "zzé", "", "zzé", ""}, {0, 0, 1, 0, 1, 0}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(results->view().column(0), expected); + + auto both_flags = static_cast( + cudf::strings::regex_flags::EXT_NEWLINE | cudf::strings::regex_flags::MULTILINE); + auto prog_ml = cudf::strings::regex_program::create("^(zzé)$", both_flags); + results = cudf::strings::extract(view, *prog_ml); + expected = + cudf::test::strings_column_wrapper({"zzé", "zzé", "zzé", "", "zzé", "zzé"}, {1, 1, 1, 0, 1, 1}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(results->view().column(0), expected); + + prog = cudf::strings::regex_program::create("q(q.*l)l"); + expected = cudf::test::strings_column_wrapper({"", "qq" LINE_SEPARATOR "zzé\rll", "", "", "", ""}, + {0, 1, 0, 0, 0, 0}); + results = cudf::strings::extract(view, *prog); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(results->view().column(0), expected); + // expect no matches here since the newline(s) interrupts the pattern + prog = cudf::strings::regex_program::create("q(q.*l)l", cudf::strings::regex_flags::EXT_NEWLINE); + expected = cudf::test::strings_column_wrapper({"", "", "", "", "", ""}, {0, 0, 0, 0, 0, 0}); + results = cudf::strings::extract(view, *prog); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(results->view().column(0), expected); +} + TEST_F(StringsExtractTests, EmptyExtractTest) { std::vector h_strings{nullptr, "AAA", "AAA_A", "AAA_AAA_", "A__", ""}; diff --git a/cpp/tests/strings/findall_tests.cpp b/cpp/tests/strings/findall_tests.cpp index 4582dcb1e38..47606b9b3ed 100644 --- a/cpp/tests/strings/findall_tests.cpp +++ b/cpp/tests/strings/findall_tests.cpp @@ -14,6 +14,8 @@ * limitations under the License. */ +#include "special_chars.h" + #include #include #include @@ -80,6 +82,32 @@ TEST_F(StringsFindallTests, DotAll) CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(results->view(), expected); } +TEST_F(StringsFindallTests, SpecialNewLines) +{ + auto input = cudf::test::strings_column_wrapper({"zzé" PARAGRAPH_SEPARATOR "qqq\nzzé", + "qqq\nzzé" PARAGRAPH_SEPARATOR "lll", + "zzé", + "", + "zzé\r", + "zzé" LINE_SEPARATOR "zzé" NEXT_LINE}); + auto view = cudf::strings_column_view(input); + + auto prog = + cudf::strings::regex_program::create("(^zzé$)", cudf::strings::regex_flags::EXT_NEWLINE); + auto results = cudf::strings::findall(view, *prog); + using LCW = cudf::test::lists_column_wrapper; + LCW expected({LCW{}, LCW{}, LCW{"zzé"}, LCW{}, LCW{"zzé"}, LCW{}}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(results->view(), expected); + + auto both_flags = static_cast( + cudf::strings::regex_flags::EXT_NEWLINE | cudf::strings::regex_flags::MULTILINE); + auto prog_ml = cudf::strings::regex_program::create("^(zzé)$", both_flags); + results = cudf::strings::findall(view, *prog_ml); + LCW expected_ml( + {LCW{"zzé", "zzé"}, LCW{"zzé"}, LCW{"zzé"}, LCW{}, LCW{"zzé"}, LCW{"zzé", "zzé"}}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(results->view(), expected_ml); +} + TEST_F(StringsFindallTests, MediumRegex) { // This results in 15 regex instructions and falls in the 'medium' range. diff --git a/cpp/tests/strings/replace_regex_tests.cpp b/cpp/tests/strings/replace_regex_tests.cpp index 8c0482653fb..9847d8d6bb5 100644 --- a/cpp/tests/strings/replace_regex_tests.cpp +++ b/cpp/tests/strings/replace_regex_tests.cpp @@ -14,6 +14,8 @@ * limitations under the License. */ +#include "special_chars.h" + #include #include #include @@ -245,6 +247,53 @@ TEST_F(StringsReplaceRegexTest, Multiline) CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*results, br_expected); } +TEST_F(StringsReplaceRegexTest, SpecialNewLines) +{ + auto input = cudf::test::strings_column_wrapper({"zzé" NEXT_LINE "qqq" NEXT_LINE "zzé", + "qqq" NEXT_LINE "zzé" NEXT_LINE "lll", + "zzé", + "", + "zzé" PARAGRAPH_SEPARATOR, + "abc\rzzé\r"}); + auto view = cudf::strings_column_view(input); + auto repl = cudf::string_scalar("_"); + auto pattern = std::string("^zzé$"); + auto prog = + cudf::strings::regex_program::create(pattern, cudf::strings::regex_flags::EXT_NEWLINE); + auto results = cudf::strings::replace_re(view, *prog, repl); + auto expected = cudf::test::strings_column_wrapper({"zzé" NEXT_LINE "qqq" NEXT_LINE "zzé", + "qqq" NEXT_LINE "zzé" NEXT_LINE "lll", + "_", + "", + "_" PARAGRAPH_SEPARATOR, + "abc\rzzé\r"}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(results->view(), expected); + + auto both_flags = static_cast( + cudf::strings::regex_flags::EXT_NEWLINE | cudf::strings::regex_flags::MULTILINE); + auto prog_ml = cudf::strings::regex_program::create(pattern, both_flags); + results = cudf::strings::replace_re(view, *prog_ml, repl); + expected = cudf::test::strings_column_wrapper({"_" NEXT_LINE "qqq" NEXT_LINE "_", + "qqq" NEXT_LINE "_" NEXT_LINE "lll", + "_", + "", + "_" PARAGRAPH_SEPARATOR, + "abc\r_\r"}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(results->view(), expected); + + auto repl_template = std::string("[\\1]"); + pattern = std::string("(^zzé$)"); + prog = cudf::strings::regex_program::create(pattern, both_flags); + results = cudf::strings::replace_with_backrefs(view, *prog, repl_template); + expected = cudf::test::strings_column_wrapper({"[zzé]" NEXT_LINE "qqq" NEXT_LINE "[zzé]", + "qqq" NEXT_LINE "[zzé]" NEXT_LINE "lll", + "[zzé]", + "", + "[zzé]" PARAGRAPH_SEPARATOR, + "abc\r[zzé]\r"}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(results->view(), expected); +} + TEST_F(StringsReplaceRegexTest, ReplaceBackrefsRegexTest) { std::vector h_strings{"the quick brown fox jumps over the lazy dog", diff --git a/cpp/tests/strings/special_chars.h b/cpp/tests/strings/special_chars.h new file mode 100644 index 00000000000..0d630f6bb52 --- /dev/null +++ b/cpp/tests/strings/special_chars.h @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +namespace cudf::test { + +// special new-line characters for use with regex_flags::EXT_NEWLINE +#define NEXT_LINE "\xC2\x85" +#define LINE_SEPARATOR "\xE2\x80\xA8" +#define PARAGRAPH_SEPARATOR "\xE2\x80\xA9" + +} // namespace cudf::test diff --git a/cpp/tests/text/minhash_tests.cpp b/cpp/tests/text/minhash_tests.cpp index 7575a3ba846..e23f3f6e7d8 100644 --- a/cpp/tests/text/minhash_tests.cpp +++ b/cpp/tests/text/minhash_tests.cpp @@ -139,6 +139,41 @@ TEST_F(MinHashTest, MultiSeedWithNullInputRow) CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results64, expected64); } +TEST_F(MinHashTest, WordsMinHash) +{ + using LCWS = cudf::test::lists_column_wrapper; + auto validity = cudf::test::iterators::null_at(1); + + LCWS input( + {LCWS({"hello", "abcdéfgh"}), + LCWS{}, + LCWS({"rapids", "moré", "test", "text"}), + LCWS({"The", "quick", "brown", "fox", "jumpéd", "over", "the", "lazy", "brown", "dog"})}, + validity); + + auto view = cudf::lists_column_view(input); + + auto seeds = cudf::test::fixed_width_column_wrapper({1, 2}); + auto results = nvtext::word_minhash(view, cudf::column_view(seeds)); + using LCW32 = cudf::test::lists_column_wrapper; + LCW32 expected({LCW32{2069617641u, 1975382903u}, + LCW32{}, + LCW32{657297235u, 1010955999u}, + LCW32{644643885u, 310002789u}}, + validity); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected); + + auto seeds64 = cudf::test::fixed_width_column_wrapper({11, 22}); + auto results64 = nvtext::word_minhash64(view, cudf::column_view(seeds64)); + using LCW64 = cudf::test::lists_column_wrapper; + LCW64 expected64({LCW64{1940333969930105370ul, 272615362982418219ul}, + LCW64{}, + LCW64{5331949571924938590ul, 2088583894581919741ul}, + LCW64{3400468157617183341ul, 2398577492366130055ul}}, + validity); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results64, expected64); +} + TEST_F(MinHashTest, EmptyTest) { auto input = cudf::make_empty_column(cudf::data_type{cudf::type_id::STRING}); diff --git a/dependencies.yaml b/dependencies.yaml index 483335c02ff..7a13043cc5f 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -710,7 +710,16 @@ dependencies: - numpy==1.23.* - pandas==2.0.* - pyarrow==14.0.0 - - cupy==12.0.0 # ignored as pip constraint + - matrix: + packages: + - output_types: conda + matrices: + - matrix: {dependencies: "oldest", arch: "aarch64", cuda: "12.*"} + packages: + - cupy==12.2.0 # cupy 12.2.0 is the earliest with CUDA 12 ARM packages. + - matrix: {dependencies: "oldest"} + packages: + - cupy==12.0.0 - matrix: packages: - output_types: requirements diff --git a/docs/cudf/source/user_guide/10min.ipynb b/docs/cudf/source/user_guide/10min.ipynb index 2eaa75b3189..95f5f9734dd 100644 --- a/docs/cudf/source/user_guide/10min.ipynb +++ b/docs/cudf/source/user_guide/10min.ipynb @@ -5,9 +5,9 @@ "id": "4c6c548b", "metadata": {}, "source": [ - "# 10 Minutes to cuDF and Dask-cuDF\n", + "# 10 Minutes to cuDF and Dask cuDF\n", "\n", - "Modelled after 10 Minutes to Pandas, this is a short introduction to cuDF and Dask-cuDF, geared mainly towards new users.\n", + "Modelled after 10 Minutes to Pandas, this is a short introduction to cuDF and Dask cuDF, geared mainly towards new users.\n", "\n", "## What are these Libraries?\n", "\n", @@ -18,13 +18,14 @@ "[Dask cuDF](https://github.com/rapidsai/cudf/tree/main/python/dask_cudf) extends Dask where necessary to allow its DataFrame partitions to be processed using cuDF GPU DataFrames instead of Pandas DataFrames. For instance, when you call `dask_cudf.read_csv(...)`, your cluster's GPUs do the work of parsing the CSV file(s) by calling [`cudf.read_csv()`](https://docs.rapids.ai/api/cudf/stable/api_docs/api/cudf.read_csv.html).\n", "\n", "\n", - "> [!NOTE] \n", - "> This notebook uses the explicit Dask cuDF API (`dask_cudf`) for clarity. However, we strongly recommend that you use Dask's [configuration infrastructure](https://docs.dask.org/en/latest/configuration.html) to set the `\"dataframe.backend\"` to `\"cudf\"`, and work with the `dask.dataframe` API directly. Please see the [Dask cuDF documentation](https://github.com/rapidsai/cudf/tree/main/python/dask_cudf) for more information.\n", + "
\n", + "Note: This notebook uses the explicit Dask cuDF API (dask_cudf) for clarity. However, we strongly recommend that you use Dask's configuration infrastructure to set the \"dataframe.backend\" option to \"cudf\", and work with the Dask DataFrame API directly. Please see the Dask cuDF documentation for more information.\n", + "
\n", "\n", "\n", - "## When to use cuDF and Dask-cuDF\n", + "## When to use cuDF and Dask cuDF\n", "\n", - "If your workflow is fast enough on a single GPU or your data comfortably fits in memory on a single GPU, you would want to use cuDF. If you want to distribute your workflow across multiple GPUs, have more data than you can fit in memory on a single GPU, or want to analyze data spread across many files at once, you would want to use Dask-cuDF." + "If your workflow is fast enough on a single GPU or your data comfortably fits in memory on a single GPU, you would want to use cuDF. If you want to distribute your workflow across multiple GPUs, have more data than you can fit in memory on a single GPU, or want to analyze data spread across many files at once, you would want to use Dask cuDF." ] }, { @@ -115,7 +116,7 @@ "source": [ "ds = dask_cudf.from_cudf(s, npartitions=2)\n", "# Note the call to head here to show the first few entries, unlike\n", - "# cuDF objects, dask-cuDF objects do not have a printing\n", + "# cuDF objects, Dask-cuDF objects do not have a printing\n", "# representation that shows values since they may not be in local\n", "# memory.\n", "ds.head(n=3)" @@ -331,11 +332,11 @@ "id": "b17db919", "metadata": {}, "source": [ - "Now we will convert our cuDF dataframe into a dask-cuDF equivalent. Here we call out a key difference: to inspect the data we must call a method (here `.head()` to look at the first few values). In the general case (see the end of this notebook), the data in `ddf` will be distributed across multiple GPUs.\n", + "Now we will convert our cuDF dataframe into a Dask-cuDF equivalent. Here we call out a key difference: to inspect the data we must call a method (here `.head()` to look at the first few values). In the general case (see the end of this notebook), the data in `ddf` will be distributed across multiple GPUs.\n", "\n", - "In this small case, we could call `ddf.compute()` to obtain a cuDF object from the dask-cuDF object. In general, we should avoid calling `.compute()` on large dataframes, and restrict ourselves to using it when we have some (relatively) small postprocessed result that we wish to inspect. Hence, throughout this notebook we will generally call `.head()` to inspect the first few values of a dask-cuDF dataframe, occasionally calling out places where we use `.compute()` and why.\n", + "In this small case, we could call `ddf.compute()` to obtain a cuDF object from the Dask-cuDF object. In general, we should avoid calling `.compute()` on large dataframes, and restrict ourselves to using it when we have some (relatively) small postprocessed result that we wish to inspect. Hence, throughout this notebook we will generally call `.head()` to inspect the first few values of a Dask-cuDF dataframe, occasionally calling out places where we use `.compute()` and why.\n", "\n", - "*To understand more of the differences between how cuDF and dask-cuDF behave here, visit the [10 Minutes to Dask](https://docs.dask.org/en/stable/10-minutes-to-dask.html) tutorial after this one.*" + "*To understand more of the differences between how cuDF and Dask cuDF behave here, visit the [10 Minutes to Dask](https://docs.dask.org/en/stable/10-minutes-to-dask.html) tutorial after this one.*" ] }, { @@ -1680,7 +1681,7 @@ "id": "7aa0089f", "metadata": {}, "source": [ - "Note here we call `compute()` rather than `head()` on the dask-cuDF dataframe since we are happy that the number of matching rows will be small (and hence it is reasonable to bring the entire result back)." + "Note here we call `compute()` rather than `head()` on the Dask-cuDF dataframe since we are happy that the number of matching rows will be small (and hence it is reasonable to bring the entire result back)." ] }, { @@ -2393,7 +2394,7 @@ "id": "f6094cbe", "metadata": {}, "source": [ - "Applying functions to a `Series`. Note that applying user defined functions directly with Dask-cuDF is not yet implemented. For now, you can use [map_partitions](http://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.map_partitions.html) to apply a function to each partition of the distributed dataframe." + "Applying functions to a `Series`. Note that applying user defined functions directly with Dask cuDF is not yet implemented. For now, you can use [map_partitions](http://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.map_partitions.html) to apply a function to each partition of the distributed dataframe." ] }, { @@ -3492,7 +3493,7 @@ "id": "5ac3b004", "metadata": {}, "source": [ - "Transposing a dataframe, using either the `transpose` method or `T` property. Currently, all columns must have the same type. Transposing is not currently implemented in Dask-cuDF." + "Transposing a dataframe, using either the `transpose` method or `T` property. Currently, all columns must have the same type. Transposing is not currently implemented in Dask cuDF." ] }, { @@ -4181,7 +4182,7 @@ "id": "aa8a445b", "metadata": {}, "source": [ - "To convert the first few entries to pandas, we similarly call `.head()` on the dask-cuDF dataframe to obtain a local cuDF dataframe, which we can then convert." + "To convert the first few entries to pandas, we similarly call `.head()` on the Dask-cuDF dataframe to obtain a local cuDF dataframe, which we can then convert." ] }, { @@ -4899,7 +4900,7 @@ "id": "787eae14", "metadata": {}, "source": [ - "Note that for the dask-cuDF case, we use `dask_cudf.read_csv` in preference to `dask_cudf.from_cudf(cudf.read_csv)` since the former can parallelize across multiple GPUs and handle larger CSV files that would fit in memory on a single GPU." + "Note that for the Dask-cuDF case, we use `dask_cudf.read_csv` in preference to `dask_cudf.from_cudf(cudf.read_csv)` since the former can parallelize across multiple GPUs and handle larger CSV files that would fit in memory on a single GPU." ] }, { diff --git a/docs/dask_cudf/source/index.rst b/docs/dask_cudf/source/index.rst index 9a216690384..7fe6cbd45fa 100644 --- a/docs/dask_cudf/source/index.rst +++ b/docs/dask_cudf/source/index.rst @@ -3,39 +3,42 @@ You can adapt this file completely to your liking, but it should at least contain the root `toctree` directive. -Welcome to dask-cudf's documentation! +Welcome to Dask cuDF's documentation! ===================================== -**Dask-cuDF** (pronounced "DASK KOO-dee-eff") is an extension +**Dask cuDF** (pronounced "DASK KOO-dee-eff") is an extension library for the `Dask `__ parallel computing -framework that provides a `cuDF -`__-backed distributed -dataframe with the same API as `Dask dataframes -`__. +framework. When installed, Dask cuDF is automatically registered +as the ``"cudf"`` dataframe backend for +`Dask DataFrame `__. + +.. note:: + Neither Dask cuDF nor Dask DataFrame provide support for multi-GPU + or multi-node execution on their own. You must also deploy a + `dask.distributed ` cluster + to leverage multiple GPUs. We strongly recommend using `Dask-CUDA + `__ to simplify the + setup of the cluster, taking advantage of all features of the GPU + and networking hardware. If you are familiar with Dask and `pandas `__ or -`cuDF `__, then Dask-cuDF +`cuDF `__, then Dask cuDF should feel familiar to you. If not, we recommend starting with `10 minutes to Dask `__ followed -by `10 minutes to cuDF and Dask-cuDF +by `10 minutes to cuDF and Dask cuDF `__. -When running on multi-GPU systems, `Dask-CUDA -`__ is recommended to -simplify the setup of the cluster, taking advantage of all features of -the GPU and networking hardware. -Using Dask-cuDF +Using Dask cuDF --------------- -When installed, Dask-cuDF registers itself as a dataframe backend for -Dask. This means that in many cases, using cuDF-backed dataframes requires -only small changes to an existing workflow. The minimal change is to -select cuDF as the dataframe backend in :doc:`Dask's -configuration `. To do so, we must set the option -``dataframe.backend`` to ``cudf``. From Python, this can be achieved -like so:: +The Dask DataFrame API (Recommended) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Simply use the `Dask configuration ` system to +set the ``"dataframe.backend"`` option to ``"cudf"``. From Python, +this can be achieved like so:: import dask @@ -44,52 +47,157 @@ like so:: Alternatively, you can set ``DASK_DATAFRAME__BACKEND=cudf`` in the environment before running your code. -Dataframe creation from on-disk formats -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -If your workflow creates Dask dataframes from on-disk formats -(for example using :func:`dask.dataframe.read_parquet`), then setting -the backend may well be enough to migrate your workflow. - -For example, consider reading a dataframe from parquet:: +Once this is done, the public Dask DataFrame API will leverage +``cudf`` automatically when a new DataFrame collection is created +from an on-disk format using any of the following ``dask.dataframe`` +functions:: - import dask.dataframe as dd +* :func:`dask.dataframe.read_parquet` +* :func:`dask.dataframe.read_json` +* :func:`dask.dataframe.read_csv` +* :func:`dask.dataframe.read_orc` +* :func:`dask.dataframe.read_hdf` +* :func:`dask.dataframe.from_dict` - # By default, we obtain a pandas-backed dataframe - df = dd.read_parquet("data.parquet", ...) +For example:: + import dask.dataframe as dd -To obtain a cuDF-backed dataframe, we must set the -``dataframe.backend`` configuration option:: + # By default, we obtain a pandas-backed dataframe + df = dd.read_parquet("data.parquet", ...) import dask - import dask.dataframe as dd dask.config.set({"dataframe.backend": "cudf"}) - # This gives us a cuDF-backed dataframe + # This now gives us a cuDF-backed dataframe df = dd.read_parquet("data.parquet", ...) -This code will use cuDF's GPU-accelerated :func:`parquet reader -` to read partitions of the data. +When other functions are used to create a new collection +(e.g. :func:`from_map`, :func:`from_pandas`, :func:`from_delayed`, +and :func:`from_array`), the backend of the new collection will +depend on the inputs to those functions. For example:: + + import pandas as pd + import cudf + + # This gives us a pandas-backed dataframe + dd.from_pandas(pd.DataFrame({"a": range(10)})) + + # This gives us a cuDF-backed dataframe + dd.from_pandas(cudf.DataFrame({"a": range(10)})) + +An existing collection can always be moved to a specific backend +using the :func:`dask.dataframe.DataFrame.to_backend` API:: + + # This ensures that we have a cuDF-backed dataframe + df = df.to_backend("cudf") + + # This ensures that we have a pandas-backed dataframe + df = df.to_backend("pandas") + +The explicit Dask cuDF API +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +In addition to providing the ``"cudf"`` backend for Dask DataFrame, +Dask cuDF also provides an explicit ``dask_cudf`` API:: + + import dask_cudf + + # This always gives us a cuDF-backed dataframe + df = dask_cudf.read_parquet("data.parquet", ...) + +This API is used implicitly by the Dask DataFrame API when the ``"cudf"`` +backend is enabled. Therefore, using it directly will not provide any +performance benefit over the CPU/GPU-portable ``dask.dataframe`` API. +Also, using some parts of the explicit API are incompatible with +automatic query planning (see the next section). + +The explicit Dask cuDF API +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Dask cuDF now provides automatic query planning by default (RAPIDS 24.06+). +As long as the ``"dataframe.query-planning"`` configuration is set to +``True`` (the default) when ``dask.dataframe`` is first imported, `Dask +Expressions `__ will be used under the hood. + +For example, the following code will automatically benefit from predicate +pushdown when the result is computed:: + + df = dd.read_parquet("/my/parquet/dataset/") + result = df.sort_values('B')['A'] + +Unoptimized expression graph (``df.pprint()``):: + + Projection: columns='A' + SortValues: by=['B'] shuffle_method='tasks' options={} + ReadParquetFSSpec: path='/my/parquet/dataset/' ... + +Simplified expression graph (``df.simplify().pprint()``):: + + Projection: columns='A' + SortValues: by=['B'] shuffle_method='tasks' options={} + ReadParquetFSSpec: path='/my/parquet/dataset/' columns=['A', 'B'] ... + +.. note:: + Dask will automatically simplify the expression graph (within + :func:`optimize`) when the result is converted to a task graph + (via :func:`compute` or :func:`persist`). You do not need to call + :func:`simplify` yourself. + + +Using Multiple GPUs and Multiple Nodes +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Whenever possible, Dask cuDF (i.e. Dask DataFrame) will automatically try +to partition your data into small-enough tasks to fit comfortably in the +memory of a single GPU. This means the necessary compute tasks needed to +compute a query can often be streamed to a single GPU process for +out-of-core computing. This also means that the compute tasks can be +executed in parallel over a multi-GPU cluster. + +In order to execute your Dask workflow on multiple GPUs, you will +typically need to use `Dask-CUDA `__ +to deploy distributed Dask cluster, and +`Distributed `__ +to define a client object. For example:: + + from dask_cuda import LocalCUDACluster + from distributed import Client + + if __name__ == "__main__": + + client = Client( + LocalCUDACluster( + CUDA_VISIBLE_DEVICES="0,1", # Use two workers (on devices 0 and 1) + rmm_pool_size=0.9, # Use 90% of GPU memory as a pool for faster allocations + enable_cudf_spill=True, # Improve device memory stability + local_directory="/fast/scratch/", # Use fast local storage for spilling + ) + ) + + df = dd.read_parquet("/my/parquet/dataset/") + agg = df.groupby('B').sum() + agg.compute() # This will use the cluster defined above + +.. note:: + This example uses :func:`compute` to materialize a concrete + ``cudf.DataFrame`` object in local memory. Never call :func:`compute` + on a large collection that cannot fit comfortably in the memory of a + single GPU! See Dask's `documentation on managing computation + `__ + for more details. -Dataframe creation from in-memory formats -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Please see the `Dask-CUDA `__ +documentation for more information about deploying GPU-aware clusters +(including `best practices +`__). -If you already have a dataframe in memory and want to convert it to a -cuDF-backend one, there are two options depending on whether the -dataframe is already a Dask one or not. If you have a Dask dataframe, -then you can call :func:`dask.dataframe.to_backend` passing ``"cudf"`` -as the backend; if you have a pandas dataframe then you can either -call :func:`dask.dataframe.from_pandas` followed by -:func:`~dask.dataframe.to_backend` or first convert the dataframe with -:func:`cudf.from_pandas` and then parallelise this with -:func:`dask_cudf.from_cudf`. API Reference ------------- -Generally speaking, Dask-cuDF tries to offer exactly the same API as -Dask itself. There are, however, some minor differences mostly because +Generally speaking, Dask cuDF tries to offer exactly the same API as +Dask DataFrame. There are, however, some minor differences mostly because cuDF does not :doc:`perfectly mirror ` the pandas API, or because cuDF provides additional configuration flags (these mostly occur in data reading and writing interfaces). @@ -97,7 +205,7 @@ flags (these mostly occur in data reading and writing interfaces). As a result, straightforward workflows can be migrated without too much trouble, but more complex ones that utilise more features may need a bit of tweaking. The API documentation describes details of the -differences and all functionality that Dask-cuDF supports. +differences and all functionality that Dask cuDF supports. .. toctree:: :maxdepth: 2 diff --git a/java/src/main/java/ai/rapids/cudf/ColumnVector.java b/java/src/main/java/ai/rapids/cudf/ColumnVector.java index 5a0fbd224ad..6a0f0f6f169 100644 --- a/java/src/main/java/ai/rapids/cudf/ColumnVector.java +++ b/java/src/main/java/ai/rapids/cudf/ColumnVector.java @@ -218,7 +218,13 @@ static long initViewHandle(DType type, int numRows, int nullCount, od, vd, nullCount, numRows, childHandles); } - static ColumnVector fromViewWithContiguousAllocation(long columnViewAddress, DeviceMemoryBuffer buffer) { + /** + * Creates a ColumnVector from a native column_view using a contiguous device allocation. + * + * @param columnViewAddress address of the native column_view + * @param buffer device buffer containing the data referenced by the column view + */ + public static ColumnVector fromViewWithContiguousAllocation(long columnViewAddress, DeviceMemoryBuffer buffer) { return new ColumnVector(columnViewAddress, buffer); } diff --git a/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java b/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java index 7b58817550d..8c8180436e6 100644 --- a/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java +++ b/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,9 +24,13 @@ */ public final class ParquetWriterOptions extends CompressionMetadataWriterOptions { private final StatisticsFrequency statsGranularity; + private int rowGroupSizeRows; + private long rowGroupSizeBytes; private ParquetWriterOptions(Builder builder) { super(builder); + this.rowGroupSizeRows = builder.rowGroupSizeRows; + this.rowGroupSizeBytes = builder.rowGroupSizeBytes; this.statsGranularity = builder.statsGranularity; } @@ -51,18 +55,38 @@ public static Builder builder() { return new Builder(); } + public int getRowGroupSizeRows() { + return rowGroupSizeRows; + } + + public long getRowGroupSizeBytes() { + return rowGroupSizeBytes; + } + public StatisticsFrequency getStatisticsFrequency() { return statsGranularity; } public static class Builder extends CompressionMetadataWriterOptions.Builder { + private int rowGroupSizeRows = 1000000; //Max of 1 million rows per row group + private long rowGroupSizeBytes = 128 * 1024 * 1024; //Max of 128MB per row group private StatisticsFrequency statsGranularity = StatisticsFrequency.ROWGROUP; public Builder() { super(); } + public Builder withRowGroupSizeRows(int rowGroupSizeRows) { + this.rowGroupSizeRows = rowGroupSizeRows; + return this; + } + + public Builder withRowGroupSizeBytes(long rowGroupSizeBytes) { + this.rowGroupSizeBytes = rowGroupSizeBytes; + return this; + } + public Builder withStatisticsFrequency(StatisticsFrequency statsGranularity) { this.statsGranularity = statsGranularity; return this; diff --git a/java/src/main/java/ai/rapids/cudf/Table.java b/java/src/main/java/ai/rapids/cudf/Table.java index cbb126d7ee5..09da43374ae 100644 --- a/java/src/main/java/ai/rapids/cudf/Table.java +++ b/java/src/main/java/ai/rapids/cudf/Table.java @@ -332,20 +332,22 @@ private static native long[] readAvroFromDataSource(String[] filterColumnNames, /** * Setup everything to write parquet formatted data to a file. - * @param columnNames names that correspond to the table columns - * @param numChildren Children of the top level - * @param flatNumChildren flattened list of children per column - * @param nullable true if the column can have nulls else false - * @param metadataKeys Metadata key names to place in the Parquet file - * @param metadataValues Metadata values corresponding to metadataKeys - * @param compression native compression codec ID - * @param statsFreq native statistics frequency ID - * @param isInt96 true if timestamp type is int96 - * @param precisions precision list containing all the precisions of the decimal types in - * the columns - * @param isMapValues true if a column is a map - * @param isBinaryValues true if a column is a binary - * @param filename local output path + * @param columnNames names that correspond to the table columns + * @param numChildren Children of the top level + * @param flatNumChildren flattened list of children per column + * @param nullable true if the column can have nulls else false + * @param metadataKeys Metadata key names to place in the Parquet file + * @param metadataValues Metadata values corresponding to metadataKeys + * @param compression native compression codec ID + * @param rowGroupSizeRows max #rows in a row group + * @param rowGroupSizeBytes max #bytes in a row group + * @param statsFreq native statistics frequency ID + * @param isInt96 true if timestamp type is int96 + * @param precisions precision list containing all the precisions of the decimal types in + * the columns + * @param isMapValues true if a column is a map + * @param isBinaryValues true if a column is a binary + * @param filename local output path * @return a handle that is used in later calls to writeParquetChunk and writeParquetEnd. */ private static native long writeParquetFileBegin(String[] columnNames, @@ -355,6 +357,8 @@ private static native long writeParquetFileBegin(String[] columnNames, String[] metadataKeys, String[] metadataValues, int compression, + int rowGroupSizeRows, + long rowGroupSizeBytes, int statsFreq, boolean[] isInt96, int[] precisions, @@ -366,20 +370,22 @@ private static native long writeParquetFileBegin(String[] columnNames, /** * Setup everything to write parquet formatted data to a buffer. - * @param columnNames names that correspond to the table columns - * @param numChildren Children of the top level - * @param flatNumChildren flattened list of children per column - * @param nullable true if the column can have nulls else false - * @param metadataKeys Metadata key names to place in the Parquet file - * @param metadataValues Metadata values corresponding to metadataKeys - * @param compression native compression codec ID - * @param statsFreq native statistics frequency ID - * @param isInt96 true if timestamp type is int96 - * @param precisions precision list containing all the precisions of the decimal types in - * the columns - * @param isMapValues true if a column is a map - * @param isBinaryValues true if a column is a binary - * @param consumer consumer of host buffers produced. + * @param columnNames names that correspond to the table columns + * @param numChildren Children of the top level + * @param flatNumChildren flattened list of children per column + * @param nullable true if the column can have nulls else false + * @param metadataKeys Metadata key names to place in the Parquet file + * @param metadataValues Metadata values corresponding to metadataKeys + * @param compression native compression codec ID + * @param rowGroupSizeRows max #rows in a row group + * @param rowGroupSizeBytes max #bytes in a row group + * @param statsFreq native statistics frequency ID + * @param isInt96 true if timestamp type is int96 + * @param precisions precision list containing all the precisions of the decimal types in + * the columns + * @param isMapValues true if a column is a map + * @param isBinaryValues true if a column is a binary + * @param consumer consumer of host buffers produced. * @return a handle that is used in later calls to writeParquetChunk and writeParquetEnd. */ private static native long writeParquetBufferBegin(String[] columnNames, @@ -389,6 +395,8 @@ private static native long writeParquetBufferBegin(String[] columnNames, String[] metadataKeys, String[] metadataValues, int compression, + int rowGroupSizeRows, + long rowGroupSizeBytes, int statsFreq, boolean[] isInt96, int[] precisions, @@ -1820,6 +1828,8 @@ private ParquetTableWriter(ParquetWriterOptions options, File outputFile) { options.getMetadataKeys(), options.getMetadataValues(), options.getCompressionType().nativeId, + options.getRowGroupSizeRows(), + options.getRowGroupSizeBytes(), options.getStatisticsFrequency().nativeId, options.getFlatIsTimeTypeInt96(), options.getFlatPrecision(), @@ -1840,6 +1850,8 @@ private ParquetTableWriter(ParquetWriterOptions options, HostBufferConsumer cons options.getMetadataKeys(), options.getMetadataValues(), options.getCompressionType().nativeId, + options.getRowGroupSizeRows(), + options.getRowGroupSizeBytes(), options.getStatisticsFrequency().nativeId, options.getFlatIsTimeTypeInt96(), options.getFlatPrecision(), diff --git a/java/src/main/native/src/TableJni.cpp b/java/src/main/native/src/TableJni.cpp index 40a111209b0..92e213bcb60 100644 --- a/java/src/main/native/src/TableJni.cpp +++ b/java/src/main/native/src/TableJni.cpp @@ -2150,6 +2150,8 @@ Java_ai_rapids_cudf_Table_writeParquetBufferBegin(JNIEnv* env, jobjectArray j_metadata_keys, jobjectArray j_metadata_values, jint j_compression, + jint j_row_group_size_rows, + jlong j_row_group_size_bytes, jint j_stats_freq, jbooleanArray j_isInt96, jintArray j_precisions, @@ -2205,6 +2207,8 @@ Java_ai_rapids_cudf_Table_writeParquetBufferBegin(JNIEnv* env, chunked_parquet_writer_options::builder(sink) .metadata(std::move(metadata)) .compression(static_cast(j_compression)) + .row_group_size_rows(j_row_group_size_rows) + .row_group_size_bytes(j_row_group_size_bytes) .stats_level(static_cast(j_stats_freq)) .key_value_metadata({kv_metadata}) .compression_statistics(stats) @@ -2227,6 +2231,8 @@ Java_ai_rapids_cudf_Table_writeParquetFileBegin(JNIEnv* env, jobjectArray j_metadata_keys, jobjectArray j_metadata_values, jint j_compression, + jint j_row_group_size_rows, + jlong j_row_group_size_bytes, jint j_stats_freq, jbooleanArray j_isInt96, jintArray j_precisions, @@ -2280,6 +2286,8 @@ Java_ai_rapids_cudf_Table_writeParquetFileBegin(JNIEnv* env, chunked_parquet_writer_options::builder(sink) .metadata(std::move(metadata)) .compression(static_cast(j_compression)) + .row_group_size_rows(j_row_group_size_rows) + .row_group_size_bytes(j_row_group_size_bytes) .stats_level(static_cast(j_stats_freq)) .key_value_metadata({kv_metadata}) .compression_statistics(stats) diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java index 56fe63598d9..830f2b33b32 100644 --- a/java/src/test/java/ai/rapids/cudf/TableTest.java +++ b/java/src/test/java/ai/rapids/cudf/TableTest.java @@ -9122,7 +9122,11 @@ void testParquetWriteToBufferChunked() { columns.add(Columns.STRUCT.name); WriteUtils.buildWriterOptions(optBuilder, columns); ParquetWriterOptions options = optBuilder.build(); - ParquetWriterOptions optionsNoCompress = optBuilder.withCompressionType(CompressionType.NONE).build(); + ParquetWriterOptions optionsNoCompress = + optBuilder.withCompressionType(CompressionType.NONE) + .withRowGroupSizeRows(10000) + .withRowGroupSizeBytes(10000) + .build(); try (Table table0 = getExpectedFileTable(columns); MyBufferConsumer consumer = new MyBufferConsumer()) { try (TableWriter writer = Table.writeParquetChunked(options, consumer)) { @@ -9208,6 +9212,8 @@ void testParquetWriteToFileUncompressedNoStats() throws IOException { .withDecimalColumn("_c7", 4) .withDecimalColumn("_c8", 6) .withCompressionType(CompressionType.NONE) + .withRowGroupSizeRows(10000) + .withRowGroupSizeBytes(10000) .withStatisticsFrequency(ParquetWriterOptions.StatisticsFrequency.NONE) .build(); try (TableWriter writer = Table.writeParquetChunked(options, tempFile.getAbsoluteFile())) { diff --git a/python/cudf/cudf/_lib/nvtext/minhash.pyx b/python/cudf/cudf/_lib/nvtext/minhash.pyx index 5ee15d0e409..59cb8d51440 100644 --- a/python/cudf/cudf/_lib/nvtext/minhash.pyx +++ b/python/cudf/cudf/_lib/nvtext/minhash.pyx @@ -10,6 +10,8 @@ from pylibcudf.libcudf.column.column_view cimport column_view from pylibcudf.libcudf.nvtext.minhash cimport ( minhash as cpp_minhash, minhash64 as cpp_minhash64, + word_minhash as cpp_word_minhash, + word_minhash64 as cpp_word_minhash64, ) from pylibcudf.libcudf.types cimport size_type @@ -54,3 +56,39 @@ def minhash64(Column strings, Column seeds, int width): ) return Column.from_unique_ptr(move(c_result)) + + +@acquire_spill_lock() +def word_minhash(Column input, Column seeds): + + cdef column_view c_input = input.view() + cdef column_view c_seeds = seeds.view() + cdef unique_ptr[column] c_result + + with nogil: + c_result = move( + cpp_word_minhash( + c_input, + c_seeds + ) + ) + + return Column.from_unique_ptr(move(c_result)) + + +@acquire_spill_lock() +def word_minhash64(Column input, Column seeds): + + cdef column_view c_input = input.view() + cdef column_view c_seeds = seeds.view() + cdef unique_ptr[column] c_result + + with nogil: + c_result = move( + cpp_word_minhash64( + c_input, + c_seeds + ) + ) + + return Column.from_unique_ptr(move(c_result)) diff --git a/python/cudf/cudf/_lib/strings/__init__.py b/python/cudf/cudf/_lib/strings/__init__.py index 47a194c4fda..4bf8a9b1a8f 100644 --- a/python/cudf/cudf/_lib/strings/__init__.py +++ b/python/cudf/cudf/_lib/strings/__init__.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. from cudf._lib.nvtext.edit_distance import edit_distance, edit_distance_matrix from cudf._lib.nvtext.generate_ngrams import ( generate_character_ngrams, @@ -6,7 +6,12 @@ hash_character_ngrams, ) from cudf._lib.nvtext.jaccard import jaccard_index -from cudf._lib.nvtext.minhash import minhash, minhash64 +from cudf._lib.nvtext.minhash import ( + minhash, + minhash64, + word_minhash, + word_minhash64, +) from cudf._lib.nvtext.ngrams_tokenize import ngrams_tokenize from cudf._lib.nvtext.normalize import normalize_characters, normalize_spaces from cudf._lib.nvtext.replace import filter_tokens, replace_tokens diff --git a/python/cudf/cudf/core/column/string.py b/python/cudf/cudf/core/column/string.py index 16e6908f308..e059917b0b8 100644 --- a/python/cudf/cudf/core/column/string.py +++ b/python/cudf/cudf/core/column/string.py @@ -5349,6 +5349,76 @@ def minhash64( libstrings.minhash64(self._column, seeds_column, width) ) + def word_minhash(self, seeds: ColumnLike | None = None) -> SeriesOrIndex: + """ + Compute the minhash of a list column of strings. + This uses the MurmurHash3_x86_32 algorithm for the hash function. + + Parameters + ---------- + seeds : ColumnLike + The seeds used for the hash algorithm. + Must be of type uint32. + + Examples + -------- + >>> import cudf + >>> import numpy as np + >>> ls = cudf.Series([["this", "is", "my"], ["favorite", "book"]]) + >>> seeds = cudf.Series([0, 1, 2], dtype=np.uint32) + >>> ls.str.word_minhash(seeds=seeds) + 0 [21141582, 1232889953, 1268336794] + 1 [962346254, 2321233602, 1354839212] + dtype: list + """ + if seeds is None: + seeds_column = column.as_column(0, dtype=np.uint32, length=1) + else: + seeds_column = column.as_column(seeds) + if seeds_column.dtype != np.uint32: + raise ValueError( + f"Expecting a Series with dtype uint32, got {type(seeds)}" + ) + return self._return_or_inplace( + libstrings.word_minhash(self._column, seeds_column) + ) + + def word_minhash64(self, seeds: ColumnLike | None = None) -> SeriesOrIndex: + """ + Compute the minhash of a list column of strings. + This uses the MurmurHash3_x64_128 algorithm for the hash function. + This function generates 2 uint64 values but only the first + uint64 value is used. + + Parameters + ---------- + seeds : ColumnLike + The seeds used for the hash algorithm. + Must be of type uint64. + + Examples + -------- + >>> import cudf + >>> import numpy as np + >>> ls = cudf.Series([["this", "is", "my"], ["favorite", "book"]]) + >>> seeds = cudf.Series([0, 1, 2], dtype=np.uint64) + >>> ls.str.word_minhash64(seeds) + 0 [2603139454418834912, 8644371945174847701, 5541030711534384340] + 1 [5240044617220523711, 5847101123925041457, 153762819128779913] + dtype: list + """ + if seeds is None: + seeds_column = column.as_column(0, dtype=np.uint64, length=1) + else: + seeds_column = column.as_column(seeds) + if seeds_column.dtype != np.uint64: + raise ValueError( + f"Expecting a Series with dtype uint64, got {type(seeds)}" + ) + return self._return_or_inplace( + libstrings.word_minhash64(self._column, seeds_column) + ) + def jaccard_index(self, input: cudf.Series, width: int) -> SeriesOrIndex: """ Compute the Jaccard index between this column and the given diff --git a/python/cudf/cudf/core/reshape.py b/python/cudf/cudf/core/reshape.py index 3d205957126..c026579b8b5 100644 --- a/python/cudf/cudf/core/reshape.py +++ b/python/cudf/cudf/core/reshape.py @@ -738,7 +738,8 @@ def get_dummies( sparse : boolean, optional Right now this is NON-FUNCTIONAL argument in rapids. drop_first : boolean, optional - Right now this is NON-FUNCTIONAL argument in rapids. + Whether to get k-1 dummies out of k categorical levels by removing the + first level. columns : sequence of str, optional Names of columns to encode. If not provided, will attempt to encode all columns. Note this is different from pandas default behavior, which @@ -806,9 +807,6 @@ def get_dummies( if sparse: raise NotImplementedError("sparse is not supported yet") - if drop_first: - raise NotImplementedError("drop_first is not supported yet") - if isinstance(data, cudf.DataFrame): encode_fallback_dtypes = ["object", "category"] @@ -862,6 +860,7 @@ def get_dummies( prefix=prefix_map.get(name, prefix), prefix_sep=prefix_sep_map.get(name, prefix_sep), dtype=dtype, + drop_first=drop_first, ) result_data.update(col_enc_data) return cudf.DataFrame._from_data(result_data, index=data.index) @@ -874,6 +873,7 @@ def get_dummies( prefix=prefix, prefix_sep=prefix_sep, dtype=dtype, + drop_first=drop_first, ) return cudf.DataFrame._from_data(data, index=ser.index) @@ -1256,6 +1256,7 @@ def _one_hot_encode_column( prefix: str | None, prefix_sep: str | None, dtype: Dtype | None, + drop_first: bool, ) -> dict[str, ColumnBase]: """Encode a single column with one hot encoding. The return dictionary contains pairs of (category, encodings). The keys may be prefixed with @@ -1276,6 +1277,8 @@ def _one_hot_encode_column( ) data = one_hot_encode(column, categories) + if drop_first and len(data): + data.pop(next(iter(data))) if prefix is not None and prefix_sep is not None: data = {f"{prefix}{prefix_sep}{col}": enc for col, enc in data.items()} if dtype: diff --git a/python/cudf/cudf/tests/test_onehot.py b/python/cudf/cudf/tests/test_onehot.py index cc17dc46e0a..e054143b438 100644 --- a/python/cudf/cudf/tests/test_onehot.py +++ b/python/cudf/cudf/tests/test_onehot.py @@ -161,3 +161,20 @@ def test_get_dummies_cats_deprecated(): df = cudf.DataFrame(range(3)) with pytest.warns(FutureWarning): cudf.get_dummies(df, cats={0: [0, 1, 2]}) + + +def test_get_dummies_drop_first_series(): + result = cudf.get_dummies(cudf.Series(list("abcaa")), drop_first=True) + expected = pd.get_dummies(pd.Series(list("abcaa")), drop_first=True) + assert_eq(result, expected) + + +def test_get_dummies_drop_first_dataframe(): + result = cudf.get_dummies( + cudf.DataFrame({"A": list("abcaa"), "B": list("bcaab")}), + drop_first=True, + ) + expected = pd.get_dummies( + pd.DataFrame({"A": list("abcaa"), "B": list("bcaab")}), drop_first=True + ) + assert_eq(result, expected) diff --git a/python/cudf/cudf/tests/text/test_text_methods.py b/python/cudf/cudf/tests/text/test_text_methods.py index 52179f55da3..997ca357986 100644 --- a/python/cudf/cudf/tests/text/test_text_methods.py +++ b/python/cudf/cudf/tests/text/test_text_methods.py @@ -946,6 +946,66 @@ def test_minhash(): strings.str.minhash64(seeds=seeds) +def test_word_minhash(): + ls = cudf.Series([["this", "is", "my"], ["favorite", "book"]]) + + expected = cudf.Series( + [ + cudf.Series([21141582], dtype=np.uint32), + cudf.Series([962346254], dtype=np.uint32), + ] + ) + actual = ls.str.word_minhash() + assert_eq(expected, actual) + seeds = cudf.Series([0, 1, 2], dtype=np.uint32) + expected = cudf.Series( + [ + cudf.Series([21141582, 1232889953, 1268336794], dtype=np.uint32), + cudf.Series([962346254, 2321233602, 1354839212], dtype=np.uint32), + ] + ) + actual = ls.str.word_minhash(seeds=seeds) + assert_eq(expected, actual) + + expected = cudf.Series( + [ + cudf.Series([2603139454418834912], dtype=np.uint64), + cudf.Series([5240044617220523711], dtype=np.uint64), + ] + ) + actual = ls.str.word_minhash64() + assert_eq(expected, actual) + seeds = cudf.Series([0, 1, 2], dtype=np.uint64) + expected = cudf.Series( + [ + cudf.Series( + [ + 2603139454418834912, + 8644371945174847701, + 5541030711534384340, + ], + dtype=np.uint64, + ), + cudf.Series( + [5240044617220523711, 5847101123925041457, 153762819128779913], + dtype=np.uint64, + ), + ] + ) + actual = ls.str.word_minhash64(seeds=seeds) + assert_eq(expected, actual) + + # test wrong seed types + with pytest.raises(ValueError): + ls.str.word_minhash(seeds="a") + with pytest.raises(ValueError): + seeds = cudf.Series([0, 1, 2], dtype=np.int32) + ls.str.word_minhash(seeds=seeds) + with pytest.raises(ValueError): + seeds = cudf.Series([0, 1, 2], dtype=np.uint32) + ls.str.word_minhash64(seeds=seeds) + + def test_jaccard_index(): str1 = cudf.Series(["the brown dog", "jumped about"]) str2 = cudf.Series(["the black cat", "jumped around"]) diff --git a/python/dask_cudf/README.md b/python/dask_cudf/README.md index 6edb9f87d48..4655d2165f0 100644 --- a/python/dask_cudf/README.md +++ b/python/dask_cudf/README.md @@ -1,135 +1,63 @@ #
 Dask cuDF - A GPU Backend for Dask DataFrame
-Dask cuDF (a.k.a. dask-cudf or `dask_cudf`) is an extension library for [Dask DataFrame](https://docs.dask.org/en/stable/dataframe.html). When installed, Dask cuDF is automatically registered as the `"cudf"` [dataframe backend](https://docs.dask.org/en/stable/how-to/selecting-the-collection-backend.html) for Dask DataFrame. - -## Using Dask cuDF - -### The Dask DataFrame API (Recommended) - -Simply set the `"dataframe.backend"` [configuration](https://docs.dask.org/en/stable/configuration.html) to `"cudf"` in Dask, and the public Dask DataFrame API will leverage `cudf` automatically: - -```python -import dask -dask.config.set({"dataframe.backend": "cudf"}) - -import dask.dataframe as dd -# This gives us a cuDF-backed dataframe -df = dd.read_parquet("data.parquet", ...) -``` +Dask cuDF (a.k.a. dask-cudf or `dask_cudf`) is an extension library for [Dask DataFrame](https://docs.dask.org/en/stable/dataframe.html) that provides a Pandas-like API for parallel and larger-than-memory DataFrame computing on GPUs. When installed, Dask cuDF is automatically registered as the `"cudf"` [dataframe backend](https://docs.dask.org/en/stable/how-to/selecting-the-collection-backend.html) for Dask DataFrame. > [!IMPORTANT] -> The `"dataframe.backend"` configuration will only be used for collection creation when the following APIs are used: `read_parquet`, `read_json`, `read_csv`, `read_orc`, `read_hdf`, and `from_dict`. For example, if `from_map`, `from_pandas`, `from_delayed`, or `from_array` are used, the backend of the new collection will depend on the input to the function: - -```python -import pandas as pd -import cudf - -# This gives us a Pandas-backed dataframe -dd.from_pandas(pd.DataFrame({"a": range(10)})) - -# This gives us a cuDF-backed dataframe -dd.from_pandas(cudf.DataFrame({"a": range(10)})) -``` - -A cuDF-backed DataFrame collection can be moved to the `"pandas"` backend: - -```python -df = df.to_backend("pandas") -``` - -Similarly, a Pandas-backed DataFrame collection can be moved to the `"cudf"` backend: - -```python -df = df.to_backend("cudf") -``` - -### The Explicit Dask cuDF API - -In addition to providing the `"cudf"` backend for Dask DataFrame, Dask cuDF also provides an explicit `dask_cudf` API: - -```python -import dask_cudf - -# This always gives us a cuDF-backed dataframe -df = dask_cudf.read_parquet("data.parquet", ...) -``` - -> [!NOTE] -> This API is used implicitly by the Dask DataFrame API when the `"cudf"` backend is enabled. Therefore, using it directly will not provide any performance benefit over the CPU/GPU-portable `dask.dataframe` API. Also, using some parts of the explicit API are incompatible with automatic query planning (see the next section). +> Dask cuDF does not provide support for multi-GPU or multi-node execution on its own. You must also deploy a distributed cluster (ideally with [Dask-CUDA](https://docs.rapids.ai/api/dask-cuda/stable/)) to leverage multiple GPUs efficiently. -See the [Dask cuDF's API documentation](https://docs.rapids.ai/api/dask-cudf/stable/) for further information. - -## Query Planning - -Dask cuDF now provides automatic query planning by default (RAPIDS 24.06+). As long as the `"dataframe.query-planning"` configuration is set to `True` (the default) when `dask.dataframe` is first imported, [Dask Expressions](https://github.com/dask/dask-expr) will be used under the hood. - -For example, the following user code will automatically benefit from predicate pushdown when the result is computed. - -```python -df = dd.read_parquet("/my/parquet/dataset/") -result = df.sort_values('B')['A'] -``` - -Unoptimized expression graph (`df.pprint()`): -``` -Projection: columns='A' - SortValues: by=['B'] shuffle_method='tasks' options={} - ReadParquetFSSpec: path='/my/parquet/dataset/' ... -``` +## Using Dask cuDF -Simplified expression graph (`df.simplify().pprint()`): -``` -Projection: columns='A' - SortValues: by=['B'] shuffle_method='tasks' options={} - ReadParquetFSSpec: path='/my/parquet/dataset/' columns=['A', 'B'] ... -``` +Please visit [the official documentation page](https://docs.rapids.ai/api/dask-cudf/stable/) for detailed information about using Dask cuDF. -> [!NOTE] -> Dask will automatically simplify the expression graph (within `optimize`) when the result is converted to a task graph (via `compute` or `persist`). The user does not need to call `simplify` themself. +## Installation +See the [RAPIDS install page](https://docs.rapids.ai/install) for the most up-to-date information and commands for installing Dask cuDF and other RAPIDS packages. -## Using Multiple GPUs and Multiple Nodes +## Resources -Whenever possible, Dask cuDF (i.e. Dask DataFrame) will automatically try to partition your data into small-enough tasks to fit comfortably in the memory of a single GPU. This means the necessary compute tasks needed to compute a query can often be streamed to a single GPU process for out-of-core computing. This also means that the compute tasks can be executed in parallel over a multi-GPU cluster. +- [Dask cuDF documentation](https://docs.rapids.ai/api/dask-cudf/stable/) +- [cuDF documentation](https://docs.rapids.ai/api/cudf/stable/) +- [10 Minutes to cuDF and Dask cuDF](https://docs.rapids.ai/api/cudf/stable/user_guide/10min/) +- [Dask-CUDA documentation](https://docs.rapids.ai/api/dask-cuda/stable/) +- [Deployment](https://docs.rapids.ai/deployment/stable/) +- [RAPIDS Community](https://rapids.ai/learn-more/#get-involved): Get help, contribute, and collaborate. -> [!IMPORTANT] -> Neither Dask cuDF nor Dask DataFrame provide support for multi-GPU or multi-node execution on their own. You must deploy a distributed cluster (ideally with [Dask CUDA](https://docs.rapids.ai/api/dask-cuda/stable/)) to leverage multiple GPUs. +### Quick-start example -In order to execute your Dask workflow on multiple GPUs, you will typically need to use [Dask CUDA](https://docs.rapids.ai/api/dask-cuda/stable/) to deploy distributed Dask cluster, and [Distributed](https://distributed.dask.org/en/stable/client.html) to define a `client` object. For example: +A very common Dask cuDF use case is single-node multi-GPU data processing. These workflows typically use the following pattern: ```python - +import dask +import dask.dataframe as dd from dask_cuda import LocalCUDACluster from distributed import Client -client = Client( +if __name__ == "__main__": + + # Define a GPU-aware cluster to leverage multiple GPUs + client = Client( LocalCUDACluster( - CUDA_VISIBLE_DEVICES="0,1", # Use two workers (on devices 0 and 1) - rmm_pool_size=0.9, # Use 90% of GPU memory as a pool for faster allocations - enable_cudf_spill=True, # Improve device memory stability - local_directory="/fast/scratch/", # Use fast local storage for spilling + CUDA_VISIBLE_DEVICES="0,1", # Use two workers (on devices 0 and 1) + rmm_pool_size=0.9, # Use 90% of GPU memory as a pool for faster allocations + enable_cudf_spill=True, # Improve device memory stability + local_directory="/fast/scratch/", # Use fast local storage for spilling ) -) + ) -df = dd.read_parquet("/my/parquet/dataset/") -agg = df.groupby('B').sum() -agg.compute() # This will use the cluster defined above -``` + # Set the default dataframe backend to "cudf" + dask.config.set({"dataframe.backend": "cudf"}) -> [!NOTE] -> This example uses `compute` to materialize a concrete `cudf.DataFrame` object in local memory. Never call `compute` on a large collection that cannot fit comfortably in the memory of a single GPU! See Dask's [documentation on managing computation](https://distributed.dask.org/en/stable/manage-computation.html) for more details. + # Create your DataFrame collection from on-disk + # or in-memory data + df = dd.read_parquet("/my/parquet/dataset/") -Please see the [Dask CUDA](https://docs.rapids.ai/api/dask-cuda/stable/) documentation for more information about deploying GPU-aware clusters (including [best practices](https://docs.rapids.ai/api/dask-cuda/stable/examples/best-practices/)). + # Use cudf-like syntax to transform and/or query your data + query = df.groupby('item')['price'].mean() -## Install - -See the [RAPIDS install page](https://docs.rapids.ai/install) for the most up-to-date information and commands for installing Dask cuDF and other RAPIDS packages. + # Compute, persist, or write out the result + query.head() +``` -## Resources +If you do not have multiple GPUs available, using `LocalCUDACluster` is optional. However, it is still a good idea to [enable cuDF spilling](https://docs.rapids.ai/api/cudf/stable/developer_guide/library_design/#spilling-to-host-memory). -- [Dask cuDF API documentation](https://docs.rapids.ai/api/dask-cudf/stable/) -- [cuDF API documentation](https://docs.rapids.ai/api/cudf/stable/) -- [10 Minutes to cuDF and Dask cuDF](https://docs.rapids.ai/api/cudf/stable/user_guide/10min/) -- [Dask CUDA documentation](https://docs.rapids.ai/api/dask-cuda/stable/) -- [Deployment](https://docs.rapids.ai/deployment/stable/) -- [RAPIDS Community](https://rapids.ai/learn-more/#get-involved): Get help, contribute, and collaborate. +If you wish to scale across multiple nodes, you will need to use a different mechanism to deploy your Dask-CUDA workers. Please see [the RAPIDS deployment documentation](https://docs.rapids.ai/deployment/stable/) for more instructions. diff --git a/python/dask_cudf/dask_cudf/expr/_collection.py b/python/dask_cudf/dask_cudf/expr/_collection.py index f60e4ff81ef..97e1dffc65b 100644 --- a/python/dask_cudf/dask_cudf/expr/_collection.py +++ b/python/dask_cudf/dask_cudf/expr/_collection.py @@ -49,8 +49,24 @@ def to_dask_dataframe(self, **kwargs): return self.to_backend("pandas", **kwargs) + def _prepare_cov_corr(self, min_periods, numeric_only): + # Upstream version of this method sets min_periods + # to 2 by default (which is not supported by cudf) + # TODO: Remove when cudf supports both min_periods + # and numeric_only + # See: https://github.com/rapidsai/cudf/issues/12626 + # See: https://github.com/rapidsai/cudf/issues/9009 + self._meta.cov(min_periods=min_periods) + + frame = self + if numeric_only: + numerics = self._meta._get_numeric_data() + if len(numerics.columns) != len(self.columns): + frame = frame[list(numerics.columns)] + return frame, min_periods + # var can be removed if cudf#15179 is addressed. - # See: https://github.com/rapidsai/cudf/issues/15179 + # See: https://github.com/rapidsai/cudf/issues/14935 def var( self, axis=0, diff --git a/python/dask_cudf/dask_cudf/tests/test_core.py b/python/dask_cudf/dask_cudf/tests/test_core.py index 905d8c08135..7aa0f6320f2 100644 --- a/python/dask_cudf/dask_cudf/tests/test_core.py +++ b/python/dask_cudf/dask_cudf/tests/test_core.py @@ -1007,3 +1007,20 @@ def test_to_backend_simplify(): df2 = df.to_backend("cudf")[["y"]].simplify() df3 = df[["y"]].to_backend("cudf").to_backend("cudf").simplify() assert df2._name == df3._name + + +@pytest.mark.parametrize("numeric_only", [True, False]) +@pytest.mark.parametrize("op", ["corr", "cov"]) +def test_cov_corr(op, numeric_only): + df = cudf.DataFrame.from_dict( + { + "x": np.random.randint(0, 5, size=10), + "y": np.random.normal(size=10), + } + ) + ddf = dd.from_pandas(df, npartitions=2) + res = getattr(ddf, op)(numeric_only=numeric_only) + # Use to_pandas until cudf supports numeric_only + # (See: https://github.com/rapidsai/cudf/issues/12626) + expect = getattr(df.to_pandas(), op)(numeric_only=numeric_only) + dd.assert_eq(res, expect) diff --git a/python/pylibcudf/pylibcudf/libcudf/nvtext/minhash.pxd b/python/pylibcudf/pylibcudf/libcudf/nvtext/minhash.pxd index 0c352a5068b..f2dd22f43aa 100644 --- a/python/pylibcudf/pylibcudf/libcudf/nvtext/minhash.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/nvtext/minhash.pxd @@ -19,3 +19,13 @@ cdef extern from "nvtext/minhash.hpp" namespace "nvtext" nogil: const column_view &seeds, const size_type width, ) except + + + cdef unique_ptr[column] word_minhash( + const column_view &input, + const column_view &seeds + ) except + + + cdef unique_ptr[column] word_minhash64( + const column_view &input, + const column_view &seeds + ) except +