diff --git a/src/examples/SubeventExample/SubeventExample.cc b/src/examples/SubeventExample/SubeventExample.cc index f6d5b7a5a..27385a6c7 100644 --- a/src/examples/SubeventExample/SubeventExample.cc +++ b/src/examples/SubeventExample/SubeventExample.cc @@ -4,7 +4,6 @@ #include #include -#include #include #include #include diff --git a/src/libraries/JANA/Engine/JSubeventArrow.h b/src/libraries/JANA/Engine/JSubeventArrow.h index a58cc5d00..b643152ec 100644 --- a/src/libraries/JANA/Engine/JSubeventArrow.h +++ b/src/libraries/JANA/Engine/JSubeventArrow.h @@ -10,7 +10,7 @@ #include "JArrow.h" #include -/// SubtaskProcessor offers sub-event-level parallelism. The idea is to split parent +/// SubeventProcessor offers sub-event-level parallelism. The idea is to split parent /// event S into independent subtasks T, and automatically bundling them with /// bookkeeping information X onto a Queue. process :: T -> U handles the stateless, /// parallel parts; its Arrow pushes messages on to a Queue, so that merge() :: S -> [U] -> V @@ -36,6 +36,21 @@ struct JSubeventProcessor { }; +template +struct SubeventWrapper { + + std::shared_ptr* parent; + SubeventT* data; + size_t id; + size_t total; + + SubeventWrapper(std::shared_ptr* parent, SubeventT* data, size_t id, size_t total) + : parent(std::move(parent)) + , data(data) + , id(id) + , total(total) {} +}; + template class JSubeventArrow : public JArrow { diff --git a/src/libraries/JANA/Engine/JSubeventMailbox.h b/src/libraries/JANA/Engine/JSubeventMailbox.h deleted file mode 100644 index 586b5b5eb..000000000 --- a/src/libraries/JANA/Engine/JSubeventMailbox.h +++ /dev/null @@ -1,226 +0,0 @@ - -// Copyright 2020, Jefferson Science Associates, LLC. -// Subject to the terms in the LICENSE file found in the top-level directory. - - -#ifndef JANA2_JSUBEVENTMAILBOX_H -#define JANA2_JSUBEVENTMAILBOX_H - -#include -#include -#include -#include -#include - -template -struct SubeventWrapper { - - std::shared_ptr* parent; - SubeventT* data; - size_t id; - size_t total; - - SubeventWrapper(std::shared_ptr* parent, SubeventT* data, size_t id, size_t total) - : parent(std::move(parent)) - , data(data) - , id(id) - , total(total) {} -}; - - -template -class JSubeventMailbox { - -private: - - struct alignas(JANA2_CACHE_LINE_BYTES) LocalMailbox { - std::mutex mutex; - std::deque*> ready; - std::map*, size_t> in_progress; - size_t reserved_count = 0; - }; - - size_t m_threshold; - size_t m_locations_count; - bool m_enable_work_stealing = false; - std::unique_ptr m_mailboxes; - JLogger m_logger; - -public: - - enum class Status {Ready, Congested, Empty, Full, Finished}; - - friend std::ostream& operator<<(std::ostream& os, const Status& s) { - switch (s) { - case Status::Ready: os << "Ready"; break; - case Status::Congested: os << "Congested"; break; - case Status::Empty: os << "Empty"; break; - case Status::Full: os << "Full"; break; - case Status::Finished: os << "Finished"; break; - default: os << "Unknown"; break; - } - return os; - } - - - /// threshold: the (soft) maximum number of items in the queue at any time - /// domain_count: the number of domains - /// enable_work_stealing: allow domains to pop from other domains' queues when theirs is empty - JSubeventMailbox(size_t threshold=100, size_t locations_count=1, bool enable_work_stealing=false) - : m_threshold(threshold) - , m_locations_count(locations_count) - , m_enable_work_stealing(enable_work_stealing) { - - m_mailboxes = std::unique_ptr(new LocalMailbox[locations_count]); - } - - virtual ~JSubeventMailbox() { - //delete [] m_mailboxes; - } - - /// size() counts the number of items in the queue across all domains - /// This should be used sparingly because it will mess up a bunch of caches. - /// Meant to be used by measure_perf() - size_t size() { - size_t result = 0; - for (size_t i = 0; i lock(m_mailboxes[i].mutex); - result += m_mailboxes[i].queue.size(); - } - return result; - }; - - /// size(domain) counts the number of items in the queue for a particular domain - /// Meant to be used by Scheduler::next_assignment() and measure_perf(), eventually - size_t size(size_t domain) { - return m_mailboxes[domain].queue.size(); - } - - /// reserve(requested_count) keeps our queues bounded in size. The caller should - /// reserve their desired chunk size on the output queue first. The output - /// queue will return a reservation which is less than or equal to requested_count. - /// The caller may then request as many items from the input queue as have been - /// reserved on the output queue. Note that because the input queue may return - /// fewer items than requested, the caller must push their original reserved_count - /// alongside the items, to avoid a "reservation leak". - size_t reserve(size_t requested_count, size_t domain = 0) { - - LocalMailbox& mb = m_mailboxes[domain]; - std::lock_guard lock(mb.mutex); - size_t doable_count = m_threshold - mb.queue.size() - mb.reserved_count; - if (doable_count > 0) { - size_t reservation = std::min(doable_count, requested_count); - mb.reserved_count += reservation; - return reservation; - } - return 0; - }; - - /// push(items, reserved_count, domain) This function will always - /// succeed, although it may exceed the threshold if the caller didn't reserve - /// space, and it may take a long time because it will wait on a mutex. - /// Note that if the caller had called reserve(), they must pass in the reserved_count here. - Status push(std::vector>& buffer, size_t reserved_count = 0, size_t domain = 0) { - - auto& mb = m_mailboxes[domain]; - std::lock_guard lock(mb.mutex); - mb.reserved_count -= reserved_count; - for (const auto& subevent : buffer) { - - // Problem: Are we sure we are updating the event in a way which is effectively thread-safe? - // Should we be doing this insert, or should the caller? - (*(subevent.parent))->template Insert(subevent.data); - if (subevent.total == 1) { - // Goes straight into "ready" - mb.ready.push_back(subevent.parent); - } - else { - auto pair = mb.in_progress.find(subevent.parent); - if (pair == mb.in_progress.end()) { - mb.in_progress[subevent.parent] = subevent.total-1; - } - else { - if (pair->second == 1) { - mb.ready.push_back(subevent.parent); - } - else { - pair->second -= 1; - } - } - } - } - buffer.clear(); - // if (mb.in_progress.size() > m_threshold) { - // return Status::Full; - // } - return Status::Ready; - } - - - /// pop() will pop up to requested_count items for the desired domain. - /// If many threads are contending for the queue, this will fail with Status::Contention, - /// in which case the caller should probably consult the Scheduler. - Status pop(std::vector*>& buffer, size_t requested_count, size_t location_id = 0) { - - auto& mb = m_mailboxes[location_id]; - if (!mb.mutex.try_lock()) { - return Status::Congested; - } - auto nitems = std::min(requested_count, mb.ready.size()); - buffer.reserve(nitems); - for (size_t i=0; i= m_threshold) { - return Status::Full; - } - else if (size != 0) { - return Status::Ready; - } - return Status::Empty; - } - -#if 0 - Status pop(std::shared_ptr item, bool& success, size_t location_id = 0) { - - success = false; - auto& mb = m_mailboxes[location_id]; - if (!mb.mutex.try_lock()) { - return Status::Congested; - } - size_t nitems = mb.ready.size(); - if (nitems > 1) { - item = std::move(mb.ready.front()); - mb.ready.pop_front(); - success = true; - mb.mutex.unlock(); - return Status::Ready; - } - else if (nitems == 1) { - item = std::move(mb.ready.front()); - mb.ready.pop_front(); - success = true; - mb.mutex.unlock(); - return Status::Empty; - } - else if (is_active()) { - mb.mutex.unlock(); - return Status::Empty; - } - mb.mutex.unlock(); - return Status::Finished; - } - -#endif - - size_t get_threshold() { return m_threshold; } - void set_threshold(size_t threshold) { m_threshold = threshold; } - -}; - - - -#endif //JANA2_JSUBEVENTMAILBOX_H diff --git a/src/programs/unit_tests/SubeventTests.cc b/src/programs/unit_tests/SubeventTests.cc index 92ba32189..6a3331119 100644 --- a/src/programs/unit_tests/SubeventTests.cc +++ b/src/programs/unit_tests/SubeventTests.cc @@ -7,7 +7,6 @@ #include #include -#include #include #include "JANA/Engine/JArrowTopology.h" #include "JANA/Engine/JTopologyBuilder.h" @@ -42,7 +41,7 @@ TEST_CASE("Create subevent processor") { REQUIRE(output->z == 29.6f); } - +#if 0 TEST_CASE("Simplest working SubeventMailbox") { std::vector> unmerged; @@ -123,7 +122,7 @@ TEST_CASE("SubeventMailbox with two overlapping events") { auto items_in_event = (*(merged[0]))->Get(); REQUIRE(items_in_event.size() == 4); } - +#endif TEST_CASE("Basic subevent arrow functionality") {