From e786af8e4d3d666eb0adcee9904f30ce56a1e8c9 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Mon, 30 Oct 2023 16:38:39 +0000 Subject: [PATCH] HPCC-29657 Create global stats in AgentContext so it's shared with all graphs and other minor changes Signed-off-by: Shamser Ahmed --- common/workunit/workunit.cpp | 80 ++++++++++++++++--------------- ecl/eclagent/agentctx.hpp | 2 + ecl/eclagent/eclagent.ipp | 20 +++++++- ecl/eclagent/eclgraph.cpp | 9 ++-- system/jlib/jstats.cpp | 12 ++--- system/jlib/jstats.h | 2 +- thorlcr/master/thdemonserver.cpp | 24 +++++++--- thorlcr/master/thgraphmanager.cpp | 2 +- 8 files changed, 91 insertions(+), 60 deletions(-) diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index 9b64f4c9f20..aa1ccfc4980 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -2680,47 +2680,50 @@ void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * grap } loadGlobalAggregates(workunit); - Owned root = getWUGraphProgress(wuid, true); - if (root) + if (graphName) { - Owned graphPT = root->getPropTree(graphName); - if (!graphPT) - return; - - StatsScopeId wfScopeId(SSTworkflow, graphPT->getPropInt("@wfid", 0)); - StatsScopeId graphScopeId(SSTgraph, graphName); - - Owned iter = graphPT->getElements("*"); - ForEach(*iter) + Owned root = getWUGraphProgress(wuid, true); + if (root) { - StatsScopeId sgScopeId; - IPropertyTree * sgPT = & iter->query(); - const char * sgName = sgPT->queryName(); - if (strcmp(sgName, "node")==0) - continue; - verifyex(sgScopeId.setScopeText(sgName)); - MemoryBuffer compressed; - sgPT->getPropBin("Stats", compressed); - if (!compressed.length()) - break; - MemoryBuffer serialized; - decompressToBuffer(serialized, compressed); + Owned graphPT = root->getPropTree(graphName); + if (!graphPT) + return; - unsigned version; - serialized.read(version); - byte kind; - serialized.read(kind); + StatsScopeId wfScopeId(SSTworkflow, graphPT->getPropInt("@wfid", 0)); + StatsScopeId 
graphScopeId(SSTgraph, graphName); - StatsScopeId childId; - childId.deserialize(serialized, version); - int statsMinDepth = 0, statsMaxDepth = INT_MAX; - if (aggregatesOnly) + Owned iter = graphPT->getElements("*"); + ForEach(*iter) { - // Only store stats for subgraph level - statsMinDepth = 3; // this is subgraph level - statsMaxDepth = 3; + StatsScopeId sgScopeId; + IPropertyTree * sgPT = & iter->query(); + const char * sgName = sgPT->queryName(); + if (strcmp(sgName, "node")==0) + continue; + verifyex(sgScopeId.setScopeText(sgName)); + MemoryBuffer compressed; + sgPT->getPropBin("Stats", compressed); + if (!compressed.length()) + break; + MemoryBuffer serialized; + decompressToBuffer(serialized, compressed); + + unsigned version; + serialized.read(version); + byte kind; + serialized.read(kind); + + StatsScopeId childId; + childId.deserialize(serialized, version); + int statsMinDepth = 0, statsMaxDepth = INT_MAX; + if (aggregatesOnly) + { + // Only store stats for subgraph level + statsMinDepth = 3; // this is subgraph level + statsMaxDepth = 3; + } + statsCollection->deserializeChild(childId, serialized, version, statsMinDepth, statsMaxDepth); } - statsCollection->deserializeChild(childId, serialized, version, statsMinDepth, statsMaxDepth); } } } @@ -2763,13 +2766,12 @@ void GlobalStatisticCollection::loadGlobalAggregates(IConstWorkUnit &workunit) // if clearStats==true then the existing stats are cleared for the given scope and descendants IStatisticCollection * GlobalStatisticCollection::getCollectionForUpdate(StatisticCreatorType creatorType, const char * creator, unsigned wfid, const char *graphName, unsigned sgId, bool clearStats) { - bool wasCreated; - StatsScopeId graphScopeId; verifyex(graphScopeId.setScopeText(graphName)); StatsScopeId wfScopeId(SSTworkflow, wfid); StatsScopeId sgScopeId(SSTsubgraph, sgId); + bool wasCreated; IStatisticCollection * sgScopeCollection = statsCollection->ensureSubScopePath({wfScopeId,graphScopeId, sgScopeId}, 
wasCreated); if (clearStats && !wasCreated) sgScopeCollection->clearStats(); @@ -2807,7 +2809,7 @@ void GlobalStatisticCollection::updateAggregates(IWorkUnit *wu) { StatisticKind kind = aggregateKindsMapping.getKind(i); stat_type value; - if (cur.getStatistic(kind, value) && value) + if (cur.getStatistic(kind, value)) { StringBuffer s; wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), cur.queryScopeType(), cur.getFullScope(s).str(), kind, nullptr, value, 1, 0, StatsMergeReplace); @@ -2822,7 +2824,7 @@ void GlobalStatisticCollection::updateAggregates(IWorkUnit *wu) } } }; - if (refreshAggregates()) // Only serialize if the aggregates has changed + if (refreshAggregates()) // Only serialize if the aggregates have changed { StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKindsMapping); statsCollection->visit(statsAggregatorWriter); diff --git a/ecl/eclagent/agentctx.hpp b/ecl/eclagent/agentctx.hpp index a016679fecf..9d142312269 100644 --- a/ecl/eclagent/agentctx.hpp +++ b/ecl/eclagent/agentctx.hpp @@ -124,6 +124,8 @@ struct IAgentContext : extends IGlobalCodeContext virtual bool forceNewDiskReadActivity() const = 0; virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, char const * source) = 0; virtual double queryAgentMachineCost() const = 0; + virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) = 0; + virtual void updateAggregates(IWorkUnit* lockedwu) = 0; }; #endif // AGENTCTX_HPP_INCL diff --git a/ecl/eclagent/eclagent.ipp b/ecl/eclagent/eclagent.ipp index 20b73dced30..db29e9292a8 100644 --- a/ecl/eclagent/eclagent.ipp +++ b/ecl/eclagent/eclagent.ipp @@ -250,6 +250,14 @@ public: { return ctx->queryAgentMachineCost(); }; + virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) 
override + { + return ctx->updateStats(creatorType, creator, activeWfid, graphName, subgraph); + }; + virtual void updateAggregates(IWorkUnit* lockedwu) override + { + ctx->updateAggregates(lockedwu); + } protected: IAgentContext * ctx; @@ -392,6 +400,7 @@ private: Owned outputSerializer; int retcode; double agentMachineCost = 0; + GlobalStatisticCollection globalStats; private: void doSetResultString(type_t type, const char * stepname, unsigned sequence, int len, const char *val); @@ -705,6 +714,15 @@ public: { return agentMachineCost; } + virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) override + { + Owned sgCollection = globalStats.getCollectionForUpdate(creatorType, creator, activeWfid, graphName, subgraph, true); // true=>clear existing stats + return wuRead->updateStats (graphName, creatorType, creator, activeWfid, subgraph, false, sgCollection); + } + virtual void updateAggregates(IWorkUnit* lockedwu) override + { + globalStats.updateAggregates(lockedwu); + } }; //--------------------------------------------------------------------------- @@ -1048,7 +1066,6 @@ public: graphAgentContext.set(&_agent); agent = &graphAgentContext; aborted = false; - globalStats.load(*wu, graphName, true); } void createFromXGMML(ILoadedDllEntry * dll, IPropertyTree * xgmml); @@ -1084,7 +1101,6 @@ protected: IProbeManager * probeManager; unsigned wfid; bool aborted; - GlobalStatisticCollection globalStats; }; diff --git a/ecl/eclagent/eclgraph.cpp b/ecl/eclagent/eclgraph.cpp index ced91432092..17eefa66907 100644 --- a/ecl/eclagent/eclgraph.cpp +++ b/ecl/eclagent/eclgraph.cpp @@ -1347,8 +1347,7 @@ void EclGraph::updateLibraryProgress() EclSubGraph & cur = graphs.item(idx); unsigned wfid = cur.parent.queryWfid(); - Owned sgCollection = globalStats.getCollectionForUpdate(queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, queryGraphName(), cur.id, true); // 
true=>clear existing stats - Owned progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false, sgCollection.getClear()); + Owned progress = agent->updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, queryGraphName(), cur.id); cur.updateProgress(progress->queryStatsBuilder()); } } @@ -1491,13 +1490,12 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result) IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph) { - Owned sgCollection = globalStats.getCollectionForUpdate(creatorType, creator, activeWfid, queryGraphName(), subgraph, true); // true=>clear existing stats - return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false, sgCollection.getClear()); + return agent->updateStats(creatorType, creator, activeWfid, queryGraphName(), subgraph); } void EclGraph::updateAggregates(IWorkUnit* lockedwu) { - globalStats.updateAggregates(lockedwu); + agent->updateAggregates(lockedwu); } void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value) @@ -1549,6 +1547,7 @@ EclGraph * EclAgent::loadGraph(const char * graphName, IConstWorkUnit * wu, ILoa Owned eclGraph = new EclGraph(*this, graphName, wu, isLibrary, debugContext, probeManager, wuGraph->getWfid()); eclGraph->createFromXGMML(dll, xgmml); + globalStats.load(*wu, graphName, true); return eclGraph.getClear(); } diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index bfe07007664..e613b45d9f1 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -1330,7 +1330,7 @@ const StatisticsMapping diskLocalStatistics({StCycleDiskReadIOCycles, StSizeDisk const StatisticsMapping diskRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StTimeDiskWriteIO, 
StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries}); const StatisticsMapping diskReadRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StNumDiskRetries, StCycleDiskReadIOCycles}); const StatisticsMapping diskWriteRemoteStatistics({StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries, StCycleDiskWriteIOCycles}); -const StatisticsMapping aggregateKindStatistics({StCostFileAccess, StSizeGraphSpill, StSizeSpillFile}); +const StatisticsMapping aggregateKindStatistics({StCostExecute, StCostFileAccess, StSizeGraphSpill, StSizeSpillFile}); const StatisticsMapping * queryStatsMapping(const StatsScopeId & scope, unsigned hashcode) { @@ -1872,8 +1872,7 @@ class CStatisticCollection : public CInterfaceOf { if (*scope=='\0') { - updateStatistic(kind, value, StatsMergeReplace); - return true; + return updateStatistic(kind, value, StatsMergeReplace); } else { @@ -1970,7 +1969,7 @@ class CStatisticCollection : public CInterfaceOf stats.append(s); return true; } - virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) + virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) override { bool wasCreated; return ensureSubScope(search, hasChildren, wasCreated); @@ -1988,7 +1987,8 @@ class CStatisticCollection : public CInterfaceOf wasCreated = true; return ret; } - virtual IStatisticCollection * querySubScope(const StatsScopeId & search) override + + virtual IStatisticCollection * querySubScope(const StatsScopeId & search) const override { return children.find(&search); } @@ -2008,7 +2008,7 @@ class CStatisticCollection : public CInterfaceOf { curScope = curScope->querySubScope(scopeItem); if (!curScope) - break; + return nullptr; } return curScope; } diff --git a/system/jlib/jstats.h b/system/jlib/jstats.h index edf369ffcaf..0855c1136a6 100644 --- a/system/jlib/jstats.h +++ b/system/jlib/jstats.h @@ -141,8 +141,8 @@ interface IStatisticCollection : public IInterface virtual 
void visitChildren(IStatisticVisitor & target) const = 0; virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) = 0; virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren, bool & wasCreated) = 0; - virtual IStatisticCollection * querySubScope(const StatsScopeId & search) = 0; virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path, bool & wasCreated) = 0; + virtual IStatisticCollection * querySubScope(const StatsScopeId & search) const = 0; virtual IStatisticCollection * querySubScopePath(std::initializer_list path) = 0; virtual void pruneChildStats() = 0; virtual void addStatistic(StatisticKind kind, unsigned __int64 value) = 0; diff --git a/thorlcr/master/thdemonserver.cpp b/thorlcr/master/thdemonserver.cpp index 0c2b12ddb4c..7c66ce7c0e9 100644 --- a/thorlcr/master/thdemonserver.cpp +++ b/thorlcr/master/thdemonserver.cpp @@ -144,7 +144,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer { CGraphBase &graph = activeGraphs.item(g); Owned sgCollection = globalStatsCollection.getCollectionForUpdate(SCTthor, queryStatisticsComponentName(), wfid, graphName, graph.queryGraphId(), true); // true=>clear existing stats - Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false, sgCollection.getClear()); + Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false, sgCollection); reportGraph(stats->queryStatsBuilder(), &graph); } Owned wu = &currentWU.lock(); @@ -153,8 +153,6 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer CGraphBase &graph = activeGraphs.item(g2); unsigned startTime = graphStarts.item(g2); reportStatus(wu, graph, startTime, finished, success); - // Prune subgraph descendant stats as they have been serialized and no longer needed.
- globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph.queryGraphId()); } updateAggregates(wu); queryServerStatus().commitProperties(); @@ -176,11 +174,9 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer unsigned wfid = graph->queryJob().getWfid(); { Owned sgCollection = globalStatsCollection.getCollectionForUpdate(SCTthor, queryStatisticsComponentName(), wfid, graphName, graph->queryGraphId(), true); // true=>clear existing stats - Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false, sgCollection.getClear()); + Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false, sgCollection); reportGraph(stats->queryStatsBuilder(), graph); } - // Prune subgraph descendant stats as they have been serialized and no longer needed. - globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph->queryGraphId()); Owned wu = &currentWU.lock(); if (startTimeStamp) { @@ -293,6 +289,10 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer { unsigned startTime = graphStarts.item(g); reportGraph(graph, true, success, startTime, 0); + // Prune subgraph descendant stats as they have been serialized and no longer needed.
+ const char *graphName = ((CJobMaster &)graph->queryJob()).queryGraphName(); + unsigned wfid = graph->queryJob().getWfid(); + globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph->queryGraphId()); activeGraphs.remove(g); graphStarts.remove(g); } @@ -301,6 +301,18 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer { synchronized block(mutex); reportActiveGraphs(true, false); + if (activeGraphs.ordinality()) + { + + CJobBase & activeJob = activeGraphs.item(0).queryJob(); + const char *graphName = ((CJobMaster &)activeJob).queryGraphName(); + unsigned wfid = activeJob.getWfid(); + ForEachItemIn (g, activeGraphs) + { + CGraphBase &graph = activeGraphs.item(g); + globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph.queryGraphId()); + } + } activeGraphs.kill(); } virtual void updateAggregates(IWorkUnit * lockedWu) override diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp index 11db5322847..84d66c30898 100644 --- a/thorlcr/master/thgraphmanager.cpp +++ b/thorlcr/master/thgraphmanager.cpp @@ -1118,7 +1118,7 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, podInfo.report(wu); } if (globals->getPropBool("@watchdogProgressEnabled")) - queryDeMonServer()->loadStats(workunit, graphName, true); + queryDeMonServer()->loadStats(workunit, nullptr, true); //graphName==nullptr so that sg stats are not loaded as it's not possible to resume graph part the way through setWuid(workunit.queryWuid(), workunit.queryClusterName());