Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-29657 Produce aggregate stats (e.g. spill, cost) whilst a job is running #18048

Merged
merged 1 commit into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 52 additions & 68 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _cr
{
StatsScopeId graphScopeId;
verifyex(graphScopeId.setScopeText(_rootScope));
StatsScopeId wfScopeId(SSTworkflow,wfid);

StatsScopeId rootScopeId(SSTworkflow,wfid);
collector.setown(createStatisticsGatherer(_creatorType, _creator, rootScopeId));
collector.setown(createStatisticsGatherer(_creatorType, _creator, wfScopeId));
collector->beginScope(graphScopeId);
}

Expand Down Expand Up @@ -2659,83 +2659,67 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu
}
}

//aggregate disk costs from top-level subgraphs (when scope specified) or workflows (scope not specified)
cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope)
void StatisticsAggregator::loadExistingAggregates(const IConstWorkUnit &workunit)
{
WuScopeFilter filter;
if (!isEmptyString(scope))
filter.addScope(scope);
else
filter.addScope(""); // Needed to match scope
// when scope is a workflow, sum graph costs (or subgraph cost when no graph cost) to get workflow cost
// (Costs from child graphs and activities should have been summed up to graph/subgraph level already)
// when isEmptyString(scope), sum workflow costs (or graph cost when no workflow cost) to get global cost
// (Costs from all levels below graph should be summed upto at least graph level already)
// i.e. need 2 levels of nesting
filter.setIncludeNesting(2);
// includeNesting(2) needs just source "global". However, WuScopeFilter is incorrectly inferring the source as "global,stats",
// causing too many of the stats to be pulled in and inefficiency. Here, explicitly set source to "global"
filter.addSource("global");
filter.addOutputStatistic(StCostFileAccess);
filter.addRequiredStat(StCostFileAccess);
filter.finishedFilter();
Owned<IConstWUScopeIterator> it = &wu->getScopeIterator(filter);
cost_type totalCost = 0;
for (it->first(); it->isValid(); )
StatsScopeId globalScopeId(SSTglobal, (unsigned)0);
statsCollection.setown(createStatisticCollection(globalScopeId));

class StatsCollectionAggregatesLoader : public IWuScopeVisitor
{
cost_type value = 0;
if (it->getStat(StCostFileAccess, value))
{
totalCost += value;
it->nextSibling();
}
else
public:
StatsCollectionAggregatesLoader(IStatisticCollection * _statsCollection) : statsCollection(_statsCollection) {}

virtual void noteStatistic(StatisticKind kind, unsigned __int64 value, IConstWUStatistic & extra) override
{
it->next();
statsCollection->setStatistic(extra.queryScope(), kind, value);
}
}
return totalCost;
}
virtual void noteAttribute(WuAttr attr, const char * value) override { throwUnexpected(); }
virtual void noteHint(const char * kind, const char * value) override { throwUnexpected(); }
virtual void noteException(IConstWUException & exception) override { throwUnexpected(); }
private:
Linked<IStatisticCollection> statsCollection;
};

void gatherSpillSize(const IConstWorkUnit * wu, const char *scope, stat_type & peakSizeSpill)
{
WuScopeFilter filter;
if (!isEmptyString(scope))
filter.addScope(scope);
else
{
filter.addScope("");
filter.addSource("global");
}
filter.setIncludeNesting(1);
filter.addOutputStatistic(StSizeGraphSpill);
filter.addRequiredStat(StSizeGraphSpill);
filter.addScopeType(SSTglobal).addScopeType(SSTworkflow).addScopeType(SSTgraph);
const unsigned numStats = mapping.numStatistics();
for (unsigned i=0; i<numStats; ++i)
filter.addOutputStatistic(mapping.getKind(i));
filter.setDepth(1,3); // 1=global, 2=workflow, 3=graph
filter.setSources(SSFsearchGlobalStats);
filter.setIncludeNesting(0);
filter.finishedFilter();
Owned<IConstWUScopeIterator> it = &wu->getScopeIterator(filter);
peakSizeSpill = 0;
for (it->first(); it->isValid(); )
{
stat_type value = 0;
if (it->getStat(StSizeGraphSpill, value))
{
if (value>peakSizeSpill)
peakSizeSpill = value;
it->nextSibling();
}
else
{
it->next();
}
}

StatsCollectionAggregatesLoader aggregatesLoader(statsCollection);
Owned<IConstWUScopeIterator> iter = &workunit.getScopeIterator(filter);
ForEach(*iter)
iter->playProperties(aggregatesLoader);
}

void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType)
// Replace the stats at the specified scope level
void StatisticsAggregator::recordStats(IStatisticCollection * sourceStats, unsigned wfid, const char * graphName, unsigned sgId)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it stands, sourceStats must be subgraph stats - anything else would be invalid given the explicitly passed-in { wfid, graphName, sgId }.

