Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-29657 Produce aggregate stats (e.g. spill, cost) whilst a job is running #17786

Closed
wants to merge 2 commits into from

Conversation

shamser
Copy link
Contributor

@shamser shamser commented Sep 18, 2023

Type of change:

  • This change is a bug fix (non-breaking change which fixes an issue).
  • This change is a new feature (non-breaking change which adds functionality).
  • This change improves the code (refactor or other change that does not change the functionality)
  • This change fixes warnings (the fix does not alter the functionality or the generated code)
  • This change is a breaking change (fix or feature that will cause existing behavior to change).
  • This change alters the query API (existing queries will have to be recompiled)

Checklist:

  • My code follows the code style of this project.
    • My code does not create any new warnings from compiler, build system, or lint.
  • The commit message is properly formatted and free of typos.
    • The commit message title makes sense in a changelog, by itself.
    • The commit is signed.
  • My change requires a change to the documentation.
    • I have updated the documentation accordingly, or...
    • I have created a JIRA ticket to update the documentation.
    • Any new interfaces or exported functions are appropriately commented.
  • I have read the CONTRIBUTORS document.
  • The change has been fully tested:
    • I have added tests to cover my changes.
    • All new and existing tests passed.
    • I have checked that this change does not introduce memory leaks.
    • I have used Valgrind or similar tools to check for potential issues.
  • I have given due consideration to all of the following potential concerns:
    • Scalability
    • Performance
    • Security
    • Thread-safety
    • Cloud-compatibility
    • Premature optimization
    • Existing deployed queries will not be broken
    • This change fixes the problem, not just the symptom
    • The target branch of this pull request is appropriate for such a change.
  • There are no similar instances of the same problem that should be addressed
    • I have addressed them here
    • I have raised JIRA issues to address them separately
  • This is a user interface / front-end modification
    • I have tested my changes in multiple modern browsers
    • The component(s) render as expected

Smoketest:

  • Send notifications about my Pull Request position in Smoketest queue.
  • Test my draft Pull Request.

Testing:

@github-actions
Copy link

@shamser shamser force-pushed the issue29657new branch 3 times, most recently from d34c72e to c2e13ef Compare September 20, 2023 12:17
@shamser
Copy link
Contributor Author

shamser commented Sep 22, 2023

Note: Work in progress hence draft PR

Copy link
Member

@ghalliday ghalliday left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shamser looks good. A few comments.

The main question in my mind is where should stats that are going to be aggregated be stored? This change has moved them from the global stats to the subgraph. I think that makes logical sense, but I don't know what implications that has for efficiency. (Logically the execution times should also move, but that would have a performance impact.)

The other question in my mind is what is the minimal set of changes which would allow this change to be merged?

The other thought is

@@ -1825,7 +1813,7 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
}
virtual StringBuffer & getFullScope(StringBuffer & str) const override
{
if (parent)
if (parent && queryScopeType()!=SSTworkflow)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better test would be parent->queryscopeType() != SSTglobal. (Would work with compile stages as well.)

@@ -1988,9 +2000,58 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
cur.visit(visitor);
}

virtual bool refreshAggregates(std::vector<StatisticKind> & aggregateKinds, std::vector<unsigned __int64> & totals) override
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would a CRuntimeStatisticCollection (instead of aggregateKinds, totals) simplify this code? The dual arrays seem to implement the same idea.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CRuntimeStatisticCollection cannot store the variants. When the stats are deserialized, we'd have to ignore all the variants. We could ignore all the variants. Losing the variants would mean that we could not have a single in-memory object for the stats as we'd need one for aggregation (GlobalStatisticCollection ) and another for serialization(CStatisticGatherer). We would also not be able to extend the functionality to use GlobalStatisticCollection to cache runtime stats and allow periodic updates to dali.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, not that a new CRuntimeStatisticCollection will need to be created for each scope and each one will have a StatisticMapping. With the current implementation, a single vector with the StatisticKinds is used as it traverses the entire StatisticsCollection. I think using CRuntimeStatisticCollection will be slower and use more memory.

StatsScopeId id;
id.deserialize(serialized, version);
IStatisticCollection * collection = ensureSubScope(id, true);
collection->deserialize(serialized, version);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimization for later. Indicate to deserialize that it does not need to deserialize beyond a subgraph. Would avoid all the activities being deserialized.
That option might be usable by other stats iterators.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted. I'll avoid deserialize unnecessary scopes.

