
Commit

#0: Refactor allocation/deallocation threading to allow getting buffer address from another thread
sminakov-tt committed Oct 22, 2024
1 parent b1ff5aa commit 0efa98a
Showing 2 changed files with 89 additions and 12 deletions.
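
This commit replaces the single `is_allocated_` flag with an `AllocationStatus` state machine protected by a mutex and condition variable, so that `Buffer::address()` can be called from a thread other than the device worker thread and block until the scheduled allocation has assigned an address. As a rough orientation before the diff, here is a minimal standalone sketch of that wait-for-allocation pattern; `ToyBuffer`, `Status`, and `wait_for_address` are illustrative names only, not part of the tt-metal API.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

// Simplified stand-in for Buffer::AllocationStatus.
enum class Status : uint8_t { NOT_ALLOCATED, ALLOCATION_REQUESTED, ALLOCATED };

struct ToyBuffer {
    std::atomic<Status> status{Status::NOT_ALLOCATED};
    mutable std::mutex mutex;
    mutable std::condition_variable cv;
    uint64_t address = 0;  // written only by the worker thread

    // Worker thread: publish the address, then wake any waiting readers.
    void complete_allocation(uint64_t addr) {
        {
            std::unique_lock lock(mutex);
            address = addr;
            status.store(Status::ALLOCATED, std::memory_order::relaxed);
        }
        cv.notify_all();
    }

    // Any other thread: block until the pending allocation has finished.
    uint64_t wait_for_address() const {
        std::unique_lock lock(mutex);
        cv.wait(lock, [this] { return status.load(std::memory_order::relaxed) == Status::ALLOCATED; });
        return address;
    }
};

int main() {
    ToyBuffer buf;
    buf.status.store(Status::ALLOCATION_REQUESTED, std::memory_order::relaxed);
    std::thread worker([&] { buf.complete_allocation(0x1000); });
    const uint64_t addr = buf.wait_for_address();  // safe even if the worker hasn't run yet
    worker.join();
    return addr == 0x1000 ? 0 : 1;
}
```
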
74 changes: 64 additions & 10 deletions tt_metal/impl/buffers/buffer.cpp
@@ -154,7 +154,7 @@ Buffer::Buffer(
TT_FATAL(this->device_ != nullptr and this->device_->allocator_ != nullptr, "Device and allocator need to not be null.");

if (size == 0) {
is_allocated_ = true;
allocation_status_.store(AllocationStatus::ALLOCATED, std::memory_order::relaxed);
return;
}

@@ -228,49 +228,103 @@ BufferPageMapping generate_buffer_page_mapping(const Buffer& buffer) {
}

void Buffer::allocate() {
{
std::unique_lock lock(allocation_mutex_);
TT_FATAL(allocation_status_.load(std::memory_order::relaxed) == AllocationStatus::NOT_ALLOCATED, "Can't allocate buffer after it was already allocated");
allocation_status_.store(AllocationStatus::ALLOCATION_REQUESTED, std::memory_order::relaxed);
}

device_->push_work([self = weak_self.lock()] {
if (self->is_allocated_) {
std::unique_lock lock(self->allocation_mutex_);
if (self->allocation_status_.load(std::memory_order::relaxed) != AllocationStatus::ALLOCATION_REQUESTED) {
// The allocation was interrupted by a deallocation
return;
}

bool bottom_up = self->bottom_up_.value_or(self->is_dram());
detail::AllocateBuffer(self.get(), bottom_up);
detail::BUFFER_MAP.insert({self->device_->id(), self->address_}, self.get());

self->is_allocated_ = true;
self->allocation_status_.store(AllocationStatus::ALLOCATED, std::memory_order::relaxed);
lock.unlock();
self->allocation_cv_.notify_all();
});
}

void Buffer::deallocate() {
if (size_ == 0) {
// 0-size buffer, no need to deallocate
return;
}

{
std::unique_lock lock(allocation_mutex_);
auto status = allocation_status_.load(std::memory_order::relaxed);
if (status != AllocationStatus::ALLOCATED && status != AllocationStatus::ALLOCATION_REQUESTED) {
// Buffer isn't allocated, nothing to be done
return;
}
// Overwriting either ALLOCATED or ALLOCATION_REQUESTED with DEALLOCATION_REQUESTED
allocation_status_.store(AllocationStatus::DEALLOCATION_REQUESTED, std::memory_order::relaxed);
}

device_->push_work([self = weak_self.lock()] {
if (!self->is_allocated_ || !self->device_->initialized_ || self->size_ == 0) {
// The status is DEALLOCATION_REQUESTED and no other thread will change it, so there is no need to lock the mutex
if (!self->device_->initialized_) {
return;
}

detail::BUFFER_MAP.erase({self->device()->id(), self->address()});
detail::DeallocateBuffer(self.get());
self->is_allocated_ = false;
self->allocation_status_.store(AllocationStatus::DEALLOCATED, std::memory_order::relaxed);
});
}

void Buffer::deallocateAndDelete(Buffer* buffer) {
// This is the last reference to the buffer, no need to lock or update AllocationStatus
buffer->device_->push_work([buffer] {
if (buffer->is_allocated_ && buffer->device_->initialized_ && buffer->size_ != 0) {
detail::BUFFER_MAP.erase({buffer->device_->id(), buffer->address_});
detail::DeallocateBuffer(buffer);
// Buffer will be deleted at the end of this block
std::unique_ptr<Buffer> unique_buffer = std::unique_ptr<Buffer>(buffer);

auto status = buffer->allocation_status_.load(std::memory_order::relaxed);
if (status == AllocationStatus::NOT_ALLOCATED || status == AllocationStatus::ALLOCATION_REQUESTED || status == AllocationStatus::DEALLOCATED) {
// Buffer isn't allocated, nothing to be done
return;
}

delete buffer;
if (!buffer->device_->initialized_ || buffer->size_ == 0) {
return;
}

detail::BUFFER_MAP.erase({buffer->device_->id(), buffer->address_});
detail::DeallocateBuffer(buffer);
});
}

bool Buffer::is_allocated() const {
auto allocation_status = allocation_status_.load(std::memory_order::relaxed);
if (device_->can_use_passthrough_scheduling()) {
return allocation_status == AllocationStatus::ALLOCATED;
}
// For calls from other threads we consider the buffer to be allocated even if it is only ALLOCATION_REQUESTED,
// because by the time the caller tries to access it, the buffer will already be fully allocated
return allocation_status == AllocationStatus::ALLOCATED || allocation_status == AllocationStatus::ALLOCATION_REQUESTED;
}

uint32_t Buffer::address() const {
TT_FATAL(device_->can_use_passthrough_scheduling() , "Buffer::address must be called in device worker thread");
if (device_->can_use_passthrough_scheduling()) {
// No locking required, because address can only be modified from the same thread
return address_;
}

std::unique_lock lock(allocation_mutex_);
allocation_cv_.wait(lock, [this] { return this->allocation_status_.load(std::memory_order::relaxed) != AllocationStatus::ALLOCATION_REQUESTED; });
return address_;
}

void Buffer::set_address(uint64_t addr) {
TT_FATAL(device_->can_use_passthrough_scheduling() , "Buffer::set_address must be called in device worker thread");
TT_FATAL(allocation_status_.load(std::memory_order::relaxed) == AllocationStatus::ALLOCATION_REQUESTED, "Buffer address can only be set during allocation");
address_ = addr;
}

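Throughout the implementation above, the allocator calls and every write to `address_` happen inside tasks posted via `device_->push_work(...)`, which the code treats as running on a single device worker thread; other threads only inspect or update the atomic status under `allocation_mutex_`. The sketch below is only a mental model of such a worker (an assumption about `push_work`'s semantics made for illustration, not tt-metal's actual Device implementation): a FIFO of callables drained by one dedicated thread.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

class WorkerQueue {
public:
    WorkerQueue() : worker_([this] { run(); }) {}
    ~WorkerQueue() {
        {
            std::unique_lock lock(mutex_);
            stop_ = true;
        }
        cv_.notify_one();
        worker_.join();
    }

    // Analogous to Device::push_work: the callable runs later on the worker thread.
    void push_work(std::function<void()> task) {
        {
            std::unique_lock lock(mutex_);
            tasks_.push_back(std::move(task));
        }
        cv_.notify_one();
    }

private:
    void run() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock lock(mutex_);
                cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                if (stop_ && tasks_.empty()) return;
                task = std::move(tasks_.front());
                tasks_.pop_front();
            }
            task();  // allocation/deallocation bodies execute here, one at a time, in order
        }
    }

    std::deque<std::function<void()>> tasks_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool stop_ = false;
    std::thread worker_;  // initialized last, after the queue state it uses
};

int main() {
    WorkerQueue device_worker;
    // Analogous to the lambdas passed to device_->push_work in the diff above.
    device_worker.push_work([] { /* allocate / deallocate work would run here */ });
    return 0;  // ~WorkerQueue drains any remaining tasks, then joins the thread
}
```
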
27 changes: 25 additions & 2 deletions tt_metal/impl/buffers/buffer.hpp
@@ -158,7 +158,7 @@ class Buffer final {

Device *device() const { return device_; }
DeviceAddr size() const { return size_; }
bool is_allocated() const { return is_allocated_; }
bool is_allocated() const;

// Returns address of buffer in the first bank
uint32_t address() const;
@@ -224,12 +224,35 @@ class Buffer final {

DeviceAddr translate_page_address(uint64_t offset, uint32_t bank_id) const;

enum class AllocationStatus : uint8_t {
// The buffer is created in NOT_ALLOCATED state (except for 0-size buffers, which are initially ALLOCATED).
// The buffer can transition from NOT_ALLOCATED to ALLOCATION_REQUESTED only once in its lifetime.
NOT_ALLOCATED,
// The task is scheduled on the queue to allocate the buffer.
// When the task succeeds, the buffer transitions into ALLOCATED state.
// The scheduled allocation can be interrupted by a deallocation, which would transition the buffer to DEALLOCATION_REQUESTED,
// and then to DEALLOCATED states.
ALLOCATION_REQUESTED,
// The buffer is completely allocated and the address is available.
// The buffer can transition from ALLOCATED only to DEALLOCATION_REQUESTED.
ALLOCATED,
// The task is scheduled to deallocate the buffer.
// When the task succeeds, the buffer transitions into DEALLOCATED state.
DEALLOCATION_REQUESTED,
// The buffer is completely deallocated.
// This is the final state, no transitions from this state are possible.
DEALLOCATED,
};

Device * const device_;
const DeviceAddr size_; // Size in bytes
const BufferType buffer_type_;
const TensorMemoryLayout buffer_layout_;
const std::optional<bool> bottom_up_;
std::atomic<bool> is_allocated_ = false;

std::atomic<AllocationStatus> allocation_status_ = AllocationStatus::NOT_ALLOCATED;
mutable std::condition_variable allocation_cv_;
mutable std::mutex allocation_mutex_;

// These members must be only accessed on the device worker thread
DeviceAddr address_ = 0; // Address of buffer

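For reference when reading `allocate()` and `deallocate()` above, the `AllocationStatus` transitions documented in buffer.hpp can be summarized as a small validity check. This is an illustrative sketch, not part of the commit; the real code enforces these transitions implicitly through the TT_FATAL checks and status comparisons shown in the diff.

```cpp
#include <cstdint>

enum class AllocationStatus : uint8_t {
    NOT_ALLOCATED,
    ALLOCATION_REQUESTED,
    ALLOCATED,
    DEALLOCATION_REQUESTED,
    DEALLOCATED,
};

constexpr bool is_valid_transition(AllocationStatus from, AllocationStatus to) {
    switch (from) {
        // Allocation can only be requested once. (A 0-size buffer is constructed
        // directly in ALLOCATED and never passes through this transition.)
        case AllocationStatus::NOT_ALLOCATED:
            return to == AllocationStatus::ALLOCATION_REQUESTED;
        // A pending allocation either completes or is interrupted by deallocate().
        case AllocationStatus::ALLOCATION_REQUESTED:
            return to == AllocationStatus::ALLOCATED || to == AllocationStatus::DEALLOCATION_REQUESTED;
        case AllocationStatus::ALLOCATED:
            return to == AllocationStatus::DEALLOCATION_REQUESTED;
        case AllocationStatus::DEALLOCATION_REQUESTED:
            return to == AllocationStatus::DEALLOCATED;
        // Terminal state: no further transitions.
        case AllocationStatus::DEALLOCATED:
            return false;
    }
    return false;
}

static_assert(is_valid_transition(AllocationStatus::NOT_ALLOCATED, AllocationStatus::ALLOCATION_REQUESTED));
static_assert(!is_valid_transition(AllocationStatus::DEALLOCATED, AllocationStatus::ALLOCATION_REQUESTED));
```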