Condition variable (#92)
* Fixed the self_weight under empty receiving case

* Enable empty send neighbors and fix HalfTensor for recv_size==0

* Rename neighbor_weights to src_weights, and send_neighbors to dst_weights for neighbor_allreduce

* Add a script to test existing examples

* Accept dst_weights as Dict, and reorganize DoNeighborAllreduce

* Reorganize CheckNeighborSendRecvPattern

* Fix timeline_ptr for NCCL

* Put dst_weights information into TensorTableEntry

* First version of neighbor_allreduce dst_weight; known problems: fusion not implemented, CUDA data_weight issue

* Add some delay after data_weight as a temporary solution

* Add CPU fusion for dst_weighted

* Add ReadyEvent for dst_weight for single entry neighbor_allreduce

* Remove const identifier for tensor dtype as it is meaningless

* Add CUDA source for scalebuffer

* Make the scale-buffer kernel modify the buffer in place

* Add .o files to .gitignore

* Compute dst_weight with CUDA for fused entries, and add the compile flow to Python setup.py

* Have make clean remove the *.o files generated by nvcc

* Add fix for NCCL single entry

* Make setup.py more robust

* Add timeout and CUDA check

* Move test example

* Fix NCCL side dst_weight fusion bug

* Use the agg backend to make matplotlib more stable

* Address comments for setup.py

* Simpler logic for dst_weighting_enabled and weighted_average_computation

* Handle the weight buffer size more carefully

* Make src_weights a std::map, and simplify the logic for PerformNeighborAllreduceCallback

* Add TODO #80 and #81, and simplify the logic for dst_weight

* Wrap CheckNeighborSendRecvPattern again

* Add two more TODOs

* Address review comments

* Add condition variable to control the loop

* Minor update on topology_setting in global_state

* Add missing <condition_variable> header

* Change cv.wait to cv.wait_for with a 10-second timeout

* Address comment and remove the resetVersionWinMem adjustment in ibfrun

* Add lock to protect loop_cv notify_one (see the sketch below)

Co-authored-by: Hanbin Hu <[email protected]>
BichengYing and Hanbin Hu authored May 6, 2021
1 parent 5c0f188 commit c576f12
Showing 1 changed file with 45 additions and 12 deletions.
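Before the diff itself, here is a minimal, self-contained sketch of the pattern these commits establish, assuming a plain std::queue in place of the real TensorQueue: each EnqueueTensor* call notifies loop_cv while holding loop_mutex, and the background communication loop waits on the condition variable with a 10-second timeout instead of polling. Only loop_mutex, loop_cv, and the notify-under-lock shape come from the diff; every other name is illustrative.

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>

std::mutex queue_mutex;        // stands in for TensorQueue's internal lock
std::queue<int> tensor_queue;  // stands in for the real TensorQueue

std::mutex loop_mutex;
std::condition_variable loop_cv;

// Producer side, mirroring the EnqueueTensor* functions in the diff.
void Enqueue(int entry) {
  {
    std::lock_guard<std::mutex> qlk(queue_mutex);
    tensor_queue.push(entry);
  }
  // Notify while holding loop_mutex, so the wakeup cannot fall into the
  // gap between the consumer's predicate check and its wait.
  {
    std::unique_lock<std::mutex> lk(loop_mutex);
    loop_cv.notify_one();
  }
}

// Consumer side, mirroring the background loop: sleep until work
// arrives or 10 seconds elapse ("Change cv.wait to cv.wait_for").
bool WaitForWork() {
  std::unique_lock<std::mutex> lk(loop_mutex);
  return loop_cv.wait_for(lk, std::chrono::seconds(10), [] {
    std::lock_guard<std::mutex> qlk(queue_mutex);
    return !tensor_queue.empty();
  });
}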
bluefog/common/operations.cc: 45 additions & 12 deletions
@@ -1437,7 +1437,10 @@ Status EnqueueTensorAllreduce(std::shared_ptr<Tensor> tensor,
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }

@@ -1473,7 +1476,10 @@ Status EnqueueTensorBroadcast(std::shared_ptr<Tensor> tensor,
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }

@@ -1508,7 +1514,10 @@ Status EnqueueTensorAllgather(std::shared_ptr<Tensor> tensor,
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }

@@ -1553,7 +1562,10 @@ Status EnqueueTensorNeighborAllgather(std::shared_ptr<Tensor> tensor,
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }

@@ -1605,7 +1617,10 @@ Status EnqueueTensorNeighborAllreduce(std::shared_ptr<Tensor> tensor,
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }

@@ -1646,7 +1661,10 @@ Status EnqueueTensorPairGossip(std::shared_ptr<Tensor> tensor,
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }

@@ -1679,7 +1697,10 @@ Status EnqueueTensorWindowCreate(
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }

@@ -1704,7 +1725,10 @@ Status EnqueueTensorWindowFree(const std::string& name, int device,
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }

@@ -1738,7 +1762,10 @@ Status EnqueueTensorWindowPut(std::shared_ptr<Tensor> tensor,
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }

@@ -1770,7 +1797,10 @@ Status EnqueueTensorWindowAccumulate(
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }

@@ -1798,7 +1828,10 @@ Status EnqueueTensorWindowGet(const std::string& name,
     return SUSPEND_ERROR;
   }
   Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
-  bluefog_global.loop_cv.notify_one();
+  {
+    std::unique_lock<std::mutex> lk(bluefog_global.loop_mutex);
+    bluefog_global.loop_cv.notify_one();
+  }
   return status;
 }

@@ -2048,8 +2081,8 @@ void SetSkipNegotiateStageState(bool value) {
   {
     std::lock_guard<std::mutex> lk(bluefog_global.loop_mutex);
     global_skip_negotiate_stage = value;
+    bluefog_global.loop_cv.notify_one();
   }
-  bluefog_global.loop_cv.notify_one();
 }

 bool GetSkipNegotiateStageState() {
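The last hunk gives SetSkipNegotiateStageState the same treatment: the skip flag is written, and the loop notified, under loop_mutex. Here is a hedged sketch of why the flag write must share the waiter's mutex; the waiting side is illustrative and not part of this diff.

#include <condition_variable>
#include <mutex>

std::mutex loop_mutex;
std::condition_variable loop_cv;
bool skip_negotiate = false;  // stand-in for global_skip_negotiate_stage

// Simplified stand-in for SetSkipNegotiateStageState: write the flag
// and notify while holding loop_mutex.
void SetSkipState(bool value) {
  std::lock_guard<std::mutex> lk(loop_mutex);
  skip_negotiate = value;
  loop_cv.notify_one();
}

// Illustrative waiter: because the flag is only ever written under
// loop_mutex, an update cannot slip between this predicate check and
// the wait, so no notification is lost.
void WaitUntilSkipping() {
  std::unique_lock<std::mutex> lk(loop_mutex);
  loop_cv.wait(lk, [] { return skip_negotiate; });
}

Notifying after the lock scope instead would also be correct here, and can spare the woken thread an immediate block on the still-held mutex; keeping the notify inside the scope, as in the hunk above, keeps the flag update and the signal atomic with respect to the waiter.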
