Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve neighbor allreduce #78

Merged
merged 40 commits into from
Apr 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
61b472c
Fixed the self_weight under emtpy receiving case
BichengYing Feb 8, 2021
7e8ea66
Enable empty send neighbors and fix HalfTensor for recv_size==0
Feb 11, 2021
1d43096
Fixed the self_weight under emtpy receiving case
BichengYing Feb 8, 2021
c082ce9
Enable empty send neighbors and fix HalfTensor for recv_size==0
Feb 11, 2021
50933fb
Merge branch 'improve_neighbor_allreduce' of https://github.com/ybc19…
Feb 19, 2021
a3365b3
Rename neighbor_weights to src_weights, and send_neighbors to dst_wei…
Feb 19, 2021
cf17875
A script to test existing examples
Feb 19, 2021
923df09
Accept dst_weights as Dict, and reorganize DoNeighborAllreduce
Feb 19, 2021
42d0af7
Reorganize CheckNeighborSendRecvPattern
Feb 19, 2021
3184e17
Fix timeline_ptr for NCCL
Feb 19, 2021
c85cbd5
Fix timeline_ptr for NCCL
Feb 19, 2021
8d9fe31
Merge branch 'improve_neighbor_allreduce' of https://github.com/ybc19…
Feb 19, 2021
7d60585
Put dst_weights information into TensorTableEntry
Feb 19, 2021
79c57b3
First Version of neighbor_allreduce dst_weight, existing problem: Fus…
Feb 27, 2021
29a239c
Add some delay after data_weight as a temporary solution
Feb 27, 2021
97517b3
CPU Fusion for dst_weighted added
Feb 27, 2021
364f5fd
Merge branch 'master' into improve_neighbor_allreduce
Feb 28, 2021
cc9faf4
Add ReadyEvent for dst_weight for single entry neighbor_allreduce
Mar 4, 2021
7f47ce4
Remove const identifier for tensor dtype as it is meaningless
Mar 4, 2021
d84138b
Add cuda source for scalebuffer
Mar 4, 2021
06c375d
Scale buffer to modify itself
Mar 4, 2021
e4db92f
Add .o file to .gitignore
Mar 5, 2021
ec2a3fc
dst_weight using CUDA for fused entry & compile flow in Python setup.py
Mar 5, 2021
adaa191
make clean *.o files generated by nvcc
Mar 5, 2021
46e6431
Add fix for NCCL single entry
Mar 15, 2021
ae57e75
Make setup.py more robust
Mar 15, 2021
62dc99d
Add timeout and cuda check
Mar 17, 2021
a836b02
Move test example
Mar 17, 2021
e02a3d0
Fix NCCL side dst_weight fusion bug
Mar 18, 2021
8e42ca2
Add agg to make matplotlib more stable
Mar 18, 2021
e5f8722
Address comments for setup.py
Mar 21, 2021
9f2f55d
Simpler logic for dst_weighting_enabled and weighted_average_computation
Mar 21, 2021
d7a9310
Better consideration for weight buffer size
Mar 21, 2021
2fe9621
Merge branch 'master' into improve_neighbor_allreduce
Mar 26, 2021
562b231
Make src_weights as std::map, and simplify logic for PerformNeighborA…
Mar 26, 2021
6764efa
Add TODO #80 and #81, and simplify the logic for dst_weight
Mar 26, 2021
1e8db23
Wrap CheckNeighborSendRecvPattern again
Mar 26, 2021
3d90087
Add two more TODOs
Mar 26, 2021
96887b5
Address review comments
Mar 27, 2021
36c073a
Add condition variable to control the loop (#88)
BichengYing Apr 11, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ __pycache__/

# C extensions
*.so
*.o

# Distribution / packaging
.Python
Expand Down
10 changes: 7 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ test_torch: test_torch_basic test_torch_ops test_torch_win_ops test_torch_optimi
test_tensorflow: test_tensorflow_basic test_tensorflow_ops
test_all: test_torch test_tensorflow

clean: clean_build clean_so
clean: clean_build clean_so clean_o

.PHONY: test_torch_basic
test_torch_basic:
Expand Down Expand Up @@ -51,8 +51,12 @@ test_tensorflow_ops:

.PHONY: clean_build
clean_build:
rm -R build
rm -fR build

.PHONY: clean_so
clean_so:
rm ./bluefog/torch/mpi_lib.*.so
rm -f ./bluefog/torch/mpi_lib.*.so

.PHONY: clean_o
clean_o:
rm -f ./bluefog/common/cuda/*.o
9 changes: 7 additions & 2 deletions bluefog/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ class TensorShape {

class Tensor {
public:
virtual const DataType dtype() const = 0;
virtual DataType dtype() const = 0;
hanbinhu marked this conversation as resolved.
Show resolved Hide resolved
virtual const TensorShape shape() const = 0;
virtual const void* data() const = 0;
virtual std::shared_ptr<common::Tensor> data_weight(float weight) = 0;
virtual std::unique_ptr<common::Tensor> data_weight(float weight) = 0;
virtual int64_t size() const = 0;
virtual ~Tensor() = default;
};
Expand Down Expand Up @@ -241,6 +241,7 @@ class OpContext {
std::shared_ptr<Tensor>* tensor) = 0;
virtual Status AllocateZeros(int64_t num_elements, DataType dtype,
std::shared_ptr<Tensor>* tensor) = 0;
virtual std::shared_ptr<ReadyEvent> RecordReadyEvent(int device) = 0;
virtual Framework framework() const = 0;
virtual ~OpContext() = default;
};
Expand Down Expand Up @@ -279,10 +280,14 @@ struct TensorTableEntry {
// Neighbors for dynamic neighbor_allreduce.
std::shared_ptr<std::vector<int>> send_neighbors;
std::shared_ptr<std::vector<int>> recv_neighbors;
std::shared_ptr<std::vector<double>> send_weights;

// Boolean value if dynamic neighbor is enabled.
bool dynamic_neighbors_enabled = false;

// Boolean value for enabling destination(send) weighting operation or not.
bool dst_weighting_enabled = false;

// Boolean value for enabling topology check.
bool enable_topo_check = false;

Expand Down
120 changes: 120 additions & 0 deletions bluefog/common/cuda/cuda_kernels.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (C) 2020 NVIDIA CORPORATION. All rights reserved.
hanbinhu marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#include "cuda_kernels.h"

#include <stdexcept>
#include <cuda_fp16.h>

namespace bluefog {
namespace common {

template<typename T, typename TS>
__global__ void scale_buffer_k(T* buffer, int64_t num_elements, const TS scale_factor) {

const size_t idx = static_cast<size_t>(blockDim.x) * blockIdx.x + threadIdx.x;

for (size_t i = idx; i < num_elements; i += gridDim.x * blockDim.x) {
buffer[i] *= scale_factor;
}
}

// Specialization for half2
__global__ void scale_buffer_half2_k(__half* buffer, int64_t num_elements, const __half scale_factor) {

const size_t idx = static_cast<size_t>(blockDim.x) * blockIdx.x + threadIdx.x;

#if __CUDA_ARCH__ > 530
__half2* buffer_h2 = reinterpret_cast<__half2 *>(buffer);
const __half2 scale_factor_h2 = __halves2half2(scale_factor, scale_factor);

for (size_t i = idx; i < num_elements / 2; i += gridDim.x * blockDim.x) {
buffer_h2[i] = __hmul2(scale_factor_h2, buffer_h2[i]);
}

// Deal with last element if num_elements is odd
if (idx == 0 && num_elements % 2) {
buffer[num_elements - 1] = __hmul(scale_factor, buffer[num_elements - 1]);
}
#else
for (size_t i = idx; i < num_elements; i += gridDim.x * blockDim.x) {
buffer[i] = __float2half(__half2float(scale_factor) * __half2float(buffer[i]));
}
#endif
}

// Specialization for architectures without __half compute
template<>
__global__ void scale_buffer_k(__half* buffer, int64_t num_elements, const __half scale_factor) {

const size_t idx = static_cast<size_t>(blockDim.x) * blockIdx.x + threadIdx.x;

#if __CUDA_ARCH__ > 530
for (size_t i = idx; i < num_elements; i += gridDim.x * blockDim.x) {
buffer[i] *= scale_factor;
}
#else
for (size_t i = idx; i < num_elements; i += gridDim.x * blockDim.x) {
buffer[i] = __float2half(__half2float(scale_factor) * __half2float(buffer[i]));
}
#endif
}

#define NTHREADS_SCALE_BUFFER_KERNEL 512
void ScaleBufferCudaImpl(double scale_factor, void* buffer_data, const int64_t num_elements,
DataType dtype, cudaStream_t stream) {
const int64_t blocks = (num_elements + NTHREADS_SCALE_BUFFER_KERNEL - 1) / NTHREADS_SCALE_BUFFER_KERNEL;
const int threads = NTHREADS_SCALE_BUFFER_KERNEL;
switch (dtype) {
case DataType::BLUEFOG_UINT8:
scale_buffer_k<<<blocks, threads, 0, stream>>>((uint8_t*) buffer_data, num_elements, scale_factor);
break;
case DataType::BLUEFOG_INT8:
scale_buffer_k<<<blocks, threads, 0, stream>>>((int8_t*) buffer_data, num_elements, scale_factor);
break;
case DataType::BLUEFOG_INT32:
scale_buffer_k<<<blocks, threads, 0, stream>>>((int32_t*) buffer_data, num_elements, scale_factor);
break;
case DataType::BLUEFOG_INT64:
scale_buffer_k<<<blocks, threads, 0, stream>>>((int64_t*) buffer_data, num_elements, scale_factor);
break;
case DataType::BLUEFOG_FLOAT16:
{
__half scale_factor_half = __float2half((float) scale_factor);
if ((size_t) buffer_data % 4 == 0) {
// If alignment allows, use half2 specialized kernel
int64_t num_elements_h2 = (num_elements + 1) / 2;
int64_t blocks_h2 = (num_elements_h2 + NTHREADS_SCALE_BUFFER_KERNEL - 1) / NTHREADS_SCALE_BUFFER_KERNEL;
scale_buffer_half2_k<<<blocks_h2, threads, 0, stream>>>((__half*) buffer_data, num_elements, scale_factor_half);
} else {
scale_buffer_k<<<blocks, threads, 0, stream>>>((__half*) buffer_data, num_elements, scale_factor_half);
}
break;
}
case DataType::BLUEFOG_FLOAT32:
scale_buffer_k<<<blocks, threads, 0, stream>>>((float*) buffer_data, num_elements, (float) scale_factor);
break;
case DataType::BLUEFOG_FLOAT64:
scale_buffer_k<<<blocks, threads, 0, stream>>>((double*) buffer_data, num_elements, scale_factor);
break;
default:
throw std::logic_error("Type " + DataType_Name(dtype) +
" not supported by ScaleBufferCudaImpl.");
}
}

} // namespace common
} // namespace bluefog

33 changes: 33 additions & 0 deletions bluefog/common/cuda/cuda_kernels.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (C) 2020 NVIDIA CORPORATION. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#ifndef CUDA_KERNELS_H
#define CUDA_KERNELS_H

#include <cuda_runtime.h>

#include "../common.h"

namespace bluefog {
namespace common {

// Scales buffer by scalar
void ScaleBufferCudaImpl(double scale_factor, void* buffer_data, const int64_t num_elements,
DataType dtype, cudaStream_t stream);

} // namespace common
} // namespace bluefog

#endif // CUDA_KERNELS_H
14 changes: 11 additions & 3 deletions bluefog/common/global_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#define BLUEFOG_COMMON_GLOBAL_STATE_H

#include <atomic>
#include <condition_variable>
#include <chrono>
#include <memory>
#include <queue>
Expand Down Expand Up @@ -54,6 +55,14 @@ struct BluefogGlobalState {
// Whether collective context has been completed on the background thread.
std::atomic_bool initialization_done{false};

// Condition variable and its mutex for main loop in communication thread.
std::condition_variable loop_cv;
std::mutex loop_mutex;

// Under negotiation, the entries sends to master first and wait until it
// returns ok to run. This variable keeps the records of that.
std::atomic_int unfinished_enqueued_entries{0};

// Timeline writer.
Timeline timeline;

Expand All @@ -80,13 +89,12 @@ struct BluefogGlobalState {
// Threshold for Tensor Fusion. All tensors that occupy memory beyond this
// threshold will be fused.
int64_t tensor_fusion_threshold = 8 * 1024 * 1024;
int64_t tensor_fusion_threshold_for_dst_weight = tensor_fusion_threshold;
FusionBufferManager fusion_buffer;

// Because setting topology happens in the main thread instead of communication
// thread. Following three variables are to sync between them.
// thread. Not really used since the condition variable refactor.
std::atomic_bool setting_topology{false};
std::atomic_bool setting_topology_done{false};
std::atomic_bool ready_to_setting_topology{false};

// Only exists on the coordinator node (rank zero). Maintains a vector of
// requests to allreduce every tensor (keyed by tensor name).
Expand Down
11 changes: 9 additions & 2 deletions bluefog/common/mpi_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ bool WindowManager::InitializeMutexWin(const MPI_Comm& mpi_comm) {
std::vector<int> WindowManager::GetVersionMemoryCopy() { return version_mem_; }

void WindowManager::resetVersionWinMem(int initialValue /*=0*/) {
for (int i = 0; i < version_mem_.size(); i++) {
for (size_t i = 0; i < version_mem_.size(); i++) {
version_mem_[i] = initialValue;
}
}
Expand Down Expand Up @@ -222,7 +222,7 @@ MPI_Op MPIContext::GetMPISumOp(DataType dtype) {
return dtype == DataType::BLUEFOG_FLOAT16 ? mpi_float16_sum : MPI_SUM;
}

MPI_Comm MPIContext::GetMPICommunicator(Communicator comm) {
MPI_Comm MPIContext::GetMPICommunicator(Communicator comm) const {
switch (comm) {
case Communicator::GLOBAL:
return mpi_comm;
Expand Down Expand Up @@ -332,6 +332,13 @@ void MPIContext::Initialize(const std::vector<int>& ranks,

// Create custom MPI float16 summation op.
MPI_Op_create(&float16_sum, 1, &mpi_float16_sum);

#if HAVE_CUDA
int greatest_priority;
CUDACHECK(cudaDeviceGetStreamPriorityRange(NULL, &greatest_priority));
CUDACHECK(cudaStreamCreateWithPriority(&stream, cudaStreamNonBlocking,
greatest_priority));
#endif
}

void MPIContext::Finalize(MPIContextManager& ctx_manager) {
Expand Down
15 changes: 14 additions & 1 deletion bluefog/common/mpi_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
#include <unordered_map>
#include <vector>

#if HAVE_CUDA
#include "cuda_runtime.h"
#endif

#include "common.h"
#include "mpi.h"

Expand Down Expand Up @@ -144,7 +148,7 @@ class MPIContext {

MPI_Op GetMPISumOp(DataType dtype);

MPI_Comm GetMPICommunicator(Communicator comm);
MPI_Comm GetMPICommunicator(Communicator comm) const;

int GetMPITypeSize(DataType dtype);

Expand Down Expand Up @@ -232,8 +236,17 @@ class MPIContext {
// MPI Custom data type for float16.
MPI_Datatype mpi_float16_t;
MPI_Op mpi_float16_sum;

// TODO(hhb): #80 We should use a common context for MPI and NCCL controller for CUDA usage.
#if HAVE_CUDA
// CUDA Stream
BichengYing marked this conversation as resolved.
Show resolved Hide resolved
cudaStream_t stream;
#endif
};

std::string GenerateNeighborExchangeErrorMessage(const std::vector<MPI_Status>& statuses,
int nsend, int nrecv);

} // namespace common
} // namespace bluefog

Expand Down
Loading