Skip to content

Commit

Permalink
HPCC-29657 Address minor code review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Dec 1, 2023
1 parent ed6acf6 commit 73b7bcd
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 46 deletions.
12 changes: 6 additions & 6 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2659,7 +2659,7 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu
}
}

void StatisticsAggregator::loadExistingAggregates(IConstWorkUnit &workunit)
void StatisticsAggregator::loadExistingAggregates(const IConstWorkUnit &workunit)
{
StatsScopeId globalScopeId(SSTglobal, (unsigned)0);
statsCollection.setown(createStatisticCollection(globalScopeId));
Expand All @@ -2682,9 +2682,9 @@ void StatisticsAggregator::loadExistingAggregates(IConstWorkUnit &workunit)

WuScopeFilter filter;
filter.addScopeType(SSTglobal).addScopeType(SSTworkflow).addScopeType(SSTgraph);
const unsigned numStats = aggregateKindsMapping.numStatistics();
const unsigned numStats = mapping.numStatistics();
for (unsigned i=0; i<numStats; ++i)
filter.addOutputStatistic(aggregateKindsMapping.getKind(i));
filter.addOutputStatistic(mapping.getKind(i));
filter.setDepth(1,3); // 1=global, 2=workflow, 3=graph
filter.setSources(SSFsearchGlobalStats);
filter.setIncludeNesting(0);
Expand All @@ -2703,7 +2703,7 @@ void StatisticsAggregator::recordStats(IStatisticCollection * sourceStats, unsig
verifyex(graphScopeId.setScopeText(graphName));
StatsScopeId wfScopeId(SSTworkflow, wfid);
StatsScopeId sgScopeId(SSTsubgraph, sgId);
statsCollection->recordStats(aggregateKindsMapping, sourceStats, {wfScopeId, graphScopeId, sgScopeId});
statsCollection->recordStats(mapping, sourceStats, {wfScopeId, graphScopeId, sgScopeId});
}

// Recalculate aggregates and then write the aggregates to global stats (dali)
Expand All @@ -2712,12 +2712,12 @@ void StatisticsAggregator::updateAggregates(IWorkUnit *wu)
if (!statsCollection)
return;

std::function f = [&](const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)
AggregateUpdatedCallBackFunc f = [&](const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)
{
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), sst, scope, kind, nullptr, value, 1, 0, StatsMergeReplace);
};

statsCollection->refreshAggregates(aggregateKindsMapping, f);
statsCollection->refreshAggregates(mapping, f);
}

//---------------------------------------------------------------------------------------------------------------------
Expand Down
6 changes: 3 additions & 3 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1786,13 +1786,13 @@ extern WORKUNIT_API bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, u
class WORKUNIT_API StatisticsAggregator : public CInterface
{
public:
StatisticsAggregator() : aggregateKindsMapping(aggregateKindStatistics) {}
void loadExistingAggregates(IConstWorkUnit &workunit);
StatisticsAggregator(const StatisticsMapping & _mapping) : mapping(_mapping) {}
void loadExistingAggregates(const IConstWorkUnit &workunit);
void recordStats(IStatisticCollection * sourceStats, unsigned wfid, const char * graphName, unsigned graphId);
void updateAggregates(IWorkUnit *wu);
private:
Owned<IStatisticCollection> statsCollection;
const StatisticsMapping & aggregateKindsMapping;
const StatisticsMapping & mapping;
};

#endif
2 changes: 1 addition & 1 deletion ecl/eclagent/agentctx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ struct IAgentContext : extends IGlobalCodeContext
virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, char const * source) = 0;
virtual double queryAgentMachineCost() const = 0;
virtual void updateAggregates(IWorkUnit* lockedwu) = 0;
virtual void mergeAggregatorStats(IStatisticGatherer & stats, unsigned wfid, const char *graphname, unsigned sgId) = 0;
virtual void mergeAggregatorStats(IStatisticCollection & stats, unsigned wfid, const char *graphname, unsigned sgId) = 0;
};

#endif // AGENTCTX_HPP_INCL
2 changes: 1 addition & 1 deletion ecl/eclagent/eclagent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ class EclAgentPluginCtx : public SimplePluginCtx
//=======================================================================================

EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bool _resetWorkflow, bool _noRetry, char const * _logname, const char *_allowedPipeProgs, IPropertyTree *_queryXML, ILogMsgHandler * _logMsgHandler)
: wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), logMsgHandler(_logMsgHandler)
: wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), logMsgHandler(_logMsgHandler), statsAggregator(stdAggregateKindStatistics)
{
isAborting = false;
isStandAloneExe = false;
Expand Down
7 changes: 3 additions & 4 deletions ecl/eclagent/eclagent.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public:
{
ctx->updateAggregates(lockedwu);
}
virtual void mergeAggregatorStats(IStatisticGatherer & stats, unsigned wfid, const char *graphname, unsigned sgId) override
virtual void mergeAggregatorStats(IStatisticCollection & stats, unsigned wfid, const char *graphname, unsigned sgId) override
{
ctx->mergeAggregatorStats(stats, wfid, graphname, sgId);
}
Expand Down Expand Up @@ -718,10 +718,9 @@ public:
{
statsAggregator.updateAggregates(lockedwu);
}
virtual void mergeAggregatorStats(IStatisticGatherer & stats, unsigned wfid, const char *graphname, unsigned sgId) override
virtual void mergeAggregatorStats(IStatisticCollection & stats, unsigned wfid, const char *graphname, unsigned sgId) override
{
Linked<IStatisticCollection> statsCollection = stats.getResult();
statsAggregator.recordStats(statsCollection, wfid, graphname, sgId);
statsAggregator.recordStats(&stats, wfid, graphname, sgId);
}
};

Expand Down
6 changes: 4 additions & 2 deletions ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,8 @@ void EclSubGraph::updateProgress()
Owned<IWUGraphStats> progress = parent.updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), parent.queryWfid(), id);
IStatisticGatherer & stats = progress->queryStatsBuilder();
updateProgress(stats);
agent->mergeAggregatorStats(stats, parent.queryWfid(), parent.queryGraphName(), id);
Owned<IStatisticCollection> statsCollection = stats.getResult();
agent->mergeAggregatorStats(*statsCollection, parent.queryWfid(), parent.queryGraphName(), id);
if (startGraphTime || elapsedGraphCycles)
{
WorkunitUpdate lockedwu(agent->updateWorkUnit());
Expand Down Expand Up @@ -1351,7 +1352,8 @@ void EclGraph::updateLibraryProgress()
Owned<IWUGraphStats> progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false);
IStatisticGatherer & stats = progress->queryStatsBuilder();
cur.updateProgress(stats);
agent->mergeAggregatorStats(stats, wfid, queryGraphName(), cur.id);
Owned<IStatisticCollection> statsCollection = stats.getResult();
agent->mergeAggregatorStats(*statsCollection, wfid, queryGraphName(), cur.id);
}
}

Expand Down
32 changes: 9 additions & 23 deletions system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1331,7 +1331,7 @@ const StatisticsMapping diskLocalStatistics({StCycleDiskReadIOCycles, StSizeDisk
const StatisticsMapping diskRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries});
const StatisticsMapping diskReadRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StNumDiskRetries, StCycleDiskReadIOCycles});
const StatisticsMapping diskWriteRemoteStatistics({StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries, StCycleDiskWriteIOCycles});
const StatisticsMapping aggregateKindStatistics({StCostExecute, StCostFileAccess, StSizeGraphSpill, StSizeSpillFile});
const StatisticsMapping stdAggregateKindStatistics({StCostExecute, StCostFileAccess, StSizeGraphSpill, StSizeSpillFile});

