Skip to content

Commit

Permalink
Migrate ORC Writer to pylibcudf (#17310)
Browse files Browse the repository at this point in the history
Part of #15162.

Authors:
  - Matthew Murray (https://github.com/Matt711)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)
  - Matthew Roeschke (https://github.com/mroeschke)

URL: #17310
  • Loading branch information
Matt711 authored Nov 26, 2024
1 parent b89728b commit 165d756
Show file tree
Hide file tree
Showing 9 changed files with 762 additions and 124 deletions.
167 changes: 83 additions & 84 deletions python/cudf/cudf/_lib/orc.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
from libc.stdint cimport int64_t
from libcpp cimport bool, int
from libcpp.map cimport map
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector

import itertools
from collections import OrderedDict

try:
Expand All @@ -16,31 +14,19 @@ except ImportError:
import json

cimport pylibcudf.libcudf.lists.lists_column_view as cpp_lists_column_view
from pylibcudf.libcudf.io.data_sink cimport data_sink
from pylibcudf.libcudf.io.orc cimport (
chunked_orc_writer_options,
orc_chunked_writer,
orc_writer_options,
write_orc as libcudf_write_orc,
)
from pylibcudf.libcudf.io.types cimport (
column_in_metadata,
sink_info,
table_input_metadata,
)
from pylibcudf.libcudf.table.table_view cimport table_view

from cudf._lib.column cimport Column
from cudf._lib.io.utils cimport make_sink_info, update_col_struct_field_names
from cudf._lib.utils cimport data_from_pylibcudf_io, table_view_from_table
from cudf._lib.io.utils cimport update_col_struct_field_names
from cudf._lib.utils cimport data_from_pylibcudf_io

import pylibcudf as plc

import cudf
from cudf._lib.types import SUPPORTED_NUMPY_TO_PYLIBCUDF_TYPES
from cudf._lib.utils import _index_level_name, generate_pandas_metadata
from cudf.core.buffer import acquire_spill_lock

from pylibcudf.io.types cimport TableInputMetadata, SinkInfo, ColumnInMetadata
from pylibcudf.io.orc cimport OrcChunkedWriter

# TODO: Consider inlining this function since it seems to only be used in one place.
cpdef read_parsed_orc_statistics(filepath_or_buffer):
Expand Down Expand Up @@ -246,61 +232,58 @@ def write_orc(
--------
cudf.read_orc
"""
cdef unique_ptr[data_sink] data_sink_c
cdef sink_info sink_info_c = make_sink_info(path_or_buf, data_sink_c)
cdef table_input_metadata tbl_meta
cdef map[string, string] user_data
user_data[str.encode("pandas")] = str.encode(generate_pandas_metadata(
table, index)
)

user_data = {}
user_data["pandas"] = generate_pandas_metadata(table, index)
if index is True or (
index is None and not isinstance(table._index, cudf.RangeIndex)
):
tv = table_view_from_table(table)
tbl_meta = table_input_metadata(tv)
columns = table._columns if table._index is None else [
*table.index._columns, *table._columns
]
plc_table = plc.Table([col.to_pylibcudf(mode="read") for col in columns])
tbl_meta = TableInputMetadata(plc_table)
for level, idx_name in enumerate(table._index.names):
tbl_meta.column_metadata[level].set_name(
str.encode(
_index_level_name(idx_name, level, table._column_names)
)
_index_level_name(idx_name, level, table._column_names)
)
num_index_cols_meta = len(table._index.names)
else:
tv = table_view_from_table(table, ignore_index=True)
tbl_meta = table_input_metadata(tv)
plc_table = plc.Table(
[col.to_pylibcudf(mode="read") for col in table._columns]
)
tbl_meta = TableInputMetadata(plc_table)
num_index_cols_meta = 0

if cols_as_map_type is not None:
cols_as_map_type = set(cols_as_map_type)

for i, name in enumerate(table._column_names, num_index_cols_meta):
tbl_meta.column_metadata[i].set_name(name.encode())
tbl_meta.column_metadata[i].set_name(name)
_set_col_children_metadata(
table[name]._column,
tbl_meta.column_metadata[i],
(cols_as_map_type is not None)
and (name in cols_as_map_type),
)

cdef orc_writer_options c_orc_writer_options = move(
orc_writer_options.builder(
sink_info_c, tv
).metadata(tbl_meta)
.key_value_metadata(move(user_data))
options = (
plc.io.orc.OrcWriterOptions.builder(
plc.io.SinkInfo([path_or_buf]), plc_table
)
.metadata(tbl_meta)
.key_value_metadata(user_data)
.compression(_get_comp_type(compression))
.enable_statistics(_get_orc_stat_freq(statistics))
.build()
)
if stripe_size_bytes is not None:
c_orc_writer_options.set_stripe_size_bytes(stripe_size_bytes)
options.set_stripe_size_bytes(stripe_size_bytes)
if stripe_size_rows is not None:
c_orc_writer_options.set_stripe_size_rows(stripe_size_rows)
options.set_stripe_size_rows(stripe_size_rows)
if row_index_stride is not None:
c_orc_writer_options.set_row_index_stride(row_index_stride)
options.set_row_index_stride(row_index_stride)

with nogil:
libcudf_write_orc(c_orc_writer_options)
plc.io.orc.write_orc(options)


cdef int64_t get_skiprows_arg(object arg) except*:
Expand All @@ -326,13 +309,12 @@ cdef class ORCWriter:
cudf.io.orc.to_orc
"""
cdef bool initialized
cdef unique_ptr[orc_chunked_writer] writer
cdef sink_info sink
cdef unique_ptr[data_sink] _data_sink
cdef OrcChunkedWriter writer
cdef SinkInfo sink
cdef str statistics
cdef object compression
cdef object index
cdef table_input_metadata tbl_meta
cdef TableInputMetadata tbl_meta
cdef object cols_as_map_type
cdef object stripe_size_bytes
cdef object stripe_size_rows
Expand All @@ -347,8 +329,7 @@ cdef class ORCWriter:
object stripe_size_bytes=None,
object stripe_size_rows=None,
object row_index_stride=None):

self.sink = make_sink_info(path, self._data_sink)
self.sink = plc.io.SinkInfo([path])
self.statistics = statistics
self.compression = compression
self.index = index
Expand All @@ -368,17 +349,21 @@ cdef class ORCWriter:
table._index.name is not None or
isinstance(table._index, cudf.core.multiindex.MultiIndex)
)
tv = table_view_from_table(table, not keep_index)
if keep_index:
columns = [
col.to_pylibcudf(mode="read")
for col in itertools.chain(table.index._columns, table._columns)
]
else:
columns = [col.to_pylibcudf(mode="read") for col in table._columns]

with nogil:
self.writer.get()[0].write(tv)
self.writer.write(plc.Table(columns))

def close(self):
if not self.initialized:
return

with nogil:
self.writer.get()[0].close()
self.writer.close()

def __dealloc__(self):
self.close()
Expand All @@ -387,71 +372,85 @@ cdef class ORCWriter:
"""
Prepare all the values required to build the
chunked_orc_writer_options and create a writer"""
cdef table_view tv

num_index_cols_meta = 0
self.tbl_meta = table_input_metadata(
table_view_from_table(table, ignore_index=True),
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in table._columns
]
)
self.tbl_meta = TableInputMetadata(plc_table)
if self.index is not False:
if isinstance(table._index, cudf.core.multiindex.MultiIndex):
tv = table_view_from_table(table)
self.tbl_meta = table_input_metadata(tv)
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in itertools.chain(table.index._columns, table._columns)
]
)
self.tbl_meta = TableInputMetadata(plc_table)
for level, idx_name in enumerate(table._index.names):
self.tbl_meta.column_metadata[level].set_name(
(str.encode(idx_name))
idx_name
)
num_index_cols_meta = len(table._index.names)
else:
if table._index.name is not None:
tv = table_view_from_table(table)
self.tbl_meta = table_input_metadata(tv)
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in itertools.chain(
table.index._columns, table._columns
)
]
)
self.tbl_meta = TableInputMetadata(plc_table)
self.tbl_meta.column_metadata[0].set_name(
str.encode(table._index.name)
table._index.name
)
num_index_cols_meta = 1

