Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for timeslices #278

Merged
merged 34 commits into from
Mar 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
cd39d44
Add 'level' tag to JEvent
nathanwbrei Jan 23, 2024
cfa75f8
SubeventExample correctly installed to programs/
nathanwbrei Jan 24, 2024
37585a0
Topology building logic
nathanwbrei Feb 6, 2024
21e9033
Add TimesliceExample
nathanwbrei Jan 31, 2024
8c2e869
All components know their own level
nathanwbrei Feb 7, 2024
40f679b
Add JEventUnfolder
nathanwbrei Feb 9, 2024
be2d794
JFactorySet understands event level
nathanwbrei Feb 11, 2024
e216094
Add JUnfoldArrow + test case
nathanwbrei Feb 13, 2024
0ad9a92
Fix warnings
nathanwbrei Feb 13, 2024
e77f82a
JEventPool sets event level
nathanwbrei Feb 13, 2024
b3798cb
Fixes to UnfoldArrow
nathanwbrei Feb 15, 2024
2fd3876
Add FoldArrow
nathanwbrei Feb 15, 2024
3a19826
JApp, JCM understand JEventUnfolder
nathanwbrei Feb 20, 2024
30952e6
Skeleton of recursive topology building
nathanwbrei Feb 22, 2024
9b6cc03
JArrowTopology::event_pool is no longer a shared_ptr
nathanwbrei Feb 23, 2024
0fbad31
Add JEventMapArrow
nathanwbrei Feb 24, 2024
bcb4501
Rough cut of topology building with unfold/fold
nathanwbrei Feb 25, 2024
44b8c64
USE_ASAN no longer pulls in -O2
nathanwbrei Feb 26, 2024
b903205
Excise last references to removed 'jana:legacy_mode' parameter
nathanwbrei Feb 26, 2024
6087d8d
JTopologyBuilder: Test cases pass
nathanwbrei Feb 26, 2024
19bbb23
Rough cut of JTopologyBuilder::attach_lower_level
nathanwbrei Feb 27, 2024
bb2757a
Change representation of JEvent::mParents
nathanwbrei Feb 27, 2024
5ad990b
Finish attaching arrows between levels
nathanwbrei Feb 28, 2024
2a297bc
Fix unused variable warning
nathanwbrei Feb 28, 2024
86cc2a8
Rough cut of Timeslice test case
nathanwbrei Feb 29, 2024
7b58019
JTopologyBuilder: Build everything before calling configurer
nathanwbrei Feb 29, 2024
1d381b0
TopologyBuilder fix
nathanwbrei Mar 1, 2024
0ecdb5a
Move JEvent parent release logic into JEvent
nathanwbrei Mar 1, 2024
111703d
Test JFoldArrow
nathanwbrei Mar 1, 2024
482a22f
Small UnfoldTests fix
nathanwbrei Mar 3, 2024
ea6f636
Fix refcount<0 bug in TimeslicesTest
nathanwbrei Mar 3, 2024
adf865e
Identify sinks correctly
nathanwbrei Mar 3, 2024
085183e
Fix JFoldArrow processed count
nathanwbrei Mar 3, 2024
6787236
Flesh out TimesliceExample
nathanwbrei Mar 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ if (${USE_XERCES})
endif()

if (${USE_ASAN})
add_compile_options(-fsanitize=address -g -O2)
add_compile_options(-fsanitize=address)
add_link_options(-fsanitize=address)
endif()

if (${USE_TSAN})
add_compile_options(-fsanitize=thread -g -O2)
add_compile_options(-fsanitize=thread)
add_link_options(-fsanitize=thread)
endif()

