diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst
index d6f8cd2a1ff..7cd8ef4c504 100644
--- a/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst
+++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst
@@ -25,6 +25,7 @@ This page provides API documentation for pylibcudf.
     lists
     merge
     null_mask
+    partitioning
     quantiles
     reduce
     replace
diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/partitioning.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/partitioning.rst
new file mode 100644
index 00000000000..6951dbecca0
--- /dev/null
+++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/partitioning.rst
@@ -0,0 +1,6 @@
+============
+partitioning
+============
+
+.. automodule:: pylibcudf.partitioning
+    :members:
diff --git a/python/cudf/cudf/_lib/hash.pyx b/python/cudf/cudf/_lib/hash.pyx
index 48f75b12a73..9b7ab0888d2 100644
--- a/python/cudf/cudf/_lib/hash.pyx
+++ b/python/cudf/cudf/_lib/hash.pyx
@@ -3,11 +3,8 @@
 from cudf.core.buffer import acquire_spill_lock
 
 from libcpp.memory cimport unique_ptr
-from libcpp.pair cimport pair
 from libcpp.utility cimport move
-from libcpp.vector cimport vector
 
-cimport pylibcudf.libcudf.types as libcudf_types
 from pylibcudf.libcudf.column.column cimport column
 from pylibcudf.libcudf.hash cimport (
     md5,
@@ -19,37 +16,23 @@ from pylibcudf.libcudf.hash cimport (
     sha512,
     xxhash_64,
 )
-from pylibcudf.libcudf.partitioning cimport (
-    hash_partition as cpp_hash_partition,
-)
-from pylibcudf.libcudf.table.table cimport table
 from pylibcudf.libcudf.table.table_view cimport table_view
 
 from cudf._lib.column cimport Column
-from cudf._lib.utils cimport columns_from_unique_ptr, table_view_from_columns
+from cudf._lib.utils cimport table_view_from_columns
+
+import pylibcudf as plc
 
 
 @acquire_spill_lock()
-def hash_partition(list source_columns, object columns_to_hash,
+def hash_partition(list source_columns, list columns_to_hash,
                    int num_partitions):
-    cdef vector[libcudf_types.size_type] c_columns_to_hash = columns_to_hash
-    cdef int c_num_partitions = num_partitions
-    cdef table_view c_source_view = table_view_from_columns(source_columns)
-
-    cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result
-    with nogil:
-        c_result = move(
-            cpp_hash_partition(
-                c_source_view,
-                c_columns_to_hash,
-                c_num_partitions
-            )
-        )
-
-    return (
-        columns_from_unique_ptr(move(c_result.first)),
-        list(c_result.second)
+    plc_table, offsets = plc.partitioning.hash_partition(
+        plc.Table([col.to_pylibcudf(mode="read") for col in source_columns]),
+        columns_to_hash,
+        num_partitions
     )
+    return [Column.from_pylibcudf(col) for col in plc_table.columns()], offsets
 
 
 @acquire_spill_lock()
diff --git a/python/cudf/cudf/_lib/partitioning.pyx b/python/cudf/cudf/_lib/partitioning.pyx
index d94f0e1b564..13997da8403 100644
--- a/python/cudf/cudf/_lib/partitioning.pyx
+++ b/python/cudf/cudf/_lib/partitioning.pyx
@@ -2,24 +2,13 @@
 
 from cudf.core.buffer import acquire_spill_lock
 
-from libcpp.memory cimport unique_ptr
-from libcpp.pair cimport pair
-from libcpp.utility cimport move
-from libcpp.vector cimport vector
-
-from pylibcudf.libcudf.column.column_view cimport column_view
-from pylibcudf.libcudf.partitioning cimport partition as cpp_partition
-from pylibcudf.libcudf.table.table cimport table
-from pylibcudf.libcudf.table.table_view cimport table_view
-
 from cudf._lib.column cimport Column
-from cudf._lib.utils cimport columns_from_unique_ptr, table_view_from_columns
+
+import pylibcudf as plc
 from cudf._lib.reduce import minmax
 from cudf._lib.stream_compaction import distinct_count as cpp_distinct_count
 
-cimport pylibcudf.libcudf.types as libcudf_types
-
 
 
 @acquire_spill_lock()
 def partition(list source_columns, Column partition_map,
@@ -50,25 +39,15 @@ def partition(list source_columns, Column partition_map,
 
     if num_partitions is None:
         num_partitions = cpp_distinct_count(partition_map, ignore_nulls=True)
-    cdef int c_num_partitions = num_partitions
-    cdef table_view c_source_view = table_view_from_columns(source_columns)
-
-    cdef column_view c_partition_map_view = partition_map.view()
 
-    cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result
     if partition_map.size > 0:
         lo, hi = minmax(partition_map)
         if lo < 0 or hi >= num_partitions:
             raise ValueError("Partition map has invalid values")
-    with nogil:
-        c_result = move(
-            cpp_partition(
-                c_source_view,
-                c_partition_map_view,
-                c_num_partitions
-            )
-        )
 
-    return (
-        columns_from_unique_ptr(move(c_result.first)), list(c_result.second)
+    plc_table, offsets = plc.partitioning.partition(
+        plc.Table([col.to_pylibcudf(mode="read") for col in source_columns]),
+        partition_map.to_pylibcudf(mode="read"),
+        num_partitions
     )
+    return [Column.from_pylibcudf(col) for col in plc_table.columns()], offsets
diff --git a/python/pylibcudf/pylibcudf/CMakeLists.txt b/python/pylibcudf/pylibcudf/CMakeLists.txt
index f07c8897e34..eaf29685e6c 100644
--- a/python/pylibcudf/pylibcudf/CMakeLists.txt
+++ b/python/pylibcudf/pylibcudf/CMakeLists.txt
@@ -31,6 +31,7 @@ set(cython_sources
   lists.pyx
   merge.pyx
   null_mask.pyx
+  partitioning.pyx
   quantiles.pyx
   reduce.pyx
   replace.pyx
diff --git a/python/pylibcudf/pylibcudf/__init__.pxd b/python/pylibcudf/pylibcudf/__init__.pxd
index b7cf6413c05..d4bedce6699 100644
--- a/python/pylibcudf/pylibcudf/__init__.pxd
+++ b/python/pylibcudf/pylibcudf/__init__.pxd
@@ -17,6 +17,7 @@ from . cimport (
     lists,
     merge,
     null_mask,
+    partitioning,
     quantiles,
     reduce,
     replace,
@@ -60,6 +61,7 @@ __all__ = [
     "lists",
     "merge",
     "null_mask",
+    "partitioning",
     "quantiles",
     "reduce",
     "replace",
diff --git a/python/pylibcudf/pylibcudf/__init__.py b/python/pylibcudf/pylibcudf/__init__.py
index 84b1c29f791..11aa0774998 100644
--- a/python/pylibcudf/pylibcudf/__init__.py
+++ b/python/pylibcudf/pylibcudf/__init__.py
@@ -28,6 +28,7 @@
     lists,
     merge,
     null_mask,
+    partitioning,
     quantiles,
     reduce,
     replace,
@@ -74,6 +75,7 @@
     "lists",
     "merge",
     "null_mask",
+    "partitioning",
     "quantiles",
     "reduce",
     "replace",
diff --git a/python/pylibcudf/pylibcudf/libcudf/partitioning.pxd b/python/pylibcudf/pylibcudf/libcudf/partitioning.pxd
index 1ea10e8a194..89bddbffab5 100644
--- a/python/pylibcudf/pylibcudf/libcudf/partitioning.pxd
+++ b/python/pylibcudf/pylibcudf/libcudf/partitioning.pxd
@@ -25,3 +25,10 @@ cdef extern from "cudf/partitioning.hpp" namespace "cudf" nogil:
             const column_view& partition_map,
             int num_partitions
         ) except +
+
+    cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] \
+        round_robin_partition "cudf::round_robin_partition" (
+            const table_view& input,
+            int num_partitions,
+            int start_partition
+        ) except +
diff --git a/python/pylibcudf/pylibcudf/partitioning.pxd b/python/pylibcudf/pylibcudf/partitioning.pxd
new file mode 100644
index 00000000000..aad60149fc4
--- /dev/null
+++ b/python/pylibcudf/pylibcudf/partitioning.pxd
@@ -0,0 +1,19 @@
+# Copyright (c) 2024, NVIDIA CORPORATION.
+
+from .column cimport Column
+from .table cimport Table
+
+
+cpdef tuple[Table, list] hash_partition(
+    Table input,
+    list columns_to_hash,
+    int num_partitions
+)
+
+cpdef tuple[Table, list] partition(Table t, Column partition_map, int num_partitions)
+
+cpdef tuple[Table, list] round_robin_partition(
+    Table input,
+    int num_partitions,
+    int start_partition=*
+)
diff --git a/python/pylibcudf/pylibcudf/partitioning.pyx b/python/pylibcudf/pylibcudf/partitioning.pyx
new file mode 100644
index 00000000000..8fa70daab5a
--- /dev/null
+++ b/python/pylibcudf/pylibcudf/partitioning.pyx
@@ -0,0 +1,120 @@
+# Copyright (c) 2024, NVIDIA CORPORATION.
+
+cimport pylibcudf.libcudf.types as libcudf_types
+from libcpp.memory cimport unique_ptr
+from libcpp.pair cimport pair
+from libcpp.utility cimport move
+from libcpp.vector cimport vector
+from pylibcudf.libcudf cimport partitioning as cpp_partitioning
+from pylibcudf.libcudf.table.table cimport table
+
+from .column cimport Column
+from .table cimport Table
+
+
+cpdef tuple[Table, list] hash_partition(
+    Table input,
+    list columns_to_hash,
+    int num_partitions
+):
+    """
+    Partitions rows from the input table into multiple output tables.
+
+    For details, see :cpp:func:`hash_partition`.
+
+    Parameters
+    ----------
+    input : Table
+        The table to partition
+    columns_to_hash : list[int]
+        Indices of input columns to hash
+    num_partitions : int
+        The number of partitions to use
+
+    Returns
+    -------
+    tuple[Table, list[int]]
+        An output table and a list of row offsets to each partition
+    """
+    cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result
+    cdef vector[libcudf_types.size_type] c_columns_to_hash = columns_to_hash
+    cdef int c_num_partitions = num_partitions
+
+    with nogil:
+        c_result = move(
+            cpp_partitioning.hash_partition(
+                input.view(), c_columns_to_hash, c_num_partitions
+            )
+        )
+
+    return Table.from_libcudf(move(c_result.first)), list(c_result.second)
+
+cpdef tuple[Table, list] partition(Table t, Column partition_map, int num_partitions):
+    """
+    Partitions rows of `t` according to the mapping specified by `partition_map`.
+
+    For details, see :cpp:func:`partition`.
+
+    Parameters
+    ----------
+    t : Table
+        The table to partition
+    partition_map : Column
+        Non-nullable column of integer values that map each row
+        in `t` to its partition.
+    num_partitions : int
+        The total number of partitions
+
+    Returns
+    -------
+    tuple[Table, list[int]]
+        An output table and a list of row offsets to each partition
+    """
+    cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result
+    cdef int c_num_partitions = num_partitions
+
+    with nogil:
+        c_result = move(
+            cpp_partitioning.partition(t.view(), partition_map.view(), c_num_partitions)
+        )
+
+    return Table.from_libcudf(move(c_result.first)), list(c_result.second)
+
+
+cpdef tuple[Table, list] round_robin_partition(
+    Table input,
+    int num_partitions,
+    int start_partition=0
+):
+    """
+    Partitions rows of the input table in a round-robin fashion.
+
+    For details, see :cpp:func:`round_robin_partition`.
+
+    Parameters
+    ----------
+    input : Table
+        The input table to be round-robin partitioned
+    num_partitions : int
+        Number of partitions for the table
+    start_partition : int, default 0
+        Index of the first partition
+
+    Returns
+    -------
+    tuple[Table, list[int]]
+        The partitioned table and the partition offsets
+        for each partition within the table.
+    """
+    cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result
+    cdef int c_num_partitions = num_partitions
+    cdef int c_start_partition = start_partition
+
+    with nogil:
+        c_result = move(
+            cpp_partitioning.round_robin_partition(
+                input.view(), c_num_partitions, c_start_partition
+            )
+        )
+
+    return Table.from_libcudf(move(c_result.first)), list(c_result.second)
diff --git a/python/pylibcudf/pylibcudf/tests/test_partitioning.py b/python/pylibcudf/pylibcudf/tests/test_partitioning.py
new file mode 100644
index 00000000000..444d0089d2c
--- /dev/null
+++ b/python/pylibcudf/pylibcudf/tests/test_partitioning.py
@@ -0,0 +1,55 @@
+# Copyright (c) 2024, NVIDIA CORPORATION.
+
+import pyarrow as pa
+import pylibcudf as plc
+import pytest
+from utils import assert_table_eq
+
+
+@pytest.fixture(scope="module")
+def partitioning_data():
+    data = {"a": [1, 2, 3], "b": [1, 2, 5], "c": [1, 2, 10]}
+    pa_table = pa.table(data)
+    plc_table = plc.interop.from_arrow(pa_table)
+    return data, plc_table, pa_table
+
+
+def test_partition(partitioning_data):
+    raw_data, plc_table, pa_table = partitioning_data
+    result, result_offsets = plc.partitioning.partition(
+        plc_table,
+        plc.interop.from_arrow(pa.array([0, 0, 0])),
+        1,
+    )
+    expected = pa.table(
+        list(raw_data.values()),
+        schema=pa.schema([pa.field("", pa.int64(), nullable=False)] * 3),
+    )
+    assert_table_eq(expected, result)
+    assert result_offsets == [0, 3]
+
+
+def test_hash_partition(partitioning_data):
+    raw_data, plc_table, pa_table = partitioning_data
+    result, result_offsets = plc.partitioning.hash_partition(
+        plc_table, [0, 1], 1
+    )
+    expected = pa.table(
+        list(raw_data.values()),
+        schema=pa.schema([pa.field("", pa.int64(), nullable=False)] * 3),
+    )
+    assert_table_eq(expected, result)
+    assert result_offsets == [0]
+
+
+def test_round_robin_partition(partitioning_data):
+    raw_data, plc_table, pa_table = partitioning_data
+    result, result_offsets = plc.partitioning.round_robin_partition(
+        plc_table, 1, 0
+    )
+    expected = pa.table(
+        list(raw_data.values()),
+        schema=pa.schema([pa.field("", pa.int64(), nullable=False)] * 3),
+    )
+    assert_table_eq(expected, result)
+    assert result_offsets == [0]
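Usage sketch (not part of the diff above): the snippet below exercises the new pylibcudf.partitioning entry points the same way the tests do, round-tripping data through pyarrow with plc.interop. The final plc.interop.to_arrow call is an assumption about the interop module and does not appear in this diff.

# A minimal sketch of the new pylibcudf.partitioning API, assuming a working
# pylibcudf build; plc.interop.to_arrow is assumed and not shown in this diff.
import pyarrow as pa
import pylibcudf as plc

pa_table = pa.table({"a": [1, 2, 3, 4], "b": [10, 20, 30, 40]})
plc_table = plc.interop.from_arrow(pa_table)

# partition: row i goes to the partition named by partition_map[i].
partition_map = plc.interop.from_arrow(pa.array([1, 0, 1, 0]))
parted, offsets = plc.partitioning.partition(plc_table, partition_map, 2)
# As in test_partition above, offsets has num_partitions + 1 entries and
# partition k spans rows offsets[k]:offsets[k + 1] of the returned table.
print(offsets)  # [0, 2, 4]

# hash_partition: rows are routed by hashing the listed column indices.
hashed, hash_offsets = plc.partitioning.hash_partition(plc_table, [0], 2)

# round_robin_partition: rows are dealt out cyclically, beginning at start_partition.
rr, rr_offsets = plc.partitioning.round_robin_partition(plc_table, 2, 0)

# Convert one result back to pyarrow for inspection (interop assumed symmetric).
print(plc.interop.to_arrow(parted))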