Skip to content

Commit

Permalink
HPCC-29657 Create global stats in AgentContext so it's shared with al…
Browse files Browse the repository at this point in the history
…l graphs and other minor changes

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Oct 31, 2023
1 parent e468996 commit 1925dc9
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 60 deletions.
80 changes: 41 additions & 39 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2680,47 +2680,50 @@ void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * grap
}

loadGlobalAggregates(workunit);
Owned<IPropertyTree> root = getWUGraphProgress(wuid, true);
if (root)
if (isEmptyString(graphName))
{
Owned<IPropertyTree> graphPT = root->getPropTree(graphName);
if (!graphPT)
return;

StatsScopeId wfScopeId(SSTworkflow, graphPT->getPropInt("@wfid", 0));
StatsScopeId graphScopeId(SSTgraph, graphName);

Owned<IPropertyTreeIterator> iter = graphPT->getElements("*");
ForEach(*iter)
Owned<IPropertyTree> root = getWUGraphProgress(wuid, true);
if (root)
{
StatsScopeId sgScopeId;
IPropertyTree * sgPT = & iter->query();
const char * sgName = sgPT->queryName();
if (strcmp(sgName, "node")==0)
continue;
verifyex(sgScopeId.setScopeText(sgName));
MemoryBuffer compressed;
sgPT->getPropBin("Stats", compressed);
if (!compressed.length())
break;
MemoryBuffer serialized;
decompressToBuffer(serialized, compressed);
Owned<IPropertyTree> graphPT = root->getPropTree(graphName);
if (!graphPT)
return;

unsigned version;
serialized.read(version);
byte kind;
serialized.read(kind);
StatsScopeId wfScopeId(SSTworkflow, graphPT->getPropInt("@wfid", 0));
StatsScopeId graphScopeId(SSTgraph, graphName);

StatsScopeId childId;
childId.deserialize(serialized, version);
int statsMinDepth = 0, statsMaxDepth = INT_MAX;
if (aggregatesOnly)
Owned<IPropertyTreeIterator> iter = graphPT->getElements("*");
ForEach(*iter)
{
// Only store stats for subgraph level
statsMinDepth = 3; // this is subgraph level
statsMaxDepth = 3;
StatsScopeId sgScopeId;
IPropertyTree * sgPT = & iter->query();
const char * sgName = sgPT->queryName();
if (strcmp(sgName, "node")==0)
continue;
verifyex(sgScopeId.setScopeText(sgName));
MemoryBuffer compressed;
sgPT->getPropBin("Stats", compressed);
if (!compressed.length())
break;
MemoryBuffer serialized;
decompressToBuffer(serialized, compressed);

unsigned version;
serialized.read(version);
byte kind;
serialized.read(kind);

StatsScopeId childId;
childId.deserialize(serialized, version);
int statsMinDepth = 0, statsMaxDepth = INT_MAX;
if (aggregatesOnly)
{
// Only store stats for subgraph level
statsMinDepth = 3; // this is subgraph level
statsMaxDepth = 3;
}
statsCollection->deserializeChild(childId, serialized, version, statsMinDepth, statsMaxDepth);
}
statsCollection->deserializeChild(childId, serialized, version, statsMinDepth, statsMaxDepth);
}
}
}
Expand Down Expand Up @@ -2763,13 +2766,12 @@ void GlobalStatisticCollection::loadGlobalAggregates(IConstWorkUnit &workunit)
// if clearStats==true then the existing stats are cleared for the given scope and descendants
IStatisticCollection * GlobalStatisticCollection::getCollectionForUpdate(StatisticCreatorType creatorType, const char * creator, unsigned wfid, const char *graphName, unsigned sgId, bool clearStats)
{
bool wasCreated;

StatsScopeId graphScopeId;
verifyex(graphScopeId.setScopeText(graphName));
StatsScopeId wfScopeId(SSTworkflow, wfid);
StatsScopeId sgScopeId(SSTsubgraph, sgId);

bool wasCreated;
IStatisticCollection * sgScopeCollection = statsCollection->ensureSubScopePath({wfScopeId,graphScopeId, sgScopeId}, wasCreated);
if (clearStats && !wasCreated)
sgScopeCollection->clearStats();
Expand Down Expand Up @@ -2807,7 +2809,7 @@ void GlobalStatisticCollection::updateAggregates(IWorkUnit *wu)
{
StatisticKind kind = aggregateKindsMapping.getKind(i);
stat_type value;
if (cur.getStatistic(kind, value) && value)
if (cur.getStatistic(kind, value))
{
StringBuffer s;
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), cur.queryScopeType(), cur.getFullScope(s).str(), kind, nullptr, value, 1, 0, StatsMergeReplace);
Expand All @@ -2822,7 +2824,7 @@ void GlobalStatisticCollection::updateAggregates(IWorkUnit *wu)
}
}
};
if (refreshAggregates()) // Only serialize if the aggregates has changed
if (refreshAggregates()) // Only serialize if the aggregates have changed
{
StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKindsMapping);
statsCollection->visit(statsAggregatorWriter);
Expand Down
2 changes: 2 additions & 0 deletions ecl/eclagent/agentctx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ struct IAgentContext : extends IGlobalCodeContext
virtual bool forceNewDiskReadActivity() const = 0;
virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, char const * source) = 0;
virtual double queryAgentMachineCost() const = 0;
virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) = 0;
virtual void updateAggregates(IWorkUnit* lockedwu) = 0;
};

