Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.6.x'
Browse files Browse the repository at this point in the history
Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed May 31, 2024
2 parents ef5575b + 02a4d7f commit 10cee79
Show file tree
Hide file tree
Showing 51 changed files with 857 additions and 301 deletions.
2 changes: 1 addition & 1 deletion common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ class CStatsContextLogger : public CSimpleInterfaceOf<IContextLogger>
{
stats.setStatistic(kind, value);
}
virtual void mergeStats(const CRuntimeStatisticCollection &from) const override
virtual void mergeStats(unsigned activityId, const CRuntimeStatisticCollection &from) const override
{
stats.merge(from);
}
Expand Down
22 changes: 21 additions & 1 deletion common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <unordered_set>

#include "jlib.hpp"
#include "jconfig.hpp"
#include "jcontainerized.hpp"
#include "workunit.hpp"
#include "jprop.hpp"
Expand Down Expand Up @@ -14313,17 +14314,36 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP
CCycleTimer elapsedTimer;

bool multiJobLinger = config.getPropBool("@multiJobLinger", defaultThorMultiJobLinger);
bool thisThor = true;
const char *queue = config.queryProp("@queue");
const char *tgt = workunit.queryClusterName();
if (!isEmptyString(tgt)) // don't think should ever happen
{
if (!streq(tgt, queue))
{
Owned<IStringIterator> thorTarget = config::getContainerTargets("thor", tgt);
if (!thorTarget->first())
throw makeStringExceptionV(0, "Thor target not found: %s", tgt);
thisThor = false;
queue = tgt;
}
}

