Skip to content

Commit

Permalink
Relocate GlobalStatisticCollection to workunit.cpp
Browse files Browse the repository at this point in the history
Prepare dynamic stats tracking by GlobalStatisticCollection
Remove unnecessary calls to updateAggregation
Track active graph stats for aggregation
Improve performance by updating aggregates intermittantly and at end of each graph
Load serialized graph stats when resuming job to ensure aggregates are
calculated correctly.
Only update aggregates if they have changed

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Oct 12, 2023
1 parent c2e13ef commit 25bdaa9
Show file tree
Hide file tree
Showing 12 changed files with 393 additions and 170 deletions.
241 changes: 172 additions & 69 deletions common/workunit/workunit.cpp

Large diffs are not rendered by default.

23 changes: 20 additions & 3 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,6 @@ interface IConstWUScopeIterator : extends IScmIterator
};

//---------------------------------------------------------------------------------------------------------------------

//! IWorkUnit
//! Provides high level access to WorkUnit "header" data.
interface IWorkUnit;
Expand Down Expand Up @@ -1204,6 +1203,7 @@ interface IConstWorkUnitInfo : extends IInterface
virtual IConstWUAppValueIterator & getApplicationValues() const = 0;
};

class GlobalStatisticCollection;
interface IConstWorkUnit : extends IConstWorkUnitInfo
{
virtual bool aborting() const = 0;
Expand Down Expand Up @@ -1302,7 +1302,7 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo
virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const = 0;
virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const = 0;
virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const = 0;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const = 0;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const = 0;
virtual void clearGraphProgress() const = 0;
virtual IStringVal & getAbortBy(IStringVal & str) const = 0;
virtual unsigned __int64 getAbortTimeStamp() const = 0;
Expand Down Expand Up @@ -1725,7 +1725,6 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *ti
extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeType scopeType, StatisticKind kind, ITimeReporter *timer);
extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search);
extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false);
extern WORKUNIT_API void updateAggregates(IWorkUnit *wu);
extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name);
extern WORKUNIT_API void descheduleWorkunit(char const * wuid);
#if 0
Expand Down Expand Up @@ -1780,4 +1779,22 @@ extern WORKUNIT_API TraceFlags loadTraceFlags(IConstWorkUnit * wu, const std::in

extern WORKUNIT_API bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, unsigned wfid, const char *graphName);


class WORKUNIT_API GlobalStatisticCollection : public CInterface
{
public:
GlobalStatisticCollection();

void load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly, bool missingScopesOnly);
void loadGlobalAggregates(IConstWorkUnit &workunit);
IStatisticCollection * getCollectionForUpdate(const StatsScopeId & wfScopeId, const StatsScopeId & graphScopeId, const StatsScopeId & sgScopeId, StatisticCreatorType creatorType, const char * creator, bool clearStats);
bool refreshAggregates();
IStatisticCollection * queryCollection() { return statsCollection; }
void updateAggregates(IWorkUnit *wu);
private:
Owned<IStatisticCollection> statsCollection;
std::vector<StatisticKind> aggregateKinds = {StCostFileAccess, StSizeGraphSpill, StSizeSpillFile};
StringBuffer wuid;
};

#endif
4 changes: 2 additions & 2 deletions common/workunit/workunit.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public:
virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const;
virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const;
virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const override;
void clearGraphProgress() const;
virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree) {}; //No GraphProgressTree in CLocalWorkUnit.

Expand Down Expand Up @@ -661,7 +661,7 @@ public:
class WORKUNIT_API CWuGraphStats : public CInterfaceOf<IWUGraphStats>
{
public:
CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge);
CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats);
virtual void beforeDispose();
virtual IStatisticGatherer & queryStatsBuilder();
protected:
Expand Down
2 changes: 0 additions & 2 deletions ecl/eclagent/eclagent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1988,7 +1988,6 @@ void EclAgent::doProcess()
const cost_type cost = aggregateCost(w, nullptr, false);
if (cost)
w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
updateAggregates(w);
addTimings(w);

switch (w->getState())
Expand Down Expand Up @@ -2534,7 +2533,6 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime
const cost_type cost = money2cost_type(calcCost(agent.queryAgentMachineCost(), nanoToMilli(elapsedNs))) + aggregateCost(wu, scope, true);
if (cost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
updateAggregates(wu);
}

void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
Expand Down
4 changes: 3 additions & 1 deletion ecl/eclagent/eclagent.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -1048,14 +1048,15 @@ public:
graphAgentContext.set(&_agent);
agent = &graphAgentContext;
aborted = false;
globalStats.load(*wu, graphName, true, true);
}

void createFromXGMML(ILoadedDllEntry * dll, IPropertyTree * xgmml);
void execute(const byte * parentExtract);
void executeLibrary(const byte * parentExtract, IHThorGraphResults * results);
IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph);
void updateWUStatistic(IWorkUnit* lockedwu, StatisticScopeType scopeType, const char* scope, StatisticKind kind, const char* descr, long long unsigned int value);

void updateAggregates(IWorkUnit* lockedwu);
EclSubGraph * idToGraph(unsigned id);
EclGraphElement * idToActivity(unsigned id);
const char *queryGraphName() { return graphName; }
Expand Down Expand Up @@ -1083,6 +1084,7 @@ protected:
IProbeManager * probeManager;
unsigned wfid;
bool aborted;
GlobalStatisticCollection globalStats;
};


Expand Down
13 changes: 8 additions & 5 deletions ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -879,10 +879,10 @@ void EclSubGraph::updateProgress()
Owned<IWUGraphStats> progress = parent.updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), parent.queryWfid(), id);
IStatisticGatherer & stats = progress->queryStatsBuilder();
updateProgress(stats);

if (startGraphTime || elapsedGraphCycles)
{
WorkunitUpdate lockedwu(agent->updateWorkUnit());
parent.updateAggregates(lockedwu);
StringBuffer subgraphid;
subgraphid.append(parent.queryGraphName()).append(":").append(SubGraphScopePrefix).append(id);
if (startGraphTime)
Expand Down Expand Up @@ -1278,8 +1278,6 @@ void EclGraph::execute(const byte * parentExtract)
const cost_type cost = money2cost_type(calcCost(agent->queryAgentMachineCost(), elapsed));
if (cost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);

updateAggregates(wu);
}

if (agent->queryRemoteWorkunit())
Expand Down Expand Up @@ -1348,7 +1346,7 @@ void EclGraph::updateLibraryProgress()
{
EclSubGraph & cur = graphs.item(idx);
unsigned wfid = cur.parent.queryWfid();
Owned<IWUGraphStats> progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false);
Owned<IWUGraphStats> progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false, &globalStats);
cur.updateProgress(progress->queryStatsBuilder());
}
}
Expand Down Expand Up @@ -1491,7 +1489,12 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result)

IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph)
{
return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false);
return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false, &globalStats);
}

void EclGraph::updateAggregates(IWorkUnit* lockedwu)
{
globalStats.updateAggregates(lockedwu);
}

void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value)
Expand Down
8 changes: 4 additions & 4 deletions plugins/cassandra/cassandrawu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2743,8 +2743,8 @@ class CCassandraWorkUnit : public CPersistedWorkUnit
class CCassandraWuGraphStats : public CWuGraphStats
{
public:
CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge)
: CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge),
CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats)
: CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats),
progress(createPTree(_rootScope)), parent(_parent)
{
}
Expand All @@ -2764,9 +2764,9 @@ class CCassandraWorkUnit : public CPersistedWorkUnit
StringAttr wuid;
};

IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge) const override
IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats) const override
{
return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge);
return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge, stats);
}


Expand Down
Loading

0 comments on commit 25bdaa9

Please sign in to comment.