Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify JEventProcessor API #292

Merged
merged 3 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/examples/DstExample/DstExampleProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,28 @@

DstExampleProcessor::DstExampleProcessor() {
SetTypeName(NAME_OF_THIS); // Provide JANA with this class's name
SetCallbackStyle(CallbackStyle::ExpertMode);
}

void DstExampleProcessor::Init() {
LOG << "DstExampleProcessor::Init" << LOG_END;
// Open TFiles, set up TTree branches, etc
}

void DstExampleProcessor::Process(const std::shared_ptr<const JEvent> &event) {
LOG << "DstExampleProcessor::Process, Event #" << event->GetEventNumber() << LOG_END;
void DstExampleProcessor::Process(const JEvent& event) {
LOG << "DstExampleProcessor::Process, Event #" << event.GetEventNumber() << LOG_END;

/// Note that GetAllChildren won't trigger any new computations, it will only
/// project down results which already exist in the JEvent. In order to obtain
/// results from our DstExampleFactory, we need to trigger it explicitly using
/// either JEvent::Get or JEvent::GetAll.

event->Get<MyRenderableJObject>("from_factory");
event.Get<MyRenderableJObject>("from_factory");

/// Now we can project our event down to a map of Renderables and a separate
/// map of JObjects. Note we do this in parallel.
auto renderable_map = event->GetAllChildren<Renderable>();
auto jobject_map = event->GetAllChildren<JObject>();
auto renderable_map = event.GetAllChildren<Renderable>();
auto jobject_map = event.GetAllChildren<JObject>();

/// Senquentially, we iterate over all of our Renderables and JObjects and use
/// whatever functionality each interface provides.
Expand Down
2 changes: 1 addition & 1 deletion src/examples/DstExample/DstExampleProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class DstExampleProcessor : public JEventProcessor {
virtual ~DstExampleProcessor() = default;

void Init() override;
void Process(const std::shared_ptr<const JEvent>& event) override;
void Process(const JEvent&) override;
void Finish() override;

};
Expand Down
14 changes: 9 additions & 5 deletions src/examples/EventGroupExample/GroupedEventProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,27 @@
/// GroupedEventProcessor demonstrates basic usage of JEventGroups

class GroupedEventProcessor : public JEventProcessor {
std::mutex m_mutex;

public:
void Process(const std::shared_ptr<const JEvent>& event) override {
GroupedEventProcessor() {
SetTypeName(NAME_OF_THIS);
SetCallbackStyle(CallbackStyle::ExpertMode);
}

void Process(const JEvent& event) override {

// In parallel, perform a random amount of (slow) computation
consume_cpu_ms(100, 1.0);

auto tridas_event = event->GetSingle<TridasEvent>();
auto tridas_event = event.GetSingle<TridasEvent>();
tridas_event->should_keep = true;

auto group = event->GetSingle<JEventGroup>();
auto group = event.GetSingle<JEventGroup>();

// Sequentially, process each event and report when a group finishes
std::lock_guard<std::mutex> lock(m_mutex);

LOG << "Processed group #" << group->GetGroupId() << ", event #" << event->GetEventNumber() << LOG_END;
LOG << "Processed group #" << group->GetGroupId() << ", event #" << event.GetEventNumber() << LOG_END;

bool finishes_group = group->FinishEvent();
if (finishes_group) {
Expand Down
11 changes: 6 additions & 5 deletions src/examples/MetadataExample/MetadataAggregator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

MetadataAggregator::MetadataAggregator() {
SetTypeName(NAME_OF_THIS); // Provide JANA with this class's name
SetCallbackStyle(CallbackStyle::ExpertMode);
}

void MetadataAggregator::Init() {
Expand All @@ -20,17 +21,17 @@ void MetadataAggregator::Init() {
LOG << "MetadataAggregator::Init" << LOG_END;
}

void MetadataAggregator::Process(const std::shared_ptr<const JEvent> &event) {
LOG << "MetadataAggregator::Process, Run #" << event->GetRunNumber() << ", Event #" << event->GetEventNumber() << LOG_END;
void MetadataAggregator::Process(const JEvent& event) {
LOG << "MetadataAggregator::Process, Run #" << event.GetRunNumber() << ", Event #" << event.GetEventNumber() << LOG_END;

// Acquire tracks in parallel
auto tracks = event->Get<Track>(m_track_factory);
auto tracks = event.Get<Track>(m_track_factory);

// Lock mutex, so we can update shared state sequentially
std::lock_guard<std::mutex>lock(m_mutex);

// Since the run number probably doesn't change too frequently we cache the last entry
int run_nr = event->GetRunNumber();
int run_nr = event.GetRunNumber();
if (run_nr != m_last_run_nr) {
m_last_run_nr = run_nr;
m_last_statistics = &m_statistics[m_last_run_nr]; // Get-or-create
Expand All @@ -39,7 +40,7 @@ void MetadataAggregator::Process(const std::shared_ptr<const JEvent> &event) {
// Update the statistics accumulator using the metadata from this event
m_last_statistics->event_count += 1;
m_last_statistics->total_track_count += tracks.size();
m_last_statistics->total_latency_ns += event->GetMetadata<Track>(m_track_factory).elapsed_time_ns;
m_last_statistics->total_latency_ns += event.GetMetadata<Track>(m_track_factory).elapsed_time_ns;
}

void MetadataAggregator::Finish() {
Expand Down
4 changes: 1 addition & 3 deletions src/examples/MetadataExample/MetadataAggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ class MetadataAggregator : public JEventProcessor {
std::chrono::nanoseconds total_latency_ns {0};
};

std::mutex m_mutex;

std::string m_track_factory = "smeared"; // So we can choose what we are measuring at runtime

std::map<int, Statistics> m_statistics; // Keyed off of run nr
Expand All @@ -43,7 +41,7 @@ class MetadataAggregator : public JEventProcessor {
virtual ~MetadataAggregator() = default;

void Init() override;
void Process(const std::shared_ptr<const JEvent>& event) override;
void Process(const JEvent&) override;
void Finish() override;

};
Expand Down
19 changes: 12 additions & 7 deletions src/examples/PodioExample/PodioExampleProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,22 @@ struct PrintingVisitor {
// TODO: C++20 visitor using a lambda overload set
// TODO: Less generic visitor, e.g. VisitCollection

void PodioExampleProcessor::Process(const std::shared_ptr<const JEvent> &event) {
PodioExampleProcessor::PodioExampleProcessor() {
SetTypeName(NAME_OF_THIS);
SetCallbackStyle(CallbackStyle::ExpertMode);
}

void PodioExampleProcessor::Process(const JEvent& event) {

// Obtain a typed collection just like you would in a JFactory
auto hits = event->GetCollection<ExampleHit>("hits");
auto hits_filtered = event->GetCollection<ExampleHit>("hits_filtered");
auto clusters = event->GetCollection<ExampleCluster>("clusters");
auto clusters_filtered = event->GetCollection<ExampleCluster>("clusters_filtered");
auto clusters_from_hits_filtered = event->GetCollection<ExampleCluster>("clusters_from_hits_filtered");
auto hits = event.GetCollection<ExampleHit>("hits");
auto hits_filtered = event.GetCollection<ExampleHit>("hits_filtered");
auto clusters = event.GetCollection<ExampleCluster>("clusters");
auto clusters_filtered = event.GetCollection<ExampleCluster>("clusters_filtered");
auto clusters_from_hits_filtered = event.GetCollection<ExampleCluster>("clusters_from_hits_filtered");

std::cout << "========================" << std::endl;
std::cout << "Event nr: " << event->GetEventNumber() << ", Cluster count: " << clusters->size() << std::endl;
std::cout << "Event nr: " << event.GetEventNumber() << ", Cluster count: " << clusters->size() << std::endl;

std::cout << "hits:" << std::endl;
for (const ExampleHit& hit : *hits) {
Expand Down
5 changes: 2 additions & 3 deletions src/examples/PodioExample/PodioExampleProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@

class PodioExampleProcessor : public JEventProcessor {
public:
void Process(const std::shared_ptr<const JEvent> &ptr) override;


PodioExampleProcessor();
void Process(const JEvent&) override;
};


Expand Down
12 changes: 7 additions & 5 deletions src/examples/StreamingExample/AHitAnomalyDetector.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ class AHitAnomalyDetector : public JEventProcessor {
public:
AHitAnomalyDetector(JApplication* app = nullptr, size_t delay_ms=1000)
: JEventProcessor(app)
, m_delay_ms(delay_ms) {};
, m_delay_ms(delay_ms) {
SetCallbackStyle(CallbackStyle::ExpertMode);
};

void Init() override {

}
void Process(const std::shared_ptr<const JEvent>& event) override {
void Process(const JEvent& event) override {

auto a_hits = event->Get<AHit>();
auto a_hits = event.Get<AHit>();
std::stringstream ss;
ss << "Anomaly detection: Event #" << event->GetEventNumber() << " : {";
ss << "Anomaly detection: Event #" << event.GetEventNumber() << " : {";
for (auto & hit : a_hits) {
ss << "(" << hit->E << "," << hit->x << "), ";
}
Expand All @@ -42,4 +44,4 @@ class AHitAnomalyDetector : public JEventProcessor {
};


#endif
#endif
18 changes: 10 additions & 8 deletions src/examples/StreamingExample/AHitBHitFuser.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@ class AHitBHitFuser : public JEventProcessor {
public:
AHitAnomalyDetector(JApplication* app = nullptr, size_t delay_ms=1000)
: JEventProcessor(app)
, m_delay_ms(delay_ms) {};
, m_delay_ms(delay_ms) {
SetCallbackStyle(CallbackStyle::ExpertMode);
};

void Init() override {

}
void Process(const std::shared_ptr<const JEvent>& event) override {
void Process(const JEvent& event) override {

auto a_hits = event->Get<AHit>();
auto b_hits = event->Get<BHit>();
auto a_hits = event.Get<AHit>();
auto b_hits = event.Get<BHit>();
std::stringstream ss;
ss << "AHit/BHit fusion: Event #" << event->GetEventNumber() << " : {";
ss << "AHit/BHit fusion: Event #" << event.GetEventNumber() << " : {";
for (auto & hit : a_hits) {
ss << "(" << hit->E << "," << hit->t << "), ";
}
Expand All @@ -40,10 +42,10 @@ class AHitBHitFuser : public JEventProcessor {
consume_cpu_ms(m_delay_ms);


auto raw_hits = event->Get<AHit>("raw_hits");
auto raw_hits = event.Get<AHit>("raw_hits");


std::cout << "Processing event #" << event->GetEventNumber() << std::endl;
std::cout << "Processing event #" << event.GetEventNumber() << std::endl;
Serializer<AHit> serializer;
for (auto & hit : raw_hits) {
AHit* calibrated_hit = new DetectorAHit(*hit);
Expand All @@ -61,4 +63,4 @@ class AHitBHitFuser : public JEventProcessor {
};


#endif
#endif
13 changes: 8 additions & 5 deletions src/examples/SubeventCUDAExample/SubeventCUDAExample.cu
Original file line number Diff line number Diff line change
Expand Up @@ -96,23 +96,26 @@ struct SimpleSource : public JEventSource {
};

struct SimpleProcessor : public JEventProcessor {
std::mutex m_mutex;

void Process(const std::shared_ptr<const JEvent> &event) {
SimpleProcessor() {
SetCallbackStyle(CallbackStyle::ExpertMode);
}

void Process(const JEvent& event) {

std::lock_guard <std::mutex> guard(m_mutex);

auto outputs = event->Get<MyOutput>();
auto outputs = event.Get<MyOutput>();
// assert(outputs.size() == 4);
// assert(outputs[0]->z == 25.6f);
// assert(outputs[1]->z == 26.5f);
// assert(outputs[2]->z == 27.4f);
// assert(outputs[3]->z == 28.3f);
LOG << " Contents of event " << event->GetEventNumber() << LOG_END;
LOG << " Contents of event " << event.GetEventNumber() << LOG_END;
for (auto output: outputs) {
LOG << " " << output->evt << ":" << output->sub << " " << output->z << LOG_END;
}
LOG << " DONE with contents of event " << event->GetEventNumber() << LOG_END;
LOG << " DONE with contents of event " << event.GetEventNumber() << LOG_END;
}
};

Expand Down
12 changes: 7 additions & 5 deletions src/examples/SubeventExample/SubeventExample.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,24 @@ struct SimpleSource : public JEventSource {
};

struct SimpleProcessor : public JEventProcessor {
std::mutex m_mutex;
void Process(const std::shared_ptr<const JEvent>& event) {
SimpleProcessor() {
SetCallbackStyle(CallbackStyle::ExpertMode);
}
void Process(const JEvent& event) {

std::lock_guard<std::mutex> guard(m_mutex);

auto outputs = event->Get<MyOutput>();
auto outputs = event.Get<MyOutput>();
// assert(outputs.size() == 4);
// assert(outputs[0]->z == 25.6f);
// assert(outputs[1]->z == 26.5f);
// assert(outputs[2]->z == 27.4f);
// assert(outputs[3]->z == 28.3f);
LOG << " Contents of event " << event->GetEventNumber() << LOG_END;
LOG << " Contents of event " << event.GetEventNumber() << LOG_END;
for (auto output : outputs) {
LOG << " " << output->evt << ":" << output->sub << " " << output->z << LOG_END;
}
LOG << " DONE with contents of event " << event->GetEventNumber() << LOG_END;
LOG << " DONE with contents of event " << event.GetEventNumber() << LOG_END;
}
};

Expand Down
13 changes: 7 additions & 6 deletions src/examples/TimesliceExample/MyFileWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,34 +31,35 @@ struct MyFileWriter : public JEventProcessor {

MyFileWriter() {
SetTypeName(NAME_OF_THIS);
SetCallbackStyle(CallbackStyle::ExpertMode);
}

void Init() {
m_writer = std::make_unique<podio::ROOTFrameWriter>("output.root");
}

void Process(const std::shared_ptr<const JEvent>& event) {
void Process(const JEvent& event) {

std::lock_guard<std::mutex> guard(m_mutex);
if (event->HasParent(JEventLevel::Timeslice)) {
if (event.HasParent(JEventLevel::Timeslice)) {

auto& ts = event->GetParent(JEventLevel::Timeslice);
auto& ts = event.GetParent(JEventLevel::Timeslice);
auto ts_nr = ts.GetEventNumber();

if (event->GetEventIndex() == 0) {
if (event.GetEventIndex() == 0) {
m_writer->writeFrame(*(m_ts_frame_in().at(0)), "timeslices");
}

LOG_DEBUG(GetLogger())
<< "Event " << event->GetEventNumber() << " from Timeslice " << ts_nr
<< "Event " << event.GetEventNumber() << " from Timeslice " << ts_nr
<< "\nClusters\n"
<< TabulateClusters(m_evt_clusters_in())
<< LOG_END;
}
else {

LOG_DEBUG(GetLogger())
<< "Event " << event->GetEventNumber()
<< "Event " << event.GetEventNumber()
<< "\nClusters\n"
<< TabulateClusters(m_evt_clusters_in())
<< LOG_END;
Expand Down
7 changes: 4 additions & 3 deletions src/examples/Tutorial/TutorialProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

TutorialProcessor::TutorialProcessor() {
SetTypeName(NAME_OF_THIS); // Provide JANA with this class's name
SetCallbackStyle(CallbackStyle::ExpertMode);
}

void TutorialProcessor::Init() {
Expand All @@ -20,12 +21,12 @@ void TutorialProcessor::Init() {
}
}

void TutorialProcessor::Process(const std::shared_ptr<const JEvent> &event) {
LOG << "TutorialProcessor::Process, Event #" << event->GetEventNumber() << LOG_END;
void TutorialProcessor::Process(const JEvent& event) {
LOG << "TutorialProcessor::Process, Event #" << event.GetEventNumber() << LOG_END;

/// Do everything we can in parallel
/// Warning: We are only allowed to use local variables and `event` here
auto hits = event->Get<Hit>();
auto hits = event.Get<Hit>();

/// Lock mutex
std::lock_guard<std::mutex>lock(m_mutex);
Expand Down
5 changes: 2 additions & 3 deletions src/examples/Tutorial/TutorialProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,16 @@

class TutorialProcessor : public JEventProcessor {

/// Shared state (e.g. histograms, TTrees, TFiles) live
/// Shared state (e.g. histograms, TTrees, TFiles) live here
double m_heatmap[100][100];
std::mutex m_mutex;

public:

TutorialProcessor();
virtual ~TutorialProcessor() = default;

void Init() override;
void Process(const std::shared_ptr<const JEvent>& event) override;
void Process(const JEvent& event) override;
void Finish() override;

};
Expand Down
Loading
Loading