Skip to content

Commit

Permalink
Do not fail getData for deleted buffer
Browse files Browse the repository at this point in the history
Since requests can arrive out of order
  • Loading branch information
arhimondr committed Mar 21, 2024
1 parent 1f4ce8c commit 2e4c1a5
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 12 deletions.
19 changes: 19 additions & 0 deletions CMake/resolve_dependency_modules/boost/FindBoost.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
message(STATUS "Using Boost - Bundled")
set(Boost_FOUND TRUE)
set(Boost_LIBRARIES atomic;context;date_time;filesystem;program_options;regex;system;thread)
list(APPEND Boost_LIBRARIES headers)
list(TRANSFORM Boost_LIBRARIES PREPEND Boost::)
message(STATUS "Boost targets: ${Boost_LIBRARIES}")
18 changes: 9 additions & 9 deletions velox/exec/OutputBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -724,15 +724,15 @@ void OutputBuffer::getData(

VELOX_CHECK_LT(destination, buffers_.size());
auto* buffer = buffers_[destination].get();
VELOX_CHECK_NOT_NULL(
buffer,
"getData received after its buffer is deleted. Destination: {}, sequence: {}",
destination,
sequence);
freed = buffer->acknowledge(sequence, true);
updateAfterAcknowledgeLocked(freed, promises);
data = buffer->getData(
maxBytes, sequence, notify, activeCheck, arbitraryBuffer_.get());
if (buffer) {
freed = buffer->acknowledge(sequence, true);
updateAfterAcknowledgeLocked(freed, promises);
data = buffer->getData(
maxBytes, sequence, notify, activeCheck, arbitraryBuffer_.get());
} else {
data.data.emplace_back(nullptr);
data.immediate = true;
}
}
releaseAfterAcknowledge(freed, promises);
if (data.immediate) {
Expand Down
34 changes: 31 additions & 3 deletions velox/exec/tests/OutputBufferManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,36 @@ class OutputBufferManagerTest : public testing::Test {
}
}
bufferManager_->deleteResults(taskId, destination);
// out of order requests are allowed (fetch after delete)
{
struct Response {
std::vector<std::unique_ptr<folly::IOBuf>> pages;
int64_t sequence;
std::vector<int64_t> remainingBytes;
};
folly::Promise<Response> promise;
auto future = promise.getSemiFuture();
bufferManager_->getData(
taskId,
destination,
32'000'000,
nextSequence,
[&promise](
std::vector<std::unique_ptr<folly::IOBuf>> pages,
int64_t inSequence,
std::vector<int64_t> remainingBytes) {
promise.setValue(Response{
std::move(pages), inSequence, std::move(remainingBytes)});
});
future.wait();
ASSERT_TRUE(future.isReady());
auto& response = future.value();
ASSERT_EQ(response.sequence, nextSequence);
ASSERT_EQ(response.remainingBytes.size(), 0);
ASSERT_EQ(response.pages.size(), 1);
ASSERT_EQ(response.pages.at(0), nullptr);
}

fetchedPages = nextSequence;
}

Expand Down Expand Up @@ -829,9 +859,7 @@ TEST_F(OutputBufferManagerTest, basicBroadcast) {
acknowledge(taskId, 5, 3);
EXPECT_FALSE(bufferManager_->isFinished(taskId));
deleteResults(taskId, 5);
VELOX_ASSERT_THROW(
fetch(taskId, 5, 0, 1'000'000'000, 2),
"getData received after its buffer is deleted. Destination: 5, sequence: 0");
fetch(taskId, 5, 0, 1'000'000'000, 1, true);

bufferManager_->updateOutputBuffers(taskId, 7, true);
EXPECT_FALSE(bufferManager_->isFinished(taskId));
Expand Down

0 comments on commit 2e4c1a5

Please sign in to comment.