Skip to content

Commit

Permalink
Relocate GlobalStatisticCollection to workunit.cpp
Browse files Browse the repository at this point in the history
Prepare dynamic stats tracking by GlobalStatisticCollection
Remove unnecessary calls to updateAggregation
Track active graph stats for aggregation
Improve performance by updating aggregates intermittantly and at end of each graph
Load serialized graph stats when resuming job to ensure aggregates are
calculated correctly.
Only update aggregates if they have changed

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Oct 12, 2023
1 parent c2e13ef commit 0184f96
Show file tree
Hide file tree
Showing 12 changed files with 275 additions and 148 deletions.
169 changes: 118 additions & 51 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,20 +183,27 @@ void doDescheduleWorkkunit(char const * wuid)
* Graph progress support
*/

CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge)
CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * globalStatsCollection)
: creatorType(_creatorType), creator(_creator), id(_id), merge(_merge)
{
StatsScopeId graphScopeId;
verifyex(graphScopeId.setScopeText(_rootScope));
StatsScopeId wfScopeId(SSTworkflow,wfid);

StatsScopeId rootScopeId(SSTworkflow,wfid);
collector.setown(createStatisticsGatherer(_creatorType, _creator, rootScopeId));
if (globalStatsCollection)
{
StatsScopeId sgScopeId(SSTsubgraph, id);
collector.setown(createStatisticsGatherer(globalStatsCollection->getCollection(wfScopeId, graphScopeId, sgScopeId, _creatorType, _creator, true)));
}
else
collector.setown(createStatisticsGatherer(_creatorType, _creator, wfScopeId));
collector->beginScope(graphScopeId);
}

void CWuGraphStats::beforeDispose()
{
collector->endScope();

StringBuffer tag;
tag.append("sg").append(id);

Expand Down Expand Up @@ -2659,6 +2666,87 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu
}
}

GlobalStatisticCollection::GlobalStatisticCollection()
{
StatsScopeId globalScopeId(SSTglobal, (unsigned)0);
statsCollection.setown(createStatisticCollection(nullptr, globalScopeId));
}

void GlobalStatisticCollection::load(const char *_wuid, unsigned statsMaxDepth, bool missingScopesOnly)
{
if (strcmp(_wuid, wuid.str())!=0) // Make sure stats collection is for this workunit
{
if (!wuid.isEmpty())
{
StatsScopeId globalScopeId(SSTglobal, (unsigned)0);
statsCollection.setown(createStatisticCollection(nullptr, globalScopeId));
}
wuid.set(_wuid);
}
Owned<IPropertyTree> root = getWUGraphProgress(wuid, true);
if (!root)
return;
Owned<IPropertyTreeIterator> iter = root->getElements("*");
ForEach(*iter)
{
IPropertyTree * graphPT = &iter->query();
StatsScopeId graphScopeId;
verifyex(graphScopeId.setScopeText(graphPT->queryName()));
StatsScopeId wfScopeId(SSTworkflow, graphPT->getPropInt64("@wfid",0));
Owned<IPropertyTreeIterator> iter2 = graphPT->getElements("./*");
ForEach(*iter2)
{
StatsScopeId sgScopeId;
IPropertyTree * sgPT = & iter2->query();
const char * sgName = sgPT->queryName();
if (strcmp(sgName, "node")==0)
continue;
verifyex(sgScopeId.setScopeText(sgName));

if (missingScopesOnly) // Skip any scopes that are already in the stats collection
{
IStatisticCollection * sgCollection = statsCollection->querySubScopePath({wfScopeId, graphScopeId, sgScopeId});
if (sgCollection)
continue;
}

MemoryBuffer compressed;
sgPT->getPropBin("Stats", compressed);
if (!compressed.length())
break;

MemoryBuffer serialized;
decompressToBuffer(serialized, compressed);
unsigned version;
serialized.read(version);
byte kind;
serialized.read(kind);

StatsScopeId childId;
childId.deserialize(serialized, version);
statsCollection->deserializeChild(childId, serialized, version, statsMaxDepth);
}
}
}

bool GlobalStatisticCollection::refreshAggregates(std::vector<StatisticKind> & aggregateKinds)
{
std::vector<unsigned __int64> totals(aggregateKinds.size());
return statsCollection->refreshAggregates(aggregateKinds, totals);
}

void GlobalStatisticCollection::visit(IStatisticVisitor & target) const
{
statsCollection->visit(target);
}

IStatisticCollection * GlobalStatisticCollection::getCollection(const StatsScopeId & wfScopeId, const StatsScopeId & graphScopeId, const StatsScopeId & sgScopeId, StatisticCreatorType creatorType, const char * creator, bool clearStats)
{
IStatisticCollection * sgScopeCollection = statsCollection->ensureSubScopePath({wfScopeId,graphScopeId, sgScopeId});
if (clearStats)
sgScopeCollection->clearStats();
return createRootStatisticCollection(creatorType, creator, wfScopeId, graphScopeId, sgScopeCollection);
}

