Skip to content

Commit

Permalink
Merge pull request #516 from m-a-d-n-e-s-s/515-rmi-can-break-down-whe…
Browse files Browse the repository at this point in the history
…n-more-than-216-tasks-are-sent-per-rank-to-rank-pair

RMI can handle more than 2^16 tasks per rank-to-rank pair
  • Loading branch information
evaleev authored Jan 7, 2024
2 parents 9e61d01 + 3b38102 commit b1f1c39
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 19 deletions.
3 changes: 1 addition & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,7 @@ add_feature_info(DQ_PREBUF ENABLE_DQ_PREBUF
set(MADNESS_DQ_USE_PREBUF ${ENABLE_DQ_PREBUF} CACHE BOOL
"Enables thread-local buffer for task aggregation to reduce lock contention")

set(MADNESS_DQ_PREBUF_SIZE 20 CACHE STRING "Numberof entries in the thread-pool prebuffer for task aggregation to reduce lock contention")
#set(MADNESS_DQ_PREBUF_SZ ${MADNESS_DQ_PREBUF_SIZE} CACHE STRING "Numberof entries in the thread-pool prebuffer for task aggregation to reduce lock contention")
set(MADNESS_DQ_PREBUF_SIZE 20 CACHE STRING "Number of entries in the thread-pool prebuffer for task aggregation to reduce lock contention")

option(ENABLE_BSEND_ACKS
"Use MPI Send instead of MPI Bsend for huge message acknowledgements" ON)
Expand Down
14 changes: 7 additions & 7 deletions src/madness/world/uniqueid.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,17 @@ namespace madness {
unsigned long worldid; ///< ID of the \c World the object belongs to.
unsigned long objid; ///< ID of the object.

/// Constructor that sets the world and object IDs.
public:
/// Default constructor.
uniqueidT()
: worldid(0), objid(0) {};

/// Constructor that uses the world and object IDs.

/// \param[in] worldid The ID of the \c World the object belongs to.
/// \param[in] objid The ID of the object.
uniqueidT(unsigned long worldid, unsigned long objid)
: worldid(worldid), objid(objid) {};

public:
/// Constructor.
uniqueidT()
: worldid(0), objid(0) {};
: worldid(worldid), objid(objid) {};

/// nonnull state tester

Expand Down
8 changes: 8 additions & 0 deletions src/madness/world/world.h
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,14 @@ namespace madness {
return uniqueidT(_id,obj_id++);
}

/// Reports the next universe-wide unique ID generated by make_unique_obj_id() objects created in this \c World. No comms.

/// \return A the next universe-wide unique ID for objects created in this \c World.
/// \warning This is not re-entrant, should be called from a single (typically, main) thread
uniqueidT next_unique_obj_id() const {
return uniqueidT(_id,obj_id);
}

/// Associate a local pointer with a universe-wide unique ID.

/// Use the routines \c register_ptr(), \c unregister_ptr(),
Expand Down
6 changes: 5 additions & 1 deletion src/madness/world/world_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,10 @@ namespace madness {

} // namespace detail

/// Base class for WorldObject, useful for introspection
struct WorldObjectBase {
virtual ~WorldObjectBase() = default;
};

/// Implements most parts of a globally addressable object (via unique ID).

Expand All @@ -357,7 +361,7 @@ namespace madness {
/// \tparam Derived The derived class. \c WorldObject is a curiously
/// recurring template pattern.
template <class Derived>
class WorldObject {
class WorldObject : public WorldObjectBase {
public:
/// \todo Description needed.
typedef WorldObject<Derived> objT;
Expand Down
7 changes: 5 additions & 2 deletions src/madness/world/worldrmi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ namespace madness {
waiter.reset();
#endif

if (print_debug_info)
if (print_debug_info && narrived > 0)
print_error(rank, ":RMI: ", narrived, " messages just arrived\n");

if (narrived) {
Expand Down Expand Up @@ -132,7 +132,7 @@ namespace madness {
// Only ordered messages can end up in the queue due to
// out-of-order receipt or order of recv buffer processing.

// Sort queued messages by ascending recv count
// Sort queued messages by source and ascending (modulo overflow) recv count
std::sort(q.get(),q.get()+n_in_q);

// Loop thru messages ... since we have sorted only one pass
Expand Down Expand Up @@ -322,6 +322,9 @@ namespace madness {
status.reset(new SafeMPI::Status[maxq_]);
ind.reset(new int[maxq_]);
q.reset(new qmsg[maxq_]);
MADNESS_ASSERT(maxq_ <= 1<<14); // 16 bit task counter is sufficient to ensure that up to 2^14 tasks can be
// pending per rank .. although maxq_ controls the TOTAL queue size, do this
// purely as a reminder

// Allocate receive buffers
if(nproc > 1) {
Expand Down
32 changes: 25 additions & 7 deletions src/madness/world/worldrmi.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ namespace madness {
typedef std::ptrdiff_t rel_fn_ptr_t;

struct qmsg {
typedef uint16_t counterT;
typedef uint32_t attrT;
typedef uint16_t counterT; //!< counter for ordered messages
typedef uint32_t attrT; //!< attributes of the message; high 16 bits are the counter
size_t len;
rmi_handlerT func;
int i; // buffer index
Expand All @@ -117,9 +117,27 @@ namespace madness {
qmsg(size_t len, rmi_handlerT func, int i, int src, attrT attr, counterT count)
: len(len), func(func), i(i), src(src), attr(attr), count(count) {}

bool operator<(const qmsg& other) const {
return count < other.count;
}
// N.B. since msg counters in same batch might wrap around 0, need to sort buckets defined by the 2 highest
// bits ... basically we want 11xxx < 00xxx < 01xxx < 10xxx < 11xxx ... assume we only have messages from
// at most 2 adjacent buckets, thus only when comparing counters from buckets 00 and 11 reverse the order
// P.S. we thus assume we won't have to deal with msg sequences > 2^14 (per rank)
friend inline bool operator<(const qmsg& a, const qmsg& b) {
const auto a_src = a.src;
const auto b_src = b.src;
if (a_src == b_src) {
const auto a_count_bk = a.count >> 14;
const auto b_count_bk = b.count >> 14;
if (a_count_bk == 0b00 && b_count_bk == 0b11) {
return false;
} else if (a_count_bk == 0b11 && b_count_bk == 0b00) {
return true;
} else {
return a.count < b.count;
}
} else {
return a_src < b_src;
}
}

qmsg() {}
}; // struct qmsg
Expand All @@ -145,8 +163,8 @@ namespace madness {

/// This class implements the communications server thread and provides the only send interface
class RMI {
typedef uint16_t counterT;
typedef uint32_t attrT;
typedef qmsg::counterT counterT;
typedef qmsg::attrT attrT;

/// @return reference to the boolean variable indicating whether this thread is the server thread
static bool& is_server_thread_accessor();
Expand Down

0 comments on commit b1f1c39

Please sign in to comment.