MTMG multi node (#3932)
This PR extends the MTMG framework to support a multi-node configuration.

The multi-node configuration of MTMG assumes some externally available mechanism for communicating configuration parameters and the NCCL unique ID between the participating processes.  For testing purposes, rather than constructing a more complex communication mechanism, we assume a shared file system (NFS is sufficient) in which we can create a directory and read/write the configuration information.
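For reference, a minimal sketch of the kind of shared-file-system exchange described above, written in the style of the C++ code in this PR. The directory path, file name, polling loop, and rank-0 convention are illustrative assumptions, not the mechanism the test actually implements:

#include <nccl.h>

#include <cstdio>
#include <stdexcept>
#include <string>

// Hypothetical helper: rank 0 generates the NCCL unique id and publishes it as a
// file in a shared directory (e.g. on NFS); every other rank polls for the file
// and reads the id back.  The path and the busy-wait are illustrative only.
ncclUniqueId exchange_nccl_unique_id(int rank, std::string const& shared_dir)
{
  std::string const path = shared_dir + "/nccl_unique_id.bin";
  ncclUniqueId id{};

  if (rank == 0) {
    if (ncclGetUniqueId(&id) != ncclSuccess) throw std::runtime_error("ncclGetUniqueId failed");
    std::string const tmp = path + ".tmp";
    std::FILE* f = std::fopen(tmp.c_str(), "wb");
    if (f == nullptr) throw std::runtime_error("cannot create " + tmp);
    if (std::fwrite(&id, sizeof(id), 1, f) != 1) throw std::runtime_error("cannot write " + tmp);
    std::fclose(f);
    std::rename(tmp.c_str(), path.c_str());  // publish only after the write completes
  } else {
    std::FILE* f = nullptr;
    while ((f = std::fopen(path.c_str(), "rb")) == nullptr) { /* wait for rank 0 */ }
    if (std::fread(&id, sizeof(id), 1, f) != 1) throw std::runtime_error("cannot read " + path);
    std::fclose(f);
  }
  return id;
}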

Closes rapidsai/graph_dl#329

Authors:
  - Chuck Hastings (https://github.com/ChuckHastings)
  - Ralph Liu (https://github.com/nv-rliu)

Approvers:
  - Joseph Nke (https://github.com/jnke2016)
  - Seunghwa Kang (https://github.com/seunghwak)

URL: #3932
ChuckHastings authored Nov 6, 2023
1 parent 2bdb735 commit 586451d
Showing 5 changed files with 626 additions and 59 deletions.
8 changes: 0 additions & 8 deletions cpp/include/cugraph/mtmg/handle.hpp
@@ -111,14 +111,6 @@ class handle_t {
*/
int get_size() const { return raft_handle_.get_comms().get_size(); }

/**
* @brief Get number of local gpus
*
* @return number of local gpus
*/
// FIXME: wrong for multi-node
int get_local_size() const { return raft_handle_.get_comms().get_size(); }

/**
* @brief Get gpu rank
*
27 changes: 20 additions & 7 deletions cpp/include/cugraph/mtmg/instance_manager.hpp
@@ -18,7 +18,7 @@

#include <cugraph/mtmg/handle.hpp>

#include <nccl.h>
#include <raft/comms/std_comms.hpp>

#include <vector>

@@ -37,16 +37,27 @@ class instance_manager_t {
*/
instance_manager_t(std::vector<std::unique_ptr<raft::handle_t>>&& handles,
std::vector<std::unique_ptr<ncclComm_t>>&& nccl_comms,
std::vector<rmm::cuda_device_id>&& device_ids,
int local_gpu_count)
std::vector<rmm::cuda_device_id>&& device_ids)
: thread_counter_{0},
raft_handle_{std::move(handles)},
nccl_comms_{std::move(nccl_comms)},
device_ids_{std::move(device_ids)},
local_gpu_count_{local_gpu_count}
device_ids_{std::move(device_ids)}
{
}

~instance_manager_t()
{
int current_device{};
RAFT_CUDA_TRY(cudaGetDevice(&current_device));

for (size_t i = 0; i < nccl_comms_.size(); ++i) {
RAFT_CUDA_TRY(cudaSetDevice(device_ids_[i].value()));
RAFT_NCCL_TRY(ncclCommDestroy(*nccl_comms_[i]));
}

RAFT_CUDA_TRY(cudaSetDevice(current_device));
}

/**
* @brief Get handle
*
@@ -79,17 +90,19 @@
/**
* @brief Number of local GPUs in the instance
*/
int get_local_gpu_count() { return local_gpu_count_; }
int get_local_gpu_count() { return static_cast<int>(raft_handle_.size()); }

private:
// FIXME: Should this be an std::map<> where the key is the rank?
// On a multi-node system we might have nodes with fewer
// (or no) GPUs, so mapping rank to a handle might be a challenge
//
std::vector<std::unique_ptr<raft::handle_t>> raft_handle_{};

// FIXME: Explore what RAFT changes might be desired to allow the ncclComm_t
// to be managed by RAFT instead of cugraph::mtmg
std::vector<std::unique_ptr<ncclComm_t>> nccl_comms_{};
std::vector<rmm::cuda_device_id> device_ids_{};
int local_gpu_count_{};

std::atomic<int> thread_counter_{0};
};
124 changes: 80 additions & 44 deletions cpp/include/cugraph/mtmg/resource_manager.hpp
@@ -41,14 +41,19 @@ namespace mtmg {
* register_local_gpu (or register_remote_gpu once we support a multi-node
* configuration) to allocate resources that can be used in the mtmg space.
*
* Each GPU in the cluster should be given a unique global rank, an integer
* that will be used to reference the GPU within the resource manager. It
* is recommended that the GPUs be numbered sequentially from 0, although this
* is not required.
*
* When we want to execute some graph computations, we need to create an instance for execution.
* Based on how large a subset of the managed compute resources is desired, we can allocate some
* number of GPUs to the problem (up to the total set of managed resources).
*
* The returned instance can be used to create a graph, execute one or more algorithms, etc. Once
* we are done the caller can delete the instance.
*
* At the moment, the caller is assumed to be responsible for scheduling use of the resources.
* The caller is assumed to be responsible for scheduling use of the resources.
*
* For our first release, we will only consider a single node multi-GPU configuration, so the remote
* GPU methods are currently disabled via ifdef.
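As an aside, a rough usage sketch of the flow this comment describes, using the interfaces touched in this diff (register_local_gpu, register_remote_gpu, create_instance_manager, registered_ranks). The two-node/eight-GPU layout, the node_id bookkeeping, and the exchange_nccl_unique_id helper are assumptions for illustration, not part of this PR:

#include <cugraph/mtmg/resource_manager.hpp>

#include <nccl.h>

// Hypothetical setup for two nodes with four GPUs each: global ranks 0-3 live
// on node 0 and ranks 4-7 on node 1.  Every process registers every rank, but
// only the ranks on its own node are registered as local.
void run_on_node(int node_id, ncclUniqueId instance_manager_id)
{
  cugraph::mtmg::resource_manager_t resource_manager;

  for (int global_rank = 0; global_rank < 8; ++global_rank) {
    if (global_rank / 4 == node_id) {
      resource_manager.register_local_gpu(global_rank, rmm::cuda_device_id{global_rank % 4});
    } else {
      resource_manager.register_remote_gpu(global_rank);
    }
  }

  // All processes pass the same rank list and the same NCCL unique id; each
  // resulting instance manager drives only the GPUs that are local to it.
  auto instance = resource_manager.create_instance_manager(resource_manager.registered_ranks(),
                                                           instance_manager_id);

  // ... create a graph, run algorithms, etc. through `instance`; destroying it
  // tears down the per-GPU NCCL communicators.
}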
@@ -63,49 +68,72 @@ class resource_manager_t {
/**
* @brief add a local GPU to the resource manager.
*
* @param rank The rank to assign to the local GPU
* @param device_id The device_id corresponding to this rank
* @param global_rank The global rank to assign to the local GPU
* @param local_device_id The local device_id corresponding to this rank
*/
void register_local_gpu(int rank, rmm::cuda_device_id device_id)
void register_local_gpu(int global_rank, rmm::cuda_device_id local_device_id)
{
std::lock_guard<std::mutex> lock(lock_);

CUGRAPH_EXPECTS(local_rank_map_.find(rank) == local_rank_map_.end(),
"cannot register same rank multiple times");
CUGRAPH_EXPECTS(remote_rank_set_.find(global_rank) == remote_rank_set_.end(),
"cannot register same global_rank as local and remote");
CUGRAPH_EXPECTS(local_rank_map_.find(global_rank) == local_rank_map_.end(),
"cannot register same global_rank multiple times");

int num_gpus_this_node;
RAFT_CUDA_TRY(cudaGetDeviceCount(&num_gpus_this_node));

CUGRAPH_EXPECTS((device_id.value() >= 0) && (device_id.value() < num_gpus_this_node),
"device id out of range");
CUGRAPH_EXPECTS(
(local_device_id.value() >= 0) && (local_device_id.value() < num_gpus_this_node),
"local device id out of range");

local_rank_map_.insert(std::pair(rank, device_id));
local_rank_map_.insert(std::pair(global_rank, local_device_id));

RAFT_CUDA_TRY(cudaSetDevice(device_id.value()));
RAFT_CUDA_TRY(cudaSetDevice(local_device_id.value()));

// FIXME: There is a bug in the cuda_memory_resource that results in a Hang.
// using the pool resource as a work-around.
//
// There is a deprecated environment variable: NCCL_LAUNCH_MODE=GROUP
// which should temporarily work around this problem.
//
// Further NOTE: multi-node requires the NCCL_LAUNCH_MODE=GROUP feature
// to be enabled even with the pool memory resource.
//
// Ultimately there should be some RMM parameters passed into this function
// (or the constructor of the object) to configure this behavior
#if 0
auto per_device_it = per_device_rmm_resources_.insert(
std::pair{rank, std::make_shared<rmm::mr::cuda_memory_resource>()});
std::pair{global_rank, std::make_shared<rmm::mr::cuda_memory_resource>()});
#else
auto const [free, total] = rmm::detail::available_device_memory();
auto const min_alloc =
rmm::detail::align_down(std::min(free, total / 6), rmm::detail::CUDA_ALLOCATION_ALIGNMENT);

auto per_device_it = per_device_rmm_resources_.insert(
std::pair{rank,
std::pair{global_rank,
rmm::mr::make_owning_wrapper<rmm::mr::pool_memory_resource>(
std::make_shared<rmm::mr::cuda_memory_resource>(), min_alloc)});
#endif

rmm::mr::set_per_device_resource(device_id, per_device_it.first->second.get());
rmm::mr::set_per_device_resource(local_device_id, per_device_it.first->second.get());
}

/**
* @brief add a remote GPU to the resource manager.
*
* @param global_rank The global rank to assign to the remote GPU
*/
void register_remote_gpu(int global_rank)
{
std::lock_guard<std::mutex> lock(lock_);

CUGRAPH_EXPECTS(local_rank_map_.find(global_rank) == local_rank_map_.end(),
"cannot register same global_rank as local and remote");
CUGRAPH_EXPECTS(remote_rank_set_.find(global_rank) == remote_rank_set_.end(),
"cannot register same global_rank multiple times");

remote_rank_set_.insert(global_rank);
}

/**
@@ -130,29 +158,36 @@
ncclUniqueId instance_manager_id,
size_t n_streams = 16) const
{
std::for_each(
ranks_to_include.begin(), ranks_to_include.end(), [local_ranks = local_rank_map_](int rank) {
CUGRAPH_EXPECTS(local_ranks.find(rank) != local_ranks.end(),
"requesting inclusion of an invalid rank");
});
std::vector<int> local_ranks_to_include;

std::copy_if(ranks_to_include.begin(),
ranks_to_include.end(),
std::back_inserter(local_ranks_to_include),
[&local_ranks = local_rank_map_](int rank) {
return (local_ranks.find(rank) != local_ranks.end());
});

// FIXME: Explore what RAFT changes might be desired to allow the ncclComm_t
// to be managed by RAFT instead of cugraph::mtmg
std::vector<std::unique_ptr<ncclComm_t>> nccl_comms{};
std::vector<std::unique_ptr<raft::handle_t>> handles{};
std::vector<rmm::cuda_device_id> device_ids{};

nccl_comms.reserve(ranks_to_include.size());
handles.reserve(ranks_to_include.size());
device_ids.reserve(ranks_to_include.size());
nccl_comms.reserve(local_ranks_to_include.size());
handles.reserve(local_ranks_to_include.size());
device_ids.reserve(local_ranks_to_include.size());

// FIXME: not quite right for multi-node
auto gpu_row_comm_size = static_cast<int>(sqrt(static_cast<double>(ranks_to_include.size())));
while (ranks_to_include.size() % gpu_row_comm_size != 0) {
--gpu_row_comm_size;
}

// FIXME: not quite right for multi-node
for (size_t i = 0; i < ranks_to_include.size(); ++i) {
int rank = ranks_to_include[i];
int current_device{};
RAFT_CUDA_TRY(cudaGetDevice(&current_device));
RAFT_NCCL_TRY(ncclGroupStart());

for (size_t i = 0; i < local_ranks_to_include.size(); ++i) {
int rank = local_ranks_to_include[i];
auto pos = local_rank_map_.find(rank);
RAFT_CUDA_TRY(cudaSetDevice(pos->second.value()));

Expand All @@ -162,36 +197,37 @@ class resource_manager_t {
std::make_shared<rmm::cuda_stream_pool>(n_streams),
per_device_rmm_resources_.find(rank)->second));
device_ids.push_back(pos->second);

RAFT_NCCL_TRY(
ncclCommInitRank(nccl_comms[i].get(), ranks_to_include.size(), instance_manager_id, rank));
raft::comms::build_comms_nccl_only(
handles[i].get(), *nccl_comms[i], ranks_to_include.size(), rank);
}
RAFT_NCCL_TRY(ncclGroupEnd());
RAFT_CUDA_TRY(cudaSetDevice(current_device));

std::vector<std::thread> running_threads;

for (size_t i = 0; i < ranks_to_include.size(); ++i) {
for (size_t i = 0; i < local_ranks_to_include.size(); ++i) {
running_threads.emplace_back([instance_manager_id,
idx = i,
gpu_row_comm_size,
comm_size = ranks_to_include.size(),
&ranks_to_include,
&local_rank_map = local_rank_map_,
&local_ranks_to_include,
&device_ids,
&nccl_comms,
&handles]() {
int rank = ranks_to_include[idx];
auto pos = local_rank_map.find(rank);
RAFT_CUDA_TRY(cudaSetDevice(pos->second.value()));

NCCL_TRY(ncclCommInitRank(nccl_comms[idx].get(), comm_size, instance_manager_id, rank));

raft::comms::build_comms_nccl_only(handles[idx].get(), *nccl_comms[idx], comm_size, rank);
int rank = local_ranks_to_include[idx];
RAFT_CUDA_TRY(cudaSetDevice(device_ids[idx].value()));

cugraph::partition_manager::init_subcomm(*handles[idx], gpu_row_comm_size);
});
}

std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); });

// FIXME: Update for multi-node
return std::make_unique<instance_manager_t>(
std::move(handles), std::move(nccl_comms), std::move(device_ids), ranks_to_include.size());
std::move(handles), std::move(nccl_comms), std::move(device_ids));
}
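One small note on the create_instance_manager hunk above: gpu_row_comm_size is chosen as the largest divisor of the number of included ranks that does not exceed its square root, which fixes the shape of the 2D GPU grid handed to init_subcomm. A standalone sketch of that selection (the function name is illustrative):

#include <cmath>

// Largest divisor of num_ranks that is <= sqrt(num_ranks); the column count is
// then num_ranks / row.  For example: 8 ranks -> 2 x 4, 12 ranks -> 3 x 4,
// 7 ranks -> 1 x 7.
int pick_gpu_row_comm_size(int num_ranks)
{
  int row = static_cast<int>(std::sqrt(static_cast<double>(num_ranks)));
  while (num_ranks % row != 0) {
    --row;
  }
  return row;
}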

/**
@@ -203,24 +239,24 @@
{
std::lock_guard<std::mutex> lock(lock_);

//
// C++20 mechanism:
// return std::vector<int>{ std::views::keys(local_rank_map_).begin(),
// std::views::keys(local_rank_map_).end() };
// Would need something a bit more complicated to also handle remote_rank_set_
//
std::vector<int> registered_ranks(local_rank_map_.size());
std::vector<int> registered_ranks(local_rank_map_.size() + remote_rank_set_.size());
std::transform(
local_rank_map_.begin(), local_rank_map_.end(), registered_ranks.begin(), [](auto pair) {
return pair.first;
});

std::copy(remote_rank_set_.begin(),
remote_rank_set_.end(),
registered_ranks.begin() + local_rank_map_.size());

std::sort(registered_ranks.begin(), registered_ranks.end());
return registered_ranks;
}

private:
mutable std::mutex lock_{};
std::map<int, rmm::cuda_device_id> local_rank_map_{};
std::set<int> remote_rank_set_{};
std::map<int, std::shared_ptr<rmm::mr::device_memory_resource>> per_device_rmm_resources_{};
};

10 changes: 10 additions & 0 deletions cpp/tests/CMakeLists.txt
@@ -680,6 +680,16 @@ if(BUILD_CUGRAPH_MG_TESTS)
ConfigureCTestMG(MG_CAPI_TWO_HOP_NEIGHBORS_TEST c_api/mg_two_hop_neighbors_test.c)

rapids_test_install_relocatable(INSTALL_COMPONENT_SET testing_mg DESTINATION bin/gtests/libcugraph_mg)

###############################################################################################
# - Multi-node MTMG tests ---------------------------------------------------------------------
ConfigureTest(MTMG_MULTINODE_TEST mtmg/multi_node_threaded_test.cu utilities/mg_utilities.cpp)
target_link_libraries(MTMG_MULTINODE_TEST
PRIVATE
cugraphmgtestutil
UCP::UCP
)

endif()

###################################################################################################
(Diff for the remaining changed file — presumably the new cpp/tests/mtmg/multi_node_threaded_test.cu test — did not load and is not shown.)
