diff --git a/src/examples/SubeventCUDAExample/SubeventCUDAExample.cu b/src/examples/SubeventCUDAExample/SubeventCUDAExample.cu index 16ea6eee5..cc2b2cd2b 100644 --- a/src/examples/SubeventCUDAExample/SubeventCUDAExample.cu +++ b/src/examples/SubeventCUDAExample/SubeventCUDAExample.cu @@ -4,10 +4,10 @@ #include #include -#include #include #include #include "JANA/Engine/JTopologyBuilder.h" +#include struct MyInput : public JObject { @@ -128,8 +128,7 @@ int main() { JMailbox > subevents_out; auto split_arrow = new JSplitArrow("split", &processor, &events_in, &subevents_in); - auto subprocess_arrow = new JSubeventArrow("subprocess", &processor, &subevents_in, - &subevents_out); + auto subprocess_arrow = new JSubeventArrow("subprocess", &processor, &subevents_in, &subevents_out); auto merge_arrow = new JMergeArrow("merge", &processor, &subevents_out, &events_out); JApplication app; @@ -140,12 +139,14 @@ int main() { source->SetNEvents(10); // limit ourselves to 10 events. Note that the 'jana:nevents' param won't work // here because we aren't using JComponentManager to manage the EventSource - auto topology = app.GetService()->create_empty(); - auto source_arrow = new JEventSourceArrow("simpleSource", - {source}, - &events_in, - topology->event_pool); - auto proc_arrow = new JEventProcessorArrow("simpleProcessor", &events_out, nullptr, topology->event_pool); + auto topology = app.GetService(); + auto source_arrow = new JEventSourceArrow("simpleSource", {source}); + source_arrow->set_input(topology->event_pool); + source_arrow->set_output(&events_in); + + auto proc_arrow = new JEventMapArrow("simpleProcessor"); + proc_arrow->set_input(&events_out); + proc_arrow->set_output(topology->event_pool); proc_arrow->add_processor(new SimpleProcessor); topology->arrows.push_back(source_arrow); diff --git a/src/examples/SubeventExample/SubeventExample.cc b/src/examples/SubeventExample/SubeventExample.cc index 8617fef82..cef877c02 100644 --- a/src/examples/SubeventExample/SubeventExample.cc +++ b/src/examples/SubeventExample/SubeventExample.cc @@ -8,9 +8,9 @@ #include #include -#include +#include #include -#include "JANA/Topology/JTopologyBuilder.h" +#include struct MyInput : public JObject { @@ -111,7 +111,7 @@ int main() { source_arrow->set_input(topology->event_pool); source_arrow->set_output(&events_in); - auto proc_arrow = new JEventProcessorArrow("simpleProcessor"); + auto proc_arrow = new JEventMapArrow("simpleProcessor"); proc_arrow->set_input(&events_out); proc_arrow->set_output(topology->event_pool); proc_arrow->add_processor(new SimpleProcessor); diff --git a/src/libraries/JANA/CMakeLists.txt b/src/libraries/JANA/CMakeLists.txt index bac884cab..627e54a86 100644 --- a/src/libraries/JANA/CMakeLists.txt +++ b/src/libraries/JANA/CMakeLists.txt @@ -17,7 +17,6 @@ set(JANA2_SOURCES Engine/JPerfMetrics.cc Engine/JPerfSummary.cc - Topology/JEventProcessorArrow.cc Topology/JEventSourceArrow.cc Topology/JEventMapArrow.cc Topology/JEventTapArrow.cc diff --git a/src/libraries/JANA/Topology/JArrow.h b/src/libraries/JANA/Topology/JArrow.h index f47c7329d..09ced8640 100644 --- a/src/libraries/JANA/Topology/JArrow.h +++ b/src/libraries/JANA/Topology/JArrow.h @@ -10,22 +10,24 @@ #include #include #include -#include +#include #ifndef JANA2_ARROWDATA_MAX_SIZE #define JANA2_ARROWDATA_MAX_SIZE 10 #endif -struct PlaceRefBase; +struct Place; + +using EventT = std::shared_ptr; class JArrow { private: - const std::string m_name; // Used for human understanding - const bool m_is_parallel; // Whether or not it is safe to parallelize - const bool m_is_source; // Whether or not this arrow should activate/drain the topology - bool m_is_sink; // Whether or not tnis arrow contributes to the final event count - JArrowMetrics m_metrics; // Performance information accumulated over all workers + std::string m_name; // Used for human understanding + bool m_is_parallel; // Whether or not it is safe to parallelize + bool m_is_source; // Whether or not this arrow should activate/drain the topology + bool m_is_sink; // Whether or not tnis arrow contributes to the final event count + JArrowMetrics m_metrics; // Performance information accumulated over all workers friend class JScheduler; std::vector m_listeners; // Downstream Arrows @@ -34,27 +36,22 @@ class JArrow { // This is usable by subclasses. JLogger m_logger; friend class JTopologyBuilder; - std::vector m_places; // Will eventually supplant m_listeners, m_chunksize + std::vector m_places; // Will eventually supplant m_listeners public: + std::string get_name() { return m_name; } + JLogger& get_logger() { return m_logger; } bool is_parallel() { return m_is_parallel; } bool is_source() { return m_is_source; } bool is_sink() { return m_is_sink; } + JArrowMetrics& get_metrics() { return m_metrics; } - std::string get_name() { return m_name; } - - void set_logger(JLogger logger) { - m_logger = logger; - } + void set_name(std::string name) { m_name = name; } + void set_logger(JLogger logger) { m_logger = logger; } + void set_is_parallel(bool is_parallel) { m_is_parallel = is_parallel; } + void set_is_source(bool is_source) { m_is_source = is_source; } + void set_is_sink(bool is_sink) { m_is_sink = is_sink; } - void set_is_sink(bool is_sink) { - m_is_sink = is_sink; - } - - // TODO: Metrics should be encapsulated so that only actions are to update, clear, or summarize - JArrowMetrics& get_metrics() { - return m_metrics; - } JArrow(std::string name, bool is_parallel, bool is_source, bool is_sink) : m_name(std::move(name)), m_is_parallel(is_parallel), m_is_source(is_source), m_is_sink(is_sink) { @@ -77,16 +74,15 @@ class JArrow { m_listeners.push_back(downstream); }; - void attach(PlaceRefBase* place) { + void attach(Place* place) { if (std::find(m_places.begin(), m_places.end(), place) == m_places.end()) { m_places.push_back(place); } }; }; -template struct Data { - std::array items; + std::array items; size_t item_count = 0; size_t reserve_count = 0; size_t location_id; @@ -96,20 +92,14 @@ struct Data { } }; -struct PlaceRefBase { +struct Place { void* place_ref = nullptr; bool is_queue = true; bool is_input = false; size_t min_item_count = 1; size_t max_item_count = 1; - virtual size_t get_pending() { return 0; } -}; - -template -struct PlaceRef : public PlaceRefBase { - - PlaceRef(JArrow* parent, bool is_input, size_t min_item_count, size_t max_item_count) { + Place(JArrow* parent, bool is_input, size_t min_item_count, size_t max_item_count) { assert(parent != nullptr); parent->attach(this); this->is_input = is_input; @@ -117,38 +107,38 @@ struct PlaceRef : public PlaceRefBase { this->max_item_count = max_item_count; } - void set_queue(JMailbox* queue) { + void set_queue(JMailbox* queue) { assert(queue != nullptr); this->place_ref = queue; this->is_queue = true; } - void set_pool(JPool* pool) { + void set_pool(JEventPool* pool) { assert(pool != nullptr); this->place_ref = pool; this->is_queue = false; } - size_t get_pending() override { + size_t get_pending() { assert(place_ref != nullptr); if (is_input && is_queue) { - auto queue = static_cast*>(place_ref); + auto queue = static_cast*>(place_ref); return queue->size(); } return 0; } - bool pull(Data& data) { + bool pull(Data& data) { assert(place_ref != nullptr); if (is_input) { // Actually pull the data if (is_queue) { - auto queue = static_cast*>(place_ref); + auto queue = static_cast*>(place_ref); data.item_count = queue->pop_and_reserve(data.items.data(), min_item_count, max_item_count, data.location_id); data.reserve_count = data.item_count; return (data.item_count >= min_item_count); } else { - auto pool = static_cast*>(place_ref); + auto pool = static_cast(place_ref); data.item_count = pool->pop(data.items.data(), min_item_count, max_item_count, data.location_id); data.reserve_count = 0; return (data.item_count >= min_item_count); @@ -158,7 +148,7 @@ struct PlaceRef : public PlaceRefBase { if (is_queue) { // Reserve a space on the output queue data.item_count = 0; - auto queue = static_cast*>(place_ref); + auto queue = static_cast*>(place_ref); data.reserve_count = queue->reserve(min_item_count, max_item_count, data.location_id); return (data.reserve_count >= min_item_count); } @@ -171,31 +161,31 @@ struct PlaceRef : public PlaceRefBase { } } - void revert(Data& data) { + void revert(Data& data) { assert(place_ref != nullptr); if (is_queue) { - auto queue = static_cast*>(place_ref); + auto queue = static_cast*>(place_ref); queue->push_and_unreserve(data.items.data(), data.item_count, data.reserve_count, data.location_id); } else { if (is_input) { - auto pool = static_cast*>(place_ref); + auto pool = static_cast(place_ref); pool->push(data.items.data(), data.item_count, false, data.location_id); } } } - size_t push(Data& data) { + size_t push(Data& data) { assert(place_ref != nullptr); if (is_queue) { - auto queue = static_cast*>(place_ref); + auto queue = static_cast*>(place_ref); queue->push_and_unreserve(data.items.data(), data.item_count, data.reserve_count, data.location_id); data.item_count = 0; data.reserve_count = 0; return is_input ? 0 : data.item_count; } else { - auto pool = static_cast*>(place_ref); + auto pool = static_cast(place_ref); pool->push(data.items.data(), data.item_count, !is_input, data.location_id); data.item_count = 0; data.reserve_count = 0; @@ -206,7 +196,7 @@ struct PlaceRef : public PlaceRefBase { inline size_t JArrow::get_pending() { size_t sum = 0; - for (PlaceRefBase* place : m_places) { + for (Place* place : m_places) { sum += place->get_pending(); } return sum; diff --git a/src/libraries/JANA/Topology/JEventMapArrow.cc b/src/libraries/JANA/Topology/JEventMapArrow.cc index dc4b48936..dfefbef3d 100644 --- a/src/libraries/JANA/Topology/JEventMapArrow.cc +++ b/src/libraries/JANA/Topology/JEventMapArrow.cc @@ -25,7 +25,7 @@ void JEventMapArrow::add_processor(JEventProcessor* processor) { m_procs.push_back(processor); } -void JEventMapArrow::process(Event* event, bool& success, JArrowMetrics::Status& status) { +void JEventMapArrow::process(std::shared_ptr* event, bool& success, JArrowMetrics::Status& status) { LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END; for (JEventSource* source : m_sources) { diff --git a/src/libraries/JANA/Topology/JEventMapArrow.h b/src/libraries/JANA/Topology/JEventMapArrow.h index e1c0a0343..742213854 100644 --- a/src/libraries/JANA/Topology/JEventMapArrow.h +++ b/src/libraries/JANA/Topology/JEventMapArrow.h @@ -11,10 +11,8 @@ class JEventUnfolder; class JEventProcessor; class JEvent; -using Event = std::shared_ptr; -using EventQueue = JMailbox; -class JEventMapArrow : public JPipelineArrow { +class JEventMapArrow : public JPipelineArrow { private: std::vector m_sources; @@ -28,7 +26,7 @@ class JEventMapArrow : public JPipelineArrow { void add_unfolder(JEventUnfolder* unfolder); void add_processor(JEventProcessor* proc); - void process(Event* event, bool& success, JArrowMetrics::Status& status); + void process(std::shared_ptr* event, bool& success, JArrowMetrics::Status& status); void initialize() final; void finalize() final; diff --git a/src/libraries/JANA/Topology/JPool.h b/src/libraries/JANA/Topology/JEventPool.h similarity index 61% rename from src/libraries/JANA/Topology/JPool.h rename to src/libraries/JANA/Topology/JEventPool.h index e7aa3bc39..8871b1251 100644 --- a/src/libraries/JANA/Topology/JPool.h +++ b/src/libraries/JANA/Topology/JEventPool.h @@ -1,74 +1,80 @@ -// Copyright 2023, Jefferson Science Associates, LLC. + +// Copyright 2020, Jefferson Science Associates, LLC. // Subject to the terms in the LICENSE file found in the top-level directory. + #pragma once -#include -#include + +#include +#include +#include #include #include -class JPoolBase { -protected: - size_t m_pool_size; - size_t m_location_count; - bool m_limit_total_events_in_flight; -public: - JPoolBase( - size_t pool_size, - size_t location_count, - bool limit_total_events_in_flight) - : m_pool_size(pool_size) - , m_location_count(location_count) - , m_limit_total_events_in_flight(limit_total_events_in_flight) {} - - virtual ~JPoolBase() = default; -}; - -template -class JPool : public JPoolBase { +class JEventPool { private: struct alignas(JANA2_CACHE_LINE_BYTES) LocalPool { std::mutex mutex; - std::vector available_items; - std::vector items; + std::vector*> available_items; + std::vector> items; }; std::unique_ptr m_pools; + size_t m_pool_size; + size_t m_location_count; + bool m_limit_total_events_in_flight; + + std::shared_ptr m_component_manager; + JEventLevel m_level; + + public: - JPool(size_t pool_size, - size_t location_count, - bool limit_total_events_in_flight) : JPoolBase(pool_size, location_count, limit_total_events_in_flight) - { + inline JEventPool(std::shared_ptr component_manager, + size_t pool_size, + size_t location_count, + bool limit_total_events_in_flight, + JEventLevel level = JEventLevel::PhysicsEvent) + + : m_pool_size(pool_size) + , m_location_count(location_count) + , m_limit_total_events_in_flight(limit_total_events_in_flight) + , m_component_manager(component_manager) + , m_level(level) { + assert(m_location_count >= 1); assert(m_pool_size > 0 || !m_limit_total_events_in_flight); - } - - virtual ~JPool() = default; - void init() { m_pools = std::unique_ptr(new LocalPool[m_location_count]()); for (size_t j=0; j(m_pool_size); // Default-construct everything in place + m_pools[j].items = std::vector>(m_pool_size); // Default-construct everything in place - for (T& item : m_pools[j].items) { + for (auto& item : m_pools[j].items) { configure_item(&item); m_pools[j].available_items.push_back(&item); } } } - virtual void configure_item(T*) { + void configure_item(std::shared_ptr* item) { + (*item) = std::make_shared(); + m_component_manager->configure_event(**item); + item->get()->SetLevel(m_level); // This needs to happen _after_ configure_event } - virtual void release_item(T*) { + void release_item(std::shared_ptr* item) { + if (auto source = (*item)->GetJEventSource()) source->DoFinish(**item); + (*item)->mFactorySet->Release(); + (*item)->mInspector.Reset(); + (*item)->GetJCallGraphRecorder()->Reset(); + (*item)->Reset(); } - T* get(size_t location=0) { + std::shared_ptr* get(size_t location=0) { assert(m_pools != nullptr); // If you hit this, you forgot to call init(). LocalPool& pool = m_pools[location % m_location_count]; @@ -79,20 +85,20 @@ class JPool : public JPoolBase { return nullptr; } else { - auto t = new T; + auto t = new std::shared_ptr(); configure_item(t); return t; } } else { - T* item = pool.available_items.back(); + std::shared_ptr* item = pool.available_items.back(); pool.available_items.pop_back(); return item; } } - void put(T* item, bool release, size_t location) { + void put(std::shared_ptr* item, bool release, size_t location) { assert(m_pools != nullptr); // If you hit this, you forgot to call init(). @@ -118,7 +124,7 @@ class JPool : public JPoolBase { } - size_t pop(T** dest, size_t min_count, size_t max_count, size_t location) { + size_t pop(std::shared_ptr** dest, size_t min_count, size_t max_count, size_t location) { assert(m_pools != nullptr); // If you hit this, you forgot to call init(). @@ -135,7 +141,7 @@ class JPool : public JPoolBase { // Return as many as we can. We aren't allowed to create any more size_t count = std::min(available_count, max_count); for (size_t i=0; i* t = pool.available_items.back(); pool.available_items.pop_back(); dest[i] = t; } @@ -147,13 +153,13 @@ class JPool : public JPoolBase { size_t i=0; for (i=0; i* t = pool.available_items.back(); pool.available_items.pop_back(); dest[i] = t; } for (; i; configure_item(t); dest[i] = t; } @@ -161,7 +167,7 @@ class JPool : public JPoolBase { } } - void push(T** source, size_t count, bool release, size_t location) { + void push(std::shared_ptr** source, size_t count, bool release, size_t location) { for (size_t i=0; i -#include -#include -#include - - -JEventProcessorArrow::JEventProcessorArrow(std::string name) - : JPipelineArrow(std::move(name), true, false, true) {} - -void JEventProcessorArrow::add_processor(JEventProcessor* processor) { - m_processors.push_back(processor); -} - -void JEventProcessorArrow::process(Event* event, bool& success, JArrowMetrics::Status& status) { - - - LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END; - for (JEventProcessor* processor : m_processors) { - // TODO: Move me into JEventProcessor::DoMap - JCallGraphEntryMaker cg_entry(*(*event)->GetJCallGraphRecorder(), processor->GetTypeName()); // times execution until this goes out of scope - if (processor->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode) { - processor->DoLegacyProcess(*event); - } - else { - processor->DoMap(*event); - processor->DoTap(*event); - - } - } - LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END; - success = true; - status = JArrowMetrics::Status::KeepGoing; -} - -void JEventProcessorArrow::initialize() { - LOG_DEBUG(m_logger) << "Initializing arrow '" << get_name() << "'" << LOG_END; - for (auto processor : m_processors) { - LOG_INFO(m_logger) << "Initializing JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END; - processor->DoInitialize(); - LOG_INFO(m_logger) << "Initialized JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END; - } - LOG_DEBUG(m_logger) << "Initialized arrow '" << get_name() << "'" << LOG_END; -} - -void JEventProcessorArrow::finalize() { - LOG_DEBUG(m_logger) << "Finalizing arrow " << get_name() << LOG_END; - for (auto processor : m_processors) { - LOG_DEBUG(m_logger) << "Finalizing JEventProcessor " << processor->GetTypeName() << LOG_END; - processor->DoFinalize(); - LOG_INFO(m_logger) << "Finalized JEventProcessor " << processor->GetTypeName() << LOG_END; - } - LOG_DEBUG(m_logger) << "Finalized arrow " << get_name() << LOG_END; -} - diff --git a/src/libraries/JANA/Topology/JEventProcessorArrow.h b/src/libraries/JANA/Topology/JEventProcessorArrow.h deleted file mode 100644 index 007c65c54..000000000 --- a/src/libraries/JANA/Topology/JEventProcessorArrow.h +++ /dev/null @@ -1,26 +0,0 @@ - -// Copyright 2020, Jefferson Science Associates, LLC. -// Subject to the terms in the LICENSE file found in the top-level directory. - -#pragma once -#include -#include - -class JEventPool; - -using Event = std::shared_ptr; -using EventQueue = JMailbox; - -class JEventProcessorArrow : public JPipelineArrow { - -private: - std::vector m_processors; - -public: - JEventProcessorArrow(std::string name); - void add_processor(JEventProcessor* processor); - void process(Event* event, bool& success, JArrowMetrics::Status& status); - void initialize() final; - void finalize() final; -}; - diff --git a/src/libraries/JANA/Topology/JEventSourceArrow.cc b/src/libraries/JANA/Topology/JEventSourceArrow.cc index c0394da18..094008a56 100644 --- a/src/libraries/JANA/Topology/JEventSourceArrow.cc +++ b/src/libraries/JANA/Topology/JEventSourceArrow.cc @@ -6,7 +6,6 @@ #include #include #include -#include @@ -19,8 +18,8 @@ void JEventSourceArrow::execute(JArrowMetrics& result, size_t location_id) { auto start_total_time = std::chrono::steady_clock::now(); - Data in_data {location_id}; - Data out_data {location_id}; + Data in_data {location_id}; + Data out_data {location_id}; bool success = m_input.pull(in_data) && m_output.pull(out_data); if (!success) { diff --git a/src/libraries/JANA/Topology/JEventSourceArrow.h b/src/libraries/JANA/Topology/JEventSourceArrow.h index 785d2a5ff..a4e5957ce 100644 --- a/src/libraries/JANA/Topology/JEventSourceArrow.h +++ b/src/libraries/JANA/Topology/JEventSourceArrow.h @@ -6,8 +6,6 @@ #include using Event = std::shared_ptr; -using EventQueue = JMailbox; -class JEventPool; class JEventSourceArrow : public JArrow { private: @@ -16,8 +14,8 @@ class JEventSourceArrow : public JArrow { bool m_barrier_active = false; std::shared_ptr* m_pending_barrier_event = nullptr; - PlaceRef m_input {this, true, 1, 1}; - PlaceRef m_output {this, false, 1, 1}; + Place m_input {this, true, 1, 1}; + Place m_output {this, false, 1, 1}; public: JEventSourceArrow(std::string name, std::vector sources); @@ -25,13 +23,13 @@ class JEventSourceArrow : public JArrow { void set_input(JMailbox* queue) { m_input.set_queue(queue); } - void set_input(JPool* pool) { + void set_input(JEventPool* pool) { m_input.set_pool(pool); } void set_output(JMailbox* queue) { m_output.set_queue(queue); } - void set_output(JPool* pool) { + void set_output(JEventPool* pool) { m_output.set_pool(pool); } diff --git a/src/libraries/JANA/Topology/JEventTapArrow.cc b/src/libraries/JANA/Topology/JEventTapArrow.cc index d02a2c797..7be832c9d 100644 --- a/src/libraries/JANA/Topology/JEventTapArrow.cc +++ b/src/libraries/JANA/Topology/JEventTapArrow.cc @@ -3,7 +3,6 @@ #include -#include #include #include #include @@ -16,7 +15,7 @@ void JEventTapArrow::add_processor(JEventProcessor* proc) { m_procs.push_back(proc); } -void JEventTapArrow::process(Event* event, bool& success, JArrowMetrics::Status& status) { +void JEventTapArrow::process(std::shared_ptr* event, bool& success, JArrowMetrics::Status& status) { LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END; for (JEventProcessor* proc : m_procs) { diff --git a/src/libraries/JANA/Topology/JEventTapArrow.h b/src/libraries/JANA/Topology/JEventTapArrow.h index 856f5863e..dc2a922c7 100644 --- a/src/libraries/JANA/Topology/JEventTapArrow.h +++ b/src/libraries/JANA/Topology/JEventTapArrow.h @@ -5,14 +5,11 @@ #include -class JEventPool; class JEventProcessor; class JEvent; -using Event = std::shared_ptr; -using EventQueue = JMailbox; -class JEventTapArrow : public JPipelineArrow { +class JEventTapArrow : public JPipelineArrow { private: std::vector m_procs; @@ -21,7 +18,7 @@ class JEventTapArrow : public JPipelineArrow { JEventTapArrow(std::string name); void add_processor(JEventProcessor* proc); - void process(Event* event, bool& success, JArrowMetrics::Status& status); + void process(std::shared_ptr* event, bool& success, JArrowMetrics::Status& status); void initialize() final; void finalize() final; }; diff --git a/src/libraries/JANA/Topology/JFoldArrow.h b/src/libraries/JANA/Topology/JFoldArrow.h index f416a488a..e662aa444 100644 --- a/src/libraries/JANA/Topology/JFoldArrow.h +++ b/src/libraries/JANA/Topology/JFoldArrow.h @@ -4,7 +4,6 @@ #pragma once #include -#include #include class JFoldArrow : public JArrow { @@ -17,9 +16,9 @@ class JFoldArrow : public JArrow { JEventLevel m_parent_level; JEventLevel m_child_level; - PlaceRef m_child_in; - PlaceRef m_child_out; - PlaceRef m_parent_out; + Place m_child_in; + Place m_child_out; + Place m_parent_out; public: JFoldArrow( @@ -86,7 +85,7 @@ class JFoldArrow : public JArrow { LOG_INFO(m_logger) << "Finalized JEventFolder (trivial)" << LOG_END; } - bool try_pull_all(Data& ci, Data& co, Data& po) { + bool try_pull_all(Data& ci, Data& co, Data& po) { bool success; success = m_child_in.pull(ci); if (! success) { @@ -103,7 +102,7 @@ class JFoldArrow : public JArrow { return true; } - size_t push_all(Data& ci, Data& co, Data& po) { + size_t push_all(Data& ci, Data& co, Data& po) { size_t message_count = co.item_count; m_child_in.push(ci); m_child_out.push(co); @@ -115,9 +114,9 @@ class JFoldArrow : public JArrow { auto start_total_time = std::chrono::steady_clock::now(); - Data child_in_data {location_id}; - Data child_out_data {location_id}; - Data parent_out_data {location_id}; + Data child_in_data {location_id}; + Data child_out_data {location_id}; + Data parent_out_data {location_id}; bool success = try_pull_all(child_in_data, child_out_data, parent_out_data); if (success) { diff --git a/src/libraries/JANA/Topology/JJunctionArrow.h b/src/libraries/JANA/Topology/JJunctionArrow.h index c8c07c6bf..be8ca8292 100644 --- a/src/libraries/JANA/Topology/JJunctionArrow.h +++ b/src/libraries/JANA/Topology/JJunctionArrow.h @@ -5,18 +5,18 @@ #include #include -#include +#include -template +template class JJunctionArrow : public JArrow { protected: - PlaceRef first_input {this, true, 1, 1}; - PlaceRef first_output {this, false, 1, 1}; - PlaceRef second_input {this, true, 1, 1}; - PlaceRef second_output {this, false, 1, 1}; + Place first_input {this, true, 1, 1}; + Place first_output {this, false, 1, 1}; + Place second_input {this, true, 1, 1}; + Place second_output {this, false, 1, 1}; public: using Status = JArrowMetrics::Status; @@ -30,7 +30,7 @@ class JJunctionArrow : public JArrow { { } - bool try_pull_all(Data& fi, Data& fo, Data& si, Data& so) { + bool try_pull_all(Data& fi, Data& fo, Data& si, Data& so) { bool success; success = first_input.pull(fi); @@ -58,7 +58,7 @@ class JJunctionArrow : public JArrow { return true; } - size_t push_all(Data& fi, Data& fo, Data& si, Data& so) { + size_t push_all(Data& fi, Data& fo, Data& si, Data& so) { size_t message_count = 0; message_count += first_input.push(fi); message_count += first_output.push(fo); @@ -71,10 +71,10 @@ class JJunctionArrow : public JArrow { auto start_total_time = std::chrono::steady_clock::now(); - Data first_input_data {location_id}; - Data first_output_data {location_id}; - Data second_input_data {location_id}; - Data second_output_data {location_id}; + Data first_input_data {location_id}; + Data first_output_data {location_id}; + Data second_input_data {location_id}; + Data second_output_data {location_id}; bool success = try_pull_all(first_input_data, first_output_data, second_input_data, second_output_data); if (success) { diff --git a/src/libraries/JANA/Topology/JPipelineArrow.h b/src/libraries/JANA/Topology/JPipelineArrow.h index 28db84cd3..f48ea14f1 100644 --- a/src/libraries/JANA/Topology/JPipelineArrow.h +++ b/src/libraries/JANA/Topology/JPipelineArrow.h @@ -6,13 +6,15 @@ #include #include -#include +#include -template +using MessageT = std::shared_ptr; + +template class JPipelineArrow : public JArrow { private: - PlaceRef m_input {this, true, 1, 1}; - PlaceRef m_output {this, false, 1, 1}; + Place m_input {this, true, 1, 1}; + Place m_output {this, false, 1, 1}; public: JPipelineArrow(std::string name, @@ -25,13 +27,13 @@ class JPipelineArrow : public JArrow { void set_input(JMailbox* queue) { m_input.set_queue(queue); } - void set_input(JPool* pool) { + void set_input(JEventPool* pool) { m_input.set_pool(pool); } void set_output(JMailbox* queue) { m_output.set_queue(queue); } - void set_output(JPool* pool) { + void set_output(JEventPool* pool) { m_output.set_pool(pool); } @@ -39,8 +41,8 @@ class JPipelineArrow : public JArrow { auto start_total_time = std::chrono::steady_clock::now(); - Data in_data {location_id}; - Data out_data {location_id}; + Data in_data {location_id}; + Data out_data {location_id}; bool success = m_input.pull(in_data) && m_output.pull(out_data); if (!success) { diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc index f581e072f..0e4b53181 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.cc +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -6,11 +6,11 @@ #include "JTopologyBuilder.h" #include "JEventSourceArrow.h" -#include "JEventProcessorArrow.h" #include "JEventMapArrow.h" #include "JEventTapArrow.h" #include "JUnfoldArrow.h" #include "JFoldArrow.h" +#include #include @@ -52,7 +52,7 @@ std::string JTopologyBuilder::print_topology() { i += 1; } // Build index lookup for pools - for (JPoolBase* pool : pools) { + for (JEventPool* pool : pools) { lookup[pool] = i; i += 1; } @@ -63,7 +63,7 @@ std::string JTopologyBuilder::print_topology() { for (JArrow* arrow : arrows) { show_row = true; - for (PlaceRefBase* place : arrow->m_places) { + for (Place* place : arrow->m_places) { if (show_row) { t | arrow->get_name(); t | arrow->is_parallel(); @@ -98,7 +98,6 @@ void JTopologyBuilder::create_topology() { m_pool_capacity, m_location_count, m_limit_total_events_in_flight); - event_pool->init(); if (m_configure_topology) { m_configure_topology(*this); @@ -162,7 +161,7 @@ void JTopologyBuilder::connect(JArrow* upstream, size_t up_index, JArrow* downst queues.push_back(queue); size_t i = 0; - for (PlaceRefBase* place : upstream->m_places) { + for (Place* place : upstream->m_places) { if (!place->is_input) { if (i++ == up_index) { // Found the correct output @@ -172,7 +171,7 @@ void JTopologyBuilder::connect(JArrow* upstream, size_t up_index, JArrow* downst } } i = 0; - for (PlaceRefBase* place : downstream->m_places) { + for (Place* place : downstream->m_places) { if (place->is_input) { if (i++ == down_index) { // Found the correct input @@ -262,7 +261,6 @@ void JTopologyBuilder::attach_level(JEventLevel current_level, JUnfoldArrow* par // 0. Pool // -------------------------- JEventPool* pool_at_level = new JEventPool(m_components, m_pool_capacity, m_location_count, m_limit_total_events_in_flight, current_level); - pool_at_level->init(); pools.push_back(pool_at_level); // Hand over ownership of the pool to the topology // -------------------------- diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.h b/src/libraries/JANA/Topology/JTopologyBuilder.h index af1e29d19..7ee9996c3 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.h +++ b/src/libraries/JANA/Topology/JTopologyBuilder.h @@ -7,7 +7,7 @@ #include #include #include -#include +#include #include // TODO: Should't be here #include @@ -18,7 +18,6 @@ class JParameterManager; class JComponentManager; class JArrow; class JQueue; -class JPoolBase; class JQueue; class JFoldArrow; class JUnfoldArrow; @@ -32,7 +31,7 @@ class JTopologyBuilder : public JService { // The topology itself std::vector arrows; std::vector queues; // Queues shared between arrows - std::vector pools; // Pools shared between arrows + std::vector pools; // Pools shared between arrows // Topology configuration size_t m_pool_capacity = 4; diff --git a/src/libraries/JANA/Topology/JUnfoldArrow.h b/src/libraries/JANA/Topology/JUnfoldArrow.h index 5dd8c75cd..a5befde01 100644 --- a/src/libraries/JANA/Topology/JUnfoldArrow.h +++ b/src/libraries/JANA/Topology/JUnfoldArrow.h @@ -5,19 +5,17 @@ #include #include -#include class JUnfoldArrow : public JArrow { private: - using EventT = std::shared_ptr; JEventUnfolder* m_unfolder = nullptr; EventT* m_parent_event = nullptr; bool m_ready_to_fetch_parent = true; - PlaceRef m_parent_in; - PlaceRef m_child_in; - PlaceRef m_child_out; + Place m_parent_in; + Place m_child_in; + Place m_child_out; public: @@ -65,7 +63,7 @@ class JUnfoldArrow : public JArrow { LOG_INFO(m_logger) << "Finalized JEventUnfolder '" << m_unfolder->GetTypeName() << "'" << LOG_END; } - bool try_pull_all(Data& pi, Data& ci, Data& co) { + bool try_pull_all(Data& pi, Data& ci, Data& co) { bool success; success = m_parent_in.pull(pi); if (! success) { @@ -85,7 +83,7 @@ class JUnfoldArrow : public JArrow { return true; } - size_t push_all(Data& parent_in, Data& child_in, Data& child_out) { + size_t push_all(Data& parent_in, Data& child_in, Data& child_out) { size_t message_count = 0; message_count += m_parent_in.push(parent_in); message_count += m_child_in.push(child_in); @@ -96,7 +94,7 @@ class JUnfoldArrow : public JArrow { size_t get_pending() final { size_t sum = 0; - for (PlaceRefBase* place : m_places) { + for (Place* place : m_places) { sum += place->get_pending(); } if (m_parent_event != nullptr) { @@ -111,9 +109,9 @@ class JUnfoldArrow : public JArrow { auto start_total_time = std::chrono::steady_clock::now(); - Data parent_in_data {location_id}; - Data child_in_data {location_id}; - Data child_out_data {location_id}; + Data parent_in_data {location_id}; + Data child_in_data {location_id}; + Data child_out_data {location_id}; bool success = try_pull_all(parent_in_data, child_in_data, child_out_data); if (success) { diff --git a/src/libraries/JANA/Utils/JCallGraphEntryMaker.h b/src/libraries/JANA/Utils/JCallGraphEntryMaker.h index 23448abb1..f0952148c 100644 --- a/src/libraries/JANA/Utils/JCallGraphEntryMaker.h +++ b/src/libraries/JANA/Utils/JCallGraphEntryMaker.h @@ -18,11 +18,13 @@ /// (and possibly other places). class JCallGraphEntryMaker{ public: + JCallGraphEntryMaker(JCallGraphRecorder &callgraphrecorder, JFactory *factory) : m_call_graph(callgraphrecorder), m_factory(factory){ m_call_graph.StartFactoryCall(m_factory->GetObjectName(), m_factory->GetTag()); } + JCallGraphEntryMaker(JCallGraphRecorder &callgraphrecorder, std::string name) : m_call_graph(callgraphrecorder) { - // (This is used mainly for JEventProcessors and called from JEventProcessorArrow::execute ) + // This is used mainly for JEventProcessors m_call_graph.StartFactoryCall(name, ""); } diff --git a/src/libraries/JANA/Utils/JEventPool.h b/src/libraries/JANA/Utils/JEventPool.h deleted file mode 100644 index 3a3d6ebc5..000000000 --- a/src/libraries/JANA/Utils/JEventPool.h +++ /dev/null @@ -1,44 +0,0 @@ - -// Copyright 2020, Jefferson Science Associates, LLC. -// Subject to the terms in the LICENSE file found in the top-level directory. - - -#pragma once - -#include -#include -#include - - -class JEventPool : public JPool> { - - std::shared_ptr m_component_manager; - JEventLevel m_level; - -public: - inline JEventPool(std::shared_ptr component_manager, - size_t pool_size, - size_t location_count, - bool limit_total_events_in_flight, - JEventLevel level = JEventLevel::PhysicsEvent) - : JPool(pool_size, location_count, limit_total_events_in_flight) - , m_component_manager(component_manager) - , m_level(level) { - } - - void configure_item(std::shared_ptr* item) override { - (*item) = std::make_shared(); - m_component_manager->configure_event(**item); - item->get()->SetLevel(m_level); // This needs to happen _after_ configure_event - } - - void release_item(std::shared_ptr* item) override { - if (auto source = (*item)->GetJEventSource()) source->DoFinish(**item); - (*item)->mFactorySet->Release(); - (*item)->mInspector.Reset(); - (*item)->GetJCallGraphRecorder()->Reset(); - (*item)->Reset(); - } -}; - - diff --git a/src/programs/unit_tests/Components/UnfoldTests.cc b/src/programs/unit_tests/Components/UnfoldTests.cc index 92293eea0..e7a2cf24c 100644 --- a/src/programs/unit_tests/Components/UnfoldTests.cc +++ b/src/programs/unit_tests/Components/UnfoldTests.cc @@ -51,9 +51,6 @@ TEST_CASE("UnfoldTests_Basic") { JMailbox parent_queue {3}; // size JMailbox child_queue {3}; - parent_pool.init(); - child_pool.init(); - auto ts1 = parent_pool.get(); (*ts1)->SetEventNumber(17); @@ -94,9 +91,6 @@ TEST_CASE("FoldArrowTests") { // We only use these to obtain preconfigured JEvents JEventPool parent_pool {jcm, 5, 1, true, JEventLevel::Timeslice}; // size=5, locations=1, limit_total_events_in_flight=true JEventPool child_pool {jcm, 5, 1, true, JEventLevel::PhysicsEvent}; - parent_pool.init(); - child_pool.init(); - // We set up our test cases by putting events on these queues JMailbox*> child_in; diff --git a/src/programs/unit_tests/Engine/ArrowActivationTests.cc b/src/programs/unit_tests/Engine/ArrowActivationTests.cc index 47f12f2fa..11847d614 100644 --- a/src/programs/unit_tests/Engine/ArrowActivationTests.cc +++ b/src/programs/unit_tests/Engine/ArrowActivationTests.cc @@ -17,21 +17,21 @@ JArrowMetrics::Status steppe(JArrow* arrow) { TEST_CASE("ArrowActivationTests") { - auto q1 = new JMailbox(); - auto q2 = new JMailbox(); - auto q3 = new JMailbox(); + JApplication app; + app.Initialize(); + auto jcm = app.GetService(); - auto p1 = new JPool(0,1,false); - auto p2 = new JPool(0,1,false); - p1->init(); - p2->init(); + auto q1 = new JMailbox(); + auto q2 = new JMailbox(); + auto q3 = new JMailbox(); - MultByTwoProcessor processor; + auto p1 = new JEventPool(jcm, 0,1,false); + auto p2 = new JEventPool(jcm, 0,1,false); - auto emit_rand_ints = new RandIntSource("emit_rand_ints", p1, q1); - auto multiply_by_two = new MapArrow("multiply_by_two", processor, q1, q2); - auto subtract_one = new SubOneProcessor("subtract_one", q2, q3); - auto sum_everything = new SumSink("sum_everything", q3, p2); + auto emit_rand_ints = new RandIntArrow("emit_rand_ints", p1, q1); + auto multiply_by_two = new MultByTwoArrow("multiply_by_two", q1, q2); + auto subtract_one = new SubOneArrow("subtract_one", q2, q3); + auto sum_everything = new SumArrow("sum_everything", q3, p2); auto topology = std::make_shared(); @@ -88,7 +88,6 @@ TEST_CASE("ArrowActivationTests") { scheduler.run_topology(1); state = scheduler.get_topology_state(); - // TODO: Check that initialize has been called, but not finalize REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Active); diff --git a/src/programs/unit_tests/Engine/SchedulerTests.cc b/src/programs/unit_tests/Engine/SchedulerTests.cc index aec091ccb..eb4c1678b 100644 --- a/src/programs/unit_tests/Engine/SchedulerTests.cc +++ b/src/programs/unit_tests/Engine/SchedulerTests.cc @@ -12,21 +12,21 @@ TEST_CASE("SchedulerTests") { - auto q1 = new JMailbox(); - auto q2 = new JMailbox(); - auto q3 = new JMailbox(); + JApplication app; + app.Initialize(); + auto jcm = app.GetService(); - auto p1 = new JPool(0,1,false); - auto p2 = new JPool(0,1,false); - p1->init(); - p2->init(); + auto q1 = new JMailbox(); + auto q2 = new JMailbox(); + auto q3 = new JMailbox(); - MultByTwoProcessor processor; + auto p1 = new JEventPool(jcm, 0,1,false); + auto p2 = new JEventPool(jcm, 0,1,false); - auto emit_rand_ints = new RandIntSource("emit_rand_ints", p1, q1); - auto multiply_by_two = new MapArrow("multiply_by_two", processor, q1, q2); - auto subtract_one = new SubOneProcessor("subtract_one", q2, q3); - auto sum_everything = new SumSink("sum_everything", q3, p2); + auto emit_rand_ints = new RandIntArrow("emit_rand_ints", p1, q1); + auto multiply_by_two = new MultByTwoArrow("multiply_by_two", q1, q2); + auto subtract_one = new SubOneArrow("subtract_one", q2, q3); + auto sum_everything = new SumArrow("sum_everything", q3, p2); auto topology = std::make_shared(); @@ -96,22 +96,22 @@ TEST_CASE("SchedulerTests") { } TEST_CASE("SchedulerRoundRobinBehaviorTests") { + + JApplication app; + app.Initialize(); + auto jcm = app.GetService(); - auto q1 = new JMailbox(); - auto q2 = new JMailbox(); - auto q3 = new JMailbox(); - - auto p1 = new JPool(0,1,false); - auto p2 = new JPool(0,1,false); - p1->init(); - p2->init(); + auto q1 = new JMailbox(); + auto q2 = new JMailbox(); + auto q3 = new JMailbox(); - MultByTwoProcessor processor; + auto p1 = new JEventPool(jcm, 0,1,false); + auto p2 = new JEventPool(jcm, 0,1,false); - auto emit_rand_ints = new RandIntSource("emit_rand_ints", p1, q1); - auto multiply_by_two = new MapArrow("multiply_by_two", processor, q1, q2); - auto subtract_one = new SubOneProcessor("subtract_one", q2, q3); - auto sum_everything = new SumSink("sum_everything", q3, p2); + auto emit_rand_ints = new RandIntArrow("emit_rand_ints", p1, q1); + auto multiply_by_two = new MultByTwoArrow("multiply_by_two", q1, q2); + auto subtract_one = new SubOneArrow("subtract_one", q2, q3); + auto sum_everything = new SumArrow("sum_everything", q3, p2); auto topology = std::make_shared(); diff --git a/src/programs/unit_tests/Topology/ArrowTests.cc b/src/programs/unit_tests/Topology/ArrowTests.cc index 20d261b93..1aaf84e75 100644 --- a/src/programs/unit_tests/Topology/ArrowTests.cc +++ b/src/programs/unit_tests/Topology/ArrowTests.cc @@ -1,18 +1,25 @@ #include #include +#include namespace jana { namespace arrowtests { -struct TestMapArrow : public JJunctionArrow { +struct ArrowTestData { + int x; + double y; +}; + +using EventT = std::shared_ptr; +struct TestJunctionArrow : public JJunctionArrow { - TestMapArrow(JMailbox* qi, - JPool* pi, - JPool* pd, - JMailbox* qd) - : JJunctionArrow("testmaparrow", false, false, true) { + TestJunctionArrow(JMailbox* qi, + JEventPool* pi, + JEventPool* pd, + JMailbox* qd) + : JJunctionArrow("testjunctionarrow", false, false, true) { first_input.set_queue(qi); first_output.set_pool(pi); @@ -20,10 +27,10 @@ struct TestMapArrow : public JJunctionArrow { second_output.set_queue(qd); } - Status process(Data& input_int, - Data& output_int, - Data& input_double, - Data& output_double) { + Status process(Data& input_int, + Data& output_int, + Data& input_double, + Data& output_double) { std::cout << "Hello from process" << std::endl; REQUIRE(input_int.item_count == 1); @@ -35,24 +42,27 @@ struct TestMapArrow : public JJunctionArrow { REQUIRE(output_double.item_count == 0); REQUIRE(output_double.reserve_count == 1); - int* x = input_int.items[0]; + EventT* x_event = input_int.items[0]; input_int.items[0] = nullptr; input_int.item_count = 0; // TODO: Maybe user shouldn't be allowed to modify reserve_count at all // TODO Maybe user should only be allowed to push and pull from ... range...? - double* y = input_double.items[0]; + EventT* y_event = input_double.items[0]; input_double.items[0] = nullptr; input_double.item_count = 0; + auto data = (*x_event)->Get(); + int x = data.at(0)->x; // Do something useful here - *y = *x + 22.2; + double y = x + 22.2; + (*y_event)->Insert(new ArrowTestData{.x = x, .y = y}); - output_int.items[0] = x; + output_int.items[0] = x_event; output_int.item_count = 1; - output_double.items[0] = y; + output_double.items[0] = y_event; output_double.item_count = 1; return Status::KeepGoing; } @@ -60,29 +70,33 @@ struct TestMapArrow : public JJunctionArrow { }; -TEST_CASE("ArrowTests_Basic") { +TEST_CASE("ArrowTests_Basic") { - JMailbox qi {2, 1, false}; - JPool pi {5, 1, true}; - JPool pd {5, 1, true}; - JMailbox qd {2, 1, false}; + JApplication app; + app.Initialize(); + auto jcm = app.GetService(); - pi.init(); - pd.init(); + JMailbox qi {2, 1, false}; + JEventPool pi {jcm, 5, 1, true}; + JEventPool pd {jcm, 5, 1, true}; + JMailbox qd {2, 1, false}; - TestMapArrow a {&qi, &pi, &pd, &qd}; + TestJunctionArrow a {&qi, &pi, &pd, &qd}; - int* x; + EventT* x = nullptr; pi.pop(&x, 1, 1, 0); - *x = 100; + REQUIRE(x != nullptr); + REQUIRE((*x)->GetEventNumber() == 0); + (*x)->Insert(new ArrowTestData {.x = 100, .y=0}); qi.push_and_unreserve(&x, 1, 0, 0); JArrowMetrics m; a.execute(m, 0); - double* y; + EventT* y; qd.pop_and_reserve(&y, 1, 1, 0); - REQUIRE(*y == 122.2); + auto data = (*y)->Get(); + REQUIRE(data.at(0)->y == 122.2); } diff --git a/src/programs/unit_tests/Topology/JPoolTests.cc b/src/programs/unit_tests/Topology/JPoolTests.cc index a5fd69013..389d2bc37 100644 --- a/src/programs/unit_tests/Topology/JPoolTests.cc +++ b/src/programs/unit_tests/Topology/JPoolTests.cc @@ -1,92 +1,84 @@ +#include "JANA/JApplicationFwd.h" +#include "JANA/Services/JComponentManager.h" #include -#include +#include namespace jana { namespace jpooltests { -struct Event { - int x=3; - double y=7.8; - bool* was_dtor_called = nullptr; - - ~Event() { - if (was_dtor_called != nullptr) { - *was_dtor_called = true; - } - } -}; TEST_CASE("JPoolTests_SingleLocationLimitEvents") { + JApplication app; + app.Initialize(); + auto jcm = app.GetService(); - JPool pool(3, 1, true); - pool.init(); + JEventPool pool(jcm, 3, 1, true); - Event* e = pool.get(0); + auto* e = pool.get(0); REQUIRE(e != nullptr); - REQUIRE(e->x == 3); + REQUIRE((*e)->GetEventNumber() == 0); // Will segfault if not initialized - Event* f = pool.get(0); + auto* f = pool.get(0); REQUIRE(f != nullptr); - REQUIRE(f->x == 3); + REQUIRE((*f)->GetEventNumber() == 0); - Event* g = pool.get(0); + auto* g = pool.get(0); REQUIRE(g != nullptr); - REQUIRE(g->x == 3); + REQUIRE((*g)->GetEventNumber() == 0); - Event* h = pool.get(0); + auto* h = pool.get(0); REQUIRE(h == nullptr); - f->x = 5; + (*f)->SetEventNumber(5); pool.put(f, true, 0); h = pool.get(0); REQUIRE(h != nullptr); - REQUIRE(h->x == 5); + REQUIRE((*h)->GetEventNumber() == 5); } TEST_CASE("JPoolTests_SingleLocationUnlimitedEvents") { + JApplication app; + app.Initialize(); + auto jcm = app.GetService(); - bool was_dtor_called = false; - JPool pool(3, 1, false); - pool.init(); + JEventPool pool(jcm, 3, 1, false); - Event* e = pool.get(0); - e->was_dtor_called = &was_dtor_called; + auto* e = pool.get(0); REQUIRE(e != nullptr); - REQUIRE(e->x == 3); + REQUIRE((*e)->GetEventNumber() == 0); - Event* f = pool.get(0); - f->was_dtor_called = &was_dtor_called; + auto* f = pool.get(0); + std::weak_ptr f_weak = *f; REQUIRE(f != nullptr); - REQUIRE(f->x == 3); + REQUIRE((*f)->GetEventNumber() == 0); - Event* g = pool.get(0); - g->was_dtor_called = &was_dtor_called; + auto* g = pool.get(0); + std::weak_ptr g_weak = *g; REQUIRE(g != nullptr); - REQUIRE(g->x == 3); + REQUIRE((*g)->GetEventNumber() == 0); - Event* h = pool.get(0); + auto* h = pool.get(0); // Unlike the others, h is allocated on the heap - h->x = 9; - h->was_dtor_called = &was_dtor_called; + (*h)->SetEventNumber(9); + std::weak_ptr h_weak = *h; REQUIRE(h != nullptr); - REQUIRE(g->x == 3); - f->x = 5; + (*f)->SetEventNumber(5); pool.put(f, true, 0); // f goes back into the pool, so dtor does not get called - REQUIRE(was_dtor_called == false); + REQUIRE(f_weak.lock() != nullptr); pool.put(h, true, 0); // h's dtor DOES get called - REQUIRE(was_dtor_called == true); + REQUIRE(h_weak.lock() == nullptr); // When we retrieve another event, it comes from the pool // So we get what used to be f - Event* i = pool.get(0); + auto* i = pool.get(0); REQUIRE(i != nullptr); - REQUIRE(i->x == 5); + REQUIRE((*i)->GetEventNumber() == 5); } diff --git a/src/programs/unit_tests/Topology/MapArrow.h b/src/programs/unit_tests/Topology/MapArrow.h deleted file mode 100644 index 6b3147345..000000000 --- a/src/programs/unit_tests/Topology/MapArrow.h +++ /dev/null @@ -1,87 +0,0 @@ - -// Copyright 2020, Jefferson Science Associates, LLC. -// Subject to the terms in the LICENSE file found in the top-level directory. - -#ifndef GREENFIELD_MAPARROW_H -#define GREENFIELD_MAPARROW_H - -#include - - -/// ParallelProcessor transforms S to T and it does so in a way which is thread-safe -/// and ideally stateless. It is conceptually equivalent to the first part -/// of JEventProcessor::Process, i.e. up until the lock is acquired. Alternatively, it could -/// become a JFactorySet, in which case process() would call all Factories present, thereby -/// making sure that everything which can be calculated in parallel has in fact been, before -/// proceeding to the (sequential) Sink. - -template -struct ParallelProcessor { - virtual T process(S s) = 0; -}; - - -/// MapArrow lifts a ParallelProcessor into a streaming async context -template -class MapArrow : public JArrow { - -private: - ParallelProcessor& _processor; - JMailbox *_input_queue; - JMailbox *_output_queue; - -public: - MapArrow(std::string name, ParallelProcessor& processor, JMailbox *input_queue, JMailbox *output_queue) - : JArrow(name, true, false, false) - , _processor(processor) - , _input_queue(input_queue) - , _output_queue(output_queue) { - }; - - void execute(JArrowMetrics& result, size_t /* location_id */) override { - - auto start_total_time = std::chrono::steady_clock::now(); - std::vector xs; - std::vector ys; - xs.reserve(1); - ys.reserve(1); - // TODO: Push/pop single items instead of list - - auto in_status = _input_queue->pop(xs, 1); - - auto start_latency_time = std::chrono::steady_clock::now(); - for (S &x : xs) { - ys.push_back(_processor.process(x)); - } - auto message_count = xs.size(); - auto end_latency_time = std::chrono::steady_clock::now(); - - auto out_status = JMailbox::Status::Ready; - if (!ys.empty()) { - out_status = _output_queue->push(ys); - } - auto end_queue_time = std::chrono::steady_clock::now(); - - - auto latency = (end_latency_time - start_latency_time); - auto overhead = (end_queue_time - start_total_time) - latency; - - JArrowMetrics::Status status; - // if (in_status == JMailbox::Status::Finished) { - // set_status(JActivable::Status::Finished); - // status = JArrowMetrics::Status::Finished; - // } - if (in_status == JMailbox::Status::Ready && out_status == JMailbox::Status::Ready) { - status = JArrowMetrics::Status::KeepGoing; - } - else { - status = JArrowMetrics::Status::ComeBackLater; - } - result.update(status, message_count, 1, latency, overhead); - } - - size_t get_pending() final { return _input_queue->size(); } -}; - - -#endif //GREENFIELD_MAPARROW_H diff --git a/src/programs/unit_tests/Topology/SubeventTests.cc b/src/programs/unit_tests/Topology/SubeventTests.cc index 5ee7573e7..f0881fbab 100644 --- a/src/programs/unit_tests/Topology/SubeventTests.cc +++ b/src/programs/unit_tests/Topology/SubeventTests.cc @@ -7,8 +7,9 @@ #include #include +#include #include -#include +#include #include #include @@ -105,7 +106,7 @@ TEST_CASE("Basic subevent arrow functionality") { source_arrow->set_input(topology.event_pool); source_arrow->set_output(&events_in); - auto proc_arrow = new JEventProcessorArrow("simpleProcessor"); + auto proc_arrow = new JEventMapArrow("simpleProcessor"); proc_arrow->set_input(&events_out); proc_arrow->set_output(topology.event_pool); proc_arrow->add_processor(new SimpleProcessor); diff --git a/src/programs/unit_tests/Topology/TestTopologyComponents.h b/src/programs/unit_tests/Topology/TestTopologyComponents.h index 9ea4b6624..c4df0e46f 100644 --- a/src/programs/unit_tests/Topology/TestTopologyComponents.h +++ b/src/programs/unit_tests/Topology/TestTopologyComponents.h @@ -4,93 +4,105 @@ #pragma once +#include "JANA/Topology/JArrowMetrics.h" #include -#include "MapArrow.h" +#include +struct EventData { + int x = 0; + double y = 0.0; + double z = 0.0; +}; -struct RandIntSource : public JPipelineArrow { +struct RandIntArrow : public JPipelineArrow { size_t emit_limit = 20; // How many to emit size_t emit_count = 0; // How many emitted so far int emit_sum = 0; // Sum of all ints emitted so far - RandIntSource(std::string name, JPool* pool, JMailbox* output_queue) - : JPipelineArrow(name, false, true, false) { + RandIntArrow(std::string name, JEventPool* pool, JMailbox* output_queue) + : JPipelineArrow(name, false, true, false) { this->set_input(pool); this->set_output(output_queue); } - void process(int* item, bool& success, JArrowMetrics::Status& status) { + void process(EventT* event, bool& success, JArrowMetrics::Status& status) { if (emit_count >= emit_limit) { success = false; status = JArrowMetrics::Status::Finished; return; } - *item = 7; - emit_sum += *item; + + auto data = new EventData {7}; + + (*event)->Insert(data, "first"); + + emit_sum += data->x; emit_count += 1; - LOG_DEBUG(JArrow::m_logger) << "RandIntSource emitted event " << emit_count << " with value " << *item << LOG_END; + LOG_DEBUG(JArrow::m_logger) << "RandIntSource emitted event " << emit_count << " with value " << data->x << LOG_END; success = true; status = (emit_count == emit_limit) ? JArrowMetrics::Status::Finished : JArrowMetrics::Status::KeepGoing; // This design lets us declare Finished immediately on the last event, instead of after } - - void initialize() override { - LOG_INFO(JArrow::m_logger) << "RandIntSource.initialize() called!" << LOG_END; - }; - - void finalize() override { - LOG_INFO(JArrow::m_logger) << "RandIntSource.finalize() called!" << LOG_END; - } }; -struct MultByTwoProcessor : public ParallelProcessor { +struct MultByTwoArrow : public JPipelineArrow { + + MultByTwoArrow(std::string name, JMailbox* input_queue, JMailbox* output_queue) + : JPipelineArrow(name, true, false, false) { + this->set_input(input_queue); + this->set_output(output_queue); + } - double* process(int* x) override { - return new double(*x * 2.0); + void process(EventT* event, bool& success, JArrowMetrics::Status& status) { + auto prev = (*event)->Get("first"); + auto x = prev.at(0)->x; + auto next = new EventData { .x=x, .y=x*2.0 }; + (*event)->Insert(next, "second"); + success = true; + status = JArrowMetrics::Status::KeepGoing; } }; +struct SubOneArrow : public JPipelineArrow { -struct SubOneProcessor : public JPipelineArrow { - - SubOneProcessor(std::string name, JMailbox* input_queue, JMailbox* output_queue) - : JPipelineArrow(name, true, false, false) { + SubOneArrow(std::string name, JMailbox* input_queue, JMailbox* output_queue) + : JPipelineArrow(name, true, false, false) { this->set_input(input_queue); this->set_output(output_queue); } - void process(double* item, bool&, JArrowMetrics::Status&) { - *item -= 1; + void process(EventT* event, bool& success, JArrowMetrics::Status& status) { + auto prev = (*event)->Get("second"); + auto x = prev.at(0)->x; + auto y = prev.at(0)->y; + auto z = y - 1; + auto next = new EventData { .x=x, .y=y, .z=z }; + (*event)->Insert(next, "third"); + success = true; + status = JArrowMetrics::Status::KeepGoing; } }; +struct SumArrow : public JPipelineArrow { -template -struct SumSink : public JPipelineArrow, T> { + double sum = 0; - T sum = 0; - - SumSink(std::string name, JMailbox* input_queue, JPool* pool) - : JPipelineArrow,T>(name, false, false, true) { + SumArrow(std::string name, JMailbox* input_queue, JEventPool* pool) + : JPipelineArrow(name, false, false, true) { this->set_input(input_queue); this->set_output(pool); } - void process(T* item, bool&, JArrowMetrics::Status&) { - sum += *item; - LOG_DEBUG(JArrow::m_logger) << "SumSink.outprocess() called!" << LOG_END; + void process(EventT* event, bool& success, JArrowMetrics::Status& status) { + auto prev = (*event)->Get("third"); + auto z = prev.at(0)->z; + sum += z; + success = true; + status = JArrowMetrics::Status::KeepGoing; } - - void initialize() override { - LOG_INFO(JArrow::m_logger) << "SumSink.initialize() called!" << LOG_END; - }; - - void finalize() override { - LOG_INFO(JArrow::m_logger) << "SumSink.finalize() called!" << LOG_END; - }; }; diff --git a/src/programs/unit_tests/Topology/TopologyTests.cc b/src/programs/unit_tests/Topology/TopologyTests.cc index 6cc77975a..fac0d83dd 100644 --- a/src/programs/unit_tests/Topology/TopologyTests.cc +++ b/src/programs/unit_tests/Topology/TopologyTests.cc @@ -2,6 +2,7 @@ // Copyright 2020, Jefferson Science Associates, LLC. // Subject to the terms in the LICENSE file found in the top-level directory. +#include "JANA/Services/JComponentManager.h" #include "catch.hpp" #include "JANA/Topology/JTopologyBuilder.h" @@ -24,22 +25,21 @@ JArrowMetrics::Status step(JArrow* arrow) { TEST_CASE("JTopology: Basic functionality") { + JApplication app; + app.Initialize(); + auto jcm = app.GetService(); - auto q1 = new JMailbox(); - auto q2 = new JMailbox(); - auto q3 = new JMailbox(); + auto q1 = new JMailbox(); + auto q2 = new JMailbox(); + auto q3 = new JMailbox(); - auto p1 = new JPool(0,1,false); - auto p2 = new JPool(0,1,false); - p1->init(); - p2->init(); + auto p1 = new JEventPool(jcm, 0,1,false); + auto p2 = new JEventPool(jcm, 0,1,false); - MultByTwoProcessor processor; - - auto emit_rand_ints = new RandIntSource("emit_rand_ints", p1, q1); - auto multiply_by_two = new MapArrow("multiply_by_two", processor, q1, q2); - auto subtract_one = new SubOneProcessor("subtract_one", q2, q3); - auto sum_everything = new SumSink("sum_everything", q3, p2); + auto emit_rand_ints = new RandIntArrow("emit_rand_ints", p1, q1); + auto multiply_by_two = new MultByTwoArrow("multiply_by_two", q1, q2); + auto subtract_one = new SubOneArrow("subtract_one", q2, q3); + auto sum_everything = new SumArrow("sum_everything", q3, p2); auto topology = std::make_shared();