Skip to content

Commit

Permalink
treewide: use coroutine::parallel_for_each(range) when appropriate
Browse files Browse the repository at this point in the history
`coroutine::parallel_for_each` accepts both a range and a pair of
iterators. let's use the former when appropriate. it is simpler this way.

Signed-off-by: Kefu Chai <[email protected]>

Closes scylladb#21684
  • Loading branch information
tchaikov authored and avikivity committed Nov 27, 2024
1 parent 20bbb11 commit 5e391ee
Show file tree
Hide file tree
Showing 8 changed files with 11 additions and 11 deletions.
4 changes: 2 additions & 2 deletions db/system_keyspace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1687,7 +1687,7 @@ future<> system_keyspace::drop_truncation_rp_records() {
auto rs = co_await execute_cql(req);

bool any = false;
co_await coroutine::parallel_for_each(rs->begin(), rs->end(), [&] (const cql3::untyped_result_set_row& row) -> future<> {
co_await coroutine::parallel_for_each(*rs, [&] (const cql3::untyped_result_set_row& row) -> future<> {
auto table_uuid = table_id(row.get_as<utils::UUID>("table_uuid"));
auto shard = row.get_as<int32_t>("shard");
auto segment_id = row.get_as<int64_t>("segment_id");
Expand Down Expand Up @@ -1740,7 +1740,7 @@ future<> system_keyspace::drop_all_commitlog_cleanup_records() {
sstring req = format("SELECT shard from system.{}", COMMITLOG_CLEANUPS);
auto rs = co_await execute_cql(req);

co_await coroutine::parallel_for_each(rs->begin(), rs->end(), [&] (const cql3::untyped_result_set_row& row) -> future<> {
co_await coroutine::parallel_for_each(*rs, [&] (const cql3::untyped_result_set_row& row) -> future<> {
auto shard = row.get_as<int32_t>("shard");
co_await execute_cql(format("DELETE FROM system.{} WHERE shard = {}", COMMITLOG_CLEANUPS, shard));
});
Expand Down
2 changes: 1 addition & 1 deletion direct_failure_detector/failure_detector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ future<> endpoint_worker::notify_fiber() noexcept {
endpoint_liveness.marked_alive = alive;

try {
co_await coroutine::parallel_for_each(listeners.begin(), listeners.end(), [this, endpoint = _id, alive] (const listener_info& listener) {
co_await coroutine::parallel_for_each(listeners, [this, endpoint = _id, alive] (const listener_info& listener) {
return _fd._parent.container().invoke_on(listener.shard, [listener = listener.id, endpoint, alive] (failure_detector& fd) {
return fd._impl->mark(listener, endpoint, alive);
});
Expand Down
2 changes: 1 addition & 1 deletion gms/gossiper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2097,7 +2097,7 @@ future<> gossiper::do_shadow_round(std::unordered_set<gms::inet_address> nodes,

for (;;) {
size_t nodes_down = 0;
co_await coroutine::parallel_for_each(nodes.begin(), nodes.end(), [this, &request, &responses, &nodes_talked, &nodes_down] (gms::inet_address node) -> future<> {
co_await coroutine::parallel_for_each(nodes, [this, &request, &responses, &nodes_talked, &nodes_down] (gms::inet_address node) -> future<> {
logger.debug("Sent get_endpoint_states request to {}, request={}", node, request.application_states);
try {
auto response = co_await ser::gossip_rpc_verbs::send_gossip_get_endpoint_states(&_messaging, msg_addr(node), netw::messaging_service::clock_type::now() + std::chrono::seconds(5), request);
Expand Down
6 changes: 3 additions & 3 deletions replica/database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ do_parse_schema_tables(distributed<service::storage_proxy>& proxy, const sstring
auto keyspace_name = r.template get_nonnull<sstring>("keyspace_name");
names.emplace(keyspace_name);
}
co_await coroutine::parallel_for_each(names.begin(), names.end(), [&] (sstring name) mutable -> future<> {
co_await coroutine::parallel_for_each(names, [&] (sstring name) mutable -> future<> {
if (is_system_keyspace(name)) {
co_return;
}
Expand Down Expand Up @@ -724,7 +724,7 @@ future<> database::parse_system_tables(distributed<service::storage_proxy>& prox
batch.commit();
co_await do_parse_schema_tables(proxy, db::schema_tables::TABLES, coroutine::lambda([&] (schema_result_value_type &v) -> future<> {
std::map<sstring, schema_ptr> tables = co_await create_tables_from_tables_partition(proxy, v.second);
co_await coroutine::parallel_for_each(tables.begin(), tables.end(), [&] (auto& t) -> future<> {
co_await coroutine::parallel_for_each(tables, [&] (auto& t) -> future<> {
co_await this->add_column_family_and_make_directory(t.second, replica::database::is_new_cf::no);
auto s = t.second;
// Recreate missing column mapping entries in case
Expand All @@ -738,7 +738,7 @@ future<> database::parse_system_tables(distributed<service::storage_proxy>& prox
}));
co_await do_parse_schema_tables(proxy, db::schema_tables::VIEWS, coroutine::lambda([&] (schema_result_value_type &v) -> future<> {
std::vector<view_ptr> views = co_await create_views_from_schema_partition(proxy, v.second);
co_await coroutine::parallel_for_each(views.begin(), views.end(), [&] (auto&& v) -> future<> {
co_await coroutine::parallel_for_each(views, [&] (auto&& v) -> future<> {
check_no_legacy_secondary_index_mv_schema(*this, v, nullptr);
co_await this->add_column_family_and_make_directory(v, replica::database::is_new_cf::no);
});
Expand Down
2 changes: 1 addition & 1 deletion service/mapreduce_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ future<query::mapreduce_result> mapreduce_service::dispatch(query::mapreduce_req
retrying_dispatcher dispatcher(*this, tr_state);
query::mapreduce_result result;

co_await coroutine::parallel_for_each(vnodes_per_addr.begin(), vnodes_per_addr.end(),
co_await coroutine::parallel_for_each(vnodes_per_addr,
[&] (std::pair<const netw::messaging_service::msg_addr, dht::partition_range_vector>& vnodes_with_addr) -> future<> {
netw::messaging_service::msg_addr addr = vnodes_with_addr.first;
query::mapreduce_result& result_ = result;
Expand Down
2 changes: 1 addition & 1 deletion service/migration_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ future<> migration_manager::announce_without_raft(std::vector<mutation> schema,
_messaging.knows_version(endpoint) &&
_messaging.get_raw_version(endpoint) == netw::messaging_service::current_version;
});
co_await coroutine::parallel_for_each(live_members.begin(), live_members.end(),
co_await coroutine::parallel_for_each(live_members,
std::bind(std::mem_fn(&migration_manager::push_schema_mutation), this, std::placeholders::_1, schema));
} catch (...) {
mlogger.error("failed to announce migration to all nodes: {}", std::current_exception());
Expand Down
2 changes: 1 addition & 1 deletion service/qos/service_level_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ future<> service_level_controller::drop_distributed_service_level(sstring name,
auto& role_manager = _auth_service.local().underlying_role_manager();
auto attributes = co_await role_manager.query_attribute_for_all("service_level");

co_await coroutine::parallel_for_each(attributes.begin(), attributes.end(), [&role_manager, name, &mc] (auto&& attr) {
co_await coroutine::parallel_for_each(attributes, [&role_manager, name, &mc] (auto&& attr) {
if (attr.second == name) {
return do_with(attr.first, [&role_manager, &mc] (const sstring& role_name) {
return role_manager.remove_attribute(role_name, "service_level", mc);
Expand Down
2 changes: 1 addition & 1 deletion test/boost/reader_concurrency_semaphore_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1330,7 +1330,7 @@ class allocating_reader {
if (_admission_fut) {
co_await std::move(_admission_fut).value();
}
co_await coroutine::parallel_for_each(_pending_resource_units.begin(), _pending_resource_units.end(), [] (future<reader_permit::resource_units>& fut) {
co_await coroutine::parallel_for_each(_pending_resource_units, [] (future<reader_permit::resource_units>& fut) {
return std::move(fut).then_wrapped([] (future<reader_permit::resource_units>&& fut) {
try {
fut.get();
Expand Down

0 comments on commit 5e391ee

Please sign in to comment.