From 37454b493986d5f5f1be67fc57233d327faccdf9 Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Tue, 7 Nov 2023 05:00:05 +0000 Subject: [PATCH] HPCC-30599 Fix file access costs for keyed join Signed-off-by: Shamser Ahmed --- system/jhtree/jhtree.cpp | 6 ++-- system/jlib/jstats.cpp | 3 +- .../activities/indexread/thindexreadslave.cpp | 8 +++-- .../activities/keyedjoin/thkeyedjoinslave.cpp | 31 ++++++++++++------- thorlcr/graph/thgraphmaster.cpp | 26 +++++++++++----- thorlcr/slave/slavmain.cpp | 6 ++++ 6 files changed, 55 insertions(+), 25 deletions(-) diff --git a/system/jhtree/jhtree.cpp b/system/jhtree/jhtree.cpp index 90bb4e9d403..99bbd5d5684 100644 --- a/system/jhtree/jhtree.cpp +++ b/system/jhtree/jhtree.cpp @@ -573,10 +573,12 @@ class jhtree_decl CKeyLevelManager : implements IKeyManager, public CInterface filter->describe(out); } - virtual void mergeStats(CRuntimeStatisticCollection & stats) const + virtual void mergeStats(CRuntimeStatisticCollection & targetStats) const { if (keyCursor) - keyCursor->mergeStats(stats); + keyCursor->mergeStats(targetStats); + if (stats.ctx) + targetStats.merge(stats.ctx->queryStats()); } }; diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 4dafcc75ed2..36a7ba1b4b8 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -1322,8 +1322,7 @@ const StatisticsMapping noStatistics({}); const StatisticsMapping jhtreeCacheStatistics({ StNumIndexSeeks, StNumIndexScans, StNumPostFiltered, StNumIndexWildSeeks, StNumNodeCacheAdds, StNumLeafCacheAdds, StNumBlobCacheAdds, StNumNodeCacheHits, StNumLeafCacheHits, StNumBlobCacheHits, StCycleNodeLoadCycles, StCycleLeafLoadCycles, StCycleBlobLoadCycles, StCycleNodeReadCycles, StCycleLeafReadCycles, StCycleBlobReadCycles, StNumNodeDiskFetches, StNumLeafDiskFetches, StNumBlobDiskFetches, - StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles, StCycleIndexCacheBlockedCycles, StNumIndexMergeCompares, StNumIndexMerges, StNumIndexSkips, StNumIndexNullSkips, - StTimeLeafLoad, StTimeLeafRead, StTimeLeafFetch, StTimeIndexCacheBlocked, StTimeNodeFetch}); + StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles, StCycleIndexCacheBlockedCycles, StNumIndexMergeCompares, StNumIndexMerges, StNumIndexSkips, StNumIndexNullSkips, StTimeLeafLoad, StTimeLeafRead, StTimeLeafFetch, StTimeIndexCacheBlocked, StTimeNodeFetch, StTimeNodeLoad, StTimeNodeRead}); const StatisticsMapping allStatistics(StKindAll); const StatisticsMapping heapStatistics({StNumAllocations, StNumAllocationScans}); diff --git a/thorlcr/activities/indexread/thindexreadslave.cpp b/thorlcr/activities/indexread/thindexreadslave.cpp index b4c7d0cd7cc..38df9cc39de 100644 --- a/thorlcr/activities/indexread/thindexreadslave.cpp +++ b/thorlcr/activities/indexread/thindexreadslave.cpp @@ -350,6 +350,8 @@ class CIndexReadSlaveBase : public CSlaveActivity } void mergeFileStats(IPartDescriptor *partDesc, IFileIO *partIO) { + if (!currentManager) + return; if (fileStats.size()>0) { ISuperFileDescriptor * superFDesc = partDesc->queryOwner().querySuperFileDescriptor(); @@ -357,10 +359,12 @@ class CIndexReadSlaveBase : public CSlaveActivity { unsigned subfile, lnum; if(superFDesc->mapSubPart(partDesc->queryPartIndex(), subfile, lnum)) - mergeStats(*fileStats[fileTableStart+subfile], partIO); + currentManager->mergeStats(*fileStats[fileTableStart+subfile]); } else - mergeStats(*fileStats[fileTableStart], partIO); + { + currentManager->mergeStats(*fileStats[fileTableStart]); + } } } void updateStats() diff --git a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp index e9903b4ecb4..a973d9e74d8 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp @@ -959,6 +959,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem { protected: CKeyedJoinSlave &activity; + CStatsContextLogger contextLogger; + CStatsCtxLoggerDeltaUpdater statsUpdater; IThorRowInterfaces *rowIf; IHThorKeyedJoinArg *helper = nullptr; std::vector queues; @@ -995,7 +997,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } public: CLookupHandler(CKeyedJoinSlave &_activity, IThorRowInterfaces *_rowIf, unsigned _batchProcessLimit) : threaded("CLookupHandler", this), - activity(_activity), rowIf(_rowIf), batchProcessLimit(_batchProcessLimit) + activity(_activity), rowIf(_rowIf), batchProcessLimit(_batchProcessLimit), + contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, _activity, contextLogger) { helper = activity.helper; } @@ -1084,6 +1087,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } void stop() { + CStatsScopedDeltaUpdater scoped(statsUpdater); stopped = true; join(); for (auto &queue : queues) @@ -1155,6 +1159,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } void flush() { + CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); // NB: queueLookup() must be protected from re-entry by caller for (unsigned b=0; bsetAtMostLimitHit(); // also clears existing rows break; } - KLBlobProviderAdapter adapter(keyManager, &activity.contextLogger); + KLBlobProviderAdapter adapter(keyManager, &contextLogger); byte const * keyRow = keyManager->queryKeyBuffer(); size_t fposOffset = keyManager->queryRowSize() - sizeof(offset_t); offset_t fpos = rtlReadBigUInt8(keyRow + fposOffset); @@ -1381,7 +1386,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem std::atomic & keyManager = (*keyManagers)[selected]; if (!keyManager) // delayed until actually needed { - keyManager = activity.createPartKeyManager(partNo, copy); + keyManager = activity.createPartKeyManager(partNo, copy, &contextLogger); // NB: potentially translation per part could be different if dealing with superkeys setupTranslation(partNo, selected, *keyManager); } @@ -1749,10 +1754,15 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem joinGroup->decPending(); // Every queued lookup row triggered an inc., this is the corresponding dec. } unsigned __int64 seeks, scans, wildseeks; + unsigned __int64 nodeDiskFetches, leafDiskFetches, blobDiskFetches; mb.read(seeks).read(scans).read(wildseeks); + mb.read(nodeDiskFetches).read(leafDiskFetches).read(blobDiskFetches); activity.inactiveStats.sumStatistic(StNumIndexSeeks, seeks); activity.inactiveStats.sumStatistic(StNumIndexScans, scans); activity.inactiveStats.sumStatistic(StNumIndexWildSeeks, wildseeks); + activity.inactiveStats.sumStatistic(StNumNodeDiskFetches, nodeDiskFetches); + activity.inactiveStats.sumStatistic(StNumLeafDiskFetches, leafDiskFetches); + activity.inactiveStats.sumStatistic(StNumBlobDiskFetches, blobDiskFetches); if (received == numRows) break; } @@ -1802,6 +1812,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } virtual void process(CThorExpandingRowArray &processing, unsigned selected) override { + CStatsScopedThresholdDeltaUpdater scopedStats(statsUpdater); unsigned partCopy = parts[selected]; unsigned partNo = partCopy & partMask; unsigned copy = partCopy >> partBits; @@ -2222,8 +2233,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem CPartDescriptorArray allIndexParts; std::vector localIndexParts, localFetchPartMap; IArrayOf tlkKeyIndexes; - CStatsContextLogger contextLogger; - CStatsCtxLoggerDeltaUpdater statsUpdater; Owned joinFieldsAllocator; OwnedConstThorRow defaultRight; unsigned joinFlags = 0; @@ -2419,7 +2428,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem { IKeyIndex *tlkKeyIndex = &tlkKeyIndexes.item(i); const RtlRecord &keyRecInfo = helper->queryIndexRecordSize()->queryRecordAccessor(true); - Owned tlkManager = createLocalKeyManager(keyRecInfo, nullptr, &contextLogger, helper->hasNewSegmentMonitors(), false); + Owned tlkManager = createLocalKeyManager(keyRecInfo, nullptr, nullptr, helper->hasNewSegmentMonitors(), false); tlkManager->setKey(tlkKeyIndex); keyManagers.append(*tlkManager.getClear()); } @@ -2451,10 +2460,10 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem return createKeyIndex(filename, crc, *lazyIFileIO, (unsigned) -1, false); } } - IKeyManager *createPartKeyManager(unsigned partNo, unsigned copy) + IKeyManager *createPartKeyManager(unsigned partNo, unsigned copy, IContextLogger *ctx) { Owned keyIndex = createPartKeyIndex(partNo, copy, false); - return createLocalKeyManager(helper->queryIndexRecordSize()->queryRecordAccessor(true), keyIndex, &contextLogger, helper->hasNewSegmentMonitors(), false); + return createLocalKeyManager(helper->queryIndexRecordSize()->queryRecordAccessor(true), keyIndex, ctx, helper->hasNewSegmentMonitors(), false); } const void *preparePendingLookupRow(void *row, size32_t maxSz, const void *lhsRow, size32_t keySz) { @@ -2616,7 +2625,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } void stopReadAhead() { - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); keyLookupHandlers.flush(); keyLookupHandlers.join(); // wait for pending handling, there may be more fetch items as a result fetchLookupHandlers.flushTS(); @@ -2938,7 +2946,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem public: IMPLEMENT_IINTERFACE_USING(PARENT); - CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this), contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, *this, contextLogger) + CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this) { helper = static_cast (queryHelper()); reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) || (helper->getJoinFlags() & JFvarindexfilename); @@ -3368,7 +3376,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } virtual void stop() override { - CStatsScopedDeltaUpdater scoped(statsUpdater); endOfInput = true; // signals to readAhead which is reading input, that is should stop asap. // could be blocked in readAhead(), because CJoinGroup's are no longer being processed diff --git a/thorlcr/graph/thgraphmaster.cpp b/thorlcr/graph/thgraphmaster.cpp index 60770d5e23c..8d9f85a2001 100644 --- a/thorlcr/graph/thgraphmaster.cpp +++ b/thorlcr/graph/thgraphmaster.cpp @@ -650,13 +650,12 @@ void CMasterActivity::done() void CMasterActivity::updateFileReadCostStats() { // Returns updates numDiskReads & readCost in the file attributes and returns the readCost - auto updateReadCosts = [](IDistributedFile *file, CThorStatsCollection &stats) + auto updateReadCosts = [](bool useJhtreeCacheStats, IDistributedFile *file, CThorStatsCollection &stats) { - stat_type curDiskReads = stats.getStatisticSum(StNumDiskReads); StringBuffer clusterName; file->getClusterName(0, clusterName); - cost_type legacyReadCost = 0, curReadCost = 0; IPropertyTree & fileAttr = file->queryAttributes(); + cost_type legacyReadCost = 0, curReadCost = 0; // Legacy files will not have the readCost stored as an attribute if (!hasReadWriteCostFields(&fileAttr) && fileAttr.hasProp(getDFUQResultFieldName(DFUQRFnumDiskReads))) { @@ -664,7 +663,16 @@ void CMasterActivity::updateFileReadCostStats() stat_type prevDiskReads = fileAttr.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0); legacyReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, prevDiskReads)); } - curReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, curDiskReads)); + stat_type curDiskReads = stats.getStatisticSum(StNumDiskReads); + if(useJhtreeCacheStats) + { + stat_type numActualReads = stats.getStatisticSum(StNumNodeDiskFetches) + + stats.getStatisticSum(StNumLeafDiskFetches) + + stats.getStatisticSum(StNumBlobDiskFetches); + curReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, numActualReads)); + } + else + curReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, curDiskReads)); file->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + curReadCost); file->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), curDiskReads); return curReadCost; @@ -676,6 +684,10 @@ void CMasterActivity::updateFileReadCostStats() for (unsigned i=0; iquerySuperFile(); @@ -685,13 +697,13 @@ void CMasterActivity::updateFileReadCostStats() for (unsigned i=0; iquerySubFile(i, true); - diskAccessCost += updateReadCosts(&subFile, *fileStats[fileIndex]); + diskAccessCost += updateReadCosts(useJhtreeCache, &subFile, *fileStats[fileIndex]); fileIndex++; } } else { - diskAccessCost += updateReadCosts(file, *fileStats[fileIndex]); + diskAccessCost += updateReadCosts(useJhtreeCache, file, *fileStats[fileIndex]); fileIndex++; } } @@ -701,7 +713,7 @@ void CMasterActivity::updateFileReadCostStats() { IDistributedFile *file = queryReadFile(0); if (file) - diskAccessCost += updateReadCosts(file, statsCollection); + diskAccessCost += updateReadCosts(useJhtreeCache, file, statsCollection); } } diff --git a/thorlcr/slave/slavmain.cpp b/thorlcr/slave/slavmain.cpp index 634964dc80a..4e25342301f 100644 --- a/thorlcr/slave/slavmain.cpp +++ b/thorlcr/slave/slavmain.cpp @@ -761,6 +761,9 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, unsigned __int64 startSeeks = stats.getStatisticValue(StNumIndexSeeks); unsigned __int64 startScans = stats.getStatisticValue(StNumIndexScans); unsigned __int64 startWildSeeks = stats.getStatisticValue(StNumIndexWildSeeks); + unsigned __int64 startNodeDiskFetches = stats.getStatisticValue(StNumNodeDiskFetches); + unsigned __int64 startLeafDiskFetches = stats.getStatisticValue(StNumLeafDiskFetches); + unsigned __int64 startBlobDiskFetches = stats.getStatisticValue(StNumBlobDiskFetches); while (!abortSoon) { OwnedConstThorRow row = getRowClear(rowNum++); @@ -773,6 +776,9 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, replyMb.append(stats.getStatisticValue(StNumIndexSeeks)-startSeeks); replyMb.append(stats.getStatisticValue(StNumIndexScans)-startScans); replyMb.append(stats.getStatisticValue(StNumIndexWildSeeks)-startWildSeeks); + replyMb.append(stats.getStatisticValue(StNumNodeDiskFetches)-startNodeDiskFetches); + replyMb.append(stats.getStatisticValue(StNumLeafDiskFetches)-startLeafDiskFetches); + replyMb.append(stats.getStatisticValue(StNumBlobDiskFetches)-startBlobDiskFetches); if (activityCtx->useMessageCompression()) { fastLZCompressToBuffer(replyMsg, tmpMB.length(), tmpMB.toByteArray());