diff --git a/system/jlib/jstatcodes.h b/system/jlib/jstatcodes.h index 2314a94ab65..f8e22d19861 100644 --- a/system/jlib/jstatcodes.h +++ b/system/jlib/jstatcodes.h @@ -302,6 +302,8 @@ enum StatisticKind StSizeContinuationData, StNumContinuationRequests, StNumFailures, + StSizePeakSubgraphTemp, + StSizePeakEphemeralStorage, StMax, //For any quantity there is potentially the following variants. diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 4dcab0b9ef1..f1c717ccaa8 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -944,7 +944,7 @@ static const constexpr StatisticMeta statsMetaData[StMax] = { { CYCLESTAT(LeafFetch) }, { TIMESTAT(BlobFetch), "Time spent reading blobs from disk (EXCLUDING the linux page cache)" }, { CYCLESTAT(BlobFetch) }, - { PEAKSIZESTAT(GraphSpill), "Peak size of spill memory usage" }, + { PEAKSIZESTAT(GraphSpill), "Peak size of graph spills" }, { TIMESTAT(AgentQueue), "Time worker items were received and queued before being processed\nThis may indicate that the primary node on a channel was down, or that the workers are overloaded with requests" }, { CYCLESTAT(AgentQueue) }, { TIMESTAT(IBYTIDelay), "Time spent waiting for another worker to start processing a request\nA non-zero value indicates that the primary node on a channel was down or very busy" }, @@ -974,6 +974,8 @@ static const constexpr StatisticMeta statsMetaData[StMax] = { { SIZESTAT(ContinuationData), "The total size of continuation data sent from agent to the server\nA large number may indicate a poor filter, or merging from many different index locations" }, { NUMSTAT(ContinuationRequests), "The number of times the agent indicated there was more data to be returned" }, { NUMSTAT(Failures), "The number of times a query has failed" }, + { PEAKSIZESTAT(PeakSubgraphTemp), "The high water mark for intra-graph temp files"}, + { PEAKSIZESTAT(PeakEphemeralStorage), "The high water mark for ephemeral storage use"}, }; static MapStringTo 
statisticNameMap(true); diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp index 89b95eea189..c495e1ab2cb 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp @@ -2653,12 +2653,13 @@ class CSpill : implements IRowWriter, public CSimpleInterface IRowWriter *writer; StringAttr desc; unsigned bucketN, rwFlags; + Linked tempFileSizeTracker; public: IMPLEMENT_IINTERFACE_USING(CSimpleInterface); - CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN) - : owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN) + CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN, CFileSizeTracker * _tempFileSizeTracker) + : owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN), tempFileSizeTracker(_tempFileSizeTracker) { count = 0; writer = NULL; @@ -2676,7 +2677,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface prefix.append(bucketN).append('_').append(desc); GetTempFilePath(tempname, prefix.str()); OwnedIFile iFile = createIFile(tempname.str()); - spillFile.setown(new CFileOwner(iFile.getLink())); + spillFile.setown(new CFileOwner(iFile.getLink(), tempFileSizeTracker)); if (owner.getOptBool(THOROPT_COMPRESS_SPILLS, true)) { rwFlags |= rw_compress; @@ -2712,6 +2713,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface writer = NULL; spillFileIO->flush(); mergeStats(stats, this); + spillFile->noteSpill(getStatistic(StSizeSpillFile)); spillFileIO.clear(); } inline __int64 getStatistic(StatisticKind kind) const @@ -3373,7 +3375,7 @@ void CHashTableRowTable::rehash(const void **newRows) CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf, IThorRowInterfaces *_keyIf, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows) : owner(_owner), keyIf(_keyIf), extractKey(_extractKey), 
bucketN(_bucketN), htRows(_htRows), - rowSpill(owner, _rowIf, "rows", _bucketN), keySpill(owner, _keyIf, "keys", _bucketN) + rowSpill(owner, _rowIf, "rows", _bucketN, _owner.queryTempFileSizeTracker()), keySpill(owner, _keyIf, "keys", _bucketN, _owner.queryTempFileSizeTracker()) { spilt = false; diff --git a/thorlcr/graph/thgraph.hpp b/thorlcr/graph/thgraph.hpp index ab5a28fc326..de5d4f35894 100644 --- a/thorlcr/graph/thgraph.hpp +++ b/thorlcr/graph/thgraph.hpp @@ -1104,6 +1104,7 @@ class graph_decl CActivityBase : implements CInterfaceOf, im CSingletonLock CABserializerlock; CSingletonLock CABdeserializerlock; roxiemem::RoxieHeapFlags defaultRoxieMemHeapFlags = roxiemem::RHFnone; + Owned tempFileSizeTracker; protected: CGraphElementBase &container; @@ -1171,6 +1172,17 @@ class graph_decl CActivityBase : implements CInterfaceOf, im IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta, byte seq=0); IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta, roxiemem::RoxieHeapFlags heapFlags, byte seq=0); + CFileSizeTracker * queryTempFileSizeTracker() + { + if (!tempFileSizeTracker) + tempFileSizeTracker.setown(new CFileSizeTracker); + return tempFileSizeTracker; + } + offset_t queryActiveTempSize() const + { + return tempFileSizeTracker ? tempFileSizeTracker->queryActiveSize() : 0; + } + // IExceptionHandler bool fireException(IException *e); __declspec(noreturn) void processAndThrowOwnedException(IException * e) __attribute__((noreturn)); diff --git a/thorlcr/graph/thgraphslave.cpp b/thorlcr/graph/thgraphslave.cpp index 6c1d9868eba..18f66511f9c 100644 --- a/thorlcr/graph/thgraphslave.cpp +++ b/thorlcr/graph/thgraphslave.cpp @@ -1280,8 +1280,28 @@ bool CSlaveGraph::serializeStats(MemoryBuffer &mb) jobS->querySharedAllocator()->queryRowManager()->reportSummaryStatistics(stats); IGraphTempHandler *tempHandler = owner ? queryTempHandler(false) : queryJob().queryTempHandler(); + offset_t sizeGraphSpill = tempHandler ? 
tempHandler->getActiveUsageSize() : 0; if (tempHandler) - stats.mergeStatistic(StSizeGraphSpill, tempHandler->getActiveUsageSize()); + stats.mergeStatistic(StSizeGraphSpill, sizeGraphSpill); + + // calculate peak spill size + if (started&&initialized) + { + offset_t activeTempSize = 0; + Owned iter = getConnectedIterator(); + ForEach (*iter) + { + CGraphElementBase &element = iter->query(); + CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity(); + activeTempSize += activity.queryActiveTempSize(); + } + if (activeTempSize > peakTempSize) + peakTempSize = activeTempSize; + } + if (peakTempSize) + stats.mergeStatistic(StSizePeakSubgraphTemp, peakTempSize); + if (peakTempSize + sizeGraphSpill) + stats.mergeStatistic(StSizePeakEphemeralStorage, peakTempSize + sizeGraphSpill); stats.serialize(mb); unsigned cPos = mb.length(); diff --git a/thorlcr/graph/thgraphslave.hpp b/thorlcr/graph/thgraphslave.hpp index 7b43999dbab..b9297f89695 100644 --- a/thorlcr/graph/thgraphslave.hpp +++ b/thorlcr/graph/thgraphslave.hpp @@ -262,7 +262,6 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres bool canStall() const; bool isFastThrough() const; - // IThorDataLink virtual CSlaveActivity *queryFromActivity() override { return this; } virtual IStrandJunction *getOutputStreams(CActivityBase &_ctx, unsigned idx, PointerArrayOf &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override; @@ -516,6 +515,7 @@ class graphslave_decl CSlaveGraph : public CGraphBase bool doneInit = false; std::atomic_bool progressActive; ProcessInfo processStartInfo; + offset_t peakTempSize = 0; public: diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index c39a612e50b..8a651c5b0e1 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -88,7 +88,7 @@ const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows}, const 
StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead, }, basicActivityStatistics, diskReadRemoteStatistics); const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics); const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics); -const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics); +const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StSizePeakSubgraphTemp, StSizePeakEphemeralStorage, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics); const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics); const StatisticsMapping indexDistribActivityStatistics({}, basicActivityStatistics, jhtreeCacheStatistics); const StatisticsMapping soapcallActivityStatistics({}, basicActivityStatistics, soapcallStatistics); diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index 21cae128b7f..8b4f3b00bf3 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -306,22 +306,62 @@ class graph_decl CTimeoutTrigger : public CInterface, implements IThreaded virtual bool action() = 0; }; +// Tracks the current and peak storage used for some files +class CFileSizeTracker: public CInterface +{ + RelaxedAtomic activeSize{0}; + RelaxedAtomic peakSize{0}; +public: + void growSize(offset_t size) + { + if (size) + { + offset_t newActiveSize = activeSize.add_fetch(size); + peakSize.store_max(newActiveSize); + } + } + void shrinkSize(offset_t size) + { + if (size) + activeSize.fetch_sub(size); + } + offset_t queryActiveSize() const + { + return activeSize.load(); + } + offset_t queryPeakSize() const + 
{ + return peakSize.load(); + } +}; + // simple class which takes ownership of the underlying file and deletes it on destruction class graph_decl CFileOwner : public CSimpleInterface, implements IInterface { OwnedIFile iFile; + Linked fileSizeTracker; + offset_t fileSize = 0; public: IMPLEMENT_IINTERFACE_USING(CSimpleInterface); - CFileOwner(IFile *_iFile) : iFile(_iFile) + CFileOwner(IFile *_iFile, CFileSizeTracker * _fileSizeTracker=nullptr) : iFile(_iFile), fileSizeTracker(_fileSizeTracker) { } ~CFileOwner() { + if (fileSizeTracker) + fileSizeTracker->shrinkSize(fileSize); iFile->remove(); } + void noteSpill(offset_t size) // records the file's size; a repeat call replaces the earlier size instead of adding to it + { + if (fileSizeTracker && size >= fileSize) fileSizeTracker->growSize(size-fileSize); + else if (fileSizeTracker) fileSizeTracker->shrinkSize(fileSize-size); + fileSize = size; + } IFile &queryIFile() const { return *iFile; } }; + // stream wrapper, that takes ownership of a CFileOwner class graph_decl CStreamFileOwner : public CSimpleInterfaceOf { @@ -359,7 +399,6 @@ class graph_decl CStreamFileOwner : public CSimpleInterfaceOf } }; - #define DEFAULT_THORMASTERPORT 20000 #define DEFAULT_THORSLAVEPORT 20100 #define DEFAULT_SLAVEPORTINC 20