Skip to content

Commit

Permalink
Fix JMergeArrow
Browse files Browse the repository at this point in the history
This fixes a new problem with reservations, but also an old problem with a worker getting stuck doing no-op merges
  • Loading branch information
nathanwbrei committed Dec 28, 2023
1 parent 3b77b2d commit 0edfcf8
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions src/libraries/JANA/Engine/JSubeventArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,14 @@ void JMergeArrow<InputT, OutputT>::execute(JArrowMetrics& result, size_t locatio

std::vector<std::shared_ptr<JEvent>*> outputs;
for (const auto& input : inputs) {
LOG_TRACE(m_logger) << "JMergeArrow: Processing input with parent=" << input.parent << ", evt=" << (*(input.parent))->GetEventNumber() << ", sub=" << input.id << " and total=" << input.total << LOG_END;
// Problem: Are we sure we are updating the event in a way which is effectively thread-safe?
// Should we be doing this insert, or should the caller?
(*(input.parent))->template Insert<OutputT>(input.data);
if (input.total == 1) {
// Goes straight into "ready"
outputs.push_back(input.parent);
LOG_TRACE(m_logger) << "JMergeArrow: Finished parent=" << input.parent << ", evt=" << (*(input.parent))->GetEventNumber() << LOG_END;
}
else {
auto pair = m_in_progress.find(input.parent);
Expand All @@ -237,25 +239,24 @@ void JMergeArrow<InputT, OutputT>::execute(JArrowMetrics& result, size_t locatio
else if (pair->second == 1) {
pair->second -= 1;
outputs.push_back(input.parent);
LOG_TRACE(m_logger) << "JMergeArrow: Finished parent=" << input.parent << ", evt=" << (*(input.parent))->GetEventNumber() << LOG_END;
}
else {
pair->second -= 1;
}
}
}
}
LOG_DEBUG(m_logger) << "MergeArrow consumed " << inputs.size() << " subevents, produced " << outputs.size() << " events" << LOG_END;
auto end_latency_time = std::chrono::steady_clock::now();
auto out_status = OutQueue::Status::Ready;

size_t outputs_size = outputs.size();
if (outputs_size > 0) {
assert(m_outbox != nullptr);
out_status = m_outbox->push(outputs, downstream_accepts, location_id);
}
auto outputs_size = outputs.size();
auto out_status = m_outbox->push(outputs, downstream_accepts, location_id);

auto end_queue_time = std::chrono::steady_clock::now();

JArrowMetrics::Status status;
if (in_status == InQueue::Status::Ready && out_status == OutQueue::Status::Ready) {
if (in_status == InQueue::Status::Ready && out_status == OutQueue::Status::Ready && inputs.size() > 0) {
status = JArrowMetrics::Status::KeepGoing;
}
else {
Expand Down

0 comments on commit 0edfcf8

Please sign in to comment.