Merge branch 'cudf-polars-dask-simple' into cudf-polars-dask-groupby-v2
rjzamora committed Nov 21, 2024
2 parents cc9eb18 + 62f10bc commit 242e79d
Showing 28 changed files with 755 additions and 309 deletions.
3 changes: 3 additions & 0 deletions ci/run_cudf_polars_polars_tests.sh
@@ -28,8 +28,11 @@ if [[ $(arch) == "aarch64" ]]; then
DESELECTED_TESTS+=("tests/unit/operations/test_join.py::test_join_4_columns_with_validity")
else
# Ensure that we don't run dbgen when it uses newer symbols than supported by the glibc version in the CI image.
# Allow errors since any of these commands could produce empty results that would cause the script to fail.
set +e
glibc_minor_version=$(ldd --version | head -1 | grep -o "[0-9]\.[0-9]\+" | tail -1 | cut -d '.' -f2)
latest_glibc_symbol_found=$(nm py-polars/tests/benchmark/data/pdsh/dbgen/dbgen | grep GLIBC | grep -o "[0-9]\.[0-9]\+" | sort --version-sort | tail -1 | cut -d "." -f 2)
set -e
if [[ ${glibc_minor_version} -lt ${latest_glibc_symbol_found} ]]; then
DESELECTED_TESTS+=("tests/benchmark/test_pdsh.py::test_pdsh")
fi
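For context, the deselection above reduces to a minor-version comparison; a standalone sketch with made-up values (the version numbers are illustrative, not from the CI image):

#!/usr/bin/env bash
# Illustrative values only: "ldd --version" reporting glibc 2.35 yields
# minor version 35; a newest dbgen symbol such as pow@GLIBC_2.29 yields 29.
DESELECTED_TESTS=()
glibc_minor_version=35
latest_glibc_symbol_found=29
if [[ ${glibc_minor_version} -lt ${latest_glibc_symbol_found} ]]; then
  # dbgen would need newer glibc symbols than the image provides.
  DESELECTED_TESTS+=("tests/benchmark/test_pdsh.py::test_pdsh")
fi
echo "deselected ${#DESELECTED_TESTS[@]} test(s)"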
1 change: 1 addition & 0 deletions conda/environments/all_cuda-118_arch-x86_64.yaml
@@ -71,6 +71,7 @@ dependencies:
- ptxcompiler
- pyarrow>=14.0.0,<19.0.0a0
- pydata-sphinx-theme!=0.14.2
- pynvml>=11.4.1,<12.0.0a0
- pytest-benchmark
- pytest-cases>=3.8.2
- pytest-cov
1 change: 1 addition & 0 deletions conda/environments/all_cuda-125_arch-x86_64.yaml
@@ -69,6 +69,7 @@ dependencies:
- pyarrow>=14.0.0,<19.0.0a0
- pydata-sphinx-theme!=0.14.2
- pynvjitlink>=0.0.0a0
- pynvml>=11.4.1,<12.0.0a0
- pytest-benchmark
- pytest-cases>=3.8.2
- pytest-cov
1 change: 1 addition & 0 deletions conda/recipes/dask-cudf/meta.yaml
@@ -43,6 +43,7 @@ requirements:
run:
- python
- cudf ={{ version }}
- pynvml >=11.4.1,<12.0.0a0
- rapids-dask-dependency ={{ minor_version }}
- {{ pin_compatible('cuda-version', max_pin='x', min_pin='x') }}

4 changes: 4 additions & 0 deletions cpp/include/cudf/interop.hpp
@@ -57,12 +57,14 @@ namespace CUDF_EXPORT cudf {
* @throw cudf::logic_error if the any of the DLTensor fields are unsupported
*
* @param managed_tensor a 1D or 2D column-major (Fortran order) tensor
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned table's device memory
*
* @return Table with a copy of the tensor data
*/
std::unique_ptr<table> from_dlpack(
DLManagedTensor const* managed_tensor,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
@@ -79,12 +81,14 @@ std::unique_ptr<table> from_dlpack(
* or if any of columns have non-zero null count
*
* @param input Table to convert to DLPack
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned DLPack tensor's device memory
*
* @return 1D or 2D DLPack tensor with a copy of the table data, or nullptr
*/
DLManagedTensor* to_dlpack(
table_view const& input,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/** @} */ // end of group
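Since the new stream parameters default to cudf::get_default_stream(), existing call sites compile unchanged; callers can now opt into an explicit stream. A minimal sketch of an explicit-stream round trip (assuming RMM's rmm::cuda_stream for stream ownership; roundtrip is a hypothetical helper):

#include <cudf/interop.hpp>
#include <cudf/table/table_view.hpp>

#include <rmm/cuda_stream.hpp>

#include <dlpack/dlpack.h>

void roundtrip(cudf::table_view const& input)
{
  rmm::cuda_stream stream;  // owns a fresh CUDA stream
  // Both conversions now run on `stream` instead of the default stream.
  DLManagedTensor* tensor = cudf::to_dlpack(input, stream.view());
  auto copy               = cudf::from_dlpack(tensor, stream.view());
  stream.synchronize();     // wait for the device work before cleanup
  tensor->deleter(tensor);  // DLPack convention: the consumer calls the deleter
}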
2 changes: 1 addition & 1 deletion cpp/include/cudf/io/csv.hpp
@@ -1362,7 +1362,7 @@ table_with_metadata read_csv(
*/

/**
*@brief Builder to build options for `writer_csv()`.
*@brief Builder to build options for `write_csv()`.
*/
class csv_writer_options_builder;

9 changes: 6 additions & 3 deletions cpp/src/interop/dlpack.cpp
@@ -297,16 +297,19 @@ DLManagedTensor* to_dlpack(table_view const& input,
} // namespace detail

std::unique_ptr<table> from_dlpack(DLManagedTensor const* managed_tensor,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::from_dlpack(managed_tensor, cudf::get_default_stream(), mr);
return detail::from_dlpack(managed_tensor, stream, mr);
}

DLManagedTensor* to_dlpack(table_view const& input, rmm::device_async_resource_ref mr)
DLManagedTensor* to_dlpack(table_view const& input,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::to_dlpack(input, cudf::get_default_stream(), mr);
return detail::to_dlpack(input, stream, mr);
}

} // namespace cudf
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
@@ -701,6 +701,7 @@ ConfigureTest(STREAM_DICTIONARY_TEST streams/dictionary_test.cpp STREAM_MODE tes
ConfigureTest(STREAM_FILLING_TEST streams/filling_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_GROUPBY_TEST streams/groupby_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_HASHING_TEST streams/hash_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_INTEROP streams/interop_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_JOIN_TEST streams/join_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_JSONIO_TEST streams/io/json_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_LABELING_BINS_TEST streams/labeling_bins_test.cpp STREAM_MODE testing)
46 changes: 46 additions & 0 deletions cpp/tests/streams/interop_test.cpp
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/default_stream.hpp>

#include <cudf/interop.hpp>
#include <cudf/table/table_view.hpp>

#include <dlpack/dlpack.h>

struct dlpack_deleter {
void operator()(DLManagedTensor* tensor) { tensor->deleter(tensor); }
};

struct DLPackTest : public cudf::test::BaseFixture {};

TEST_F(DLPackTest, ToDLPack)
{
cudf::table_view empty(std::vector<cudf::column_view>{});
cudf::to_dlpack(empty, cudf::test::get_default_stream());
}

TEST_F(DLPackTest, FromDLPack)
{
using unique_managed_tensor = std::unique_ptr<DLManagedTensor, dlpack_deleter>;
cudf::test::fixed_width_column_wrapper<int32_t> col1({});
cudf::test::fixed_width_column_wrapper<int32_t> col2({});
cudf::table_view input({col1, col2});
unique_managed_tensor tensor(cudf::to_dlpack(input, cudf::test::get_default_stream()));
auto result = cudf::from_dlpack(tensor.get(), cudf::test::get_default_stream());
}
1 change: 1 addition & 0 deletions dependencies.yaml
@@ -758,6 +758,7 @@ dependencies:
common:
- output_types: [conda, requirements, pyproject]
packages:
- pynvml>=11.4.1,<12.0.0a0
- rapids-dask-dependency==25.2.*,>=0.0.0a0
run_custreamz:
common:
108 changes: 34 additions & 74 deletions python/cudf/cudf/_lib/csv.pyx
@@ -1,10 +1,6 @@
# Copyright (c) 2020-2024, NVIDIA CORPORATION.

from libcpp cimport bool
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector

cimport pylibcudf.libcudf.types as libcudf_types

@@ -23,16 +19,7 @@ from cudf.core.buffer import acquire_spill_lock

from libcpp cimport bool

from pylibcudf.libcudf.io.csv cimport (
csv_writer_options,
write_csv as cpp_write_csv,
)
from pylibcudf.libcudf.io.data_sink cimport data_sink
from pylibcudf.libcudf.io.types cimport sink_info
from pylibcudf.libcudf.table.table_view cimport table_view

from cudf._lib.io.utils cimport make_sink_info
from cudf._lib.utils cimport data_from_pylibcudf_io, table_view_from_table
from cudf._lib.utils cimport data_from_pylibcudf_io

import pylibcudf as plc

@@ -318,59 +305,40 @@ def write_csv(
--------
cudf.to_csv
"""
cdef table_view input_table_view = table_view_from_table(
table, not index
)
cdef bool include_header_c = header
cdef char delim_c = ord(sep)
cdef string line_term_c = lineterminator.encode()
cdef string na_c = na_rep.encode()
cdef int rows_per_chunk_c = rows_per_chunk
cdef vector[string] col_names
cdef string true_value_c = 'True'.encode()
cdef string false_value_c = 'False'.encode()
cdef unique_ptr[data_sink] data_sink_c
cdef sink_info sink_info_c = make_sink_info(path_or_buf, data_sink_c)

if header is True:
all_names = columns_apply_na_rep(table._column_names, na_rep)
if index is True:
all_names = table._index.names + all_names

if len(all_names) > 0:
col_names.reserve(len(all_names))
if len(all_names) == 1:
if all_names[0] in (None, ''):
col_names.push_back('""'.encode())
else:
col_names.push_back(
str(all_names[0]).encode()
)
else:
for idx, col_name in enumerate(all_names):
if col_name is None:
col_names.push_back(''.encode())
else:
col_names.push_back(
str(col_name).encode()
)

cdef csv_writer_options options = move(
csv_writer_options.builder(sink_info_c, input_table_view)
.names(col_names)
.na_rep(na_c)
.include_header(include_header_c)
.rows_per_chunk(rows_per_chunk_c)
.line_terminator(line_term_c)
.inter_column_delimiter(delim_c)
.true_value(true_value_c)
.false_value(false_value_c)
.build()
)

index_and_not_empty = index is True and table.index is not None
columns = [
col.to_pylibcudf(mode="read") for col in table.index._columns
] if index_and_not_empty else []
columns.extend(col.to_pylibcudf(mode="read") for col in table._columns)
col_names = []
if header:
all_names = list(table.index.names) if index_and_not_empty else []
all_names.extend(
na_rep if name is None or pd.isnull(name)
else name for name in table._column_names
)
col_names = [
'""' if (name in (None, '') and len(all_names) == 1)
else (str(name) if name not in (None, '') else '')
for name in all_names
]
try:
with nogil:
cpp_write_csv(options)
plc.io.csv.write_csv(
(
plc.io.csv.CsvWriterOptions.builder(
plc.io.SinkInfo([path_or_buf]), plc.Table(columns)
)
.names(col_names)
.na_rep(na_rep)
.include_header(header)
.rows_per_chunk(rows_per_chunk)
.line_terminator(str(lineterminator))
.inter_column_delimiter(str(sep))
.true_value("True")
.false_value("False")
.build()
)
)
except OverflowError:
raise OverflowError(
f"Writing CSV file with chunksize={rows_per_chunk} failed. "
@@ -419,11 +387,3 @@ cdef DataType _get_plc_data_type_from_dtype(object dtype) except *:

dtype = cudf.dtype(dtype)
return dtype_to_pylibcudf_type(dtype)


def columns_apply_na_rep(column_names, na_rep):
return tuple(
na_rep if pd.isnull(col_name)
else col_name
for col_name in column_names
)
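The net effect is that write_csv now builds its options entirely through pylibcudf rather than hand-rolled Cython bindings. A rough standalone sketch of the same builder chain in plain Python (write_plain_csv and its arguments are hypothetical; the option values are illustrative):

import pylibcudf as plc

def write_plain_csv(columns, names, path):
    # columns: list of pylibcudf Columns; names: one header string per column.
    options = (
        plc.io.csv.CsvWriterOptions.builder(
            plc.io.SinkInfo([path]), plc.Table(columns)
        )
        .names(names)
        .na_rep("")                  # string written for nulls
        .include_header(True)
        .rows_per_chunk(8)           # tiny chunk size, for illustration only
        .line_terminator("\n")
        .inter_column_delimiter(",")
        .true_value("True")
        .false_value("False")
        .build()
    )
    plc.io.csv.write_csv(options)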
2 changes: 1 addition & 1 deletion python/cudf_polars/cudf_polars/dsl/expressions/sorting.py
@@ -69,7 +69,7 @@ def __init__(
*by: Expr,
) -> None:
self.dtype = dtype
self.options = (options[0], tuple(options[1]), tuple(options[2]))
self.options = options
self.children = (column, *by)

def do_evaluate(
6 changes: 4 additions & 2 deletions python/cudf_polars/cudf_polars/dsl/ir.py
@@ -1599,13 +1599,15 @@ def __init__(self, schema: Schema, name: str, options: Any, df: IR):
# polars requires that all to-explode columns have the
# same sub-shapes
raise NotImplementedError("Explode with more than one column")
self.options = (tuple(to_explode),)
elif self.name == "rename":
old, new, _ = self.options
old, new, strict = self.options
# TODO: perhaps polars should validate renaming in the IR?
if len(new) != len(set(new)) or (
set(new) & (set(df.schema.keys()) - set(old))
):
raise NotImplementedError("Duplicate new names in rename.")
self.options = (tuple(old), tuple(new), strict)
elif self.name == "unpivot":
indices, pivotees, variable_name, value_name = self.options
value_name = "value" if value_name is None else value_name
@@ -1631,7 +1633,7 @@ def __init__(self, schema: Schema, name: str, options: Any, df: IR):
def get_hashable(self) -> Hashable: # pragma: no cover; Needed by experimental
"""Hashable representation of the node."""
schema_hash = tuple(self.schema.items())
return (type(self), schema_hash, self.name, str(self.options), *self.children)
return (type(self), schema_hash, self.name, self.options, *self.children)

@classmethod
def do_evaluate(
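This change and the SortBy change below share one goal: node options must be built from hashable pieces so that get_hashable can return self.options directly instead of stringifying it. A tuple of hashables is hashable; any list hiding inside the options would break hashing. A quick illustration (the option values are made up):

# Tuples of hashables hash fine; an embedded list raises.
normalized = (("a", "b"), ("x", "y"), True)   # as stored after __init__
raw = (["a", "b"], ["x", "y"], True)          # as received from polars

hash(normalized)  # works: every element is hashable
try:
    hash(raw)
except TypeError as exc:
    print(exc)  # unhashable type: 'list'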
3 changes: 2 additions & 1 deletion python/cudf_polars/cudf_polars/dsl/translate.py
@@ -633,9 +633,10 @@ def _(node: pl_expr.Sort, translator: Translator, dtype: plc.DataType) -> expr.E

@_translate_expr.register
def _(node: pl_expr.SortBy, translator: Translator, dtype: plc.DataType) -> expr.Expr:
options = node.sort_options
return expr.SortBy(
dtype,
node.sort_options,
(options[0], tuple(options[1]), tuple(options[2])),
translator.translate_expr(n=node.expr),
*(translator.translate_expr(n=n) for n in node.by),
)
[Diffs for the remaining 14 of the 28 changed files are not shown.]
