From c99c0b0d95c5173eec583a46e92d24d1e33b1fb3 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Sun, 28 Apr 2024 14:36:43 -0400 Subject: [PATCH] Migrate remaining JEventProcessors to new callback style --- src/examples/StreamingExample/AHitBHitFuser.h | 18 ++++++++++-------- src/libraries/JANA/JCsvWriter.h | 7 ++++--- .../JANA/Podio/JEventProcessorPodio.cc | 11 +++++++---- .../JANA/Podio/JEventProcessorPodio.h | 6 ++++-- src/libraries/JANA/Utils/JAutoActivator.cc | 6 +++--- src/libraries/JANA/Utils/JAutoActivator.h | 2 +- src/plugins/JTest/JTestPlotter.h | 10 ++++++---- src/plugins/JTestRoot/JTestRootProcessor.cc | 5 +++-- src/plugins/JTestRoot/JTestRootProcessor.h | 3 +-- src/programs/unit_tests/BarrierEventTests.h | 11 +++++++---- src/programs/unit_tests/ExactlyOnceTests.h | 6 +++++- .../unit_tests/MultiLevelTopologyTests.h | 8 ++++++-- src/programs/unit_tests/ScaleTests.h | 5 ++++- src/programs/unit_tests/SubeventTests.cc | 7 +++++-- src/programs/unit_tests/TerminationTests.h | 5 ++++- src/programs/unit_tests/TimeoutTests.h | 6 ++++-- src/programs/unit_tests/UserExceptionTests.h | 6 ++++-- 17 files changed, 78 insertions(+), 44 deletions(-) diff --git a/src/examples/StreamingExample/AHitBHitFuser.h b/src/examples/StreamingExample/AHitBHitFuser.h index 46ee9a7c6..5f3e09951 100644 --- a/src/examples/StreamingExample/AHitBHitFuser.h +++ b/src/examples/StreamingExample/AHitBHitFuser.h @@ -17,17 +17,19 @@ class AHitBHitFuser : public JEventProcessor { public: AHitAnomalyDetector(JApplication* app = nullptr, size_t delay_ms=1000) : JEventProcessor(app) - , m_delay_ms(delay_ms) {}; + , m_delay_ms(delay_ms) { + SetCallbackStyle(CallbackStyle::ExpertMode); + }; void Init() override { } - void Process(const std::shared_ptr& event) override { + void Process(const JEvent& event) override { - auto a_hits = event->Get(); - auto b_hits = event->Get(); + auto a_hits = event.Get(); + auto b_hits = event.Get(); std::stringstream ss; - ss << "AHit/BHit fusion: Event #" << event->GetEventNumber() << " : {"; + ss << "AHit/BHit fusion: Event #" << event.GetEventNumber() << " : {"; for (auto & hit : a_hits) { ss << "(" << hit->E << "," << hit->t << "), "; } @@ -40,10 +42,10 @@ class AHitBHitFuser : public JEventProcessor { consume_cpu_ms(m_delay_ms); - auto raw_hits = event->Get("raw_hits"); + auto raw_hits = event.Get("raw_hits"); - std::cout << "Processing event #" << event->GetEventNumber() << std::endl; + std::cout << "Processing event #" << event.GetEventNumber() << std::endl; Serializer serializer; for (auto & hit : raw_hits) { AHit* calibrated_hit = new DetectorAHit(*hit); @@ -61,4 +63,4 @@ class AHitBHitFuser : public JEventProcessor { }; -#endif \ No newline at end of file +#endif diff --git a/src/libraries/JANA/JCsvWriter.h b/src/libraries/JANA/JCsvWriter.h index de6d7f677..626a61fe3 100644 --- a/src/libraries/JANA/JCsvWriter.h +++ b/src/libraries/JANA/JCsvWriter.h @@ -25,6 +25,7 @@ class JCsvWriter : public JEventProcessor { JCsvWriter(std::string tag = "") : m_tag(std::move(tag)) { SetTypeName(NAME_OF_THIS); + SetCallbackStyle(CallbackStyle::ExpertMode); }; void Init() override { @@ -41,10 +42,10 @@ class JCsvWriter : public JEventProcessor { m_dest_file.open(filename, std::fstream::out); } - void Process(const std::shared_ptr& event) override { + void Process(const JEvent& event) override { - auto event_nr = event->GetEventNumber(); - auto jobjs = event->Get(m_tag); + auto event_nr = event.GetEventNumber(); + auto jobjs = event.Get(m_tag); std::lock_guard lock(m_mutex); diff --git a/src/libraries/JANA/Podio/JEventProcessorPodio.cc b/src/libraries/JANA/Podio/JEventProcessorPodio.cc index 7825054dd..3ec69ff75 100644 --- a/src/libraries/JANA/Podio/JEventProcessorPodio.cc +++ b/src/libraries/JANA/Podio/JEventProcessorPodio.cc @@ -5,17 +5,20 @@ #include "JEventProcessorPodio.h" +JEventProcessorPodio::JEventProcessorPodio() { + SetCallbackStyle(CallbackStyle::ExpertMode); +} + void JEventProcessorPodio::Init() { - // TODO: Obtain m_output_filename, etc, from parameter manager // TODO: Does PODIO test that output file is writable and fail otherwise? // We want to throw an exception immediately so that we don't waste compute time - m_writer = std::make_unique(m_output_filename); + m_writer = std::make_unique(m_output_filename()); } -void JEventProcessorPodio::Process(const std::shared_ptr &event) { +void JEventProcessorPodio::Process(const JEvent& event) { - auto* frame = event->GetSingle(); + auto* frame = event.GetSingle(); // This will throw if no PODIO frame is found. There will be no PODIO frame if the event source doesn't insert any // PODIO classes, or there are no JFactoryPodioT's provided. // Is this really the behavior we want? The alternatives are to silently not write anything, or to print a warning. diff --git a/src/libraries/JANA/Podio/JEventProcessorPodio.h b/src/libraries/JANA/Podio/JEventProcessorPodio.h index 9d7d378db..c1f54d2cf 100644 --- a/src/libraries/JANA/Podio/JEventProcessorPodio.h +++ b/src/libraries/JANA/Podio/JEventProcessorPodio.h @@ -11,14 +11,16 @@ class JEventProcessorPodio : public JEventProcessor { - std::string m_output_filename = "podio_output.root"; + Parameter m_output_filename {this, "podio:output_filename", "podio_output.root", "Output filename for JEventProcessorPodio"}; + std::set m_output_include_collections; std::set m_output_exclude_collections; std::unique_ptr m_writer; public: + JEventProcessorPodio(); void Init() override; - void Process(const std::shared_ptr&) override; + void Process(const JEvent&) override; void Finish() override; }; diff --git a/src/libraries/JANA/Utils/JAutoActivator.cc b/src/libraries/JANA/Utils/JAutoActivator.cc index a2d199204..e986e8f6e 100644 --- a/src/libraries/JANA/Utils/JAutoActivator.cc +++ b/src/libraries/JANA/Utils/JAutoActivator.cc @@ -69,13 +69,13 @@ void JAutoActivator::Init() { } } -void JAutoActivator::Process(const std::shared_ptr &event) { +void JAutoActivator::Process(const JEvent& event) { for (const auto &pair: m_auto_activated_factories) { auto name = pair.first; auto tag = pair.second; - auto factory = event->GetFactory(name, tag); + auto factory = event.GetFactory(name, tag); if (factory != nullptr) { - factory->Create(event); // This will do nothing if factory is already created + factory->Create(event.shared_from_this()); // This will do nothing if factory is already created } else { LOG_ERROR(GetLogger()) << "Could not find factory with typename=" << name << ", tag=" << tag << LOG_END; diff --git a/src/libraries/JANA/Utils/JAutoActivator.h b/src/libraries/JANA/Utils/JAutoActivator.h index 35d7eb410..c3658a682 100644 --- a/src/libraries/JANA/Utils/JAutoActivator.h +++ b/src/libraries/JANA/Utils/JAutoActivator.h @@ -19,7 +19,7 @@ class JAutoActivator : public JEventProcessor { static std::pair Split(std::string factory_name); void AddAutoActivatedFactory(string factory_name, string factory_tag); void Init() override; - void Process(const std::shared_ptr& event) override; + void Process(const JEvent&) override; private: vector> m_auto_activated_factories; diff --git a/src/plugins/JTest/JTestPlotter.h b/src/plugins/JTest/JTestPlotter.h index ed9503a83..a8b882ef6 100644 --- a/src/plugins/JTest/JTestPlotter.h +++ b/src/plugins/JTest/JTestPlotter.h @@ -21,7 +21,9 @@ class JTestPlotter : public JEventProcessor { public: JTestPlotter() { + SetPrefix("jtest:plotter"); SetTypeName(NAME_OF_THIS); + SetCallbackStyle(CallbackStyle::ExpertMode); } void Init() override { @@ -32,14 +34,14 @@ class JTestPlotter : public JEventProcessor { app->SetDefaultParameter("jtest:plotter_bytes_spread", m_write_spread, "Spread of bytes written during plotting"); } - void Process(const std::shared_ptr& aEvent) override { + void Process(const JEvent& event) override { // Read the track data - auto td = aEvent->GetSingle(); + auto td = event.GetSingle(); read_memory(td->buffer); // Read the extra data objects inserted by JTestTracker - aEvent->Get(); + event.Get(); // Everything that happens after here is in a critical section std::lock_guard lock(m_mutex); @@ -50,7 +52,7 @@ class JTestPlotter : public JEventProcessor { // Write the histogram data auto hd = new JTestHistogramData; write_memory(hd->buffer, m_write_bytes, m_write_spread); - aEvent->Insert(hd); + event.Insert(hd); } }; diff --git a/src/plugins/JTestRoot/JTestRootProcessor.cc b/src/plugins/JTestRoot/JTestRootProcessor.cc index 2629bb5e5..3e53c0cb0 100644 --- a/src/plugins/JTestRoot/JTestRootProcessor.cc +++ b/src/plugins/JTestRoot/JTestRootProcessor.cc @@ -16,11 +16,12 @@ JTestRootProcessor::JTestRootProcessor() { SetTypeName(NAME_OF_THIS); // Provide JANA with this class's name + SetCallbackStyle(CallbackStyle::ExpertMode); } -void JTestRootProcessor::Process(const std::shared_ptr &event) { +void JTestRootProcessor::Process(const JEvent&) { // Get the cluster objects - auto clusters = event->Get(); + auto clusters = event.Get(); // Lock mutex so operations on ROOT objects are serialized std::lock_guardlock(m_mutex); diff --git a/src/plugins/JTestRoot/JTestRootProcessor.h b/src/plugins/JTestRoot/JTestRootProcessor.h index 4840634cd..07fe3cbef 100644 --- a/src/plugins/JTestRoot/JTestRootProcessor.h +++ b/src/plugins/JTestRoot/JTestRootProcessor.h @@ -9,14 +9,13 @@ class JTestRootProcessor : public JEventProcessor { // Shared state (e.g. histograms, TTrees, TFiles) live - std::mutex m_mutex; public: JTestRootProcessor(); virtual ~JTestRootProcessor() = default; - void Process(const std::shared_ptr& event) override; + void Process(const JEvent&) override; }; diff --git a/src/programs/unit_tests/BarrierEventTests.h b/src/programs/unit_tests/BarrierEventTests.h index 761b98096..5843c9854 100644 --- a/src/programs/unit_tests/BarrierEventTests.h +++ b/src/programs/unit_tests/BarrierEventTests.h @@ -46,14 +46,17 @@ class BarrierSource : public JEventSource { struct BarrierProcessor : public JEventProcessor { public: - void Process(const std::shared_ptr& event) override { + BarrierProcessor() { + SetCallbackStyle(CallbackStyle::ExpertMode); + } + void Process(const JEvent& event) override { - if (event->GetSequential()) { + if (event.GetSequential()) { global_resource += 1; - LOG << "Barrier event = " << event->GetEventNumber() << ", writing global var = " << global_resource << LOG_END; + LOG << "Barrier event = " << event.GetEventNumber() << ", writing global var = " << global_resource << LOG_END; } else { - LOG << "Processing non-barrier event = " << event->GetEventNumber() << ", reading global var = " << global_resource << LOG_END; + LOG << "Processing non-barrier event = " << event.GetEventNumber() << ", reading global var = " << global_resource << LOG_END; } } }; diff --git a/src/programs/unit_tests/ExactlyOnceTests.h b/src/programs/unit_tests/ExactlyOnceTests.h index 62b3f0e19..983af8787 100644 --- a/src/programs/unit_tests/ExactlyOnceTests.h +++ b/src/programs/unit_tests/ExactlyOnceTests.h @@ -41,11 +41,15 @@ struct SimpleProcessor : public JEventProcessor { std::atomic_int init_count {0}; std::atomic_int finish_count {0}; + SimpleProcessor() { + SetCallbackStyle(CallbackStyle::ExpertMode); + } + void Init() override { init_count += 1; } - void Process(const std::shared_ptr&) override { + void Process(const JEvent&) override { } void Finish() override { diff --git a/src/programs/unit_tests/MultiLevelTopologyTests.h b/src/programs/unit_tests/MultiLevelTopologyTests.h index 46e86b917..2798c88da 100644 --- a/src/programs/unit_tests/MultiLevelTopologyTests.h +++ b/src/programs/unit_tests/MultiLevelTopologyTests.h @@ -84,15 +84,19 @@ struct MyEventProcessor : public JEventProcessor { std::atomic_int process_called_count {0}; std::atomic_int finish_called_count {0}; + MyEventProcessor() { + SetCallbackStyle(CallbackStyle::ExpertMode); + } + void Init() override { init_called_count++; } - void Process(const std::shared_ptr& event) override { + void Process(const JEvent& event) override { process_called_count++; // TODO: Trigger cluster factory // TODO: Validate that the clusters make sense - jout << "MyEventProcessor: Processing " << event->GetEventNumber() << jendl; + jout << "MyEventProcessor: Processing " << event.GetEventNumber() << jendl; REQUIRE(init_called_count == 1); REQUIRE(finish_called_count == 0); } diff --git a/src/programs/unit_tests/ScaleTests.h b/src/programs/unit_tests/ScaleTests.h index 23e7cb49d..7e7e0aaaf 100644 --- a/src/programs/unit_tests/ScaleTests.h +++ b/src/programs/unit_tests/ScaleTests.h @@ -25,7 +25,10 @@ struct DummySource : public JEventSource { struct DummyProcessor : public JEventProcessor { - void Process(const std::shared_ptr &) override { + DummyProcessor() { + SetCallbackStyle(CallbackStyle::ExpertMode); + } + void Process(const JEvent&) override { consume_cpu_ms(100); std::this_thread::sleep_for(std::chrono::nanoseconds(1)); } diff --git a/src/programs/unit_tests/SubeventTests.cc b/src/programs/unit_tests/SubeventTests.cc index 1d20470e4..aa1c5343e 100644 --- a/src/programs/unit_tests/SubeventTests.cc +++ b/src/programs/unit_tests/SubeventTests.cc @@ -160,8 +160,11 @@ TEST_CASE("Basic subevent arrow functionality") { }; struct SimpleProcessor : public JEventProcessor { - void Process(const std::shared_ptr& event) { - auto outputs = event->Get(); + SimpleProcessor() { + SetCallbackStyle(CallbackStyle::ExpertMode); + } + void Process(const JEvent& event) { + auto outputs = event.Get(); REQUIRE(outputs.size() == 4); REQUIRE(outputs[0]->z == 25.6f); REQUIRE(outputs[1]->z == 26.5f); diff --git a/src/programs/unit_tests/TerminationTests.h b/src/programs/unit_tests/TerminationTests.h index bdfde313c..27a985411 100644 --- a/src/programs/unit_tests/TerminationTests.h +++ b/src/programs/unit_tests/TerminationTests.h @@ -63,10 +63,13 @@ struct CountingProcessor : public JEventProcessor { std::atomic_int processed_count {0}; std::atomic_int finish_call_count {0}; + CountingProcessor() { + SetCallbackStyle(CallbackStyle::ExpertMode); + } void Init() override {} - void Process(const std::shared_ptr& /*event*/) override { + void Process(const JEvent&) override { processed_count += 1; // jout << "Processing " << event->GetEventNumber() << jendl; REQUIRE(finish_call_count == 0); diff --git a/src/programs/unit_tests/TimeoutTests.h b/src/programs/unit_tests/TimeoutTests.h index d1587f497..b8e990fd8 100644 --- a/src/programs/unit_tests/TimeoutTests.h +++ b/src/programs/unit_tests/TimeoutTests.h @@ -61,11 +61,13 @@ struct ProcessorWithTimeout : public JEventProcessor { int first_event_delay_ms = 0) : timeout_on_event_nr(timeout_on_event_nr) , first_event_delay_ms(first_event_delay_ms) - { } + { + SetCallbackStyle(CallbackStyle::ExpertMode); + } void Init() override {} - void Process(const std::shared_ptr&) override { + void Process(const JEvent&) override { processed_count += 1; if (processed_count == 1) { std::this_thread::sleep_for(std::chrono::milliseconds(first_event_delay_ms)); diff --git a/src/programs/unit_tests/UserExceptionTests.h b/src/programs/unit_tests/UserExceptionTests.h index 7fa6e5f26..aa27a520a 100644 --- a/src/programs/unit_tests/UserExceptionTests.h +++ b/src/programs/unit_tests/UserExceptionTests.h @@ -50,7 +50,9 @@ struct FlakyProcessor : public JEventProcessor { : init_excepts(init_excepts) , process_excepts(process_excepts) , finish_excepts(finish_excepts) - {}; + { + SetCallbackStyle(CallbackStyle::ExpertMode); + }; void Init() override { if (init_excepts) { @@ -58,7 +60,7 @@ struct FlakyProcessor : public JEventProcessor { } }; - void Process(const std::shared_ptr&) override { + void Process(const JEvent&) override { if (process_excepts) { throw JException("Unable to process!"); }