Skip to content

Commit

Permalink
Merge pull request #18023 from shamser/issue30599
Browse files Browse the repository at this point in the history
HPCC-30599 Fix file access costs for keyed join

Reviewed-by: Jake Smith <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Dec 19, 2023
2 parents a8f8622 + 461a8ae commit 5d78149
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 74 deletions.
5 changes: 4 additions & 1 deletion common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,10 @@ class CStatsContextLogger : public CSimpleInterfaceOf<IContextLogger>
mutable CRuntimeStatisticCollection stats;
public:
CStatsContextLogger(const CRuntimeStatisticCollection &_mapping, const LogMsgJobInfo & _job=unknownJob) : job(_job), stats(_mapping) {}

void reset()
{
stats.reset();
}
virtual void CTXLOGva(const LogMsgCategory & cat, const LogMsgJobInfo & job, LogMsgCode code, const char *format, va_list args) const override __attribute__((format(printf,5,0)))
{
VALOG(cat, job, code, format, args);
Expand Down
7 changes: 5 additions & 2 deletions system/jhtree/jhtree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,10 +572,13 @@ class jhtree_decl CKeyLevelManager : implements IKeyManager, public CInterface
filter->describe(out);
}

virtual void mergeStats(CRuntimeStatisticCollection & stats) const
virtual void mergeStats(CRuntimeStatisticCollection & targetStats) const
{
// IO Stats coming from the keyCursor and jhtree cache stats coming from this class's stats
if (keyCursor)
keyCursor->mergeStats(stats);
keyCursor->mergeStats(targetStats); // merge IO stats
if (ctx)
targetStats.merge(ctx->queryStats()); // merge jhtree cache stats
}
};

Expand Down
4 changes: 2 additions & 2 deletions system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1322,8 +1322,8 @@ const StatisticsMapping noStatistics({});
const StatisticsMapping jhtreeCacheStatistics({ StNumIndexSeeks, StNumIndexScans, StNumPostFiltered, StNumIndexWildSeeks,
StNumNodeCacheAdds, StNumLeafCacheAdds, StNumBlobCacheAdds, StNumNodeCacheHits, StNumLeafCacheHits, StNumBlobCacheHits, StCycleNodeLoadCycles, StCycleLeafLoadCycles,
StCycleBlobLoadCycles, StCycleNodeReadCycles, StCycleLeafReadCycles, StCycleBlobReadCycles, StNumNodeDiskFetches, StNumLeafDiskFetches, StNumBlobDiskFetches,
StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles, StCycleIndexCacheBlockedCycles, StNumIndexMergeCompares, StNumIndexMerges, StNumIndexSkips, StNumIndexNullSkips,
StTimeLeafLoad, StTimeLeafRead, StTimeLeafFetch, StTimeIndexCacheBlocked, StTimeNodeFetch});
StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles, StCycleIndexCacheBlockedCycles, StNumIndexMergeCompares, StNumIndexMerges, StNumIndexSkips,
StNumIndexNullSkips, StTimeLeafLoad, StTimeLeafRead, StTimeLeafFetch, StTimeIndexCacheBlocked, StTimeNodeFetch, StTimeNodeLoad, StTimeNodeRead});

const StatisticsMapping allStatistics(StKindAll);
const StatisticsMapping heapStatistics({StNumAllocations, StNumAllocationScans});
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/activities/keyedjoin/thkeyedjoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ class CKeyedJoinMaster : public CMasterActivity
totalIndexParts = 0;

Owned<IDistributedFile> dataFile;
Owned<IDistributedFile> indexFile = lookupReadFile(indexFileName, AccessMode::readRandom, false, false, 0 != (helper->getJoinFlags() & JFindexoptional), true, indexReadActivityStatistics, &indexFileStatsTableEntry);
Owned<IDistributedFile> indexFile = lookupReadFile(indexFileName, AccessMode::readRandom, false, false, 0 != (helper->getJoinFlags() & JFindexoptional), true, indexReadFileStatistics, &indexFileStatsTableEntry);
if (indexFile)
{
if (!isFileKey(indexFile))
Expand Down
130 changes: 85 additions & 45 deletions thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

Large diffs are not rendered by default.

41 changes: 32 additions & 9 deletions thorlcr/graph/thgraphmaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,25 +649,48 @@ void CMasterActivity::done()

void CMasterActivity::updateFileReadCostStats()
{
// Updates numDiskReads & readCost in the file attributes and returns the readCost
auto updateReadCosts = [](IDistributedFile *file, CThorStatsCollection &stats)
// Update numDiskReads & readCost in the file attributes and return readCost
auto updateReadCosts = [](bool useJhtreeCacheStats, IDistributedFile *file, CThorStatsCollection &stats)
{
stat_type curDiskReads = stats.getStatisticSum(StNumDiskReads);
StringBuffer clusterName;
file->getClusterName(0, clusterName);
IPropertyTree & fileAttr = file->queryAttributes();
cost_type legacyReadCost = getLegacyReadCost(fileAttr, file);
cost_type curReadCost = money2cost_type(calcFileAccessCost(file, 0, curDiskReads));
cost_type legacyReadCost = 0, curReadCost = 0;
// Legacy files will not have the readCost stored as an attribute
if (!hasReadWriteCostFields(fileAttr) && fileAttr.hasProp(getDFUQResultFieldName(DFUQRFnumDiskReads)))
{
// Legacy file: calculate readCost using prev disk reads and new disk reads
stat_type prevDiskReads = fileAttr.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0);
legacyReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, prevDiskReads));
}
stat_type curDiskReads = stats.getStatisticSum(StNumDiskReads);
if(useJhtreeCacheStats)
{
stat_type numActualReads = stats.getStatisticSum(StNumNodeDiskFetches)
+ stats.getStatisticSum(StNumLeafDiskFetches)
+ stats.getStatisticSum(StNumBlobDiskFetches);
curReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, numActualReads));
}
else
curReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, curDiskReads));
file->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + curReadCost);
file->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), curDiskReads);
return curReadCost;
};

