diff --git a/python/cudf/cudf/_lib/transform.pyx b/python/cudf/cudf/_lib/transform.pyx index 1589e23f716..a163bb07888 100644 --- a/python/cudf/cudf/_lib/transform.pyx +++ b/python/cudf/cudf/_lib/transform.pyx @@ -3,12 +3,10 @@ from numba.np import numpy_support import cudf -from cudf.core._internals.expressions import parse_expression from cudf.core.buffer import acquire_spill_lock, as_buffer from cudf.utils import cudautils from pylibcudf cimport transform as plc_transform -from pylibcudf.expressions cimport Expression from pylibcudf.libcudf.types cimport size_type from cudf._lib.column cimport Column @@ -93,7 +91,7 @@ def one_hot_encode(Column input_column, Column categories): @acquire_spill_lock() -def compute_column(list columns, tuple column_names, expr: str): +def compute_column(list columns, tuple column_names, str expr): """Compute a new column by evaluating an expression on a set of columns. Parameters @@ -108,12 +106,8 @@ def compute_column(list columns, tuple column_names, expr: str): expr : str The expression to evaluate. """ - visitor = parse_expression(expr, column_names) - - # At the end, all the stack contains is the expression to evaluate. - cdef Expression cudf_expr = visitor.expression result = plc_transform.compute_column( plc.Table([col.to_pylibcudf(mode="read") for col in columns]), - cudf_expr, + plc.expressions.to_expression(expr, column_names), ) return Column.from_pylibcudf(result) diff --git a/python/cudf/cudf/core/_internals/expressions.py b/python/cudf/cudf/core/_internals/expressions.py deleted file mode 100644 index 90d9118027a..00000000000 --- a/python/cudf/cudf/core/_internals/expressions.py +++ /dev/null @@ -1,229 +0,0 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. -from __future__ import annotations - -import ast -import functools - -import pyarrow as pa - -import pylibcudf as plc -from pylibcudf.expressions import ( - ASTOperator, - ColumnReference, - Expression, - Literal, - Operation, -) - -# This dictionary encodes the mapping from Python AST operators to their cudf -# counterparts. -python_cudf_operator_map = { - # Binary operators - ast.Add: ASTOperator.ADD, - ast.Sub: ASTOperator.SUB, - ast.Mult: ASTOperator.MUL, - ast.Div: ASTOperator.DIV, - ast.FloorDiv: ASTOperator.FLOOR_DIV, - ast.Mod: ASTOperator.PYMOD, - ast.Pow: ASTOperator.POW, - ast.Eq: ASTOperator.EQUAL, - ast.NotEq: ASTOperator.NOT_EQUAL, - ast.Lt: ASTOperator.LESS, - ast.Gt: ASTOperator.GREATER, - ast.LtE: ASTOperator.LESS_EQUAL, - ast.GtE: ASTOperator.GREATER_EQUAL, - ast.BitXor: ASTOperator.BITWISE_XOR, - # TODO: The mapping of logical/bitwise operators here is inconsistent with - # pandas. In pandas, Both `BitAnd` and `And` map to - # `ASTOperator.LOGICAL_AND` for booleans, while they map to - # `ASTOperator.BITWISE_AND` for integers. However, there is no good way to - # encode this at present because expressions can be arbitrarily nested so - # we won't know the dtype of the input without inserting a much more - # complex traversal of the expression tree to determine the output types at - # each node. For now, we'll rely on users to use the appropriate operator. - ast.BitAnd: ASTOperator.BITWISE_AND, - ast.BitOr: ASTOperator.BITWISE_OR, - ast.And: ASTOperator.LOGICAL_AND, - ast.Or: ASTOperator.LOGICAL_OR, - # Unary operators - ast.Invert: ASTOperator.BIT_INVERT, - ast.Not: ASTOperator.NOT, - # TODO: Missing USub, possibility other unary ops? -} - - -# Mapping between Python function names encode in an ast.Call node and the -# corresponding libcudf C++ AST operators. -python_cudf_function_map = { - # TODO: Operators listed on - # https://pandas.pydata.org/pandas-docs/stable/user_guide/enhancingperf.html#expression-evaluation-via-eval # noqa: E501 - # that we don't support yet: - # expm1, log1p, arctan2 and log10. - "isnull": ASTOperator.IS_NULL, - "isna": ASTOperator.IS_NULL, - "sin": ASTOperator.SIN, - "cos": ASTOperator.COS, - "tan": ASTOperator.TAN, - "arcsin": ASTOperator.ARCSIN, - "arccos": ASTOperator.ARCCOS, - "arctan": ASTOperator.ARCTAN, - "sinh": ASTOperator.SINH, - "cosh": ASTOperator.COSH, - "tanh": ASTOperator.TANH, - "arcsinh": ASTOperator.ARCSINH, - "arccosh": ASTOperator.ARCCOSH, - "arctanh": ASTOperator.ARCTANH, - "exp": ASTOperator.EXP, - "log": ASTOperator.LOG, - "sqrt": ASTOperator.SQRT, - "abs": ASTOperator.ABS, - "ceil": ASTOperator.CEIL, - "floor": ASTOperator.FLOOR, - # TODO: Operators supported by libcudf with no Python function analog. - # ast.rint: ASTOperator.RINT, - # ast.cbrt: ASTOperator.CBRT, -} - - -class libcudfASTVisitor(ast.NodeVisitor): - """A NodeVisitor specialized for constructing a libcudf expression tree. - - This visitor is designed to handle AST nodes that have libcudf equivalents. - It constructs column references from names and literals from constants, - then builds up operations. The final result can be accessed using the - `expression` property. The visitor must be kept in scope for as long as the - expression is needed because all of the underlying libcudf expressions will - be destroyed when the libcudfASTVisitor is. - - Parameters - ---------- - col_names : Tuple[str] - The column names used to map the names in an expression. - """ - - def __init__(self, col_names: tuple[str]): - self.stack: list[Expression] = [] - self.nodes: list[Expression] = [] - self.col_names = col_names - - @property - def expression(self): - """Expression: The result of parsing an AST.""" - assert len(self.stack) == 1 - return self.stack[-1] - - def visit_Name(self, node): - try: - col_id = self.col_names.index(node.id) - except ValueError: - raise ValueError(f"Unknown column name {node.id}") - self.stack.append(ColumnReference(col_id)) - - def visit_Constant(self, node): - if not isinstance(node.value, (float, int, str, complex)): - raise ValueError( - f"Unsupported literal {repr(node.value)} of type " - "{type(node.value).__name__}" - ) - self.stack.append( - Literal(plc.interop.from_arrow(pa.scalar(node.value))) - ) - - def visit_UnaryOp(self, node): - self.visit(node.operand) - self.nodes.append(self.stack.pop()) - if isinstance(node.op, ast.USub): - # TODO: Except for leaf nodes, we won't know the type of the - # operand, so there's no way to know whether this should be a float - # or an int. We should maybe see what Spark does, and this will - # probably require casting. - self.nodes.append(Literal(plc.interop.from_arrow(pa.scalar(-1)))) - op = ASTOperator.MUL - self.stack.append(Operation(op, self.nodes[-1], self.nodes[-2])) - elif isinstance(node.op, ast.UAdd): - self.stack.append(self.nodes[-1]) - else: - op = python_cudf_operator_map[type(node.op)] - self.stack.append(Operation(op, self.nodes[-1])) - - def visit_BinOp(self, node): - self.visit(node.left) - self.visit(node.right) - self.nodes.append(self.stack.pop()) - self.nodes.append(self.stack.pop()) - - op = python_cudf_operator_map[type(node.op)] - self.stack.append(Operation(op, self.nodes[-1], self.nodes[-2])) - - def _visit_BoolOp_Compare(self, operators, operands, has_multiple_ops): - # Helper function handling the common components of parsing BoolOp and - # Compare AST nodes. These two types of nodes both support chaining - # (e.g. `a > b > c` is equivalent to `a > b and b > c`, so this - # function helps standardize that. - - # TODO: Whether And/Or and BitAnd/BitOr actually correspond to - # logical or bitwise operators depends on the data types that they - # are applied to. We'll need to add logic to map to that. - inner_ops = [] - for op, (left, right) in zip(operators, operands): - # Note that this will lead to duplicate nodes, e.g. if - # the comparison is `a < b < c` that will be encoded as - # `a < b and b < c`. We could potentially optimize by caching - # expressions by name so that we only construct them once. - self.visit(left) - self.visit(right) - - self.nodes.append(self.stack.pop()) - self.nodes.append(self.stack.pop()) - - op = python_cudf_operator_map[type(op)] - inner_ops.append(Operation(op, self.nodes[-1], self.nodes[-2])) - - self.nodes.extend(inner_ops) - - # If we have more than one comparator, we need to link them - # together with LOGICAL_AND operators. - if has_multiple_ops: - op = ASTOperator.LOGICAL_AND - - def _combine_compare_ops(left, right): - self.nodes.append(Operation(op, left, right)) - return self.nodes[-1] - - functools.reduce(_combine_compare_ops, inner_ops) - - self.stack.append(self.nodes[-1]) - - def visit_BoolOp(self, node): - operators = [node.op] * (len(node.values) - 1) - operands = zip(node.values[:-1], node.values[1:]) - self._visit_BoolOp_Compare(operators, operands, len(node.values) > 2) - - def visit_Compare(self, node): - operands = (node.left, *node.comparators) - has_multiple_ops = len(operands) > 2 - operands = zip(operands[:-1], operands[1:]) - self._visit_BoolOp_Compare(node.ops, operands, has_multiple_ops) - - def visit_Call(self, node): - try: - op = python_cudf_function_map[node.func.id] - except KeyError: - raise ValueError(f"Unsupported function {node.func}.") - # Assuming only unary functions are supported, which is checked above. - if len(node.args) != 1 or node.keywords: - raise ValueError( - f"Function {node.func} only accepts one positional " - "argument." - ) - self.visit(node.args[0]) - - self.nodes.append(self.stack.pop()) - self.stack.append(Operation(op, self.nodes[-1])) - - -@functools.lru_cache(256) -def parse_expression(expr: str, col_names: tuple[str]): - visitor = libcudfASTVisitor(col_names) - visitor.visit(ast.parse(expr)) - return visitor diff --git a/python/custreamz/custreamz/kafka.py b/python/custreamz/custreamz/kafka.py index 4cbd7244751..166b7d98592 100644 --- a/python/custreamz/custreamz/kafka.py +++ b/python/custreamz/custreamz/kafka.py @@ -151,9 +151,14 @@ def read_gdf( "parquet": cudf.io.read_parquet, } - result = cudf_readers[message_format]( - kafka_datasource, engine="cudf", lines=True - ) + if message_format == "json": + result = cudf_readers[message_format]( + kafka_datasource, engine="cudf", lines=True + ) + else: + result = cudf_readers[message_format]( + kafka_datasource, engine="cudf" + ) # Close up the cudf datasource instance # TODO: Ideally the C++ destructor should handle the diff --git a/python/pylibcudf/pylibcudf/expressions.pyi b/python/pylibcudf/pylibcudf/expressions.pyi index 12b473d8605..4dcccaaa1fc 100644 --- a/python/pylibcudf/pylibcudf/expressions.pyi +++ b/python/pylibcudf/pylibcudf/expressions.pyi @@ -77,3 +77,5 @@ class Operation(Expression): left: Expression, right: Expression | None = None, ): ... + +def to_expression(expr: str, column_names: tuple[str, ...]) -> Expression: ... diff --git a/python/pylibcudf/pylibcudf/expressions.pyx b/python/pylibcudf/pylibcudf/expressions.pyx index 0f12cfe313c..31121785e27 100644 --- a/python/pylibcudf/pylibcudf/expressions.pyx +++ b/python/pylibcudf/pylibcudf/expressions.pyx @@ -1,4 +1,9 @@ # Copyright (c) 2024, NVIDIA CORPORATION. +import ast +import functools + +import pyarrow as pa + from pylibcudf.libcudf.expressions import \ ast_operator as ASTOperator # no-cython-lint from pylibcudf.libcudf.expressions import \ @@ -46,6 +51,8 @@ from .scalar cimport Scalar from .traits cimport is_chrono, is_numeric from .types cimport DataType +from .interop import from_arrow + # Aliases for simplicity ctypedef unique_ptr[libcudf_exp.expression] expression_ptr @@ -57,6 +64,7 @@ __all__ = [ "Literal", "Operation", "TableReference", + "to_expression" ] # Define this class just to have a docstring for it @@ -261,3 +269,217 @@ cdef class ColumnNameReference(Expression): move(make_unique[libcudf_exp.column_name_reference]( (name.encode("utf-8")) )) + + +# This dictionary encodes the mapping from Python AST operators to their cudf +# counterparts. +_python_cudf_operator_map = { + # Binary operators + ast.Add: ASTOperator.ADD, + ast.Sub: ASTOperator.SUB, + ast.Mult: ASTOperator.MUL, + ast.Div: ASTOperator.DIV, + ast.FloorDiv: ASTOperator.FLOOR_DIV, + ast.Mod: ASTOperator.PYMOD, + ast.Pow: ASTOperator.POW, + ast.Eq: ASTOperator.EQUAL, + ast.NotEq: ASTOperator.NOT_EQUAL, + ast.Lt: ASTOperator.LESS, + ast.Gt: ASTOperator.GREATER, + ast.LtE: ASTOperator.LESS_EQUAL, + ast.GtE: ASTOperator.GREATER_EQUAL, + ast.BitXor: ASTOperator.BITWISE_XOR, + # TODO: The mapping of logical/bitwise operators here is inconsistent with + # pandas. In pandas, Both `BitAnd` and `And` map to + # `ASTOperator.LOGICAL_AND` for booleans, while they map to + # `ASTOperator.BITWISE_AND` for integers. However, there is no good way to + # encode this at present because expressions can be arbitrarily nested so + # we won't know the dtype of the input without inserting a much more + # complex traversal of the expression tree to determine the output types at + # each node. For now, we'll rely on users to use the appropriate operator. + ast.BitAnd: ASTOperator.BITWISE_AND, + ast.BitOr: ASTOperator.BITWISE_OR, + ast.And: ASTOperator.LOGICAL_AND, + ast.Or: ASTOperator.LOGICAL_OR, + # Unary operators + ast.Invert: ASTOperator.BIT_INVERT, + ast.Not: ASTOperator.NOT, + # TODO: Missing USub, possibility other unary ops? +} + + +# Mapping between Python function names encode in an ast.Call node and the +# corresponding libcudf C++ AST operators. +_python_cudf_function_map = { + # TODO: Operators listed on + # https://pandas.pydata.org/pandas-docs/stable/user_guide/enhancingperf.html#expression-evaluation-via-eval # noqa: E501 + # that we don't support yet: + # expm1, log1p, arctan2 and log10. + "isnull": ASTOperator.IS_NULL, + "isna": ASTOperator.IS_NULL, + "sin": ASTOperator.SIN, + "cos": ASTOperator.COS, + "tan": ASTOperator.TAN, + "arcsin": ASTOperator.ARCSIN, + "arccos": ASTOperator.ARCCOS, + "arctan": ASTOperator.ARCTAN, + "sinh": ASTOperator.SINH, + "cosh": ASTOperator.COSH, + "tanh": ASTOperator.TANH, + "arcsinh": ASTOperator.ARCSINH, + "arccosh": ASTOperator.ARCCOSH, + "arctanh": ASTOperator.ARCTANH, + "exp": ASTOperator.EXP, + "log": ASTOperator.LOG, + "sqrt": ASTOperator.SQRT, + "abs": ASTOperator.ABS, + "ceil": ASTOperator.CEIL, + "floor": ASTOperator.FLOOR, + # TODO: Operators supported by libcudf with no Python function analog. + # ast.rint: ASTOperator.RINT, + # ast.cbrt: ASTOperator.CBRT, +} + + +class ExpressionTransformer(ast.NodeVisitor): + """A NodeVisitor specialized for constructing a libcudf expression tree. + + This visitor is designed to handle AST nodes that have libcudf equivalents. + It constructs column references from names and literals from constants, + then builds up operations. The resulting expression is returned by the + `visit` method + + Parameters + ---------- + column_mapping : dict[str, ColumnNameReference | ColumnReference] + Mapping from names to column references or column name references. + The former can be used for `compute_column` the latter in IO filters. + """ + + def __init__(self, dict column_mapping): + self.column_mapping = column_mapping + + def generic_visit(self, node): + raise ValueError( + f"Not expecting expression to have node of type {node.__class__.__name__}" + ) + + def visit_Module(self, node): + try: + expr, = node.body + except ValueError: + raise ValueError( + f"Expecting exactly one expression, not {len(node.body)}" + ) + return self.visit(expr) + + def visit_Expr(self, node): + return self.visit(node.value) + + def visit_Name(self, node): + try: + return self.column_mapping[node.id] + except KeyError: + raise ValueError(f"Unknown column name {node.id}") + + def visit_Constant(self, node): + if not isinstance(node.value, (float, int, str, complex)): + raise ValueError( + f"Unsupported literal {repr(node.value)} of type " + "{type(node.value).__name__}" + ) + return Literal(from_arrow(pa.scalar(node.value))) + + def visit_UnaryOp(self, node): + operand = self.visit(node.operand) + if isinstance(node.op, ast.USub): + # TODO: Except for leaf nodes, we won't know the type of the + # operand, so there's no way to know whether this should be a float + # or an int. We should maybe see what Spark does, and this will + # probably require casting. + minus_one = Literal(from_arrow(pa.scalar(-1))) + return Operation(ASTOperator.MUL, minus_one, operand) + elif isinstance(node.op, ast.UAdd): + return operand + else: + op = _python_cudf_operator_map[type(node.op)] + return Operation(op, operand) + + def visit_BinOp(self, node): + left = self.visit(node.left) + right = self.visit(node.right) + op = _python_cudf_operator_map[type(node.op)] + return Operation(op, left, right) + + def visit_BoolOp(self, node): + return functools.reduce( + functools.partial(Operation, ASTOperator.LOGICAL_AND), + ( + Operation( + _python_cudf_operator_map[type(node.op)], + self.visit(left), + self.visit(right), + ) + for left, right in zip( + node.values[:-1], node.values[1:], strict=True + ) + ) + ) + + def visit_Compare(self, node): + operands = [node.left, *node.comparators] + return functools.reduce( + functools.partial(Operation, ASTOperator.LOGICAL_AND), + ( + Operation( + _python_cudf_operator_map[type(op)], + self.visit(left), + self.visit(right), + ) + for op, left, right in zip( + node.ops, operands[:-1], operands[1:], strict=True + ) + ) + ) + + def visit_Call(self, node): + try: + op = _python_cudf_function_map[node.func.id] + except KeyError: + raise ValueError(f"Unsupported function {node.func}.") + # Assuming only unary functions are supported, which is checked above. + if len(node.args) != 1 or node.keywords: + raise ValueError( + f"Function {node.func} only accepts one positional " + "argument." + ) + return Operation(op, self.visit(node.args[0])) + + +@functools.lru_cache(256) +def to_expression(str expr, tuple column_names): + """ + Create an expression for `pylibcudf.transform.compute_column`. + + Parameters + ---------- + expr : str + The expression to evaluate. In (restricted) Python syntax. + column_names : tuple[str] + Ordered tuple of names. When calling `compute_column` on the resulting + expression, the provided table must have columns in the same order + as given here. + + Notes + ----- + This function keeps a small cache of recently used expressions. + + Returns + ------- + Expression + Expression for the given expr and col_names + """ + visitor = ExpressionTransformer( + {name: ColumnReference(i) for i, name in enumerate(column_names)} + ) + return visitor.visit(ast.parse(expr))