Skip to content

Commit

Permalink
Migrate remaining JEventProcessors to new callback style
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanwbrei committed Apr 28, 2024
1 parent 97f5695 commit 181b71f
Show file tree
Hide file tree
Showing 17 changed files with 73 additions and 46 deletions.
18 changes: 10 additions & 8 deletions src/examples/StreamingExample/AHitBHitFuser.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@ class AHitBHitFuser : public JEventProcessor {
public:
AHitAnomalyDetector(JApplication* app = nullptr, size_t delay_ms=1000)
: JEventProcessor(app)
, m_delay_ms(delay_ms) {};
, m_delay_ms(delay_ms) {
SetCallbackStyle(CallbackStyle::ExpertMode);
};

void Init() override {

}
void Process(const std::shared_ptr<const JEvent>& event) override {
void Process(const JEvent& event) override {

auto a_hits = event->Get<AHit>();
auto b_hits = event->Get<BHit>();
auto a_hits = event.Get<AHit>();
auto b_hits = event.Get<BHit>();
std::stringstream ss;
ss << "AHit/BHit fusion: Event #" << event->GetEventNumber() << " : {";
ss << "AHit/BHit fusion: Event #" << event.GetEventNumber() << " : {";
for (auto & hit : a_hits) {
ss << "(" << hit->E << "," << hit->t << "), ";
}
Expand All @@ -40,10 +42,10 @@ class AHitBHitFuser : public JEventProcessor {
consume_cpu_ms(m_delay_ms);


auto raw_hits = event->Get<AHit>("raw_hits");
auto raw_hits = event.Get<AHit>("raw_hits");


std::cout << "Processing event #" << event->GetEventNumber() << std::endl;
std::cout << "Processing event #" << event.GetEventNumber() << std::endl;
Serializer<AHit> serializer;
for (auto & hit : raw_hits) {
AHit* calibrated_hit = new DetectorAHit(*hit);
Expand All @@ -61,4 +63,4 @@ class AHitBHitFuser : public JEventProcessor {
};


#endif
#endif
7 changes: 4 additions & 3 deletions src/libraries/JANA/JCsvWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class JCsvWriter : public JEventProcessor {

JCsvWriter(std::string tag = "") : m_tag(std::move(tag)) {
SetTypeName(NAME_OF_THIS);
SetCallbackStyle(CallbackStyle::ExpertMode);
};

void Init() override {
Expand All @@ -41,10 +42,10 @@ class JCsvWriter : public JEventProcessor {
m_dest_file.open(filename, std::fstream::out);
}

void Process(const std::shared_ptr<const JEvent>& event) override {
void Process(const JEvent& event) override {

auto event_nr = event->GetEventNumber();
auto jobjs = event->Get<T>(m_tag);
auto event_nr = event.GetEventNumber();
auto jobjs = event.Get<T>(m_tag);

std::lock_guard<std::mutex> lock(m_mutex);

Expand Down
11 changes: 7 additions & 4 deletions src/libraries/JANA/Podio/JEventProcessorPodio.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@

#include "JEventProcessorPodio.h"

JEventProcessorPodio::JEventProcessorPodio() {
SetCallbackStyle(CallbackStyle::ExpertMode);
}

void JEventProcessorPodio::Init() {
// TODO: Obtain m_output_filename, etc, from parameter manager
// TODO: Does PODIO test that output file is writable and fail otherwise?
// We want to throw an exception immediately so that we don't waste compute time

m_writer = std::make_unique<podio::ROOTFrameWriter>(m_output_filename);
m_writer = std::make_unique<podio::ROOTFrameWriter>(m_output_filename());
}

void JEventProcessorPodio::Process(const std::shared_ptr<const JEvent> &event) {
void JEventProcessorPodio::Process(const JEvent& event) {

auto* frame = event->GetSingle<podio::Frame>();
auto* frame = event.GetSingle<podio::Frame>();
// This will throw if no PODIO frame is found. There will be no PODIO frame if the event source doesn't insert any
// PODIO classes, or there are no JFactoryPodioT's provided.
// Is this really the behavior we want? The alternatives are to silently not write anything, or to print a warning.
Expand Down
6 changes: 4 additions & 2 deletions src/libraries/JANA/Podio/JEventProcessorPodio.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@

class JEventProcessorPodio : public JEventProcessor {

std::string m_output_filename = "podio_output.root";
Parameter<std::string> m_output_filename {this, "podio:output_filename", "podio_output.root", "Output filename for JEventProcessorPodio"};

std::set<std::string> m_output_include_collections;
std::set<std::string> m_output_exclude_collections;
std::unique_ptr<podio::ROOTFrameWriter> m_writer;

public:
JEventProcessorPodio();
void Init() override;
void Process(const std::shared_ptr<const JEvent>&) override;
void Process(const JEvent&) override;
void Finish() override;

};
Expand Down
6 changes: 3 additions & 3 deletions src/libraries/JANA/Utils/JAutoActivator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ void JAutoActivator::Init() {
}
}

void JAutoActivator::Process(const std::shared_ptr<const JEvent> &event) {
void JAutoActivator::Process(const JEvent& event) {
for (const auto &pair: m_auto_activated_factories) {
auto name = pair.first;
auto tag = pair.second;
auto factory = event->GetFactory(name, tag);
auto factory = event.GetFactory(name, tag);
if (factory != nullptr) {
factory->Create(event); // This will do nothing if factory is already created
factory->Create(event.shared_from_this()); // This will do nothing if factory is already created
}
else {
LOG_ERROR(GetLogger()) << "Could not find factory with typename=" << name << ", tag=" << tag << LOG_END;
Expand Down
2 changes: 1 addition & 1 deletion src/libraries/JANA/Utils/JAutoActivator.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class JAutoActivator : public JEventProcessor {
static std::pair<std::string, std::string> Split(std::string factory_name);
void AddAutoActivatedFactory(string factory_name, string factory_tag);
void Init() override;
void Process(const std::shared_ptr<const JEvent>& event) override;
void Process(const JEvent&) override;

private:
vector<pair<string,string>> m_auto_activated_factories;
Expand Down
10 changes: 6 additions & 4 deletions src/plugins/JTest/JTestPlotter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ class JTestPlotter : public JEventProcessor {
public:

JTestPlotter() {
SetPrefix("jtest:plotter");
SetTypeName(NAME_OF_THIS);
SetCallbackStyle(CallbackStyle::ExpertMode);
}

void Init() override {
Expand All @@ -32,14 +34,14 @@ class JTestPlotter : public JEventProcessor {
app->SetDefaultParameter("jtest:plotter_bytes_spread", m_write_spread, "Spread of bytes written during plotting");
}

void Process(const std::shared_ptr<const JEvent>& aEvent) override {
void Process(const JEvent& event) override {

// Read the track data
auto td = aEvent->GetSingle<JTestTrackData>();
auto td = event.GetSingle<JTestTrackData>();
read_memory(td->buffer);

// Read the extra data objects inserted by JTestTracker
aEvent->Get<JTestTracker::JTestTrackAuxilliaryData>();
event.Get<JTestTracker::JTestTrackAuxilliaryData>();

// Everything that happens after here is in a critical section
std::lock_guard<std::mutex> lock(m_mutex);
Expand All @@ -50,7 +52,7 @@ class JTestPlotter : public JEventProcessor {
// Write the histogram data
auto hd = new JTestHistogramData;
write_memory(hd->buffer, m_write_bytes, m_write_spread);
aEvent->Insert(hd);
event.Insert(hd);
}

};
Expand Down
5 changes: 3 additions & 2 deletions src/plugins/JTestRoot/JTestRootProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@

JTestRootProcessor::JTestRootProcessor() {
SetTypeName(NAME_OF_THIS); // Provide JANA with this class's name
SetCallbackStyle(CallbackStyle::ExpertMode);
}

void JTestRootProcessor::Process(const std::shared_ptr<const JEvent> &event) {
void JTestRootProcessor::Process(const JEvent&) {
// Get the cluster objects
auto clusters = event->Get<Cluster>();
auto clusters = event.Get<Cluster>();

// Lock mutex so operations on ROOT objects are serialized
std::lock_guard<std::mutex>lock(m_mutex);
Expand Down
3 changes: 1 addition & 2 deletions src/plugins/JTestRoot/JTestRootProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@
class JTestRootProcessor : public JEventProcessor {

// Shared state (e.g. histograms, TTrees, TFiles) live
std::mutex m_mutex;

public:

JTestRootProcessor();
virtual ~JTestRootProcessor() = default;

void Process(const std::shared_ptr<const JEvent>& event) override;
void Process(const JEvent&) override;
};


Expand Down
11 changes: 7 additions & 4 deletions src/programs/unit_tests/BarrierEventTests.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,17 @@ class BarrierSource : public JEventSource {
struct BarrierProcessor : public JEventProcessor {

public:
void Process(const std::shared_ptr<const JEvent>& event) override {
BarrierProcessor() {
SetCallbackStyle(CallbackStyle::ExpertMode);
}
void Process(const JEvent& event) override {

if (event->GetSequential()) {
if (event.GetSequential()) {
global_resource += 1;
LOG << "Barrier event = " << event->GetEventNumber() << ", writing global var = " << global_resource << LOG_END;
LOG << "Barrier event = " << event.GetEventNumber() << ", writing global var = " << global_resource << LOG_END;
}
else {
LOG << "Processing non-barrier event = " << event->GetEventNumber() << ", reading global var = " << global_resource << LOG_END;
LOG << "Processing non-barrier event = " << event.GetEventNumber() << ", reading global var = " << global_resource << LOG_END;
}
}
};
Expand Down
3 changes: 0 additions & 3 deletions src/programs/unit_tests/ExactlyOnceTests.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ struct SimpleProcessor : public JEventProcessor {
init_count += 1;
}

void Process(const std::shared_ptr<const JEvent>&) override {
}

void Finish() override {
finish_count += 1;
}
Expand Down
8 changes: 6 additions & 2 deletions src/programs/unit_tests/MultiLevelTopologyTests.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,19 @@ struct MyEventProcessor : public JEventProcessor {
std::atomic_int process_called_count {0};
std::atomic_int finish_called_count {0};

MyEventProcessor() {
SetCallbackStyle(CallbackStyle::ExpertMode);
}

void Init() override {
init_called_count++;
}

void Process(const std::shared_ptr<const JEvent>& event) override {
void Process(const JEvent& event) override {
process_called_count++;
// TODO: Trigger cluster factory
// TODO: Validate that the clusters make sense
jout << "MyEventProcessor: Processing " << event->GetEventNumber() << jendl;
jout << "MyEventProcessor: Processing " << event.GetEventNumber() << jendl;
REQUIRE(init_called_count == 1);
REQUIRE(finish_called_count == 0);
}
Expand Down
5 changes: 4 additions & 1 deletion src/programs/unit_tests/ScaleTests.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ struct DummySource : public JEventSource {

struct DummyProcessor : public JEventProcessor {

void Process(const std::shared_ptr<const JEvent> &) override {
DummyProcessor() {
SetCallbackStyle(CallbackStyle::ExpertMode);
}
void Process(const JEvent&) override {
consume_cpu_ms(100);
std::this_thread::sleep_for(std::chrono::nanoseconds(1));
}
Expand Down
7 changes: 5 additions & 2 deletions src/programs/unit_tests/SubeventTests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,11 @@ TEST_CASE("Basic subevent arrow functionality") {
};

struct SimpleProcessor : public JEventProcessor {
void Process(const std::shared_ptr<const JEvent>& event) {
auto outputs = event->Get<MyOutput>();
SimpleProcessor() {
SetCallbackStyle(CallbackStyle::ExpertMode);
}
void Process(const JEvent& event) {
auto outputs = event.Get<MyOutput>();
REQUIRE(outputs.size() == 4);
REQUIRE(outputs[0]->z == 25.6f);
REQUIRE(outputs[1]->z == 26.5f);
Expand Down
5 changes: 4 additions & 1 deletion src/programs/unit_tests/TerminationTests.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,13 @@ struct CountingProcessor : public JEventProcessor {
std::atomic_int processed_count {0};
std::atomic_int finish_call_count {0};

CountingProcessor() {
SetCallbackStyle(CallbackStyle::ExpertMode);
}

void Init() override {}

void Process(const std::shared_ptr<const JEvent>& /*event*/) override {
void Process(const JEvent&) override {
processed_count += 1;
// jout << "Processing " << event->GetEventNumber() << jendl;
REQUIRE(finish_call_count == 0);
Expand Down
6 changes: 4 additions & 2 deletions src/programs/unit_tests/TimeoutTests.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,13 @@ struct ProcessorWithTimeout : public JEventProcessor {
int first_event_delay_ms = 0)
: timeout_on_event_nr(timeout_on_event_nr)
, first_event_delay_ms(first_event_delay_ms)
{ }
{
SetCallbackStyle(CallbackStyle::ExpertMode);
}

void Init() override {}

void Process(const std::shared_ptr<const JEvent>&) override {
void Process(const JEvent&) override {
processed_count += 1;
if (processed_count == 1) {
std::this_thread::sleep_for(std::chrono::milliseconds(first_event_delay_ms));
Expand Down
6 changes: 4 additions & 2 deletions src/programs/unit_tests/UserExceptionTests.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,17 @@ struct FlakyProcessor : public JEventProcessor {
: init_excepts(init_excepts)
, process_excepts(process_excepts)
, finish_excepts(finish_excepts)
{};
{
SetCallbackStyle(CallbackStyle::ExpertMode);
};

void Init() override {
if (init_excepts) {
throw JException("Unable to init!");
}
};

void Process(const std::shared_ptr<const JEvent>&) override {
void Process(const JEvent&) override {
if (process_excepts) {
throw JException("Unable to process!");
}
Expand Down

0 comments on commit 181b71f

Please sign in to comment.