diff --git a/system/jlib/jstatcodes.h b/system/jlib/jstatcodes.h index 37598562349..9995b70ddfd 100644 --- a/system/jlib/jstatcodes.h +++ b/system/jlib/jstatcodes.h @@ -305,6 +305,8 @@ enum StatisticKind StNumLocalRows, StNumRemoteRows, StSizeRemoteWrite, + StSizePeakTempDisk, + StSizePeakEphemeralDisk, StMax, //For any quantity there is potentially the following variants. diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 96e8978a88c..9e8842bd038 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -944,7 +944,7 @@ static const constexpr StatisticMeta statsMetaData[StMax] = { { CYCLESTAT(LeafFetch) }, { TIMESTAT(BlobFetch), "Time spent reading blobs from disk (EXCLUDING the linux page cache)" }, { CYCLESTAT(BlobFetch) }, - { PEAKSIZESTAT(GraphSpill), "Peak size of spill memory usage" }, + { PEAKSIZESTAT(GraphSpill), "High water mark for inter-subgraph spill size" }, { TIMESTAT(AgentQueue), "Time worker items were received and queued before being processed\nThis may indicate that the primary node on a channel was down, or that the workers are overloaded with requests" }, { CYCLESTAT(AgentQueue) }, { TIMESTAT(IBYTIDelay), "Time spent waiting for another worker to start processing a request\nA non-zero value indicates that the primary node on a channel was down or very busy" }, @@ -977,6 +977,8 @@ static const constexpr StatisticMeta statsMetaData[StMax] = { { NUMSTAT(LocalRows), "Number of rows handled locally"}, { NUMSTAT(RemoteRows), "Number of rows sent to remote workers"}, { SIZESTAT(RemoteWrite), "Size of data sent to remote workers"}, + { PEAKSIZESTAT(PeakTempDisk), "High water mark for temporary files"}, + { PEAKSIZESTAT(PeakEphemeralDisk), "High water mark for ephemeral storage use"}, }; static MapStringTo statisticNameMap(true); diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp index b0dd15219c6..feb7591c556 100644 --- 
a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp @@ -2703,12 +2703,13 @@ class CSpill : implements IRowWriter, public CSimpleInterface IRowWriter *writer; StringAttr desc; unsigned bucketN, rwFlags; + Linked tempFileSizeTracker; public: IMPLEMENT_IINTERFACE_USING(CSimpleInterface); - CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN) - : owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN) + CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN, CFileSizeTracker * _tempFileSizeTracker) + : owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN), tempFileSizeTracker(_tempFileSizeTracker) { count = 0; writer = NULL; @@ -2726,7 +2727,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface prefix.append(bucketN).append('_').append(desc); GetTempFilePath(tempname, prefix.str()); OwnedIFile iFile = createIFile(tempname.str()); - spillFile.setown(new CFileOwner(iFile.getLink())); + spillFile.setown(new CFileOwner(iFile.getLink(), tempFileSizeTracker)); if (owner.getOptBool(THOROPT_COMPRESS_SPILLS, true)) { rwFlags |= rw_compress; @@ -2762,6 +2763,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface writer = NULL; spillFileIO->flush(); mergeStats(stats, this); + spillFile->noteSize(getStatistic(StSizeSpillFile)); spillFileIO.clear(); } inline __int64 getStatistic(StatisticKind kind) const @@ -3423,7 +3425,7 @@ void CHashTableRowTable::rehash(const void **newRows) CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf, IThorRowInterfaces *_keyIf, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows) : owner(_owner), keyIf(_keyIf), extractKey(_extractKey), bucketN(_bucketN), htRows(_htRows), - rowSpill(owner, _rowIf, "rows", _bucketN), keySpill(owner, _keyIf, "keys", _bucketN) + rowSpill(owner, _rowIf, "rows", _bucketN, 
_owner.queryTempFileSizeTracker()), keySpill(owner, _keyIf, "keys", _bucketN, _owner.queryTempFileSizeTracker()) { spilt = false; diff --git a/thorlcr/graph/thgraph.hpp b/thorlcr/graph/thgraph.hpp index 07c9a601cc4..1624baa5717 100644 --- a/thorlcr/graph/thgraph.hpp +++ b/thorlcr/graph/thgraph.hpp @@ -1104,6 +1104,7 @@ class graph_decl CActivityBase : implements CInterfaceOf, im CSingletonLock CABserializerlock; CSingletonLock CABdeserializerlock; roxiemem::RoxieHeapFlags defaultRoxieMemHeapFlags = roxiemem::RHFnone; + Owned tempFileSizeTracker; protected: CGraphElementBase &container; @@ -1171,6 +1172,21 @@ class graph_decl CActivityBase : implements CInterfaceOf, im IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta, byte seq=0); IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta, roxiemem::RoxieHeapFlags heapFlags, byte seq=0); + CFileSizeTracker * queryTempFileSizeTracker() + { + if (!tempFileSizeTracker) + tempFileSizeTracker.setown(new CFileSizeTracker); + return tempFileSizeTracker; + } + offset_t queryActiveTempSize() const + { + return tempFileSizeTracker ? tempFileSizeTracker->queryActiveSize() : 0; + } + offset_t queryPeakTempSize() const + { + return tempFileSizeTracker ? tempFileSizeTracker->queryPeakSize() : 0; + } + // IExceptionHandler bool fireException(IException *e); __declspec(noreturn) void processAndThrowOwnedException(IException * e) __attribute__((noreturn)); diff --git a/thorlcr/graph/thgraphslave.cpp b/thorlcr/graph/thgraphslave.cpp index 5173eaf2ae5..43bf54587f7 100644 --- a/thorlcr/graph/thgraphslave.cpp +++ b/thorlcr/graph/thgraphslave.cpp @@ -1280,8 +1280,28 @@ bool CSlaveGraph::serializeStats(MemoryBuffer &mb) jobS->querySharedAllocator()->queryRowManager()->reportSummaryStatistics(stats); IGraphTempHandler *tempHandler = owner ? queryTempHandler(false) : queryJob().queryTempHandler(); + offset_t sizeGraphSpill = tempHandler ? 
tempHandler->getActiveUsageSize() : 0; if (tempHandler) - stats.mergeStatistic(StSizeGraphSpill, tempHandler->getActiveUsageSize()); + stats.mergeStatistic(StSizeGraphSpill, sizeGraphSpill); + + // calculate peak spill size + if (started&&initialized) + { + offset_t activeTempSize = 0; + Owned iter = getConnectedIterator(); + ForEach (*iter) + { + CGraphElementBase &element = iter->query(); + CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity(); + activeTempSize += activity.queryActiveTempSize(); + } + if (activeTempSize > peakTempSize) + peakTempSize = activeTempSize; + } + if (peakTempSize) + stats.mergeStatistic(StSizePeakTempDisk, peakTempSize); + if (peakTempSize + sizeGraphSpill) + stats.mergeStatistic(StSizePeakEphemeralDisk, peakTempSize + sizeGraphSpill); stats.serialize(mb); unsigned cPos = mb.length(); diff --git a/thorlcr/graph/thgraphslave.hpp b/thorlcr/graph/thgraphslave.hpp index 7b43999dbab..c0239e4e027 100644 --- a/thorlcr/graph/thgraphslave.hpp +++ b/thorlcr/graph/thgraphslave.hpp @@ -224,7 +224,12 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres void startLookAhead(unsigned index); bool isLookAheadActive(unsigned index) const; void setupSpace4FileStats(unsigned where, bool statsForMultipleFiles, bool isSuper, unsigned numSubs, const StatisticsMapping & statsMapping); - virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const { } + virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const + { + offset_t peakTempSize = queryPeakTempSize(); + if (peakTempSize) + activeStats.mergeStatistic(StSizePeakTempDisk, peakTempSize); + } public: IMPLEMENT_IINTERFACE_USING(CActivityBase) @@ -262,7 +267,6 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres bool canStall() const; bool isFastThrough() const; - // IThorDataLink virtual CSlaveActivity *queryFromActivity() override { return this; } virtual IStrandJunction 
*getOutputStreams(CActivityBase &_ctx, unsigned idx, PointerArrayOf &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override; @@ -516,6 +520,7 @@ class graphslave_decl CSlaveGraph : public CGraphBase bool doneInit = false; std::atomic_bool progressActive; ProcessInfo processStartInfo; + offset_t peakTempSize = 0; public: diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index c45ea6c3940..a97fc94c5f8 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -88,14 +88,13 @@ const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows}, const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead, }, basicActivityStatistics, diskReadRemoteStatistics); const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics); const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics); -const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics); +const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StSizePeakTempDisk, StSizePeakEphemeralDisk, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics); const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics); const StatisticsMapping indexDistribActivityStatistics({}, basicActivityStatistics, jhtreeCacheStatistics); const StatisticsMapping soapcallActivityStatistics({}, basicActivityStatistics, soapcallStatistics); -const StatisticsMapping hashDedupActivityStatistics({StNumSpills, StSizeSpillFile, StTimeSortElapsed}, diskWriteRemoteStatistics, 
basicActivityStatistics); +const StatisticsMapping hashDedupActivityStatistics({StNumSpills, StSizeSpillFile, StTimeSortElapsed, StSizePeakTempDisk}, diskWriteRemoteStatistics, basicActivityStatistics); const StatisticsMapping hashDistribActivityStatistics({StNumLocalRows, StNumRemoteRows, StSizeRemoteWrite}, basicActivityStatistics); - MODULE_INIT(INIT_PRIORITY_STANDARD) { ClusterMPAllocator.setown(createMPtagRangeAllocator(MPTAG_THORGLOBAL_BASE,MPTAG_THORGLOBAL_COUNT)); diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index 8a737512363..73d57dbea31 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -308,19 +308,64 @@ class graph_decl CTimeoutTrigger : public CInterface, implements IThreaded virtual bool action() = 0; }; +// Tracks the current and peak storage used for some files +class CFileSizeTracker: public CInterface +{ + RelaxedAtomic<offset_t> activeSize{0}; + RelaxedAtomic<offset_t> peakSize{0}; +public: + void growSize(offset_t size) + { + if (size) + { + offset_t newActiveSize = activeSize.add_fetch(size); + peakSize.store_max(newActiveSize); + } + } + void shrinkSize(offset_t size) + { + if (size) + activeSize.fetch_sub(size); + } + offset_t queryActiveSize() const + { + return activeSize.load(); + } + offset_t queryPeakSize() const + { + return peakSize.load(); + } +}; + // simple class which takes ownership of the underlying file and deletes it on destruction class graph_decl CFileOwner : public CSimpleInterface, implements IInterface { OwnedIFile iFile; + Linked<CFileSizeTracker> fileSizeTracker; + offset_t fileSize = 0; public: IMPLEMENT_IINTERFACE_USING(CSimpleInterface); - CFileOwner(IFile *_iFile) : iFile(_iFile) + CFileOwner(IFile *_iFile, CFileSizeTracker * _fileSizeTracker=nullptr) : iFile(_iFile), fileSizeTracker(_fileSizeTracker) { } ~CFileOwner() { + if (fileSizeTracker) + fileSizeTracker->shrinkSize(fileSize); iFile->remove(); }
+ // Records the file's current size, replacing any size noted earlier, so repeated calls are not double counted by the tracker
+ void noteSize(offset_t size) + { + if (fileSizeTracker) + { + if (size > fileSize) + fileSizeTracker->growSize(size - fileSize); + else + fileSizeTracker->shrinkSize(fileSize - size); + } + fileSize = size; + } IFile &queryIFile() const { return *iFile; } };