Skip to content

Commit

Permalink
HPCC-29657 Load existing stats so that aggregation stats are correct when a job is resumed
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Oct 4, 2023
1 parent 4930b80 commit 013479a
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 26 deletions.
26 changes: 7 additions & 19 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2777,12 +2777,13 @@ class SubGraphUpdaterCollection : public CInterfaceOf<IStatisticCollection>
}
};

GlobalStatisticCollection::GlobalStatisticCollection(IPropertyTree * root, StatisticScopeType minScope)
void GlobalStatisticCollection::load(const char *wuid, StatisticScopeType minScope)
{
StatsScopeId globalScopeId(SSTglobal, (unsigned)0);
statsCollection.setown(createStatisticCollection(nullptr, globalScopeId));
Owned<IPropertyTree> root = getWUGraphProgress(wuid, true);
if (!root)
return;
StatsScopeId globalScopeId(SSTglobal, (unsigned)0);
statsCollection.setown(createStatisticCollection(nullptr, globalScopeId));
Owned<IPropertyTreeIterator> iter = root->getElements("*");
ForEach(*iter)
{
Expand Down Expand Up @@ -2867,28 +2868,15 @@ class StatisticsAggregatesWriter : implements IStatisticVisitor
}
};

void updateAggregates(IWorkUnit *wu, GlobalStatisticCollection *statsCollection)
void updateAggregates(IWorkUnit *wu, GlobalStatisticCollection & statsCollection)
{
// Further improvements:
// 1) maintain a dirty flag for each subgraph scope so that only modified subgraphs are serialized.
// 2) Serialize the aggregates into a blob in GraphProgress rather than to global stats
std::vector<StatisticKind> aggregateKinds = {StCostFileAccess, StSizeGraphSpill, StSizeSpillFile};
StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKinds);
if (statsCollection)
{
statsCollection->refreshAggregates(aggregateKinds);
statsCollection->visit(statsAggregatorWriter);
}
else
{
Owned<IPropertyTree> root = getWUGraphProgress(wu->queryWuid(), true);
if (root)
{
Owned<GlobalStatisticCollection> stats = new GlobalStatisticCollection(root, SSTsubgraph);
stats->refreshAggregates(aggregateKinds);
stats->visit(statsAggregatorWriter);
}
}
statsCollection.refreshAggregates(aggregateKinds);
statsCollection.visit(statsAggregatorWriter);
}

//---------------------------------------------------------------------------------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1178,12 +1178,12 @@ interface IConstWUScopeIterator : extends IScmIterator
class WORKUNIT_API GlobalStatisticCollection : public CInterface
{
public:
GlobalStatisticCollection(IPropertyTree * root=nullptr, StatisticScopeType minScope=SSTnone);
bool refreshAggregates(std::vector<StatisticKind> & aggregateKinds);
void visit(IStatisticVisitor & target) const;
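// getCollection() returns the IStatisticCollection for the given scope (identified by wfScope, graphScope and sgScope)
// NOTE(review): this comment previously said "if clear==true, clears the stats at this and below scope",
// but the signature below has no `clear` parameter - confirm whether the stats are still cleared.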
IStatisticCollection * getCollection(const StatsScopeId & wfScope, const StatsScopeId & graphScope, const StatsScopeId & sgScope, StatisticCreatorType creatorType, const char * creator);
void load(const char *wuid, StatisticScopeType minScope=SSTnone);
private:
Owned<IStatisticCollection> statsCollection;
};
Expand Down Expand Up @@ -1738,7 +1738,7 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *ti
extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeType scopeType, StatisticKind kind, ITimeReporter *timer);
extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search);
extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false);
extern WORKUNIT_API void updateAggregates(IWorkUnit *wu, GlobalStatisticCollection *statsCollection=nullptr);
extern WORKUNIT_API void updateAggregates(IWorkUnit *wu, GlobalStatisticCollection & statsCollection);
extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name);
extern WORKUNIT_API void descheduleWorkunit(char const * wuid);
#if 0
Expand Down
4 changes: 2 additions & 2 deletions ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ void EclSubGraph::updateProgress()
if (startGraphTime || elapsedGraphCycles)
{
WorkunitUpdate lockedwu(agent->updateWorkUnit());
updateAggregates(lockedwu);
parent.updateAggregates(lockedwu);
StringBuffer subgraphid;
subgraphid.append(parent.queryGraphName()).append(":").append(SubGraphScopePrefix).append(id);
if (startGraphTime)
Expand Down Expand Up @@ -1494,7 +1494,7 @@ IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const cha

void EclGraph::updateAggregates(IWorkUnit* lockedwu)
{
::updateAggregates(lockedwu, &statsCache);
::updateAggregates(lockedwu, statsCache);
}

void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value)
Expand Down
8 changes: 6 additions & 2 deletions thorlcr/master/thdemonserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer
unsigned startTime = graphStarts.item(g2);
reportStatus(wu, graph, startTime, finished, success);
}
::updateAggregates(wu, &statsCache);
::updateAggregates(wu, statsCache);
queryServerStatus().commitProperties();
}
catch (IException *E)
Expand Down Expand Up @@ -304,7 +304,11 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer
}
virtual void updateAggregates(IWorkUnit * lockedWu) override
{
::updateAggregates(lockedWu, &statsCache);
::updateAggregates(lockedWu, statsCache);
}
// Loads statistics for the workunit identified by `wuid` into statsCache,
// passing SSTsubgraph as the minimum scope type to GlobalStatisticCollection::load().
// NOTE(review): presumably intended to be called before a resumed job runs, so that
// later updateAggregates() calls include previously recorded stats - confirm against callers.
virtual void loadStats(const char *wuid) override
{
statsCache.load(wuid, SSTsubgraph);
}
};

Expand Down
1 change: 1 addition & 0 deletions thorlcr/master/thdemonserver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ interface IDeMonServer : extends IInterface
virtual void endGraph(CGraphBase *graph, bool success) = 0;
virtual void endGraphs() = 0;
virtual void updateAggregates(IWorkUnit * lockedWu) = 0;
virtual void loadStats(const char *wuid) = 0;
};


Expand Down
3 changes: 2 additions & 1 deletion thorlcr/master/thgraphmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,8 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
}

setWuid(workunit.queryWuid(), workunit.queryClusterName());

if (job->queryResumed() && globals->getPropBool("@watchdogProgressEnabled"))
queryDeMonServer()->loadStats(workunit.queryWuid());
allDone = job->go();

Owned<IWorkUnit> wu = &workunit.lock();
Expand Down

0 comments on commit 013479a

Please sign in to comment.