Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve parallel_for_each scalability #1445

Merged
merged 4 commits into from
Jul 31, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 23 additions & 21 deletions include/oneapi/tbb/parallel_for_each.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -118,14 +118,17 @@ struct feeder_item_task: public task {
using feeder_type = feeder_impl<Body, Item>;

template <typename ItemType>
feeder_item_task(ItemType&& input_item, feeder_type& feeder, small_object_allocator& alloc) :
feeder_item_task(ItemType&& input_item, feeder_type& feeder, small_object_allocator& alloc, wait_tree_vertex_interface* node) :
kboyarinov marked this conversation as resolved.
Show resolved Hide resolved
item(std::forward<ItemType>(input_item)),
my_feeder(feeder),
my_allocator(alloc)
{}
my_allocator(alloc),
m_wait_tree_vertex(node)
{
m_wait_tree_vertex->reserve();
}

void finalize(const execution_data& ed) {
my_feeder.my_wait_context.release();
m_wait_tree_vertex->release();
my_allocator.delete_object(this, ed);
}

Expand Down Expand Up @@ -160,6 +163,7 @@ struct feeder_item_task: public task {
Item item;
feeder_type& my_feeder;
small_object_allocator my_allocator;
wait_tree_vertex_interface* m_wait_tree_vertex;
}; // class feeder_item_task

/** Implements new task adding procedure.
Expand All @@ -170,9 +174,8 @@ class feeder_impl : public feeder<Item> {
void internal_add_copy_impl(std::true_type, const Item& item) {
using feeder_task = feeder_item_task<Body, Item>;
small_object_allocator alloc;
auto task = alloc.new_object<feeder_task>(item, *this, alloc);
auto task = alloc.new_object<feeder_task>(item, *this, alloc, r1::get_thread_reference_vertex(&my_wait_context));

my_wait_context.reserve();
spawn(*task, my_execution_context);
}

Expand All @@ -187,20 +190,19 @@ class feeder_impl : public feeder<Item> {
void internal_add_move(Item&& item) override {
using feeder_task = feeder_item_task<Body, Item>;
small_object_allocator alloc{};
auto task = alloc.new_object<feeder_task>(std::move(item), *this, alloc);
auto task = alloc.new_object<feeder_task>(std::move(item), *this, alloc, r1::get_thread_reference_vertex(&my_wait_context));

my_wait_context.reserve();
spawn(*task, my_execution_context);
}
public:
feeder_impl(const Body& body, wait_context& w_context, task_group_context &context)
feeder_impl(const Body& body, wait_context_vertex& w_context, task_group_context &context)
: my_body(body),
my_wait_context(w_context)
, my_execution_context(context)
{}

const Body& my_body;
wait_context& my_wait_context;
wait_context_vertex& my_wait_context;
task_group_context& my_execution_context;
}; // class feeder_impl

Expand Down Expand Up @@ -263,7 +265,7 @@ struct input_block_handling_task : public task {
using iteration_task_iterator_type = typename input_iteration_task_iterator_helper<Body, Item>::type;
using iteration_task = for_each_iteration_task<iteration_task_iterator_type, Body, Item>;

input_block_handling_task(wait_context& root_wait_context, task_group_context& e_context,
input_block_handling_task(wait_context_vertex& root_wait_context, task_group_context& e_context,
const Body& body, feeder_impl<Body, Item>* feeder_ptr, small_object_allocator& alloc)
:my_size(0), my_wait_context(0), my_root_wait_context(root_wait_context),
my_execution_context(e_context), my_allocator(alloc)
Expand Down Expand Up @@ -312,7 +314,7 @@ struct input_block_handling_task : public task {
aligned_space<iteration_task, max_block_size> task_pool;
std::size_t my_size;
wait_context my_wait_context;
wait_context& my_root_wait_context;
wait_context_vertex& my_root_wait_context;
task_group_context& my_execution_context;
small_object_allocator my_allocator;
}; // class input_block_handling_task
Expand All @@ -326,7 +328,7 @@ struct forward_block_handling_task : public task {
using iteration_task = for_each_iteration_task<Iterator, Body, Item>;

forward_block_handling_task(Iterator first, std::size_t size,
wait_context& w_context, task_group_context& e_context,
wait_context_vertex& w_context, task_group_context& e_context,
const Body& body, feeder_impl<Body, Item>* feeder_ptr,
small_object_allocator& alloc)
: my_size(size), my_wait_context(0), my_root_wait_context(w_context),
Expand Down Expand Up @@ -373,7 +375,7 @@ struct forward_block_handling_task : public task {
aligned_space<iteration_task, max_block_size> task_pool;
std::size_t my_size;
wait_context my_wait_context;
wait_context& my_root_wait_context;
wait_context_vertex& my_root_wait_context;
task_group_context& my_execution_context;
small_object_allocator my_allocator;
}; // class forward_block_handling_task
Expand Down Expand Up @@ -456,15 +458,15 @@ using feeder_is_required = tbb::detail::void_t<decltype(tbb::detail::invoke(std:
// Creates feeder object only if the body can accept it.
// This is the primary template: it stores no feeder, ignores all of its
// constructor arguments, and feeder_ptr() always returns nullptr. The
// specialization on feeder_is_required<Body, Iterator, Item> is the one that
// actually owns a feeder_impl.
// NOTE(review): this span comes from a rendered diff, so the two constructor
// lines below are presumably the pre-change (wait_context&) and post-change
// (wait_context_vertex&) forms of a single constructor — confirm against the
// real header before relying on both overloads existing.
template <typename Iterator, typename Body, typename Item, typename = void>
struct feeder_holder {
feeder_holder( wait_context&, task_group_context&, const Body& ) {}
feeder_holder( wait_context_vertex&, task_group_context&, const Body& ) {}

// No feeder exists in this variant; callers receive nullptr.
feeder_impl<Body, Item>* feeder_ptr() { return nullptr; }
}; // class feeder_holder

template <typename Iterator, typename Body, typename Item>
class feeder_holder<Iterator, Body, Item, feeder_is_required<Body, Iterator, Item>> {
public:
feeder_holder( wait_context& w_context, task_group_context& context, const Body& body )
feeder_holder( wait_context_vertex& w_context, task_group_context& context, const Body& body )
: my_feeder(body, w_context, context) {}

feeder_impl<Body, Item>* feeder_ptr() { return &my_feeder; }
Expand All @@ -475,7 +477,7 @@ class feeder_holder<Iterator, Body, Item, feeder_is_required<Body, Iterator, Ite
template <typename Iterator, typename Body, typename Item>
class for_each_root_task_base : public task {
public:
for_each_root_task_base(Iterator first, Iterator last, const Body& body, wait_context& w_context, task_group_context& e_context)
for_each_root_task_base(Iterator first, Iterator last, const Body& body, wait_context_vertex& w_context, task_group_context& e_context)
: my_first(first), my_last(last), my_wait_context(w_context), my_execution_context(e_context),
my_body(body), my_feeder_holder(my_wait_context, my_execution_context, my_body)
{
Expand All @@ -489,7 +491,7 @@ class for_each_root_task_base : public task {
protected:
Iterator my_first;
Iterator my_last;
wait_context& my_wait_context;
wait_context_vertex& my_wait_context;
task_group_context& my_execution_context;
const Body& my_body;
feeder_holder<Iterator, Body, Item> my_feeder_holder;
Expand Down Expand Up @@ -624,11 +626,11 @@ void run_parallel_for_each( Iterator first, Iterator last, const Body& body, tas
{
if (!(first == last)) {
using ItemType = get_item_type<Body, typename std::iterator_traits<Iterator>::value_type>;
wait_context w_context(0);
wait_context_vertex w_context(0);

for_each_root_task<Iterator, Body, ItemType> root_task(first, last, body, w_context, context);

execute_and_wait(root_task, context, w_context, context);
execute_and_wait(root_task, context, w_context.get_context(), context);
}
}

Expand Down
Loading