Skip to content

Commit

Permalink
Send explicit acknowledge only when necessary
Browse files Browse the repository at this point in the history
  • Loading branch information
arhimondr committed Apr 16, 2024
1 parent 2b76e3b commit e6b82b7
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 9 deletions.
14 changes: 14 additions & 0 deletions velox/exec/ExchangeClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ void ExchangeClient::request(std::vector<RequestSpec>&& requestSpecs) {
std::move(future)
.via(executor_)
.thenValue([self, spec = std::move(spec)](auto&& response) {
bool acknowledge = false;
std::vector<RequestSpec> requestSpecs;
std::shared_ptr<ExchangeSource> currentSource = spec.source;
{
std::lock_guard<std::mutex> l(self->queue_->mutex());
if (self->closed_) {
Expand All @@ -168,6 +170,18 @@ void ExchangeClient::request(std::vector<RequestSpec>&& requestSpecs) {
}
self->totalPendingBytes_ -= spec.maxBytes;
requestSpecs = self->pickSourcesToRequestLocked();
// only acknowledge explicitly if there is no plan to send another
// data request right away
acknowledge =
std::find_if(
requestSpecs.begin(),
requestSpecs.end(),
[&currentSource](const RequestSpec& spec) -> bool {
return spec.source.get() == currentSource.get();
}) == requestSpecs.end();
}
if (acknowledge) {
currentSource->acknowledge();
}
self->request(std::move(requestSpecs));
})
Expand Down
15 changes: 13 additions & 2 deletions velox/exec/ExchangeSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,29 @@ class ExchangeSource : public std::enable_shared_from_this<ExchangeSource> {
/// Requests the producer to generate up to 'maxBytes' more data and reply
/// within 'maxWait'. Returns a future that completes when producer responds
/// either with 'data' or with a message indicating that all data has been
/// already produced or data will take more time to produce.
/// already produced or data will take more time to produce. Automatically
/// acknowledges successful reception of the data buffers received earlier.
virtual folly::SemiFuture<Response> request(
uint32_t maxBytes,
std::chrono::microseconds maxWait) = 0;

/// Ask for available data sizes that can be fetched. Normally should not
/// fetching any actual data (i.e. Response::bytes should be 0). However for
/// backward compatibility (e.g. communicating with coordinator), we allow
/// small data (1MB) to be returned.
/// small data (1MB) to be returned. Automatically acknowledges successful
/// reception of the data buffers received earlier.
virtual folly::SemiFuture<Response> requestDataSizes(
std::chrono::microseconds maxWait) = 0;

/// Acknowledge successful reception of the data buffers received earlier
/// without requesting any more data. The engine will try to minimize the
/// number of acknowledge requests. For example if there is enough buffer
/// memory available to send an another data request right away an explicit
/// acknowledge will not be issued.
virtual ContinueFuture acknowledge() {
return {};
};

/// Close the exchange source. May be called before all data
/// has been received and processed. This can happen in case
/// of an error or an operator like Limit aborting the query
Expand Down
34 changes: 31 additions & 3 deletions velox/exec/tests/ExchangeClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#include <folly/ScopeGuard.h>
#include <gtest/gtest.h>
#include <atomic>
#include <thread>
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/exec/Exchange.h"
Expand Down Expand Up @@ -45,6 +46,8 @@ class ExchangeClientTest : public testing::Test,
}

bufferManager_ = OutputBufferManager::getInstance().lock();

common::testutil::TestValue::enable();
}

std::unique_ptr<SerializedPage> toSerializedPage(const RowVectorPtr& vector) {
Expand Down Expand Up @@ -336,7 +339,6 @@ TEST_F(ExchangeClientTest, multiPageFetch) {

TEST_F(ExchangeClientTest, sourceTimeout) {
constexpr int32_t kNumSources = 3;
common::testutil::TestValue::enable();
auto client =
std::make_shared<ExchangeClient>("test", 17, 1 << 20, pool(), executor());

Expand Down Expand Up @@ -440,8 +442,12 @@ TEST_F(ExchangeClientTest, acknowledge) {
executor());
auto clientCloseGuard = folly::makeGuard([client]() { client->close(); });

client->addRemoteTaskId(sourceTaskId);
client->noMoreRemoteTasks();
std::atomic_int numberOfAcknowledgeRequests{0};
SCOPED_TESTVALUE_SET(
"facebook::velox::exec::test::LocalExchangeSource::acknowledge",
std::function<void(void*)>(([&numberOfAcknowledgeRequests](void*) {
numberOfAcknowledgeRequests++;
})));

{
// adding the first page should not block as there is enough space in
Expand All @@ -456,10 +462,21 @@ TEST_F(ExchangeClientTest, acknowledge) {
// client fetches a single page
ContinueFuture future;
bufferManager_->enqueue(sourceTaskId, 1, makePage(pageSize), &future);

// start fetching
client->addRemoteTaskId(sourceTaskId);
client->noMoreRemoteTasks();

ASSERT_TRUE(std::move(future)
.via(executor())
.wait(std::chrono::seconds{10})
.isReady());

#ifndef NDEBUG
// The client knew there is more data available but could not fetch any more
// Explicit acknowledge was required
EXPECT_EQ(numberOfAcknowledgeRequests, 1);
#endif
}

{
Expand All @@ -486,6 +503,11 @@ TEST_F(ExchangeClientTest, acknowledge) {

ASSERT_TRUE(
std::move(enqueueFuture).wait(std::chrono::seconds{10}).isReady());
#ifndef NDEBUG
// The client knew there is more data available but could not fetch any more
// Explicit acknowledge was required
EXPECT_EQ(numberOfAcknowledgeRequests, 2);
#endif
}

// one page is still in the buffer at this point
Expand All @@ -507,6 +529,12 @@ TEST_F(ExchangeClientTest, acknowledge) {
std::this_thread::sleep_for(std::chrono::milliseconds{100});
}
ASSERT_TRUE(outputBuffersEmpty);
#ifndef NDEBUG
// The output buffer is empty now
// Explicit acknowledge is not necessary as a blocking getDataSize is sent
// right away
EXPECT_EQ(numberOfAcknowledgeRequests, 2);
#endif
}

pages = fetchPages(*client, 1);
Expand Down
20 changes: 16 additions & 4 deletions velox/exec/tests/utils/LocalExchangeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ class LocalExchangeSource : public exec::ExchangeSource {
return;
}

int64_t ackSequence;
VeloxPromise<Response> requestPromise;
{
std::vector<ContinuePromise> queuePromises;
Expand All @@ -133,7 +132,7 @@ class LocalExchangeSource : public exec::ExchangeSource {
atEnd_ = true;
}
if (!data.empty()) {
ackSequence = sequence_ = sequence + pages.size();
sequence_ = sequence + pages.size();
}
}
for (auto& promise : queuePromises) {
Expand All @@ -143,8 +142,6 @@ class LocalExchangeSource : public exec::ExchangeSource {
// Outside of queue mutex.
if (atEnd_) {
buffers->deleteResults(taskId_, destination_);
} else if (!data.empty()) {
buffers->acknowledge(taskId_, destination_, ackSequence);
}

if (!requestPromise.isFulfilled()) {
Expand All @@ -165,6 +162,21 @@ class LocalExchangeSource : public exec::ExchangeSource {
return request(0, maxWait);
}

ContinueFuture acknowledge() override {
common::testutil::TestValue::adjust(
"facebook::velox::exec::test::LocalExchangeSource::acknowledge",
nullptr);
auto buffers = OutputBufferManager::getInstance().lock();
VELOX_CHECK_NOT_NULL(buffers, "invalid OutputBufferManager");
int64_t ackSequence;
{
std::lock_guard<std::mutex> l(queue_->mutex());
ackSequence = sequence_;
}
buffers->acknowledge(taskId_, destination_, ackSequence);
return {};
}

void close() override {
checkSetRequestPromise();

Expand Down

0 comments on commit e6b82b7

Please sign in to comment.