feat: add Expr.dt methods to PySpark (#1835)
Dhanunjaya-Elluri authored Jan 20, 2025
1 parent e7ca81e commit 973b499
Showing 3 changed files with 140 additions and 2 deletions.
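Before the diff itself, a quick orientation: the commit wires a datetime namespace into narwhals' PySpark backend, so datetime attributes become reachable as nw.col(...).dt.<method>() on PySpark-backed frames. A minimal usage sketch, assuming a local Spark session and a narwhals build that includes this change (the column name "a" and the timestamp literal are illustrative):

    from datetime import datetime

    import narwhals as nw
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    native = spark.createDataFrame([(datetime(2025, 1, 20, 12, 30, 45),)], ["a"])

    # nw.from_native wraps the PySpark DataFrame; the new .dt methods are
    # translated to pyspark.sql.functions calls by the code added below.
    df = nw.from_native(native)
    result = df.select(
        nw.col("a").dt.year().alias("year"),
        nw.col("a").dt.ordinal_day().alias("ordinal_day"),
    )
    print(result.to_native().collect())  # e.g. [Row(year=2025, ordinal_day=20)]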
5 changes: 5 additions & 0 deletions narwhals/_spark_like/expr.py
@@ -7,6 +7,7 @@
 from typing import Sequence
 
 from narwhals._expression_parsing import infer_new_root_output_names
+from narwhals._spark_like.expr_dt import SparkLikeExprDateTimeNamespace
 from narwhals._spark_like.expr_name import SparkLikeExprNameNamespace
 from narwhals._spark_like.expr_str import SparkLikeExprStringNamespace
 from narwhals._spark_like.utils import get_column_name
@@ -541,3 +542,7 @@ def str(self: Self) -> SparkLikeExprStringNamespace:
     @property
     def name(self: Self) -> SparkLikeExprNameNamespace:
         return SparkLikeExprNameNamespace(self)
+
+    @property
+    def dt(self: Self) -> SparkLikeExprDateTimeNamespace:
+        return SparkLikeExprDateTimeNamespace(self)
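The dt property added here mirrors the existing str and name namespaces just above it: a lightweight wrapper object keeps a reference back to the parent expression and groups related methods under a single attribute. A stripped-down, hypothetical illustration of that pattern (simplified names, not narwhals code):

    from __future__ import annotations


    class Expr:
        def __init__(self, name: str) -> None:
            self._name = name

        @property
        def dt(self) -> DateTimeNamespace:
            # Each access hands the namespace a reference to this expression.
            return DateTimeNamespace(self)


    class DateTimeNamespace:
        def __init__(self, expr: Expr) -> None:
            self._compliant_expr = expr

        def year(self) -> str:
            # A real backend would build an expression; here we just format a string.
            return f"year({self._compliant_expr._name})"


    print(Expr("a").dt.year())  # year(a)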
135 changes: 135 additions & 0 deletions narwhals/_spark_like/expr_dt.py
@@ -0,0 +1,135 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from pyspark.sql import Column
+    from typing_extensions import Self
+
+    from narwhals._spark_like.expr import SparkLikeExpr
+
+
+class SparkLikeExprDateTimeNamespace:
+    def __init__(self: Self, expr: SparkLikeExpr) -> None:
+        self._compliant_expr = expr
+
+    def date(self: Self) -> SparkLikeExpr:
+        from pyspark.sql import functions as F  # noqa: N812
+
+        return self._compliant_expr._from_call(
+            F.to_date,
+            "date",
+            returns_scalar=self._compliant_expr._returns_scalar,
+        )
+
+    def year(self: Self) -> SparkLikeExpr:
+        from pyspark.sql import functions as F  # noqa: N812
+
+        return self._compliant_expr._from_call(
+            F.year,
+            "year",
+            returns_scalar=self._compliant_expr._returns_scalar,
+        )
+
+    def month(self: Self) -> SparkLikeExpr:
+        from pyspark.sql import functions as F  # noqa: N812
+
+        return self._compliant_expr._from_call(
+            F.month,
+            "month",
+            returns_scalar=self._compliant_expr._returns_scalar,
+        )
+
+    def day(self: Self) -> SparkLikeExpr:
+        from pyspark.sql import functions as F  # noqa: N812
+
+        return self._compliant_expr._from_call(
+            F.dayofmonth,
+            "day",
+            returns_scalar=self._compliant_expr._returns_scalar,
+        )
+
+    def hour(self: Self) -> SparkLikeExpr:
+        from pyspark.sql import functions as F  # noqa: N812
+
+        return self._compliant_expr._from_call(
+            F.hour,
+            "hour",
+            returns_scalar=self._compliant_expr._returns_scalar,
+        )
+
+    def minute(self: Self) -> SparkLikeExpr:
+        from pyspark.sql import functions as F  # noqa: N812
+
+        return self._compliant_expr._from_call(
+            F.minute,
+            "minute",
+            returns_scalar=self._compliant_expr._returns_scalar,
+        )
+
+    def second(self: Self) -> SparkLikeExpr:
+        from pyspark.sql import functions as F  # noqa: N812
+
+        return self._compliant_expr._from_call(
+            F.second,
+            "second",
+            returns_scalar=self._compliant_expr._returns_scalar,
+        )
+
+    def millisecond(self: Self) -> SparkLikeExpr:
+        from pyspark.sql import functions as F  # noqa: N812
+
+        def _millisecond(_input: Column) -> Column:
+            return F.floor((F.unix_micros(_input) % 1_000_000) / 1000)
+
+        return self._compliant_expr._from_call(
+            _millisecond,
+            "millisecond",
+            returns_scalar=self._compliant_expr._returns_scalar,
+        )
+
+    def microsecond(self: Self) -> SparkLikeExpr:
+        from pyspark.sql import functions as F  # noqa: N812
+
+        def _microsecond(_input: Column) -> Column:
+            return F.unix_micros(_input) % 1_000_000
+
+        return self._compliant_expr._from_call(
+            _microsecond,
+            "microsecond",
+            returns_scalar=self._compliant_expr._returns_scalar,
+        )
+
+    def nanosecond(self: Self) -> SparkLikeExpr:
+        from pyspark.sql import functions as F  # noqa: N812
+
+        def _nanosecond(_input: Column) -> Column:
+            return (F.unix_micros(_input) % 1_000_000) * 1000
+
+        return self._compliant_expr._from_call(
+            _nanosecond,
+            "nanosecond",
+            returns_scalar=self._compliant_expr._returns_scalar,
+        )
+
+    def ordinal_day(self: Self) -> SparkLikeExpr:
+        from pyspark.sql import functions as F  # noqa: N812
+
+        return self._compliant_expr._from_call(
+            F.dayofyear,
+            "ordinal_day",
+            returns_scalar=self._compliant_expr._returns_scalar,
+        )
+
+    def weekday(self: Self) -> SparkLikeExpr:
+        from pyspark.sql import functions as F  # noqa: N812
+
+        def _weekday(_input: Column) -> Column:
+            # PySpark's dayofweek returns 1-7 for Sunday-Saturday; shifted here to Monday=1, ..., Saturday=6, Sunday=0
+            return (F.dayofweek(_input) + 6) % 7
+
+        return self._compliant_expr._from_call(
+            _weekday,
+            "weekday",
+            returns_scalar=self._compliant_expr._returns_scalar,
+        )
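A note on the sub-second helpers in this new file: Spark timestamps carry microsecond precision, and F.unix_micros returns microseconds since the Unix epoch, so unix_micros(ts) % 1_000_000 isolates the microseconds within the current second. millisecond floors that value divided by 1000, and nanosecond scales it up by 1000 (so it is always a multiple of 1000). A standalone sanity check of the arithmetic, assuming a local Spark session and a PySpark version that exposes unix_micros in the Python API (not part of the commit itself):

    from datetime import datetime

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F  # noqa: N812

    spark = SparkSession.builder.getOrCreate()
    ts = datetime(2025, 1, 20, 12, 30, 45, 123456)  # fractional second: .123456
    df = spark.createDataFrame([(ts,)], ["a"])

    df.select(
        F.floor((F.unix_micros("a") % 1_000_000) / 1000).alias("millisecond"),
        (F.unix_micros("a") % 1_000_000).alias("microsecond"),
        ((F.unix_micros("a") % 1_000_000) * 1000).alias("nanosecond"),
        ((F.dayofweek("a") + 6) % 7).alias("weekday"),  # 2025-01-20 is a Monday
    ).show()
    # Expected: millisecond=123, microsecond=123456, nanosecond=123456000, weekday=1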
2 changes: 0 additions & 2 deletions tests/expr_and_series/dt/datetime_attributes_test.py
@@ -49,8 +49,6 @@ def test_datetime_attributes(
         request.applymarker(pytest.mark.xfail)
     if attribute == "date" and "cudf" in str(constructor):
         request.applymarker(pytest.mark.xfail)
-    if "pyspark" in str(constructor):
-        request.applymarker(pytest.mark.xfail)
 
     df = nw.from_native(constructor(data))
     result = df.select(getattr(nw.col("a").dt, attribute)())
