From 364a6256c0c57abae825a0c070fa4c19d6430d82 Mon Sep 17 00:00:00 2001
From: Francesco Bruzzesi <42817048+FBruzzesi@users.noreply.github.com>
Date: Sun, 26 Jan 2025 18:00:39 +0100
Subject: [PATCH] feat: pyspark and duckdb selectors (#1853)

---------

Co-authored-by: Marco Gorelli <33491632+MarcoGorelli@users.noreply.github.com>
---
 narwhals/_arrow/selectors.py       |   2 +
 narwhals/_dask/selectors.py        |   2 +
 narwhals/_duckdb/namespace.py      |   7 +
 narwhals/_duckdb/selectors.py      | 212 ++++++++++++++++++++++++++++
 narwhals/_pandas_like/selectors.py |   2 +
 narwhals/_spark_like/namespace.py  |   7 +
 narwhals/_spark_like/selectors.py  | 216 +++++++++++++++++++++++++++++
 pyproject.toml                     |   2 +
 tests/selectors_test.py            |  36 +++--
 9 files changed, 472 insertions(+), 14 deletions(-)
 create mode 100644 narwhals/_duckdb/selectors.py
 create mode 100644 narwhals/_spark_like/selectors.py

diff --git a/narwhals/_arrow/selectors.py b/narwhals/_arrow/selectors.py
index 3fee25472..c83f4a6ac 100644
--- a/narwhals/_arrow/selectors.py
+++ b/narwhals/_arrow/selectors.py
@@ -47,10 +47,12 @@ def numeric(self: Self) -> ArrowSelector:
         dtypes = import_dtypes_module(self._version)
         return self.by_dtype(
             [
+                dtypes.Int128,
                 dtypes.Int64,
                 dtypes.Int32,
                 dtypes.Int16,
                 dtypes.Int8,
+                dtypes.UInt128,
                 dtypes.UInt64,
                 dtypes.UInt32,
                 dtypes.UInt16,
diff --git a/narwhals/_dask/selectors.py b/narwhals/_dask/selectors.py
index 62083c676..96f3feb08 100644
--- a/narwhals/_dask/selectors.py
+++ b/narwhals/_dask/selectors.py
@@ -52,10 +52,12 @@ def numeric(self: Self) -> DaskSelector:
         dtypes = import_dtypes_module(self._version)
         return self.by_dtype(
             [
+                dtypes.Int128,
                 dtypes.Int64,
                 dtypes.Int32,
                 dtypes.Int16,
                 dtypes.Int8,
+                dtypes.UInt128,
                 dtypes.UInt64,
                 dtypes.UInt32,
                 dtypes.UInt16,
diff --git a/narwhals/_duckdb/namespace.py b/narwhals/_duckdb/namespace.py
index d3b70826e..51f09e5e2 100644
--- a/narwhals/_duckdb/namespace.py
+++ b/narwhals/_duckdb/namespace.py
@@ -17,6 +17,7 @@
 from duckdb import FunctionExpression
 
 from narwhals._duckdb.expr import DuckDBExpr
+from narwhals._duckdb.selectors import DuckDBSelectorNamespace
 from narwhals._duckdb.utils import narwhals_to_native_dtype
 from narwhals._expression_parsing import combine_alias_output_names
 from narwhals._expression_parsing import combine_evaluate_output_names
@@ -38,6 +39,12 @@ def __init__(
         self._backend_version = backend_version
         self._version = version
 
+    @property
+    def selectors(self: Self) -> DuckDBSelectorNamespace:
+        return DuckDBSelectorNamespace(
+            backend_version=self._backend_version, version=self._version
+        )
+
     def all(self: Self) -> DuckDBExpr:
         def _all(df: DuckDBLazyFrame) -> list[duckdb.Expression]:
             return [ColumnExpression(col_name) for col_name in df.columns]
diff --git a/narwhals/_duckdb/selectors.py b/narwhals/_duckdb/selectors.py
new file mode 100644
index 000000000..d72e297fa
--- /dev/null
+++ b/narwhals/_duckdb/selectors.py
@@ -0,0 +1,212 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+from typing import Any
+from typing import Sequence
+
+from duckdb import ColumnExpression
+
+from narwhals._duckdb.expr import DuckDBExpr
+from narwhals.utils import import_dtypes_module
+
+if TYPE_CHECKING:
+    import duckdb
+    from typing_extensions import Self
+
+    from narwhals._duckdb.dataframe import DuckDBLazyFrame
+    from narwhals.dtypes import DType
+    from narwhals.utils import Version
+
+
+class DuckDBSelectorNamespace:
+    def __init__(
+        self: Self, *, backend_version: tuple[int, ...], version: Version
+    ) -> None:
+        self._backend_version = backend_version
+        self._version = version
+
+    def by_dtype(self: Self, dtypes: list[DType | type[DType]]) -> DuckDBSelector:
+        def func(df: DuckDBLazyFrame) -> list[duckdb.Expression]:
+            return [
+                ColumnExpression(col) for col in df.columns if df.schema[col] in dtypes
+            ]
+
+        def evaluate_output_names(df: DuckDBLazyFrame) -> Sequence[str]:
+            return [col for col in df.columns if df.schema[col] in dtypes]
+
+        return DuckDBSelector(
+            func,
+            depth=0,
+            function_name="selector",
+            evaluate_output_names=evaluate_output_names,
+            alias_output_names=None,
+            backend_version=self._backend_version,
+            returns_scalar=False,
+            version=self._version,
+            kwargs={},
+        )
+
+    def numeric(self: Self) -> DuckDBSelector:
+        dtypes = import_dtypes_module(self._version)
+        return self.by_dtype(
+            [
+                dtypes.Int128,
+                dtypes.Int64,
+                dtypes.Int32,
+                dtypes.Int16,
+                dtypes.Int8,
+                dtypes.UInt128,
+                dtypes.UInt64,
+                dtypes.UInt32,
+                dtypes.UInt16,
+                dtypes.UInt8,
+                dtypes.Float64,
+                dtypes.Float32,
+            ],
+        )
+
+    def categorical(self: Self) -> DuckDBSelector:  # pragma: no cover
+        dtypes = import_dtypes_module(self._version)
+        return self.by_dtype([dtypes.Categorical])
+
+    def string(self: Self) -> DuckDBSelector:
+        dtypes = import_dtypes_module(self._version)
+        return self.by_dtype([dtypes.String])
+
+    def boolean(self: Self) -> DuckDBSelector:
+        dtypes = import_dtypes_module(self._version)
+        return self.by_dtype([dtypes.Boolean])
+
+    def all(self: Self) -> DuckDBSelector:
+        def func(df: DuckDBLazyFrame) -> list[duckdb.Expression]:
+            return [ColumnExpression(col) for col in df.columns]
+
+        return DuckDBSelector(
+            func,
+            depth=0,
+            function_name="selector",
+            evaluate_output_names=lambda df: df.columns,
+            alias_output_names=None,
+            backend_version=self._backend_version,
+            returns_scalar=False,
+            version=self._version,
+            kwargs={},
+        )
+
+
+class DuckDBSelector(DuckDBExpr):
+    def __repr__(self: Self) -> str:  # pragma: no cover
+        return (
+            f"DuckDBSelector("
+            f"depth={self._depth}, "
+            f"function_name={self._function_name})"
+        )
+
+    def _to_expr(self: Self) -> DuckDBExpr:
+        return DuckDBExpr(
+            self._call,
+            depth=self._depth,
+            function_name=self._function_name,
+            evaluate_output_names=self._evaluate_output_names,
+            alias_output_names=self._alias_output_names,
+            backend_version=self._backend_version,
+            returns_scalar=self._returns_scalar,
+            version=self._version,
+            kwargs={},
+        )
+
+    def __sub__(self: Self, other: DuckDBSelector | Any) -> DuckDBSelector | Any:
+        if isinstance(other, DuckDBSelector):
+
+            def call(df: DuckDBLazyFrame) -> list[duckdb.Expression]:
+                lhs_names = self._evaluate_output_names(df)
+                rhs_names = other._evaluate_output_names(df)
+                lhs = self._call(df)
+                return [x for x, name in zip(lhs, lhs_names) if name not in rhs_names]
+
+            def evaluate_output_names(df: DuckDBLazyFrame) -> list[str]:
+                lhs_names = self._evaluate_output_names(df)
+                rhs_names = other._evaluate_output_names(df)
+                return [x for x in lhs_names if x not in rhs_names]
+
+            return DuckDBSelector(
+                call,
+                depth=0,
+                function_name="selector",
+                evaluate_output_names=evaluate_output_names,
+                alias_output_names=None,
+                backend_version=self._backend_version,
+                returns_scalar=self._returns_scalar,
+                version=self._version,
+                kwargs={},
+            )
+        else:
+            return self._to_expr() - other
+
+    def __or__(self: Self, other: DuckDBSelector | Any) -> DuckDBSelector | Any:
+        if isinstance(other, DuckDBSelector):
+
+            def call(df: DuckDBLazyFrame) -> list[duckdb.Expression]:
+                lhs_names = self._evaluate_output_names(df)
+                rhs_names = other._evaluate_output_names(df)
+                lhs = self._call(df)
+                rhs = other._call(df)
+                return [
+                    *(x for x, name in zip(lhs, lhs_names) if name not in rhs_names),
+                    *rhs,
+                ]
+
+            def evaluate_output_names(df: DuckDBLazyFrame) -> list[str]:
+                lhs_names = self._evaluate_output_names(df)
+                rhs_names = other._evaluate_output_names(df)
+                return [*(x for x in lhs_names if x not in rhs_names), *rhs_names]
+
+            return DuckDBSelector(
+                call,
+                depth=0,
+                function_name="selector",
+                evaluate_output_names=evaluate_output_names,
+                alias_output_names=None,
+                backend_version=self._backend_version,
+                returns_scalar=self._returns_scalar,
+                version=self._version,
+                kwargs={},
+            )
+        else:
+            return self._to_expr() | other
+
+    def __and__(self: Self, other: DuckDBSelector | Any) -> DuckDBSelector | Any:
+        if isinstance(other, DuckDBSelector):
+
+            def call(df: DuckDBLazyFrame) -> list[duckdb.Expression]:
+                lhs_names = self._evaluate_output_names(df)
+                rhs_names = other._evaluate_output_names(df)
+                lhs = self._call(df)
+                return [x for x, name in zip(lhs, lhs_names) if name in rhs_names]
+
+            def evaluate_output_names(df: DuckDBLazyFrame) -> list[str]:
+                lhs_names = self._evaluate_output_names(df)
+                rhs_names = other._evaluate_output_names(df)
+                return [x for x in lhs_names if x in rhs_names]
+
+            return DuckDBSelector(
+                call,
+                depth=0,
+                function_name="selector",
+                evaluate_output_names=evaluate_output_names,
+                alias_output_names=None,
+                backend_version=self._backend_version,
+                returns_scalar=self._returns_scalar,
+                version=self._version,
+                kwargs={},
+            )
+        else:
+            return self._to_expr() & other
+
+    def __invert__(self: Self) -> DuckDBSelector:
+        return (
+            DuckDBSelectorNamespace(
+                backend_version=self._backend_version, version=self._version
+            ).all()
+            - self
+        )
diff --git a/narwhals/_pandas_like/selectors.py b/narwhals/_pandas_like/selectors.py
index 17abc541c..d7e18a2e8 100644
--- a/narwhals/_pandas_like/selectors.py
+++ b/narwhals/_pandas_like/selectors.py
@@ -52,10 +52,12 @@ def numeric(self: Self) -> PandasSelector:
         dtypes = import_dtypes_module(self._version)
         return self.by_dtype(
             [
+                dtypes.Int128,
                 dtypes.Int64,
                 dtypes.Int32,
                 dtypes.Int16,
                 dtypes.Int8,
+                dtypes.UInt128,
                 dtypes.UInt64,
                 dtypes.UInt32,
                 dtypes.UInt16,
diff --git a/narwhals/_spark_like/namespace.py b/narwhals/_spark_like/namespace.py
index c17500e62..48289aa70 100644
--- a/narwhals/_spark_like/namespace.py
+++ b/narwhals/_spark_like/namespace.py
@@ -16,6 +16,7 @@
 from narwhals._expression_parsing import combine_evaluate_output_names
 from narwhals._spark_like.dataframe import SparkLikeLazyFrame
 from narwhals._spark_like.expr import SparkLikeExpr
+from narwhals._spark_like.selectors import SparkLikeSelectorNamespace
 from narwhals.typing import CompliantNamespace
 
 if TYPE_CHECKING:
@@ -34,6 +35,12 @@ def __init__(
         self._backend_version = backend_version
         self._version = version
 
+    @property
+    def selectors(self: Self) -> SparkLikeSelectorNamespace:
+        return SparkLikeSelectorNamespace(
+            backend_version=self._backend_version, version=self._version
+        )
+
     def all(self: Self) -> SparkLikeExpr:
         def _all(df: SparkLikeLazyFrame) -> list[Column]:
             return [F.col(col_name) for col_name in df.columns]
diff --git a/narwhals/_spark_like/selectors.py b/narwhals/_spark_like/selectors.py
new file mode 100644
index 000000000..911457bd2
--- /dev/null
+++ b/narwhals/_spark_like/selectors.py
@@ -0,0 +1,216 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+from typing import Any
+from typing import Sequence
+
+from pyspark.sql import functions as F  # noqa: N812
+
+from narwhals._spark_like.expr import SparkLikeExpr
+from narwhals.utils import import_dtypes_module
+
+if TYPE_CHECKING:
+    from pyspark.sql import Column
+    from typing_extensions import Self
+
+    from narwhals._spark_like.dataframe import SparkLikeLazyFrame
+    from narwhals.dtypes import DType
+    from narwhals.utils import Version
+
+
+class SparkLikeSelectorNamespace:
+    def __init__(
+        self: Self, *, backend_version: tuple[int, ...], version: Version
+    ) -> None:
+        self._backend_version = backend_version
+        self._version = version
+
+    def by_dtype(self: Self, dtypes: list[DType | type[DType]]) -> SparkLikeSelector:
+        def func(df: SparkLikeLazyFrame) -> list[Column]:
+            return [F.col(col) for col in df.columns if df.schema[col] in dtypes]
+
+        def evaluate_output_names(df: SparkLikeLazyFrame) -> Sequence[str]:
+            return [col for col in df.columns if df.schema[col] in dtypes]
+
+        return SparkLikeSelector(
+            func,
+            depth=0,
+            function_name="selector",
+            evaluate_output_names=evaluate_output_names,
+            alias_output_names=None,
+            backend_version=self._backend_version,
+            returns_scalar=False,
+            version=self._version,
+            kwargs={},
+        )
+
+    def numeric(self: Self) -> SparkLikeSelector:
+        dtypes = import_dtypes_module(self._version)
+        return self.by_dtype(
+            [
+                dtypes.Int128,
+                dtypes.Int64,
+                dtypes.Int32,
+                dtypes.Int16,
+                dtypes.Int8,
+                dtypes.UInt128,
+                dtypes.UInt64,
+                dtypes.UInt32,
+                dtypes.UInt16,
+                dtypes.UInt8,
+                dtypes.Float64,
+                dtypes.Float32,
+            ],
+        )
+
+    def categorical(self: Self) -> SparkLikeSelector:
+        dtypes = import_dtypes_module(self._version)
+        return self.by_dtype([dtypes.Categorical])
+
+    def string(self: Self) -> SparkLikeSelector:
+        dtypes = import_dtypes_module(self._version)
+        return self.by_dtype([dtypes.String])
+
+    def boolean(self: Self) -> SparkLikeSelector:
+        dtypes = import_dtypes_module(self._version)
+        return self.by_dtype([dtypes.Boolean])
+
+    def all(self: Self) -> SparkLikeSelector:
+        def func(df: SparkLikeLazyFrame) -> list[Column]:
+            return [F.col(col) for col in df.columns]
+
+        return SparkLikeSelector(
+            func,
+            depth=0,
+            function_name="selector",
+            evaluate_output_names=lambda df: df.columns,
+            alias_output_names=None,
+            backend_version=self._backend_version,
+            returns_scalar=False,
+            version=self._version,
+            kwargs={},
+        )
+
+
+class SparkLikeSelector(SparkLikeExpr):
+    def __repr__(self: Self) -> str:  # pragma: no cover
+        return (
+            f"SparkLikeSelector("
+            f"depth={self._depth}, "
+            f"function_name={self._function_name})"
+        )
+
+    def _to_expr(self: Self) -> SparkLikeExpr:
+        return SparkLikeExpr(
+            self._call,
+            depth=self._depth,
+            function_name=self._function_name,
+            evaluate_output_names=self._evaluate_output_names,
+            alias_output_names=self._alias_output_names,
+            backend_version=self._backend_version,
+            returns_scalar=self._returns_scalar,
+            version=self._version,
+            kwargs={},
+        )
+
+    def __sub__(self: Self, other: SparkLikeSelector | Any) -> SparkLikeSelector | Any:
+        if isinstance(other, SparkLikeSelector):
+
+            def call(df: SparkLikeLazyFrame) -> list[Column]:
+                lhs_names = self._evaluate_output_names(df)
+                rhs_names = other._evaluate_output_names(df)
+                lhs = self._call(df)
+                return [x for x, name in zip(lhs, lhs_names) if name not in rhs_names]
+
+            def evaluate_output_names(df: SparkLikeLazyFrame) -> list[str]:
+                lhs_names = self._evaluate_output_names(df)
+                rhs_names = other._evaluate_output_names(df)
+                return [x for x in lhs_names if x not in rhs_names]
+
+            return SparkLikeSelector(
+                call,
+                depth=0,
+                function_name="selector",
+                evaluate_output_names=evaluate_output_names,
+                alias_output_names=None,
+                backend_version=self._backend_version,
+                returns_scalar=self._returns_scalar,
+                version=self._version,
+                kwargs={},
+            )
+        else:
+            return self._to_expr() - (
+                other if isinstance(other, SparkLikeExpr) else F.lit(other)
+            )
+
+    def __or__(self: Self, other: SparkLikeSelector | Any) -> SparkLikeSelector | Any:
+        if isinstance(other, SparkLikeSelector):
+
+            def call(df: SparkLikeLazyFrame) -> list[Column]:
+                lhs_names = self._evaluate_output_names(df)
+                rhs_names = other._evaluate_output_names(df)
+                lhs = self._call(df)
+                rhs = other._call(df)
+                return [
+                    *(x for x, name in zip(lhs, lhs_names) if name not in rhs_names),
+                    *rhs,
+                ]
+
+            def evaluate_output_names(df: SparkLikeLazyFrame) -> list[str]:
+                lhs_names = self._evaluate_output_names(df)
+                rhs_names = other._evaluate_output_names(df)
+                return [*(x for x in lhs_names if x not in rhs_names), *rhs_names]
+
+            return SparkLikeSelector(
+                call,
+                depth=0,
+                function_name="selector",
+                evaluate_output_names=evaluate_output_names,
+                alias_output_names=None,
+                backend_version=self._backend_version,
+                returns_scalar=self._returns_scalar,
+                version=self._version,
+                kwargs={},
+            )
+        else:
+            return self._to_expr() | (
+                other if isinstance(other, SparkLikeExpr) else F.lit(other)
+            )
+
+    def __and__(self: Self, other: SparkLikeSelector | Any) -> SparkLikeSelector | Any:
+        if isinstance(other, SparkLikeSelector):
+
+            def call(df: SparkLikeLazyFrame) -> list[Column]:
+                lhs_names = self._evaluate_output_names(df)
+                rhs_names = other._evaluate_output_names(df)
+                lhs = self._call(df)
+                return [x for x, name in zip(lhs, lhs_names) if name in rhs_names]
+
+            def evaluate_output_names(df: SparkLikeLazyFrame) -> list[str]:
+                lhs_names = self._evaluate_output_names(df)
+                rhs_names = other._evaluate_output_names(df)
+                return [x for x in lhs_names if x in rhs_names]
+
+            return SparkLikeSelector(
+                call,
+                depth=0,
+                function_name="selector",
+                evaluate_output_names=evaluate_output_names,
+                alias_output_names=None,
+                backend_version=self._backend_version,
+                returns_scalar=self._returns_scalar,
+                version=self._version,
+                kwargs={},
+            )
+        else:
+            return self._to_expr() & (
+                other if isinstance(other, SparkLikeExpr) else F.lit(other)
+            )
+
+    def __invert__(self: Self) -> SparkLikeSelector:
+        return (
+            SparkLikeSelectorNamespace(
+                backend_version=self._backend_version, version=self._version
+            ).all()
+            - self
+        )
diff --git a/pyproject.toml b/pyproject.toml
index d283f4d59..dbf21809e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -168,6 +168,8 @@ filterwarnings = [
   'ignore:.*The distutils package is deprecated and slated for removal in Python 3.12:DeprecationWarning:pyspark',
   'ignore:.*distutils Version classes are deprecated. Use packaging.version instead.*:DeprecationWarning:pyspark',
   'ignore:.*is_datetime64tz_dtype is deprecated and will be removed in a future version.*:DeprecationWarning:pyspark',
+  # Warning raised by PyArrow nightly just by importing pandas
+  'ignore:.*Python binding for RankQuantileOptions not exposed:RuntimeWarning:pyarrow'
 ]
 
 xfail_strict = true
diff --git a/tests/selectors_test.py b/tests/selectors_test.py
index 36ea15a4f..99740d227 100644
--- a/tests/selectors_test.py
+++ b/tests/selectors_test.py
@@ -11,6 +11,7 @@
 from narwhals.stable.v1.selectors import categorical
 from narwhals.stable.v1.selectors import numeric
 from narwhals.stable.v1.selectors import string
+from tests.utils import POLARS_VERSION
 from tests.utils import PYARROW_VERSION
 from tests.utils import Constructor
 from tests.utils import assert_equal_data
@@ -23,36 +24,28 @@
 }
 
 
-def test_selectors(constructor: Constructor, request: pytest.FixtureRequest) -> None:
-    if ("pyspark" in str(constructor)) or "duckdb" in str(constructor):
-        request.applymarker(pytest.mark.xfail)
+def test_selectors(constructor: Constructor) -> None:
     df = nw.from_native(constructor(data))
     result = df.select(by_dtype([nw.Int64, nw.Float64]) + 1)
     expected = {"a": [2, 2, 3], "c": [5.1, 6.0, 7.0]}
     assert_equal_data(result, expected)
 
 
-def test_numeric(constructor: Constructor, request: pytest.FixtureRequest) -> None:
-    if ("pyspark" in str(constructor)) or "duckdb" in str(constructor):
-        request.applymarker(pytest.mark.xfail)
+def test_numeric(constructor: Constructor) -> None:
     df = nw.from_native(constructor(data))
     result = df.select(numeric() + 1)
     expected = {"a": [2, 2, 3], "c": [5.1, 6.0, 7.0]}
     assert_equal_data(result, expected)
 
 
-def test_boolean(constructor: Constructor, request: pytest.FixtureRequest) -> None:
-    if ("pyspark" in str(constructor)) or "duckdb" in str(constructor):
-        request.applymarker(pytest.mark.xfail)
+def test_boolean(constructor: Constructor) -> None:
     df = nw.from_native(constructor(data))
     result = df.select(boolean())
     expected = {"d": [True, False, True]}
     assert_equal_data(result, expected)
 
 
-def test_string(constructor: Constructor, request: pytest.FixtureRequest) -> None:
-    if ("pyspark" in str(constructor)) or "duckdb" in str(constructor):
-        request.applymarker(pytest.mark.xfail)
+def test_string(constructor: Constructor) -> None:
     df = nw.from_native(constructor(data))
     result = df.select(string())
     expected = {"b": ["a", "b", "c"]}
@@ -67,7 +60,7 @@ def test_categorical(
         15,
     ):  # pragma: no cover
         request.applymarker(pytest.mark.xfail)
-    if ("pyspark" in str(constructor)) or "duckdb" in str(constructor):
+    if "pyspark" in str(constructor) or "duckdb" in str(constructor):
         request.applymarker(pytest.mark.xfail)
 
     expected = {"b": ["a", "b", "c"]}
@@ -96,13 +89,28 @@ def test_set_ops(
     expected: list[str],
     request: pytest.FixtureRequest,
 ) -> None:
-    if ("pyspark" in str(constructor)) or "duckdb" in str(constructor):
+    if "duckdb" in str(constructor) and not expected:
         request.applymarker(pytest.mark.xfail)
     df = nw.from_native(constructor(data))
     result = df.select(selector).collect_schema().names()
     assert sorted(result) == expected
 
 
+def test_subtract_expr(
+    constructor: Constructor,
+    request: pytest.FixtureRequest,
+) -> None:
+    if "polars" in str(constructor) and POLARS_VERSION < (0, 20, 27):
+        # In old Polars versions, cs.numeric() - col('a')
+        # would exclude column 'a' from the result, as opposed to
+        # subtracting it.
+        request.applymarker(pytest.mark.xfail)
+    df = nw.from_native(constructor(data))
+    result = df.select(numeric() - nw.col("a"))
+    expected = {"a": [0, 0, 0], "c": [3.1, 4.0, 4.0]}
+    assert_equal_data(result, expected)
+
+
 def test_set_ops_invalid(constructor: Constructor) -> None:
     df = nw.from_native(constructor(data))
     with pytest.raises((NotImplementedError, ValueError)):
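
Usage sketch (not part of the patch itself): the snippet below illustrates the behaviour this change enables, resolving narwhals selectors against a DuckDB relation. The query and column names (a, c, d) are illustrative assumptions rather than anything taken from the patch, and the same selector expressions are intended to work unchanged on the PySpark backend.

import duckdb

import narwhals.stable.v1 as nw
from narwhals.stable.v1.selectors import boolean, numeric

# A tiny relation with an integer, a float, and a boolean column
# (column names are illustrative).
rel = duckdb.sql("SELECT 1 AS a, 4.1 AS c, true AS d")
lf = nw.from_native(rel)

# Union of two selectors; `|`, `&`, `-`, and `~` are the set operations
# implemented by the new DuckDBSelector / SparkLikeSelector dunder methods.
result = lf.select(numeric() | boolean())
print(result.to_native())  # keeps columns a, c, and d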