-
Notifications
You must be signed in to change notification settings - Fork 0
翻訳記事: 例
NCCLのExamplesに登場する順に関数を追ってゆく。
対象バージョンはNCCL2.4.8
※ついでにExamplesの翻訳もしてゆく。 なお、公式の内容がびみょいので適宜修正・意訳を行う。
このセクションの例では、一つまたは複数の手法を組み合わせて、様々な環境でどのようにNCCLが利用するか全体像を提供していきます。
- 一つのスレッド/プロセスあたり複数のGPUを使う場合
- 複数のスレッドを使う場合
- 複数のプロセスを使う場合 (複数プロセスの例では、並列化環境としてMPIを用いていますが、どの複数プロセスシステムでも同様に動くはずです。)
なお、実際のコードでは、常にNCCLの関数の戻り値のチェックを行ってください。簡潔のため、今後の例のコードではエラーチェックを行っていません。
この例では、NCCLの初期化の一般的に使い方を示します。
特定のシングルプロセスの例では、ncclCommInitAll関数が利用できます。 ここでは、4つのデバイス向けのコミュニケータを作成する例を示します。 そのため、4つのコミュニケータオブジェクトが存在します。
ncclComm_t comms[4];
int devs[4] = { 0, 1, 2, 3 };
ncclCommInitAll(comms, 4, devs);
訳者注: 詳細はncclComm_t及びncclCommInitAllを参照
次に、それぞれcommmオブジェクトが提供されている、シングルスレッド・グループコール?・複数スレッドからのNCCLの集合操作の呼び出しができます。
そして、プログラムの最後に、すべてのコミュニケータを破棄します。
for (int i=0; i<4; i++)
ncclCommDestroy(comms[i]);
下記のコードは、シングルプロセスで複数デバイスを管理する例で、そのまま動作するものです。
#include <stdio.h>
#include "cuda_runtime.h"
#include "nccl.h"
#define CUDACHECK(cmd) do { \
cudaError_t e = cmd; \
if( e != cudaSuccess ) { \
printf("Failed: Cuda error %s:%d '%s'\n", \
__FILE__,__LINE__,cudaGetErrorString(e)); \
exit(EXIT_FAILURE); \
} \
} while(0)
#define NCCLCHECK(cmd) do { \
ncclResult_t r = cmd; \
if (r!= ncclSuccess) { \
printf("Failed, NCCL error %s:%d '%s'\n", \
__FILE__,__LINE__,ncclGetErrorString(r)); \
exit(EXIT_FAILURE); \
} \
} while(0)
int main(int argc, char* argv[])
{
ncclComm_t comms[4];
//managing 4 devices
int nDev = 4;
int size = 32*1024*1024;
int devs[4] = { 0, 1, 2, 3 };
//allocating and initializing device buffers
float** sendbuff = (float**)malloc(nDev * sizeof(float*));
float** recvbuff = (float**)malloc(nDev * sizeof(float*));
cudaStream_t* s = (cudaStream_t*)malloc(sizeof(cudaStream_t)*nDev);
for (int i = 0; i < nDev; ++i) {
CUDACHECK(cudaSetDevice(i));
CUDACHECK(cudaMalloc(sendbuff + i, size * sizeof(float)));
CUDACHECK(cudaMalloc(recvbuff + i, size * sizeof(float)));
CUDACHECK(cudaMemset(sendbuff[i], 1, size * sizeof(float)));
CUDACHECK(cudaMemset(recvbuff[i], 0, size * sizeof(float)));
CUDACHECK(cudaStreamCreate(s+i));
}
//initializing NCCL
NCCLCHECK(ncclCommInitAll(comms, nDev, devs));
//calling NCCL communication API. Group API is required when using
//multiple devices per thread
NCCLCHECK(ncclGroupStart());
for (int i = 0; i < nDev; ++i)
NCCLCHECK(ncclAllReduce((const void*)sendbuff[i], (void*)recvbuff[i], size, ncclFloat, ncclSum,
comms[i], s[i]));
NCCLCHECK(ncclGroupEnd());
//synchronizing on CUDA streams to wait for completion of NCCL operation
for (int i = 0; i < nDev; ++i) {
CUDACHECK(cudaSetDevice(i));
CUDACHECK(cudaStreamSynchronize(s[i]));
}
//free device buffers
for (int i = 0; i < nDev; ++i) {
CUDACHECK(cudaSetDevice(i));
CUDACHECK(cudaFree(sendbuff[i]));
CUDACHECK(cudaFree(recvbuff[i]));
}
//finalizing NCCL
for(int i = 0; i < nDev; ++i)
ncclCommDestroy(comms[i]);
printf("Success \n");
return 0;
}
一つのスレッドまたはプロセスが協調動作をする場合、ncclCommInitRank関数をコミュニケータを作成する集合操作として利用できます。各スレッドまたはプロセスはそれぞれのオブジェクトを受け取ります。
次のコードはMPIのコンテキストで、各MPIランクごとに一つのデバイスを用いた、コミュニケータを作成する例です。
まず、各プロセスでMPIの情報を取得します。
int myRank, nRanks;
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
MPI_Comm_size(MPI_COMM_WORLD, &nRanks);
次に各ランクでユニークIDを作成し、それらを他のすべてのランクに送信し、全員がIDを持っていることを確認します。
ncclUniqueId id;
if (myRank == 0) ncclGetUniqueId(&id);
MPI_Bcast(id, sizeof(id), MPI_BYTE, 0, 0, MPI_COMM_WORLD);
最後に、コミュニケータを作成します。
ncclComm_t comm;
ncclCommInitRank(&comm, nRanks, id, myRank);
この後、NCCLの集合操作を呼ぶことができます。
ncclAllReduce( ... , comm);
最後に、コミュニケータを破棄します。
ncclCommDestroy(comm);
下記のコードは、複数MPIプロセスで各デバイスにで1つデバイスが存在する例で、そのまま動作するものです。
#include <stdio.h>
#include "cuda_runtime.h"
#include "nccl.h"
#include "mpi.h"
#include <unistd.h>
#include <stdint.h>
#define MPICHECK(cmd) do { \
int e = cmd; \
if( e != MPI_SUCCESS ) { \
printf("Failed: MPI error %s:%d '%d'\n", \
__FILE__,__LINE__, e); \
exit(EXIT_FAILURE); \
} \
} while(0)
#define CUDACHECK(cmd) do { \
cudaError_t e = cmd; \
if( e != cudaSuccess ) { \
printf("Failed: Cuda error %s:%d '%s'\n", \
__FILE__,__LINE__,cudaGetErrorString(e)); \
exit(EXIT_FAILURE); \
} \
} while(0)
#define NCCLCHECK(cmd) do { \
ncclResult_t r = cmd; \
if (r!= ncclSuccess) { \
printf("Failed, NCCL error %s:%d '%s'\n", \
__FILE__,__LINE__,ncclGetErrorString(r)); \
exit(EXIT_FAILURE); \
} \
} while(0)
static uint64_t getHostHash(const char* string) {
// Based on DJB2, result = result * 33 + char
uint64_t result = 5381;
for (int c = 0; string[c] != '\0'; c++){
result = ((result << 5) + result) + string[c];
}
return result;
}
static void getHostName(char* hostname, int maxlen) {
gethostname(hostname, maxlen);
for (int i=0; i< maxlen; i++) {
if (hostname[i] == '.') {
hostname[i] = '\0';
return;
}
}
}
int main(int argc, char* argv[])
{
int size = 32*1024*1024;
int myRank, nRanks, localRank = 0;
//initializing MPI
MPICHECK(MPI_Init(&argc, &argv));
MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD, &myRank));
MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &nRanks));
//calculating localRank based on hostname which is used in selecting a GPU
uint64_t hostHashs[nRanks];
char hostname[1024];
getHostName(hostname, 1024);
hostHashs[myRank] = getHostHash(hostname);
MPICHECK(MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD));
for (int p=0; p<nRanks; p++) {
if (p == myRank) break;
if (hostHashs[p] == hostHashs[myRank]) localRank++;
}
ncclUniqueId id;
ncclComm_t comm;
float *sendbuff, *recvbuff;
cudaStream_t s;
//get NCCL unique ID at rank 0 and broadcast it to all others
if (myRank == 0) ncclGetUniqueId(&id);
MPICHECK(MPI_Bcast((void *)&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD));
//picking a GPU based on localRank, allocate device buffers
CUDACHECK(cudaSetDevice(localRank));
CUDACHECK(cudaMalloc(&sendbuff, size * sizeof(float)));
CUDACHECK(cudaMalloc(&recvbuff, size * sizeof(float)));
CUDACHECK(cudaStreamCreate(&s));
//initializing NCCL
NCCLCHECK(ncclCommInitRank(&comm, nRanks, id, myRank));
//communicating using NCCL
NCCLCHECK(ncclAllReduce((const void*)sendbuff, (void*)recvbuff, size, ncclFloat, ncclSum,
comm, s));
//completing NCCL operation by synchronizing on the CUDA stream
CUDACHECK(cudaStreamSynchronize(s));
//free device buffers
CUDACHECK(cudaFree(sendbuff));
CUDACHECK(cudaFree(recvbuff));
//finalizing NCCL
ncclCommDestroy(comm);
//finalizing MPI
MPICHECK(MPI_Finalize());
printf("[MPI Rank %d] Success \n", myRank);
return 0;
}
複数プロセス/スレッドと一プロセス/スレッドあたり複数のデバイスを組み合わせることも可能です。 このケースではグループセマンティクスを扱う必要があります。
次はMPI上で各ランク当たり複数デバイスを用いた例です。
まず、MPIプロセス情報を取得します。
int myRank, nRanks;
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
MPI_Comm_size(MPI_COMM_WORLD, &nRanks);
次に各ランクでユニークIDを作成し、それらを他のすべてのランクに送信し、全員がIDを持っていることを確認します。
ncclUniqueId id;
if (myRank == 0) ncclGetUniqueId(&id);
MPI_Bcast(id, sizeof(id), MPI_BYTE, 0, 0, MPI_COMM_WORLD);
その後、npgusコミュニケータオブジェクトを作成します。 npgusコミュニケータは、(mpiのノード数)*(1ノードのgpu数)の大きなグループの一部です。
ncclComm_t comms[ngpus];
ncclGroupStart();
for (int i=0; i<ngpus; i++) {
cudaSetDevice(devs[i]);
ncclCommInitRank(comms+i, ngpus*nRanks, id, myRank*ngpus+i);
}
ncclGroupEnd();
この後、それぞれcommmオブジェクトが提供されている、シングルスレッド・グループコール?・複数スレッドからのNCCLの集合操作の呼び出しができます。
最後に、すべてのコミュニケータオブジェクトを破棄します。
for (int i=0; i<ngpus; i++)
ncclCommDestroy(comms[i]);
下記のコードは、複数MPIプロセスで各デバイスにで複数デバイスが存在する例で、そのまま動作するものです。
#include <stdio.h>
#include "cuda_runtime.h"
#include "nccl.h"
#include "mpi.h"
#include <unistd.h>
#include <stdint.h>
#define MPICHECK(cmd) do { \
int e = cmd; \
if( e != MPI_SUCCESS ) { \
printf("Failed: MPI error %s:%d '%d'\n", \
__FILE__,__LINE__, e); \
exit(EXIT_FAILURE); \
} \
} while(0)
#define CUDACHECK(cmd) do { \
cudaError_t e = cmd; \
if( e != cudaSuccess ) { \
printf("Failed: Cuda error %s:%d '%s'\n", \
__FILE__,__LINE__,cudaGetErrorString(e)); \
exit(EXIT_FAILURE); \
} \
} while(0)
#define NCCLCHECK(cmd) do { \
ncclResult_t r = cmd; \
if (r!= ncclSuccess) { \
printf("Failed, NCCL error %s:%d '%s'\n", \
__FILE__,__LINE__,ncclGetErrorString(r)); \
exit(EXIT_FAILURE); \
} \
} while(0)
static uint64_t getHostHash(const char* string) {
// Based on DJB2, result = result * 33 + char
uint64_t result = 5381;
for (int c = 0; string[c] != '\0'; c++){
result = ((result << 5) + result) + string[c];
}
return result;
}
static void getHostName(char* hostname, int maxlen) {
gethostname(hostname, maxlen);
for (int i=0; i< maxlen; i++) {
if (hostname[i] == '.') {
hostname[i] = '\0';
return;
}
}
}
int main(int argc, char* argv[])
{
int size = 32*1024*1024;
int myRank, nRanks, localRank = 0;
//initializing MPI
MPICHECK(MPI_Init(&argc, &argv));
MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD, &myRank));
MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &nRanks));
//calculating localRank which is used in selecting a GPU
uint64_t hostHashs[nRanks];
char hostname[1024];
getHostName(hostname, 1024);
hostHashs[myRank] = getHostHash(hostname);
MPICHECK(MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD));
for (int p=0; p<nRanks; p++) {
if (p == myRank) break;
if (hostHashs[p] == hostHashs[myRank]) localRank++;
}
//each process is using two GPUs
int nDev = 2;
float** sendbuff = (float**)malloc(nDev * sizeof(float*));
float** recvbuff = (float**)malloc(nDev * sizeof(float*));
cudaStream_t* s = (cudaStream_t*)malloc(sizeof(cudaStream_t)*nDev);
//picking GPUs based on localRank
for (int i = 0; i < nDev; ++i) {
CUDACHECK(cudaSetDevice(localRank*nDev + i));
CUDACHECK(cudaMalloc(sendbuff + i, size * sizeof(float)));
CUDACHECK(cudaMalloc(recvbuff + i, size * sizeof(float)));
CUDACHECK(cudaMemset(sendbuff[i], 1, size * sizeof(float)));
CUDACHECK(cudaMemset(recvbuff[i], 0, size * sizeof(float)));
CUDACHECK(cudaStreamCreate(s+i));
}
ncclUniqueId id;
ncclComm_t comms[nDev];
//generating NCCL unique ID at one process and broadcasting it to all
if (myRank == 0) ncclGetUniqueId(&id);
MPICHECK(MPI_Bcast((void *)&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD));
//initializing NCCL, group API is required around ncclCommInitRank as it is
//called across multiple GPUs in each thread/process
NCCLCHECK(ncclGroupStart());
for (int i=0; i<nDev; i++) {
CUDACHECK(cudaSetDevice(localRank*nDev + i));
NCCLCHECK(ncclCommInitRank(comms+i, nRanks*nDev, id, myRank*nDev + i));
}
NCCLCHECK(ncclGroupEnd());
//calling NCCL communication API. Group API is required when using
//multiple devices per thread/process
NCCLCHECK(ncclGroupStart());
for (int i=0; i<nDev; i++)
NCCLCHECK(ncclAllReduce((const void*)sendbuff[i], (void*)recvbuff[i], size, ncclFloat, ncclSum,
comms[i], s[i]));
NCCLCHECK(ncclGroupEnd());
//synchronizing on CUDA stream to complete NCCL communication
for (int i=0; i<nDev; i++)
CUDACHECK(cudaStreamSynchronize(s[i]));
//freeing device memory
for (int i=0; i<nDev; i++) {
CUDACHECK(cudaFree(sendbuff[i]));
CUDACHECK(cudaFree(recvbuff[i]));
}
//finalizing NCCL
for (int i=0; i<nDev; i++) {
ncclCommDestroy(comms[i]);
}
//finalizing MPI
MPICHECK(MPI_Finalize());
printf("[MPI Rank %d] Success \n", myRank);
return 0;
}
次に示す例は、NCCLの集団通信の実行の一般的なパターンです。
もしも一つのプロセス/スレッドあたり、一つのデバイスが存在する場合、各スレッドは集団操作を自身のデバイスに対して呼ひます。 例としてAllReduceの場合を次に示します。
ncclAllReduce(sendbuff, recvbuff, count, datatype, op, comm, stream);
呼び出し後には、オペレーションはStreamにキューされています。そして、cudaStreamSynchronizeを介して、操作の完了を待つことができます。
cudaStreamSynchronize(stream);
実際に動作する例は、ソースコード2をご確認ください。
一つのスレッドに複数のデバイスが存在する場合は、複数のデバイスに対し、一回でオペレーションを起動するには、グループセマンティクスを利用する必要があります。
ncclGroupStart();
for (int i=0; i<ngpus; i++)
ncclAllReduce(sendbuffs[i], recvbuff[i], count, datatype, op, comms[i], streams[i]);
ncclGroupEnd();
ncclGroupEnd関数の完了後、すべてのオペレーションはStreamにキューされています。 そして、cudaStreamSynchronizeを介して、操作の完了を待つことができます。
for (int i=0; i<ngpus; i++)
cudaStreamSynchronize(streams[i]);
実際に動作する例は、ソースコード3をご確認ください。