Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-29657 Produce aggregate stats (e.g. spill, cost) whilst a job is running #17786

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 8 additions & 96 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2669,7 +2669,7 @@ GlobalStatisticCollection::GlobalStatisticCollection() : aggregateKindsMapping(a
statsCollection.setown(createStatisticCollection(nullptr, globalScopeId));
}

void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly)
void GlobalStatisticCollection::loadExistingAggregates(IConstWorkUnit &workunit)
{
const char * _wuid = workunit.queryWuid();
jakesmith marked this conversation as resolved.
Show resolved Hide resolved
if (!streq(_wuid, wuid.str())) // New statsCollection if collection for different workunit
Expand All @@ -2679,57 +2679,6 @@ void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * grap
wuid.set(_wuid);
}

loadGlobalAggregates(workunit);
if (isEmptyString(graphName))
{
Owned<IPropertyTree> root = getWUGraphProgress(wuid, true);
if (root)
{
Owned<IPropertyTree> graphPT = root->getPropTree(graphName);
if (!graphPT)
return;

StatsScopeId wfScopeId(SSTworkflow, graphPT->getPropInt("@wfid", 0));
StatsScopeId graphScopeId(SSTgraph, graphName);

Owned<IPropertyTreeIterator> iter = graphPT->getElements("*");
ForEach(*iter)
{
StatsScopeId sgScopeId;
IPropertyTree * sgPT = & iter->query();
const char * sgName = sgPT->queryName();
if (strcmp(sgName, "node")==0)
continue;
verifyex(sgScopeId.setScopeText(sgName));
MemoryBuffer compressed;
sgPT->getPropBin("Stats", compressed);
if (!compressed.length())
break;
MemoryBuffer serialized;
decompressToBuffer(serialized, compressed);

unsigned version;
serialized.read(version);
byte kind;
serialized.read(kind);

StatsScopeId childId;
childId.deserialize(serialized, version);
int statsMinDepth = 0, statsMaxDepth = INT_MAX;
if (aggregatesOnly)
{
// Only store stats for subgraph level
statsMinDepth = 3; // this is subgraph level
statsMaxDepth = 3;
}
statsCollection->deserializeChild(childId, serialized, version, statsMinDepth, statsMaxDepth);
}
}
}
}

void GlobalStatisticCollection::loadGlobalAggregates(IConstWorkUnit &workunit)
{
class StatsCollectionAggregatesLoader : public IWuScopeVisitor
{
public:
Expand Down Expand Up @@ -2781,57 +2730,20 @@ IStatisticCollection * GlobalStatisticCollection::getCollectionForUpdate(Statist
return createRootStatisticCollection(creatorType, creator, wfScopeId, graphScopeId, sgScopeCollection);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is confusing. It is adding a "Root" scope with an id of wfid underneath a scope of workflow/graph/subgraph. So effectively in the collection it is global->wfid->graph->subgraph->wfid->.
It make me more convinced it is a mistake to try and merge these two concepts.
It also means roxie works rather differently from eclagent/thor.

}

// Recalculate aggregates for global, workflow and graph scopes
bool GlobalStatisticCollection::refreshAggregates()
{
return statsCollection->refreshAggregates(aggregateKindsMapping);
}

// Recalculate aggregates and then write the aggregates to global stats (dali)
void GlobalStatisticCollection::updateAggregates(IWorkUnit *wu)
{
class StatisticsAggregatesWriter : implements IStatisticVisitor
struct AggregateUpdatedCallBackFunc : implements IWhenAggregateUpdatedCallBack
{
const StatisticsMapping & aggregateKindsMapping;
const unsigned numStats;
Linked<IWorkUnit> wu;
public:
StatisticsAggregatesWriter(IWorkUnit * _wu, const StatisticsMapping & _aggregateKindsMapping): wu(_wu), aggregateKindsMapping(_aggregateKindsMapping), numStats(aggregateKindsMapping.numStatistics()) {}

virtual bool visitScope(const IStatisticCollection & cur)
AggregateUpdatedCallBackFunc(IWorkUnit *_wu) : wu(_wu) {}
void operator () (const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)
{
switch (cur.queryScopeType())
{
case SSTglobal:
case SSTworkflow:
case SSTgraph:
for (unsigned i=0; i<numStats; ++i)
{
StatisticKind kind = aggregateKindsMapping.getKind(i);
stat_type value;
if (cur.getStatistic(kind, value))
{
if (value || includeStatisticIfZero(kind))
{
StringBuffer s;
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), cur.queryScopeType(), cur.getFullScope(s).str(), kind, nullptr, value, 1, 0, StatsMergeReplace);
}
}
}
if (cur.queryScopeType()==SSTgraph)
return false;
else
return true;
default:
return false;
}
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), sst, scope, kind, nullptr, value, 1, 0, StatsMergeReplace);
}
};
if (refreshAggregates()) // Only serialize if the aggregates have changed
{
StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKindsMapping);
statsCollection->visit(statsAggregatorWriter);
}
} aggregateUpdatedCallBackFunc(wu);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be simpler to use a std::function, e.g.:

    AggregateUpdatedCallBackFunc f = [&](const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)
    {
        wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), sst, scope, kind, nullptr, value, 1, 0, StatsMergeReplace);
    };
    statsCollection->refreshAggregates(aggregateKindsMapping, f);


