From 3a8ea6485baf621562d45ab97780bf4e0b15d0b1 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Mon, 16 Sep 2024 11:21:16 +0100 Subject: [PATCH] HPCC-31984 Changes following review Signed-off-by: Shamser Ahmed --- .../hashdistrib/thhashdistribslave.cpp | 24 ++++++++++++------- thorlcr/thorutil/thbuf.cpp | 10 ++++---- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp index 4b322a50a45..d83caae9c24 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp @@ -95,6 +95,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl size32_t fixedEstSize; Owned pipewr; Owned piperd; + mutable CriticalSection critPiperd; protected: /* @@ -1183,14 +1184,17 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl ihash = _ihash; iCompare = _iCompare; keepBestCompare = _keepBestCompare; - if (allowSpill) { - StringBuffer temp; - GetTempFilePath(temp,"hddrecvbuff"); - piperd.setown(createSmartBuffer(activity, temp.str(), pullBufferSize, rowIf)); + CriticalBlock block(critPiperd); + if (allowSpill) + { + StringBuffer temp; + GetTempFilePath(temp,"hddrecvbuff"); + piperd.setown(createSmartBuffer(activity, temp.str(), pullBufferSize, rowIf)); + } + else + piperd.setown(createSmartInMemoryBuffer(activity, rowIf, pullBufferSize)); } - else - piperd.setown(createSmartInMemoryBuffer(activity, rowIf, pullBufferSize)); pipewr.set(piperd->queryWriter()); connected = true; @@ -1232,7 +1236,10 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl deserializer = NULL; fixedEstSize = 0; input.clear(); - piperd.clear(); + { + CriticalBlock block(critPiperd); + piperd.clear(); + } pipewr.clear(); ihash = NULL; iCompare = NULL; @@ -1433,9 +1440,10 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl virtual void mergeStats(CRuntimeStatisticCollection &stats) const { + sender.mergeStats(stats); + CriticalBlock block(critPiperd); if (piperd) mergeRemappedStats(stats, piperd, diskToTempStatsMap); - sender.mergeStats(stats); } // IExceptionHandler impl. virtual bool fireException(IException *e) diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index 27635697751..3c62a6ec6e1 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -76,6 +76,7 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl ThorRowQueue *out; CFileOwner tmpFileOwner; Owned tempFileIO; + mutable CriticalSection critTmpFileIO; SpinLock lock; bool waiting; Semaphore waitsem; @@ -146,6 +147,7 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl } if (!tempFileIO) { SpinUnblock unblock(lock); + CriticalBlock block(critTmpFileIO); tempFileIO.setown(tmpFileOwner.queryIFile().open(IFOcreaterw)); if (!tempFileIO) { @@ -426,6 +428,7 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl } virtual unsigned __int64 getStatistic(StatisticKind kind) const override { + CriticalBlock block(critTmpFileIO); if (tempFileIO) return tempFileIO->getStatistic(kind); else @@ -1075,12 +1078,9 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, virtual unsigned __int64 getStatistic(StatisticKind kind) const { unsigned __int64 v = inactiveStats.queryStatistic(kind).get(); + CriticalBlock b(critCurrentOutputIFileIO); if (currentOutputIFileIO) - { - CriticalBlock b(critCurrentOutputIFileIO); - if (currentOutputIFileIO) - v += currentOutputIFileIO->getStatistic(kind); - } + v += currentOutputIFileIO->getStatistic(kind); return v; } // IRowStream