diff --git a/common/thorhelper/thorcommon.hpp b/common/thorhelper/thorcommon.hpp index f85244be7b5..a5f2368a375 100644 --- a/common/thorhelper/thorcommon.hpp +++ b/common/thorhelper/thorcommon.hpp @@ -762,6 +762,10 @@ class CStatsContextLogger : public CSimpleInterfaceOf previous.updateDelta(to, stats); } virtual const LogMsgJobInfo & queryJob() const override { return job; } + void reset() + { + stats.reset(); + } }; extern THORHELPER_API bool isActivitySink(ThorActivityKind kind); diff --git a/system/jhtree/jhtree.cpp b/system/jhtree/jhtree.cpp index 53265d82167..4a245bb24a7 100644 --- a/system/jhtree/jhtree.cpp +++ b/system/jhtree/jhtree.cpp @@ -573,10 +573,13 @@ class jhtree_decl CKeyLevelManager : implements IKeyManager, public CInterface filter->describe(out); } - virtual void mergeStats(CRuntimeStatisticCollection & stats) const + virtual void mergeStats(CRuntimeStatisticCollection & targetStats) const { + // IO Stats coming from the keyCursor and jhtree cache stats coming from this class's stats if (keyCursor) - keyCursor->mergeStats(stats); + keyCursor->mergeStats(targetStats); // merge IO stats + if (stats.ctx) + targetStats.merge(stats.ctx->queryStats()); // merge jhtree cache stats } }; diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 1ca005cb6aa..9830a2dba9f 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -1322,8 +1322,8 @@ const StatisticsMapping noStatistics({}); const StatisticsMapping jhtreeCacheStatistics({ StNumIndexSeeks, StNumIndexScans, StNumPostFiltered, StNumIndexWildSeeks, StNumNodeCacheAdds, StNumLeafCacheAdds, StNumBlobCacheAdds, StNumNodeCacheHits, StNumLeafCacheHits, StNumBlobCacheHits, StCycleNodeLoadCycles, StCycleLeafLoadCycles, StCycleBlobLoadCycles, StCycleNodeReadCycles, StCycleLeafReadCycles, StCycleBlobReadCycles, StNumNodeDiskFetches, StNumLeafDiskFetches, StNumBlobDiskFetches, - StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles, StCycleIndexCacheBlockedCycles,
StNumIndexMergeCompares, StNumIndexMerges, StNumIndexSkips, StNumIndexNullSkips, - StTimeLeafLoad, StTimeLeafRead, StTimeLeafFetch, StTimeIndexCacheBlocked, StTimeNodeFetch}); + StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles, StCycleIndexCacheBlockedCycles, StNumIndexMergeCompares, StNumIndexMerges, StNumIndexSkips, + StNumIndexNullSkips, StTimeLeafLoad, StTimeLeafRead, StTimeLeafFetch, StTimeIndexCacheBlocked, StTimeNodeFetch, StTimeNodeLoad, StTimeNodeRead}); const StatisticsMapping allStatistics(StKindAll); const StatisticsMapping heapStatistics({StNumAllocations, StNumAllocationScans}); diff --git a/thorlcr/activities/indexread/thindexreadslave.cpp b/thorlcr/activities/indexread/thindexreadslave.cpp index b4c7d0cd7cc..557a82748d0 100644 --- a/thorlcr/activities/indexread/thindexreadslave.cpp +++ b/thorlcr/activities/indexread/thindexreadslave.cpp @@ -44,6 +44,7 @@ class CIndexReadSlaveBase : public CSlaveActivity protected: StringAttr logicalFilename; IArrayOf partDescs; + bool isSuperFile = false; IHThorIndexReadBaseArg *helper; IHThorSourceLimitTransformExtra * limitTransformExtra; Owned allocator; @@ -78,8 +79,7 @@ class CIndexReadSlaveBase : public CSlaveActivity Owned lazyIFileIO; mutable CriticalSection ioStatsCS; unsigned fileTableStart = NotFound; - CStatsContextLogger contextLogger; - CStatsCtxLoggerDeltaUpdater statsUpdater; + std::vector> contextLoggers; class TransformCallback : implements IThorIndexCallback , public CSimpleInterface { @@ -98,7 +98,7 @@ class CIndexReadSlaveBase : public CSlaveActivity if (!keyManager) throw MakeActivityException(&activity, 0, "Callback attempting to read blob with no key manager - index being read remotely?"); needsBlobCleaning = true; - return (byte *) keyManager->loadBlob(id, dummy, &activity.contextLogger); + return (byte *) keyManager->loadBlob(id, dummy, nullptr); } void prepareManager(IKeyManager *_keyManager) { @@ -169,6 +169,7 @@ class CIndexReadSlaveBase : public CSlaveActivity 
unsigned p = partNum; while (p keyIndex = createKeyIndex(path, crc, *lazyIFileIO, (unsigned) -1, false); - Owned klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, &contextLogger, helper->hasNewSegmentMonitors(), false); + Owned klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, contextLogger, helper->hasNewSegmentMonitors(), false); if (localMerge) { if (!keyIndexSet) @@ -331,7 +332,8 @@ class CIndexReadSlaveBase : public CSlaveActivity return createIndexLookup(keyManager); } } - keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, &contextLogger, helper->hasNewSegmentMonitors(), false)); + //Not tracking jhtree cache stats in KeyMerger at the moment. Future: something to consider + keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, nullptr, helper->hasNewSegmentMonitors(), false)); const ITranslator *translator = translators.item(0); if (translator) keyMergerManager->setLayoutTranslator(&translator->queryTranslator()); @@ -348,40 +350,12 @@ class CIndexReadSlaveBase : public CSlaveActivity else return nullptr; } - void mergeFileStats(IPartDescriptor *partDesc, IFileIO *partIO) - { - if (fileStats.size()>0) - { - ISuperFileDescriptor * superFDesc = partDesc->queryOwner().querySuperFileDescriptor(); - if (superFDesc) - { - unsigned subfile, lnum; - if(superFDesc->mapSubPart(partDesc->queryPartIndex(), subfile, lnum)) - mergeStats(*fileStats[fileTableStart+subfile], partIO); - } - else - mergeStats(*fileStats[fileTableStart], partIO); - } - } - void updateStats() - { - // NB: updateStats() should always be called whilst ioStatsCS is held. 
- if (lazyIFileIO) - { - mergeStats(inactiveStats, lazyIFileIO); - if (currentPartqueryHelper(); limitTransformExtra = nullptr; @@ -555,7 +528,6 @@ class CIndexReadSlaveBase : public CSlaveActivity break; if (keyManager) prepareManager(keyManager); - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); if (hard) // checkCount checks hard key count only. count += indexInput->checkCount(keyedLimit-count); // part max, is total limit [keyedLimit] minus total so far [count] else @@ -607,6 +579,7 @@ class CIndexReadSlaveBase : public CSlaveActivity IPartDescriptor &part0 = partDescs.item(0); IFileDescriptor &fileDesc = part0.queryOwner(); ISuperFileDescriptor *super = fileDesc.querySuperFileDescriptor(); + isSuperFile = super != nullptr; if ((0 == (helper->getFlags() & TIRusesblob)) && !localMerge) { @@ -684,7 +657,15 @@ class CIndexReadSlaveBase : public CSlaveActivity } } data.read(fileTableStart); - setupSpace4FileStats(fileTableStart, reInit, super!=nullptr, super?super->querySubFiles():0, indexReadActivityStatistics); + setupSpace4FileStats(fileTableStart, reInit, isSuperFile, isSuperFile?super->querySubFiles():0, indexReadFileStatistics); + // One contextLogger per part is required: + // 1) superfile: multiple contextLoggers required to allow stats to be tracked at subfile level + // 2) non-superfile: although all stats don't need to be tracked at part level, having separate contextLoggers is needed to + // merge both io stats and jhtree stats. (Without multiple contextLoggers, when ForEach(keyManagers)->mergeStats() is called, + // the same jhtree stats would be merged multiple times.
And ForEach(keyManagers)->mergeStats needs to be called to ensure + // that io stats are merged) + for(unsigned i = 0; i < parts; ++i) + contextLoggers.push_back(new CStatsContextLogger(jhtreeCacheStatistics, thorJob)); } } // IThorDataLink @@ -719,10 +700,35 @@ class CIndexReadSlaveBase : public CSlaveActivity virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const { PARENT::gatherActiveStats(activeStats); + if (partDescs.ordinality()) { - CriticalBlock b(ioStatsCS); - if (lazyIFileIO) - mergeStats(activeStats, lazyIFileIO); + // reset required because within loop below, mergeStats() is used to build up stats for each file + for (auto & fileStatItem: fileStats) + fileStatItem->reset(); + ISuperFileDescriptor *superFDesc = partDescs.item(0).queryOwner().querySuperFileDescriptor(); + for (unsigned partNum=0; partNummapSubPart(partDescs.item(partNum).queryPartIndex(), subfile, lnum)) + continue; // should not happen + } + if (fileStats.size()>0) + keyManager.mergeStats(*fileStats[fileTableStart+subfile]); + else + keyManager.mergeStats(activeStats); // when just 1 file, merge into activeStats + } + // fileStats[] will be serialized separately so file level stats are tracked (see serializeStats() below) + // Also, merged into the activeStats for activity level stats + for (auto & fileStatItem: fileStats) + activeStats.merge(*fileStatItem); } activeStats.setStatistic(StNumRowsProcessed, progress); } @@ -733,15 +739,6 @@ class CIndexReadSlaveBase : public CSlaveActivity for (auto &stats: fileStats) stats->serialize(mb); } - virtual void done() override - { - { - CriticalBlock b(ioStatsCS); - updateStats(); - lazyIFileIO.clear(); - } - PARENT::done(); - } }; class CIndexReadSlaveActivity : public CIndexReadSlaveBase @@ -819,7 +816,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase helper->mapOutputToInput(tempBuilder, seek, numFields); // NOTE - weird interface to mapOutputToInput means that it STARTS writing at seekGEOffset... 
rawSeek = (byte *)temp; } - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); if (!currentManager->lookupSkip(rawSeek, seekGEOffset, seekSize)) return NULL; const byte *row = currentManager->queryKeyBuffer(); @@ -972,7 +968,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase // IRowStream virtual void stop() override { - CStatsScopedDeltaUpdater scoped(statsUpdater); if (RCMAX != keyedLimit) // NB: will not be true if nextRow() has handled { keyedLimitCount = sendGetCount(keyedProcessed); @@ -1142,7 +1137,6 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements if (keyManager) prepareManager(keyManager); - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); while (true) { const void *key = indexInput->nextKey(); @@ -1301,7 +1295,6 @@ class CIndexCountSlaveActivity : public CIndexReadSlaveBase if (keyManager) prepareManager(keyManager); - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); while (true) { const void *key = indexInput->nextKey(); diff --git a/thorlcr/activities/keyedjoin/thkeyedjoin.cpp b/thorlcr/activities/keyedjoin/thkeyedjoin.cpp index 9b43086248d..920fa8e61bf 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoin.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoin.cpp @@ -310,7 +310,7 @@ class CKeyedJoinMaster : public CMasterActivity totalIndexParts = 0; Owned dataFile; - Owned indexFile = lookupReadFile(indexFileName, AccessMode::readRandom, false, false, 0 != (helper->getJoinFlags() & JFindexoptional), true, indexReadActivityStatistics, &indexFileStatsTableEntry); + Owned indexFile = lookupReadFile(indexFileName, AccessMode::readRandom, false, false, 0 != (helper->getJoinFlags() & JFindexoptional), true, indexReadFileStatistics, &indexFileStatsTableEntry); if (indexFile) { if (!isFileKey(indexFile)) diff --git a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp index e072e6cc814..9f3eab3b78b 100644 --- 
a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp @@ -1248,7 +1248,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem if (limiter) limiter->dec(); // unblocks any requests to start lookup threads } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) = 0; + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const = 0; }; class CKeyLookupLocalBase : public CLookupHandler @@ -1284,7 +1284,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } void processRows(CThorExpandingRowArray &processing, unsigned partNo, IKeyManager *keyManager) { - CStatsScopedThresholdDeltaUpdater scoped(activity.statsUpdater); for (unsigned r=0; rsetAtMostLimitHit(); // also clears existing rows break; } - KLBlobProviderAdapter adapter(keyManager, &activity.contextLogger); + KLBlobProviderAdapter adapter(keyManager, nullptr); byte const * keyRow = keyManager->queryKeyBuffer(); size_t fposOffset = keyManager->queryRowSize() - sizeof(offset_t); offset_t fpos = rtlReadBigUInt8(keyRow + fposOffset); @@ -1350,6 +1349,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem //Need ptr to std::vector as std::atomic's are not constructable(doesn't have copy constructor) std::unique_ptr>> keyManagers; + // One context logger per part. 
(Extract stats for logical file by mapping part to logical file/subfile) + std::vector> contextLoggers; public: CKeyLookupLocalHandler(CKeyedJoinSlave &_activity) : CKeyLookupLocalBase(_activity) { @@ -1368,6 +1369,15 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem keyManagers.reset(new std::vector>(parts.size())); for (auto & k: *keyManagers) k = nullptr; + contextLoggers.clear(); + for (unsigned i=0; ireset(); } virtual void addPartNum(unsigned partNum) override { @@ -1382,13 +1392,13 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem std::atomic & keyManager = (*keyManagers)[selected]; if (!keyManager) // delayed until actually needed { - keyManager = activity.createPartKeyManager(partNo, copy); + keyManager = activity.createPartKeyManager(partNo, copy, contextLoggers[selected]); // NB: potentially translation per part could be different if dealing with superkeys setupTranslation(partNo, selected, *keyManager); } processRows(processing, partNo, keyManager); } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) override + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const override { for (size_t i=0; i keyManager{nullptr}; public: - CKeyLookupMergeHandler(CKeyedJoinSlave &_activity) : CKeyLookupLocalBase(_activity) + CKeyLookupMergeHandler(CKeyedJoinSlave &_activity) : CKeyLookupLocalBase(_activity), contextLogger(jhtreeCacheStatistics, thorJob) { limiter = &activity.lookupThreadLimiter; translators.push_back(nullptr); @@ -1433,33 +1443,22 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem Owned keyIndex = activity.createPartKeyIndex(partNo, copy, false); partKeySet->addIndex(keyIndex.getClear()); } - keyManager = createKeyMerger(helper->queryIndexRecordSize()->queryRecordAccessor(true), partKeySet, 0, nullptr, helper->hasNewSegmentMonitors(), false); + keyManager = 
createKeyMerger(helper->queryIndexRecordSize()->queryRecordAccessor(true), partKeySet, 0, &contextLogger, helper->hasNewSegmentMonitors(), false); setupTranslation(0, 0, *keyManager); } processRows(processing, 0, keyManager); } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) override + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const override { - if (keyManager) - { - for (size_t i=0; imerge(contextLogger.queryStats()); } - }; class CRemoteLookupHandler : public CLookupHandler { typedef CLookupHandler PARENT; - protected: rank_t lookupSlave = RANK_NULL; mptag_t replyTag = TAG_NULL; @@ -1543,14 +1542,12 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem for (auto &h: handles) h = 0; } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) override - { - /* Note: currently, stats from remote file not tracked */ - } }; class CKeyLookupRemoteHandler : public CRemoteLookupHandler { typedef CRemoteLookupHandler PARENT; + // One context logger per part. (Extract stats for logical file by mapping part to logical file/subfile) + std::vector> contextLoggers; void initRead(CMessageBuffer &msg, unsigned selected, unsigned partNo, unsigned copy) { @@ -1633,6 +1630,19 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem limiter = &activity.lookupThreadLimiter; allParts = &activity.allIndexParts; } + virtual void init() override + { + PARENT::init(); + contextLoggers.clear(); + for (unsigned i=0; ireset(); + } virtual StringBuffer &getInfo(StringBuffer &info) const override { return PARENT::getInfo(info).append(", lookupSlave=").append(lookupSlave); @@ -1750,10 +1760,16 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem joinGroup->decPending(); // Every queued lookup row triggered an inc., this is the corresponding dec. 
} unsigned __int64 seeks, scans, wildseeks; + unsigned __int64 nodeDiskFetches, leafDiskFetches, blobDiskFetches; mb.read(seeks).read(scans).read(wildseeks); - activity.inactiveStats.sumStatistic(StNumIndexSeeks, seeks); - activity.inactiveStats.sumStatistic(StNumIndexScans, scans); - activity.inactiveStats.sumStatistic(StNumIndexWildSeeks, wildseeks); + mb.read(nodeDiskFetches).read(leafDiskFetches).read(blobDiskFetches); + CStatsContextLogger * contextLogger(contextLoggers[selected]); + contextLogger->noteStatistic(StNumIndexSeeks, seeks); + contextLogger->noteStatistic(StNumIndexScans, scans); + contextLogger->noteStatistic(StNumIndexWildSeeks, wildseeks); + contextLogger->noteStatistic(StNumNodeDiskFetches, nodeDiskFetches); + contextLogger->noteStatistic(StNumLeafDiskFetches, leafDiskFetches); + contextLogger->noteStatistic(StNumBlobDiskFetches, blobDiskFetches); if (received == numRows) break; } @@ -1763,6 +1779,19 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem PARENT::end(); doClose(kjs_keyclose); } + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const override + { + assertex(parts.size()==contextLoggers.size()); + for (size_t i=0; imerge(contextLogger->queryStats()); + } + } }; class CFetchLocalLookupHandler : public CLookupHandler { @@ -1863,13 +1892,13 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem diskSeeks++; } } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) override + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const override { for (size_t i=0; i> & fileStats, unsigned startOffset) const override {} }; class CReadAheadThread : implements IThreaded { @@ -2189,7 +2220,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem lookupHandler->end(); } } - void getFileStats(std::vector> & fileStats, unsigned startOffset) + void getFileStats(std::vector> & fileStats, 
unsigned startOffset) const { ForEachItemIn(h, handlers) handlers.item(h)->getFileStats(fileStats, startOffset); @@ -2223,8 +2254,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem CPartDescriptorArray allIndexParts; std::vector localIndexParts, localFetchPartMap; IArrayOf tlkKeyIndexes; - CStatsContextLogger contextLogger; - CStatsCtxLoggerDeltaUpdater statsUpdater; Owned joinFieldsAllocator; OwnedConstThorRow defaultRight; unsigned joinFlags = 0; @@ -2420,7 +2449,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem { IKeyIndex *tlkKeyIndex = &tlkKeyIndexes.item(i); const RtlRecord &keyRecInfo = helper->queryIndexRecordSize()->queryRecordAccessor(true); - Owned tlkManager = createLocalKeyManager(keyRecInfo, nullptr, &contextLogger, helper->hasNewSegmentMonitors(), false); + Owned tlkManager = createLocalKeyManager(keyRecInfo, nullptr, nullptr, helper->hasNewSegmentMonitors(), false); tlkManager->setKey(tlkKeyIndex); keyManagers.append(*tlkManager.getClear()); } @@ -2452,10 +2481,10 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem return createKeyIndex(filename, crc, *lazyIFileIO, (unsigned) -1, false); } } - IKeyManager *createPartKeyManager(unsigned partNo, unsigned copy) + IKeyManager *createPartKeyManager(unsigned partNo, unsigned copy, IContextLogger *ctx) { Owned keyIndex = createPartKeyIndex(partNo, copy, false); - return createLocalKeyManager(helper->queryIndexRecordSize()->queryRecordAccessor(true), keyIndex, &contextLogger, helper->hasNewSegmentMonitors(), false); + return createLocalKeyManager(helper->queryIndexRecordSize()->queryRecordAccessor(true), keyIndex, ctx, helper->hasNewSegmentMonitors(), false); } const void *preparePendingLookupRow(void *row, size32_t maxSz, const void *lhsRow, size32_t keySz) { @@ -2617,7 +2646,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } void stopReadAhead() { - 
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); keyLookupHandlers.flush(); keyLookupHandlers.join(); // wait for pending handling, there may be more fetch items as a result fetchLookupHandlers.flushTS(); @@ -2938,7 +2966,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem public: IMPLEMENT_IINTERFACE_USING(PARENT); - CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this), contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, *this, contextLogger) + CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this) { helper = static_cast (queryHelper()); reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) || (helper->getJoinFlags() & JFvarindexfilename); @@ -3112,7 +3140,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } ISuperFileDescriptor *superFdesc = numIndexParts?allIndexParts.item(0).queryOwner().querySuperFileDescriptor():nullptr; unsigned numIndexSubFiles = superFdesc?superFdesc->querySubFiles():0; - setupSpace4FileStats(indexFileStatsTableEntry, true, superFdesc!=nullptr, numIndexSubFiles, indexReadActivityStatistics); + setupSpace4FileStats(indexFileStatsTableEntry, true, superFdesc!=nullptr, numIndexSubFiles, indexReadFileStatistics); setupLookupHandlers(keyLookupHandlers, totalIndexParts, superFdesc, localIndexParts, indexPartToSlaveMap, localKey, forceRemoteKeyedLookup ? ht_remotekeylookup : ht_localkeylookup, ht_remotekeylookup); data.read(totalDataParts); if (totalDataParts) @@ -3368,7 +3396,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } virtual void stop() override { - CStatsScopedDeltaUpdater scoped(statsUpdater); endOfInput = true; // signals to readAhead which is reading input, that is should stop asap. 
// could be blocked in readAhead(), because CJoinGroup's are no longer being processed @@ -3784,11 +3811,22 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem { return container.queryId(); } - virtual void serializeStats(MemoryBuffer &mb) override + virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const { + PARENT::gatherActiveStats(activeStats); + // fileStats are gathered per logical file. Handlers track stats per part. Multiple handlers could be updating the same logical file. + // To allow each handler to update the same logical file without overwriting the other handlers' updates, the fileStats are 'merged' into fileStats[] + // So, a reset is required because the handlers merge stats rather than set them + for (auto & fileStatItem: fileStats) + fileStatItem->reset(); + keyLookupHandlers.getFileStats(fileStats, indexFileStatsTableEntry); fetchLookupHandlers.getFileStats(fileStats, dataFileStatsTableEntry); - + for (auto & fileStatItem: fileStats) + activeStats.merge(*fileStatItem); + } + virtual void serializeStats(MemoryBuffer &mb) override + { PARENT::serializeStats(mb); mb.append((unsigned)fileStats.size()); for (auto &stats: fileStats) diff --git a/thorlcr/graph/thgraphmaster.cpp b/thorlcr/graph/thgraphmaster.cpp index 0634b4291cc..3f29955d48c 100644 --- a/thorlcr/graph/thgraphmaster.cpp +++ b/thorlcr/graph/thgraphmaster.cpp @@ -649,13 +649,30 @@ void CMasterActivity::done() void CMasterActivity::updateFileReadCostStats() { - // Updates numDiskReads & readCost in the file attributes and returns the readCost - auto updateReadCosts = [](IDistributedFile *file, CThorStatsCollection &stats) + // Updates numDiskReads & readCost in the file attributes and returns the current readCost + auto updateReadCosts = [](bool useJhtreeCacheStats, IDistributedFile *file, CThorStatsCollection &stats) { - stat_type curDiskReads = stats.getStatisticSum(StNumDiskReads); + StringBuffer clusterName; + file->getClusterName(0,
clusterName); IPropertyTree & fileAttr = file->queryAttributes(); - cost_type legacyReadCost = getLegacyReadCost(fileAttr, file); - cost_type curReadCost = money2cost_type(calcFileAccessCost(file, 0, curDiskReads)); + cost_type legacyReadCost = 0, curReadCost = 0; + // Legacy files will not have the readCost stored as an attribute + if (!hasReadWriteCostFields(fileAttr) && fileAttr.hasProp(getDFUQResultFieldName(DFUQRFnumDiskReads))) + { + // Legacy file: calculate readCost using prev disk reads and new disk reads + stat_type prevDiskReads = fileAttr.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0); + legacyReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, prevDiskReads)); + } + stat_type curDiskReads = stats.getStatisticSum(StNumDiskReads); + if(useJhtreeCacheStats) + { + stat_type numActualReads = stats.getStatisticSum(StNumNodeDiskFetches) + + stats.getStatisticSum(StNumLeafDiskFetches) + + stats.getStatisticSum(StNumBlobDiskFetches); + curReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, numActualReads)); + } + else + curReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, curDiskReads)); file->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + curReadCost); file->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), curDiskReads); return curReadCost; @@ -663,11 +680,17 @@ void CMasterActivity::updateFileReadCostStats() if (fileStats.size()>0) { + ThorActivityKind activityKind = container.getKind(); unsigned fileIndex = 0; diskAccessCost = 0; - for (unsigned i=0; iquerySuperFile(); @@ -677,13 +700,13 @@ void CMasterActivity::updateFileReadCostStats() for (unsigned i=0; iquerySubFile(i, true); - diskAccessCost += updateReadCosts(&subFile, *fileStats[fileIndex]); + diskAccessCost += updateReadCosts(useJhtreeCache, &subFile, *fileStats[fileIndex]); fileIndex++; } } else { - diskAccessCost += updateReadCosts(file, *fileStats[fileIndex]); + diskAccessCost += updateReadCosts(useJhtreeCache, file, 
*fileStats[fileIndex]); fileIndex++; } } @@ -693,7 +716,7 @@ void CMasterActivity::updateFileReadCostStats() { IDistributedFile *file = queryReadFile(0); if (file) - diskAccessCost = updateReadCosts(file, statsCollection); + diskAccessCost += updateReadCosts(true, file, statsCollection); } } diff --git a/thorlcr/graph/thgraphslave.hpp b/thorlcr/graph/thgraphslave.hpp index 5540b72c032..7b43999dbab 100644 --- a/thorlcr/graph/thgraphslave.hpp +++ b/thorlcr/graph/thgraphslave.hpp @@ -210,7 +210,10 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres bool optUnordered = false; // is the output specified as unordered? CriticalSection statsCs; // to be used to protect objects refernce during stat. collection CRuntimeStatisticCollection inactiveStats; // stats collected from previous iteration, to be combined with current 'stats' - std::vector> fileStats; + // fileStats is mutable as it is updated by gatherActiveStats (const member func) + // fileStats is in this base class as it is used by multiple derived classes (both slave and master) but not all. + // (Having it in the base class aids setup and resizing.)
+ mutable std::vector> fileStats; protected: unsigned __int64 queryLocalCycles() const; diff --git a/thorlcr/slave/slavmain.cpp b/thorlcr/slave/slavmain.cpp index 80ab3d47800..9574c5e267d 100644 --- a/thorlcr/slave/slavmain.cpp +++ b/thorlcr/slave/slavmain.cpp @@ -115,7 +115,6 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, unsigned maxCachedKJManagers = defaultMaxCachedKJManagers; unsigned maxCachedFetchContexts = defaultMaxCachedFetchContexts; unsigned keyLookupMaxProcessThreads = defaultKeyLookupMaxProcessThreads; - CStatsContextLogger contextLogger; class CLookupKey { unsigned hashv = 0; @@ -447,11 +446,13 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, Owned keyManager; unsigned handle = 0; Owned helper; + CStatsContextLogger contextLogger; + public: CKMContainer(CKJService &_service, CKeyLookupContext *_ctx) - : service(_service), ctx(_ctx) + : service(_service), ctx(_ctx), contextLogger(jhtreeCacheStatistics, thorJob) { - keyManager.setown(ctx->createKeyManager(&service.contextLogger)); + keyManager.setown(ctx->createKeyManager(&contextLogger)); StringBuffer tracing; const IDynamicTransform *translator = ctx->queryTranslator(ctx->queryKey().getTracing(tracing)); if (translator) @@ -475,6 +476,7 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, } inline IHThorKeyedJoinArg *queryHelper() const { return helper; } inline CKJService & queryService() const { return service; } + inline CStatsContextLogger & queryContextLogger() { return contextLogger; } }; template class CKeyedCacheEntry : public CInterface @@ -757,10 +759,8 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, unsigned rowCount = getRowCount(); unsigned rowNum = 0; unsigned rowStart = 0; - const CRuntimeStatisticCollection & stats = kmc->queryService().contextLogger.queryStats(); - unsigned __int64 startSeeks = stats.getStatisticValue(StNumIndexSeeks); - unsigned __int64 startScans = 
stats.getStatisticValue(StNumIndexScans); - unsigned __int64 startWildSeeks = stats.getStatisticValue(StNumIndexWildSeeks); + CStatsContextLogger & contextLogger = kmc->queryContextLogger(); + CRuntimeStatisticCollection startStats(contextLogger.queryStats()); while (!abortSoon) { OwnedConstThorRow row = getRowClear(rowNum++); @@ -770,9 +770,15 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, if (last || (replyMb.length() >= DEFAULT_KEYLOOKUP_MAXREPLYSZ)) { countMarker.write(rowNum-rowStart); - replyMb.append(stats.getStatisticValue(StNumIndexSeeks)-startSeeks); - replyMb.append(stats.getStatisticValue(StNumIndexScans)-startScans); - replyMb.append(stats.getStatisticValue(StNumIndexWildSeeks)-startWildSeeks); + + CRuntimeStatisticCollection deltaStats(startStats.queryMapping()); + contextLogger.updateStatsDeltaTo(deltaStats, startStats); + replyMb.append(deltaStats.getStatisticValue(StNumIndexSeeks)); + replyMb.append(deltaStats.getStatisticValue(StNumIndexScans)); + replyMb.append(deltaStats.getStatisticValue(StNumIndexWildSeeks)); + replyMb.append(deltaStats.getStatisticValue(StNumNodeDiskFetches)); + replyMb.append(deltaStats.getStatisticValue(StNumLeafDiskFetches)); + replyMb.append(deltaStats.getStatisticValue(StNumBlobDiskFetches)); if (activityCtx->useMessageCompression()) { fastLZCompressToBuffer(replyMsg, tmpMB.length(), tmpMB.toByteArray()); @@ -1189,7 +1195,7 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, public: IMPLEMENT_IINTERFACE_USING(CSimpleInterfaceOf); - CKJService(mptag_t _mpTag) : threaded("CKJService", this), keyLookupMpTag(_mpTag), contextLogger(jhtreeCacheStatistics, thorJob) + CKJService(mptag_t _mpTag) : threaded("CKJService", this), keyLookupMpTag(_mpTag) { setupProcessorPool(); } diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index 9eb7d43f8bf..207d2a4b0cd 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -78,9 +78,10 @@ const 
StatisticsMapping soapcallStatistics({StTimeSoapcall}); const StatisticsMapping basicActivityStatistics({StTimeTotalExecute, StTimeLocalExecute, StTimeBlocked}); const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics); const StatisticsMapping hashJoinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics); -const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, diskReadRemoteStatistics, basicActivityStatistics, jhtreeCacheStatistics); +const StatisticsMapping indexReadFileStatistics({}, diskReadRemoteStatistics, jhtreeCacheStatistics); +const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, indexReadFileStatistics, basicActivityStatistics); const StatisticsMapping indexWriteActivityStatistics({StPerReplicated, StNumLeafCacheAdds, StNumNodeCacheAdds, StNumBlobCacheAdds }, basicActivityStatistics, diskWriteRemoteStatistics); -const StatisticsMapping keyedJoinActivityStatistics({ StNumIndexAccepted, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected}, basicActivityStatistics, jhtreeCacheStatistics); +const StatisticsMapping keyedJoinActivityStatistics({ StNumIndexAccepted, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected}, basicActivityStatistics, indexReadFileStatistics); const StatisticsMapping loopActivityStatistics({StNumIterations}, basicActivityStatistics); const StatisticsMapping lookupJoinActivityStatistics({StNumSmartJoinSlavesDegradedToStd, StNumSmartJoinDegradedToLocal}, basicActivityStatistics); const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics, spillStatistics); diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index ee667d4246c..5801109bf7c 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -154,6 +154,7 @@ extern graph_decl const StatisticsMapping graphStatistics; extern graph_decl const 
StatisticsMapping indexDistribActivityStatistics; extern graph_decl const StatisticsMapping soapcallActivityStatistics; +extern graph_decl const StatisticsMapping indexReadFileStatistics; class BooleanOnOff { bool &tf;