Skip to content

Commit

Permalink
Merge pull request #18482 from shamser/issue28757
Browse files Browse the repository at this point in the history
HPCC-28757 New StSizePeakEphemeralDisk and StSizePeakTempDisk for graphs

Reviewed-by: Jake Smith <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored May 23, 2024
2 parents 4040cbe + f3030f1 commit e5240c5
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 12 deletions.
2 changes: 2 additions & 0 deletions system/jlib/jstatcodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ enum StatisticKind
StNumLocalRows,
StNumRemoteRows,
StSizeRemoteWrite,
StSizePeakTempDisk,
StSizePeakEphemeralDisk,
StMax,

//For any quantity there is potentially the following variants.
Expand Down
4 changes: 3 additions & 1 deletion system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ static const constexpr StatisticMeta statsMetaData[StMax] = {
{ CYCLESTAT(LeafFetch) },
{ TIMESTAT(BlobFetch), "Time spent reading blobs from disk (EXCLUDING the linux page cache)" },
{ CYCLESTAT(BlobFetch) },
{ PEAKSIZESTAT(GraphSpill), "Peak size of spill memory usage" },
{ PEAKSIZESTAT(GraphSpill), "High water mark for inter-subgraph spill size" },
{ TIMESTAT(AgentQueue), "Time worker items were received and queued before being processed\nThis may indicate that the primary node on a channel was down, or that the workers are overloaded with requests" },
{ CYCLESTAT(AgentQueue) },
{ TIMESTAT(IBYTIDelay), "Time spent waiting for another worker to start processing a request\nA non-zero value indicates that the primary node on a channel was down or very busy" },
Expand Down Expand Up @@ -977,6 +977,8 @@ static const constexpr StatisticMeta statsMetaData[StMax] = {
{ NUMSTAT(LocalRows), "Number of rows handled locally"},
{ NUMSTAT(RemoteRows), "Number of rows sent to remote workers"},
{ SIZESTAT(RemoteWrite), "Size of data sent to remote workers"},
{ PEAKSIZESTAT(PeakTempDisk), "High water mark for temporary files"},
{ PEAKSIZESTAT(PeakEphemeralDisk), "High water mark for emphemeral storage use"},
};

static MapStringTo<StatisticKind, StatisticKind> statisticNameMap(true);
Expand Down
10 changes: 6 additions & 4 deletions thorlcr/activities/hashdistrib/thhashdistribslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2703,12 +2703,13 @@ class CSpill : implements IRowWriter, public CSimpleInterface
IRowWriter *writer;
StringAttr desc;
unsigned bucketN, rwFlags;
Linked<CFileSizeTracker> tempFileSizeTracker;

public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN)
: owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN)
CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN, CFileSizeTracker * _tempFileSizeTracker)
: owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN), tempFileSizeTracker(_tempFileSizeTracker)
{
count = 0;
writer = NULL;
Expand All @@ -2726,7 +2727,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface
prefix.append(bucketN).append('_').append(desc);
GetTempFilePath(tempname, prefix.str());
OwnedIFile iFile = createIFile(tempname.str());
spillFile.setown(new CFileOwner(iFile.getLink()));
spillFile.setown(new CFileOwner(iFile.getLink(), tempFileSizeTracker));
if (owner.getOptBool(THOROPT_COMPRESS_SPILLS, true))
{
rwFlags |= rw_compress;
Expand Down Expand Up @@ -2762,6 +2763,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface
writer = NULL;
spillFileIO->flush();
mergeStats(stats, this);
spillFile->noteSize(getStatistic(StSizeSpillFile));
spillFileIO.clear();
}
inline __int64 getStatistic(StatisticKind kind) const
Expand Down Expand Up @@ -3423,7 +3425,7 @@ void CHashTableRowTable::rehash(const void **newRows)

CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf, IThorRowInterfaces *_keyIf, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows)
: owner(_owner), keyIf(_keyIf), extractKey(_extractKey), bucketN(_bucketN), htRows(_htRows),
rowSpill(owner, _rowIf, "rows", _bucketN), keySpill(owner, _keyIf, "keys", _bucketN)
rowSpill(owner, _rowIf, "rows", _bucketN, _owner.queryTempFileSizeTracker()), keySpill(owner, _keyIf, "keys", _bucketN, _owner.queryTempFileSizeTracker())

{
spilt = false;
Expand Down
16 changes: 16 additions & 0 deletions thorlcr/graph/thgraph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,7 @@ class graph_decl CActivityBase : implements CInterfaceOf<IThorRowInterfaces>, im
CSingletonLock CABserializerlock;
CSingletonLock CABdeserializerlock;
roxiemem::RoxieHeapFlags defaultRoxieMemHeapFlags = roxiemem::RHFnone;
Owned<CFileSizeTracker> tempFileSizeTracker;

protected:
CGraphElementBase &container;
Expand Down Expand Up @@ -1171,6 +1172,21 @@ class graph_decl CActivityBase : implements CInterfaceOf<IThorRowInterfaces>, im
IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta, byte seq=0);
IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta, roxiemem::RoxieHeapFlags heapFlags, byte seq=0);

CFileSizeTracker * queryTempFileSizeTracker()
{
if (!tempFileSizeTracker)
tempFileSizeTracker.setown(new CFileSizeTracker);
return tempFileSizeTracker;
}
offset_t queryActiveTempSize() const
{
return tempFileSizeTracker ? tempFileSizeTracker->queryActiveSize() : 0;
}
offset_t queryPeakTempSize() const
{
return tempFileSizeTracker ? tempFileSizeTracker->queryPeakSize() : 0;
}

