From d8db4d35aca38d508f27f2661a2fdeedf695e67f Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Tue, 7 Nov 2023 05:00:05 +0000 Subject: [PATCH] HPCC-30599 Fix file access costs for keyed join Signed-off-by: Shamser Ahmed --- common/thorhelper/thorcommon.hpp | 4 + system/jhtree/jhtree.cpp | 7 +- system/jlib/jstats.cpp | 4 +- thorlcr/activities/keyedjoin/thkeyedjoin.cpp | 2 +- .../activities/keyedjoin/thkeyedjoinslave.cpp | 128 ++++++++++++------ thorlcr/graph/thgraphmaster.cpp | 41 ++++-- thorlcr/graph/thgraphslave.hpp | 5 +- thorlcr/slave/slavmain.cpp | 28 ++-- thorlcr/thorutil/thormisc.cpp | 5 +- thorlcr/thorutil/thormisc.hpp | 1 + 10 files changed, 152 insertions(+), 73 deletions(-) diff --git a/common/thorhelper/thorcommon.hpp b/common/thorhelper/thorcommon.hpp index f85244be7b5..a5f2368a375 100644 --- a/common/thorhelper/thorcommon.hpp +++ b/common/thorhelper/thorcommon.hpp @@ -762,6 +762,10 @@ class CStatsContextLogger : public CSimpleInterfaceOf previous.updateDelta(to, stats); } virtual const LogMsgJobInfo & queryJob() const override { return job; } + void reset() + { + stats.reset(); + } }; extern THORHELPER_API bool isActivitySink(ThorActivityKind kind); diff --git a/system/jhtree/jhtree.cpp b/system/jhtree/jhtree.cpp index 53265d82167..4a245bb24a7 100644 --- a/system/jhtree/jhtree.cpp +++ b/system/jhtree/jhtree.cpp @@ -573,10 +573,13 @@ class jhtree_decl CKeyLevelManager : implements IKeyManager, public CInterface filter->describe(out); } - virtual void mergeStats(CRuntimeStatisticCollection & stats) const + virtual void mergeStats(CRuntimeStatisticCollection & targetStats) const { + // IO stats come from the keyCursor; jhtree cache stats come from this class's stats if (keyCursor) - keyCursor->mergeStats(stats); + keyCursor->mergeStats(targetStats); // merge IO stats + if (stats.ctx) + targetStats.merge(stats.ctx->queryStats()); // merge jhtree cache stats } }; diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index
1ca005cb6aa..9830a2dba9f 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -1322,8 +1322,8 @@ const StatisticsMapping noStatistics({}); const StatisticsMapping jhtreeCacheStatistics({ StNumIndexSeeks, StNumIndexScans, StNumPostFiltered, StNumIndexWildSeeks, StNumNodeCacheAdds, StNumLeafCacheAdds, StNumBlobCacheAdds, StNumNodeCacheHits, StNumLeafCacheHits, StNumBlobCacheHits, StCycleNodeLoadCycles, StCycleLeafLoadCycles, StCycleBlobLoadCycles, StCycleNodeReadCycles, StCycleLeafReadCycles, StCycleBlobReadCycles, StNumNodeDiskFetches, StNumLeafDiskFetches, StNumBlobDiskFetches, - StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles, StCycleIndexCacheBlockedCycles, StNumIndexMergeCompares, StNumIndexMerges, StNumIndexSkips, StNumIndexNullSkips, - StTimeLeafLoad, StTimeLeafRead, StTimeLeafFetch, StTimeIndexCacheBlocked, StTimeNodeFetch}); + StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles, StCycleIndexCacheBlockedCycles, StNumIndexMergeCompares, StNumIndexMerges, StNumIndexSkips, + StNumIndexNullSkips, StTimeLeafLoad, StTimeLeafRead, StTimeLeafFetch, StTimeIndexCacheBlocked, StTimeNodeFetch, StTimeNodeLoad, StTimeNodeRead}); const StatisticsMapping allStatistics(StKindAll); const StatisticsMapping heapStatistics({StNumAllocations, StNumAllocationScans}); diff --git a/thorlcr/activities/keyedjoin/thkeyedjoin.cpp b/thorlcr/activities/keyedjoin/thkeyedjoin.cpp index 9b43086248d..920fa8e61bf 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoin.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoin.cpp @@ -310,7 +310,7 @@ class CKeyedJoinMaster : public CMasterActivity totalIndexParts = 0; Owned dataFile; - Owned indexFile = lookupReadFile(indexFileName, AccessMode::readRandom, false, false, 0 != (helper->getJoinFlags() & JFindexoptional), true, indexReadActivityStatistics, &indexFileStatsTableEntry); + Owned indexFile = lookupReadFile(indexFileName, AccessMode::readRandom, false, false, 0 != (helper->getJoinFlags() & 
JFindexoptional), true, indexReadFileStatistics, &indexFileStatsTableEntry); if (indexFile) { if (!isFileKey(indexFile)) diff --git a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp index e072e6cc814..9f3eab3b78b 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp @@ -1248,7 +1248,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem if (limiter) limiter->dec(); // unblocks any requests to start lookup threads } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) = 0; + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const = 0; }; class CKeyLookupLocalBase : public CLookupHandler @@ -1284,7 +1284,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } void processRows(CThorExpandingRowArray &processing, unsigned partNo, IKeyManager *keyManager) { - CStatsScopedThresholdDeltaUpdater scoped(activity.statsUpdater); for (unsigned r=0; rsetAtMostLimitHit(); // also clears existing rows break; } - KLBlobProviderAdapter adapter(keyManager, &activity.contextLogger); + KLBlobProviderAdapter adapter(keyManager, nullptr); byte const * keyRow = keyManager->queryKeyBuffer(); size_t fposOffset = keyManager->queryRowSize() - sizeof(offset_t); offset_t fpos = rtlReadBigUInt8(keyRow + fposOffset); @@ -1350,6 +1349,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem //Need ptr to std::vector as std::atomic's are not constructable(doesn't have copy constructor) std::unique_ptr>> keyManagers; + // One context logger per part. 
(Extract stats for logical file by mapping part to logical file/subfile) + std::vector> contextLoggers; public: CKeyLookupLocalHandler(CKeyedJoinSlave &_activity) : CKeyLookupLocalBase(_activity) { @@ -1368,6 +1369,15 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem keyManagers.reset(new std::vector>(parts.size())); for (auto & k: *keyManagers) k = nullptr; + contextLoggers.clear(); + for (unsigned i=0; ireset(); } virtual void addPartNum(unsigned partNum) override { @@ -1382,13 +1392,13 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem std::atomic & keyManager = (*keyManagers)[selected]; if (!keyManager) // delayed until actually needed { - keyManager = activity.createPartKeyManager(partNo, copy); + keyManager = activity.createPartKeyManager(partNo, copy, contextLoggers[selected]); // NB: potentially translation per part could be different if dealing with superkeys setupTranslation(partNo, selected, *keyManager); } processRows(processing, partNo, keyManager); } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) override + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const override { for (size_t i=0; i keyManager{nullptr}; public: - CKeyLookupMergeHandler(CKeyedJoinSlave &_activity) : CKeyLookupLocalBase(_activity) + CKeyLookupMergeHandler(CKeyedJoinSlave &_activity) : CKeyLookupLocalBase(_activity), contextLogger(jhtreeCacheStatistics, thorJob) { limiter = &activity.lookupThreadLimiter; translators.push_back(nullptr); @@ -1433,33 +1443,22 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem Owned keyIndex = activity.createPartKeyIndex(partNo, copy, false); partKeySet->addIndex(keyIndex.getClear()); } - keyManager = createKeyMerger(helper->queryIndexRecordSize()->queryRecordAccessor(true), partKeySet, 0, nullptr, helper->hasNewSegmentMonitors(), false); + keyManager = 
createKeyMerger(helper->queryIndexRecordSize()->queryRecordAccessor(true), partKeySet, 0, &contextLogger, helper->hasNewSegmentMonitors(), false); setupTranslation(0, 0, *keyManager); } processRows(processing, 0, keyManager); } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) override + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const override { - if (keyManager) - { - for (size_t i=0; imerge(contextLogger.queryStats()); } - }; class CRemoteLookupHandler : public CLookupHandler { typedef CLookupHandler PARENT; - protected: rank_t lookupSlave = RANK_NULL; mptag_t replyTag = TAG_NULL; @@ -1543,14 +1542,12 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem for (auto &h: handles) h = 0; } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) override - { - /* Note: currently, stats from remote file not tracked */ - } }; class CKeyLookupRemoteHandler : public CRemoteLookupHandler { typedef CRemoteLookupHandler PARENT; + // One context logger per part. (Extract stats for logical file by mapping part to logical file/subfile) + std::vector> contextLoggers; void initRead(CMessageBuffer &msg, unsigned selected, unsigned partNo, unsigned copy) { @@ -1633,6 +1630,19 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem limiter = &activity.lookupThreadLimiter; allParts = &activity.allIndexParts; } + virtual void init() override + { + PARENT::init(); + contextLoggers.clear(); + for (unsigned i=0; ireset(); + } virtual StringBuffer &getInfo(StringBuffer &info) const override { return PARENT::getInfo(info).append(", lookupSlave=").append(lookupSlave); @@ -1750,10 +1760,16 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem joinGroup->decPending(); // Every queued lookup row triggered an inc., this is the corresponding dec. 
} unsigned __int64 seeks, scans, wildseeks; + unsigned __int64 nodeDiskFetches, leafDiskFetches, blobDiskFetches; mb.read(seeks).read(scans).read(wildseeks); - activity.inactiveStats.sumStatistic(StNumIndexSeeks, seeks); - activity.inactiveStats.sumStatistic(StNumIndexScans, scans); - activity.inactiveStats.sumStatistic(StNumIndexWildSeeks, wildseeks); + mb.read(nodeDiskFetches).read(leafDiskFetches).read(blobDiskFetches); + CStatsContextLogger * contextLogger(contextLoggers[selected]); + contextLogger->noteStatistic(StNumIndexSeeks, seeks); + contextLogger->noteStatistic(StNumIndexScans, scans); + contextLogger->noteStatistic(StNumIndexWildSeeks, wildseeks); + contextLogger->noteStatistic(StNumNodeDiskFetches, nodeDiskFetches); + contextLogger->noteStatistic(StNumLeafDiskFetches, leafDiskFetches); + contextLogger->noteStatistic(StNumBlobDiskFetches, blobDiskFetches); if (received == numRows) break; } @@ -1763,6 +1779,19 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem PARENT::end(); doClose(kjs_keyclose); } + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const override + { + assertex(parts.size()==contextLoggers.size()); + for (size_t i=0; imerge(contextLogger->queryStats()); + } + } }; class CFetchLocalLookupHandler : public CLookupHandler { @@ -1863,13 +1892,13 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem diskSeeks++; } } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) override + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const override { for (size_t i=0; i> & fileStats, unsigned startOffset) const override {} }; class CReadAheadThread : implements IThreaded { @@ -2189,7 +2220,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem lookupHandler->end(); } } - void getFileStats(std::vector> & fileStats, unsigned startOffset) + void getFileStats(std::vector> & fileStats, 
unsigned startOffset) const { ForEachItemIn(h, handlers) handlers.item(h)->getFileStats(fileStats, startOffset); @@ -2223,8 +2254,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem CPartDescriptorArray allIndexParts; std::vector localIndexParts, localFetchPartMap; IArrayOf tlkKeyIndexes; - CStatsContextLogger contextLogger; - CStatsCtxLoggerDeltaUpdater statsUpdater; Owned joinFieldsAllocator; OwnedConstThorRow defaultRight; unsigned joinFlags = 0; @@ -2420,7 +2449,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem { IKeyIndex *tlkKeyIndex = &tlkKeyIndexes.item(i); const RtlRecord &keyRecInfo = helper->queryIndexRecordSize()->queryRecordAccessor(true); - Owned tlkManager = createLocalKeyManager(keyRecInfo, nullptr, &contextLogger, helper->hasNewSegmentMonitors(), false); + Owned tlkManager = createLocalKeyManager(keyRecInfo, nullptr, nullptr, helper->hasNewSegmentMonitors(), false); tlkManager->setKey(tlkKeyIndex); keyManagers.append(*tlkManager.getClear()); } @@ -2452,10 +2481,10 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem return createKeyIndex(filename, crc, *lazyIFileIO, (unsigned) -1, false); } } - IKeyManager *createPartKeyManager(unsigned partNo, unsigned copy) + IKeyManager *createPartKeyManager(unsigned partNo, unsigned copy, IContextLogger *ctx) { Owned keyIndex = createPartKeyIndex(partNo, copy, false); - return createLocalKeyManager(helper->queryIndexRecordSize()->queryRecordAccessor(true), keyIndex, &contextLogger, helper->hasNewSegmentMonitors(), false); + return createLocalKeyManager(helper->queryIndexRecordSize()->queryRecordAccessor(true), keyIndex, ctx, helper->hasNewSegmentMonitors(), false); } const void *preparePendingLookupRow(void *row, size32_t maxSz, const void *lhsRow, size32_t keySz) { @@ -2617,7 +2646,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } void stopReadAhead() { - 
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); keyLookupHandlers.flush(); keyLookupHandlers.join(); // wait for pending handling, there may be more fetch items as a result fetchLookupHandlers.flushTS(); @@ -2938,7 +2966,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem public: IMPLEMENT_IINTERFACE_USING(PARENT); - CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this), contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, *this, contextLogger) + CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this) { helper = static_cast (queryHelper()); reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) || (helper->getJoinFlags() & JFvarindexfilename); @@ -3112,7 +3140,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } ISuperFileDescriptor *superFdesc = numIndexParts?allIndexParts.item(0).queryOwner().querySuperFileDescriptor():nullptr; unsigned numIndexSubFiles = superFdesc?superFdesc->querySubFiles():0; - setupSpace4FileStats(indexFileStatsTableEntry, true, superFdesc!=nullptr, numIndexSubFiles, indexReadActivityStatistics); + setupSpace4FileStats(indexFileStatsTableEntry, true, superFdesc!=nullptr, numIndexSubFiles, indexReadFileStatistics); setupLookupHandlers(keyLookupHandlers, totalIndexParts, superFdesc, localIndexParts, indexPartToSlaveMap, localKey, forceRemoteKeyedLookup ? ht_remotekeylookup : ht_localkeylookup, ht_remotekeylookup); data.read(totalDataParts); if (totalDataParts) @@ -3368,7 +3396,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } virtual void stop() override { - CStatsScopedDeltaUpdater scoped(statsUpdater); endOfInput = true; // signals to readAhead which is reading input, that is should stop asap. 
// could be blocked in readAhead(), because CJoinGroup's are no longer being processed @@ -3784,11 +3811,22 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem { return container.queryId(); } - virtual void serializeStats(MemoryBuffer &mb) override + virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const { + PARENT::gatherActiveStats(activeStats); + // fileStats are gathered per logical file. Handlers track stats per part. Multiple handlers could be updating the same logical file. + // To allow each handler to update the same logical file without overwriting the other handlers' updates, the handler stats are 'merged' into fileStats[] + // So, a reset is required first because the handlers merge stats rather than set them + for (auto & fileStatItem: fileStats) + fileStatItem->reset(); + keyLookupHandlers.getFileStats(fileStats, indexFileStatsTableEntry); fetchLookupHandlers.getFileStats(fileStats, dataFileStatsTableEntry); - + for (auto & fileStatItem: fileStats) + activeStats.merge(*fileStatItem); + } + virtual void serializeStats(MemoryBuffer &mb) override + { PARENT::serializeStats(mb); mb.append((unsigned)fileStats.size()); for (auto &stats: fileStats) diff --git a/thorlcr/graph/thgraphmaster.cpp b/thorlcr/graph/thgraphmaster.cpp index 0634b4291cc..3f29955d48c 100644 --- a/thorlcr/graph/thgraphmaster.cpp +++ b/thorlcr/graph/thgraphmaster.cpp @@ -649,13 +649,30 @@ void CMasterActivity::done() void CMasterActivity::updateFileReadCostStats() { - // Updates numDiskReads & readCost in the file attributes and returns the readCost - auto updateReadCosts = [](IDistributedFile *file, CThorStatsCollection &stats) + // Updates numDiskReads & readCost in the file attributes and returns the readCost + auto updateReadCosts = [](bool useJhtreeCacheStats, IDistributedFile *file, CThorStatsCollection &stats) { - stat_type curDiskReads = stats.getStatisticSum(StNumDiskReads); + StringBuffer clusterName; + file->getClusterName(0,
clusterName); IPropertyTree & fileAttr = file->queryAttributes(); - cost_type legacyReadCost = getLegacyReadCost(fileAttr, file); - cost_type curReadCost = money2cost_type(calcFileAccessCost(file, 0, curDiskReads)); + cost_type legacyReadCost = 0, curReadCost = 0; + // Legacy files will not have the readCost stored as an attribute + if (!hasReadWriteCostFields(fileAttr) && fileAttr.hasProp(getDFUQResultFieldName(DFUQRFnumDiskReads))) + { + // Legacy file: calculate readCost using prev disk reads and new disk reads + stat_type prevDiskReads = fileAttr.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0); + legacyReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, prevDiskReads)); + } + stat_type curDiskReads = stats.getStatisticSum(StNumDiskReads); + if(useJhtreeCacheStats) + { + stat_type numActualReads = stats.getStatisticSum(StNumNodeDiskFetches) + + stats.getStatisticSum(StNumLeafDiskFetches) + + stats.getStatisticSum(StNumBlobDiskFetches); + curReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, numActualReads)); + } + else + curReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, curDiskReads)); file->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + curReadCost); file->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), curDiskReads); return curReadCost; @@ -663,11 +680,17 @@ void CMasterActivity::updateFileReadCostStats() if (fileStats.size()>0) { + ThorActivityKind activityKind = container.getKind(); unsigned fileIndex = 0; diskAccessCost = 0; - for (unsigned i=0; iquerySuperFile(); @@ -677,13 +700,13 @@ void CMasterActivity::updateFileReadCostStats() for (unsigned i=0; iquerySubFile(i, true); - diskAccessCost += updateReadCosts(&subFile, *fileStats[fileIndex]); + diskAccessCost += updateReadCosts(useJhtreeCache, &subFile, *fileStats[fileIndex]); fileIndex++; } } else { - diskAccessCost += updateReadCosts(file, *fileStats[fileIndex]); + diskAccessCost += updateReadCosts(useJhtreeCache, file, 
*fileStats[fileIndex]); fileIndex++; } } @@ -693,7 +716,7 @@ void CMasterActivity::updateFileReadCostStats() { IDistributedFile *file = queryReadFile(0); if (file) - diskAccessCost = updateReadCosts(file, statsCollection); + diskAccessCost += updateReadCosts(true, file, statsCollection); } } diff --git a/thorlcr/graph/thgraphslave.hpp b/thorlcr/graph/thgraphslave.hpp index 5540b72c032..7b43999dbab 100644 --- a/thorlcr/graph/thgraphslave.hpp +++ b/thorlcr/graph/thgraphslave.hpp @@ -210,7 +210,10 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres bool optUnordered = false; // is the output specified as unordered? CriticalSection statsCs; // to be used to protect objects refernce during stat. collection CRuntimeStatisticCollection inactiveStats; // stats collected from previous iteration, to be combined with current 'stats' - std::vector> fileStats; + // fileStats is mutable as it is updated by gatherActiveStats (const member func) + // fileStats is in this base class as it is used by multiple derived classes (both slave and master) but not all. + // (Having it in the base class aids setup and resizing.)
+ mutable std::vector> fileStats; protected: unsigned __int64 queryLocalCycles() const; diff --git a/thorlcr/slave/slavmain.cpp b/thorlcr/slave/slavmain.cpp index 80ab3d47800..9574c5e267d 100644 --- a/thorlcr/slave/slavmain.cpp +++ b/thorlcr/slave/slavmain.cpp @@ -115,7 +115,6 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, unsigned maxCachedKJManagers = defaultMaxCachedKJManagers; unsigned maxCachedFetchContexts = defaultMaxCachedFetchContexts; unsigned keyLookupMaxProcessThreads = defaultKeyLookupMaxProcessThreads; - CStatsContextLogger contextLogger; class CLookupKey { unsigned hashv = 0; @@ -447,11 +446,13 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, Owned keyManager; unsigned handle = 0; Owned helper; + CStatsContextLogger contextLogger; + public: CKMContainer(CKJService &_service, CKeyLookupContext *_ctx) - : service(_service), ctx(_ctx) + : service(_service), ctx(_ctx), contextLogger(jhtreeCacheStatistics, thorJob) { - keyManager.setown(ctx->createKeyManager(&service.contextLogger)); + keyManager.setown(ctx->createKeyManager(&contextLogger)); StringBuffer tracing; const IDynamicTransform *translator = ctx->queryTranslator(ctx->queryKey().getTracing(tracing)); if (translator) @@ -475,6 +476,7 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, } inline IHThorKeyedJoinArg *queryHelper() const { return helper; } inline CKJService & queryService() const { return service; } + inline CStatsContextLogger & queryContextLogger() { return contextLogger; } }; template class CKeyedCacheEntry : public CInterface @@ -757,10 +759,8 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, unsigned rowCount = getRowCount(); unsigned rowNum = 0; unsigned rowStart = 0; - const CRuntimeStatisticCollection & stats = kmc->queryService().contextLogger.queryStats(); - unsigned __int64 startSeeks = stats.getStatisticValue(StNumIndexSeeks); - unsigned __int64 startScans = 
stats.getStatisticValue(StNumIndexScans); - unsigned __int64 startWildSeeks = stats.getStatisticValue(StNumIndexWildSeeks); + CStatsContextLogger & contextLogger = kmc->queryContextLogger(); + CRuntimeStatisticCollection startStats(contextLogger.queryStats()); while (!abortSoon) { OwnedConstThorRow row = getRowClear(rowNum++); @@ -770,9 +770,15 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, if (last || (replyMb.length() >= DEFAULT_KEYLOOKUP_MAXREPLYSZ)) { countMarker.write(rowNum-rowStart); - replyMb.append(stats.getStatisticValue(StNumIndexSeeks)-startSeeks); - replyMb.append(stats.getStatisticValue(StNumIndexScans)-startScans); - replyMb.append(stats.getStatisticValue(StNumIndexWildSeeks)-startWildSeeks); + + CRuntimeStatisticCollection deltaStats(startStats.queryMapping()); + contextLogger.updateStatsDeltaTo(deltaStats, startStats); + replyMb.append(deltaStats.getStatisticValue(StNumIndexSeeks)); + replyMb.append(deltaStats.getStatisticValue(StNumIndexScans)); + replyMb.append(deltaStats.getStatisticValue(StNumIndexWildSeeks)); + replyMb.append(deltaStats.getStatisticValue(StNumNodeDiskFetches)); + replyMb.append(deltaStats.getStatisticValue(StNumLeafDiskFetches)); + replyMb.append(deltaStats.getStatisticValue(StNumBlobDiskFetches)); if (activityCtx->useMessageCompression()) { fastLZCompressToBuffer(replyMsg, tmpMB.length(), tmpMB.toByteArray()); @@ -1189,7 +1195,7 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, public: IMPLEMENT_IINTERFACE_USING(CSimpleInterfaceOf); - CKJService(mptag_t _mpTag) : threaded("CKJService", this), keyLookupMpTag(_mpTag), contextLogger(jhtreeCacheStatistics, thorJob) + CKJService(mptag_t _mpTag) : threaded("CKJService", this), keyLookupMpTag(_mpTag) { setupProcessorPool(); } diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index 9eb7d43f8bf..207d2a4b0cd 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -78,9 +78,10 @@ const 
StatisticsMapping soapcallStatistics({StTimeSoapcall}); const StatisticsMapping basicActivityStatistics({StTimeTotalExecute, StTimeLocalExecute, StTimeBlocked}); const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics); const StatisticsMapping hashJoinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics); -const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, diskReadRemoteStatistics, basicActivityStatistics, jhtreeCacheStatistics); +const StatisticsMapping indexReadFileStatistics({}, diskReadRemoteStatistics, jhtreeCacheStatistics); +const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, indexReadFileStatistics, basicActivityStatistics); const StatisticsMapping indexWriteActivityStatistics({StPerReplicated, StNumLeafCacheAdds, StNumNodeCacheAdds, StNumBlobCacheAdds }, basicActivityStatistics, diskWriteRemoteStatistics); -const StatisticsMapping keyedJoinActivityStatistics({ StNumIndexAccepted, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected}, basicActivityStatistics, jhtreeCacheStatistics); +const StatisticsMapping keyedJoinActivityStatistics({ StNumIndexAccepted, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected}, basicActivityStatistics, indexReadFileStatistics); const StatisticsMapping loopActivityStatistics({StNumIterations}, basicActivityStatistics); const StatisticsMapping lookupJoinActivityStatistics({StNumSmartJoinSlavesDegradedToStd, StNumSmartJoinDegradedToLocal}, basicActivityStatistics); const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics, spillStatistics); diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index ee667d4246c..5801109bf7c 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -154,6 +154,7 @@ extern graph_decl const StatisticsMapping graphStatistics; extern graph_decl const 
StatisticsMapping indexDistribActivityStatistics; extern graph_decl const StatisticsMapping soapcallActivityStatistics; +extern graph_decl const StatisticsMapping indexReadFileStatistics; class BooleanOnOff { bool &tf;