statsCollection->refreshAggregates(aggregateKindsMapping, aggregateUpdatedCallBackFunc);
}

// Prune all subgraph descendent stats (leaving subgraph stats for future aggregation)
Expand Down
4 changes: 1 addition & 3 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1788,10 +1788,8 @@ class WORKUNIT_API GlobalStatisticCollection : public CInterface
public:
GlobalStatisticCollection();

void load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly);
void loadGlobalAggregates(IConstWorkUnit &workunit);
void loadExistingAggregates(IConstWorkUnit &workunit);
IStatisticCollection * getCollectionForUpdate(StatisticCreatorType creatorType, const char * creator, unsigned wfid, const char *graphName, unsigned sgId, bool clearStats);
bool refreshAggregates();
IStatisticCollection * queryCollection() { return statsCollection; }
void updateAggregates(IWorkUnit *wu);
void pruneSubGraphDescendants(unsigned wfid, const char *graphName, unsigned sgId);
Expand Down
2 changes: 1 addition & 1 deletion ecl/eclagent/agentctx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ struct IAgentContext : extends IGlobalCodeContext
virtual bool forceNewDiskReadActivity() const = 0;
virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, char const * source) = 0;
virtual double queryAgentMachineCost() const = 0;
virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) = 0;
virtual IWUGraphStats *updateStats(unsigned activeWfid, const char *graphName, unsigned subgraph) = 0;
virtual void updateAggregates(IWorkUnit* lockedwu) = 0;
};

Expand Down
10 changes: 5 additions & 5 deletions ecl/eclagent/eclagent.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,9 @@ public:
{
return ctx->queryAgentMachineCost();
};
virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) override
virtual IWUGraphStats *updateStats(unsigned activeWfid, const char *graphName, unsigned subgraph) override
{
return ctx->updateStats(creatorType, creator, activeWfid, graphName, subgraph);
return ctx->updateStats(activeWfid, graphName, subgraph);
};
virtual void updateAggregates(IWorkUnit* lockedwu) override
{
Expand Down Expand Up @@ -714,10 +714,10 @@ public:
{
return agentMachineCost;
}
virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) override
virtual IWUGraphStats *updateStats(unsigned activeWfid, const char *graphName, unsigned subgraph) override
{
Owned<IStatisticCollection> sgCollection = globalStats.getCollectionForUpdate(creatorType, creator, activeWfid, graphName, subgraph, true); // true=>clear existing stats
return wuRead->updateStats(graphName, creatorType, creator, activeWfid, subgraph, false, sgCollection);
Owned<IStatisticCollection> sgCollection = globalStats.getCollectionForUpdate(queryStatisticsComponentType(), queryStatisticsComponentName(), activeWfid, graphName, subgraph, true); // true=>clear existing stats
return wuRead->updateStats(graphName, queryStatisticsComponentType(), queryStatisticsComponentName(), activeWfid, subgraph, false, sgCollection);
}
virtual void updateAggregates(IWorkUnit* lockedwu) override
{
Expand Down
8 changes: 4 additions & 4 deletions ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,7 @@ void EclSubGraph::updateProgress(IStatisticGatherer &progress)
subgraphs.item(i2).updateProgress(progress);

Owned<IStatisticCollection> statsCollection = progress.getResult();
const cost_type costDiskAccess = aggregateStatistic(StCostFileAccess, statsCollection);
const cost_type costDiskAccess = statsCollection->aggregateStatistic(StCostFileAccess);
if (costDiskAccess)
progress.addStatistic(StCostFileAccess, costDiskAccess);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why this is here. I assume it is setting the total file access cost from all the activities. My suspicion is this code isn't quite in the right place. Something like
statscollection->aggregateChildren(StCostFileAccess);
instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is existing relocated from EclSubGraph::updateProgress() line 900. I hadn't considered changing how it was working.

I'll modify to add a member function to IStatisticCollection and use that.

}
Expand Down Expand Up @@ -1347,7 +1347,7 @@ void EclGraph::updateLibraryProgress()
EclSubGraph & cur = graphs.item(idx);
unsigned wfid = cur.parent.queryWfid();

Owned<IWUGraphStats> progress = agent->updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, queryGraphName(), cur.id);
Owned<IWUGraphStats> progress = agent->updateStats(wfid, queryGraphName(), cur.id);
cur.updateProgress(progress->queryStatsBuilder());
}
}
Expand Down Expand Up @@ -1490,7 +1490,7 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result)

IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph)
{
return agent->updateStats(creatorType, creator, activeWfid, queryGraphName(), subgraph);
return agent->updateStats(activeWfid, queryGraphName(), subgraph);
}

void EclGraph::updateAggregates(IWorkUnit* lockedwu)
Expand Down Expand Up @@ -1547,7 +1547,7 @@ EclGraph * EclAgent::loadGraph(const char * graphName, IConstWorkUnit * wu, ILoa

Owned<EclGraph> eclGraph = new EclGraph(*this, graphName, wu, isLibrary, debugContext, probeManager, wuGraph->getWfid());
eclGraph->createFromXGMML(dll, xgmml);
globalStats.load(*wu, nullptr, true);
globalStats.loadExistingAggregates(*wu);
return eclGraph.getClear();
}

Expand Down
Loading
Loading