Skip to content

Commit

Permalink
Merge branch 'branch-24.12' into skip-polars-failing-test
Browse files Browse the repository at this point in the history
  • Loading branch information
vyasr authored Nov 20, 2024
2 parents ddbd289 + 05365af commit b3750c4
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 254 deletions.
43 changes: 29 additions & 14 deletions cpp/src/join/distinct_hash_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
#include <cub/block/block_scan.cuh>
#include <cuco/static_set.cuh>
#include <thrust/fill.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_output_iterator.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/sequence.h>

#include <cstddef>
Expand Down Expand Up @@ -79,14 +81,9 @@ class build_keys_fn {

/**
* @brief Device output transform functor to construct `size_type` with `cuco::pair<hash_value_type,
* lhs_index_type>` or `cuco::pair<hash_value_type, rhs_index_type>`
* rhs_index_type>`
*/
struct output_fn {
__device__ constexpr cudf::size_type operator()(
cuco::pair<hash_value_type, lhs_index_type> const& x) const
{
return static_cast<cudf::size_type>(x.second);
}
__device__ constexpr cudf::size_type operator()(
cuco::pair<hash_value_type, rhs_index_type> const& x) const
{
Expand Down Expand Up @@ -176,15 +173,33 @@ distinct_hash_join<HasNested>::inner_join(rmm::cuda_stream_view stream,
auto const iter = cudf::detail::make_counting_transform_iterator(
0, build_keys_fn<decltype(d_probe_hasher), lhs_index_type>{d_probe_hasher});

auto const build_indices_begin =
thrust::make_transform_output_iterator(build_indices->begin(), output_fn{});
auto const probe_indices_begin =
thrust::make_transform_output_iterator(probe_indices->begin(), output_fn{});

auto const [probe_indices_end, _] = this->_hash_table.retrieve(
iter, iter + probe_table_num_rows, probe_indices_begin, build_indices_begin, {stream.value()});
auto found_indices = rmm::device_uvector<size_type>(probe_table_num_rows, stream);
auto const found_begin =
thrust::make_transform_output_iterator(found_indices.begin(), output_fn{});

// TODO conditional find for nulls once `cuco::static_set::find_if` is added
// If `idx` is within the range `[0, probe_table_num_rows)` and `found_indices[idx]` is not equal
// to `JoinNoneValue`, then `idx` has a match in the hash set.
this->_hash_table.find_async(iter, iter + probe_table_num_rows, found_begin, stream.value());

auto const tuple_iter = cudf::detail::make_counting_transform_iterator(
0,
cuda::proclaim_return_type<thrust::tuple<size_type, size_type>>(
[found_iter = found_indices.begin()] __device__(size_type idx) {
return thrust::tuple{*(found_iter + idx), idx};
}));
auto const output_begin =
thrust::make_zip_iterator(build_indices->begin(), probe_indices->begin());
auto const output_end =
thrust::copy_if(rmm::exec_policy_nosync(stream),
tuple_iter,
tuple_iter + probe_table_num_rows,
found_indices.begin(),
output_begin,
cuda::proclaim_return_type<bool>(
[] __device__(size_type idx) { return idx != JoinNoneValue; }));
auto const actual_size = std::distance(output_begin, output_end);

auto const actual_size = std::distance(probe_indices_begin, probe_indices_end);
build_indices->resize(actual_size, stream);
probe_indices->resize(actual_size, stream);

Expand Down
10 changes: 2 additions & 8 deletions python/cudf/cudf/_lib/transform.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
from numba.np import numpy_support

import cudf
from cudf.core._internals.expressions import parse_expression
from cudf.core.buffer import acquire_spill_lock, as_buffer
from cudf.utils import cudautils

from pylibcudf cimport transform as plc_transform
from pylibcudf.expressions cimport Expression
from pylibcudf.libcudf.types cimport size_type

from cudf._lib.column cimport Column
Expand Down Expand Up @@ -93,7 +91,7 @@ def one_hot_encode(Column input_column, Column categories):


@acquire_spill_lock()
def compute_column(list columns, tuple column_names, expr: str):
def compute_column(list columns, tuple column_names, str expr):
"""Compute a new column by evaluating an expression on a set of columns.
Parameters
Expand All @@ -108,12 +106,8 @@ def compute_column(list columns, tuple column_names, expr: str):
expr : str
The expression to evaluate.
"""
visitor = parse_expression(expr, column_names)

# At the end, all the stack contains is the expression to evaluate.
cdef Expression cudf_expr = visitor.expression
result = plc_transform.compute_column(
plc.Table([col.to_pylibcudf(mode="read") for col in columns]),
cudf_expr,
plc.expressions.to_expression(expr, column_names),
)
return Column.from_pylibcudf(result)
229 changes: 0 additions & 229 deletions python/cudf/cudf/core/_internals/expressions.py

This file was deleted.

11 changes: 8 additions & 3 deletions python/custreamz/custreamz/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,14 @@ def read_gdf(
"parquet": cudf.io.read_parquet,
}

result = cudf_readers[message_format](
kafka_datasource, engine="cudf", lines=True
)
if message_format == "json":
result = cudf_readers[message_format](
kafka_datasource, engine="cudf", lines=True
)
else:
result = cudf_readers[message_format](
kafka_datasource, engine="cudf"
)

# Close up the cudf datasource instance
# TODO: Ideally the C++ destructor should handle the
Expand Down
2 changes: 2 additions & 0 deletions python/pylibcudf/pylibcudf/expressions.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,5 @@ class Operation(Expression):
left: Expression,
right: Expression | None = None,
): ...

def to_expression(expr: str, column_names: tuple[str, ...]) -> Expression: ...
Loading

0 comments on commit b3750c4

Please sign in to comment.