From 8db9e6fac91e73d8bc14f50e5b717124f1704277 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Fri, 26 Apr 2024 13:30:40 -0400 Subject: [PATCH 01/12] Get rid of std::call_once in JMultifactory --- src/libraries/JANA/JMultifactory.cc | 30 +++++++++++++++++++---------- src/libraries/JANA/JMultifactory.h | 3 --- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/libraries/JANA/JMultifactory.cc b/src/libraries/JANA/JMultifactory.cc index 2a215a834..4a5842248 100644 --- a/src/libraries/JANA/JMultifactory.cc +++ b/src/libraries/JANA/JMultifactory.cc @@ -9,31 +9,35 @@ void JMultifactory::Execute(const std::shared_ptr& event) { + std::lock_guard lock(m_mutex); #ifdef JANA2_HAVE_PODIO if (mNeedPodio) { mPodioFrame = GetOrCreateFrame(event); } #endif - auto run_number = event->GetRunNumber(); - try { - std::call_once(m_is_initialized, &JMultifactory::Init, this); - } - catch(std::exception &e) { - // Rethrow as a JException so that we can add context information - throw JException(e.what()); + if (m_status == Status::Uninitialized) { + try { + Init(); + m_status = Status::Initialized; + } + catch(std::exception &e) { + // Rethrow as a JException so that we can add context information + throw JException(e.what()); + } } + auto run_number = event->GetRunNumber(); if (m_last_run_number == -1) { // This is the very first run try { BeginRun(event); + m_last_run_number = run_number; } catch(std::exception &e) { // Rethrow as a JException so that we can add context information throw JException(e.what()); } - m_last_run_number = run_number; } else if (m_last_run_number != run_number) { // This is a later run, and it has changed @@ -46,12 +50,12 @@ void JMultifactory::Execute(const std::shared_ptr& event) { } try { BeginRun(event); + m_last_run_number = run_number; } catch(std::exception &e) { // Rethrow as a JException so that we can add context information throw JException(e.what()); } - m_last_run_number = run_number; } try { Process(event); @@ -63,8 +67,14 @@ void JMultifactory::Execute(const std::shared_ptr& event) { } void JMultifactory::Release() { + std::lock_guard lock(m_mutex); try { - std::call_once(m_is_finished, &JMultifactory::Finish, this); + // Only call Finish() if we actually initialized + // Only call Finish() once + if (m_status == Status::Initialized) { + Finish(); + m_status = Status::Finalized; + } } catch(std::exception &e) { // Rethrow as a JException so that we can add context information diff --git a/src/libraries/JANA/JMultifactory.h b/src/libraries/JANA/JMultifactory.h index 893643f10..ffb7912d8 100644 --- a/src/libraries/JANA/JMultifactory.h +++ b/src/libraries/JANA/JMultifactory.h @@ -64,9 +64,6 @@ class JMultifactory : public jana::omni::JComponent, JFactorySet mHelpers; // This has ownership UNTIL JFactorySet::Add() takes it over - std::once_flag m_is_initialized; - std::once_flag m_is_finished; - int32_t m_last_run_number = -1; // Remember where we are in the stream so that the correct sequence of callbacks get called. // However, don't worry about a Status variable. Every time Execute() gets called, so does Process(). // The JMultifactoryHelpers will control calls to Execute(). From 7d8116ffa21c9815caf227e8d0930eaa361b6bb3 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Fri, 26 Apr 2024 17:30:21 -0400 Subject: [PATCH 02/12] JEventSource no longer uses exceptions for flow control This addresses issue #212. Note that this doesn't break backwards compatibility yet because it has to be enabled via SetCallbackStyle(CallbackStyle::Classic) --- src/libraries/JANA/JEventSource.h | 105 +++++++++++++++++++++++++++++- 1 file changed, 103 insertions(+), 2 deletions(-) diff --git a/src/libraries/JANA/JEventSource.h b/src/libraries/JANA/JEventSource.h index 731f6aedb..32c2cf1e9 100644 --- a/src/libraries/JANA/JEventSource.h +++ b/src/libraries/JANA/JEventSource.h @@ -21,6 +21,8 @@ class JFactory; class JEventSource : public jana::omni::JComponent, public jana::omni::JHasOutputs { public: + + enum class Result { Success, FailureTryAgain, FailureFinished }; /// ReturnStatus describes what happened the last time a GetEvent() was attempted. /// If GetEvent() reaches an error state, it should throw a JException instead. @@ -54,6 +56,17 @@ class JEventSource : public jana::omni::JComponent, public jana::omni::JHasOutpu virtual void Open() {} + // `Emit` is called by JANA in order to emit a fresh event into the stream, when using CallbackStyle::Classic. + // It is very similar to GetEvent(), except the user returns a Result status code instead of throwing an exception. + // Exceptions are reserved for unrecoverable errors. It accepts an out parameter JEvent. If there is another + // entry in the file, or another message waiting at the socket, the user reads the data into the JEvent and returns + // Result::Success, at which point JANA pushes the JEvent onto the downstream queue. If there is no data waiting yet, + // the user returns Result::FailureTryAgain, at which point JANA recycles the JEvent to the pool. If there is no more + // data, the user returns Result::FailureFinished, at which point JANA recycles the JEvent to the pool and calls Close(). + + virtual Result Emit(JEvent&) { return Result::Success; }; + + /// `Close` is called by JANA when it is finished accepting events from this event source. Here is where you should /// cleanly close files, sockets, etc. Although GetEvent() knows when (for instance) there are no more events in a /// file, the logic for closing needs to live here because there are other ways a computation may end besides @@ -80,7 +93,7 @@ class JEventSource : public jana::omni::JComponent, public jana::omni::JHasOutpu /// the event stream, the implementor should throw the corresponding `RETURN_STATUS`. The user should NEVER throw /// `RETURN_STATUS SUCCESS` because this will hurt performance. Instead, they should simply return normally. - virtual void GetEvent(std::shared_ptr) = 0; + virtual void GetEvent(std::shared_ptr) {}; virtual void Preprocess(const JEvent&) {}; @@ -198,10 +211,98 @@ class JEventSource : public jana::omni::JComponent, public jana::omni::JHasOutpu throw ex; } } - + ReturnStatus DoNext(std::shared_ptr event) { std::lock_guard lock(m_mutex); // In general, DoNext must be synchronized. + + if (m_callback_style == CallbackStyle::Compatibility) { + return DoNextCompatibility(event); + } + + auto first_evt_nr = m_nskip; + auto last_evt_nr = m_nevents + m_nskip; + + try { + if (m_status == Status::Uninitialized) { + DoInitialize(false); + } + if (m_status == Status::Initialized) { + if (m_nevents != 0 && (m_event_count == last_evt_nr)) { + // We exit early (and recycle) because we hit our jana:nevents limit + DoFinalize(false); + return ReturnStatus::Finished; // ReturnStatus::Finished is a failure condition + } + // If we reach this point, we will need to actually read an event + + // We configure the event + event->SetEventNumber(m_event_count); // Default event number to event count + event->SetJApplication(m_app); + event->SetJEventSource(this); + event->SetSequential(false); + event->GetJCallGraphRecorder()->Reset(); + + // Now we call the new-style interface + auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE); // (see note at top of JCallGraphRecorder.h) + auto result = Emit(*event); + event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin ); + + if (result == Result::Success) { + // We end up here if we read an entry in our file or retrieved a message from our socket, + // and believe we could obtain another one immediately if we wanted to + for (auto* output : m_outputs) { + output->InsertCollection(*event); + } + m_event_count += 1; + if (m_event_count < first_evt_nr) { + // We immediately throw away this whole event because of nskip + // (although really we should be handling this with Seek()) + return ReturnStatus::TryAgain; // Failure condition + } + return ReturnStatus::Success; + } + else if (result == Result::FailureFinished) { + // We end up here if we tried to read an entry in a file, but found EOF + // or if we received a message from a socket that contained no data and indicated no more data will be coming + DoFinalize(false); + return ReturnStatus::Finished; + } + else if (result == Result::FailureTryAgain) { + // We end up here if we tried to read an entry in a file but it is on a tape drive and isn't ready yet + // or if we polled the socket, found no new messages, but still expect messages later + return ReturnStatus::TryAgain; + } + else { + throw JException("Invalid JEventSource::Result value!"); + } + } + else { // status == Finalized + return ReturnStatus::Finished; + } + } + catch (JException& ex) { + ex.plugin_name = m_plugin_name; + ex.component_name = m_type_name; + throw ex; + } + catch (std::exception& e){ + auto ex = JException("Exception in JEventSource::Emit(): %s", e.what()); + ex.nested_exception = std::current_exception(); + ex.plugin_name = m_plugin_name; + ex.component_name = m_type_name; + throw ex; + } + catch (...) { + auto ex = JException("Unknown exception in JEventSource::Emit()"); + ex.nested_exception = std::current_exception(); + ex.plugin_name = m_plugin_name; + ex.component_name = m_type_name; + throw ex; + } + } + + ReturnStatus DoNextCompatibility(std::shared_ptr event) { + auto first_evt_nr = m_nskip; auto last_evt_nr = m_nevents + m_nskip; From 80406481bb359211a4d724e502a808d5f6aa146f Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Fri, 26 Apr 2024 17:43:42 -0400 Subject: [PATCH 03/12] Merge JEventSource::ReturnStatus, Result --- .../JANA/Engine/JEventSourceArrow.cc | 4 +- src/libraries/JANA/JEventSource.h | 38 +++++++++---------- 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/src/libraries/JANA/Engine/JEventSourceArrow.cc b/src/libraries/JANA/Engine/JEventSourceArrow.cc index 8312890c0..9669ee910 100644 --- a/src/libraries/JANA/Engine/JEventSourceArrow.cc +++ b/src/libraries/JANA/Engine/JEventSourceArrow.cc @@ -32,11 +32,11 @@ void JEventSourceArrow::process(Event* event, bool& success, JArrowMetrics::Stat auto source_status = m_sources[m_current_source]->DoNext(*event); - if (source_status == JEventSource::ReturnStatus::Finished) { + if (source_status == JEventSource::Result::FailureFinished) { m_current_source++; // TODO: Adjust nskip and nevents for the new source } - else if (source_status == JEventSource::ReturnStatus::TryAgain){ + else if (source_status == JEventSource::Result::FailureTryAgain){ // This JEventSource isn't finished yet, so we obtained either Success or TryAgainLater success = false; arrow_status = JArrowMetrics::Status::ComeBackLater; diff --git a/src/libraries/JANA/JEventSource.h b/src/libraries/JANA/JEventSource.h index 32c2cf1e9..a279b2a1e 100644 --- a/src/libraries/JANA/JEventSource.h +++ b/src/libraries/JANA/JEventSource.h @@ -22,12 +22,10 @@ class JEventSource : public jana::omni::JComponent, public jana::omni::JHasOutpu public: + /// Result describes what happened the last time a GetEvent() was attempted. + /// If Emit() or GetEvent() reaches an error state, it should throw a JException instead. enum class Result { Success, FailureTryAgain, FailureFinished }; - /// ReturnStatus describes what happened the last time a GetEvent() was attempted. - /// If GetEvent() reaches an error state, it should throw a JException instead. - enum class ReturnStatus { Success, TryAgain, Finished }; - // TODO: Deprecate me! /// The user is supposed to _throw_ RETURN_STATUS::kNO_MORE_EVENTS or kBUSY from GetEvent() enum class RETURN_STATUS { kSUCCESS, kNO_MORE_EVENTS, kBUSY, kTRY_AGAIN, kERROR, kUNKNOWN }; @@ -212,7 +210,7 @@ class JEventSource : public jana::omni::JComponent, public jana::omni::JHasOutpu } } - ReturnStatus DoNext(std::shared_ptr event) { + Result DoNext(std::shared_ptr event) { std::lock_guard lock(m_mutex); // In general, DoNext must be synchronized. @@ -231,7 +229,7 @@ class JEventSource : public jana::omni::JComponent, public jana::omni::JHasOutpu if (m_nevents != 0 && (m_event_count == last_evt_nr)) { // We exit early (and recycle) because we hit our jana:nevents limit DoFinalize(false); - return ReturnStatus::Finished; // ReturnStatus::Finished is a failure condition + return Result::FailureFinished; } // If we reach this point, we will need to actually read an event @@ -257,27 +255,27 @@ class JEventSource : public jana::omni::JComponent, public jana::omni::JHasOutpu if (m_event_count < first_evt_nr) { // We immediately throw away this whole event because of nskip // (although really we should be handling this with Seek()) - return ReturnStatus::TryAgain; // Failure condition + return Result::FailureTryAgain; } - return ReturnStatus::Success; + return Result::Success; } else if (result == Result::FailureFinished) { // We end up here if we tried to read an entry in a file, but found EOF // or if we received a message from a socket that contained no data and indicated no more data will be coming DoFinalize(false); - return ReturnStatus::Finished; + return Result::FailureFinished; } else if (result == Result::FailureTryAgain) { // We end up here if we tried to read an entry in a file but it is on a tape drive and isn't ready yet // or if we polled the socket, found no new messages, but still expect messages later - return ReturnStatus::TryAgain; + return Result::FailureTryAgain; } else { throw JException("Invalid JEventSource::Result value!"); } } else { // status == Finalized - return ReturnStatus::Finished; + return Result::FailureFinished; } } catch (JException& ex) { @@ -301,7 +299,7 @@ class JEventSource : public jana::omni::JComponent, public jana::omni::JHasOutpu } } - ReturnStatus DoNextCompatibility(std::shared_ptr event) { + Result DoNextCompatibility(std::shared_ptr event) { auto first_evt_nr = m_nskip; auto last_evt_nr = m_nevents + m_nskip; @@ -318,11 +316,11 @@ class JEventSource : public jana::omni::JComponent, public jana::omni::JHasOutpu GetEvent(event); event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin ); m_event_count += 1; - return ReturnStatus::TryAgain; // Reject this event and recycle it + return Result::FailureTryAgain; // Reject this event and recycle it } else if (m_nevents != 0 && (m_event_count == last_evt_nr)) { // Declare ourselves finished due to nevents DoFinalize(false); // Close out the event source as soon as it declares itself finished - return ReturnStatus::Finished; + return Result::FailureFinished; } else { // Actually emit an event. // GetEvent() expects the following things from its incoming JEvent @@ -338,22 +336,22 @@ class JEventSource : public jana::omni::JComponent, public jana::omni::JHasOutpu } event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin ); m_event_count += 1; - return ReturnStatus::Success; // Don't reject this event! + return Result::Success; // Don't reject this event! } } else if (m_status == Status::Finalized) { - return ReturnStatus::Finished; + return Result::FailureFinished; } else { - throw JException("Invalid ReturnStatus"); + throw JException("Invalid m_status"); } } catch (RETURN_STATUS rs) { if (rs == RETURN_STATUS::kNO_MORE_EVENTS) { DoFinalize(false); - return ReturnStatus::Finished; + return Result::FailureFinished; } else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) { - return ReturnStatus::TryAgain; + return Result::FailureTryAgain; } else if (rs == RETURN_STATUS::kERROR || rs == RETURN_STATUS::kUNKNOWN) { JException ex ("JEventSource threw RETURN_STATUS::kERROR or kUNKNOWN"); @@ -362,7 +360,7 @@ class JEventSource : public jana::omni::JComponent, public jana::omni::JHasOutpu throw ex; } else { - return ReturnStatus::Success; + return Result::Success; } } catch (JException& ex) { From e687880d97f5eb3469ef9f5a6b3ffede2b1a72d6 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Fri, 26 Apr 2024 19:05:15 -0400 Subject: [PATCH 04/12] Rename callback styles --- src/libraries/JANA/JEventProcessor.h | 4 ++-- src/libraries/JANA/JEventSource.h | 4 ++-- src/libraries/JANA/JEventUnfolder.h | 4 ++-- src/libraries/JANA/Omni/JComponentFwd.h | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/libraries/JANA/JEventProcessor.h b/src/libraries/JANA/JEventProcessor.h index fa5fa4fe2..4993b32ef 100644 --- a/src/libraries/JANA/JEventProcessor.h +++ b/src/libraries/JANA/JEventProcessor.h @@ -71,7 +71,7 @@ class JEventProcessor : public jana::omni::JComponent, // Also we don't have // a Preprocess(), so we don't technically need Init() here even - if (m_callback_style != CallbackStyle::Declarative) { + if (m_callback_style != CallbackStyle::DeclarativeMode) { DoReduce(e); // This does all the locking! } } @@ -101,7 +101,7 @@ class JEventProcessor : public jana::omni::JComponent, for (auto* input : m_inputs) { input->GetCollection(*e); } - if (m_callback_style == CallbackStyle::Declarative) { + if (m_callback_style == CallbackStyle::DeclarativeMode) { Process(e->GetRunNumber(), e->GetEventNumber(), e->GetEventIndex()); } else { diff --git a/src/libraries/JANA/JEventSource.h b/src/libraries/JANA/JEventSource.h index a279b2a1e..778a4a335 100644 --- a/src/libraries/JANA/JEventSource.h +++ b/src/libraries/JANA/JEventSource.h @@ -54,7 +54,7 @@ class JEventSource : public jana::omni::JComponent, public jana::omni::JHasOutpu virtual void Open() {} - // `Emit` is called by JANA in order to emit a fresh event into the stream, when using CallbackStyle::Classic. + // `Emit` is called by JANA in order to emit a fresh event into the stream, when using CallbackStyle::ExpertMode. // It is very similar to GetEvent(), except the user returns a Result status code instead of throwing an exception. // Exceptions are reserved for unrecoverable errors. It accepts an out parameter JEvent. If there is another // entry in the file, or another message waiting at the socket, the user reads the data into the JEvent and returns @@ -214,7 +214,7 @@ class JEventSource : public jana::omni::JComponent, public jana::omni::JHasOutpu std::lock_guard lock(m_mutex); // In general, DoNext must be synchronized. - if (m_callback_style == CallbackStyle::Compatibility) { + if (m_callback_style == CallbackStyle::LegacyMode) { return DoNextCompatibility(event); } diff --git a/src/libraries/JANA/JEventUnfolder.h b/src/libraries/JANA/JEventUnfolder.h index abdfd0c13..bd877e915 100644 --- a/src/libraries/JANA/JEventUnfolder.h +++ b/src/libraries/JANA/JEventUnfolder.h @@ -104,7 +104,7 @@ class JEventUnfolder : public jana::omni::JComponent, for (auto* input : m_inputs) { input->PrefetchCollection(parent); } - if (m_callback_style != CallbackStyle::Declarative) { + if (m_callback_style != CallbackStyle::DeclarativeMode) { Preprocess(parent); } } @@ -137,7 +137,7 @@ class JEventUnfolder : public jana::omni::JComponent, for (auto* resource : m_resources) { resource->ChangeRun(parent.GetRunNumber(), m_app); } - if (m_callback_style == CallbackStyle::Declarative) { + if (m_callback_style == CallbackStyle::DeclarativeMode) { ChangeRun(parent.GetRunNumber()); } else { diff --git a/src/libraries/JANA/Omni/JComponentFwd.h b/src/libraries/JANA/Omni/JComponentFwd.h index 53c8c95de..321b8551c 100644 --- a/src/libraries/JANA/Omni/JComponentFwd.h +++ b/src/libraries/JANA/Omni/JComponentFwd.h @@ -21,7 +21,7 @@ namespace omni { struct JComponent { enum class Status { Uninitialized, Initialized, Finalized }; - enum class CallbackStyle { Compatibility, Classic, Declarative }; + enum class CallbackStyle { LegacyMode, ExpertMode, DeclarativeMode }; struct ParameterBase; struct ServiceBase; @@ -31,7 +31,7 @@ struct JComponent { std::vector m_services; JEventLevel m_level = JEventLevel::PhysicsEvent; - CallbackStyle m_callback_style = CallbackStyle::Compatibility; + CallbackStyle m_callback_style = CallbackStyle::LegacyMode; std::string m_prefix; std::string m_plugin_name; std::string m_type_name; From 892beea15795f606cfc5ebf699d0069b26400b8c Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Fri, 26 Apr 2024 22:15:18 -0400 Subject: [PATCH 05/12] Add test case for JEventSource::Emit --- src/programs/unit_tests/CMakeLists.txt | 1 + .../Components/JEventSourceTests.cc | 73 +++++++++++++++++++ 2 files changed, 74 insertions(+) create mode 100644 src/programs/unit_tests/Components/JEventSourceTests.cc diff --git a/src/programs/unit_tests/CMakeLists.txt b/src/programs/unit_tests/CMakeLists.txt index 5a97cc828..17c78c377 100644 --- a/src/programs/unit_tests/CMakeLists.txt +++ b/src/programs/unit_tests/CMakeLists.txt @@ -40,6 +40,7 @@ set(TEST_SOURCES MultiLevelTopologyTests.cc UnfoldTests.cc JComponentTests.cc + Components/JEventSourceTests.cc ) if (${USE_PODIO}) diff --git a/src/programs/unit_tests/Components/JEventSourceTests.cc b/src/programs/unit_tests/Components/JEventSourceTests.cc new file mode 100644 index 000000000..2ef433887 --- /dev/null +++ b/src/programs/unit_tests/Components/JEventSourceTests.cc @@ -0,0 +1,73 @@ + +#include "catch.hpp" + +#include + +struct MyEventSource : public JEventSource { + int open_count = 0; + int emit_count = 0; + int close_count = 0; + size_t events_in_file = 5; + + void Open() override { + LOG_INFO(GetLogger()) << "Open() called" << LOG_END; + open_count++; + } + Result Emit(JEvent&) override { + emit_count++; + + if (GetEventCount() >= events_in_file) { + LOG_INFO(GetLogger()) << "Emit() called, returning FailureFinished" << LOG_END; + return Result::FailureFinished; + } + LOG_INFO(GetLogger()) << "Emit() called, returning Success" << LOG_END; + return Result::Success; + } + void Close() override { + LOG_INFO(GetLogger()) << "Close() called" << LOG_END; + close_count++; + } +}; + +TEST_CASE("JEventSource_ExpertMode_EmitCount") { + + auto sut = new MyEventSource; + sut->SetCallbackStyle(MyEventSource::CallbackStyle::ExpertMode); + sut->SetTypeName("MyEventSource"); + + JApplication app; + app.SetParameterValue("log:global", "off"); + app.SetParameterValue("log:info", "MyEventSource"); + app.Add(sut); + + SECTION("ShutsSelfOff") { + LOG << "Running test: JEventSource_ExpertMode_EmitCount :: ShutsSelfOff" << LOG_END; + app.Run(); + REQUIRE(sut->open_count == 1); + REQUIRE(sut->emit_count == 6); // Emit called 5 times successfully and fails on the 6th + REQUIRE(sut->GetEventCount() == 5); // Emits 5 events successfully (including skipped) + REQUIRE(sut->close_count == 1); + } + + SECTION("LimitedByNEvents") { + LOG << "Running test: JEventSource_ExpertMode_EmitCount :: LimitedByNEvents" << LOG_END; + app.SetParameterValue("jana:nevents", 3); + app.Run(); + REQUIRE(sut->open_count == 1); + REQUIRE(sut->emit_count == 3); // Emit called 3 times successfully + REQUIRE(sut->GetEventCount() == 3); // Nevents limit discovered outside Emit + REQUIRE(sut->close_count == 1); + } + + SECTION("LimitedByNSkip") { + LOG << "Running test: JEventSource_ExpertMode_EmitCount :: LimitedByNSkip" << LOG_END; + app.SetParameterValue("jana:nskip", 3); + app.Run(); + REQUIRE(sut->open_count == 1); + REQUIRE(sut->emit_count == 6); // Emit called 5 times successfully and fails on the 6th + REQUIRE(sut->GetEventCount() == 5); // 5 events successfully emitted, 3 of which were (presumably) skipped + REQUIRE(sut->close_count == 1); + } +} + + From b45ed311a4c3dc5d9889f546bf9d83b64193ee6c Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Sat, 27 Apr 2024 17:57:23 -0400 Subject: [PATCH 06/12] JTest and Tutorial use exception-free JEventSource --- docs/Tutorial.md | 29 ++++++++++++-------- docs/tutorial.rst | 32 +++++++++++++--------- scripts/jana-generate.py | 39 ++++++++++++++++++++------- src/examples/Tutorial/RandomSource.cc | 28 +++++++++++++------ src/examples/Tutorial/RandomSource.h | 6 ++++- src/plugins/JTest/JTestParser.h | 10 ++++--- 6 files changed, 98 insertions(+), 46 deletions(-) diff --git a/docs/Tutorial.md b/docs/Tutorial.md index 54a38b935..aaca7c51c 100644 --- a/docs/Tutorial.md +++ b/docs/Tutorial.md @@ -135,7 +135,7 @@ extern "C" { void InitPlugin(JApplication *app) { InitJANAPlugin(app); app->Add(new QuickTutorialProcessor); - app->Add(new RandomSource("random", app)); // <- ADD THIS LINE + app->Add(new RandomSource); // <- ADD THIS LINE } } ``` @@ -163,10 +163,12 @@ class RandomSource : public JEventSource { int m_max_emit_freq_hz = 100; // <- ADD THIS LINE public: + RandomSource(); RandomSource(std::string resource_name, JApplication* app); virtual ~RandomSource() = default; void Open() override; - void GetEvent(std::shared_ptr) override; + void Close() override; + Result Emit(JEvent& event) override; static std::string GetDescription(); }; ``` @@ -192,16 +194,18 @@ We can now use the value of `m_max_emit_freq_hz`, confident that it is consisten runtime configuration: ``` -void RandomSource::GetEvent(std::shared_ptr event) { +JEventSource::Result RandomSource::Emit(JEvent& event) { /// Configure event and run numbers static size_t current_event_number = 1; - event->SetEventNumber(current_event_number++); - event->SetRunNumber(22); + event.SetEventNumber(current_event_number++); + event.SetRunNumber(22); /// Slow down event source // <- ADD THIS LINE auto delay_ms = std::chrono::milliseconds(1000/m_max_emit_freq_hz); // <- ADD THIS LINE std::this_thread::sleep_for(delay_ms); // <- ADD THIS LINE + + return Result::Success; } ``` @@ -280,7 +284,7 @@ logic, we want to be able to access them independently. #include "Hit.h" // ... -void RandomSource::GetEvent(std::shared_ptr event) { +Result RandomSource::Emit(JEvent& event) { // ... /// Insert simulated data into event // ADD ME @@ -290,8 +294,11 @@ void RandomSource::GetEvent(std::shared_ptr event) { hits.push_back(new Hit(0, 1, 1.0, 0)); // ADD ME hits.push_back(new Hit(1, 0, 1.0, 0)); // ADD ME hits.push_back(new Hit(1, 1, 1.0, 0)); // ADD ME - event->Insert(hits); // ADD ME - //event->Insert(hits, "fcal"); // If we used a tag + + event.Insert(hits); // ADD ME + //event.Insert(hits, "fcal"); // If we used a tag + + return Result::Success; } ``` @@ -317,7 +324,7 @@ void InitPlugin(JApplication* app) { InitJANAPlugin(app); app->Add(new QuickTutorialProcessor); - app->Add(new RandomSource("random", app)); + app->Add(new RandomSource); app->Add(new JCsvWriter); // ADD ME //app->Add(new JCsvWriter("fcal")); // If we used a tag } @@ -535,7 +542,7 @@ void InitPlugin(JApplication* app) { InitJANAPlugin(app); app->Add(new QuickTutorialProcessor); - app->Add(new RandomSource("random", app)); + app->Add(new RandomSource); app->Add(new JCsvWriter()); app->Add(new JFactoryGeneratorT); // ADD ME } @@ -586,7 +593,7 @@ void InitPlugin(JApplication* app) { InitJANAPlugin(app); app->Add(new QuickTutorialProcessor); - // app->Add(new RandomSource("random", app)); // REMOVE ME + // app->Add(new RandomSource); // REMOVE ME app->Add(new JEventSourceGeneratorT); // ADD ME app->Add(new JCsvWriter()); app->Add(new JFactoryGeneratorT); diff --git a/docs/tutorial.rst b/docs/tutorial.rst index 8043df6eb..99dd6c92e 100644 --- a/docs/tutorial.rst +++ b/docs/tutorial.rst @@ -111,7 +111,7 @@ The modified ``QuickTuorial.cc`` file needs to have the new ``RandomSource.h`` h void InitPlugin(JApplication *app) { InitJANAPlugin(app); app->Add(new QuickTutorialProcessor); - app->Add(new RandomSource("random", app)); // <- ADD THIS LINE + app->Add(new RandomSource); // <- ADD THIS LINE } } @@ -135,10 +135,12 @@ Because neither the source nor the processor are doing any ‘real work’, the int m_max_emit_freq_hz = 100; // <- ADD THIS LINE public: + RandomSource(); RandomSource(std::string resource_name, JApplication* app); virtual ~RandomSource() = default; void Open() override; - void GetEvent(std::shared_ptr) override; + void Close() override; + Result Emit(JEvent& event) override; static std::string GetDescription(); }; @@ -157,16 +159,18 @@ We can now use the value of ``m_max_emit_freq_hz``, confident that it is consist .. code-block:: console - void RandomSource::GetEvent(std::shared_ptr event) { + JEventSource::Result RandomSource::Emit(JEvent& event) { /// Configure event and run numbers static size_t current_event_number = 1; - event->SetEventNumber(current_event_number++); - event->SetRunNumber(22); + event.SetEventNumber(current_event_number++); + event.SetRunNumber(22); /// Slow down event source // <- ADD THIS LINE auto delay_ms = std::chrono::milliseconds(1000/m_max_emit_freq_hz); // <- ADD THIS LINE std::this_thread::sleep_for(delay_ms); // <- ADD THIS LINE + + return Result::Success; } Finally, we can set this parameter on the command line and observe the throughput change accordingly: @@ -227,7 +231,7 @@ The pattern we use for inserting data into the event is simple: For data of type #include "Hit.h" // ... - void RandomSource::GetEvent(std::shared_ptr event) { + JEventSource::Result RandomSource::Emit(JEvent& event) { // ... /// Insert simulated data into event // ADD ME @@ -237,8 +241,10 @@ The pattern we use for inserting data into the event is simple: For data of type hits.push_back(new Hit(0, 1, 1.0, 0)); // ADD ME hits.push_back(new Hit(1, 0, 1.0, 0)); // ADD ME hits.push_back(new Hit(1, 1, 1.0, 0)); // ADD ME - event->Insert(hits); // ADD ME - //event->Insert(hits, "fcal"); // If we used a tag + event.Insert(hits); // ADD ME + //event.Insert(hits, "fcal"); // If we used a tag + + return Result::Success; } We now have ``Hits`` in our event stream. The next section will cover how the ``QuickTutorialProcessor`` should access them. However, we don’t need to create a custom JEventProcessor to examine our event stream. JANA provides a small utility called ``JCsvWriter`` which creates a CSV file containing all ``JObjects` of a certain type and tag. It can figure out how to do this thanks to ``JObject::Summarize``. You can examine the full code for ``JCsvWriter`` if you look under ``$JANA_HOME/include/JANA/JCsvWriter.h``. Be aware that ``JCsvWriter`` is very inefficient and should be used for debugging, not for production. @@ -257,7 +263,7 @@ To use ``JCsvWriter``, we merely register it with our ``JApplication``. If we ru InitJANAPlugin(app); app->Add(new QuickTutorialProcessor); - app->Add(new RandomSource("random", app)); + app->Add(new RandomSource); app->Add(new JCsvWriter); // ADD ME //app->Add(new JCsvWriter("fcal")); // If we used a tag } @@ -423,7 +429,7 @@ Next, we register our ``SimpleClusterFactory`` with our JApplication. Because JA .. code-block:: console #include // ADD ME - #include "SimpleClusterFactory.h" // ADD ME + #include "SimpleClusterFactory.h" // ADD ME // ... extern "C" { @@ -432,7 +438,7 @@ Next, we register our ``SimpleClusterFactory`` with our JApplication. Because JA InitJANAPlugin(app); app->Add(new QuickTutorialProcessor); - app->Add(new RandomSource("random", app)); + app->Add(new RandomSource); app->Add(new JCsvWriter()); app->Add(new JFactoryGeneratorT); // ADD ME } @@ -469,7 +475,7 @@ In order to make this happen, we need to define a ``JEventSourceGenerator``. Thi InitJANAPlugin(app); app->Add(new QuickTutorialProcessor); - // app->Add(new RandomSource("random", app)); // REMOVE ME + // app->Add(new RandomSource); // REMOVE ME app->Add(new JEventSourceGeneratorT); // ADD ME app->Add(new JCsvWriter()); app->Add(new JFactoryGeneratorT); @@ -495,7 +501,7 @@ We fill out the definition in ``RandomSource.cc``: Note that ``JEventSourceGenerator`` puts some constraints on our ``JEventSource``. Specifically, we need to note that: -* Our ``JEventSource`` needs a two-argument constructor which accepts a string containing the resource name, and a ``JApplication pointer``. +* Our ``JEventSource`` needs a two-argument constructor which accepts a string containing the resource name, and a ``JApplication`` pointer. * Our ``JEventSource`` needs a static method ``GetDescription``, to help JANA report to the user which sources are available and which ended up being chosen. diff --git a/scripts/jana-generate.py b/scripts/jana-generate.py index 01f5e4451..e039383fa 100755 --- a/scripts/jana-generate.py +++ b/scripts/jana-generate.py @@ -191,13 +191,17 @@ class {name} : public JEventSource {{ /// Add member variables here public: + {name}(); + {name}(std::string resource_name, JApplication* app); virtual ~{name}() = default; void Open() override; - void GetEvent(std::shared_ptr) override; + void Close() override; + + Result Emit(JEvent& event) override; static std::string GetDescription(); @@ -227,10 +231,17 @@ class {name} : public JEventSource {{ /// to find the most appropriate JEventSource corresponding to that filename, instantiate and register it. /// For this to work, the JEventSource constructor has to have the following constructor arguments: +{name}::{name}() : JEventSource() {{ + SetTypeName(NAME_OF_THIS); // Provide JANA with class name + SetCallbackStyle(CallbackStyle::ExpertMode); +}} + {name}::{name}(std::string resource_name, JApplication* app) : JEventSource(resource_name, app) {{ SetTypeName(NAME_OF_THIS); // Provide JANA with class name + SetCallbackStyle(CallbackStyle::ExpertMode); }} + void {name}::Open() {{ /// Open is called exactly once when processing begins. @@ -243,28 +254,38 @@ class {name} : public JEventSource {{ /// Open the file here! }} -void {name}::GetEvent(std::shared_ptr event) {{ +void {name}::Close() {{ + + /// Close is called exactly once when processing ends. This is where you should close your files or sockets. + /// It is important to do that here instead of in Emit() because we want everything to be cleanly closed + /// even when JANA is terminated via Ctrl-C or via a timeout. + +}} + +JEventSource::Result {name}::Emit(JEvent& event) {{ /// Calls to GetEvent are synchronized with each other, which means they can /// read and write state on the JEventSource without causing race conditions. /// Configure event and run numbers static size_t current_event_number = 1; - event->SetEventNumber(current_event_number++); - event->SetRunNumber(22); + event.SetEventNumber(current_event_number++); + event.SetRunNumber(22); /// Insert whatever data was read into the event // std::vector hits; // hits.push_back(new Hit(0,0,1.0,0)); - // event->Insert(hits); + // event.Insert(hits); /// If you are reading a file of events and have reached the end, terminate the stream like this: - // // Close file pointer! - // throw RETURN_STATUS::kNO_MORE_EVENTS; + /// Note that you should close any file handles or sockets in Close(), not here! + // return Result::FailureFinished; /// If you are streaming events and there are no new events in the message queue, - /// tell JANA that GetEvent() was temporarily unsuccessful like this: - // throw RETURN_STATUS::kBUSY; + /// tell JANA that Emit() was temporarily unsuccessful like this: + // return Result::FailureTryAgain; + + return Result::Success; }} std::string {name}::GetDescription() {{ diff --git a/src/examples/Tutorial/RandomSource.cc b/src/examples/Tutorial/RandomSource.cc index 829eaa34d..70a8d4268 100644 --- a/src/examples/Tutorial/RandomSource.cc +++ b/src/examples/Tutorial/RandomSource.cc @@ -18,8 +18,14 @@ /// to find the most appropriate JEventSource corresponding to that filename, instantiate and register it. /// For this to work, the JEventSource constructor has to have the following constructor arguments: +RandomSource::RandomSource() : JEventSource() { + SetTypeName(NAME_OF_THIS); // Provide JANA with class name + SetCallbackStyle(CallbackStyle::ExpertMode); +} + RandomSource::RandomSource(std::string resource_name, JApplication* app) : JEventSource(resource_name, app) { SetTypeName(NAME_OF_THIS); // Provide JANA with class name + SetCallbackStyle(CallbackStyle::ExpertMode); } void RandomSource::Open() { @@ -37,15 +43,19 @@ void RandomSource::Open() { /// Open the file here! } -void RandomSource::GetEvent(std::shared_ptr event) { +void RandomSource::Close() { + // Close the file pointer here! +} + +JEventSource::Result RandomSource::Emit(JEvent& event) { /// Calls to GetEvent are synchronized with each other, which means they can /// read and write state on the JEventSource without causing race conditions. /// Configure event and run numbers static size_t current_event_number = 1; - event->SetEventNumber(current_event_number++); - event->SetRunNumber(22); + event.SetEventNumber(current_event_number++); + event.SetRunNumber(22); /// Slow down event source auto delay_ms = std::chrono::milliseconds(1000/m_max_emit_freq_hz); @@ -57,15 +67,17 @@ void RandomSource::GetEvent(std::shared_ptr event) { hits.push_back(new Hit(0, 1, 1.0, 0)); hits.push_back(new Hit(1, 0, 1.0, 0)); hits.push_back(new Hit(1, 1, 1.0, 0)); - event->Insert(hits); + event.Insert(hits); - /// If you are reading a file of events and have reached the end, terminate the stream like this: - // // Close file pointer! - // throw RETURN_STATUS::kNO_MORE_EVENTS; + /// If you are reading a file of events and have reached the end + /// Note that you should close the file handle in Close(), not here. + // return Result::FailureFinished; /// If you are streaming events and there are no new events in the message queue, /// tell JANA that GetEvent() was temporarily unsuccessful like this: - // throw RETURN_STATUS::kBUSY; + // return Result::FailureTryAgain; + + return Result::Success; } std::string RandomSource::GetDescription() { diff --git a/src/examples/Tutorial/RandomSource.h b/src/examples/Tutorial/RandomSource.h index 3884908e8..87fb80c61 100644 --- a/src/examples/Tutorial/RandomSource.h +++ b/src/examples/Tutorial/RandomSource.h @@ -14,13 +14,17 @@ class RandomSource : public JEventSource { int m_max_emit_freq_hz = 100; public: + RandomSource(); + RandomSource(std::string resource_name, JApplication* app); virtual ~RandomSource() = default; void Open() override; - void GetEvent(std::shared_ptr) override; + Result Emit(JEvent&) override; + + void Close() override; static std::string GetDescription(); diff --git a/src/plugins/JTest/JTestParser.h b/src/plugins/JTest/JTestParser.h index 81cb7f36c..284ab9669 100644 --- a/src/plugins/JTest/JTestParser.h +++ b/src/plugins/JTest/JTestParser.h @@ -30,6 +30,7 @@ class JTestParser : public JEventSource { JTestParser() { SetTypeName(NAME_OF_THIS); + SetCallbackStyle(CallbackStyle::ExpertMode); } static std::string GetDescription() { @@ -44,7 +45,7 @@ class JTestParser : public JEventSource { app->SetDefaultParameter("jtest:parser_bytes_spread", m_write_spread, "Spread of bytes written during parsing"); } - void GetEvent(std::shared_ptr event) { + Result Emit(JEvent& event) { if ((m_events_generated % 40) == 0) { // "Read" new entangled event every 40 events @@ -58,12 +59,13 @@ class JTestParser : public JEventSource { // Emit a shared pointer to the entangled event buffer auto eec = new JTestEntangledEventData; eec->buffer = m_latest_entangled_buffer; - event->Insert(eec); + event.Insert(eec); m_events_generated++; - event->SetEventNumber(m_events_generated); - event->SetRunNumber(1); + event.SetEventNumber(m_events_generated); + event.SetRunNumber(1); + return Result::Success; } }; From f2c6716a2e661bf2b4f207e8e0a9ff437ff4c339 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Sat, 27 Apr 2024 18:38:18 -0400 Subject: [PATCH 07/12] Migrate examples to new event source API --- src/examples/DstExample/DstExample.cc | 2 +- src/examples/DstExample/DstExampleSource.cc | 26 ++++++++++++++----- src/examples/DstExample/DstExampleSource.h | 6 ++++- .../EventGroupExample/GroupedEventSource.h | 14 +++++----- .../SubeventCUDAExample.cu | 12 +++++---- .../SubeventExample/SubeventExample.cc | 16 +++++++----- src/examples/TimesliceExample/MyFileReader.h | 14 ++++++---- 7 files changed, 58 insertions(+), 32 deletions(-) diff --git a/src/examples/DstExample/DstExample.cc b/src/examples/DstExample/DstExample.cc index 5514010fc..f0c59dadb 100644 --- a/src/examples/DstExample/DstExample.cc +++ b/src/examples/DstExample/DstExample.cc @@ -17,7 +17,7 @@ void InitPlugin(JApplication* app) { LOG << "Loading DstExample" << LOG_END; app->Add(new DstExampleProcessor); - app->Add(new DstExampleSource("dummy", app)); + app->Add(new DstExampleSource); app->Add(new JFactoryGeneratorT()); } } diff --git a/src/examples/DstExample/DstExampleSource.cc b/src/examples/DstExample/DstExampleSource.cc index a817df689..518c49545 100644 --- a/src/examples/DstExample/DstExampleSource.cc +++ b/src/examples/DstExample/DstExampleSource.cc @@ -9,8 +9,15 @@ #include #include "DataObjects.h" + +DstExampleSource::DstExampleSource() : JEventSource() { + SetTypeName(NAME_OF_THIS); // Provide JANA with class name + SetCallbackStyle(CallbackStyle::ExpertMode); +} + DstExampleSource::DstExampleSource(std::string resource_name, JApplication* app) : JEventSource(resource_name, app) { SetTypeName(NAME_OF_THIS); // Provide JANA with class name + SetCallbackStyle(CallbackStyle::ExpertMode); } void DstExampleSource::Open() { @@ -25,26 +32,29 @@ void DstExampleSource::Open() { /// Open the file here! } -void DstExampleSource::GetEvent(std::shared_ptr event) { +void DstExampleSource::Close() { +} + +JEventSource::Result DstExampleSource::Emit(JEvent& event) { /// Calls to GetEvent are synchronized with each other, which means they can /// read and write state on the JEventSource without causing race conditions. /// Configure event and run numbers static size_t current_event_number = 1; - event->SetEventNumber(current_event_number++); - event->SetRunNumber(22); + event.SetEventNumber(current_event_number++); + event.SetRunNumber(22); /// Limit ourselves to 1 event if (current_event_number > 2) { - throw RETURN_STATUS::kNO_MORE_EVENTS; + return Result::FailureFinished; } /// Insert some canned MyJObjects std::vector my_jobjects; my_jobjects.push_back(new MyJObject(7,7,1.8)); my_jobjects.push_back(new MyJObject(8,8,9.9)); - auto my_jobj_fac = event->Insert(my_jobjects); + auto my_jobj_fac = event.Insert(my_jobjects); /// Enable automatic upcast to JObject my_jobj_fac->EnableGetAs(); @@ -54,7 +64,7 @@ void DstExampleSource::GetEvent(std::shared_ptr event) { my_renderables.push_back(new MyRenderable(0,0,22.2)); my_renderables.push_back(new MyRenderable(0,1,17.0)); my_renderables.push_back(new MyRenderable(1,0,21.9)); - auto my_ren_fac = event->Insert(my_renderables); + auto my_ren_fac = event.Insert(my_renderables); /// Enable automatic upcast to Renderable my_ren_fac->EnableGetAs(); @@ -62,11 +72,13 @@ void DstExampleSource::GetEvent(std::shared_ptr event) { /// Insert some canned MyRenderableJObjects std::vector my_renderable_jobjects; my_renderable_jobjects.push_back(new MyRenderableJObject(1,1,1.1)); - auto my_both_fac = event->Insert(my_renderable_jobjects, "from_source"); + auto my_both_fac = event.Insert(my_renderable_jobjects, "from_source"); /// Enable automatic upcast to both JObjects and Renderable my_both_fac->EnableGetAs(); my_both_fac->EnableGetAs(); + + return Result::Success; } std::string DstExampleSource::GetDescription() { diff --git a/src/examples/DstExample/DstExampleSource.h b/src/examples/DstExample/DstExampleSource.h index 59ef8dace..ef0a9226e 100644 --- a/src/examples/DstExample/DstExampleSource.h +++ b/src/examples/DstExample/DstExampleSource.h @@ -15,13 +15,17 @@ class DstExampleSource : public JEventSource { /// Add member variables here public: + DstExampleSource(); + DstExampleSource(std::string resource_name, JApplication* app); virtual ~DstExampleSource() = default; void Open() override; - void GetEvent(std::shared_ptr) override; + void Close() override; + + Result Emit(JEvent&) override; static std::string GetDescription(); diff --git a/src/examples/EventGroupExample/GroupedEventSource.h b/src/examples/EventGroupExample/GroupedEventSource.h index 34b198d48..6933ef935 100644 --- a/src/examples/EventGroupExample/GroupedEventSource.h +++ b/src/examples/EventGroupExample/GroupedEventSource.h @@ -21,25 +21,26 @@ class GroupedEventSource : public JEventSource { public: GroupedEventSource(std::string res_name, JApplication* app) : JEventSource(std::move(res_name), app) { // TODO: Get EventGroupManager from ServiceLocator instead + SetCallbackStyle(CallbackStyle::ExpertMode); m_remaining_events_in_group = 5; m_current_group_id = 0; m_current_event_number = 0; }; - void GetEvent(std::shared_ptr event) override { + Result Emit(JEvent& event) override { if (m_current_group_id == 5) { - throw RETURN_STATUS::kNO_MORE_EVENTS; + return Result::FailureFinished; } // TODO: We can hold on to the pointer instead of doing the lookup everytime auto current_group = m_egm.GetEventGroup(m_current_group_id); current_group->StartEvent(); - event->Insert(current_group); - event->GetFactory()->SetFactoryFlag(JFactory::JFactory_Flags_t::NOT_OBJECT_OWNER); - event->SetEventNumber(++m_current_event_number); - event->SetRunNumber(m_current_group_id); + event.Insert(current_group); + event.GetFactory()->SetFactoryFlag(JFactory::JFactory_Flags_t::NOT_OBJECT_OWNER); + event.SetEventNumber(++m_current_event_number); + event.SetRunNumber(m_current_group_id); m_remaining_events_in_group -= 1; if (m_remaining_events_in_group == 0) { @@ -48,6 +49,7 @@ class GroupedEventSource : public JEventSource { m_current_group_id += 1; } + return Result::Success; } }; diff --git a/src/examples/SubeventCUDAExample/SubeventCUDAExample.cu b/src/examples/SubeventCUDAExample/SubeventCUDAExample.cu index 6718e647a..a9f985389 100644 --- a/src/examples/SubeventCUDAExample/SubeventCUDAExample.cu +++ b/src/examples/SubeventCUDAExample/SubeventCUDAExample.cu @@ -77,19 +77,21 @@ struct MyProcessor : public JSubeventProcessor { struct SimpleSource : public JEventSource { - SimpleSource(std::string name) : JEventSource(name) {}; + SimpleSource() { + SetCallbackStyle(CallbackStyle::ExpertMode); + }; - void GetEvent(std::shared_ptr event) override { - auto evt = event->GetEventNumber(); + Result Emit(JEvent& event) override { + auto evt = event.GetEventNumber(); std::vector < MyInput * > inputs; inputs.push_back(new MyInput(22, 3.6, evt, 0)); inputs.push_back(new MyInput(23, 3.5, evt, 1)); inputs.push_back(new MyInput(24, 3.4, evt, 2)); inputs.push_back(new MyInput(25, 3.3, evt, 3)); inputs.push_back(new MyInput(26, 3.2, evt, 4)); - event->Insert(inputs); + event.Insert(inputs); LOG << "Emitting event " << event->GetEventNumber() << LOG_END; - // throw JEventSource::RETURN_STATUS::kNO_MORE_EVENTS; + return Result::Success; } }; diff --git a/src/examples/SubeventExample/SubeventExample.cc b/src/examples/SubeventExample/SubeventExample.cc index 0417a83f7..4da50d513 100644 --- a/src/examples/SubeventExample/SubeventExample.cc +++ b/src/examples/SubeventExample/SubeventExample.cc @@ -39,18 +39,20 @@ struct MyProcessor : public JSubeventProcessor { struct SimpleSource : public JEventSource { - SimpleSource(std::string name) : JEventSource(name) {}; - void GetEvent(std::shared_ptr event) override { - auto evt = event->GetEventNumber(); + SimpleSource() : JEventSource() { + SetCallbackStyle(CallbackStyle::ExpertMode); + }; + Result Emit(JEvent& event) override { + auto evt = event.GetEventNumber(); std::vector inputs; inputs.push_back(new MyInput(22,3.6,evt,0)); inputs.push_back(new MyInput(23,3.5,evt,1)); inputs.push_back(new MyInput(24,3.4,evt,2)); inputs.push_back(new MyInput(25,3.3,evt,3)); inputs.push_back(new MyInput(26,3.2,evt,4)); - event->Insert(inputs); - LOG << "Emitting event " << event->GetEventNumber() << LOG_END; - // throw JEventSource::RETURN_STATUS::kNO_MORE_EVENTS; + event.Insert(inputs); + LOG << "Emitting event " << event.GetEventNumber() << LOG_END; + return Result::Success; } }; @@ -93,7 +95,7 @@ int main() { app.SetTimeoutEnabled(false); app.SetTicker(false); - auto source = new SimpleSource("simpleSource"); + auto source = new SimpleSource(); source->SetNEvents(10); // limit ourselves to 10 events. Note that the 'jana:nevents' param won't work // here because we aren't using JComponentManager to manage the EventSource diff --git a/src/examples/TimesliceExample/MyFileReader.h b/src/examples/TimesliceExample/MyFileReader.h index 10e5f989e..84f85c174 100644 --- a/src/examples/TimesliceExample/MyFileReader.h +++ b/src/examples/TimesliceExample/MyFileReader.h @@ -14,13 +14,16 @@ struct MyFileReader : public JEventSource { MyFileReader() { SetTypeName(NAME_OF_THIS); + SetCallbackStyle(CallbackStyle::ExpertMode); } void Open() override { } - void GetEvent(std::shared_ptr event) override { + void Close() override { } - auto event_nr = event->GetEventNumber(); + Result Emit(JEvent&) override { + + auto event_nr = event.GetEventNumber(); auto hits_out = std::make_unique(); @@ -29,7 +32,7 @@ struct MyFileReader : public JEventSource { hits_out->push_back(ExampleHit(event_nr, 0, 49, 49, 49, 1)); hits_out->push_back(ExampleHit(event_nr, 0, 7.6, 7.6, 7.6, 2)); - LOG_DEBUG(GetLogger()) << "MySource: Emitted " << GetLevel() << " " << event->GetEventNumber() << "\n" + LOG_DEBUG(GetLogger()) << "MySource: Emitted " << GetLevel() << " " << event.GetEventNumber() << "\n" << TabulateHits(hits_out.get()) << LOG_END; @@ -39,12 +42,13 @@ struct MyFileReader : public JEventSource { if (GetLevel() == JEventLevel::Timeslice) { TimesliceInfoCollection info; info.push_back(TimesliceInfo(event_nr, 0)); // event nr, run nr - event->InsertCollection(std::move(info), "ts_info"); + event.InsertCollection(std::move(info), "ts_info"); } else { EventInfoCollection info; info.push_back(EventInfo(event_nr, 0, 0)); // event nr, timeslice nr, run nr - event->InsertCollection(std::move(info), "evt_info"); + event.InsertCollection(std::move(info), "evt_info"); } + return Result::Success; } }; From 2cfa896ffdc2f8e49f5de1efbe37a086c33cec30 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Sat, 27 Apr 2024 18:43:34 -0400 Subject: [PATCH 08/12] Migrate JTestRoot to new event source API --- src/plugins/JTestRoot/JTestRoot.cc | 2 +- src/plugins/JTestRoot/JTestRootEventSource.cc | 13 ++++++++----- src/plugins/JTestRoot/JTestRootEventSource.h | 2 +- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/plugins/JTestRoot/JTestRoot.cc b/src/plugins/JTestRoot/JTestRoot.cc index 9bf090f19..d60bfbfcd 100644 --- a/src/plugins/JTestRoot/JTestRoot.cc +++ b/src/plugins/JTestRoot/JTestRoot.cc @@ -19,7 +19,7 @@ void InitPlugin(JApplication* app) { InitJANAPlugin(app); LOG << "Loading JTestRoot" << LOG_END; - app->Add(new JTestRootEventSource("dummy_root_object_source", app)); + app->Add(new JTestRootEventSource); app->Add(new JTestRootProcessor); app->Add(new JFactoryGeneratorT); } diff --git a/src/plugins/JTestRoot/JTestRootEventSource.cc b/src/plugins/JTestRoot/JTestRootEventSource.cc index 15ce74c01..02760b6f5 100644 --- a/src/plugins/JTestRoot/JTestRootEventSource.cc +++ b/src/plugins/JTestRoot/JTestRootEventSource.cc @@ -15,11 +15,12 @@ #include #include -JTestRootEventSource::JTestRootEventSource(std::string resource_name, JApplication* app) : JEventSource(resource_name, app) { +JTestRootEventSource::JTestRootEventSource() { SetTypeName(NAME_OF_THIS); // Provide JANA with class name + SetCallbackStyle(CallbackStyle::ExpertMode); } -void JTestRootEventSource::GetEvent(std::shared_ptr event) { +JEventSource::Result JTestRootEventSource::Emit(JEvent& event) { /// Generate an event by inserting objects into "event". /// (n.b. a normal event source would read these from a file or stream) @@ -28,8 +29,8 @@ void JTestRootEventSource::GetEvent(std::shared_ptr event) { // Configure event and run numbers static size_t current_event_number = 1; - event->SetEventNumber(current_event_number++); - event->SetRunNumber(222); + event.SetEventNumber(current_event_number++); + event.SetRunNumber(222); // Generate hit objects. We use random numbers to give some variation // and make things look a little more realistic @@ -44,5 +45,7 @@ void JTestRootEventSource::GetEvent(std::shared_ptr event) { } // Add Hit objects to event - event->Insert(hits); + event.Insert(hits); + return Result::Success; } + diff --git a/src/plugins/JTestRoot/JTestRootEventSource.h b/src/plugins/JTestRoot/JTestRootEventSource.h index f2c6ea3df..1a41f3930 100644 --- a/src/plugins/JTestRoot/JTestRootEventSource.h +++ b/src/plugins/JTestRoot/JTestRootEventSource.h @@ -15,7 +15,7 @@ class JTestRootEventSource : public JEventSource { JTestRootEventSource(std::string resource_name, JApplication* app); virtual ~JTestRootEventSource() = default; - void GetEvent(std::shared_ptr) override; + Result Emit(JEvent& event) override; protected: std::default_random_engine generator; From 3548f66a0f59d9e05697932e0aef7252586a6e1a Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Sat, 27 Apr 2024 19:33:09 -0400 Subject: [PATCH 09/12] Migrate all remaining event sources not in unit_tests --- .../BlockingGroupedEventSource.h | 19 ++++++++------ .../MetadataExample/RandomTrackSource.cc | 15 ++++++----- .../MetadataExample/RandomTrackSource.h | 2 +- src/libraries/JANA/Podio/JEventSourcePodio.h | 18 +++++++------ src/libraries/JANA/Streaming/JDiscreteJoin.h | 16 ++++++------ src/libraries/JANA/Streaming/JEventBuilder.h | 16 ++++++------ .../JANA/Streaming/JStreamingEventSource.h | 16 ++++++------ src/plugins/streamDet/DecodeDASSource.cc | 25 +++++++++---------- src/plugins/streamDet/DecodeDASSource.h | 5 ++-- 9 files changed, 71 insertions(+), 61 deletions(-) diff --git a/src/examples/EventGroupExample/BlockingGroupedEventSource.h b/src/examples/EventGroupExample/BlockingGroupedEventSource.h index 44f7ab28c..917877950 100644 --- a/src/examples/EventGroupExample/BlockingGroupedEventSource.h +++ b/src/examples/EventGroupExample/BlockingGroupedEventSource.h @@ -26,6 +26,7 @@ class BlockingGroupedEventSource : public JEventSource { BlockingGroupedEventSource(std::string res_name, JApplication* app) : JEventSource(std::move(res_name), app) { // TODO: Get EventGroupManager from ServiceLocator instead + SetCallbackStyle(CallbackStyle::ExpertMode); m_pending_group_id = 1; }; @@ -49,13 +50,13 @@ class BlockingGroupedEventSource : public JEventSource { /// GetEvent polls the queue of submitted TridasEvents and feeds them into JEvents along with a /// JEventGroup. A downstream EventProcessor may report the event as being finished. Once all /// events in the eventgroup are finished, the corresponding call to SubmitAndWait will unblock. - void GetEvent(std::shared_ptr event) override { + Result Emit(JEvent& event) override { std::pair next_event; { std::lock_guard lock(m_pending_mutex); if (m_pending_events.empty()) { - throw RETURN_STATUS::kTRY_AGAIN; + return Result::FailureTryAgain; } else { next_event = m_pending_events.front(); @@ -64,16 +65,18 @@ class BlockingGroupedEventSource : public JEventSource { } // Hydrate JEvent with both the TridasEvent and the group pointer. - event->Insert(next_event.first); // TridasEvent - event->Insert(next_event.second); // JEventGroup + event.Insert(next_event.first); // TridasEvent + event.Insert(next_event.second); // JEventGroup // Tell JANA not to assume ownership of these objects! - event->GetFactory()->SetFactoryFlag(JFactory::JFactory_Flags_t::NOT_OBJECT_OWNER); - event->GetFactory()->SetFactoryFlag(JFactory::JFactory_Flags_t::NOT_OBJECT_OWNER); + event.GetFactory()->SetFactoryFlag(JFactory::JFactory_Flags_t::NOT_OBJECT_OWNER); + event.GetFactory()->SetFactoryFlag(JFactory::JFactory_Flags_t::NOT_OBJECT_OWNER); // JANA always needs an event number and a run number, so extract these from the Tridas data somehow - event->SetEventNumber(next_event.first->event_number); - event->SetRunNumber(next_event.first->run_number); + event.SetEventNumber(next_event.first->event_number); + event.SetRunNumber(next_event.first->run_number); + + return Result::Success; } }; diff --git a/src/examples/MetadataExample/RandomTrackSource.cc b/src/examples/MetadataExample/RandomTrackSource.cc index caf2ab2a3..4f2c57183 100644 --- a/src/examples/MetadataExample/RandomTrackSource.cc +++ b/src/examples/MetadataExample/RandomTrackSource.cc @@ -20,6 +20,7 @@ RandomTrackSource::RandomTrackSource(std::string resource_name, JApplication* app) : JEventSource(resource_name, app) { SetTypeName(NAME_OF_THIS); // Provide JANA with class name + SetCallbackStyle(CallbackStyle::ExpertMode); } void RandomTrackSource::Open() { @@ -39,7 +40,7 @@ void RandomTrackSource::Open() { m_max_run_number = 10; } -void RandomTrackSource::GetEvent(std::shared_ptr event) { +RandomTrackSource::Result RandomTrackSource::Emit(JEvent& event) { /// Calls to GetEvent are synchronized with each other, which means they can /// read and write state on the JEventSource without causing race conditions. @@ -51,22 +52,24 @@ void RandomTrackSource::GetEvent(std::shared_ptr event) { } if (m_current_run_number > m_max_run_number) { - throw RETURN_STATUS::kNO_MORE_EVENTS; + return Result::FailureFinished; } - event->SetEventNumber(m_current_event_number); - event->SetRunNumber(m_current_run_number); + event.SetEventNumber(m_current_event_number); + event.SetRunNumber(m_current_run_number); /// Insert whatever data was read into the event std::vector tracks; tracks.push_back(new Track(0,0,0,0,0,0,0,0)); - event->Insert(tracks, "generated"); + event.Insert(tracks, "generated"); // Insert metadata corresponding to these tracks // TODO: Overload event->Insert() to make this look nicer JMetadata metadata; metadata.elapsed_time_ns = std::chrono::nanoseconds {5}; - event->GetFactory("generated")->SetMetadata(metadata); + event.GetFactory("generated")->SetMetadata(metadata); + + return Result::Success; } std::string RandomTrackSource::GetDescription() { diff --git a/src/examples/MetadataExample/RandomTrackSource.h b/src/examples/MetadataExample/RandomTrackSource.h index 087cc2b13..ab4bf5401 100644 --- a/src/examples/MetadataExample/RandomTrackSource.h +++ b/src/examples/MetadataExample/RandomTrackSource.h @@ -23,7 +23,7 @@ class RandomTrackSource : public JEventSource { void Open() override; - void GetEvent(std::shared_ptr) override; + Result Emit(JEvent&) override; static std::string GetDescription(); diff --git a/src/libraries/JANA/Podio/JEventSourcePodio.h b/src/libraries/JANA/Podio/JEventSourcePodio.h index 3cc3498d0..6da37471a 100644 --- a/src/libraries/JANA/Podio/JEventSourcePodio.h +++ b/src/libraries/JANA/Podio/JEventSourcePodio.h @@ -25,7 +25,7 @@ class JEventSourcePodio : public JEventSource { /// the PODIO frame is part of the contract between JANA and PODIO. We /// don't prevent users from accessing the PODIO frame directly, but /// strongly discourage them from using this for anything other than debugging. - void GetEvent(std::shared_ptr) final; + Result Emit(JEvent&) final; /// User overrides NextFrame so that they can populate the frame however @@ -70,27 +70,29 @@ struct InsertingVisitor { template