From 471e477f522aa38c22fbbf56723cb3d1c1ff49a3 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Tue, 14 Nov 2023 14:16:25 +0000 Subject: [PATCH] Changes following review Signed-off-by: Shamser Ahmed --- common/thorhelper/thorcommon.hpp | 4 + dali/base/dadfs.cpp | 3 +- system/jhtree/jhtree.cpp | 5 +- .../activities/indexread/thindexreadslave.cpp | 105 ++++++------- thorlcr/activities/keyedjoin/thkeyedjoin.cpp | 2 +- .../activities/keyedjoin/thkeyedjoinslave.cpp | 142 +++++++++++------- thorlcr/graph/thgraphmaster.cpp | 6 +- thorlcr/graph/thgraphslave.hpp | 5 +- thorlcr/thorutil/thormisc.cpp | 3 +- thorlcr/thorutil/thormisc.hpp | 1 + 10 files changed, 152 insertions(+), 124 deletions(-) diff --git a/common/thorhelper/thorcommon.hpp b/common/thorhelper/thorcommon.hpp index 9edd2ea3741..7a6e8c7d597 100644 --- a/common/thorhelper/thorcommon.hpp +++ b/common/thorhelper/thorcommon.hpp @@ -750,6 +750,10 @@ class CStatsContextLogger : public CSimpleInterfaceOf previous.updateDelta(to, stats); } virtual const LogMsgJobInfo & queryJob() const override { return job; } + void reset() + { + stats.reset(); + } }; extern THORHELPER_API bool isActivitySink(ThorActivityKind kind); diff --git a/dali/base/dadfs.cpp b/dali/base/dadfs.cpp index 08543f4825c..965cc10d057 100644 --- a/dali/base/dadfs.cpp +++ b/dali/base/dadfs.cpp @@ -226,8 +226,7 @@ extern da_decl double calcFileAccessCost(const char * cluster, __int64 numDiskWr extern da_decl double calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads) { StringBuffer clusterName; - // Should really specify the cluster number too, but this is the best we can do for now - f->getClusterName(0, clusterName); + f->getClusterName(0, clusterName); // Should really specify the cluster number too, but good enough for now return calcFileAccessCost(clusterName, numDiskWrites, numDiskReads); } diff --git a/system/jhtree/jhtree.cpp b/system/jhtree/jhtree.cpp index 99bbd5d5684..0afdaedaf9f 100644 --- a/system/jhtree/jhtree.cpp +++ b/system/jhtree/jhtree.cpp @@ -575,10 +575,11 @@ class jhtree_decl CKeyLevelManager : implements IKeyManager, public CInterface virtual void mergeStats(CRuntimeStatisticCollection & targetStats) const { + // IO Stats comming from the keyCursor and jhtree cache stats coming from this class's stats if (keyCursor) - keyCursor->mergeStats(targetStats); + keyCursor->mergeStats(targetStats); // merge IO stats if (stats.ctx) - targetStats.merge(stats.ctx->queryStats()); + targetStats.merge(stats.ctx->queryStats()); // merge jhtree cache stats } }; diff --git a/thorlcr/activities/indexread/thindexreadslave.cpp b/thorlcr/activities/indexread/thindexreadslave.cpp index 38df9cc39de..557a82748d0 100644 --- a/thorlcr/activities/indexread/thindexreadslave.cpp +++ b/thorlcr/activities/indexread/thindexreadslave.cpp @@ -44,6 +44,7 @@ class CIndexReadSlaveBase : public CSlaveActivity protected: StringAttr logicalFilename; IArrayOf partDescs; + bool isSuperFile = false; IHThorIndexReadBaseArg *helper; IHThorSourceLimitTransformExtra * limitTransformExtra; Owned allocator; @@ -78,8 +79,7 @@ class CIndexReadSlaveBase : public CSlaveActivity Owned lazyIFileIO; mutable CriticalSection ioStatsCS; unsigned fileTableStart = NotFound; - CStatsContextLogger contextLogger; - CStatsCtxLoggerDeltaUpdater statsUpdater; + std::vector> contextLoggers; class TransformCallback : implements IThorIndexCallback , public CSimpleInterface { @@ -98,7 +98,7 @@ class CIndexReadSlaveBase : public CSlaveActivity if (!keyManager) throw MakeActivityException(&activity, 0, "Callback attempting to read blob with no key manager - index being read remotely?"); needsBlobCleaning = true; - return (byte *) keyManager->loadBlob(id, dummy, &activity.contextLogger); + return (byte *) keyManager->loadBlob(id, dummy, nullptr); } void prepareManager(IKeyManager *_keyManager) { @@ -169,6 +169,7 @@ class CIndexReadSlaveBase : public CSlaveActivity unsigned p = partNum; while (p keyIndex = createKeyIndex(path, crc, *lazyIFileIO, (unsigned) -1, false); - Owned klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, &contextLogger, helper->hasNewSegmentMonitors(), false); + Owned klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, contextLogger, helper->hasNewSegmentMonitors(), false); if (localMerge) { if (!keyIndexSet) @@ -331,7 +332,8 @@ class CIndexReadSlaveBase : public CSlaveActivity return createIndexLookup(keyManager); } } - keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, &contextLogger, helper->hasNewSegmentMonitors(), false)); + //Not tracking jhtree cache stats in KeyMerger at the moment. Future: something to consider + keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, nullptr, helper->hasNewSegmentMonitors(), false)); const ITranslator *translator = translators.item(0); if (translator) keyMergerManager->setLayoutTranslator(&translator->queryTranslator()); @@ -348,44 +350,12 @@ class CIndexReadSlaveBase : public CSlaveActivity else return nullptr; } - void mergeFileStats(IPartDescriptor *partDesc, IFileIO *partIO) - { - if (!currentManager) - return; - if (fileStats.size()>0) - { - ISuperFileDescriptor * superFDesc = partDesc->queryOwner().querySuperFileDescriptor(); - if (superFDesc) - { - unsigned subfile, lnum; - if(superFDesc->mapSubPart(partDesc->queryPartIndex(), subfile, lnum)) - currentManager->mergeStats(*fileStats[fileTableStart+subfile]); - } - else - { - currentManager->mergeStats(*fileStats[fileTableStart]); - } - } - } - void updateStats() - { - // NB: updateStats() should always be called whilst ioStatsCS is held. - if (lazyIFileIO) - { - mergeStats(inactiveStats, lazyIFileIO); - if (currentPartqueryHelper(); limitTransformExtra = nullptr; @@ -559,7 +528,6 @@ class CIndexReadSlaveBase : public CSlaveActivity break; if (keyManager) prepareManager(keyManager); - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); if (hard) // checkCount checks hard key count only. count += indexInput->checkCount(keyedLimit-count); // part max, is total limit [keyedLimit] minus total so far [count] else @@ -611,6 +579,7 @@ class CIndexReadSlaveBase : public CSlaveActivity IPartDescriptor &part0 = partDescs.item(0); IFileDescriptor &fileDesc = part0.queryOwner(); ISuperFileDescriptor *super = fileDesc.querySuperFileDescriptor(); + isSuperFile = super != nullptr; if ((0 == (helper->getFlags() & TIRusesblob)) && !localMerge) { @@ -688,7 +657,15 @@ class CIndexReadSlaveBase : public CSlaveActivity } } data.read(fileTableStart); - setupSpace4FileStats(fileTableStart, reInit, super!=nullptr, super?super->querySubFiles():0, indexReadActivityStatistics); + setupSpace4FileStats(fileTableStart, reInit, isSuperFile, isSuperFile?super->querySubFiles():0, indexReadFileStatistics); + // One contextLoggers per part required: + // 1) superfile: multiple contextLoggers required to allow stats to be tracked at subfile level + // 2) non-superfile: although all stats doesn't need to be tracked at part level, having separate contextLoggers is needed to + // merge both io stats and jhtree stats. (Without multiple contextLoggers, when ForEach(keyManagers)mergeStats() is called, + // the same jhtree stats would be merged multiple times. And ForEach(keyManagers)->mergeStats needs to be called to ensure + // that io stats are merged) + for(unsigned i = 0; i < parts; ++i) + contextLoggers.push_back(new CStatsContextLogger(jhtreeCacheStatistics, thorJob)); } } // IThorDataLink @@ -723,10 +700,35 @@ class CIndexReadSlaveBase : public CSlaveActivity virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const { PARENT::gatherActiveStats(activeStats); + if (partDescs.ordinality()) { - CriticalBlock b(ioStatsCS); - if (lazyIFileIO) - mergeStats(activeStats, lazyIFileIO); + // reset required because within loop below, mergeStats() is used to build up stats for each file + for (auto & fileStatItem: fileStats) + fileStatItem->reset(); + ISuperFileDescriptor *superFDesc = partDescs.item(0).queryOwner().querySuperFileDescriptor(); + for (unsigned partNum=0; partNummapSubPart(partDescs.item(partNum).queryPartIndex(), subfile, lnum)) + continue; // should not happen + } + if (fileStats.size()>0) + keyManager.mergeStats(*fileStats[fileTableStart+subfile]); + else + keyManager.mergeStats(activeStats); // when just 1 file, merge into activeStats + } + // fileStats[] will be serialized separately so file level stats are tracked (see serializeStats() below) + // Also, merged into the activeStats for activity level stats + for (auto & fileStatItem: fileStats) + activeStats.merge(*fileStatItem); } activeStats.setStatistic(StNumRowsProcessed, progress); } @@ -737,15 +739,6 @@ class CIndexReadSlaveBase : public CSlaveActivity for (auto &stats: fileStats) stats->serialize(mb); } - virtual void done() override - { - { - CriticalBlock b(ioStatsCS); - updateStats(); - lazyIFileIO.clear(); - } - PARENT::done(); - } }; class CIndexReadSlaveActivity : public CIndexReadSlaveBase @@ -823,7 +816,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase helper->mapOutputToInput(tempBuilder, seek, numFields); // NOTE - weird interface to mapOutputToInput means that it STARTS writing at seekGEOffset... rawSeek = (byte *)temp; } - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); if (!currentManager->lookupSkip(rawSeek, seekGEOffset, seekSize)) return NULL; const byte *row = currentManager->queryKeyBuffer(); @@ -976,7 +968,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase // IRowStream virtual void stop() override { - CStatsScopedDeltaUpdater scoped(statsUpdater); if (RCMAX != keyedLimit) // NB: will not be true if nextRow() has handled { keyedLimitCount = sendGetCount(keyedProcessed); @@ -1146,7 +1137,6 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements if (keyManager) prepareManager(keyManager); - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); while (true) { const void *key = indexInput->nextKey(); @@ -1305,7 +1295,6 @@ class CIndexCountSlaveActivity : public CIndexReadSlaveBase if (keyManager) prepareManager(keyManager); - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); while (true) { const void *key = indexInput->nextKey(); diff --git a/thorlcr/activities/keyedjoin/thkeyedjoin.cpp b/thorlcr/activities/keyedjoin/thkeyedjoin.cpp index 8a687180a98..8130a5d9d2f 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoin.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoin.cpp @@ -304,7 +304,7 @@ class CKeyedJoinMaster : public CMasterActivity totalIndexParts = 0; Owned dataFile; - Owned indexFile = lookupReadFile(indexFileName, AccessMode::readRandom, false, false, 0 != (helper->getJoinFlags() & JFindexoptional), true, indexReadActivityStatistics, &indexFileStatsTableEntry); + Owned indexFile = lookupReadFile(indexFileName, AccessMode::readRandom, false, false, 0 != (helper->getJoinFlags() & JFindexoptional), true, indexReadFileStatistics, &indexFileStatsTableEntry); if (indexFile) { if (!isFileKey(indexFile)) diff --git a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp index a973d9e74d8..ea766f28df5 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp @@ -959,8 +959,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem { protected: CKeyedJoinSlave &activity; - CStatsContextLogger contextLogger; - CStatsCtxLoggerDeltaUpdater statsUpdater; IThorRowInterfaces *rowIf; IHThorKeyedJoinArg *helper = nullptr; std::vector queues; @@ -997,8 +995,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } public: CLookupHandler(CKeyedJoinSlave &_activity, IThorRowInterfaces *_rowIf, unsigned _batchProcessLimit) : threaded("CLookupHandler", this), - activity(_activity), rowIf(_rowIf), batchProcessLimit(_batchProcessLimit), - contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, _activity, contextLogger) + activity(_activity), rowIf(_rowIf), batchProcessLimit(_batchProcessLimit) { helper = activity.helper; } @@ -1087,7 +1084,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } void stop() { - CStatsScopedDeltaUpdater scoped(statsUpdater); stopped = true; join(); for (auto &queue : queues) @@ -1159,7 +1155,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } void flush() { - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); // NB: queueLookup() must be protected from re-entry by caller for (unsigned b=0; bdec(); // unblocks any requests to start lookup threads } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) = 0; + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const = 0; }; class CKeyLookupLocalBase : public CLookupHandler @@ -1288,7 +1283,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } void processRows(CThorExpandingRowArray &processing, unsigned partNo, IKeyManager *keyManager) { - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); for (unsigned r=0; rsetAtMostLimitHit(); // also clears existing rows break; } - KLBlobProviderAdapter adapter(keyManager, &contextLogger); + KLBlobProviderAdapter adapter(keyManager, nullptr); byte const * keyRow = keyManager->queryKeyBuffer(); size_t fposOffset = keyManager->queryRowSize() - sizeof(offset_t); offset_t fpos = rtlReadBigUInt8(keyRow + fposOffset); @@ -1354,6 +1348,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem //Need ptr to std::vector as std::atomic's are not constructable(doesn't have copy constructor) std::unique_ptr>> keyManagers; + // One context logger per part. (Extract stats for logical file by mapping part to logical file/subfile) + std::vector> contextLoggers; public: CKeyLookupLocalHandler(CKeyedJoinSlave &_activity) : CKeyLookupLocalBase(_activity) { @@ -1372,6 +1368,15 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem keyManagers.reset(new std::vector>(parts.size())); for (auto & k: *keyManagers) k = nullptr; + contextLoggers.clear(); + for (unsigned i=0; ireset(); } virtual void addPartNum(unsigned partNum) override { @@ -1386,36 +1391,33 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem std::atomic & keyManager = (*keyManagers)[selected]; if (!keyManager) // delayed until actually needed { - keyManager = activity.createPartKeyManager(partNo, copy, &contextLogger); + keyManager = activity.createPartKeyManager(partNo, copy, contextLoggers[selected]); // NB: potentially translation per part could be different if dealing with superkeys setupTranslation(partNo, selected, *keyManager); } processRows(processing, partNo, keyManager); } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) override + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const override { for (size_t i=0; imergeStats(*fileStats[startOffset+subfile]); - } - else - keyManager->mergeStats(*fileStats[startOffset]); + assertex(parts.size()==contextLoggers.size()); + CStatsContextLogger * contextLogger = contextLoggers[i]; + // Superfile: entry is StartOffset + subfileNum + // Otherwise, use 'startOffset' entry for the stats + unsigned fileEntry = isSuper ? (startOffset+subFileNum[i]) : startOffset; + assertex(fileEntry < fileStats.size()); + fileStats[fileEntry]->merge(contextLogger->queryStats()); } } }; class CKeyLookupMergeHandler : public CKeyLookupLocalBase { typedef CKeyLookupLocalBase PARENT; - + CStatsContextLogger contextLogger; std::atomic keyManager{nullptr}; public: - CKeyLookupMergeHandler(CKeyedJoinSlave &_activity) : CKeyLookupLocalBase(_activity) + CKeyLookupMergeHandler(CKeyedJoinSlave &_activity) : CKeyLookupLocalBase(_activity), contextLogger(jhtreeCacheStatistics, thorJob) { limiter = &activity.lookupThreadLimiter; translators.push_back(nullptr); @@ -1437,33 +1439,22 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem Owned keyIndex = activity.createPartKeyIndex(partNo, copy, false); partKeySet->addIndex(keyIndex.getClear()); } - keyManager = createKeyMerger(helper->queryIndexRecordSize()->queryRecordAccessor(true), partKeySet, 0, nullptr, helper->hasNewSegmentMonitors(), false); + keyManager = createKeyMerger(helper->queryIndexRecordSize()->queryRecordAccessor(true), partKeySet, 0, &contextLogger, helper->hasNewSegmentMonitors(), false); setupTranslation(0, 0, *keyManager); } processRows(processing, 0, keyManager); } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) override + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const override { - if (keyManager) - { - for (size_t i=0; imerge(contextLogger.queryStats()); } - }; class CRemoteLookupHandler : public CLookupHandler { typedef CLookupHandler PARENT; - protected: rank_t lookupSlave = RANK_NULL; mptag_t replyTag = TAG_NULL; @@ -1547,14 +1538,12 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem for (auto &h: handles) h = 0; } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) override - { - /* Note: currently, stats from remote file not tracked */ - } }; class CKeyLookupRemoteHandler : public CRemoteLookupHandler { typedef CRemoteLookupHandler PARENT; + // One context logger per part. (Extract stats for logical file by mapping part to logical file/subfile) + std::vector> contextLoggers; void initRead(CMessageBuffer &msg, unsigned selected, unsigned partNo, unsigned copy) { @@ -1637,6 +1626,19 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem limiter = &activity.lookupThreadLimiter; allParts = &activity.allIndexParts; } + virtual void init() override + { + PARENT::init(); + contextLoggers.clear(); + for (unsigned i=0; ireset(); + } virtual StringBuffer &getInfo(StringBuffer &info) const override { return PARENT::getInfo(info).append(", lookupSlave=").append(lookupSlave); @@ -1757,12 +1759,13 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem unsigned __int64 nodeDiskFetches, leafDiskFetches, blobDiskFetches; mb.read(seeks).read(scans).read(wildseeks); mb.read(nodeDiskFetches).read(leafDiskFetches).read(blobDiskFetches); - activity.inactiveStats.sumStatistic(StNumIndexSeeks, seeks); - activity.inactiveStats.sumStatistic(StNumIndexScans, scans); - activity.inactiveStats.sumStatistic(StNumIndexWildSeeks, wildseeks); - activity.inactiveStats.sumStatistic(StNumNodeDiskFetches, nodeDiskFetches); - activity.inactiveStats.sumStatistic(StNumLeafDiskFetches, leafDiskFetches); - activity.inactiveStats.sumStatistic(StNumBlobDiskFetches, blobDiskFetches); + CStatsContextLogger * contextLogger(contextLoggers[selected]); + contextLogger->noteStatistic(StNumIndexSeeks, seeks); + contextLogger->noteStatistic(StNumIndexScans, scans); + contextLogger->noteStatistic(StNumIndexWildSeeks, wildseeks); + contextLogger->noteStatistic(StNumNodeDiskFetches, nodeDiskFetches); + contextLogger->noteStatistic(StNumLeafDiskFetches, leafDiskFetches); + contextLogger->noteStatistic(StNumBlobDiskFetches, blobDiskFetches); if (received == numRows) break; } @@ -1772,6 +1775,19 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem PARENT::end(); doClose(kjs_keyclose); } + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const override + { + for (size_t i=0; imerge(contextLogger->queryStats()); + } + } }; class CFetchLocalLookupHandler : public CLookupHandler { @@ -1812,7 +1828,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } virtual void process(CThorExpandingRowArray &processing, unsigned selected) override { - CStatsScopedThresholdDeltaUpdater scopedStats(statsUpdater); unsigned partCopy = parts[selected]; unsigned partNo = partCopy & partMask; unsigned copy = partCopy >> partBits; @@ -1873,13 +1888,13 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem diskSeeks++; } } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) override + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const override { for (size_t i=0; i> & fileStats, unsigned startOffset) const override {} }; class CReadAheadThread : implements IThreaded { @@ -2199,7 +2216,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem lookupHandler->end(); } } - void getFileStats(std::vector> & fileStats, unsigned startOffset) + void getFileStats(std::vector> & fileStats, unsigned startOffset) const { ForEachItemIn(h, handlers) handlers.item(h)->getFileStats(fileStats, startOffset); @@ -3120,7 +3137,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } ISuperFileDescriptor *superFdesc = numIndexParts?allIndexParts.item(0).queryOwner().querySuperFileDescriptor():nullptr; unsigned numIndexSubFiles = superFdesc?superFdesc->querySubFiles():0; - setupSpace4FileStats(indexFileStatsTableEntry, true, superFdesc!=nullptr, numIndexSubFiles, indexReadActivityStatistics); + setupSpace4FileStats(indexFileStatsTableEntry, true, superFdesc!=nullptr, numIndexSubFiles, indexReadFileStatistics); setupLookupHandlers(keyLookupHandlers, totalIndexParts, superFdesc, localIndexParts, indexPartToSlaveMap, localKey, forceRemoteKeyedLookup ? ht_remotekeylookup : ht_localkeylookup, ht_remotekeylookup); data.read(totalDataParts); if (totalDataParts) @@ -3791,11 +3808,22 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem { return container.queryId(); } - virtual void serializeStats(MemoryBuffer &mb) override + virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const { + PARENT::gatherActiveStats(activeStats); + // fileStats gather per logical file. Handlers track stats per part. Multiple handlers could be updating the same logical file. + // To allow each handler to update the same logical file without overwriting the other handlers updates, the fileStats are 'merged' into fileStats[] + // So, reset required because the handlers merge stats rather than sets them + for (auto & fileStatItem: fileStats) + fileStatItem->reset(); + keyLookupHandlers.getFileStats(fileStats, indexFileStatsTableEntry); fetchLookupHandlers.getFileStats(fileStats, dataFileStatsTableEntry); - + for (auto & fileStatItem: fileStats) + activeStats.merge(*fileStatItem); + } + virtual void serializeStats(MemoryBuffer &mb) override + { PARENT::serializeStats(mb); mb.append((unsigned)fileStats.size()); for (auto &stats: fileStats) diff --git a/thorlcr/graph/thgraphmaster.cpp b/thorlcr/graph/thgraphmaster.cpp index 8d9f85a2001..f4dd1734768 100644 --- a/thorlcr/graph/thgraphmaster.cpp +++ b/thorlcr/graph/thgraphmaster.cpp @@ -680,12 +680,14 @@ void CMasterActivity::updateFileReadCostStats() if (fileStats.size()>0) { + ThorActivityKind activityKind = container.getKind(); unsigned fileIndex = 0; - for (unsigned i=0; i> fileStats; + // fileStats is mutable as it is updated by gatherActiveStats (const member func) + // fileStats is in this base class as it used by multiple derived classes (both slave and master) but not all. + // (Having it in the base class aids setup and resizing.) + mutable std::vector> fileStats; protected: unsigned __int64 queryLocalCycles() const; diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index 9eb7d43f8bf..78428be4630 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -78,7 +78,8 @@ const StatisticsMapping soapcallStatistics({StTimeSoapcall}); const StatisticsMapping basicActivityStatistics({StTimeTotalExecute, StTimeLocalExecute, StTimeBlocked}); const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics); const StatisticsMapping hashJoinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics); -const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, diskReadRemoteStatistics, basicActivityStatistics, jhtreeCacheStatistics); +const StatisticsMapping indexReadFileStatistics({}, diskReadRemoteStatistics, jhtreeCacheStatistics); +const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, indexReadFileStatistics, basicActivityStatistics); const StatisticsMapping indexWriteActivityStatistics({StPerReplicated, StNumLeafCacheAdds, StNumNodeCacheAdds, StNumBlobCacheAdds }, basicActivityStatistics, diskWriteRemoteStatistics); const StatisticsMapping keyedJoinActivityStatistics({ StNumIndexAccepted, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected}, basicActivityStatistics, jhtreeCacheStatistics); const StatisticsMapping loopActivityStatistics({StNumIterations}, basicActivityStatistics); diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index ee667d4246c..5801109bf7c 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -154,6 +154,7 @@ extern graph_decl const StatisticsMapping graphStatistics; extern graph_decl const StatisticsMapping indexDistribActivityStatistics; extern graph_decl const StatisticsMapping soapcallActivityStatistics; +extern graph_decl const StatisticsMapping indexReadFileStatistics; class BooleanOnOff { bool &tf;