Skip to content

Commit

Permalink
Detemplatize JPool
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanwbrei committed Oct 17, 2024
1 parent ca19a09 commit f735a55
Show file tree
Hide file tree
Showing 17 changed files with 113 additions and 159 deletions.
12 changes: 6 additions & 6 deletions src/libraries/JANA/Topology/JArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <JANA/JLogger.h>
#include <JANA/JException.h>
#include <JANA/Topology/JMailbox.h>
#include <JANA/Topology/JPool.h>
#include <JANA/Topology/JEventPool.h>


#ifndef JANA2_ARROWDATA_MAX_SIZE
Expand All @@ -36,7 +36,7 @@ class JArrow {
// This is usable by subclasses.
JLogger m_logger;
friend class JTopologyBuilder;
std::vector<Place*> m_places; // Will eventually supplant m_listeners, m_chunksize
std::vector<Place*> m_places; // Will eventually supplant m_listeners

public:
std::string get_name() { return m_name; }
Expand Down Expand Up @@ -113,7 +113,7 @@ struct Place {
this->is_queue = true;
}

void set_pool(JPool<EventT>* pool) {
void set_pool(JEventPool* pool) {
assert(pool != nullptr);
this->place_ref = pool;
this->is_queue = false;
Expand All @@ -138,7 +138,7 @@ struct Place {
return (data.item_count >= min_item_count);
}
else {
auto pool = static_cast<JPool<EventT>*>(place_ref);
auto pool = static_cast<JEventPool*>(place_ref);
data.item_count = pool->pop(data.items.data(), min_item_count, max_item_count, data.location_id);
data.reserve_count = 0;
return (data.item_count >= min_item_count);
Expand Down Expand Up @@ -169,7 +169,7 @@ struct Place {
}
else {
if (is_input) {
auto pool = static_cast<JPool<EventT>*>(place_ref);
auto pool = static_cast<JEventPool*>(place_ref);
pool->push(data.items.data(), data.item_count, false, data.location_id);
}
}
Expand All @@ -185,7 +185,7 @@ struct Place {
return is_input ? 0 : data.item_count;
}
else {
auto pool = static_cast<JPool<EventT>*>(place_ref);
auto pool = static_cast<JEventPool*>(place_ref);
pool->push(data.items.data(), data.item_count, !is_input, data.location_id);
data.item_count = 0;
data.reserve_count = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,74 +1,83 @@
// Copyright 2023, Jefferson Science Associates, LLC.

// Copyright 2020, Jefferson Science Associates, LLC.
// Subject to the terms in the LICENSE file found in the top-level directory.


#pragma once
#include <JANA/Utils/JCpuInfo.h>
#include <JANA/JLogger.h>

#include <JANA/JEvent.h>
#include <JANA/Services/JComponentManager.h>
#include <JANA/JEvent.h>
#include <mutex>
#include <vector>


class JPoolBase {
protected:
size_t m_pool_size;
size_t m_location_count;
bool m_limit_total_events_in_flight;
public:
JPoolBase(
size_t pool_size,
size_t location_count,
bool limit_total_events_in_flight)
: m_pool_size(pool_size)
, m_location_count(location_count)
, m_limit_total_events_in_flight(limit_total_events_in_flight) {}

virtual ~JPoolBase() = default;
};

template <typename T>
class JPool : public JPoolBase {
class JEventPool {
private:
struct alignas(JANA2_CACHE_LINE_BYTES) LocalPool {
std::mutex mutex;
std::vector<T*> available_items;
std::vector<T> items;
std::vector<std::shared_ptr<JEvent>*> available_items;
std::vector<std::shared_ptr<JEvent>> items;
};

std::unique_ptr<LocalPool[]> m_pools;

size_t m_pool_size;
size_t m_location_count;
bool m_limit_total_events_in_flight;

std::shared_ptr<JComponentManager> m_component_manager;
JEventLevel m_level;


public:
JPool(size_t pool_size,
size_t location_count,
bool limit_total_events_in_flight) : JPoolBase(pool_size, location_count, limit_total_events_in_flight)
{
inline JEventPool(std::shared_ptr<JComponentManager> component_manager,
size_t pool_size,
size_t location_count,
bool limit_total_events_in_flight,
JEventLevel level = JEventLevel::PhysicsEvent)

: m_pool_size(pool_size)
, m_location_count(location_count)
, m_limit_total_events_in_flight(limit_total_events_in_flight)
, m_component_manager(component_manager)
, m_level(level) {

assert(m_location_count >= 1);
assert(m_pool_size > 0 || !m_limit_total_events_in_flight);
}

virtual ~JPool() = default;

void init() {
m_pools = std::unique_ptr<LocalPool[]>(new LocalPool[m_location_count]());

for (size_t j=0; j<m_location_count; ++j) {

m_pools[j].items = std::vector<T>(m_pool_size); // Default-construct everything in place
m_pools[j].items = std::vector<std::shared_ptr<JEvent>>(m_pool_size); // Default-construct everything in place

for (T& item : m_pools[j].items) {
for (auto& item : m_pools[j].items) {
configure_item(&item);
m_pools[j].available_items.push_back(&item);
}
}
}

virtual void configure_item(T*) {
void configure_item(std::shared_ptr<JEvent>* item) {
(*item) = std::make_shared<JEvent>();
m_component_manager->configure_event(**item);
item->get()->SetLevel(m_level); // This needs to happen _after_ configure_event
}

virtual void release_item(T*) {
void release_item(std::shared_ptr<JEvent>* item) {
if (auto source = (*item)->GetJEventSource()) source->DoFinish(**item);
(*item)->mFactorySet->Release();
(*item)->mInspector.Reset();
(*item)->GetJCallGraphRecorder()->Reset();
(*item)->Reset();
}


T* get(size_t location=0) {
std::shared_ptr<JEvent>* get(size_t location=0) {

assert(m_pools != nullptr); // If you hit this, you forgot to call init().
LocalPool& pool = m_pools[location % m_location_count];
Expand All @@ -79,20 +88,20 @@ class JPool : public JPoolBase {
return nullptr;
}
else {
auto t = new T;
auto t = new std::shared_ptr<JEvent>();
configure_item(t);
return t;
}
}
else {
T* item = pool.available_items.back();
std::shared_ptr<JEvent>* item = pool.available_items.back();
pool.available_items.pop_back();
return item;
}
}


void put(T* item, bool release, size_t location) {
void put(std::shared_ptr<JEvent>* item, bool release, size_t location) {

assert(m_pools != nullptr); // If you hit this, you forgot to call init().

Expand All @@ -118,7 +127,7 @@ class JPool : public JPoolBase {
}


size_t pop(T** dest, size_t min_count, size_t max_count, size_t location) {
size_t pop(std::shared_ptr<JEvent>** dest, size_t min_count, size_t max_count, size_t location) {

assert(m_pools != nullptr); // If you hit this, you forgot to call init().

Expand All @@ -135,7 +144,7 @@ class JPool : public JPoolBase {
// Return as many as we can. We aren't allowed to create any more
size_t count = std::min(available_count, max_count);
for (size_t i=0; i<count; ++i) {
T* t = pool.available_items.back();
std::shared_ptr<JEvent>* t = pool.available_items.back();
pool.available_items.pop_back();
dest[i] = t;
}
Expand All @@ -147,21 +156,21 @@ class JPool : public JPoolBase {
size_t i=0;
for (i=0; i<count; ++i) {
// Pop the items already in the pool
T* t = pool.available_items.back();
std::shared_ptr<JEvent>* t = pool.available_items.back();
pool.available_items.pop_back();
dest[i] = t;
}
for (; i<min_count; ++i) {
// If we haven't reached our min count yet, allocate just enough to reach it
auto t = new T;
auto t = new std::shared_ptr<JEvent>;
configure_item(t);
dest[i] = t;
}
return i;
}
}

void push(T** source, size_t count, bool release, size_t location) {
void push(std::shared_ptr<JEvent>** source, size_t count, bool release, size_t location) {
for (size_t i=0; i<count; ++i) {
put(source[i], release, location);
source[i] = nullptr;
Expand All @@ -170,5 +179,3 @@ class JPool : public JPoolBase {
};




1 change: 0 additions & 1 deletion src/libraries/JANA/Topology/JEventProcessorArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@


#include <JANA/Topology/JEventProcessorArrow.h>
#include <JANA/Utils/JEventPool.h>
#include <JANA/JEventProcessor.h>
#include <JANA/JEventSource.h>

Expand Down
1 change: 0 additions & 1 deletion src/libraries/JANA/Topology/JEventSourceArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include <JANA/JApplication.h>
#include <JANA/JEventSource.h>
#include <JANA/Topology/JEventSourceArrow.h>
#include <JANA/Utils/JEventPool.h>



Expand Down
4 changes: 2 additions & 2 deletions src/libraries/JANA/Topology/JEventSourceArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ class JEventSourceArrow : public JArrow {
void set_input(JMailbox<Event*>* queue) {
m_input.set_queue(queue);
}
void set_input(JPool<Event>* pool) {
void set_input(JEventPool* pool) {
m_input.set_pool(pool);
}
void set_output(JMailbox<Event*>* queue) {
m_output.set_queue(queue);
}
void set_output(JPool<Event>* pool) {
void set_output(JEventPool* pool) {
m_output.set_pool(pool);
}

Expand Down
1 change: 0 additions & 1 deletion src/libraries/JANA/Topology/JEventTapArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@


#include <JANA/Topology/JEventTapArrow.h>
#include <JANA/Utils/JEventPool.h>
#include <JANA/JEventProcessor.h>
#include <JANA/JEventUnfolder.h>
#include <JANA/JEvent.h>
Expand Down
1 change: 0 additions & 1 deletion src/libraries/JANA/Topology/JFoldArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#pragma once

#include <JANA/Topology/JArrow.h>
#include <JANA/Utils/JEventPool.h>
#include <JANA/Utils/JEventLevel.h>

class JFoldArrow : public JArrow {
Expand Down
2 changes: 1 addition & 1 deletion src/libraries/JANA/Topology/JJunctionArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

#include <JANA/Topology/JArrow.h>
#include <JANA/Topology/JMailbox.h>
#include <JANA/Topology/JPool.h>
#include <JANA/Topology/JEventPool.h>



Expand Down
6 changes: 3 additions & 3 deletions src/libraries/JANA/Topology/JPipelineArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

#include <JANA/Topology/JArrow.h>
#include <JANA/Topology/JMailbox.h>
#include <JANA/Topology/JPool.h>
#include <JANA/Topology/JEventPool.h>

using MessageT = std::shared_ptr<JEvent>;

Expand All @@ -27,13 +27,13 @@ class JPipelineArrow : public JArrow {
void set_input(JMailbox<MessageT*>* queue) {
m_input.set_queue(queue);
}
void set_input(JPool<MessageT>* pool) {
void set_input(JEventPool* pool) {
m_input.set_pool(pool);
}
void set_output(JMailbox<MessageT*>* queue) {
m_output.set_queue(queue);
}
void set_output(JPool<MessageT>* pool) {
void set_output(JEventPool* pool) {
m_output.set_pool(pool);
}

Expand Down
2 changes: 1 addition & 1 deletion src/libraries/JANA/Topology/JTopologyBuilder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ std::string JTopologyBuilder::print_topology() {
i += 1;
}
// Build index lookup for pools
for (JPoolBase* pool : pools) {
for (JEventPool* pool : pools) {
lookup[pool] = i;
i += 1;
}
Expand Down
5 changes: 2 additions & 3 deletions src/libraries/JANA/Topology/JTopologyBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <memory>
#include <JANA/JService.h>
#include <JANA/Utils/JProcessorMapping.h>
#include <JANA/Utils/JEventPool.h>
#include <JANA/Topology/JEventPool.h>
#include <JANA/Engine/JPerfMetrics.h> // TODO: Should't be here

#include <JANA/Services/JParameterManager.h>
Expand All @@ -18,7 +18,6 @@ class JParameterManager;
class JComponentManager;
class JArrow;
class JQueue;
class JPoolBase;
class JQueue;
class JFoldArrow;
class JUnfoldArrow;
Expand All @@ -32,7 +31,7 @@ class JTopologyBuilder : public JService {
// The topology itself
std::vector<JArrow*> arrows;
std::vector<JQueue*> queues; // Queues shared between arrows
std::vector<JPoolBase*> pools; // Pools shared between arrows
std::vector<JEventPool*> pools; // Pools shared between arrows

// Topology configuration
size_t m_pool_capacity = 4;
Expand Down
1 change: 0 additions & 1 deletion src/libraries/JANA/Topology/JUnfoldArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

#include <JANA/Topology/JArrow.h>
#include <JANA/JEventUnfolder.h>
#include <JANA/Utils/JEventPool.h>

class JUnfoldArrow : public JArrow {
private:
Expand Down
Loading

0 comments on commit f735a55

Please sign in to comment.