This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

BulkExecutor 2.3.0-preview2: temporal memory leak when frequently call bulk operations #60

Open
spzSource opened this issue Aug 6, 2019 · 1 comment

spzSource commented Aug 6, 2019

Context

Package version: 2.3.0-preview2

We hit a serious temporary memory leak when we used BulkExecutor in our ETL pipeline for data loading. Four VMs were ingesting data in batches of 2000 to 5000 elements each. One or two of the VMs were constantly crashing with out-of-memory errors. The situation gets much worse when multiple bulk operations run simultaneously on the same VM.

Investigation

While analysing the decompiled code of the BulkInsert method, I noticed a strange usage of the ConcurrentBag collection. A simplified piece of the code is shown below:

...
var documentsToImportByPartition = 
    new ConcurrentDictionary<string, ConcurrentBag<string>>();
var miniBatchesToImportByPartition = 
    new ConcurrentDictionary<string, List<List<string>>>();

foreach (string partitionKeyRangeId in this.partitionKeyRangeIds)
{
    documentsToImportByPartition[partitionKeyRangeId] = new ConcurrentBag<string>();
    miniBatchesToImportByPartition[partitionKeyRangeId] = new List<List<string>>();
}
...
Parallel.ForEach<string>(
    GetNextPage<string>(sourceEnumerator, pageSize), 
    documentString => documentsToImportByPartition[partitionKeyId].Add(documentString));
...

As you can see, each ConcurrentBag instance in the documentsToImportByPartition dictionary gets populated but is never consumed. Because ConcurrentBag uses a ThreadLocal to keep a separate list of items per thread, the items become eligible for garbage collection only once the ThreadLocal's finalizer has been called. Since bags were created and populated frequently, we ended up with a huge amount of garbage retained through the ThreadLocal.
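A rough way to check this behaviour on a given runtime is to compare the managed heap size before and after finalizers have run. The sketch below is not taken from BulkExecutor: `BagRetentionProbe`, `FillAndAbandon` and `HeapAfterFullGc` are hypothetical names, and the numbers it prints will differ between .NET Framework and newer runtimes, so treat it as a diagnostic aid rather than proof.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical probe, not part of BulkExecutor.
public static class BagRetentionProbe
{
    // Fills a bag from several thread-pool threads and then abandons it
    // without ever taking the items back out (the pattern described above).
    public static void FillAndAbandon(int itemCount)
    {
        var bag = new ConcurrentBag<byte[]>();
        Parallel.For(0, itemCount, _ => bag.Add(new byte[10 * 1024]));
    }

    // Returns the managed heap size in bytes after collecting,
    // running pending finalizers, and collecting again.
    public static long HeapAfterFullGc()
    {
        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();
        return GC.GetTotalMemory(forceFullCollection: true);
    }

    public static void Main()
    {
        long baseline = HeapAfterFullGc();

        for (int round = 0; round < 20; round++)
        {
            FillAndAbandon(5000);
        }

        // Heap size as the runtime currently sees it, without forcing a collection.
        long beforeFinalizers = GC.GetTotalMemory(forceFullCollection: false);
        // Heap size once finalizers have had a chance to run.
        long afterFinalizers = HeapAfterFullGc();

        Console.WriteLine("Baseline:          {0:N0} bytes", baseline);
        Console.WriteLine("Before finalizers: {0:N0} bytes", beforeFinalizers);
        Console.WriteLine("After finalizers:  {0:N0} bytes", afterFinalizers);
    }
}
```

A large gap between the last two numbers would suggest that the abandoned items are reclaimed only after finalization, which matches the explanation above.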

Reproduction

The following test reproduces the problem:

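    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using NUnit.Framework;

    [TestFixture]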
    public class ConcurrentBagTests
    {
        private class TestPayload
        {
            private int[] _data = new int[1024 * 10];
        }

        [Test]
        public async Task ConcurrentBag_MultiThreadPopulation()
        {
            for (int i = 0; i < 100; i++)
            {
                await PopulateBag();
            }
        }

        private async Task PopulateBag()
        {
            var bag = new ConcurrentBag<TestPayload>();

            for (int i = 0; i < 1000; i++)
            {
                var routines = new List<Task>
                {
                    Task.Run(() => bag.Add(new TestPayload())),
                    Task.Run(() => bag.Add(new TestPayload())),
                    Task.Run(() => bag.Add(new TestPayload())),
                    Task.Run(() => bag.Add(new TestPayload())),
                    Task.Run(() => bag.Add(new TestPayload()))
                };

                await Task.WhenAll(routines);
            }
        }
    }

Here is the memory diagram, which shows 1.2 GB of memory usage at the peak:

image

If we rewrite the previous test to use a simple List object:

        private async Task PopulateList()
        {
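            var bag = new List<TestPayload>();

            for (int i = 0; i < 1000; i++)
            {
                // List<T> is not thread-safe, so each concurrent Add is guarded by a lock.
                var routines = new List<Task>
                {
                    Task.Run(() => { lock (bag) { bag.Add(new TestPayload()); } }),
                    Task.Run(() => { lock (bag) { bag.Add(new TestPayload()); } }),
                    Task.Run(() => { lock (bag) { bag.Add(new TestPayload()); } }),
                    Task.Run(() => { lock (bag) { bag.Add(new TestPayload()); } }),
                    Task.Run(() => { lock (bag) { bag.Add(new TestPayload()); } })
                };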

                await Task.WhenAll(routines);
            }
        }

The memory diagram then looks as follows (only 485 MB allocated at the peak):

image

Possible fixes

There are multiple options:

  • clear the allocated ConcurrentBag objects once the bulk operation has finished, which shortens the lifetime of the items so that the GC can collect them;
  • use a different collection, for instance ConcurrentQueue.
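Both options can be sketched roughly as follows. This is an illustration only, not the BulkExecutor code: `PartitionBuffers`, `Drain` and `Collect` are hypothetical names, and the sample documents are made up. Whether draining fully releases the items may depend on the runtime's ConcurrentBag implementation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helpers, not part of BulkExecutor.
public static class PartitionBuffers
{
    // Option 1: keep ConcurrentBag, but take every item out once the bulk
    // operation has finished, so the bag no longer references the documents.
    // Returns the number of items removed.
    public static int Drain<T>(ConcurrentBag<T> bag)
    {
        int removed = 0;
        while (bag.TryTake(out T ignored))
        {
            removed++;
        }
        return removed;
    }

    // Option 2: collect documents in a ConcurrentQueue, which does not
    // keep per-thread lists of items.
    public static ConcurrentQueue<string> Collect(IEnumerable<string> documents)
    {
        var queue = new ConcurrentQueue<string>();
        Parallel.ForEach(documents, document => queue.Enqueue(document));
        return queue;
    }

    public static void Main()
    {
        // Made-up sample documents for illustration.
        List<string> documents = Enumerable.Range(0, 1000)
            .Select(i => "{\"id\":\"" + i + "\"}")
            .ToList();

        var bag = new ConcurrentBag<string>();
        Parallel.ForEach(documents, document => bag.Add(document));
        Console.WriteLine("Drained from bag: " + Drain(bag));
        Console.WriteLine("Bag empty: " + bag.IsEmpty);

        ConcurrentQueue<string> queue = Collect(documents);
        Console.WriteLine("Queued: " + queue.Count);
    }
}
```

Of the two, the queue variant needs no explicit clean-up step, while draining keeps the existing collection type and only adds a call at the end of the operation.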

Looking forward to seeing this issue addressed. I'm happy to assist with further investigation and a fix.

@spzSource spzSource changed the title BulkExecutor: temporal memory leak when frequently call bulk operations BulkExecutor 2.3.0-preview2: temporal memory leak when frequently call bulk operations Aug 6, 2019
@abinav2307
Contributor

Thank you Alex for bringing this to our attention. We are actively looking into this.
