diff --git a/src/libraries/JANA/Topology/JArrow.h b/src/libraries/JANA/Topology/JArrow.h index caa06d2b1..09ced8640 100644 --- a/src/libraries/JANA/Topology/JArrow.h +++ b/src/libraries/JANA/Topology/JArrow.h @@ -10,7 +10,7 @@ #include #include #include -#include +#include #ifndef JANA2_ARROWDATA_MAX_SIZE @@ -36,7 +36,7 @@ class JArrow { // This is usable by subclasses. JLogger m_logger; friend class JTopologyBuilder; - std::vector m_places; // Will eventually supplant m_listeners, m_chunksize + std::vector m_places; // Will eventually supplant m_listeners public: std::string get_name() { return m_name; } @@ -113,7 +113,7 @@ struct Place { this->is_queue = true; } - void set_pool(JPool* pool) { + void set_pool(JEventPool* pool) { assert(pool != nullptr); this->place_ref = pool; this->is_queue = false; @@ -138,7 +138,7 @@ struct Place { return (data.item_count >= min_item_count); } else { - auto pool = static_cast*>(place_ref); + auto pool = static_cast(place_ref); data.item_count = pool->pop(data.items.data(), min_item_count, max_item_count, data.location_id); data.reserve_count = 0; return (data.item_count >= min_item_count); @@ -169,7 +169,7 @@ struct Place { } else { if (is_input) { - auto pool = static_cast*>(place_ref); + auto pool = static_cast(place_ref); pool->push(data.items.data(), data.item_count, false, data.location_id); } } @@ -185,7 +185,7 @@ struct Place { return is_input ? 0 : data.item_count; } else { - auto pool = static_cast*>(place_ref); + auto pool = static_cast(place_ref); pool->push(data.items.data(), data.item_count, !is_input, data.location_id); data.item_count = 0; data.reserve_count = 0; diff --git a/src/libraries/JANA/Topology/JPool.h b/src/libraries/JANA/Topology/JEventPool.h similarity index 62% rename from src/libraries/JANA/Topology/JPool.h rename to src/libraries/JANA/Topology/JEventPool.h index e7aa3bc39..504fbde2b 100644 --- a/src/libraries/JANA/Topology/JPool.h +++ b/src/libraries/JANA/Topology/JEventPool.h @@ -1,74 +1,83 @@ -// Copyright 2023, Jefferson Science Associates, LLC. + +// Copyright 2020, Jefferson Science Associates, LLC. // Subject to the terms in the LICENSE file found in the top-level directory. + #pragma once -#include -#include + +#include +#include +#include #include #include -class JPoolBase { -protected: - size_t m_pool_size; - size_t m_location_count; - bool m_limit_total_events_in_flight; -public: - JPoolBase( - size_t pool_size, - size_t location_count, - bool limit_total_events_in_flight) - : m_pool_size(pool_size) - , m_location_count(location_count) - , m_limit_total_events_in_flight(limit_total_events_in_flight) {} - - virtual ~JPoolBase() = default; -}; - -template -class JPool : public JPoolBase { +class JEventPool { private: struct alignas(JANA2_CACHE_LINE_BYTES) LocalPool { std::mutex mutex; - std::vector available_items; - std::vector items; + std::vector*> available_items; + std::vector> items; }; std::unique_ptr m_pools; + size_t m_pool_size; + size_t m_location_count; + bool m_limit_total_events_in_flight; + + std::shared_ptr m_component_manager; + JEventLevel m_level; + + public: - JPool(size_t pool_size, - size_t location_count, - bool limit_total_events_in_flight) : JPoolBase(pool_size, location_count, limit_total_events_in_flight) - { + inline JEventPool(std::shared_ptr component_manager, + size_t pool_size, + size_t location_count, + bool limit_total_events_in_flight, + JEventLevel level = JEventLevel::PhysicsEvent) + + : m_pool_size(pool_size) + , m_location_count(location_count) + , m_limit_total_events_in_flight(limit_total_events_in_flight) + , m_component_manager(component_manager) + , m_level(level) { + assert(m_location_count >= 1); assert(m_pool_size > 0 || !m_limit_total_events_in_flight); } - virtual ~JPool() = default; void init() { m_pools = std::unique_ptr(new LocalPool[m_location_count]()); for (size_t j=0; j(m_pool_size); // Default-construct everything in place + m_pools[j].items = std::vector>(m_pool_size); // Default-construct everything in place - for (T& item : m_pools[j].items) { + for (auto& item : m_pools[j].items) { configure_item(&item); m_pools[j].available_items.push_back(&item); } } } - virtual void configure_item(T*) { + void configure_item(std::shared_ptr* item) { + (*item) = std::make_shared(); + m_component_manager->configure_event(**item); + item->get()->SetLevel(m_level); // This needs to happen _after_ configure_event } - virtual void release_item(T*) { + void release_item(std::shared_ptr* item) { + if (auto source = (*item)->GetJEventSource()) source->DoFinish(**item); + (*item)->mFactorySet->Release(); + (*item)->mInspector.Reset(); + (*item)->GetJCallGraphRecorder()->Reset(); + (*item)->Reset(); } - T* get(size_t location=0) { + std::shared_ptr* get(size_t location=0) { assert(m_pools != nullptr); // If you hit this, you forgot to call init(). LocalPool& pool = m_pools[location % m_location_count]; @@ -79,20 +88,20 @@ class JPool : public JPoolBase { return nullptr; } else { - auto t = new T; + auto t = new std::shared_ptr(); configure_item(t); return t; } } else { - T* item = pool.available_items.back(); + std::shared_ptr* item = pool.available_items.back(); pool.available_items.pop_back(); return item; } } - void put(T* item, bool release, size_t location) { + void put(std::shared_ptr* item, bool release, size_t location) { assert(m_pools != nullptr); // If you hit this, you forgot to call init(). @@ -118,7 +127,7 @@ class JPool : public JPoolBase { } - size_t pop(T** dest, size_t min_count, size_t max_count, size_t location) { + size_t pop(std::shared_ptr** dest, size_t min_count, size_t max_count, size_t location) { assert(m_pools != nullptr); // If you hit this, you forgot to call init(). @@ -135,7 +144,7 @@ class JPool : public JPoolBase { // Return as many as we can. We aren't allowed to create any more size_t count = std::min(available_count, max_count); for (size_t i=0; i* t = pool.available_items.back(); pool.available_items.pop_back(); dest[i] = t; } @@ -147,13 +156,13 @@ class JPool : public JPoolBase { size_t i=0; for (i=0; i* t = pool.available_items.back(); pool.available_items.pop_back(); dest[i] = t; } for (; i; configure_item(t); dest[i] = t; } @@ -161,7 +170,7 @@ class JPool : public JPoolBase { } } - void push(T** source, size_t count, bool release, size_t location) { + void push(std::shared_ptr** source, size_t count, bool release, size_t location) { for (size_t i=0; i -#include #include #include diff --git a/src/libraries/JANA/Topology/JEventSourceArrow.cc b/src/libraries/JANA/Topology/JEventSourceArrow.cc index 3f3f7a77a..094008a56 100644 --- a/src/libraries/JANA/Topology/JEventSourceArrow.cc +++ b/src/libraries/JANA/Topology/JEventSourceArrow.cc @@ -6,7 +6,6 @@ #include #include #include -#include diff --git a/src/libraries/JANA/Topology/JEventSourceArrow.h b/src/libraries/JANA/Topology/JEventSourceArrow.h index b61cee968..a4e5957ce 100644 --- a/src/libraries/JANA/Topology/JEventSourceArrow.h +++ b/src/libraries/JANA/Topology/JEventSourceArrow.h @@ -23,13 +23,13 @@ class JEventSourceArrow : public JArrow { void set_input(JMailbox* queue) { m_input.set_queue(queue); } - void set_input(JPool* pool) { + void set_input(JEventPool* pool) { m_input.set_pool(pool); } void set_output(JMailbox* queue) { m_output.set_queue(queue); } - void set_output(JPool* pool) { + void set_output(JEventPool* pool) { m_output.set_pool(pool); } diff --git a/src/libraries/JANA/Topology/JEventTapArrow.cc b/src/libraries/JANA/Topology/JEventTapArrow.cc index 0bbc96252..7be832c9d 100644 --- a/src/libraries/JANA/Topology/JEventTapArrow.cc +++ b/src/libraries/JANA/Topology/JEventTapArrow.cc @@ -3,7 +3,6 @@ #include -#include #include #include #include diff --git a/src/libraries/JANA/Topology/JFoldArrow.h b/src/libraries/JANA/Topology/JFoldArrow.h index 462e88728..e662aa444 100644 --- a/src/libraries/JANA/Topology/JFoldArrow.h +++ b/src/libraries/JANA/Topology/JFoldArrow.h @@ -4,7 +4,6 @@ #pragma once #include -#include #include class JFoldArrow : public JArrow { diff --git a/src/libraries/JANA/Topology/JJunctionArrow.h b/src/libraries/JANA/Topology/JJunctionArrow.h index 611e4ae00..be8ca8292 100644 --- a/src/libraries/JANA/Topology/JJunctionArrow.h +++ b/src/libraries/JANA/Topology/JJunctionArrow.h @@ -5,7 +5,7 @@ #include #include -#include +#include diff --git a/src/libraries/JANA/Topology/JPipelineArrow.h b/src/libraries/JANA/Topology/JPipelineArrow.h index 63f6983bf..f48ea14f1 100644 --- a/src/libraries/JANA/Topology/JPipelineArrow.h +++ b/src/libraries/JANA/Topology/JPipelineArrow.h @@ -6,7 +6,7 @@ #include #include -#include +#include using MessageT = std::shared_ptr; @@ -27,13 +27,13 @@ class JPipelineArrow : public JArrow { void set_input(JMailbox* queue) { m_input.set_queue(queue); } - void set_input(JPool* pool) { + void set_input(JEventPool* pool) { m_input.set_pool(pool); } void set_output(JMailbox* queue) { m_output.set_queue(queue); } - void set_output(JPool* pool) { + void set_output(JEventPool* pool) { m_output.set_pool(pool); } diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc index 54f0a5548..b0c7cb67e 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.cc +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -52,7 +52,7 @@ std::string JTopologyBuilder::print_topology() { i += 1; } // Build index lookup for pools - for (JPoolBase* pool : pools) { + for (JEventPool* pool : pools) { lookup[pool] = i; i += 1; } diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.h b/src/libraries/JANA/Topology/JTopologyBuilder.h index af1e29d19..7ee9996c3 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.h +++ b/src/libraries/JANA/Topology/JTopologyBuilder.h @@ -7,7 +7,7 @@ #include #include #include -#include +#include #include // TODO: Should't be here #include @@ -18,7 +18,6 @@ class JParameterManager; class JComponentManager; class JArrow; class JQueue; -class JPoolBase; class JQueue; class JFoldArrow; class JUnfoldArrow; @@ -32,7 +31,7 @@ class JTopologyBuilder : public JService { // The topology itself std::vector arrows; std::vector queues; // Queues shared between arrows - std::vector pools; // Pools shared between arrows + std::vector pools; // Pools shared between arrows // Topology configuration size_t m_pool_capacity = 4; diff --git a/src/libraries/JANA/Topology/JUnfoldArrow.h b/src/libraries/JANA/Topology/JUnfoldArrow.h index 585d03c91..a5befde01 100644 --- a/src/libraries/JANA/Topology/JUnfoldArrow.h +++ b/src/libraries/JANA/Topology/JUnfoldArrow.h @@ -5,7 +5,6 @@ #include #include -#include class JUnfoldArrow : public JArrow { private: diff --git a/src/libraries/JANA/Utils/JEventPool.h b/src/libraries/JANA/Utils/JEventPool.h deleted file mode 100644 index 3a3d6ebc5..000000000 --- a/src/libraries/JANA/Utils/JEventPool.h +++ /dev/null @@ -1,44 +0,0 @@ - -// Copyright 2020, Jefferson Science Associates, LLC. -// Subject to the terms in the LICENSE file found in the top-level directory. - - -#pragma once - -#include -#include -#include - - -class JEventPool : public JPool> { - - std::shared_ptr m_component_manager; - JEventLevel m_level; - -public: - inline JEventPool(std::shared_ptr component_manager, - size_t pool_size, - size_t location_count, - bool limit_total_events_in_flight, - JEventLevel level = JEventLevel::PhysicsEvent) - : JPool(pool_size, location_count, limit_total_events_in_flight) - , m_component_manager(component_manager) - , m_level(level) { - } - - void configure_item(std::shared_ptr* item) override { - (*item) = std::make_shared(); - m_component_manager->configure_event(**item); - item->get()->SetLevel(m_level); // This needs to happen _after_ configure_event - } - - void release_item(std::shared_ptr* item) override { - if (auto source = (*item)->GetJEventSource()) source->DoFinish(**item); - (*item)->mFactorySet->Release(); - (*item)->mInspector.Reset(); - (*item)->GetJCallGraphRecorder()->Reset(); - (*item)->Reset(); - } -}; - - diff --git a/src/programs/unit_tests/Engine/SchedulerTests.cc b/src/programs/unit_tests/Engine/SchedulerTests.cc index 8ec711a1c..68cac604d 100644 --- a/src/programs/unit_tests/Engine/SchedulerTests.cc +++ b/src/programs/unit_tests/Engine/SchedulerTests.cc @@ -99,13 +99,17 @@ TEST_CASE("SchedulerTests") { } TEST_CASE("SchedulerRoundRobinBehaviorTests") { + + JApplication app; + app.Initialize(); + auto jcm = app.GetService(); auto q1 = new JMailbox(); auto q2 = new JMailbox(); auto q3 = new JMailbox(); - auto p1 = new JPool(0,1,false); - auto p2 = new JPool(0,1,false); + auto p1 = new JEventPool(jcm, 0,1,false); + auto p2 = new JEventPool(jcm, 0,1,false); p1->init(); p2->init(); diff --git a/src/programs/unit_tests/Topology/ArrowTests.cc b/src/programs/unit_tests/Topology/ArrowTests.cc index f9102851e..c5613b86b 100644 --- a/src/programs/unit_tests/Topology/ArrowTests.cc +++ b/src/programs/unit_tests/Topology/ArrowTests.cc @@ -2,7 +2,6 @@ #include #include #include -#include namespace jana { namespace arrowtests { @@ -17,9 +16,9 @@ using EventT = std::shared_ptr; struct TestJunctionArrow : public JJunctionArrow { TestJunctionArrow(JMailbox* qi, - JPool* pi, - JPool* pd, - JMailbox* qd) + JEventPool* pi, + JEventPool* pd, + JMailbox* qd) : JJunctionArrow("testjunctionarrow", false, false, true) { first_input.set_queue(qi); diff --git a/src/programs/unit_tests/Topology/JPoolTests.cc b/src/programs/unit_tests/Topology/JPoolTests.cc index a5fd69013..85cc10fa4 100644 --- a/src/programs/unit_tests/Topology/JPoolTests.cc +++ b/src/programs/unit_tests/Topology/JPoolTests.cc @@ -1,92 +1,86 @@ +#include "JANA/JApplicationFwd.h" +#include "JANA/Services/JComponentManager.h" #include -#include +#include namespace jana { namespace jpooltests { -struct Event { - int x=3; - double y=7.8; - bool* was_dtor_called = nullptr; - - ~Event() { - if (was_dtor_called != nullptr) { - *was_dtor_called = true; - } - } -}; TEST_CASE("JPoolTests_SingleLocationLimitEvents") { + JApplication app; + app.Initialize(); + auto jcm = app.GetService(); - JPool pool(3, 1, true); + JEventPool pool(jcm, 3, 1, true); pool.init(); - Event* e = pool.get(0); + auto* e = pool.get(0); REQUIRE(e != nullptr); - REQUIRE(e->x == 3); + REQUIRE((*e)->GetEventNumber() == 0); // Will segfault if not initialized - Event* f = pool.get(0); + auto* f = pool.get(0); REQUIRE(f != nullptr); - REQUIRE(f->x == 3); + REQUIRE((*f)->GetEventNumber() == 0); - Event* g = pool.get(0); + auto* g = pool.get(0); REQUIRE(g != nullptr); - REQUIRE(g->x == 3); + REQUIRE((*g)->GetEventNumber() == 0); - Event* h = pool.get(0); + auto* h = pool.get(0); REQUIRE(h == nullptr); - f->x = 5; + (*f)->SetEventNumber(5); pool.put(f, true, 0); h = pool.get(0); REQUIRE(h != nullptr); - REQUIRE(h->x == 5); + REQUIRE((*h)->GetEventNumber() == 5); } TEST_CASE("JPoolTests_SingleLocationUnlimitedEvents") { + JApplication app; + app.Initialize(); + auto jcm = app.GetService(); - bool was_dtor_called = false; - JPool pool(3, 1, false); + JEventPool pool(jcm, 3, 1, false); pool.init(); - Event* e = pool.get(0); - e->was_dtor_called = &was_dtor_called; + auto* e = pool.get(0); REQUIRE(e != nullptr); - REQUIRE(e->x == 3); + REQUIRE((*e)->GetEventNumber() == 0); - Event* f = pool.get(0); - f->was_dtor_called = &was_dtor_called; + auto* f = pool.get(0); + std::weak_ptr f_weak = *f; REQUIRE(f != nullptr); - REQUIRE(f->x == 3); + REQUIRE((*f)->GetEventNumber() == 0); - Event* g = pool.get(0); - g->was_dtor_called = &was_dtor_called; + auto* g = pool.get(0); + std::weak_ptr g_weak = *g; REQUIRE(g != nullptr); - REQUIRE(g->x == 3); + REQUIRE((*g)->GetEventNumber() == 0); - Event* h = pool.get(0); + auto* h = pool.get(0); // Unlike the others, h is allocated on the heap - h->x = 9; - h->was_dtor_called = &was_dtor_called; + (*h)->SetEventNumber(9); + std::weak_ptr h_weak = *h; REQUIRE(h != nullptr); - REQUIRE(g->x == 3); - f->x = 5; + (*f)->SetEventNumber(5); pool.put(f, true, 0); // f goes back into the pool, so dtor does not get called - REQUIRE(was_dtor_called == false); + REQUIRE(f_weak.lock() != nullptr); pool.put(h, true, 0); // h's dtor DOES get called - REQUIRE(was_dtor_called == true); + REQUIRE(h_weak.lock() == nullptr); // When we retrieve another event, it comes from the pool // So we get what used to be f - Event* i = pool.get(0); + auto* i = pool.get(0); REQUIRE(i != nullptr); - REQUIRE(i->x == 5); + REQUIRE((*i)->GetEventNumber() == 5); } diff --git a/src/programs/unit_tests/Topology/TestTopologyComponents.h b/src/programs/unit_tests/Topology/TestTopologyComponents.h index 85e9c8ad3..c4df0e46f 100644 --- a/src/programs/unit_tests/Topology/TestTopologyComponents.h +++ b/src/programs/unit_tests/Topology/TestTopologyComponents.h @@ -20,7 +20,7 @@ struct RandIntArrow : public JPipelineArrow { size_t emit_count = 0; // How many emitted so far int emit_sum = 0; // Sum of all ints emitted so far - RandIntArrow(std::string name, JPool* pool, JMailbox* output_queue) + RandIntArrow(std::string name, JEventPool* pool, JMailbox* output_queue) : JPipelineArrow(name, false, true, false) { this->set_input(pool); this->set_output(output_queue); @@ -90,7 +90,7 @@ struct SumArrow : public JPipelineArrow { double sum = 0; - SumArrow(std::string name, JMailbox* input_queue, JPool* pool) + SumArrow(std::string name, JMailbox* input_queue, JEventPool* pool) : JPipelineArrow(name, false, false, true) { this->set_input(input_queue); this->set_output(pool);