Skip to content

Commit

Permalink
Merge pull request #18110 from shamser/issue30937
Browse files Browse the repository at this point in the history
HPCC-30937 Update readCost, writeCost, numDiskReads and numDiskWrites periodically

Reviewed-by: Jake Smith <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Jan 25, 2024
2 parents 6fe9800 + 8198608 commit 7787e78
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 41 deletions.
10 changes: 10 additions & 0 deletions dali/base/dadfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,16 @@ extern da_decl cost_type calcFileAccessCost(IDistributedFile *f, __int64 numDisk
return calcFileAccessCost(clusterName, numDiskWrites, numDiskReads);
}

extern da_decl cost_type calcDiskWriteCost(const StringArray & clusters, stat_type numDiskWrites)
{
if (!numDiskWrites)
return 0;
cost_type writeCost = 0;
ForEachItemIn(idx, clusters)
writeCost += calcFileAccessCost(clusters.item(idx), numDiskWrites, 0);
return writeCost;
}

RemoteFilename &constructPartFilename(IGroup *grp,unsigned partno,unsigned partmax,const char *name,const char *partmask,const char *partdir,unsigned copy,ClusterPartDiskMapSpec &mspec,RemoteFilename &rfn)
{
partno--;
Expand Down
1 change: 1 addition & 0 deletions dali/base/dadfs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,7 @@ extern da_decl bool checkLogicalName(const char *lfn,IUserDescriptor *user,bool
extern da_decl cost_type calcFileAtRestCost(const char * cluster, double sizeGB, double fileAgeDays);
extern da_decl cost_type calcFileAccessCost(const char * cluster, __int64 numDiskWrites, __int64 numDiskReads);
extern da_decl cost_type calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads);
extern da_decl cost_type calcDiskWriteCost(const StringArray & clusters, stat_type numDiskWrites);
constexpr bool defaultPrivilegedUser = true;
constexpr bool defaultNonPrivilegedUser = false;

Expand Down
9 changes: 8 additions & 1 deletion thorlcr/activities/fetch/thfetch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,16 @@ class CFetchActivityMaster : public CMasterActivity
for (unsigned i=0; i<numFilesToRead; i++)
fileStats[i]->deserialize(node, mb);
}
virtual void getActivityStats(IStatisticGatherer & stats) override
{
CMasterActivity::getActivityStats(stats);
diskAccessCost = calcFileReadCostStats(false);
if (diskAccessCost)
stats.addStatistic(StCostFileAccess, diskAccessCost);
}
virtual void done() override
{
updateFileReadCostStats();
diskAccessCost = calcFileReadCostStats(true);
CMasterActivity::done();
}
};
Expand Down
9 changes: 8 additions & 1 deletion thorlcr/activities/indexread/thindexread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,16 @@ class CIndexReadBase : public CMasterActivity
for (unsigned i=0; i<numFilesToRead; i++)
fileStats[i]->deserialize(node, mb);
}
virtual void getActivityStats(IStatisticGatherer & stats) override
{
CMasterActivity::getActivityStats(stats);
diskAccessCost = calcFileReadCostStats(false);
if (diskAccessCost)
stats.addStatistic(StCostFileAccess, diskAccessCost);
}
virtual void done() override
{
updateFileReadCostStats();
diskAccessCost = calcFileReadCostStats(true);
CMasterActivity::done();
}
};
Expand Down
6 changes: 5 additions & 1 deletion thorlcr/activities/indexwrite/thindexwrite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ class IndexWriteActivityMaster : public CMasterActivity
props.setPropInt64("@totalCRC", totalCRC);
}
props.setPropInt("@formatCrc", helper->getFormatCrc());
props.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), statsCollection.getStatisticSum(StNumDiskWrites));
props.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), diskAccessCost);
if (isLocal)
{
props.setPropBool("@local", true);
Expand All @@ -310,7 +312,6 @@ class IndexWriteActivityMaster : public CMasterActivity
bloom->setProp("@bloomProbability", pval.str());
}
container.queryTempHandler()->registerFile(fileName, container.queryOwner().queryGraphId(), 0, false, WUFileStandard, &clusters);
updateFileWriteCostStats(*fileDesc, props, statsCollection.getStatisticSum(StNumDiskWrites));
if (!dlfn.isExternal())
queryThorFileManager().publish(container.queryJob(), fileName, *fileDesc);
}
Expand Down Expand Up @@ -418,6 +419,9 @@ class IndexWriteActivityMaster : public CMasterActivity
{
CMasterActivity::getActivityStats(stats);
stats.addStatistic(StNumDuplicateKeys, cummulativeDuplicateKeyCount);
diskAccessCost = calcDiskWriteCost(clusters, statsCollection.getStatisticSum(StNumDiskWrites));
if (diskAccessCost)
stats.addStatistic(StCostFileAccess, diskAccessCost);
}
};

