From 06217149d917bc9c74e7fc5d410de6f374d5eef2 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Fri, 15 Sep 2023 12:02:50 +0100 Subject: [PATCH] HPCC-29657 Produce aggregate stats (e.g. spill, cost) whilst a job is running Signed-off-by: Shamser Ahmed --- common/workunit/workunit.cpp | 252 +++++++++++++++++------- common/workunit/workunit.hpp | 24 ++- common/workunit/workunit.ipp | 4 +- ecl/eclagent/agentctx.hpp | 2 + ecl/eclagent/eclagent.cpp | 8 - ecl/eclagent/eclagent.ipp | 20 +- ecl/eclagent/eclgraph.cpp | 26 +-- plugins/cassandra/cassandrawu.cpp | 8 +- system/jlib/jstats.cpp | 306 ++++++++++++++++++++++++------ system/jlib/jstats.h | 49 +++-- thorlcr/graph/thgraphmaster.cpp | 4 +- thorlcr/master/thdemonserver.cpp | 38 +++- thorlcr/master/thdemonserver.hpp | 3 + thorlcr/master/thgraphmanager.cpp | 6 +- 14 files changed, 573 insertions(+), 177 deletions(-) diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index 64f569e5284..d2cc8ca9867 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -183,14 +183,17 @@ void doDescheduleWorkkunit(char const * wuid) * Graph progress support */ -CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge) +CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * statsCollection) : creatorType(_creatorType), creator(_creator), id(_id), merge(_merge) { StatsScopeId graphScopeId; verifyex(graphScopeId.setScopeText(_rootScope)); + StatsScopeId wfScopeId(SSTworkflow,wfid); - StatsScopeId rootScopeId(SSTworkflow,wfid); - collector.setown(createStatisticsGatherer(_creatorType, _creator, rootScopeId)); + if (statsCollection) + collector.setown(createStatisticsGatherer(statsCollection)); + else + collector.setown(createStatisticsGatherer(_creatorType, _creator, wfScopeId)); collector->beginScope(graphScopeId); } @@ -2657,85 +2660,190 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu } return totalCost; } +}; + +GlobalStatisticCollection::GlobalStatisticCollection() : aggregateKindsMapping(aggregateKindStatistics) +{ + // Construct statsCollection here as GlobalStatisticCollection::load() is optional + StatsScopeId globalScopeId(SSTglobal, (unsigned)0); + statsCollection.setown(createStatisticCollection(nullptr, globalScopeId)); } -//aggregate disk costs from top-level subgraphs (when scope specified) or workflows (scope not specified) -cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope) +void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly) { - WuScopeFilter filter; - if (!isEmptyString(scope)) - filter.addScope(scope); - else - filter.addScope(""); // Needed to match scope - // when scope is a workflow, sum graph costs (or subgraph cost when no graph cost) to get workflow cost - // (Costs from child graphs and activities should have been summed up to graph/subgraph level already) - // when isEmptyString(scope), sum workflow costs (or graph cost when no workflow cost) to get global cost - // (Costs from all levels below graph should be summed upto at least graph level already) - // i.e. need 2 levels of nesting - filter.setIncludeNesting(2); - // includeNesting(2) needs just source "global". However, WuScopeFilter is incorrectly inferring the source as "global,stats", - // causing too many of the stats to be pulled in and inefficiency. Here, explicitly set source to "global" - filter.addSource("global"); - filter.addOutputStatistic(StCostFileAccess); - filter.addRequiredStat(StCostFileAccess); - filter.finishedFilter(); - Owned it = &wu->getScopeIterator(filter); - cost_type totalCost = 0; - for (it->first(); it->isValid(); ) + const char * _wuid = workunit.queryWuid(); + if (!streq(_wuid, wuid.str())) // New statsCollection if collection for different workunit { - cost_type value = 0; - if (it->getStat(StCostFileAccess, value)) - { - totalCost += value; - it->nextSibling(); - } - else + StatsScopeId globalScopeId(SSTglobal, (unsigned)0); + statsCollection.setown(createStatisticCollection(nullptr, globalScopeId)); + wuid.set(_wuid); + } + + loadGlobalAggregates(workunit); + if (isEmptyString(graphName)) + { + Owned root = getWUGraphProgress(wuid, true); + if (root) { - it->next(); + Owned graphPT = root->getPropTree(graphName); + if (!graphPT) + return; + + StatsScopeId wfScopeId(SSTworkflow, graphPT->getPropInt("@wfid", 0)); + StatsScopeId graphScopeId(SSTgraph, graphName); + + Owned iter = graphPT->getElements("*"); + ForEach(*iter) + { + StatsScopeId sgScopeId; + IPropertyTree * sgPT = & iter->query(); + const char * sgName = sgPT->queryName(); + if (strcmp(sgName, "node")==0) + continue; + verifyex(sgScopeId.setScopeText(sgName)); + MemoryBuffer compressed; + sgPT->getPropBin("Stats", compressed); + if (!compressed.length()) + break; + MemoryBuffer serialized; + decompressToBuffer(serialized, compressed); + + unsigned version; + serialized.read(version); + byte kind; + serialized.read(kind); + + StatsScopeId childId; + childId.deserialize(serialized, version); + int statsMinDepth = 0, statsMaxDepth = INT_MAX; + if (aggregatesOnly) + { + // Only store stats for subgraph level + statsMinDepth = 3; // this is subgraph level + statsMaxDepth = 3; + } + statsCollection->deserializeChild(childId, serialized, version, statsMinDepth, statsMaxDepth); + } } } - return totalCost; } -void gatherSpillSize(const IConstWorkUnit * wu, const char *scope, stat_type & peakSizeSpill) +void GlobalStatisticCollection::loadGlobalAggregates(IConstWorkUnit &workunit) { - WuScopeFilter filter; - if (!isEmptyString(scope)) - filter.addScope(scope); - else - { - filter.addScope(""); - filter.addSource("global"); - } - filter.setIncludeNesting(1); - filter.addOutputStatistic(StSizeGraphSpill); - filter.addRequiredStat(StSizeGraphSpill); - filter.finishedFilter(); - Owned it = &wu->getScopeIterator(filter); - peakSizeSpill = 0; - for (it->first(); it->isValid(); ) + class StatsCollectionAggregatesLoader : public IWuScopeVisitor { - stat_type value = 0; - if (it->getStat(StSizeGraphSpill, value)) + public: + StatsCollectionAggregatesLoader(IStatisticCollection * _statsCollection) : statsCollection(_statsCollection) {} + + virtual void noteStatistic(StatisticKind kind, unsigned __int64 value, IConstWUStatistic & extra) override { - if (value>peakSizeSpill) - peakSizeSpill = value; - it->nextSibling(); + statsCollection->setStatistic(extra.queryScope(), kind, value); } - else + virtual void noteAttribute(WuAttr attr, const char * value) override { throwUnexpected(); } + virtual void noteHint(const char * kind, const char * value) override { throwUnexpected(); } + virtual void noteException(IConstWUException & exception) override { throwUnexpected(); } + private: + Linked statsCollection; + }; + + WuScopeFilter filter; + filter.addScopeType(SSTglobal).addScopeType(SSTworkflow).addScopeType(SSTgraph); + const unsigned numStats = aggregateKindsMapping.numStatistics(); + for (unsigned i=0; i iter = &workunit.getScopeIterator(filter); + ForEach(*iter) + iter->playProperties(aggregatesLoader); +} + +// getCollectionForUpdate() returns IStatisticCollection for the given subgraph +// if clearStats==true then the existing stats are cleared for the given scope and descendants +IStatisticCollection * GlobalStatisticCollection::getCollectionForUpdate(StatisticCreatorType creatorType, const char * creator, unsigned wfid, const char *graphName, unsigned sgId, bool clearStats) +{ + StatsScopeId graphScopeId; + verifyex(graphScopeId.setScopeText(graphName)); + StatsScopeId wfScopeId(SSTworkflow, wfid); + StatsScopeId sgScopeId(SSTsubgraph, sgId); + + bool wasCreated; + IStatisticCollection * sgScopeCollection = statsCollection->ensureSubScopePath({wfScopeId,graphScopeId, sgScopeId}, wasCreated); + if (clearStats && !wasCreated) + sgScopeCollection->clearStats(); + sgScopeCollection->markDirty(); + return createRootStatisticCollection(creatorType, creator, wfScopeId, graphScopeId, sgScopeCollection); +} + +// Recalculate aggregates from subgraph scope +bool GlobalStatisticCollection::refreshAggregates() +{ + return statsCollection->refreshAggregates(aggregateKindsMapping); +} + +// Load aggregates in collection from global stats +void GlobalStatisticCollection::updateAggregates(IWorkUnit *wu) +{ + class StatisticsAggregatesWriter : implements IStatisticVisitor + { + const StatisticsMapping & aggregateKindsMapping; + const unsigned numStats; + Linked wu; + public: + StatisticsAggregatesWriter(IWorkUnit * _wu, const StatisticsMapping & _aggregateKindsMapping): wu(_wu), aggregateKindsMapping(_aggregateKindsMapping), numStats(aggregateKindsMapping.numStatistics()) {} + + virtual bool visitScope(const IStatisticCollection & cur) { - it->next(); + switch (cur.queryScopeType()) + { + case SSTglobal: + case SSTworkflow: + case SSTgraph: + for (unsigned i=0; isetStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), cur.queryScopeType(), cur.getFullScope(s).str(), kind, nullptr, value, 1, 0, StatsMergeReplace); + } + } + } + if (cur.queryScopeType()==SSTgraph) + return false; + else + return true; + default: + return false; + } } + }; + if (refreshAggregates()) // Only serialize if the aggregates have changed + { + StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKindsMapping); + statsCollection->visit(statsAggregatorWriter); } } -void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType) +// Prune all subgraph descendent stats (leaving subgraph stats for future aggregation) +void GlobalStatisticCollection::pruneSubGraphDescendants(unsigned wfid, const char *graphName, unsigned sgId) { - stat_type peakSizeSpill = 0; - gatherSpillSize(wu, scope, peakSizeSpill); - if (peakSizeSpill) - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizeGraphSpill, nullptr, peakSizeSpill, 1, 0, StatsMergeMax); + StatsScopeId graphScopeId; + verifyex(graphScopeId.setScopeText(graphName)); + StatsScopeId wfScopeId(SSTworkflow, wfid); + StatsScopeId sgScopeId(SSTsubgraph, sgId); + IStatisticCollection * sgScopeCollection = statsCollection->querySubScopePath({wfScopeId,graphScopeId, sgScopeId}); + if (sgScopeCollection) + sgScopeCollection->pruneChildStats(); } + //--------------------------------------------------------------------------------------------------------------------- @@ -3805,8 +3913,8 @@ class CDaliWorkUnit; class CDaliWuGraphStats : public CWuGraphStats { public: - CDaliWuGraphStats(const CDaliWorkUnit* _owner, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge) - : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), owner(_owner), graphName(_rootScope), wfid(_wfid) + CDaliWuGraphStats(const CDaliWorkUnit* _owner, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats) + : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), owner(_owner), graphName(_rootScope), wfid(_wfid) { } protected: @@ -3820,8 +3928,8 @@ class CDaliWuGraphStats : public CWuGraphStats class CLocalWuGraphStats : public CWuGraphStats { public: - CLocalWuGraphStats(IPropertyTree *_p, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge) - : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), graphName(_rootScope), p(_p) + CLocalWuGraphStats(IPropertyTree *_p, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats) + : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), graphName(_rootScope), p(_p) { } protected: @@ -4125,9 +4233,9 @@ class CDaliWorkUnit : public CPersistedWorkUnit } } } - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats) const override { - return new CDaliWuGraphStats(this, creatorType, creator, _wfid, graphName, subgraph, merge); + return new CDaliWuGraphStats(this, creatorType, creator, _wfid, graphName, subgraph, merge, stats); } virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree) { @@ -4437,8 +4545,8 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa { c->setGraphState(graphName, wfid, state); } virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const { c->setNodeState(graphName, nodeId, state); } - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override - { return c->updateStats(graphName, creatorType, creator, _wfid, subgraph, merge); } + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats=nullptr) const override + { return c->updateStats(graphName, creatorType, creator, _wfid, subgraph, merge, stats); } virtual void clearGraphProgress() const { c->clearGraphProgress(); } virtual IStringVal & getAbortBy(IStringVal & str) const @@ -10268,9 +10376,9 @@ void CLocalWorkUnit::setNodeState(const char *graphName, WUGraphIDType nodeId, W { throwUnexpected(); // Should only be used for persisted workunits } -IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const +IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats) const { - return new CLocalWuGraphStats(LINK(p), creatorType, creator, _wfid, graphName, subgraph, merge); + return new CLocalWuGraphStats(LINK(p), creatorType, creator, _wfid, graphName, subgraph, merge, stats); } void CLocalWUGraph::setName(const char *str) diff --git a/common/workunit/workunit.hpp b/common/workunit/workunit.hpp index 60c06b5a56d..544478d1b72 100644 --- a/common/workunit/workunit.hpp +++ b/common/workunit/workunit.hpp @@ -1174,7 +1174,6 @@ interface IConstWUScopeIterator : extends IScmIterator }; //--------------------------------------------------------------------------------------------------------------------- - //! IWorkUnit //! Provides high level access to WorkUnit "header" data. interface IWorkUnit; @@ -1302,7 +1301,7 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const = 0; virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const = 0; virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const = 0; - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const = 0; + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats=nullptr) const = 0; virtual void clearGraphProgress() const = 0; virtual IStringVal & getAbortBy(IStringVal & str) const = 0; virtual unsigned __int64 getAbortTimeStamp() const = 0; @@ -1725,8 +1724,6 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *ti extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeType scopeType, StatisticKind kind, ITimeReporter *timer); extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search); extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false); -extern WORKUNIT_API cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope); -extern WORKUNIT_API void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType); extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name); extern WORKUNIT_API void descheduleWorkunit(char const * wuid); #if 0 @@ -1785,4 +1782,23 @@ extern WORKUNIT_API TraceFlags loadTraceFlags(IConstWorkUnit * wu, const std::in extern WORKUNIT_API bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, unsigned wfid, const char *graphName); + +class WORKUNIT_API GlobalStatisticCollection : public CInterface +{ +public: + GlobalStatisticCollection(); + + void load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly); + void loadGlobalAggregates(IConstWorkUnit &workunit); + IStatisticCollection * getCollectionForUpdate(StatisticCreatorType creatorType, const char * creator, unsigned wfid, const char *graphName, unsigned sgId, bool clearStats); + bool refreshAggregates(); + IStatisticCollection * queryCollection() { return statsCollection; } + void updateAggregates(IWorkUnit *wu); + void pruneSubGraphDescendants(unsigned wfid, const char *graphName, unsigned sgId); +private: + Owned statsCollection; + const StatisticsMapping & aggregateKindsMapping; + StringBuffer wuid; +}; + #endif diff --git a/common/workunit/workunit.ipp b/common/workunit/workunit.ipp index 4f166ea67fb..46ea428c84a 100644 --- a/common/workunit/workunit.ipp +++ b/common/workunit/workunit.ipp @@ -232,7 +232,7 @@ public: virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const; virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const; virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const; - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override; + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats=nullptr) const override; void clearGraphProgress() const; virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree) {}; //No GraphProgressTree in CLocalWorkUnit. @@ -661,7 +661,7 @@ public: class WORKUNIT_API CWuGraphStats : public CInterfaceOf { public: - CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge); + CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats); virtual void beforeDispose(); virtual IStatisticGatherer & queryStatsBuilder(); protected: diff --git a/ecl/eclagent/agentctx.hpp b/ecl/eclagent/agentctx.hpp index a016679fecf..9d142312269 100644 --- a/ecl/eclagent/agentctx.hpp +++ b/ecl/eclagent/agentctx.hpp @@ -124,6 +124,8 @@ struct IAgentContext : extends IGlobalCodeContext virtual bool forceNewDiskReadActivity() const = 0; virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, char const * source) = 0; virtual double queryAgentMachineCost() const = 0; + virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) = 0; + virtual void updateAggregates(IWorkUnit* lockedwu) = 0; }; #endif // AGENTCTX_HPP_INCL diff --git a/ecl/eclagent/eclagent.cpp b/ecl/eclagent/eclagent.cpp index dc22e1f69dd..b2b1b356a95 100644 --- a/ecl/eclagent/eclagent.cpp +++ b/ecl/eclagent/eclagent.cpp @@ -1988,10 +1988,6 @@ void EclAgent::doProcess() const cost_type cost = aggregateCost(w, nullptr, false); if (cost) w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - const cost_type diskAccessCost = aggregateDiskAccessCost(w, nullptr); - if (diskAccessCost) - w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace); - updateSpillSize(w, nullptr, SSTglobal); addTimings(w); switch (w->getState()) @@ -2513,10 +2509,6 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime const cost_type cost = money2cost_type(calcCost(agent.queryAgentMachineCost(), nanoToMilli(elapsedNs))) + aggregateCost(wu, scope, true); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - const cost_type diskAccessCost = aggregateDiskAccessCost(wu, scope); - if (diskAccessCost) - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace); - updateSpillSize(wu, scope, SSTworkflow); } void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item) diff --git a/ecl/eclagent/eclagent.ipp b/ecl/eclagent/eclagent.ipp index 10c86cc6ebb..401732d36e0 100644 --- a/ecl/eclagent/eclagent.ipp +++ b/ecl/eclagent/eclagent.ipp @@ -250,6 +250,14 @@ public: { return ctx->queryAgentMachineCost(); }; + virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) override + { + return ctx->updateStats(creatorType, creator, activeWfid, graphName, subgraph); + }; + virtual void updateAggregates(IWorkUnit* lockedwu) override + { + ctx->updateAggregates(lockedwu); + } protected: IAgentContext * ctx; @@ -392,6 +400,7 @@ private: Owned outputSerializer; int retcode; double agentMachineCost = 0; + GlobalStatisticCollection globalStats; private: void doSetResultString(type_t type, const char * stepname, unsigned sequence, int len, const char *val); @@ -705,6 +714,15 @@ public: { return agentMachineCost; } + virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) override + { + Owned sgCollection = globalStats.getCollectionForUpdate(creatorType, creator, activeWfid, graphName, subgraph, true); // true=>clear existing stats + return wuRead->updateStats(graphName, creatorType, creator, activeWfid, subgraph, false, sgCollection); + } + virtual void updateAggregates(IWorkUnit* lockedwu) override + { + globalStats.updateAggregates(lockedwu); + } }; //--------------------------------------------------------------------------- @@ -1055,7 +1073,7 @@ public: void executeLibrary(const byte * parentExtract, IHThorGraphResults * results); IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph); void updateWUStatistic(IWorkUnit* lockedwu, StatisticScopeType scopeType, const char* scope, StatisticKind kind, const char* descr, long long unsigned int value); - + void updateAggregates(IWorkUnit* lockedwu); EclSubGraph * idToGraph(unsigned id); EclGraphElement * idToActivity(unsigned id); const char *queryGraphName() { return graphName; } diff --git a/ecl/eclagent/eclgraph.cpp b/ecl/eclagent/eclgraph.cpp index d4ffdd99c42..fc153cb5011 100644 --- a/ecl/eclagent/eclgraph.cpp +++ b/ecl/eclagent/eclgraph.cpp @@ -879,10 +879,10 @@ void EclSubGraph::updateProgress() Owned progress = parent.updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), parent.queryWfid(), id); IStatisticGatherer & stats = progress->queryStatsBuilder(); updateProgress(stats); - if (startGraphTime || elapsedGraphCycles) { WorkunitUpdate lockedwu(agent->updateWorkUnit()); + parent.updateAggregates(lockedwu); StringBuffer subgraphid; subgraphid.append(parent.queryGraphName()).append(":").append(SubGraphScopePrefix).append(id); if (startGraphTime) @@ -897,10 +897,6 @@ void EclSubGraph::updateProgress() if (cost) lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); } - Owned statsCollection = stats.getResult(); - const cost_type costDiskAccess = aggregateStatistic(StCostFileAccess, statsCollection) ; - if (costDiskAccess) - lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace); } } } @@ -927,6 +923,11 @@ void EclSubGraph::updateProgress(IStatisticGatherer &progress) } ForEachItemIn(i2, subgraphs) subgraphs.item(i2).updateProgress(progress); + + Owned statsCollection = progress.getResult(); + const cost_type costDiskAccess = aggregateStatistic(StCostFileAccess, statsCollection); + if (costDiskAccess) + progress.addStatistic(StCostFileAccess, costDiskAccess); } bool EclSubGraph::prepare(const byte * parentExtract, bool checkDependencies) @@ -1277,10 +1278,6 @@ void EclGraph::execute(const byte * parentExtract) const cost_type cost = money2cost_type(calcCost(agent->queryAgentMachineCost(), elapsed)); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - - const cost_type costDiskAccess = aggregateDiskAccessCost(wu, scope); - if (costDiskAccess) - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace); } if (agent->queryRemoteWorkunit()) @@ -1349,7 +1346,8 @@ void EclGraph::updateLibraryProgress() { EclSubGraph & cur = graphs.item(idx); unsigned wfid = cur.parent.queryWfid(); - Owned progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false); + + Owned progress = agent->updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, queryGraphName(), cur.id); cur.updateProgress(progress->queryStatsBuilder()); } } @@ -1492,7 +1490,12 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result) IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph) { - return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false); + return agent->updateStats(creatorType, creator, activeWfid, queryGraphName(), subgraph); +} + +void EclGraph::updateAggregates(IWorkUnit* lockedwu) +{ + agent->updateAggregates(lockedwu); } void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value) @@ -1544,6 +1547,7 @@ EclGraph * EclAgent::loadGraph(const char * graphName, IConstWorkUnit * wu, ILoa Owned eclGraph = new EclGraph(*this, graphName, wu, isLibrary, debugContext, probeManager, wuGraph->getWfid()); eclGraph->createFromXGMML(dll, xgmml); + globalStats.load(*wu, nullptr, true); return eclGraph.getClear(); } diff --git a/plugins/cassandra/cassandrawu.cpp b/plugins/cassandra/cassandrawu.cpp index 6315ad7800b..94f8439e7ad 100644 --- a/plugins/cassandra/cassandrawu.cpp +++ b/plugins/cassandra/cassandrawu.cpp @@ -2743,8 +2743,8 @@ class CCassandraWorkUnit : public CPersistedWorkUnit class CCassandraWuGraphStats : public CWuGraphStats { public: - CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge) - : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), + CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats) + : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), progress(createPTree(_rootScope)), parent(_parent) { } @@ -2764,9 +2764,9 @@ class CCassandraWorkUnit : public CPersistedWorkUnit StringAttr wuid; }; - IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge) const override + IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge, IStatisticCollection * stats) const override { - return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge); + return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge, stats); } diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 8ebcdcadef5..73a571e492f 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -1330,6 +1330,7 @@ const StatisticsMapping diskLocalStatistics({StCycleDiskReadIOCycles, StSizeDisk const StatisticsMapping diskRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries}); const StatisticsMapping diskReadRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StNumDiskRetries, StCycleDiskReadIOCycles}); const StatisticsMapping diskWriteRemoteStatistics({StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries, StCycleDiskWriteIOCycles}); +const StatisticsMapping aggregateKindStatistics({StCostExecute, StCostFileAccess, StSizeGraphSpill, StSizeSpillFile}); const StatisticsMapping * queryStatsMapping(const StatsScopeId & scope, unsigned hashcode) { @@ -1429,6 +1430,8 @@ StringBuffer & StatsScopeId::getScopeText(StringBuffer & out) const return out.append(ChannelScopePrefix).append(id); case SSTunknown: return out.append(name); + case SSTglobal: + return out; default: #ifdef _DEBUG throwUnexpected(); @@ -1747,11 +1750,11 @@ enum }; class CStatisticCollection; -static CStatisticCollection * deserializeCollection(CStatisticCollection * parent, MemoryBuffer & in, unsigned version); +static IStatisticCollection * deserializeCollection(IStatisticCollection * parent, MemoryBuffer & in, unsigned version); //MORE: Create an implementation with no children typedef StructArrayOf StatsArray; -class CollectionHashTable : public SuperHashTableOf +class CollectionHashTable : public SuperHashTableOf { public: ~CollectionHashTable() { _releaseAll(); } @@ -1782,33 +1785,35 @@ class SortedCollectionIterator : public ArrayIIteratorOf { friend class CollectionHashTable; -public: - CStatisticCollection(CStatisticCollection * _parent, const StatsScopeId & _id) : id(_id), parent(_parent) - { - } - CStatisticCollection(CStatisticCollection * _parent, MemoryBuffer & in, unsigned version) : parent(_parent) + // deserialize without storing the stats + void deserializeNoStats(MemoryBuffer & in, unsigned version) { - id.deserialize(in, version); - unsigned numStats; in.read(numStats); - stats.ensureCapacity(numStats); while (numStats-- > 0) - { Statistic next (in, version); - stats.append(next); - } - unsigned numChildren; in.read(numChildren); - children.ensure(numChildren); while (numChildren-- > 0) { - CStatisticCollection * next = deserializeCollection(this, in, version); - children.add(*next); + byte kind; + in.read(kind); + StatsScopeId childId; + childId.deserialize(in, version); + deserializeNoStats(in, version); } } +public: + CStatisticCollection(IStatisticCollection * _parent, const StatsScopeId & _id) : id(_id), parent(_parent) + { + } + + CStatisticCollection(IStatisticCollection * _parent, MemoryBuffer & in, unsigned version) : parent(_parent) + { + id.deserialize(in, version); + deserialize(in, version, 0, INT_MAX); + } virtual byte getCollectionType() const { return SCintermediate; } @@ -1831,10 +1836,11 @@ class CStatisticCollection : public CInterfaceOf } virtual StringBuffer & getFullScope(StringBuffer & str) const override { - if (parent) + if (parent && queryScopeType()!=SSTglobal) { parent->getFullScope(str); - str.append(':'); + if (!str.isEmpty()) + str.append(':'); } id.getScopeText(str); return str; @@ -1862,6 +1868,24 @@ class CStatisticCollection : public CInterfaceOf } return false; } + virtual bool setStatistic(const char *scope, StatisticKind kind, unsigned __int64 & value) override + { + if (*scope=='\0') + { + return updateStatistic(kind, value, StatsMergeReplace); + } + else + { + StatsScopeId childScopeId; + if (!childScopeId.setScopeText(scope, &scope) || (*scope!=':' && *scope!='\0')) + throw makeStringExceptionV(JLIBERR_UnexpectedValue, "'%s' does not appear to be a valid scope id", scope); + IStatisticCollection * child = ensureSubScope(childScopeId, true); + + if (*scope==':') + scope++; + return child->setStatistic(scope, kind, value); + } + } virtual unsigned getNumStatistics() const override { return stats.ordinality(); @@ -1916,13 +1940,13 @@ class CStatisticCollection : public CInterfaceOf } //other public interface functions - void addStatistic(StatisticKind kind, unsigned __int64 value) + virtual void addStatistic(StatisticKind kind, unsigned __int64 value) override { Statistic s(kind, value); stats.append(s); } - void updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) + virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) override { if (mergeAction != StatsMergeAppend) { @@ -1931,28 +1955,71 @@ class CStatisticCollection : public CInterfaceOf Statistic & cur = stats.element(i); if (cur.kind == kind) { + if (mergeAction==StatsMergeReplace) + { + if (cur.value==value) + return false; + } cur.value = mergeStatisticValue(cur.value, value, mergeAction); - return; + return true; } } } Statistic s(kind, value); stats.append(s); + return true; } - - CStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) + virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) override + { + bool wasCreated; + return ensureSubScope(search, hasChildren, wasCreated); + } + virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren, bool & wasCreated) override { //Once the CStatisticCollection is created it should not be replaced - so that returned pointers remain valid. - CStatisticCollection * match = children.find(&search); + wasCreated = false; + IStatisticCollection * match = children.find(&search); if (match) return match; - CStatisticCollection * ret = new CStatisticCollection(this, search); + IStatisticCollection * ret = new CStatisticCollection(this, search); children.add(*ret); + wasCreated = true; return ret; } - virtual void serialize(MemoryBuffer & out) const + virtual IStatisticCollection * querySubScope(const StatsScopeId & search) const override + { + return children.find(&search); + } + + virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path, bool & wasCreated) override + { + IStatisticCollection * curScope = this; + for (const auto & scopeItem: path) + curScope = curScope->ensureSubScope(scopeItem, true, wasCreated); // n.b. this will always return a valid pointer + return curScope; + } + + virtual IStatisticCollection * querySubScopePath(std::initializer_list path) override + { + IStatisticCollection * curScope = this; + for (const auto & scopeItem: path) + { + curScope = curScope->querySubScope(scopeItem); + if (!curScope) + return nullptr; + } + return curScope; + } + + // Note, once this is called child scope pointers will be invalid + virtual void pruneChildStats() override + { + children.kill(); + } + + virtual void serialize(MemoryBuffer & out) const override { out.append(getCollectionType()); id.serialize(out); @@ -1967,6 +2034,43 @@ class CStatisticCollection : public CInterfaceOf iter.query().serialize(out); } + virtual void deserialize(MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) override + { + unsigned numStats; + in.read(numStats); + stats.ensureCapacity(numStats); + while (numStats-- > 0) + { + Statistic next(in, version); + if (minDepth <= 0) + stats.append(next); + } + unsigned numChildren; + in.read(numChildren); + children.ensure(numChildren); + while (numChildren-- > 0) + { + byte kind; + in.read(kind); + StatsScopeId childId; + childId.deserialize(in, version); + deserializeChild(childId, in, version, minDepth, maxDepth); + } + } + + virtual void deserializeChild(const StatsScopeId & childId, MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) override + { + if (maxDepth > 0) + { + IStatisticCollection * childCollection = ensureSubScope(childId, true); + childCollection->deserialize(in, version, (minDepth-1), (maxDepth-1)); + } + else + { + deserializeNoStats(in, version); + } + } + inline const StatsScopeId & queryScopeId() const { return id; } virtual void mergeInto(IStatisticGatherer & target) const @@ -1994,12 +2098,90 @@ class CStatisticCollection : public CInterfaceOf cur.visit(visitor); } + virtual bool refreshAggregates(const StatisticsMapping & mapping) override + { + if (isDirty==false) + return false; + CRuntimeStatisticCollection totals(mapping); + Owned totalUpdated = createBitSet(mapping.numStatistics()); + return refreshAggregates(totals, *totalUpdated); + } + + virtual bool refreshAggregates(CRuntimeStatisticCollection & totals, IBitSet & isTotalUpdated) override + { + const StatisticsMapping & mapping = totals.queryMapping(); + bool updated = false; + if (isDirty==false) + { + ForEachItemIn(i, stats) + { + Statistic & stat = stats.element(i); + StatisticKind kind = stat.queryKind(); + if (kind != (StatisticKind)(kind & StKindMask)) + continue; // ignore variants->not supported by CRuntimeStatisticCollection + unsigned index = mapping.getIndex(kind); + if (index!=mapping.numStatistics()) + { + totals.mergeStatistic(kind, stat.queryValue()); + isTotalUpdated.set(index); + updated = true; + } + } + } + else + { + CRuntimeStatisticCollection childTotals(mapping); + Owned childTotalUpdated = createBitSet(mapping.numStatistics()); + for (auto & child : children) + { + if (child.refreshAggregates(childTotals, *childTotalUpdated)) + updated = true; + } + if (updated) + { + unsigned i = childTotalUpdated->scan(0, true);; + while (NotFound != i) + { + StatisticKind kind = childTotals.getKind(i); + unsigned __int64 value = childTotals.queryStatisticByIndex(i).get(); + updateStatistic(kind, value, StatsMergeReplace); + totals.mergeStatistic(kind, value); + isTotalUpdated.set(i); + i = childTotalUpdated->scan(i+1, true); + } + } + isDirty=false; + } + return updated; + } + + virtual void clearStats() override + { + stats.clear(true); + // Note: Children should NOT be deleted as all pointers return by ensureSubScope must still be valid + SuperHashIteratorOf iter(children, false); + for (iter.first(); iter.isValid(); iter.next()) + iter.query().clearStats(); + } + + virtual void addChild(IStatisticCollection *stats) override + { + children.add(*LINK(stats)); + } + + virtual void markDirty() override + { + isDirty=true; + if (parent) parent->markDirty(); + } + private: StatsScopeId id; - CStatisticCollection * parent; + IStatisticCollection * parent; protected: CollectionHashTable children; StatsArray stats; + bool isDirty = false; // used to track which scope has changed (used to workout what aggregates to recalculate) }; StringBuffer &CStatisticCollection::toXML(StringBuffer &out) const @@ -2017,7 +2199,8 @@ StringBuffer &CStatisticCollection::toXML(StringBuffer &out) const SuperHashIteratorOf iter(children, false); for (iter.first(); iter.isValid(); iter.next()) iter.query().toXML(out); - out.append("\n"); + out.append(" // Scope id=\""); + id.getScopeText(out).append("\"\n"); return out; } @@ -2071,12 +2254,18 @@ bool CollectionHashTable::matchesElement(const void *et, const void *searchET) c class CRootStatisticCollection : public CStatisticCollection { public: - CRootStatisticCollection(StatisticCreatorType _creatorType, const char * _creator, const StatsScopeId & _id) - : CStatisticCollection(NULL, _id), creatorType(_creatorType), creator(_creator) + CRootStatisticCollection(StatisticCreatorType _creatorType, const char * _creator, const StatsScopeId & rootScopeId, const StatsScopeId & graphScopeId, IStatisticCollection * sgCollection) + : CStatisticCollection(nullptr, rootScopeId), creatorType(_creatorType), creator(_creator) + { + whenCreated = getTimeStampNowValue(); + IStatisticCollection * child = ensureSubScope(graphScopeId, true); + child->addChild(sgCollection); + } + CRootStatisticCollection(StatisticCreatorType _creatorType, const char * _creator, const StatsScopeId & rootScopeId) : CStatisticCollection(nullptr, rootScopeId), creatorType(_creatorType), creator(_creator) { whenCreated = getTimeStampNowValue(); } - CRootStatisticCollection(MemoryBuffer & in, unsigned version) : CStatisticCollection(NULL, in, version) + CRootStatisticCollection(MemoryBuffer & in, unsigned version) : CStatisticCollection(nullptr, in, version) { byte creatorTypeByte; in.read(creatorTypeByte); @@ -2085,13 +2274,13 @@ class CRootStatisticCollection : public CStatisticCollection in.read(whenCreated); } - virtual byte getCollectionType() const { return SCroot; } + virtual byte getCollectionType() const override { return SCroot; } - virtual unsigned __int64 queryWhenCreated() const + virtual unsigned __int64 queryWhenCreated() const override { return whenCreated; } - virtual void serialize(MemoryBuffer & out) const + virtual void serialize(MemoryBuffer & out) const override { CStatisticCollection::serialize(out); out.append((byte)creatorType); @@ -2113,7 +2302,6 @@ class CRootStatisticCollection : public CStatisticCollection unsigned __int64 whenCreated; }; - class StatAggregator : implements IStatisticVisitor { public: @@ -2167,14 +2355,13 @@ void serializeStatisticCollection(MemoryBuffer & out, IStatisticCollection * col collection->serialize(out); } -static CStatisticCollection * deserializeCollection(CStatisticCollection * parent, MemoryBuffer & in, unsigned version) +static IStatisticCollection * deserializeCollection(IStatisticCollection * parent, MemoryBuffer & in, unsigned version) { byte kind; in.read(kind); switch (kind) { case SCroot: - assertex(!parent); return new CRootStatisticCollection(in, version); case SCintermediate: return new CStatisticCollection(parent, in, version); @@ -2190,49 +2377,59 @@ IStatisticCollection * createStatisticCollection(MemoryBuffer & in) return deserializeCollection(NULL, in, version); } +IStatisticCollection * createStatisticCollection(IStatisticCollection * parent, const StatsScopeId & scopeId) +{ + return new CStatisticCollection(parent, scopeId); +} + +IStatisticCollection * createRootStatisticCollection(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope, const StatsScopeId & graphScopeId, IStatisticCollection * sgCollection) +{ + //creator unused at the moment. + return new CRootStatisticCollection(creatorType, creator, rootScope, graphScopeId, sgCollection); +} //-------------------------------------------------------------------------------------------------------------------- class StatisticGatherer : implements CInterfaceOf { public: - StatisticGatherer(CStatisticCollection * scope) : rootScope(scope) + StatisticGatherer(IStatisticCollection * scope) : rootScope(scope) { scopes.append(*scope); } virtual void beginScope(const StatsScopeId & id) override { - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); scopes.append(*tos.ensureSubScope(id, true)); } virtual void beginActivityScope(unsigned id) override { StatsScopeId scopeId(SSTactivity, id); - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); scopes.append(*tos.ensureSubScope(scopeId, false)); } virtual void beginSubGraphScope(unsigned id) override { StatsScopeId scopeId(SSTsubgraph, id); - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); scopes.append(*tos.ensureSubScope(scopeId, true)); } virtual void beginEdgeScope(unsigned id, unsigned oid) override { StatsScopeId scopeId(SSTedge, id, oid); - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); scopes.append(*tos.ensureSubScope(scopeId, false)); } virtual void beginChildGraphScope(unsigned id) override { StatsScopeId scopeId(SSTchildgraph, id); - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); scopes.append(*tos.ensureSubScope(scopeId, true)); } virtual void beginChannelScope(unsigned id) override { StatsScopeId scopeId(SSTchannel, id); - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); scopes.append(*tos.ensureSubScope(scopeId, true)); } virtual void endScope() override @@ -2241,12 +2438,12 @@ class StatisticGatherer : implements CInterfaceOf } virtual void addStatistic(StatisticKind kind, unsigned __int64 value) override { - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); tos.addStatistic(kind, value); } virtual void updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) override { - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); tos.updateStatistic(kind, value, mergeAction); } virtual IStatisticCollection * getResult() override @@ -2255,15 +2452,18 @@ class StatisticGatherer : implements CInterfaceOf } protected: - ICopyArrayOf scopes; - Linked rootScope; + ICopyArrayOf scopes; + Linked rootScope; }; extern IStatisticGatherer * createStatisticsGatherer(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope) { - //creator unused at the moment. - Owned rootCollection = new CRootStatisticCollection(creatorType, creator, rootScope); - return new StatisticGatherer(rootCollection); + return new StatisticGatherer(new CRootStatisticCollection(creatorType, creator, rootScope)); +} + +extern IStatisticGatherer * createStatisticsGatherer(IStatisticCollection * stats) +{ + return new StatisticGatherer(stats); } //-------------------------------------------------------------------------------------------------------------------- @@ -2609,7 +2809,7 @@ StringBuffer & CRuntimeStatisticCollection::toStr(StringBuffer &str) const unsigned __int64 rawValue = getStatisticValue(rawKind); if (rawValue) value += convertMeasure(rawKind, kind, rawValue); - } + } if (value) { const char * name = queryStatisticName(serialKind); diff --git a/system/jlib/jstats.h b/system/jlib/jstats.h index c86d18c1937..95b78f7d0a0 100644 --- a/system/jlib/jstats.h +++ b/system/jlib/jstats.h @@ -25,6 +25,7 @@ #include #include "jstatcodes.h" +#include "jset.hpp" typedef unsigned __int64 stat_type; typedef unsigned __int64 cost_type; // Decimal currency amount multiplied by 10^6 @@ -104,6 +105,20 @@ interface IStatisticCollectionIterator; interface IStatisticGatherer; interface IStatisticVisitor; +enum StatsMergeAction +{ + StatsMergeKeepNonZero, + StatsMergeReplace, + StatsMergeSum, + StatsMergeMin, + StatsMergeMax, + StatsMergeAppend, + StatsMergeFirst, + StatsMergeLast, +}; + +class StatisticsMapping; +class CRuntimeStatisticCollection; interface IStatisticCollection : public IInterface { public: @@ -114,6 +129,7 @@ interface IStatisticCollection : public IInterface virtual unsigned getNumStatistics() const = 0; virtual bool getStatistic(StatisticKind kind, unsigned __int64 & value) const = 0; virtual void getStatistic(StatisticKind & kind, unsigned __int64 & value, unsigned idx) const = 0; + virtual bool setStatistic(const char *scope, StatisticKind kind, unsigned __int64 & value) = 0; virtual IStatisticCollectionIterator & getScopes(const char * filter, bool sorted) = 0; virtual void getMinMaxScope(IStringVal & minValue, IStringVal & maxValue, StatisticScopeType searchScopeType) const = 0; virtual void getMinMaxActivity(unsigned & minValue, unsigned & maxValue) const = 0; @@ -123,6 +139,21 @@ interface IStatisticCollection : public IInterface virtual StringBuffer &toXML(StringBuffer &out) const = 0; virtual void visit(IStatisticVisitor & target) const = 0; virtual void visitChildren(IStatisticVisitor & target) const = 0; + virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) = 0; + virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren, bool & wasCreated) = 0; + virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path, bool & wasCreated) = 0; + virtual IStatisticCollection * querySubScope(const StatsScopeId & search) const = 0; + virtual IStatisticCollection * querySubScopePath(std::initializer_list path) = 0; + virtual void pruneChildStats() = 0; + virtual void addStatistic(StatisticKind kind, unsigned __int64 value) = 0; + virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) = 0; + virtual bool refreshAggregates(const StatisticsMapping & mapping) = 0; + virtual bool refreshAggregates(CRuntimeStatisticCollection & totals, IBitSet & isTotalUpdated) = 0; + virtual void deserialize(MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) = 0; + virtual void deserializeChild(const StatsScopeId & scopeId, MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) = 0; + virtual void clearStats() = 0; + virtual void addChild(IStatisticCollection *stats) = 0; + virtual void markDirty() = 0; }; interface IStatisticCollectionIterator : public IIteratorOf @@ -134,17 +165,6 @@ interface IStatisticVisitor virtual bool visitScope(const IStatisticCollection & cur) = 0; // return true to iterate through children }; -enum StatsMergeAction -{ - StatsMergeKeepNonZero, - StatsMergeReplace, - StatsMergeSum, - StatsMergeMin, - StatsMergeMax, - StatsMergeAppend, - StatsMergeFirst, - StatsMergeLast, -}; interface IStatisticGatherer : public IInterface { @@ -493,6 +513,7 @@ extern const jlib_decl StatisticsMapping diskRemoteStatistics; extern const jlib_decl StatisticsMapping diskReadRemoteStatistics; extern const jlib_decl StatisticsMapping diskWriteRemoteStatistics; extern const jlib_decl StatisticsMapping jhtreeCacheStatistics; +extern const jlib_decl StatisticsMapping aggregateKindStatistics; //--------------------------------------------------------------------------------------------------------------------- @@ -867,6 +888,7 @@ extern jlib_decl stat_type readStatisticValue(const char * cur, const char * * e extern jlib_decl unsigned __int64 mergeStatisticValue(unsigned __int64 prevValue, unsigned __int64 newValue, StatsMergeAction mergeAction); +extern jlib_decl bool includeStatisticIfZero(StatisticKind kind); extern jlib_decl StatisticMeasure queryMeasure(StatisticKind kind); extern jlib_decl const char * queryStatisticName(StatisticKind kind); extern jlib_decl void queryLongStatisticName(StringBuffer & out, StatisticKind kind); @@ -883,8 +905,11 @@ extern jlib_decl StatisticCreatorType queryCreatorType(const char * sct, Statist extern jlib_decl StatisticScopeType queryScopeType(const char * sst, StatisticScopeType dft); extern jlib_decl IStatisticGatherer * createStatisticsGatherer(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope); +extern jlib_decl IStatisticGatherer * createStatisticsGatherer(IStatisticCollection * stats); +extern jlib_decl IStatisticCollection * createRootStatisticCollection(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope, const StatsScopeId & graphScope, IStatisticCollection * sgCollection=nullptr); extern jlib_decl void serializeStatisticCollection(MemoryBuffer & out, IStatisticCollection * collection); extern jlib_decl IStatisticCollection * createStatisticCollection(MemoryBuffer & in); +extern jlib_decl IStatisticCollection * createStatisticCollection(IStatisticCollection * parent, const StatsScopeId & scopeId); inline unsigned __int64 milliToNano(unsigned __int64 value) { return value * 1000000; } // call avoids need to upcast values inline unsigned __int64 nanoToMilli(unsigned __int64 value) { return value / 1000000; } @@ -943,5 +968,5 @@ class jlib_decl RuntimeStatisticTarget : implements IStatisticTarget extern jlib_decl StringBuffer & formatMoney(StringBuffer &out, unsigned __int64 value); extern jlib_decl stat_type aggregateStatistic(StatisticKind kind, IStatisticCollection * statsCollection); - +extern jlib_decl IStatisticCollection * createGlobalStatisticCollection(IPropertyTree * root); #endif diff --git a/thorlcr/graph/thgraphmaster.cpp b/thorlcr/graph/thgraphmaster.cpp index dae32d807ca..b16b1579365 100644 --- a/thorlcr/graph/thgraphmaster.cpp +++ b/thorlcr/graph/thgraphmaster.cpp @@ -2902,9 +2902,11 @@ bool CMasterGraph::deserializeStats(unsigned node, MemoryBuffer &mb) void CMasterGraph::getStats(IStatisticGatherer &stats) { stats.addStatistic(StNumSlaves, queryClusterWidth()); + cost_type costDiskAccess = getDiskAccessCost(); + if (costDiskAccess) + stats.addStatistic(StCostFileAccess, costDiskAccess); // graph specific stats - graphStats.getStats(stats); Owned iter; diff --git a/thorlcr/master/thdemonserver.cpp b/thorlcr/master/thdemonserver.cpp index 5de19dced0b..dc4fc3f4068 100644 --- a/thorlcr/master/thdemonserver.cpp +++ b/thorlcr/master/thdemonserver.cpp @@ -43,6 +43,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer unsigned numberOfMachines = 0; cost_type costLimit = 0; cost_type workunitCost = 0; + GlobalStatisticCollection globalStatsCollection; void doReportGraph(IStatisticGatherer & stats, CGraphBase *graph) { @@ -91,16 +92,13 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer if (costLimit || finished) { const cost_type sgCost = money2cost_type(calcCost(thorManagerRate, duration) + calcCost(thorWorkerRate, duration) * numberOfMachines); - cost_type costDiskAccess = graph.getDiskAccessCost(); if (finished) { if (sgCost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, graphScope, StCostExecute, NULL, sgCost, 1, 0, StatsMergeReplace); - if (costDiskAccess) - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, graphScope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace); } - const cost_type totalCost = workunitCost + sgCost + costDiskAccess; + const cost_type totalCost = workunitCost + sgCost + graph.getDiskAccessCost(); if (costLimit>0 && totalCost > costLimit) { LOG(MCwarning, thorJob, "ABORT job cost exceeds limit"); @@ -145,7 +143,8 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer ForEachItemIn (g, activeGraphs) { CGraphBase &graph = activeGraphs.item(g); - Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false); + Owned sgCollection = globalStatsCollection.getCollectionForUpdate(SCTthor, queryStatisticsComponentName(), wfid, graphName, graph.queryGraphId(), true); // true=>clear existing stats + Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false, sgCollection); reportGraph(stats->queryStatsBuilder(), &graph); } Owned wu = ¤tWU.lock(); @@ -155,6 +154,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer unsigned startTime = graphStarts.item(g2); reportStatus(wu, graph, startTime, finished, success); } + updateAggregates(wu); queryServerStatus().commitProperties(); } catch (IException *E) @@ -173,10 +173,10 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer const char *graphName = ((CJobMaster &)activeGraphs.item(0).queryJob()).queryGraphName(); unsigned wfid = graph->queryJob().getWfid(); { - Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false); + Owned sgCollection = globalStatsCollection.getCollectionForUpdate(SCTthor, queryStatisticsComponentName(), wfid, graphName, graph->queryGraphId(), true); // true=>clear existing stats + Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false, sgCollection); reportGraph(stats->queryStatsBuilder(), graph); } - Owned wu = ¤tWU.lock(); if (startTimeStamp) { @@ -186,7 +186,6 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, graphScope, StWhenStarted, NULL, getTimeStampNowValue(), 1, 0, StatsMergeAppend); } reportStatus(wu, *graph, startTime, finished, success); - queryServerStatus().commitProperties(); } catch (IException *e) @@ -290,6 +289,10 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer { unsigned startTime = graphStarts.item(g); reportGraph(graph, true, success, startTime, 0); + // Prune subgraph descendant stats as they have been serialized and no longer needed. + const char *graphName = ((CJobMaster &)graph->queryJob()).queryGraphName(); + unsigned wfid = graph->queryJob().getWfid(); + globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph->queryGraphId()); activeGraphs.remove(g); graphStarts.remove(g); } @@ -298,8 +301,27 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer { synchronized block(mutex); reportActiveGraphs(true, false); + if (activeGraphs.ordinality()) + { + CJobBase & activeJob = activeGraphs.item(0).queryJob(); + const char *graphName = ((CJobMaster &)activeJob).queryGraphName(); + unsigned wfid = activeJob.getWfid(); + ForEachItemIn (g, activeGraphs) + { + CGraphBase &graph = activeGraphs.item(g); + globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph.queryGraphId()); + } + } activeGraphs.kill(); } + virtual void updateAggregates(IWorkUnit * lockedWu) override + { + globalStatsCollection.updateAggregates(lockedWu); + } + virtual void loadStats(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly) override + { + globalStatsCollection.load(workunit, graphName, aggregatesOnly); + } }; diff --git a/thorlcr/master/thdemonserver.hpp b/thorlcr/master/thdemonserver.hpp index 32453c92764..70cfa5a97b7 100644 --- a/thorlcr/master/thdemonserver.hpp +++ b/thorlcr/master/thdemonserver.hpp @@ -24,12 +24,15 @@ interface IWUGraphProgress; class CGraphBase; +interface IConstWorkUnit; interface IDeMonServer : extends IInterface { virtual void takeHeartBeat(MemoryBuffer &progressMbb) = 0; virtual void startGraph(CGraphBase *graph) = 0; virtual void endGraph(CGraphBase *graph, bool success) = 0; virtual void endGraphs() = 0; + virtual void updateAggregates(IWorkUnit * lockedWu) = 0; + virtual void loadStats(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly) = 0; }; diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp index 65d506fbd08..07456b72f39 100644 --- a/thorlcr/master/thgraphmanager.cpp +++ b/thorlcr/master/thgraphmanager.cpp @@ -1117,6 +1117,8 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, if (isContainerized() && podInfo.hasStdDev()) podInfo.report(wu); } + if (globals->getPropBool("@watchdogProgressEnabled")) + queryDeMonServer()->loadStats(workunit, nullptr, true); //graphName==nullptr so that sg stats are not loaded(at present, partial graph resumption not possible) setWuid(workunit.queryWuid(), workunit.queryClusterName()); @@ -1134,7 +1136,9 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, cost_type cost = money2cost_type(calculateThorCost(nanoToMilli(graphTimeNs), numberOfMachines)); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - updateSpillSize(wu, graphScope, SSTgraph); + if (globals->getPropBool("@watchdogProgressEnabled")) + queryDeMonServer()->updateAggregates(wu); + removeJob(*job); } catch (IException *e)