Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Another round of fixes for GlueX #387

Merged
merged 11 commits into from
Nov 27, 2024
22 changes: 12 additions & 10 deletions .github/halld_recon_build.sh
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
#!/bin/bash

echo "mounting cvmfs"
echo "Mounting CVMFS"
yum -y install fuse
chmod 666 /dev/fuse
mkdir -p /cvmfs/oasis.opensciencegrid.org
mount -t cvmfs oasis.opensciencegrid.org /cvmfs/oasis.opensciencegrid.org

cd /workspace/halld_recon
ln -s /cvmfs/oasis.opensciencegrid.org/gluex/group /group
export BUILD_SCRIPTS=/group/halld/Software/build_scripts
source $BUILD_SCRIPTS/gluex_env_boot_jlab.sh --bs $BUILD_SCRIPTS

export BMS_OSNAME_OVERRIDE="Linux_Alma9-x86_64-gcc11.4.1-cntr"
# The BMS_OSNAME override is needed because Alma9 retroactively switched its system gcc
# from 11.4.1 to 11.5. We cannot create a new Alma9 container that uses gcc11.4.1, but
# the CVMFS artifact repository hasn't been updated to reflect that.

source /group/halld/Software/build_scripts/gluex_env_boot_jlab.sh
gxenv /workspace/JANA2/.github/halld_recon_build_prereqs_version.xml