for i, name in enumerate(table._column_names, num_index_cols_meta):
self.tbl_meta.column_metadata[i].set_name(name.encode())
self.tbl_meta.column_metadata[i].set_name(name)
_set_col_children_metadata(
table[name]._column,
self.tbl_meta.column_metadata[i],
(self.cols_as_map_type is not None)
and (name in self.cols_as_map_type),
)

cdef map[string, string] user_data
user_data = {}
pandas_metadata = generate_pandas_metadata(table, self.index)
user_data[str.encode("pandas")] = str.encode(pandas_metadata)

cdef chunked_orc_writer_options c_opts = move(
chunked_orc_writer_options.builder(self.sink)
.metadata(self.tbl_meta)
.key_value_metadata(move(user_data))
.compression(_get_comp_type(self.compression))
.enable_statistics(_get_orc_stat_freq(self.statistics))
.build()
)
user_data["pandas"] = pandas_metadata

options = (
plc.io.orc.ChunkedOrcWriterOptions.builder(self.sink)
.metadata(self.tbl_meta)
.key_value_metadata(user_data)
.compression(_get_comp_type(self.compression))
.enable_statistics(_get_orc_stat_freq(self.statistics))
.build()
)
if self.stripe_size_bytes is not None:
c_opts.set_stripe_size_bytes(self.stripe_size_bytes)
options.set_stripe_size_bytes(self.stripe_size_bytes)
if self.stripe_size_rows is not None:
c_opts.set_stripe_size_rows(self.stripe_size_rows)
options.set_stripe_size_rows(self.stripe_size_rows)
if self.row_index_stride is not None:
c_opts.set_row_index_stride(self.row_index_stride)
options.set_row_index_stride(self.row_index_stride)