I think you can pass in just sourceStats and extract the scope from it instead (sourceStats->getFullScope).
Then use that scope with ensureScope, instead of relying on 'IStatisticCollection::setStatistic', which is what currently does this via StatsCollectionAggregatesLoader::noteStatistic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I don't think 'recordStats' needs to be, or should be, part of CStatisticCollection.
I think the only methods needed in CStatisticCollection so that it can be used by the aggregate class are

  1. a method that ensures the scope exists.
  2. a way to set/check dirty,
  3. and a way to update an [aggregate] stat.

then this method can be something like:

void StatisticsAggregator::copyAggregates(IStatisticCollection * sourceStats) 
{
    StringBuffer scope;
    sourceStats->getFullScope(scope);

    IStatisticCollection * targetStats = statsCollection->ensureScope(scope); // create intermediate scopes and returns tail
    bool wasUpdated = false;
    
    // More efficient to iterate over stats rather than mapping...
    unsigned numStats = sourceStats->getNumStatistics();
    for (unsigned s=0; s<numStats; s++)
    {
        StatisticKind kind;
        unsigned __int64 value;
        sourceStats->getStatistic(kind, value, s);
        if (kind != (StatisticKind)(kind & StKindMask))
            continue; // ignore variants
        if (mapping.hasKind(kind))
        {
            if (targetStats->updateStatistic(kind, value, StatsMergeReplace))
                wasUpdated=true;
        }
    }
    if (wasUpdated)
        targetStats->markDirty();

{
stat_type peakSizeSpill = 0;
gatherSpillSize(wu, scope, peakSizeSpill);
if (peakSizeSpill)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizeGraphSpill, nullptr, peakSizeSpill, 1, 0, StatsMergeMax);
StatsScopeId graphScopeId;
verifyex(graphScopeId.setScopeText(graphName));
StatsScopeId wfScopeId(SSTworkflow, wfid);
StatsScopeId sgScopeId(SSTsubgraph, sgId);
statsCollection->recordStats(mapping, sourceStats, {wfScopeId, graphScopeId, sgScopeId});
}

// Recalculate aggregates and then write the aggregates to global stats (dali)
void StatisticsAggregator::updateAggregates(IWorkUnit *wu)
{
if (!statsCollection)
return;

AggregateUpdatedCallBackFunc f = [&](const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)
{
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), sst, scope, kind, nullptr, value, 1, 0, StatsMergeReplace);
};

statsCollection->refreshAggregates(mapping, f);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see: https://github.com/hpcc-systems/HPCC-Platform/actions/runs/6968880782/job/18963669852
Failing to compile in Windows.

I think it is the bare std::function vs 'std::function<void(const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)>' mismatch.

It would be better if the callback function type was a typedef defined in jstats.h, and then used here.

}

//---------------------------------------------------------------------------------------------------------------------


Expand Down
16 changes: 13 additions & 3 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,6 @@ interface IConstWUScopeIterator : extends IScmIterator
};

//---------------------------------------------------------------------------------------------------------------------

//! IWorkUnit
//! Provides high level access to WorkUnit "header" data.
interface IWorkUnit;
Expand Down Expand Up @@ -1725,8 +1724,6 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *ti
extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeType scopeType, StatisticKind kind, ITimeReporter *timer);
extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search);
extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false);
extern WORKUNIT_API cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope);
extern WORKUNIT_API void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType);
extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name);
extern WORKUNIT_API void descheduleWorkunit(char const * wuid);
#if 0
Expand Down Expand Up @@ -1785,4 +1782,17 @@ extern WORKUNIT_API TraceFlags loadTraceFlags(IConstWorkUnit * wu, const std::in

extern WORKUNIT_API bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, unsigned wfid, const char *graphName);


class WORKUNIT_API StatisticsAggregator : public CInterface
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see the comment in cpp, re. moving StatisticsAggregator into jstats (and extracting helper functions that need wu)

{
public:
StatisticsAggregator(const StatisticsMapping & _mapping) : mapping(_mapping) {}
void loadExistingAggregates(const IConstWorkUnit &workunit);
void recordStats(IStatisticCollection * sourceStats, unsigned wfid, const char * graphName, unsigned graphId);
void updateAggregates(IWorkUnit *wu);
private:
Owned<IStatisticCollection> statsCollection;
const StatisticsMapping & mapping;
};

