diff --git a/CHANGELOG.md b/CHANGELOG.md
index 538096980..118b1b14d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -20,6 +20,14 @@ and this project aspires to adhere to [Semantic Versioning](https://semver.org/s
 #### Conduit
 - Changed the MPI CMake target used by conduit from `MPI::MPI_CXX` to `MPI::MPI_C` to provide better compatibility with downstream tools.
 
+#### Blueprint
+- Certain algorithms that use MPI tags had their tag values lowered since some MPI implementations do not support large values.
+
+#### Relay
+- User-supplied warning and error handlers are suspended during `conduit::relay::communicate_using_schema::execute()` so exceptions will be thrown properly when there is an MPI error. The handlers are restored before the execute method returns.
+- `conduit::relay::communicate_using_schema::execute()` flushes logs as they are generated, in case of error. This is mostly to facilitate internal debugging.
+- Changes were made to how Relay queries the upper limit for MPI tags to work around problems on some systems.
+
 ## [0.9.2] - Released 2024-05-21
 
 ### Added
diff --git a/src/libs/blueprint/conduit_blueprint_mpi_mesh_parmetis.cpp b/src/libs/blueprint/conduit_blueprint_mpi_mesh_parmetis.cpp
index 36f51958f..73b542619 100644
--- a/src/libs/blueprint/conduit_blueprint_mpi_mesh_parmetis.cpp
+++ b/src/libs/blueprint/conduit_blueprint_mpi_mesh_parmetis.cpp
@@ -375,7 +375,7 @@ void generate_global_element_and_vertex_ids(conduit::Node &mesh,
     if (adjset_name != "")
     {
-        const int TAG_SHARED_NODE_SYNC = 175000000;
+        const int TAG_SHARED_NODE_SYNC = 175;
         // map of groups -> global vtx ids
         std::map, std::vector> groups_2_vids;
         // map of rank -> sends to/recvs from that rank of global vtx ids for
@@ -464,7 +464,7 @@ void generate_global_element_and_vertex_ids(conduit::Node &mesh,
         for (const std::set& group : recv_groups)
         {
             index_t domid = *(group.begin());
-            const int tag = conduit::relay::mpi::safe_tag(TAG_SHARED_NODE_SYNC + domid * 100 + group_idx);
+            const int tag = conduit::relay::mpi::safe_tag(TAG_SHARED_NODE_SYNC + domid * 100 + group_idx, comm);
             async_recvs.push_back(MPI_Request{});
             group_idx++;
             std::vector& recvbuf = groups_2_vids[group];
@@ -484,7 +484,7 @@ void generate_global_element_and_vertex_ids(conduit::Node &mesh,
         for (const std::set& group : send_groups)
         {
             index_t domid = *(group.begin());
-            const int tag = conduit::relay::mpi::safe_tag(TAG_SHARED_NODE_SYNC + domid * 100 + group_idx);
+            const int tag = conduit::relay::mpi::safe_tag(TAG_SHARED_NODE_SYNC + domid * 100 + group_idx, comm);
             async_sends.push_back(MPI_Request{});
             group_idx++;
             std::vector& sendbuf = groups_2_vids[group];
diff --git a/src/libs/blueprint/conduit_blueprint_mpi_mesh_utils.cpp b/src/libs/blueprint/conduit_blueprint_mpi_mesh_utils.cpp
index 0df47c58a..1f9f7c6e1 100644
--- a/src/libs/blueprint/conduit_blueprint_mpi_mesh_utils.cpp
+++ b/src/libs/blueprint/conduit_blueprint_mpi_mesh_utils.cpp
@@ -173,8 +173,8 @@ PointQuery::execute(const std::string &coordsetName)
     // the results. The results get stored in m_domResults.
     std::map, conduit::Node *> input_sends, result_sends, input_recvs, result_recvs;
-    int inputs_tag = 55000000;
-    int results_tag = 66000000;
+    const int inputs_tag = 550;
+    const int results_tag = 660;
     for(int pass = 0; pass < 2; pass++)
     {
         conduit::relay::mpi::communicate_using_schema C(m_comm);
@@ -187,10 +187,10 @@ PointQuery::execute(const std::string &coordsetName)
 #endif
         for(size_t i = 0; i < allqueries.size(); i += 3)
         {
-            int asker = allqueries[i];
-            int domain = allqueries[i+1];
-            int npts = allqueries[i+2];
-            int owner = domain_to_rank[domain];
+            const int asker = allqueries[i];
+            const int domain = allqueries[i+1];
+            const int npts = allqueries[i+2];
+            const int owner = domain_to_rank[domain];
 
             if(asker == rank)
             {
@@ -278,6 +278,7 @@ PointQuery::execute(const std::string &coordsetName)
     for(auto it = result_recvs.begin(); it != result_recvs.end(); it++)
     {
         int domain = it->first.second;
+
         const conduit::Node &r = it->second->fetch_existing("results");
         auto acc = r.as_int_accessor();
         std::vector &result = m_domResults[domain];
@@ -435,12 +436,12 @@ MatchQuery::execute()
     C.set_logging_root("mpi_matchquery");
     C.set_logging(true);
 #endif
-    int query_tag = 77000000;
+    const int query_tag = 770;
     for(size_t i = 0; i < allqueries.size(); i += ntuple_values)
     {
-        int owner = allqueries[i];
-        int domain = allqueries[i + 1];
-        int query_domain = allqueries[i + 2];
+        const int owner = allqueries[i];
+        const int domain = allqueries[i + 1];
+        const int query_domain = allqueries[i + 2];
 
         auto oppositeKey = std::make_pair(query_domain, domain);
@@ -667,7 +668,7 @@ compare_pointwise_impl(conduit::Node &mesh, const std::string &adjsetName,
 
     // Iterate over each of the possible adjset relationships. Not all of these
     // will have adjset groups.
-    const int tag = 12211221;
+    const int tag = 122;
     for(int d0 = 0; d0 < maxDomains; d0++)
     {
         for(int d1 = d0 + 1; d1 < maxDomains; d1++)
diff --git a/src/libs/relay/conduit_relay_mpi.cpp b/src/libs/relay/conduit_relay_mpi.cpp
index 0c3d1c159..330381a51 100644
--- a/src/libs/relay/conduit_relay_mpi.cpp
+++ b/src/libs/relay/conduit_relay_mpi.cpp
@@ -9,6 +9,7 @@
 //-----------------------------------------------------------------------------
 #include "conduit_relay_mpi.hpp"
 
+#include
 #include
 #include
@@ -257,37 +258,6 @@ mpi_dtype_to_conduit_dtype_id(MPI_Datatype dt)
     return res;
 }
 
-//---------------------------------------------------------------------------//
-/**
- @brief Checks whether the input tag is invalid.
-
- @param tag The MPI tag to check for validity.
- @return True if the tag is wildly incorrect (negative).
-
- @note Right now, this function only flags tags that are negative. Tags are
-       supposed to be less than or equal to MPI_TAG_UB too. This function does
-       not check for that - it's done by using safe_tag when tags are used.
- */
-bool invalid_tag(int tag)
-{
-    return tag < 0;
-}
-
-//---------------------------------------------------------------------------//
-/**
- @brief MPI tags can be in the range [0,MPI_TAG_UB]. The values are
-        implementation-dependent. If the tag is not in that range, return
-        MPI_TAG_UB so it is safe to use with MPI functions.
-
- @param tag The input tag.
-
- @return A tag value that is safe to use with MPI.
- */
-int safe_tag(int tag)
-{
-    return (tag < MPI_TAG_UB) ? ((tag >= 0) ? tag : MPI_TAG_UB) : MPI_TAG_UB;
-}
-
 //---------------------------------------------------------------------------//
 /**
  @brief Some MPI installations install an error handler that causes functions
@@ -343,8 +313,8 @@ class HandleMPICommError
     }
 
     /// MPI calls this function to handle errors.
-    static void handler(MPI_Comm *comm,
-                        int *errcode,
+    static void handler(MPI_Comm * /*comm*/,
+                        int * /*errcode*/,
                         ...)
     {
 #if 0
@@ -357,7 +327,7 @@ class HandleMPICommError
         va_end(argp);
 #endif
 
-        std::cout << "handler: comm=" << *comm << ", errcode=" << *errcode << std::endl;
+        //std::cout << "handler: comm=" << *comm << ", errcode=" << *errcode << std::endl;
 
 #if 0
         // We could try emitting a Conduit error.
@@ -382,6 +352,179 @@ class HandleMPICommError
     MPI_Errhandler m_newHandler;
 };
 
+//---------------------------------------------------------------------------//
+/**
+ * @brief This class helps to determine MPI tag upper limits.
+ */
+class TagLimits
+{
+public:
+    /**
+     * @brief Return the tag upper limit.
+     *
+     * @param comm The MPI communicator.
+     *
+     * @return The tag upper limit.
+     *
+     * @note We probe to determine the value since query is not as reliable across MPI distributions.
+     */
+    static int upper_bound(MPI_Comm comm)
+    {
+        return probe(comm);
+    }
+
+private:
+    /**
+     * @brief Query MPI for the maximum tag value.
+     *
+     * @param comm The MPI communicator.
+     *
+     * @return The tag upper bound or -1 on error.
+     *
+     * @note This is how we are supposed to be able to ask for the max tag value.
+     *       However, this method does not seem reliable across MPIs and it is
+     *       possible for the query to return values that still do not work in
+     *       Isend/Irecv sometimes.
+     */
+    static int query(MPI_Comm comm)
+    {
+        bool ok = false;
+        int tag_ub = 0, flag = 0;
+        int mpi_error = MPI_Comm_get_attr(comm, MPI_TAG_UB, &tag_ub, &flag);
+        if(mpi_error == MPI_SUCCESS && flag != 0)
+        {
+            if(tag_ub > 0)
+            {
+                ok = true;
+            }
+        }
+        return ok ? tag_ub : -1;
+    }
+
+    /**
+     * @brief Probe MPI to determine the max tag value.
+     *
+     * @param comm The MPI communicator.
+     *
+     * @note MPI error handlers are installed that ignore problems, preventing the
+     *       program from dying if the default handler is set to abort on error. The
+     *       error handler is restored when leaving this function.
+     */
+    static int probe(MPI_Comm comm)
+    {
+        // Temporarily override MPI error handler with a more benign one.
+        HandleMPICommError err(comm);
+        int tag = probeTagUpperBound(0, std::numeric_limits<int>::max(), comm);
+        return tag;
+    }
+
+    /**
+     * @brief Probe a range of tag values to determine if the range is valid.
+     *
+     * @param low The low tag value
+     * @param high The high tag value
+     * @param comm The MPI communicator.
+     *
+     * @return The max tag value.
+     *
+     * @note The rank sends a message to itself using a tag value. The result of that
+     *       is used to narrow the range of tag values.
+     */
+    static int probeTagUpperBound(int low, int high, MPI_Comm comm)
+    {
+        int tag;
+        if((high - low) < 2)
+            tag = low;
+        else
+        {
+            int rank;
+            MPI_Comm_rank(comm, &rank);
+
+            tag = (low + high) / 2;
+
+            // Try sending with the current tag.
+            int srcBuff = 0;
+            MPI_Request requests[2];
+            int mpi_error = MPI_Isend(&srcBuff,
+                                      1,
+                                      MPI_INT,
+                                      rank,
+                                      tag,
+                                      comm,
+                                      &requests[0]);
+            if(mpi_error == MPI_SUCCESS)
+            {
+                // It worked.
+                // Issue the recv.
+                int destBuff = 0;
+                MPI_Irecv(&destBuff,
+                          1,
+                          MPI_INT,
+                          rank,
+                          tag,
+                          comm,
+                          &requests[1]);
+
+                MPI_Status statuses[2];
+                MPI_Waitall(2, requests, statuses);
+
+                tag = probeTagUpperBound(tag, high, comm);
+            }
+            else
+            {
+                tag = probeTagUpperBound(low, tag, comm);
+            }
+        }
+        return tag;
+    }
+};
+
+//---------------------------------------------------------------------------//
+/**
+ @brief Checks whether the input tag is invalid.
+
+ @param tag The MPI tag to check for validity.
+ @return True if the tag is wildly incorrect (negative).
+
+ @note Right now, this function only flags tags that are negative. Tags are
+       supposed to be less than or equal to MPI_TAG_UB too. This function does
+       not check for that - it's done by using safe_tag when tags are used.
+ */
+bool invalid_tag(int tag)
+{
+    return tag < 0;
+}
+
+//---------------------------------------------------------------------------//
+/**
+ @brief Return a tag value that is safe to use with MPI functions. The tag is
+        determined dynamically the first time the function is called. If the
+        input tag is greater than the max tag then the value is clamped.
+
+ @param tag The input tag.
+ @param comm The MPI communicator.
+
+ @return A tag value that is safe to use with MPI.
+ */
+int safe_tag(int tag, MPI_Comm comm)
+{
+    static constexpr int UPPER_BOUND_NOT_SET = -1;
+    static int tag_upper_bound = UPPER_BOUND_NOT_SET;
+    if(tag_upper_bound == UPPER_BOUND_NOT_SET)
+    {
+        // The first time through, determine the upper bound.
+        tag_upper_bound = TagLimits::upper_bound(comm);
+    }
+
+    int newtag = std::max(0, tag);
+    if(newtag > tag_upper_bound)
+    {
+        newtag = tag_upper_bound;
+    }
+
+    return newtag;
+}
+
 //---------------------------------------------------------------------------//
 int
 send_using_schema(const Node &node, int dest, int tag, MPI_Comm comm)
@@ -988,11 +1131,12 @@ isend(const Node &node,
                       "(" << std::numeric_limits<int>::max() << ")")
     }
 
+    const int newtag = safe_tag(tag, mpi_comm);
     int mpi_error = MPI_Isend(const_cast<void*>(data_ptr),
                               static_cast<int>(data_size),
                               MPI_BYTE,
                               dest,
-                              safe_tag(tag),
+                              newtag,
                               mpi_comm,
                               &(request->m_request));
 
@@ -1041,11 +1185,12 @@ irecv(Node &node,
                       "(" << std::numeric_limits<int>::max() << ")")
     }
 
+    const int newtag = safe_tag(tag, mpi_comm);
    int mpi_error = MPI_Irecv(data_ptr,
                              static_cast<int>(data_size),
                              MPI_BYTE,
                              src,
-                             safe_tag(tag),
+                             newtag,
                              mpi_comm,
                              &(request->m_request));
 
@@ -1886,14 +2031,47 @@ communicate_using_schema::add_irecv(Node &node, int src, int tag)
 //-----------------------------------------------------------------------------
 int
 communicate_using_schema::execute()
+{
+    // Uncomment this to install an MPI_Comm error handler.
+    // HandleMPICommError errHandler(comm);
+
+    // Get the currently installed warning and error handlers.
+    conduit::utils::conduit_warning_handler onWarning = conduit::utils::warning_handler();
+    conduit::utils::conduit_error_handler onError = conduit::utils::error_handler();
+
+    // Install the default exception-throwing handlers.
+    conduit::utils::set_warning_handler(conduit::utils::default_warning_handler);
+    conduit::utils::set_error_handler(conduit::utils::default_error_handler);
+
+    int retval = 0;
+    try
+    {
+        retval = execute_internal();
+
+        // Restore warning/error handlers.
+        conduit::utils::set_warning_handler(onWarning);
+        conduit::utils::set_error_handler(onError);
+    }
+    catch(conduit::Error &e)
+    {
+        // Restore warning/error handlers.
+        conduit::utils::set_warning_handler(onWarning);
+        conduit::utils::set_error_handler(onError);
+
+        // Rethrow the exception.
+        throw e;
+    }
+    return retval;
+}
+
+//-----------------------------------------------------------------------------
+int
+communicate_using_schema::execute_internal()
 {
     int mpi_error = 0;
     std::vector<MPI_Request> requests(operations.size());
     std::vector<MPI_Status>  statuses(operations.size());
 
-    // Uncomment this to install an MPI_Comm error handler.
-    // HandleMPICommError errHandler(comm);
-
     int rank, size;
     MPI_Comm_rank(comm, &rank);
     MPI_Comm_size(comm, &size);
@@ -1946,6 +2124,7 @@ communicate_using_schema::execute()
             // Send the serialized node data.
             index_t msg_data_size = operations[i].node[1]->total_bytes_compact();
+            const int newtag = safe_tag(operations[i].tag, comm);
             if(logging)
             {
                 log << "    MPI_Isend("
@@ -1953,7 +2132,7 @@ communicate_using_schema::execute()
                     << msg_data_size << ", "
                     << "MPI_BYTE, "
                     << operations[i].rank << ", "
-                    << safe_tag(operations[i].tag) << ", "
+                    << newtag << ", "
                    << "comm, &requests[" << i << "]);" << std::endl;
             }
@@ -1968,7 +2147,7 @@ communicate_using_schema::execute()
                                   static_cast<int>(msg_data_size),
                                   MPI_BYTE,
                                   operations[i].rank,
-                                  safe_tag(operations[i].tag),
+                                  newtag,
                                   comm,
                                   &requests[i]);
             CONDUIT_CHECK_MPI_ERROR(mpi_error);
@@ -1978,6 +2157,7 @@ communicate_using_schema::execute()
     if(logging)
     {
         log << "* Time issuing MPI_Isend calls: " << (t1-t0) << std::endl;
+        log.flush();
     }
 
     // Issue all the recvs.
@@ -1986,14 +2166,15 @@ communicate_using_schema::execute()
         if(operations[i].op == OP_RECV)
         {
             // Probe the message for its buffer size.
+            const int newtag = safe_tag(operations[i].tag, comm);
             if(logging)
             {
                 log << "    MPI_Probe("
                     << operations[i].rank << ", "
-                    << safe_tag(operations[i].tag) << ", "
+                    << newtag << ", "
                     << "comm, &statuses[" << i << "]);" << std::endl;
             }
-            mpi_error = MPI_Probe(operations[i].rank, safe_tag(operations[i].tag), comm, &statuses[i]);
+            mpi_error = MPI_Probe(operations[i].rank, newtag, comm, &statuses[i]);
             CONDUIT_CHECK_MPI_ERROR(mpi_error);
 
             int buffer_size = 0;
@@ -2015,7 +2196,7 @@ communicate_using_schema::execute()
                     << buffer_size << ", "
                     << "MPI_BYTE, "
                     << operations[i].rank << ", "
-                    << safe_tag(operations[i].tag) << ", "
+                    << newtag << ", "
                     << "comm, &requests[" << i << "]);" << std::endl;
             }
@@ -2024,7 +2205,7 @@ communicate_using_schema::execute()
                                   buffer_size,
                                   MPI_BYTE,
                                   operations[i].rank,
-                                  safe_tag(operations[i].tag),
+                                  newtag,
                                   comm,
                                   &requests[i]);
             CONDUIT_CHECK_MPI_ERROR(mpi_error);
@@ -2034,6 +2215,7 @@ communicate_using_schema::execute()
     if(logging)
     {
         log << "* Time issuing MPI_Irecv calls: " << (t2-t1) << std::endl;
+        log.flush();
     }
 
     // Wait for the requests to complete.
@@ -2041,6 +2223,7 @@ communicate_using_schema::execute()
     if(logging)
     {
         log << "    MPI_Waitall(" << n << ", &requests[0], &statuses[0]);" << std::endl;
+        log.flush();
     }
     mpi_error = MPI_Waitall(n, &requests[0], &statuses[0]);
     CONDUIT_CHECK_MPI_ERROR(mpi_error);
@@ -2048,6 +2231,7 @@ communicate_using_schema::execute()
     if(logging)
     {
         log << "* Time in MPI_Waitall: " << (t3-t2) << std::endl;
+        log.flush();
     }
 
     // Finish building the nodes for which we received data.
@@ -2082,6 +2266,7 @@ communicate_using_schema::execute()
             if(logging)
             {
                 log << "* Built output node " << i << std::endl;
+                log.flush();
             }
         }
     }
diff --git a/src/libs/relay/conduit_relay_mpi.hpp b/src/libs/relay/conduit_relay_mpi.hpp
index 284189d31..b1c676ec4 100644
--- a/src/libs/relay/conduit_relay_mpi.hpp
+++ b/src/libs/relay/conduit_relay_mpi.hpp
@@ -83,10 +83,11 @@ namespace mpi
             MPI_TAG_UB so it is safe to use with MPI functions.
 
     @param tag The input tag.
+    @param comm The MPI communicator.
 
     @return A tag value that is safe to use with MPI.
  */
-    int CONDUIT_RELAY_API safe_tag(int tag);
+    int CONDUIT_RELAY_API safe_tag(int tag, MPI_Comm comm);
 
     int CONDUIT_RELAY_API send(const Node &node,
                                int dest,
@@ -337,6 +338,13 @@ class CONDUIT_RELAY_API communicate_using_schema
 private:
     void clear();
 
+    /**
+     @brief Execute all the outstanding isend/irecv calls and reconstruct any
+            nodes after doing the data movement.
+     @return The return value from MPI_Waitall.
+     */
+    int execute_internal();
+
     static const int OP_SEND;
     static const int OP_RECV;
     struct operation
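
The sketch below is an editorial illustration, not part of the patch: it reproduces the tag-probing idea from `TagLimits` with plain MPI so it can be tried outside of Conduit. The helper names `probe_tag_upper_bound` and `clamp_tag` are hypothetical stand-ins for `TagLimits::probeTagUpperBound` and `safe_tag`, it installs `MPI_ERRORS_RETURN` directly instead of using the `HandleMPICommError` helper, and the midpoint computation is written to avoid integer overflow rather than copying the patch verbatim.

```cpp
// Illustrative sketch (assumed helper names, not Conduit code): probe the largest
// usable MPI tag by binary search with self sends, then clamp requested tags to it.
// Build: mpicxx probe_tag.cpp -o probe_tag && mpirun -np 1 ./probe_tag
#include <mpi.h>
#include <algorithm>
#include <iostream>
#include <limits>

// Binary-search [low, high) for the largest tag value that MPI accepts.
static int probe_tag_upper_bound(int low, int high, MPI_Comm comm)
{
    if(high - low < 2)
        return low;

    int rank;
    MPI_Comm_rank(comm, &rank);

    const int tag = low + (high - low) / 2; // midpoint without overflow

    int srcBuff = 0, destBuff = 0;
    MPI_Request requests[2];
    // Send a message to ourselves with the candidate tag; a failure means the tag is too large.
    int err = MPI_Isend(&srcBuff, 1, MPI_INT, rank, tag, comm, &requests[0]);
    if(err == MPI_SUCCESS)
    {
        MPI_Irecv(&destBuff, 1, MPI_INT, rank, tag, comm, &requests[1]);
        MPI_Waitall(2, requests, MPI_STATUSES_IGNORE);
        return probe_tag_upper_bound(tag, high, comm); // tag worked, search higher
    }
    return probe_tag_upper_bound(low, tag, comm);      // tag rejected, search lower
}

// Clamp a requested tag into [0, upper], mirroring what safe_tag does with the probed bound.
static int clamp_tag(int tag, int upper)
{
    return std::min(std::max(0, tag), upper);
}

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);

    // Report errors as return codes instead of aborting, so a failing Isend is survivable.
    MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);

    const int upper = probe_tag_upper_bound(0, std::numeric_limits<int>::max(),
                                            MPI_COMM_WORLD);
    std::cout << "probed tag upper bound: " << upper
              << ", tag 175000000 clamps to " << clamp_tag(175000000, upper)
              << std::endl;

    MPI_Finalize();
    return 0;
}
```

Run on a single rank, this should land on roughly the same upper bound that `TagLimits::upper_bound()` discovers, which is the value the patch clamps oversized tags (such as the old 175000000 constant) against.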