From bf1859fcd24db38429ed2d57cc0fee4fb5fa792d Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Tue, 7 Nov 2023 05:00:05 +0000 Subject: [PATCH] HPCC-30599 Fix file access costs for keyed join Signed-off-by: Shamser Ahmed --- dali/base/dadfs.cpp | 7 ++++ dali/base/dadfs.hpp | 1 + system/jhtree/jhtree.cpp | 6 ++- system/jlib/jstats.cpp | 3 +- system/jlib/jstats.h | 1 - .../activities/indexread/thindexreadslave.cpp | 8 +++- .../activities/keyedjoin/thkeyedjoinslave.cpp | 31 ++++++++++------ thorlcr/graph/thgraphmaster.cpp | 37 ++++++++++++++----- thorlcr/slave/slavmain.cpp | 6 +++ 9 files changed, 72 insertions(+), 28 deletions(-) diff --git a/dali/base/dadfs.cpp b/dali/base/dadfs.cpp index 6ad138b9e3e..bfd17564299 100644 --- a/dali/base/dadfs.cpp +++ b/dali/base/dadfs.cpp @@ -214,6 +214,13 @@ extern da_decl double calcFileAccessCost(const char * cluster, __int64 numDiskWr return accessCost; } +extern da_decl cost_type calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads) +{ + StringBuffer clusterName; + f->getClusterName(0, clusterName); + return money2cost_type(calcFileAccessCost(clusterName, numDiskWrites, numDiskReads)); +} + RemoteFilename &constructPartFilename(IGroup *grp,unsigned partno,unsigned partmax,const char *name,const char *partmask,const char *partdir,unsigned copy,ClusterPartDiskMapSpec &mspec,RemoteFilename &rfn) { partno--; diff --git a/dali/base/dadfs.hpp b/dali/base/dadfs.hpp index c1fa5f44a5c..f038248326c 100644 --- a/dali/base/dadfs.hpp +++ b/dali/base/dadfs.hpp @@ -888,6 +888,7 @@ extern da_decl void ensureFileScope(const CDfsLogicalFileName &dlfn, unsigned ti extern da_decl bool checkLogicalName(const char *lfn,IUserDescriptor *user,bool readreq,bool createreq,bool allowquery,const char *specialnotallowedmsg); extern da_decl void calcFileCost(const char * cluster, double sizeGB, double fileAgeDays, __int64 numDiskWrites, __int64 numDiskReads, double & atRestCost, double & accessCost); extern da_decl double calcFileAccessCost(const char * cluster, __int64 numDiskWrites, __int64 numDiskReads); +extern da_decl cost_type calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads); constexpr bool defaultPrivilegedUser = true; constexpr bool defaultNonPrivilegedUser = false; diff --git a/system/jhtree/jhtree.cpp b/system/jhtree/jhtree.cpp index 90bb4e9d403..99bbd5d5684 100644 --- a/system/jhtree/jhtree.cpp +++ b/system/jhtree/jhtree.cpp @@ -573,10 +573,12 @@ class jhtree_decl CKeyLevelManager : implements IKeyManager, public CInterface filter->describe(out); } - virtual void mergeStats(CRuntimeStatisticCollection & stats) const + virtual void mergeStats(CRuntimeStatisticCollection & targetStats) const { if (keyCursor) - keyCursor->mergeStats(stats); + keyCursor->mergeStats(targetStats); + if (stats.ctx) + targetStats.merge(stats.ctx->queryStats()); } }; diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 4dafcc75ed2..36a7ba1b4b8 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -1322,8 +1322,7 @@ const StatisticsMapping noStatistics({}); const StatisticsMapping jhtreeCacheStatistics({ StNumIndexSeeks, StNumIndexScans, StNumPostFiltered, StNumIndexWildSeeks, StNumNodeCacheAdds, StNumLeafCacheAdds, StNumBlobCacheAdds, StNumNodeCacheHits, StNumLeafCacheHits, StNumBlobCacheHits, StCycleNodeLoadCycles, StCycleLeafLoadCycles, StCycleBlobLoadCycles, StCycleNodeReadCycles, StCycleLeafReadCycles, StCycleBlobReadCycles, StNumNodeDiskFetches, StNumLeafDiskFetches, StNumBlobDiskFetches, - StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles, StCycleIndexCacheBlockedCycles, StNumIndexMergeCompares, StNumIndexMerges, StNumIndexSkips, StNumIndexNullSkips, - StTimeLeafLoad, StTimeLeafRead, StTimeLeafFetch, StTimeIndexCacheBlocked, StTimeNodeFetch}); + StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles, StCycleIndexCacheBlockedCycles, StNumIndexMergeCompares, StNumIndexMerges, StNumIndexSkips, StNumIndexNullSkips, StTimeLeafLoad, StTimeLeafRead, StTimeLeafFetch, StTimeIndexCacheBlocked, StTimeNodeFetch, StTimeNodeLoad, StTimeNodeRead}); const StatisticsMapping allStatistics(StKindAll); const StatisticsMapping heapStatistics({StNumAllocations, StNumAllocationScans}); diff --git a/system/jlib/jstats.h b/system/jlib/jstats.h index c86d18c1937..3092e4541b3 100644 --- a/system/jlib/jstats.h +++ b/system/jlib/jstats.h @@ -533,7 +533,6 @@ class jlib_decl CRuntimeStatistic }; class CNestedRuntimeStatisticMap; - //The CRuntimeStatisticCollection used to gather statistics for an activity - it has no notion of its scope, but can contain nested scopes. //Some of the functions have node parameters which have no meaning for the base implementation, but are used by the derived class //CRuntimeSummaryStatisticCollection which is used fro summarising stats from multiple different worker nodes. diff --git a/thorlcr/activities/indexread/thindexreadslave.cpp b/thorlcr/activities/indexread/thindexreadslave.cpp index b4c7d0cd7cc..38df9cc39de 100644 --- a/thorlcr/activities/indexread/thindexreadslave.cpp +++ b/thorlcr/activities/indexread/thindexreadslave.cpp @@ -350,6 +350,8 @@ class CIndexReadSlaveBase : public CSlaveActivity } void mergeFileStats(IPartDescriptor *partDesc, IFileIO *partIO) { + if (!currentManager) + return; if (fileStats.size()>0) { ISuperFileDescriptor * superFDesc = partDesc->queryOwner().querySuperFileDescriptor(); @@ -357,10 +359,12 @@ class CIndexReadSlaveBase : public CSlaveActivity { unsigned subfile, lnum; if(superFDesc->mapSubPart(partDesc->queryPartIndex(), subfile, lnum)) - mergeStats(*fileStats[fileTableStart+subfile], partIO); + currentManager->mergeStats(*fileStats[fileTableStart+subfile]); } else - mergeStats(*fileStats[fileTableStart], partIO); + { + currentManager->mergeStats(*fileStats[fileTableStart]); + } } } void updateStats() diff --git a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp index e9903b4ecb4..a973d9e74d8 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp @@ -959,6 +959,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem { protected: CKeyedJoinSlave &activity; + CStatsContextLogger contextLogger; + CStatsCtxLoggerDeltaUpdater statsUpdater; IThorRowInterfaces *rowIf; IHThorKeyedJoinArg *helper = nullptr; std::vector queues; @@ -995,7 +997,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } public: CLookupHandler(CKeyedJoinSlave &_activity, IThorRowInterfaces *_rowIf, unsigned _batchProcessLimit) : threaded("CLookupHandler", this), - activity(_activity), rowIf(_rowIf), batchProcessLimit(_batchProcessLimit) + activity(_activity), rowIf(_rowIf), batchProcessLimit(_batchProcessLimit), + contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, _activity, contextLogger) { helper = activity.helper; } @@ -1084,6 +1087,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } void stop() { + CStatsScopedDeltaUpdater scoped(statsUpdater); stopped = true; join(); for (auto &queue : queues) @@ -1155,6 +1159,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } void flush() { + CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); // NB: queueLookup() must be protected from re-entry by caller for (unsigned b=0; bsetAtMostLimitHit(); // also clears existing rows break; } - KLBlobProviderAdapter adapter(keyManager, &activity.contextLogger); + KLBlobProviderAdapter adapter(keyManager, &contextLogger); byte const * keyRow = keyManager->queryKeyBuffer(); size_t fposOffset = keyManager->queryRowSize() - sizeof(offset_t); offset_t fpos = rtlReadBigUInt8(keyRow + fposOffset); @@ -1381,7 +1386,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem std::atomic & keyManager = (*keyManagers)[selected]; if (!keyManager) // delayed until actually needed { - keyManager = activity.createPartKeyManager(partNo, copy); + keyManager = activity.createPartKeyManager(partNo, copy, &contextLogger); // NB: potentially translation per part could be different if dealing with superkeys setupTranslation(partNo, selected, *keyManager); } @@ -1749,10 +1754,15 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem joinGroup->decPending(); // Every queued lookup row triggered an inc., this is the corresponding dec. } unsigned __int64 seeks, scans, wildseeks; + unsigned __int64 nodeDiskFetches, leafDiskFetches, blobDiskFetches; mb.read(seeks).read(scans).read(wildseeks); + mb.read(nodeDiskFetches).read(leafDiskFetches).read(blobDiskFetches); activity.inactiveStats.sumStatistic(StNumIndexSeeks, seeks); activity.inactiveStats.sumStatistic(StNumIndexScans, scans); activity.inactiveStats.sumStatistic(StNumIndexWildSeeks, wildseeks); + activity.inactiveStats.sumStatistic(StNumNodeDiskFetches, nodeDiskFetches); + activity.inactiveStats.sumStatistic(StNumLeafDiskFetches, leafDiskFetches); + activity.inactiveStats.sumStatistic(StNumBlobDiskFetches, blobDiskFetches); if (received == numRows) break; } @@ -1802,6 +1812,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } virtual void process(CThorExpandingRowArray &processing, unsigned selected) override { + CStatsScopedThresholdDeltaUpdater scopedStats(statsUpdater); unsigned partCopy = parts[selected]; unsigned partNo = partCopy & partMask; unsigned copy = partCopy >> partBits; @@ -2222,8 +2233,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem CPartDescriptorArray allIndexParts; std::vector localIndexParts, localFetchPartMap; IArrayOf tlkKeyIndexes; - CStatsContextLogger contextLogger; - CStatsCtxLoggerDeltaUpdater statsUpdater; Owned joinFieldsAllocator; OwnedConstThorRow defaultRight; unsigned joinFlags = 0; @@ -2419,7 +2428,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem { IKeyIndex *tlkKeyIndex = &tlkKeyIndexes.item(i); const RtlRecord &keyRecInfo = helper->queryIndexRecordSize()->queryRecordAccessor(true); - Owned tlkManager = createLocalKeyManager(keyRecInfo, nullptr, &contextLogger, helper->hasNewSegmentMonitors(), false); + Owned tlkManager = createLocalKeyManager(keyRecInfo, nullptr, nullptr, helper->hasNewSegmentMonitors(), false); tlkManager->setKey(tlkKeyIndex); keyManagers.append(*tlkManager.getClear()); } @@ -2451,10 +2460,10 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem return createKeyIndex(filename, crc, *lazyIFileIO, (unsigned) -1, false); } } - IKeyManager *createPartKeyManager(unsigned partNo, unsigned copy) + IKeyManager *createPartKeyManager(unsigned partNo, unsigned copy, IContextLogger *ctx) { Owned keyIndex = createPartKeyIndex(partNo, copy, false); - return createLocalKeyManager(helper->queryIndexRecordSize()->queryRecordAccessor(true), keyIndex, &contextLogger, helper->hasNewSegmentMonitors(), false); + return createLocalKeyManager(helper->queryIndexRecordSize()->queryRecordAccessor(true), keyIndex, ctx, helper->hasNewSegmentMonitors(), false); } const void *preparePendingLookupRow(void *row, size32_t maxSz, const void *lhsRow, size32_t keySz) { @@ -2616,7 +2625,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } void stopReadAhead() { - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); keyLookupHandlers.flush(); keyLookupHandlers.join(); // wait for pending handling, there may be more fetch items as a result fetchLookupHandlers.flushTS(); @@ -2938,7 +2946,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem public: IMPLEMENT_IINTERFACE_USING(PARENT); - CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this), contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, *this, contextLogger) + CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this) { helper = static_cast (queryHelper()); reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) || (helper->getJoinFlags() & JFvarindexfilename); @@ -3368,7 +3376,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } virtual void stop() override { - CStatsScopedDeltaUpdater scoped(statsUpdater); endOfInput = true; // signals to readAhead which is reading input, that is should stop asap. // could be blocked in readAhead(), because CJoinGroup's are no longer being processed diff --git a/thorlcr/graph/thgraphmaster.cpp b/thorlcr/graph/thgraphmaster.cpp index dae32d807ca..d3ba4828a59 100644 --- a/thorlcr/graph/thgraphmaster.cpp +++ b/thorlcr/graph/thgraphmaster.cpp @@ -649,6 +649,7 @@ void CMasterActivity::done() void CMasterActivity::updateFileReadCostStats() { + ThorActivityKind activityKind = container.getKind(); if (fileStats.size()>0) { unsigned fileIndex = 0; @@ -657,6 +658,19 @@ void CMasterActivity::updateFileReadCostStats() IDistributedFile *file = queryReadFile(i); if (file) { + bool usesJhtreeCache = false; + if (TAKkeyedjoin == activityKind || TAKindexread == activityKind) + { + // Index uses jhtree caches, so use actual fetches to calculate cost + usesJhtreeCache = true; + if (0 == i) + { + stat_type numActualDiskReads = fileStats[fileIndex]->getStatisticSum(StNumNodeDiskFetches) + + fileStats[fileIndex]->getStatisticSum(StNumLeafDiskFetches) + + fileStats[fileIndex]->getStatisticSum(StNumBlobDiskFetches); + diskAccessCost += calcFileAccessCost(file, 0, numActualDiskReads); + } + } IDistributedSuperFile *super = file->querySuperFile(); if (super) { @@ -665,9 +679,8 @@ void CMasterActivity::updateFileReadCostStats() { IDistributedFile &subFile = super->querySubFile(i, true); stat_type numDiskReads = fileStats[fileIndex]->getStatisticSum(StNumDiskReads); - StringBuffer clusterName; - subFile.getClusterName(0, clusterName); - diskAccessCost += money2cost_type(calcFileAccessCost(clusterName, 0, numDiskReads)); + if (!usesJhtreeCache) + diskAccessCost += calcFileAccessCost(&subFile, 0, numDiskReads); subFile.addAttrValue("@numDiskReads", numDiskReads); fileIndex++; } @@ -675,9 +688,9 @@ void CMasterActivity::updateFileReadCostStats() else { stat_type numDiskReads = fileStats[fileIndex]->getStatisticSum(StNumDiskReads); - StringBuffer clusterName; - file->getClusterName(0, clusterName); - diskAccessCost += money2cost_type(calcFileAccessCost(clusterName, 0, numDiskReads)); + if (!usesJhtreeCache) + diskAccessCost += calcFileAccessCost(file, 0, numDiskReads); + file->addAttrValue("@numDiskReads", numDiskReads); fileIndex++; } @@ -690,9 +703,15 @@ void CMasterActivity::updateFileReadCostStats() if (file) { stat_type numDiskReads = statsCollection.getStatisticSum(StNumDiskReads); - StringBuffer clusterName; - file->getClusterName(0, clusterName); - diskAccessCost += money2cost_type(calcFileAccessCost(clusterName, 0, numDiskReads)); + if (TAKindexread == activityKind) + { + stat_type numActualDiskReads = statsCollection.getStatisticSum(StNumNodeDiskFetches) + + statsCollection.getStatisticSum(StNumLeafDiskFetches) + + statsCollection.getStatisticSum(StNumBlobDiskFetches); + diskAccessCost += calcFileAccessCost(file, 0, numActualDiskReads); + } + else + diskAccessCost += calcFileAccessCost(file, 0, numDiskReads); file->addAttrValue("@numDiskReads", numDiskReads); } } diff --git a/thorlcr/slave/slavmain.cpp b/thorlcr/slave/slavmain.cpp index 634964dc80a..4e25342301f 100644 --- a/thorlcr/slave/slavmain.cpp +++ b/thorlcr/slave/slavmain.cpp @@ -761,6 +761,9 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, unsigned __int64 startSeeks = stats.getStatisticValue(StNumIndexSeeks); unsigned __int64 startScans = stats.getStatisticValue(StNumIndexScans); unsigned __int64 startWildSeeks = stats.getStatisticValue(StNumIndexWildSeeks); + unsigned __int64 startNodeDiskFetches = stats.getStatisticValue(StNumNodeDiskFetches); + unsigned __int64 startLeafDiskFetches = stats.getStatisticValue(StNumLeafDiskFetches); + unsigned __int64 startBlobDiskFetches = stats.getStatisticValue(StNumBlobDiskFetches); while (!abortSoon) { OwnedConstThorRow row = getRowClear(rowNum++); @@ -773,6 +776,9 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, replyMb.append(stats.getStatisticValue(StNumIndexSeeks)-startSeeks); replyMb.append(stats.getStatisticValue(StNumIndexScans)-startScans); replyMb.append(stats.getStatisticValue(StNumIndexWildSeeks)-startWildSeeks); + replyMb.append(stats.getStatisticValue(StNumNodeDiskFetches)-startNodeDiskFetches); + replyMb.append(stats.getStatisticValue(StNumLeafDiskFetches)-startLeafDiskFetches); + replyMb.append(stats.getStatisticValue(StNumBlobDiskFetches)-startBlobDiskFetches); if (activityCtx->useMessageCompression()) { fastLZCompressToBuffer(replyMsg, tmpMB.length(), tmpMB.toByteArray());