diff --git a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index 61d3bcbe24e..45e0fc345b5 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ b/python/cudf/cudf/_lib/CMakeLists.txt @@ -22,7 +22,6 @@ set(cython_sources filling.pyx groupby.pyx interop.pyx - join.pyx json.pyx merge.pyx null_mask.pyx diff --git a/python/cudf/cudf/_lib/__init__.py b/python/cudf/cudf/_lib/__init__.py index efa437eebb7..c51db601985 100644 --- a/python/cudf/cudf/_lib/__init__.py +++ b/python/cudf/cudf/_lib/__init__.py @@ -9,7 +9,6 @@ filling, groupby, interop, - join, json, merge, null_mask, diff --git a/python/cudf/cudf/_lib/join.pyx b/python/cudf/cudf/_lib/join.pyx deleted file mode 100644 index 2559358c21f..00000000000 --- a/python/cudf/cudf/_lib/join.pyx +++ /dev/null @@ -1,43 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. - -from cudf.core.buffer import acquire_spill_lock - -from cudf._lib.column cimport Column - -import pylibcudf - -# The functions below return the *gathermaps* that represent -# the join result when joining on the keys `lhs` and `rhs`. - - -@acquire_spill_lock() -def join(list lhs, list rhs, how=None): - if how == "outer": - how = "full" - if (join_func := getattr(pylibcudf.join, f"{how}_join", None)) is None: - raise ValueError(f"Invalid join type {how}") - - left_rows, right_rows = join_func( - pylibcudf.Table([c.to_pylibcudf(mode="read") for c in lhs]), - pylibcudf.Table([c.to_pylibcudf(mode="read") for c in rhs]), - pylibcudf.types.NullEquality.EQUAL - ) - return Column.from_pylibcudf(left_rows), Column.from_pylibcudf(right_rows) - - -@acquire_spill_lock() -def semi_join(list lhs, list rhs, how=None): - if ( - join_func := getattr( - pylibcudf.join, f"{how.replace('left', 'left_')}_join", None - ) - ) is None: - raise ValueError(f"Invalid join type {how}") - - return Column.from_pylibcudf( - join_func( - pylibcudf.Table([c.to_pylibcudf(mode="read") for c in lhs]), - pylibcudf.Table([c.to_pylibcudf(mode="read") for c in rhs]), - pylibcudf.types.NullEquality.EQUAL - ) - ), None diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index 53946be1c49..f0df4a3c1b3 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -1424,8 +1424,6 @@ def _label_encoding( ] dtype: int8 """ - from cudf._lib.join import join as cpp_join - if na_sentinel is None or na_sentinel.value is cudf.NA: na_sentinel = cudf.Scalar(-1) @@ -1447,15 +1445,21 @@ def _return_sentinel_column(): except ValueError: return _return_sentinel_column() - left_gather_map, right_gather_map = cpp_join( - [self], [cats], how="left" + left_rows, right_rows = plc.join.left_join( + plc.Table([self.to_pylibcudf(mode="read")]), + plc.Table([cats.to_pylibcudf(mode="read")]), + plc.types.NullEquality.EQUAL, ) + left_gather_map = type(self).from_pylibcudf(left_rows) + right_gather_map = type(self).from_pylibcudf(right_rows) + codes = libcudf.copying.gather( [as_column(range(len(cats)), dtype=dtype)], right_gather_map, nullify=True, ) del right_gather_map + del right_rows # reorder `codes` so that its values correspond to the # values of `self`: (codes,) = libcudf.sort.sort_by_key( diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index b274bdea76d..315324c130c 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -772,9 +772,22 @@ def agg(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs): join_keys = map(list, zip(*join_keys)) # By construction, left and right keys are related by # a permutation, so we can use an inner join. - left_order, right_order = libcudf.join.join( - *join_keys, how="inner" - ) + with acquire_spill_lock(): + plc_tables = [ + plc.Table( + [col.to_pylibcudf(mode="read") for col in cols] + ) + for cols in join_keys + ] + left_plc, right_plc = plc.join.inner_join( + plc_tables[0], + plc_tables[1], + plc.types.NullEquality.EQUAL, + ) + left_order = libcudf.column.Column.from_pylibcudf(left_plc) + right_order = libcudf.column.Column.from_pylibcudf( + right_plc + ) # left order is some permutation of the ordering we # want, and right order is a matching gather map for # the result table. Get the correct order by sorting diff --git a/python/cudf/cudf/core/index.py b/python/cudf/cudf/core/index.py index 0a2b15a16b9..80e037c36fd 100644 --- a/python/cudf/cudf/core/index.py +++ b/python/cudf/cudf/core/index.py @@ -16,6 +16,8 @@ import pyarrow as pa from typing_extensions import Self +import pylibcudf as plc + import cudf from cudf import _lib as libcudf from cudf._lib.filling import sequence @@ -32,6 +34,7 @@ from cudf.core._base_index import BaseIndex, _return_get_indexer_result from cudf.core._compat import PANDAS_LT_300 from cudf.core._internals.search import search_sorted +from cudf.core.buffer import acquire_spill_lock from cudf.core.column import ( CategoricalColumn, ColumnBase, @@ -1360,7 +1363,14 @@ def get_indexer(self, target, method=None, limit=None, tolerance=None): except ValueError: return _return_get_indexer_result(result.values) - scatter_map, indices = libcudf.join.join([lcol], [rcol], how="inner") + with acquire_spill_lock(): + left_plc, right_plc = plc.join.inner_join( + plc.Table([lcol.to_pylibcudf(mode="read")]), + plc.Table([rcol.to_pylibcudf(mode="read")]), + plc.types.NullEquality.EQUAL, + ) + scatter_map = libcudf.column.Column.from_pylibcudf(left_plc) + indices = libcudf.column.Column.from_pylibcudf(right_plc) result = libcudf.copying.scatter([indices], scatter_map, [result])[0] result_series = cudf.Series._from_column(result) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index cfeaca00888..5c224176730 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -2,11 +2,14 @@ from __future__ import annotations import itertools -from typing import Any, ClassVar +from typing import Any + +import pylibcudf as plc import cudf from cudf import _lib as libcudf from cudf._lib.types import size_type_dtype +from cudf.core.buffer import acquire_spill_lock from cudf.core.copy_types import GatherMap from cudf.core.join._join_helpers import ( _coerce_to_tuple, @@ -17,19 +20,26 @@ class Merge: - # The joiner function must have the following signature: - # - # def joiner( - # lhs: Frame, - # rhs: Frame - # ) -> Tuple[Optional[Column], Optional[Column]]: - # ... - # - # where `lhs` and `rhs` are Frames composed of the left and right - # join key. The `joiner` returns a tuple of two Columns - # representing the rows to gather from the left- and right- side - # tables respectively. - _joiner: ClassVar[staticmethod] = staticmethod(libcudf.join.join) + @staticmethod + @acquire_spill_lock() + def _joiner( + lhs: list[libcudf.column.Column], + rhs: list[libcudf.column.Column], + how: str, + ) -> tuple[libcudf.column.Column, libcudf.column.Column]: + if how == "outer": + how = "full" + if (join_func := getattr(plc.join, f"{how}_join", None)) is None: + raise ValueError(f"Invalid join type {how}") + + left_rows, right_rows = join_func( + plc.Table([col.to_pylibcudf(mode="read") for col in lhs]), + plc.Table([col.to_pylibcudf(mode="read") for col in rhs]), + plc.types.NullEquality.EQUAL, + ) + return libcudf.column.Column.from_pylibcudf( + left_rows + ), libcudf.column.Column.from_pylibcudf(right_rows) def __init__( self, @@ -546,7 +556,27 @@ def _validate_merge_params( class MergeSemi(Merge): - _joiner: ClassVar[staticmethod] = staticmethod(libcudf.join.semi_join) + @staticmethod + @acquire_spill_lock() + def _joiner( + lhs: list[libcudf.column.Column], + rhs: list[libcudf.column.Column], + how: str, + ) -> tuple[libcudf.column.Column, None]: + if ( + join_func := getattr( + plc.join, f"{how.replace('left', 'left_')}_join", None + ) + ) is None: + raise ValueError(f"Invalid join type {how}") + + return libcudf.column.Column.from_pylibcudf( + join_func( + plc.Table([col.to_pylibcudf(mode="read") for col in lhs]), + plc.Table([col.to_pylibcudf(mode="read") for col in rhs]), + plc.types.NullEquality.EQUAL, + ) + ), None def _merge_results(self, lhs: cudf.DataFrame, rhs: cudf.DataFrame): # semi-join result includes only lhs columns diff --git a/python/cudf/cudf/core/multiindex.py b/python/cudf/cudf/core/multiindex.py index bfff62f0a89..19a53af018d 100644 --- a/python/cudf/cudf/core/multiindex.py +++ b/python/cudf/cudf/core/multiindex.py @@ -14,6 +14,8 @@ import numpy as np import pandas as pd +import pylibcudf as plc + import cudf import cudf._lib as libcudf from cudf._lib.types import size_type_dtype @@ -22,6 +24,7 @@ from cudf.core import column from cudf.core._base_index import _return_get_indexer_result from cudf.core.algorithms import factorize +from cudf.core.buffer import acquire_spill_lock from cudf.core.column_accessor import ColumnAccessor from cudf.core.frame import Frame from cudf.core.index import ( @@ -1919,10 +1922,18 @@ def get_indexer(self, target, method=None, limit=None, tolerance=None): for lcol, rcol in zip(target._columns, self._columns) ] join_keys = map(list, zip(*join_keys)) - scatter_map, indices = libcudf.join.join( - *join_keys, - how="inner", - ) + with acquire_spill_lock(): + plc_tables = [ + plc.Table([col.to_pylibcudf(mode="read") for col in cols]) + for cols in join_keys + ] + left_plc, right_plc = plc.join.inner_join( + plc_tables[0], + plc_tables[1], + plc.types.NullEquality.EQUAL, + ) + scatter_map = libcudf.column.Column.from_pylibcudf(left_plc) + indices = libcudf.column.Column.from_pylibcudf(right_plc) result = libcudf.copying.scatter([indices], scatter_map, [result])[0] result_series = cudf.Series._from_column(result)