class StatisticsAggregatesWriter : implements IStatisticVisitor
{
Expand All @@ -2669,6 +2757,8 @@ class StatisticsAggregatesWriter : implements IStatisticVisitor

virtual bool visitScope(const IStatisticCollection & cur)
{
StringBuffer s2, s3;
DBGLOG("visitScope %s -> %s", cur.getScope(s2).str(), cur.getFullScope(s3).str() );
switch (cur.queryScopeType())
{
case SSTglobal:
Expand All @@ -2683,51 +2773,28 @@ class StatisticsAggregatesWriter : implements IStatisticVisitor
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), cur.queryScopeType(), cur.getFullScope(s).str(), kind, nullptr, value, 1, 0, StatsMergeReplace);
}
}
if (cur.queryScopeType()==SSTgraph)
return false;
else
//if (cur.queryScopeType()==SSTgraph)
// return false;
//else
return true;
default:
return false;
//return false;
return true;
}
}
};

void updateAggregates(IWorkUnit *wu)
{
// updateAggregates() reads in the graph stats, calculates the aggregates and then writes the aggregates back to global statistics.
//
// The next iteration of this PR would be to track the stats in memory and, periodically, calculate the aggregates and write the aggregates. For Thor:
// it could do this by:
// 1) Having a single global GlobalStatisticCollection(derived from CStatisticCollection) in DeMonServer
// 2) When the jobs startup, populate the GlobalStatisticCollection with aggregates and graph stats
// 2) Replace existing code related to gathering stats from CMasterGraph and serializing with
// a new method that updates the statistics to GlobalStatisticCollection
// 3) New method for gathering and serializing graph stats:
// (i) A new class (say StatisticCollectionGatherer) would be needed with an IStatisticGatherer interface which updates the GlobalStatisticCollection
// - This StatisticCollectionGatherer would be bound to a particular graph scope (this is because the CMasterGraph::getStats expects this to be case)
// - Everytime StatisticCollectionGatherer is called with beginScope where the scope type is a subgraph, it would need to delete that scope and its children.
// This is because CMasterGraph::getStats returns a fresh full set of stats for a given subgraph.
// (ii) Create a new member function to write the stats to GraphProgress and aggregates to global stats. Possibly,
// (a) maintain a dirty flag for each subgraph scope so that only modified subgraphs are serialized.
// (b) this member function would be called by DeMonServer at fixed intevals
// 4) Modify GlobalStatisticCollection::refreshAggregates should clear the existing aggregates before calculating new aggregates
// - this is need because multiple calls to refreshAggregates should not add to existing aggregates
// Subsequent iteration:
// 1) Serialize the aggregates into a blob in GraphProgress rather than to global stats
Owned<IPropertyTree> root = getWUGraphProgress(wu->queryWuid(), true);
if (root)
{
Owned<IStatisticCollection> stats = createGlobalStatisticCollection(root);
std::vector<StatisticKind> aggregateKinds = {StCostFileAccess, StSizeGraphSpill, StSizeSpillFile};
std::vector<unsigned __int64> totals(aggregateKinds.size());

stats->refreshAggregates(aggregateKinds, totals);

StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKinds);
stats->visit(statsAggregatorWriter);
}
void updateAggregates(IWorkUnit *wu, GlobalStatisticCollection & statsCollection)
{
// Further improvements:
// 1) maintain a dirty flag for each subgraph scope so that only modified subgraphs are serialized.
// 2) Serialize the aggregates into a blob in GraphProgress rather than to global stats
std::vector<StatisticKind> aggregateKinds = {StCostFileAccess, StSizeGraphSpill, StSizeSpillFile};
StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKinds);
if(statsCollection.refreshAggregates(aggregateKinds)) // Only serialize if the aggregates has changed
statsCollection.visit(statsAggregatorWriter);
}

//---------------------------------------------------------------------------------------------------------------------