echo "rootsys"
echo $ROOTSYS
chmod +x $JANA_HOME/bin/*
cd src
scons install -j12
echo "ROOTSYS=$ROOTSYS"
cd /workspace/halld_recon/src
scons install -j12 DEBUG=1 OPTIMIZATION=0 SHOWBUILD=1

105 changes: 74 additions & 31 deletions src/libraries/JANA/Engine/JExecutionEngine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -511,15 +511,23 @@ void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker)
if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Draining) {
// We only pick up a new task if the topology is running or draining.

for (size_t arrow_id=0; arrow_id<m_arrow_states.size(); ++arrow_id) {
// Each call to FindNextReadyTask_Unsafe() starts with a different m_next_arrow_id to ensure balanced arrow assignments
size_t arrow_count = m_arrow_states.size();
m_next_arrow_id += 1;
m_next_arrow_id %= arrow_count;

for (size_t i=m_next_arrow_id; i<(m_next_arrow_id+arrow_count); ++i) {
size_t arrow_id = i % arrow_count;

auto& state = m_arrow_states[arrow_id];
if (!state.is_parallel && (state.active_tasks != 0)) {
// We've found a sequential arrow that is already running. Nothing we can do here.
// We've found a sequential arrow that is already active. Nothing we can do here.
LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Sequential and already active." << LOG_END;
continue;
}

if (state.status != ArrowState::Status::Running) {
LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Arrow is either paused or finished." << LOG_END;
continue;
}
// TODO: Support next_visit_time so that we don't hammer blocked event sources
Expand All @@ -530,6 +538,7 @@ void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker)
auto port = arrow->get_next_port_index();
JEvent* event = (port == -1) ? nullptr : arrow->pull(port, worker.location_id);
if (event != nullptr || port == -1) {
LOG_TRACE(GetLogger()) << "Scheduler: Found next ready arrow with id " << arrow_id << LOG_END;
// We've found a task that is ready!
state.active_tasks += 1;

Expand All @@ -550,40 +559,46 @@ void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker)
}
return;
}
else {
LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Input event is needed but not on queue yet." << LOG_END;
}
}
}

// Because we reached this point, we know that there aren't any tasks ready,
// so we check whether more are potentially coming. If not, we can pause the topology.
// Note that our worker threads will still wait at ExchangeTask() until they get
// shut down separately during Scale().

bool any_active_source_found = false;
bool any_active_task_found = false;

LOG_DEBUG(GetLogger()) << "Scheduler: No tasks ready" << LOG_END;
if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Pausing || m_runstatus == RunStatus::Draining) {
// We want to avoid scenarios such as where the topology already Finished but then gets reset to Paused
// This also leaves a cleaner narrative in the logs.

for (size_t arrow_id = 0; arrow_id < m_arrow_states.size(); ++arrow_id) {
auto& state = m_arrow_states[arrow_id];
auto* arrow = m_topology->arrows[arrow_id];
LOG_TRACE(GetLogger()) << "Scheduler: arrow=" << arrow->get_name() << ", is_source=" << state.is_source << ", active_tasks=" << state.active_tasks << ", is_parallel=" << state.is_parallel << LOG_END;
any_active_source_found |= (state.status == ArrowState::Status::Running && state.is_source);
any_active_task_found |= (state.active_tasks != 0);
// A source might have been deactivated by RequestPause, Ctrl-C, etc, and might be inactive even though it still has active tasks
}

if (!any_active_source_found && !any_active_task_found) {
// Pause the topology
m_time_at_finish = clock_t::now();
m_event_count_at_finish = 0;
for (auto& arrow_state : m_arrow_states) {
if (arrow_state.is_sink) {
m_event_count_at_finish += arrow_state.events_processed;
bool any_active_source_found = false;
bool any_active_task_found = false;

LOG_DEBUG(GetLogger()) << "Scheduler: No tasks ready" << LOG_END;

for (size_t arrow_id = 0; arrow_id < m_arrow_states.size(); ++arrow_id) {
auto& state = m_arrow_states[arrow_id];
any_active_source_found |= (state.status == ArrowState::Status::Running && state.is_source);
any_active_task_found |= (state.active_tasks != 0);
// A source might have been deactivated by RequestPause, Ctrl-C, etc, and might be inactive even though it still has active tasks
}

if (!any_active_source_found && !any_active_task_found) {
// Pause the topology
m_time_at_finish = clock_t::now();
m_event_count_at_finish = 0;
for (auto& arrow_state : m_arrow_states) {
if (arrow_state.is_sink) {
m_event_count_at_finish += arrow_state.events_processed;
}
}
LOG_DEBUG(GetLogger()) << "Scheduler: Processing paused" << LOG_END;
m_runstatus = RunStatus::Paused;
// I think this is the ONLY site where the topology gets paused. Verify this?
}
LOG_DEBUG(GetLogger()) << "Processing paused" << LOG_END;
m_runstatus = RunStatus::Paused;
// I think this is the ONLY site where the topology gets paused. Verify this?
}

worker.last_arrow_id = -1;
Expand Down Expand Up @@ -668,13 +683,21 @@ bool JExecutionEngine::IsTimeoutEnabled() const {
JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) {

std::unique_lock<std::mutex> lock(m_mutex);
if (arrow_id >= m_topology->arrows.size()) {
LOG_WARN(GetLogger()) << "Firing unsuccessful: No arrow exists with id=" << arrow_id << LOG_END;
return JArrow::FireResult::NotRunYet;
}
JArrow* arrow = m_topology->arrows[arrow_id];
LOG_WARN(GetLogger()) << "Attempting to fire arrow with name=" << arrow->get_name()
<< ", index=" << arrow_id << ", location=" << location_id << LOG_END;

ArrowState& arrow_state = m_arrow_states[arrow_id];
if (arrow_state.status == ArrowState::Status::Finished) {
LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow status is Finished." << arrow_id << LOG_END;
return JArrow::FireResult::Finished;
}
if (!arrow_state.is_parallel && arrow_state.active_tasks != 0) {
LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow is sequential and already has an active task." << arrow_id << LOG_END;
return JArrow::FireResult::NotRunYet;
}
arrow_state.active_tasks += 1;
Expand All @@ -683,21 +706,40 @@ JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) {
JEvent* event = nullptr;
if (port != -1) {
event = arrow->pull(port, location_id);
if (event == nullptr) {
LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow needs an input event from port " << port << ", but the queue or pool is empty." << LOG_END;
arrow_state.active_tasks -= 1;
return JArrow::FireResult::NotRunYet;
}
else {
LOG_WARN(GetLogger()) << "Input event #" << event->GetEventNumber() << " from port " << port << LOG_END;
}
}
else {
LOG_WARN(GetLogger()) << "No input events" << LOG_END;
}
lock.unlock();

size_t output_count;
JArrow::OutputData outputs;
JArrow::FireResult result = JArrow::FireResult::NotRunYet;

if (event != nullptr || port == -1) {
arrow->fire(event, outputs, output_count, result);
lock.lock();
arrow->push(outputs, output_count, location_id);
arrow_state.active_tasks -= 1;
lock.unlock();
LOG_WARN(GetLogger()) << "Firing arrow" << LOG_END;
arrow->fire(event, outputs, output_count, result);
LOG_WARN(GetLogger()) << "Fired arrow with result " << to_string(result) << LOG_END;
if (output_count == 0) {
LOG_WARN(GetLogger()) << "No output events" << LOG_END;
}
else {
for (size_t i=0; i<output_count; ++i) {
LOG_WARN(GetLogger()) << "Output event #" << outputs.at(i).first->GetEventNumber() << " on port " << outputs.at(i).second << LOG_END;
}
}

lock.lock();
arrow->push(outputs, output_count, location_id);
arrow_state.active_tasks -= 1;
lock.unlock();
return result;
}

Expand Down Expand Up @@ -778,6 +820,7 @@ std::string ToString(JExecutionEngine::RunStatus runstatus) {
case JExecutionEngine::RunStatus::Pausing: return "Pausing";
case JExecutionEngine::RunStatus::Draining: return "Draining";
case JExecutionEngine::RunStatus::Finished: return "Finished";
default: return "CorruptedRunStatus";
}
}

1 change: 1 addition & 0 deletions src/libraries/JANA/Engine/JExecutionEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class JExecutionEngine : public JService {
std::atomic<InterruptStatus> m_interrupt_status { InterruptStatus::NoInterruptsUnsupervised };
std::atomic_bool m_print_worker_report_requested {false};
std::atomic_bool m_send_worker_report_requested {false};
size_t m_next_arrow_id=0;

// Metrics
size_t m_event_count_at_start = 0;
Expand Down
13 changes: 8 additions & 5 deletions src/libraries/JANA/Topology/JEventSourceArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ void JEventSourceArrow::fire(JEvent* event, OutputData& outputs, size_t& output_

// First check to see if we need to handle a barrier event before attempting to emit another event
if (m_barrier_active) {

auto emitted_event_count = m_sources[m_current_source]->GetEmittedEventCount();
auto finished_event_count = m_sources[m_current_source]->GetFinishedEventCount();

// A barrier event has been emitted by the source.
if (m_pending_barrier_event != nullptr) {

// This barrier event is pending until the topology drains
if (m_sources[m_current_source]->GetEmittedEventCount() -
m_sources[m_current_source]->GetFinishedEventCount() == 1) {
if ((emitted_event_count - finished_event_count) == 1) {
LOG_DEBUG(m_logger) << "JEventSourceArrow: Barrier event is in-flight" << LOG_END;

// Topology has drained; only remaining in-flight event is the barrier event itself,
Expand All @@ -44,7 +48,7 @@ void JEventSourceArrow::fire(JEvent* event, OutputData& outputs, size_t& output_
}
else {
// Topology has _not_ finished draining, all we can do is wait
LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on pending barrier event" << LOG_END;
LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on pending barrier event. Emitted = " << emitted_event_count << ", Finished = " << finished_event_count << LOG_END;
LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END;

assert(event == nullptr);
Expand All @@ -56,8 +60,7 @@ void JEventSourceArrow::fire(JEvent* event, OutputData& outputs, size_t& output_
else {
// This barrier event has already been sent into the topology and we need to wait
// until it is finished before emitting any more events
if (m_sources[m_current_source]->GetFinishedEventCount() ==
m_sources[m_current_source]->GetEmittedEventCount()) {
if (finished_event_count == emitted_event_count) {

// Barrier event has finished.
LOG_DEBUG(m_logger) << "JEventSourceArrow: Barrier event finished, returning to normal operation" << LOG_END;
Expand Down
3 changes: 1 addition & 2 deletions src/libraries/JANA/Utils/JApplicationInspector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ void InspectTopology(JApplication* app) {

void Fire(JApplication* app, int arrow_id) {
auto engine = app->GetService<JExecutionEngine>();
auto result = engine->Fire(arrow_id, 0);
std::cout << to_string(result);
engine->Fire(arrow_id, 0);
}

void InspectComponents(JApplication* app) {
Expand Down
28 changes: 27 additions & 1 deletion src/programs/unit_tests/Components/BarrierEventTests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include "JANA/Utils/JBenchUtils.h"
#include "catch.hpp"

int global_resource = 0;
size_t global_resource = 0;


struct BarrierSource : public JEventSource {
Expand Down Expand Up @@ -93,6 +93,32 @@ struct BarrierProcessor : public JEventProcessor {
};


TEST_CASE("BarrierEventTests_SingleThread") {
global_resource = 0;
JApplication app;
app.Add(new BarrierProcessor);
app.Add(new BarrierSource);
app.SetParameterValue("nthreads", 1);
app.SetParameterValue("jana:nevents", 40);
//app.SetParameterValue("jana:log:show_threadstamp", true);
//app.SetParameterValue("jana:loglevel", "debug");
app.Run(true);
};


TEST_CASE("BarrierEventTests_Legacy_SingleThread") {
global_resource = 0;
JApplication app;
app.Add(new LegacyBarrierProcessor);
app.Add(new BarrierSource);
app.SetParameterValue("nthreads", 1);
app.SetParameterValue("jana:nevents", 40);
//app.SetParameterValue("jana:log:show_threadstamp", true);
//app.SetParameterValue("jana:loglevel", "debug");
app.Run(true);
};


TEST_CASE("BarrierEventTests") {
global_resource = 0;
JApplication app;
Expand Down
42 changes: 41 additions & 1 deletion src/programs/unit_tests/Components/JFactoryTests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@


#include "JANA/Components/JComponentFwd.h"
#include "JANA/JApplicationFwd.h"
#include "JANA/JFactory.h"
#include "catch.hpp"
#include "JFactoryTests.h"
Expand Down Expand Up @@ -566,4 +567,43 @@ TEST_CASE("JFactory_ExceptionHandling") {
}
}


TEST_CASE("JFactory_GetObjects_Caching") {
JApplication app;
app.Add(new JFactoryGeneratorT<JFactoryT<JFactoryTestDummyObject>>());
app.Add(new JFactoryGeneratorT<JFactoryT<DifferentDummyObject>>());
auto source = new JFactoryTestDummySource;
auto event = std::make_shared<JEvent>(&app);
event->SetJEventSource(source);

SECTION("RepeatedGetObjectsAreCached") {
auto dummies = event->Get<JFactoryTestDummyObject>();
REQUIRE(dummies.at(0)->data == 8);
REQUIRE(source->get_objects_count == 1);
REQUIRE(source->get_objects_dummy_count == 1);

dummies = event->Get<JFactoryTestDummyObject>();
REQUIRE(dummies.at(0)->data == 8);
REQUIRE(source->get_objects_count == 1);
REQUIRE(source->get_objects_dummy_count == 1);
}

SECTION("DifferentGetObjectsAreNotCached") {
auto dummies = event->Get<JFactoryTestDummyObject>();
REQUIRE(dummies.at(0)->data == 8);
REQUIRE(source->get_objects_count == 1);
REQUIRE(source->get_objects_dummy_count == 1);
REQUIRE(source->get_objects_different_count == 0);

auto different = event->Get<DifferentDummyObject>();
REQUIRE(different.at(0)->E == 123.0);
REQUIRE(source->get_objects_count == 2);
REQUIRE(source->get_objects_dummy_count == 1);
REQUIRE(source->get_objects_different_count == 1);

different = event->Get<DifferentDummyObject>();
REQUIRE(different.at(0)->E == 123.0);
REQUIRE(source->get_objects_count == 2);
REQUIRE(source->get_objects_dummy_count == 1);
REQUIRE(source->get_objects_different_count == 1);
}
}
Loading