Expand Down
3 changes: 2 additions & 1 deletion src/examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ add_subdirectory(BlockExample)
add_subdirectory(SubeventExample)
add_subdirectory(SubeventCUDAExample)
add_subdirectory(UnitTestingExample)
add_subdirectory(PodioExample)
add_subdirectory(PodioExample)
add_subdirectory(TimesliceExample)
2 changes: 0 additions & 2 deletions src/examples/StreamingExample/ZmqMain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ void InitPlugin(JApplication *app) {
app->Add(new AHitAnomalyDetector(app, 5000));
app->Add(new JFactoryGeneratorT<AHitParser>());

// So we don't have to put this on the cmd line every time
app->SetParameterValue("jana:legacy_mode", 0);
app->SetParameterValue("jana:extended_report", 0);

new std::thread(dummy_publisher_loop);
Expand Down
4 changes: 2 additions & 2 deletions src/examples/SubeventExample/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ set (SubeventExample_SOURCES
add_executable(SubeventExample ${SubeventExample_SOURCES})

target_link_libraries(SubeventExample jana2)
set_target_properties(SubeventExample PROPERTIES PREFIX "" OUTPUT_NAME "SubeventExample" SUFFIX ".so")
install(TARGETS SubeventExample DESTINATION plugins)
set_target_properties(SubeventExample PROPERTIES PREFIX "" OUTPUT_NAME "SubeventExample")
install(TARGETS SubeventExample DESTINATION programs)


16 changes: 16 additions & 0 deletions src/examples/TimesliceExample/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@


if (USE_PODIO)
set (TimesliceExample_SOURCES
TimesliceExample.cc
)

add_library(TimesliceExample SHARED ${TimesliceExample_SOURCES})
target_link_libraries(TimesliceExample jana2 podio::podio PodioExampleDatamodel PodioExampleDatamodelDict podio::podioRootIO)
set_target_properties(TimesliceExample PROPERTIES PREFIX "" SUFFIX ".so" OUTPUT_NAME "TimesliceExample")
install(TARGETS TimesliceExample DESTINATION programs)

else()
message(STATUS "Skipping examples/TimesliceExample because USE_PODIO=Off")
endif()

18 changes: 18 additions & 0 deletions src/examples/TimesliceExample/MyDataModel.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2024, Jefferson Science Associates, LLC.
// Subject to the terms in the LICENSE file found in the top-level directory.

#pragma once

#include <JANA/JObject.h>

struct MyHit : public JObject {
int hit_id;
int energy, x, y;
};

struct MyCluster : public JObject {
int cluster_id;
int energy, x, y;
std::vector<int> hits;
};

39 changes: 39 additions & 0 deletions src/examples/TimesliceExample/MyEventFactory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@



// Copyright 2024, Jefferson Science Associates, LLC.
// Subject to the terms in the LICENSE file found in the top-level directory.

#pragma once
#include "MyDataModel.h"
#include <JANA/JFactoryT.h>
#include <JANA/Omni/JOmniFactory.h>


struct MyClusterFactory : public JFactoryT<MyCluster> {

int init_call_count = 0;
int change_run_call_count = 0;
int process_call_count = 0;

MyClusterFactory() {
SetLevel(JEventLevel::Event);
}

void Init() override {
++init_call_count;
}

void ChangeRun(const std::shared_ptr<const JEvent>&) override {
++change_run_call_count;
}

void Process(const std::shared_ptr<const JEvent>& event) override {
++process_call_count;

auto protos = event->Get<MyCluster>("protos");
// TODO: Output something sensible
}
};


35 changes: 35 additions & 0 deletions src/examples/TimesliceExample/MyEventProcessor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@

// Copyright 2024, Jefferson Science Associates, LLC.
// Subject to the terms in the LICENSE file found in the top-level directory.

#pragma once
#include "MyDataModel.h"
#include <JANA/JEventProcessor.h>

struct ExampleEventProcessor : public JEventProcessor {

std::mutex m_mutex;

ExampleTimesliceProcessor() {
SetEventLevel(JEvent::Level::Event);
}

void Process(const std::shared_ptr<const JEvent>& event) {

std::lock_guard<std::mutex> guard(m_mutex);

auto outputs = event->Get<MyOutput>();
// assert(outputs.size() == 4);
// assert(outputs[0]->z == 25.6f);
// assert(outputs[1]->z == 26.5f);
// assert(outputs[2]->z == 27.4f);
// assert(outputs[3]->z == 28.3f);
LOG << " Contents of event " << event->GetEventNumber() << LOG_END;
for (auto output : outputs) {
LOG << " " << output->evt << ":" << output->sub << " " << output->z << LOG_END;
}
LOG << " DONE with contents of event " << event->GetEventNumber() << LOG_END;
}
};


37 changes: 37 additions & 0 deletions src/examples/TimesliceExample/MyTimesliceFactory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2024, Jefferson Science Associates, LLC.
// Subject to the terms in the LICENSE file found in the top-level directory.

#pragma once
#include "MyDataModel.h"

#include <JANA/Omni/JOmniFactory.h>
#include <JANA/JFactoryT.h>


struct MyProtoClusterFactory : public JFactoryT<MyCluster> {

int init_call_count = 0;
int change_run_call_count = 0;
int process_call_count = 0;

MyProtoClusterFactory() {
SetLevel(JEventLevel::Timeslice);
}

void Init() override {
++init_call_count;
}

void ChangeRun(const std::shared_ptr<const JEvent>&) override {
++change_run_call_count;
}

void Process(const std::shared_ptr<const JEvent>& event) override {
++process_call_count;

auto protos = event->Get<MyCluster>("protos");
// TODO: Output something sensible
}
};


40 changes: 40 additions & 0 deletions src/examples/TimesliceExample/MyTimesliceSource.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024, Jefferson Science Associates, LLC.
// Subject to the terms in the LICENSE file found in the top-level directory.

#pragma once
#include "MyDataModel.h"
#include <JANA/JEventSource.h>


struct MyTimesliceSource : public JEventSource {

MyTimesliceSource(std::string source_name, JApplication *app) : JEventSource(source_name, app) {
SetLevel(JEventLevel::Timeslice);
}

static std::string GetDescription() { return "MyTimesliceSource"; }

std::string GetType(void) const override { return JTypeInfo::demangle<decltype(*this)>(); }

void Open() override { }

void GetEvent(std::shared_ptr<JEvent> event) override {

auto evt = event->GetEventNumber();
std::vector<MyInput*> inputs;
inputs.push_back(new MyInput(22,3.6,evt,0));
inputs.push_back(new MyInput(23,3.5,evt,1));
inputs.push_back(new MyInput(24,3.4,evt,2));
inputs.push_back(new MyInput(25,3.3,evt,3));
inputs.push_back(new MyInput(26,3.2,evt,4));
event->Insert(inputs);

auto hits = std::make_unique<ExampleHitCollection>();
hits.push_back(ExampleHit(22));
hits.push_back(ExampleHit(23));
hits.push_back(ExampleHit(24));
event->InsertCollection(hits);

jout << "MyTimesliceSource: Emitting " << event->GetEventNumber() << jendl;
}
};
43 changes: 43 additions & 0 deletions src/examples/TimesliceExample/MyTimesliceUnfolder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2024, Jefferson Science Associates, LLC.
// Subject to the terms in the LICENSE file found in the top-level directory.

#pragma once
#include "MyDataModel.h"
#include <JANA/JEventUnfolder.h>

struct ExampleTimesliceUnfolder : public JEventUnfolder {

MyTimesliceUnfolder() {
SetParentLevel(JEventLevel::Timeslice);
SetChildLevel(JEventLevel::Event);
}

void Preprocess(const JEvent& parent) const override {
parent->Get<MyCluster>("protos");
}

Result Unfold(const JEvent& parent, JEvent& child, int item) override {
auto protos = parent->Get<MyCluster>("protos");

child.SetEventNumber(parent.GetEventNumber()*10 + item);
LOG << "Unfolding parent=" << parent.GetEventNumber() << ", child=" << child.GetEventNumber() << ", item=" << item << LOG_END;

std::vector<MyCluster*> child_protos;
for (auto proto: protos) {
if (true) {
// TODO: condition
child_protos.push_back(proto);
}
}
child->Insert(child_protos, "event_protos")->SetFactoryFlag(JFactoryFlag::NOT_OBJECT_OWNER);

if (item == 3) {
jout << "Unfold found item 3, finishing join" << jendl;
return Result::Finished;
}
return Result::KeepGoing;
}
}



30 changes: 30 additions & 0 deletions src/examples/TimesliceExample/TimesliceExample.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2024, Jefferson Science Associates, LLC.
// Subject to the terms in the LICENSE file found in the top-level directory.


#include "MyTimesliceSource.h"
#include "MyTimesliceUnfolder.h"
#include "MyEventProcessor.h"
#include "MyTimesliceFactory.h"
#include "MyEventFactory.h"

#include <JANA/JApplication.h>


extern "C"{
void InitPlugin(JApplication *app) {

InitJANAPlugin(app);

app->Add(new MyTimesliceSource("Dummy"));
app->Add(new MyTimesliceUnfolder);
app->Add(new MyEventProcessor);

app->Add(new JFactoryGeneratorT<MyTimesliceFactory>());
app->Add(new JFactoryGeneratorT<MyEventFactory>());

app->SetParameterValue("jana:extended_report", 0);
}
} // "C"


4 changes: 4 additions & 0 deletions src/libraries/JANA/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ set(JANA2_SOURCES
Engine/JEventSourceArrow.h
Engine/JBlockSourceArrow.h
Engine/JBlockDisentanglerArrow.h
Engine/JEventMapArrow.h
Engine/JEventMapArrow.cc
Engine/JPool.h

Engine/JMailbox.h
Expand Down Expand Up @@ -244,6 +246,7 @@ file(GLOB jana_calibs_headers "Calibrations/*.h*")
file(GLOB jana_cli_headers "CLI/*.h*")
file(GLOB jana_compat_headers "Compatibility/*.h*")
file(GLOB jana_podio_headers "Podio/*.h*")
file(GLOB jana_omni_headers "Omni/*.h*")

install(FILES ${jana_headers} DESTINATION include/JANA)
install(FILES ${jana_engine_headers} DESTINATION include/JANA/Engine)
Expand All @@ -254,6 +257,7 @@ install(FILES ${jana_utils_headers} DESTINATION include/JANA/Utils)
install(FILES ${jana_calibs_headers} DESTINATION include/JANA/Calibrations)
install(FILES ${jana_cli_headers} DESTINATION include/JANA/CLI)
install(FILES ${jana_compat_headers} DESTINATION include/JANA/Compatibility)
install(FILES ${jana_omni_headers} DESTINATION include/JANA/Omni)

if (${USE_PODIO})
install(FILES ${jana_podio_headers} DESTINATION include/JANA/Podio)
Expand Down
7 changes: 6 additions & 1 deletion src/libraries/JANA/Engine/JArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class JArrow {
const std::string m_name; // Used for human understanding
const bool m_is_parallel; // Whether or not it is safe to parallelize
const bool m_is_source; // Whether or not this arrow should activate/drain the topology
const bool m_is_sink; // Whether or not tnis arrow contributes to the final event count
bool m_is_sink; // Whether or not tnis arrow contributes to the final event count
JArrowMetrics m_metrics; // Performance information accumulated over all workers

mutable std::mutex m_arrow_mutex; // Protects access to arrow properties
Expand All @@ -39,6 +39,7 @@ class JArrow {

friend class JScheduler;
std::vector<JArrow *> m_listeners; // Downstream Arrows
friend class JTopologyBuilder;
std::vector<PlaceRefBase*> m_places; // Will eventually supplant m_listeners, m_chunksize

protected:
Expand All @@ -57,6 +58,10 @@ class JArrow {
m_logger = logger;
}

void set_is_sink(bool is_sink) {
m_is_sink = is_sink;
}

// TODO: Get rid of me
void set_chunksize(size_t chunksize) {
std::lock_guard<std::mutex> lock(m_arrow_mutex);
Expand Down
2 changes: 1 addition & 1 deletion src/libraries/JANA/Engine/JArrowTopology.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct JArrowTopology {
// Ensure that ComponentManager stays alive at least as long as JArrowTopology does
// Otherwise there is a potential use-after-free when JArrowTopology or JArrowProcessingController access components

std::shared_ptr<JEventPool> event_pool; // TODO: Move into pools eventually
JEventPool* event_pool; // TODO: Move into pools eventually
JPerfMetrics metrics;

std::vector<JArrow*> arrows;
Expand Down
6 changes: 3 additions & 3 deletions src/libraries/JANA/Engine/JBlockDisentanglerArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class JBlockDisentanglerArrow : public JArrow {
JBlockedEventSource<T>* m_source; // non-owning
JMailbox<T*>* m_block_queue; // non-owning
JMailbox<std::shared_ptr<JEvent>*>* m_event_queue; // non-owning
std::shared_ptr<JEventPool> m_pool;
JEventPool* m_pool;

size_t m_max_events_per_block = 40;

Expand All @@ -23,13 +23,13 @@ class JBlockDisentanglerArrow : public JArrow {
JBlockedEventSource<T>* source,
JMailbox<T*>* block_queue,
JMailbox<std::shared_ptr<JEvent>*>* event_queue,
std::shared_ptr<JEventPool> pool
JEventPool* pool
)
: JArrow(std::move(name), true, false, false, 1)
, m_source(source)
, m_block_queue(block_queue)
, m_event_queue(event_queue)
, m_pool(std::move(pool))
, m_pool(pool)
{}

~JBlockDisentanglerArrow() {
Expand Down
Loading
Loading