Skip to content

Commit

Permalink
Test cases pass again
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanwbrei committed Oct 28, 2023
1 parent d1f9517 commit 3175966
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 26 deletions.
11 changes: 4 additions & 7 deletions src/libraries/JANA/Engine/JScheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ void JScheduler::checkin_unprotected(JArrow* assignment, JArrowMetrics::Status l
if (found_inactive_source || found_inactive_stage_or_sink) {

// Deactivate arrow
as.status = ArrowStatus::Inactive;
// Because we are deactivating it from Active state, we know the topology has not been paused. Hence we can finalize immediately.
assignment->finalize();
as.status = ArrowStatus::Finalized;
m_topology_state.active_or_draining_arrow_count--;

LOG_DEBUG(logger) << "Deactivated arrow '" << assignment->get_name() << "' (" << m_topology_state.active_or_draining_arrow_count << " remaining)" << LOG_END;
Expand All @@ -121,11 +123,6 @@ void JScheduler::checkin_unprotected(JArrow* assignment, JArrowMetrics::Status l
LOG_DEBUG(logger) << "Draining arrow '" << assignment->get_name() << "' (" << m_topology_state.active_or_draining_arrow_count << " remaining)" << LOG_END;
}

if (found_inactive_source) {
// Sources are finalized immediately, unlike other arrows. This is to facilitate closing resources like sockets and file handles
assignment->finalize();
}

// Test if this was the last arrow running
if (m_topology_state.active_or_draining_arrow_count == 0) {
LOG_DEBUG(logger) << "All arrows are inactive. Deactivating topology." << LOG_END;
Expand Down Expand Up @@ -312,7 +309,7 @@ void JScheduler::pause_arrow_unprotected(size_t index) {
return; // pause() is a no-op unless running
}
LOG_DEBUG(logger) << "JArrow '" << name << "' pause() : " << status << " => Inactive" << LOG_END;
m_topology_state.active_or_draining_arrow_count++;
m_topology_state.active_or_draining_arrow_count--;
for (size_t downstream: m_topology_state.arrow_states[index].downstream_arrow_indices) {
m_topology_state.arrow_states[downstream].active_or_draining_upstream_arrow_count--;
}
Expand Down
23 changes: 13 additions & 10 deletions src/programs/tests/ArrowActivationTests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,28 +131,31 @@ TEST_CASE("ActivableDeactivationTests") {
source.logger = logger;

JScheduler scheduler(topology);
scheduler.logger = logger;
JScheduler::TopologyState state = scheduler.get_topology_state();

REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized);
REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized);
REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized);
REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized);
REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Uninitialized);
REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Uninitialized);
REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Uninitialized);

scheduler.run_topology(1);
state = scheduler.get_topology_state();

REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active);
REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active);
REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active);
REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active);
REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Active);
REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Active);
REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Active);

auto result = steppe(emit_rand_ints);

steppe(emit_rand_ints);
scheduler.next_assignment(0, emit_rand_ints, result);
state = scheduler.get_topology_state();

REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Finalized);
REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active);
REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active);
REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active);
REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Active);
REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Active);
REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Active);

// TODO: Test that finalize was called exactly once
} // TEST_CASE
Expand Down
12 changes: 4 additions & 8 deletions src/programs/tests/SchedulerTests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ TEST_CASE("SchedulerTests") {
emit_rand_ints->set_chunksize(1);

JScheduler scheduler(topology);
// scheduler.logger = JLogger(JLogger::Level::DEBUG);
scheduler.run_topology(1);

JArrow* assignment;
Expand All @@ -49,9 +50,6 @@ TEST_CASE("SchedulerTests") {

SECTION("When run sequentially, RRS returns nullptr => topology finished") {

auto logger = JLogger(JLogger::Level::OFF);


last_result = JArrowMetrics::Status::ComeBackLater;
assignment = nullptr;
do {
Expand All @@ -65,15 +63,13 @@ TEST_CASE("SchedulerTests") {

JScheduler::TopologyState state = scheduler.get_topology_state();
REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Finalized);
REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Inactive);
REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Inactive);
REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Inactive);
REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Finalized);
REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Finalized);
REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Finalized);
}

SECTION("When run sequentially, topology finished => RRS returns nullptr") {

auto logger = JLogger(JLogger::Level::OFF);
JScheduler scheduler(topology);
last_result = JArrowMetrics::Status::ComeBackLater;
assignment = nullptr;

Expand Down
7 changes: 6 additions & 1 deletion src/programs/tests/TopologyTests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,17 @@ TEST_CASE("JTopology: Basic functionality") {

scheduler.run_topology(1);
auto ts = scheduler.get_topology_state();
JArrowMetrics::Status status;

REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Active);
REQUIRE(ts.arrow_states[1].status == JScheduler::ArrowStatus::Active);
REQUIRE(ts.arrow_states[2].status == JScheduler::ArrowStatus::Active);
REQUIRE(ts.arrow_states[3].status == JScheduler::ArrowStatus::Active);

for (int i = 0; i < 20; ++i) {
step(emit_rand_ints);
status = step(emit_rand_ints);
}
scheduler.next_assignment(0, emit_rand_ints, status);
ts = scheduler.get_topology_state();

REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Finalized);
Expand All @@ -213,6 +215,7 @@ TEST_CASE("JTopology: Basic functionality") {
for (int i = 0; i < 20; ++i) {
step(multiply_by_two);
}
scheduler.next_assignment(0, multiply_by_two, status);
ts = scheduler.get_topology_state();

REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Finalized);
Expand All @@ -223,6 +226,7 @@ TEST_CASE("JTopology: Basic functionality") {
for (int i = 0; i < 20; ++i) {
step(subtract_one);
}
scheduler.next_assignment(0, subtract_one, status);
ts = scheduler.get_topology_state();

REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Finalized);
Expand All @@ -233,6 +237,7 @@ TEST_CASE("JTopology: Basic functionality") {
for (int i = 0; i < 20; ++i) {
step(sum_everything);
}
scheduler.next_assignment(0, sum_everything, status);
ts = scheduler.get_topology_state();

REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Finalized);
Expand Down

0 comments on commit 3175966

Please sign in to comment.