From c576f1241762336dc1748ee2597d6873668ff271 Mon Sep 17 00:00:00 2001
From: Bicheng Ying
Date: Wed, 5 May 2021 22:01:07 -0700
Subject: [PATCH] Condition variable (#92)

* Fixed the self_weight under empty receiving case
* Enable empty send neighbors and fix HalfTensor for recv_size==0
* Fixed the self_weight under empty receiving case
* Enable empty send neighbors and fix HalfTensor for recv_size==0
* Rename neighbor_weights to src_weights, and send_neighbors to dst_weights for neighbor_allreduce
* A script to test existing examples
* Accept dst_weights as Dict, and reorganize DoNeighborAllreduce
* Reorganize CheckNeighborSendRecvPattern
* Fix timeline_ptr for NCCL
* Fix timeline_ptr for NCCL
* Put dst_weights information into TensorTableEntry
* First Version of neighbor_allreduce dst_weight, existing problem: Fusion Not Implemented, CUDA data_weight problem
* Add some delay after data_weight as a temporary solution
* CPU Fusion for dst_weighted added
* Add ReadyEvent for dst_weight for single entry neighbor_allreduce
* Remove const identifier for tensor dtype as it is meaningless
* Add cuda source for scalebuffer
* Scale buffer to modify itself
* Add .o file to .gitignore
* dst_weight using CUDA for fused entry & compile flow in Python setup.py
* make clean *.o files generated by nvcc
* Add fix for NCCL single entry
* Make setup.py more robust
* Add timeout and cuda check
* Move test example
* Fix NCCL side dst_weight fusion bug
* Add agg to make matplotlib more stable
* Address comments for setup.py
* Simpler logic for dst_weighting_enabled and weighted_average_computation
* Better consideration for weight buffer size
* Make src_weights as std::map, and simplify logic for PerformNeighborAllreduceCallback
* Add TODO #80 and #81, and simplify the logic for dst_weight
* Wrap CheckNeighborSendRecvPattern again
* Add two more TODOs
* Address review comments
* Add condition variable to control the loop
* Minor update on topology_setting in global_state
* Add missing header
* Change cv.wait to cv.wait_for 10 seconds
* Address comment and remove adjusting resetVersionWinMem in ibfrun
* Add lock to protect loop_cv notify_one

Co-authored-by: Hanbin Hu
---
 bluefog/common/operations.cc | 57 ++++++++++++++++++++++++++++--------
 1 file changed, 45 insertions(+), 12 deletions(-)

diff --git a/bluefog/common/operations.cc b/bluefog/common/operations.cc
index 578291e0..74363952 100644
--- a/bluefog/common/operations.cc
+++ b/bluefog/common/operations.cc
@@ -1437,7 +1437,10 @@ Status EnqueueTensorAllreduce(std::shared_ptr<Tensor> tensor,
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }
 
@@ -1473,7 +1476,10 @@ Status EnqueueTensorBroadcast(std::shared_ptr<Tensor> tensor,
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }
 
@@ -1508,7 +1514,10 @@ Status EnqueueTensorAllgather(std::shared_ptr<Tensor> tensor,
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }
 
@@ -1553,7 +1562,10 @@ Status EnqueueTensorNeighborAllgather(std::shared_ptr<Tensor> tensor,
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }
 
@@ -1605,7 +1617,10 @@ Status EnqueueTensorNeighborAllreduce(std::shared_ptr<Tensor> tensor,
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }
 
@@ -1646,7 +1661,10 @@ Status EnqueueTensorPairGossip(std::shared_ptr<Tensor> tensor,
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }
 
@@ -1679,7 +1697,10 @@ Status EnqueueTensorWindowCreate(
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }
 
@@ -1704,7 +1725,10 @@ Status EnqueueTensorWindowFree(const std::string& name, int device,
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }
 
@@ -1738,7 +1762,10 @@ Status EnqueueTensorWindowPut(std::shared_ptr<Tensor> tensor,
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }
 
@@ -1770,7 +1797,10 @@ Status EnqueueTensorWindowAccumulate(
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }
 
@@ -1798,7 +1828,10 @@ Status EnqueueTensorWindowGet(const std::string& name,
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }
 
@@ -2048,8 +2081,8 @@ void SetSkipNegotiateStageState(bool value) {
   {
     std::lock_guard<std::mutex> lk(bluefog_global.loop_mutex);
     global_skip_negotiate_stage = value;
+    bluefog_global.loop_cv.notify_one();
   }
-  bluefog_global.loop_cv.notify_one();
 }
 
 bool GetSkipNegotiateStageState() {
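
This patch is the classic condition-variable handshake for a producer/consumer loop: every Enqueue* path (and SetSkipNegotiateStageState) now signals loop_cv while holding loop_mutex, and per the commit list the consumer side was switched from cv.wait to cv.wait_for with a 10-second timeout. Holding the mutex during notify_one matters because, as far as this diff shows, AddToTensorQueue does its own internal locking rather than using loop_mutex; without the wrapping lock, the background thread could check its wake-up condition, find it false, and be preempted just before sleeping, losing the notification. The sketch below is a minimal, self-contained illustration of the same pattern, not bluefog's actual background loop; the work queue, the shut_down flag, and the processing body are hypothetical stand-ins, while loop_mutex and loop_cv mirror the names in the patch.

// Minimal sketch of the notify-under-lock + wait_for pattern used above.
// Illustrative only: the queue, flag, and processing are stand-ins.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

std::mutex loop_mutex;            // plays the role of bluefog_global.loop_mutex
std::condition_variable loop_cv;  // plays the role of bluefog_global.loop_cv
std::queue<int> work_queue;       // stand-in for the tensor queue
bool shut_down = false;           // hypothetical shutdown flag

void Enqueue(int work) {
  {
    std::lock_guard<std::mutex> lk(loop_mutex);
    work_queue.push(work);
  }
  // Notify while holding the mutex, as each Enqueue* path in the patch does,
  // so the wakeup cannot land between the consumer's predicate check and its
  // sleep inside wait_for.
  std::unique_lock<std::mutex> lk(loop_mutex);
  loop_cv.notify_one();
}

void BackgroundLoop() {
  while (true) {
    std::unique_lock<std::mutex> lk(loop_mutex);
    // Wake on new work or shutdown; time out after 10 seconds as a safety
    // net, matching the patch's move from wait() to wait_for().
    loop_cv.wait_for(lk, std::chrono::seconds(10),
                     [] { return !work_queue.empty() || shut_down; });
    if (shut_down && work_queue.empty()) return;
    while (!work_queue.empty()) {
      int work = work_queue.front();
      work_queue.pop();
      lk.unlock();  // do the actual processing outside the lock
      std::cout << "processed " << work << "\n";
      lk.lock();
    }
  }
}

int main() {
  std::thread consumer(BackgroundLoop);
  for (int i = 0; i < 3; ++i) Enqueue(i);
  {
    std::lock_guard<std::mutex> lk(loop_mutex);
    shut_down = true;       // mirrors SetSkipNegotiateStageState: flip state
    loop_cv.notify_one();   // and notify inside the same locked region
  }
  consumer.join();
}

One nuance: in this single-mutex sketch the notify could safely be issued after releasing the lock, because the predicate state itself is mutated under loop_mutex. The patch keeps notify_one under the lock because the tensor queue is synchronized by its own mutex, so loop_mutex is the only thing ordering the notification against the consumer's predicate check; that is exactly the race the "Add lock to protect loop_cv notify_one" commit closes.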