diff --git a/cpp/include/cudf/detail/gather.cuh b/cpp/include/cudf/detail/gather.cuh index 955f9914632..5b4d1243c49 100644 --- a/cpp/include/cudf/detail/gather.cuh +++ b/cpp/include/cudf/detail/gather.cuh @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -658,10 +659,19 @@ std::unique_ptr gather(table_view const& source_table, { std::vector> destination_columns; - // TODO: Could be beneficial to use streams internally here + // The data gather for n columns will be executed over n streams. If there is + // only a single column, the fork/join overhead should be avoided. + auto const num_columns = source_table.num_columns(); + auto streams = std::vector{}; + if (num_columns > 1) { + streams = cudf::detail::fork_streams(stream, num_columns); + } else { + streams.push_back(stream); + } - for (auto const& source_column : source_table) { - // The data gather for n columns will be put on the first n streams + for (auto i = 0; i < num_columns; ++i) { + CUDF_FUNC_RANGE(); + auto const& source_column = source_table.column(i); destination_columns.push_back( cudf::type_dispatcher(source_column.type(), column_gatherer{}, @@ -669,7 +679,7 @@ std::unique_ptr
gather(table_view const& source_table, gather_map_begin, gather_map_end, bounds_policy == out_of_bounds_policy::NULLIFY, - stream, + streams[i], mr)); } @@ -683,6 +693,10 @@ std::unique_ptr
gather(table_view const& source_table, gather_bitmask(source_table, gather_map_begin, destination_columns, op, stream, mr); } + // Join streams as late as possible so that the gather_bitmask can run on its + // own stream while other streams are gathering. Skip joining if only one + // column, since it used the passed in stream rather than forking. + if (num_columns > 1) { cudf::detail::join_streams(streams, stream); } return std::make_unique
(std::move(destination_columns)); } diff --git a/cpp/include/cudf/detail/utilities/stream_pool.hpp b/cpp/include/cudf/detail/utilities/stream_pool.hpp index 95384a9d73e..70014d18fb8 100644 --- a/cpp/include/cudf/detail/utilities/stream_pool.hpp +++ b/cpp/include/cudf/detail/utilities/stream_pool.hpp @@ -40,7 +40,7 @@ namespace cudf::detail { * auto const num_streams = 2; * // do work on stream * // allocate streams and wait for an event on stream before executing on any of streams - * auto streams = cudf::detail::fork_stream(stream, num_streams); + * auto streams = cudf::detail::fork_streams(stream, num_streams); * // do work on streams[0] and streams[1] * // wait for event on streams before continuing to do work on stream * cudf::detail::join_streams(streams, stream);