
[VL] Distinct aggregation OOM when getOutput #8025

Open
ccat3z opened this issue Nov 22, 2024 · 19 comments
Labels
bug Something isn't working triage

Comments

@ccat3z
Contributor

ccat3z commented Nov 22, 2024

Backend

VL (Velox)

Bug description

Distinct aggregation merges all sorted spill files in getOutput() (SpillPartition::createOrderedReader). If there are too many spill files, reading the first batch of each file into memory consumes a significant amount of memory. In one of our internal cases, one task generated 300 spill files, which required close to 3 GB of memory (a rough estimate of how this scales is sketched after the workarounds below).


Possible workarounds:

  1. Increase kMaxSpillRunRows; at 1M, inputs of hundreds of millions of rows generate too many spill files. See [GLUTEN-7249][VL] Lower default overhead memory ratio and spill run size #7531.
  2. Reduce kSpillWriteBufferSize to 1M or lower. Why is it set to 4M by default? Is there any performance tuning experience behind that choice?
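Below is a back-of-envelope sketch (not Gluten or Velox code; the struct and function names are made up for illustration) of how the merge-phase memory scales with the number of spill files, following the per-file accounting discussed later in this thread: the FileInputStream read buffer (doubled when read-ahead is enabled) plus the decoded first batch, which is roughly kSpillWriteBufferSize in size.

#include <cstdint>

// Hypothetical helper for illustration only; not part of Velox or Gluten.
struct SpillMergeParams {
  uint64_t numSpillFiles;    // e.g. ~300 in the reported task
  uint64_t readBufferBytes;  // kSpillReadBufferSize, 1 MB by default
  bool readAheadEnabled;     // FileInputStream allocates a second buffer
  uint64_t firstBatchBytes;  // decoded RowVector, roughly kSpillWriteBufferSize
};

inline uint64_t estimateMergeMemory(const SpillMergeParams& p) {
  const uint64_t perFileBuffers =
      p.readBufferBytes * (p.readAheadEnabled ? 2 : 1);
  return p.numSpillFiles * (perFileBuffers + p.firstBatchBytes);
}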

Spark version

None

Spark configurations

No response

System information

No response

Relevant logs

No response

@ccat3z ccat3z added bug Something isn't working triage labels Nov 22, 2024
@FelixYBW
Contributor

FelixYBW commented Nov 23, 2024

Looks like it's the same issue as shuffle spill. All spill merges should have the same issue; we should solve it in a similar way.

  1. is a tradeoff between the number of spill files and overhead memory.
  2. It was set by PR [VL] Add 3 configs of spill #5088. I didn't do any testing on it, and I'm not sure why I set it to 4M while the Velox default is 1M. Let's decrease the default value to 1M and also check the other configs.

What's vanilla Spark's spill buffer size? Is it configurable? In theory vanilla Spark has the same issue as Gluten. @jinchengchenghh do you know?

@FelixYBW
Contributor

I can only find the configuration spark.shuffle.spill.diskWriteBufferSize.

There is no spill-merge one.

@FelixYBW
Contributor

FelixYBW commented Nov 23, 2024

Thank you, @ccat3z. I encountered the same issue in the orderby operator and spent several days debugging it!

@jinchengchenghh
Contributor

The kSpillReadBufferSize controls the read size for each file; the ordered reader creates a FileInputStream for each file and allocates kSpillReadBufferSize (default 1 MB) per file. Can you try adjusting this value?
Maybe we should add a new config to control the read buffer size across all the files, so that numFiles * bufferSize < threshold.

for (auto& fileInfo : files_) {
  streams.push_back(FileSpillMergeStream::create(
      SpillReadFile::create(fileInfo, bufferSize, pool, spillStats)));
}

input_ = std::make_unique<common::FileInputStream>(
    std::move(file), bufferSize, pool_);

FileInputStream::FileInputStream(
    std::unique_ptr<ReadFile>&& file,
    uint64_t bufferSize,
    memory::MemoryPool* pool)
    : file_(std::move(file)),
      fileSize_(file_->size()),
      bufferSize_(std::min(fileSize_, bufferSize)),
      pool_(pool),
      readAheadEnabled_((bufferSize_ < fileSize_) && file_->hasPreadvAsync()) {
  VELOX_CHECK_NOT_NULL(pool_);
  VELOX_CHECK_GT(fileSize_, 0, "Empty FileInputStream");

  buffers_.push_back(AlignedBuffer::allocate<char>(bufferSize_, pool_)); // allocating this buffer causes the OOM
  if (readAheadEnabled_) {
    buffers_.push_back(AlignedBuffer::allocate<char>(bufferSize_, pool_));
  }
  readNextRange();
}

kSpillWriteBufferSize controls the serialization buffer; once the buffer reaches this threshold, it is flushed and compressed.
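A rough illustration of the proposed numFiles * bufferSize < threshold idea (not existing Velox code; the function name and the total-budget parameter are hypothetical): shrink the per-file read buffer when many spill files have to be merged at once.

#include <algorithm>
#include <cstdint>

// Hypothetical sketch of a global read-buffer budget; all names are made up.
inline uint64_t perFileReadBufferSize(
    uint64_t numSpillFiles,
    uint64_t defaultBufferSize,   // kSpillReadBufferSize, 1 MB by default
    uint64_t totalBudgetBytes) {  // the proposed new threshold
  if (numSpillFiles == 0) {
    return defaultBufferSize;
  }
  // Keep a small floor (64 KB, chosen arbitrarily) so reads stay reasonable.
  constexpr uint64_t kMinBuffer = 64ULL << 10;
  const uint64_t fairShare = totalBudgetBytes / numSpillFiles;
  return std::max(kMinBuffer, std::min(fairShare, defaultBufferSize));
}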

@jinchengchenghh
Contributor

jinchengchenghh commented Nov 25, 2024

Spark also opens all the spill files for reading.

final UnsafeSorterSpillMerger spillMerger = new UnsafeSorterSpillMerger(
    recordComparatorSupplier.get(), prefixComparator, spillWriters.size());
for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
  spillMerger.addSpillIfNotEmpty(spillWriter.getReader(serializerManager));
}
if (inMemSorter != null) {
  readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
  spillMerger.addSpillIfNotEmpty(readingIterator);
}
return spillMerger.getSortedIterator();

Spark uses a PriorityQueue<UnsafeSorterIterator> to pick the next record to merge.

Comparator<UnsafeSorterIterator> comparator = (left, right) -> {
      int prefixComparisonResult =
        prefixComparator.compare(left.getKeyPrefix(), right.getKeyPrefix());
      if (prefixComparisonResult == 0) {
        return recordComparator.compare(
          left.getBaseObject(), left.getBaseOffset(), left.getRecordLength(),
          right.getBaseObject(), right.getBaseOffset(), right.getRecordLength());
      } else {
        return prefixComparisonResult;
      }
    };
    priorityQueue = new PriorityQueue<>(numSpills, comparator);

It has configs to control the read-ahead and the read buffer size (default 1 MB), as follows:

  private[spark] val UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED =
    ConfigBuilder("spark.unsafe.sorter.spill.read.ahead.enabled")
      .internal()
      .version("2.3.0")
      .booleanConf
      .createWithDefault(true)

  private[spark] val UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE =
    ConfigBuilder("spark.unsafe.sorter.spill.reader.buffer.size")
      .internal()
      .version("2.1.0")
      .bytesConf(ByteUnit.BYTE)
      .checkValue(v => 1024 * 1024 <= v && v <= MAX_BUFFER_SIZE_BYTES,
        s"The value must be in allowed range [1,048,576, ${MAX_BUFFER_SIZE_BYTES}].")
      .createWithDefault(1024 * 1024)

class UnsafeSorterSpillReader

final InputStream bs =
        new NioBufferedFileInputStream(file, bufferSizeBytes);
if (readAheadEnabled) {
        this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs),
                bufferSizeBytes);
      } else {
        this.in = serializerManager.wrapStream(blockId, bs);
      }
      this.din = new DataInputStream(this.in);

It needs to allocate bufferSizeBytes in NioBufferedFileInputStream, plus 2 × bufferSizeBytes in ReadAheadInputStream. After a record is loaded, the UnsafeSorterIterator reader is put back into the priorityQueue to load the next record. But the buffers are allocated with ByteBuffer and are not tracked by the Spark memory pool. @FelixYBW
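Per the description above, the worst-case untracked read memory on the Spark side is roughly bufferSizeBytes + 2 × bufferSizeBytes = 3 × 1 MB = 3 MB per open spill file with the default buffer size and read-ahead enabled, so a merge over 300 spill files would pin on the order of 900 MB outside the Spark memory pool.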

@jinchengchenghh
Contributor

In that case, we need to respect UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE.
Velox spills RowVectors, so we must read a buffer-sized chunk ahead. But it would be better to add a config for the total buffer size when UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED is false.

@FelixYBW
Contributor

Thank you, @jinchengchenghh. With the tuning of kMaxSpillRunRows and kSpillWriteBufferSize, one of my tasks succeeded but the other one still fails. Looks like there is still some large memory allocation in getOutput.

@FelixYBW
Contributor

The kSpillReadBufferSize controls the read size for each file; the ordered reader creates a FileInputStream for each file and allocates kSpillReadBufferSize (default 1 MB) per file. Can you try adjusting this value? Maybe we should add a new config to control the read buffer size across all the files, so that numFiles * bufferSize < threshold.

Can you add it as a config in Gluten?

@FelixYBW
Contributor

The kSpillReadBufferSize controls the read size for each file; the ordered reader creates a FileInputStream for each file and allocates kSpillReadBufferSize (default 1 MB) per file. Can you try adjusting this value? Maybe we should add a new config to control the read buffer size across all the files, so that numFiles * bufferSize < threshold.

Should we propose the approach of #7861?

@FelixYBW
Contributor

In that case, we need to respect UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE. Velox spills RowVectors, so we must read a buffer-sized chunk ahead. But it would be better to add a config for the total buffer size when UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED is false.

So the worst case for vanilla Spark is also a 1 MB buffer per file, right? Let's honor the value of spark.unsafe.sorter.spill.reader.buffer.size then. It may be set per query.

@jinchengchenghh
Contributor

The kSpillReadBufferSize controls the read size for each file; the ordered reader creates a FileInputStream for each file and allocates kSpillReadBufferSize (default 1 MB) per file. Can you try adjusting this value? Maybe we should add a new config to control the read buffer size across all the files, so that numFiles * bufferSize < threshold.

Should we propose the approach of #7861?

#7861 releases the buffer after the read, and the Velox FileInputStream reuses a readBufferSize buffer to read the file, so it works in a similar way.

@jinchengchenghh
Contributor

In that case, we need to respect UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE. Velox spills RowVectors, so we must read a buffer-sized chunk ahead. But it would be better to add a config for the total buffer size when UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED is false.

So the worst case for vanilla Spark is also a 1 MB buffer per file, right? Let's honor the value of spark.unsafe.sorter.spill.reader.buffer.size then. It may be set per query.

Yes, I will draft a PR to respect this config.

@jinchengchenghh
Contributor

Thank you, @jinchengchenghh. With the tuning of kMaxSpillRunRows and kSpillWriteBufferSize, one of my tasks succeeded but the other one still fails. Looks like there is still some large memory allocation in getOutput.

Maybe it's because the streams hold all the buffers, which are only released after all the files have been read.
In the meantime, compression consumes a lot of buffer memory that is not tracked by the memory pool. https://github.com/facebookincubator/velox/blob/main/velox/serializers/PrestoSerializer.cpp#L4416

I don't see compression in the Spark spill path, so Spark doesn't need to request memory for compression.

I will add a new config to control the Velox spill codec.

Is it still an OOM, or is the task killed by YARN?

@jinchengchenghh
Contributor

Spark closes the reader in loadNext once all of its records have been consumed.
I would prefer to close the input file once it is atEnd: https://github.com/facebookincubator/velox/blob/main/velox/exec/SpillFile.cpp#L314
That would help release the memory as early as possible.
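A minimal sketch of that idea, with hypothetical types rather than the actual SpillFile/FileInputStream API: release the underlying stream, and with it its buffers, as soon as the file is exhausted instead of at the end of the whole merge.

#include <memory>

// Hypothetical stand-in for a per-file spill input; not a Velox class.
struct FileStream {
  virtual ~FileStream() = default;
  virtual bool atEnd() const = 0;
  virtual bool readNextBatch() = 0;  // returns false when no more data
};

// Wrapper that drops the stream (and its read buffers) eagerly.
class EagerlyClosingStream {
 public:
  explicit EagerlyClosingStream(std::unique_ptr<FileStream> input)
      : input_(std::move(input)) {}

  bool next() {
    if (input_ == nullptr) {
      return false;
    }
    const bool hasMore = input_->readNextBatch();
    if (!hasMore || input_->atEnd()) {
      // Release the file and its buffers as early as possible.
      input_.reset();
    }
    return hasMore;
  }

 private:
  std::unique_ptr<FileStream> input_;
};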

@jinchengchenghh
Contributor

UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE

And we should also close the serializer in the serde.

@FelixYBW
Contributor

#7861 releases the buffer after the read, and the Velox FileInputStream reuses a readBufferSize buffer to read the file, so it works in a similar way.

No, #7861 uses mmap; memory is mapped into user space directly. Velox uses file read/write, so data is copied into a buffer.

@FelixYBW
Contributor

Yes, I will draft a PR to respect this config.

I'm adding it in #8026.

@FelixYBW
Contributor

I will add a new config to control the Velox spill codec.

Is it still an OOM, or is the task killed by YARN?

We should already have spill codec and spill buffer configs.
It's an OOM, not a kill by YARN.

@jinchengchenghh
Contributor

jinchengchenghh commented Nov 26, 2024

Now it uses the Spark codec spark.io.compression.codec; I will add a config to control it separately.
PrestoVectorSerde has a header, so it has to read the compressed data and deserialize it into a RowVector, and it holds one RowVector per file. The many RowVectors may cause OOM. Writing flushes the RowVector once it reaches kSpillWriteBufferSize. So for each file there is a kSpillReadBufferSize buffer reserved in FileInputStream, plus one RowVector whose size is approximately kSpillWriteBufferSize.

In SortBuffer::getOutputWithSpill, the std::vector<const RowVector*> spillSources_ and std::vector<vector_size_t> spillSourceRows_ are sized to outputSize; they also hold memory that is not tracked, which may cause a kill by YARN. I will draft a PR to track this memory.
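For scale: assuming vector_size_t is a 32-bit index, each output row costs sizeof(const RowVector*) + sizeof(vector_size_t) = 8 + 4 = 12 bytes across those two vectors on a 64-bit build, so the untracked footprint is roughly outputSize × 12 bytes.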
