Skip to content

Commit

Permalink
Merge branch 'branch-25.02' into dask-post-2024.11.2-support
Browse files Browse the repository at this point in the history
  • Loading branch information
rjzamora authored Nov 27, 2024
2 parents 0d11d89 + 83f0ae0 commit a652edd
Show file tree
Hide file tree
Showing 44 changed files with 1,494 additions and 2,134 deletions.
64 changes: 42 additions & 22 deletions cpp/src/io/json/write_json.cu
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ struct validity_fn {
*
* @param strings_columns Table of strings columns
* @param column_names Column of names for each column in the table
* @param num_rows Number of rows in the table
* @param row_prefix Prepend this string to each row
* @param row_suffix Append this string to each row
* @param value_separator Separator between values
Expand All @@ -255,6 +256,7 @@ struct validity_fn {
*/
std::unique_ptr<column> struct_to_strings(table_view const& strings_columns,
column_view const& column_names,
size_type const num_rows,
string_view const row_prefix,
string_view const row_suffix,
string_view const value_separator,
Expand All @@ -268,40 +270,54 @@ std::unique_ptr<column> struct_to_strings(table_view const& strings_columns,
auto const num_columns = strings_columns.num_columns();
CUDF_EXPECTS(num_columns == column_names.size(),
"Number of column names should be equal to number of columns in the table");
auto const strings_count = strings_columns.num_rows();
if (strings_count == 0) // empty begets empty
if (num_rows == 0) // empty begets empty
return make_empty_column(type_id::STRING);
// check all columns are of type string
CUDF_EXPECTS(std::all_of(strings_columns.begin(),
strings_columns.end(),
[](auto const& c) { return c.type().id() == type_id::STRING; }),
"All columns must be of type string");
auto constexpr strviews_per_column = 3; // (for each "column_name:", "value", "separator")
auto const num_strviews_per_row = strings_columns.num_columns() * strviews_per_column + 1;
auto const num_strviews_per_row = strings_columns.num_columns() == 0
? 2
: (1 + strings_columns.num_columns() * strviews_per_column);
// e.g. {col1: value, col2: value, col3: value} = 1 + 3 + 3 + (3-1) + 1 = 10

auto tbl_device_view = cudf::table_device_view::create(strings_columns, stream);
auto d_column_names = column_device_view::create(column_names, stream);

// Note for future: chunk it but maximize parallelism, if memory usage is high.
auto const total_strings = num_strviews_per_row * strings_columns.num_rows();
auto const total_rows = strings_columns.num_rows() * strings_columns.num_columns();
auto const total_strings = num_strviews_per_row * num_rows;
auto const total_rows = num_rows * strings_columns.num_columns();
rmm::device_uvector<string_view> d_strviews(total_strings, stream);
struct_scatter_strings_fn scatter_fn{*tbl_device_view,
*d_column_names,
strviews_per_column,
num_strviews_per_row,
row_prefix,
row_suffix,
value_separator,
narep.value(stream),
include_nulls,
d_strviews.begin()};
// scatter row_prefix, row_suffix, column_name:, value, value_separator as string_views
thrust::for_each(rmm::exec_policy(stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(total_rows),
scatter_fn);
if (strings_columns.num_columns() > 0) {
struct_scatter_strings_fn scatter_fn{*tbl_device_view,
*d_column_names,
strviews_per_column,
num_strviews_per_row,
row_prefix,
row_suffix,
value_separator,
narep.value(stream),
include_nulls,
d_strviews.begin()};
// scatter row_prefix, row_suffix, column_name:, value, value_separator as string_views
thrust::for_each(rmm::exec_policy_nosync(stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(total_rows),
scatter_fn);
} else {
thrust::for_each(
rmm::exec_policy_nosync(stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(num_rows),
[d_strviews = d_strviews.begin(), row_prefix, row_suffix, num_strviews_per_row] __device__(
auto idx) {
auto const this_index = idx * num_strviews_per_row;
d_strviews[this_index] = row_prefix;
d_strviews[this_index + num_strviews_per_row - 1] = row_suffix;
});
}
if (!include_nulls) {
// if previous column was null, then we skip the value separator
rmm::device_uvector<bool> d_str_separator(total_rows, stream);
Expand Down Expand Up @@ -341,7 +357,7 @@ std::unique_ptr<column> struct_to_strings(table_view const& strings_columns,

// gather from offset and create a new string column
auto old_offsets = strings_column_view(joined_col->view()).offsets();
rmm::device_uvector<size_type> row_string_offsets(strings_columns.num_rows() + 1, stream, mr);
rmm::device_uvector<size_type> row_string_offsets(num_rows + 1, stream, mr);
auto const d_strview_offsets = cudf::detail::make_counting_transform_iterator(
0, cuda::proclaim_return_type<size_type>([num_strviews_per_row] __device__(size_type const i) {
return i * num_strviews_per_row;
Expand All @@ -353,7 +369,7 @@ std::unique_ptr<column> struct_to_strings(table_view const& strings_columns,
row_string_offsets.begin());
auto chars_data = joined_col->release().data;
return make_strings_column(
strings_columns.num_rows(),
num_rows,
std::make_unique<cudf::column>(std::move(row_string_offsets), rmm::device_buffer{}, 0),
std::move(chars_data.release()[0]),
0,
Expand Down Expand Up @@ -677,6 +693,7 @@ struct column_to_strings_fn {
auto col_string = operator()(child_it,
child_it + column.num_children(),
children_names,
column.size(),
struct_row_end_wrap.value(stream_));
col_string->set_null_mask(cudf::detail::copy_bitmask(column, stream_, mr_),
column.null_count());
Expand All @@ -688,6 +705,7 @@ struct column_to_strings_fn {
std::unique_ptr<column> operator()(column_iterator column_begin,
column_iterator column_end,
host_span<column_name_info const> children_names,
size_type num_rows,
cudf::string_view const row_end_wrap_value) const
{
auto const num_columns = std::distance(column_begin, column_end);
Expand Down Expand Up @@ -733,6 +751,7 @@ struct column_to_strings_fn {
//
return struct_to_strings(str_table_view,
column_names_view,
num_rows,
struct_row_begin_wrap.value(stream_),
row_end_wrap_value,
struct_value_separator.value(stream_),
Expand Down Expand Up @@ -908,6 +927,7 @@ void write_json_uncompressed(data_sink* out_sink,
auto str_concat_col = converter(sub_view.begin(),
sub_view.end(),
user_column_names,
sub_view.num_rows(),
d_line_terminator_with_row_end.value(stream));

// Needs line_terminator at the end, to separate from next chunk
Expand Down
1 change: 0 additions & 1 deletion python/cudf/cudf/_lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ set(cython_sources
interop.pyx
json.pyx
merge.pyx
null_mask.pyx
orc.pyx
parquet.pyx
reduce.pyx
Expand Down
1 change: 0 additions & 1 deletion python/cudf/cudf/_lib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
interop,
json,
merge,
null_mask,
nvtext,
orc,
parquet,
Expand Down
21 changes: 14 additions & 7 deletions python/cudf/cudf/_lib/column.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import pylibcudf
import rmm

import cudf
import cudf._lib as libcudf
from cudf.core.buffer import (
Buffer,
ExposureTrackedBuffer,
Expand All @@ -36,7 +35,6 @@ from cudf._lib.types cimport (
dtype_to_pylibcudf_type,
)

from cudf._lib.null_mask import bitmask_allocation_size_bytes
from cudf._lib.types import dtype_from_pylibcudf_column

cimport pylibcudf.libcudf.copying as cpp_copying
Expand Down Expand Up @@ -159,7 +157,10 @@ cdef class Column:
if self.base_mask is None or self.offset == 0:
self._mask = self.base_mask
else:
self._mask = libcudf.null_mask.copy_bitmask(self)
with acquire_spill_lock():
self._mask = as_buffer(
pylibcudf.null_mask.copy_bitmask(self.to_pylibcudf(mode="read"))
)
return self._mask

@property
Expand All @@ -183,7 +184,9 @@ cdef class Column:

if value is not None:
# bitmask size must be relative to offset = 0 data.
required_size = bitmask_allocation_size_bytes(self.base_size)
required_size = pylibcudf.null_mask.bitmask_allocation_size_bytes(
self.base_size
)
if value.size < required_size:
error_msg = (
"The Buffer for mask is smaller than expected, "
Expand Down Expand Up @@ -220,7 +223,7 @@ cdef class Column:
and compute new data Buffers zero-copy that use pointer arithmetic to
properly adjust the pointer.
"""
mask_size = bitmask_allocation_size_bytes(self.size)
mask_size = pylibcudf.null_mask.bitmask_allocation_size_bytes(self.size)
required_num_bytes = -(-self.size // 8) # ceiling divide
error_msg = (
"The value for mask is smaller than expected, got {} bytes, "
Expand Down Expand Up @@ -790,13 +793,17 @@ cdef class Column:
mask = as_buffer(
rmm.DeviceBuffer(
ptr=mask_ptr,
size=bitmask_allocation_size_bytes(base_size)
size=pylibcudf.null_mask.bitmask_allocation_size_bytes(
base_size
)
)
)
else:
mask = as_buffer(
data=mask_ptr,
size=bitmask_allocation_size_bytes(base_size),
size=pylibcudf.null_mask.bitmask_allocation_size_bytes(
base_size
),
owner=mask_owner,
exposed=True
)
Expand Down
99 changes: 62 additions & 37 deletions python/cudf/cudf/_lib/csv.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -202,46 +202,71 @@ def read_csv(
raise ValueError(
"dtype should be a scalar/str/list-like/dict-like"
)
options = (
plc.io.csv.CsvReaderOptions.builder(plc.io.SourceInfo([datasource]))
.compression(c_compression)
.mangle_dupe_cols(mangle_dupe_cols)
.byte_range_offset(byte_range[0])
.byte_range_size(byte_range[1])
.nrows(nrows if nrows is not None else -1)
.skiprows(skiprows)
.skipfooter(skipfooter)
.quoting(quoting)
.lineterminator(str(lineterminator))
.quotechar(quotechar)
.decimal(decimal)
.delim_whitespace(delim_whitespace)
.skipinitialspace(skipinitialspace)
.skip_blank_lines(skip_blank_lines)
.doublequote(doublequote)
.keep_default_na(keep_default_na)
.na_filter(na_filter)
.dayfirst(dayfirst)
.build()
)

options.set_header(header)

if names is not None:
options.set_names([str(name) for name in names])

if prefix is not None:
options.set_prefix(prefix)

if usecols is not None:
if all(isinstance(col, int) for col in usecols):
options.set_use_cols_indexes(list(usecols))
else:
options.set_use_cols_names([str(name) for name in usecols])

if delimiter is not None:
options.set_delimiter(delimiter)

if thousands is not None:
options.set_thousands(thousands)

lineterminator = str(lineterminator)
if comment is not None:
options.set_comment(comment)

if parse_dates is not None:
options.set_parse_dates(list(parse_dates))

if hex_cols is not None:
options.set_parse_hex(list(hex_cols))

options.set_dtypes(new_dtypes)

if true_values is not None:
options.set_true_values([str(val) for val in true_values])

if false_values is not None:
options.set_false_values([str(val) for val in false_values])

if na_values is not None:
options.set_na_values([str(val) for val in na_values])

df = cudf.DataFrame._from_data(
*data_from_pylibcudf_io(
plc.io.csv.read_csv(
plc.io.SourceInfo([datasource]),
lineterminator=lineterminator,
quotechar = quotechar,
quoting = quoting,
doublequote = doublequote,
header = header,
mangle_dupe_cols = mangle_dupe_cols,
usecols = usecols,
delimiter = delimiter,
delim_whitespace = delim_whitespace,
skipinitialspace = skipinitialspace,
col_names = names,
dtypes = new_dtypes,
skipfooter = skipfooter,
skiprows = skiprows,
dayfirst = dayfirst,
compression = c_compression,
thousands = thousands,
decimal = decimal,
true_values = true_values,
false_values = false_values,
nrows = nrows if nrows is not None else -1,
byte_range_offset = byte_range[0],
byte_range_size = byte_range[1],
skip_blank_lines = skip_blank_lines,
parse_dates = parse_dates,
parse_hex = hex_cols,
comment = comment,
na_values = na_values,
keep_default_na = keep_default_na,
na_filter = na_filter,
prefix = prefix,
)
)
*data_from_pylibcudf_io(plc.io.csv.read_csv(options))
)

if dtype is not None:
Expand Down
65 changes: 0 additions & 65 deletions python/cudf/cudf/_lib/null_mask.pyx

This file was deleted.

Loading

0 comments on commit a652edd

Please sign in to comment.