Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/branch-25.02' into cleanup-mur…
Browse files Browse the repository at this point in the history
…murhash-32
  • Loading branch information
PointKernel committed Nov 26, 2024
2 parents 57e7061 + 776ef54 commit e8610df
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 72 deletions.
1 change: 0 additions & 1 deletion python/cudf/cudf/_lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ set(cython_sources
filling.pyx
groupby.pyx
interop.pyx
join.pyx
json.pyx
merge.pyx
null_mask.pyx
Expand Down
1 change: 0 additions & 1 deletion python/cudf/cudf/_lib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
filling,
groupby,
interop,
join,
json,
merge,
null_mask,
Expand Down
43 changes: 0 additions & 43 deletions python/cudf/cudf/_lib/join.pyx

This file was deleted.

12 changes: 8 additions & 4 deletions python/cudf/cudf/core/column/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -1424,8 +1424,6 @@ def _label_encoding(
]
dtype: int8
"""
from cudf._lib.join import join as cpp_join

if na_sentinel is None or na_sentinel.value is cudf.NA:
na_sentinel = cudf.Scalar(-1)

Expand All @@ -1447,15 +1445,21 @@ def _return_sentinel_column():
except ValueError:
return _return_sentinel_column()

left_gather_map, right_gather_map = cpp_join(
[self], [cats], how="left"
left_rows, right_rows = plc.join.left_join(
plc.Table([self.to_pylibcudf(mode="read")]),
plc.Table([cats.to_pylibcudf(mode="read")]),
plc.types.NullEquality.EQUAL,
)
left_gather_map = type(self).from_pylibcudf(left_rows)
right_gather_map = type(self).from_pylibcudf(right_rows)

codes = libcudf.copying.gather(
[as_column(range(len(cats)), dtype=dtype)],
right_gather_map,
nullify=True,
)
del right_gather_map
del right_rows
# reorder `codes` so that its values correspond to the
# values of `self`:
(codes,) = libcudf.sort.sort_by_key(
Expand Down
19 changes: 16 additions & 3 deletions python/cudf/cudf/core/groupby/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -772,9 +772,22 @@ def agg(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs):
join_keys = map(list, zip(*join_keys))
# By construction, left and right keys are related by
# a permutation, so we can use an inner join.
left_order, right_order = libcudf.join.join(
*join_keys, how="inner"
)
with acquire_spill_lock():
plc_tables = [
plc.Table(
[col.to_pylibcudf(mode="read") for col in cols]
)
for cols in join_keys
]
left_plc, right_plc = plc.join.inner_join(
plc_tables[0],
plc_tables[1],
plc.types.NullEquality.EQUAL,
)
left_order = libcudf.column.Column.from_pylibcudf(left_plc)
right_order = libcudf.column.Column.from_pylibcudf(
right_plc
)
# left order is some permutation of the ordering we
# want, and right order is a matching gather map for
# the result table. Get the correct order by sorting
Expand Down
12 changes: 11 additions & 1 deletion python/cudf/cudf/core/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import pyarrow as pa
from typing_extensions import Self

import pylibcudf as plc

import cudf
from cudf import _lib as libcudf
from cudf._lib.filling import sequence
Expand All @@ -32,6 +34,7 @@
from cudf.core._base_index import BaseIndex, _return_get_indexer_result
from cudf.core._compat import PANDAS_LT_300
from cudf.core._internals.search import search_sorted
from cudf.core.buffer import acquire_spill_lock
from cudf.core.column import (
CategoricalColumn,
ColumnBase,
Expand Down Expand Up @@ -1360,7 +1363,14 @@ def get_indexer(self, target, method=None, limit=None, tolerance=None):
except ValueError:
return _return_get_indexer_result(result.values)

scatter_map, indices = libcudf.join.join([lcol], [rcol], how="inner")
with acquire_spill_lock():
left_plc, right_plc = plc.join.inner_join(
plc.Table([lcol.to_pylibcudf(mode="read")]),
plc.Table([rcol.to_pylibcudf(mode="read")]),
plc.types.NullEquality.EQUAL,
)
scatter_map = libcudf.column.Column.from_pylibcudf(left_plc)
indices = libcudf.column.Column.from_pylibcudf(right_plc)
result = libcudf.copying.scatter([indices], scatter_map, [result])[0]
result_series = cudf.Series._from_column(result)

Expand Down
60 changes: 45 additions & 15 deletions python/cudf/cudf/core/join/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
from __future__ import annotations

import itertools
from typing import Any, ClassVar
from typing import Any

import pylibcudf as plc

import cudf
from cudf import _lib as libcudf
from cudf._lib.types import size_type_dtype
from cudf.core.buffer import acquire_spill_lock
from cudf.core.copy_types import GatherMap
from cudf.core.join._join_helpers import (
_coerce_to_tuple,
Expand All @@ -17,19 +20,26 @@


class Merge:
# The joiner function must have the following signature:
#
# def joiner(
# lhs: Frame,
# rhs: Frame
# ) -> Tuple[Optional[Column], Optional[Column]]:
# ...
#
# where `lhs` and `rhs` are Frames composed of the left and right
# join key. The `joiner` returns a tuple of two Columns
# representing the rows to gather from the left- and right- side
# tables respectively.
_joiner: ClassVar[staticmethod] = staticmethod(libcudf.join.join)
@staticmethod
@acquire_spill_lock()
def _joiner(
    lhs: list[libcudf.column.Column],
    rhs: list[libcudf.column.Column],
    how: str,
) -> tuple[libcudf.column.Column, libcudf.column.Column]:
    """Compute the row-gather maps for joining two sets of key columns.

    Parameters
    ----------
    lhs, rhs
        The left and right join-key columns; each list is wrapped in a
        ``plc.Table`` before being handed to pylibcudf.
    how
        One of ``"inner"``, ``"left"``, ``"full"`` or ``"outer"``
        (``"outer"`` is treated as an alias for ``"full"``).

    Returns
    -------
    tuple of two columns
        The rows to gather from the left- and right-side tables
        respectively.

    Raises
    ------
    ValueError
        If ``how`` is not one of the supported join types.
    """
    if how == "outer":
        how = "full"
    # Only these joins take ``(left, right, null_equality)`` and return a
    # pair of gather maps. Looking the name up blindly with ``getattr``
    # would also accept e.g. "cross" or "left_semi", whose functions exist
    # in ``plc.join`` but have a different signature/return shape, and
    # would fail later with a confusing error instead of this ValueError.
    if how not in {"inner", "left", "full"}:
        raise ValueError(f"Invalid join type {how}")
    join_func = getattr(plc.join, f"{how}_join")

    left_rows, right_rows = join_func(
        plc.Table([col.to_pylibcudf(mode="read") for col in lhs]),
        plc.Table([col.to_pylibcudf(mode="read") for col in rhs]),
        plc.types.NullEquality.EQUAL,
    )
    return (
        libcudf.column.Column.from_pylibcudf(left_rows),
        libcudf.column.Column.from_pylibcudf(right_rows),
    )

def __init__(
self,
Expand Down Expand Up @@ -546,7 +556,27 @@ def _validate_merge_params(


class MergeSemi(Merge):
_joiner: ClassVar[staticmethod] = staticmethod(libcudf.join.semi_join)
@staticmethod
@acquire_spill_lock()
def _joiner(
    lhs: list[libcudf.column.Column],
    rhs: list[libcudf.column.Column],
    how: str,
) -> tuple[libcudf.column.Column, None]:
    """Compute the left-side gather map for a semi or anti join.

    Parameters
    ----------
    lhs, rhs
        The left and right join-key columns; each list is wrapped in a
        ``plc.Table`` before being handed to pylibcudf.
    how
        Either ``"leftsemi"`` or ``"leftanti"``.

    Returns
    -------
    tuple
        ``(left_rows, None)``: the column built from the pylibcudf join
        result, and ``None`` in the right-hand slot.

    Raises
    ------
    ValueError
        If ``how`` is not ``"leftsemi"`` or ``"leftanti"``.
    """
    # Explicit mapping rather than rewriting ``how`` into an attribute
    # name: a string rewrite lets values such as "inner" or "cross"
    # resolve to real ``plc.join`` functions that return something other
    # than a single gather map, bypassing the ValueError below.
    join_names = {
        "leftsemi": "left_semi_join",
        "leftanti": "left_anti_join",
    }
    if (join_name := join_names.get(how)) is None:
        raise ValueError(f"Invalid join type {how}")
    join_func = getattr(plc.join, join_name)

    left_rows = join_func(
        plc.Table([col.to_pylibcudf(mode="read") for col in lhs]),
        plc.Table([col.to_pylibcudf(mode="read") for col in rhs]),
        plc.types.NullEquality.EQUAL,
    )
    return libcudf.column.Column.from_pylibcudf(left_rows), None

def _merge_results(self, lhs: cudf.DataFrame, rhs: cudf.DataFrame):
# semi-join result includes only lhs columns
Expand Down
19 changes: 15 additions & 4 deletions python/cudf/cudf/core/multiindex.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import numpy as np
import pandas as pd

import pylibcudf as plc

import cudf
import cudf._lib as libcudf
from cudf._lib.types import size_type_dtype
Expand All @@ -22,6 +24,7 @@
from cudf.core import column
from cudf.core._base_index import _return_get_indexer_result
from cudf.core.algorithms import factorize
from cudf.core.buffer import acquire_spill_lock
from cudf.core.column_accessor import ColumnAccessor
from cudf.core.frame import Frame
from cudf.core.index import (
Expand Down Expand Up @@ -1919,10 +1922,18 @@ def get_indexer(self, target, method=None, limit=None, tolerance=None):
for lcol, rcol in zip(target._columns, self._columns)
]
join_keys = map(list, zip(*join_keys))
scatter_map, indices = libcudf.join.join(
*join_keys,
how="inner",
)
with acquire_spill_lock():
plc_tables = [
plc.Table([col.to_pylibcudf(mode="read") for col in cols])
for cols in join_keys
]
left_plc, right_plc = plc.join.inner_join(
plc_tables[0],
plc_tables[1],
plc.types.NullEquality.EQUAL,
)
scatter_map = libcudf.column.Column.from_pylibcudf(left_plc)
indices = libcudf.column.Column.from_pylibcudf(right_plc)
result = libcudf.copying.scatter([indices], scatter_map, [result])[0]
result_series = cudf.Series._from_column(result)

Expand Down

0 comments on commit e8610df

Please sign in to comment.