const StatisticsMapping * queryStatsMapping(const StatsScopeId & scope, unsigned hashcode)
{
Expand Down Expand Up @@ -1799,7 +1799,7 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
isDirty=true;
if (parent) parent->markDirty();
}
void refreshAggregates(CRuntimeStatisticCollection & parentTotals, std::function<void(const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)> & fWhenAggregateUpdated)
void refreshAggregates(CRuntimeStatisticCollection & parentTotals, AggregateUpdatedCallBackFunc & fWhenAggregateUpdated)
{
const StatisticsMapping & mapping = parentTotals.queryMapping();
// if this scope is not dirty, the aggregates are accurate at this level so return totals (no need to descend)
Expand All @@ -1811,8 +1811,8 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
{
Statistic & stat = stats.element(i);
StatisticKind kind = stat.queryKind();
if (kind != (StatisticKind)(kind & StKindMask))
continue; // ignore variants
if (queryStatsVariant(kind) != 0)
continue; // ignore variants (shouldn't happen as the mapping ensure only the aggregator kinds are present)
if (mapping.hasKind(kind))
{
// Totals required from this level by parent, even if they are not dirty
Expand Down Expand Up @@ -1941,20 +1941,6 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
kind = cur.kind;
value = cur.value;
}
virtual bool getStatisticSum(StatisticKind kind, unsigned __int64 & value) const override
{
bool found = false;
ForEachItemIn(i, stats)
{
const Statistic & cur = stats.item(i);
if (cur.kind == kind)
{
value += cur.value;
found = true;
}
}
return found;
}
virtual IStatisticCollectionIterator & getScopes(const char * filter, bool sorted) override
{
assertex(!filter);
Expand Down Expand Up @@ -2092,7 +2078,7 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
for (auto const & cur : children)
cur.visit(visitor);
}
virtual void refreshAggregates(const StatisticsMapping & mapping, std::function<void(const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)> & fWhenAggregateUpdated) override
virtual void refreshAggregates(const StatisticsMapping & mapping, AggregateUpdatedCallBackFunc & fWhenAggregateUpdated) override
{
if (isDirty)
{
Expand All @@ -2102,8 +2088,8 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
}
virtual stat_type aggregateStatistic(StatisticKind kind) const override
{
stat_type sum;
if (!getStatisticSum(kind, sum)) // get sum of statistics at this level
stat_type sum = 0;
if (!getStatistic(kind, sum)) // get sum of statistics at this level
{
// if no stats at this level, then get sum of stats from children
for (auto & child : children)
Expand All @@ -2115,7 +2101,7 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
{
CStatisticCollection * curSrcCollection = static_cast<CStatisticCollection *>(sourceStatsCollection);
const StatsScopeId * scopeItem = path.begin();

// n.b. sourceStatsCollection has workflow as root but this collection has global as root
// Locate the collection with the stats and make curSrcCollection point to that
if (!curSrcCollection || curSrcCollection->queryScopeId().compare(*scopeItem)!=0)
return; // Required path doesn't exist in source collection so nothing more to do here
Expand All @@ -2134,7 +2120,7 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
ForEachItemIn(i, curSrcCollection->stats)
{
Statistic & cur = curSrcCollection->stats.element(i);
if (cur.kind != (StatisticKind)(cur.kind & StKindMask))
if (queryStatsVariant(cur.kind) != 0)
continue; // ignore variants
if (mapping.hasKind(cur.kind))
{
Expand Down
7 changes: 4 additions & 3 deletions system/jlib/jstats.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ interface IStatisticGatherer;
interface IStatisticVisitor;

class jlib_decl StatisticsMapping;
typedef std::function<void(const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)> AggregateUpdatedCallBackFunc;

interface IStatisticCollection : public IInterface
{
public:
Expand All @@ -115,7 +117,6 @@ interface IStatisticCollection : public IInterface
virtual unsigned getNumStatistics() const = 0;
virtual bool getStatistic(StatisticKind kind, unsigned __int64 & value) const = 0;
virtual void getStatistic(StatisticKind & kind, unsigned __int64 & value, unsigned idx) const = 0;
virtual bool getStatisticSum(StatisticKind kind, unsigned __int64 & value) const = 0;
virtual IStatisticCollectionIterator & getScopes(const char * filter, bool sorted) = 0;
virtual void getMinMaxScope(IStringVal & minValue, IStringVal & maxValue, StatisticScopeType searchScopeType) const = 0;
virtual void getMinMaxActivity(unsigned & minValue, unsigned & maxValue) const = 0;
Expand All @@ -126,7 +127,7 @@ interface IStatisticCollection : public IInterface
virtual StringBuffer &toXML(StringBuffer &out) const = 0;
virtual void visit(IStatisticVisitor & target) const = 0;
virtual void visitChildren(IStatisticVisitor & target) const = 0;
virtual void refreshAggregates(const StatisticsMapping & mapping, std::function<void(const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)> & fWhenAggregateUpdated) = 0;
virtual void refreshAggregates(const StatisticsMapping & mapping, AggregateUpdatedCallBackFunc & fWhenAggregateUpdated) = 0;
virtual stat_type aggregateStatistic(StatisticKind kind) const = 0;
virtual void recordStats(const StatisticsMapping & mapping, IStatisticCollection * statsCollection, std::initializer_list<const StatsScopeId> path) = 0;
};
Expand Down Expand Up @@ -503,7 +504,7 @@ extern const jlib_decl StatisticsMapping diskRemoteStatistics;
extern const jlib_decl StatisticsMapping diskReadRemoteStatistics;
extern const jlib_decl StatisticsMapping diskWriteRemoteStatistics;
extern const jlib_decl StatisticsMapping jhtreeCacheStatistics;
extern const jlib_decl StatisticsMapping aggregateKindStatistics;
extern const jlib_decl StatisticsMapping stdAggregateKindStatistics;

//---------------------------------------------------------------------------------------------------------------------

Expand Down
6 changes: 3 additions & 3 deletions thorlcr/master/thdemonserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer
{
Owned<IWUGraphStats> stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false);
IStatisticGatherer & statsBuilder = stats->queryStatsBuilder();
reportGraph(statsBuilder, & graph);
reportGraph(statsBuilder, &graph);
// Merge only the stats at the specified scope level
Linked<IStatisticCollection> statsCollection = statsBuilder.getResult();
Owned<IStatisticCollection> statsCollection = statsBuilder.getResult();
statsAggregator.recordStats(statsCollection, wfid, graphName, graph.queryGraphId());
}
void reportActiveGraphs(bool finished, bool success=true)
Expand Down Expand Up @@ -201,7 +201,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer
public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

DeMonServer()
DeMonServer() : statsAggregator(stdAggregateKindStatistics)
{
lastReport = msTick();
reportRate = globals->getPropInt("@watchdogProgressInterval", 30);
Expand Down

0 comments on commit 73b7bcd

Please sign in to comment.