Expand Down
9 changes: 8 additions & 1 deletion thorlcr/activities/keyedjoin/thkeyedjoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,9 +562,16 @@ class CKeyedJoinMaster : public CMasterActivity
for (unsigned i=0; i<numFilesToRead; i++)
fileStats[i]->deserialize(node, mb);
}
virtual void getActivityStats(IStatisticGatherer & stats) override
{
CMasterActivity::getActivityStats(stats);
diskAccessCost = calcFileReadCostStats(false);
if (diskAccessCost)
stats.addStatistic(StCostFileAccess, diskAccessCost);
}
virtual void done() override
{
updateFileReadCostStats();
diskAccessCost = calcFileReadCostStats(true);
CMasterActivity::done();
}
};
Expand Down
21 changes: 19 additions & 2 deletions thorlcr/activities/thdiskbase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,17 @@ void CDiskReadMasterBase::serializeSlaveData(MemoryBuffer &dst, unsigned slave)
CSlavePartMapping::serializeNullMap(dst);
}

void CDiskReadMasterBase::getActivityStats(IStatisticGatherer & stats)
{
CMasterActivity::getActivityStats(stats);
diskAccessCost = calcFileReadCostStats(false);
if (diskAccessCost)
stats.addStatistic(StCostFileAccess, diskAccessCost);
}

void CDiskReadMasterBase::done()
{
updateFileReadCostStats();
diskAccessCost = calcFileReadCostStats(true);
CMasterActivity::done();
}

Expand Down Expand Up @@ -284,7 +292,8 @@ void CWriteMasterBase::publish()
}
if (TDWrestricted & diskHelperBase->getFlags())
props.setPropBool("restricted", true );
updateFileWriteCostStats(*fileDesc, props, statsCollection.getStatisticSum(StNumDiskWrites));
props.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), statsCollection.getStatisticSum(StNumDiskWrites));
props.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), diskAccessCost);
container.queryTempHandler()->registerFile(fileName, container.queryOwner().queryGraphId(), diskHelperBase->getTempUsageCount(), TDXtemporary & diskHelperBase->getFlags(), getDiskOutputKind(diskHelperBase->getFlags()), &clusters);
if (!dlfn.isExternal())
{
Expand Down Expand Up @@ -432,6 +441,14 @@ void CWriteMasterBase::done()
}
}

void CWriteMasterBase::getActivityStats(IStatisticGatherer & stats)
{
CMasterActivity::getActivityStats(stats);
diskAccessCost = calcDiskWriteCost(clusters, statsCollection.getStatisticSum(StNumDiskWrites));
if (diskAccessCost)
stats.addStatistic(StCostFileAccess, diskAccessCost);
}

void CWriteMasterBase::slaveDone(size32_t slaveIdx, MemoryBuffer &mb)
{
if (mb.length()) // if 0 implies aborted out from this slave.
Expand Down
2 changes: 2 additions & 0 deletions thorlcr/activities/thdiskbase.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public:
virtual void init() override;
virtual void kill() override;
virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave) override;
virtual void getActivityStats(IStatisticGatherer & stats) override;
virtual void done() override;
virtual void validateFile(IDistributedFile *file) { }
virtual void deserializeStats(unsigned node, MemoryBuffer &mb) override;
Expand All @@ -62,6 +63,7 @@ public:
virtual void preStart(size32_t parentExtractSz, const byte *parentExtract);
virtual void init();
virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave);
virtual void getActivityStats(IStatisticGatherer & stats) override;
virtual void done();
virtual void slaveDone(size32_t slaveIdx, MemoryBuffer &mb);
};
Expand Down
59 changes: 26 additions & 33 deletions thorlcr/graph/thgraphmaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -611,8 +611,6 @@ void CMasterActivity::deserializeStats(unsigned node, MemoryBuffer &mb)
void CMasterActivity::getActivityStats(IStatisticGatherer & stats)
{
statsCollection.getStats(stats);
if (diskAccessCost)
stats.addStatistic(StCostFileAccess, diskAccessCost);
}

void CMasterActivity::getEdgeStats(IStatisticGatherer & stats, unsigned idx)
Expand Down Expand Up @@ -647,22 +645,18 @@ void CMasterActivity::done()
}
}