Expand Down Expand Up @@ -3797,8 +3864,8 @@ class CDaliWorkUnit;
class CDaliWuGraphStats : public CWuGraphStats
{
public:
CDaliWuGraphStats(const CDaliWorkUnit* _owner, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge)
: CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), owner(_owner), graphName(_rootScope), wfid(_wfid)
CDaliWuGraphStats(const CDaliWorkUnit* _owner, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats)
: CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), owner(_owner), graphName(_rootScope), wfid(_wfid)
{
}
protected:
Expand All @@ -3812,8 +3879,8 @@ class CDaliWuGraphStats : public CWuGraphStats
class CLocalWuGraphStats : public CWuGraphStats
{
public:
CLocalWuGraphStats(IPropertyTree *_p, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge)
: CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), graphName(_rootScope), p(_p)
CLocalWuGraphStats(IPropertyTree *_p, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats)
: CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), graphName(_rootScope), p(_p)
{
}
protected:
Expand Down Expand Up @@ -4117,9 +4184,9 @@ class CDaliWorkUnit : public CPersistedWorkUnit
}
}
}
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats) const override
{
return new CDaliWuGraphStats(this, creatorType, creator, _wfid, graphName, subgraph, merge);
return new CDaliWuGraphStats(this, creatorType, creator, _wfid, graphName, subgraph, merge, stats);
}
virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree)
{
Expand Down Expand Up @@ -4429,8 +4496,8 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa
{ c->setGraphState(graphName, wfid, state); }
virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const
{ c->setNodeState(graphName, nodeId, state); }
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override
{ return c->updateStats(graphName, creatorType, creator, _wfid, subgraph, merge); }
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const override
{ return c->updateStats(graphName, creatorType, creator, _wfid, subgraph, merge, stats); }
virtual void clearGraphProgress() const
{ c->clearGraphProgress(); }
virtual IStringVal & getAbortBy(IStringVal & str) const
Expand Down Expand Up @@ -10260,9 +10327,9 @@ void CLocalWorkUnit::setNodeState(const char *graphName, WUGraphIDType nodeId, W
{
throwUnexpected(); // Should only be used for persisted workunits
}
IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const
IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats) const
{
return new CLocalWuGraphStats(LINK(p), creatorType, creator, _wfid, graphName, subgraph, merge);
return new CLocalWuGraphStats(LINK(p), creatorType, creator, _wfid, graphName, subgraph, merge, stats);
}

void CLocalWUGraph::setName(const char *str)
Expand Down
21 changes: 19 additions & 2 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1175,6 +1175,23 @@ interface IConstWUScopeIterator : extends IScmIterator

//---------------------------------------------------------------------------------------------------------------------

class WORKUNIT_API GlobalStatisticCollection : public CInterface
{
public:
GlobalStatisticCollection();
bool refreshAggregates(std::vector<StatisticKind> & aggregateKinds);
void visit(IStatisticVisitor & target) const;
// getCollection() returns IStatisticCollection for given rootScope
// if clearStats==true then the existing stats are cleared for the given scope
IStatisticCollection * getCollection(const StatsScopeId & wfScopeId, const StatsScopeId & graphScopeId, const StatsScopeId & sgScopeId, StatisticCreatorType creatorType, const char * creator, bool clearStats);
// statsMaxDepth = load stats up until this depth (e.g. 3 means loads stats up until sg scope )
void load(const char *_wuid, unsigned statsMaxDepth, bool missingScopesOnly);
IStatisticCollection * queryCollection() { return statsCollection; }
private:
Owned<IStatisticCollection> statsCollection;
StringBuffer wuid;
};

//! IWorkUnit
//! Provides high level access to WorkUnit "header" data.
interface IWorkUnit;
Expand Down Expand Up @@ -1302,7 +1319,7 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo
virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const = 0;
virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const = 0;
virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const = 0;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const = 0;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const = 0;
virtual void clearGraphProgress() const = 0;
virtual IStringVal & getAbortBy(IStringVal & str) const = 0;
virtual unsigned __int64 getAbortTimeStamp() const = 0;
Expand Down Expand Up @@ -1725,7 +1742,7 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *ti
extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeType scopeType, StatisticKind kind, ITimeReporter *timer);
extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search);
extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false);
extern WORKUNIT_API void updateAggregates(IWorkUnit *wu);
extern WORKUNIT_API void updateAggregates(IWorkUnit *wu, GlobalStatisticCollection & statsCollection);
extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name);
extern WORKUNIT_API void descheduleWorkunit(char const * wuid);
#if 0
Expand Down
4 changes: 2 additions & 2 deletions common/workunit/workunit.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public:
virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const;
virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const;
virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const override;
void clearGraphProgress() const;
virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree) {}; //No GraphProgressTree in CLocalWorkUnit.

Expand Down Expand Up @@ -661,7 +661,7 @@ public:
class WORKUNIT_API CWuGraphStats : public CInterfaceOf<IWUGraphStats>
{
public:
CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge);
CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats);
virtual void beforeDispose();
virtual IStatisticGatherer & queryStatsBuilder();
protected:
Expand Down
2 changes: 0 additions & 2 deletions ecl/eclagent/eclagent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1988,7 +1988,6 @@ void EclAgent::doProcess()
const cost_type cost = aggregateCost(w, nullptr, false);
if (cost)
w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
updateAggregates(w);
addTimings(w);

switch (w->getState())
Expand Down Expand Up @@ -2534,7 +2533,6 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime
const cost_type cost = money2cost_type(calcCost(agent.queryAgentMachineCost(), nanoToMilli(elapsedNs))) + aggregateCost(wu, scope, true);
if (cost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
updateAggregates(wu);
}

void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
Expand Down
3 changes: 2 additions & 1 deletion ecl/eclagent/eclagent.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ public:
void executeLibrary(const byte * parentExtract, IHThorGraphResults * results);
IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph);
void updateWUStatistic(IWorkUnit* lockedwu, StatisticScopeType scopeType, const char* scope, StatisticKind kind, const char* descr, long long unsigned int value);

void updateAggregates(IWorkUnit* lockedwu);
EclSubGraph * idToGraph(unsigned id);
EclGraphElement * idToActivity(unsigned id);
const char *queryGraphName() { return graphName; }
Expand Down Expand Up @@ -1083,6 +1083,7 @@ protected:
IProbeManager * probeManager;
unsigned wfid;
bool aborted;
GlobalStatisticCollection statsCache;
};


Expand Down
Loading

0 comments on commit 0184f96

Please sign in to comment.