From 317596652b680c2a61d1d6a920a85b3ef044b831 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Sat, 28 Oct 2023 18:50:22 -0400 Subject: [PATCH] Test cases pass again --- src/libraries/JANA/Engine/JScheduler.cc | 11 ++++------- src/programs/tests/ArrowActivationTests.cc | 23 ++++++++++++---------- src/programs/tests/SchedulerTests.cc | 12 ++++------- src/programs/tests/TopologyTests.cc | 7 ++++++- 4 files changed, 27 insertions(+), 26 deletions(-) diff --git a/src/libraries/JANA/Engine/JScheduler.cc b/src/libraries/JANA/Engine/JScheduler.cc index 13ed2810f..b0b503c07 100644 --- a/src/libraries/JANA/Engine/JScheduler.cc +++ b/src/libraries/JANA/Engine/JScheduler.cc @@ -106,7 +106,9 @@ void JScheduler::checkin_unprotected(JArrow* assignment, JArrowMetrics::Status l if (found_inactive_source || found_inactive_stage_or_sink) { // Deactivate arrow - as.status = ArrowStatus::Inactive; + // Because we are deactivating it from Active state, we know the topology has not been paused. Hence we can finalize immediately. + assignment->finalize(); + as.status = ArrowStatus::Finalized; m_topology_state.active_or_draining_arrow_count--; LOG_DEBUG(logger) << "Deactivated arrow '" << assignment->get_name() << "' (" << m_topology_state.active_or_draining_arrow_count << " remaining)" << LOG_END; @@ -121,11 +123,6 @@ void JScheduler::checkin_unprotected(JArrow* assignment, JArrowMetrics::Status l LOG_DEBUG(logger) << "Draining arrow '" << assignment->get_name() << "' (" << m_topology_state.active_or_draining_arrow_count << " remaining)" << LOG_END; } - if (found_inactive_source) { - // Sources are finalized immediately, unlike other arrows. This is to facilitate closing resources like sockets and file handles - assignment->finalize(); - } - // Test if this was the last arrow running if (m_topology_state.active_or_draining_arrow_count == 0) { LOG_DEBUG(logger) << "All arrows are inactive. Deactivating topology." << LOG_END; @@ -312,7 +309,7 @@ void JScheduler::pause_arrow_unprotected(size_t index) { return; // pause() is a no-op unless running } LOG_DEBUG(logger) << "JArrow '" << name << "' pause() : " << status << " => Inactive" << LOG_END; - m_topology_state.active_or_draining_arrow_count++; + m_topology_state.active_or_draining_arrow_count--; for (size_t downstream: m_topology_state.arrow_states[index].downstream_arrow_indices) { m_topology_state.arrow_states[downstream].active_or_draining_upstream_arrow_count--; } diff --git a/src/programs/tests/ArrowActivationTests.cc b/src/programs/tests/ArrowActivationTests.cc index 0fe2b2dd3..70a6aa584 100644 --- a/src/programs/tests/ArrowActivationTests.cc +++ b/src/programs/tests/ArrowActivationTests.cc @@ -131,28 +131,31 @@ TEST_CASE("ActivableDeactivationTests") { source.logger = logger; JScheduler scheduler(topology); + scheduler.logger = logger; JScheduler::TopologyState state = scheduler.get_topology_state(); REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized); + REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Uninitialized); + REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Uninitialized); + REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Uninitialized); scheduler.run_topology(1); state = scheduler.get_topology_state(); REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Active); + + auto result = steppe(emit_rand_ints); - steppe(emit_rand_ints); + scheduler.next_assignment(0, emit_rand_ints, result); state = scheduler.get_topology_state(); REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Active); // TODO: Test that finalize was called exactly once } // TEST_CASE diff --git a/src/programs/tests/SchedulerTests.cc b/src/programs/tests/SchedulerTests.cc index 85e8bd34b..03abff2eb 100644 --- a/src/programs/tests/SchedulerTests.cc +++ b/src/programs/tests/SchedulerTests.cc @@ -41,6 +41,7 @@ TEST_CASE("SchedulerTests") { emit_rand_ints->set_chunksize(1); JScheduler scheduler(topology); + // scheduler.logger = JLogger(JLogger::Level::DEBUG); scheduler.run_topology(1); JArrow* assignment; @@ -49,9 +50,6 @@ TEST_CASE("SchedulerTests") { SECTION("When run sequentially, RRS returns nullptr => topology finished") { - auto logger = JLogger(JLogger::Level::OFF); - - last_result = JArrowMetrics::Status::ComeBackLater; assignment = nullptr; do { @@ -65,15 +63,13 @@ TEST_CASE("SchedulerTests") { JScheduler::TopologyState state = scheduler.get_topology_state(); REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); - REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Inactive); - REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Inactive); - REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Inactive); + REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Finalized); + REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Finalized); + REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Finalized); } SECTION("When run sequentially, topology finished => RRS returns nullptr") { - auto logger = JLogger(JLogger::Level::OFF); - JScheduler scheduler(topology); last_result = JArrowMetrics::Status::ComeBackLater; assignment = nullptr; diff --git a/src/programs/tests/TopologyTests.cc b/src/programs/tests/TopologyTests.cc index cbe26e2eb..654750448 100644 --- a/src/programs/tests/TopologyTests.cc +++ b/src/programs/tests/TopologyTests.cc @@ -194,6 +194,7 @@ TEST_CASE("JTopology: Basic functionality") { scheduler.run_topology(1); auto ts = scheduler.get_topology_state(); + JArrowMetrics::Status status; REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Active); REQUIRE(ts.arrow_states[1].status == JScheduler::ArrowStatus::Active); @@ -201,8 +202,9 @@ TEST_CASE("JTopology: Basic functionality") { REQUIRE(ts.arrow_states[3].status == JScheduler::ArrowStatus::Active); for (int i = 0; i < 20; ++i) { - step(emit_rand_ints); + status = step(emit_rand_ints); } + scheduler.next_assignment(0, emit_rand_ints, status); ts = scheduler.get_topology_state(); REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); @@ -213,6 +215,7 @@ TEST_CASE("JTopology: Basic functionality") { for (int i = 0; i < 20; ++i) { step(multiply_by_two); } + scheduler.next_assignment(0, multiply_by_two, status); ts = scheduler.get_topology_state(); REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); @@ -223,6 +226,7 @@ TEST_CASE("JTopology: Basic functionality") { for (int i = 0; i < 20; ++i) { step(subtract_one); } + scheduler.next_assignment(0, subtract_one, status); ts = scheduler.get_topology_state(); REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); @@ -233,6 +237,7 @@ TEST_CASE("JTopology: Basic functionality") { for (int i = 0; i < 20; ++i) { step(sum_everything); } + scheduler.next_assignment(0, sum_everything, status); ts = scheduler.get_topology_state(); REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Finalized);