void CMasterActivity::updateFileReadCostStats()
// calcFileReadCostStats calculates and returns the read costs for all files read by the activity
// In addition, if updateFileProps==true, it updates the file attributes with @readCost and @numDiskReads
// Note: should be called once per activity with "updateFileProps==true" to avoid double counting
cost_type CMasterActivity::calcFileReadCostStats(bool updateFileProps)
{
// Update numDiskReads & readCost in the file attributes and return readCost
auto updateReadCosts = [](bool useJhtreeCacheStats, IDistributedFile *file, CThorStatsCollection &stats)
// 1) Returns readCost 2) if updateFilePros==true, updates file attributes with @readCost and @numDiskReads
auto updateReadCosts = [updateFileProps](bool useJhtreeCacheStats, IDistributedFile *file, CThorStatsCollection &stats)
{
StringBuffer clusterName;
file->getClusterName(0, clusterName);
IPropertyTree & fileAttr = file->queryAttributes();
cost_type legacyReadCost = 0, curReadCost = 0;
// Legacy files will not have the readCost stored as an attribute
if (!hasReadWriteCostFields(fileAttr) && fileAttr.hasProp(getDFUQResultFieldName(DFUQRFnumDiskReads)))
{
// Legacy file: calculate readCost using prev disk reads and new disk reads
stat_type prevDiskReads = fileAttr.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0);
legacyReadCost = calcFileAccessCost(clusterName, 0, prevDiskReads);
}
cost_type curReadCost = 0;
stat_type curDiskReads = stats.getStatisticSum(StNumDiskReads);
if(useJhtreeCacheStats)
{
Expand All @@ -673,11 +667,23 @@ void CMasterActivity::updateFileReadCostStats()
}
else
curReadCost = calcFileAccessCost(clusterName, 0, curDiskReads);
file->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + curReadCost);
file->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), curDiskReads);

if (updateFileProps)
{
cost_type legacyReadCost = 0;
// Legacy files will not have the readCost stored as an attribute
if (!hasReadWriteCostFields(fileAttr) && fileAttr.hasProp(getDFUQResultFieldName(DFUQRFnumDiskReads)))
{
// Legacy file: calculate readCost using prev disk reads and new disk reads
stat_type prevDiskReads = fileAttr.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0);
legacyReadCost = calcFileAccessCost(clusterName, 0, prevDiskReads);
}
file->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + curReadCost);
file->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), curDiskReads);
}
return curReadCost;
};

cost_type readCost = 0;
if (fileStats.size()>0)
{
ThorActivityKind activityKind = container.getKind();
Expand All @@ -700,13 +706,13 @@ void CMasterActivity::updateFileReadCostStats()
for (unsigned i=0; i<numSubFiles; i++)
{
IDistributedFile &subFile = super->querySubFile(i, true);
diskAccessCost += updateReadCosts(useJhtreeCache, &subFile, *fileStats[fileIndex]);
readCost += updateReadCosts(useJhtreeCache, &subFile, *fileStats[fileIndex]);
fileIndex++;
}
}
else
{
diskAccessCost += updateReadCosts(useJhtreeCache, file, *fileStats[fileIndex]);
readCost += updateReadCosts(useJhtreeCache, file, *fileStats[fileIndex]);
fileIndex++;
}
}
Expand All @@ -716,22 +722,9 @@ void CMasterActivity::updateFileReadCostStats()
{
IDistributedFile *file = queryReadFile(0);
if (file)
diskAccessCost += updateReadCosts(true, file, statsCollection);
}
}

void CMasterActivity::updateFileWriteCostStats(IFileDescriptor & fileDesc, IPropertyTree &props, stat_type numDiskWrites)
{
if (numDiskWrites)
{
props.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), numDiskWrites);
assertex(fileDesc.numClusters()>=1);
StringBuffer clusterName;
fileDesc.getClusterGroupName(0, clusterName);// Note: calculating for 1st cluster. (Future: calc for >1 clusters)
cost_type writeCost = calcFileAccessCost(clusterName, numDiskWrites, 0);
props.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), writeCost);
diskAccessCost = writeCost;
readCost = updateReadCosts(true, file, statsCollection);
}
return readCost;
}

//////////////////////
Expand Down
3 changes: 1 addition & 2 deletions thorlcr/graph/thgraphmaster.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,7 @@ protected:
unsigned queryReadFileId(const char *lfnName);
IDistributedFile *findReadFile(const char *lfnName);
IDistributedFile *lookupReadFile(const char *lfnName, AccessMode mode, bool jobTemp, bool temp, bool opt, bool statsForMultipleFiles=false, const StatisticsMapping &statsMapping=diskReadRemoteStatistics, unsigned * fileStatsStartEntry=nullptr);
void updateFileReadCostStats();
void updateFileWriteCostStats(IFileDescriptor & fileDesc, IPropertyTree &props, stat_type numDiskWrites);
cost_type calcFileReadCostStats(bool updateFileProps);
virtual void process() { }
public:
IMPLEMENT_IINTERFACE_USING(CActivityBase)
Expand Down

0 comments on commit 7787e78

Please sign in to comment.