[y1r] Reading notes #4
FOR_SEND, FOR_RECV macros
// Unroll unconditionally the first send/recv since nsend/nrecv should be at
// least 1 if SEND/RECV is set.
#define FOR_SEND(func, ...) do { \
if (SEND) { \
/* Send to far first, then close */ \
for (int i=1; i<NSEND && i<nsend; i++) func(i, ##__VA_ARGS__); \
func(0, ##__VA_ARGS__); \
} \
} while (0)
#define FOR_RECV(func, ...) do { \
if (RECV) { \
/* Recv from close first, then far */ \
func(0, ##__VA_ARGS__); \
for (int i=1; i<NRECV && i<nrecv; i++) func(i, ##__VA_ARGS__); \
} \
} while (0)
From the earlier investigation, NSEND and NRECV correspond to the number of send/receive peers. Assuming the ring algorithm, neither for loop ever executes, so only the func(0, ...) call remains.
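To sanity-check that, here is a standalone sketch (plain C, not NCCL code; SEND, NSEND, nsend and the probe function are stand-ins for the template parameters and members of ncclPrimitives) showing that with NSEND == 1 the loop body vanishes and only the i == 0 call fires:

#include <stdio.h>

#define SEND 1
#define NSEND 1   // ring: exactly one send peer

#define FOR_SEND(func, ...) do { \
  if (SEND) { \
    /* zero iterations when NSEND == 1, since i starts at 1 */ \
    for (int i=1; i<NSEND && i<nsend; i++) func(i, ##__VA_ARGS__); \
    func(0, ##__VA_ARGS__); \
  } \
} while (0)

static void probe(int i, const char* what) { printf("%s(%d)\n", what, i); }

int main(void) {
  int nsend = 1;              // one peer in the ring
  FOR_SEND(probe, "send");    // prints only: send(0)
  return 0;
}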
I want to read GenericOp, but it is very hard to follow without knowing how the template gets instantiated, so first let's pin down the template arguments and the variables involved.
prims.send
__device__ __forceinline__ void
send(const T* src, int nelem) {
GenericOp<0, 0, 0, 1, 1, 0>(src, NULL, nelem, 0);
}
Template arguments:
- Constants
- Variables used in ncclAllReduceRingKernel
- Variables used in ncclPrimitives::GenericOp
Size ordering chunk > slice > step?
prims.send (continued)
The nelem passed in from the kernel is roughly a chunk-sized buffer, and it seems to be split into slices for communication, as in the sketch below.
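A minimal standalone sketch of that slicing idea (the element counts are made up; in the real GenericOp the per-slice work is interleaved with the wait/post calls shown later):

#include <stdio.h>

int main(void) {
  int nelem = 5000;        // assumed chunk-sized element count from the kernel
  int sliceSize = 2048;    // assumed elements per slice
  for (int offset = 0; offset < nelem; offset += sliceSize) {
    int n = (nelem - offset < sliceSize) ? (nelem - offset) : sliceSize;
    // in GenericOp: wait for FIFO space, copy/reduce n elements, post the slice
    printf("slice at offset %d: %d elements\n", offset, n);
  }
  return 0;
}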
directSend/RecvPtr functions
template <int DIRECTRECV>
inline __device__ const T* directRecvPtr(int i, int directOffset) {
return DIRECTRECV && recvDirectBuff[i] ? recvDirectBuff[i]+directOffset : recvPtr(i);
}
template <int DIRECTSEND>
inline __device__ T* directSendPtr(int i, int directOffset) {
return DIRECTSEND && sendDirectBuff[i] ? sendDirectBuff[i]+directOffset : sendPtr(i);
}
send/recvPtr functions
inline __device__ int recvOffset(int i) { return (recvStep[i]%NCCL_STEPS)*stepSize; }
inline __device__ int sendOffset(int i) { return (sendStep[i]%NCCL_STEPS)*stepSize; }
inline __device__ const T* recvPtr(int i) { return ((const T*)recvBuff[i])+recvOffset(i); }
inline __device__ T* sendPtr(int i) { return ((T*)sendBuff[i])+sendOffset(i); }
Apparently send/recvBuff hold the buffer base pointers.
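The modulo in recv/sendOffset means the step counter walks a fixed ring of NCCL_STEPS slots inside that buffer. A quick standalone illustration (stepSize is an assumed value):

#include <stdio.h>

#define NCCL_STEPS 8

int main(void) {
  int stepSize = 1024;   // assumed elements per step slot
  for (unsigned long long step = 0; step < 12; step++) {
    int offset = (int)(step % NCCL_STEPS) * stepSize;   // wraps every 8 steps
    printf("step %llu -> offset %d\n", step, offset);
  }
  return 0;
}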
Sorting out the communication-related structs
ncclComm struct
struct ncclComm {
struct ncclChannel channels[MAXCHANNELS];
struct ncclPeerInfo* peerInfo;
void* bootstrap;
int rank; // my rank in the communicator
int nRanks; // number of GPUs in communicator
int cudaDev; // my cuda device index
int nvmlDev; // my NVML device number
enum { GROUP, PARALLEL } launchMode;
cudaStream_t userStream;
bool userStreamSet;
cudaEvent_t doneEvent;
bool checkPointers;
// Counter to make sure collectives match (needed for bcast/reduce
// where syncs are not symmetric).
uint64_t opCount;
// Channels for collectives
int nChannels;
int nThreads;
// Low-latency algorithm threshold
ssize_t llThreshold;
ssize_t threadThreshold;
// Tree algorithm threshold
ssize_t treeThreshold;
// An internal CUDA stream for NCCL kernel CGMD launches
int groupCudaStream;
cudaStream_t groupStream;
// Whether there has been a fatal error in this communicator.
ncclResult_t fatalError;
// Error reported by GPU
volatile ncclDevError_t* fatalDevError;
// Flag to ask NCCL kernels to abort
volatile uint32_t *abortFlag;
// Device side of the communicator
struct ncclDevComm *devComm;
// Host copy of the devComm (to free CUDA allocs)
struct ncclDevComm hostDevComm;
// Intra-process sync
int intraRank;
int intraRanks;
int* intraBarrier;
int intraPhase;
// Storage for deferred intra-process launch
struct cudaLaunchParams * intraParams;
struct cudaLaunchParams *myParams;
int* intraCudaDevs;
int* intraCGMode; // Whether we can use CUDA9 CGMD or not
int* intraCC; // Only to check all have the same ComputeCap and disable CGMode if not
struct ncclColl args;
void* argsptr;
// Global proxy thread
pthread_t proxyThread;
struct ncclProxyState proxyState;
};
ncclChannel struct
struct ncclChannel* channel = comm->channels+blockIdx.x;
It gets used like this, so it is the bundle of data each CUDA block needs for the communication it is responsible for.
Definition:
struct ncclChannel {
union {
struct {
struct ncclRing ring;
struct ncclTree tree;
int id;
int nthreads;
int buffSize;
// Communication structures
struct ncclPeer* peers;
struct ncclPeer* devPeers;
// Operation list for aggregation
struct ncclColl* collectives;
struct ncclColl* devCollectives;
int collStart;
int collCount;
int collFifoHead; // Only used by GPU
int collFifoTail; // Only used by CPU
};
int data[0x80];
};
};
ncclPeer struct
Not much to it on its own.
struct ncclPeer {
struct ncclConnector send;
struct ncclConnector recv;
};
ncclConnector struct
It holds various things, but the only part GPU threads use is ncclConnInfo.
struct ncclConnector {
int connected;
struct ncclProxyArgs *proxyAppend;
struct ncclTransportComm* transportComm;
void* transportResources; // Host-side resources
struct ncclConnInfo conn;
struct ncclComm *comm;
};
ncclConnInfo struct
Still completely opaque to me at this point.
struct ncclConnInfo {
// Regular comm mechanism
char *buff; // Local for recv, remote for send
uint64_t *tail; // Local for recv, remote for send
uint64_t *head; // Local for send, remote for recv
uint64_t *opCountLoc; // opCount of local rank
uint64_t *opCountRem; // opCount of remote rank
int direct; // Direct communication
void **ptrExchange; // Pointer exchange for direct communication
int *fifo; // Size fifo for proxy
uint64_t step; // Keep where we are
// Low latency mechanism
union ncclLLFifoLine *llBuff; // Local for recv, remote for send
uint64_t llLastCleaning;
};
ncclProxyState struct
struct ncclProxyState {
pthread_cond_t cond;
pthread_mutex_t mutex;
bool stop;
struct ncclProxyArgs* ops;
struct ncclProxyArgs* pool;
struct ncclProxyPool* pools;
};
A struct stored directly inside ncclComm that manages all of the proxy state. The proxy thread itself is kept separately as a pthread_t. Take the mutex when touching it.
ncclSendMem / ncclRecvMem
"Actually larger than that": ncclRecvMem is allocated in a larger region, so buff simply continues past the end of the struct (see the sketch after these definitions).
struct ncclSendMem {
union {
struct {
uint64_t head;
char pad1[CACHE_LINE_SIZE-sizeof(uint64_t)];
void* ptrExchange;
char pad2[CACHE_LINE_SIZE-sizeof(void*)];
uint64_t opCount;
};
char pad3[MEM_ALIGN];
};
};
struct ncclRecvMem {
union {
struct {
uint64_t tail;
char pad1[CACHE_LINE_SIZE-sizeof(uint64_t)];
uint64_t opCount;
char pad2[CACHE_LINE_SIZE-sizeof(uint64_t)];
int sizesFifo[NCCL_STEPS];
};
char pad4[MEM_ALIGN];
};
ncclLLFifoLine llBuff[NCCL_LL_BUFF_LINES];
char buff[1]; // Actually larger than that
};
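The "Actually larger than that" comment is the classic over-allocation trick: the struct is allocated with extra bytes so buff[] extends past its declared size. A minimal standalone sketch (buffBytes is a placeholder, not NCCL's real buffer size):

#include <stddef.h>
#include <stdlib.h>
#include <string.h>

struct demoRecvMem {
  unsigned long long tail;
  char buff[1];              // actually larger than that
};

int main(void) {
  size_t buffBytes = 4096;   // placeholder size of the real communication buffer
  struct demoRecvMem* mem = malloc(offsetof(struct demoRecvMem, buff) + buffBytes);
  if (!mem) return 1;
  memset(mem->buff, 0, buffBytes);   // storage really does continue past buff[1]
  free(mem);
  return 0;
}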
Connection setup inside ncclPrimitives
for (int i=0; i<NRECV && recvPeers[i] >= 0; i++)
loadRecvConn(&channel->devPeers[recvPeers[i]].recv.conn, i, directBuff);
for (int i=0; i<NSEND && sendPeers[i] >= 0; i++)
loadSendConn(&channel->devPeers[sendPeers[i]].send.conn, i, directBuff);
Since the loops are bounded by NRECV and NSEND, sendPeers and recvPeers are the ranks of this process's send/receive peers (for a ring, only element 0 is used).
SHM
Before NET, let's understand SHM, which has the fewest lines of code.
ncclResult_t shmSendConnect(struct ncclConnect* connectInfo, struct ncclConnector* send) {
// Setup device pointers
struct shmConnectInfo* info = (struct shmConnectInfo*)connectInfo;
struct shmSendResources* resources = (struct shmSendResources*)send->transportResources;
char shmName[MAX_SHM_NAME_LEN];
sprintf(shmName, "nccl-shm-recv-%lx-%d-%d-%d", info->pidHash, info->id, info->sendRank, info->recvRank);
resources->remShmSize = info->shmSize;
TRACE(NCCL_SHM,"Open shmName %s shmSize %d", shmName, info->shmSize);
NCCLCHECK(shmOpen(shmName, resources->remShmSize, (void**)&resources->remHostMem, (void**)&resources->devRemHostMem, 0));
// Remove the file to ensure proper clean-up
NCCLCHECK(shmUnlink(shmName));
send->transportResources = resources;
send->conn.buff = resources->devRemHostMem->buff;
send->conn.llBuff = resources->devRemHostMem->llBuff;
send->conn.tail = &resources->devRemHostMem->tail;
send->conn.opCountRem = &resources->devRemHostMem->opCount;
send->conn.head = &resources->devHostMem->head;
send->conn.opCountLoc = &resources->devHostMem->opCount;
return ncclSuccess;
}
ncclResult_t shmRecvConnect(struct ncclConnect* connectInfo, struct ncclConnector* recv) {
// Setup device pointers
struct shmRecvResources* resources = (struct shmRecvResources*)recv->transportResources;
struct shmConnectInfo* info = (struct shmConnectInfo*)connectInfo;
char shmName[MAX_SHM_NAME_LEN];
sprintf(shmName, "nccl-shm-send-%lx-%d-%d-%d", info->pidHash, info->id, info->sendRank, info->recvRank);
resources->remShmSize = info->shmSize;
TRACE(NCCL_SHM,"Open shmName %s shmSize %d", shmName, info->shmSize);
NCCLCHECK(shmOpen(shmName, resources->remShmSize, (void**)&resources->remHostMem, (void**)&resources->devRemHostMem, 0));
NCCLCHECK(shmUnlink(shmName));
recv->conn.head = &resources->devRemHostMem->head;
recv->conn.opCountRem = &resources->devRemHostMem->opCount;
recv->conn.buff = resources->devHostMem->buff;
recv->conn.llBuff = resources->devHostMem->llBuff;
recv->conn.tail = &resources->devHostMem->tail;
recv->conn.opCountLoc = &resources->devHostMem->opCount;
return ncclSuccess;
}
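The shmOpen/shmUnlink pair follows the usual named-shared-memory pattern: open the segment the peer created, map it, then unlink the name right away so it is cleaned up once both sides are done. A standalone POSIX sketch of the same idea (this is not NCCL's shmOpen; the name and size are placeholders):

#include <fcntl.h>
#include <stdio.h>
#include <sys/mman.h>
#include <unistd.h>

int main(void) {
  const char* name = "/nccl-shm-demo";     // placeholder segment name
  size_t size = 1 << 20;                   // placeholder segment size

  int fd = shm_open(name, O_RDWR, 0600);   // assumes the peer already created it
  if (fd < 0) { perror("shm_open"); return 1; }

  void* ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  close(fd);
  if (ptr == MAP_FAILED) { perror("mmap"); return 1; }

  // Unlinking now keeps the mapping usable but guarantees the name (and the
  // backing memory, once unmapped everywhere) goes away even after a crash.
  shm_unlink(name);

  // ... exchange data through ptr ...
  munmap(ptr, size);
  return 0;
}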
postRecv/postSend
inline __device__ void postRecv(int i) {
*(recvConn[i]->head) = recvStep[i] += SLICESTEPS;
}
inline __device__ void postSend(int i) {
*(sendConn[i]->tail) = sendStep[i] += SLICESTEPS;
}
waitSend/waitRecv
inline __device__ void waitRecv(int i) {
spins = 0;
mismatch = 0;
recvStep[i] += SLICESTEPS;
if (tid == i) {
while (*(waitPtr) < recvStep[i]) {
if (checkAbort(recvConn[i]->opCountRem)) break;
}
}
}
inline __device__ void waitSend(int i) {
spins = 0;
mismatch = 0;
sendStep[i] += SLICESTEPS;
if (tid == WARP_SIZE+i) {
while (sendConnHead[i] + NCCL_STEPS < sendStep[i]) {
sendConnHead[i] = *waitPtr;
if (checkAbort(sendConn[i]->opCountRem)) break;
}
}
}
tail, head
In short: postSend advances tail and postRecv advances head. The flow control this implies is sketched below.
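A single-threaded standalone sketch (constants and the in-flight limit are inferred from the code above, not taken from NCCL directly): the sender may run at most NCCL_STEPS steps ahead of head, and the receiver only consumes steps the tail has already published.

#include <stdio.h>

#define NCCL_STEPS 8

int main(void) {
  unsigned long long head = 0, tail = 0;           // shared counters in real NCCL
  unsigned long long sendStep = 0, recvStep = 0;

  for (int iter = 0; iter < 32; iter++) {
    // waitSend: only proceed while fewer than NCCL_STEPS steps are in flight
    if (sendStep < head + NCCL_STEPS) {
      sendStep++;
      tail = sendStep;                             // postSend: publish the step
    }
    // waitRecv: only proceed once the sender has published this step
    if (recvStep < tail) {
      recvStep++;
      head = recvStep;                             // postRecv: free the slot
    }
  }
  printf("sent %llu steps, received %llu steps\n", sendStep, recvStep);
  return 0;
}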
Things to tackle next time: the proxy.
2019/09/01
Continuing from last time.
The flow by which a communication request gets queued onto ops
(As it turns out, persistentThread was already started at init time.) (Done.)
persistentThread
netRecvProxy
netRecvSetup
netRecvConnect
struct ncclRecvMem* recvMem = resources->useGdr ? resources->devRecvMem : resources->devHostRecvMem;
recv->conn.buff = recvMem->buff;
recv->conn.llBuff = recvMem->llBuff;
// Head/Tail/Opcount are always on host
recv->conn.tail = &resources->devHostRecvMem->tail;
recv->conn.opCountLoc = &resources->devHostRecvMem->opCount;
recv->conn.head = &resources->devHostSendMem->head;
recv->conn.opCountRem = &resources->devHostSendMem->opCount;
The tail/head used by the primitives are exactly what gets stored into conn.tail/conn.head here.
Summary
2019/10/06
Proceed as outlined above.
Setting up ncclInfo
struct ncclInfo {
/* below members are initialized in ncclAllReduce (top function) */
ncclColl_t coll;
const char* opName;
// NCCL Coll Args
const void* sendbuff;
void* recvbuff;
size_t count;
ncclDataType_t datatype;
ncclRedOp_t op;
int root;
ncclComm_t comm;
cudaStream_t stream;
int chunkSteps; // ALLREDUCE_CHUNKSTEPS (constant)
int sliceSteps; // ALLREDUCE_SLICESTEPS (constant)
// Computed later
// ncclEnqueueCheck/saveKernel/computeColl/getPatternInfo
ncclPattern_t pattern;
// ncclEnqueueCheck/ArgsCheck
size_t nBytes;
//ncclEnqueueCheck/saveKernel/computeColl/getLoopInfo
int nstepsPerLoop;
int nchunksPerLoop;
};
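Per the comment on the first block of members, the user-facing entry point fills them and hands the struct to ncclEnqueueCheck, which computes the rest. Roughly like this (a from-memory sketch, not verbatim NCCL source; check the actual ncclAllReduce for the exact initializer):

ncclResult_t ncclAllReduce(const void* sendbuff, void* recvbuff, size_t count,
    ncclDataType_t datatype, ncclRedOp_t op, ncclComm_t comm, cudaStream_t stream) {
  // Fill the "top function" members; everything after "Computed later" is
  // derived inside ncclEnqueueCheck/saveKernel/computeColl.
  struct ncclInfo info = { ncclCollAllReduce, "AllReduce",
    sendbuff, recvbuff, count, datatype, op, 0 /* root */, comm, stream,
    ALLREDUCE_CHUNKSTEPS, ALLREDUCE_SLICESTEPS };
  return ncclEnqueueCheck(&info);
}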
Concrete values
NCCL_STEPS: 8
// computeColl
int stepSize = ( llMode ? NCCL_LL_BUFF_SIZE : info->comm->channels[0].buffSize ) / NCCL_STEPS;
int chunkSteps = (llMode|treeMode) ? 1 : info->chunkSteps;
int sliceSteps = (llMode|treeMode) ? 1 : info->sliceSteps;
int chunkSize = stepSize*chunkSteps;
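Plugging in assumed numbers (a 4 MiB channel buffer and a non-LL ring allreduce with chunkSteps = 4 and sliceSteps = 2; these values are assumptions, not read from this code) gives the chunk > slice > step ordering asked about earlier:

#include <stdio.h>

#define NCCL_STEPS 8

int main(void) {
  int buffSize   = 4 * 1024 * 1024;         // assumed channel buffer size in bytes
  int stepSize   = buffSize / NCCL_STEPS;   // 524288
  int chunkSteps = 4, sliceSteps = 2;       // assumed allreduce constants
  int chunkSize  = stepSize * chunkSteps;   // 2097152
  int sliceSize  = stepSize * sliceSteps;   // 1048576
  printf("step=%d slice=%d chunk=%d bytes\n", stepSize, sliceSize, chunkSize);
  return 0;
}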
Work out what chunk/slice/step mean from how they are used.
To summarize:
UNROLL (src/collectives/device/common.h)
#define COLL_UNROLL 4 is what gets passed in as the UNROLL template argument.
(src/collectives/device/common_kernel.h)
// Try to limit consecutive load/stores to 8.
// Use UNROLL 8 when we have a single source and a single destination, 4 otherwise
#define AUTOUNROLL (UNROLL*(4/(MINDSTS+MINSRCS)))
As these comments say, the intent is to keep the unroll count (= the number of consecutive loads/stores) to at most 8. This UNROLL is then used in:
template<class FUNC, typename T, int UNROLL, int MINSRCS, int MAXSRCS, int MINDSTS, int MAXDSTS>
__device__ __forceinline__ void ReduceCopy128bMulti( const int w, const int nw, const int t,
int nsrcs, const T* s[MAXSRCS], int ndsts, T* d[MAXDSTS],
const int elemOffset, const int Npack) {
const int inc = nw * UNROLL * WARP_SIZE;
int offset = w * UNROLL * WARP_SIZE + t;
const Pack128* srcs[MAXSRCS];
for (int i=0; i<MAXSRCS; i++) srcs[i] = ((const Pack128*)(s[i]+elemOffset))+offset;
Pack128* dsts[MAXDSTS];
for (int i=0; i<MAXDSTS; i++) dsts[i] = ((Pack128*)(d[i]+elemOffset))+offset;
while (offset < Npack) {
Pack128 vals[UNROLL];
// Load and reduce
for (int u = 0; u < UNROLL; ++u) Fetch128(vals[u], srcs[0]+u*WARP_SIZE);
for (int i=1; i<MINSRCS; i++) {
Pack128 vals2[UNROLL];
// If MINSRCS=2, UNROLL is 4, so Fetch128 is effectively unrolled 8 times
// If MINSRCS=1, UNROLL is 8, so...
for (int u = 0; u < UNROLL; ++u) Fetch128(vals2[u], srcs[i]+u*WARP_SIZE);
for (int u = 0; u < UNROLL; ++u) MULTI128<FUNC, T>()(vals[u], vals2[u]);
}
#pragma unroll 1 // can this loop even be unrolled in the first place?
for (int i=MINSRCS; i<MAXSRCS && i<nsrcs; i++) {
Pack128 vals2[UNROLL];
for (int u = 0; u < UNROLL; ++u) Fetch128(vals2[u], srcs[i]+u*WARP_SIZE);
for (int u = 0; u < UNROLL; ++u) MULTI128<FUNC, T>()(vals[u], vals2[u]);
}
// Store
for (int i = 0; i < MINDSTS; i++) {
for (int u = 0; u < UNROLL; ++u) Store128(dsts[i]+u*WARP_SIZE, vals[u]);
}
#pragma unroll 1 // I suspect it cannot be unrolled anyway
for (int i=MINDSTS; i<MAXDSTS && i<ndsts; i++) {
for (int u = 0; u < UNROLL; ++u) Store128(dsts[i]+u*WARP_SIZE, vals[u]);
}
for (int i=0; i<MAXSRCS; i++) srcs[i] += inc;
for (int i=0; i<MAXDSTS; i++) dsts[i] += inc;
offset += inc;
}
}
The intuition
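A quick standalone check (parameter combinations are just examples, not NCCL code) of the AUTOUNROLL arithmetic quoted above: with COLL_UNROLL = 4, a single source and single destination gives 8, while any other combination stays at 4 thanks to integer division, so consecutive loads/stores stay capped at 8.

#include <stdio.h>

#define COLL_UNROLL 4
#define AUTOUNROLL(UNROLL, MINSRCS, MINDSTS) ((UNROLL)*(4/((MINDSTS)+(MINSRCS))))

int main(void) {
  int combos[][2] = { {1,1}, {1,2}, {2,1}, {2,2} };   // {MINSRCS, MINDSTS} examples
  for (int i = 0; i < 4; i++) {
    int s = combos[i][0], d = combos[i][1];
    printf("MINSRCS=%d MINDSTS=%d -> AUTOUNROLL=%d\n",
           s, d, AUTOUNROLL(COLL_UNROLL, s, d));
  }
  return 0;
}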
Summary of NCCL initialization
2019/08/03
Goal: understand how ncclPrimitives works. That is:
...roughly those points, I suppose?