// IExceptionHandler
bool fireException(IException *e);
__declspec(noreturn) void processAndThrowOwnedException(IException * e) __attribute__((noreturn));
Expand Down
22 changes: 21 additions & 1 deletion thorlcr/graph/thgraphslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1280,8 +1280,28 @@ bool CSlaveGraph::serializeStats(MemoryBuffer &mb)
jobS->querySharedAllocator()->queryRowManager()->reportSummaryStatistics(stats);

IGraphTempHandler *tempHandler = owner ? queryTempHandler(false) : queryJob().queryTempHandler();
offset_t sizeGraphSpill = tempHandler ? tempHandler->getActiveUsageSize() : 0;
if (tempHandler)
stats.mergeStatistic(StSizeGraphSpill, tempHandler->getActiveUsageSize());
stats.mergeStatistic(StSizeGraphSpill, sizeGraphSpill);

// calculate peak spill size
if (started&&initialized)
{
offset_t activeTempSize = 0;
Owned<IThorActivityIterator> iter = getConnectedIterator();
ForEach (*iter)
{
CGraphElementBase &element = iter->query();
CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity();
activeTempSize += activity.queryActiveTempSize();
}
if (activeTempSize > peakTempSize)
peakTempSize = activeTempSize;
}
if (peakTempSize)
stats.mergeStatistic(StSizePeakTempDisk, peakTempSize);
if (peakTempSize + sizeGraphSpill)
stats.mergeStatistic(StSizePeakEphemeralDisk, peakTempSize + sizeGraphSpill);
stats.serialize(mb);

unsigned cPos = mb.length();
Expand Down
9 changes: 7 additions & 2 deletions thorlcr/graph/thgraphslave.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,12 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
void startLookAhead(unsigned index);
bool isLookAheadActive(unsigned index) const;
void setupSpace4FileStats(unsigned where, bool statsForMultipleFiles, bool isSuper, unsigned numSubs, const StatisticsMapping & statsMapping);
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const { }
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const
{
offset_t peakTempSize = queryPeakTempSize();
if (peakTempSize)
activeStats.mergeStatistic(StSizePeakTempDisk, peakTempSize);
}
public:
IMPLEMENT_IINTERFACE_USING(CActivityBase)

Expand Down Expand Up @@ -262,7 +267,6 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
bool canStall() const;
bool isFastThrough() const;


// IThorDataLink
virtual CSlaveActivity *queryFromActivity() override { return this; }
virtual IStrandJunction *getOutputStreams(CActivityBase &_ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override;
Expand Down Expand Up @@ -516,6 +520,7 @@ class graphslave_decl CSlaveGraph : public CGraphBase
bool doneInit = false;
std::atomic_bool progressActive;
ProcessInfo processStartInfo;
offset_t peakTempSize = 0;

public:

Expand Down
5 changes: 2 additions & 3 deletions thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,13 @@ const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows},
const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead, }, basicActivityStatistics, diskReadRemoteStatistics);
const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics);
const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics);
const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics);
const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StSizePeakTempDisk, StSizePeakEphemeralDisk, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics);
const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics);
const StatisticsMapping indexDistribActivityStatistics({}, basicActivityStatistics, jhtreeCacheStatistics);
const StatisticsMapping soapcallActivityStatistics({}, basicActivityStatistics, soapcallStatistics);
const StatisticsMapping hashDedupActivityStatistics({StNumSpills, StSizeSpillFile, StTimeSortElapsed}, diskWriteRemoteStatistics, basicActivityStatistics);
const StatisticsMapping hashDedupActivityStatistics({StNumSpills, StSizeSpillFile, StTimeSortElapsed, StSizePeakTempDisk}, diskWriteRemoteStatistics, basicActivityStatistics);
const StatisticsMapping hashDistribActivityStatistics({StNumLocalRows, StNumRemoteRows, StSizeRemoteWrite}, basicActivityStatistics);


MODULE_INIT(INIT_PRIORITY_STANDARD)
{
ClusterMPAllocator.setown(createMPtagRangeAllocator(MPTAG_THORGLOBAL_BASE,MPTAG_THORGLOBAL_COUNT));
Expand Down
41 changes: 40 additions & 1 deletion thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,19 +308,58 @@ class graph_decl CTimeoutTrigger : public CInterface, implements IThreaded
virtual bool action() = 0;
};

// Tracks the current and peak storage used for some files
class CFileSizeTracker: public CInterface
{
RelaxedAtomic<offset_t> activeSize{0};
RelaxedAtomic<offset_t> peakSize{0};
public:
void growSize(offset_t size)
{
if (size)
{
offset_t newActiveSize = activeSize.add_fetch(size);
peakSize.store_max(newActiveSize);
}
}
void shrinkSize(offset_t size)
{
if (size)
activeSize.fetch_sub(size);
}
offset_t queryActiveSize() const
{
return activeSize.load();
}
offset_t queryPeakSize() const
{
return peakSize.load();
}
};

// simple class which takes ownership of the underlying file and deletes it on destruction
class graph_decl CFileOwner : public CSimpleInterface, implements IInterface
{
OwnedIFile iFile;
Linked<CFileSizeTracker> fileSizeTracker;
offset_t fileSize = 0;
public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
CFileOwner(IFile *_iFile) : iFile(_iFile)
CFileOwner(IFile *_iFile, CFileSizeTracker * _fileSizeTracker=nullptr) : iFile(_iFile), fileSizeTracker(_fileSizeTracker)
{
}
~CFileOwner()
{
if (fileSizeTracker)
fileSizeTracker->shrinkSize(fileSize);
iFile->remove();
}
void noteSize(offset_t size)
{
fileSize = size;
if (fileSizeTracker)
fileSizeTracker->growSize(fileSize);
}
IFile &queryIFile() const { return *iFile; }
};

Expand Down

0 comments on commit e5240c5

Please sign in to comment.