Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring: Separate "Engine" and "Topology" layers #293

Merged
merged 8 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 37 additions & 30 deletions src/examples/SubeventExample/SubeventExample.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

#include <JANA/JApplication.h>
#include <JANA/JObject.h>
#include <JANA/Engine/JSubeventArrow.h>
#include <JANA/JEventSource.h>
#include <JANA/JEventProcessor.h>
#include "JANA/Engine/JTopologyBuilder.h"

#include <JANA/Topology/JEventSourceArrow.h>
#include <JANA/Topology/JEventProcessorArrow.h>
#include <JANA/Topology/JSubeventArrow.h>
#include "JANA/Topology/JTopologyBuilder.h"


struct MyInput : public JObject {
Expand Down Expand Up @@ -82,16 +85,6 @@ struct SimpleProcessor : public JEventProcessor {

int main() {

MyProcessor processor;
JMailbox<std::shared_ptr<JEvent>*> events_in;
JMailbox<std::shared_ptr<JEvent>*> events_out;
JMailbox<SubeventWrapper<MyInput>> subevents_in;
JMailbox<SubeventWrapper<MyOutput>> subevents_out;

auto split_arrow = new JSplitArrow<MyInput, MyOutput>("split", &processor, &events_in, &subevents_in);
auto subprocess_arrow = new JSubeventArrow<MyInput, MyOutput>("subprocess", &processor, &subevents_in, &subevents_out);
auto merge_arrow = new JMergeArrow<MyInput, MyOutput>("merge", &processor, &subevents_out, &events_out);

JApplication app;
app.SetParameterValue("log:info", "JWorker,JScheduler,JArrowProcessingController,JEventProcessorArrow");
app.SetTimeoutEnabled(false);
Expand All @@ -100,25 +93,39 @@ int main() {
auto source = new SimpleSource();
source->SetNEvents(10); // limit ourselves to 10 events. Note that the 'jana:nevents' param won't work
// here because we aren't using JComponentManager to manage the EventSource
MyProcessor processor;

auto topology = app.GetService<JTopologyBuilder>();
topology->set_configure_fn([&](JTopologyBuilder& builder) {

JMailbox<std::shared_ptr<JEvent>*> events_in;
JMailbox<std::shared_ptr<JEvent>*> events_out;
JMailbox<SubeventWrapper<MyInput>> subevents_in;
JMailbox<SubeventWrapper<MyOutput>> subevents_out;

auto split_arrow = new JSplitArrow<MyInput, MyOutput>("split", &processor, &events_in, &subevents_in);
auto subprocess_arrow = new JSubeventArrow<MyInput, MyOutput>("subprocess", &processor, &subevents_in, &subevents_out);
auto merge_arrow = new JMergeArrow<MyInput, MyOutput>("merge", &processor, &subevents_out, &events_out);

auto source_arrow = new JEventSourceArrow("simpleSource",
{source},
&events_in,
topology->event_pool);
auto proc_arrow = new JEventProcessorArrow("simpleProcessor", &events_out, nullptr, topology->event_pool);
proc_arrow->add_processor(new SimpleProcessor);

builder.arrows.push_back(source_arrow);
builder.arrows.push_back(split_arrow);
builder.arrows.push_back(subprocess_arrow);
builder.arrows.push_back(merge_arrow);
builder.arrows.push_back(proc_arrow);

source_arrow->attach(split_arrow);
split_arrow->attach(subprocess_arrow);
subprocess_arrow->attach(merge_arrow);
merge_arrow->attach(proc_arrow);
});

auto topology = app.GetService<JTopologyBuilder>()->create_empty();
auto source_arrow = new JEventSourceArrow("simpleSource",
{source},
&events_in,
topology->event_pool);
auto proc_arrow = new JEventProcessorArrow("simpleProcessor", &events_out, nullptr, topology->event_pool);
proc_arrow->add_processor(new SimpleProcessor);

topology->arrows.push_back(source_arrow);
topology->arrows.push_back(split_arrow);
topology->arrows.push_back(subprocess_arrow);
topology->arrows.push_back(merge_arrow);
topology->arrows.push_back(proc_arrow);

source_arrow->attach(split_arrow);
split_arrow->attach(subprocess_arrow);
subprocess_arrow->attach(merge_arrow);
merge_arrow->attach(proc_arrow);

app.Run(true);

Expand Down
41 changes: 20 additions & 21 deletions src/libraries/JANA/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,31 @@ set(JANA2_SOURCES
JMultifactory.h
JService.cc

Engine/JArrow.h
Engine/JArrowMetrics.h
Engine/JArrowPerfSummary.cc
Engine/JArrowPerfSummary.h
Engine/JArrowProcessingController.cc
Engine/JArrowProcessingController.h
Engine/JArrowTopology.cc
Engine/JArrowTopology.h
Engine/JEventProcessorArrow.cc
Engine/JEventProcessorArrow.h
Engine/JEventSourceArrow.cc
Engine/JEventSourceArrow.h
Engine/JEventMapArrow.h
Engine/JEventMapArrow.cc
Engine/JPool.h

Engine/JMailbox.h
Engine/JScheduler.cc
Engine/JScheduler.h
Engine/JSubeventArrow.h
Engine/JWorker.h
Engine/JWorker.cc
Engine/JWorkerMetrics.h
Engine/JTopologyBuilder.h
Engine/JPerfMetrics.cc
Engine/JPerfMetrics.h
Engine/JPerfSummary.cc
Engine/JPerfSummary.h

Topology/JArrow.h
Topology/JArrowMetrics.h
Topology/JEventProcessorArrow.cc
Topology/JEventProcessorArrow.h
Topology/JEventSourceArrow.cc
Topology/JEventSourceArrow.h
Topology/JEventMapArrow.h
Topology/JEventMapArrow.cc
Topology/JPool.h
Topology/JMailbox.h
Topology/JSubeventArrow.h
Topology/JTopologyBuilder.h
Topology/JTopologyBuilder.cc

Services/JComponentManager.cc
Services/JComponentManager.h
Expand All @@ -60,15 +61,11 @@ set(JANA2_SOURCES
Services/JParameterManager.h
Services/JPluginLoader.cc
Services/JPluginLoader.h
Services/JProcessingController.h
Services/JServiceLocator.h
Services/JEventGroupTracker.h

Status/JComponentSummary.h
Status/JComponentSummary.cc
Status/JPerfMetrics.cc
Status/JPerfMetrics.h
Status/JPerfSummary.h

Streaming/JDiscreteJoin.h
Streaming/JEventBuilder.h
Expand Down Expand Up @@ -245,6 +242,7 @@ file(GLOB jana_cli_headers "CLI/*.h*")
file(GLOB jana_compat_headers "Compatibility/*.h*")
file(GLOB jana_podio_headers "Podio/*.h*")
file(GLOB jana_omni_headers "Omni/*.h*")
file(GLOB jana_topology_headers "Topology/*.h*")

install(FILES ${jana_headers} DESTINATION include/JANA)
install(FILES ${jana_engine_headers} DESTINATION include/JANA/Engine)
Expand All @@ -256,6 +254,7 @@ install(FILES ${jana_calibs_headers} DESTINATION include/JANA/Calibrations)
install(FILES ${jana_cli_headers} DESTINATION include/JANA/CLI)
install(FILES ${jana_compat_headers} DESTINATION include/JANA/Compatibility)
install(FILES ${jana_omni_headers} DESTINATION include/JANA/Omni)
install(FILES ${jana_topology_headers} DESTINATION include/JANA/Topology)

if (${USE_PODIO})
install(FILES ${jana_podio_headers} DESTINATION include/JANA/Podio)
Expand Down
22 changes: 11 additions & 11 deletions src/libraries/JANA/Engine/JArrowProcessingController.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
// Subject to the terms in the LICENSE file found in the top-level directory.

#include <JANA/Engine/JArrowProcessingController.h>
#include <JANA/Engine/JArrowPerfSummary.h>
#include <JANA/Engine/JPerfSummary.h>
#include <JANA/Topology/JTopologyBuilder.h>
#include <JANA/Utils/JCpuInfo.h>
#include <JANA/JLogger.h>

Expand All @@ -18,6 +19,8 @@ void JArrowProcessingController::acquire_services(JServiceLocator * sl) {
m_worker_logger = ls->get_logger("JWorker");
m_scheduler_logger = ls->get_logger("JScheduler");

m_topology = sl->get<JTopologyBuilder>();

// Obtain timeouts from parameter manager
auto params = sl->get<JParameterManager>();
params->SetDefaultParameter("jana:timeout", m_timeout_s, "Max time (in seconds) JANA will wait for a thread to update its heartbeat before hard-exiting. 0 to disable timeout completely.");
Expand Down Expand Up @@ -157,14 +160,14 @@ bool JArrowProcessingController::is_timed_out() {
// Probably want to refactor so that we only make one such call per ticker iteration.
// Since we are storing our metrics summary anyway, we could call measure_performance()
// and have print_report(), print_final_report(), is_timed_out(), etc use the cached version
auto metrics = measure_internal_performance();
auto metrics = measure_performance();

int timeout_s;
if (metrics->total_uptime_s < m_warmup_timeout_s * m_topology->event_pool_size / metrics->thread_count) {
if (metrics->total_uptime_s < m_warmup_timeout_s * m_topology->m_event_pool_size / metrics->thread_count) {
// We are at the beginning and not all events have necessarily had a chance to warm up
timeout_s = m_warmup_timeout_s;
}
else if (!m_topology->limit_total_events_in_flight) {
else if (!m_topology->m_limit_total_events_in_flight) {
// New events are constantly emitted, each of which may contain jfactorysets which need to be warmed up
timeout_s = m_warmup_timeout_s;
}
Expand Down Expand Up @@ -219,16 +222,16 @@ JArrowProcessingController::~JArrowProcessingController() {
}

void JArrowProcessingController::print_report() {
auto metrics = measure_internal_performance();
auto metrics = measure_performance();
LOG_INFO(m_logger) << "Running" << *metrics << LOG_END;
}

void JArrowProcessingController::print_final_report() {
auto metrics = measure_internal_performance();
auto metrics = measure_performance();
LOG_INFO(m_logger) << "Final Report" << *metrics << LOG_END;
}

std::unique_ptr<const JArrowPerfSummary> JArrowProcessingController::measure_internal_performance() {
std::unique_ptr<const JPerfSummary> JArrowProcessingController::measure_performance() {

// Measure perf on all Workers first, as this will prompt them to publish
// any ArrowMetrics they have collected
Expand Down Expand Up @@ -275,12 +278,9 @@ std::unique_ptr<const JArrowPerfSummary> JArrowProcessingController::measure_int
? std::numeric_limits<double>::infinity()
: m_perf_summary.avg_throughput_hz / tighter_bottleneck;

return std::unique_ptr<JArrowPerfSummary>(new JArrowPerfSummary(m_perf_summary));
return std::unique_ptr<JPerfSummary>(new JPerfSummary(m_perf_summary));
}

std::unique_ptr<const JPerfSummary> JArrowProcessingController::measure_performance() {
return measure_internal_performance();
}



Expand Down
40 changes: 18 additions & 22 deletions src/libraries/JANA/Engine/JArrowProcessingController.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,51 @@
#ifndef JANA2_JARROWPROCESSINGCONTROLLER_H
#define JANA2_JARROWPROCESSINGCONTROLLER_H

#include <JANA/Services/JProcessingController.h>

#include <JANA/Engine/JArrow.h>
#include <JANA/Topology/JTopologyBuilder.h>
#include <JANA/Engine/JWorker.h>
#include <JANA/Engine/JArrowTopology.h>
#include <JANA/Engine/JArrowPerfSummary.h>
#include <JANA/Engine/JPerfSummary.h>

#include <vector>

class JArrowProcessingController : public JProcessingController {
class JArrowProcessingController : public JService {
public:

explicit JArrowProcessingController(std::shared_ptr<JArrowTopology> topology) : m_topology(topology) {};
~JArrowProcessingController() override;
void acquire_services(JServiceLocator *) override;

void initialize() override;
void run(size_t nthreads) override;
void scale(size_t nthreads) override;
void initialize();
void run(size_t nthreads);
void scale(size_t nthreads);
void request_pause();
void wait_until_paused();
void request_stop() override;
void wait_until_stopped() override;
void request_stop();
void wait_until_stopped();

bool is_stopped() override;
bool is_finished() override;
bool is_timed_out() override;
bool is_excepted() override;
bool is_stopped();
bool is_finished();
bool is_timed_out();
bool is_excepted();

std::vector<JException> get_exceptions() const override;
std::vector<JException> get_exceptions() const;

std::unique_ptr<const JPerfSummary> measure_performance() override;
std::unique_ptr<const JArrowPerfSummary> measure_internal_performance();
std::unique_ptr<const JPerfSummary> measure_performance();

void print_report() override;
void print_final_report() override;
void print_report();
void print_final_report();

// This is so we can test
inline JScheduler* get_scheduler() { return m_scheduler; }


private:
std::shared_ptr<JTopologyBuilder> m_topology;

using jclock_t = std::chrono::steady_clock;
int m_timeout_s = 8;
int m_warmup_timeout_s = 30;

JArrowPerfSummary m_perf_summary;
std::shared_ptr<JArrowTopology> m_topology; // Owned by JArrowProcessingController
JPerfSummary m_perf_summary;
JScheduler* m_scheduler = nullptr;

std::vector<JWorker*> m_workers;
Expand Down
27 changes: 0 additions & 27 deletions src/libraries/JANA/Engine/JArrowTopology.cc

This file was deleted.

Loading
Loading