diff --git a/src/examples/SubeventExample/SubeventExample.cc b/src/examples/SubeventExample/SubeventExample.cc index b9a35009f..71f5f6b25 100644 --- a/src/examples/SubeventExample/SubeventExample.cc +++ b/src/examples/SubeventExample/SubeventExample.cc @@ -4,10 +4,13 @@ #include #include -#include #include #include -#include "JANA/Engine/JTopologyBuilder.h" + +#include +#include +#include +#include "JANA/Topology/JTopologyBuilder.h" struct MyInput : public JObject { @@ -82,16 +85,6 @@ struct SimpleProcessor : public JEventProcessor { int main() { - MyProcessor processor; - JMailbox*> events_in; - JMailbox*> events_out; - JMailbox> subevents_in; - JMailbox> subevents_out; - - auto split_arrow = new JSplitArrow("split", &processor, &events_in, &subevents_in); - auto subprocess_arrow = new JSubeventArrow("subprocess", &processor, &subevents_in, &subevents_out); - auto merge_arrow = new JMergeArrow("merge", &processor, &subevents_out, &events_out); - JApplication app; app.SetParameterValue("log:info", "JWorker,JScheduler,JArrowProcessingController,JEventProcessorArrow"); app.SetTimeoutEnabled(false); @@ -100,25 +93,39 @@ int main() { auto source = new SimpleSource(); source->SetNEvents(10); // limit ourselves to 10 events. Note that the 'jana:nevents' param won't work // here because we aren't using JComponentManager to manage the EventSource + MyProcessor processor; + + auto topology = app.GetService(); + topology->set_configure_fn([&](JTopologyBuilder& builder) { + + JMailbox*> events_in; + JMailbox*> events_out; + JMailbox> subevents_in; + JMailbox> subevents_out; + + auto split_arrow = new JSplitArrow("split", &processor, &events_in, &subevents_in); + auto subprocess_arrow = new JSubeventArrow("subprocess", &processor, &subevents_in, &subevents_out); + auto merge_arrow = new JMergeArrow("merge", &processor, &subevents_out, &events_out); + + auto source_arrow = new JEventSourceArrow("simpleSource", + {source}, + &events_in, + topology->event_pool); + auto proc_arrow = new JEventProcessorArrow("simpleProcessor", &events_out, nullptr, topology->event_pool); + proc_arrow->add_processor(new SimpleProcessor); + + builder.arrows.push_back(source_arrow); + builder.arrows.push_back(split_arrow); + builder.arrows.push_back(subprocess_arrow); + builder.arrows.push_back(merge_arrow); + builder.arrows.push_back(proc_arrow); + + source_arrow->attach(split_arrow); + split_arrow->attach(subprocess_arrow); + subprocess_arrow->attach(merge_arrow); + merge_arrow->attach(proc_arrow); + }); - auto topology = app.GetService()->create_empty(); - auto source_arrow = new JEventSourceArrow("simpleSource", - {source}, - &events_in, - topology->event_pool); - auto proc_arrow = new JEventProcessorArrow("simpleProcessor", &events_out, nullptr, topology->event_pool); - proc_arrow->add_processor(new SimpleProcessor); - - topology->arrows.push_back(source_arrow); - topology->arrows.push_back(split_arrow); - topology->arrows.push_back(subprocess_arrow); - topology->arrows.push_back(merge_arrow); - topology->arrows.push_back(proc_arrow); - - source_arrow->attach(split_arrow); - split_arrow->attach(subprocess_arrow); - subprocess_arrow->attach(merge_arrow); - merge_arrow->attach(proc_arrow); app.Run(true); diff --git a/src/libraries/JANA/CMakeLists.txt b/src/libraries/JANA/CMakeLists.txt index 9766b5006..45fb77e13 100644 --- a/src/libraries/JANA/CMakeLists.txt +++ b/src/libraries/JANA/CMakeLists.txt @@ -26,30 +26,31 @@ set(JANA2_SOURCES JMultifactory.h JService.cc - Engine/JArrow.h - Engine/JArrowMetrics.h - Engine/JArrowPerfSummary.cc - Engine/JArrowPerfSummary.h Engine/JArrowProcessingController.cc Engine/JArrowProcessingController.h - Engine/JArrowTopology.cc - Engine/JArrowTopology.h - Engine/JEventProcessorArrow.cc - Engine/JEventProcessorArrow.h - Engine/JEventSourceArrow.cc - Engine/JEventSourceArrow.h - Engine/JEventMapArrow.h - Engine/JEventMapArrow.cc - Engine/JPool.h - - Engine/JMailbox.h Engine/JScheduler.cc Engine/JScheduler.h - Engine/JSubeventArrow.h Engine/JWorker.h Engine/JWorker.cc Engine/JWorkerMetrics.h - Engine/JTopologyBuilder.h + Engine/JPerfMetrics.cc + Engine/JPerfMetrics.h + Engine/JPerfSummary.cc + Engine/JPerfSummary.h + + Topology/JArrow.h + Topology/JArrowMetrics.h + Topology/JEventProcessorArrow.cc + Topology/JEventProcessorArrow.h + Topology/JEventSourceArrow.cc + Topology/JEventSourceArrow.h + Topology/JEventMapArrow.h + Topology/JEventMapArrow.cc + Topology/JPool.h + Topology/JMailbox.h + Topology/JSubeventArrow.h + Topology/JTopologyBuilder.h + Topology/JTopologyBuilder.cc Services/JComponentManager.cc Services/JComponentManager.h @@ -60,15 +61,11 @@ set(JANA2_SOURCES Services/JParameterManager.h Services/JPluginLoader.cc Services/JPluginLoader.h - Services/JProcessingController.h Services/JServiceLocator.h Services/JEventGroupTracker.h Status/JComponentSummary.h Status/JComponentSummary.cc - Status/JPerfMetrics.cc - Status/JPerfMetrics.h - Status/JPerfSummary.h Streaming/JDiscreteJoin.h Streaming/JEventBuilder.h @@ -245,6 +242,7 @@ file(GLOB jana_cli_headers "CLI/*.h*") file(GLOB jana_compat_headers "Compatibility/*.h*") file(GLOB jana_podio_headers "Podio/*.h*") file(GLOB jana_omni_headers "Omni/*.h*") +file(GLOB jana_topology_headers "Topology/*.h*") install(FILES ${jana_headers} DESTINATION include/JANA) install(FILES ${jana_engine_headers} DESTINATION include/JANA/Engine) @@ -256,6 +254,7 @@ install(FILES ${jana_calibs_headers} DESTINATION include/JANA/Calibrations) install(FILES ${jana_cli_headers} DESTINATION include/JANA/CLI) install(FILES ${jana_compat_headers} DESTINATION include/JANA/Compatibility) install(FILES ${jana_omni_headers} DESTINATION include/JANA/Omni) +install(FILES ${jana_topology_headers} DESTINATION include/JANA/Topology) if (${USE_PODIO}) install(FILES ${jana_podio_headers} DESTINATION include/JANA/Podio) diff --git a/src/libraries/JANA/Engine/JArrowProcessingController.cc b/src/libraries/JANA/Engine/JArrowProcessingController.cc index 6fff722a2..3dd70d0ae 100644 --- a/src/libraries/JANA/Engine/JArrowProcessingController.cc +++ b/src/libraries/JANA/Engine/JArrowProcessingController.cc @@ -3,7 +3,8 @@ // Subject to the terms in the LICENSE file found in the top-level directory. #include -#include +#include +#include #include #include @@ -18,6 +19,8 @@ void JArrowProcessingController::acquire_services(JServiceLocator * sl) { m_worker_logger = ls->get_logger("JWorker"); m_scheduler_logger = ls->get_logger("JScheduler"); + m_topology = sl->get(); + // Obtain timeouts from parameter manager auto params = sl->get(); params->SetDefaultParameter("jana:timeout", m_timeout_s, "Max time (in seconds) JANA will wait for a thread to update its heartbeat before hard-exiting. 0 to disable timeout completely."); @@ -157,14 +160,14 @@ bool JArrowProcessingController::is_timed_out() { // Probably want to refactor so that we only make one such call per ticker iteration. // Since we are storing our metrics summary anyway, we could call measure_performance() // and have print_report(), print_final_report(), is_timed_out(), etc use the cached version - auto metrics = measure_internal_performance(); + auto metrics = measure_performance(); int timeout_s; - if (metrics->total_uptime_s < m_warmup_timeout_s * m_topology->event_pool_size / metrics->thread_count) { + if (metrics->total_uptime_s < m_warmup_timeout_s * m_topology->m_event_pool_size / metrics->thread_count) { // We are at the beginning and not all events have necessarily had a chance to warm up timeout_s = m_warmup_timeout_s; } - else if (!m_topology->limit_total_events_in_flight) { + else if (!m_topology->m_limit_total_events_in_flight) { // New events are constantly emitted, each of which may contain jfactorysets which need to be warmed up timeout_s = m_warmup_timeout_s; } @@ -219,16 +222,16 @@ JArrowProcessingController::~JArrowProcessingController() { } void JArrowProcessingController::print_report() { - auto metrics = measure_internal_performance(); + auto metrics = measure_performance(); LOG_INFO(m_logger) << "Running" << *metrics << LOG_END; } void JArrowProcessingController::print_final_report() { - auto metrics = measure_internal_performance(); + auto metrics = measure_performance(); LOG_INFO(m_logger) << "Final Report" << *metrics << LOG_END; } -std::unique_ptr JArrowProcessingController::measure_internal_performance() { +std::unique_ptr JArrowProcessingController::measure_performance() { // Measure perf on all Workers first, as this will prompt them to publish // any ArrowMetrics they have collected @@ -275,12 +278,9 @@ std::unique_ptr JArrowProcessingController::measure_int ? std::numeric_limits::infinity() : m_perf_summary.avg_throughput_hz / tighter_bottleneck; - return std::unique_ptr(new JArrowPerfSummary(m_perf_summary)); + return std::unique_ptr(new JPerfSummary(m_perf_summary)); } -std::unique_ptr JArrowProcessingController::measure_performance() { - return measure_internal_performance(); -} diff --git a/src/libraries/JANA/Engine/JArrowProcessingController.h b/src/libraries/JANA/Engine/JArrowProcessingController.h index f502ba29a..608f4d185 100644 --- a/src/libraries/JANA/Engine/JArrowProcessingController.h +++ b/src/libraries/JANA/Engine/JArrowProcessingController.h @@ -5,55 +5,51 @@ #ifndef JANA2_JARROWPROCESSINGCONTROLLER_H #define JANA2_JARROWPROCESSINGCONTROLLER_H -#include -#include +#include #include -#include -#include +#include #include -class JArrowProcessingController : public JProcessingController { +class JArrowProcessingController : public JService { public: - explicit JArrowProcessingController(std::shared_ptr topology) : m_topology(topology) {}; ~JArrowProcessingController() override; void acquire_services(JServiceLocator *) override; - void initialize() override; - void run(size_t nthreads) override; - void scale(size_t nthreads) override; + void initialize(); + void run(size_t nthreads); + void scale(size_t nthreads); void request_pause(); void wait_until_paused(); - void request_stop() override; - void wait_until_stopped() override; + void request_stop(); + void wait_until_stopped(); - bool is_stopped() override; - bool is_finished() override; - bool is_timed_out() override; - bool is_excepted() override; + bool is_stopped(); + bool is_finished(); + bool is_timed_out(); + bool is_excepted(); - std::vector get_exceptions() const override; + std::vector get_exceptions() const; - std::unique_ptr measure_performance() override; - std::unique_ptr measure_internal_performance(); + std::unique_ptr measure_performance(); - void print_report() override; - void print_final_report() override; + void print_report(); + void print_final_report(); // This is so we can test inline JScheduler* get_scheduler() { return m_scheduler; } private: + std::shared_ptr m_topology; using jclock_t = std::chrono::steady_clock; int m_timeout_s = 8; int m_warmup_timeout_s = 30; - JArrowPerfSummary m_perf_summary; - std::shared_ptr m_topology; // Owned by JArrowProcessingController + JPerfSummary m_perf_summary; JScheduler* m_scheduler = nullptr; std::vector m_workers; diff --git a/src/libraries/JANA/Engine/JArrowTopology.cc b/src/libraries/JANA/Engine/JArrowTopology.cc deleted file mode 100644 index e0893f7c8..000000000 --- a/src/libraries/JANA/Engine/JArrowTopology.cc +++ /dev/null @@ -1,27 +0,0 @@ - -// Copyright 2020, Jefferson Science Associates, LLC. -// Subject to the terms in the LICENSE file found in the top-level directory. - - -#include "JArrowTopology.h" -#include "JEventProcessorArrow.h" -#include "JEventSourceArrow.h" - -JArrowTopology::JArrowTopology() = default; - -JArrowTopology::~JArrowTopology() { - for (auto arrow : arrows) { - delete arrow; - } - for (auto queue : queues) { - delete queue; - } - for (auto pool : pools) { - delete pool; - } - if (event_pool != nullptr) { - delete event_pool; - } -} - - diff --git a/src/libraries/JANA/Engine/JArrowTopology.h b/src/libraries/JANA/Engine/JArrowTopology.h deleted file mode 100644 index 5a65e5a80..000000000 --- a/src/libraries/JANA/Engine/JArrowTopology.h +++ /dev/null @@ -1,55 +0,0 @@ - -// Copyright 2020, Jefferson Science Associates, LLC. -// Subject to the terms in the LICENSE file found in the top-level directory. - - -#ifndef JANA2_JARROWTOPOLOGY_H -#define JANA2_JARROWTOPOLOGY_H - - -#include -#include -#include -#include - -#include "JArrow.h" -#include "JMailbox.h" - - -struct JArrowTopology { - - using Event = std::shared_ptr; - using EventQueue = JMailbox; - - explicit JArrowTopology(); - virtual ~JArrowTopology(); - - std::shared_ptr component_manager; - // Ensure that ComponentManager stays alive at least as long as JArrowTopology does - // Otherwise there is a potential use-after-free when JArrowTopology or JArrowProcessingController access components - - JEventPool* event_pool = nullptr; // TODO: Move into pools eventually - JPerfMetrics metrics; - - std::vector arrows; - std::vector queues; // Queues shared between arrows - std::vector pools; // Pools shared between arrows - JProcessorMapping mapping; - - size_t event_pool_size = 1; // Will be defaulted to nthreads by builder - bool limit_total_events_in_flight = true; - bool enable_call_graph_recording = false; - size_t event_queue_threshold = 80; - size_t event_source_chunksize = 40; - size_t event_processor_chunksize = 1; - size_t location_count = 1; - bool enable_stealing = false; - int affinity = 0; // By default, don't pin the CPU at all - int locality = 0; // By default, assume no NUMA domains - - JLogger m_logger; - -}; - - -#endif //JANA2_JARROWTOPOLOGY_H diff --git a/src/libraries/JANA/Status/JPerfMetrics.cc b/src/libraries/JANA/Engine/JPerfMetrics.cc similarity index 100% rename from src/libraries/JANA/Status/JPerfMetrics.cc rename to src/libraries/JANA/Engine/JPerfMetrics.cc diff --git a/src/libraries/JANA/Status/JPerfMetrics.h b/src/libraries/JANA/Engine/JPerfMetrics.h similarity index 100% rename from src/libraries/JANA/Status/JPerfMetrics.h rename to src/libraries/JANA/Engine/JPerfMetrics.h diff --git a/src/libraries/JANA/Engine/JArrowPerfSummary.cc b/src/libraries/JANA/Engine/JPerfSummary.cc similarity index 97% rename from src/libraries/JANA/Engine/JArrowPerfSummary.cc rename to src/libraries/JANA/Engine/JPerfSummary.cc index 33c522e71..da33dc4ce 100644 --- a/src/libraries/JANA/Engine/JArrowPerfSummary.cc +++ b/src/libraries/JANA/Engine/JPerfSummary.cc @@ -3,12 +3,12 @@ // Subject to the terms in the LICENSE file found in the top-level directory. -#include "JArrowPerfSummary.h" +#include "JPerfSummary.h" #include #include -std::ostream& operator<<(std::ostream& os, const JArrowPerfSummary& s) { +std::ostream& operator<<(std::ostream& os, const JPerfSummary& s) { os << std::endl; os << " Thread team size [count]: " << s.thread_count << std::endl; diff --git a/src/libraries/JANA/Engine/JArrowPerfSummary.h b/src/libraries/JANA/Engine/JPerfSummary.h similarity index 77% rename from src/libraries/JANA/Engine/JArrowPerfSummary.h rename to src/libraries/JANA/Engine/JPerfSummary.h index 354a90d7d..4f32f5903 100644 --- a/src/libraries/JANA/Engine/JArrowPerfSummary.h +++ b/src/libraries/JANA/Engine/JPerfSummary.h @@ -7,9 +7,6 @@ #define JANA2_JARROWPERFSUMMARY_H -#include -#include - #include #include @@ -57,7 +54,16 @@ struct WorkerSummary { size_t last_arrow_queue_visit_count; }; -struct JArrowPerfSummary : public JPerfSummary { +struct JPerfSummary { + + size_t monotonic_events_completed = 0; // Since program started + size_t total_events_completed = 0; // Since run or rescale started + size_t latest_events_completed = 0; // Since previous measurement + size_t thread_count = 0; + double total_uptime_s = 0; + double latest_uptime_s = 0; + double avg_throughput_hz = 0; + double latest_throughput_hz = 0; double avg_seq_bottleneck_hz; double avg_par_bottleneck_hz; @@ -66,13 +72,9 @@ struct JArrowPerfSummary : public JPerfSummary { std::vector workers; std::vector arrows; - JArrowPerfSummary() = default; - JArrowPerfSummary(const JArrowPerfSummary&) = default; - virtual ~JArrowPerfSummary() = default; - }; -std::ostream& operator<<(std::ostream& stream, const JArrowPerfSummary& data); +std::ostream& operator<<(std::ostream& stream, const JPerfSummary& data); #endif //JANA2_JARROWPERFSUMMARY_H diff --git a/src/libraries/JANA/Engine/JScheduler.cc b/src/libraries/JANA/Engine/JScheduler.cc index e411ea0e8..d60225fa3 100644 --- a/src/libraries/JANA/Engine/JScheduler.cc +++ b/src/libraries/JANA/Engine/JScheduler.cc @@ -6,11 +6,11 @@ #include "JScheduler.h" #include -#include +#include #include -JScheduler::JScheduler(std::shared_ptr topology) +JScheduler::JScheduler(std::shared_ptr topology) : m_topology(topology) { m_topology_state.next_arrow_index = 0; diff --git a/src/libraries/JANA/Engine/JScheduler.h b/src/libraries/JANA/Engine/JScheduler.h index 8cfc85e6e..7210ffea7 100644 --- a/src/libraries/JANA/Engine/JScheduler.h +++ b/src/libraries/JANA/Engine/JScheduler.h @@ -8,9 +8,9 @@ #include #include -#include -#include -#include +#include +#include +#include struct JArrowTopology; @@ -54,7 +54,7 @@ class JScheduler { // This mutex controls ALL scheduler state std::mutex m_mutex; - std::shared_ptr m_topology; + std::shared_ptr m_topology; // Protected state TopologyState m_topology_state; @@ -63,7 +63,7 @@ class JScheduler { public: /// Constructor. Note that a Scheduler operates on a vector of Arrow*s. - JScheduler(std::shared_ptr topology); + JScheduler(std::shared_ptr topology); // Worker-facing operations diff --git a/src/libraries/JANA/Engine/JTopologyBuilder.h b/src/libraries/JANA/Engine/JTopologyBuilder.h deleted file mode 100644 index 0091bbace..000000000 --- a/src/libraries/JANA/Engine/JTopologyBuilder.h +++ /dev/null @@ -1,417 +0,0 @@ - -// Copyright 2020, Jefferson Science Associates, LLC. -// Subject to the terms in the LICENSE file found in the top-level directory. - -#ifndef JANA2_JTOPOLOGYBUILDER_H -#define JANA2_JTOPOLOGYBUILDER_H - -#include - -#include "JEventSourceArrow.h" -#include "JEventProcessorArrow.h" -#include "JEventMapArrow.h" -#include "JUnfoldArrow.h" -#include "JFoldArrow.h" -#include - -#include - -class JTopologyBuilder : public JService { - - std::shared_ptr m_params; - std::shared_ptr m_components; - std::shared_ptr m_topology; - std::function(std::shared_ptr)> m_configure_topology; - - size_t m_event_pool_size = 4; - size_t m_event_queue_threshold = 80; - size_t m_event_source_chunksize = 40; - size_t m_event_processor_chunksize = 1; - size_t m_location_count = 1; - bool m_enable_call_graph_recording = false; - bool m_enable_stealing = false; - bool m_limit_total_events_in_flight = true; - int m_affinity = 0; - int m_locality = 0; - JLogger m_arrow_logger; - JLogger m_queue_logger; - JLogger m_builder_logger; - -public: - - JTopologyBuilder() = default; - - ~JTopologyBuilder() override = default; - - std::string print_topology() { - JTablePrinter t; - t.AddColumn("Arrow", JTablePrinter::Justify::Left, 0); - t.AddColumn("Parallel", JTablePrinter::Justify::Center, 0); - t.AddColumn("Direction", JTablePrinter::Justify::Left, 0); - t.AddColumn("Place", JTablePrinter::Justify::Left, 0); - t.AddColumn("ID", JTablePrinter::Justify::Left, 0); - - // Build index lookup for queues - int i = 0; - std::map lookup; - for (JQueue* queue : m_topology->queues) { - lookup[queue] = i; - i += 1; - } - // Build index lookup for pools - for (JPoolBase* pool : m_topology->pools) { - lookup[pool] = i; - i += 1; - } - // Build table - - bool show_row = true; - - for (JArrow* arrow : m_topology->arrows) { - - show_row = true; - for (PlaceRefBase* place : arrow->m_places) { - if (show_row) { - t | arrow->get_name(); - t | arrow->is_parallel(); - show_row = false; - } - else { - t | "" | "" ; - } - - t | ((place->is_input) ? "Input ": "Output"); - t | ((place->is_queue) ? "Queue ": "Pool"); - t | lookup[place->place_ref]; - } - } - return t.Render(); - } - - /// set allows the user to specify a topology directly. Note that this needs to be set before JApplication::Initialize - /// gets called, which means that you won't be able to include components loaded from plugins. You probably want to use - /// JTopologyBuilder::set_configure_fn instead, which does give you that access. - inline void set(std::shared_ptr topology) { - m_topology = std::move(topology); - } - - /// set_cofigure_fn lets the user provide a lambda that sets up a topology after all components have been loaded. - /// It provides an 'empty' JArrowTopology which has been furnished with a pointer to the JComponentManager, the JEventPool, - /// and the JProcessorMapping (in case you care about NUMA details). However, it does not contain any queues or arrows. - /// You have to furnish those yourself. - inline void set_configure_fn(std::function(std::shared_ptr)> configure_fn) { - m_configure_topology = std::move(configure_fn); - } - - inline std::shared_ptr get_or_create() { - if (m_topology == nullptr) { - m_topology = std::make_shared(); - m_topology->component_manager = m_components; // Ensure the lifespan of the component manager exceeds that of the topology - m_topology->mapping.initialize(static_cast(m_affinity), - static_cast(m_locality)); - - m_topology->event_pool = new JEventPool(m_components, - m_event_pool_size, - m_location_count, - m_limit_total_events_in_flight); - m_topology->event_pool->init(); - attach_top_level(JEventLevel::Run); - LOG_INFO(m_builder_logger) << "Arrow topology is:\n" << print_topology() << LOG_END; - - if (m_configure_topology) { - m_topology = m_configure_topology(m_topology); - LOG_WARN(m_builder_logger) << "Found custom topology configurator! Modified arrow topology is: \n" << print_topology() << LOG_END; - } - } - int id=0; - for (auto* queue : m_topology->queues) { - queue->set_logger(m_queue_logger); - queue->set_id(id); - id += 1; - } - for (auto* arrow : m_topology->arrows) { - arrow->set_logger(m_arrow_logger); - } - return m_topology; - } - - - void acquire_services(JServiceLocator *sl) override { - m_components = sl->get(); - m_params = sl->get(); - - // We default event pool size to be equal to nthreads - // We parse the 'nthreads' parameter two different ways for backwards compatibility. - if (m_params->Exists("nthreads")) { - if (m_params->GetParameterValue("nthreads") == "Ncores") { - m_event_pool_size = JCpuInfo::GetNumCpus(); - } else { - m_event_pool_size = m_params->GetParameterValue("nthreads"); - } - } - - m_params->SetDefaultParameter("jana:event_pool_size", m_event_pool_size, - "Sets the initial size of the event pool. Having too few events starves the workers; having too many consumes memory and introduces overhead from extra factory initializations") - ->SetIsAdvanced(true); - m_params->SetDefaultParameter("jana:limit_total_events_in_flight", m_limit_total_events_in_flight, - "Controls whether the event pool is allowed to automatically grow beyond jana:event_pool_size") - ->SetIsAdvanced(true); - m_params->SetDefaultParameter("jana:event_queue_threshold", m_event_queue_threshold, - "Max number of events allowed on the main event queue. Higher => Better load balancing; Lower => Fewer events in flight") - ->SetIsAdvanced(true); - m_params->SetDefaultParameter("jana:event_source_chunksize", m_event_source_chunksize, - "Max number of events that a JEventSource may enqueue at once. Higher => less queue contention; Lower => better load balancing") - ->SetIsAdvanced(true); - m_params->SetDefaultParameter("jana:event_processor_chunksize", m_event_processor_chunksize, - "Max number of events that the JEventProcessors may dequeue at once. Higher => less queue contention; Lower => better load balancing") - ->SetIsAdvanced(true); - m_params->SetDefaultParameter("jana:enable_stealing", m_enable_stealing, - "Enable work stealing. Improves load balancing when jana:locality != 0; otherwise does nothing.") - ->SetIsAdvanced(true); - m_params->SetDefaultParameter("jana:affinity", m_affinity, - "Constrain worker thread CPU affinity. 0=Let the OS decide. 1=Avoid extra memory movement at the expense of using hyperthreads. 2=Avoid hyperthreads at the expense of extra memory movement") - ->SetIsAdvanced(true); - m_params->SetDefaultParameter("jana:locality", m_locality, - "Constrain memory locality. 0=No constraint. 1=Events stay on the same socket. 2=Events stay on the same NUMA domain. 3=Events stay on same core. 4=Events stay on same cpu/hyperthread.") - ->SetIsAdvanced(true); - m_params->SetDefaultParameter("record_call_stack", m_enable_call_graph_recording, - "Records a trace of who called each factory. Reduces performance but necessary for plugins such as janadot.") - ->SetIsAdvanced(true); - - auto m_logging_svc = sl->get(); - m_arrow_logger = m_logging_svc->get_logger("JArrow"); - m_queue_logger = m_logging_svc->get_logger("JQueue"); - m_builder_logger = m_logging_svc->get_logger("JTopologyBuilder"); - }; - - inline std::shared_ptr create_empty() { - m_topology = std::make_shared(); - m_topology->component_manager = m_components; // Ensure the lifespan of the component manager exceeds that of the topology - m_topology->mapping.initialize(static_cast(m_affinity), - static_cast(m_locality)); - - m_topology->event_pool = new JEventPool(m_components, - m_event_pool_size, - m_location_count, - m_limit_total_events_in_flight); - m_topology->event_pool->init(); - return m_topology; - - } - - void attach_lower_level(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder, bool found_sink) { - - std::stringstream ss; - ss << current_level; - - LOG_DEBUG(m_arrow_logger) << "JTopologyBuilder: Attaching components at lower level = " << current_level << LOG_END; - - JEventPool* pool = new JEventPool(m_components, - m_event_pool_size, - m_location_count, - m_limit_total_events_in_flight, - current_level); - pool->init(); - m_topology->pools.push_back(pool); // Transfers ownership - - - std::vector sources_at_level; - for (JEventSource* source : m_components->get_evt_srces()) { - if (source->GetLevel() == current_level) { - sources_at_level.push_back(source); - } - } - std::vector procs_at_level; - for (JEventProcessor* proc : m_components->get_evt_procs()) { - if (proc->GetLevel() == current_level) { - procs_at_level.push_back(proc); - } - } - std::vector unfolders_at_level; - for (JEventUnfolder* unfolder : m_components->get_unfolders()) { - if (unfolder->GetLevel() == current_level) { - unfolders_at_level.push_back(unfolder); - } - } - - - if (sources_at_level.size() != 0) { - throw JException("Support for lower-level event sources coming soon!"); - } - if (unfolders_at_level.size() != 0) { - throw JException("Support for lower-level event unfolders coming soon!"); - } - if (procs_at_level.size() == 0) { - throw JException("For now we require you to provide at least one JEventProcessor"); - } - - auto q1 = new EventQueue(m_event_queue_threshold, m_topology->mapping.get_loc_count(), m_enable_stealing); - m_topology->queues.push_back(q1); - - auto q2 = new EventQueue(m_event_queue_threshold, m_topology->mapping.get_loc_count(), m_enable_stealing); - m_topology->queues.push_back(q2); - - auto* proc_arrow = new JEventProcessorArrow(ss.str()+"Tap", q1, q2, nullptr); - m_topology->arrows.push_back(proc_arrow); - proc_arrow->set_chunksize(m_event_processor_chunksize); - proc_arrow->set_logger(m_arrow_logger); - if (found_sink) { - proc_arrow->set_is_sink(false); - } - - for (auto proc: procs_at_level) { - proc_arrow->add_processor(proc); - } - - parent_unfolder->attach_child_in(pool); - parent_unfolder->attach_child_out(q1); - parent_folder->attach_child_in(q2); - parent_folder->attach_child_out(pool); - parent_unfolder->attach(proc_arrow); - proc_arrow->attach(parent_folder); - } - - - void attach_top_level(JEventLevel current_level) { - - std::stringstream ss; - ss << current_level; - auto level_str = ss.str(); - - std::vector sources_at_level; - for (JEventSource* source : m_components->get_evt_srces()) { - if (source->GetLevel() == current_level) { - sources_at_level.push_back(source); - } - } - if (sources_at_level.size() == 0) { - // Skip level entirely for now. Consider eventually supporting - // folding low levels into higher levels without corresponding unfold - LOG_TRACE(m_arrow_logger) << "JTopologyBuilder: No sources found at level " << current_level << ", skipping" << LOG_END; - JEventLevel next = next_level(current_level); - if (next == JEventLevel::None) { - LOG_WARN(m_arrow_logger) << "No sources found: Processing topology will be empty." << LOG_END; - return; - } - return attach_top_level(next); - } - LOG_DEBUG(m_arrow_logger) << "JTopologyBuilder: Attaching components at top level = " << current_level << LOG_END; - - // We've now found our top level. No matter what, we need an event pool for this level - JEventPool* pool_at_level = new JEventPool(m_components, - m_event_pool_size, - m_location_count, - m_limit_total_events_in_flight, - current_level); - pool_at_level->init(); - m_topology->pools.push_back(pool_at_level); // Hand over ownership of the pool to the topology - - // There are two possibilities at this point: - // a. This is the only level, in which case we wire up the arrows and exit - // b. We have an unfolder/folder pair, in which case we wire everything up, and then recursively attach_lower_level(). - // We use the presence of an unfolder as our test for whether or not a lower level should be included. This is because - // the folder might be trivial and hence omitted by the user. (Note that some folder is always needed in order to return - // the higher-level event to the pool). - // The user always needs to provide an unfolder because I can't think of a trivial unfolder that would be useful. - - std::vector unfolders_at_level; - for (JEventUnfolder* unfolder : m_components->get_unfolders()) { - if (unfolder->GetLevel() == current_level) { - unfolders_at_level.push_back(unfolder); - } - } - - std::vector procs_at_level; - for (JEventProcessor* proc : m_components->get_evt_procs()) { - if (proc->GetLevel() == current_level) { - procs_at_level.push_back(proc); - } - } - - if (unfolders_at_level.size() == 0) { - // No unfolders, so this is the only level - // Attach the source to the map/tap just like before - // - // We might want to print a friendly warning message communicating why any lower-level - // components are being ignored, like so: - // skip_lower_level(next_level(current_level)); - - LOG_DEBUG(m_arrow_logger) << "JTopologyBuilder: No unfolders found at level " << current_level << ", finishing here." << LOG_END; - - auto queue = new EventQueue(m_event_queue_threshold, m_topology->mapping.get_loc_count(), m_enable_stealing); - m_topology->queues.push_back(queue); - - auto* src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level, queue, pool_at_level); - m_topology->arrows.push_back(src_arrow); - src_arrow->set_chunksize(m_event_source_chunksize); - - auto* proc_arrow = new JEventProcessorArrow(level_str+"Tap", queue, nullptr, pool_at_level); - m_topology->arrows.push_back(proc_arrow); - proc_arrow->set_chunksize(m_event_processor_chunksize); - - for (auto proc: procs_at_level) { - proc_arrow->add_processor(proc); - } - src_arrow->attach(proc_arrow); - } - else if (unfolders_at_level.size() != 1) { - throw JException("At most one unfolder must be provided for each level in the event hierarchy!"); - } - else { - - auto q1 = new EventQueue(m_event_queue_threshold, m_topology->mapping.get_loc_count(), m_enable_stealing); - auto q2 = new EventQueue(m_event_queue_threshold, m_topology->mapping.get_loc_count(), m_enable_stealing); - - m_topology->queues.push_back(q1); - m_topology->queues.push_back(q2); - - auto *src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level, q1, pool_at_level); - m_topology->arrows.push_back(src_arrow); - src_arrow->set_chunksize(m_event_source_chunksize); - - auto *map_arrow = new JEventMapArrow(level_str+"Map", q1, q2);; - m_topology->arrows.push_back(map_arrow); - map_arrow->set_chunksize(m_event_source_chunksize); - src_arrow->attach(map_arrow); - - // TODO: We are using q2 temporarily knowing that it will be overwritten in attach_lower_level. - // It would be better to rejigger how we validate PlaceRefs and accept empty placerefs/fewer ctor args - auto *unfold_arrow = new JUnfoldArrow(level_str+"Unfold", unfolders_at_level[0], q2, pool_at_level, q2); - m_topology->arrows.push_back(unfold_arrow); - unfold_arrow->set_chunksize(m_event_source_chunksize); - map_arrow->attach(unfold_arrow); - - // child_in, child_out, parent_out - auto *fold_arrow = new JFoldArrow(level_str+"Fold", current_level, unfolders_at_level[0]->GetChildLevel(), q2, pool_at_level, pool_at_level); - // TODO: Support user-provided folders - fold_arrow->set_chunksize(m_event_source_chunksize); - - bool found_sink = (procs_at_level.size() > 0); - attach_lower_level(unfolders_at_level[0]->GetChildLevel(), unfold_arrow, fold_arrow, found_sink); - - // Push fold arrow back _after_ attach_lower_level so that arrows can be iterated over in order - m_topology->arrows.push_back(fold_arrow); - - if (procs_at_level.size() != 0) { - - auto q3 = new EventQueue(m_event_queue_threshold, m_topology->mapping.get_loc_count(), m_enable_stealing); - m_topology->queues.push_back(q3); - - auto* proc_arrow = new JEventProcessorArrow(level_str+"Tap", q3, nullptr, pool_at_level); - m_topology->arrows.push_back(proc_arrow); - proc_arrow->set_chunksize(m_event_processor_chunksize); - - for (auto proc: procs_at_level) { - proc_arrow->add_processor(proc); - } - - fold_arrow->attach_parent_out(q3); - fold_arrow->attach(proc_arrow); - } - } - } - -}; - - -#endif //JANA2_JTOPOLOGYBUILDER_H diff --git a/src/libraries/JANA/Engine/JWorker.h b/src/libraries/JANA/Engine/JWorker.h index afc230548..810da7e33 100644 --- a/src/libraries/JANA/Engine/JWorker.h +++ b/src/libraries/JANA/Engine/JWorker.h @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include diff --git a/src/libraries/JANA/JApplication.cc b/src/libraries/JANA/JApplication.cc index 5e719eed3..dc7e337f8 100644 --- a/src/libraries/JANA/JApplication.cc +++ b/src/libraries/JANA/JApplication.cc @@ -6,14 +6,17 @@ #include +#include #include +#include #include #include +#include #include #include #include -#include -#include + +#include JApplication *japp = nullptr; @@ -147,11 +150,9 @@ void JApplication::Initialize() { LOG_WARN(m_logger) << "Unrecognized engine choice! Falling back to jana:engine=0" << LOG_END; } */ - auto topology = topology_builder->get_or_create(); - ProvideService(std::make_shared(topology)); - + topology_builder->create_topology(); + ProvideService(std::make_shared()); m_processing_controller = m_service_locator->get(); // Get deps from SL - ProvideService(m_processing_controller); // Make abstract class available via SL m_processing_controller->initialize(); m_initialized = true; diff --git a/src/libraries/JANA/JApplicationFwd.h b/src/libraries/JANA/JApplicationFwd.h index 082311180..42f95f3b4 100644 --- a/src/libraries/JANA/JApplicationFwd.h +++ b/src/libraries/JANA/JApplicationFwd.h @@ -17,7 +17,7 @@ class JFactoryGenerator; class JFactorySet; class JComponentManager; class JPluginLoader; -class JProcessingController; +class JArrowProcessingController; class JEventUnfolder; class JServiceLocator; class JParameter; @@ -26,7 +26,7 @@ class JApplication; extern JApplication* japp; #include -#include +#include #include @@ -140,7 +140,7 @@ class JApplication { std::shared_ptr m_params; std::shared_ptr m_plugin_loader; std::shared_ptr m_component_manager; - std::shared_ptr m_processing_controller; + std::shared_ptr m_processing_controller; bool m_quitting = false; bool m_draining_queues = false; diff --git a/src/libraries/JANA/Services/JProcessingController.h b/src/libraries/JANA/Services/JProcessingController.h deleted file mode 100644 index 8d9c80b73..000000000 --- a/src/libraries/JANA/Services/JProcessingController.h +++ /dev/null @@ -1,39 +0,0 @@ - -// Copyright 2020, Jefferson Science Associates, LLC. -// Subject to the terms in the LICENSE file found in the top-level directory. - - -#ifndef JANA2_JPROCESSINGCONTROLLER_H -#define JANA2_JPROCESSINGCONTROLLER_H - -#include -#include -#include - -#include -#include - -class JProcessingController : public JService { -public: - - virtual ~JProcessingController() = default; - - virtual void initialize() = 0; - virtual void run(size_t nthreads) = 0; - virtual void scale(size_t nthreads) = 0; - virtual void request_stop() = 0; - virtual void wait_until_stopped() = 0; - virtual bool is_stopped() = 0; - virtual bool is_finished() = 0; - virtual bool is_timed_out() = 0; - virtual bool is_excepted() = 0; - - virtual std::unique_ptr measure_performance() = 0; - virtual std::vector get_exceptions() const = 0; - - virtual void print_report() = 0; - virtual void print_final_report() = 0; -}; - -#endif //JANA2_JPROCESSINGCONTROLLER_H - diff --git a/src/libraries/JANA/Status/JPerfSummary.h b/src/libraries/JANA/Status/JPerfSummary.h deleted file mode 100644 index 05e575f10..000000000 --- a/src/libraries/JANA/Status/JPerfSummary.h +++ /dev/null @@ -1,43 +0,0 @@ - -// Copyright 2020, Jefferson Science Associates, LLC. -// Subject to the terms in the LICENSE file found in the top-level directory. - - -#ifndef JANA2_JPERFORMANCESUMMARY_H -#define JANA2_JPERFORMANCESUMMARY_H - - -#include -#include -#include - -/// JPerfSummary is a plain-old-data container for performance metrics. -/// JProcessingControllers expose a JPerfSummary object, which they may -/// extend in order to expose additional, implementation-specific information. -struct JPerfSummary { - - size_t monotonic_events_completed = 0; // Since program started - size_t total_events_completed = 0; // Since run or rescale started - size_t latest_events_completed = 0; // Since previous measurement - size_t thread_count = 0; - double total_uptime_s = 0; - double latest_uptime_s = 0; - double avg_throughput_hz = 0; - double latest_throughput_hz = 0; - - JPerfSummary() = default; - JPerfSummary(const JPerfSummary&) = default; - virtual ~JPerfSummary() = default; -}; - -inline std::ostream& operator<<(std::ostream& os, const JPerfSummary& x) { - os << " Threads: " << x.thread_count << std::endl - << " Events processed: " << x.total_events_completed << std::endl - << " Inst throughput [Hz]: " << x.latest_throughput_hz << std::endl - << " Avg throughput [Hz]: " << x.avg_throughput_hz << std::endl; - return os; -} - - - -#endif //JANA2_JPERFORMANCESUMMARY_H diff --git a/src/libraries/JANA/Status/JStatusSummary.cc b/src/libraries/JANA/Status/JStatusSummary.cc deleted file mode 100644 index 446047f0e..000000000 --- a/src/libraries/JANA/Status/JStatusSummary.cc +++ /dev/null @@ -1,6 +0,0 @@ - -// Copyright 2020, Jefferson Science Associates, LLC. -// Subject to the terms in the LICENSE file found in the top-level directory. - - -#include "JStatusSummary.h" diff --git a/src/libraries/JANA/Status/JStatusSummary.h b/src/libraries/JANA/Status/JStatusSummary.h deleted file mode 100644 index 62803bbfd..000000000 --- a/src/libraries/JANA/Status/JStatusSummary.h +++ /dev/null @@ -1,32 +0,0 @@ - -// Copyright 2020, Jefferson Science Associates, LLC. -// Subject to the terms in the LICENSE file found in the top-level directory. - - -#ifndef JANA2_J_STATUS_SUMMARY_H -#define JANA2_J_STATUS_SUMMARY_H - -#include -#include - -#include -#include - -struct JApplicationSummary { - bool initialized; - bool quitting; - bool draining_queues; - bool skip_join; - int exit_code; -}; - -struct JStatusSummary { - JApplicationSummary application_summary; - JComponentSummary component_summary; - JPerfSummary performance_summary; - std::map backtraces; - JException current_exception; -}; - - -#endif //JANA2_J_STATUS_SUMMARY_H diff --git a/src/libraries/JANA/Engine/JArrow.h b/src/libraries/JANA/Topology/JArrow.h similarity index 99% rename from src/libraries/JANA/Engine/JArrow.h rename to src/libraries/JANA/Topology/JArrow.h index b86a434ca..e9c55eb73 100644 --- a/src/libraries/JANA/Engine/JArrow.h +++ b/src/libraries/JANA/Topology/JArrow.h @@ -14,8 +14,8 @@ #include "JArrowMetrics.h" #include #include -#include -#include +#include +#include #ifndef JANA2_ARROWDATA_MAX_SIZE diff --git a/src/libraries/JANA/Engine/JArrowMetrics.h b/src/libraries/JANA/Topology/JArrowMetrics.h similarity index 100% rename from src/libraries/JANA/Engine/JArrowMetrics.h rename to src/libraries/JANA/Topology/JArrowMetrics.h diff --git a/src/libraries/JANA/Engine/JEventMapArrow.cc b/src/libraries/JANA/Topology/JEventMapArrow.cc similarity index 97% rename from src/libraries/JANA/Engine/JEventMapArrow.cc rename to src/libraries/JANA/Topology/JEventMapArrow.cc index ac6f869db..73636fee5 100644 --- a/src/libraries/JANA/Engine/JEventMapArrow.cc +++ b/src/libraries/JANA/Topology/JEventMapArrow.cc @@ -2,7 +2,7 @@ // Subject to the terms in the LICENSE file found in the top-level directory. -#include +#include #include #include diff --git a/src/libraries/JANA/Engine/JEventMapArrow.h b/src/libraries/JANA/Topology/JEventMapArrow.h similarity index 95% rename from src/libraries/JANA/Engine/JEventMapArrow.h rename to src/libraries/JANA/Topology/JEventMapArrow.h index 7f110b4e4..7da94014b 100644 --- a/src/libraries/JANA/Engine/JEventMapArrow.h +++ b/src/libraries/JANA/Topology/JEventMapArrow.h @@ -3,7 +3,7 @@ #pragma once -#include +#include class JEventPool; class JEventSource; diff --git a/src/libraries/JANA/Engine/JEventProcessorArrow.cc b/src/libraries/JANA/Topology/JEventProcessorArrow.cc similarity index 97% rename from src/libraries/JANA/Engine/JEventProcessorArrow.cc rename to src/libraries/JANA/Topology/JEventProcessorArrow.cc index b4428b97e..3efdb43bd 100644 --- a/src/libraries/JANA/Engine/JEventProcessorArrow.cc +++ b/src/libraries/JANA/Topology/JEventProcessorArrow.cc @@ -3,7 +3,7 @@ // Subject to the terms in the LICENSE file found in the top-level directory. -#include +#include #include #include #include diff --git a/src/libraries/JANA/Engine/JEventProcessorArrow.h b/src/libraries/JANA/Topology/JEventProcessorArrow.h similarity index 95% rename from src/libraries/JANA/Engine/JEventProcessorArrow.h rename to src/libraries/JANA/Topology/JEventProcessorArrow.h index c29555131..41ee0f153 100644 --- a/src/libraries/JANA/Engine/JEventProcessorArrow.h +++ b/src/libraries/JANA/Topology/JEventProcessorArrow.h @@ -7,7 +7,7 @@ #include -#include +#include class JEventPool; diff --git a/src/libraries/JANA/Engine/JEventSourceArrow.cc b/src/libraries/JANA/Topology/JEventSourceArrow.cc similarity index 98% rename from src/libraries/JANA/Engine/JEventSourceArrow.cc rename to src/libraries/JANA/Topology/JEventSourceArrow.cc index 9669ee910..10af499ce 100644 --- a/src/libraries/JANA/Engine/JEventSourceArrow.cc +++ b/src/libraries/JANA/Topology/JEventSourceArrow.cc @@ -5,7 +5,7 @@ #include #include -#include +#include #include diff --git a/src/libraries/JANA/Engine/JEventSourceArrow.h b/src/libraries/JANA/Topology/JEventSourceArrow.h similarity index 94% rename from src/libraries/JANA/Engine/JEventSourceArrow.h rename to src/libraries/JANA/Topology/JEventSourceArrow.h index 2c55dee2a..3373e61c5 100644 --- a/src/libraries/JANA/Engine/JEventSourceArrow.h +++ b/src/libraries/JANA/Topology/JEventSourceArrow.h @@ -7,7 +7,7 @@ #define JANA2_JEVENTSOURCEARROW_H -#include +#include using Event = std::shared_ptr; using EventQueue = JMailbox; diff --git a/src/libraries/JANA/Engine/JFoldArrow.h b/src/libraries/JANA/Topology/JFoldArrow.h similarity index 99% rename from src/libraries/JANA/Engine/JFoldArrow.h rename to src/libraries/JANA/Topology/JFoldArrow.h index 43c360d17..8c1b027c1 100644 --- a/src/libraries/JANA/Engine/JFoldArrow.h +++ b/src/libraries/JANA/Topology/JFoldArrow.h @@ -3,7 +3,7 @@ #pragma once -#include +#include #include #include diff --git a/src/libraries/JANA/Engine/JJunctionArrow.h b/src/libraries/JANA/Topology/JJunctionArrow.h similarity index 97% rename from src/libraries/JANA/Engine/JJunctionArrow.h rename to src/libraries/JANA/Topology/JJunctionArrow.h index a48975fc2..fffd02893 100644 --- a/src/libraries/JANA/Engine/JJunctionArrow.h +++ b/src/libraries/JANA/Topology/JJunctionArrow.h @@ -3,9 +3,9 @@ #pragma once -#include -#include -#include +#include +#include +#include diff --git a/src/libraries/JANA/Engine/JMailbox.h b/src/libraries/JANA/Topology/JMailbox.h similarity index 100% rename from src/libraries/JANA/Engine/JMailbox.h rename to src/libraries/JANA/Topology/JMailbox.h diff --git a/src/libraries/JANA/Engine/JPipelineArrow.h b/src/libraries/JANA/Topology/JPipelineArrow.h similarity index 96% rename from src/libraries/JANA/Engine/JPipelineArrow.h rename to src/libraries/JANA/Topology/JPipelineArrow.h index 7c62733d8..1ce6f0228 100644 --- a/src/libraries/JANA/Engine/JPipelineArrow.h +++ b/src/libraries/JANA/Topology/JPipelineArrow.h @@ -4,9 +4,9 @@ #pragma once -#include -#include -#include +#include +#include +#include template class JPipelineArrow : public JArrow { diff --git a/src/libraries/JANA/Engine/JPool.h b/src/libraries/JANA/Topology/JPool.h similarity index 100% rename from src/libraries/JANA/Engine/JPool.h rename to src/libraries/JANA/Topology/JPool.h diff --git a/src/libraries/JANA/Engine/JSubeventArrow.h b/src/libraries/JANA/Topology/JSubeventArrow.h similarity index 100% rename from src/libraries/JANA/Engine/JSubeventArrow.h rename to src/libraries/JANA/Topology/JSubeventArrow.h diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc new file mode 100644 index 000000000..9d5ab66d3 --- /dev/null +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -0,0 +1,377 @@ + +// Copyright 2020, Jefferson Science Associates, LLC. +// Subject to the terms in the LICENSE file found in the top-level directory. + + +#include "JTopologyBuilder.h" + +#include "JEventSourceArrow.h" +#include "JEventProcessorArrow.h" +#include "JEventMapArrow.h" +#include "JUnfoldArrow.h" +#include "JFoldArrow.h" +#include + + +using Event = std::shared_ptr; +using EventQueue = JMailbox; + +JTopologyBuilder::~JTopologyBuilder() { + for (auto arrow : arrows) { + delete arrow; + } + for (auto queue : queues) { + delete queue; + } + for (auto pool : pools) { + delete pool; + } + if (event_pool != nullptr) { + delete event_pool; + } +} + +std::string JTopologyBuilder::print_topology() { + JTablePrinter t; + t.AddColumn("Arrow", JTablePrinter::Justify::Left, 0); + t.AddColumn("Parallel", JTablePrinter::Justify::Center, 0); + t.AddColumn("Direction", JTablePrinter::Justify::Left, 0); + t.AddColumn("Place", JTablePrinter::Justify::Left, 0); + t.AddColumn("ID", JTablePrinter::Justify::Left, 0); + + // Build index lookup for queues + int i = 0; + std::map lookup; + for (JQueue* queue : queues) { + lookup[queue] = i; + i += 1; + } + // Build index lookup for pools + for (JPoolBase* pool : pools) { + lookup[pool] = i; + i += 1; + } + // Build table + + bool show_row = true; + + for (JArrow* arrow : arrows) { + + show_row = true; + for (PlaceRefBase* place : arrow->m_places) { + if (show_row) { + t | arrow->get_name(); + t | arrow->is_parallel(); + show_row = false; + } + else { + t | "" | "" ; + } + + t | ((place->is_input) ? "Input ": "Output"); + t | ((place->is_queue) ? "Queue ": "Pool"); + t | lookup[place->place_ref]; + } + } + return t.Render(); +} + + +/// set_cofigure_fn lets the user provide a lambda that sets up a topology after all components have been loaded. +/// It provides an 'empty' JArrowTopology which has been furnished with a pointer to the JComponentManager, the JEventPool, +/// and the JProcessorMapping (in case you care about NUMA details). However, it does not contain any queues or arrows. +/// You have to furnish those yourself. +void JTopologyBuilder::set_configure_fn(std::function configure_fn) { + m_configure_topology = std::move(configure_fn); +} + +void JTopologyBuilder::create_topology() { + mapping.initialize(static_cast(m_affinity), + static_cast(m_locality)); + + event_pool = new JEventPool(m_components, + m_event_pool_size, + m_location_count, + m_limit_total_events_in_flight); + event_pool->init(); + + if (m_configure_topology) { + m_configure_topology(*this); + LOG_WARN(GetLogger()) << "Found custom topology configurator! Modified arrow topology is: \n" << print_topology() << LOG_END; + } + else { + attach_top_level(JEventLevel::Run); + LOG_INFO(GetLogger()) << "Arrow topology is:\n" << print_topology() << LOG_END; + } + int id=0; + for (auto* queue : queues) { + queue->set_logger(m_queue_logger); + queue->set_id(id); + id += 1; + } + for (auto* arrow : arrows) { + arrow->set_logger(m_arrow_logger); + } +} + + +void JTopologyBuilder::acquire_services(JServiceLocator *sl) { + + m_components = sl->get(); + + // We default event pool size to be equal to nthreads + // We parse the 'nthreads' parameter two different ways for backwards compatibility. + if (m_params->Exists("nthreads")) { + if (m_params->GetParameterValue("nthreads") == "Ncores") { + m_event_pool_size = JCpuInfo::GetNumCpus(); + } else { + m_event_pool_size = m_params->GetParameterValue("nthreads"); + } + } + + m_params->SetDefaultParameter("jana:event_pool_size", m_event_pool_size, + "Sets the initial size of the event pool. Having too few events starves the workers; having too many consumes memory and introduces overhead from extra factory initializations") + ->SetIsAdvanced(true); + m_params->SetDefaultParameter("jana:limit_total_events_in_flight", m_limit_total_events_in_flight, + "Controls whether the event pool is allowed to automatically grow beyond jana:event_pool_size") + ->SetIsAdvanced(true); + m_params->SetDefaultParameter("jana:event_queue_threshold", m_event_queue_threshold, + "Max number of events allowed on the main event queue. Higher => Better load balancing; Lower => Fewer events in flight") + ->SetIsAdvanced(true); + m_params->SetDefaultParameter("jana:event_source_chunksize", m_event_source_chunksize, + "Max number of events that a JEventSource may enqueue at once. Higher => less queue contention; Lower => better load balancing") + ->SetIsAdvanced(true); + m_params->SetDefaultParameter("jana:event_processor_chunksize", m_event_processor_chunksize, + "Max number of events that the JEventProcessors may dequeue at once. Higher => less queue contention; Lower => better load balancing") + ->SetIsAdvanced(true); + m_params->SetDefaultParameter("jana:enable_stealing", m_enable_stealing, + "Enable work stealing. Improves load balancing when jana:locality != 0; otherwise does nothing.") + ->SetIsAdvanced(true); + m_params->SetDefaultParameter("jana:affinity", m_affinity, + "Constrain worker thread CPU affinity. 0=Let the OS decide. 1=Avoid extra memory movement at the expense of using hyperthreads. 2=Avoid hyperthreads at the expense of extra memory movement") + ->SetIsAdvanced(true); + m_params->SetDefaultParameter("jana:locality", m_locality, + "Constrain memory locality. 0=No constraint. 1=Events stay on the same socket. 2=Events stay on the same NUMA domain. 3=Events stay on same core. 4=Events stay on same cpu/hyperthread.") + ->SetIsAdvanced(true); + m_params->SetDefaultParameter("record_call_stack", m_enable_call_graph_recording, + "Records a trace of who called each factory. Reduces performance but necessary for plugins such as janadot.") + ->SetIsAdvanced(true); + + m_arrow_logger = m_logging->get_logger("JArrow"); + m_queue_logger = m_logging->get_logger("JQueue"); +}; + + +void JTopologyBuilder::attach_lower_level(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder, bool found_sink) { + + std::stringstream ss; + ss << current_level; + + LOG_DEBUG(GetLogger()) << "JTopologyBuilder: Attaching components at lower level = " << current_level << LOG_END; + + JEventPool* pool = new JEventPool(m_components, + m_event_pool_size, + m_location_count, + m_limit_total_events_in_flight, + current_level); + pool->init(); + pools.push_back(pool); // Transfers ownership + + + std::vector sources_at_level; + for (JEventSource* source : m_components->get_evt_srces()) { + if (source->GetLevel() == current_level) { + sources_at_level.push_back(source); + } + } + std::vector procs_at_level; + for (JEventProcessor* proc : m_components->get_evt_procs()) { + if (proc->GetLevel() == current_level) { + procs_at_level.push_back(proc); + } + } + std::vector unfolders_at_level; + for (JEventUnfolder* unfolder : m_components->get_unfolders()) { + if (unfolder->GetLevel() == current_level) { + unfolders_at_level.push_back(unfolder); + } + } + + + if (sources_at_level.size() != 0) { + throw JException("Support for lower-level event sources coming soon!"); + } + if (unfolders_at_level.size() != 0) { + throw JException("Support for lower-level event unfolders coming soon!"); + } + if (procs_at_level.size() == 0) { + throw JException("For now we require you to provide at least one JEventProcessor"); + } + + auto q1 = new EventQueue(m_event_queue_threshold, mapping.get_loc_count(), m_enable_stealing); + queues.push_back(q1); + + auto q2 = new EventQueue(m_event_queue_threshold, mapping.get_loc_count(), m_enable_stealing); + queues.push_back(q2); + + auto* proc_arrow = new JEventProcessorArrow(ss.str()+"Tap", q1, q2, nullptr); + arrows.push_back(proc_arrow); + proc_arrow->set_chunksize(m_event_processor_chunksize); + proc_arrow->set_logger(m_arrow_logger); + if (found_sink) { + proc_arrow->set_is_sink(false); + } + + for (auto proc: procs_at_level) { + proc_arrow->add_processor(proc); + } + + parent_unfolder->attach_child_in(pool); + parent_unfolder->attach_child_out(q1); + parent_folder->attach_child_in(q2); + parent_folder->attach_child_out(pool); + parent_unfolder->attach(proc_arrow); + proc_arrow->attach(parent_folder); +} + + +void JTopologyBuilder::attach_top_level(JEventLevel current_level) { + + std::stringstream ss; + ss << current_level; + auto level_str = ss.str(); + + std::vector sources_at_level; + for (JEventSource* source : m_components->get_evt_srces()) { + if (source->GetLevel() == current_level) { + sources_at_level.push_back(source); + } + } + if (sources_at_level.size() == 0) { + // Skip level entirely for now. Consider eventually supporting + // folding low levels into higher levels without corresponding unfold + LOG_TRACE(GetLogger()) << "JTopologyBuilder: No sources found at level " << current_level << ", skipping" << LOG_END; + JEventLevel next = next_level(current_level); + if (next == JEventLevel::None) { + LOG_WARN(GetLogger()) << "No sources found: Processing topology will be empty." << LOG_END; + return; + } + return attach_top_level(next); + } + LOG_DEBUG(GetLogger()) << "JTopologyBuilder: Attaching components at top level = " << current_level << LOG_END; + + // We've now found our top level. No matter what, we need an event pool for this level + JEventPool* pool_at_level = new JEventPool(m_components, + m_event_pool_size, + m_location_count, + m_limit_total_events_in_flight, + current_level); + pool_at_level->init(); + pools.push_back(pool_at_level); // Hand over ownership of the pool to the topology + + // There are two possibilities at this point: + // a. This is the only level, in which case we wire up the arrows and exit + // b. We have an unfolder/folder pair, in which case we wire everything up, and then recursively attach_lower_level(). + // We use the presence of an unfolder as our test for whether or not a lower level should be included. This is because + // the folder might be trivial and hence omitted by the user. (Note that some folder is always needed in order to return + // the higher-level event to the pool). + // The user always needs to provide an unfolder because I can't think of a trivial unfolder that would be useful. + + std::vector unfolders_at_level; + for (JEventUnfolder* unfolder : m_components->get_unfolders()) { + if (unfolder->GetLevel() == current_level) { + unfolders_at_level.push_back(unfolder); + } + } + + std::vector procs_at_level; + for (JEventProcessor* proc : m_components->get_evt_procs()) { + if (proc->GetLevel() == current_level) { + procs_at_level.push_back(proc); + } + } + + if (unfolders_at_level.size() == 0) { + // No unfolders, so this is the only level + // Attach the source to the map/tap just like before + // + // We might want to print a friendly warning message communicating why any lower-level + // components are being ignored, like so: + // skip_lower_level(next_level(current_level)); + + LOG_DEBUG(GetLogger()) << "JTopologyBuilder: No unfolders found at level " << current_level << ", finishing here." << LOG_END; + + auto queue = new EventQueue(m_event_queue_threshold, mapping.get_loc_count(), m_enable_stealing); + queues.push_back(queue); + + auto* src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level, queue, pool_at_level); + arrows.push_back(src_arrow); + src_arrow->set_chunksize(m_event_source_chunksize); + + auto* proc_arrow = new JEventProcessorArrow(level_str+"Tap", queue, nullptr, pool_at_level); + arrows.push_back(proc_arrow); + proc_arrow->set_chunksize(m_event_processor_chunksize); + + for (auto proc: procs_at_level) { + proc_arrow->add_processor(proc); + } + src_arrow->attach(proc_arrow); + } + else if (unfolders_at_level.size() != 1) { + throw JException("At most one unfolder must be provided for each level in the event hierarchy!"); + } + else { + + auto q1 = new EventQueue(m_event_queue_threshold, mapping.get_loc_count(), m_enable_stealing); + auto q2 = new EventQueue(m_event_queue_threshold, mapping.get_loc_count(), m_enable_stealing); + + queues.push_back(q1); + queues.push_back(q2); + + auto *src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level, q1, pool_at_level); + arrows.push_back(src_arrow); + src_arrow->set_chunksize(m_event_source_chunksize); + + auto *map_arrow = new JEventMapArrow(level_str+"Map", q1, q2);; + arrows.push_back(map_arrow); + map_arrow->set_chunksize(m_event_source_chunksize); + src_arrow->attach(map_arrow); + + // TODO: We are using q2 temporarily knowing that it will be overwritten in attach_lower_level. + // It would be better to rejigger how we validate PlaceRefs and accept empty placerefs/fewer ctor args + auto *unfold_arrow = new JUnfoldArrow(level_str+"Unfold", unfolders_at_level[0], q2, pool_at_level, q2); + arrows.push_back(unfold_arrow); + unfold_arrow->set_chunksize(m_event_source_chunksize); + map_arrow->attach(unfold_arrow); + + // child_in, child_out, parent_out + auto *fold_arrow = new JFoldArrow(level_str+"Fold", current_level, unfolders_at_level[0]->GetChildLevel(), q2, pool_at_level, pool_at_level); + // TODO: Support user-provided folders + fold_arrow->set_chunksize(m_event_source_chunksize); + + bool found_sink = (procs_at_level.size() > 0); + attach_lower_level(unfolders_at_level[0]->GetChildLevel(), unfold_arrow, fold_arrow, found_sink); + + // Push fold arrow back _after_ attach_lower_level so that arrows can be iterated over in order + arrows.push_back(fold_arrow); + + if (procs_at_level.size() != 0) { + + auto q3 = new EventQueue(m_event_queue_threshold, mapping.get_loc_count(), m_enable_stealing); + queues.push_back(q3); + + auto* proc_arrow = new JEventProcessorArrow(level_str+"Tap", q3, nullptr, pool_at_level); + arrows.push_back(proc_arrow); + proc_arrow->set_chunksize(m_event_processor_chunksize); + + for (auto proc: procs_at_level) { + proc_arrow->add_processor(proc); + } + + fold_arrow->attach_parent_out(q3); + fold_arrow->attach(proc_arrow); + } + } + +} + diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.h b/src/libraries/JANA/Topology/JTopologyBuilder.h new file mode 100644 index 000000000..098a324e5 --- /dev/null +++ b/src/libraries/JANA/Topology/JTopologyBuilder.h @@ -0,0 +1,86 @@ + +// Copyright 2020, Jefferson Science Associates, LLC. +// Subject to the terms in the LICENSE file found in the top-level directory. + +#pragma once + +#include +#include +#include +#include // TODO: Should't be here + +#include +#include +#include + + +class JParameterManager; +class JLoggingService; +class JComponentManager; +class JArrow; +class JQueue; +class JPoolBase; +class JQueue; +class JFoldArrow; +class JUnfoldArrow; +class JEventPool; + +class JTopologyBuilder : public JService { +public: + // Services + Service m_params {this}; + Service m_logging {this}; + std::shared_ptr m_components; + + // The topology itself + std::vector arrows; + std::vector queues; // Queues shared between arrows + std::vector pools; // Pools shared between arrows + + // Topology configuration + size_t m_event_pool_size = 4; + size_t m_event_queue_threshold = 80; + size_t m_event_source_chunksize = 40; + size_t m_event_processor_chunksize = 1; + size_t m_location_count = 1; + bool m_enable_call_graph_recording = false; + bool m_enable_stealing = false; + bool m_limit_total_events_in_flight = true; + int m_affinity = 0; + int m_locality = 0; + + // Things that probably shouldn't be here + std::function m_configure_topology; + JEventPool* event_pool = nullptr; // TODO: Move into pools eventually + JPerfMetrics metrics; + JProcessorMapping mapping; + + JLogger m_arrow_logger; + JLogger m_queue_logger; + +public: + + JTopologyBuilder() = default; + + ~JTopologyBuilder() override; + + void acquire_services(JServiceLocator *sl) override; + + /// set_cofigure_fn lets the user provide a lambda that sets up a topology after all components have been loaded. + /// It provides an 'empty' JArrowTopology which has been furnished with a pointer to the JComponentManager, the JEventPool, + /// and the JProcessorMapping (in case you care about NUMA details). However, it does not contain any queues or arrows. + /// You have to furnish those yourself. + void set_configure_fn(std::function configure_fn); + + void create_topology(); + + void attach_lower_level(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder, bool found_sink); + + void attach_top_level(JEventLevel current_level); + + std::string print_topology(); + + +}; + + diff --git a/src/libraries/JANA/Engine/JUnfoldArrow.h b/src/libraries/JANA/Topology/JUnfoldArrow.h similarity index 99% rename from src/libraries/JANA/Engine/JUnfoldArrow.h rename to src/libraries/JANA/Topology/JUnfoldArrow.h index 181a2f989..1916c3980 100644 --- a/src/libraries/JANA/Engine/JUnfoldArrow.h +++ b/src/libraries/JANA/Topology/JUnfoldArrow.h @@ -3,7 +3,7 @@ #pragma once -#include +#include #include #include diff --git a/src/libraries/JANA/Utils/JEventPool.h b/src/libraries/JANA/Utils/JEventPool.h index cd9343627..d04b0f928 100644 --- a/src/libraries/JANA/Utils/JEventPool.h +++ b/src/libraries/JANA/Utils/JEventPool.h @@ -8,7 +8,7 @@ #include #include -#include +#include class JEventPool : public JPool> { diff --git a/src/programs/unit_tests/CMakeLists.txt b/src/programs/unit_tests/CMakeLists.txt index 3932b2573..91b8ae937 100644 --- a/src/programs/unit_tests/CMakeLists.txt +++ b/src/programs/unit_tests/CMakeLists.txt @@ -1,53 +1,51 @@ set(TEST_SOURCES - catch.hpp - ArrowActivationTests.cc - JObjectTests.cc - JServiceLocatorTests.cc - JServiceLocatorTests.h - QueueTests.cc - TopologyTests.cc - SchedulerTests.cc - JEventTests.cc - JEventTests.h - ExactlyOnceTests.cc - ExactlyOnceTests.h - TerminationTests.cc - TerminationTests.h - UserExceptionTests.cc - UserExceptionTests.h - JEventGroupTests.cc - JFactoryTests.h - JFactoryTests.cc - NEventNSkipTests.cc - JEventGetAllTests.cc - JParameterManagerTests.cc - JStatusBitsTests.cc - TimeoutTests.cc - ScaleTests.cc - BarrierEventTests.cc - BarrierEventTests.h - GetObjectsTests.cc - JCallGraphRecorderTests.cc - JEventProcessorSequentialTests.cc - JFactoryDefTagsTests.cc - SubeventTests.cc - JAutoactivableTests.cc - JTablePrinterTests.cc - JMultiFactoryTests.cc - JPoolTests.cc - ArrowTests.cc - MultiLevelTopologyTests.cc - UnfoldTests.cc - JComponentTests.cc - Components/JEventSourceTests.cc + + Topology/ArrowTests.cc + Topology/JPoolTests.cc + Topology/MultiLevelTopologyTests.cc + Topology/QueueTests.cc + Topology/SubeventTests.cc + Topology/TopologyTests.cc + + Components/BarrierEventTests.cc + Components/JObjectTests.cc + Components/ExactlyOnceTests.cc + Components/GetObjectsTests.cc + Components/NEventNSkipTests.cc + Components/JComponentTests.cc + Components/JEventGetAllTests.cc Components/JEventProcessorTests.cc + Components/JEventProcessorSequentialTests.cc + Components/JEventSourceTests.cc + Components/JEventTests.cc + Components/JFactoryDefTagsTests.cc + Components/JFactoryTests.cc + Components/JMultiFactoryTests.cc + Components/UnfoldTests.cc + Components/UserExceptionTests.cc + + Services/JServiceLocatorTests.cc + Services/JParameterManagerTests.cc + + Engine/ArrowActivationTests.cc + Engine/ScaleTests.cc + Engine/SchedulerTests.cc + Engine/TerminationTests.cc + Engine/TimeoutTests.cc + + Utils/JAutoactivableTests.cc + Utils/JEventGroupTests.cc + Utils/JTablePrinterTests.cc + Utils/JStatusBitsTests.cc + Utils/JCallGraphRecorderTests.cc + ) if (${USE_PODIO}) list(APPEND TEST_SOURCES - PodioTests.cc - ) + Components/PodioTests.cc + ) endif() add_executable(jana-unit-tests ${TEST_SOURCES}) diff --git a/src/programs/unit_tests/BarrierEventTests.cc b/src/programs/unit_tests/Components/BarrierEventTests.cc similarity index 100% rename from src/programs/unit_tests/BarrierEventTests.cc rename to src/programs/unit_tests/Components/BarrierEventTests.cc diff --git a/src/programs/unit_tests/BarrierEventTests.h b/src/programs/unit_tests/Components/BarrierEventTests.h similarity index 100% rename from src/programs/unit_tests/BarrierEventTests.h rename to src/programs/unit_tests/Components/BarrierEventTests.h diff --git a/src/programs/unit_tests/ExactlyOnceTests.cc b/src/programs/unit_tests/Components/ExactlyOnceTests.cc similarity index 100% rename from src/programs/unit_tests/ExactlyOnceTests.cc rename to src/programs/unit_tests/Components/ExactlyOnceTests.cc diff --git a/src/programs/unit_tests/ExactlyOnceTests.h b/src/programs/unit_tests/Components/ExactlyOnceTests.h similarity index 100% rename from src/programs/unit_tests/ExactlyOnceTests.h rename to src/programs/unit_tests/Components/ExactlyOnceTests.h diff --git a/src/programs/unit_tests/GetObjectsTests.cc b/src/programs/unit_tests/Components/GetObjectsTests.cc similarity index 100% rename from src/programs/unit_tests/GetObjectsTests.cc rename to src/programs/unit_tests/Components/GetObjectsTests.cc diff --git a/src/programs/unit_tests/JComponentTests.cc b/src/programs/unit_tests/Components/JComponentTests.cc similarity index 100% rename from src/programs/unit_tests/JComponentTests.cc rename to src/programs/unit_tests/Components/JComponentTests.cc diff --git a/src/programs/unit_tests/JEventGetAllTests.cc b/src/programs/unit_tests/Components/JEventGetAllTests.cc similarity index 100% rename from src/programs/unit_tests/JEventGetAllTests.cc rename to src/programs/unit_tests/Components/JEventGetAllTests.cc diff --git a/src/programs/unit_tests/JEventProcessorSequentialTests.cc b/src/programs/unit_tests/Components/JEventProcessorSequentialTests.cc similarity index 100% rename from src/programs/unit_tests/JEventProcessorSequentialTests.cc rename to src/programs/unit_tests/Components/JEventProcessorSequentialTests.cc diff --git a/src/programs/unit_tests/JEventTests.cc b/src/programs/unit_tests/Components/JEventTests.cc similarity index 100% rename from src/programs/unit_tests/JEventTests.cc rename to src/programs/unit_tests/Components/JEventTests.cc diff --git a/src/programs/unit_tests/JEventTests.h b/src/programs/unit_tests/Components/JEventTests.h similarity index 100% rename from src/programs/unit_tests/JEventTests.h rename to src/programs/unit_tests/Components/JEventTests.h diff --git a/src/programs/unit_tests/JFactoryDefTagsTests.cc b/src/programs/unit_tests/Components/JFactoryDefTagsTests.cc similarity index 100% rename from src/programs/unit_tests/JFactoryDefTagsTests.cc rename to src/programs/unit_tests/Components/JFactoryDefTagsTests.cc diff --git a/src/programs/unit_tests/JFactoryTests.cc b/src/programs/unit_tests/Components/JFactoryTests.cc similarity index 100% rename from src/programs/unit_tests/JFactoryTests.cc rename to src/programs/unit_tests/Components/JFactoryTests.cc diff --git a/src/programs/unit_tests/JFactoryTests.h b/src/programs/unit_tests/Components/JFactoryTests.h similarity index 100% rename from src/programs/unit_tests/JFactoryTests.h rename to src/programs/unit_tests/Components/JFactoryTests.h diff --git a/src/programs/unit_tests/JMultiFactoryTests.cc b/src/programs/unit_tests/Components/JMultiFactoryTests.cc similarity index 100% rename from src/programs/unit_tests/JMultiFactoryTests.cc rename to src/programs/unit_tests/Components/JMultiFactoryTests.cc diff --git a/src/programs/unit_tests/JObjectTests.cc b/src/programs/unit_tests/Components/JObjectTests.cc similarity index 100% rename from src/programs/unit_tests/JObjectTests.cc rename to src/programs/unit_tests/Components/JObjectTests.cc diff --git a/src/programs/unit_tests/NEventNSkipTests.cc b/src/programs/unit_tests/Components/NEventNSkipTests.cc similarity index 100% rename from src/programs/unit_tests/NEventNSkipTests.cc rename to src/programs/unit_tests/Components/NEventNSkipTests.cc diff --git a/src/programs/unit_tests/PodioTests.cc b/src/programs/unit_tests/Components/PodioTests.cc similarity index 100% rename from src/programs/unit_tests/PodioTests.cc rename to src/programs/unit_tests/Components/PodioTests.cc diff --git a/src/programs/unit_tests/UnfoldTests.cc b/src/programs/unit_tests/Components/UnfoldTests.cc similarity index 98% rename from src/programs/unit_tests/UnfoldTests.cc rename to src/programs/unit_tests/Components/UnfoldTests.cc index 2efa6699e..59a0a7a87 100644 --- a/src/programs/unit_tests/UnfoldTests.cc +++ b/src/programs/unit_tests/Components/UnfoldTests.cc @@ -1,7 +1,7 @@ #include -#include -#include +#include +#include namespace jana { namespace unfoldtests { diff --git a/src/programs/unit_tests/UserExceptionTests.cc b/src/programs/unit_tests/Components/UserExceptionTests.cc similarity index 100% rename from src/programs/unit_tests/UserExceptionTests.cc rename to src/programs/unit_tests/Components/UserExceptionTests.cc diff --git a/src/programs/unit_tests/UserExceptionTests.h b/src/programs/unit_tests/Components/UserExceptionTests.h similarity index 100% rename from src/programs/unit_tests/UserExceptionTests.h rename to src/programs/unit_tests/Components/UserExceptionTests.h diff --git a/src/programs/unit_tests/ArrowActivationTests.cc b/src/programs/unit_tests/Engine/ArrowActivationTests.cc similarity index 96% rename from src/programs/unit_tests/ArrowActivationTests.cc rename to src/programs/unit_tests/Engine/ArrowActivationTests.cc index ae5f2fdc8..80ef5d860 100644 --- a/src/programs/unit_tests/ArrowActivationTests.cc +++ b/src/programs/unit_tests/Engine/ArrowActivationTests.cc @@ -5,8 +5,7 @@ #include "catch.hpp" -#include -#include +#include "../Topology/TestTopologyComponents.h" #include @@ -34,7 +33,7 @@ TEST_CASE("ArrowActivationTests") { auto subtract_one = new SubOneProcessor("subtract_one", q2, q3); auto sum_everything = new SumSink("sum_everything", q3, p2); - auto topology = std::make_shared(); + auto topology = std::make_shared(); emit_rand_ints->attach(multiply_by_two); multiply_by_two->attach(subtract_one); @@ -49,7 +48,7 @@ TEST_CASE("ArrowActivationTests") { topology->pools.push_back(p2); auto logger = JLogger(JLogger::Level::OFF); - topology->m_logger = logger; + topology->SetLogger(logger); emit_rand_ints->set_logger(logger); multiply_by_two->set_logger(logger); subtract_one->set_logger(logger); diff --git a/src/programs/unit_tests/ScaleTests.cc b/src/programs/unit_tests/Engine/ScaleTests.cc similarity index 90% rename from src/programs/unit_tests/ScaleTests.cc rename to src/programs/unit_tests/Engine/ScaleTests.cc index 3ddffe149..e30962d67 100644 --- a/src/programs/unit_tests/ScaleTests.cc +++ b/src/programs/unit_tests/Engine/ScaleTests.cc @@ -53,9 +53,9 @@ TEST_CASE("ScaleNWorkerUpdate") { // threads = app.GetNThreads(); // However, we can't, because JApplication caches performance metrics based off of a ticker interval, and // Scale() doesn't invalidate the cache. We don't have a clean mechanism to manually force a cache invalidation - // from JApplication yet. So for now we will obtain the thread count directly from the JProcessingController. + // from JApplication yet. So for now we will obtain the thread count directly from the JArrowProcessingController. - auto pc = app.GetService(); + auto pc = app.GetService(); auto perf_summary = pc->measure_performance(); threads = perf_summary->thread_count; @@ -78,17 +78,17 @@ TEST_CASE("ScaleThroughputImprovement", "[.][performance]") { auto japc = app.GetService(); app.Run(false); std::this_thread::sleep_for(std::chrono::seconds(5)); - auto throughput_hz_1 = japc->measure_internal_performance()->latest_throughput_hz; + auto throughput_hz_1 = japc->measure_performance()->latest_throughput_hz; japc->print_report(); std::cout << "nthreads=1: throughput_hz=" << throughput_hz_1 << std::endl; app.Scale(2); std::this_thread::sleep_for(std::chrono::seconds(5)); - auto throughput_hz_2 = japc->measure_internal_performance()->latest_throughput_hz; + auto throughput_hz_2 = japc->measure_performance()->latest_throughput_hz; japc->print_report(); std::cout << "nthreads=2: throughput_hz=" << throughput_hz_2 << std::endl; app.Scale(4); std::this_thread::sleep_for(std::chrono::seconds(5)); - auto throughput_hz_4 = japc->measure_internal_performance()->latest_throughput_hz; + auto throughput_hz_4 = japc->measure_performance()->latest_throughput_hz; japc->print_report(); std::cout << "nthreads=4: throughput_hz=" << throughput_hz_4 << std::endl; REQUIRE(throughput_hz_2 > throughput_hz_1*1.5); diff --git a/src/programs/unit_tests/ScaleTests.h b/src/programs/unit_tests/Engine/ScaleTests.h similarity index 100% rename from src/programs/unit_tests/ScaleTests.h rename to src/programs/unit_tests/Engine/ScaleTests.h diff --git a/src/programs/unit_tests/SchedulerTests.cc b/src/programs/unit_tests/Engine/SchedulerTests.cc similarity index 96% rename from src/programs/unit_tests/SchedulerTests.cc rename to src/programs/unit_tests/Engine/SchedulerTests.cc index 8b1008c36..0136fc2ad 100644 --- a/src/programs/unit_tests/SchedulerTests.cc +++ b/src/programs/unit_tests/Engine/SchedulerTests.cc @@ -6,8 +6,9 @@ #include "catch.hpp" #include -#include -#include +#include + +#include "../Topology/TestTopologyComponents.h" TEST_CASE("SchedulerTests") { @@ -27,7 +28,7 @@ TEST_CASE("SchedulerTests") { auto subtract_one = new SubOneProcessor("subtract_one", q2, q3); auto sum_everything = new SumSink("sum_everything", q3, p2); - auto topology = std::make_shared(); + auto topology = std::make_shared(); emit_rand_ints->attach(multiply_by_two); multiply_by_two->attach(subtract_one); @@ -39,7 +40,7 @@ TEST_CASE("SchedulerTests") { topology->arrows.push_back(sum_everything); auto logger = JLogger(JLogger::Level::INFO); - topology->m_logger = logger; + topology->SetLogger(logger); emit_rand_ints->set_logger(logger); multiply_by_two->set_logger(logger); subtract_one->set_logger(logger); @@ -113,7 +114,7 @@ TEST_CASE("SchedulerRoundRobinBehaviorTests") { auto subtract_one = new SubOneProcessor("subtract_one", q2, q3); auto sum_everything = new SumSink("sum_everything", q3, p2); - auto topology = std::make_shared(); + auto topology = std::make_shared(); emit_rand_ints->attach(multiply_by_two); multiply_by_two->attach(subtract_one); @@ -125,7 +126,7 @@ TEST_CASE("SchedulerRoundRobinBehaviorTests") { topology->arrows.push_back(sum_everything); auto logger = JLogger(JLogger::Level::INFO); - topology->m_logger = logger; + topology->SetLogger(logger); emit_rand_ints->set_logger(logger); multiply_by_two->set_logger(logger); subtract_one->set_logger(logger); diff --git a/src/programs/unit_tests/TerminationTests.cc b/src/programs/unit_tests/Engine/TerminationTests.cc similarity index 94% rename from src/programs/unit_tests/TerminationTests.cc rename to src/programs/unit_tests/Engine/TerminationTests.cc index 06133bbd3..4e939f92b 100644 --- a/src/programs/unit_tests/TerminationTests.cc +++ b/src/programs/unit_tests/Engine/TerminationTests.cc @@ -4,11 +4,9 @@ #include "TerminationTests.h" #include "catch.hpp" -#include "JANA/Engine/JArrowProcessingController.h" -#include - +#include #include -#include +#include diff --git a/src/programs/unit_tests/TerminationTests.h b/src/programs/unit_tests/Engine/TerminationTests.h similarity index 100% rename from src/programs/unit_tests/TerminationTests.h rename to src/programs/unit_tests/Engine/TerminationTests.h diff --git a/src/programs/unit_tests/TimeoutTests.cc b/src/programs/unit_tests/Engine/TimeoutTests.cc similarity index 100% rename from src/programs/unit_tests/TimeoutTests.cc rename to src/programs/unit_tests/Engine/TimeoutTests.cc diff --git a/src/programs/unit_tests/TimeoutTests.h b/src/programs/unit_tests/Engine/TimeoutTests.h similarity index 100% rename from src/programs/unit_tests/TimeoutTests.h rename to src/programs/unit_tests/Engine/TimeoutTests.h diff --git a/src/programs/unit_tests/JParameterManagerTests.cc b/src/programs/unit_tests/Services/JParameterManagerTests.cc similarity index 100% rename from src/programs/unit_tests/JParameterManagerTests.cc rename to src/programs/unit_tests/Services/JParameterManagerTests.cc diff --git a/src/programs/unit_tests/JServiceLocatorTests.cc b/src/programs/unit_tests/Services/JServiceLocatorTests.cc similarity index 100% rename from src/programs/unit_tests/JServiceLocatorTests.cc rename to src/programs/unit_tests/Services/JServiceLocatorTests.cc diff --git a/src/programs/unit_tests/JServiceLocatorTests.h b/src/programs/unit_tests/Services/JServiceLocatorTests.h similarity index 100% rename from src/programs/unit_tests/JServiceLocatorTests.h rename to src/programs/unit_tests/Services/JServiceLocatorTests.h diff --git a/src/programs/unit_tests/ArrowTests.cc b/src/programs/unit_tests/Topology/ArrowTests.cc similarity index 98% rename from src/programs/unit_tests/ArrowTests.cc rename to src/programs/unit_tests/Topology/ArrowTests.cc index 312008267..4a3b8877d 100644 --- a/src/programs/unit_tests/ArrowTests.cc +++ b/src/programs/unit_tests/Topology/ArrowTests.cc @@ -1,6 +1,6 @@ #include -#include +#include namespace jana { namespace arrowtests { diff --git a/src/programs/unit_tests/JPoolTests.cc b/src/programs/unit_tests/Topology/JPoolTests.cc similarity index 98% rename from src/programs/unit_tests/JPoolTests.cc rename to src/programs/unit_tests/Topology/JPoolTests.cc index c1f3cb16c..6aa48680a 100644 --- a/src/programs/unit_tests/JPoolTests.cc +++ b/src/programs/unit_tests/Topology/JPoolTests.cc @@ -1,6 +1,6 @@ #include -#include +#include namespace jana { namespace jpooltests { diff --git a/src/programs/unit_tests/MapArrow.h b/src/programs/unit_tests/Topology/MapArrow.h similarity index 98% rename from src/programs/unit_tests/MapArrow.h rename to src/programs/unit_tests/Topology/MapArrow.h index d85f2992e..e24670129 100644 --- a/src/programs/unit_tests/MapArrow.h +++ b/src/programs/unit_tests/Topology/MapArrow.h @@ -5,7 +5,7 @@ #ifndef GREENFIELD_MAPARROW_H #define GREENFIELD_MAPARROW_H -#include +#include /// ParallelProcessor transforms S to T and it does so in a way which is thread-safe diff --git a/src/programs/unit_tests/MultiLevelTopologyTests.cc b/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc similarity index 100% rename from src/programs/unit_tests/MultiLevelTopologyTests.cc rename to src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc diff --git a/src/programs/unit_tests/MultiLevelTopologyTests.h b/src/programs/unit_tests/Topology/MultiLevelTopologyTests.h similarity index 100% rename from src/programs/unit_tests/MultiLevelTopologyTests.h rename to src/programs/unit_tests/Topology/MultiLevelTopologyTests.h diff --git a/src/programs/unit_tests/QueueTests.cc b/src/programs/unit_tests/Topology/QueueTests.cc similarity index 96% rename from src/programs/unit_tests/QueueTests.cc rename to src/programs/unit_tests/Topology/QueueTests.cc index 3ba23d627..810aeb95a 100644 --- a/src/programs/unit_tests/QueueTests.cc +++ b/src/programs/unit_tests/Topology/QueueTests.cc @@ -2,7 +2,7 @@ // Copyright 2020, Jefferson Science Associates, LLC. // Subject to the terms in the LICENSE file found in the top-level directory. -#include +#include #include "catch.hpp" diff --git a/src/programs/unit_tests/SubeventTests.cc b/src/programs/unit_tests/Topology/SubeventTests.cc similarity index 83% rename from src/programs/unit_tests/SubeventTests.cc rename to src/programs/unit_tests/Topology/SubeventTests.cc index aa1c5343e..6e7e2d505 100644 --- a/src/programs/unit_tests/SubeventTests.cc +++ b/src/programs/unit_tests/Topology/SubeventTests.cc @@ -7,9 +7,10 @@ #include #include -#include -#include "JANA/Engine/JArrowTopology.h" -#include "JANA/Engine/JTopologyBuilder.h" +#include +#include +#include +#include struct MyInput : public JObject { @@ -180,23 +181,26 @@ TEST_CASE("Basic subevent arrow functionality") { app.SetTimeoutEnabled(false); app.SetTicker(false); - auto topology = app.GetService()->create_empty(); - auto source_arrow = new JEventSourceArrow("simpleSource", - {new SimpleSource}, - &events_in, - topology->event_pool); - auto proc_arrow = new JEventProcessorArrow("simpleProcessor", &events_out, nullptr, topology->event_pool); - proc_arrow->add_processor(new SimpleProcessor); - - topology->arrows.push_back(source_arrow); - topology->arrows.push_back(split_arrow); - topology->arrows.push_back(subprocess_arrow); - topology->arrows.push_back(merge_arrow); - topology->arrows.push_back(proc_arrow); - source_arrow->attach(split_arrow); - split_arrow->attach(subprocess_arrow); - subprocess_arrow->attach(merge_arrow); - merge_arrow->attach(proc_arrow); + auto topology = app.GetService(); + topology->set_configure_fn([&](JTopologyBuilder& topology) { + auto source_arrow = new JEventSourceArrow("simpleSource", + {new SimpleSource}, + &events_in, + topology.event_pool); + auto proc_arrow = new JEventProcessorArrow("simpleProcessor", &events_out, + nullptr, topology.event_pool); + proc_arrow->add_processor(new SimpleProcessor); + + topology.arrows.push_back(source_arrow); + topology.arrows.push_back(split_arrow); + topology.arrows.push_back(subprocess_arrow); + topology.arrows.push_back(merge_arrow); + topology.arrows.push_back(proc_arrow); + source_arrow->attach(split_arrow); + split_arrow->attach(subprocess_arrow); + subprocess_arrow->attach(merge_arrow); + merge_arrow->attach(proc_arrow); + }); app.Run(true); } diff --git a/src/programs/unit_tests/TestTopologyComponents.h b/src/programs/unit_tests/Topology/TestTopologyComponents.h similarity index 97% rename from src/programs/unit_tests/TestTopologyComponents.h rename to src/programs/unit_tests/Topology/TestTopologyComponents.h index 2c69d9fc8..e4eaec88c 100644 --- a/src/programs/unit_tests/TestTopologyComponents.h +++ b/src/programs/unit_tests/Topology/TestTopologyComponents.h @@ -7,9 +7,9 @@ #include #include -#include +#include #include -#include +#include "MapArrow.h" struct RandIntSource : public JPipelineArrow { diff --git a/src/programs/unit_tests/TopologyTests.cc b/src/programs/unit_tests/Topology/TopologyTests.cc similarity index 96% rename from src/programs/unit_tests/TopologyTests.cc rename to src/programs/unit_tests/Topology/TopologyTests.cc index fd281995f..f66f08ef6 100644 --- a/src/programs/unit_tests/TopologyTests.cc +++ b/src/programs/unit_tests/Topology/TopologyTests.cc @@ -3,12 +3,12 @@ // Subject to the terms in the LICENSE file found in the top-level directory. #include "catch.hpp" -#include "JANA/Engine/JTopologyBuilder.h" + +#include "JANA/Topology/JTopologyBuilder.h" #include -#include +#include "TestTopologyComponents.h" #include -#include #include @@ -20,9 +20,6 @@ JArrowMetrics::Status step(JArrow* arrow) { return status; } -void log_status(JArrowTopology& /*topology*/) { - -} @@ -44,7 +41,7 @@ TEST_CASE("JTopology: Basic functionality") { auto subtract_one = new SubOneProcessor("subtract_one", q2, q3); auto sum_everything = new SumSink("sum_everything", q3, p2); - auto topology = std::make_shared(); + auto topology = std::make_shared(); emit_rand_ints->attach(multiply_by_two); multiply_by_two->attach(subtract_one); @@ -56,7 +53,7 @@ TEST_CASE("JTopology: Basic functionality") { topology->arrows.push_back(sum_everything); auto logger = JLogger(JLogger::Level::INFO); - topology->m_logger = logger; + topology->SetLogger(logger); emit_rand_ints->set_logger(logger); multiply_by_two->set_logger(logger); subtract_one->set_logger(logger); @@ -245,21 +242,17 @@ TEST_CASE("JTopology: Basic functionality") { REQUIRE(ts.arrow_states[2].status == JScheduler::ArrowStatus::Finalized); REQUIRE(ts.arrow_states[3].status == JScheduler::ArrowStatus::Finalized); - log_status(*topology); - } SECTION("Running from inside JApplication returns the correct answer") { JApplication app; - auto builder = app.GetService(); - builder->set(topology); + app.ProvideService(topology); // Override the builtin one REQUIRE(sum_everything->sum == 0); app.Run(true); auto scheduler = app.GetService()->get_scheduler(); - auto ts = scheduler->get_topology_state(); REQUIRE(ts.current_topology_status == JScheduler::TopologyStatus::Finalized); REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); diff --git a/src/programs/unit_tests/JAutoactivableTests.cc b/src/programs/unit_tests/Utils/JAutoactivableTests.cc similarity index 100% rename from src/programs/unit_tests/JAutoactivableTests.cc rename to src/programs/unit_tests/Utils/JAutoactivableTests.cc diff --git a/src/programs/unit_tests/JCallGraphRecorderTests.cc b/src/programs/unit_tests/Utils/JCallGraphRecorderTests.cc similarity index 100% rename from src/programs/unit_tests/JCallGraphRecorderTests.cc rename to src/programs/unit_tests/Utils/JCallGraphRecorderTests.cc diff --git a/src/programs/unit_tests/JEventGroupTests.cc b/src/programs/unit_tests/Utils/JEventGroupTests.cc similarity index 100% rename from src/programs/unit_tests/JEventGroupTests.cc rename to src/programs/unit_tests/Utils/JEventGroupTests.cc diff --git a/src/programs/unit_tests/JStatusBitsTests.cc b/src/programs/unit_tests/Utils/JStatusBitsTests.cc similarity index 100% rename from src/programs/unit_tests/JStatusBitsTests.cc rename to src/programs/unit_tests/Utils/JStatusBitsTests.cc diff --git a/src/programs/unit_tests/JTablePrinterTests.cc b/src/programs/unit_tests/Utils/JTablePrinterTests.cc similarity index 100% rename from src/programs/unit_tests/JTablePrinterTests.cc rename to src/programs/unit_tests/Utils/JTablePrinterTests.cc