Skip to content

Commit

Permalink
Migrate CSV writer to pylibcudf (rapidsai#17163)
Browse files Browse the repository at this point in the history
Apart of rapidsai#15162

Authors:
  - Matthew Murray (https://github.com/Matt711)

Approvers:
  - David Wendt (https://github.com/davidwendt)
  - Matthew Roeschke (https://github.com/mroeschke)
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Lawrence Mitchell (https://github.com/wence-)

URL: rapidsai#17163
  • Loading branch information
Matt711 authored Nov 21, 2024
1 parent 0d9e577 commit f54c1a5
Show file tree
Hide file tree
Showing 9 changed files with 462 additions and 121 deletions.
2 changes: 1 addition & 1 deletion cpp/include/cudf/io/csv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1362,7 +1362,7 @@ table_with_metadata read_csv(
*/

/**
*@brief Builder to build options for `writer_csv()`.
*@brief Builder to build options for `write_csv()`.
*/
class csv_writer_options_builder;

Expand Down
108 changes: 34 additions & 74 deletions python/cudf/cudf/_lib/csv.pyx
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
# Copyright (c) 2020-2024, NVIDIA CORPORATION.

from libcpp cimport bool
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector

cimport pylibcudf.libcudf.types as libcudf_types

Expand All @@ -23,16 +19,7 @@ from cudf.core.buffer import acquire_spill_lock

from libcpp cimport bool

from pylibcudf.libcudf.io.csv cimport (
csv_writer_options,
write_csv as cpp_write_csv,
)
from pylibcudf.libcudf.io.data_sink cimport data_sink
from pylibcudf.libcudf.io.types cimport sink_info
from pylibcudf.libcudf.table.table_view cimport table_view

from cudf._lib.io.utils cimport make_sink_info
from cudf._lib.utils cimport data_from_pylibcudf_io, table_view_from_table
from cudf._lib.utils cimport data_from_pylibcudf_io

import pylibcudf as plc

Expand Down Expand Up @@ -318,59 +305,40 @@ def write_csv(
--------
cudf.to_csv
"""
cdef table_view input_table_view = table_view_from_table(
table, not index
)
cdef bool include_header_c = header
cdef char delim_c = ord(sep)
cdef string line_term_c = lineterminator.encode()
cdef string na_c = na_rep.encode()
cdef int rows_per_chunk_c = rows_per_chunk
cdef vector[string] col_names
cdef string true_value_c = 'True'.encode()
cdef string false_value_c = 'False'.encode()
cdef unique_ptr[data_sink] data_sink_c
cdef sink_info sink_info_c = make_sink_info(path_or_buf, data_sink_c)

if header is True:
all_names = columns_apply_na_rep(table._column_names, na_rep)
if index is True:
all_names = table._index.names + all_names

if len(all_names) > 0:
col_names.reserve(len(all_names))
if len(all_names) == 1:
if all_names[0] in (None, ''):
col_names.push_back('""'.encode())
else:
col_names.push_back(
str(all_names[0]).encode()
)
else:
for idx, col_name in enumerate(all_names):
if col_name is None:
col_names.push_back(''.encode())
else:
col_names.push_back(
str(col_name).encode()
)

cdef csv_writer_options options = move(
csv_writer_options.builder(sink_info_c, input_table_view)
.names(col_names)
.na_rep(na_c)
.include_header(include_header_c)
.rows_per_chunk(rows_per_chunk_c)
.line_terminator(line_term_c)
.inter_column_delimiter(delim_c)
.true_value(true_value_c)
.false_value(false_value_c)
.build()
)

index_and_not_empty = index is True and table.index is not None
columns = [
col.to_pylibcudf(mode="read") for col in table.index._columns
] if index_and_not_empty else []
columns.extend(col.to_pylibcudf(mode="read") for col in table._columns)
col_names = []
if header:
all_names = list(table.index.names) if index_and_not_empty else []
all_names.extend(
na_rep if name is None or pd.isnull(name)
else name for name in table._column_names
)
col_names = [
'""' if (name in (None, '') and len(all_names) == 1)
else (str(name) if name not in (None, '') else '')
for name in all_names
]
try:
with nogil:
cpp_write_csv(options)
plc.io.csv.write_csv(
(
plc.io.csv.CsvWriterOptions.builder(
plc.io.SinkInfo([path_or_buf]), plc.Table(columns)
)
.names(col_names)
.na_rep(na_rep)
.include_header(header)
.rows_per_chunk(rows_per_chunk)
.line_terminator(str(lineterminator))
.inter_column_delimiter(str(sep))
.true_value("True")
.false_value("False")
.build()
)
)
except OverflowError:
raise OverflowError(
f"Writing CSV file with chunksize={rows_per_chunk} failed. "
Expand Down Expand Up @@ -419,11 +387,3 @@ cdef DataType _get_plc_data_type_from_dtype(object dtype) except *:

dtype = cudf.dtype(dtype)
return dtype_to_pylibcudf_type(dtype)


def columns_apply_na_rep(column_names, na_rep):
return tuple(
na_rep if pd.isnull(col_name)
else col_name
for col_name in column_names
)
35 changes: 35 additions & 0 deletions python/pylibcudf/pylibcudf/io/csv.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from libcpp.vector cimport vector
from libcpp.string cimport string
from libcpp cimport bool
from pylibcudf.libcudf.io.csv cimport (
csv_writer_options,
csv_writer_options_builder,
)
from pylibcudf.libcudf.io.types cimport quote_style
from pylibcudf.io.types cimport SinkInfo
from pylibcudf.table cimport Table

cdef class CsvWriterOptions:
cdef csv_writer_options c_obj
cdef Table table
cdef SinkInfo sink


cdef class CsvWriterOptionsBuilder:
cdef csv_writer_options_builder c_obj
cdef Table table
cdef SinkInfo sink
cpdef CsvWriterOptionsBuilder names(self, list names)
cpdef CsvWriterOptionsBuilder na_rep(self, str val)
cpdef CsvWriterOptionsBuilder include_header(self, bool val)
cpdef CsvWriterOptionsBuilder rows_per_chunk(self, int val)
cpdef CsvWriterOptionsBuilder line_terminator(self, str term)
cpdef CsvWriterOptionsBuilder inter_column_delimiter(self, str delim)
cpdef CsvWriterOptionsBuilder true_value(self, str val)
cpdef CsvWriterOptionsBuilder false_value(self, str val)
cpdef CsvWriterOptions build(self)


cpdef void write_csv(CsvWriterOptions options)
22 changes: 22 additions & 0 deletions python/pylibcudf/pylibcudf/io/csv.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ from collections.abc import Mapping
from pylibcudf.io.types import (
CompressionType,
QuoteStyle,
SinkInfo,
SourceInfo,
TableWithMetadata,
)
from pylibcudf.table import Table
from pylibcudf.types import DataType

def read_csv(
Expand Down Expand Up @@ -52,3 +54,23 @@ def read_csv(
# detect_whitespace_around_quotes: bool = False,
# timestamp_type: DataType = DataType(type_id.EMPTY),
) -> TableWithMetadata: ...
def write_csv(options: CsvWriterOptionsBuilder) -> None: ...

class CsvWriterOptions:
def __init__(self): ...
@staticmethod
def builder(sink: SinkInfo, table: Table) -> CsvWriterOptionsBuilder: ...

class CsvWriterOptionsBuilder:
def __init__(self): ...
def names(self, names: list) -> CsvWriterOptionsBuilder: ...
def na_rep(self, val: str) -> CsvWriterOptionsBuilder: ...
def include_header(self, val: bool) -> CsvWriterOptionsBuilder: ...
def rows_per_chunk(self, val: int) -> CsvWriterOptionsBuilder: ...
def line_terminator(self, term: str) -> CsvWriterOptionsBuilder: ...
def inter_column_delimiter(
self, delim: str
) -> CsvWriterOptionsBuilder: ...
def true_value(self, val: str) -> CsvWriterOptionsBuilder: ...
def false_value(self, val: str) -> CsvWriterOptionsBuilder: ...
def build(self) -> CsvWriterOptions: ...
Loading

0 comments on commit f54c1a5

Please sign in to comment.