Skip to content

Commit

Permalink
#0: Move SynchronizeWorkerThreads to tt_metal::detail namespace (#14322)
Browse files Browse the repository at this point in the history
- This is a core tt_metal API and shouldn't be in tensor_ops
- Also account for worker thread deadlock when calling this API
  • Loading branch information
tt-asaigal authored Oct 28, 2024
1 parent cbcd5b3 commit 25a35e3
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 15 deletions.
2 changes: 2 additions & 0 deletions tt_metal/detail/tt_metal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,5 +279,7 @@ inline namespace v0 {
DeviceAddr AllocateBuffer(Buffer* buffer);

void DeallocateBuffer(Buffer *buffer);

void SynchronizeWorkerThreads(const std::vector<Device*>& workers);
} // namespace detail
} // namespace tt::tt_metal
17 changes: 17 additions & 0 deletions tt_metal/tt_metal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,23 @@ void DeallocateBuffer(Buffer *buffer) {
allocator::deallocate_buffer(*buffer->device()->allocator_, buffer);
}

// Flush the work queues of the given worker threads.
//
// Enqueues a no-op task on every worker's executor, then spins until each
// worker queue reports empty — at that point every task enqueued before the
// call (including the no-op) has been picked up. Invoking this from inside
// a worker thread would deadlock (the thread would wait on its own queue),
// so that case returns immediately as a no-op.
//
// NOTE(review): the wait is a raw busy-spin with no yield/backoff — it pegs
// a core while draining; presumably queues drain quickly, but confirm.
void SynchronizeWorkerThreads(const std::vector<Device*>& workers) {
    // Synchronizing from within a worker is a no-op by definition; waiting
    // here would deadlock on the caller's own queue.
    if (tt::tt_metal::detail::InWorkerThread()) {
        return;
    }
    // Stage 1: stage an empty task on every worker executor.
    for (auto device : workers) {
        device->work_executor.push_work([]() {});
    }
    // Stage 2: block until each queue has drained, flushing all prior work.
    for (auto device : workers) {
        while (not device->work_executor.worker_queue.empty()) {
        }
    }
}

} // namespace detail

inline namespace v0 {
Expand Down
16 changes: 1 addition & 15 deletions ttnn/cpp/ttnn/tensor/tensor_ops.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,6 @@
#include "ttnn/core.hpp"


// File-local helper (anonymous namespace): flush the work queues of the
// given worker threads by staging a no-op on each and spinning until the
// queues drain.
// NOTE(review): unlike the tt_metal::detail version, this variant has no
// in-worker-thread guard, so calling it from a worker thread busy-waits on
// the caller's own queue and deadlocks.
namespace{
inline void SynchronizeWorkerThreads(const std::vector<Device*>& workers) {
// Push empty work to threads and ensure it's been picked up
for (auto target_device : workers) {
target_device->work_executor.push_work([](){});
}
// Block until work has been picked up, to flush the queue (raw busy-spin,
// no yield/backoff)
for (auto target_device : workers) {
while(not target_device->work_executor.worker_queue.empty());
}
}
}


namespace tt::tt_metal::tensor_ops {

Tensor tensor_to(const Tensor& input_tensor, Device* target_device, const MemoryConfig& mem_config) {
Expand Down Expand Up @@ -147,7 +133,7 @@ Tensor tensor_cpu(const Tensor& input_tensor, bool blocking, uint8_t cq_id) {
}

if (blocking) {
SynchronizeWorkerThreads(workers);
tt::tt_metal::detail::SynchronizeWorkerThreads(workers);
}
// Update main_thread_ref_count for tensor after pushing to queue.
input_tensor.tensor_attributes->update_main_thread_ref_count(workers.at(0), original_tensor_ref_count);
Expand Down

0 comments on commit 25a35e3

Please sign in to comment.