diff --git a/cpp/include/cudf/detail/gather.cuh b/cpp/include/cudf/detail/gather.cuh
index 5b4d1243c49..cebcd7fc3ac 100644
--- a/cpp/include/cudf/detail/gather.cuh
+++ b/cpp/include/cudf/detail/gather.cuh
@@ -657,31 +657,32 @@ std::unique_ptr<table> gather(table_view const& source_table,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
- std::vector<std::unique_ptr<column>> destination_columns;
+ auto const num_columns = source_table.num_columns();
+ auto result = std::vector<std::unique_ptr<column>>(num_columns);
// The data gather for n columns will be executed over n streams. If there is
// only a single column, the fork/join overhead should be avoided.
- auto const num_columns = source_table.num_columns();
- auto streams = std::vector<rmm::cuda_stream_view>{};
+ auto streams = std::vector<rmm::cuda_stream_view>{};
if (num_columns > 1) {
streams = cudf::detail::fork_streams(stream, num_columns);
} else {
streams.push_back(stream);
}
- for (auto i = 0; i < num_columns; ++i) {
- CUDF_FUNC_RANGE();
+ auto it = thrust::make_counting_iterator(0);
+
+ std::transform(it, it + num_columns, result.begin(), [&](size_type i) {
auto const& source_column = source_table.column(i);
- destination_columns.push_back(
- cudf::type_dispatcher(source_column.type(),
- column_gatherer{},
- source_column,
- gather_map_begin,
- gather_map_end,
- bounds_policy == out_of_bounds_policy::NULLIFY,
- streams[i],
- mr));
- }
+ return cudf::type_dispatcher(
+ source_column.type(),
+ column_gatherer{},
+ source_column,
+ gather_map_begin,
+ gather_map_end,
+ bounds_policy == out_of_bounds_policy::NULLIFY,
+ streams[i],
+ mr);
+ });
auto const nullable = bounds_policy == out_of_bounds_policy::NULLIFY ||
std::any_of(source_table.begin(), source_table.end(), [](auto const& col) {
@@ -690,14 +691,15 @@ std::unique_ptr<table> gather(table_view const& source_table,
if (nullable) {
auto const op = bounds_policy == out_of_bounds_policy::NULLIFY ? gather_bitmask_op::NULLIFY
: gather_bitmask_op::DONT_CHECK;
- gather_bitmask(source_table, gather_map_begin, destination_columns, op, stream, mr);
+ gather_bitmask(source_table, gather_map_begin, result, op, stream, mr);
}
- // Join streams as late as possible so that the gather_bitmask can run on its
- // own stream while other streams are gathering. Skip joining if only one
- // column, since it used the passed in stream rather than forking.
+ // Join streams as late as possible so that null mask computations can run on
+ // the passed in stream while other streams are gathering. Skip joining if
+ // only one column, since it used the passed in stream rather than forking.
if (num_columns > 1) { cudf::detail::join_streams(streams, stream); }
- return std::make_unique<table>(std::move(destination_columns));
+
+ return std::make_unique<table>(std::move(result));
}
} // namespace detail
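
Reviewer note, not part of the patch: `cudf::detail::fork_streams(stream, n)` hands back n worker streams that are ordered after everything already enqueued on `stream`, and `cudf::detail::join_streams(streams, stream)` makes `stream` wait on all of the workers before any downstream work runs. The standalone sketch below approximates those semantics with raw CUDA events purely for illustration; it is not cudf's implementation (which draws workers from a stream pool), and every `*_demo` name is made up.

```cpp
// Illustrative sketch only: approximates the fork/join semantics used above
// with raw CUDA events. Not cudf's implementation; *_demo names are invented.
#include <cuda_runtime.h>
#include <vector>

// "Fork": record an event on the main stream and make each worker stream wait
// on it, so worker work is ordered after everything already enqueued on main.
std::vector<cudaStream_t> fork_streams_demo(cudaStream_t main_stream, int count)
{
  cudaEvent_t ev;
  cudaEventCreateWithFlags(&ev, cudaEventDisableTiming);
  cudaEventRecord(ev, main_stream);

  std::vector<cudaStream_t> workers(count);
  for (auto& s : workers) {
    cudaStreamCreate(&s);
    cudaStreamWaitEvent(s, ev, 0);
  }
  // Safe to destroy here: recorded events are released once the device completes them,
  // and cudaStreamWaitEvent captured the dependency at call time.
  cudaEventDestroy(ev);
  return workers;
}

// "Join": the main stream waits on an event recorded on each worker, so work
// enqueued on main after the join runs only once all per-column work is done.
void join_streams_demo(std::vector<cudaStream_t> const& workers, cudaStream_t main_stream)
{
  for (auto s : workers) {
    cudaEvent_t ev;
    cudaEventCreateWithFlags(&ev, cudaEventDisableTiming);
    cudaEventRecord(ev, s);
    cudaStreamWaitEvent(main_stream, ev, 0);
    cudaEventDestroy(ev);
  }
}

int main()
{
  cudaStream_t main_stream;
  cudaStreamCreate(&main_stream);

  auto workers = fork_streams_demo(main_stream, 4);
  // ... per-column work would be launched on workers[i] here ...
  join_streams_demo(workers, main_stream);

  cudaStreamSynchronize(main_stream);
  for (auto s : workers) { cudaStreamDestroy(s); }
  cudaStreamDestroy(main_stream);
  return 0;
}
```

Because both fork and join are expressed as stream-ordered waits, the host never blocks; that is also why the single-column path can skip the fork/join entirely and just reuse the caller's stream.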
diff --git a/cpp/include/cudf/detail/scatter.cuh b/cpp/include/cudf/detail/scatter.cuh
index dbf7bfa9527..844988036ca 100644
--- a/cpp/include/cudf/detail/scatter.cuh
+++ b/cpp/include/cudf/detail/scatter.cuh
@@ -21,6 +21,7 @@
#include
#include
#include
+#include <cudf/detail/utilities/stream_pool.hpp>
#include
#include
#include
@@ -405,22 +406,32 @@ std::unique_ptr<table> scatter(table_view const& source,
thrust::make_transform_iterator(scatter_map_begin, index_converter<MapType>{target.num_rows()});
auto updated_scatter_map_end =
thrust::make_transform_iterator(scatter_map_end, index_converter<MapType>{target.num_rows()});
- auto result = std::vector<std::unique_ptr<column>>(target.num_columns());
-
- std::transform(source.begin(),
- source.end(),
- target.begin(),
- result.begin(),
- [=](auto const& source_col, auto const& target_col) {
- return type_dispatcher(source_col.type(),
- column_scatterer{},
- source_col,
- updated_scatter_map_begin,
- updated_scatter_map_end,
- target_col,
- stream,
- mr);
- });
+
+ auto const num_columns = target.num_columns();
+ auto result = std::vector<std::unique_ptr<column>>(num_columns);
+
+ // The data scatter for n columns will be executed over n streams. If there is
+ // only a single column, the fork/join overhead should be avoided.
+ auto streams = std::vector<rmm::cuda_stream_view>{};
+ if (num_columns > 1) {
+ streams = cudf::detail::fork_streams(stream, num_columns);
+ } else {
+ streams.push_back(stream);
+ }
+
+ auto it = thrust::make_counting_iterator(0);
+
+ std::transform(it, it + num_columns, result.begin(), [&](size_type i) {
+ auto const& source_col = source.column(i);
+ return type_dispatcher(source_col.type(),
+ column_scatterer{},
+ source_col,
+ updated_scatter_map_begin,
+ updated_scatter_map_end,
+ target.column(i),
+ streams[i],
+ mr);
+ });
// We still need to call `gather_bitmask` even when the source columns are not nullable,
// as if the target has null_mask, that null_mask needs to be updated after scattering.
@@ -451,6 +462,12 @@ std::unique_ptr<table> scatter(table_view const& source,
}
});
}
+
+ // Join streams as late as possible so that null mask computations can run on
+ // the passed in stream while other streams are scattering. Skip joining if
+ // only one column, since it used the passed in stream rather than forking.
+ if (num_columns > 1) { cudf::detail::join_streams(streams, stream); }
+
return std::make_unique<table>(std::move(result));
}
} // namespace detail
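
Both files now share the same orchestration shape: fork once, run column i's work on `streams[i]` via `std::transform` over a counting iterator, keep the shared null-mask pass on the caller's stream, and join as late as possible. Below is a minimal standalone CUDA sketch of that shape; the toy `scale_kernel` merely stands in for `column_gatherer` / `column_scatterer`, and every name in it is hypothetical.

```cpp
// Standalone illustration of the fork / per-column transform / join shape used
// in the gather and scatter changes above. The per-column work here is a toy
// kernel, not cudf's column_gatherer/column_scatterer; all names are made up.
#include <cuda_runtime.h>
#include <thrust/iterator/counting_iterator.h>
#include <algorithm>
#include <vector>

// Toy per-column kernel; stands in for the real per-column gather/scatter work.
__global__ void scale_kernel(float* data, int n, float factor)
{
  int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n) { data[i] *= factor; }
}

int main()
{
  constexpr int num_columns = 4;
  constexpr int rows        = 1 << 20;

  cudaStream_t main_stream;
  cudaStreamCreate(&main_stream);

  // One device buffer per "column" (left uninitialized; values don't matter here).
  std::vector<float*> columns(num_columns);
  for (auto& col : columns) { cudaMalloc(&col, rows * sizeof(float)); }

  // Fork: each worker stream waits for work already enqueued on main_stream.
  cudaEvent_t forked;
  cudaEventCreateWithFlags(&forked, cudaEventDisableTiming);
  cudaEventRecord(forked, main_stream);
  std::vector<cudaStream_t> streams(num_columns);
  for (auto& s : streams) {
    cudaStreamCreate(&s);
    cudaStreamWaitEvent(s, forked, 0);
  }

  // Per-column work, mirroring the std::transform-over-counting-iterator shape
  // of the diff: column i runs on streams[i], so launches overlap across columns.
  auto it = thrust::make_counting_iterator(0);
  std::vector<cudaError_t> launch_status(num_columns);
  std::transform(it, it + num_columns, launch_status.begin(), [&](int i) {
    scale_kernel<<<(rows + 255) / 256, 256, 0, streams[i]>>>(columns[i], rows, 2.0f);
    return cudaGetLastError();
  });

  // Work enqueued on main_stream here (the gather_bitmask analogue) can still
  // overlap with the per-column kernels, because the join happens last.

  // Join: main_stream waits on every worker before downstream work runs.
  for (auto s : streams) {
    cudaEvent_t done;
    cudaEventCreateWithFlags(&done, cudaEventDisableTiming);
    cudaEventRecord(done, s);
    cudaStreamWaitEvent(main_stream, done, 0);
    cudaEventDestroy(done);
  }

  cudaStreamSynchronize(main_stream);
  for (auto col : columns) { cudaFree(col); }
  for (auto s : streams) { cudaStreamDestroy(s); }
  cudaEventDestroy(forked);
  cudaStreamDestroy(main_stream);
  return 0;
}
```

The late join is the point of the "Join streams as late as possible" comments in the diff: the null-mask work placed on the caller's stream before the join can proceed concurrently with the per-column kernels on the forked streams.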