HPCC-29657 Produce aggregate stats (e.g. spill, cost) whilst a job is running #18048

Merged
merged 1 commit into from
Dec 7, 2023

Conversation

shamser
Contributor

@shamser shamser commented Nov 17, 2023

… running

Type of change:

  • This change is a bug fix (non-breaking change which fixes an issue).
  • This change is a new feature (non-breaking change which adds functionality).
  • This change improves the code (refactor or other change that does not change the functionality)
  • This change fixes warnings (the fix does not alter the functionality or the generated code)
  • This change is a breaking change (fix or feature that will cause existing behavior to change).
  • This change alters the query API (existing queries will have to be recompiled)

Checklist:

  • My code follows the code style of this project.
    • My code does not create any new warnings from compiler, build system, or lint.
  • The commit message is properly formatted and free of typos.
    • The commit message title makes sense in a changelog, by itself.
    • The commit is signed.
  • My change requires a change to the documentation.
    • I have updated the documentation accordingly, or...
    • I have created a JIRA ticket to update the documentation.
    • Any new interfaces or exported functions are appropriately commented.
  • I have read the CONTRIBUTORS document.
  • The change has been fully tested:
    • I have added tests to cover my changes.
    • All new and existing tests passed.
    • I have checked that this change does not introduce memory leaks.
    • I have used Valgrind or similar tools to check for potential issues.
  • I have given due consideration to all of the following potential concerns:
    • Scalability
    • Performance
    • Security
    • Thread-safety
    • Cloud-compatibility
    • Premature optimization
    • Existing deployed queries will not be broken
    • This change fixes the problem, not just the symptom
    • The target branch of this pull request is appropriate for such a change.
  • There are no similar instances of the same problem that should be addressed
    • I have addressed them here
    • I have raised JIRA issues to address them separately
  • This is a user interface / front-end modification
    • I have tested my changes in multiple modern browsers
    • The component(s) render as expected

Smoketest:

  • Send notifications about my Pull Request position in Smoketest queue.
  • Test my draft Pull Request.

Testing:

@shamser shamser changed the title HPCC-29657 Produce aggregate stats (e.g. spill, cost) whilst a job is… HPCC-29657 Produce aggregate stats (e.g. spill, cost) whilst a job is running Nov 17, 2023
@shamser shamser force-pushed the issue29657new2 branch 2 times, most recently from 05dece3 to eddf198 Compare November 17, 2023 17:52
@shamser shamser marked this pull request as ready for review November 17, 2023 17:54
@shamser shamser force-pushed the issue29657new2 branch 3 times, most recently from 118afc7 to b663a50 Compare November 17, 2023 18:04
@shamser shamser changed the base branch from master to candidate-9.4.x November 17, 2023 18:04
@shamser shamser force-pushed the issue29657new2 branch 2 times, most recently from 2768811 to ed6acf6 Compare November 17, 2023 18:13
Member

@jakesmith jakesmith left a comment

@shamser - please see comments.

I'm not sure the structure is quite right.
I think StatsAggregator should be moved into jstats, and more of the aggregation functionality should be encapsulated in it, including recordStats and refreshAggregates.
With only those methods that rely on an IConstWorkUnit/IWorkUnit implemented in workunit.*

thorlcr/master/thdemonserver.cpp (outdated, resolved)
common/workunit/workunit.hpp (outdated, resolved)
reportGraph(statsBuilder, & graph);
// Merge only the stats at the specified scope level
Linked<IStatisticCollection> statsCollection = statsBuilder.getResult();
statsAggregator.recordStats(statsCollection, wfid, graphName, graph.queryGraphId());
Member

Seems a shame to pass in { wfid, graphName, graph.queryGraphId() } to relocate the IStatisticCollection (inside recordStats), when updateStats has already prepared the gatherer at the correct scope (before reportGraph is called).
Could add an IStatisticCollection *IStatisticGatherer::getCurrentScopeCollection(), to get the sgStatsCollection directly, and pass that in.
Perhaps recordStats should also get the scope chain from the collection rather than pass it here (would need something similar to getFullScope, but returning a list of scope ids).

Contributor Author

updateStats creates gatherer at the graph scope not sg scope. Thinking about it, it should be possible to achieve the recordStats without the path.

ecl/eclagent/eclagent.ipp (outdated, resolved)
system/jlib/jstats.cpp (resolved)
ecl/eclagent/agentctx.hpp (outdated, resolved)
@@ -879,10 +879,11 @@ void EclSubGraph::updateProgress()
Owned<IWUGraphStats> progress = parent.updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), parent.queryWfid(), id);
IStatisticGatherer & stats = progress->queryStatsBuilder();
updateProgress(stats);

agent->mergeAggregatorStats(stats, parent.queryWfid(), parent.queryGraphName(), id);
Member

see other comments. I think with other changes, this could be something like:

        IStatisticGatherer & stats = progress->queryStatsBuilder();
        Owned<IStatisticCollection> sgStatsCollection = stats.getCurrentScopeCollection();
        cur.updateProgress(stats);
        agent->mergeAggregatorStats(*sgStatsCollection);

Member

That may not be true for roxie "subgraph" stats which are for a whole graph.

Member

I think as per the other comments, if the "copy stats" method in the StatisticAggregator is generic and is passed an arbitrary IStatisticCollection, then uses that to discover its source scope + calculate the target to copy to, it should be able to be generic code which works in the roxie case too (with roxie similarly passing through its builder current scope level).

isDirty=true;
if (parent) parent->markDirty();
}
void refreshAggregates(CRuntimeStatisticCollection & parentTotals, std::function<void(const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)> & fWhenAggregateUpdated)
Member

related to other comments.
I think this can and should be moved outside of CStatisticCollection and into the StatisticAggregator class, and for it to be implemented using IStatisticCollection methods.
As per other comments, IStatisticCollection would still need to expose some methods, but it would separate out the aggregate implementation, and should be cleaner.
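
For illustration only, the dirty-flag propagation in the snippet above and the suggestion to keep the aggregation logic outside the collection class could be sketched as follows. This is a minimal stand-alone sketch, not HPCC code: `ScopeNode`, `StatKind`, `AggregateCallback` and the free function `refreshAggregates` are hypothetical stand-ins, and it assumes that an interior scope's total is simply the sum of its direct children's values.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration; not the real jstats types.
using stat_type = std::uint64_t;
enum StatKind { KindCost, KindSpill };

struct ScopeNode
{
    std::string name;
    ScopeNode * parent = nullptr;
    bool isDirty = false;
    std::map<StatKind, stat_type> stats;
    std::map<std::string, std::unique_ptr<ScopeNode>> children;

    // Flag this scope and all of its ancestors as needing re-aggregation.
    void markDirty()
    {
        isDirty = true;
        if (parent) parent->markDirty();
    }
    // Return the named child scope, creating it if it does not exist yet.
    ScopeNode & ensureChild(const std::string & childName)
    {
        assert(!childName.empty());
        std::unique_ptr<ScopeNode> & slot = children[childName];
        if (!slot)
        {
            slot = std::make_unique<ScopeNode>();
            slot->name = childName;
            slot->parent = this;
        }
        return *slot;
    }
    void setStat(StatKind kind, stat_type value)
    {
        stats[kind] = value;
        markDirty();
    }
};

using AggregateCallback = std::function<void(const std::string & scope, StatKind kind, stat_type value)>;

// Lives outside the node type: walks only dirty branches, totals each
// requested kind from the direct children, and reports totals that changed.
void refreshAggregates(ScopeNode & node, const std::vector<StatKind> & kinds, const AggregateCallback & whenUpdated)
{
    if (!node.isDirty)
        return;                  // nothing below this scope has changed
    node.isDirty = false;
    if (node.children.empty())
        return;                  // leaf: holds raw values, nothing to total
    for (auto & child : node.children)
        refreshAggregates(*child.second, kinds, whenUpdated);
    for (StatKind kind : kinds)
    {
        stat_type total = 0;
        bool found = false;
        for (auto & child : node.children)
        {
            auto it = child.second->stats.find(kind);
            if (it != child.second->stats.end())
            {
                total += it->second;
                found = true;
            }
        }
        if (!found)
            continue;
        auto cur = node.stats.find(kind);
        if (cur == node.stats.end() || cur->second != total)
        {
            node.stats[kind] = total;
            whenUpdated(node.name, kind, total);
        }
    }
}
```

Because clean branches are skipped, a second refresh with no intervening updates does no work and fires no callbacks.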

@@ -1915,15 +1995,33 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
for (iter.first(); iter.isValid(); iter.next())
iter.query().getMinMaxActivity(minValue, maxValue);
}
virtual bool setStatistic(const char *scope, StatisticKind kind, unsigned __int64 value) override
Member

this is essentially an 'ensureScope' function + a scope->setStatistic(kind, value); implementation.
It would be cleaner (and enable other changes) if it was separated out to:

IStatisticCollection::ensureScope(const char *fullScope) = 0;
void setStatistic(StatisticKind kind, unsigned __int64 value) = 0;

Although at that point, setStatistic, is very much like updateStatistic, so it may be better to just expose updateStatistic and have the aggregator class call it directly.
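
A rough sketch of that separation, under stated assumptions: the `Scope` class below is hypothetical (it is not `CStatisticCollection`), the return type of `ensureScope` is a guess because the suggestion above does not give one, and the scope path is assumed to be colon-separated (e.g. "w1:graph1:sg2").

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <memory>
#include <string>

using stat_type = std::uint64_t;
enum StatKind { KindCost, KindSpill };   // hypothetical stand-in for StatisticKind

class Scope
{
public:
    // Walk the chain of child scopes named by a colon-separated path,
    // creating any that are missing, and return the innermost scope.
    Scope & ensureScope(const char * fullScope)
    {
        assert(fullScope != nullptr);
        Scope * cur = this;
        const std::string path(fullScope);
        std::size_t start = 0;
        while (start < path.size())
        {
            std::size_t end = path.find(':', start);
            if (end == std::string::npos)
                end = path.size();
            if (end > start)   // skip empty path segments
            {
                std::unique_ptr<Scope> & slot = cur->children[path.substr(start, end - start)];
                if (!slot)
                    slot = std::make_unique<Scope>();
                cur = slot.get();
            }
            start = end + 1;
        }
        return *cur;
    }
    // Setting a value is now independent of how the scope was located.
    void setStatistic(StatKind kind, stat_type value) { stats[kind] = value; }
    bool getStatistic(StatKind kind, stat_type & value) const
    {
        auto it = stats.find(kind);
        if (it == stats.end())
            return false;
        value = it->second;
        return true;
    }
private:
    std::map<StatKind, stat_type> stats;
    std::map<std::string, std::unique_ptr<Scope>> children;
};
```

With the two steps split, the original combined call becomes `root.ensureScope("w1:graph1:sg2").setStatistic(KindCost, 42);`, and an aggregator could reuse `ensureScope` on its own.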

virtual void visitChildren(IStatisticVisitor & visitor) const
{
for (auto const & cur : children)
cur.visit(visitor);
}
virtual void refreshAggregates(const StatisticsMapping & mapping, std::function<void(const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)> & fWhenAggregateUpdated) override
Member

could be encapsulated in StatisticAggregator class.

Member

@ghalliday ghalliday left a comment

That is much easier to understand, and functionality is better encapsulated. A few comments, most minor, but I don't think the disk cost aggregation would currently be working (uninitialised variable).

One thing to discuss: The impact of moving file costs from the global stats to the subgraph stats.

One potential issue that I have spotted, but should be addressed in a later PR.
This code is assuming the only file costs and execution costs are in the graphs. However, if a query contains file copy/spray/despray operations then they should be recorded against the workflow item.

That suggests that the stats aggregator should possibly record a new stats variant of the accumulated costs, rather than reusing the same stats value for the aggregated value.

Owned<IStatisticCollection> statsCollection = progress.getResult();
const cost_type costDiskAccess = statsCollection->aggregateStatistic(StCostFileAccess);
if (costDiskAccess)
progress.addStatistic(StCostFileAccess, costDiskAccess);
Member

This change means that the cost will be recorded in the subgraphs stats rather than the global stats as it was before. Does that mean it will not be picked up by the stats aggregator?
If you cannot restart a graph I don't think it makes a difference, but worth discussing - it may make a difference for eclwatch responsiveness.

Member

the aggregated total should still be published at the global level after this change (unless I'm missing something),
but instead of it being calculated and set directly into global stats, it is calculated and set indirectly via the statsAggregator (inside updateProgress).

Contributor Author

Previously, this statistic wasn't picked up by the aggregator. It needs to be in the subgraph scope. However, the aggregator will add the statistic back in as a global statistic.

if (cur.kind == kind)
{
value += cur.value;
found = true;
Member

Can this return true at this point? I don't think there can be more than one instance of a stat.

Member

See further comments about deleting this function.

Contributor Author

No, it can't return at this point. It needs to continue summing as there may be duplicate kinds in the stats array.

Member

What can cause multiple entries for the same kind in the array? That sounds like a bug.

ForEachItemIn(i, curSrcCollection->stats)
{
Statistic & cur = curSrcCollection->stats.element(i);
if (cur.kind != (StatisticKind)(cur.kind & StKindMask))
Member

easier to read as:

if (queryStatsVariant(cur.kind) != 0)
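
To show why the two spellings are equivalent, here is a small self-contained sketch. The bit layout (low 16 bits for the base kind, higher bits for a variant such as min or max) and every enumerator value are assumptions made for illustration, not values copied from jstats, and the `queryStatsVariant` below is a local stand-in for the helper named in the comment above.

```cpp
#include <cassert>

// Assumed layout for illustration only: the low 16 bits hold the base kind
// and the higher bits hold a variant flag. Values are hypothetical.
enum StatisticKind : unsigned
{
    StKindMask       = 0x0ffff,
    StMinVariant     = 0x10000,   // hypothetical variant flag
    StCostFileAccess = 42         // hypothetical base kind value
};

// Local stand-in: returns only the variant bits; zero means a plain statistic.
inline unsigned queryStatsVariant(unsigned kind)
{
    return kind & ~static_cast<unsigned>(StKindMask);
}

// The check as written in the diff ...
inline bool isVariantOriginal(unsigned kind)
{
    return kind != (kind & StKindMask);
}

// ... and as the review suggests.
inline bool isVariantSuggested(unsigned kind)
{
    return queryStatsVariant(kind) != 0;
}

// Sanity helper: both spellings must agree for any kind value.
inline void checkEquivalent(unsigned kind)
{
    assert(isVariantOriginal(kind) == isVariantSuggested(kind));
}
```

Masking and comparing against the original value is true exactly when some bit outside the mask is set, which is what the second form states directly.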

virtual stat_type aggregateStatistic(StatisticKind kind) const override
{
stat_type sum;
if (!getStatisticSum(kind, sum)) // get sum of statistics at this level
Member

This could call getStatistic() instead and avoid the need for a new function.
Actually the existing code is wrong because sum is not initialised on entry - even more reason to change it.

Contributor Author

getStatistic returns the first value for the kind. There could be multiple kinds in the stats array. I missed initialization of sum.

Member

See question/comment elsewhere. There should only be a single entry for each kind in the stats collection at each level.

Member

Given there should never be duplicates (and https://track.hpccsystems.com/browse/HPCC-30949 to fix the place where there could be), the new getStatisticSum() method should now be removed from this PR, and getStatistic() used here instead.
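
As a hedged sketch of where this thread ends up, the shape below initialises `sum` and uses a plain `getStatistic()` lookup. `Collection`, `StatEntry` and `StatKind` are hypothetical stand-ins, and the fallback of summing the children when the kind is absent at this level is an assumption drawn from the visible fragment of the diff, not a confirmed description of the merged code.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <vector>

using stat_type = std::uint64_t;
enum StatKind { KindCostFileAccess, KindSpill };   // hypothetical stand-ins

struct StatEntry { StatKind kind; stat_type value; };

struct Collection
{
    std::vector<StatEntry> stats;
    std::vector<std::unique_ptr<Collection>> children;

    // Replace an existing entry or append, so each kind appears at most once.
    void setStatistic(StatKind kind, stat_type value)
    {
        for (StatEntry & cur : stats)
        {
            if (cur.kind == kind)
            {
                cur.value = value;
                return;
            }
        }
        stats.push_back({kind, value});
    }
    // With no duplicates, the first match is the only match.
    bool getStatistic(StatKind kind, stat_type & value) const
    {
        for (const StatEntry & cur : stats)
        {
            if (cur.kind == kind)
            {
                value = cur.value;
                return true;
            }
        }
        return false;
    }
    stat_type aggregateStatistic(StatKind kind) const
    {
        stat_type sum = 0;   // initialised, so both branches start from zero
        if (!getStatistic(kind, sum))
        {
            // Assumed fallback: no value at this level, so total the children.
            for (const auto & child : children)
            {
                assert(child != nullptr);
                sum += child->aggregateStatistic(kind);
            }
        }
        return sum;
    }
};
```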

IStatisticGatherer & statsBuilder = stats->queryStatsBuilder();
reportGraph(statsBuilder, & graph);
// Merge only the stats at the specified scope level
Linked<IStatisticCollection> statsCollection = statsBuilder.getResult();
Member

trivial: I don't think you need to assign to a Linked ptr

Member

I agree with Jake

++scopeItem;
}

CStatisticCollection * tgtScopeCollection = ensureSubScopePath(path);
Member

Something to note as unusual. The source stats have a root item of a workflow item, the target have a root of the global scope. I suspect the subgraph stats should be changed, but not something to consider at this point.

}
virtual stat_type aggregateStatistic(StatisticKind kind) const override
{
stat_type sum;
Member

missing initialiser (for both true and false branches).

{
Owned<IWUGraphStats> stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false);
IStatisticGatherer & statsBuilder = stats->queryStatsBuilder();
reportGraph(statsBuilder, & graph);
Member

trivial: extra space before graph.

{
Statistic & stat = stats.element(i);
StatisticKind kind = stat.queryKind();
if (kind != (StatisticKind)(kind & StKindMask))
Member

Easier to read as

queryStatsVariant(kind) != 0

(mentioned elsewhere). Possibly better would be to move that test into hasKind()

@ghalliday
Member

I agree with many of Jake's comments - e.g. the aggregator living in jstats and the location of several of the functions. They might be better to address in a separate PR though. I think the structure is now broadly correct, and once it works it may be better to merge as-is.
(It might be better for someone else to refactor the code, and then for Shamser to review it, rather than Shamser trying to second-guess what the reviewers are trying to get at. Something to discuss more generally I suspect.)

@jakesmith
Member

> I agree with many of Jake's comments - e.g. the aggregator living in jstats and the location of several of the functions. They might be better to address in a separate PR though. I think the structure is now broadly correct, and once it works it may be better to merge as-is. (It might be better for someone else to refactor the code, and then for Shamser to review it, rather than Shamser trying to second-guess what the reviewers are trying to get at. Something to discuss more generally I suspect.)

@shamser - ok, with this in mind, let's avoid significant refactoring/changes for now in this PR, and focus on the more minor issues/fixes, with the aim of merging this soon. Then we can revisit after that.

virtual void serialize(MemoryBuffer & out) const = 0;
virtual unsigned __int64 queryWhenCreated() const = 0;
virtual void mergeInto(IStatisticGatherer & target) const = 0;
virtual StringBuffer &toXML(StringBuffer &out) const = 0;
virtual void visit(IStatisticVisitor & target) const = 0;
virtual void visitChildren(IStatisticVisitor & target) const = 0;
virtual void refreshAggregates(const StatisticsMapping & mapping, std::function<void(const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)> & fWhenAggregateUpdated) = 0;
Member

This callback definition (std::function<void(const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)>) would benefit from a typedef to clarify.

@shamser
Contributor Author

shamser commented Nov 23, 2023

@jakesmith I've addressed most of the minor suggestions. @ghalliday suggested that it may be easier for you to refactor.

@shamser shamser requested a review from jakesmith November 23, 2023 11:08
@jakesmith
Member

{quote}
One potential issue that I have spotted, but should be addressed in a later PR.
This code is assuming the only file costs and execution costs are in the graphs. However, if a query contains file copy/spray/despray operations then they should be recorded against the workflow item.

That suggests that the stats aggregator should possibly record a new stats variant of the accumulated costs, rather than reusing the same stats value for the aggregated value.
{quote}

@shamser - not sure if a JIRA has been opened for this yet? Can you reference it here and link it as a related issue to this JIRA?

Member

@jakesmith jakesmith left a comment

@shamser changes look good I think.

I will make a final pass after @ghalliday has reviewed changes based on his previous comments.

@jakesmith jakesmith requested a review from ghalliday November 23, 2023 13:16
Member

@ghalliday ghalliday left a comment

@shamser please can you reply to the question about duplicate stats. Otherwise looks ok to merge.

virtual stat_type aggregateStatistic(StatisticKind kind) const override
{
stat_type sum;
if (!getStatisticSum(kind, sum)) // get sum of statistics at this level
Member

See question/comment elsewhere. There should only be a single entry for each kind in the stats collection at each level.

if (cur.kind == kind)
{
value += cur.value;
found = true;
Member

What can cause multiple entries for the same kind in the array? That sounds like a bug.

@shamser
Contributor Author

shamser commented Nov 28, 2023

RE: See question/comment elsewhere. There should only be a single entry for each kind in the stats collection at each level.
RE: What can cause multiple entries for the same kind in the array? That sounds like a bug.

I think the roxie code uses addStatistic. I didn't delve too deeply into the code to investigate if it is possible for the same kind to exist multiple times. As there are no comments to suggest otherwise and CStatisticCollection allows multiple stats of the same kind to be added, I have had to assume that having the same kind multiple times is supported.

I'm going to run some tests to see if addStatistic is ever used to create the same kind multiple times in CStatisticCollection - I'll post the result here tomorrow. @ghalliday

@shamser
Contributor Author

shamser commented Nov 29, 2023

> RE: See question/comment elsewhere. There should only be a single entry for each kind in the stats collection at each level.
> RE: What can cause multiple entries for the same kind in the array? That sounds like a bug.
>
> I think the roxie code uses addStatistic. I didn't delve too deeply into the code to investigate if it is possible for the same kind to exist multiple times. As there are no comments to suggest otherwise and CStatisticCollection allows multiple stats of the same kind to be added, I have had to assume that having the same kind multiple times is supported.
>
> I'm going to run some tests to see if addStatistic is ever used to create the same kind multiple times in CStatisticCollection - I'll post the result here tomorrow. @ghalliday

It seems that Roxie creates multiple stats for the same kind. Please see regression results for this PR: #18090. In this PR, I added an assert to make sure the same kind wasn't added multiple times.
The assert was triggered multiple times. @ghalliday @jakesmith
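
For readers unfamiliar with this kind of experiment, a debug-only duplicate check could look roughly like the sketch below. It is not the code from #18090: `StatList`, `StatKind` and the method bodies are hypothetical stand-ins that only mirror the names used in this discussion.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

using stat_type = std::uint64_t;
enum StatKind { KindA, KindB };   // hypothetical stand-ins

class StatList
{
public:
    bool hasKind(StatKind kind) const
    {
        for (const Entry & cur : stats)
        {
            if (cur.kind == kind)
                return true;
        }
        return false;
    }
    // Appends unconditionally; in debug builds the assert fires if the
    // same kind has already been added to this scope.
    void addStatistic(StatKind kind, stat_type value)
    {
        assert(!hasKind(kind) && "duplicate statistic kind added to the same scope");
        stats.push_back({kind, value});
    }
    std::size_t size() const { return stats.size(); }
private:
    struct Entry { StatKind kind; stat_type value; };
    std::vector<Entry> stats;
};
```

The linear scan makes each add proportional to the number of stats in the scope, which is acceptable for a debug-time check but would need measuring before being left enabled in release builds.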

@ghalliday
Member

> > RE: See question/comment elsewhere. There should only be a single entry for each kind in the stats collection at each level.
> > RE: What can cause multiple entries for the same kind in the array? That sounds like a bug.
> >
> > I think the roxie code uses addStatistic. I didn't delve too deeply into the code to investigate if it is possible for the same kind to exist multiple times. As there are no comments to suggest otherwise and CStatisticCollection allows multiple stats of the same kind to be added, I have had to assume that having the same kind multiple times is supported.
> >
> > I'm going to run some tests to see if addStatistic is ever used to create the same kind multiple times in CStatisticCollection - I'll post the result here tomorrow. @ghalliday
>
> It seems that Roxie creates multiple stats for the same kind. Please see regression results for this PR: #18090. In this PR, I added an assert to make sure the same kind wasn't added multiple times. The assert was triggered multiple times. @ghalliday @jakesmith

Thanks - that is very useful, and should be fixed as a separate issue.

Member

@jakesmith jakesmith left a comment

@shamser - please see comments

@@ -2659,83 +2659,67 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu
}
}

//aggregate disk costs from top-level subgraphs (when scope specified) or workflows (scope not specified)
cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope)
void StatisticsAggregator::loadExistingAggregates(IConstWorkUnit &workunit)
Member

void StatisticsAggregator::loadExistingAggregates(IConstWorkUnit &workunit)

picky: IConstWorkUnit could be a const IConstWorkUnit

wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), sst, scope, kind, nullptr, value, 1, 0, StatsMergeReplace);
};

statsCollection->refreshAggregates(mapping, f);
Member

see: https://github.com/hpcc-systems/HPCC-Platform/actions/runs/6968880782/job/18963669852
Failing to compile in Windows.

The std::function vs 'std::function<void(const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)>' I think.

It would be better if the callback function type was a typedef defined in jstats.h, and then used here.

virtual stat_type aggregateStatistic(StatisticKind kind) const override
{
stat_type sum;
if (!getStatisticSum(kind, sum)) // get sum of statistics at this level
Member

Given there should never be duplicates (and https://track.hpccsystems.com/browse/HPCC-30949 to fix the place where there could be), the new getStatisticSum() method should now be removed from this PR, and getStatistic() used here instead.

@shamser shamser force-pushed the issue29657new2 branch 3 times, most recently from 17dcaea to 73b7bcd Compare December 1, 2023 16:26
@shamser
Contributor Author

shamser commented Dec 1, 2023

@jakesmith The changes related to the last comments have been merged to the last commit.

@shamser shamser requested a review from jakesmith December 1, 2023 16:30
Member

@ghalliday ghalliday left a comment

I didn't spot any problems.
My only concern is moving the cost from the global stats to the subgraph stats and whether that breaks any wudetails processing. I will look at that separately.

@shamser
Contributor Author

shamser commented Dec 4, 2023

> I didn't spot any problems. My only concern is moving the cost from the global stats to the subgraph stats and whether that breaks any wudetails processing. I will look at that separately.

The aggregator will create the same cost aggregates as before in the global stats.

@ghalliday
Member

> > I didn't spot any problems. My only concern is moving the cost from the global stats to the subgraph stats and whether that breaks any wudetails processing. I will look at that separately.
>
> The aggregator will create the same cost aggregates as before in the global stats.

But it will not create a cost for a subgraph in the global stats, which is the difference.

Member

@jakesmith jakesmith left a comment

@shamser - looks good, please squash.

Member

@jakesmith jakesmith left a comment

@shamser - looks good.

@ghalliday ghalliday changed the base branch from candidate-9.4.x to master December 7, 2023 16:44
@ghalliday ghalliday merged commit 2a959b9 into hpcc-systems:master Dec 7, 2023
45 checks passed
3 participants