Optimization: Remove constraints on arrow firing #378

Merged: 22 commits, Oct 29, 2024
52 changes: 25 additions & 27 deletions docs/howto/other-howtos.md
@@ -91,46 +91,44 @@ The `JTest` plugin lets you test JANA's performance for different workloads. It

| Name | Type | Default | Description |
|:-----|:-----|:------------|:--------|
- jtest:parser:cputime_ms | int | 0 | Time spent during parsing
- jtest:parser:cputime_spread | int | 0.25 | Spread of time spent during parsing
- jtest:parser:bytes | int | 2000000 | Bytes written during parsing
- jtest:parser:bytes_spread | double | 0.25 | Spread of bytes written during parsing
- jtest:disentangler:cputime_ms | int | 20 | Time spent during disentangling
- jtest:disentangler:cputime_spread | double | 0.25 | Spread of time spent during disentangling
- jtest:disentangler:bytes | int | 500000 | Bytes written during disentangling
- jtest:disentangler:bytes_spread | double | 0.25 | Spread of bytes written during disentangling
- jtest:tracker:cputime_ms | int | 200 | Time spent during tracking
- jtest:tracker:cputime_spread | double | 0.25 | Spread of time spent during tracking
- jtest:tracker:bytes | int | 1000 | Bytes written during tracking
- jtest:tracker:bytes_spread | double | 0.25 | Spread of bytes written during tracking
- jtest:plotter:cputime_ms | int | 0 | Time spent during plotting
- jtest:plotter:cputime_spread | double | 0.25 | Spread of time spent during plotting
- jtest:plotter:bytes | int | 1000 | Bytes written during plotting
- jtest:plotter:bytes_spread | double | 0.25 | Spread of bytes written during plotting
+ | jtest:parser:cputime_ms | int | 0 | Time spent during parsing |
+ | jtest:parser:cputime_spread | int | 0.25 | Spread of time spent during parsing |
+ | jtest:parser:bytes | int | 2000000 | Bytes written during parsing |
+ | jtest:parser:bytes_spread | double | 0.25 | Spread of bytes written during parsing |
+ | jtest:disentangler:cputime_ms | int | 20 | Time spent during disentangling |
+ | jtest:disentangler:cputime_spread | double | 0.25 | Spread of time spent during disentangling |
+ | jtest:disentangler:bytes | int | 500000 | Bytes written during disentangling |
+ | jtest:disentangler:bytes_spread | double | 0.25 | Spread of bytes written during disentangling |
+ | jtest:tracker:cputime_ms | int | 200 | Time spent during tracking |
+ | jtest:tracker:cputime_spread | double | 0.25 | Spread of time spent during tracking |
+ | jtest:tracker:bytes | int | 1000 | Bytes written during tracking |
+ | jtest:tracker:bytes_spread | double | 0.25 | Spread of bytes written during tracking |
+ | jtest:plotter:cputime_ms | int | 0 | Time spent during plotting |
+ | jtest:plotter:cputime_spread | double | 0.25 | Spread of time spent during plotting |
+ | jtest:plotter:bytes | int | 1000 | Bytes written during plotting |
+ | jtest:plotter:bytes_spread | double | 0.25 | Spread of bytes written during plotting |
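
Note on the `*_spread` parameters: the table does not say how a spread is applied. The following is a minimal sketch, assuming a spread of 0.25 means a uniform draw within ±25% of the nominal value. `sample_with_spread` is a hypothetical helper for illustration, not part of JTest, and the plugin's actual distribution may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <random>

// Hypothetical helper (not a JTest function): draws a value uniformly from
// [nominal - nominal*spread, nominal + nominal*spread].
// ASSUMPTION: this is one plausible reading of "spread"; JTest may differ.
std::size_t sample_with_spread(std::size_t nominal, double spread, std::mt19937& rng) {
    assert(spread >= 0.0 && spread <= 1.0);  // keeps the lower bound non-negative
    auto delta = static_cast<std::size_t>(nominal * spread);
    std::uniform_int_distribution<std::size_t> dist(nominal - delta, nominal + delta);
    return dist(rng);
}
```

Under this reading, `jtest:disentangler:bytes=500000` with a spread of 0.25 would produce sizes between 375000 and 625000 bytes.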



The following parameters are used for benchmarking:

| Name | Type | Default | Description |
|:-----|:-----|:------------|:--------|
- benchmark:nsamples | int | 15 | Number of measurements made for each thread count
- benchmark:minthreads | int | 1 | Minimum thread count
- benchmark:maxthreads | int | ncores | Maximum thread count
- benchmark:threadstep | int | 1 | Thread count increment
- benchmark:resultsdir | string | JANA_Test_Results | Directory name for benchmark test results
+ | benchmark:nsamples | int | 15 | Number of measurements made for each thread count |
+ | benchmark:minthreads | int | 1 | Minimum thread count |
+ | benchmark:maxthreads | int | ncores | Maximum thread count |
+ | benchmark:threadstep | int | 1 | Thread count increment |
+ | benchmark:resultsdir | string | JANA_Test_Results | Directory name for benchmark test results |
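
The benchmark parameters describe a scan over thread counts. Here is a rough sketch of which thread counts such a scan would visit, assuming the range includes `benchmark:maxthreads` and that `benchmark:nsamples` measurements are taken at each stop. `thread_counts` and `total_measurements` are hypothetical names, not JANA functions.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Thread counts visited by the scan: minthreads, minthreads + threadstep, ...
// ASSUMPTION: maxthreads is included when the step lands on it.
std::vector<int> thread_counts(int minthreads, int maxthreads, int threadstep) {
    assert(minthreads >= 1 && threadstep >= 1);
    std::vector<int> counts;
    for (int n = minthreads; n <= maxthreads; n += threadstep) {
        counts.push_back(n);
    }
    return counts;
}

// Total number of measurements: nsamples for every visited thread count.
std::size_t total_measurements(int minthreads, int maxthreads, int threadstep, int nsamples) {
    return thread_counts(minthreads, maxthreads, threadstep).size()
           * static_cast<std::size_t>(nsamples);
}
```

With the defaults on an 8-core machine (`minthreads=1`, `maxthreads=8`, `threadstep=1`, `nsamples=15`), this reading gives 8 thread counts and 120 measurements.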


The following parameters are more advanced, but may come in handy when doing performance tuning:

| Name | Type | Default | Description |
|:-----|:-----|:------------|:--------|
- jana:event_pool_size | int | nthreads | The number of events which may be in-flight at once
- jana:limit_total_events_in_flight | bool | 1 | Whether the number of in-flight events should be limited
- jana:affinity | int | 0 | Thread pinning strategy. 0: None. 1: Minimize number of memory localities. 2: Minimize number of hyperthreads.
- jana:locality | int | 0 | Memory locality strategy. 0: Global. 1: Socket-local. 2: Numa-domain-local. 3. Core-local. 4. Cpu-local
- jana:enable_stealing | bool | 0 | Allow threads to pick up work from a different memory location if their local mailbox is empty.
- jana:event_queue_threshold | int | 80 | Mailbox buffer size
+ | jana:max_inflight_events | int | nthreads | The number of events which may be in-flight at once. Should be at least `nthreads`, more gives better load balancing. |
+ | jana:affinity | int | 0 | Thread pinning strategy. 0: None. 1: Minimize number of memory localities. 2: Minimize number of hyperthreads. |
+ | jana:locality | int | 0 | Memory locality strategy. 0: Global. 1: Socket-local. 2: Numa-domain-local. 3. Core-local. 4. Cpu-local |
+ | jana:enable_stealing | bool | 0 | Allow threads to pick up work from a different memory location if their local mailbox is empty. |
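
To illustrate what `jana:max_inflight_events` bounds, here is a toy event pool. It is not JANA's implementation, and `ToyEventPool` and `ToyEvent` are made-up names. The pool hands out at most `max_inflight_events` events and returns `nullptr` until one is released, which is presumably why a value below `nthreads` would leave some threads without work.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

// Toy stand-in for an event; NOT the JANA JEvent class.
struct ToyEvent { int number = 0; };

// Toy fixed-capacity pool (hypothetical): the capacity plays the role of
// max_inflight_events, since no more than that many events can be checked out.
class ToyEventPool {
public:
    explicit ToyEventPool(std::size_t max_inflight_events) : m_storage(max_inflight_events) {
        for (auto& event : m_storage) m_free.push_back(&event);
    }

    // Returns nullptr when every event is already in flight.
    ToyEvent* try_acquire() {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_free.empty()) return nullptr;
        ToyEvent* event = m_free.back();
        m_free.pop_back();
        return event;
    }

    // Makes a finished event available again.
    void release(ToyEvent* event) {
        assert(event != nullptr);
        std::lock_guard<std::mutex> lock(m_mutex);
        m_free.push_back(event);
    }

private:
    std::mutex m_mutex;
    std::vector<ToyEvent> m_storage;  // never resized, so pointers stay valid
    std::vector<ToyEvent*> m_free;
};
```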


Creating code skeletons
@@ -77,8 +77,7 @@ void InitPlugin(JApplication* app) {

app->SetParameterValue("nthreads", 4);
app->SetParameterValue("jana:extended_report", true);
- app->SetParameterValue("jana:limit_total_events_in_flight", true);
- app->SetParameterValue("jana:event_pool_size", 16);
+ app->SetParameterValue("jana:max_inflight_events", 16);

// TODO: Consider making streamDet:sub_socket be the 'source_name', and use JESG to switch between JSES and DecodeDASSource
// TODO: Improve parametermanager interface
12 changes: 6 additions & 6 deletions src/examples/SubeventExample/SubeventExample.cc
@@ -98,8 +98,8 @@ int main() {
auto topology = app.GetService<JTopologyBuilder>();
topology->set_configure_fn([&](JTopologyBuilder& builder) {

- JMailbox<std::shared_ptr<JEvent>*> events_in;
- JMailbox<std::shared_ptr<JEvent>*> events_out;
+ JMailbox<JEvent*> events_in;
+ JMailbox<JEvent*> events_out;
JMailbox<SubeventWrapper<MyInput>> subevents_in;
JMailbox<SubeventWrapper<MyOutput>> subevents_out;

@@ -108,12 +108,12 @@ int main() {
auto merge_arrow = new JMergeArrow<MyInput, MyOutput>("merge", &processor, &subevents_out, &events_out);

auto source_arrow = new JEventSourceArrow("simpleSource", {source});
- source_arrow->set_input(topology->event_pool);
- source_arrow->set_output(&events_in);
+ source_arrow->attach(topology->event_pool, 0);
+ source_arrow->attach(&events_in, 1);

auto proc_arrow = new JEventMapArrow("simpleProcessor");
- proc_arrow->set_input(&events_out);
- proc_arrow->set_output(topology->event_pool);
+ proc_arrow->attach(&events_out, 0);
+ proc_arrow->attach(topology->event_pool, 1);
proc_arrow->add_processor(new SimpleProcessor);

builder.arrows.push_back(source_arrow);
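
The hunks above replace `set_input`/`set_output` with `attach(place, port_index)`. In both arrows shown, index 0 is the side events are taken from and index 1 is the side they are sent to. Below is a toy model of that wiring. `ToyArrow` and `ToyQueue` are hypothetical types for illustration only and say nothing about how the JANA arrow classes are implemented.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Toy stand-in for a mailbox or pool; NOT a JANA class.
using ToyQueue = std::deque<int>;

// Toy arrow with numbered ports (hypothetical).
class ToyArrow {
public:
    explicit ToyArrow(std::size_t port_count) : m_ports(port_count, nullptr) {
        assert(port_count >= 2);  // this toy needs port 0 (input) and port 1 (output)
    }

    // Wire a queue to a numbered port, mirroring the attach(place, index) calls above.
    void attach(ToyQueue* queue, std::size_t port) {
        assert(port < m_ports.size());
        m_ports[port] = queue;
    }

    // Move one item from port 0 to port 1. Returns false if there was nothing to move.
    bool fire() {
        ToyQueue* in = m_ports[0];
        ToyQueue* out = m_ports[1];
        if (in == nullptr || out == nullptr || in->empty()) return false;
        out->push_back(in->front());
        in->pop_front();
        return true;
    }

private:
    std::vector<ToyQueue*> m_ports;
};
```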
1 change: 1 addition & 0 deletions src/libraries/JANA/CMakeLists.txt
@@ -19,6 +19,7 @@ set(JANA2_SOURCES
Engine/JPerfMetrics.cc
Engine/JPerfSummary.cc

+ Topology/JArrow.cc
Topology/JEventSourceArrow.cc
Topology/JEventMapArrow.cc
Topology/JEventTapArrow.cc
6 changes: 1 addition & 5 deletions src/libraries/JANA/Engine/JArrowProcessingController.cc
@@ -159,14 +159,10 @@ bool JArrowProcessingController::is_timed_out() {
auto metrics = measure_performance();

int timeout_s;
- if (metrics->total_uptime_s < (m_warmup_timeout_s * m_topology->m_pool_capacity * 1.0) / metrics->thread_count) {
+ if (metrics->total_uptime_s < (m_warmup_timeout_s * m_topology->m_max_inflight_events * 1.0) / metrics->thread_count) {
// We are at the beginning and not all events have necessarily had a chance to warm up
timeout_s = m_warmup_timeout_s;
}
- else if (!m_topology->m_limit_total_events_in_flight) {
- // New events are constantly emitted, each of which may contain jfactorysets which need to be warmed up
- timeout_s = m_warmup_timeout_s;
- }
else {
timeout_s = m_timeout_s;
}
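
The remaining condition can be read as a warmup window that scales with the number of in-flight events per thread. Here is a standalone sketch of the selection. `choose_timeout_s` is a hypothetical free function, and its parameters stand in for the members used in the hunk above.

```cpp
#include <cassert>

// Hypothetical free-function version of the timeout selection shown above.
// While uptime is inside the warmup window, the (longer) warmup timeout applies;
// afterwards the regular timeout is used.
int choose_timeout_s(double total_uptime_s, int warmup_timeout_s, int timeout_s,
                     int max_inflight_events, int thread_count) {
    assert(thread_count > 0);
    // One warmup timeout per in-flight event, shared across the worker threads.
    double warmup_window_s = (warmup_timeout_s * max_inflight_events * 1.0) / thread_count;
    return (total_uptime_s < warmup_window_s) ? warmup_timeout_s : timeout_s;
}
```

For example, with a 30 s warmup timeout, 16 in-flight events, and 4 threads, the warmup window is 120 s. After that the regular timeout applies unconditionally, because the removed branch for an unlimited number of in-flight events no longer exists.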
14 changes: 7 additions & 7 deletions src/libraries/JANA/JEvent.cc
@@ -62,23 +62,23 @@ bool JEvent::HasParent(JEventLevel level) const {

const JEvent& JEvent::GetParent(JEventLevel level) const {
for (const auto& pair : mParents) {
- if (pair.first == level) return *(*(pair.second));
+ if (pair.first == level) return *pair.second;
}
throw JException("Unable to find parent at level %s",
toString(level).c_str());
}

- void JEvent::SetParent(std::shared_ptr<JEvent>* parent) {
- JEventLevel level = parent->get()->GetLevel();
+ void JEvent::SetParent(JEvent* parent) {
+ JEventLevel level = parent->GetLevel();
for (const auto& pair : mParents) {
if (pair.first == level) throw JException("Event already has a parent at level %s",
- toString(parent->get()->GetLevel()).c_str());
+ toString(parent->GetLevel()).c_str());
}
mParents.push_back({level, parent});
- parent->get()->mReferenceCount.fetch_add(1);
+ parent->mReferenceCount.fetch_add(1);
}

- std::shared_ptr<JEvent>* JEvent::ReleaseParent(JEventLevel level) {
+ JEvent* JEvent::ReleaseParent(JEventLevel level) {
if (mParents.size() == 0) {
throw JException("ReleaseParent failed: child has no parents!");
}
@@ -88,7 +88,7 @@ std::shared_ptr<JEvent>* JEvent::ReleaseParent(JEventLevel level) {
toString(level).c_str(), toString(pair.first).c_str());
}
mParents.pop_back();
- auto remaining_refs = pair.second->get()->mReferenceCount.fetch_sub(1);
+ auto remaining_refs = pair.second->mReferenceCount.fetch_sub(1);
if (remaining_refs < 1) { // Remember, this was fetched _before_ the last subtraction
throw JException("Parent refcount has gone negative!");
}
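
The `< 1` check relies on `fetch_sub` returning the value held before the subtraction. Below is a small sketch of that behavior with `std::atomic_int`. `ToyParent`, `add_child`, and `release_reference` are illustrative names. What `ReleaseParent` does after the check is not shown in this hunk, so it is not modeled here.

```cpp
#include <atomic>
#include <cassert>
#include <stdexcept>

// Toy stand-in for a parent event; NOT the JANA JEvent class.
struct ToyParent {
    std::atomic_int reference_count {1};  // starts at 1, like mReferenceCount in JEvent.h
};

// A child taking a reference to its parent.
void add_child(ToyParent& parent) {
    parent.reference_count.fetch_add(1);
}

// Drops one reference and returns the count as it was BEFORE the subtraction,
// which is what std::atomic<int>::fetch_sub reports.
int release_reference(ToyParent& parent) {
    int previous = parent.reference_count.fetch_sub(1);
    if (previous < 1) {
        // previous == 0 means the count has just dropped to -1
        throw std::runtime_error("Parent refcount has gone negative!");
    }
    return previous;
}
```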
6 changes: 3 additions & 3 deletions src/libraries/JANA/JEvent.h
@@ -49,7 +49,7 @@ class JEvent : public std::enable_shared_from_this<JEvent> {
bool mIsBarrierEvent = false;

// Hierarchical event memory management
- std::vector<std::pair<JEventLevel, std::shared_ptr<JEvent>*>> mParents;
+ std::vector<std::pair<JEventLevel, JEvent*>> mParents;
std::atomic_int mReferenceCount {1};
int64_t mEventIndex = -1;

@@ -89,8 +89,8 @@ class JEvent : public std::enable_shared_from_this<JEvent> {

bool HasParent(JEventLevel level) const;
const JEvent& GetParent(JEventLevel level) const;
- void SetParent(std::shared_ptr<JEvent>* parent);
- std::shared_ptr<JEvent>* ReleaseParent(JEventLevel level);
+ void SetParent(JEvent* parent);
+ JEvent* ReleaseParent(JEventLevel level);
void Release();

// Lifecycle
164 changes: 164 additions & 0 deletions src/libraries/JANA/JEventFolder.h
@@ -0,0 +1,164 @@
// Copyright 2024, Jefferson Science Associates, LLC.
// Subject to the terms in the LICENSE file found in the top-level directory.

#pragma once

#include <JANA/Components/JComponent.h>
#include <JANA/Components/JHasInputs.h>
#include <JANA/Components/JHasOutputs.h>
#include <JANA/Components/JHasRunCallbacks.h>
#include <JANA/JEvent.h>

class JApplication;
class JEventFolder : public jana::components::JComponent,
public jana::components::JHasRunCallbacks,
public jana::components::JHasInputs,
public jana::components::JHasOutputs {

private:
int32_t m_last_run_number = -1;
JEventLevel m_child_level;
bool m_call_preprocess_upstream = true;


public:

JEventFolder() = default;
virtual ~JEventFolder() {};

virtual void Init() {};

virtual void Preprocess(const JEvent& /*parent*/) const {};

virtual void Fold(const JEvent& /*child*/, JEvent& /*parent*/, int /*item_nr*/) {
throw JException("Not implemented yet!");
};

virtual void Finish() {};


// Configuration

void SetParentLevel(JEventLevel level) { m_level = level; }

void SetChildLevel(JEventLevel level) { m_child_level = level; }

void SetCallPreprocessUpstream(bool call_upstream) { m_call_preprocess_upstream = call_upstream; }

JEventLevel GetChildLevel() { return m_child_level; }


public:
// Backend

void DoInit() {
std::lock_guard<std::mutex> lock(m_mutex);
if (m_status != Status::Uninitialized) {
throw JException("JEventFolder: Attempting to initialize twice or from an invalid state");
}
// TODO: Obtain overrides of collection names from param manager
for (auto* parameter : m_parameters) {
parameter->Configure(*(m_app->GetJParameterManager()), m_prefix);
}
for (auto* service : m_services) {
service->Fetch(m_app);
}
CallWithJExceptionWrapper("JEventFolder::Init", [&](){Init();});
m_status = Status::Initialized;
}

void DoPreprocess(const JEvent& child) {
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_status != Status::Initialized) {
throw JException("JEventFolder: Component needs to be initialized and not finalized before Fold can be called");
// TODO: Consider calling Initialize(with_lock=false) like we do elsewhere
}
}
for (auto* input : m_inputs) {
input->PrefetchCollection(child);
}
if (m_callback_style != CallbackStyle::DeclarativeMode) {
CallWithJExceptionWrapper("JEventFolder::Preprocess", [&](){
Preprocess(child);
});
}
}

void DoFold(const JEvent& child, JEvent& parent) {
std::lock_guard<std::mutex> lock(m_mutex);
if (m_status != Status::Initialized) {
throw JException("Component needs to be initialized and not finalized before Fold() can be called");
}
if (!m_call_preprocess_upstream) {
CallWithJExceptionWrapper("JEventFolder::Preprocess", [&](){
Preprocess(parent);
});
}
if (m_last_run_number != parent.GetRunNumber()) {
for (auto* resource : m_resources) {
resource->ChangeRun(parent.GetRunNumber(), m_app);
}
if (m_callback_style == CallbackStyle::DeclarativeMode) {
CallWithJExceptionWrapper("JEventFolder::ChangeRun", [&](){
ChangeRun(parent.GetRunNumber());
});
}
else {
CallWithJExceptionWrapper("JEventFolder::ChangeRun", [&](){
ChangeRun(parent);
});
}
m_last_run_number = parent.GetRunNumber();
}
for (auto* input : m_inputs) {
input->GetCollection(parent);
// TODO: This requires that all inputs come from the parent.
// However, eventually we will want to support inputs
// that come from the child.
}
for (auto* output : m_outputs) {
output->Reset();
}
auto child_number = child.GetEventIndex();
CallWithJExceptionWrapper("JEventFolder::Fold", [&](){
Fold(child, parent, child_number);
});

for (auto* output : m_outputs) {
output->InsertCollection(parent);
}
}

void DoFinish() {
std::lock_guard<std::mutex> lock(m_mutex);
if (m_status != Status::Finalized) {
CallWithJExceptionWrapper("JEventFolder::Finish", [&](){
Finish();
});
m_status = Status::Finalized;
}
}

void Summarize(JComponentSummary& summary) const override {
auto* us = new JComponentSummary::Component(
"Folder", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName());

for (const auto* input : m_inputs) {
size_t subinput_count = input->names.size();
for (size_t i=0; i<subinput_count; ++i) {
us->AddInput(new JComponentSummary::Collection("", input->names[i], input->type_name, input->levels[i]));
}
}
for (const auto* output : m_outputs) {
size_t suboutput_count = output->collection_names.size();
for (size_t i=0; i<suboutput_count; ++i) {
us->AddOutput(new JComponentSummary::Collection("", output->collection_names[i], output->type_name, GetLevel()));
}
}
summary.Add(us);
}

};
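
The backend methods of `JEventFolder` enforce an Uninitialized, Initialized, Finalized lifecycle around the user's `Fold()`. Below is a stripped-down sketch of that lifecycle, assuming nothing about JANA beyond what the header above shows. `ToyFolder` and the toy event structs are hypothetical. Locking, inputs, outputs, and run changes are omitted, and the accumulation in `DoFold` is just an example of folding a child into its parent.

```cpp
#include <cassert>
#include <stdexcept>

// Toy stand-ins for child and parent events; NOT the JANA JEvent class.
struct ToyChildEvent { int hits; };
struct ToyParentEvent { int total_hits = 0; int children_folded = 0; };

// Toy folder (hypothetical) that mirrors the status checks in JEventFolder.
class ToyFolder {
public:
    enum class Status { Uninitialized, Initialized, Finalized };

    void DoInit() {
        if (m_status != Status::Uninitialized) {
            throw std::logic_error("Attempting to initialize twice or from an invalid state");
        }
        m_status = Status::Initialized;
    }

    // Accumulates one child into its parent; only legal between DoInit and DoFinish.
    void DoFold(const ToyChildEvent& child, ToyParentEvent& parent) {
        if (m_status != Status::Initialized) {
            throw std::logic_error("Folder must be initialized and not finalized before Fold()");
        }
        assert(child.hits >= 0);  // toy invariant for this example only
        parent.total_hits += child.hits;
        parent.children_folded += 1;
    }

    // Safe to call repeatedly; the finish step runs only once.
    void DoFinish() {
        if (m_status != Status::Finalized) {
            ++m_finish_calls;
            m_status = Status::Finalized;
        }
    }

    int finish_calls() const { return m_finish_calls; }

private:
    Status m_status = Status::Uninitialized;
    int m_finish_calls = 0;
};
```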

