diff --git a/velox/exec/ExchangeClient.cpp b/velox/exec/ExchangeClient.cpp index 0ca18e1fcf700..377909664059a 100644 --- a/velox/exec/ExchangeClient.cpp +++ b/velox/exec/ExchangeClient.cpp @@ -148,7 +148,9 @@ void ExchangeClient::request(std::vector&& requestSpecs) { std::move(future) .via(executor_) .thenValue([self, spec = std::move(spec)](auto&& response) { + bool acknowledge = false; std::vector requestSpecs; + std::shared_ptr currentSource = spec.source; { std::lock_guard l(self->queue_->mutex()); if (self->closed_) { @@ -168,6 +170,18 @@ void ExchangeClient::request(std::vector&& requestSpecs) { } self->totalPendingBytes_ -= spec.maxBytes; requestSpecs = self->pickSourcesToRequestLocked(); + // only acknowledge explicitly if there is no plan to send another + // data request right away + acknowledge = + std::find_if( + requestSpecs.begin(), + requestSpecs.end(), + [&currentSource](const RequestSpec& spec) -> bool { + return spec.source.get() == currentSource.get(); + }) == requestSpecs.end(); + } + if (acknowledge) { + currentSource->acknowledge(); } self->request(std::move(requestSpecs)); }) diff --git a/velox/exec/ExchangeSource.h b/velox/exec/ExchangeSource.h index a42ee745ac346..282a0099662b7 100644 --- a/velox/exec/ExchangeSource.h +++ b/velox/exec/ExchangeSource.h @@ -76,7 +76,8 @@ class ExchangeSource : public std::enable_shared_from_this { /// Requests the producer to generate up to 'maxBytes' more data and reply /// within 'maxWait'. Returns a future that completes when producer responds /// either with 'data' or with a message indicating that all data has been - /// already produced or data will take more time to produce. + /// already produced or data will take more time to produce. Automatically + /// acknowledges successful reception of the data buffers received earlier. 
virtual folly::SemiFuture request( uint32_t maxBytes, std::chrono::microseconds maxWait) = 0; @@ -84,10 +85,20 @@ class ExchangeSource : public std::enable_shared_from_this { /// Ask for available data sizes that can be fetched. Normally should not /// fetching any actual data (i.e. Response::bytes should be 0). However for /// backward compatibility (e.g. communicating with coordinator), we allow - /// small data (1MB) to be returned. + /// small data (1MB) to be returned. Automatically acknowledges successful + /// reception of the data buffers received earlier. virtual folly::SemiFuture requestDataSizes( std::chrono::microseconds maxWait) = 0; + /// Acknowledge successful reception of the data buffers received earlier + /// without requesting any more data. The engine will try to minimize the + /// number of acknowledge requests. For example if there is enough buffer + /// memory available to send another data request right away an explicit + /// acknowledge will not be issued. + virtual ContinueFuture acknowledge() { + return {}; + }; + /// Close the exchange source. May be called before all data /// has been received and processed. 
This can happen in case /// of an error or an operator like Limit aborting the query diff --git a/velox/exec/tests/ExchangeClientTest.cpp b/velox/exec/tests/ExchangeClientTest.cpp index e94807d68e7c6..4c0a705cb551e 100644 --- a/velox/exec/tests/ExchangeClientTest.cpp +++ b/velox/exec/tests/ExchangeClientTest.cpp @@ -15,6 +15,7 @@ */ #include #include +#include #include #include "velox/common/base/tests/GTestUtils.h" #include "velox/exec/Exchange.h" @@ -45,6 +46,8 @@ class ExchangeClientTest : public testing::Test, } bufferManager_ = OutputBufferManager::getInstance().lock(); + + common::testutil::TestValue::enable(); } std::unique_ptr toSerializedPage(const RowVectorPtr& vector) { @@ -336,7 +339,6 @@ TEST_F(ExchangeClientTest, multiPageFetch) { TEST_F(ExchangeClientTest, sourceTimeout) { constexpr int32_t kNumSources = 3; - common::testutil::TestValue::enable(); auto client = std::make_shared("test", 17, 1 << 20, pool(), executor()); @@ -440,8 +442,12 @@ TEST_F(ExchangeClientTest, acknowledge) { executor()); auto clientCloseGuard = folly::makeGuard([client]() { client->close(); }); - client->addRemoteTaskId(sourceTaskId); - client->noMoreRemoteTasks(); + std::atomic_int numberOfAcknowledgeRequests{0}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::test::LocalExchangeSource::acknowledge", + std::function(([&numberOfAcknowledgeRequests](void*) { + numberOfAcknowledgeRequests++; + }))); { // adding the first page should not block as there is enough space in @@ -456,10 +462,21 @@ TEST_F(ExchangeClientTest, acknowledge) { // client fetches a single page ContinueFuture future; bufferManager_->enqueue(sourceTaskId, 1, makePage(pageSize), &future); + + // start fetching + client->addRemoteTaskId(sourceTaskId); + client->noMoreRemoteTasks(); + ASSERT_TRUE(std::move(future) .via(executor()) .wait(std::chrono::seconds{10}) .isReady()); + +#ifndef NDEBUG + // The client knew there is more data available but could not fetch any more + // Explicit acknowledge was required + 
EXPECT_EQ(numberOfAcknowledgeRequests, 1); +#endif } { @@ -486,6 +503,11 @@ TEST_F(ExchangeClientTest, acknowledge) { ASSERT_TRUE( std::move(enqueueFuture).wait(std::chrono::seconds{10}).isReady()); +#ifndef NDEBUG + // The client knew there is more data available but could not fetch any more + // Explicit acknowledge was required + EXPECT_EQ(numberOfAcknowledgeRequests, 2); +#endif } // one page is still in the buffer at this point @@ -507,6 +529,12 @@ TEST_F(ExchangeClientTest, acknowledge) { std::this_thread::sleep_for(std::chrono::milliseconds{100}); } ASSERT_TRUE(outputBuffersEmpty); +#ifndef NDEBUG + // The output buffer is empty now + // Explicit acknowledge is not necessary as a blocking getDataSize is sent + // right away + EXPECT_EQ(numberOfAcknowledgeRequests, 2); +#endif } pages = fetchPages(*client, 1); diff --git a/velox/exec/tests/utils/LocalExchangeSource.cpp b/velox/exec/tests/utils/LocalExchangeSource.cpp index 173dd8fec5502..0058eafd0de3f 100644 --- a/velox/exec/tests/utils/LocalExchangeSource.cpp +++ b/velox/exec/tests/utils/LocalExchangeSource.cpp @@ -117,7 +117,6 @@ class LocalExchangeSource : public exec::ExchangeSource { return; } - int64_t ackSequence; VeloxPromise requestPromise; { std::vector queuePromises; @@ -133,7 +132,7 @@ class LocalExchangeSource : public exec::ExchangeSource { atEnd_ = true; } if (!data.empty()) { - ackSequence = sequence_ = sequence + pages.size(); + sequence_ = sequence + pages.size(); } } for (auto& promise : queuePromises) { @@ -143,8 +142,6 @@ class LocalExchangeSource : public exec::ExchangeSource { // Outside of queue mutex. 
if (atEnd_) { buffers->deleteResults(taskId_, destination_); - } else if (!data.empty()) { - buffers->acknowledge(taskId_, destination_, ackSequence); } if (!requestPromise.isFulfilled()) { @@ -165,6 +162,21 @@ class LocalExchangeSource : public exec::ExchangeSource { return request(0, maxWait); } + ContinueFuture acknowledge() override { + common::testutil::TestValue::adjust( + "facebook::velox::exec::test::LocalExchangeSource::acknowledge", + nullptr); + auto buffers = OutputBufferManager::getInstance().lock(); + VELOX_CHECK_NOT_NULL(buffers, "invalid OutputBufferManager"); + int64_t ackSequence; + { + std::lock_guard l(queue_->mutex()); + ackSequence = sequence_; + } + buffers->acknowledge(taskId_, destination_, ackSequence); + return {}; + } + void close() override { checkSetRequestPromise();