From 44dc0408671c0032a7cbab3566f79977f10e4ae0 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Wed, 25 Sep 2024 15:42:52 -0400 Subject: [PATCH 1/4] Remove dead code: JEventProcessor::AreEventsOrdered --- src/libraries/JANA/JEventProcessor.h | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/libraries/JANA/JEventProcessor.h b/src/libraries/JANA/JEventProcessor.h index 46b0c35a7..34526a951 100644 --- a/src/libraries/JANA/JEventProcessor.h +++ b/src/libraries/JANA/JEventProcessor.h @@ -30,8 +30,6 @@ class JEventProcessor : public jana::components::JComponent, uint64_t GetEventCount() const { return m_event_count; }; - bool AreEventsOrdered() const { return m_receive_events_in_order; } - virtual void DoInitialize() { for (auto* parameter : m_parameters) { @@ -217,18 +215,10 @@ class JEventProcessor : public jana::components::JComponent, // void SetResourceName(std::string resource_name) { m_resource_name = std::move(resource_name); } - /// SetEventsOrdered allows the user to tell the parallelization engine that it needs to see - /// the event stream ordered by increasing event IDs. (Note that this requires all EventSources - /// emit event IDs which are consecutive.) Ordering by event ID makes for cleaner output, but comes - /// with a performance penalty, so it is best if this is enabled during debugging, and disabled otherwise. - - // void SetEventsOrdered(bool receive_events_in_order) { m_receive_events_in_order = receive_events_in_order; } - private: std::string m_resource_name; std::atomic_ullong m_event_count {0}; - bool m_receive_events_in_order = false; }; From 2cae212d58c25a5893c9e09d32f9b0202cda556a Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Wed, 25 Sep 2024 16:13:34 -0400 Subject: [PATCH 2/4] Protect JEventProcessor::{Begin,Change,End}Run() callbacks This fixes a bug where some threads ran Process() before BeginRun() completed. --- src/libraries/JANA/JEventProcessor.h | 36 ++++++++++++++++------------ 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/src/libraries/JANA/JEventProcessor.h b/src/libraries/JANA/JEventProcessor.h index 34526a951..6f2481504 100644 --- a/src/libraries/JANA/JEventProcessor.h +++ b/src/libraries/JANA/JEventProcessor.h @@ -8,6 +8,7 @@ #include #include #include +#include class JApplication; @@ -32,6 +33,7 @@ class JEventProcessor : public jana::components::JComponent, virtual void DoInitialize() { + std::lock_guard lock(m_mutex); for (auto* parameter : m_parameters) { parameter->Configure(*(m_app->GetJParameterManager()), m_prefix); } @@ -107,8 +109,12 @@ class JEventProcessor : public jana::components::JComponent, virtual void DoLegacyProcess(const std::shared_ptr& event) { - // DoLegacyProcess doesn't hold any locks, as it requires the user to hold a lock for it. - // Because of this, + // DoLegacyProcess holds a lock to make sure that {Begin,Change,End}Run() are always called before Process(). + // Note that in LegacyMode, Process() requires the user to manage a _separate_ lock for its critical section. + // This arrangement means that {Begin,Change,End}Run() will definitely be called at least once before `Process`, but there + // may be races when there are multiple run numbers present in the stream. This isn't a problem in practice for now, + // but future work should use ExpertMode or DeclarativeMode for this reason (but also for the usability improvements!) + if (m_callback_style != CallbackStyle::LegacyMode) { throw JException("Called DoLegacyProcess() on a non-legacy-mode JEventProcessor"); } @@ -121,15 +127,19 @@ class JEventProcessor : public jana::components::JComponent, else if (m_status == Status::Finalized) { throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()"); } - if (m_last_run_number != run_number) { - if (m_last_run_number != -1) { - CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); }); + { + // Protect the call to BeginRun(), etc, to prevent some threads from running Process() before BeginRun(). + std::lock_guard lock(m_mutex); + if (m_last_run_number != run_number) { + if (m_last_run_number != -1) { + CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); }); + } + for (auto* resource : m_resources) { + resource->ChangeRun(event->GetRunNumber(), m_app); + } + m_last_run_number = run_number; + CallWithJExceptionWrapper("JEventProcessor::BeginRun", [&](){ BeginRun(event); }); } - for (auto* resource : m_resources) { - resource->ChangeRun(event->GetRunNumber(), m_app); - } - m_last_run_number = run_number; - CallWithJExceptionWrapper("JEventProcessor::BeginRun", [&](){ BeginRun(event); }); } CallWithJExceptionWrapper("JEventProcessor::Process", [&](){ Process(event); }); m_event_count += 1; @@ -168,26 +178,22 @@ class JEventProcessor : public jana::components::JComponent, // LegacyMode-specific callbacks virtual void Process(const std::shared_ptr& /*event*/) { - throw JException("Not implemented yet!"); } - + // ExpertMode-specific callbacks virtual void ProcessParallel(const JEvent& /*event*/) { } virtual void Process(const JEvent& /*event*/) { - throw JException("Not implemented yet!"); } // DeclarativeMode-specific callbacks virtual void ProcessParallel(int64_t /*run_nr*/, uint64_t /*event_nr*/, uint64_t /*event_idx*/) { - throw JException("Not implemented yet!"); } virtual void Process(int64_t /*run_nr*/, uint64_t /*event_nr*/, uint64_t /*event_idx*/) { - throw JException("Not implemented yet!"); } From d11a3f740022c6c8fa29dba026c27aeab814125d Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Wed, 25 Sep 2024 16:17:50 -0400 Subject: [PATCH 3/4] Deprecate JEventProcessor::GetType() --- src/libraries/JANA/JEventProcessor.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libraries/JANA/JEventProcessor.h b/src/libraries/JANA/JEventProcessor.h index 6f2481504..6d0429060 100644 --- a/src/libraries/JANA/JEventProcessor.h +++ b/src/libraries/JANA/JEventProcessor.h @@ -200,7 +200,7 @@ class JEventProcessor : public jana::components::JComponent, virtual void Finish() {} - // TODO: Deprecate + [[deprecated]] virtual std::string GetType() const { return m_type_name; } From 0fdc68f5a021be4a92466e6823251a9d412fc4e1 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Wed, 25 Sep 2024 16:26:01 -0400 Subject: [PATCH 4/4] Update JHasOutputs::Output to support NOT_OBJECT_OWNER flag --- src/libraries/JANA/Components/JHasOutputs.h | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/libraries/JANA/Components/JHasOutputs.h b/src/libraries/JANA/Components/JHasOutputs.h index 461eb549a..4f84d72a1 100644 --- a/src/libraries/JANA/Components/JHasOutputs.h +++ b/src/libraries/JANA/Components/JHasOutputs.h @@ -31,6 +31,7 @@ struct JHasOutputs { template class Output : public OutputBase { std::vector m_data; + bool is_not_owner = false; public: Output(JHasOutputs* owner, std::string default_tag_name="") { @@ -43,9 +44,12 @@ struct JHasOutputs { protected: void InsertCollection(JEvent& event) override { - event.Insert(m_data, this->collection_names[0]); + auto fac = event.Insert(m_data, this->collection_names[0]); + fac->SetIsNotOwnerFlag(is_not_owner); } void Reset() override { } + + void SetIsNotOwnerFlag(bool not_owner=true) { is_not_owner = not_owner; } };