Migrate parquet merge_row_group_metadata to pylibcudf
Matt711 committed Dec 3, 2024
1 parent 12c77f3 commit c635480
Showing 3 changed files with 89 additions and 59 deletions.
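In short, this commit moves the buffer-protocol plumbing for merging parquet row-group metadata out of cudf._lib.parquet and into pylibcudf.io.parquet, leaving the cudf layer as a thin wrapper. A minimal sketch (not part of the diff) of the resulting call path, assuming pylibcudf is imported as plc as it is in cudf/_lib/parquet.pyx:

import numpy as np
import pylibcudf as plc

def merge_filemetadata(filemetadata_list):
    # filemetadata_list: raw parquet footer blobs previously produced by
    # write_parquet. The pylibcudf entry point performs the libcudf call and
    # returns a BufferArrayFromVector, which implements the Python buffer
    # protocol, so np.asarray can view it as a uint8 array without a copy.
    merged = plc.io.parquet.merge_row_group_metadata(filemetadata_list)
    return np.asarray(merged)

This mirrors the new body of cudf._lib.parquet.merge_filemetadata in the first changed file below.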
62 changes: 5 additions & 57 deletions python/cudf/cudf/_lib/parquet.pyx
@@ -14,8 +14,6 @@ except ImportError:

import numpy as np

from cython.operator cimport dereference

from cudf.api.types import is_list_like

from cudf._lib.utils cimport _data_from_columns, data_from_pylibcudf_io
@@ -25,17 +23,17 @@ from cudf._lib.utils import _index_level_name, generate_pandas_metadata
from libc.stdint cimport int64_t, uint8_t
from libcpp cimport bool
from libcpp.map cimport map
from libcpp.memory cimport make_unique, unique_ptr
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector

from pylibcudf.expressions cimport Expression
from pylibcudf.io.parquet cimport BufferArrayFromVector
from pylibcudf.io.parquet cimport ChunkedParquetReader
from pylibcudf.libcudf.io.data_sink cimport data_sink
from pylibcudf.libcudf.io.parquet cimport (
chunked_parquet_writer_options,
merge_row_group_metadata as parquet_merge_metadata,
parquet_chunked_writer as cpp_parquet_chunked_writer,
parquet_writer_options,
write_parquet as parquet_writer,
@@ -66,46 +64,6 @@ from pylibcudf cimport Table
from cudf.utils.ioutils import _ROW_GROUP_SIZE_BYTES_DEFAULT


cdef class BufferArrayFromVector:
cdef Py_ssize_t length
cdef unique_ptr[vector[uint8_t]] in_vec

# these two things declare part of the buffer interface
cdef Py_ssize_t shape[1]
cdef Py_ssize_t strides[1]

@staticmethod
cdef BufferArrayFromVector from_unique_ptr(
unique_ptr[vector[uint8_t]] in_vec
):
cdef BufferArrayFromVector buf = BufferArrayFromVector()
buf.in_vec = move(in_vec)
buf.length = dereference(buf.in_vec).size()
return buf

def __getbuffer__(self, Py_buffer *buffer, int flags):
cdef Py_ssize_t itemsize = sizeof(uint8_t)

self.shape[0] = self.length
self.strides[0] = 1

buffer.buf = dereference(self.in_vec).data()

buffer.format = NULL # byte
buffer.internal = NULL
buffer.itemsize = itemsize
buffer.len = self.length * itemsize # product(shape) * itemsize
buffer.ndim = 1
buffer.obj = self
buffer.readonly = 0
buffer.shape = self.shape
buffer.strides = self.strides
buffer.suboffsets = NULL

def __releasebuffer__(self, Py_buffer *buffer):
pass


def _parse_metadata(meta):
file_is_range_index = False
file_index_cols = None
@@ -808,19 +766,9 @@ cpdef merge_filemetadata(object filemetadata_list):
--------
cudf.io.parquet.merge_row_group_metadata
"""
cdef vector[unique_ptr[vector[uint8_t]]] list_c
cdef vector[uint8_t] blob_c
cdef unique_ptr[vector[uint8_t]] output_c

for blob_py in filemetadata_list:
blob_c = blob_py
list_c.push_back(move(make_unique[vector[uint8_t]](blob_c)))

with nogil:
output_c = move(parquet_merge_metadata(list_c))

out_metadata_py = BufferArrayFromVector.from_unique_ptr(move(output_c))
return np.asarray(out_metadata_py)
return np.asarray(
plc.io.parquet.merge_row_group_metadata(filemetadata_list)
)


cdef statistics_freq _get_stat_freq(str statistics):
16 changes: 16 additions & 0 deletions python/pylibcudf/pylibcudf/io/parquet.pxd
@@ -24,6 +24,20 @@ from pylibcudf.table cimport Table
from pylibcudf.types cimport DataType


cdef class BufferArrayFromVector:
cdef Py_ssize_t length
cdef unique_ptr[vector[uint8_t]] in_vec

# these two things declare part of the buffer interface
cdef Py_ssize_t shape[1]
cdef Py_ssize_t strides[1]

@staticmethod
cdef BufferArrayFromVector from_unique_ptr(
unique_ptr[vector[uint8_t]] vec
)


cdef class ChunkedParquetReader:
cdef unique_ptr[cpp_chunked_parquet_reader] reader

@@ -91,3 +105,5 @@ cdef class ParquetWriterOptionsBuilder:
cpdef ParquetWriterOptions build(self)

cpdef memoryview write_parquet(ParquetWriterOptions options)

cpdef BufferArrayFromVector merge_row_group_metadata(list metadata_list)
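For illustration only, a hedged sketch of how another Cython module could consume the declarations added above; the module name merge_util.pyx and the helper merge_blobs are invented for this example:

# merge_util.pyx (hypothetical consumer of the declarations in parquet.pxd)
from pylibcudf.io.parquet cimport (
    BufferArrayFromVector,
    merge_row_group_metadata,
)

def merge_blobs(list blobs):
    # The cpdef function declared in parquet.pxd can be cimported and called
    # directly; it returns a BufferArrayFromVector, which exposes the Python
    # buffer protocol.
    cdef BufferArrayFromVector merged = merge_row_group_metadata(blobs)
    return memoryview(merged)

The cudf change in the first file takes the same route, only at the Python level via plc.io.parquet.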
70 changes: 68 additions & 2 deletions python/pylibcudf/pylibcudf/io/parquet.pyx
@@ -2,7 +2,7 @@
from cython.operator cimport dereference
from libc.stdint cimport int64_t, uint8_t
from libcpp cimport bool
from libcpp.memory cimport unique_ptr
from libcpp.memory cimport unique_ptr, make_unique
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector
@@ -22,6 +22,7 @@ from pylibcudf.libcudf.io.parquet cimport (
read_parquet as cpp_read_parquet,
write_parquet as cpp_write_parquet,
parquet_writer_options,
merge_row_group_metadata as cpp_merge_row_group_metadata,
)
from pylibcudf.libcudf.io.types cimport (
compression_type,
@@ -38,10 +39,44 @@ __all__ = [
"ParquetWriterOptions",
"ParquetWriterOptionsBuilder",
"read_parquet",
"write_parquet"
"write_parquet",
"merge_row_group_metadata",
]


cdef class BufferArrayFromVector:
@staticmethod
cdef BufferArrayFromVector from_unique_ptr(
unique_ptr[vector[uint8_t]] in_vec
):
cdef BufferArrayFromVector buf = BufferArrayFromVector()
buf.in_vec = move(in_vec)
buf.length = dereference(buf.in_vec).size()
return buf

def __getbuffer__(self, Py_buffer *buffer, int flags):
cdef Py_ssize_t itemsize = sizeof(uint8_t)

self.shape[0] = self.length
self.strides[0] = 1

buffer.buf = dereference(self.in_vec).data()

buffer.format = NULL # byte
buffer.internal = NULL
buffer.itemsize = itemsize
buffer.len = self.length * itemsize # product(shape) * itemsize
buffer.ndim = 1
buffer.obj = self
buffer.readonly = 0
buffer.shape = self.shape
buffer.strides = self.strides
buffer.suboffsets = NULL

def __releasebuffer__(self, Py_buffer *buffer):
pass


cdef parquet_reader_options _setup_parquet_reader_options(
SourceInfo source_info,
list columns = None,
@@ -577,3 +612,34 @@ cpdef memoryview write_parquet(ParquetWriterOptions options):
c_result = cpp_write_parquet(c_options)

return memoryview(HostBuffer.from_unique_ptr(move(c_result)))


cpdef BufferArrayFromVector merge_row_group_metadata(list metadata_list):
"""
Merges multiple raw metadata blobs that were previously
created by write_parquet into a single metadata blob.

For details, see :cpp:func:`merge_row_group_metadata`.

Parameters
----------
metadata_list : list
List of input file metadata

Returns
-------
BufferArrayFromVector
A parquet-compatible blob that contains the data for all row groups in the list
"""
cdef vector[unique_ptr[vector[uint8_t]]] list_c
cdef vector[uint8_t] blob_c
cdef unique_ptr[vector[uint8_t]] output_c

for blob in metadata_list:
blob_c = blob
list_c.push_back(move(make_unique[vector[uint8_t]](blob_c)))

with nogil:
output_c = move(cpp_merge_row_group_metadata(list_c))

return BufferArrayFromVector.from_unique_ptr(move(output_c))
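
As a usage sketch (not part of the commit), assuming blob_a and blob_b are metadata blobs previously returned by write_parquet, the new entry point can be driven from Python like this:

import numpy as np
import pylibcudf as plc

def merge_two(blob_a, blob_b):
    # Merge two previously collected footer blobs into a single
    # parquet-compatible metadata blob (e.g. for a _metadata sidecar file).
    merged = plc.io.parquet.merge_row_group_metadata([blob_a, blob_b])
    # The result supports the buffer protocol: np.asarray gives a zero-copy
    # uint8 view, and bytes(...) makes an owned copy when one is needed.
    return bytes(np.asarray(merged))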
