Skip to content

Commit

Permalink
Use stream pool for gather.
Browse files Browse the repository at this point in the history
  • Loading branch information
bdice committed Sep 21, 2023
1 parent e87d2fc commit 7d5e527
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
22 changes: 18 additions & 4 deletions cpp/include/cudf/detail/gather.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/utilities/assert.cuh>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/detail/valid_if.cuh>
#include <cudf/dictionary/dictionary_column_view.hpp>
Expand Down Expand Up @@ -658,18 +659,27 @@ std::unique_ptr<table> gather(table_view const& source_table,
{
std::vector<std::unique_ptr<column>> destination_columns;

// TODO: Could be beneficial to use streams internally here
// The data gather for n columns will be executed over n streams. If there is
// only a single column, the fork/join overhead should be avoided.
auto const num_columns = source_table.num_columns();
auto streams = std::vector<rmm::cuda_stream_view>{};
if (num_columns > 1) {
streams = cudf::detail::fork_streams(stream, num_columns);
} else {
streams.push_back(stream);
}

for (auto const& source_column : source_table) {
// The data gather for n columns will be put on the first n streams
for (auto i = 0; i < num_columns; ++i) {
CUDF_FUNC_RANGE();
auto const& source_column = source_table.column(i);
destination_columns.push_back(
cudf::type_dispatcher<dispatch_storage_type>(source_column.type(),
column_gatherer{},
source_column,
gather_map_begin,
gather_map_end,
bounds_policy == out_of_bounds_policy::NULLIFY,
stream,
streams[i],
mr));
}

Expand All @@ -683,6 +693,10 @@ std::unique_ptr<table> gather(table_view const& source_table,
gather_bitmask(source_table, gather_map_begin, destination_columns, op, stream, mr);
}

// Join streams as late as possible so that gather_bitmask can run on the
// passed-in stream while the forked streams are still gathering. Skip joining
// if there is only one column, since that case used the passed-in stream
// rather than forking.
if (num_columns > 1) { cudf::detail::join_streams(streams, stream); }
return std::make_unique<table>(std::move(destination_columns));
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/include/cudf/detail/utilities/stream_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace cudf::detail {
* auto const num_streams = 2;
* // do work on stream
* // allocate streams and wait for an event on stream before executing on any of streams
* auto streams = cudf::detail::fork_stream(stream, num_streams);
* auto streams = cudf::detail::fork_streams(stream, num_streams);
* // do work on streams[0] and streams[1]
* // wait for event on streams before continuing to do work on stream
* cudf::detail::join_streams(streams, stream);
Expand Down

0 comments on commit 7d5e527

Please sign in to comment.