// NB: executeGraphOnLingeringThor looks for existing Thor instance that has been used for the same job,
// and communicates with it directly
if (!multiJobLinger && executeGraphOnLingeringThor(workunit, wfid, graphName))
{
if (!thisThor)
throw makeStringExceptionV(0, "multiJobLinger mode required to target other thor instances. Target: %s", tgt);
PROGLOG("Existing lingering Thor handled graph: %s", graphName);
}
else
{
// If no existing Thor instance, or for a multi linger configuration,
// queue the graph, either the thor agent will pick it up and launch a new Thor (up to maxGraphs),
// or an existing idle Thor listening on the same queue will pick it up.
VStringBuffer queueName("%s.thor", config.queryProp("@queue"));

VStringBuffer queueName("%s.thor", queue);
DBGLOG("Queueing wuid=%s, graph=%s, on queue=%s, timelimit=%u seconds", wuid.str(), graphName, queueName.str(), timelimit);

{
Expand Down
29 changes: 18 additions & 11 deletions common/wuanalysis/anawu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class WorkunitAnalyserBase
public:
WorkunitAnalyserBase();

void analyse(IConstWorkUnit * wu);
void analyse(IConstWorkUnit * wu, const char * optGraph);
WuScope * getRootScope() { return LINK(root); }

protected:
Expand Down Expand Up @@ -1282,10 +1282,17 @@ WorkunitAnalyserBase::WorkunitAnalyserBase() : root(new WuScope("", nullptr))
{
}

void WorkunitAnalyserBase::analyse(IConstWorkUnit * wu)
void WorkunitAnalyserBase::analyse(IConstWorkUnit * wu, const char * optGraph)
{
WuScopeFilter filter;
filter.addOutputProperties(PTstatistics).addOutputProperties(PTattributes);
if (optGraph)
{
//Only include the specified graph, and include everything that matches below that graph
filter.addScopeType(SSTgraph);
filter.addId(optGraph);
filter.setIncludeNesting((unsigned)-1);
}
filter.finishedFilter();
collateWorkunitStats(wu, filter);
root->connectActivities();
Expand Down Expand Up @@ -2079,11 +2086,11 @@ void WorkunitStatsAnalyser::traceDependencies()

//---------------------------------------------------------------------------------------------------------------------

void WUANALYSIS_API analyseWorkunit(IWorkUnit * wu, IPropertyTree *options, double costPerMs)
void WUANALYSIS_API analyseWorkunit(IWorkUnit * wu, const char *optGraph, IPropertyTree *options, double costPerMs)
{
WorkunitRuleAnalyser analyser;
analyser.applyConfig(options, wu);
analyser.analyse(wu);
analyser.analyse(wu, optGraph);
analyser.applyRules();
analyser.update(wu, costPerMs);
}
Expand All @@ -2092,7 +2099,7 @@ void WUANALYSIS_API analyseAndPrintIssues(IConstWorkUnit * wu, double costRate,
{
WorkunitRuleAnalyser analyser;
analyser.applyConfig(nullptr, wu);
analyser.analyse(wu);
analyser.analyse(wu, nullptr);
analyser.applyRules();
analyser.print();
if (updatewu)
Expand All @@ -2114,7 +2121,7 @@ void analyseActivity(IConstWorkUnit * wu, IPropertyTree * cfg, const StringArray
{
WorkunitStatsAnalyser analyser;
analyser.applyOptions(cfg);
analyser.analyse(wu);
analyser.analyse(wu, nullptr);
analyser.adjustTimestamps();
analyser.reportActivity(args);
}
Expand All @@ -2123,7 +2130,7 @@ void analyseDependencies(IConstWorkUnit * wu, IPropertyTree * cfg, const StringA
{
WorkunitStatsAnalyser analyser;
analyser.applyOptions(cfg);
analyser.analyse(wu);
analyser.analyse(wu, nullptr);
analyser.adjustTimestamps();
analyser.calcDependencies();
analyser.spotCommonPath(args);
Expand All @@ -2137,7 +2144,7 @@ void analyseOutputDependencyGraph(IConstWorkUnit * wu, IPropertyTree * cfg)
{
WorkunitStatsAnalyser analyser;
analyser.applyOptions(cfg);
analyser.analyse(wu);
analyser.analyse(wu, nullptr);
analyser.adjustTimestamps();
analyser.calcDependencies();
analyser.traceDependencies();
Expand All @@ -2147,7 +2154,7 @@ void analyseCriticalPath(IConstWorkUnit * wu, IPropertyTree * cfg, const StringA
{
WorkunitStatsAnalyser analyser;
analyser.applyOptions(cfg);
analyser.analyse(wu);
analyser.analyse(wu, nullptr);
analyser.adjustTimestamps();
analyser.calcDependencies();
analyser.traceCriticalPaths(args);
Expand All @@ -2157,7 +2164,7 @@ void analyseHotspots(IConstWorkUnit * wu, IPropertyTree * cfg, const StringArray
{
WorkunitStatsAnalyser analyser;
analyser.applyOptions(cfg);
analyser.analyse(wu);
analyser.analyse(wu, nullptr);

const char * rootScope = nullptr;
if (args.ordinality())
Expand All @@ -2177,7 +2184,7 @@ void analyseHotspots(WuHotspotResults & results, IConstWorkUnit * wu, IPropertyT
{
WorkunitStatsAnalyser analyser;
analyser.applyOptions(cfg);
analyser.analyse(wu);
analyser.analyse(wu, nullptr);

analyser.findHotspots(cfg->queryProp("@rootScope"), results.totalTime, results.hotspots);
results.root.setown(analyser.getRootScope());
Expand Down
2 changes: 1 addition & 1 deletion common/wuanalysis/anawu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

#include "anacommon.hpp"

void WUANALYSIS_API analyseWorkunit(IWorkUnit * wu, IPropertyTree *options, double costPerMs);
void WUANALYSIS_API analyseWorkunit(IWorkUnit * wu, const char *optGraph, IPropertyTree *options, double costPerMs);
void WUANALYSIS_API analyseAndPrintIssues(IConstWorkUnit * wu, double costPerMs, bool updatewu);

//---------------------------------------------------------------------------------------------------------------------
Expand Down
6 changes: 4 additions & 2 deletions dali/ft/filecopy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1668,6 +1668,9 @@ void FileSprayer::analyseFileHeaders(bool setcurheadersize)
unsigned numEmptyXml = 0;
ForEachItemIn(idx, sources)
{
if (isAborting())
throwError(DFTERR_CopyAborted);

FilePartInfo & cur = sources.item(idx);
StringBuffer s;
cur.filename.getPath(s);
Expand Down Expand Up @@ -1748,8 +1751,7 @@ void FileSprayer::analyseFileHeaders(bool setcurheadersize)
// Despray from distributed file

// Check XMLheader/footer in file level
DistributedFilePropertyLock lock(distributedSource);
IPropertyTree &curProps = lock.queryAttributes();
IPropertyTree &curProps = distributedSource->queryAttributes();
if (curProps.hasProp(FPheaderLength) && curProps.hasProp(FPfooterLength))
{
cur.xmlHeaderLength = curProps.getPropInt(FPheaderLength, 0);
Expand Down
40 changes: 34 additions & 6 deletions ecl/eclagent/eclagent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1590,7 +1590,7 @@ char *EclAgent::getEnv(const char *name, const char *defaultValue) const

void EclAgent::selectCluster(const char *newCluster)
{
const char *oldCluster = queryWorkUnit()->queryClusterName();
StringAttr oldCluster = queryWorkUnit()->queryClusterName();
if (getClusterType(clusterType)==HThorCluster)
{
// If the current cluster is an hthor cluster, it's an error to change it...
Expand Down Expand Up @@ -1857,6 +1857,27 @@ void EclAgent::setRetcode(int code)
retcode = code;
}


void EclAgent::runWorkunitAnalyser(IWorkUnit * w, const char * optGraph)
{
if (w->getDebugValueBool("analyzeWorkunit", agentTopology->getPropBool("@analyzeWorkunit", true)))
{
double costPerMs = calculateThorCost(1, getNodes());
IPropertyTree *analyzerOptions = agentTopology->queryPropTree("analyzerOptions");
analyseWorkunit(w, optGraph, analyzerOptions, costPerMs);
}
}

static constexpr bool defaultAnalyzeWhenComplete = true;
void EclAgent::runWorkunitAnalyserAfterGraph(const char * graph)
{
if (!wuRead->getDebugValueBool("analyzeWhenComplete", agentTopology->getPropBool("@analyzeWhenComplete", defaultAnalyzeWhenComplete)))
{
Owned<IWorkUnit> wu(updateWorkUnit());
runWorkunitAnalyser(wu, graph);
}
}

void EclAgent::doProcess()
{
#ifdef _DEBUG
Expand Down Expand Up @@ -2023,15 +2044,22 @@ void EclAgent::doProcess()
break;
}

if (w->getState() == WUStateCompleted && getClusterType(clusterType)==ThorLCRCluster)

if (getClusterType(clusterType)==ThorLCRCluster)
{
if (w->getDebugValueBool("analyzeWorkunit", agentTopology->getPropBool("@analyzeWorkunit", true)))
if (w->getDebugValueBool("analyzeWhenComplete", agentTopology->getPropBool("@analyzeWhenComplete", defaultAnalyzeWhenComplete)))
{
double costPerMs = calculateThorCost(1, getNodes());
IPropertyTree *analyzerOptions = agentTopology->queryPropTree("analyzerOptions");
analyseWorkunit(w.get(), analyzerOptions, costPerMs);
switch (w->getState())
{
case WUStateFailed:
case WUStateAborted:
case WUStateCompleted:
runWorkunitAnalyser(w, nullptr);
break;
}
}
}

if(w->queryEventScheduledCount() > 0)
switch(w->getState())
{
Expand Down
2 changes: 2 additions & 0 deletions ecl/eclagent/eclagent.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,8 @@ private:
EclAgentQueryLibrary * loadEclLibrary(const char * libraryName, unsigned expectedInterfaceHash, const char * embeddedGraphName);
virtual bool getWorkunitResultFilename(StringBuffer & diskFilename, const char * wuid, const char * name, int seq);
virtual IDebuggableContext *queryDebugContext() const { return debugContext; };
void runWorkunitAnalyser(IWorkUnit * w, const char * optGraph);
void runWorkunitAnalyserAfterGraph(const char * optGraph);

//protected by critical section
EclGraph * addGraph(const char * graphName);
Expand Down
11 changes: 10 additions & 1 deletion ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1603,7 +1603,16 @@ void EclAgent::executeGraph(const char * graphName, bool realThor, size32_t pare
{
if (isStandAloneExe)
throw MakeStringException(0, "Cannot execute Thor Graph in standalone mode");
executeThorGraph(graphName, *wuRead, *agentTopology);
try
{
executeThorGraph(graphName, *wuRead, *agentTopology);
runWorkunitAnalyserAfterGraph(graphName);
}
catch (...)
{
runWorkunitAnalyserAfterGraph(graphName);
throw;
}
}
else
{
Expand Down
44 changes: 44 additions & 0 deletions ecl/regress/issue31961.ecl
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*##############################################################################
HPCC SYSTEMS software Copyright (C) 2024 HPCC Systems®.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
############################################################################## */



strRec := { string x; };

myRec := RECORD
STRING a;
STRING b;
STRING c;
STRING d;
DATASET(strRec) e;
END;


makeDs(unsigned start) := DATASET(100, TRANSFORM(strRec, SELF.x := (STRING)(start + counter)));

myRec t(unsigned cnt) := TRANSFORM
SELF.a := (STRING)cnt;
SELF.b := (STRING)HASH32(cnt);
SELF.c := (STRING)HASH64(cnt);
SELF.d := (STRING)HASH64(cnt+1);
SELF.e := makeDs(cnt);
END;

i := DATASET(1000000, t(COUNTER));
s := SORT(i, a, HINT(sortCompBlkSz(1000)), LOCAL);
c := COUNT(NOFOLD(s));
output(c);
Loading

0 comments on commit 10cee79

Please sign in to comment.