Skip to content

Commit

Permalink
HPCC-31984 Changes following review
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Sep 17, 2024
1 parent 51b1c6c commit 3a8ea64
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 13 deletions.
24 changes: 16 additions & 8 deletions thorlcr/activities/hashdistrib/thhashdistribslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
size32_t fixedEstSize;
Owned<IRowWriter> pipewr;
Owned<ISmartRowBuffer> piperd;
mutable CriticalSection critPiperd;

protected:
/*
Expand Down Expand Up @@ -1183,14 +1184,17 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
ihash = _ihash;
iCompare = _iCompare;
keepBestCompare = _keepBestCompare;
if (allowSpill)
{
StringBuffer temp;
GetTempFilePath(temp,"hddrecvbuff");
piperd.setown(createSmartBuffer(activity, temp.str(), pullBufferSize, rowIf));
CriticalBlock block(critPiperd);
if (allowSpill)
{
StringBuffer temp;
GetTempFilePath(temp,"hddrecvbuff");
piperd.setown(createSmartBuffer(activity, temp.str(), pullBufferSize, rowIf));
}
else
piperd.setown(createSmartInMemoryBuffer(activity, rowIf, pullBufferSize));
}
else
piperd.setown(createSmartInMemoryBuffer(activity, rowIf, pullBufferSize));

pipewr.set(piperd->queryWriter());
connected = true;
Expand Down Expand Up @@ -1232,7 +1236,10 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
deserializer = NULL;
fixedEstSize = 0;
input.clear();
piperd.clear();
{
CriticalBlock block(critPiperd);
piperd.clear();
}
pipewr.clear();
ihash = NULL;
iCompare = NULL;
Expand Down Expand Up @@ -1433,9 +1440,10 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl

virtual void mergeStats(CRuntimeStatisticCollection &stats) const
{
sender.mergeStats(stats);
CriticalBlock block(critPiperd);
if (piperd)
mergeRemappedStats(stats, piperd, diskToTempStatsMap);
sender.mergeStats(stats);
}
// IExceptionHandler impl.
virtual bool fireException(IException *e)
Expand Down
10 changes: 5 additions & 5 deletions thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
ThorRowQueue *out;
CFileOwner tmpFileOwner;
Owned<IFileIO> tempFileIO;
mutable CriticalSection critTmpFileIO;
SpinLock lock;
bool waiting;
Semaphore waitsem;
Expand Down Expand Up @@ -146,6 +147,7 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
}
if (!tempFileIO) {
SpinUnblock unblock(lock);
CriticalBlock block(critTmpFileIO);
tempFileIO.setown(tmpFileOwner.queryIFile().open(IFOcreaterw));
if (!tempFileIO)
{
Expand Down Expand Up @@ -426,6 +428,7 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
}
virtual unsigned __int64 getStatistic(StatisticKind kind) const override
{
CriticalBlock block(critTmpFileIO);
if (tempFileIO)
return tempFileIO->getStatistic(kind);
else
Expand Down Expand Up @@ -1075,12 +1078,9 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf<ISmartRowBuffer>,
virtual unsigned __int64 getStatistic(StatisticKind kind) const
{
unsigned __int64 v = inactiveStats.queryStatistic(kind).get();
CriticalBlock b(critCurrentOutputIFileIO);
if (currentOutputIFileIO)
{
CriticalBlock b(critCurrentOutputIFileIO);
if (currentOutputIFileIO)
v += currentOutputIFileIO->getStatistic(kind);
}
v += currentOutputIFileIO->getStatistic(kind);
return v;
}
// IRowStream
Expand Down

0 comments on commit 3a8ea64

Please sign in to comment.