#endif
2 changes: 2 additions & 0 deletions ecl/eclagent/agentctx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ struct IAgentContext : extends IGlobalCodeContext
virtual bool forceNewDiskReadActivity() const = 0;
virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, char const * source) = 0;
virtual double queryAgentMachineCost() const = 0;
virtual void updateAggregates(IWorkUnit* lockedwu) = 0;
virtual void mergeAggregatorStats(IStatisticCollection & stats, unsigned wfid, const char *graphname, unsigned sgId) = 0;
};

#endif // AGENTCTX_HPP_INCL
21 changes: 8 additions & 13 deletions ecl/eclagent/eclagent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ class EclAgentPluginCtx : public SimplePluginCtx
//=======================================================================================

EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bool _resetWorkflow, bool _noRetry, char const * _logname, const char *_allowedPipeProgs, IPropertyTree *_queryXML, ILogMsgHandler * _logMsgHandler)
: wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), logMsgHandler(_logMsgHandler)
: wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), logMsgHandler(_logMsgHandler), statsAggregator(stdAggregateKindStatistics)
{
isAborting = false;
isStandAloneExe = false;
Expand Down Expand Up @@ -1988,10 +1988,6 @@ void EclAgent::doProcess()
const cost_type cost = aggregateCost(w, nullptr, false);
if (cost)
w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
const cost_type diskAccessCost = aggregateDiskAccessCost(w, nullptr);
if (diskAccessCost)
w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace);
updateSpillSize(w, nullptr, SSTglobal);
addTimings(w);

switch (w->getState())
Expand Down Expand Up @@ -2245,12 +2241,15 @@ void EclAgent::runProcess(IEclProcess *process)
ForEachItemIn(i2, queryLibraries)
queryLibraries.item(i2).destroyGraph();

if (rowManager)
{
WorkunitUpdate wu = updateWorkUnit();
WuStatisticTarget statsTarget(wu, "eclagent");
rowManager->reportPeakStatistics(statsTarget, 0);
rowManager->getMemoryUsage();//Causes statistics to be written to logfile
updateAggregates(wu);
if (rowManager)
{
WuStatisticTarget statsTarget(wu, "eclagent");
rowManager->reportPeakStatistics(statsTarget, 0);
rowManager->getMemoryUsage();//Causes statistics to be written to logfile
}
}

rowManager.clear(); // Must go before the allocatorCache
Expand Down Expand Up @@ -2513,10 +2512,6 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime
const cost_type cost = money2cost_type(calcCost(agent.queryAgentMachineCost(), nanoToMilli(elapsedNs))) + aggregateCost(wu, scope, true);
if (cost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
const cost_type diskAccessCost = aggregateDiskAccessCost(wu, scope);
if (diskAccessCost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace);
updateSpillSize(wu, scope, SSTworkflow);
}

void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
Expand Down
21 changes: 19 additions & 2 deletions ecl/eclagent/eclagent.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,15 @@ public:
virtual double queryAgentMachineCost() const override
{
return ctx->queryAgentMachineCost();
};
}
// Forwards the request unchanged to the underlying agent context (ctx).
// NOTE(review): the parameter name suggests the caller already holds the
// workunit lock - confirm against callers.
virtual void updateAggregates(IWorkUnit* lockedwu) override
{
ctx->updateAggregates(lockedwu);
}
// Forwards the statistics collection, together with the workflow id, graph
// name and subgraph id it was gathered for, unchanged to the underlying
// agent context (ctx).
virtual void mergeAggregatorStats(IStatisticCollection & stats, unsigned wfid, const char *graphname, unsigned sgId) override
{
ctx->mergeAggregatorStats(stats, wfid, graphname, sgId);
}

protected:
IAgentContext * ctx;
Expand Down Expand Up @@ -392,6 +400,7 @@ private:
Owned<IOrderedOutputSerializer> outputSerializer;
int retcode;
double agentMachineCost = 0;
StatisticsAggregator statsAggregator;

private:
void doSetResultString(type_t type, const char * stepname, unsigned sequence, int len, const char *val);
Expand Down Expand Up @@ -705,6 +714,14 @@ public:
{
return agentMachineCost;
}
// Delegates to this agent's statsAggregator, passing the supplied workunit
// through to StatisticsAggregator::updateAggregates.
// NOTE(review): the parameter name suggests the caller already holds the
// workunit lock - confirm against callers.
virtual void updateAggregates(IWorkUnit* lockedwu) override
{
statsAggregator.updateAggregates(lockedwu);
}
// Hands the supplied statistics collection to this agent's statsAggregator
// via StatisticsAggregator::recordStats, identified by workflow id, graph
// name and subgraph id. The collection is passed by address; nothing here
// takes an extra reference to it.
virtual void mergeAggregatorStats(IStatisticCollection & stats, unsigned wfid, const char *graphname, unsigned sgId) override
{
statsAggregator.recordStats(&stats, wfid, graphname, sgId);
}
};

