diff --git a/src/v/ssx/async_algorithm.h b/src/v/ssx/async_algorithm.h index 8b1d819e8ebec..5de8c137fe70a 100644 --- a/src/v/ssx/async_algorithm.h +++ b/src/v/ssx/async_algorithm.h @@ -82,27 +82,62 @@ static ssize_t remaining(const C& c) { */ constexpr ssize_t FIXED_COST = 1; +template +struct iter_size { + I iter; + ssize_t count; +}; + +/** + * A mix of for_each and for_each_n: iterates from begin to end, or until + * limit elements have been visited, whichever comes first, applying f to + * each element. + * + * Returns the number of elements visited as well as the iterator to the first + * unvisited element. This can be implemented more efficiently with random + * access iterators since we can calculate the exact end iterator up front and + * so do an efficient loop with a single sentinel. The forward iterator version + * must increment an count in the loop and check both end iterator and counter + * as the termination condition. + */ +template +iter_size for_each_limit(const I begin, const I end, ssize_t limit, Fn f) { + auto chunk_size = std::min(limit, end - begin); + I chunk_end = begin + chunk_size; + std::for_each(begin, chunk_end, std::move(f)); + return {chunk_end, chunk_size}; +} + +template +iter_size for_each_limit(const I begin, const I end, ssize_t limit, Fn f) { + ssize_t count = 0; + auto i = begin; + while (i != end && count < limit) { + f(*i); + ++i; + ++count; + } + return {i, count}; +} + template< typename Traits, typename Counter, typename Fn, - std::random_access_iterator Iterator> + std::forward_iterator Iterator> ss::future<> async_for_each_coro(Counter counter, Iterator begin, Iterator end, Fn f) { do { - auto chunk_size = std::min(remaining(counter), end - begin); - Iterator chunk_end = begin + chunk_size; - std::for_each(begin, chunk_end, f); - begin = chunk_end; - counter.count += chunk_size; + auto new_begin = for_each_limit( + begin, end, remaining(counter), f); + begin = new_begin.iter; + counter.count += new_begin.count; if (counter.count >= Traits::interval) { co_await ss::coroutine::maybe_yield(); counter.count = 0; Traits::yield_called(); } } while (begin != end); - - counter.count += FIXED_COST; } /** @@ -112,21 +147,23 @@ template< typename Traits = async_algo_traits, typename Counter, typename Fn, - std::random_access_iterator Iterator> + std::forward_iterator Iterator> ss::future<> async_for_each_fast(Counter counter, Iterator begin, Iterator end, Fn f) { // This first part is an important optimization: if the input range is small // enough, we don't want to create a coroutine frame as that's costly, so // this function is not coroutine and we do the whole iteration here (as we // won't yield), otherwise we defer to the coroutine-based helper. - if (auto total_size = (end - begin) + FIXED_COST; - total_size <= detail::remaining(counter)) { - std::for_each(begin, end, std::move(f)); - counter.count += total_size; + + ssize_t limit = detail::remaining(counter); + auto new_begin = for_each_limit(begin, end, limit, f); + counter.count += new_begin.count + FIXED_COST; + if (new_begin.iter == end && counter.count < Traits::interval) [[likely]] { return ss::make_ready_future(); } - return async_for_each_coro(counter, begin, end, std::move(f)); + return async_for_each_coro( + counter, new_begin.iter, end, std::move(f)); } } // namespace detail @@ -152,7 +189,7 @@ async_for_each_fast(Counter counter, Iterator begin, Iterator end, Fn f) { template< typename Traits = async_algo_traits, typename Fn, - std::random_access_iterator Iterator> + std::forward_iterator Iterator> ss::future<> async_for_each(Iterator begin, Iterator end, Fn f) { return async_for_each_fast( detail::internal_counter{}, begin, end, std::move(f)); @@ -208,7 +245,7 @@ ss::future<> async_for_each(Iterator begin, Iterator end, Fn f) { template< typename Traits = async_algo_traits, typename Fn, - std::random_access_iterator Iterator> + std::forward_iterator Iterator> ss::future<> async_for_each_counter( async_counter& counter, Iterator begin, Iterator end, Fn f) { return detail::async_for_each_fast(