diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index 64f569e5284..5daca9812df 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -183,14 +183,17 @@ void doDescheduleWorkkunit(char const * wuid) * Graph progress support */ -CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge) +CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * statsCollection) : creatorType(_creatorType), creator(_creator), id(_id), merge(_merge) { StatsScopeId graphScopeId; verifyex(graphScopeId.setScopeText(_rootScope)); + StatsScopeId wfScopeId(SSTworkflow,wfid); - StatsScopeId rootScopeId(SSTworkflow,wfid); - collector.setown(createStatisticsGatherer(_creatorType, _creator, rootScopeId)); + if (statsCollection) + collector.setown(createStatisticsGatherer(statsCollection)); + else + collector.setown(createStatisticsGatherer(_creatorType, _creator, wfScopeId)); collector->beginScope(graphScopeId); } @@ -2657,85 +2660,104 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu } return totalCost; } -} +}; -//aggregate disk costs from top-level subgraphs (when scope specified) or workflows (scope not specified) -cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope) +GlobalStatisticCollection::GlobalStatisticCollection() : aggregateKindsMapping(aggregateKindStatistics) { - WuScopeFilter filter; - if (!isEmptyString(scope)) - filter.addScope(scope); - else - filter.addScope(""); // Needed to match scope - // when scope is a workflow, sum graph costs (or subgraph cost when no graph cost) to get workflow cost - // (Costs from child graphs and activities should have been summed up to graph/subgraph level already) - // when isEmptyString(scope), sum workflow costs (or graph cost when no workflow cost) to get global cost - // (Costs from all levels below graph should be summed upto at least graph level already) - // i.e. need 2 levels of nesting - filter.setIncludeNesting(2); - // includeNesting(2) needs just source "global". However, WuScopeFilter is incorrectly inferring the source as "global,stats", - // causing too many of the stats to be pulled in and inefficiency. Here, explicitly set source to "global" - filter.addSource("global"); - filter.addOutputStatistic(StCostFileAccess); - filter.addRequiredStat(StCostFileAccess); - filter.finishedFilter(); - Owned it = &wu->getScopeIterator(filter); - cost_type totalCost = 0; - for (it->first(); it->isValid(); ) - { - cost_type value = 0; - if (it->getStat(StCostFileAccess, value)) - { - totalCost += value; - it->nextSibling(); - } - else - { - it->next(); - } - } - return totalCost; + // Construct statsCollection here as GlobalStatisticCollection::load() is optional + StatsScopeId globalScopeId(SSTglobal, (unsigned)0); + statsCollection.setown(createStatisticCollection(nullptr, globalScopeId)); } -void gatherSpillSize(const IConstWorkUnit * wu, const char *scope, stat_type & peakSizeSpill) +void GlobalStatisticCollection::loadExistingAggregates(IConstWorkUnit &workunit) { - WuScopeFilter filter; - if (!isEmptyString(scope)) - filter.addScope(scope); - else + const char * _wuid = workunit.queryWuid(); + if (!streq(_wuid, wuid.str())) // New statsCollection if collection for different workunit { - filter.addScope(""); - filter.addSource("global"); + StatsScopeId globalScopeId(SSTglobal, (unsigned)0); + statsCollection.setown(createStatisticCollection(nullptr, globalScopeId)); + wuid.set(_wuid); } - filter.setIncludeNesting(1); - filter.addOutputStatistic(StSizeGraphSpill); - filter.addRequiredStat(StSizeGraphSpill); - filter.finishedFilter(); - Owned it = &wu->getScopeIterator(filter); - peakSizeSpill = 0; - for (it->first(); it->isValid(); ) + + class StatsCollectionAggregatesLoader : public IWuScopeVisitor { - stat_type value = 0; - if (it->getStat(StSizeGraphSpill, value)) + public: + StatsCollectionAggregatesLoader(IStatisticCollection * _statsCollection) : statsCollection(_statsCollection) {} + + virtual void noteStatistic(StatisticKind kind, unsigned __int64 value, IConstWUStatistic & extra) override { - if (value>peakSizeSpill) - peakSizeSpill = value; - it->nextSibling(); + statsCollection->setStatistic(extra.queryScope(), kind, value); } - else + virtual void noteAttribute(WuAttr attr, const char * value) override { throwUnexpected(); } + virtual void noteHint(const char * kind, const char * value) override { throwUnexpected(); } + virtual void noteException(IConstWUException & exception) override { throwUnexpected(); } + private: + Linked statsCollection; + }; + + WuScopeFilter filter; + filter.addScopeType(SSTglobal).addScopeType(SSTworkflow).addScopeType(SSTgraph); + const unsigned numStats = aggregateKindsMapping.numStatistics(); + for (unsigned i=0; i iter = &workunit.getScopeIterator(filter); + ForEach(*iter) + iter->playProperties(aggregatesLoader); +} + +// getCollectionForUpdate() returns IStatisticCollection for the given subgraph +// if clearStats==true then the existing stats are cleared for the given scope and descendants +IStatisticCollection * GlobalStatisticCollection::getCollectionForUpdate(StatisticCreatorType creatorType, const char * creator, unsigned wfid, const char *graphName, unsigned sgId, bool clearStats) +{ + StatsScopeId graphScopeId; + verifyex(graphScopeId.setScopeText(graphName)); + StatsScopeId wfScopeId(SSTworkflow, wfid); + StatsScopeId sgScopeId(SSTsubgraph, sgId); + + bool wasCreated; + IStatisticCollection * sgScopeCollection = statsCollection->ensureSubScopePath({wfScopeId,graphScopeId, sgScopeId}, wasCreated); + if (clearStats && !wasCreated) + sgScopeCollection->clearStats(); + // Marking the collection dirty here is not ideal. It would be better to have a call to IStatisticCollection::setStatistic mark the scope as dirty. + // However, this would be inefficient as each call to IStatisticCollection::setStatistic would require the dirty flag to be set for all parent scopes. + sgScopeCollection->markDirty(); + return createRootStatisticCollection(creatorType, creator, wfScopeId, graphScopeId, sgScopeCollection); +} + +// Recalculate aggregates and then write the aggregates to global stats (dali) +void GlobalStatisticCollection::updateAggregates(IWorkUnit *wu) +{ + struct AggregateUpdatedCallBackFunc : implements IWhenAggregateUpdatedCallBack + { + Linked wu; + AggregateUpdatedCallBackFunc(IWorkUnit *_wu) : wu(_wu) {} + void operator () (const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value) { - it->next(); + wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), sst, scope, kind, nullptr, value, 1, 0, StatsMergeReplace); } - } + } aggregateUpdatedCallBackFunc(wu); + + statsCollection->refreshAggregates(aggregateKindsMapping, aggregateUpdatedCallBackFunc); } -void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType) +// Prune all subgraph descendent stats (leaving subgraph stats for future aggregation) +void GlobalStatisticCollection::pruneSubGraphDescendants(unsigned wfid, const char *graphName, unsigned sgId) { - stat_type peakSizeSpill = 0; - gatherSpillSize(wu, scope, peakSizeSpill); - if (peakSizeSpill) - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizeGraphSpill, nullptr, peakSizeSpill, 1, 0, StatsMergeMax); + StatsScopeId graphScopeId; + verifyex(graphScopeId.setScopeText(graphName)); + StatsScopeId wfScopeId(SSTworkflow, wfid); + StatsScopeId sgScopeId(SSTsubgraph, sgId); + IStatisticCollection * sgScopeCollection = statsCollection->querySubScopePath({wfScopeId,graphScopeId, sgScopeId}); + if (sgScopeCollection) + sgScopeCollection->pruneChildStats(); } + //--------------------------------------------------------------------------------------------------------------------- @@ -3805,8 +3827,8 @@ class CDaliWorkUnit; class CDaliWuGraphStats : public CWuGraphStats { public: - CDaliWuGraphStats(const CDaliWorkUnit* _owner, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge) - : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), owner(_owner), graphName(_rootScope), wfid(_wfid) + CDaliWuGraphStats(const CDaliWorkUnit* _owner, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats) + : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), owner(_owner), graphName(_rootScope), wfid(_wfid) { } protected: @@ -3820,8 +3842,8 @@ class CDaliWuGraphStats : public CWuGraphStats class CLocalWuGraphStats : public CWuGraphStats { public: - CLocalWuGraphStats(IPropertyTree *_p, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge) - : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), graphName(_rootScope), p(_p) + CLocalWuGraphStats(IPropertyTree *_p, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats) + : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), graphName(_rootScope), p(_p) { } protected: @@ -4125,9 +4147,9 @@ class CDaliWorkUnit : public CPersistedWorkUnit } } } - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats) const override { - return new CDaliWuGraphStats(this, creatorType, creator, _wfid, graphName, subgraph, merge); + return new CDaliWuGraphStats(this, creatorType, creator, _wfid, graphName, subgraph, merge, stats); } virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree) { @@ -4437,8 +4459,8 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa { c->setGraphState(graphName, wfid, state); } virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const { c->setNodeState(graphName, nodeId, state); } - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override - { return c->updateStats(graphName, creatorType, creator, _wfid, subgraph, merge); } + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats=nullptr) const override + { return c->updateStats(graphName, creatorType, creator, _wfid, subgraph, merge, stats); } virtual void clearGraphProgress() const { c->clearGraphProgress(); } virtual IStringVal & getAbortBy(IStringVal & str) const @@ -10268,9 +10290,9 @@ void CLocalWorkUnit::setNodeState(const char *graphName, WUGraphIDType nodeId, W { throwUnexpected(); // Should only be used for persisted workunits } -IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const +IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats) const { - return new CLocalWuGraphStats(LINK(p), creatorType, creator, _wfid, graphName, subgraph, merge); + return new CLocalWuGraphStats(LINK(p), creatorType, creator, _wfid, graphName, subgraph, merge, stats); } void CLocalWUGraph::setName(const char *str) diff --git a/common/workunit/workunit.hpp b/common/workunit/workunit.hpp index 60c06b5a56d..5140b4d4309 100644 --- a/common/workunit/workunit.hpp +++ b/common/workunit/workunit.hpp @@ -1174,7 +1174,6 @@ interface IConstWUScopeIterator : extends IScmIterator }; //--------------------------------------------------------------------------------------------------------------------- - //! IWorkUnit //! Provides high level access to WorkUnit "header" data. interface IWorkUnit; @@ -1302,7 +1301,7 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const = 0; virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const = 0; virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const = 0; - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const = 0; + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats=nullptr) const = 0; virtual void clearGraphProgress() const = 0; virtual IStringVal & getAbortBy(IStringVal & str) const = 0; virtual unsigned __int64 getAbortTimeStamp() const = 0; @@ -1725,8 +1724,6 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *ti extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeType scopeType, StatisticKind kind, ITimeReporter *timer); extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search); extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false); -extern WORKUNIT_API cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope); -extern WORKUNIT_API void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType); extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name); extern WORKUNIT_API void descheduleWorkunit(char const * wuid); #if 0 @@ -1785,4 +1782,21 @@ extern WORKUNIT_API TraceFlags loadTraceFlags(IConstWorkUnit * wu, const std::in extern WORKUNIT_API bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, unsigned wfid, const char *graphName); + +class WORKUNIT_API GlobalStatisticCollection : public CInterface +{ +public: + GlobalStatisticCollection(); + + void loadExistingAggregates(IConstWorkUnit &workunit); + IStatisticCollection * getCollectionForUpdate(StatisticCreatorType creatorType, const char * creator, unsigned wfid, const char *graphName, unsigned sgId, bool clearStats); + IStatisticCollection * queryCollection() { return statsCollection; } + void updateAggregates(IWorkUnit *wu); + void pruneSubGraphDescendants(unsigned wfid, const char *graphName, unsigned sgId); +private: + Owned statsCollection; + const StatisticsMapping & aggregateKindsMapping; + StringBuffer wuid; +}; + #endif diff --git a/common/workunit/workunit.ipp b/common/workunit/workunit.ipp index 4f166ea67fb..46ea428c84a 100644 --- a/common/workunit/workunit.ipp +++ b/common/workunit/workunit.ipp @@ -232,7 +232,7 @@ public: virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const; virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const; virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const; - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override; + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats=nullptr) const override; void clearGraphProgress() const; virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree) {}; //No GraphProgressTree in CLocalWorkUnit. @@ -661,7 +661,7 @@ public: class WORKUNIT_API CWuGraphStats : public CInterfaceOf { public: - CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge); + CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats); virtual void beforeDispose(); virtual IStatisticGatherer & queryStatsBuilder(); protected: diff --git a/ecl/eclagent/agentctx.hpp b/ecl/eclagent/agentctx.hpp index a016679fecf..f55331677dc 100644 --- a/ecl/eclagent/agentctx.hpp +++ b/ecl/eclagent/agentctx.hpp @@ -124,6 +124,8 @@ struct IAgentContext : extends IGlobalCodeContext virtual bool forceNewDiskReadActivity() const = 0; virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, char const * source) = 0; virtual double queryAgentMachineCost() const = 0; + virtual IWUGraphStats *updateStats(unsigned activeWfid, const char *graphName, unsigned subgraph) = 0; + virtual void updateAggregates(IWorkUnit* lockedwu) = 0; }; #endif // AGENTCTX_HPP_INCL diff --git a/ecl/eclagent/eclagent.cpp b/ecl/eclagent/eclagent.cpp index dc22e1f69dd..b2b1b356a95 100644 --- a/ecl/eclagent/eclagent.cpp +++ b/ecl/eclagent/eclagent.cpp @@ -1988,10 +1988,6 @@ void EclAgent::doProcess() const cost_type cost = aggregateCost(w, nullptr, false); if (cost) w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - const cost_type diskAccessCost = aggregateDiskAccessCost(w, nullptr); - if (diskAccessCost) - w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace); - updateSpillSize(w, nullptr, SSTglobal); addTimings(w); switch (w->getState()) @@ -2513,10 +2509,6 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime const cost_type cost = money2cost_type(calcCost(agent.queryAgentMachineCost(), nanoToMilli(elapsedNs))) + aggregateCost(wu, scope, true); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - const cost_type diskAccessCost = aggregateDiskAccessCost(wu, scope); - if (diskAccessCost) - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace); - updateSpillSize(wu, scope, SSTworkflow); } void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item) diff --git a/ecl/eclagent/eclagent.ipp b/ecl/eclagent/eclagent.ipp index 10c86cc6ebb..8ffcc2b9cfb 100644 --- a/ecl/eclagent/eclagent.ipp +++ b/ecl/eclagent/eclagent.ipp @@ -250,6 +250,14 @@ public: { return ctx->queryAgentMachineCost(); }; + virtual IWUGraphStats *updateStats(unsigned activeWfid, const char *graphName, unsigned subgraph) override + { + return ctx->updateStats(activeWfid, graphName, subgraph); + }; + virtual void updateAggregates(IWorkUnit* lockedwu) override + { + ctx->updateAggregates(lockedwu); + } protected: IAgentContext * ctx; @@ -392,6 +400,7 @@ private: Owned outputSerializer; int retcode; double agentMachineCost = 0; + GlobalStatisticCollection globalStats; private: void doSetResultString(type_t type, const char * stepname, unsigned sequence, int len, const char *val); @@ -705,6 +714,15 @@ public: { return agentMachineCost; } + virtual IWUGraphStats *updateStats(unsigned activeWfid, const char *graphName, unsigned subgraph) override + { + Owned sgCollection = globalStats.getCollectionForUpdate(queryStatisticsComponentType(), queryStatisticsComponentName(), activeWfid, graphName, subgraph, true); // true=>clear existing stats + return wuRead->updateStats(graphName, queryStatisticsComponentType(), queryStatisticsComponentName(), activeWfid, subgraph, false, sgCollection); + } + virtual void updateAggregates(IWorkUnit* lockedwu) override + { + globalStats.updateAggregates(lockedwu); + } }; //--------------------------------------------------------------------------- @@ -1055,7 +1073,7 @@ public: void executeLibrary(const byte * parentExtract, IHThorGraphResults * results); IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph); void updateWUStatistic(IWorkUnit* lockedwu, StatisticScopeType scopeType, const char* scope, StatisticKind kind, const char* descr, long long unsigned int value); - + void updateAggregates(IWorkUnit* lockedwu); EclSubGraph * idToGraph(unsigned id); EclGraphElement * idToActivity(unsigned id); const char *queryGraphName() { return graphName; } diff --git a/ecl/eclagent/eclgraph.cpp b/ecl/eclagent/eclgraph.cpp index d4ffdd99c42..224f4dd0a9b 100644 --- a/ecl/eclagent/eclgraph.cpp +++ b/ecl/eclagent/eclgraph.cpp @@ -879,10 +879,10 @@ void EclSubGraph::updateProgress() Owned progress = parent.updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), parent.queryWfid(), id); IStatisticGatherer & stats = progress->queryStatsBuilder(); updateProgress(stats); - if (startGraphTime || elapsedGraphCycles) { WorkunitUpdate lockedwu(agent->updateWorkUnit()); + parent.updateAggregates(lockedwu); StringBuffer subgraphid; subgraphid.append(parent.queryGraphName()).append(":").append(SubGraphScopePrefix).append(id); if (startGraphTime) @@ -897,10 +897,6 @@ void EclSubGraph::updateProgress() if (cost) lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); } - Owned statsCollection = stats.getResult(); - const cost_type costDiskAccess = aggregateStatistic(StCostFileAccess, statsCollection) ; - if (costDiskAccess) - lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace); } } } @@ -927,6 +923,11 @@ void EclSubGraph::updateProgress(IStatisticGatherer &progress) } ForEachItemIn(i2, subgraphs) subgraphs.item(i2).updateProgress(progress); + + Owned statsCollection = progress.getResult(); + const cost_type costDiskAccess = statsCollection->aggregateStatistic(StCostFileAccess); + if (costDiskAccess) + progress.addStatistic(StCostFileAccess, costDiskAccess); } bool EclSubGraph::prepare(const byte * parentExtract, bool checkDependencies) @@ -1277,10 +1278,6 @@ void EclGraph::execute(const byte * parentExtract) const cost_type cost = money2cost_type(calcCost(agent->queryAgentMachineCost(), elapsed)); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - - const cost_type costDiskAccess = aggregateDiskAccessCost(wu, scope); - if (costDiskAccess) - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace); } if (agent->queryRemoteWorkunit()) @@ -1349,7 +1346,8 @@ void EclGraph::updateLibraryProgress() { EclSubGraph & cur = graphs.item(idx); unsigned wfid = cur.parent.queryWfid(); - Owned progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false); + + Owned progress = agent->updateStats(wfid, queryGraphName(), cur.id); cur.updateProgress(progress->queryStatsBuilder()); } } @@ -1492,7 +1490,12 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result) IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph) { - return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false); + return agent->updateStats(activeWfid, queryGraphName(), subgraph); +} + +void EclGraph::updateAggregates(IWorkUnit* lockedwu) +{ + agent->updateAggregates(lockedwu); } void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value) @@ -1544,6 +1547,7 @@ EclGraph * EclAgent::loadGraph(const char * graphName, IConstWorkUnit * wu, ILoa Owned eclGraph = new EclGraph(*this, graphName, wu, isLibrary, debugContext, probeManager, wuGraph->getWfid()); eclGraph->createFromXGMML(dll, xgmml); + globalStats.loadExistingAggregates(*wu); return eclGraph.getClear(); } diff --git a/plugins/cassandra/cassandrawu.cpp b/plugins/cassandra/cassandrawu.cpp index 6315ad7800b..94f8439e7ad 100644 --- a/plugins/cassandra/cassandrawu.cpp +++ b/plugins/cassandra/cassandrawu.cpp @@ -2743,8 +2743,8 @@ class CCassandraWorkUnit : public CPersistedWorkUnit class CCassandraWuGraphStats : public CWuGraphStats { public: - CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge) - : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), + CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats) + : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), progress(createPTree(_rootScope)), parent(_parent) { } @@ -2764,9 +2764,9 @@ class CCassandraWorkUnit : public CPersistedWorkUnit StringAttr wuid; }; - IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge) const override + IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge, IStatisticCollection * stats) const override { - return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge); + return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge, stats); } diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 8ebcdcadef5..dd2874b9e77 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -1330,6 +1330,7 @@ const StatisticsMapping diskLocalStatistics({StCycleDiskReadIOCycles, StSizeDisk const StatisticsMapping diskRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries}); const StatisticsMapping diskReadRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StNumDiskRetries, StCycleDiskReadIOCycles}); const StatisticsMapping diskWriteRemoteStatistics({StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries, StCycleDiskWriteIOCycles}); +const StatisticsMapping aggregateKindStatistics({StCostExecute, StCostFileAccess, StSizeGraphSpill, StSizeSpillFile}); const StatisticsMapping * queryStatsMapping(const StatsScopeId & scope, unsigned hashcode) { @@ -1429,6 +1430,8 @@ StringBuffer & StatsScopeId::getScopeText(StringBuffer & out) const return out.append(ChannelScopePrefix).append(id); case SSTunknown: return out.append(name); + case SSTglobal: + return out; default: #ifdef _DEBUG throwUnexpected(); @@ -1751,7 +1754,7 @@ static CStatisticCollection * deserializeCollection(CStatisticCollection * paren //MORE: Create an implementation with no children typedef StructArrayOf StatsArray; -class CollectionHashTable : public SuperHashTableOf +class CollectionHashTable : public SuperHashTableOf { public: ~CollectionHashTable() { _releaseAll(); } @@ -1782,37 +1785,20 @@ class SortedCollectionIterator : public ArrayIIteratorOf { friend class CollectionHashTable; + public: - CStatisticCollection(CStatisticCollection * _parent, const StatsScopeId & _id) : id(_id), parent(_parent) + CStatisticCollection(IStatisticCollection * _parent, const StatsScopeId & _id) : id(_id), parent(_parent) { } - CStatisticCollection(CStatisticCollection * _parent, MemoryBuffer & in, unsigned version) : parent(_parent) + CStatisticCollection(IStatisticCollection * _parent, MemoryBuffer & in, unsigned version) : parent(_parent) { id.deserialize(in, version); - - unsigned numStats; - in.read(numStats); - stats.ensureCapacity(numStats); - while (numStats-- > 0) - { - Statistic next (in, version); - stats.append(next); - } - - unsigned numChildren; - in.read(numChildren); - children.ensure(numChildren); - while (numChildren-- > 0) - { - CStatisticCollection * next = deserializeCollection(this, in, version); - children.add(*next); - } + deserialize(in, version); } virtual byte getCollectionType() const { return SCintermediate; } - //interface IStatisticCollection: virtual StringBuffer &toXML(StringBuffer &out) const override; virtual StatisticScopeType queryScopeType() const override @@ -1834,7 +1820,8 @@ class CStatisticCollection : public CInterfaceOf if (parent) { parent->getFullScope(str); - str.append(':'); + if (!str.isEmpty()) + str.append(':'); } id.getScopeText(str); return str; @@ -1862,6 +1849,39 @@ class CStatisticCollection : public CInterfaceOf } return false; } + virtual bool getStatisticSum(StatisticKind kind, unsigned __int64 & value) const override + { + bool found = false; + ForEachItemIn(i, stats) + { + const Statistic & cur = stats.item(i); + if (cur.kind == kind) + { + value += cur.value; + found = true; + } + } + return found; + } + virtual bool setStatistic(const char *scope, StatisticKind kind, unsigned __int64 value) override + { + if (*scope=='\0') + { + return updateStatistic(kind, value, StatsMergeReplace); + } + else + { + StatsScopeId childScopeId; + const char * next; + if (!childScopeId.setScopeText(scope, &next) || (*next!=':' && *next!='\0')) + throw makeStringExceptionV(JLIBERR_UnexpectedValue, "'%s' does not appear to be a valid scope id", scope); + IStatisticCollection * child = ensureSubScope(childScopeId, true); + + if (*next==':') + next++; + return child->setStatistic(next, kind, value); + } + } virtual unsigned getNumStatistics() const override { return stats.ordinality(); @@ -1916,13 +1936,13 @@ class CStatisticCollection : public CInterfaceOf } //other public interface functions - void addStatistic(StatisticKind kind, unsigned __int64 value) + virtual void addStatistic(StatisticKind kind, unsigned __int64 value) override { Statistic s(kind, value); stats.append(s); } - void updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) + virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) override { if (mergeAction != StatsMergeAppend) { @@ -1931,28 +1951,71 @@ class CStatisticCollection : public CInterfaceOf Statistic & cur = stats.element(i); if (cur.kind == kind) { + if (mergeAction==StatsMergeReplace) + { + if (cur.value==value) + return false; + } cur.value = mergeStatisticValue(cur.value, value, mergeAction); - return; + return true; } } } Statistic s(kind, value); stats.append(s); + return true; } - - CStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) + virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) override + { + bool wasCreated; + return ensureSubScope(search, hasChildren, wasCreated); + } + virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren, bool & wasCreated) override { //Once the CStatisticCollection is created it should not be replaced - so that returned pointers remain valid. - CStatisticCollection * match = children.find(&search); + wasCreated = false; + IStatisticCollection * match = children.find(&search); if (match) return match; - CStatisticCollection * ret = new CStatisticCollection(this, search); + IStatisticCollection * ret = new CStatisticCollection(this, search); children.add(*ret); + wasCreated = true; return ret; } - virtual void serialize(MemoryBuffer & out) const + virtual IStatisticCollection * querySubScope(const StatsScopeId & search) const override + { + return children.find(&search); + } + + virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path, bool & wasCreated) override + { + IStatisticCollection * curScope = this; + for (const auto & scopeItem: path) + curScope = curScope->ensureSubScope(scopeItem, true, wasCreated); // n.b. this will always return a valid pointer + return curScope; + } + + virtual IStatisticCollection * querySubScopePath(std::initializer_list path) override + { + IStatisticCollection * curScope = this; + for (const auto & scopeItem: path) + { + curScope = curScope->querySubScope(scopeItem); + if (!curScope) + return nullptr; + } + return curScope; + } + + // Note, once this is called child scope pointers will be invalid + virtual void pruneChildStats() override + { + children.kill(); + } + + virtual void serialize(MemoryBuffer & out) const override { out.append(getCollectionType()); id.serialize(out); @@ -1967,6 +2030,35 @@ class CStatisticCollection : public CInterfaceOf iter.query().serialize(out); } + virtual void deserialize(MemoryBuffer & in, unsigned version) override + { + unsigned numStats; + in.read(numStats); + stats.ensureCapacity(numStats); + while (numStats-- > 0) + { + Statistic next(in, version); + stats.append(next); + } + unsigned numChildren; + in.read(numChildren); + children.ensure(numChildren); + while (numChildren-- > 0) + { + byte kind; + in.read(kind); + StatsScopeId childId; + childId.deserialize(in, version); + deserializeChild(childId, in, version); + } + } + + virtual void deserializeChild(const StatsScopeId & childId, MemoryBuffer & in, unsigned version) override + { + IStatisticCollection * childCollection = ensureSubScope(childId, true); + childCollection->deserialize(in, version); + } + inline const StatsScopeId & queryScopeId() const { return id; } virtual void mergeInto(IStatisticGatherer & target) const @@ -1994,12 +2086,121 @@ class CStatisticCollection : public CInterfaceOf cur.visit(visitor); } + virtual bool refreshAggregates(const StatisticsMapping & mapping, IWhenAggregateUpdatedCallBack & fWhenAggregateUpdated) override + { + if (isDirty==false) + return false; + CRuntimeStatisticCollection totals(mapping); + Owned totalUpdated = createBitSet(mapping.numStatistics()); + return refreshAggregates(totals, *totalUpdated, fWhenAggregateUpdated); + } + + virtual bool refreshAggregates(CRuntimeStatisticCollection & parentTotals, IBitSet & isTotalUpdated, IWhenAggregateUpdatedCallBack & fWhenAggregateUpdated) override + { + const StatisticsMapping & mapping = parentTotals.queryMapping(); + // if this scope is not dirty, the aggregates are accurate at this level so return totals (no need to descend) + // Also if at sg scope, do not descend as aggregates do not need to be generated from below sg level + // (if aggregates should be calculated from descendants of sg scope, then remove the second test) + if (isDirty==false || id.queryScopeType()==SSTsubgraph) + { + // return the aggregates at this scope level + ForEachItemIn(i, stats) + { + Statistic & stat = stats.element(i); + StatisticKind kind = stat.queryKind(); + if (kind != (StatisticKind)(kind & StKindMask)) + continue; // ignore variants->not supported by CRuntimeStatisticCollection + unsigned index = mapping.getIndex(kind); + if (index!=mapping.numStatistics()) + { + // Totals required from this level by parent, even if they are not dirty + parentTotals.mergeStatistic(kind, stat.queryValue()); + if (isDirty) + isTotalUpdated.set(index); + } + } + if (isDirty) + { + isDirty=false; + return true; + } + else + return false; + } + else + { + // descend down to lower level to obtain totals required for aggregation and then aggregate + CRuntimeStatisticCollection childTotals(mapping); + Owned childTotalUpdated = createBitSet(mapping.numStatistics()); + for (auto & child : children) + { + child.refreshAggregates(childTotals, *childTotalUpdated, fWhenAggregateUpdated); + } + // 1) Set any values that has changed for this scope and 2) update ALL totals for parent + const unsigned numStats = mapping.numStatistics(); + for (unsigned i=0; itest(i)) + { + if (updateStatistic(kind, value, StatsMergeReplace)) + { + if (value || includeStatisticIfZero(kind)) + { + StringBuffer s; + (fWhenAggregateUpdated)(getFullScope(s).str(), queryScopeType(), kind, value); + isTotalUpdated.set(i); + } + } + } + // All totals still required at parent level (even unchanged totals) + parentTotals.mergeStatistic(kind, value); + } + isDirty=false; + return true; + } + } + + virtual void clearStats() override + { + stats.clear(true); + // Note: Children should NOT be deleted as all pointers return by ensureSubScope must still be valid + SuperHashIteratorOf iter(children, false); + for (iter.first(); iter.isValid(); iter.next()) + iter.query().clearStats(); + } + + virtual void addChild(IStatisticCollection *stats) override + { + children.add(*LINK(stats)); + } + + virtual void markDirty() override + { + isDirty=true; + if (parent) parent->markDirty(); + } + + virtual stat_type aggregateStatistic(StatisticKind kind) const override + { + stat_type sum; + if (!getStatisticSum(kind, sum)) // get sum of statistics at this level + { + // if no stats at this level, then get sum of stats from children + for (auto & child : children) + sum += child.aggregateStatistic(kind); + } + return sum; + } + private: StatsScopeId id; - CStatisticCollection * parent; + IStatisticCollection * parent; protected: CollectionHashTable children; StatsArray stats; + bool isDirty = false; // used to track which scope has changed (used to workout what aggregates to recalculate) }; StringBuffer &CStatisticCollection::toXML(StringBuffer &out) const @@ -2071,12 +2272,18 @@ bool CollectionHashTable::matchesElement(const void *et, const void *searchET) c class CRootStatisticCollection : public CStatisticCollection { public: - CRootStatisticCollection(StatisticCreatorType _creatorType, const char * _creator, const StatsScopeId & _id) - : CStatisticCollection(NULL, _id), creatorType(_creatorType), creator(_creator) + CRootStatisticCollection(StatisticCreatorType _creatorType, const char * _creator, const StatsScopeId & rootScopeId, const StatsScopeId & graphScopeId, IStatisticCollection * sgCollection) + : CStatisticCollection(nullptr, rootScopeId), creatorType(_creatorType), creator(_creator) + { + whenCreated = getTimeStampNowValue(); + IStatisticCollection * child = ensureSubScope(graphScopeId, true); + child->addChild(sgCollection); + } + CRootStatisticCollection(StatisticCreatorType _creatorType, const char * _creator, const StatsScopeId & rootScopeId) : CStatisticCollection(nullptr, rootScopeId), creatorType(_creatorType), creator(_creator) { whenCreated = getTimeStampNowValue(); } - CRootStatisticCollection(MemoryBuffer & in, unsigned version) : CStatisticCollection(NULL, in, version) + CRootStatisticCollection(MemoryBuffer & in, unsigned version) : CStatisticCollection(nullptr, in, version) { byte creatorTypeByte; in.read(creatorTypeByte); @@ -2085,13 +2292,13 @@ class CRootStatisticCollection : public CStatisticCollection in.read(whenCreated); } - virtual byte getCollectionType() const { return SCroot; } + virtual byte getCollectionType() const override { return SCroot; } - virtual unsigned __int64 queryWhenCreated() const + virtual unsigned __int64 queryWhenCreated() const override { return whenCreated; } - virtual void serialize(MemoryBuffer & out) const + virtual void serialize(MemoryBuffer & out) const override { CStatisticCollection::serialize(out); out.append((byte)creatorType); @@ -2113,52 +2320,6 @@ class CRootStatisticCollection : public CStatisticCollection unsigned __int64 whenCreated; }; - -class StatAggregator : implements IStatisticVisitor -{ -public: - StatAggregator(StatisticKind _kind) : kind(_kind) {} - - virtual bool visitScope(const IStatisticCollection & cur) - { - switch (cur.queryScopeType()) - { - //If there is a match for the stat in any of these containers, then avoid summing any child scopes - case SSTglobal: - case SSTgraph: - case SSTsubgraph: - case SSTsection: - case SSTchildgraph: - case SSTworkflow: - { - stat_type value; - if (cur.getStatistic(kind, value)) - { - total += value; - return false; - } - return true; - } - //Default is to sum the value for this scope and children => recurse. E.g. activity and any child activities. - default: - total += cur.queryStatistic(kind); - return true; - } - } - stat_type getTotal() const { return total; } -private: - stat_type total = 0; - StatisticKind kind; -}; - - -stat_type aggregateStatistic(StatisticKind kind, IStatisticCollection * statsCollection) -{ - StatAggregator aggregator(kind); - statsCollection->visit(aggregator); - return aggregator.getTotal(); -} - //--------------------------------------------------------------------------------------------------------------------- void serializeStatisticCollection(MemoryBuffer & out, IStatisticCollection * collection) @@ -2174,7 +2335,6 @@ static CStatisticCollection * deserializeCollection(CStatisticCollection * paren switch (kind) { case SCroot: - assertex(!parent); return new CRootStatisticCollection(in, version); case SCintermediate: return new CStatisticCollection(parent, in, version); @@ -2190,49 +2350,59 @@ IStatisticCollection * createStatisticCollection(MemoryBuffer & in) return deserializeCollection(NULL, in, version); } +IStatisticCollection * createStatisticCollection(IStatisticCollection * parent, const StatsScopeId & scopeId) +{ + return new CStatisticCollection(parent, scopeId); +} + +IStatisticCollection * createRootStatisticCollection(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope, const StatsScopeId & graphScopeId, IStatisticCollection * sgCollection) +{ + //creator unused at the moment. + return new CRootStatisticCollection(creatorType, creator, rootScope, graphScopeId, sgCollection); +} //-------------------------------------------------------------------------------------------------------------------- class StatisticGatherer : implements CInterfaceOf { public: - StatisticGatherer(CStatisticCollection * scope) : rootScope(scope) + StatisticGatherer(IStatisticCollection * scope) : rootScope(scope) { scopes.append(*scope); } virtual void beginScope(const StatsScopeId & id) override { - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); scopes.append(*tos.ensureSubScope(id, true)); } virtual void beginActivityScope(unsigned id) override { StatsScopeId scopeId(SSTactivity, id); - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); scopes.append(*tos.ensureSubScope(scopeId, false)); } virtual void beginSubGraphScope(unsigned id) override { StatsScopeId scopeId(SSTsubgraph, id); - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); scopes.append(*tos.ensureSubScope(scopeId, true)); } virtual void beginEdgeScope(unsigned id, unsigned oid) override { StatsScopeId scopeId(SSTedge, id, oid); - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); scopes.append(*tos.ensureSubScope(scopeId, false)); } virtual void beginChildGraphScope(unsigned id) override { StatsScopeId scopeId(SSTchildgraph, id); - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); scopes.append(*tos.ensureSubScope(scopeId, true)); } virtual void beginChannelScope(unsigned id) override { StatsScopeId scopeId(SSTchannel, id); - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); scopes.append(*tos.ensureSubScope(scopeId, true)); } virtual void endScope() override @@ -2241,12 +2411,12 @@ class StatisticGatherer : implements CInterfaceOf } virtual void addStatistic(StatisticKind kind, unsigned __int64 value) override { - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); tos.addStatistic(kind, value); } virtual void updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) override { - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); tos.updateStatistic(kind, value, mergeAction); } virtual IStatisticCollection * getResult() override @@ -2255,15 +2425,18 @@ class StatisticGatherer : implements CInterfaceOf } protected: - ICopyArrayOf scopes; - Linked rootScope; + ICopyArrayOf scopes; + Linked rootScope; }; extern IStatisticGatherer * createStatisticsGatherer(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope) { - //creator unused at the moment. - Owned rootCollection = new CRootStatisticCollection(creatorType, creator, rootScope); - return new StatisticGatherer(rootCollection); + return new StatisticGatherer(new CRootStatisticCollection(creatorType, creator, rootScope)); +} + +extern IStatisticGatherer * createStatisticsGatherer(IStatisticCollection * stats) +{ + return new StatisticGatherer(stats); } //-------------------------------------------------------------------------------------------------------------------- @@ -2609,7 +2782,7 @@ StringBuffer & CRuntimeStatisticCollection::toStr(StringBuffer &str) const unsigned __int64 rawValue = getStatisticValue(rawKind); if (rawValue) value += convertMeasure(rawKind, kind, rawValue); - } + } if (value) { const char * name = queryStatisticName(serialKind); diff --git a/system/jlib/jstats.h b/system/jlib/jstats.h index c86d18c1937..c6a4b9aec1d 100644 --- a/system/jlib/jstats.h +++ b/system/jlib/jstats.h @@ -25,6 +25,7 @@ #include #include "jstatcodes.h" +#include "jset.hpp" typedef unsigned __int64 stat_type; typedef unsigned __int64 cost_type; // Decimal currency amount multiplied by 10^6 @@ -104,6 +105,25 @@ interface IStatisticCollectionIterator; interface IStatisticGatherer; interface IStatisticVisitor; +enum StatsMergeAction +{ + StatsMergeKeepNonZero, + StatsMergeReplace, + StatsMergeSum, + StatsMergeMin, + StatsMergeMax, + StatsMergeAppend, + StatsMergeFirst, + StatsMergeLast, +}; + +interface IWhenAggregateUpdatedCallBack +{ + virtual void operator () (const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value) = 0; +}; + +class StatisticsMapping; +class CRuntimeStatisticCollection; interface IStatisticCollection : public IInterface { public: @@ -114,6 +134,8 @@ interface IStatisticCollection : public IInterface virtual unsigned getNumStatistics() const = 0; virtual bool getStatistic(StatisticKind kind, unsigned __int64 & value) const = 0; virtual void getStatistic(StatisticKind & kind, unsigned __int64 & value, unsigned idx) const = 0; + virtual bool getStatisticSum(StatisticKind kind, unsigned __int64 & value) const = 0; + virtual bool setStatistic(const char *scope, StatisticKind kind, unsigned __int64 value) = 0; virtual IStatisticCollectionIterator & getScopes(const char * filter, bool sorted) = 0; virtual void getMinMaxScope(IStringVal & minValue, IStringVal & maxValue, StatisticScopeType searchScopeType) const = 0; virtual void getMinMaxActivity(unsigned & minValue, unsigned & maxValue) const = 0; @@ -123,6 +145,22 @@ interface IStatisticCollection : public IInterface virtual StringBuffer &toXML(StringBuffer &out) const = 0; virtual void visit(IStatisticVisitor & target) const = 0; virtual void visitChildren(IStatisticVisitor & target) const = 0; + virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) = 0; + virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren, bool & wasCreated) = 0; + virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path, bool & wasCreated) = 0; + virtual IStatisticCollection * querySubScope(const StatsScopeId & search) const = 0; + virtual IStatisticCollection * querySubScopePath(std::initializer_list path) = 0; + virtual void pruneChildStats() = 0; + virtual void addStatistic(StatisticKind kind, unsigned __int64 value) = 0; + virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) = 0; + virtual bool refreshAggregates(const StatisticsMapping & mapping, IWhenAggregateUpdatedCallBack & fWhenAggregateUpdated) = 0; + virtual bool refreshAggregates(CRuntimeStatisticCollection & totals, IBitSet & isTotalUpdated, IWhenAggregateUpdatedCallBack & fWhenAggregateUpdated) = 0; + virtual void deserialize(MemoryBuffer & in, unsigned version) = 0; + virtual void deserializeChild(const StatsScopeId & scopeId, MemoryBuffer & in, unsigned version) = 0; + virtual void clearStats() = 0; + virtual void addChild(IStatisticCollection *stats) = 0; + virtual void markDirty() = 0; + virtual stat_type aggregateStatistic(StatisticKind kind) const = 0; }; interface IStatisticCollectionIterator : public IIteratorOf @@ -134,17 +172,6 @@ interface IStatisticVisitor virtual bool visitScope(const IStatisticCollection & cur) = 0; // return true to iterate through children }; -enum StatsMergeAction -{ - StatsMergeKeepNonZero, - StatsMergeReplace, - StatsMergeSum, - StatsMergeMin, - StatsMergeMax, - StatsMergeAppend, - StatsMergeFirst, - StatsMergeLast, -}; interface IStatisticGatherer : public IInterface { @@ -493,6 +520,7 @@ extern const jlib_decl StatisticsMapping diskRemoteStatistics; extern const jlib_decl StatisticsMapping diskReadRemoteStatistics; extern const jlib_decl StatisticsMapping diskWriteRemoteStatistics; extern const jlib_decl StatisticsMapping jhtreeCacheStatistics; +extern const jlib_decl StatisticsMapping aggregateKindStatistics; //--------------------------------------------------------------------------------------------------------------------- @@ -867,6 +895,7 @@ extern jlib_decl stat_type readStatisticValue(const char * cur, const char * * e extern jlib_decl unsigned __int64 mergeStatisticValue(unsigned __int64 prevValue, unsigned __int64 newValue, StatsMergeAction mergeAction); +extern jlib_decl bool includeStatisticIfZero(StatisticKind kind); extern jlib_decl StatisticMeasure queryMeasure(StatisticKind kind); extern jlib_decl const char * queryStatisticName(StatisticKind kind); extern jlib_decl void queryLongStatisticName(StringBuffer & out, StatisticKind kind); @@ -883,8 +912,11 @@ extern jlib_decl StatisticCreatorType queryCreatorType(const char * sct, Statist extern jlib_decl StatisticScopeType queryScopeType(const char * sst, StatisticScopeType dft); extern jlib_decl IStatisticGatherer * createStatisticsGatherer(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope); +extern jlib_decl IStatisticGatherer * createStatisticsGatherer(IStatisticCollection * stats); +extern jlib_decl IStatisticCollection * createRootStatisticCollection(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope, const StatsScopeId & graphScope, IStatisticCollection * sgCollection=nullptr); extern jlib_decl void serializeStatisticCollection(MemoryBuffer & out, IStatisticCollection * collection); extern jlib_decl IStatisticCollection * createStatisticCollection(MemoryBuffer & in); +extern jlib_decl IStatisticCollection * createStatisticCollection(IStatisticCollection * parent, const StatsScopeId & scopeId); inline unsigned __int64 milliToNano(unsigned __int64 value) { return value * 1000000; } // call avoids need to upcast values inline unsigned __int64 nanoToMilli(unsigned __int64 value) { return value / 1000000; } @@ -942,6 +974,5 @@ class jlib_decl RuntimeStatisticTarget : implements IStatisticTarget }; extern jlib_decl StringBuffer & formatMoney(StringBuffer &out, unsigned __int64 value); -extern jlib_decl stat_type aggregateStatistic(StatisticKind kind, IStatisticCollection * statsCollection); - +extern jlib_decl IStatisticCollection * createGlobalStatisticCollection(IPropertyTree * root); #endif diff --git a/thorlcr/graph/thgraphmaster.cpp b/thorlcr/graph/thgraphmaster.cpp index dae32d807ca..b16b1579365 100644 --- a/thorlcr/graph/thgraphmaster.cpp +++ b/thorlcr/graph/thgraphmaster.cpp @@ -2902,9 +2902,11 @@ bool CMasterGraph::deserializeStats(unsigned node, MemoryBuffer &mb) void CMasterGraph::getStats(IStatisticGatherer &stats) { stats.addStatistic(StNumSlaves, queryClusterWidth()); + cost_type costDiskAccess = getDiskAccessCost(); + if (costDiskAccess) + stats.addStatistic(StCostFileAccess, costDiskAccess); // graph specific stats - graphStats.getStats(stats); Owned iter; diff --git a/thorlcr/master/thdemonserver.cpp b/thorlcr/master/thdemonserver.cpp index 5de19dced0b..92f861d23f8 100644 --- a/thorlcr/master/thdemonserver.cpp +++ b/thorlcr/master/thdemonserver.cpp @@ -43,6 +43,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer unsigned numberOfMachines = 0; cost_type costLimit = 0; cost_type workunitCost = 0; + GlobalStatisticCollection globalStatsCollection; void doReportGraph(IStatisticGatherer & stats, CGraphBase *graph) { @@ -91,16 +92,13 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer if (costLimit || finished) { const cost_type sgCost = money2cost_type(calcCost(thorManagerRate, duration) + calcCost(thorWorkerRate, duration) * numberOfMachines); - cost_type costDiskAccess = graph.getDiskAccessCost(); if (finished) { if (sgCost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, graphScope, StCostExecute, NULL, sgCost, 1, 0, StatsMergeReplace); - if (costDiskAccess) - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, graphScope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace); } - const cost_type totalCost = workunitCost + sgCost + costDiskAccess; + const cost_type totalCost = workunitCost + sgCost + graph.getDiskAccessCost(); if (costLimit>0 && totalCost > costLimit) { LOG(MCwarning, thorJob, "ABORT job cost exceeds limit"); @@ -145,7 +143,8 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer ForEachItemIn (g, activeGraphs) { CGraphBase &graph = activeGraphs.item(g); - Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false); + Owned sgCollection = globalStatsCollection.getCollectionForUpdate(SCTthor, queryStatisticsComponentName(), wfid, graphName, graph.queryGraphId(), true); // true=>clear existing stats + Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false, sgCollection); reportGraph(stats->queryStatsBuilder(), &graph); } Owned wu = ¤tWU.lock(); @@ -155,6 +154,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer unsigned startTime = graphStarts.item(g2); reportStatus(wu, graph, startTime, finished, success); } + updateAggregates(wu); queryServerStatus().commitProperties(); } catch (IException *E) @@ -173,10 +173,10 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer const char *graphName = ((CJobMaster &)activeGraphs.item(0).queryJob()).queryGraphName(); unsigned wfid = graph->queryJob().getWfid(); { - Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false); + Owned sgCollection = globalStatsCollection.getCollectionForUpdate(SCTthor, queryStatisticsComponentName(), wfid, graphName, graph->queryGraphId(), true); // true=>clear existing stats + Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false, sgCollection); reportGraph(stats->queryStatsBuilder(), graph); } - Owned wu = ¤tWU.lock(); if (startTimeStamp) { @@ -186,7 +186,6 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, graphScope, StWhenStarted, NULL, getTimeStampNowValue(), 1, 0, StatsMergeAppend); } reportStatus(wu, *graph, startTime, finished, success); - queryServerStatus().commitProperties(); } catch (IException *e) @@ -290,6 +289,10 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer { unsigned startTime = graphStarts.item(g); reportGraph(graph, true, success, startTime, 0); + // Prune subgraph descendant stats as they have been serialized and no longer needed. + const char *graphName = ((CJobMaster &)graph->queryJob()).queryGraphName(); + unsigned wfid = graph->queryJob().getWfid(); + globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph->queryGraphId()); activeGraphs.remove(g); graphStarts.remove(g); } @@ -298,8 +301,27 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer { synchronized block(mutex); reportActiveGraphs(true, false); + if (activeGraphs.ordinality()) + { + CJobBase & activeJob = activeGraphs.item(0).queryJob(); + const char *graphName = ((CJobMaster &)activeJob).queryGraphName(); + unsigned wfid = activeJob.getWfid(); + ForEachItemIn (g, activeGraphs) + { + CGraphBase &graph = activeGraphs.item(g); + globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph.queryGraphId()); + } + } activeGraphs.kill(); } + virtual void updateAggregates(IWorkUnit * lockedWu) override + { + globalStatsCollection.updateAggregates(lockedWu); + } + virtual void loadExistingAggregates(IConstWorkUnit &workunit) override + { + globalStatsCollection.loadExistingAggregates(workunit); + } }; diff --git a/thorlcr/master/thdemonserver.hpp b/thorlcr/master/thdemonserver.hpp index 32453c92764..3931b540ded 100644 --- a/thorlcr/master/thdemonserver.hpp +++ b/thorlcr/master/thdemonserver.hpp @@ -24,12 +24,15 @@ interface IWUGraphProgress; class CGraphBase; +interface IConstWorkUnit; interface IDeMonServer : extends IInterface { virtual void takeHeartBeat(MemoryBuffer &progressMbb) = 0; virtual void startGraph(CGraphBase *graph) = 0; virtual void endGraph(CGraphBase *graph, bool success) = 0; virtual void endGraphs() = 0; + virtual void updateAggregates(IWorkUnit * lockedWu) = 0; + virtual void loadExistingAggregates(IConstWorkUnit &workunit) = 0; }; diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp index 65d506fbd08..c1c4d6ac3a9 100644 --- a/thorlcr/master/thgraphmanager.cpp +++ b/thorlcr/master/thgraphmanager.cpp @@ -1117,6 +1117,8 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, if (isContainerized() && podInfo.hasStdDev()) podInfo.report(wu); } + if (globals->getPropBool("@watchdogProgressEnabled")) + queryDeMonServer()->loadExistingAggregates(workunit); setWuid(workunit.queryWuid(), workunit.queryClusterName()); @@ -1134,7 +1136,9 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, cost_type cost = money2cost_type(calculateThorCost(nanoToMilli(graphTimeNs), numberOfMachines)); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - updateSpillSize(wu, graphScope, SSTgraph); + if (globals->getPropBool("@watchdogProgressEnabled")) + queryDeMonServer()->updateAggregates(wu); + removeJob(*job); } catch (IException *e)