Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.2.x] Add forward iterator to async_for_each #16689

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 53 additions & 16 deletions src/v/ssx/async_algorithm.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,27 +82,62 @@ static ssize_t remaining(const C& c) {
*/
constexpr ssize_t FIXED_COST = 1;

template<typename I>
struct iter_size {
I iter;
ssize_t count;
};

/**
* A mix of for_each and for_each_n: iterates from begin to end, or until
* limit elements have been visited, whichever comes first, applying f to
* each element.
*
* Returns the number of elements visited as well as the iterator to the first
* unvisited element. This can be implemented more efficiently with random
* access iterators since we can calculate the exact end iterator up front and
* so do an efficient loop with a single sentinel. The forward iterator version
* must increment an count in the loop and check both end iterator and counter
* as the termination condition.
*/
template<std::random_access_iterator I, typename Fn>
iter_size<I> for_each_limit(const I begin, const I end, ssize_t limit, Fn f) {
auto chunk_size = std::min(limit, end - begin);
I chunk_end = begin + chunk_size;
std::for_each(begin, chunk_end, std::move(f));
return {chunk_end, chunk_size};
}

template<std::forward_iterator I, typename Fn>
iter_size<I> for_each_limit(const I begin, const I end, ssize_t limit, Fn f) {
ssize_t count = 0;
auto i = begin;
while (i != end && count < limit) {
f(*i);
++i;
++count;
}
return {i, count};
}

template<
typename Traits,
typename Counter,
typename Fn,
std::random_access_iterator Iterator>
std::forward_iterator Iterator>
ss::future<>
async_for_each_coro(Counter counter, Iterator begin, Iterator end, Fn f) {
do {
auto chunk_size = std::min(remaining<Traits>(counter), end - begin);
Iterator chunk_end = begin + chunk_size;
std::for_each(begin, chunk_end, f);
begin = chunk_end;
counter.count += chunk_size;
auto new_begin = for_each_limit(
begin, end, remaining<Traits>(counter), f);
begin = new_begin.iter;
counter.count += new_begin.count;
if (counter.count >= Traits::interval) {
co_await ss::coroutine::maybe_yield();
counter.count = 0;
Traits::yield_called();
}
} while (begin != end);

counter.count += FIXED_COST;
}

/**
Expand All @@ -112,21 +147,23 @@ template<
typename Traits = async_algo_traits,
typename Counter,
typename Fn,
std::random_access_iterator Iterator>
std::forward_iterator Iterator>
ss::future<>
async_for_each_fast(Counter counter, Iterator begin, Iterator end, Fn f) {
// This first part is an important optimization: if the input range is small
// enough, we don't want to create a coroutine frame as that's costly, so
// this function is not coroutine and we do the whole iteration here (as we
// won't yield), otherwise we defer to the coroutine-based helper.
if (auto total_size = (end - begin) + FIXED_COST;
total_size <= detail::remaining<Traits>(counter)) {
std::for_each(begin, end, std::move(f));
counter.count += total_size;

ssize_t limit = detail::remaining<Traits>(counter);
auto new_begin = for_each_limit(begin, end, limit, f);
counter.count += new_begin.count + FIXED_COST;
if (new_begin.iter == end && counter.count < Traits::interval) [[likely]] {
return ss::make_ready_future();
}

return async_for_each_coro<Traits>(counter, begin, end, std::move(f));
return async_for_each_coro<Traits>(
counter, new_begin.iter, end, std::move(f));
}

} // namespace detail
Expand All @@ -152,7 +189,7 @@ async_for_each_fast(Counter counter, Iterator begin, Iterator end, Fn f) {
template<
typename Traits = async_algo_traits,
typename Fn,
std::random_access_iterator Iterator>
std::forward_iterator Iterator>
ss::future<> async_for_each(Iterator begin, Iterator end, Fn f) {
return async_for_each_fast<Traits>(
detail::internal_counter{}, begin, end, std::move(f));
Expand Down Expand Up @@ -208,7 +245,7 @@ ss::future<> async_for_each(Iterator begin, Iterator end, Fn f) {
template<
typename Traits = async_algo_traits,
typename Fn,
std::random_access_iterator Iterator>
std::forward_iterator Iterator>
ss::future<> async_for_each_counter(
async_counter& counter, Iterator begin, Iterator end, Fn f) {
return detail::async_for_each_fast<Traits>(
Expand Down
Loading