with nogil:
self.writer.reset(new orc_chunked_writer(c_opts))
self.writer = plc.io.orc.OrcChunkedWriter.from_options(options)

self.initialized = True

cdef _set_col_children_metadata(Column col,
column_in_metadata& col_meta,
ColumnInMetadata col_meta,
list_column_as_map=False):
if isinstance(col.dtype, cudf.StructDtype):
for i, (child_col, name) in enumerate(
zip(col.children, list(col.dtype.fields))
):
col_meta.child(i).set_name(name.encode())
col_meta.child(i).set_name(name)
_set_col_children_metadata(
child_col, col_meta.child(i), list_column_as_map
)
Expand Down
65 changes: 63 additions & 2 deletions python/pylibcudf/pylibcudf/io/orc.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,33 @@ from libcpp cimport bool
from libcpp.optional cimport optional
from libcpp.string cimport string
from libcpp.vector cimport vector
from pylibcudf.io.types cimport SourceInfo, TableWithMetadata
from libcpp.memory cimport unique_ptr
from libcpp.map cimport map
from pylibcudf.io.types cimport (
SourceInfo,
SinkInfo,
TableWithMetadata,
TableInputMetadata,
)
from pylibcudf.libcudf.io.orc_metadata cimport (
column_statistics,
parsed_orc_statistics,
statistics_type,
)
from pylibcudf.libcudf.io.orc cimport (
orc_chunked_writer,
orc_writer_options,
orc_writer_options_builder,
chunked_orc_writer_options,
chunked_orc_writer_options_builder,
)
from pylibcudf.libcudf.types cimport size_type
from pylibcudf.types cimport DataType

from pylibcudf.table cimport Table
from pylibcudf.libcudf.io.types cimport (
compression_type,
statistics_freq,
)