#endif // AGENTCTX_HPP_INCL
20 changes: 18 additions & 2 deletions ecl/eclagent/eclagent.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,14 @@ public:
{
return ctx->queryAgentMachineCost();
};
virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) override
{
return ctx->updateStats(creatorType, creator, activeWfid, graphName, subgraph);
};
virtual void updateAggregates(IWorkUnit* lockedwu) override
{
ctx->updateAggregates(lockedwu);
}

protected:
IAgentContext * ctx;
Expand Down Expand Up @@ -392,6 +400,7 @@ private:
Owned<IOrderedOutputSerializer> outputSerializer;
int retcode;
double agentMachineCost = 0;
GlobalStatisticCollection globalStats;

private:
void doSetResultString(type_t type, const char * stepname, unsigned sequence, int len, const char *val);
Expand Down Expand Up @@ -705,6 +714,15 @@ public:
{
return agentMachineCost;
}
virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) override
{
Owned<IStatisticCollection> sgCollection = globalStats.getCollectionForUpdate(creatorType, creator, activeWfid, graphName, subgraph, true); // true=>clear existing stats
return wuRead->updateStats (graphName, creatorType, creator, activeWfid, subgraph, false, sgCollection);
}
virtual void updateAggregates(IWorkUnit* lockedwu) override
{
globalStats.updateAggregates(lockedwu);
}
};

//---------------------------------------------------------------------------
Expand Down Expand Up @@ -1048,7 +1066,6 @@ public:
graphAgentContext.set(&_agent);
agent = &graphAgentContext;
aborted = false;
globalStats.load(*wu, graphName, true);
}

void createFromXGMML(ILoadedDllEntry * dll, IPropertyTree * xgmml);
Expand Down Expand Up @@ -1084,7 +1101,6 @@ protected:
IProbeManager * probeManager;
unsigned wfid;
bool aborted;
GlobalStatisticCollection globalStats;
};


Expand Down
9 changes: 4 additions & 5 deletions ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1347,8 +1347,7 @@ void EclGraph::updateLibraryProgress()
EclSubGraph & cur = graphs.item(idx);
unsigned wfid = cur.parent.queryWfid();

Owned<IStatisticCollection> sgCollection = globalStats.getCollectionForUpdate(queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, queryGraphName(), cur.id, true); // true=>clear existing stats
Owned<IWUGraphStats> progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false, sgCollection.getClear());
Owned<IWUGraphStats> progress = agent->updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, queryGraphName(), cur.id);
cur.updateProgress(progress->queryStatsBuilder());
}
}
Expand Down Expand Up @@ -1491,13 +1490,12 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result)

IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph)
{
Owned<IStatisticCollection> sgCollection = globalStats.getCollectionForUpdate(creatorType, creator, activeWfid, queryGraphName(), subgraph, true); // true=>clear existing stats
return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false, sgCollection.getClear());
return agent->updateStats(creatorType, creator, activeWfid, queryGraphName(), subgraph);
}

void EclGraph::updateAggregates(IWorkUnit* lockedwu)
{
globalStats.updateAggregates(lockedwu);
agent->updateAggregates(lockedwu);
}

void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value)
Expand Down Expand Up @@ -1549,6 +1547,7 @@ EclGraph * EclAgent::loadGraph(const char * graphName, IConstWorkUnit * wu, ILoa

Owned<EclGraph> eclGraph = new EclGraph(*this, graphName, wu, isLibrary, debugContext, probeManager, wuGraph->getWfid());
eclGraph->createFromXGMML(dll, xgmml);
globalStats.load(*wu, graphName, true);
return eclGraph.getClear();
}

Expand Down
12 changes: 6 additions & 6 deletions system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1330,7 +1330,7 @@ const StatisticsMapping diskLocalStatistics({StCycleDiskReadIOCycles, StSizeDisk
const StatisticsMapping diskRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries});
const StatisticsMapping diskReadRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StNumDiskRetries, StCycleDiskReadIOCycles});
const StatisticsMapping diskWriteRemoteStatistics({StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries, StCycleDiskWriteIOCycles});
const StatisticsMapping aggregateKindStatistics({StCostFileAccess, StSizeGraphSpill, StSizeSpillFile});
const StatisticsMapping aggregateKindStatistics({StCostExecute, StCostFileAccess, StSizeGraphSpill, StSizeSpillFile});

