diff --git a/roxie/roxiemem/roxiemem.cpp b/roxie/roxiemem/roxiemem.cpp index 6e4926a1593..ce9477b1c35 100644 --- a/roxie/roxiemem/roxiemem.cpp +++ b/roxie/roxiemem/roxiemem.cpp @@ -5155,18 +5155,23 @@ class CChunkingRowManager : public CRowManager unsigned pageLimit = getPageLimit(); if (totalPages <= pageLimit) { + unsigned newHeapPages; if (pageLimit != UNLIMITED_PAGES) { //Use compare_exchange so that only one thread can increase the number of pages at a time. //(Don't use atomic_add because we need to check the limit hasn't been exceeded.) - if (!totalHeapPages.compare_exchange_weak(numHeapPages, numHeapPages + numRequested, std::memory_order_relaxed)) + newHeapPages = numHeapPages + numRequested; + if (!totalHeapPages.compare_exchange_weak(numHeapPages, newHeapPages, std::memory_order_relaxed)) continue; } else { //Unlimited pages => just increment the total - totalHeapPages.fetch_add(numRequested, std::memory_order_relaxed); + newHeapPages = totalHeapPages.fetch_add(numRequested, std::memory_order_relaxed) + numRequested; } + + //Ensure the total is correct if two threads allocate big blocks of memory at the same time + totalPages = dataBuffPages + newHeapPages; break; } @@ -5203,9 +5208,15 @@ class CChunkingRowManager : public CRowManager if (totalPages > peakPages) { - if (trackMemoryByActivity) - getPeakActivityUsage(); - peakPages = totalPages; + unsigned value = totalPages; + //Atomically update the peak - this could temporarily reduce the value, but it will eventually be correct. + while (value > peakPages.load()) + value = peakPages.exchange(value, std::memory_order_acq_rel); + + //Check if this thread actually updated the peak value - or did another thread beat us to it. + if (totalPages == peakPages) + if (trackMemoryByActivity) + getPeakActivityUsage(); } return totalPages; }