-
Notifications
You must be signed in to change notification settings - Fork 304
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HPCC-29657 Produce aggregate stats (e.g. spill, cost) whilst a job is running #18048
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -188,9 +188,9 @@ CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _cr | |
{ | ||
StatsScopeId graphScopeId; | ||
verifyex(graphScopeId.setScopeText(_rootScope)); | ||
StatsScopeId wfScopeId(SSTworkflow,wfid); | ||
|
||
StatsScopeId rootScopeId(SSTworkflow,wfid); | ||
collector.setown(createStatisticsGatherer(_creatorType, _creator, rootScopeId)); | ||
collector.setown(createStatisticsGatherer(_creatorType, _creator, wfScopeId)); | ||
collector->beginScope(graphScopeId); | ||
} | ||
|
||
|
@@ -2659,83 +2659,67 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu | |
} | ||
} | ||
|
||
//aggregate disk costs from top-level subgraphs (when scope specified) or workflows (scope not specified) | ||
cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope) | ||
void StatisticsAggregator::loadExistingAggregates(const IConstWorkUnit &workunit) | ||
{ | ||
WuScopeFilter filter; | ||
if (!isEmptyString(scope)) | ||
filter.addScope(scope); | ||
else | ||
filter.addScope(""); // Needed to match scope | ||
// when scope is a workflow, sum graph costs (or subgraph cost when no graph cost) to get workflow cost | ||
// (Costs from child graphs and activities should have been summed up to graph/subgraph level already) | ||
// when isEmptyString(scope), sum workflow costs (or graph cost when no workflow cost) to get global cost | ||
// (Costs from all levels below graph should be summed up to at least graph level already) | ||
// i.e. need 2 levels of nesting | ||
filter.setIncludeNesting(2); | ||
// includeNesting(2) needs just source "global". However, WuScopeFilter is incorrectly inferring the source as "global,stats", | ||
// causing too many of the stats to be pulled in and inefficiency. Here, explicitly set source to "global" | ||
filter.addSource("global"); | ||
filter.addOutputStatistic(StCostFileAccess); | ||
filter.addRequiredStat(StCostFileAccess); | ||
filter.finishedFilter(); | ||
Owned<IConstWUScopeIterator> it = &wu->getScopeIterator(filter); | ||
cost_type totalCost = 0; | ||
for (it->first(); it->isValid(); ) | ||
StatsScopeId globalScopeId(SSTglobal, (unsigned)0); | ||
statsCollection.setown(createStatisticCollection(globalScopeId)); | ||
|
||
class StatsCollectionAggregatesLoader : public IWuScopeVisitor | ||
{ | ||
cost_type value = 0; | ||
if (it->getStat(StCostFileAccess, value)) | ||
{ | ||
totalCost += value; | ||
it->nextSibling(); | ||
} | ||
else | ||
public: | ||
StatsCollectionAggregatesLoader(IStatisticCollection * _statsCollection) : statsCollection(_statsCollection) {} | ||
|
||
virtual void noteStatistic(StatisticKind kind, unsigned __int64 value, IConstWUStatistic & extra) override | ||
{ | ||
it->next(); | ||
statsCollection->setStatistic(extra.queryScope(), kind, value); | ||
} | ||
} | ||
return totalCost; | ||
} | ||
virtual void noteAttribute(WuAttr attr, const char * value) override { throwUnexpected(); } | ||
virtual void noteHint(const char * kind, const char * value) override { throwUnexpected(); } | ||
virtual void noteException(IConstWUException & exception) override { throwUnexpected(); } | ||
private: | ||
Linked<IStatisticCollection> statsCollection; | ||
}; | ||
|
||
void gatherSpillSize(const IConstWorkUnit * wu, const char *scope, stat_type & peakSizeSpill) | ||
{ | ||
WuScopeFilter filter; | ||
if (!isEmptyString(scope)) | ||
filter.addScope(scope); | ||
else | ||
{ | ||
filter.addScope(""); | ||
filter.addSource("global"); | ||
} | ||
filter.setIncludeNesting(1); | ||
filter.addOutputStatistic(StSizeGraphSpill); | ||
filter.addRequiredStat(StSizeGraphSpill); | ||
filter.addScopeType(SSTglobal).addScopeType(SSTworkflow).addScopeType(SSTgraph); | ||
const unsigned numStats = mapping.numStatistics(); | ||
for (unsigned i=0; i<numStats; ++i) | ||
filter.addOutputStatistic(mapping.getKind(i)); | ||
filter.setDepth(1,3); // 1=global, 2=workflow, 3=graph | ||
filter.setSources(SSFsearchGlobalStats); | ||
filter.setIncludeNesting(0); | ||
filter.finishedFilter(); | ||
Owned<IConstWUScopeIterator> it = &wu->getScopeIterator(filter); | ||
peakSizeSpill = 0; | ||
for (it->first(); it->isValid(); ) | ||
{ | ||
stat_type value = 0; | ||
if (it->getStat(StSizeGraphSpill, value)) | ||
{ | ||
if (value>peakSizeSpill) | ||
peakSizeSpill = value; | ||
it->nextSibling(); | ||
} | ||
else | ||
{ | ||
it->next(); | ||
} | ||
} | ||
|
||
StatsCollectionAggregatesLoader aggregatesLoader(statsCollection); | ||
Owned<IConstWUScopeIterator> iter = &workunit.getScopeIterator(filter); | ||
ForEach(*iter) | ||
iter->playProperties(aggregatesLoader); | ||
} | ||
|
||
void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType) | ||
// Replace the stats at the specified scope level | ||
void StatisticsAggregator::recordStats(IStatisticCollection * sourceStats, unsigned wfid, const char * graphName, unsigned sgId) | ||
{ | ||
stat_type peakSizeSpill = 0; | ||
gatherSpillSize(wu, scope, peakSizeSpill); | ||
if (peakSizeSpill) | ||
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizeGraphSpill, nullptr, peakSizeSpill, 1, 0, StatsMergeMax); | ||
StatsScopeId graphScopeId; | ||
verifyex(graphScopeId.setScopeText(graphName)); | ||
StatsScopeId wfScopeId(SSTworkflow, wfid); | ||
StatsScopeId sgScopeId(SSTsubgraph, sgId); | ||
statsCollection->recordStats(mapping, sourceStats, {wfScopeId, graphScopeId, sgScopeId}); | ||
} | ||
|
||
// Recalculate aggregates and then write the aggregates to global stats (dali) | ||
void StatisticsAggregator::updateAggregates(IWorkUnit *wu) | ||
{ | ||
if (!statsCollection) | ||
return; | ||
|
||
AggregateUpdatedCallBackFunc f = [&](const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value) | ||
{ | ||
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), sst, scope, kind, nullptr, value, 1, 0, StatsMergeReplace); | ||
}; | ||
|
||
statsCollection->refreshAggregates(mapping, f); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. See: https://github.com/hpcc-systems/HPCC-Platform/actions/runs/6968880782/job/18963669852 I think the issue is std::function vs 'std::function<void(const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)>'. It would be better if the callback function type were a typedef defined in jstats.h and then used here. |
||
} | ||
|
||
//--------------------------------------------------------------------------------------------------------------------- | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1174,7 +1174,6 @@ interface IConstWUScopeIterator : extends IScmIterator | |
}; | ||
|
||
//--------------------------------------------------------------------------------------------------------------------- | ||
|
||
//! IWorkUnit | ||
//! Provides high level access to WorkUnit "header" data. | ||
interface IWorkUnit; | ||
|
@@ -1725,8 +1724,6 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *ti | |
extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeType scopeType, StatisticKind kind, ITimeReporter *timer); | ||
extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search); | ||
extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false); | ||
extern WORKUNIT_API cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope); | ||
extern WORKUNIT_API void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType); | ||
extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name); | ||
extern WORKUNIT_API void descheduleWorkunit(char const * wuid); | ||
#if 0 | ||
|
@@ -1785,4 +1782,17 @@ extern WORKUNIT_API TraceFlags loadTraceFlags(IConstWorkUnit * wu, const std::in | |
|
||
extern WORKUNIT_API bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, unsigned wfid, const char *graphName); | ||
|
||
|
||
class WORKUNIT_API StatisticsAggregator : public CInterface | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. See the comment in the cpp file regarding moving StatisticsAggregator into jstats (and extracting the helper functions that need wu). |
||
{ | ||
public: | ||
StatisticsAggregator(const StatisticsMapping & _mapping) : mapping(_mapping) {} | ||
void loadExistingAggregates(const IConstWorkUnit &workunit); | ||
void recordStats(IStatisticCollection * sourceStats, unsigned wfid, const char * graphName, unsigned graphId); | ||
void updateAggregates(IWorkUnit *wu); | ||
private: | ||
Owned<IStatisticCollection> statsCollection; | ||
const StatisticsMapping & mapping; | ||
}; | ||
|
||
#endif |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -879,10 +879,12 @@ void EclSubGraph::updateProgress() | |
Owned<IWUGraphStats> progress = parent.updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), parent.queryWfid(), id); | ||
IStatisticGatherer & stats = progress->queryStatsBuilder(); | ||
updateProgress(stats); | ||
|
||
Owned<IStatisticCollection> statsCollection = stats.getResult(); | ||
agent->mergeAggregatorStats(*statsCollection, parent.queryWfid(), parent.queryGraphName(), id); | ||
if (startGraphTime || elapsedGraphCycles) | ||
{ | ||
WorkunitUpdate lockedwu(agent->updateWorkUnit()); | ||
agent->updateAggregates(lockedwu); | ||
StringBuffer subgraphid; | ||
subgraphid.append(parent.queryGraphName()).append(":").append(SubGraphScopePrefix).append(id); | ||
if (startGraphTime) | ||
|
@@ -897,10 +899,6 @@ void EclSubGraph::updateProgress() | |
if (cost) | ||
lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); | ||
} | ||
Owned<IStatisticCollection> statsCollection = stats.getResult(); | ||
const cost_type costDiskAccess = aggregateStatistic(StCostFileAccess, statsCollection) ; | ||
if (costDiskAccess) | ||
lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace); | ||
} | ||
} | ||
} | ||
|
@@ -927,6 +925,11 @@ void EclSubGraph::updateProgress(IStatisticGatherer &progress) | |
} | ||
ForEachItemIn(i2, subgraphs) | ||
subgraphs.item(i2).updateProgress(progress); | ||
|
||
Owned<IStatisticCollection> statsCollection = progress.getResult(); | ||
const cost_type costDiskAccess = statsCollection->aggregateStatistic(StCostFileAccess); | ||
if (costDiskAccess) | ||
progress.addStatistic(StCostFileAccess, costDiskAccess); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This change means that the cost will be recorded in the subgraph's stats rather than in the global stats as it was before. Does that mean it will not be picked up by the stats aggregator? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The aggregated total should still be published at the global level after this change (unless I'm missing something). There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Previously, this statistic wasn't picked up by the aggregator. It needs to be in the subgraph scope. However, the aggregator will add the statistic back in as a global statistic. |
||
} | ||
|
||
bool EclSubGraph::prepare(const byte * parentExtract, bool checkDependencies) | ||
|
@@ -1277,10 +1280,6 @@ void EclGraph::execute(const byte * parentExtract) | |
const cost_type cost = money2cost_type(calcCost(agent->queryAgentMachineCost(), elapsed)); | ||
if (cost) | ||
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); | ||
|
||
const cost_type costDiskAccess = aggregateDiskAccessCost(wu, scope); | ||
if (costDiskAccess) | ||
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace); | ||
} | ||
|
||
if (agent->queryRemoteWorkunit()) | ||
|
@@ -1349,8 +1348,12 @@ void EclGraph::updateLibraryProgress() | |
{ | ||
EclSubGraph & cur = graphs.item(idx); | ||
unsigned wfid = cur.parent.queryWfid(); | ||
Owned<IWUGraphStats> progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false); | ||
cur.updateProgress(progress->queryStatsBuilder()); | ||
|
||
Owned<IWUGraphStats> progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false); | ||
IStatisticGatherer & stats = progress->queryStatsBuilder(); | ||
cur.updateProgress(stats); | ||
Owned<IStatisticCollection> statsCollection = stats.getResult(); | ||
agent->mergeAggregatorStats(*statsCollection, wfid, queryGraphName(), cur.id); | ||
} | ||
} | ||
|
||
|
@@ -1492,7 +1495,7 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result) | |
|
||
IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph) | ||
{ | ||
return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false); | ||
return wu->updateStats(queryGraphName(), creatorType, creator, activeWfid, subgraph, false); | ||
} | ||
|
||
void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value) | ||
|
@@ -1544,6 +1547,7 @@ EclGraph * EclAgent::loadGraph(const char * graphName, IConstWorkUnit * wu, ILoa | |
|
||
Owned<EclGraph> eclGraph = new EclGraph(*this, graphName, wu, isLibrary, debugContext, probeManager, wuGraph->getWfid()); | ||
eclGraph->createFromXGMML(dll, xgmml); | ||
statsAggregator.loadExistingAggregates(*wu); | ||
return eclGraph.getClear(); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As it stands, sourceStats must be subgraph stats - anything else would be invalid given the explicitly passed in { wfid, graphName, sgId }.
I think you can pass in just sourceStats and extract the scope from it instead (sourceStats->getFullStats).
Then use that to ensureScope, instead of 'IStatisticCollection::setStatistic', which is what currently does that via StatsCollectionAggregatesLoader::noteStatistic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I don't think 'recordStats' needs to be, or should be, part of CStatisticCollection.
I think the only methods that are needed in CStatisticCollection so that it can be used by the aggregate class are
then this method can be something like: