diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index 25f2eeadd44..51339adf1c6 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -2659,7 +2659,7 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu } } -void StatisticsAggregator::loadExistingAggregates(IConstWorkUnit &workunit) +void StatisticsAggregator::loadExistingAggregates(const IConstWorkUnit &workunit) { StatsScopeId globalScopeId(SSTglobal, (unsigned)0); statsCollection.setown(createStatisticCollection(globalScopeId)); @@ -2682,9 +2682,9 @@ void StatisticsAggregator::loadExistingAggregates(IConstWorkUnit &workunit) WuScopeFilter filter; filter.addScopeType(SSTglobal).addScopeType(SSTworkflow).addScopeType(SSTgraph); - const unsigned numStats = aggregateKindsMapping.numStatistics(); + const unsigned numStats = mapping.numStatistics(); for (unsigned i=0; irecordStats(aggregateKindsMapping, sourceStats, {wfScopeId, graphScopeId, sgScopeId}); + statsCollection->recordStats(mapping, sourceStats, {wfScopeId, graphScopeId, sgScopeId}); } // Recalculate aggregates and then write the aggregates to global stats (dali) @@ -2712,12 +2712,12 @@ void StatisticsAggregator::updateAggregates(IWorkUnit *wu) if (!statsCollection) return; - std::function f = [&](const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value) + AggregateUpdatedCallBackFunc f = [&](const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value) { wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), sst, scope, kind, nullptr, value, 1, 0, StatsMergeReplace); }; - statsCollection->refreshAggregates(aggregateKindsMapping, f); + statsCollection->refreshAggregates(mapping, f); } //--------------------------------------------------------------------------------------------------------------------- diff --git a/common/workunit/workunit.hpp b/common/workunit/workunit.hpp index 38e4a31457a..a6579c85d3b 100644 --- a/common/workunit/workunit.hpp +++ b/common/workunit/workunit.hpp @@ -1786,13 +1786,13 @@ extern WORKUNIT_API bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, u class WORKUNIT_API StatisticsAggregator : public CInterface { public: - StatisticsAggregator() : aggregateKindsMapping(aggregateKindStatistics) {} - void loadExistingAggregates(IConstWorkUnit &workunit); + StatisticsAggregator(const StatisticsMapping & _mapping) : mapping(_mapping) {} + void loadExistingAggregates(const IConstWorkUnit &workunit); void recordStats(IStatisticCollection * sourceStats, unsigned wfid, const char * graphName, unsigned graphId); void updateAggregates(IWorkUnit *wu); private: Owned statsCollection; - const StatisticsMapping & aggregateKindsMapping; + const StatisticsMapping & mapping; }; #endif diff --git a/ecl/eclagent/agentctx.hpp b/ecl/eclagent/agentctx.hpp index d919c6c087c..73444006563 100644 --- a/ecl/eclagent/agentctx.hpp +++ b/ecl/eclagent/agentctx.hpp @@ -125,7 +125,7 @@ struct IAgentContext : extends IGlobalCodeContext virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, char const * source) = 0; virtual double queryAgentMachineCost() const = 0; virtual void updateAggregates(IWorkUnit* lockedwu) = 0; - virtual void mergeAggregatorStats(IStatisticGatherer & stats, unsigned wfid, const char *graphname, unsigned sgId) = 0; + virtual void mergeAggregatorStats(IStatisticCollection & stats, unsigned wfid, const char *graphname, unsigned sgId) = 0; }; #endif // AGENTCTX_HPP_INCL diff --git a/ecl/eclagent/eclagent.cpp b/ecl/eclagent/eclagent.cpp index 99a6eb4edd0..a9de717e852 100644 --- a/ecl/eclagent/eclagent.cpp +++ b/ecl/eclagent/eclagent.cpp @@ -509,7 +509,7 @@ class EclAgentPluginCtx : public SimplePluginCtx //======================================================================================= EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bool _resetWorkflow, bool _noRetry, char const * _logname, const char *_allowedPipeProgs, IPropertyTree *_queryXML, ILogMsgHandler * _logMsgHandler) - : wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), logMsgHandler(_logMsgHandler) + : wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), logMsgHandler(_logMsgHandler), statsAggregator(stdAggregateKindStatistics) { isAborting = false; isStandAloneExe = false; diff --git a/ecl/eclagent/eclagent.ipp b/ecl/eclagent/eclagent.ipp index c8c35aaa2bc..3aafca17a3e 100644 --- a/ecl/eclagent/eclagent.ipp +++ b/ecl/eclagent/eclagent.ipp @@ -254,7 +254,7 @@ public: { ctx->updateAggregates(lockedwu); } - virtual void mergeAggregatorStats(IStatisticGatherer & stats, unsigned wfid, const char *graphname, unsigned sgId) override + virtual void mergeAggregatorStats(IStatisticCollection & stats, unsigned wfid, const char *graphname, unsigned sgId) override { ctx->mergeAggregatorStats(stats, wfid, graphname, sgId); } @@ -718,10 +718,9 @@ public: { statsAggregator.updateAggregates(lockedwu); } - virtual void mergeAggregatorStats(IStatisticGatherer & stats, unsigned wfid, const char *graphname, unsigned sgId) override + virtual void mergeAggregatorStats(IStatisticCollection & stats, unsigned wfid, const char *graphname, unsigned sgId) override { - Linked statsCollection = stats.getResult(); - statsAggregator.recordStats(statsCollection, wfid, graphname, sgId); + statsAggregator.recordStats(&stats, wfid, graphname, sgId); } }; diff --git a/ecl/eclagent/eclgraph.cpp b/ecl/eclagent/eclgraph.cpp index 2430d1cce19..e1de571cecd 100644 --- a/ecl/eclagent/eclgraph.cpp +++ b/ecl/eclagent/eclgraph.cpp @@ -879,7 +879,8 @@ void EclSubGraph::updateProgress() Owned progress = parent.updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), parent.queryWfid(), id); IStatisticGatherer & stats = progress->queryStatsBuilder(); updateProgress(stats); - agent->mergeAggregatorStats(stats, parent.queryWfid(), parent.queryGraphName(), id); + Owned statsCollection = stats.getResult(); + agent->mergeAggregatorStats(*statsCollection, parent.queryWfid(), parent.queryGraphName(), id); if (startGraphTime || elapsedGraphCycles) { WorkunitUpdate lockedwu(agent->updateWorkUnit()); @@ -1351,7 +1352,8 @@ void EclGraph::updateLibraryProgress() Owned progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false); IStatisticGatherer & stats = progress->queryStatsBuilder(); cur.updateProgress(stats); - agent->mergeAggregatorStats(stats, wfid, queryGraphName(), cur.id); + Owned statsCollection = stats.getResult(); + agent->mergeAggregatorStats(*statsCollection, wfid, queryGraphName(), cur.id); } } diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 13e567a7606..6b694eb46c5 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -1331,7 +1331,7 @@ const StatisticsMapping diskLocalStatistics({StCycleDiskReadIOCycles, StSizeDisk const StatisticsMapping diskRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries}); const StatisticsMapping diskReadRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StNumDiskRetries, StCycleDiskReadIOCycles}); const StatisticsMapping diskWriteRemoteStatistics({StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries, StCycleDiskWriteIOCycles}); -const StatisticsMapping aggregateKindStatistics({StCostExecute, StCostFileAccess, StSizeGraphSpill, StSizeSpillFile}); +const StatisticsMapping stdAggregateKindStatistics({StCostExecute, StCostFileAccess, StSizeGraphSpill, StSizeSpillFile}); const StatisticsMapping * queryStatsMapping(const StatsScopeId & scope, unsigned hashcode) { @@ -1799,7 +1799,7 @@ class CStatisticCollection : public CInterfaceOf isDirty=true; if (parent) parent->markDirty(); } - void refreshAggregates(CRuntimeStatisticCollection & parentTotals, std::function & fWhenAggregateUpdated) + void refreshAggregates(CRuntimeStatisticCollection & parentTotals, AggregateUpdatedCallBackFunc & fWhenAggregateUpdated) { const StatisticsMapping & mapping = parentTotals.queryMapping(); // if this scope is not dirty, the aggregates are accurate at this level so return totals (no need to descend) @@ -1811,8 +1811,8 @@ class CStatisticCollection : public CInterfaceOf { Statistic & stat = stats.element(i); StatisticKind kind = stat.queryKind(); - if (kind != (StatisticKind)(kind & StKindMask)) - continue; // ignore variants + if (queryStatsVariant(kind) != 0) + continue; // ignore variants (shouldn't happen as the mapping ensure only the aggregator kinds are present) if (mapping.hasKind(kind)) { // Totals required from this level by parent, even if they are not dirty @@ -1941,20 +1941,6 @@ class CStatisticCollection : public CInterfaceOf kind = cur.kind; value = cur.value; } - virtual bool getStatisticSum(StatisticKind kind, unsigned __int64 & value) const override - { - bool found = false; - ForEachItemIn(i, stats) - { - const Statistic & cur = stats.item(i); - if (cur.kind == kind) - { - value += cur.value; - found = true; - } - } - return found; - } virtual IStatisticCollectionIterator & getScopes(const char * filter, bool sorted) override { assertex(!filter); @@ -2092,7 +2078,7 @@ class CStatisticCollection : public CInterfaceOf for (auto const & cur : children) cur.visit(visitor); } - virtual void refreshAggregates(const StatisticsMapping & mapping, std::function & fWhenAggregateUpdated) override + virtual void refreshAggregates(const StatisticsMapping & mapping, AggregateUpdatedCallBackFunc & fWhenAggregateUpdated) override { if (isDirty) { @@ -2102,8 +2088,8 @@ class CStatisticCollection : public CInterfaceOf } virtual stat_type aggregateStatistic(StatisticKind kind) const override { - stat_type sum; - if (!getStatisticSum(kind, sum)) // get sum of statistics at this level + stat_type sum = 0; + if (!getStatistic(kind, sum)) // get sum of statistics at this level { // if no stats at this level, then get sum of stats from children for (auto & child : children) @@ -2115,7 +2101,7 @@ class CStatisticCollection : public CInterfaceOf { CStatisticCollection * curSrcCollection = static_cast(sourceStatsCollection); const StatsScopeId * scopeItem = path.begin(); - + // n.b. sourceStatsCollection has workflow as root but this collection has global as root // Locate the collection with the stats and make curSrcCollection point to that if (!curSrcCollection || curSrcCollection->queryScopeId().compare(*scopeItem)!=0) return; // Required path doesn't exist in source collection so nothing more to do here @@ -2134,7 +2120,7 @@ class CStatisticCollection : public CInterfaceOf ForEachItemIn(i, curSrcCollection->stats) { Statistic & cur = curSrcCollection->stats.element(i); - if (cur.kind != (StatisticKind)(cur.kind & StKindMask)) + if (queryStatsVariant(cur.kind) != 0) continue; // ignore variants if (mapping.hasKind(cur.kind)) { diff --git a/system/jlib/jstats.h b/system/jlib/jstats.h index d9603aef04b..cacdb23684e 100644 --- a/system/jlib/jstats.h +++ b/system/jlib/jstats.h @@ -105,6 +105,8 @@ interface IStatisticGatherer; interface IStatisticVisitor; class jlib_decl StatisticsMapping; +typedef std::function AggregateUpdatedCallBackFunc; + interface IStatisticCollection : public IInterface { public: @@ -115,7 +117,6 @@ interface IStatisticCollection : public IInterface virtual unsigned getNumStatistics() const = 0; virtual bool getStatistic(StatisticKind kind, unsigned __int64 & value) const = 0; virtual void getStatistic(StatisticKind & kind, unsigned __int64 & value, unsigned idx) const = 0; - virtual bool getStatisticSum(StatisticKind kind, unsigned __int64 & value) const = 0; virtual IStatisticCollectionIterator & getScopes(const char * filter, bool sorted) = 0; virtual void getMinMaxScope(IStringVal & minValue, IStringVal & maxValue, StatisticScopeType searchScopeType) const = 0; virtual void getMinMaxActivity(unsigned & minValue, unsigned & maxValue) const = 0; @@ -126,7 +127,7 @@ interface IStatisticCollection : public IInterface virtual StringBuffer &toXML(StringBuffer &out) const = 0; virtual void visit(IStatisticVisitor & target) const = 0; virtual void visitChildren(IStatisticVisitor & target) const = 0; - virtual void refreshAggregates(const StatisticsMapping & mapping, std::function & fWhenAggregateUpdated) = 0; + virtual void refreshAggregates(const StatisticsMapping & mapping, AggregateUpdatedCallBackFunc & fWhenAggregateUpdated) = 0; virtual stat_type aggregateStatistic(StatisticKind kind) const = 0; virtual void recordStats(const StatisticsMapping & mapping, IStatisticCollection * statsCollection, std::initializer_list path) = 0; }; @@ -503,7 +504,7 @@ extern const jlib_decl StatisticsMapping diskRemoteStatistics; extern const jlib_decl StatisticsMapping diskReadRemoteStatistics; extern const jlib_decl StatisticsMapping diskWriteRemoteStatistics; extern const jlib_decl StatisticsMapping jhtreeCacheStatistics; -extern const jlib_decl StatisticsMapping aggregateKindStatistics; +extern const jlib_decl StatisticsMapping stdAggregateKindStatistics; //--------------------------------------------------------------------------------------------------------------------- diff --git a/thorlcr/master/thdemonserver.cpp b/thorlcr/master/thdemonserver.cpp index 1b0065508bc..5b4094c3315 100644 --- a/thorlcr/master/thdemonserver.cpp +++ b/thorlcr/master/thdemonserver.cpp @@ -133,9 +133,9 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer { Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false); IStatisticGatherer & statsBuilder = stats->queryStatsBuilder(); - reportGraph(statsBuilder, & graph); + reportGraph(statsBuilder, &graph); // Merge only the stats at the specified scope level - Linked statsCollection = statsBuilder.getResult(); + Owned statsCollection = statsBuilder.getResult(); statsAggregator.recordStats(statsCollection, wfid, graphName, graph.queryGraphId()); } void reportActiveGraphs(bool finished, bool success=true) @@ -201,7 +201,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer public: IMPLEMENT_IINTERFACE_USING(CSimpleInterface); - DeMonServer() + DeMonServer() : statsAggregator(stdAggregateKindStatistics) { lastReport = msTick(); reportRate = globals->getPropInt("@watchdogProgressInterval", 30);