Skip to content

Commit

Permalink
HPCC-31062 Include dfu file op cost in workunit aggregates
Browse files Browse the repository at this point in the history
Note, this commit also fixes an issue where the GetDFUWorkunit esp service
doesn't return the FileAccessCost field because the default version of
the GetDFUWorkunit doesn't have this field. The issue is resolved by
updating the default version of the GetDFUWorkunit service.

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Mar 27, 2024
1 parent 1e2fe1a commit f67ec42
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 6 deletions.
34 changes: 29 additions & 5 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2681,19 +2681,22 @@ void StatisticsAggregator::loadExistingAggregates(const IConstWorkUnit &workunit
};

WuScopeFilter filter;
filter.addScopeType(SSTglobal).addScopeType(SSTworkflow).addScopeType(SSTgraph);
filter.addScopeType(SSTglobal).addScopeType(SSTworkflow).addScopeType(SSTgraph).addScopeType(SSToperation);
const unsigned numStats = mapping.numStatistics();
for (unsigned i=0; i<numStats; ++i)
filter.addOutputStatistic(mapping.getKind(i));
filter.setDepth(1,3); // 1=global, 2=workflow, 3=graph
filter.setDepth(0,3); // 0=global, 1=workflow, (2=graph, 3=subgraph | 2=>dfu)
filter.setSources(SSFsearchGlobalStats);
filter.setIncludeNesting(0);
filter.finishedFilter();

StatsCollectionAggregatesLoader aggregatesLoader(statsCollection);
Owned<IConstWUScopeIterator> iter = &workunit.getScopeIterator(filter);
ForEach(*iter)
{
iter->playProperties(aggregatesLoader);
if (iter->getScopeType()==SSToperation && !iter->nextSibling()) // don't descend operation. only top operation scope needed
break;
}
}

// Replace the stats at the specified scope level
Expand Down Expand Up @@ -8927,7 +8930,7 @@ void CLocalWorkUnit::setStatistic(StatisticCreatorType creatorType, const char *
if (mergeAction != StatsMergeAppend)
{
StringBuffer xpath;
xpath.append("Statistic[@creator='").append(creator).append("'][@scope='").append(scope).append("'][@kind='").append(kindName).append("']");
xpath.append("Statistic[@scope='").append(scope).append("'][@kind='").append(kindName).append("']");
statTree = stats->queryPropTree(xpath.str());
}

Expand Down Expand Up @@ -8955,6 +8958,7 @@ void CLocalWorkUnit::setStatistic(StatisticCreatorType creatorType, const char *
mergeAction = StatsMergeAppend;
}

unsigned __int64 deltaValue = 0;
if (mergeAction != StatsMergeAppend) // RKC->GH Is this right??
{
unsigned __int64 oldValue = statTree->getPropInt64("@value", 0);
Expand All @@ -8963,14 +8967,18 @@ void CLocalWorkUnit::setStatistic(StatisticCreatorType creatorType, const char *
if (oldMax < oldValue)
oldMax = oldValue;

statTree->setPropInt64("@value", mergeStatisticValue(oldValue, value, mergeAction));
unsigned __int64 newValue = mergeStatisticValue(oldValue, value, mergeAction);
statTree->setPropInt64("@value", newValue);
statTree->setPropInt64("@count", count + oldCount);
if (maxValue > oldMax)
statTree->setPropInt64("@max", maxValue);
deltaValue = newValue - oldValue;
}
else
{
statTree->setPropInt64("@value", value);
deltaValue = value;

statTree->setPropInt64("@count", count);
if (maxValue)
statTree->setPropInt64("@max", maxValue);
Expand Down Expand Up @@ -9002,6 +9010,22 @@ void CLocalWorkUnit::setStatistic(StatisticCreatorType creatorType, const char *
}
if (kind == StCostCompile)
p->setPropInt64("@costCompile", value);

// Special case - update aggregates for dfu FileAccessCost. This is needed because although
// fileservices can update dfu cost in the workunit, it does not have a mechanism to update
// the aggregates for this stat.
if (scopeType == SSTdfuworkunit && kind == StCostFileAccess && value)
{
StringBuffer currentScope(scope), parent;
while (getParentScope(parent.clear(), currentScope.str()))
{
currentScope.set(parent);
StatisticScopeType sst = getScopeType(queryScopeTail(currentScope.str()));
if (sst!=SSTnone)
setStatistic(creatorType, creator, sst, currentScope.str(), StCostFileAccess, "", deltaValue, 1, 0, StatsMergeSum);
}
setStatistic(creatorType, creator, SSTglobal, "", StCostFileAccess, "", deltaValue, 1, 0, StatsMergeSum);
}
}

void CLocalWorkUnit::_loadStatistics() const
Expand Down
2 changes: 1 addition & 1 deletion esp/scm/ws_fs.ecm
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ ESPservice [
ESPmethod [resp_xsl_default("/esp/xslt/showresult.xslt")] ShowResult(ShowResultRequest, ShowResultResponse);
ESPmethod [resp_xsl_default("/esp/xslt/dfuwu_search.xslt")] DFUWUSearch(DFUWUSearchRequest, DFUWUSearchResponse);
ESPmethod [resp_xsl_default("/esp/xslt/dfu_workunits.xslt")] GetDFUWorkunits(GetDFUWorkunits, GetDFUWorkunitsResponse);
ESPmethod [resp_xsl_default("/esp/xslt/dfu_wuid.xslt")] GetDFUWorkunit(GetDFUWorkunit, GetDFUWorkunitResponse);
ESPmethod [min_ver("1.26"), resp_xsl_default("/esp/xslt/dfu_wuid.xslt")] GetDFUWorkunit(GetDFUWorkunit, GetDFUWorkunitResponse);
ESPmethod [resp_xsl_default("/esp/xslt/dfu_progress.xslt")] GetDFUProgress(ProgressRequest, ProgressResponse);

ESPmethod [resp_xsl_default("/esp/xslt/dfu_wuid.xslt")] CreateDFUWorkunit(CreateDFUWorkunit, CreateDFUWorkunitResponse);
Expand Down

0 comments on commit f67ec42

Please sign in to comment.