diff --git a/src/grpc/infer_handler.h b/src/grpc/infer_handler.h
index 86428a514e..decdff81b1 100644
--- a/src/grpc/infer_handler.h
+++ b/src/grpc/infer_handler.h
@@ -1,4 +1,4 @@
-// Copyright 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Redistribution and use in source and binary forms, with or without
 // modification, are permitted provided that the following conditions
@@ -102,9 +102,9 @@ struct RequestReleasePayload final {
 //
 // ResponseQueue
 //
-// A simple queue holding the responses to be written. Uses a
-// vector of persistent message objects to prevent allocating
-// memory for each response to be written.
+// This class implements a queue to manage responses that need to be written.
+// It internally uses a reusable pool of persistent message objects to avoid
+// allocating memory for each response individually.
 //
 template <typename ResponseType>
 class ResponseQueue {
@@ -113,19 +113,29 @@ class ResponseQueue {
 
   ~ResponseQueue()
   {
+    // Delete all responses in the reusable pool
+    for (auto response : reusable_pool_) {
+      delete response;
+    }
+
+    // Delete all responses currently in the queue
     for (auto response : responses_) {
       delete response;
     }
   }
 
-  // Resets the queue
+  // Resets the queue to its initial state
   void Reset()
   {
+    std::lock_guard<std::mutex> lock(mtx_);
     alloc_count_ = 0;
     ready_count_ = 0;
-    current_index_ = 0;
-    for (auto response : responses_) {
-      response->Clear();
+    pop_count_ = 0;
+
+    while (!responses_.empty()) {
+      responses_.front()->Clear();
+      reusable_pool_.push_back(responses_.front());
+      responses_.pop_front();
     }
   }
@@ -137,17 +147,27 @@ class ResponseQueue {
     std::lock_guard<std::mutex> lock(mtx_);
     alloc_count_ = 1;
     if (responses_.size() < 1) {
-      responses_.push_back(new ResponseType());
+      if (!reusable_pool_.empty()) {
+        responses_.push_back(reusable_pool_.front());
+        reusable_pool_.pop_front();
+      } else {
+        responses_.push_back(new ResponseType());
+      }
     }
     return responses_[0];
   }
 
-  // Allocates a response on the head of the queue
+  // Allocates a response at the end of the queue
   void AllocateResponse()
   {
     std::lock_guard<std::mutex> lock(mtx_);
     alloc_count_++;
-    if (responses_.size() < alloc_count_) {
+
+    // Use a response from the reusable pool if available
+    if (!reusable_pool_.empty()) {
+      responses_.push_back(reusable_pool_.front());
+      reusable_pool_.pop_front();
+    } else {
       responses_.push_back(new ResponseType());
     }
   }
@@ -156,12 +176,15 @@ class ResponseQueue {
   ResponseType* GetLastAllocatedResponse()
   {
     std::lock_guard<std::mutex> lock(mtx_);
-    if (responses_.size() < alloc_count_) {
+
+    // Ensure that the requested response has been allocated
+    if ((responses_.size() + pop_count_) < alloc_count_) {
       LOG_ERROR
           << "[INTERNAL] Attempting to access the response not yet allocated";
       return nullptr;
     }
-    return responses_[alloc_count_ - 1];
+
+    return responses_.back();
   }
 
   // Marks the next non-ready response complete
@@ -178,43 +201,71 @@ class ResponseQueue {
     return true;
   }
 
-  // Gets the current response from the tail of
-  // the queue.
+  // Gets the current response from the front of the queue
   ResponseType* GetCurrentResponse()
   {
     std::lock_guard<std::mutex> lock(mtx_);
-    if (current_index_ >= ready_count_) {
+    if (pop_count_ >= ready_count_) {
       LOG_ERROR << "[INTERNAL] Attempting to access current response when it "
                    "is not ready";
       return nullptr;
     }
-    return responses_[current_index_];
+    if (responses_.empty()) {
+      LOG_ERROR << "[INTERNAL] No responses are available in the queue.";
+      return nullptr;
+    }
+
+    return responses_.front();
   }
 
   // Gets the response at the specified index
   ResponseType* GetResponseAt(const uint32_t index)
   {
     std::lock_guard<std::mutex> lock(mtx_);
+
+    // Check if the index is valid for allocated responses
     if (index >= alloc_count_) {
       LOG_ERROR << "[INTERNAL] Attempting to access response which is not yet "
                    "allocated";
       return nullptr;
     }
-    return responses_[index];
+    if (index < pop_count_) {
+      LOG_ERROR << "[INTERNAL] Attempting to access a response that has "
+                   "already been removed from the queue.";
+      return nullptr;
+    }
+
+    // Adjust index based on number of popped responses to get actual index in
+    // 'responses_'
+    return responses_[index - pop_count_];
   }
 
-  // Pops the response from the tail of the queue
+  // Removes the current response from the front of the queue
   void PopResponse()
   {
     std::lock_guard<std::mutex> lock(mtx_);
-    current_index_++;
+
+    // Ensure there are responses in the queue to pop
+    if (responses_.empty()) {
+      LOG_ERROR << "[INTERNAL] No responses in the queue to pop.";
+      return;
+    }
+
+    // Clear and move the current response to the reusable pool
+    auto response = responses_.front();
+    response->Clear();
+    reusable_pool_.push_back(response);
+    responses_.pop_front();
+    pop_count_++;
   }
 
   // Returns whether the queue is empty
   bool IsEmpty()
   {
     std::lock_guard<std::mutex> lock(mtx_);
-    return ((alloc_count_ == ready_count_) && (alloc_count_ == current_index_));
+    return (
+        (alloc_count_ == ready_count_) && (alloc_count_ == pop_count_) &&
+        responses_.empty());
   }
 
   // Returns whether the queue has responses
@@ -222,20 +273,21 @@ class ResponseQueue {
   bool HasReadyResponse()
   {
     std::lock_guard<std::mutex> lock(mtx_);
-    return (ready_count_ > current_index_);
+    return (ready_count_ > pop_count_);
  }
 
  private:
-  std::vector<ResponseType*> responses_;
+  // Stores responses that need to be written. The front of the queue indicates
+  // the current response, while the back indicates the last allocated response.
+  std::deque<ResponseType*> responses_;
+  // Stores completed responses that can be reused
+  std::deque<ResponseType*> reusable_pool_;
   std::mutex mtx_;
-  // There are three indices to track the responses in the queue
-  // Tracks the allocated response
-  uint32_t alloc_count_;
-  // Tracks the response that is ready to be written
-  uint32_t ready_count_;
-  // Tracks the response next in the queue to be written
-  uint32_t current_index_;
+  // Three counters are used to track and manage responses in the queue
+  uint32_t alloc_count_;  // Number of allocated responses
+  uint32_t ready_count_;  // Number of ready-to-write responses
+  uint32_t pop_count_;    // Number of removed responses from the queue
 };
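For reviewers who want to see the allocate / mark-ready / write / pop cycle end to end, here is a minimal, self-contained C++ sketch of the pattern this change adopts. It is not Triton code: MockResponse, SketchResponseQueue, and the main() driver are hypothetical stand-ins for the real gRPC message type, the full ResponseQueue (GetNonDecoupledResponse, GetResponseAt, Reset, IsEmpty, and error logging are omitted), and the gRPC handlers that drive it. The point it illustrates is the one the diff relies on: the front of responses_ is always the next message to write, and a popped message is cleared and parked in reusable_pool_ so a later AllocateResponse() can reuse it instead of allocating again.

#include <cstdint>
#include <cstdio>
#include <deque>
#include <mutex>
#include <string>

// Hypothetical stand-in for the gRPC response message; only Clear() matters.
struct MockResponse {
  std::string payload;
  void Clear() { payload.clear(); }
};

// Trimmed-down sketch of the reworked queue: a deque of in-flight responses
// plus a pool of cleared, reusable objects; error logging is omitted.
template <typename ResponseType>
class SketchResponseQueue {
 public:
  ~SketchResponseQueue() {
    for (auto r : reusable_pool_) delete r;
    for (auto r : responses_) delete r;
  }

  // Allocates a response at the back, recycling from the pool when possible.
  void AllocateResponse() {
    std::lock_guard<std::mutex> lock(mtx_);
    alloc_count_++;
    if (!reusable_pool_.empty()) {
      responses_.push_back(reusable_pool_.front());
      reusable_pool_.pop_front();
    } else {
      responses_.push_back(new ResponseType());
    }
  }

  // Returns the most recently allocated (back) response, if any.
  ResponseType* GetLastAllocatedResponse() {
    std::lock_guard<std::mutex> lock(mtx_);
    if (responses_.empty()) return nullptr;
    return responses_.back();
  }

  // Marks the next allocated response as ready to be written.
  void MarkNextResponseComplete() {
    std::lock_guard<std::mutex> lock(mtx_);
    ready_count_++;
  }

  // The front of the deque is always the next response to write.
  ResponseType* GetCurrentResponse() {
    std::lock_guard<std::mutex> lock(mtx_);
    if (pop_count_ >= ready_count_ || responses_.empty()) return nullptr;
    return responses_.front();
  }

  // Clears the front response and returns it to the reusable pool.
  void PopResponse() {
    std::lock_guard<std::mutex> lock(mtx_);
    if (responses_.empty()) return;
    auto response = responses_.front();
    response->Clear();
    reusable_pool_.push_back(response);
    responses_.pop_front();
    pop_count_++;
  }

 private:
  std::deque<ResponseType*> responses_;      // front = next response to write
  std::deque<ResponseType*> reusable_pool_;  // cleared messages ready for reuse
  std::mutex mtx_;
  uint32_t alloc_count_ = 0;  // responses allocated so far
  uint32_t ready_count_ = 0;  // responses marked ready to write
  uint32_t pop_count_ = 0;    // responses already written and popped
};

int main() {
  SketchResponseQueue<MockResponse> queue;

  // Producer side: allocate a response, fill it in, mark it ready.
  queue.AllocateResponse();
  queue.GetLastAllocatedResponse()->payload = "first";
  queue.MarkNextResponseComplete();

  // Consumer side: write the front response, then pop it into the pool.
  MockResponse* written = queue.GetCurrentResponse();
  std::printf("writing: %s\n", written->payload.c_str());
  queue.PopResponse();

  // The next allocation reuses the pooled object instead of allocating anew.
  queue.AllocateResponse();
  std::printf("recycled the same object: %s\n",
              queue.GetLastAllocatedResponse() == written ? "yes" : "no");
  return 0;
}

Running the sketch prints the written payload and confirms that the pointer returned after the pop-then-allocate round trip is the same object, which is the allocation-avoidance behavior the reusable pool preserves from the old vector-plus-index design.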