Skip to content

Commit

Permalink
HPCC-29657 Produce aggregate stats
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Oct 13, 2023
1 parent c2e13ef commit e522b06
Show file tree
Hide file tree
Showing 12 changed files with 387 additions and 169 deletions.
240 changes: 171 additions & 69 deletions common/workunit/workunit.cpp

Large diffs are not rendered by default.

23 changes: 20 additions & 3 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,6 @@ interface IConstWUScopeIterator : extends IScmIterator
};

//---------------------------------------------------------------------------------------------------------------------

//! IWorkUnit
//! Provides high level access to WorkUnit "header" data.
interface IWorkUnit;
Expand Down Expand Up @@ -1204,6 +1203,7 @@ interface IConstWorkUnitInfo : extends IInterface
virtual IConstWUAppValueIterator & getApplicationValues() const = 0;
};

class GlobalStatisticCollection;
interface IConstWorkUnit : extends IConstWorkUnitInfo
{
virtual bool aborting() const = 0;
Expand Down Expand Up @@ -1302,7 +1302,7 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo
virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const = 0;
virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const = 0;
virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const = 0;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const = 0;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const = 0;
virtual void clearGraphProgress() const = 0;
virtual IStringVal & getAbortBy(IStringVal & str) const = 0;
virtual unsigned __int64 getAbortTimeStamp() const = 0;
Expand Down Expand Up @@ -1725,7 +1725,6 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *ti
extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeType scopeType, StatisticKind kind, ITimeReporter *timer);
extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search);
extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false);
extern WORKUNIT_API void updateAggregates(IWorkUnit *wu);
extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name);
extern WORKUNIT_API void descheduleWorkunit(char const * wuid);
#if 0
Expand Down Expand Up @@ -1780,4 +1779,22 @@ extern WORKUNIT_API TraceFlags loadTraceFlags(IConstWorkUnit * wu, const std::in

extern WORKUNIT_API bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, unsigned wfid, const char *graphName);


class WORKUNIT_API GlobalStatisticCollection : public CInterface
{
public:
GlobalStatisticCollection();

void load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly, bool missingScopesOnly);
void loadGlobalAggregates(IConstWorkUnit &workunit);
IStatisticCollection * getCollectionForUpdate(const StatsScopeId & wfScopeId, const StatsScopeId & graphScopeId, const StatsScopeId & sgScopeId, StatisticCreatorType creatorType, const char * creator, bool clearStats);
bool refreshAggregates();
IStatisticCollection * queryCollection() { return statsCollection; }
void updateAggregates(IWorkUnit *wu);
private:
Owned<IStatisticCollection> statsCollection;
std::vector<StatisticKind> aggregateKinds = {StCostFileAccess, StSizeGraphSpill, StSizeSpillFile};
StringBuffer wuid;
};

#endif
4 changes: 2 additions & 2 deletions common/workunit/workunit.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public:
virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const;
virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const;
virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const override;
void clearGraphProgress() const;
virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree) {}; //No GraphProgressTree in CLocalWorkUnit.

Expand Down Expand Up @@ -661,7 +661,7 @@ public:
class WORKUNIT_API CWuGraphStats : public CInterfaceOf<IWUGraphStats>
{
public:
CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge);
CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats);
virtual void beforeDispose();
virtual IStatisticGatherer & queryStatsBuilder();
protected:
Expand Down
2 changes: 0 additions & 2 deletions ecl/eclagent/eclagent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1988,7 +1988,6 @@ void EclAgent::doProcess()
const cost_type cost = aggregateCost(w, nullptr, false);
if (cost)
w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
updateAggregates(w);
addTimings(w);

switch (w->getState())
Expand Down Expand Up @@ -2534,7 +2533,6 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime
const cost_type cost = money2cost_type(calcCost(agent.queryAgentMachineCost(), nanoToMilli(elapsedNs))) + aggregateCost(wu, scope, true);
if (cost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
updateAggregates(wu);
}

void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
Expand Down
4 changes: 3 additions & 1 deletion ecl/eclagent/eclagent.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -1048,14 +1048,15 @@ public:
graphAgentContext.set(&_agent);
agent = &graphAgentContext;
aborted = false;
globalStats.load(*wu, graphName, true, true);
}

void createFromXGMML(ILoadedDllEntry * dll, IPropertyTree * xgmml);
void execute(const byte * parentExtract);
void executeLibrary(const byte * parentExtract, IHThorGraphResults * results);
IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph);
void updateWUStatistic(IWorkUnit* lockedwu, StatisticScopeType scopeType, const char* scope, StatisticKind kind, const char* descr, long long unsigned int value);

void updateAggregates(IWorkUnit* lockedwu);
EclSubGraph * idToGraph(unsigned id);
EclGraphElement * idToActivity(unsigned id);
const char *queryGraphName() { return graphName; }
Expand Down Expand Up @@ -1083,6 +1084,7 @@ protected:
IProbeManager * probeManager;
unsigned wfid;
bool aborted;
GlobalStatisticCollection globalStats;
};


Expand Down
13 changes: 8 additions & 5 deletions ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -879,10 +879,10 @@ void EclSubGraph::updateProgress()
Owned<IWUGraphStats> progress = parent.updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), parent.queryWfid(), id);
IStatisticGatherer & stats = progress->queryStatsBuilder();
updateProgress(stats);

if (startGraphTime || elapsedGraphCycles)
{
WorkunitUpdate lockedwu(agent->updateWorkUnit());
parent.updateAggregates(lockedwu);
StringBuffer subgraphid;
subgraphid.append(parent.queryGraphName()).append(":").append(SubGraphScopePrefix).append(id);
if (startGraphTime)
Expand Down Expand Up @@ -1278,8 +1278,6 @@ void EclGraph::execute(const byte * parentExtract)
const cost_type cost = money2cost_type(calcCost(agent->queryAgentMachineCost(), elapsed));
if (cost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);

updateAggregates(wu);
}

if (agent->queryRemoteWorkunit())
Expand Down Expand Up @@ -1348,7 +1346,7 @@ void EclGraph::updateLibraryProgress()
{
EclSubGraph & cur = graphs.item(idx);
unsigned wfid = cur.parent.queryWfid();
Owned<IWUGraphStats> progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false);
Owned<IWUGraphStats> progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false, &globalStats);
cur.updateProgress(progress->queryStatsBuilder());
}
}
Expand Down Expand Up @@ -1491,7 +1489,12 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result)

IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph)
{
return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false);
return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false, &globalStats);
}

void EclGraph::updateAggregates(IWorkUnit* lockedwu)
{
globalStats.updateAggregates(lockedwu);
}

void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value)
Expand Down
8 changes: 4 additions & 4 deletions plugins/cassandra/cassandrawu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2743,8 +2743,8 @@ class CCassandraWorkUnit : public CPersistedWorkUnit
class CCassandraWuGraphStats : public CWuGraphStats
{
public:
CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge)
: CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge),
CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats)
: CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats),
progress(createPTree(_rootScope)), parent(_parent)
{
}
Expand All @@ -2764,9 +2764,9 @@ class CCassandraWorkUnit : public CPersistedWorkUnit
StringAttr wuid;
};

IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge) const override
IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats) const override
{
return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge);
return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge, stats);
}


Expand Down
Loading

0 comments on commit e522b06

Please sign in to comment.