cpdef TableWithMetadata read_orc(
SourceInfo source_info,
Expand Down Expand Up @@ -48,3 +66,46 @@ cdef class ParsedOrcStatistics:
cpdef ParsedOrcStatistics read_parsed_orc_statistics(
SourceInfo source_info
)

# Extension type wrapping a libcudf ``orc_writer_options`` value together with
# the pylibcudf ``Table`` and ``SinkInfo`` objects declared alongside it.
cdef class OrcWriterOptions:
# The wrapped libcudf options object, stored by value (not behind a pointer).
cdef orc_writer_options c_obj
# NOTE(review): presumably these two references are held so the table and
# sink stay alive for as long as c_obj is in use -- confirm in the .pyx.
cdef Table table
cdef SinkInfo sink
# Setters declared cpdef, so callable from both Python and Cython; each is
# declared to return nothing and takes a single integral size argument.
cpdef void set_stripe_size_bytes(self, size_t size_bytes)
cpdef void set_stripe_size_rows(self, size_type size_rows)
cpdef void set_row_index_stride(self, size_type stride)

# Extension type wrapping a libcudf ``orc_writer_options_builder`` value.
# Every configuration method is declared to return an OrcWriterOptionsBuilder,
# which allows calls to be chained before ``build()``.
cdef class OrcWriterOptionsBuilder:
# The wrapped libcudf builder object, stored by value.
cdef orc_writer_options_builder c_obj
# NOTE(review): presumably held to keep the table and sink alive while the
# builder (and the options it produces) refer to them -- confirm in the .pyx.
cdef Table table
cdef SinkInfo sink
# Take the libcudf ``compression_type`` / ``statistics_freq`` enum values.
cpdef OrcWriterOptionsBuilder compression(self, compression_type comp)
cpdef OrcWriterOptionsBuilder enable_statistics(self, statistics_freq val)
# Takes a Python dict.
# NOTE(review): presumably converted to a C++ map[string, string] in the
# implementation -- key/value types are not visible from this declaration.
cpdef OrcWriterOptionsBuilder key_value_metadata(self, dict kvm)
cpdef OrcWriterOptionsBuilder metadata(self, TableInputMetadata meta)
# Declared to return the finished OrcWriterOptions wrapper.
cpdef OrcWriterOptions build(self)

# Function taking an OrcWriterOptions object; declared to return nothing.
# NOTE(review): presumably performs the ORC write described by ``options``;
# the implementation lives in the matching .pyx and is not visible here.
cpdef void write_orc(OrcWriterOptions options)

# Extension type holding a libcudf ``orc_chunked_writer`` through a
# ``unique_ptr`` (unlike the options classes, which store their C++ object by
# value).
cdef class OrcChunkedWriter:
cdef unique_ptr[orc_chunked_writer] c_obj
# Both methods are cpdef (callable from Python and Cython) and are declared to
# return nothing; ``write`` takes a pylibcudf Table.
cpdef void close(self)
cpdef void write(self, Table table)

# Extension type wrapping a libcudf ``chunked_orc_writer_options`` value.
# Unlike OrcWriterOptions it declares no ``Table`` attribute, only the sink.
cdef class ChunkedOrcWriterOptions:
# The wrapped libcudf options object, stored by value.
cdef chunked_orc_writer_options c_obj
# NOTE(review): presumably held so the sink outlives c_obj -- confirm in the
# .pyx implementation.
cdef SinkInfo sink
# Setters declared cpdef; each is declared to return nothing and takes a
# single integral size argument.
cpdef void set_stripe_size_bytes(self, size_t size_bytes)
cpdef void set_stripe_size_rows(self, size_type size_rows)
cpdef void set_row_index_stride(self, size_type stride)

# Extension type wrapping a libcudf ``chunked_orc_writer_options_builder``
# value. Every configuration method is declared to return a
# ChunkedOrcWriterOptionsBuilder, which allows calls to be chained before
# ``build()``.
cdef class ChunkedOrcWriterOptionsBuilder:
# The wrapped libcudf builder object, stored by value.
cdef chunked_orc_writer_options_builder c_obj
# NOTE(review): presumably held so the sink outlives the builder and the
# options it produces -- confirm in the .pyx implementation.
cdef SinkInfo sink
# Take the libcudf ``compression_type`` / ``statistics_freq`` enum values.
cpdef ChunkedOrcWriterOptionsBuilder compression(self, compression_type comp)
cpdef ChunkedOrcWriterOptionsBuilder enable_statistics(self, statistics_freq val)
# Takes a Python dict.
# NOTE(review): presumably converted to a C++ map[string, string] in the
# implementation -- key/value types are not visible from this declaration.
cpdef ChunkedOrcWriterOptionsBuilder key_value_metadata(
self, dict kvm
)
cpdef ChunkedOrcWriterOptionsBuilder metadata(self, TableInputMetadata meta)
# Declared to return the finished ChunkedOrcWriterOptions wrapper.
cpdef ChunkedOrcWriterOptions build(self)
Loading

0 comments on commit 165d756

Please sign in to comment.