if (fileStats.size()>0)
{
ThorActivityKind activityKind = container.getKind();
unsigned fileIndex = 0;
diskAccessCost = 0;
for (unsigned i=0; i<readFiles.size();i++)
for (unsigned i=0; i<readFiles.size(); i++)
{
IDistributedFile *file = queryReadFile(i);
bool useJhtreeCache = false;
// Index uses jhtree caches, so use actual fetches to calculate cost
// To determine entry is an index file entry, use the test (i==0) because index file is always the first file
if ((TAKindexread == activityKind) || ((TAKkeyedjoin == activityKind) && (0 == i)))
useJhtreeCache = true;
if (file)
{
IDistributedSuperFile *super = file->querySuperFile();
Expand All @@ -677,13 +700,13 @@ void CMasterActivity::updateFileReadCostStats()
for (unsigned i=0; i<numSubFiles; i++)
{
IDistributedFile &subFile = super->querySubFile(i, true);
diskAccessCost += updateReadCosts(&subFile, *fileStats[fileIndex]);
diskAccessCost += updateReadCosts(useJhtreeCache, &subFile, *fileStats[fileIndex]);
fileIndex++;
}
}
else
{
diskAccessCost += updateReadCosts(file, *fileStats[fileIndex]);
diskAccessCost += updateReadCosts(useJhtreeCache, file, *fileStats[fileIndex]);
fileIndex++;
}
}
Expand All @@ -693,7 +716,7 @@ void CMasterActivity::updateFileReadCostStats()
{
IDistributedFile *file = queryReadFile(0);
if (file)
diskAccessCost = updateReadCosts(file, statsCollection);
diskAccessCost += updateReadCosts(true, file, statsCollection);
}
}

Expand Down
5 changes: 4 additions & 1 deletion thorlcr/graph/thgraphslave.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,10 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
bool optUnordered = false; // is the output specified as unordered?
CriticalSection statsCs; // to be used to protect objects refernce during stat. collection
CRuntimeStatisticCollection inactiveStats; // stats collected from previous iteration, to be combined with current 'stats'
std::vector<OwnedPtr<CRuntimeStatisticCollection>> fileStats;
// fileStats is mutable as it is updated by gatherActiveStats (const member func)
// fileStats is in this base class as it used by multiple derived classes (both slave and master) but not all.
// (Having it in the base class aids setup and resizing.)
mutable std::vector<OwnedPtr<CRuntimeStatisticCollection>> fileStats;

