Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add partitioning APIs to pylibcudf #16781

Merged
merged 11 commits into from
Sep 26, 2024
1 change: 1 addition & 0 deletions docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ This page provides API documentation for pylibcudf.
lists
merge
null_mask
partitioning
quantiles
reduce
replace
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
============
partitioning
============

.. automodule:: pylibcudf.partitioning
:members:
35 changes: 9 additions & 26 deletions python/cudf/cudf/_lib/hash.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
from cudf.core.buffer import acquire_spill_lock

from libcpp.memory cimport unique_ptr
from libcpp.pair cimport pair
from libcpp.utility cimport move
from libcpp.vector cimport vector

cimport pylibcudf.libcudf.types as libcudf_types
from pylibcudf.libcudf.column.column cimport column
from pylibcudf.libcudf.hash cimport (
md5,
Expand All @@ -19,37 +16,23 @@ from pylibcudf.libcudf.hash cimport (
sha512,
xxhash_64,
)
from pylibcudf.libcudf.partitioning cimport (
hash_partition as cpp_hash_partition,
)
from pylibcudf.libcudf.table.table cimport table
from pylibcudf.libcudf.table.table_view cimport table_view

from cudf._lib.column cimport Column
from cudf._lib.utils cimport columns_from_unique_ptr, table_view_from_columns
from cudf._lib.utils cimport table_view_from_columns

import pylibcudf as plc


@acquire_spill_lock()
def hash_partition(list source_columns, object columns_to_hash,
def hash_partition(list source_columns, list columns_to_hash,
int num_partitions):
cdef vector[libcudf_types.size_type] c_columns_to_hash = columns_to_hash
cdef int c_num_partitions = num_partitions
cdef table_view c_source_view = table_view_from_columns(source_columns)

cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result
with nogil:
c_result = move(
cpp_hash_partition(
c_source_view,
c_columns_to_hash,
c_num_partitions
)
)

return (
columns_from_unique_ptr(move(c_result.first)),
list(c_result.second)
plc_table, offsets = plc.partitioning.hash_partition(
plc.Table([col.to_pylibcudf(mode="read") for col in source_columns]),
columns_to_hash,
num_partitions
)
return [Column.from_pylibcudf(col) for col in plc_table.columns()], offsets


@acquire_spill_lock()
Expand Down
35 changes: 7 additions & 28 deletions python/cudf/cudf/_lib/partitioning.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,13 @@

from cudf.core.buffer import acquire_spill_lock

from libcpp.memory cimport unique_ptr
from libcpp.pair cimport pair
from libcpp.utility cimport move
from libcpp.vector cimport vector

from pylibcudf.libcudf.column.column_view cimport column_view
from pylibcudf.libcudf.partitioning cimport partition as cpp_partition
from pylibcudf.libcudf.table.table cimport table
from pylibcudf.libcudf.table.table_view cimport table_view

from cudf._lib.column cimport Column
from cudf._lib.utils cimport columns_from_unique_ptr, table_view_from_columns

import pylibcudf as plc

from cudf._lib.reduce import minmax
from cudf._lib.stream_compaction import distinct_count as cpp_distinct_count

cimport pylibcudf.libcudf.types as libcudf_types


@acquire_spill_lock()
def partition(list source_columns, Column partition_map,
Expand Down Expand Up @@ -50,25 +39,15 @@ def partition(list source_columns, Column partition_map,

if num_partitions is None:
num_partitions = cpp_distinct_count(partition_map, ignore_nulls=True)
cdef int c_num_partitions = num_partitions
cdef table_view c_source_view = table_view_from_columns(source_columns)

cdef column_view c_partition_map_view = partition_map.view()

cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result
if partition_map.size > 0:
lo, hi = minmax(partition_map)
if lo < 0 or hi >= num_partitions:
raise ValueError("Partition map has invalid values")
with nogil:
c_result = move(
cpp_partition(
c_source_view,
c_partition_map_view,
c_num_partitions
)
)

return (
columns_from_unique_ptr(move(c_result.first)), list(c_result.second)
plc_table, offsets = plc.partitioning.partition(
plc.Table([col.to_pylibcudf(mode="read") for col in source_columns]),
partition_map.to_pylibcudf(mode="read"),
num_partitions
)
return [Column.from_pylibcudf(col) for col in plc_table.columns()], offsets
1 change: 1 addition & 0 deletions python/pylibcudf/pylibcudf/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ set(cython_sources
lists.pyx
merge.pyx
null_mask.pyx
partitioning.pyx
quantiles.pyx
reduce.pyx
replace.pyx
Expand Down
2 changes: 2 additions & 0 deletions python/pylibcudf/pylibcudf/__init__.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ from . cimport (
lists,
merge,
null_mask,
partitioning,
quantiles,
reduce,
replace,
Expand Down Expand Up @@ -60,6 +61,7 @@ __all__ = [
"lists",
"merge",
"null_mask",
"partitioning",
"quantiles",
"reduce",
"replace",
Expand Down
2 changes: 2 additions & 0 deletions python/pylibcudf/pylibcudf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
lists,
merge,
null_mask,
partitioning,
quantiles,
reduce,
replace,
Expand Down Expand Up @@ -74,6 +75,7 @@
"lists",
"merge",
"null_mask",
"partitioning",
"quantiles",
"reduce",
"replace",
Expand Down
7 changes: 7 additions & 0 deletions python/pylibcudf/pylibcudf/libcudf/partitioning.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,10 @@ cdef extern from "cudf/partitioning.hpp" namespace "cudf" nogil:
const column_view& partition_map,
int num_partitions
) except +

# Binding for the C++ function ``cudf::round_robin_partition`` declared in
# ``cudf/partitioning.hpp``. The result is a pair holding an owning pointer to
# the output table and a vector of ``size_type`` values. ``except +`` asks
# Cython to convert any C++ exception raised by the call into a Python one.
cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] \
    round_robin_partition "cudf::round_robin_partition" (
    const table_view& input,
    int num_partitions,
    int start_partition
) except +
19 changes: 19 additions & 0 deletions python/pylibcudf/pylibcudf/partitioning.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from .column cimport Column
from .table cimport Table


# Declarations of the partitioning entry points implemented in
# partitioning.pyx. Each returns a ``(Table, list)`` tuple.

# Partition ``input`` using the columns listed in ``columns_to_hash``.
cpdef tuple[Table, list] hash_partition(
    Table input,
    list columns_to_hash,
    int num_partitions
)

# Partition ``t`` according to the per-row mapping in ``partition_map``.
cpdef tuple[Table, list] partition(Table t, Column partition_map, int num_partitions)

# Round-robin partition of ``input``. ``=*`` marks ``start_partition`` as an
# optional argument whose default value is supplied by the implementation.
cpdef tuple[Table, list] round_robin_partition(
    Table input,
    int num_partitions,
    int start_partition=*
)
120 changes: 120 additions & 0 deletions python/pylibcudf/pylibcudf/partitioning.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

cimport pylibcudf.libcudf.types as libcudf_types
from libcpp.memory cimport unique_ptr
from libcpp.pair cimport pair
from libcpp.utility cimport move
from libcpp.vector cimport vector
from pylibcudf.libcudf cimport partitioning as cpp_partitioning
from pylibcudf.libcudf.table.table cimport table

from .column cimport Column
from .table cimport Table


cpdef tuple[Table, list] hash_partition(
    Table input,
    list columns_to_hash,
    int num_partitions
):
    """
    Split the rows of a table into partitions by hashing selected columns.

    For details, see :cpp:func:`hash_partition`.

    Parameters
    ----------
    input : Table
        Table whose rows are to be partitioned.
    columns_to_hash : list[int]
        Positions of the columns of ``input`` that are hashed.
    num_partitions : int
        How many partitions to produce.

    Returns
    -------
    tuple[Table, list[int]]
        The output table, followed by the row offsets to each partition.
    """
    cdef vector[libcudf_types.size_type] hash_indices = columns_to_hash
    cdef int partition_count = num_partitions
    cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] partitioned

    # Release the GIL for the duration of the libcudf call.
    with nogil:
        partitioned = move(
            cpp_partitioning.hash_partition(
                input.view(), hash_indices, partition_count
            )
        )

    # Wrap the owned libcudf table first, then copy the offsets into a list.
    out_table = Table.from_libcudf(move(partitioned.first))
    return out_table, list(partitioned.second)

cpdef tuple[Table, list] partition(Table t, Column partition_map, int num_partitions):
    """
    Split the rows of `t` into partitions as directed by `partition_map`.

    For details, see :cpp:func:`partition`.

    Parameters
    ----------
    t : Table
        Table whose rows are to be partitioned.
    partition_map : Column
        Non-nullable column of integer values that assign each row
        of `t` to its partition.
    num_partitions : int
        Total count of partitions.

    Returns
    -------
    tuple[Table, list[int]]
        The output table, followed by the row offsets to each partition.
    """
    cdef int partition_count = num_partitions
    cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] partitioned

    # Release the GIL for the duration of the libcudf call.
    with nogil:
        partitioned = move(
            cpp_partitioning.partition(
                t.view(), partition_map.view(), partition_count
            )
        )

    # Wrap the owned libcudf table first, then copy the offsets into a list.
    out_table = Table.from_libcudf(move(partitioned.first))
    return out_table, list(partitioned.second)


cpdef tuple[Table, list] round_robin_partition(
    Table input,
    int num_partitions,
    int start_partition=0
):
    """
    Partition a table in round-robin fashion.

    For details, see :cpp:func:`round_robin_partition`.

    Parameters
    ----------
    input : Table
        Table to be round-robin partitioned.
    num_partitions : int
        How many partitions to produce.
    start_partition : int, default 0
        Index of the first partition.

    Returns
    -------
    tuple[Table, list[int]]
        The partitioned table, followed by the offsets of each
        partition within that table.
    """
    cdef int partition_count = num_partitions
    cdef int first_partition = start_partition
    cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] partitioned

    # Release the GIL for the duration of the libcudf call.
    with nogil:
        partitioned = move(
            cpp_partitioning.round_robin_partition(
                input.view(), partition_count, first_partition
            )
        )

    # Wrap the owned libcudf table first, then copy the offsets into a list.
    out_table = Table.from_libcudf(move(partitioned.first))
    return out_table, list(partitioned.second)
54 changes: 54 additions & 0 deletions python/pylibcudf/pylibcudf/tests/test_partitioning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

import pyarrow as pa
import pylibcudf as plc
import pytest
from utils import assert_table_eq


@pytest.fixture(scope="module")
def partitioning_data():
    """Return the raw column dict and the pyarrow table built from it."""
    columns = {"a": [1, 2, 3], "b": [1, 2, 5], "c": [1, 2, 10]}
    return columns, pa.table(columns)


def test_partition(partitioning_data):
    """Check `partition` when the map assigns every row to partition 0."""
    raw_data, pa_table = partitioning_data
    result, result_offsets = plc.partitioning.partition(
        plc.interop.from_arrow(pa_table),
        # Partition map: all three rows go to the single partition 0.
        plc.interop.from_arrow(pa.array([0, 0, 0])),
        1,
    )
    # The expected table holds the fixture's column values under three
    # unnamed, non-nullable int64 fields.
    expected = pa.table(
        list(raw_data.values()),
        schema=pa.schema([pa.field("", pa.int64(), nullable=False)] * 3),
    )
    assert_table_eq(expected, result)
    assert result_offsets == [0, 3]


def test_hash_partition(partitioning_data):
    """Check `hash_partition` on columns 0 and 1 with a single partition."""
    columns, arrow_table = partitioning_data
    plc_input = plc.interop.from_arrow(arrow_table)
    got_table, got_offsets = plc.partitioning.hash_partition(
        plc_input, [0, 1], 1
    )

    # Expected: the fixture's values under unnamed, non-nullable int64 fields.
    int_field = pa.field("", pa.int64(), nullable=False)
    want_schema = pa.schema([int_field] * 3)
    want_table = pa.table(list(columns.values()), schema=want_schema)

    assert_table_eq(want_table, got_table)
    assert got_offsets == [0]


def test_round_robin_partition(partitioning_data):
    """Check `round_robin_partition` with one partition, starting at 0."""
    columns, arrow_table = partitioning_data
    plc_input = plc.interop.from_arrow(arrow_table)
    got_table, got_offsets = plc.partitioning.round_robin_partition(
        plc_input, 1, 0
    )

    # Expected: the fixture's values under unnamed, non-nullable int64 fields.
    int_field = pa.field("", pa.int64(), nullable=False)
    want_schema = pa.schema([int_field] * 3)
    want_table = pa.table(list(columns.values()), schema=want_schema)

    assert_table_eq(want_table, got_table)
    assert got_offsets == [0]
Loading