Skip to content

Commit

Permalink
Use estimateFlatSize in LocalPartition
Browse files Browse the repository at this point in the history
Summary:
Using retained size is problematic as it may account for memory shared between
multiple vectors.

For example in the join operators vectors are wrapped in dictionary and shared
between multiple output vectors. It was observed that a Join may produce
vectors that retain over 40MB of data while having a flat size of little over
2MB.

When the size reported is high the LocalPartition operator has to block
frequently reducing query performance.

Differential Revision: D67601403
  • Loading branch information
arhimondr authored and facebook-github-bot committed Dec 23, 2024
1 parent a6842bb commit 6faa920
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions velox/exec/LocalPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ void LocalPartition::addInput(RowVectorPtr input) {
if (numPartitions_ == 1) {
ContinueFuture future;
auto blockingReason =
queues_[0]->enqueue(input, input->retainedSize(), &future);
queues_[0]->enqueue(input, input->estimateFlatSize(), &future);
if (blockingReason != BlockingReason::kNotBlocked) {
blockingReasons_.push_back(blockingReason);
futures_.push_back(std::move(future));
Expand All @@ -320,7 +320,7 @@ void LocalPartition::addInput(RowVectorPtr input) {
if (singlePartition.has_value()) {
ContinueFuture future;
auto blockingReason = queues_[singlePartition.value()]->enqueue(
input, input->retainedSize(), &future);
input, input->estimateFlatSize(), &future);
if (blockingReason != BlockingReason::kNotBlocked) {
blockingReasons_.push_back(blockingReason);
futures_.push_back(std::move(future));
Expand All @@ -342,7 +342,7 @@ void LocalPartition::addInput(RowVectorPtr input) {
++maxIndex[partition];
}

const int64_t totalSize = input->retainedSize();
const int64_t totalSize = input->estimateFlatSize();
for (auto i = 0; i < numPartitions_; i++) {
auto partitionSize = maxIndex[i];
if (partitionSize == 0) {
Expand Down

0 comments on commit 6faa920

Please sign in to comment.