class GlobalStatisticCollection : public CStatisticCollection
{
public:
GlobalStatisticCollection(IPropertyTree * root) : CStatisticCollection(nullptr, globalScopeId)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code belong in workunit.cpp, rather than jlib, because it is specific to the way stats are represented in workunits. It should be possible to implement using the public interface for the stats collection.

while (totalIter != totals.end())
{
StatsMergeAction mergeAction = queryMergeMode(*kindIter);
updateStatistic(*kindIter, *subTotalIter, mergeAction);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should probably replace rather than merge - since this is the new aggregated value.

@shamser
Copy link
Contributor Author

shamser commented Sep 25, 2023

I think that makes logical sense, but I don't know what implications that has for efficiency. (Logically the execution times should also move, but that would have a performance impact.)

The other question in my mind is what is the minimal set of changes which would allow this change to be merged?

The other thought is

I didn't move execution times because there was some special aggregation code that handled the case where workflow and hthor was on the same cpu. Also, it wasn't the case the aggregation of execution time at sub-graph level should be the same as the execution of the parent scopes. For example, there was also small gaps between subgraphs which would be lost if we simply aggregated the subgraphs.

I did move StSizeSpillFile, StSizeGraphSpill and StCostFileAccess which I thought wouldn't impact performance too much. Are there instances that you are aware of that would be impacted by these changes? @ghalliday

@shamser shamser requested a review from ghalliday October 3, 2023 09:12
@shamser
Copy link
Contributor Author

shamser commented Oct 3, 2023

@ghalliday I pushed 2 new commits:

  1. Relocate GlobalStatisticCollection to workunit.cpp
    Prepare dynamic stats tracking by GlobalStatisticCollection
    Remove unnecessary calls to updateAggregation
  2. Track active graph stats in GlobalStatisticCollection (meaning that the entire graph does not need to loaded from dali for stats aggregration)

With the second commit, I eliminated the loading of stats from Dali for aggregation in Thor. I'm making the similar changes in hThor.

@shamser shamser marked this pull request as ready for review October 4, 2023 15:13
@shamser shamser force-pushed the issue29657new branch 3 times, most recently from f6cac47 to 1347444 Compare October 9, 2023 14:03
cost_type costDiskAccess = graph->getDiskAccessCost();
if (costDiskAccess)
stats.addStatistic(StCostFileAccess, costDiskAccess);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should really move to MasterGraph::getStats()

@shamser shamser marked this pull request as draft October 11, 2023 08:53
common/workunit/workunit.cpp Outdated Show resolved Hide resolved
system/jlib/jstats.cpp Outdated Show resolved Hide resolved
system/jlib/jstats.cpp Outdated Show resolved Hide resolved
common/workunit/workunit.cpp Show resolved Hide resolved
common/workunit/workunit.cpp Outdated Show resolved Hide resolved
{
if (updateStatistic(*kindIter, *subTotalIter, StatsMergeReplace))
updated=true;
(*totalIter) += *subTotalIter;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question as on line 2030 - how does this work for e.g. StatsMergeMax

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this has been done already.

std::vector<unsigned __int64>::iterator totalIter = totals.begin();
std::vector<unsigned __int64>::iterator subTotalIter = childTotals.begin();
std::vector<StatisticKind>::iterator kindIter = aggregateKinds.begin();
while (totalIter != totals.end())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think may need to conditionally look at totals - not all aggregateKinds may have have been found in the collection, therefore totals will not have been set, but will contain 0, and unconditionally setting 0 for all aggregate types may not be correct.

common/workunit/workunit.cpp Outdated Show resolved Hide resolved
system/jlib/jstats.cpp Outdated Show resolved Hide resolved

void GlobalStatisticCollection::load(const char *_wuid, unsigned statsMaxDepth, bool missingScopesOnly)
{
if (strcmp(_wuid, wuid.str())!=0) // Make sure stats collection is for this workunit
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably clearer if load is the route that always creates the collection, i.e. no need for ctor route and code looks like:

    if (!streq(_wuid, wuid)) // Make sure stats collection is for this workunit
    {
        StatsScopeId globalScopeId(SSTglobal, (unsigned)0);
        statsCollection.setown(createStatisticCollection(nullptr, globalScopeId));
        wuid.set(_wuid);
    }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also highlights that a Thor instance ping-ponging between multiple jobs will not reuse existing load stats (and have to reload all prior stats each time).

When it only loads dirty levels, it won't matter much. And/or we could keep several GlobalStatisticCollection objects around for the last N jobs, to avoid recreating them.

@jakesmith
Copy link
Member

@shamser - please see inlne comments, and comments below.

If I understand it correctly, the current implementation does:

  1. load stats from subgraphs only (i.e. does not load previously published aggregates that have been pushed to workunit at the moment).

  2. use statsMaxDepth to skip over intermediate levels until it reaches depth 3 (this constant isn't very clear, could it use a StatsScopeId instead?).
    The comment says ignore stats below subgraph, but afaics, it doesn't append until it gets to 3, then adds everything below?

  3. use missingScopesOnly to check if a subgraph has already been deserialized (already a child collection).

Goals:

  • we want to avoid recalculating aggregates if nothing has changed.
  • load existing stats if they will be needed to upgrade an incomplete parent aggregate
  • avoid loading stats and totals below a level, unless a parent is marked dirty.

As it stands, when a new thor instance picks up a running workunit, let's say to run graph100, I think it will:

  • load - loads all stats from all subgraphs from all graphs, in all workflow items.
  • updateAggregates - call will recalculate all aggregates for all levels, but, not publish them if they are equal to what is already published.
  • missingScopesOnly avoids re-fetching/re-deserializing the binary blobs for subgraphs if the current Thor instance has already done so, but doesn't help if the Thor instance hasn't seen this workunit before (or it had, but the last workunit was different).

As it stands, there would be too much of a performance hit on medium to big size jobs, unless you had just 1 Thor instance that worked on the whole job from start to finish.

To overcome that, as we've discussed and as you've alluded to in one of your comments, we need a mechanism which tracks what levels are dirty.
And, the implementation needs to load existing aggregates at startup too, and avoid loading nested stats. unless a level is dirty (i.e. the totals for it need recalculating because it's not yet done).
Published aggregates will change and need recalculating until the level is done, e.g. the aggregates in WF1 will need to be constantly recalculated until all the graphs under it are complete, i.e. when WF1 is complete.
Same for graphs, e.g. aggregates for graph59 will need recalculating until all its subgraphs are done. NB: in practice atm, there won't be a situation where the engines start processing a graph with some subgraphs done and some not - but that could change, e.g. with checkpointing. However, the logic is the same for dirty graphs/subgraphs as it is for workflow items/graphs.

I think this can be achieved by adding a dirty flag (not sure whether best under or ) to each level that is active, e.g. when WF2 starts, it is marked dirty, and the flag is only cleared when it completes.
As GlobalStatisticCollection::load walks the stats, it can stop descending to load lower levels if there is no dirty flag. e.g. when loading the stats for a job running WF2, WF1 totals are loaded, but since it has no dirty flag, none of the graph aggregates, or subgraph stats below WF1 need loading (or need processing later when it comes to updateAggregates).
NB: there's probably little point in trying to avoid any aggregates under from being loaded, and this will be particularly true if we move them to a single serialized blob when the whole blob will need writing back, but it can use this logic to avoid descending into graphs/subgraphs that won't be needed when it's walking .

updateAggregates needs to recalculate and push all aggregates for any dirty layer (the top/global level is implicitly dirty if the job is running anything).

In practice that means, that when it's e.g. running graph100 in wf5 and the job lands on a new Thor instance, global+wf5+graph100 should be the only things that are marked dirty.
load() will need to load all existing aggregates for wf1, wf2, wf3, wf4, but none of their child graphs stats, and because wf5 is dirty, load all aggregates for graph1-graph99, but none of their children because graph1-graph99 are not dirty.
And because graph100 is dirty, logically it should load all of the published subgraph aggregates, but, since a graph is re-entrant at the moment (can't run half of it then resume on another instance), it is always going to redo the complete graph. Which means that for now, we should consider all subgraph aggregates and stats as missing on load I think (for the purposes of calculating new aggregates) - they are of no use. That will change if we start supporting checkpointing again, so we should comment clearly why we don't bother loading existing subgraph aggregates and stats.

@shamser
Copy link
Contributor Author

shamser commented Oct 13, 2023

tes. As GlobalStatisticCollection::load walks the stats, it can stop descending to load lower levels if there is no dirty flag. e.g. when loading the stats for a job running WF2, WF1 totals are loaded, but since it has no dirty flag, none of the graph aggregates, or subgraph stats below WF1 need loading (or need processing later when it comes to updateAggregates). NB: there's probably little point in trying to avoid any aggregates under from being loaded, and this will be particularly true if we move them to a single serialized blob when the whole blob will need writing back, but it can use this logic to avoid descending into graphs/subgraphs that won't be needed when it's walking .

updateAggregates needs to recalculate and push all ag

I've modified the design since this comment was posted. I was actually testing the changes before I saw these code review comments. Although the design is different from what you've suggested, I feel that it achieves that efficiency goal. Please can you have a look at the PR again. @jakesmith

@shamser shamser force-pushed the issue29657new branch 4 times, most recently from 68305df to ceef1ae Compare October 13, 2023 16:17
@shamser shamser marked this pull request as ready for review October 13, 2023 16:17
@shamser shamser force-pushed the issue29657new branch 2 times, most recently from fa65242 to 3f71b3f Compare October 18, 2023 16:03
Copy link
Member

@jakesmith jakesmith left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shamser - looks good and close, there's a couple of leaks that needs fixing, some trivial things and a couple of questions, but I suspect those could be done in a subsequent PR.

Please address the leak/trivials for now.

system/jlib/jstats.cpp Outdated Show resolved Hide resolved
common/workunit/workunit.cpp Outdated Show resolved Hide resolved
thorlcr/master/thdemonserver.cpp Outdated Show resolved Hide resolved
ecl/eclagent/eclgraph.cpp Outdated Show resolved Hide resolved
ecl/eclagent/eclgraph.cpp Outdated Show resolved Hide resolved
common/workunit/workunit.cpp Outdated Show resolved Hide resolved
@@ -1134,7 +1136,9 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
cost_type cost = money2cost_type(calculateThorCost(nanoToMilli(graphTimeNs), numberOfMachines));
if (cost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will these graph CostExecute stats. be aggregated at the workflow level, and global level?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The workflow level cost includes not only all the subgraph costs but also workflow engine cost. There is special code to handle this calculation: when calculating workflow level cost, it adds the additional workflow engine cost to the subgraph cost aggregates. Unfortunately, this also means that the graph aggregates cannot be calculated using the new method, as it would override workflow cost calculated with the special cost.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have looked into making use of the global stats collection object to handle CostExecute. It may make the code cleaner. However, this is not a trivial change and, probably, isn't worth doing in this PR. (The workflow engine costs complicates things too much).

common/workunit/workunit.cpp Outdated Show resolved Hide resolved
{
StatisticKind kind = aggregateKindsMapping.getKind(i);
stat_type value;
if (cur.getStatistic(kind, value) && value)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you do sometimes want to set if 0, it depends, should be using includeStatisticIfZero(kind) I think.

if (refreshAggregates()) // Only serialize if the aggregates has changed
{
StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKindsMapping);
statsCollection->visit(statsAggregatorWriter);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

related to the includeStatisticIfZero comment, should this be doing something like : ?

Owned<IStatisticGatherer> globalStaatsGatherer = createGlobalStatisticGatherer(wu);
statsCollection->recordStatistics(globalStatsGatherer);

CRuntimeStatisticCollection::recordStatistics handles things like includeStatisticIfZero, merge mode and measure conversion.
createGlobalStatisticGatherer builds up a scope as it's descending.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

statsAggregatorWriter is filtering out the required stats and writing out only those stats. It filters out some global stats that have been loaded in global stats such as dfu stats, compile stats etc. Stats at the subgraph level should also be ignored (not written to global wu stats).

Also, IStatisticCollection/CStatisticCollection doesn't have a recordStatistics function.

@shamser shamser force-pushed the issue29657new branch 4 times, most recently from e786af8 to 1925dc9 Compare October 31, 2023 09:23
@shamser shamser requested a review from jakesmith October 31, 2023 10:09
common/workunit/workunit.cpp Outdated Show resolved Hide resolved
common/workunit/workunit.cpp Outdated Show resolved Hide resolved
system/jlib/jstats.cpp Outdated Show resolved Hide resolved
{
const StatisticsMapping & mapping = totals.queryMapping();
bool updated = false;
if (queryScopeType()==SSTsubgraph || isDirty==false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this correct?

  1. Unconditionally if subgraph (even if not marked dirty)
    ||
  2. if isDirty==false (so skip if markDirty() called for this subgraph)

I guess I'm not clear why the whole thing is not wrapped in a if (isDirty) instead?
e.g. shouldn't refreshAggregates be a noop if were to be called before getCollectionForUpdate used that calls markDirty ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree: checking isDirty is sufficient. I had the subgraph test because the isDirty flag was implemented later and I didn't reassess the need for the subgraph test.

I modified the code so that if isDirty is false at the global level, the operation is a nop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignore the previous comment. The unconditional sg test is required. The first block (if condition true) returns the totals at this level. The second block (if condition false) descends to a lower scope level to get totals from lower level.

It should return totals from the current level under 2 circumstances

  1. isDirty==false: the stats at this level and descendant levels haven't changed, so it is ok to use the aggregates from this level
  2. it is at subgraph scope: this condition is required because even if the dirty flag is true, aggregates should only be generated from subgraph level. As per the agreed requirements, the aggregates generated automatically should not be generated from activities->subgraph. (At a later date, if it is determined that aggregates should be generated from aggregates, this condition may be removed but it is beyond the scope of this jira for this change now.)

return new CStatisticCollection(parent, scopeId);
}

IStatisticCollection * createRootStatisticCollection(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope, const StatsScopeId & graphScopeId, IStatisticCollection * childCollection)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think childCollection would be clearer if called sgCollection (as it is in CRootStatisticCollection)
(and also in jstats.h)

system/jlib/jstats.cpp Outdated Show resolved Hide resolved
system/jlib/jstats.cpp Outdated Show resolved Hide resolved
system/jlib/jstats.cpp Outdated Show resolved Hide resolved
Copy link
Member

@jakesmith jakesmith left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shamser - please see new comments.

Mostly cosmetic and 1 question re. isDirty handling: https://github.com/hpcc-systems/HPCC-Platform/pull/17786/files#r1377617622

@shamser shamser force-pushed the issue29657new branch 4 times, most recently from 0621714 to fb9a872 Compare October 31, 2023 16:37
@shamser shamser requested a review from jakesmith November 3, 2023 13:39
Copy link
Member

@ghalliday ghalliday left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shamser Broadly the change looks good. There are two general comments:

  • I expected the collection code to stay as it is, and then have a function that merges the subgraph(/graph) stats into the aggregator as a separate call. Storing the stats in the global stats aggregator seems to be overloading the meaning of that class.
  • The public interface to the IStatisticsCollection class expanded by more than I expected. Some could be implemented as global functions, some indicate logic ins't quite in the right place.

Also a few bits of code where the logic does not look right/should be deleted.

  • non-memberMore information is leaked out of the jstats files that I would expect. is

: creatorType(_creatorType), creator(_creator), id(_id), merge(_merge)
{
StatsScopeId graphScopeId;
verifyex(graphScopeId.setScopeText(_rootScope));
StatsScopeId wfScopeId(SSTworkflow,wfid);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: Does this imply root scope has changed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it hasn't. It was previously called "rootScopeId" but it was actually a workflow scope. Line 192 of the original code has "StatsScopeId rootScopeId(SSTworkflow,wfid)". I changed the name from rootScopeId to workflowScopeId as it was being confused with _rootScope.

common/workunit/workunit.cpp Show resolved Hide resolved
}

loadGlobalAggregates(workunit);
if (isEmptyString(graphName))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost certainly should be if (!isEmptyString()). Suggests this code hasn't been walked through, or graphName is always empty

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like graphName is always empty. Better to check and assert rather than include untested code in the PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was originally, if (*graphName) and that was tested. However, at the last minute I changed the code to use isEmptyString. I know I shouldn't have done that - I should have tested every changed, even seemingly minor non-functional change.

if (!compressed.length())
break;
MemoryBuffer serialized;
decompressToBuffer(serialized, compressed);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note to self about inefficiency of lzw...


StatsScopeId childId;
childId.deserialize(serialized, version);
int statsMinDepth = 0, statsMaxDepth = INT_MAX;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be outside the main loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed the code to load sg stats in global collection- this entire section removed.

else
{
StatsScopeId childScopeId;
if (!childScopeId.setScopeText(scope, &scope) || (*scope!=':' && *scope!='\0'))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clearer to use different variables for input and output. E.g. next.

@@ -1862,6 +1868,24 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
}
return false;
}
virtual bool setStatistic(const char *scope, StatisticKind kind, unsigned __int64 & value) override
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't really need to be a member function - could be implemented using the existing public interface.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used from workunit.cpp by the aggregate loader. It could be done using the existing member functions but it would be cumbersome to add code to the aggregate loader to split the scopes and navigate down the tree.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it should be in a function. The comment is that it could be a global function - which would reduce the number of functions in the interface. Debatable style wise, but was connected to the large number of new functions in the interface.

}
else
{
deserializeNoStats(in, version);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems a shame. Can it be avoided for the common case i.e. where you are only reading the subgraph stats which are the root for hthor/thor graphs.

{
// descend down to lower level to obtain totals required for aggregates and then aggregate
CRuntimeStatisticCollection childTotals(mapping);
Owned<IBitSet> childTotalUpdated = createBitSet(mapping.numStatistics());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic does not seem correct. total will only updated with any values that have changed - but in the not-dirty case all values are updated.

Copy link
Member

@jakesmith jakesmith Nov 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, I don't follow the logic (related to previous comment here: #17786 (comment)).

The true condition will be followed unconditionally for all subgraphs (regardless of whether isDirty = true or false), which looks like it means everything above every subgraph will also be updated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it achieves the goal, but am still finding how it is doing it a bit confusing.
What I'd expect I think, is for each level to do:

  • if at level and dirty, do nothing
  • If at level and is subgraph, do nothing.
  • If dirty, recurse down each scope, and then merge the child values into new totals at current level.
  • When finished recursing down each child scope and merging into new totals, update what has changed into current level stats, and push totals (via callback if necessasry).

I had a stab at what that would look like, and I also think it should be done without adding the two quite implementation orientated 'refreshAggregates' methods to IStatisticCollection :

bool refreshAggregates(IStatisticCollection &stats, const StatisticsMapping & mapping, AggregateUpdatedCallBackFunc & fWhenAggregateUpdated)
{
    if (stats.queryScopeType() == SSTsubgraph) // don't descend any further
        return false;
    if (!stats.isDirty())
        return false;
    bool anyChanges = false;
    CRuntimeStatisticCollection totals(mapping);
    Owned<IBitSet> updated = createBitSet(mapping.numStatistics());
    Owned<IStatisticCollectionIterator> childIter = &stats.getScopes(nullptr, false);
    ForEach(*childIter)
    {
        IStatisticCollection &child = childIter->query();
        refreshAggregates(child, mapping, fWhenAggregateUpdated);

        // merge in child level stats into current level, and mark which aggregate kinds have updated
        unsigned numChildStats = child.getNumStatistics();
        for (unsigned s=0; s<numChildStats; s++)
        {
            StatisticKind kind;
            unsigned __int64 value;
            child.getStatistic(kind, value, s);
            if (kind != (StatisticKind)(kind & StKindMask))
                continue; // ignore variants->not supported by CRuntimeStatisticCollection
            unsigned index = mapping.getIndex(kind);
            if (index != mapping.numStatistics())
            {
                totals.mergeStatistic(kind, value);
                updated->set(index); // NB: may already have been set whilst merging in another child, but that's okay.
            }
        }
    }

    // any updates? Set new totals into current level, and call callback
    unsigned nextUpdateIndex = 0;
    while (true)
    {
        nextUpdateIndex = updated->scan(nextUpdateIndex, true);
        if (NotFound == nextUpdateIndex)
            break;
        
        StatisticKind kind = mapping.getKind(nextUpdateIndex);
        unsigned __int64 value = totals.queryStatisticByIndex(nextUpdateIndex).get();
        if (stats.updateStatistic(kind, value, StatsMergeReplace))
        {
            if (value || includeStatisticIfZero(kind))
            {
                StringBuffer s;
                fWhenAggregateUpdated(stats.getFullScope(s).str(), stats.queryScopeType(), kind, value);
                anyChanges = true;
            }
        }
    }
    stats.clearDirty();
    return anyChanges;
}

@@ -2183,7 +2191,8 @@ StringBuffer &CStatisticCollection::toXML(StringBuffer &out) const
SuperHashIteratorOf<CStatisticCollection> iter(children, false);
for (iter.first(); iter.isValid(); iter.next())
iter.query().toXML(out);
out.append("</Scope>\n");
out.append("</Scope> // Scope id=\"");
id.getScopeText(out).append("\"\n");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Except it isn't valid xml any more. The information is available in the opening tag why duplicate it here?

const StatisticsMapping & mapping = totals.queryMapping();
bool updated = false;
// if this scope is not dirty, the aggregates are accurate at this level so return totals (no need to descend)
// Also if at sg scope, do not descend as aggregates does not need to be generated from below sg level
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trivial: "..as aggregates do not need.."

@@ -2183,7 +2191,8 @@ StringBuffer &CStatisticCollection::toXML(StringBuffer &out) const
SuperHashIteratorOf<CStatisticCollection> iter(children, false);
for (iter.first(); iter.isValid(); iter.next())
iter.query().toXML(out);
out.append("</Scope>\n");
out.append("</Scope> // Scope id=\"");
id.getScopeText(out).append("\"\n");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unresolving this comment - missed fact that it is not a valid XML comment syntax, and Gavin's other Q re. duplicate info.

{
// descend down to lower level to obtain totals required for aggregates and then aggregate
CRuntimeStatisticCollection childTotals(mapping);
Owned<IBitSet> childTotalUpdated = createBitSet(mapping.numStatistics());
Copy link
Member

@jakesmith jakesmith Nov 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, I don't follow the logic (related to previous comment here: #17786 (comment)).

The true condition will be followed unconditionally for all subgraphs (regardless of whether isDirty = true or false), which looks like it means everything above every subgraph will also be updated.

@shamser
Copy link
Contributor Author

shamser commented Nov 9, 2023

@ghalliday @jakesmith I have addressed most of the review comments. However, I haven't addressed keeping the existing gatherer code and having a separate Aggregator object. I think the reason for the current design of using a gatherer to write director to the global stats object, is that I was

  1. trying to avoid copying values,
  2. provide the possibility of some improvements in the future (such as caching sg stats and writing them out in batches)
  3. possibly generate aggregates from sg activities, removing the need of having sg level aggregates scattered throughout the graph code.

Copy link
Member

@ghalliday ghalliday left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shamser I think this is looking better. I am still not convinced by doubling up the structure for two purposes - but let's discuss before making any changes.

Linked<IStatisticCollection> statsCollection;
};

WuScopeFilter filter;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it could be made more efficient by directly walking the stats list. If this works I wouldn't change it for this PR though - something to return to and simplify the code.

@@ -883,8 +912,11 @@ extern jlib_decl StatisticCreatorType queryCreatorType(const char * sct, Statist
extern jlib_decl StatisticScopeType queryScopeType(const char * sst, StatisticScopeType dft);

extern jlib_decl IStatisticGatherer * createStatisticsGatherer(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope);
extern jlib_decl IStatisticGatherer * createStatisticsGatherer(IStatisticCollection * stats);
extern jlib_decl IStatisticCollection * createRootStatisticCollection(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope, const StatsScopeId & graphScope, IStatisticCollection * sgCollection=nullptr);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks wrong. A RootStatisticCollection is meant to have no parent. I think this is another result of mixing up gathering the statisics with calculating aggregates.

// Marking the collection dirty here is not ideal. It would be better to have a call to IStatisticCollection::setStatistic mark the scope as dirty.
// However, this would be inefficient as each call to IStatisticCollection::setStatistic would require the dirty flag to be set for all parent scopes.
sgScopeCollection->markDirty();
return createRootStatisticCollection(creatorType, creator, wfScopeId, graphScopeId, sgScopeCollection);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is confusing. It is adding a "Root" scope with an id of wfid underneath a scope of workflow/graph/subgraph. So effectively in the collection it is global->wfid->graph->subgraph->wfid->.
It make me more convinced it is a mistake to try and merge these two concepts.
It also means roxie works rather differently from eclagent/thor.

}
}

void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType)
// Prune all subgraph descendent stats (leaving subgraph stats for future aggregation)
void GlobalStatisticCollection::pruneSubGraphDescendants(unsigned wfid, const char *graphName, unsigned sgId)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Copying the stats is trivial. You are copying ~4 stats from one structure to another.
  2. That can be solved in a different way. Saving a queue of blobs to commit.

Let's arrange a walkthrough to discuss before heading off an making any changes though.

scopes.append(*tos.ensureSubScope(scopeId, true));
}
virtual void beginChannelScope(unsigned id) override
{
StatsScopeId scopeId(SSTchannel, id);
CStatisticCollection & tos = scopes.tos();
IStatisticCollection & tos = scopes.tos();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still lots of unrequired changes from CStatisticCollection to IStatisticCollection throught the file that complicate the compare.

if (value || includeStatisticIfZero(kind))
{
StringBuffer s;
(fWhenAggregateUpdated)(getFullScope(s).str(), queryScopeType(), kind, value);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trivial. I don't think the brackets are unneeded and confusing.

}
if (updated)
// 1) Set any values that has changed for this scope and 2) update ALL totals for parent
const unsigned numStats = mapping.numStatistics();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trivial/style: unusual to make a scalar a const (e.g. not done elsewhere in this function).

StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKindsMapping);
statsCollection->visit(statsAggregatorWriter);
}
} aggregateUpdatedCallBackFunc(wu);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be simpler to use a std::function, e.g.:

    AggregateUpdatedCallBackFunc f = [&](const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)
    {
        wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), sst, scope, kind, nullptr, value, 1, 0, StatsMergeReplace);
    };
    statsCollection->refreshAggregates(aggregateKindsMapping, f);

{
// descend down to lower level to obtain totals required for aggregates and then aggregate
CRuntimeStatisticCollection childTotals(mapping);
Owned<IBitSet> childTotalUpdated = createBitSet(mapping.numStatistics());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it achieves the goal, but am still finding how it is doing it a bit confusing.
What I'd expect I think, is for each level to do:

  • if at level and dirty, do nothing
  • If at level and is subgraph, do nothing.
  • If dirty, recurse down each scope, and then merge the child values into new totals at current level.
  • When finished recursing down each child scope and merging into new totals, update what has changed into current level stats, and push totals (via callback if necessasry).

I had a stab at what that would look like, and I also think it should be done without adding the two quite implementation orientated 'refreshAggregates' methods to IStatisticCollection :

bool refreshAggregates(IStatisticCollection &stats, const StatisticsMapping & mapping, AggregateUpdatedCallBackFunc & fWhenAggregateUpdated)
{
    if (stats.queryScopeType() == SSTsubgraph) // don't descend any further
        return false;
    if (!stats.isDirty())
        return false;
    bool anyChanges = false;
    CRuntimeStatisticCollection totals(mapping);
    Owned<IBitSet> updated = createBitSet(mapping.numStatistics());
    Owned<IStatisticCollectionIterator> childIter = &stats.getScopes(nullptr, false);
    ForEach(*childIter)
    {
        IStatisticCollection &child = childIter->query();
        refreshAggregates(child, mapping, fWhenAggregateUpdated);

        // merge in child level stats into current level, and mark which aggregate kinds have updated
        unsigned numChildStats = child.getNumStatistics();
        for (unsigned s=0; s<numChildStats; s++)
        {
            StatisticKind kind;
            unsigned __int64 value;
            child.getStatistic(kind, value, s);
            if (kind != (StatisticKind)(kind & StKindMask))
                continue; // ignore variants->not supported by CRuntimeStatisticCollection
            unsigned index = mapping.getIndex(kind);
            if (index != mapping.numStatistics())
            {
                totals.mergeStatistic(kind, value);
                updated->set(index); // NB: may already have been set whilst merging in another child, but that's okay.
            }
        }
    }

    // any updates? Set new totals into current level, and call callback
    unsigned nextUpdateIndex = 0;
    while (true)
    {
        nextUpdateIndex = updated->scan(nextUpdateIndex, true);
        if (NotFound == nextUpdateIndex)
            break;
        
        StatisticKind kind = mapping.getKind(nextUpdateIndex);
        unsigned __int64 value = totals.queryStatisticByIndex(nextUpdateIndex).get();
        if (stats.updateStatistic(kind, value, StatsMergeReplace))
        {
            if (value || includeStatisticIfZero(kind))
            {
                StringBuffer s;
                fWhenAggregateUpdated(stats.getFullScope(s).str(), stats.queryScopeType(), kind, value);
                anyChanges = true;
            }
        }
    }
    stats.clearDirty();
    return anyChanges;
}

@shamser shamser closed this Nov 17, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants