diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index 5493bd0888f..8aafdda2765 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -183,20 +183,27 @@ void doDescheduleWorkkunit(char const * wuid) * Graph progress support */ -CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge) +CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * globalStatsCollection) : creatorType(_creatorType), creator(_creator), id(_id), merge(_merge) { StatsScopeId graphScopeId; verifyex(graphScopeId.setScopeText(_rootScope)); + StatsScopeId wfScopeId(SSTworkflow,wfid); - StatsScopeId rootScopeId(SSTworkflow,wfid); - collector.setown(createStatisticsGatherer(_creatorType, _creator, rootScopeId)); + if (globalStatsCollection) + { + StatsScopeId sgScopeId(SSTsubgraph, id); + collector.setown(createStatisticsGatherer(globalStatsCollection->getCollection(wfScopeId, graphScopeId, sgScopeId, _creatorType, _creator, true))); + } + else + collector.setown(createStatisticsGatherer(_creatorType, _creator, wfScopeId)); collector->beginScope(graphScopeId); } void CWuGraphStats::beforeDispose() { collector->endScope(); + StringBuffer tag; tag.append("sg").append(id); @@ -2659,6 +2666,87 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu } } +GlobalStatisticCollection::GlobalStatisticCollection() +{ + StatsScopeId globalScopeId(SSTglobal, (unsigned)0); + statsCollection.setown(createStatisticCollection(nullptr, globalScopeId)); +} + +void GlobalStatisticCollection::load(const char *_wuid, unsigned statsMaxDepth, bool missingScopesOnly) +{ + if (strcmp(_wuid, wuid.str())!=0) // Make sure stats collection is for this workunit + { + if (!wuid.isEmpty()) + { + StatsScopeId globalScopeId(SSTglobal, 
(unsigned)0); + statsCollection.setown(createStatisticCollection(nullptr, globalScopeId)); + } + wuid.set(_wuid); + } + Owned root = getWUGraphProgress(wuid, true); + if (!root) + return; + Owned iter = root->getElements("*"); + ForEach(*iter) + { + IPropertyTree * graphPT = &iter->query(); + StatsScopeId graphScopeId; + verifyex(graphScopeId.setScopeText(graphPT->queryName())); + StatsScopeId wfScopeId(SSTworkflow, graphPT->getPropInt64("@wfid",0)); + Owned iter2 = graphPT->getElements("./*"); + ForEach(*iter2) + { + StatsScopeId sgScopeId; + IPropertyTree * sgPT = & iter2->query(); + const char * sgName = sgPT->queryName(); + if (strcmp(sgName, "node")==0) + continue; + verifyex(sgScopeId.setScopeText(sgName)); + + if (missingScopesOnly) // Skip any scopes that are already in the stats collection + { + IStatisticCollection * sgCollection = statsCollection->querySubScopePath({wfScopeId, graphScopeId, sgScopeId}); + if (sgCollection) + continue; + } + + MemoryBuffer compressed; + sgPT->getPropBin("Stats", compressed); + if (!compressed.length()) + break; // NOTE(review): leaves the subgraph loop, so the remaining subgraphs of this graph are not loaded - confirm that continue was not intended + + MemoryBuffer serialized; + decompressToBuffer(serialized, compressed); + unsigned version; + serialized.read(version); + byte kind; + serialized.read(kind); + + StatsScopeId childId; + childId.deserialize(serialized, version); + statsCollection->deserializeChild(childId, serialized, version, statsMaxDepth); + } + } +} + +bool GlobalStatisticCollection::refreshAggregates(std::vector & aggregateKinds) +{ + std::vector totals(aggregateKinds.size()); + return statsCollection->refreshAggregates(aggregateKinds, totals); +} + +void GlobalStatisticCollection::visit(IStatisticVisitor & target) const +{ + statsCollection->visit(target); +} + +IStatisticCollection * GlobalStatisticCollection::getCollection(const StatsScopeId & wfScopeId, const StatsScopeId & graphScopeId, const StatsScopeId & sgScopeId, StatisticCreatorType creatorType, const char * creator, bool clearStats) +{ + IStatisticCollection * sgScopeCollection = 
statsCollection->ensureSubScopePath({wfScopeId,graphScopeId, sgScopeId}); + if (clearStats) + sgScopeCollection->clearStats(); + return createRootStatisticCollection(creatorType, creator, wfScopeId, graphScopeId, sgScopeCollection); +} class StatisticsAggregatesWriter : implements IStatisticVisitor { @@ -2669,6 +2757,8 @@ class StatisticsAggregatesWriter : implements IStatisticVisitor virtual bool visitScope(const IStatisticCollection & cur) { + StringBuffer s2, s3; + DBGLOG("visitScope %s -> %s", cur.getScope(s2).str(), cur.getFullScope(s3).str() ); /* NOTE(review): emits one log line per visited scope; looks like leftover debug tracing - confirm before merging */ switch (cur.queryScopeType()) { case SSTglobal: @@ -2683,51 +2773,28 @@ class StatisticsAggregatesWriter : implements IStatisticVisitor wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), cur.queryScopeType(), cur.getFullScope(s).str(), kind, nullptr, value, 1, 0, StatsMergeReplace); } } - if (cur.queryScopeType()==SSTgraph) - return false; - else + //if (cur.queryScopeType()==SSTgraph) + // return false; + //else return true; default: - return false; + //return false; + return true; /* NOTE(review): other scope types used to return false here; remove the disabled lines once the new behaviour is confirmed */ } } }; -void updateAggregates(IWorkUnit *wu) -{ - // updateAggregates() reads in the graph stats, calculates the aggregates and then writes the aggregates back to global statistics. - // - // The next iteration of this PR would be to track the stats in memory and, periodically, calculate the aggregates and write the aggregates. 
For Thor: - // it could do this by: - // 1) Having a single global GlobalStatisticCollection(derived from CStatisticCollection) in DeMonServer - // 2) When the jobs startup, populate the GlobalStatisticCollection with aggregates and graph stats - // 2) Replace existing code related to gathering stats from CMasterGraph and serializing with - // a new method that updates the statistics to GlobalStatisticCollection - // 3) New method for gathering and serializing graph stats: - // (i) A new class (say StatisticCollectionGatherer) would be needed with an IStatisticGatherer interface which updates the GlobalStatisticCollection - // - This StatisticCollectionGatherer would be bound to a particular graph scope (this is because the CMasterGraph::getStats expects this to be case) - // - Everytime StatisticCollectionGatherer is called with beginScope where the scope type is a subgraph, it would need to delete that scope and its children. - // This is because CMasterGraph::getStats returns a fresh full set of stats for a given subgraph. - // (ii) Create a new member function to write the stats to GraphProgress and aggregates to global stats. Possibly, - // (a) maintain a dirty flag for each subgraph scope so that only modified subgraphs are serialized. 
- // (b) this member function would be called by DeMonServer at fixed intevals - // 4) Modify GlobalStatisticCollection::refreshAggregates should clear the existing aggregates before calculating new aggregates - // - this is need because multiple calls to refreshAggregates should not add to existing aggregates - // Subsequent iteration: - // 1) Serialize the aggregates into a blob in GraphProgress rather than to global stats - Owned root = getWUGraphProgress(wu->queryWuid(), true); - if (root) - { - Owned stats = createGlobalStatisticCollection(root); - std::vector aggregateKinds = {StCostFileAccess, StSizeGraphSpill, StSizeSpillFile}; - std::vector totals(aggregateKinds.size()); - - stats->refreshAggregates(aggregateKinds, totals); - - StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKinds); - stats->visit(statsAggregatorWriter); - } +void updateAggregates(IWorkUnit *wu, GlobalStatisticCollection & statsCollection) +{ + // Further improvements: + // 1) maintain a dirty flag for each subgraph scope so that only modified subgraphs are serialized. 
+ // 2) Serialize the aggregates into a blob in GraphProgress rather than to global stats + std::vector aggregateKinds = {StCostFileAccess, StSizeGraphSpill, StSizeSpillFile}; + StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKinds); + if(statsCollection.refreshAggregates(aggregateKinds)) // Only serialize if the aggregates have changed + statsCollection.visit(statsAggregatorWriter); } + //--------------------------------------------------------------------------------------------------------------------- @@ -3797,8 +3864,8 @@ class CDaliWorkUnit; class CDaliWuGraphStats : public CWuGraphStats { public: - CDaliWuGraphStats(const CDaliWorkUnit* _owner, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge) - : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), owner(_owner), graphName(_rootScope), wfid(_wfid) + CDaliWuGraphStats(const CDaliWorkUnit* _owner, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats) + : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), owner(_owner), graphName(_rootScope), wfid(_wfid) { } protected: @@ -3812,8 +3879,8 @@ class CDaliWuGraphStats : public CWuGraphStats class CLocalWuGraphStats : public CWuGraphStats { public: - CLocalWuGraphStats(IPropertyTree *_p, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge) - : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), graphName(_rootScope), p(_p) + CLocalWuGraphStats(IPropertyTree *_p, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats) + : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), graphName(_rootScope), p(_p) { } 
protected: @@ -4117,9 +4184,9 @@ class CDaliWorkUnit : public CPersistedWorkUnit } } } - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats) const override { - return new CDaliWuGraphStats(this, creatorType, creator, _wfid, graphName, subgraph, merge); + return new CDaliWuGraphStats(this, creatorType, creator, _wfid, graphName, subgraph, merge, stats); } virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree) { @@ -4429,8 +4496,8 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa { c->setGraphState(graphName, wfid, state); } virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const { c->setNodeState(graphName, nodeId, state); } - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override - { return c->updateStats(graphName, creatorType, creator, _wfid, subgraph, merge); } + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const override + { return c->updateStats(graphName, creatorType, creator, _wfid, subgraph, merge, stats); } virtual void clearGraphProgress() const { c->clearGraphProgress(); } virtual IStringVal & getAbortBy(IStringVal & str) const @@ -10260,9 +10327,9 @@ void CLocalWorkUnit::setNodeState(const char *graphName, WUGraphIDType nodeId, W { throwUnexpected(); // Should only be used for persisted workunits } -IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, 
StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const +IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats) const { - return new CLocalWuGraphStats(LINK(p), creatorType, creator, _wfid, graphName, subgraph, merge); + return new CLocalWuGraphStats(LINK(p), creatorType, creator, _wfid, graphName, subgraph, merge, stats); } void CLocalWUGraph::setName(const char *str) diff --git a/common/workunit/workunit.hpp b/common/workunit/workunit.hpp index dd1ca7f8722..053c0359126 100644 --- a/common/workunit/workunit.hpp +++ b/common/workunit/workunit.hpp @@ -1175,6 +1175,23 @@ interface IConstWUScopeIterator : extends IScmIterator //--------------------------------------------------------------------------------------------------------------------- +class WORKUNIT_API GlobalStatisticCollection : public CInterface +{ +public: + GlobalStatisticCollection(); + bool refreshAggregates(std::vector & aggregateKinds); + void visit(IStatisticVisitor & target) const; + // getCollection() returns IStatisticCollection for given rootScope + // if clearStats==true then the existing stats are cleared for the given scope + IStatisticCollection * getCollection(const StatsScopeId & wfScopeId, const StatsScopeId & graphScopeId, const StatsScopeId & sgScopeId, StatisticCreatorType creatorType, const char * creator, bool clearStats); + // statsMaxDepth = load stats up until this depth (e.g. 3 means loads stats up until sg scope ) + void load(const char *_wuid, unsigned statsMaxDepth, bool missingScopesOnly); + IStatisticCollection * queryCollection() { return statsCollection; } +private: + Owned statsCollection; + StringBuffer wuid; +}; + //! IWorkUnit //! Provides high level access to WorkUnit "header" data. 
interface IWorkUnit; @@ -1302,7 +1319,7 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const = 0; virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const = 0; virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const = 0; - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const = 0; + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const = 0; virtual void clearGraphProgress() const = 0; virtual IStringVal & getAbortBy(IStringVal & str) const = 0; virtual unsigned __int64 getAbortTimeStamp() const = 0; @@ -1725,7 +1742,7 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *ti extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeType scopeType, StatisticKind kind, ITimeReporter *timer); extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search); extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false); -extern WORKUNIT_API void updateAggregates(IWorkUnit *wu); +extern WORKUNIT_API void updateAggregates(IWorkUnit *wu, GlobalStatisticCollection & statsCollection); extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name); extern WORKUNIT_API void descheduleWorkunit(char const * wuid); #if 0 diff --git a/common/workunit/workunit.ipp b/common/workunit/workunit.ipp index 4f166ea67fb..85cfc396693 100644 --- a/common/workunit/workunit.ipp +++ b/common/workunit/workunit.ipp @@ 
-232,7 +232,7 @@ public: virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const; virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const; virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const; - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override; + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const override; void clearGraphProgress() const; virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree) {}; //No GraphProgressTree in CLocalWorkUnit. @@ -661,7 +661,7 @@ public: class WORKUNIT_API CWuGraphStats : public CInterfaceOf { public: - CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge); + CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats); virtual void beforeDispose(); virtual IStatisticGatherer & queryStatsBuilder(); protected: diff --git a/ecl/eclagent/eclagent.cpp b/ecl/eclagent/eclagent.cpp index 9a8afecd43f..961bd894fe8 100644 --- a/ecl/eclagent/eclagent.cpp +++ b/ecl/eclagent/eclagent.cpp @@ -1988,7 +1988,6 @@ void EclAgent::doProcess() const cost_type cost = aggregateCost(w, nullptr, false); if (cost) w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - updateAggregates(w); addTimings(w); switch (w->getState()) @@ -2534,7 +2533,6 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime const cost_type cost = 
money2cost_type(calcCost(agent.queryAgentMachineCost(), nanoToMilli(elapsedNs))) + aggregateCost(wu, scope, true); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - updateAggregates(wu); } void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item) diff --git a/ecl/eclagent/eclagent.ipp b/ecl/eclagent/eclagent.ipp index 10c86cc6ebb..0cc843d9c18 100644 --- a/ecl/eclagent/eclagent.ipp +++ b/ecl/eclagent/eclagent.ipp @@ -1055,7 +1055,7 @@ public: void executeLibrary(const byte * parentExtract, IHThorGraphResults * results); IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph); void updateWUStatistic(IWorkUnit* lockedwu, StatisticScopeType scopeType, const char* scope, StatisticKind kind, const char* descr, long long unsigned int value); - + void updateAggregates(IWorkUnit* lockedwu); EclSubGraph * idToGraph(unsigned id); EclGraphElement * idToActivity(unsigned id); const char *queryGraphName() { return graphName; } @@ -1083,6 +1083,7 @@ protected: IProbeManager * probeManager; unsigned wfid; bool aborted; + GlobalStatisticCollection statsCache; }; diff --git a/ecl/eclagent/eclgraph.cpp b/ecl/eclagent/eclgraph.cpp index eacfad7f992..8fc9117685f 100644 --- a/ecl/eclagent/eclgraph.cpp +++ b/ecl/eclagent/eclgraph.cpp @@ -879,10 +879,10 @@ void EclSubGraph::updateProgress() Owned progress = parent.updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), parent.queryWfid(), id); IStatisticGatherer & stats = progress->queryStatsBuilder(); updateProgress(stats); - if (startGraphTime || elapsedGraphCycles) { WorkunitUpdate lockedwu(agent->updateWorkUnit()); + parent.updateAggregates(lockedwu); StringBuffer subgraphid; subgraphid.append(parent.queryGraphName()).append(":").append(SubGraphScopePrefix).append(id); if (startGraphTime) @@ -1278,8 +1278,6 @@ void 
EclGraph::execute(const byte * parentExtract) const cost_type cost = money2cost_type(calcCost(agent->queryAgentMachineCost(), elapsed)); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - - updateAggregates(wu); } if (agent->queryRemoteWorkunit()) @@ -1491,7 +1489,12 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result) IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph) { - return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false); + return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false, &statsCache); +} + +void EclGraph::updateAggregates(IWorkUnit* lockedwu) +{ + ::updateAggregates(lockedwu, statsCache); } void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value) diff --git a/plugins/cassandra/cassandrawu.cpp b/plugins/cassandra/cassandrawu.cpp index 6315ad7800b..19c89b89df9 100644 --- a/plugins/cassandra/cassandrawu.cpp +++ b/plugins/cassandra/cassandrawu.cpp @@ -2743,8 +2743,8 @@ class CCassandraWorkUnit : public CPersistedWorkUnit class CCassandraWuGraphStats : public CWuGraphStats { public: - CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge) - : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), + CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats) + : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), 
progress(createPTree(_rootScope)), parent(_parent) { } @@ -2764,9 +2764,9 @@ class CCassandraWorkUnit : public CPersistedWorkUnit StringAttr wuid; }; - IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge) const override + IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats) const override { - return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge); + return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge, stats); } diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index ec6ae7db3cb..866e1f87799 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -1813,10 +1813,11 @@ class CStatisticCollection : public CInterfaceOf } virtual StringBuffer & getFullScope(StringBuffer & str) const override { - if (parent && queryScopeType()!=SSTworkflow) + if (parent && queryScopeType()!=SSTglobal) { parent->getFullScope(str); - str.append(':'); + if (!str.isEmpty()) + str.append(':'); } id.getScopeText(str); return str; @@ -1904,7 +1905,7 @@ class CStatisticCollection : public CInterfaceOf stats.append(s); } - virtual void updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) override + virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) override { if (mergeAction != StatsMergeAppend) { @@ -1913,13 +1914,19 @@ class CStatisticCollection : public CInterfaceOf Statistic & cur = stats.element(i); if (cur.kind == kind) { + if (mergeAction==StatsMergeReplace) + { + if (cur.value==value) + return false; + } cur.value = mergeStatisticValue(cur.value, value, mergeAction); - return; + return true; } } } Statistic s(kind, value); stats.append(s); + return true; } virtual 
IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) override @@ -1934,7 +1941,27 @@ class CStatisticCollection : public CInterfaceOf return ret; } - virtual void serialize(MemoryBuffer & out) const + virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path) override + { + IStatisticCollection * curScope = this; + for (auto it = path.begin(); it != path.end(); ++it) + curScope = curScope->ensureSubScope(*it, true); // n.b. this will always return a valid pointer + return curScope; + } + + virtual IStatisticCollection * querySubScopePath(std::initializer_list path) override + { + IStatisticCollection * curScope = this; + for (auto it = path.begin(); it != path.end(); ++it) + { + curScope = children.find(&(*it)); /* NOTE(review): searches this object's own children on every iteration instead of curScope's, so the path is never descended - confirm */ + if (!curScope) + return nullptr; + } + return curScope; + } + + virtual void serialize(MemoryBuffer & out) const override { out.append(getCollectionType()); id.serialize(out); @@ -1949,7 +1976,7 @@ class CStatisticCollection : public CInterfaceOf iter.query().serialize(out); } - virtual void deserialize(MemoryBuffer & in, unsigned version) override + virtual void deserialize(MemoryBuffer & in, unsigned version, unsigned statsMaxDepth=0) override { unsigned numStats; in.read(numStats); @@ -1957,22 +1984,28 @@ class CStatisticCollection : public CInterfaceOf while (numStats-- > 0) { Statistic next (in, version); - stats.append(next); + if (statsMaxDepth==0) stats.append(next); /* stats read at this level are dropped while statsMaxDepth is non-zero */ } - unsigned numChildren; in.read(numChildren); children.ensure(numChildren); + statsMaxDepth = (statsMaxDepth > 0) ? 
(statsMaxDepth - 1) : 0; while (numChildren-- > 0) { byte kind; in.read(kind); StatsScopeId childId; childId.deserialize(in, version); - IStatisticCollection * collection = ensureSubScope(childId, true); - collection->deserialize(in, version); + deserializeChild(childId, in, version, statsMaxDepth); } } + + virtual void deserializeChild(StatsScopeId childId, MemoryBuffer & in, unsigned version, unsigned statsMaxDepth=0) override + { + IStatisticCollection * childCollection = ensureSubScope(childId, true); + childCollection->deserialize(in, version, statsMaxDepth); + } + inline const StatsScopeId & queryScopeId() const { return id; } virtual void mergeInto(IStatisticGatherer & target) const @@ -2015,8 +2048,7 @@ class CStatisticCollection : public CInterfaceOf if (iteratorVec!=aggregateKinds.end()) { unsigned pos = iteratorVec-aggregateKinds.begin(); - StatsMergeAction mergeAction = queryMergeMode(kind); - totals[pos] = mergeStatisticValue(totals[pos], stat.queryValue(), mergeAction); + totals[pos] += stat.queryValue(); updated = true; } } @@ -2031,14 +2063,15 @@ class CStatisticCollection : public CInterfaceOf } if (updated) { + updated=false; /* from here updated is only set again when updateStatistic reports a changed value */ std::vector::iterator totalIter = totals.begin(); std::vector::iterator subTotalIter = childTotals.begin(); std::vector::iterator kindIter = aggregateKinds.begin(); while (totalIter != totals.end()) { - StatsMergeAction mergeAction = queryMergeMode(*kindIter); - updateStatistic(*kindIter, *subTotalIter, mergeAction); - (*totalIter) = mergeStatisticValue(*totalIter, *subTotalIter, mergeAction); + if (updateStatistic(*kindIter, *subTotalIter, StatsMergeReplace)) + updated=true; + (*totalIter) += *subTotalIter; ++totalIter; ++subTotalIter; @@ -2048,7 +2081,18 @@ class CStatisticCollection : public CInterfaceOf } return updated; } - + virtual void clearStats() + { + stats.clear(true); + // Note: Children should NOT be deleted as all pointers returned by ensureSubScope must still be valid + SuperHashIteratorOf iter(children, false); 
+ for (iter.first(); iter.isValid(); iter.next()) + iter.query().clearStats(); + } + virtual void addChild(IStatisticCollection *stats) + { + children.add(*LINK(stats)); + } private: StatsScopeId id; IStatisticCollection * parent; @@ -2126,8 +2170,14 @@ bool CollectionHashTable::matchesElement(const void *et, const void *searchET) c class CRootStatisticCollection : public CStatisticCollection { public: - CRootStatisticCollection(StatisticCreatorType _creatorType, const char * _creator, const StatsScopeId & _id) - : CStatisticCollection(nullptr, _id), creatorType(_creatorType), creator(_creator) + CRootStatisticCollection(StatisticCreatorType _creatorType, const char * _creator, const StatsScopeId & rootScopeId, const StatsScopeId & graphScopeId, IStatisticCollection * sgCollection) + : CStatisticCollection(nullptr, rootScopeId), creatorType(_creatorType), creator(_creator) + { + whenCreated = getTimeStampNowValue(); + IStatisticCollection * child = ensureSubScope(graphScopeId, true); + child->addChild(sgCollection); + } + CRootStatisticCollection(StatisticCreatorType _creatorType, const char * _creator, const StatsScopeId & rootScopeId) : CStatisticCollection(nullptr, rootScopeId), creatorType(_creatorType), creator(_creator) { whenCreated = getTimeStampNowValue(); } @@ -2140,13 +2190,13 @@ class CRootStatisticCollection : public CStatisticCollection in.read(whenCreated); } - virtual byte getCollectionType() const { return SCroot; } + virtual byte getCollectionType() const override { return SCroot; } - virtual unsigned __int64 queryWhenCreated() const + virtual unsigned __int64 queryWhenCreated() const override { return whenCreated; } - virtual void serialize(MemoryBuffer & out) const + virtual void serialize(MemoryBuffer & out) const override { CStatisticCollection::serialize(out); out.append((byte)creatorType); @@ -2168,52 +2218,6 @@ class CRootStatisticCollection : public CStatisticCollection unsigned __int64 whenCreated; }; - -StatsScopeId 
globalScopeId(SSTglobal, (unsigned)0); -class GlobalStatisticCollection : public CStatisticCollection -{ -public: - GlobalStatisticCollection(IPropertyTree * root) : CStatisticCollection(nullptr, globalScopeId) - { - Owned iter = root->getElements("*"); - ForEach(*iter) - { - IPropertyTree * graphPT = &iter->query(); - - Owned iter2 = graphPT->getElements("./*"); - ForEach(*iter2) - { - IPropertyTree * sgPT = & iter2->query(); - const char * sgName = sgPT->queryName(); - if (strcmp(sgName, "node")==0) - continue; - assertex(strncmp(sgName, "sg", 2)==0); - MemoryBuffer compressed; - sgPT->getPropBin("Stats", compressed); - if (!compressed.length()) - return; - - MemoryBuffer serialized; - decompressToBuffer(serialized, compressed); - unsigned version; - serialized.read(version); - byte kind; - serialized.read(kind); - - StatsScopeId id; - id.deserialize(serialized, version); - IStatisticCollection * collection = ensureSubScope(id, true); - collection->deserialize(serialized, version); - } - } - } -}; - -IStatisticCollection * createGlobalStatisticCollection(IPropertyTree * root) -{ - return new GlobalStatisticCollection(root); -} - class StatAggregator : implements IStatisticVisitor { public: @@ -2289,6 +2293,16 @@ IStatisticCollection * createStatisticCollection(MemoryBuffer & in) return deserializeCollection(NULL, in, version); } +IStatisticCollection * createStatisticCollection(IStatisticCollection * parent, StatsScopeId scopeId) +{ + return new CStatisticCollection(parent, scopeId); +} + +IStatisticCollection * createRootStatisticCollection(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope, const StatsScopeId & graphScopeId, IStatisticCollection * childCollection) +{ + //creator unused at the moment. 
+ return new CRootStatisticCollection(creatorType, creator, rootScope, graphScopeId, childCollection); +} //-------------------------------------------------------------------------------------------------------------------- @@ -2360,9 +2374,12 @@ class StatisticGatherer : implements CInterfaceOf extern IStatisticGatherer * createStatisticsGatherer(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope) { - //creator unused at the moment. - Owned rootCollection = new CRootStatisticCollection(creatorType, creator, rootScope); - return new StatisticGatherer(rootCollection); + return new StatisticGatherer(new CRootStatisticCollection(creatorType, creator, rootScope)); /* NOTE(review): the removed code held the collection in Owned and so released its own reference; confirm StatisticGatherer takes ownership of the pointer, otherwise this leaks */ +} + +extern IStatisticGatherer * createStatisticsGatherer(IStatisticCollection * stats) +{ + return new StatisticGatherer(stats); } //-------------------------------------------------------------------------------------------------------------------- @@ -3926,9 +3943,9 @@ void StatisticsFilter::setScopeDepth(unsigned _scopeDepth) scopeFilter.setDepth(_scopeDepth); } -void StatisticsFilter::setScopeDepth(unsigned _minScopeDepth, unsigned _maxScopeDepth) +void StatisticsFilter::setScopeDepth(unsigned _maxDepthDepth, unsigned _maxScopeDepth) /* NOTE(review): first parameter was _minScopeDepth; _maxDepthDepth looks like an accidental rename - confirm */ { - scopeFilter.setDepth(_minScopeDepth, _maxScopeDepth); + scopeFilter.setDepth(_maxDepthDepth, _maxScopeDepth); } void StatisticsFilter::setScope(const char * _scope) diff --git a/system/jlib/jstats.h b/system/jlib/jstats.h index 138a9e19dff..4958fe01949 100644 --- a/system/jlib/jstats.h +++ b/system/jlib/jstats.h @@ -136,10 +136,15 @@ interface IStatisticCollection : public IInterface virtual void visit(IStatisticVisitor & target) const = 0; virtual void visitChildren(IStatisticVisitor & target) const = 0; virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) = 0; + virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path) = 0; + virtual IStatisticCollection * 
querySubScopePath(std::initializer_list path) = 0; virtual void addStatistic(StatisticKind kind, unsigned __int64 value) = 0; - virtual void updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) = 0; + virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) = 0; virtual bool refreshAggregates(std::vector & aggregateKinds, std::vector & totals) = 0; - virtual void deserialize(MemoryBuffer & in, unsigned version) = 0; + virtual void deserialize(MemoryBuffer & in, unsigned version, unsigned statsMaxDepth) = 0; + virtual void deserializeChild(StatsScopeId scopeId, MemoryBuffer & in, unsigned version, unsigned statsMaxDepth) = 0; + virtual void clearStats() = 0; + virtual void addChild(IStatisticCollection *stats) = 0; }; interface IStatisticCollectionIterator : public IIteratorOf @@ -402,8 +407,8 @@ class jlib_decl StatisticsFilter : public CInterface void setCreator(const char * _creator); void setCreatorType(StatisticCreatorType _creatorType); void setFilter(const char * filter); - void setScopeDepth(unsigned _minScopeDepth); - void setScopeDepth(unsigned _minScopeDepth, unsigned _maxScopeDepth); + void setScopeDepth(unsigned _maxDepthDepth); + void setScopeDepth(unsigned _maxDepthDepth, unsigned _maxScopeDepth); void setScope(const char * _scope); void setScopeType(StatisticScopeType _scopeType); void setValueRange(unsigned __int64 minValue, unsigned __int64 _maxValue); @@ -888,9 +893,12 @@ extern jlib_decl StatisticCreatorType queryCreatorType(const char * sct, Statist extern jlib_decl StatisticScopeType queryScopeType(const char * sst, StatisticScopeType dft); extern jlib_decl IStatisticGatherer * createStatisticsGatherer(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope); +extern jlib_decl IStatisticGatherer * createStatisticsGatherer(IStatisticCollection * stats); +extern jlib_decl IStatisticCollection * 
createRootStatisticCollection(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope, const StatsScopeId & graphScope, IStatisticCollection * childCollection=nullptr); extern jlib_decl void serializeStatisticCollection(MemoryBuffer & out, IStatisticCollection * collection); extern jlib_decl IStatisticCollection * createStatisticCollection(MemoryBuffer & in); -extern jlib_decl IStatisticCollection * createGlobalStatisticCollection(IPropertyTree * root, std::vector & aggregateKinds); +extern jlib_decl IStatisticCollection * createStatisticCollection(IStatisticCollection * parent, StatsScopeId scopeId); +// extern jlib_decl IStatisticCollection * createGlobalStatisticCollection(IPropertyTree * root, std::vector & aggregateKinds); inline unsigned __int64 milliToNano(unsigned __int64 value) { return value * 1000000; } // call avoids need to upcast values inline unsigned __int64 nanoToMilli(unsigned __int64 value) { return value / 1000000; } diff --git a/thorlcr/master/thdemonserver.cpp b/thorlcr/master/thdemonserver.cpp index e96a4ae07c9..8579ba74684 100644 --- a/thorlcr/master/thdemonserver.cpp +++ b/thorlcr/master/thdemonserver.cpp @@ -43,6 +43,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer unsigned numberOfMachines = 0; cost_type costLimit = 0; cost_type workunitCost = 0; + GlobalStatisticCollection globalStatsCollection; void doReportGraph(IStatisticGatherer & stats, CGraphBase *graph) { @@ -146,7 +147,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer ForEachItemIn (g, activeGraphs) { CGraphBase &graph = activeGraphs.item(g); - Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false); + Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false, &globalStatsCollection); reportGraph(stats->queryStatsBuilder(), &graph); } Owned wu = &currentWU.lock(); @@ -156,6 
+157,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer unsigned startTime = graphStarts.item(g2); reportStatus(wu, graph, startTime, finished, success); } + ::updateAggregates(wu, globalStatsCollection); queryServerStatus().commitProperties(); } catch (IException *E) @@ -174,7 +176,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer const char *graphName = ((CJobMaster &)activeGraphs.item(0).queryJob()).queryGraphName(); unsigned wfid = graph->queryJob().getWfid(); { - Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false); + Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false, &globalStatsCollection); reportGraph(stats->queryStatsBuilder(), graph); } @@ -187,7 +189,6 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, graphScope, StWhenStarted, NULL, getTimeStampNowValue(), 1, 0, StatsMergeAppend); } reportStatus(wu, *graph, startTime, finished, success); - queryServerStatus().commitProperties(); } catch (IException *e) @@ -301,6 +302,17 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer reportActiveGraphs(true, false); activeGraphs.kill(); } + virtual void updateAggregates(IWorkUnit * lockedWu) override + { + ::updateAggregates(lockedWu, globalStatsCollection); + } + virtual void loadStats(const char *wuid) override + { + // load stats from GraphProgress + // - ignore stats below subgraph (statsMaxDepth=3) + // - ignore any scopes that have already been loaded into global collection + globalStatsCollection.load(wuid, 3, true); + } }; diff --git a/thorlcr/master/thdemonserver.hpp b/thorlcr/master/thdemonserver.hpp index 32453c92764..585d7b18dfc 100644 --- a/thorlcr/master/thdemonserver.hpp +++ b/thorlcr/master/thdemonserver.hpp @@ -30,6 +30,8 @@ 
interface IDeMonServer : extends IInterface virtual void startGraph(CGraphBase *graph) = 0; virtual void endGraph(CGraphBase *graph, bool success) = 0; virtual void endGraphs() = 0; + virtual void updateAggregates(IWorkUnit * lockedWu) = 0; + virtual void loadStats(const char *wuid) = 0; }; diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp index 5c114da2c55..5ffcfa4d5ca 100644 --- a/thorlcr/master/thgraphmanager.cpp +++ b/thorlcr/master/thgraphmanager.cpp @@ -1086,7 +1086,8 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, } setWuid(workunit.queryWuid(), workunit.queryClusterName()); - + if (globals->getPropBool("@watchdogProgressEnabled")) + queryDeMonServer()->loadStats(workunit.queryWuid()); allDone = job->go(); Owned wu = &workunit.lock(); @@ -1101,7 +1102,8 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, cost_type cost = money2cost_type(calculateThorCost(nanoToMilli(graphTimeNs), numberOfMachines)); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - updateAggregates(wu); + if (globals->getPropBool("@watchdogProgressEnabled")) + queryDeMonServer()->updateAggregates(wu); removeJob(*job); } catch (IException *e)