Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-30590 Report warning on any activity that has a large skew in execution worktime #18121

Merged
merged 1 commit into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/wuanalysis/anaerrorcodes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ typedef enum
ANA_DISTRIB_SKEW_INPUT_ID,
ANA_DISTRIB_SKEW_OUTPUT_ID,
ANA_IOSKEW_RECORDS_ID,
ANA_UNUSED_ID, /* May re-use but don't remove to avoid changing later id's */
ANA_EXECUTE_SKEW_ID,
ANA_KJ_EXCESS_PREFILTER_ID
} AnalyzerErrorCode;

Expand Down
52 changes: 52 additions & 0 deletions common/wuanalysis/anarule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,57 @@ class IoSkewRule : public AActivityRule
const char * category;
};

class LocalExecuteSkewRule : public AActivityRule
{
public:
virtual bool isCandidate(IWuActivity & activity) const override
{
switch (activity.getAttr(WaKind))
{
case TAKfirstn: // skew is expected, so ignore
case TAKtopn:
case TAKsort:
return false;
}
return true;
}

virtual bool check(PerformanceIssue & result, IWuActivity & activity, const IAnalyserOptions & options) override
{
stat_type localExecuteMaxSkew = activity.getStatRaw(StTimeLocalExecute, StSkewMax);
if (localExecuteMaxSkew<options.queryOption(watOptSkewThreshold))
return false;

stat_type timeMaxLocalExecute = activity.getStatRaw(StTimeLocalExecute, StMaxX);
stat_type timeAvgLocalExecute = activity.getStatRaw(StTimeLocalExecute, StAvgX);
stat_type timePenalty = (timeMaxLocalExecute - timeAvgLocalExecute);;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trivial: double ;

if (timePenalty<options.queryOption(watOptMinInterestingTime))
return false;

bool inputSkewed = false;
for(unsigned edgeNo = 0; IWuEdge *wuInputEdge = activity.queryInput(edgeNo); edgeNo++)
{
if (wuInputEdge->getStatRaw(StNumRowsProcessed, StSkewMax)>options.queryOption(watOptSkewThreshold))
{
inputSkewed = true;
break;
}
}
bool outputSkewed = false;
IWuEdge *wuOutputEdge = activity.queryOutput(0);
if (wuOutputEdge && (wuOutputEdge->getStatRaw(StNumRowsProcessed, StSkewMax)>options.queryOption(watOptSkewThreshold)))
outputSkewed = true;

if (inputSkewed)
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time caused by uneven input");
else if (outputSkewed)
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time caused by uneven output");
else
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time");
return true;
}
};

class KeyedJoinExcessRejectedRowsRule : public ActivityKindRule
{
public:
Expand Down Expand Up @@ -221,4 +272,5 @@ void gatherRules(CIArrayOf<AActivityRule> & rules)
rules.append(*new IoSkewRule(StTimeDiskWriteIO, "disk write"));
rules.append(*new IoSkewRule(StTimeSpillElapsed, "spill"));
rules.append(*new KeyedJoinExcessRejectedRowsRule);
rules.append(*new LocalExecuteSkewRule);
}
Loading