From f3030f14859b0f3c0ad82b877af16216d7459123 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Wed, 3 Apr 2024 11:34:04 +0100 Subject: [PATCH] HPCC-28757 New StSizePeakEphemeralDisk and StSizePeakTempDisk for graphs StSizePeakTempDisk tracks the high water mark for intra-graph temp files. StSizePeakEphemeralDisk tracks the overall high water mark for temp files (i.e. both intra-graph and inter-graph temp files) Note at present, only hash dedup produces the information necessary for generating these 2 stats and so the stats will not be accurate until further work is completed. Other activities that make use of temp files(such as join and sort) will need to be amended to the info necessary to produce these stats. Signed-off-by: Shamser Ahmed HPCC-28757 Change following review Signed-off-by: Shamser Ahmed HPCC-28757 Changes following review Signed-off-by: Shamser Ahmed HPCC-28757 Rename StSizePeakSubgraphTemp to StSizePeakTempDisk and publish StSizePeakTempDisk for activities Signed-off-by: Shamser Ahmed --- system/jlib/jstatcodes.h | 2 + system/jlib/jstats.cpp | 4 +- .../hashdistrib/thhashdistribslave.cpp | 10 +++-- thorlcr/graph/thgraph.hpp | 16 ++++++++ thorlcr/graph/thgraphslave.cpp | 22 +++++++++- thorlcr/graph/thgraphslave.hpp | 9 +++- thorlcr/thorutil/thormisc.cpp | 5 +-- thorlcr/thorutil/thormisc.hpp | 41 ++++++++++++++++++- 8 files changed, 97 insertions(+), 12 deletions(-) diff --git a/system/jlib/jstatcodes.h b/system/jlib/jstatcodes.h index 37598562349..9995b70ddfd 100644 --- a/system/jlib/jstatcodes.h +++ b/system/jlib/jstatcodes.h @@ -305,6 +305,8 @@ enum StatisticKind StNumLocalRows, StNumRemoteRows, StSizeRemoteWrite, + StSizePeakTempDisk, + StSizePeakEphemeralDisk, StMax, //For any quantity there is potentially the following variants. diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 96e8978a88c..9e8842bd038 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -944,7 +944,7 @@ static const constexpr StatisticMeta statsMetaData[StMax] = { { CYCLESTAT(LeafFetch) }, { TIMESTAT(BlobFetch), "Time spent reading blobs from disk (EXCLUDING the linux page cache)" }, { CYCLESTAT(BlobFetch) }, - { PEAKSIZESTAT(GraphSpill), "Peak size of spill memory usage" }, + { PEAKSIZESTAT(GraphSpill), "High water mark for inter-subgraph spill size" }, { TIMESTAT(AgentQueue), "Time worker items were received and queued before being processed\nThis may indicate that the primary node on a channel was down, or that the workers are overloaded with requests" }, { CYCLESTAT(AgentQueue) }, { TIMESTAT(IBYTIDelay), "Time spent waiting for another worker to start processing a request\nA non-zero value indicates that the primary node on a channel was down or very busy" }, @@ -977,6 +977,8 @@ static const constexpr StatisticMeta statsMetaData[StMax] = { { NUMSTAT(LocalRows), "Number of rows handled locally"}, { NUMSTAT(RemoteRows), "Number of rows sent to remote workers"}, { SIZESTAT(RemoteWrite), "Size of data sent to remote workers"}, + { PEAKSIZESTAT(PeakTempDisk), "High water mark for temporary files"}, + { PEAKSIZESTAT(PeakEphemeralDisk), "High water mark for emphemeral storage use"}, }; static MapStringTo statisticNameMap(true); diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp index 6e95732a78d..5e63067e231 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp @@ -2703,12 +2703,13 @@ class CSpill : implements IRowWriter, public CSimpleInterface IRowWriter *writer; StringAttr desc; unsigned bucketN, rwFlags; + Linked tempFileSizeTracker; public: IMPLEMENT_IINTERFACE_USING(CSimpleInterface); - CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN) - : owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN) + CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN, CFileSizeTracker * _tempFileSizeTracker) + : owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN), tempFileSizeTracker(_tempFileSizeTracker) { count = 0; writer = NULL; @@ -2726,7 +2727,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface prefix.append(bucketN).append('_').append(desc); GetTempFilePath(tempname, prefix.str()); OwnedIFile iFile = createIFile(tempname.str()); - spillFile.setown(new CFileOwner(iFile.getLink())); + spillFile.setown(new CFileOwner(iFile.getLink(), tempFileSizeTracker)); if (owner.getOptBool(THOROPT_COMPRESS_SPILLS, true)) { rwFlags |= rw_compress; @@ -2762,6 +2763,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface writer = NULL; spillFileIO->flush(); mergeStats(stats, this); + spillFile->noteSize(getStatistic(StSizeSpillFile)); spillFileIO.clear(); } inline __int64 getStatistic(StatisticKind kind) const @@ -3423,7 +3425,7 @@ void CHashTableRowTable::rehash(const void **newRows) CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf, IThorRowInterfaces *_keyIf, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows) : owner(_owner), keyIf(_keyIf), extractKey(_extractKey), bucketN(_bucketN), htRows(_htRows), - rowSpill(owner, _rowIf, "rows", _bucketN), keySpill(owner, _keyIf, "keys", _bucketN) + rowSpill(owner, _rowIf, "rows", _bucketN, _owner.queryTempFileSizeTracker()), keySpill(owner, _keyIf, "keys", _bucketN, _owner.queryTempFileSizeTracker()) { spilt = false; diff --git a/thorlcr/graph/thgraph.hpp b/thorlcr/graph/thgraph.hpp index ab5a28fc326..ab8022e07b5 100644 --- a/thorlcr/graph/thgraph.hpp +++ b/thorlcr/graph/thgraph.hpp @@ -1104,6 +1104,7 @@ class graph_decl CActivityBase : implements CInterfaceOf, im CSingletonLock CABserializerlock; CSingletonLock CABdeserializerlock; roxiemem::RoxieHeapFlags defaultRoxieMemHeapFlags = roxiemem::RHFnone; + Owned tempFileSizeTracker; protected: CGraphElementBase &container; @@ -1171,6 +1172,21 @@ class graph_decl CActivityBase : implements CInterfaceOf, im IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta, byte seq=0); IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta, roxiemem::RoxieHeapFlags heapFlags, byte seq=0); + CFileSizeTracker * queryTempFileSizeTracker() + { + if (!tempFileSizeTracker) + tempFileSizeTracker.setown(new CFileSizeTracker); + return tempFileSizeTracker; + } + offset_t queryActiveTempSize() const + { + return tempFileSizeTracker ? tempFileSizeTracker->queryActiveSize() : 0; + } + offset_t queryPeakTempSize() const + { + return tempFileSizeTracker ? tempFileSizeTracker->queryPeakSize() : 0; + } + // IExceptionHandler bool fireException(IException *e); __declspec(noreturn) void processAndThrowOwnedException(IException * e) __attribute__((noreturn)); diff --git a/thorlcr/graph/thgraphslave.cpp b/thorlcr/graph/thgraphslave.cpp index 6c1d9868eba..ea70ab645db 100644 --- a/thorlcr/graph/thgraphslave.cpp +++ b/thorlcr/graph/thgraphslave.cpp @@ -1280,8 +1280,28 @@ bool CSlaveGraph::serializeStats(MemoryBuffer &mb) jobS->querySharedAllocator()->queryRowManager()->reportSummaryStatistics(stats); IGraphTempHandler *tempHandler = owner ? queryTempHandler(false) : queryJob().queryTempHandler(); + offset_t sizeGraphSpill = tempHandler ? tempHandler->getActiveUsageSize() : 0; if (tempHandler) - stats.mergeStatistic(StSizeGraphSpill, tempHandler->getActiveUsageSize()); + stats.mergeStatistic(StSizeGraphSpill, sizeGraphSpill); + + // calculate peak spill size + if (started&&initialized) + { + offset_t activeTempSize = 0; + Owned iter = getConnectedIterator(); + ForEach (*iter) + { + CGraphElementBase &element = iter->query(); + CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity(); + activeTempSize += activity.queryActiveTempSize(); + } + if (activeTempSize > peakTempSize) + peakTempSize = activeTempSize; + } + if (peakTempSize) + stats.mergeStatistic(StSizePeakTempDisk, peakTempSize); + if (peakTempSize + sizeGraphSpill) + stats.mergeStatistic(StSizePeakEphemeralDisk, peakTempSize + sizeGraphSpill); stats.serialize(mb); unsigned cPos = mb.length(); diff --git a/thorlcr/graph/thgraphslave.hpp b/thorlcr/graph/thgraphslave.hpp index 7b43999dbab..c0239e4e027 100644 --- a/thorlcr/graph/thgraphslave.hpp +++ b/thorlcr/graph/thgraphslave.hpp @@ -224,7 +224,12 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres void startLookAhead(unsigned index); bool isLookAheadActive(unsigned index) const; void setupSpace4FileStats(unsigned where, bool statsForMultipleFiles, bool isSuper, unsigned numSubs, const StatisticsMapping & statsMapping); - virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const { } + virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const + { + offset_t peakTempSize = queryPeakTempSize(); + if (peakTempSize) + activeStats.mergeStatistic(StSizePeakTempDisk, peakTempSize); + } public: IMPLEMENT_IINTERFACE_USING(CActivityBase) @@ -262,7 +267,6 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres bool canStall() const; bool isFastThrough() const; - // IThorDataLink virtual CSlaveActivity *queryFromActivity() override { return this; } virtual IStrandJunction *getOutputStreams(CActivityBase &_ctx, unsigned idx, PointerArrayOf &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override; @@ -516,6 +520,7 @@ class graphslave_decl CSlaveGraph : public CGraphBase bool doneInit = false; std::atomic_bool progressActive; ProcessInfo processStartInfo; + offset_t peakTempSize = 0; public: diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index c45ea6c3940..a97fc94c5f8 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -88,14 +88,13 @@ const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows}, const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead, }, basicActivityStatistics, diskReadRemoteStatistics); const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics); const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics); -const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics); +const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StSizePeakTempDisk, StSizePeakEphemeralDisk, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics); const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics); const StatisticsMapping indexDistribActivityStatistics({}, basicActivityStatistics, jhtreeCacheStatistics); const StatisticsMapping soapcallActivityStatistics({}, basicActivityStatistics, soapcallStatistics); -const StatisticsMapping hashDedupActivityStatistics({StNumSpills, StSizeSpillFile, StTimeSortElapsed}, diskWriteRemoteStatistics, basicActivityStatistics); +const StatisticsMapping hashDedupActivityStatistics({StNumSpills, StSizeSpillFile, StTimeSortElapsed, StSizePeakTempDisk}, diskWriteRemoteStatistics, basicActivityStatistics); const StatisticsMapping hashDistribActivityStatistics({StNumLocalRows, StNumRemoteRows, StSizeRemoteWrite}, basicActivityStatistics); - MODULE_INIT(INIT_PRIORITY_STANDARD) { ClusterMPAllocator.setown(createMPtagRangeAllocator(MPTAG_THORGLOBAL_BASE,MPTAG_THORGLOBAL_COUNT)); diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index 8a737512363..73d57dbea31 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -308,19 +308,58 @@ class graph_decl CTimeoutTrigger : public CInterface, implements IThreaded virtual bool action() = 0; }; +// Tracks the current and peak storage used for some files +class CFileSizeTracker: public CInterface +{ + RelaxedAtomic activeSize{0}; + RelaxedAtomic peakSize{0}; +public: + void growSize(offset_t size) + { + if (size) + { + offset_t newActiveSize = activeSize.add_fetch(size); + peakSize.store_max(newActiveSize); + } + } + void shrinkSize(offset_t size) + { + if (size) + activeSize.fetch_sub(size); + } + offset_t queryActiveSize() const + { + return activeSize.load(); + } + offset_t queryPeakSize() const + { + return peakSize.load(); + } +}; + // simple class which takes ownership of the underlying file and deletes it on destruction class graph_decl CFileOwner : public CSimpleInterface, implements IInterface { OwnedIFile iFile; + Linked fileSizeTracker; + offset_t fileSize = 0; public: IMPLEMENT_IINTERFACE_USING(CSimpleInterface); - CFileOwner(IFile *_iFile) : iFile(_iFile) + CFileOwner(IFile *_iFile, CFileSizeTracker * _fileSizeTracker=nullptr) : iFile(_iFile), fileSizeTracker(_fileSizeTracker) { } ~CFileOwner() { + if (fileSizeTracker) + fileSizeTracker->shrinkSize(fileSize); iFile->remove(); } + void noteSize(offset_t size) + { + fileSize = size; + if (fileSizeTracker) + fileSizeTracker->growSize(fileSize); + } IFile &queryIFile() const { return *iFile; } };