//---------------------------------------------------------------------------
Expand Down Expand Up @@ -1055,7 +1072,7 @@ public:
void executeLibrary(const byte * parentExtract, IHThorGraphResults * results);
IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph);
void updateWUStatistic(IWorkUnit* lockedwu, StatisticScopeType scopeType, const char* scope, StatisticKind kind, const char* descr, long long unsigned int value);

void updateAggregates(IWorkUnit* lockedwu);
EclSubGraph * idToGraph(unsigned id);
EclGraphElement * idToActivity(unsigned id);
const char *queryGraphName() { return graphName; }
Expand Down
28 changes: 16 additions & 12 deletions ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -879,10 +879,12 @@ void EclSubGraph::updateProgress()
Owned<IWUGraphStats> progress = parent.updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), parent.queryWfid(), id);
IStatisticGatherer & stats = progress->queryStatsBuilder();
updateProgress(stats);

Owned<IStatisticCollection> statsCollection = stats.getResult();
agent->mergeAggregatorStats(*statsCollection, parent.queryWfid(), parent.queryGraphName(), id);
if (startGraphTime || elapsedGraphCycles)
{
WorkunitUpdate lockedwu(agent->updateWorkUnit());
agent->updateAggregates(lockedwu);
StringBuffer subgraphid;
subgraphid.append(parent.queryGraphName()).append(":").append(SubGraphScopePrefix).append(id);
if (startGraphTime)
Expand All @@ -897,10 +899,6 @@ void EclSubGraph::updateProgress()
if (cost)
lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
}
Owned<IStatisticCollection> statsCollection = stats.getResult();
const cost_type costDiskAccess = aggregateStatistic(StCostFileAccess, statsCollection) ;
if (costDiskAccess)
lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace);
}
}
}
Expand All @@ -927,6 +925,11 @@ void EclSubGraph::updateProgress(IStatisticGatherer &progress)
}
ForEachItemIn(i2, subgraphs)
subgraphs.item(i2).updateProgress(progress);

Owned<IStatisticCollection> statsCollection = progress.getResult();
const cost_type costDiskAccess = statsCollection->aggregateStatistic(StCostFileAccess);
if (costDiskAccess)
progress.addStatistic(StCostFileAccess, costDiskAccess);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change means that the cost will be recorded in the subgraph's stats rather than the global stats as it was before. Does that mean it will not be picked up by the stats aggregator?
If you cannot restart a graph I don't think it makes a difference, but worth discussing - it may make a difference for eclwatch responsiveness.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the aggregated total should still be published at the global level after this change (unless I'm missing something),
but instead of it being calculated and set directly into global stats, it is calculated and set indirectly via the statsAggregator (inside updateProgress).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, this statistic wasn't picked up by the aggregator. It needs to be in the subgraph scope. However, the aggregator will add the statistic back in as a global statistic.

}

bool EclSubGraph::prepare(const byte * parentExtract, bool checkDependencies)
Expand Down Expand Up @@ -1277,10 +1280,6 @@ void EclGraph::execute(const byte * parentExtract)
const cost_type cost = money2cost_type(calcCost(agent->queryAgentMachineCost(), elapsed));
if (cost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);

const cost_type costDiskAccess = aggregateDiskAccessCost(wu, scope);
if (costDiskAccess)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace);
}

if (agent->queryRemoteWorkunit())
Expand Down Expand Up @@ -1349,8 +1348,12 @@ void EclGraph::updateLibraryProgress()
{
EclSubGraph & cur = graphs.item(idx);
unsigned wfid = cur.parent.queryWfid();
Owned<IWUGraphStats> progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false);
cur.updateProgress(progress->queryStatsBuilder());

Owned<IWUGraphStats> progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false);
IStatisticGatherer & stats = progress->queryStatsBuilder();
cur.updateProgress(stats);
Owned<IStatisticCollection> statsCollection = stats.getResult();
agent->mergeAggregatorStats(*statsCollection, wfid, queryGraphName(), cur.id);
}
}

Expand Down Expand Up @@ -1492,7 +1495,7 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result)

IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph)
{
return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false);
return wu->updateStats(queryGraphName(), creatorType, creator, activeWfid, subgraph, false);
}

void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value)
Expand Down Expand Up @@ -1544,6 +1547,7 @@ EclGraph * EclAgent::loadGraph(const char * graphName, IConstWorkUnit * wu, ILoa

Owned<EclGraph> eclGraph = new EclGraph(*this, graphName, wu, isLibrary, debugContext, probeManager, wuGraph->getWfid());
eclGraph->createFromXGMML(dll, xgmml);
statsAggregator.loadExistingAggregates(*wu);
return eclGraph.getClear();
}

Expand Down
Loading
Loading