protected:
unsigned __int64 queryLocalCycles() const;
Expand Down
28 changes: 17 additions & 11 deletions thorlcr/slave/slavmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
unsigned maxCachedKJManagers = defaultMaxCachedKJManagers;
unsigned maxCachedFetchContexts = defaultMaxCachedFetchContexts;
unsigned keyLookupMaxProcessThreads = defaultKeyLookupMaxProcessThreads;
CStatsContextLogger contextLogger;
class CLookupKey
{
unsigned hashv = 0;
Expand Down Expand Up @@ -447,11 +446,13 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
Owned<IKeyManager> keyManager;
unsigned handle = 0;
Owned<IHThorKeyedJoinArg> helper;
CStatsContextLogger contextLogger;

public:
CKMContainer(CKJService &_service, CKeyLookupContext *_ctx)
: service(_service), ctx(_ctx)
: service(_service), ctx(_ctx), contextLogger(jhtreeCacheStatistics, thorJob)
{
keyManager.setown(ctx->createKeyManager(&service.contextLogger));
keyManager.setown(ctx->createKeyManager(&contextLogger));
StringBuffer tracing;
const IDynamicTransform *translator = ctx->queryTranslator(ctx->queryKey().getTracing(tracing));
if (translator)
Expand All @@ -475,6 +476,7 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
}
inline IHThorKeyedJoinArg *queryHelper() const { return helper; }
inline CKJService & queryService() const { return service; }
inline CStatsContextLogger & queryContextLogger() { return contextLogger; }
};
template<class KEY, class ITEM>
class CKeyedCacheEntry : public CInterface
Expand Down Expand Up @@ -757,10 +759,8 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
unsigned rowCount = getRowCount();
unsigned rowNum = 0;
unsigned rowStart = 0;
const CRuntimeStatisticCollection & stats = kmc->queryService().contextLogger.queryStats();
unsigned __int64 startSeeks = stats.getStatisticValue(StNumIndexSeeks);
unsigned __int64 startScans = stats.getStatisticValue(StNumIndexScans);
unsigned __int64 startWildSeeks = stats.getStatisticValue(StNumIndexWildSeeks);
CStatsContextLogger & contextLogger = kmc->queryContextLogger();
CRuntimeStatisticCollection startStats(contextLogger.queryStats());
while (!abortSoon)
{
OwnedConstThorRow row = getRowClear(rowNum++);
Expand All @@ -770,9 +770,15 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
if (last || (replyMb.length() >= DEFAULT_KEYLOOKUP_MAXREPLYSZ))
{
countMarker.write(rowNum-rowStart);
replyMb.append(stats.getStatisticValue(StNumIndexSeeks)-startSeeks);
replyMb.append(stats.getStatisticValue(StNumIndexScans)-startScans);
replyMb.append(stats.getStatisticValue(StNumIndexWildSeeks)-startWildSeeks);

CRuntimeStatisticCollection deltaStats(startStats.queryMapping());
contextLogger.updateStatsDeltaTo(deltaStats, startStats);
replyMb.append(deltaStats.getStatisticValue(StNumIndexSeeks));
replyMb.append(deltaStats.getStatisticValue(StNumIndexScans));
replyMb.append(deltaStats.getStatisticValue(StNumIndexWildSeeks));
replyMb.append(deltaStats.getStatisticValue(StNumNodeDiskFetches));
replyMb.append(deltaStats.getStatisticValue(StNumLeafDiskFetches));
replyMb.append(deltaStats.getStatisticValue(StNumBlobDiskFetches));
if (activityCtx->useMessageCompression())
{
fastLZCompressToBuffer(replyMsg, tmpMB.length(), tmpMB.toByteArray());
Expand Down Expand Up @@ -1189,7 +1195,7 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterfaceOf<IKJService>);

CKJService(mptag_t _mpTag) : threaded("CKJService", this), keyLookupMpTag(_mpTag), contextLogger(jhtreeCacheStatistics, thorJob)
CKJService(mptag_t _mpTag) : threaded("CKJService", this), keyLookupMpTag(_mpTag)
{
setupProcessorPool();
}
Expand Down
5 changes: 3 additions & 2 deletions thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ const StatisticsMapping soapcallStatistics({StTimeSoapcall});
const StatisticsMapping basicActivityStatistics({StTimeTotalExecute, StTimeLocalExecute, StTimeBlocked});
const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics);
const StatisticsMapping hashJoinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics);
const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, diskReadRemoteStatistics, basicActivityStatistics, jhtreeCacheStatistics);
const StatisticsMapping indexReadFileStatistics({}, diskReadRemoteStatistics, jhtreeCacheStatistics);
const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, indexReadFileStatistics, basicActivityStatistics);
const StatisticsMapping indexWriteActivityStatistics({StPerReplicated, StNumLeafCacheAdds, StNumNodeCacheAdds, StNumBlobCacheAdds }, basicActivityStatistics, diskWriteRemoteStatistics);
const StatisticsMapping keyedJoinActivityStatistics({ StNumIndexAccepted, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected}, basicActivityStatistics, jhtreeCacheStatistics);
const StatisticsMapping keyedJoinActivityStatistics({ StNumIndexAccepted, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected}, basicActivityStatistics, indexReadFileStatistics);
const StatisticsMapping loopActivityStatistics({StNumIterations}, basicActivityStatistics);
const StatisticsMapping lookupJoinActivityStatistics({StNumSmartJoinSlavesDegradedToStd, StNumSmartJoinDegradedToLocal}, basicActivityStatistics);
const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics, spillStatistics);
Expand Down
1 change: 1 addition & 0 deletions thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ extern graph_decl const StatisticsMapping graphStatistics;
extern graph_decl const StatisticsMapping indexDistribActivityStatistics;
extern graph_decl const StatisticsMapping soapcallActivityStatistics;

extern graph_decl const StatisticsMapping indexReadFileStatistics;
class BooleanOnOff
{
bool &tf;
Expand Down

0 comments on commit 5d78149

Please sign in to comment.