diff --git a/common/thorhelper/thorcommon.hpp b/common/thorhelper/thorcommon.hpp index f85244be7b5..aad74478a3e 100644 --- a/common/thorhelper/thorcommon.hpp +++ b/common/thorhelper/thorcommon.hpp @@ -682,7 +682,10 @@ class CStatsContextLogger : public CSimpleInterfaceOf mutable CRuntimeStatisticCollection stats; public: CStatsContextLogger(const CRuntimeStatisticCollection &_mapping, const LogMsgJobInfo & _job=unknownJob) : job(_job), stats(_mapping) {} - + void reset() + { + stats.reset(); + } virtual void CTXLOGva(const LogMsgCategory & cat, const LogMsgJobInfo & job, LogMsgCode code, const char *format, va_list args) const override __attribute__((format(printf,5,0))) { VALOG(cat, job, code, format, args); diff --git a/system/jhtree/jhtree.cpp b/system/jhtree/jhtree.cpp index 53265d82167..4a245bb24a7 100644 --- a/system/jhtree/jhtree.cpp +++ b/system/jhtree/jhtree.cpp @@ -573,10 +573,13 @@ class jhtree_decl CKeyLevelManager : implements IKeyManager, public CInterface filter->describe(out); } - virtual void mergeStats(CRuntimeStatisticCollection & stats) const + virtual void mergeStats(CRuntimeStatisticCollection & targetStats) const { + // IO Stats comming from the keyCursor and jhtree cache stats coming from this class's stats if (keyCursor) - keyCursor->mergeStats(stats); + keyCursor->mergeStats(targetStats); // merge IO stats + if (stats.ctx) + targetStats.merge(stats.ctx->queryStats()); // merge jhtree cache stats } }; diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 1ca005cb6aa..9830a2dba9f 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -1322,8 +1322,8 @@ const StatisticsMapping noStatistics({}); const StatisticsMapping jhtreeCacheStatistics({ StNumIndexSeeks, StNumIndexScans, StNumPostFiltered, StNumIndexWildSeeks, StNumNodeCacheAdds, StNumLeafCacheAdds, StNumBlobCacheAdds, StNumNodeCacheHits, StNumLeafCacheHits, StNumBlobCacheHits, StCycleNodeLoadCycles, StCycleLeafLoadCycles, StCycleBlobLoadCycles, StCycleNodeReadCycles, StCycleLeafReadCycles, StCycleBlobReadCycles, StNumNodeDiskFetches, StNumLeafDiskFetches, StNumBlobDiskFetches, - StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles, StCycleIndexCacheBlockedCycles, StNumIndexMergeCompares, StNumIndexMerges, StNumIndexSkips, StNumIndexNullSkips, - StTimeLeafLoad, StTimeLeafRead, StTimeLeafFetch, StTimeIndexCacheBlocked, StTimeNodeFetch}); + StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles, StCycleIndexCacheBlockedCycles, StNumIndexMergeCompares, StNumIndexMerges, StNumIndexSkips, + StNumIndexNullSkips, StTimeLeafLoad, StTimeLeafRead, StTimeLeafFetch, StTimeIndexCacheBlocked, StTimeNodeFetch, StTimeNodeLoad, StTimeNodeRead}); const StatisticsMapping allStatistics(StKindAll); const StatisticsMapping heapStatistics({StNumAllocations, StNumAllocationScans}); diff --git a/thorlcr/activities/keyedjoin/thkeyedjoin.cpp b/thorlcr/activities/keyedjoin/thkeyedjoin.cpp index 9b43086248d..920fa8e61bf 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoin.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoin.cpp @@ -310,7 +310,7 @@ class CKeyedJoinMaster : public CMasterActivity totalIndexParts = 0; Owned dataFile; - Owned indexFile = lookupReadFile(indexFileName, AccessMode::readRandom, false, false, 0 != (helper->getJoinFlags() & JFindexoptional), true, indexReadActivityStatistics, &indexFileStatsTableEntry); + Owned indexFile = lookupReadFile(indexFileName, AccessMode::readRandom, false, false, 0 != (helper->getJoinFlags() & JFindexoptional), true, indexReadFileStatistics, &indexFileStatsTableEntry); if (indexFile) { if (!isFileKey(indexFile)) diff --git a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp index e072e6cc814..3edc40700b1 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp @@ -1248,7 +1248,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem if (limiter) limiter->dec(); // unblocks any requests to start lookup threads } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) = 0; + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const = 0; }; class CKeyLookupLocalBase : public CLookupHandler @@ -1284,7 +1284,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } void processRows(CThorExpandingRowArray &processing, unsigned partNo, IKeyManager *keyManager) { - CStatsScopedThresholdDeltaUpdater scoped(activity.statsUpdater); for (unsigned r=0; rsetAtMostLimitHit(); // also clears existing rows break; } - KLBlobProviderAdapter adapter(keyManager, &activity.contextLogger); + KLBlobProviderAdapter adapter(keyManager, nullptr); byte const * keyRow = keyManager->queryKeyBuffer(); size_t fposOffset = keyManager->queryRowSize() - sizeof(offset_t); offset_t fpos = rtlReadBigUInt8(keyRow + fposOffset); @@ -1350,6 +1349,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem //Need ptr to std::vector as std::atomic's are not constructable(doesn't have copy constructor) std::unique_ptr>> keyManagers; + // One context logger per part. (Extract stats for logical file by mapping part to logical file/subfile) + std::vector> contextLoggers; public: CKeyLookupLocalHandler(CKeyedJoinSlave &_activity) : CKeyLookupLocalBase(_activity) { @@ -1368,6 +1369,16 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem keyManagers.reset(new std::vector>(parts.size())); for (auto & k: *keyManagers) k = nullptr; + if (contextLoggers.size() > 0) + { + for (auto contextLogger: contextLoggers) + contextLogger->reset(); + } + else + { + for (unsigned i=0; i & keyManager = (*keyManagers)[selected]; if (!keyManager) // delayed until actually needed { - keyManager = activity.createPartKeyManager(partNo, copy); + keyManager = activity.createPartKeyManager(partNo, copy, contextLoggers[selected]); // NB: potentially translation per part could be different if dealing with superkeys setupTranslation(partNo, selected, *keyManager); } processRows(processing, partNo, keyManager); } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) override + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const override { for (size_t i=0; i keyManager{nullptr}; public: - CKeyLookupMergeHandler(CKeyedJoinSlave &_activity) : CKeyLookupLocalBase(_activity) + CKeyLookupMergeHandler(CKeyedJoinSlave &_activity) : CKeyLookupLocalBase(_activity), contextLogger(jhtreeCacheStatistics, thorJob) { limiter = &activity.lookupThreadLimiter; translators.push_back(nullptr); @@ -1433,33 +1444,22 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem Owned keyIndex = activity.createPartKeyIndex(partNo, copy, false); partKeySet->addIndex(keyIndex.getClear()); } - keyManager = createKeyMerger(helper->queryIndexRecordSize()->queryRecordAccessor(true), partKeySet, 0, nullptr, helper->hasNewSegmentMonitors(), false); + keyManager = createKeyMerger(helper->queryIndexRecordSize()->queryRecordAccessor(true), partKeySet, 0, &contextLogger, helper->hasNewSegmentMonitors(), false); setupTranslation(0, 0, *keyManager); } processRows(processing, 0, keyManager); } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) override + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const override { - if (keyManager) - { - for (size_t i=0; imerge(contextLogger.queryStats()); } - }; class CRemoteLookupHandler : public CLookupHandler { typedef CLookupHandler PARENT; - protected: rank_t lookupSlave = RANK_NULL; mptag_t replyTag = TAG_NULL; @@ -1543,14 +1543,12 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem for (auto &h: handles) h = 0; } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) override - { - /* Note: currently, stats from remote file not tracked */ - } }; class CKeyLookupRemoteHandler : public CRemoteLookupHandler { typedef CRemoteLookupHandler PARENT; + // One context logger per part. (Extract stats for logical file by mapping part to logical file/subfile) + std::vector> contextLoggers; void initRead(CMessageBuffer &msg, unsigned selected, unsigned partNo, unsigned copy) { @@ -1633,6 +1631,20 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem limiter = &activity.lookupThreadLimiter; allParts = &activity.allIndexParts; } + virtual void init() override + { + PARENT::init(); + if (contextLoggers.size() > 0) + { + for (auto contextLogger: contextLoggers) + contextLogger->reset(); + } + else + { + for (unsigned i=0; i