Migrate CSV writer to pylibcudf #17163

Merged
40 commits merged on Nov 21, 2024

Changes from 12 commits

Commits
cd78dcd
[WIP] Migrate CSV writer to pylibcudf
Matt711 Oct 24, 2024
4f899b6
migrate the CSV writer
Matt711 Oct 24, 2024
6723045
get existing test passing
Matt711 Oct 25, 2024
cbeea6b
add a test
Matt711 Oct 29, 2024
d3998fc
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 1, 2024
8286f74
clean up
Matt711 Nov 5, 2024
dc93b8b
merge conflict
Matt711 Nov 6, 2024
d7d21ca
add more test cases
Matt711 Nov 6, 2024
a6007d7
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 6, 2024
2ce052f
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 7, 2024
1a8c38c
fix type checking in SinkInfo
Matt711 Nov 7, 2024
afaa46c
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 7, 2024
b132456
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 11, 2024
5f8e4b9
Expose the CsvWriterOptions nad CsvWriterOptionsBuilder
Matt711 Nov 12, 2024
2287eba
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 12, 2024
3706bd5
Add type stubs
Matt711 Nov 12, 2024
15370b0
commit declaration file
Matt711 Nov 12, 2024
f10492c
merge conflict
Matt711 Nov 15, 2024
dd72323
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 15, 2024
4e706bd
add type stub
Matt711 Nov 15, 2024
99e770a
switch to options arg
Matt711 Nov 15, 2024
cf11996
remove casts
Matt711 Nov 15, 2024
f9e4570
clean up
Matt711 Nov 15, 2024
b7d971a
keep tables alive
Matt711 Nov 15, 2024
464ae48
address review
Matt711 Nov 15, 2024
2643831
addres reviews
Matt711 Nov 15, 2024
fb2e1bc
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 18, 2024
ffe895e
add to all
Matt711 Nov 18, 2024
496d664
clean up
Matt711 Nov 18, 2024
d6b3667
clean up
Matt711 Nov 18, 2024
94599b9
simplify fixtures
Matt711 Nov 18, 2024
4bdd6dd
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 19, 2024
18978ff
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 19, 2024
ffdee70
address review
Matt711 Nov 19, 2024
3a5dca6
fix write_csv doc string
Matt711 Nov 19, 2024
2da0446
finish addressing review
Matt711 Nov 19, 2024
0b6042a
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 21, 2024
e33d2ea
address review
Matt711 Nov 21, 2024
becc262
remove post processing
Matt711 Nov 21, 2024
aac021f
clean up
Matt711 Nov 21, 2024
98 changes: 24 additions & 74 deletions python/cudf/cudf/_lib/csv.pyx
@@ -1,10 +1,6 @@
# Copyright (c) 2020-2024, NVIDIA CORPORATION.

from libcpp cimport bool
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector

cimport pylibcudf.libcudf.types as libcudf_types

@@ -23,23 +19,16 @@ from cudf.core.buffer import acquire_spill_lock

from libcpp cimport bool

from pylibcudf.libcudf.io.csv cimport (
csv_writer_options,
write_csv as cpp_write_csv,
)
from pylibcudf.libcudf.io.data_sink cimport data_sink
from pylibcudf.libcudf.io.types cimport sink_info
from pylibcudf.libcudf.table.table_view cimport table_view

from cudf._lib.io.utils cimport make_sink_info
from cudf._lib.utils cimport data_from_pylibcudf_io, table_view_from_table
from cudf._lib.utils cimport data_from_pylibcudf_io

import pylibcudf as plc

from cudf.api.types import is_hashable

from pylibcudf.types cimport DataType

from cudf._lib.json import _dtype_to_names_list

CSV_HEX_TYPE_MAP = {
"hex": np.dtype("int64"),
"hex64": np.dtype("int64"),
@@ -318,59 +307,28 @@ def write_csv(
--------
cudf.to_csv
"""
cdef table_view input_table_view = table_view_from_table(
table, not index
)
cdef bool include_header_c = header
cdef char delim_c = ord(sep)
cdef string line_term_c = lineterminator.encode()
cdef string na_c = na_rep.encode()
cdef int rows_per_chunk_c = rows_per_chunk
cdef vector[string] col_names
cdef string true_value_c = 'True'.encode()
cdef string false_value_c = 'False'.encode()
cdef unique_ptr[data_sink] data_sink_c
cdef sink_info sink_info_c = make_sink_info(path_or_buf, data_sink_c)

if header is True:
all_names = columns_apply_na_rep(table._column_names, na_rep)
if index is True:
all_names = table._index.names + all_names

if len(all_names) > 0:
col_names.reserve(len(all_names))
if len(all_names) == 1:
if all_names[0] in (None, ''):
col_names.push_back('""'.encode())
else:
col_names.push_back(
str(all_names[0]).encode()
)
else:
for idx, col_name in enumerate(all_names):
if col_name is None:
col_names.push_back(''.encode())
else:
col_names.push_back(
str(col_name).encode()
)

cdef csv_writer_options options = move(
csv_writer_options.builder(sink_info_c, input_table_view)
.names(col_names)
.na_rep(na_c)
.include_header(include_header_c)
.rows_per_chunk(rows_per_chunk_c)
.line_terminator(line_term_c)
.inter_column_delimiter(delim_c)
.true_value(true_value_c)
.false_value(false_value_c)
.build()
)

col_names = []
for name in table._column_names:
col_names.append((name, _dtype_to_names_list(table[name]._column)))
for i, t in enumerate(col_names):
if t[0] is None or pd.isnull(t[0]):
col_names[i] = (na_rep, t[1])
columns = [col.to_pylibcudf(mode="read") for col in table._columns]
try:
with nogil:
cpp_write_csv(options)
plc.io.csv.write_csv(
plc.io.SinkInfo([path_or_buf]),
plc.io.TableWithMetadata(
plc.Table(columns),
col_names
),
path_or_buf=path_or_buf,
sep=str(sep),
na_rep=str(na_rep),
header=header,
lineterminator=str(lineterminator),
rows_per_chunk=rows_per_chunk,
indices=table._index if index else None,
)
except OverflowError:
raise OverflowError(
f"Writing CSV file with chunksize={rows_per_chunk} failed. "
@@ -419,11 +377,3 @@ cdef DataType _get_plc_data_type_from_dtype(object dtype) except *:

dtype = cudf.dtype(dtype)
return dtype_to_pylibcudf_type(dtype)


def columns_apply_na_rep(column_names, na_rep):
return tuple(
na_rep if pd.isnull(col_name)
else col_name
for col_name in column_names
)
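
The removed `columns_apply_na_rep` helper and the new inline loop over `(name, children)` tuples apply the same rule: a column name that is null (`None` or NaN) is written out as the user-supplied `na_rep` string. A minimal standalone sketch of that rule (plain Python; `apply_na_rep` is a hypothetical name, not the actual cudf code):

```python
import pandas as pd


def apply_na_rep(column_names, na_rep):
    # Replace null (None/NaN) column names with the user's na_rep string,
    # mirroring what the removed columns_apply_na_rep helper did and what
    # the new loop over (name, children) tuples now does inline.
    return tuple(na_rep if pd.isnull(name) else name for name in column_names)


assert apply_na_rep(("a", None, float("nan")), "<NA>") == ("a", "<NA>", "<NA>")
```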
104 changes: 102 additions & 2 deletions python/pylibcudf/pylibcudf/io/csv.pyx
@@ -2,22 +2,26 @@

from libcpp cimport bool
from libcpp.map cimport map

from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector
from pylibcudf.io.types cimport SourceInfo, TableWithMetadata
from pylibcudf.io.types cimport SourceInfo, SinkInfo, TableWithMetadata
from pylibcudf.libcudf.io.csv cimport (
csv_reader_options,
csv_writer_options,
read_csv as cpp_read_csv,
write_csv as cpp_write_csv,
)

from pylibcudf.libcudf.io.types cimport (
compression_type,
quote_style,
table_with_metadata,
)
from pylibcudf.libcudf.types cimport data_type, size_type
from pylibcudf.types cimport DataType

from pylibcudf.table cimport Table

cdef tuple _process_parse_dates_hex(list cols):
cdef vector[string] str_cols
Expand Down Expand Up @@ -80,6 +84,8 @@ def read_csv(
):
"""Reads a CSV file into a :py:class:`~.types.TableWithMetadata`.

For details, see :cpp:func:`read_csv`.

Parameters
----------
source_info : SourceInfo
@@ -261,3 +267,97 @@ def read_csv(
c_result = move(cpp_read_csv(options))

return TableWithMetadata.from_libcudf(c_result)


def write_csv(
Contributor:
Can this (should this? @vyasr) be a cpdef function?

Contributor Author:
Yes. We've been cpdef'ing them, so I did it here too.

Contributor:
I think it's good practice to do it consistently everywhere to enable full Cython usage. For I/O it's probably less important in the grand scheme of things since that shouldn't propagate bad typing afterwards in any way, but consistency helps.
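
For context on the discussion above, a minimal, hypothetical Cython sketch (not part of this PR's diff) of the difference between `def` and `cpdef`:

```cython
# example.pyx (hypothetical module, for illustration only)

def py_only(x):
    # A def function is reachable only through the Python call machinery.
    return x + 1

cpdef int py_and_c(int x):
    # A cpdef function additionally gets a C-level entry point, so other
    # Cython modules can call it directly without Python-call overhead.
    return x + 1
```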

SinkInfo sink_info,
TableWithMetadata table,
*,
object path_or_buf=None,
str sep=",",
str na_rep="",
bool header=True,
str lineterminator="\n",
int rows_per_chunk=8,
object indices=None,
):
"""
Writes a :py:class:`~pylibcudf.io.types.TableWithMetadata` to CSV format.

For details, see :cpp:func:`write_csv`.

Parameters
----------
sink_info: SinkInfo
The SinkInfo object to write to.
table : TableWithMetadata
The TableWithMetadata object containing the Table to write.
path_or_buf : object, default None
The source file-like object (eg. io.StringIO).
sep : str
Character to delimit column values.
na_rep : str
The string representation for null values.
header : bool, default True
Whether to write headers to csv. Includes the column names
and optionally, the index names (see ``index`` argument).
lineterminator : str, default '\\n'
The character used to determine the end of a line.
rows_per_chunk: int, default 8
The maximum number of rows to write at a time.
indices : object
The indices in the table.
"""
cdef bool include_header_c = header
cdef char delim_c = ord(sep)
cdef string line_term_c = lineterminator.encode()
cdef string na_c = na_rep.encode()
cdef int rows_per_chunk_c = rows_per_chunk
cdef vector[string] col_names
cdef string true_value_c = 'True'.encode()
cdef string false_value_c = 'False'.encode()

if header is True:
all_names = table.column_names()
if indices is not None:
all_names = indices.names + all_names
if len(all_names) > 0:
col_names.reserve(len(all_names))
if len(all_names) == 1:
if all_names[0] in (None, ''):
col_names.push_back('""'.encode())
else:
col_names.push_back(
str(all_names[0]).encode()
)
else:
for idx, col_name in enumerate(all_names):
if col_name is None:
col_names.push_back(''.encode())
else:
col_names.push_back(
str(col_name).encode()
)
cdef Table new_table = table.tbl
if indices is not None:
new_table = Table(
[col.to_pylibcudf(mode="read") for col in indices._columns] + table.columns
)
cdef csv_writer_options options = move(
csv_writer_options.builder(
sink_info.c_obj,
new_table.view()
)
.names(col_names)
.na_rep(na_c)
.include_header(include_header_c)
.rows_per_chunk(rows_per_chunk_c)
.line_terminator(line_term_c)
.inter_column_delimiter(delim_c)
.true_value(true_value_c)
.false_value(false_value_c)
.build()
)

with nogil:
cpp_write_csv(options)
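
Putting the pieces above together, a usage sketch of the new writer at the state shown in this diff might look like the following. This is an illustration, not code from the PR: the `(name, child_names)` tuples passed to `TableWithMetadata` and the use of `plc.interop.from_arrow` to build the input `Table` are assumptions based on how the cudf-side caller in this PR constructs its arguments.

```python
import io

import pyarrow as pa
import pylibcudf as plc

# Build a small pylibcudf Table (assumes plc.interop.from_arrow accepts a pyarrow Table).
arrow_tbl = pa.table({"a": [1, 2, 3], "b": ["x", "y", None]})
tbl = plc.interop.from_arrow(arrow_tbl)

# Write it to an in-memory sink using the signature shown above.
buf = io.BytesIO()
plc.io.csv.write_csv(
    plc.io.SinkInfo([buf]),
    plc.io.TableWithMetadata(tbl, [("a", []), ("b", [])]),  # (column name, child names)
    sep=",",
    na_rep="<NA>",
    header=True,
    lineterminator="\n",
    rows_per_chunk=8,
)
print(buf.getvalue().decode())
```

Note that later commits in this PR ("switch to options arg", "Expose the CsvWriterOptions and CsvWriterOptionsBuilder") rework this interface around an options-builder object, so the keyword-argument form above reflects only the intermediate state captured by these 12 commits.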
23 changes: 14 additions & 9 deletions python/pylibcudf/pylibcudf/io/types.pyx
@@ -267,25 +267,30 @@ cdef class SinkInfo:

cdef object initial_sink_cls = type(sinks[0])

if not all(isinstance(s, initial_sink_cls) for s in sinks):
if not all(isinstance(s, initial_sink_cls) or
(isinstance(sinks[0], io.IOBase) and isinstance(s, io.IOBase))
for s in sinks):
raise ValueError("All sinks must be of the same type!")

if initial_sink_cls in {io.StringIO, io.BytesIO, io.TextIOBase}:
if isinstance(sinks[0], io.IOBase):
data_sinks.reserve(len(sinks))
if isinstance(sinks[0], (io.StringIO, io.BytesIO)):
for s in sinks:
for s in sinks:
if isinstance(s, (io.StringIO, io.BytesIO)):
self.sink_storage.push_back(
unique_ptr[data_sink](new iobase_data_sink(s))
)
elif isinstance(sinks[0], io.TextIOBase):
for s in sinks:
if codecs.lookup(s).name not in ('utf-8', 'ascii'):
elif isinstance(s, io.TextIOBase):
if codecs.lookup(s.encoding).name not in ('utf-8', 'ascii'):
raise NotImplementedError(f"Unsupported encoding {s.encoding}")
self.sink_storage.push_back(
unique_ptr[data_sink](new iobase_data_sink(s.buffer))
)
data_sinks.push_back(self.sink_storage.back().get())
elif initial_sink_cls is str:
else:
self.sink_storage.push_back(
unique_ptr[data_sink](new iobase_data_sink(s))
)
data_sinks.push_back(self.sink_storage.back().get())
elif isinstance(sinks[0], str):
paths.reserve(len(sinks))
for s in sinks:
paths.push_back(<string> s.encode())
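
The revised `SinkInfo` constructor above now treats any mix of `io.IOBase` subclasses (`StringIO`, `BytesIO`, text-mode files) as compatible sinks, while a list of path strings remains a separate case. A small sketch of what the new isinstance checks accept and reject, as I read this diff (illustrative only):

```python
import io

import pylibcudf as plc

# In-memory sinks: any mix of io.IOBase subclasses is now accepted.
plc.io.SinkInfo([io.BytesIO(), io.StringIO()])

# File paths: a list of str is also accepted.
plc.io.SinkInfo(["out1.csv", "out2.csv"])

# Mixing an io object with a str path still raises, since str is not io.IOBase.
try:
    plc.io.SinkInfo([io.BytesIO(), "out.csv"])
except ValueError as e:
    print(e)  # "All sinks must be of the same type!"
```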
8 changes: 3 additions & 5 deletions python/pylibcudf/pylibcudf/tests/common/utils.py
@@ -385,12 +385,10 @@ def make_source(path_or_buf, pa_table, format, **kwargs):
NESTED_STRUCT_TESTING_TYPE,
]

NON_NESTED_PA_TYPES = NUMERIC_PA_TYPES + STRING_PA_TYPES + BOOL_PA_TYPES

DEFAULT_PA_TYPES = (
NUMERIC_PA_TYPES
+ STRING_PA_TYPES
+ BOOL_PA_TYPES
+ LIST_PA_TYPES
+ DEFAULT_PA_STRUCT_TESTING_TYPES
NON_NESTED_PA_TYPES + LIST_PA_TYPES + DEFAULT_PA_STRUCT_TESTING_TYPES
)

# Map pylibcudf compression types to pandas ones