From 9db0c5fc79963e54be3951498f261725dfa83452 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Wed, 8 Nov 2023 12:44:21 +0000 Subject: [PATCH] HPCC-29657 Change following review Signed-off-by: Shamser Ahmed --- common/workunit/workunit.cpp | 104 ++-------------- common/workunit/workunit.hpp | 4 +- ecl/eclagent/agentctx.hpp | 2 +- ecl/eclagent/eclagent.ipp | 10 +- ecl/eclagent/eclgraph.cpp | 8 +- system/jlib/jstats.cpp | 192 +++++++++++++----------------- system/jlib/jstats.h | 18 ++- thorlcr/master/thdemonserver.cpp | 4 +- thorlcr/master/thdemonserver.hpp | 2 +- thorlcr/master/thgraphmanager.cpp | 2 +- 10 files changed, 115 insertions(+), 231 deletions(-) diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index 71bff01c3df..5daca9812df 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -2669,7 +2669,7 @@ GlobalStatisticCollection::GlobalStatisticCollection() : aggregateKindsMapping(a statsCollection.setown(createStatisticCollection(nullptr, globalScopeId)); } -void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly) +void GlobalStatisticCollection::loadExistingAggregates(IConstWorkUnit &workunit) { const char * _wuid = workunit.queryWuid(); if (!streq(_wuid, wuid.str())) // New statsCollection if collection for different workunit @@ -2679,57 +2679,6 @@ void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * grap wuid.set(_wuid); } - loadGlobalAggregates(workunit); - if (isEmptyString(graphName)) - { - Owned root = getWUGraphProgress(wuid, true); - if (root) - { - Owned graphPT = root->getPropTree(graphName); - if (!graphPT) - return; - - StatsScopeId wfScopeId(SSTworkflow, graphPT->getPropInt("@wfid", 0)); - StatsScopeId graphScopeId(SSTgraph, graphName); - - Owned iter = graphPT->getElements("*"); - ForEach(*iter) - { - StatsScopeId sgScopeId; - IPropertyTree * sgPT = & iter->query(); - const char * sgName = sgPT->queryName(); - if (strcmp(sgName, "node")==0) - continue; - verifyex(sgScopeId.setScopeText(sgName)); - MemoryBuffer compressed; - sgPT->getPropBin("Stats", compressed); - if (!compressed.length()) - break; - MemoryBuffer serialized; - decompressToBuffer(serialized, compressed); - - unsigned version; - serialized.read(version); - byte kind; - serialized.read(kind); - - StatsScopeId childId; - childId.deserialize(serialized, version); - int statsMinDepth = 0, statsMaxDepth = INT_MAX; - if (aggregatesOnly) - { - // Only store stats for subgraph level - statsMinDepth = 3; // this is subgraph level - statsMaxDepth = 3; - } - statsCollection->deserializeChild(childId, serialized, version, statsMinDepth, statsMaxDepth); - } - } - } -} - -void GlobalStatisticCollection::loadGlobalAggregates(IConstWorkUnit &workunit) -{ class StatsCollectionAggregatesLoader : public IWuScopeVisitor { public: @@ -2781,57 +2730,20 @@ IStatisticCollection * GlobalStatisticCollection::getCollectionForUpdate(Statist return createRootStatisticCollection(creatorType, creator, wfScopeId, graphScopeId, sgScopeCollection); } -// Recalculate aggregates for global, workflow and graph scopes -bool GlobalStatisticCollection::refreshAggregates() -{ - return statsCollection->refreshAggregates(aggregateKindsMapping); -} - // Recalculate aggregates and then write the aggregates to global stats (dali) void GlobalStatisticCollection::updateAggregates(IWorkUnit *wu) { - class StatisticsAggregatesWriter : implements IStatisticVisitor + struct AggregateUpdatedCallBackFunc : implements IWhenAggregateUpdatedCallBack { - const StatisticsMapping & aggregateKindsMapping; - const unsigned numStats; Linked wu; - public: - StatisticsAggregatesWriter(IWorkUnit * _wu, const StatisticsMapping & _aggregateKindsMapping): wu(_wu), aggregateKindsMapping(_aggregateKindsMapping), numStats(aggregateKindsMapping.numStatistics()) {} - - virtual bool visitScope(const IStatisticCollection & cur) + AggregateUpdatedCallBackFunc(IWorkUnit *_wu) : wu(_wu) {} + void operator () (const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value) { - switch (cur.queryScopeType()) - { - case SSTglobal: - case SSTworkflow: - case SSTgraph: - for (unsigned i=0; isetStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), cur.queryScopeType(), cur.getFullScope(s).str(), kind, nullptr, value, 1, 0, StatsMergeReplace); - } - } - } - if (cur.queryScopeType()==SSTgraph) - return false; - else - return true; - default: - return false; - } + wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), sst, scope, kind, nullptr, value, 1, 0, StatsMergeReplace); } - }; - if (refreshAggregates()) // Only serialize if the aggregates have changed - { - StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKindsMapping); - statsCollection->visit(statsAggregatorWriter); - } + } aggregateUpdatedCallBackFunc(wu); + + statsCollection->refreshAggregates(aggregateKindsMapping, aggregateUpdatedCallBackFunc); } // Prune all subgraph descendent stats (leaving subgraph stats for future aggregation) diff --git a/common/workunit/workunit.hpp b/common/workunit/workunit.hpp index 544478d1b72..5140b4d4309 100644 --- a/common/workunit/workunit.hpp +++ b/common/workunit/workunit.hpp @@ -1788,10 +1788,8 @@ class WORKUNIT_API GlobalStatisticCollection : public CInterface public: GlobalStatisticCollection(); - void load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly); - void loadGlobalAggregates(IConstWorkUnit &workunit); + void loadExistingAggregates(IConstWorkUnit &workunit); IStatisticCollection * getCollectionForUpdate(StatisticCreatorType creatorType, const char * creator, unsigned wfid, const char *graphName, unsigned sgId, bool clearStats); - bool refreshAggregates(); IStatisticCollection * queryCollection() { return statsCollection; } void updateAggregates(IWorkUnit *wu); void pruneSubGraphDescendants(unsigned wfid, const char *graphName, unsigned sgId); diff --git a/ecl/eclagent/agentctx.hpp b/ecl/eclagent/agentctx.hpp index 9d142312269..f55331677dc 100644 --- a/ecl/eclagent/agentctx.hpp +++ b/ecl/eclagent/agentctx.hpp @@ -124,7 +124,7 @@ struct IAgentContext : extends IGlobalCodeContext virtual bool forceNewDiskReadActivity() const = 0; virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, char const * source) = 0; virtual double queryAgentMachineCost() const = 0; - virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) = 0; + virtual IWUGraphStats *updateStats(unsigned activeWfid, const char *graphName, unsigned subgraph) = 0; virtual void updateAggregates(IWorkUnit* lockedwu) = 0; }; diff --git a/ecl/eclagent/eclagent.ipp b/ecl/eclagent/eclagent.ipp index 401732d36e0..8ffcc2b9cfb 100644 --- a/ecl/eclagent/eclagent.ipp +++ b/ecl/eclagent/eclagent.ipp @@ -250,9 +250,9 @@ public: { return ctx->queryAgentMachineCost(); }; - virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) override + virtual IWUGraphStats *updateStats(unsigned activeWfid, const char *graphName, unsigned subgraph) override { - return ctx->updateStats(creatorType, creator, activeWfid, graphName, subgraph); + return ctx->updateStats(activeWfid, graphName, subgraph); }; virtual void updateAggregates(IWorkUnit* lockedwu) override { @@ -714,10 +714,10 @@ public: { return agentMachineCost; } - virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) override + virtual IWUGraphStats *updateStats(unsigned activeWfid, const char *graphName, unsigned subgraph) override { - Owned sgCollection = globalStats.getCollectionForUpdate(creatorType, creator, activeWfid, graphName, subgraph, true); // true=>clear existing stats - return wuRead->updateStats(graphName, creatorType, creator, activeWfid, subgraph, false, sgCollection); + Owned sgCollection = globalStats.getCollectionForUpdate(queryStatisticsComponentType(), queryStatisticsComponentName(), activeWfid, graphName, subgraph, true); // true=>clear existing stats + return wuRead->updateStats(graphName, queryStatisticsComponentType(), queryStatisticsComponentName(), activeWfid, subgraph, false, sgCollection); } virtual void updateAggregates(IWorkUnit* lockedwu) override { diff --git a/ecl/eclagent/eclgraph.cpp b/ecl/eclagent/eclgraph.cpp index fc153cb5011..224f4dd0a9b 100644 --- a/ecl/eclagent/eclgraph.cpp +++ b/ecl/eclagent/eclgraph.cpp @@ -925,7 +925,7 @@ void EclSubGraph::updateProgress(IStatisticGatherer &progress) subgraphs.item(i2).updateProgress(progress); Owned statsCollection = progress.getResult(); - const cost_type costDiskAccess = aggregateStatistic(StCostFileAccess, statsCollection); + const cost_type costDiskAccess = statsCollection->aggregateStatistic(StCostFileAccess); if (costDiskAccess) progress.addStatistic(StCostFileAccess, costDiskAccess); } @@ -1347,7 +1347,7 @@ void EclGraph::updateLibraryProgress() EclSubGraph & cur = graphs.item(idx); unsigned wfid = cur.parent.queryWfid(); - Owned progress = agent->updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, queryGraphName(), cur.id); + Owned progress = agent->updateStats(wfid, queryGraphName(), cur.id); cur.updateProgress(progress->queryStatsBuilder()); } } @@ -1490,7 +1490,7 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result) IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph) { - return agent->updateStats(creatorType, creator, activeWfid, queryGraphName(), subgraph); + return agent->updateStats(activeWfid, queryGraphName(), subgraph); } void EclGraph::updateAggregates(IWorkUnit* lockedwu) @@ -1547,7 +1547,7 @@ EclGraph * EclAgent::loadGraph(const char * graphName, IConstWorkUnit * wu, ILoa Owned eclGraph = new EclGraph(*this, graphName, wu, isLibrary, debugContext, probeManager, wuGraph->getWfid()); eclGraph->createFromXGMML(dll, xgmml); - globalStats.load(*wu, nullptr, true); + globalStats.loadExistingAggregates(*wu); return eclGraph.getClear(); } diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index a3d9c494604..dd2874b9e77 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -1750,7 +1750,7 @@ enum }; class CStatisticCollection; -static IStatisticCollection * deserializeCollection(IStatisticCollection * parent, MemoryBuffer & in, unsigned version); +static CStatisticCollection * deserializeCollection(CStatisticCollection * parent, MemoryBuffer & in, unsigned version); //MORE: Create an implementation with no children typedef StructArrayOf StatsArray; @@ -1786,24 +1786,6 @@ class CStatisticCollection : public CInterfaceOf { friend class CollectionHashTable; - // deserialize without storing the stats - void deserializeNoStats(MemoryBuffer & in, unsigned version) - { - unsigned numStats; - in.read(numStats); - while (numStats-- > 0) - Statistic next (in, version); - unsigned numChildren; - in.read(numChildren); - while (numChildren-- > 0) - { - byte kind; - in.read(kind); - StatsScopeId childId; - childId.deserialize(in, version); - deserializeNoStats(in, version); - } - } public: CStatisticCollection(IStatisticCollection * _parent, const StatsScopeId & _id) : id(_id), parent(_parent) { @@ -1812,12 +1794,11 @@ class CStatisticCollection : public CInterfaceOf CStatisticCollection(IStatisticCollection * _parent, MemoryBuffer & in, unsigned version) : parent(_parent) { id.deserialize(in, version); - deserialize(in, version, 0, INT_MAX); + deserialize(in, version); } virtual byte getCollectionType() const { return SCintermediate; } - //interface IStatisticCollection: virtual StringBuffer &toXML(StringBuffer &out) const override; virtual StatisticScopeType queryScopeType() const override @@ -1836,7 +1817,7 @@ class CStatisticCollection : public CInterfaceOf } virtual StringBuffer & getFullScope(StringBuffer & str) const override { - if (parent && queryScopeType()!=SSTglobal) + if (parent) { parent->getFullScope(str); if (!str.isEmpty()) @@ -1868,7 +1849,21 @@ class CStatisticCollection : public CInterfaceOf } return false; } - virtual bool setStatistic(const char *scope, StatisticKind kind, unsigned __int64 & value) override + virtual bool getStatisticSum(StatisticKind kind, unsigned __int64 & value) const override + { + bool found = false; + ForEachItemIn(i, stats) + { + const Statistic & cur = stats.item(i); + if (cur.kind == kind) + { + value += cur.value; + found = true; + } + } + return found; + } + virtual bool setStatistic(const char *scope, StatisticKind kind, unsigned __int64 value) override { if (*scope=='\0') { @@ -1877,13 +1872,14 @@ class CStatisticCollection : public CInterfaceOf else { StatsScopeId childScopeId; - if (!childScopeId.setScopeText(scope, &scope) || (*scope!=':' && *scope!='\0')) + const char * next; + if (!childScopeId.setScopeText(scope, &next) || (*next!=':' && *next!='\0')) throw makeStringExceptionV(JLIBERR_UnexpectedValue, "'%s' does not appear to be a valid scope id", scope); IStatisticCollection * child = ensureSubScope(childScopeId, true); - if (*scope==':') - scope++; - return child->setStatistic(scope, kind, value); + if (*next==':') + next++; + return child->setStatistic(next, kind, value); } } virtual unsigned getNumStatistics() const override @@ -2034,7 +2030,7 @@ class CStatisticCollection : public CInterfaceOf iter.query().serialize(out); } - virtual void deserialize(MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) override + virtual void deserialize(MemoryBuffer & in, unsigned version) override { unsigned numStats; in.read(numStats); @@ -2042,8 +2038,7 @@ class CStatisticCollection : public CInterfaceOf while (numStats-- > 0) { Statistic next(in, version); - if (minDepth <= 0) - stats.append(next); + stats.append(next); } unsigned numChildren; in.read(numChildren); @@ -2054,21 +2049,14 @@ class CStatisticCollection : public CInterfaceOf in.read(kind); StatsScopeId childId; childId.deserialize(in, version); - deserializeChild(childId, in, version, minDepth, maxDepth); + deserializeChild(childId, in, version); } } - virtual void deserializeChild(const StatsScopeId & childId, MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) override + virtual void deserializeChild(const StatsScopeId & childId, MemoryBuffer & in, unsigned version) override { - if (maxDepth > 0) - { - IStatisticCollection * childCollection = ensureSubScope(childId, true); - childCollection->deserialize(in, version, (minDepth-1), (maxDepth-1)); - } - else - { - deserializeNoStats(in, version); - } + IStatisticCollection * childCollection = ensureSubScope(childId, true); + childCollection->deserialize(in, version); } inline const StatsScopeId & queryScopeId() const { return id; } @@ -2098,21 +2086,20 @@ class CStatisticCollection : public CInterfaceOf cur.visit(visitor); } - virtual bool refreshAggregates(const StatisticsMapping & mapping) override + virtual bool refreshAggregates(const StatisticsMapping & mapping, IWhenAggregateUpdatedCallBack & fWhenAggregateUpdated) override { if (isDirty==false) return false; CRuntimeStatisticCollection totals(mapping); Owned totalUpdated = createBitSet(mapping.numStatistics()); - return refreshAggregates(totals, *totalUpdated); + return refreshAggregates(totals, *totalUpdated, fWhenAggregateUpdated); } - virtual bool refreshAggregates(CRuntimeStatisticCollection & totals, IBitSet & isTotalUpdated) override + virtual bool refreshAggregates(CRuntimeStatisticCollection & parentTotals, IBitSet & isTotalUpdated, IWhenAggregateUpdatedCallBack & fWhenAggregateUpdated) override { - const StatisticsMapping & mapping = totals.queryMapping(); - bool updated = false; + const StatisticsMapping & mapping = parentTotals.queryMapping(); // if this scope is not dirty, the aggregates are accurate at this level so return totals (no need to descend) - // Also if at sg scope, do not descend as aggregates does not need to be generated from below sg level + // Also if at sg scope, do not descend as aggregates do not need to be generated from below sg level // (if aggregates should be calculated from descendants of sg scope, then remove the second test) if (isDirty==false || id.queryScopeType()==SSTsubgraph) { @@ -2126,38 +2113,53 @@ class CStatisticCollection : public CInterfaceOf unsigned index = mapping.getIndex(kind); if (index!=mapping.numStatistics()) { - totals.mergeStatistic(kind, stat.queryValue()); - isTotalUpdated.set(index); - updated = true; + // Totals required from this level by parent, even if they are not dirty + parentTotals.mergeStatistic(kind, stat.queryValue()); + if (isDirty) + isTotalUpdated.set(index); } } + if (isDirty) + { + isDirty=false; + return true; + } + else + return false; } else { - // descend down to lower level to obtain totals required for aggregates and then aggregate + // descend down to lower level to obtain totals required for aggregation and then aggregate CRuntimeStatisticCollection childTotals(mapping); Owned childTotalUpdated = createBitSet(mapping.numStatistics()); for (auto & child : children) { - if (child.refreshAggregates(childTotals, *childTotalUpdated)) - updated = true; + child.refreshAggregates(childTotals, *childTotalUpdated, fWhenAggregateUpdated); } - if (updated) + // 1) Set any values that has changed for this scope and 2) update ALL totals for parent + const unsigned numStats = mapping.numStatistics(); + for (unsigned i=0; iscan(0, true); - while (NotFound != i) + StatisticKind kind = mapping.getKind(i); + unsigned __int64 value = childTotals.queryStatisticByIndex(i).get(); + if (childTotalUpdated->test(i)) { - StatisticKind kind = childTotals.getKind(i); - unsigned __int64 value = childTotals.queryStatisticByIndex(i).get(); - updateStatistic(kind, value, StatsMergeReplace); - totals.mergeStatistic(kind, value); - isTotalUpdated.set(i); - i = childTotalUpdated->scan(i+1, true); + if (updateStatistic(kind, value, StatsMergeReplace)) + { + if (value || includeStatisticIfZero(kind)) + { + StringBuffer s; + (fWhenAggregateUpdated)(getFullScope(s).str(), queryScopeType(), kind, value); + isTotalUpdated.set(i); + } + } } + // All totals still required at parent level (even unchanged totals) + parentTotals.mergeStatistic(kind, value); } + isDirty=false; + return true; } - isDirty=false; - return updated; } virtual void clearStats() override @@ -2180,6 +2182,18 @@ class CStatisticCollection : public CInterfaceOf if (parent) parent->markDirty(); } + virtual stat_type aggregateStatistic(StatisticKind kind) const override + { + stat_type sum; + if (!getStatisticSum(kind, sum)) // get sum of statistics at this level + { + // if no stats at this level, then get sum of stats from children + for (auto & child : children) + sum += child.aggregateStatistic(kind); + } + return sum; + } + private: StatsScopeId id; IStatisticCollection * parent; @@ -2204,8 +2218,7 @@ StringBuffer &CStatisticCollection::toXML(StringBuffer &out) const SuperHashIteratorOf iter(children, false); for (iter.first(); iter.isValid(); iter.next()) iter.query().toXML(out); - out.append(" // Scope id=\""); - id.getScopeText(out).append("\"\n"); + out.append("\n"); return out; } @@ -2307,51 +2320,6 @@ class CRootStatisticCollection : public CStatisticCollection unsigned __int64 whenCreated; }; -class StatAggregator : implements IStatisticVisitor -{ -public: - StatAggregator(StatisticKind _kind) : kind(_kind) {} - - virtual bool visitScope(const IStatisticCollection & cur) - { - switch (cur.queryScopeType()) - { - //If there is a match for the stat in any of these containers, then avoid summing any child scopes - case SSTglobal: - case SSTgraph: - case SSTsubgraph: - case SSTsection: - case SSTchildgraph: - case SSTworkflow: - { - stat_type value; - if (cur.getStatistic(kind, value)) - { - total += value; - return false; - } - return true; - } - //Default is to sum the value for this scope and children => recurse. E.g. activity and any child activities. - default: - total += cur.queryStatistic(kind); - return true; - } - } - stat_type getTotal() const { return total; } -private: - stat_type total = 0; - StatisticKind kind; -}; - - -stat_type aggregateStatistic(StatisticKind kind, IStatisticCollection * statsCollection) -{ - StatAggregator aggregator(kind); - statsCollection->visit(aggregator); - return aggregator.getTotal(); -} - //--------------------------------------------------------------------------------------------------------------------- void serializeStatisticCollection(MemoryBuffer & out, IStatisticCollection * collection) @@ -2360,7 +2328,7 @@ void serializeStatisticCollection(MemoryBuffer & out, IStatisticCollection * col collection->serialize(out); } -static IStatisticCollection * deserializeCollection(IStatisticCollection * parent, MemoryBuffer & in, unsigned version) +static CStatisticCollection * deserializeCollection(CStatisticCollection * parent, MemoryBuffer & in, unsigned version) { byte kind; in.read(kind); diff --git a/system/jlib/jstats.h b/system/jlib/jstats.h index 95b78f7d0a0..c6a4b9aec1d 100644 --- a/system/jlib/jstats.h +++ b/system/jlib/jstats.h @@ -117,6 +117,11 @@ enum StatsMergeAction StatsMergeLast, }; +interface IWhenAggregateUpdatedCallBack +{ + virtual void operator () (const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value) = 0; +}; + class StatisticsMapping; class CRuntimeStatisticCollection; interface IStatisticCollection : public IInterface @@ -129,7 +134,8 @@ interface IStatisticCollection : public IInterface virtual unsigned getNumStatistics() const = 0; virtual bool getStatistic(StatisticKind kind, unsigned __int64 & value) const = 0; virtual void getStatistic(StatisticKind & kind, unsigned __int64 & value, unsigned idx) const = 0; - virtual bool setStatistic(const char *scope, StatisticKind kind, unsigned __int64 & value) = 0; + virtual bool getStatisticSum(StatisticKind kind, unsigned __int64 & value) const = 0; + virtual bool setStatistic(const char *scope, StatisticKind kind, unsigned __int64 value) = 0; virtual IStatisticCollectionIterator & getScopes(const char * filter, bool sorted) = 0; virtual void getMinMaxScope(IStringVal & minValue, IStringVal & maxValue, StatisticScopeType searchScopeType) const = 0; virtual void getMinMaxActivity(unsigned & minValue, unsigned & maxValue) const = 0; @@ -147,13 +153,14 @@ interface IStatisticCollection : public IInterface virtual void pruneChildStats() = 0; virtual void addStatistic(StatisticKind kind, unsigned __int64 value) = 0; virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) = 0; - virtual bool refreshAggregates(const StatisticsMapping & mapping) = 0; - virtual bool refreshAggregates(CRuntimeStatisticCollection & totals, IBitSet & isTotalUpdated) = 0; - virtual void deserialize(MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) = 0; - virtual void deserializeChild(const StatsScopeId & scopeId, MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) = 0; + virtual bool refreshAggregates(const StatisticsMapping & mapping, IWhenAggregateUpdatedCallBack & fWhenAggregateUpdated) = 0; + virtual bool refreshAggregates(CRuntimeStatisticCollection & totals, IBitSet & isTotalUpdated, IWhenAggregateUpdatedCallBack & fWhenAggregateUpdated) = 0; + virtual void deserialize(MemoryBuffer & in, unsigned version) = 0; + virtual void deserializeChild(const StatsScopeId & scopeId, MemoryBuffer & in, unsigned version) = 0; virtual void clearStats() = 0; virtual void addChild(IStatisticCollection *stats) = 0; virtual void markDirty() = 0; + virtual stat_type aggregateStatistic(StatisticKind kind) const = 0; }; interface IStatisticCollectionIterator : public IIteratorOf @@ -967,6 +974,5 @@ class jlib_decl RuntimeStatisticTarget : implements IStatisticTarget }; extern jlib_decl StringBuffer & formatMoney(StringBuffer &out, unsigned __int64 value); -extern jlib_decl stat_type aggregateStatistic(StatisticKind kind, IStatisticCollection * statsCollection); extern jlib_decl IStatisticCollection * createGlobalStatisticCollection(IPropertyTree * root); #endif diff --git a/thorlcr/master/thdemonserver.cpp b/thorlcr/master/thdemonserver.cpp index dc4fc3f4068..92f861d23f8 100644 --- a/thorlcr/master/thdemonserver.cpp +++ b/thorlcr/master/thdemonserver.cpp @@ -318,9 +318,9 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer { globalStatsCollection.updateAggregates(lockedWu); } - virtual void loadStats(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly) override + virtual void loadExistingAggregates(IConstWorkUnit &workunit) override { - globalStatsCollection.load(workunit, graphName, aggregatesOnly); + globalStatsCollection.loadExistingAggregates(workunit); } }; diff --git a/thorlcr/master/thdemonserver.hpp b/thorlcr/master/thdemonserver.hpp index 70cfa5a97b7..3931b540ded 100644 --- a/thorlcr/master/thdemonserver.hpp +++ b/thorlcr/master/thdemonserver.hpp @@ -32,7 +32,7 @@ interface IDeMonServer : extends IInterface virtual void endGraph(CGraphBase *graph, bool success) = 0; virtual void endGraphs() = 0; virtual void updateAggregates(IWorkUnit * lockedWu) = 0; - virtual void loadStats(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly) = 0; + virtual void loadExistingAggregates(IConstWorkUnit &workunit) = 0; }; diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp index 07456b72f39..c1c4d6ac3a9 100644 --- a/thorlcr/master/thgraphmanager.cpp +++ b/thorlcr/master/thgraphmanager.cpp @@ -1118,7 +1118,7 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, podInfo.report(wu); } if (globals->getPropBool("@watchdogProgressEnabled")) - queryDeMonServer()->loadStats(workunit, nullptr, true); //graphName==nullptr so that sg stats are not loaded(at present, partial graph resumption not possible) + queryDeMonServer()->loadExistingAggregates(workunit); setWuid(workunit.queryWuid(), workunit.queryClusterName());