const StatisticsMapping * queryStatsMapping(const StatsScopeId & scope, unsigned hashcode)
{
Expand Down Expand Up @@ -1872,8 +1872,7 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
{
if (*scope=='\0')
{
updateStatistic(kind, value, StatsMergeReplace);
return true;
return updateStatistic(kind, value, StatsMergeReplace);
}
else
{
Expand Down Expand Up @@ -1970,7 +1969,7 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
stats.append(s);
return true;
}
virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren)
virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) override
{
bool wasCreated;
return ensureSubScope(search, hasChildren, wasCreated);
Expand All @@ -1988,7 +1987,8 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
wasCreated = true;
return ret;
}
virtual IStatisticCollection * querySubScope(const StatsScopeId & search) override

virtual IStatisticCollection * querySubScope(const StatsScopeId & search) const override
{
return children.find(&search);
}
Expand All @@ -2008,7 +2008,7 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
{
curScope = curScope->querySubScope(scopeItem);
if (!curScope)
break;
return nullptr;
}
return curScope;
}
Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jstats.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ interface IStatisticCollection : public IInterface
virtual void visitChildren(IStatisticVisitor & target) const = 0;
virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) = 0;
virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren, bool & wasCreated) = 0;
virtual IStatisticCollection * querySubScope(const StatsScopeId & search) = 0;
virtual IStatisticCollection * ensureSubScopePath(std::initializer_list<const StatsScopeId> path, bool & wasCreated) = 0;
virtual IStatisticCollection * querySubScope(const StatsScopeId & search) const = 0;
virtual IStatisticCollection * querySubScopePath(std::initializer_list<const StatsScopeId> path) = 0;
virtual void pruneChildStats() = 0;
virtual void addStatistic(StatisticKind kind, unsigned __int64 value) = 0;
Expand Down
24 changes: 18 additions & 6 deletions thorlcr/master/thdemonserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer
{
CGraphBase &graph = activeGraphs.item(g);
Owned<IStatisticCollection> sgCollection = globalStatsCollection.getCollectionForUpdate(SCTthor, queryStatisticsComponentName(), wfid, graphName, graph.queryGraphId(), true); // true=>clear existing stats
Owned<IWUGraphStats> stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false, sgCollection.getClear());
Owned<IWUGraphStats> stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false, sgCollection);
reportGraph(stats->queryStatsBuilder(), &graph);
}
Owned<IWorkUnit> wu = &currentWU.lock();
Expand All @@ -153,8 +153,6 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer
CGraphBase &graph = activeGraphs.item(g2);
unsigned startTime = graphStarts.item(g2);
reportStatus(wu, graph, startTime, finished, success);
// Prune subgraph descendant stats as they have been serialized and no longer needed.
globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph.queryGraphId());
}
updateAggregates(wu);
queryServerStatus().commitProperties();
Expand All @@ -176,11 +174,9 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer
unsigned wfid = graph->queryJob().getWfid();
{
Owned<IStatisticCollection> sgCollection = globalStatsCollection.getCollectionForUpdate(SCTthor, queryStatisticsComponentName(), wfid, graphName, graph->queryGraphId(), true); // true=>clear existing stats
Owned<IWUGraphStats> stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false, sgCollection.getClear());
Owned<IWUGraphStats> stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false, sgCollection);
reportGraph(stats->queryStatsBuilder(), graph);
}
// Prune subgraph descendant stats as they have been serialized and no longer needed.
globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph->queryGraphId());
Owned<IWorkUnit> wu = &currentWU.lock();
if (startTimeStamp)
{
Expand Down Expand Up @@ -293,6 +289,10 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer
{
unsigned startTime = graphStarts.item(g);
reportGraph(graph, true, success, startTime, 0);
// Prune subgraph descendant stats as they have been serialized and no longer needed.
const char *graphName = ((CJobMaster &)graph->queryJob()).queryGraphName();
unsigned wfid = graph->queryJob().getWfid();
globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph->queryGraphId());
activeGraphs.remove(g);
graphStarts.remove(g);
}
Expand All @@ -301,6 +301,18 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer
{
synchronized block(mutex);
reportActiveGraphs(true, false);
if (activeGraphs.ordinality())
{

CJobBase & activeJob = activeGraphs.item(0).queryJob();
const char *graphName = ((CJobMaster &)activeJob).queryGraphName();
unsigned wfid = activeJob.getWfid();
ForEachItemIn (g, activeGraphs)
{
CGraphBase &graph = activeGraphs.item(g);
globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph.queryGraphId());
}
}
activeGraphs.kill();
}
virtual void updateAggregates(IWorkUnit * lockedWu) override
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/master/thgraphmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1118,7 +1118,7 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
podInfo.report(wu);
}
if (globals->getPropBool("@watchdogProgressEnabled"))
queryDeMonServer()->loadStats(workunit, graphName, true);
queryDeMonServer()->loadStats(workunit, nullptr, true); //graphName==nullptr so that sg stats are not loaded(at present, partial graph resumption not possible)

setWuid(workunit.queryWuid(), workunit.queryClusterName());

Expand Down

0 comments on commit 1925dc9

Please sign in to comment.