Implementations: pyspark and ray #90

Open · wants to merge 13 commits into base: main
20 changes: 20 additions & 0 deletions .github/envs/environment.yml
@@ -0,0 +1,20 @@
name: test-environment
channels:
- conda-forge
dependencies:
- dask >=2025
- pandas
- polars
- pyspark
- pyarrow >=15
- numpy
- pytest
- pytest-cov
- numba
- awkward
- distributed
- openjdk ==20
- pip
- pip:
- ray[data]
- git+https://github.com/dask-contrib/dask-awkward
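
A quick sanity check of the new backend dependencies, once this environment is built (a sketch; run inside the activated `test-environment` env):

```python
# Smoke test for the new dependencies added above; illustrative only.
import pyspark
import ray

print(pyspark.__version__, ray.__version__)
```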
14 changes: 9 additions & 5 deletions .github/workflows/pypi.yml
@@ -18,20 +18,24 @@ jobs:
fail-fast: false
matrix:
platform: [ubuntu-latest, macos-latest, windows-latest]
python-version: ["3.9", "3.10", "3.11", "3.12"]
python-version: ["3.10", "3.11", "3.12"]
runs-on: ${{matrix.platform}}
steps:
- name: Checkout
uses: actions/checkout@v3
- name: setup Python ${{matrix.python-version}}
uses: actions/setup-python@v4
- name: Setup Conda Environment
uses: conda-incubator/setup-miniconda@v3
with:
python-version: ${{matrix.python-version}}
python-version: ${{ matrix.python-version }}
environment-file: .github/envs/environment.yml
activate-environment: test-environment
- name: install
shell: bash -l {0}
run: |
pip install pip wheel -U
pip install -q --no-cache-dir .[test]
pip install -q --no-cache-dir -e .[test]
pip list
- name: test
shell: bash -l {0}
run: |
python -m pytest -v --cov-config=.coveragerc --cov akimbo
6 changes: 5 additions & 1 deletion src/akimbo/apply_tree.py
@@ -80,7 +80,11 @@ def dec(
match: function to determine if a part of the data structure matches the type we want to
operate on
outtype: postprocessing function after transform
inmode: how ``func`` expects its inputs: as awkward arrays (ak), numpy or arrow
inmode: how ``func`` expects its inputs:
- ak: awkward arrays
- numpy: numpy arrays
- arrow: arrow arrays
- other: anything that can be cast to ak arrays, e.g., number literals
"""

@functools.wraps(func)
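
For readers of the new docstring, a minimal sketch of decorating a function with ``dec``, assuming the parameters documented above (``numeric`` is the leaf matcher this PR imports in src/akimbo/mixin.py):

```python
# Minimal sketch, not part of the patch: wrap a numpy ufunc so it applies
# only at numeric leaf nodes, receiving numpy arrays (inmode="numpy").
import awkward as ak
import numpy as np

from akimbo.apply_tree import dec, numeric

log = dec(np.log, match=numeric, inmode="numpy", outtype=ak.contents.NumpyArray)
# ``log`` can now be mapped over the numeric leaves of a nested awkward array.
```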
12 changes: 10 additions & 2 deletions src/akimbo/cudf.py
@@ -145,9 +145,17 @@ def str(self):
try:
cast = dec_cu(libcudf.unary.cast, match=leaf)
except AttributeError:

def cast_inner(col, dtype):
return cudf.core.column.ColumnBase(col.data, size=len(col), dtype=np.dtype(dtype),
mask=None, offset=0, children=())
return cudf.core.column.ColumnBase(
col.data,
size=len(col),
dtype=np.dtype(dtype),
mask=None,
offset=0,
children=(),
)

cast = dec_cu(cast_inner, match=leaf)

@property
2 changes: 1 addition & 1 deletion src/akimbo/dask.py
@@ -69,7 +69,7 @@ def run(self, *args, **kwargs):
ar = [self._to_tt(ar) if hasattr(ar, "ak") else ar for ar in ar]
out = op(tt, *ar, **kwargs)
meta = PandasAwkwardAccessor._to_output(
ak.typetracer.length_zero_if_typetracer(out)
ak.typetracer.length_one_if_typetracer(out)
)
except (ValueError, TypeError):
meta = None
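
For context on this one-line change, a standalone sketch of the call being swapped in: ``length_one_if_typetracer`` turns a typetracer (types only, no buffers) into a concrete one-row placeholder from which dask meta can be built, rather than a length-zero one.

```python
# Sketch of the swapped-in awkward call; illustrative only.
import awkward as ak

arr = ak.Array([[1.0, 2.0], [3.0]])
tt = ak.Array(arr.layout.to_typetracer(forget_length=True))  # no buffers
placeholder = ak.typetracer.length_one_if_typetracer(tt)
assert len(placeholder) == 1  # one dummy row, same type as the original
```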
2 changes: 1 addition & 1 deletion src/akimbo/datetimes.py
@@ -24,7 +24,7 @@ def __init__(self, accessor) -> None:
floor_temporal = dec_t(pc.floor_temporal)
round_temporal = dec_t(pc.round_temporal)
strftime = dec_t(pc.strftime)
strptime = dec_t(pc.strptime)
# strptime = dec_t(pc.strptime) # this is in .str instead
day = dec_t(pc.day)
day_of_week = dec_t(pc.day_of_week)
day_of_year = dec_t(pc.day_of_year)
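
Since ``strptime`` now lives under the string accessor, a round trip looks roughly like this (a sketch; the ``.str``/``.dt`` sub-accessor spellings are assumed, and keyword names follow pyarrow.compute):

```python
# Sketch: parse nested strings via .str, format timestamps via .dt.
import pandas as pd
import akimbo.pandas  # registers the .ak accessor on pandas objects

s = pd.Series([["2024-01-01T00:00:00"], []])
ts = s.ak.str.strptime(format="%Y-%m-%dT%H:%M:%S", unit="s")
txt = ts.ak.dt.strftime(format="%Y-%m-%d")
```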
74 changes: 53 additions & 21 deletions src/akimbo/mixin.py
@@ -5,6 +5,7 @@
from typing import Callable, Iterable

import awkward as ak
import numpy as np
import pyarrow.compute as pc

from akimbo.apply_tree import dec, match_any, numeric, run_with_transform
@@ -82,14 +83,6 @@ class ArithmeticMixin:
def _create_op(cls, op):
raise AbstractMethodError(cls)

@classmethod
def _create_op(cls, op):
raise AbstractMethodError(cls)

@classmethod
def _create_op(cls, op):
raise AbstractMethodError(cls)

@classmethod
def _add_arithmetic_ops(cls) -> None:
setattr(cls, "__add__", cls._create_op(operator.add))
@@ -158,7 +151,7 @@ def accessor(self):

@classmethod
def is_series(cls, data):
return isinstance(data, cls.series_type)
return isinstance(data, cls.series_type) if cls.series_type else False

@classmethod
def is_dataframe(cls, data):
@@ -210,6 +203,9 @@ def transform(
This process walks through the data's schema tree, and applies the given
function only on the matching nodes.

The function input(s) and output depend on the ``inmode`` and
``outtype`` arguments.

Parameters
----------
fn: the operation you want to perform. Typically unary or binary, and may take
@@ -228,10 +224,20 @@
bits = tuple(where.split(".")) if isinstance(where, str) else where
arr = self.array
part = arr.__getitem__(bits)
# TODO: apply ``where`` to any arrays in others
# other = [to_ak_layout(ar) for ar in others]
others = (
_
if isinstance(_, (str, int, float, np.number))
else to_ak_layout(_).__getitem__(bits)
for _ in others
)
callkwargs = {
k: _
if isinstance(_, (str, int, float, np.number))
else to_ak_layout(_).__getitem__(bits)
for k, _ in kwargs.items()
}
out = run_with_transform(
part, fn, match=match, others=others, inmode=inmode, **kwargs
part, fn, match=match, others=others, inmode=inmode, **callkwargs
)
final = ak.with_field(arr, out, where=where)
else:
@@ -247,7 +253,7 @@ def __getitem__(self, item):
def __dir__(self) -> Iterable[str]:
attrs = (_ for _ in dir(self.array) if not _.startswith("_"))
meths = series_methods if self.is_series(self._obj) else df_methods
return sorted(set(attrs) | set(meths))
return sorted(set(attrs) | set(meths) | set(self.subaccessors))

def with_behavior(self, behavior, where=()):
"""Assign a behavior to this array-of-records"""
@@ -270,10 +276,33 @@ def with_behavior(self, behavior, where=()):
def __array_function__(self, *args, **kwargs):
return self.array.__array_function__(*args, **kwargs)

def __array_ufunc__(self, *args, **kwargs):
if args[1] == "__call__":
return self.to_output(args[0](self.array, *args[3:], **kwargs))
raise NotImplementedError
def __array_ufunc__(self, *args, where=None, out=None, **kwargs):
# includes operator overload like df.ak + 1
ufunc, call, inputs, *callargs = args
if out is not None or call != "__call__":
raise NotImplementedError
if where:
# called like np.add(df.ak, 1, where="...")
bits = tuple(where.split(".")) if isinstance(where, str) else where
arr = self.array
part = arr.__getitem__(bits)
callargs = (
_
if isinstance(_, (str, int, float, np.number))
else to_ak_layout(_).__getitem__(bits)
for _ in callargs
)
callkwargs = {
k: _
if isinstance(_, (str, int, float, np.number))
else to_ak_layout(_).__getitem__(bits)
for k, _ in kwargs.items()
}

out = ufunc(part, *callargs, **callkwargs)
return self.to_output(ak.with_field(arr, out, where=where))

return self.to_output(ufunc(self.array, *callargs, **kwargs))
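
With this hook, a plain numpy call can target a single nested field and have the result stitched back into the record; a sketch (field names invented for illustration):

```python
# Sketch of the new ufunc routing; not part of the patch.
import numpy as np
import pandas as pd
import akimbo.pandas  # registers the .ak accessor

s = pd.Series([{"a": [1, 2], "b": 5}, {"a": [3], "b": 6}])
bumped = np.add(s.ak, 1, where="a")  # adds 1 only beneath field "a"
doubled = s.ak * 2                   # operator overload, applies everywhere
```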

@property
def arrow(self) -> ak.Array:
@@ -293,7 +322,7 @@ def array(self) -> ak.Array:
return ak.from_arrow(self.arrow)

@classmethod
def register_accessor(cls, name, klass):
def register_accessor(cls, name: str, klass: type):
# TODO: check clobber?
cls.subaccessors[name] = klass

@@ -413,12 +442,14 @@ def op2(*args, extra=None, **kw):
args = list(args) + list(extra or [])
return op(*args, **kw)

def f(self, *args, **kw):
def f(self, *args, where=None, **kw):
# TODO: test here is for literals, but really we want "don't know how to
# array that" condition
extra = (_ for _ in args if isinstance(_, (str, int, float)))
extra = [_ for _ in args if isinstance(_, (str, int, float, np.number))]
args = (
to_ak_layout(_) for _ in args if not isinstance(_, (str, int, float))
to_ak_layout(_)
for _ in args
if not isinstance(_, (str, int, float, np.number))
)
out = self.transform(
op2,
Expand All @@ -427,6 +458,7 @@ def f(self, *args, **kw):
inmode="numpy",
extra=extra,
outtype=ak.contents.NumpyArray,
where=where,
**kw,
)
if isinstance(self._obj, self.dataframe_type):
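
The same ``where=`` targeting threads through ``transform`` and the generated operator methods, with scalar operands passed straight through; for example (a sketch, field names invented):

```python
# Sketch: transform a single nested field, mixing in a literal operand.
import numpy as np
import pandas as pd
import akimbo.pandas  # registers the .ak accessor
from akimbo.apply_tree import numeric

s = pd.Series([{"a": [1.0, 4.0], "b": "x"}, {"a": [9.0], "b": "y"}])
out = s.ak.transform(np.add, 10.0, where="a", match=numeric, inmode="numpy")
# values under "a" gain 10; field "b" is carried through untouched
```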
126 changes: 125 additions & 1 deletion src/akimbo/polars.py
@@ -1,12 +1,19 @@
from typing import Callable, Dict

import polars as pl
import pyarrow as pa

from akimbo.apply_tree import match_any
from akimbo.mixin import Accessor


@pl.api.register_series_namespace("ak")
@pl.api.register_dataframe_namespace("ak")
class PolarsAwkwardAccessor(Accessor):
"""Perform awkward operations on a polars series or dataframe"""
"""Perform awkward operations on a polars series or dataframe

This is for *eager* operations. A Lazy version may eventually be made.
"""

series_type = pl.Series
dataframe_type = pl.DataFrame
Expand All @@ -22,3 +29,120 @@ def to_arrow(cls, data):
def pack(self):
# polars already implements this directly
return self._obj.to_struct()
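
Example usage of the eager accessor (a sketch; akimbo exposes awkward functions through the accessor, so ``ak.num`` is assumed reachable as ``.ak.num()``):

```python
# Sketch: awkward operations on a nested polars Series via the accessor above.
import polars as pl
import akimbo.polars  # registers the .ak namespaces

s = pl.Series([[1, 2, 3], [], [4]])
counts = s.ak.num()  # lengths of the inner lists: [3, 0, 1]
```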


@pl.api.register_lazyframe_namespace("ak")
class LazyPolarsAwkwardAccessor(Accessor):
dataframe_type = pl.LazyFrame
series_type = None # lazy is never series

def transform(
self, fn: Callable, *others, where=None, match=match_any, inmode="ak", **kwargs
):
# TODO determine schema from first-run, with df.collect_schema()
return pl.map_batches(
(self._obj,) + others,
lambda d: d.ak.transform(
fn, match=match, inmode=inmode, **kwargs
).ak.unpack(),
schema=None,
)


def arrow_to_polars_type(arrow_type: pa.DataType) -> pl.DataType:
type_mapping = {
pa.int8(): pl.Int8,
pa.int16(): pl.Int16,
pa.int32(): pl.Int32,
pa.int64(): pl.Int64,
pa.uint8(): pl.UInt8,
pa.uint16(): pl.UInt16,
pa.uint32(): pl.UInt32,
pa.uint64(): pl.UInt64,
pa.float32(): pl.Float32,
pa.float64(): pl.Float64,
pa.string(): pl.String,
pa.bool_(): pl.Boolean,
}

if arrow_type in type_mapping:
return type_mapping[arrow_type]

# parametrised types
if pa.types.is_timestamp(arrow_type):
return pl.Datetime(time_unit=arrow_type.unit, time_zone=arrow_type.tz)

if pa.types.is_decimal(arrow_type):
return pl.Decimal(precision=arrow_type.precision, scale=arrow_type.scale)

# Handle list type
if pa.types.is_list(arrow_type):
value_type = arrow_to_polars_type(arrow_type.value_type)
return pl.List(value_type)

# Handle struct type
if pa.types.is_struct(arrow_type):
fields = {}
for field in arrow_type:
fields[field.name] = arrow_to_polars_type(field.type)
return pl.Struct(fields)

raise ValueError(f"Unsupported Arrow type: {arrow_type}")


def polars_to_arrow_type(polars_type: pl.DataType) -> pa.DataType:
type_mapping = {
pl.Int8: pa.int8(),
pl.Int16: pa.int16(),
pl.Int32: pa.int32(),
pl.Int64: pa.int64(),
pl.UInt8: pa.uint8(),
pl.UInt16: pa.uint16(),
pl.UInt32: pa.uint32(),
pl.UInt64: pa.uint64(),
pl.Float32: pa.float32(),
pl.Float64: pa.float64(),
pl.String: pa.string(),
pl.Boolean: pa.bool_(),
pl.Date: pa.date32(),
}

if polars_type in type_mapping:
return type_mapping[polars_type]

# parametrised types
if isinstance(polars_type, pl.Datetime):
return pa.timestamp(polars_type.time_unit, polars_type.time_zone)

if isinstance(polars_type, pl.Decimal):
return pa.decimal128(polars_type.precision, polars_type.scale)

# Handle list type
if isinstance(polars_type, pl.List):
value_type = polars_to_arrow_type(polars_type.inner)
return pa.list_(value_type)

# Handle struct type
if isinstance(polars_type, pl.Struct):
fields = []
for field in polars_type.fields:
arrow_type = polars_to_arrow_type(field.dtype)
fields.append(pa.field(field.name, arrow_type))
return pa.struct(fields)

raise ValueError(f"Unsupported Polars type: {polars_type}")


def arrow_to_polars_schema(arrow_schema: pa.Schema) -> Dict[str, pl.DataType]:
polars_schema = {}
for field in arrow_schema:
polars_schema[field.name] = arrow_to_polars_type(field.type)
return polars_schema


def polars_to_arrow_schema(polars_schema: Dict[str, pl.DataType]) -> pa.Schema:
fields = []
for name, dtype in polars_schema.items():
arrow_type = polars_to_arrow_type(dtype)
fields.append(pa.field(name, arrow_type))
return pa.schema(fields)
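
A round trip through both schema helpers should be lossless for the covered types, e.g.:

```python
# Round-trip sketch for the schema helpers defined above.
import pyarrow as pa

from akimbo.polars import arrow_to_polars_schema, polars_to_arrow_schema

arrow_schema = pa.schema([("x", pa.int64()), ("y", pa.list_(pa.float64()))])
assert polars_to_arrow_schema(arrow_to_polars_schema(arrow_schema)) == arrow_schema
```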