diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt
index 5754994f412..7fdaff35525 100644
--- a/cpp/benchmarks/CMakeLists.txt
+++ b/cpp/benchmarks/CMakeLists.txt
@@ -354,9 +354,7 @@ ConfigureNVBench(
 # ##################################################################################################
 # * strings benchmark -------------------------------------------------------------------
-ConfigureBench(
-  STRINGS_BENCH string/factory.cu string/repeat_strings.cpp string/replace.cpp string/url_decode.cu
-)
+ConfigureBench(STRINGS_BENCH string/factory.cu string/repeat_strings.cpp string/url_decode.cu)
 
 ConfigureNVBench(
   STRINGS_NVBENCH
@@ -380,6 +378,7 @@ ConfigureNVBench(
   string/lengths.cpp
   string/like.cpp
   string/make_strings_column.cu
+  string/replace.cpp
   string/replace_re.cpp
   string/reverse.cpp
   string/slice.cpp
diff --git a/cpp/benchmarks/string/replace.cpp b/cpp/benchmarks/string/replace.cpp
index 3d9d51bfd6d..643e857f356 100644
--- a/cpp/benchmarks/string/replace.cpp
+++ b/cpp/benchmarks/string/replace.cpp
@@ -14,11 +14,8 @@
  * limitations under the License.
  */
 
-#include "string_bench_args.hpp"
-
 #include <benchmarks/common/generate_input.hpp>
 #include <benchmarks/fixture/benchmark_fixture.hpp>
-#include <benchmarks/synchronization/synchronization.hpp>
 
 #include <cudf_test/column_wrapper.hpp>
 
@@ -27,59 +24,51 @@
 #include <cudf/strings/strings_column_view.hpp>
 #include <cudf/utilities/default_stream.hpp>
-#include
-
-class StringReplace : public cudf::benchmark {};
+#include <nvbench/nvbench.cuh>
 
 enum replace_type { scalar, slice, multi };
 
-static void BM_replace(benchmark::State& state, replace_type rt)
+static void bench_replace(nvbench::state& state)
 {
-  cudf::size_type const n_rows{static_cast<cudf::size_type>(state.range(0))};
-  cudf::size_type const max_str_length{static_cast<cudf::size_type>(state.range(1))};
+  auto const num_rows  = static_cast<cudf::size_type>(state.get_int64("num_rows"));
+  auto const min_width = static_cast<cudf::size_type>(state.get_int64("min_width"));
+  auto const max_width = static_cast<cudf::size_type>(state.get_int64("max_width"));
+  auto const api       = state.get_string("api");
+
   data_profile const profile = data_profile_builder().distribution(
-    cudf::type_id::STRING, distribution_id::NORMAL, 0, max_str_length);
-  auto const column = create_random_column(cudf::type_id::STRING, row_count{n_rows}, profile);
-  cudf::strings_column_view input(column->view());
-  cudf::string_scalar target("+");
-  cudf::string_scalar repl("");
-  cudf::test::strings_column_wrapper targets({"+", "-"});
-  cudf::test::strings_column_wrapper repls({"", ""});
+    cudf::type_id::STRING, distribution_id::NORMAL, min_width, max_width);
+  auto const column = create_random_column(cudf::type_id::STRING, row_count{num_rows}, profile);
 
-  for (auto _ : state) {
-    cuda_event_timer raii(state, true, cudf::get_default_stream());
-    switch (rt) {
-      case scalar: cudf::strings::replace(input, target, repl); break;
-      case slice: cudf::strings::replace_slice(input, repl, 1, 10); break;
-      case multi:
-        cudf::strings::replace_multiple(
-          input, cudf::strings_column_view(targets), cudf::strings_column_view(repls));
-        break;
-    }
-  }
+  cudf::strings_column_view input(column->view());
 
-  state.SetBytesProcessed(state.iterations() * input.chars_size(cudf::get_default_stream()));
-}
+  auto stream = cudf::get_default_stream();
+  state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value()));
+  auto const chars_size = input.chars_size(stream);
+  state.add_global_memory_reads<nvbench::int8_t>(chars_size);
+  state.add_global_memory_writes<nvbench::int8_t>(chars_size);
 
-static void generate_bench_args(benchmark::internal::Benchmark* b)
-{
-  int const min_rows   = 1 << 12;
-  int const max_rows   = 1 << 24;
-  int const row_mult   = 8;
-  int const min_rowlen = 1 << 5;
-  int const max_rowlen = 1 << 13;
-  int const len_mult   = 2;
-  generate_string_bench_args(b, min_rows, max_rows, row_mult, min_rowlen, max_rowlen, len_mult);
+  if (api == "scalar") {
+    cudf::string_scalar target("+");
+    cudf::string_scalar repl("-");
+    state.exec(nvbench::exec_tag::sync,
+               [&](nvbench::launch& launch) { cudf::strings::replace(input, target, repl); });
+  } else if (api == "multi") {
+    state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
+      cudf::test::strings_column_wrapper targets({"+", " "});
+      cudf::test::strings_column_wrapper repls({"-", "_"});
+      cudf::strings::replace_multiple(
+        input, cudf::strings_column_view(targets), cudf::strings_column_view(repls));
+    });
+  } else if (api == "slice") {
+    cudf::string_scalar repl("0123456789");
+    state.exec(nvbench::exec_tag::sync,
+               [&](nvbench::launch& launch) { cudf::strings::replace_slice(input, repl, 1, 10); });
+  }
 }
 
-#define STRINGS_BENCHMARK_DEFINE(name)                                        \
-  BENCHMARK_DEFINE_F(StringReplace, name)                                     \
-  (::benchmark::State & st) { BM_replace(st, replace_type::name); }           \
-  BENCHMARK_REGISTER_F(StringReplace, name)                                   \
-    ->Apply(generate_bench_args)                                              \
-    ->UseManualTime()                                                         \
-    ->Unit(benchmark::kMillisecond);
-
-STRINGS_BENCHMARK_DEFINE(scalar)
-STRINGS_BENCHMARK_DEFINE(slice)
-STRINGS_BENCHMARK_DEFINE(multi)
+NVBENCH_BENCH(bench_replace)
+  .set_name("replace")
+  .add_int64_axis("min_width", {0})
+  .add_int64_axis("max_width", {32, 64, 128, 256})
+  .add_int64_axis("num_rows", {32768, 262144, 2097152})
+  .add_string_axis("api", {"scalar", "multi", "slice"});
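Note: the rewrite above moves this benchmark from Google Benchmark's fixture/Apply machinery to NVBench named axes; NVBench runs one measurement per point in the cross product of the axes, so "replace" covers 1 (min_width) x 4 (max_width) x 3 (num_rows) x 3 (api) = 36 configurations. A minimal sketch of the registration pattern (hypothetical bench_example, not part of this change; uses only the NVBench calls already seen above):

    #include <nvbench/nvbench.cuh>

    // Hypothetical benchmark illustrating the NVBench pattern used in replace.cpp.
    static void bench_example(nvbench::state& state)
    {
      auto const num_rows = state.get_int64("num_rows");

      // exec_tag::sync tells NVBench the lambda blocks until the work is done,
      // matching the synchronous libcudf calls timed above.
      state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
        (void)launch;    // a real benchmark launches work on the bound stream
        (void)num_rows;  // ... sized by this axis value
      });
    }

    // One run per point in the cross product of all registered axes.
    NVBENCH_BENCH(bench_example)
      .set_name("example")
      .add_int64_axis("num_rows", {32768, 262144, 2097152});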
diff --git a/cpp/src/binaryop/compiled/binary_ops.cuh b/cpp/src/binaryop/compiled/binary_ops.cuh
index c6af0c3c58a..06987139188 100644
--- a/cpp/src/binaryop/compiled/binary_ops.cuh
+++ b/cpp/src/binaryop/compiled/binary_ops.cuh
@@ -21,7 +21,7 @@
 #include
 #include
-#include <cudf/detail/utilities/integer_utils.hpp>
+#include <cudf/detail/utilities/cuda.cuh>
 #include
 #include
 
@@ -253,16 +253,11 @@ struct binary_op_double_device_dispatcher {
 template <typename Functor>
 CUDF_KERNEL void for_each_kernel(cudf::size_type size, Functor f)
 {
-  int tid    = threadIdx.x;
-  int blkid  = blockIdx.x;
-  int blksz  = blockDim.x;
-  int gridsz = gridDim.x;
-
-  int start = tid + blkid * blksz;
-  int step  = blksz * gridsz;
+  auto start        = cudf::detail::grid_1d::global_thread_id();
+  auto const stride = cudf::detail::grid_1d::grid_stride();
 
 #pragma unroll
-  for (cudf::size_type i = start; i < size; i += step) {
+  for (auto i = start; i < size; i += stride) {
     f(i);
   }
 }
@@ -282,9 +277,9 @@ void for_each(rmm::cuda_stream_view stream, cudf::size_type size, Functor f)
   int min_grid_size;
   CUDF_CUDA_TRY(
     cudaOccupancyMaxPotentialBlockSize(&min_grid_size, &block_size, for_each_kernel<decltype(f)>));
-  // 2 elements per thread.
-  int const grid_size = util::div_rounding_up_safe(size, 2 * block_size);
-  for_each_kernel<<<grid_size, block_size, 0, stream.value()>>>(size, std::forward<Functor>(f));
+  auto grid = cudf::detail::grid_1d(size, block_size, 2 /* elements_per_thread */);
+  for_each_kernel<<<grid.num_blocks, block_size, 0, stream.value()>>>(
+    size, std::forward<Functor>(f));
 }
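Note: grid_1d::global_thread_id() and grid_1d::grid_stride() return cudf's 64-bit thread_index_type, so the start/stride math above no longer wraps around once the element count or gridDim.x * blockDim.x approaches INT_MAX — the regime the LargeColumnNoOverflow test later in this diff exercises. A standalone CUDA sketch of the same grid-stride idiom with explicit 64-bit arithmetic (hypothetical for_each_sketch kernel, not cudf code):

    #include <cuda_runtime.h>

    #include <cstdint>

    // Standalone sketch of the grid-stride loop that the grid_1d helpers
    // encapsulate. The index math is done in 64 bits so it cannot overflow
    // for element counts beyond INT_MAX.
    __global__ void for_each_sketch(std::int64_t size)
    {
      auto start        = static_cast<std::int64_t>(blockIdx.x) * blockDim.x + threadIdx.x;
      auto const stride = static_cast<std::int64_t>(gridDim.x) * blockDim.x;
      for (auto i = start; i < size; i += stride) {
        // f(i) would run here; this thread covers start, start + stride, ...
      }
    }

    int main()
    {
      auto const size      = std::int64_t{1} << 31;  // more elements than INT_MAX
      int const block_size = 256;
      // Mirrors grid_1d(size, block_size, 2 /* elements_per_thread */):
      auto const num_blocks = (size + 2 * block_size - 1) / (2 * block_size);
      for_each_sketch<<<static_cast<unsigned int>(num_blocks), block_size>>>(size);
      return cudaDeviceSynchronize() == cudaSuccess ? 0 : 1;
    }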
"on" : "off"); } else { _cufile_in = detail::make_cufile_input(filepath); } diff --git a/cpp/src/join/distinct_hash_join.cu b/cpp/src/join/distinct_hash_join.cu index 515d28201e8..ce4d2067b82 100644 --- a/cpp/src/join/distinct_hash_join.cu +++ b/cpp/src/join/distinct_hash_join.cu @@ -33,7 +33,9 @@ #include #include #include +#include #include +#include #include #include @@ -79,14 +81,9 @@ class build_keys_fn { /** * @brief Device output transform functor to construct `size_type` with `cuco::pair` or `cuco::pair` + * rhs_index_type>` */ struct output_fn { - __device__ constexpr cudf::size_type operator()( - cuco::pair const& x) const - { - return static_cast(x.second); - } __device__ constexpr cudf::size_type operator()( cuco::pair const& x) const { @@ -176,15 +173,33 @@ distinct_hash_join::inner_join(rmm::cuda_stream_view stream, auto const iter = cudf::detail::make_counting_transform_iterator( 0, build_keys_fn{d_probe_hasher}); - auto const build_indices_begin = - thrust::make_transform_output_iterator(build_indices->begin(), output_fn{}); - auto const probe_indices_begin = - thrust::make_transform_output_iterator(probe_indices->begin(), output_fn{}); - - auto const [probe_indices_end, _] = this->_hash_table.retrieve( - iter, iter + probe_table_num_rows, probe_indices_begin, build_indices_begin, {stream.value()}); + auto found_indices = rmm::device_uvector(probe_table_num_rows, stream); + auto const found_begin = + thrust::make_transform_output_iterator(found_indices.begin(), output_fn{}); + + // TODO conditional find for nulls once `cuco::static_set::find_if` is added + // If `idx` is within the range `[0, probe_table_num_rows)` and `found_indices[idx]` is not equal + // to `JoinNoneValue`, then `idx` has a match in the hash set. + this->_hash_table.find_async(iter, iter + probe_table_num_rows, found_begin, stream.value()); + + auto const tuple_iter = cudf::detail::make_counting_transform_iterator( + 0, + cuda::proclaim_return_type>( + [found_iter = found_indices.begin()] __device__(size_type idx) { + return thrust::tuple{*(found_iter + idx), idx}; + })); + auto const output_begin = + thrust::make_zip_iterator(build_indices->begin(), probe_indices->begin()); + auto const output_end = + thrust::copy_if(rmm::exec_policy_nosync(stream), + tuple_iter, + tuple_iter + probe_table_num_rows, + found_indices.begin(), + output_begin, + cuda::proclaim_return_type( + [] __device__(size_type idx) { return idx != JoinNoneValue; })); + auto const actual_size = std::distance(output_begin, output_end); - auto const actual_size = std::distance(probe_indices_begin, probe_indices_end); build_indices->resize(actual_size, stream); probe_indices->resize(actual_size, stream); diff --git a/cpp/tests/binaryop/binop-compiled-test.cpp b/cpp/tests/binaryop/binop-compiled-test.cpp index 3bd67001c16..7cdce1ff735 100644 --- a/cpp/tests/binaryop/binop-compiled-test.cpp +++ b/cpp/tests/binaryop/binop-compiled-test.cpp @@ -23,9 +23,11 @@ #include #include +#include #include #include #include +#include #include #include @@ -820,4 +822,24 @@ TEST_F(BinaryOperationCompiledTest_NullOpsString, NullMin_Vector_Vector) CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected, result->view()); } +TEST(BinaryOperationCompiledTest, LargeColumnNoOverflow) +{ + cudf::size_type num_rows{1'799'989'091}; + auto big = cudf::make_column_from_scalar( + cudf::numeric_scalar>{10, true}, num_rows); + auto small = cudf::make_column_from_scalar( + cudf::numeric_scalar>{1, true}, num_rows); + + auto mask = cudf::binary_operation(big->view(), + small->view(), 
diff --git a/cpp/tests/binaryop/binop-compiled-test.cpp b/cpp/tests/binaryop/binop-compiled-test.cpp
index 3bd67001c16..7cdce1ff735 100644
--- a/cpp/tests/binaryop/binop-compiled-test.cpp
+++ b/cpp/tests/binaryop/binop-compiled-test.cpp
@@ -23,9 +23,11 @@
 #include
 #include
+#include <cudf/column/column_factories.hpp>
 #include
 #include
 #include
+#include <cudf/reduction.hpp>
 #include
 #include
 
@@ -820,4 +822,24 @@ TEST_F(BinaryOperationCompiledTest_NullOpsString, NullMin_Vector_Vector)
   CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected, result->view());
 }
 
+TEST(BinaryOperationCompiledTest, LargeColumnNoOverflow)
+{
+  cudf::size_type num_rows{1'799'989'091};
+  auto big   = cudf::make_column_from_scalar(
+    cudf::numeric_scalar<cudf::id_to_type<cudf::type_id::INT8>>{10, true}, num_rows);
+  auto small = cudf::make_column_from_scalar(
+    cudf::numeric_scalar<cudf::id_to_type<cudf::type_id::INT8>>{1, true}, num_rows);
+
+  auto mask = cudf::binary_operation(big->view(),
+                                     small->view(),
+                                     cudf::binary_operator::GREATER,
+                                     cudf::data_type{cudf::type_id::BOOL8});
+
+  auto agg = cudf::make_sum_aggregation<cudf::reduce_aggregation>();
+  auto result =
+    cudf::reduce(mask->view(), *agg, cudf::data_type{cudf::type_to_id<int64_t>()});
+  auto got = static_cast<cudf::numeric_scalar<int64_t>*>(result.get())->value();
+  EXPECT_EQ(num_rows, got);
+}
+
 CUDF_TEST_PROGRAM_MAIN()
diff --git a/python/cudf/cudf/_lib/transform.pyx b/python/cudf/cudf/_lib/transform.pyx
index 1589e23f716..a163bb07888 100644
--- a/python/cudf/cudf/_lib/transform.pyx
+++ b/python/cudf/cudf/_lib/transform.pyx
@@ -3,12 +3,10 @@
 from numba.np import numpy_support
 
 import cudf
-from cudf.core._internals.expressions import parse_expression
 from cudf.core.buffer import acquire_spill_lock, as_buffer
 from cudf.utils import cudautils
 
 from pylibcudf cimport transform as plc_transform
-from pylibcudf.expressions cimport Expression
 from pylibcudf.libcudf.types cimport size_type
 
 from cudf._lib.column cimport Column
@@ -93,7 +91,7 @@ def one_hot_encode(Column input_column, Column categories):
 
 
 @acquire_spill_lock()
-def compute_column(list columns, tuple column_names, expr: str):
+def compute_column(list columns, tuple column_names, str expr):
     """Compute a new column by evaluating an expression on a set of columns.
 
     Parameters
@@ -108,12 +106,8 @@ def compute_column(list columns, tuple column_names, expr: str):
     expr : str
         The expression to evaluate.
     """
-    visitor = parse_expression(expr, column_names)
-
-    # At the end, all the stack contains is the expression to evaluate.
-    cdef Expression cudf_expr = visitor.expression
     result = plc_transform.compute_column(
         plc.Table([col.to_pylibcudf(mode="read") for col in columns]),
-        cudf_expr,
+        plc.expressions.to_expression(expr, column_names),
     )
     return Column.from_pylibcudf(result)
diff --git a/python/cudf/cudf/core/_internals/expressions.py b/python/cudf/cudf/core/_internals/expressions.py
deleted file mode 100644
index 90d9118027a..00000000000
--- a/python/cudf/cudf/core/_internals/expressions.py
+++ /dev/null
@@ -1,229 +0,0 @@
-# Copyright (c) 2022-2024, NVIDIA CORPORATION.
-from __future__ import annotations
-
-import ast
-import functools
-
-import pyarrow as pa
-
-import pylibcudf as plc
-from pylibcudf.expressions import (
-    ASTOperator,
-    ColumnReference,
-    Expression,
-    Literal,
-    Operation,
-)
-
-# This dictionary encodes the mapping from Python AST operators to their cudf
-# counterparts.
-python_cudf_operator_map = {
-    # Binary operators
-    ast.Add: ASTOperator.ADD,
-    ast.Sub: ASTOperator.SUB,
-    ast.Mult: ASTOperator.MUL,
-    ast.Div: ASTOperator.DIV,
-    ast.FloorDiv: ASTOperator.FLOOR_DIV,
-    ast.Mod: ASTOperator.PYMOD,
-    ast.Pow: ASTOperator.POW,
-    ast.Eq: ASTOperator.EQUAL,
-    ast.NotEq: ASTOperator.NOT_EQUAL,
-    ast.Lt: ASTOperator.LESS,
-    ast.Gt: ASTOperator.GREATER,
-    ast.LtE: ASTOperator.LESS_EQUAL,
-    ast.GtE: ASTOperator.GREATER_EQUAL,
-    ast.BitXor: ASTOperator.BITWISE_XOR,
-    # TODO: The mapping of logical/bitwise operators here is inconsistent with
-    # pandas. In pandas, Both `BitAnd` and `And` map to
-    # `ASTOperator.LOGICAL_AND` for booleans, while they map to
-    # `ASTOperator.BITWISE_AND` for integers. However, there is no good way to
-    # encode this at present because expressions can be arbitrarily nested so
-    # we won't know the dtype of the input without inserting a much more
-    # complex traversal of the expression tree to determine the output types at
-    # each node. For now, we'll rely on users to use the appropriate operator.
-    ast.BitAnd: ASTOperator.BITWISE_AND,
-    ast.BitOr: ASTOperator.BITWISE_OR,
-    ast.And: ASTOperator.LOGICAL_AND,
-    ast.Or: ASTOperator.LOGICAL_OR,
-    # Unary operators
-    ast.Invert: ASTOperator.BIT_INVERT,
-    ast.Not: ASTOperator.NOT,
-    # TODO: Missing USub, possibility other unary ops?
-}
-
-
-# Mapping between Python function names encode in an ast.Call node and the
-# corresponding libcudf C++ AST operators.
-python_cudf_function_map = {
-    # TODO: Operators listed on
-    # https://pandas.pydata.org/pandas-docs/stable/user_guide/enhancingperf.html#expression-evaluation-via-eval  # noqa: E501
-    # that we don't support yet:
-    # expm1, log1p, arctan2 and log10.
-    "isnull": ASTOperator.IS_NULL,
-    "isna": ASTOperator.IS_NULL,
-    "sin": ASTOperator.SIN,
-    "cos": ASTOperator.COS,
-    "tan": ASTOperator.TAN,
-    "arcsin": ASTOperator.ARCSIN,
-    "arccos": ASTOperator.ARCCOS,
-    "arctan": ASTOperator.ARCTAN,
-    "sinh": ASTOperator.SINH,
-    "cosh": ASTOperator.COSH,
-    "tanh": ASTOperator.TANH,
-    "arcsinh": ASTOperator.ARCSINH,
-    "arccosh": ASTOperator.ARCCOSH,
-    "arctanh": ASTOperator.ARCTANH,
-    "exp": ASTOperator.EXP,
-    "log": ASTOperator.LOG,
-    "sqrt": ASTOperator.SQRT,
-    "abs": ASTOperator.ABS,
-    "ceil": ASTOperator.CEIL,
-    "floor": ASTOperator.FLOOR,
-    # TODO: Operators supported by libcudf with no Python function analog.
-    # ast.rint: ASTOperator.RINT,
-    # ast.cbrt: ASTOperator.CBRT,
-}
-
-
-class libcudfASTVisitor(ast.NodeVisitor):
-    """A NodeVisitor specialized for constructing a libcudf expression tree.
-
-    This visitor is designed to handle AST nodes that have libcudf equivalents.
-    It constructs column references from names and literals from constants,
-    then builds up operations. The final result can be accessed using the
-    `expression` property. The visitor must be kept in scope for as long as the
-    expression is needed because all of the underlying libcudf expressions will
-    be destroyed when the libcudfASTVisitor is.
-
-    Parameters
-    ----------
-    col_names : Tuple[str]
-        The column names used to map the names in an expression.
-    """
-
-    def __init__(self, col_names: tuple[str]):
-        self.stack: list[Expression] = []
-        self.nodes: list[Expression] = []
-        self.col_names = col_names
-
-    @property
-    def expression(self):
-        """Expression: The result of parsing an AST."""
-        assert len(self.stack) == 1
-        return self.stack[-1]
-
-    def visit_Name(self, node):
-        try:
-            col_id = self.col_names.index(node.id)
-        except ValueError:
-            raise ValueError(f"Unknown column name {node.id}")
-        self.stack.append(ColumnReference(col_id))
-
-    def visit_Constant(self, node):
-        if not isinstance(node.value, (float, int, str, complex)):
-            raise ValueError(
-                f"Unsupported literal {repr(node.value)} of type "
-                "{type(node.value).__name__}"
-            )
-        self.stack.append(
-            Literal(plc.interop.from_arrow(pa.scalar(node.value)))
-        )
-
-    def visit_UnaryOp(self, node):
-        self.visit(node.operand)
-        self.nodes.append(self.stack.pop())
-        if isinstance(node.op, ast.USub):
-            # TODO: Except for leaf nodes, we won't know the type of the
-            # operand, so there's no way to know whether this should be a float
-            # or an int. We should maybe see what Spark does, and this will
-            # probably require casting.
-            self.nodes.append(Literal(plc.interop.from_arrow(pa.scalar(-1))))
-            op = ASTOperator.MUL
-            self.stack.append(Operation(op, self.nodes[-1], self.nodes[-2]))
-        elif isinstance(node.op, ast.UAdd):
-            self.stack.append(self.nodes[-1])
-        else:
-            op = python_cudf_operator_map[type(node.op)]
-            self.stack.append(Operation(op, self.nodes[-1]))
-
-    def visit_BinOp(self, node):
-        self.visit(node.left)
-        self.visit(node.right)
-        self.nodes.append(self.stack.pop())
-        self.nodes.append(self.stack.pop())
-
-        op = python_cudf_operator_map[type(node.op)]
-        self.stack.append(Operation(op, self.nodes[-1], self.nodes[-2]))
-
-    def _visit_BoolOp_Compare(self, operators, operands, has_multiple_ops):
-        # Helper function handling the common components of parsing BoolOp and
-        # Compare AST nodes. These two types of nodes both support chaining
-        # (e.g. `a > b > c` is equivalent to `a > b and b > c`, so this
-        # function helps standardize that.
-
-        # TODO: Whether And/Or and BitAnd/BitOr actually correspond to
-        # logical or bitwise operators depends on the data types that they
-        # are applied to. We'll need to add logic to map to that.
-        inner_ops = []
-        for op, (left, right) in zip(operators, operands):
-            # Note that this will lead to duplicate nodes, e.g. if
-            # the comparison is `a < b < c` that will be encoded as
-            # `a < b and b < c`. We could potentially optimize by caching
-            # expressions by name so that we only construct them once.
-            self.visit(left)
-            self.visit(right)
-
-            self.nodes.append(self.stack.pop())
-            self.nodes.append(self.stack.pop())
-
-            op = python_cudf_operator_map[type(op)]
-            inner_ops.append(Operation(op, self.nodes[-1], self.nodes[-2]))
-
-        self.nodes.extend(inner_ops)
-
-        # If we have more than one comparator, we need to link them
-        # together with LOGICAL_AND operators.
-        if has_multiple_ops:
-            op = ASTOperator.LOGICAL_AND
-
-            def _combine_compare_ops(left, right):
-                self.nodes.append(Operation(op, left, right))
-                return self.nodes[-1]
-
-            functools.reduce(_combine_compare_ops, inner_ops)
-
-        self.stack.append(self.nodes[-1])
-
-    def visit_BoolOp(self, node):
-        operators = [node.op] * (len(node.values) - 1)
-        operands = zip(node.values[:-1], node.values[1:])
-        self._visit_BoolOp_Compare(operators, operands, len(node.values) > 2)
-
-    def visit_Compare(self, node):
-        operands = (node.left, *node.comparators)
-        has_multiple_ops = len(operands) > 2
-        operands = zip(operands[:-1], operands[1:])
-        self._visit_BoolOp_Compare(node.ops, operands, has_multiple_ops)
-
-    def visit_Call(self, node):
-        try:
-            op = python_cudf_function_map[node.func.id]
-        except KeyError:
-            raise ValueError(f"Unsupported function {node.func}.")
-        # Assuming only unary functions are supported, which is checked above.
-        if len(node.args) != 1 or node.keywords:
-            raise ValueError(
-                f"Function {node.func} only accepts one positional "
-                "argument."
-            )
-        self.visit(node.args[0])
-
-        self.nodes.append(self.stack.pop())
-        self.stack.append(Operation(op, self.nodes[-1]))
-
-
-@functools.lru_cache(256)
-def parse_expression(expr: str, col_names: tuple[str]):
-    visitor = libcudfASTVisitor(col_names)
-    visitor.visit(ast.parse(expr))
-    return visitor
diff --git a/python/custreamz/custreamz/kafka.py b/python/custreamz/custreamz/kafka.py
index 4cbd7244751..166b7d98592 100644
--- a/python/custreamz/custreamz/kafka.py
+++ b/python/custreamz/custreamz/kafka.py
@@ -151,9 +151,14 @@ def read_gdf(
         "parquet": cudf.io.read_parquet,
     }
 
-    result = cudf_readers[message_format](
-        kafka_datasource, engine="cudf", lines=True
-    )
+    if message_format == "json":
+        result = cudf_readers[message_format](
+            kafka_datasource, engine="cudf", lines=True
+        )
+    else:
+        result = cudf_readers[message_format](
+            kafka_datasource, engine="cudf"
+        )
 
     # Close up the cudf datasource instance
     # TODO: Ideally the C++ destructor should handle the
diff --git a/python/libcudf/libcudf/load.py b/python/libcudf/libcudf/load.py
index a91fbb7aecf..c3ff5534e87 100644
--- a/python/libcudf/libcudf/load.py
+++ b/python/libcudf/libcudf/load.py
@@ -76,9 +76,15 @@ def load_library():
     # Prefer the libraries bundled in this package. If they aren't found
     # (which might be the case in builds where the library was prebuilt before
     # packaging the wheel), look for a system installation.
-    libcudf_lib = _load_wheel_installation(soname)
-    if libcudf_lib is None:
-        libcudf_lib = _load_system_installation(soname)
+    try:
+        libcudf_lib = _load_wheel_installation(soname)
+        if libcudf_lib is None:
+            libcudf_lib = _load_system_installation(soname)
+    except OSError:
+        # If none of the searches above succeed, just silently return None
+        # and rely on other mechanisms (like RPATHs on other DSOs) to
+        # help the loader find the library.
+        pass
 
     # The caller almost never needs to do anything with this library, but no
     # harm in offering the option since this object at least provides a handle
diff --git a/python/pylibcudf/pylibcudf/expressions.pyi b/python/pylibcudf/pylibcudf/expressions.pyi
index 12b473d8605..4dcccaaa1fc 100644
--- a/python/pylibcudf/pylibcudf/expressions.pyi
+++ b/python/pylibcudf/pylibcudf/expressions.pyi
@@ -77,3 +77,5 @@ class Operation(Expression):
         left: Expression,
         right: Expression | None = None,
     ): ...
+
+def to_expression(expr: str, column_names: tuple[str, ...]) -> Expression: ...
diff --git a/python/pylibcudf/pylibcudf/expressions.pyx b/python/pylibcudf/pylibcudf/expressions.pyx
index 0f12cfe313c..31121785e27 100644
--- a/python/pylibcudf/pylibcudf/expressions.pyx
+++ b/python/pylibcudf/pylibcudf/expressions.pyx
@@ -1,4 +1,9 @@
 # Copyright (c) 2024, NVIDIA CORPORATION.
+import ast
+import functools
+
+import pyarrow as pa
+
 from pylibcudf.libcudf.expressions import \
     ast_operator as ASTOperator  # no-cython-lint
 from pylibcudf.libcudf.expressions import \
@@ -46,6 +51,8 @@ from .scalar cimport Scalar
 from .traits cimport is_chrono, is_numeric
 from .types cimport DataType
 
+from .interop import from_arrow
+
 # Aliases for simplicity
 ctypedef unique_ptr[libcudf_exp.expression] expression_ptr
 
@@ -57,6 +64,7 @@ __all__ = [
     "Literal",
     "Operation",
     "TableReference",
+    "to_expression"
 ]
 
 # Define this class just to have a docstring for it
@@ -261,3 +269,217 @@ cdef class ColumnNameReference(Expression):
         move(make_unique[libcudf_exp.column_name_reference](
             <string>(name.encode("utf-8"))
         ))
+
+
+# This dictionary encodes the mapping from Python AST operators to their cudf
+# counterparts.
+_python_cudf_operator_map = {
+    # Binary operators
+    ast.Add: ASTOperator.ADD,
+    ast.Sub: ASTOperator.SUB,
+    ast.Mult: ASTOperator.MUL,
+    ast.Div: ASTOperator.DIV,
+    ast.FloorDiv: ASTOperator.FLOOR_DIV,
+    ast.Mod: ASTOperator.PYMOD,
+    ast.Pow: ASTOperator.POW,
+    ast.Eq: ASTOperator.EQUAL,
+    ast.NotEq: ASTOperator.NOT_EQUAL,
+    ast.Lt: ASTOperator.LESS,
+    ast.Gt: ASTOperator.GREATER,
+    ast.LtE: ASTOperator.LESS_EQUAL,
+    ast.GtE: ASTOperator.GREATER_EQUAL,
+    ast.BitXor: ASTOperator.BITWISE_XOR,
+    # TODO: The mapping of logical/bitwise operators here is inconsistent with
+    # pandas. In pandas, Both `BitAnd` and `And` map to
+    # `ASTOperator.LOGICAL_AND` for booleans, while they map to
+    # `ASTOperator.BITWISE_AND` for integers. However, there is no good way to
+    # encode this at present because expressions can be arbitrarily nested so
+    # we won't know the dtype of the input without inserting a much more
+    # complex traversal of the expression tree to determine the output types at
+    # each node. For now, we'll rely on users to use the appropriate operator.
+    ast.BitAnd: ASTOperator.BITWISE_AND,
+    ast.BitOr: ASTOperator.BITWISE_OR,
+    ast.And: ASTOperator.LOGICAL_AND,
+    ast.Or: ASTOperator.LOGICAL_OR,
+    # Unary operators
+    ast.Invert: ASTOperator.BIT_INVERT,
+    ast.Not: ASTOperator.NOT,
+    # TODO: Missing USub, possibility other unary ops?
+}
+
+
+# Mapping between Python function names encode in an ast.Call node and the
+# corresponding libcudf C++ AST operators.
+_python_cudf_function_map = {
+    # TODO: Operators listed on
+    # https://pandas.pydata.org/pandas-docs/stable/user_guide/enhancingperf.html#expression-evaluation-via-eval  # noqa: E501
+    # that we don't support yet:
+    # expm1, log1p, arctan2 and log10.
+    "isnull": ASTOperator.IS_NULL,
+    "isna": ASTOperator.IS_NULL,
+    "sin": ASTOperator.SIN,
+    "cos": ASTOperator.COS,
+    "tan": ASTOperator.TAN,
+    "arcsin": ASTOperator.ARCSIN,
+    "arccos": ASTOperator.ARCCOS,
+    "arctan": ASTOperator.ARCTAN,
+    "sinh": ASTOperator.SINH,
+    "cosh": ASTOperator.COSH,
+    "tanh": ASTOperator.TANH,
+    "arcsinh": ASTOperator.ARCSINH,
+    "arccosh": ASTOperator.ARCCOSH,
+    "arctanh": ASTOperator.ARCTANH,
+    "exp": ASTOperator.EXP,
+    "log": ASTOperator.LOG,
+    "sqrt": ASTOperator.SQRT,
+    "abs": ASTOperator.ABS,
+    "ceil": ASTOperator.CEIL,
+    "floor": ASTOperator.FLOOR,
+    # TODO: Operators supported by libcudf with no Python function analog.
+    # ast.rint: ASTOperator.RINT,
+    # ast.cbrt: ASTOperator.CBRT,
+}
+
+
+class ExpressionTransformer(ast.NodeVisitor):
+    """A NodeVisitor specialized for constructing a libcudf expression tree.
+
+    This visitor is designed to handle AST nodes that have libcudf equivalents.
+    It constructs column references from names and literals from constants,
+    then builds up operations. The resulting expression is returned by the
+    `visit` method
+
+    Parameters
+    ----------
+    column_mapping : dict[str, ColumnNameReference | ColumnReference]
+        Mapping from names to column references or column name references.
+        The former can be used for `compute_column` the latter in IO filters.
+    """
+
+    def __init__(self, dict column_mapping):
+        self.column_mapping = column_mapping
+
+    def generic_visit(self, node):
+        raise ValueError(
+            f"Not expecting expression to have node of type {node.__class__.__name__}"
+        )
+
+    def visit_Module(self, node):
+        try:
+            expr, = node.body
+        except ValueError:
+            raise ValueError(
+                f"Expecting exactly one expression, not {len(node.body)}"
+            )
+        return self.visit(expr)
+
+    def visit_Expr(self, node):
+        return self.visit(node.value)
+
+    def visit_Name(self, node):
+        try:
+            return self.column_mapping[node.id]
+        except KeyError:
+            raise ValueError(f"Unknown column name {node.id}")
+
+    def visit_Constant(self, node):
+        if not isinstance(node.value, (float, int, str, complex)):
+            raise ValueError(
+                f"Unsupported literal {repr(node.value)} of type "
+                "{type(node.value).__name__}"
+            )
+        return Literal(from_arrow(pa.scalar(node.value)))
+
+    def visit_UnaryOp(self, node):
+        operand = self.visit(node.operand)
+        if isinstance(node.op, ast.USub):
+            # TODO: Except for leaf nodes, we won't know the type of the
+            # operand, so there's no way to know whether this should be a float
+            # or an int. We should maybe see what Spark does, and this will
+            # probably require casting.
+            minus_one = Literal(from_arrow(pa.scalar(-1)))
+            return Operation(ASTOperator.MUL, minus_one, operand)
+        elif isinstance(node.op, ast.UAdd):
+            return operand
+        else:
+            op = _python_cudf_operator_map[type(node.op)]
+            return Operation(op, operand)
+
+    def visit_BinOp(self, node):
+        left = self.visit(node.left)
+        right = self.visit(node.right)
+        op = _python_cudf_operator_map[type(node.op)]
+        return Operation(op, left, right)
+
+    def visit_BoolOp(self, node):
+        return functools.reduce(
+            functools.partial(Operation, ASTOperator.LOGICAL_AND),
+            (
+                Operation(
+                    _python_cudf_operator_map[type(node.op)],
+                    self.visit(left),
+                    self.visit(right),
+                )
+                for left, right in zip(
+                    node.values[:-1], node.values[1:], strict=True
+                )
+            )
+        )
+
+    def visit_Compare(self, node):
+        operands = [node.left, *node.comparators]
+        return functools.reduce(
+            functools.partial(Operation, ASTOperator.LOGICAL_AND),
+            (
+                Operation(
+                    _python_cudf_operator_map[type(op)],
+                    self.visit(left),
+                    self.visit(right),
+                )
+                for op, left, right in zip(
+                    node.ops, operands[:-1], operands[1:], strict=True
+                )
+            )
+        )
+
+    def visit_Call(self, node):
+        try:
+            op = _python_cudf_function_map[node.func.id]
+        except KeyError:
+            raise ValueError(f"Unsupported function {node.func}.")
+        # Assuming only unary functions are supported, which is checked above.
+        if len(node.args) != 1 or node.keywords:
+            raise ValueError(
+                f"Function {node.func} only accepts one positional "
+                "argument."
+            )
+        return Operation(op, self.visit(node.args[0]))
+
+
+@functools.lru_cache(256)
+def to_expression(str expr, tuple column_names):
+    """
+    Create an expression for `pylibcudf.transform.compute_column`.
+
+    Parameters
+    ----------
+    expr : str
+        The expression to evaluate. In (restricted) Python syntax.
+    column_names : tuple[str]
+        Ordered tuple of names. When calling `compute_column` on the resulting
+        expression, the provided table must have columns in the same order
+        as given here.
+
+    Notes
+    -----
+    This function keeps a small cache of recently used expressions.
+
+    Returns
+    -------
+    Expression
+        Expression for the given expr and col_names
+    """
+    visitor = ExpressionTransformer(
+        {name: ColumnReference(i) for i, name in enumerate(column_names)}
+    )
+    return visitor.visit(ast.parse(expr))