diff --git a/common/thorhelper/thorcommon.hpp b/common/thorhelper/thorcommon.hpp index 2393b041301..6e58f086494 100644 --- a/common/thorhelper/thorcommon.hpp +++ b/common/thorhelper/thorcommon.hpp @@ -295,7 +295,6 @@ class ActivityTimer else startCycles = 0; } - ~ActivityTimer() { if (likely(enabled)) @@ -314,6 +313,8 @@ class SimpleActivityTimer { cycle_t startCycles; cycle_t &accumulator; + StringBuffer name; + unsigned activityId=0; protected: const bool enabled; public: @@ -325,7 +326,6 @@ class SimpleActivityTimer else startCycles = 0; } - inline ~SimpleActivityTimer() { if (likely(enabled)) diff --git a/thorlcr/activities/funnel/thfunnelslave.cpp b/thorlcr/activities/funnel/thfunnelslave.cpp index 3cd75fdfb89..31fcf29da45 100644 --- a/thorlcr/activities/funnel/thfunnelslave.cpp +++ b/thorlcr/activities/funnel/thfunnelslave.cpp @@ -80,7 +80,10 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface unsigned numRows = 0; try { - funnel.activity.startInput(inputIndex); + { + LookAheadTimer t(funnel.activity.slaveTimerStats, funnel.activity.queryTimeActivities()); + funnel.activity.startInput(inputIndex); + } started = true; inputStream = funnel.activity.queryInputStream(inputIndex); while (!stopping) @@ -88,6 +91,7 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface numRows = 0; for (;numRows < chunkSize; numRows++) { + LookAheadTimer t(funnel.activity.slaveTimerStats, funnel.activity.queryTimeActivities()); const void * row = inputStream->ungroupedNextRow(); if (!row) break; @@ -354,7 +358,6 @@ class FunnelSlaveActivity : public CSlaveActivity } virtual void start() override { - ActivityTimer s(slaveTimerStats, timeActivities); if (!grouped && parallel) { //NB starts inputs on each thread @@ -376,7 +379,11 @@ class FunnelSlaveActivity : public CSlaveActivity auto startInputNFunc = [&](unsigned i) { - try { startInput(i); } + try + { + LookAheadTimer s(slaveTimerStats, timeActivities); + startInput(i); + } catch (CATCHALL) { ActPrintLog("FUNNEL(%" ACTPF "d): Error staring input %d", container.queryId(), i); diff --git a/thorlcr/activities/join/thjoinslave.cpp b/thorlcr/activities/join/thjoinslave.cpp index f2262256ab7..5d59f92b3d6 100644 --- a/thorlcr/activities/join/thjoinslave.cpp +++ b/thorlcr/activities/join/thjoinslave.cpp @@ -666,6 +666,7 @@ class CMergeJoinSlaveBaseActivity : public CThorNarySlaveActivity, public CThorS } virtual void start() override { + ActivityTimer s(slaveTimerStats, timeActivities); CThorNarySlaveActivity::start(); ForEachItemIn(i1, expandedInputs) @@ -682,6 +683,7 @@ class CMergeJoinSlaveBaseActivity : public CThorNarySlaveActivity, public CThorS } CATCH_NEXTROW() { + ActivityTimer s(slaveTimerStats, timeActivities); OwnedConstThorRow ret = processor.nextRow(); if (ret) { @@ -692,6 +694,7 @@ class CMergeJoinSlaveBaseActivity : public CThorNarySlaveActivity, public CThorS } virtual const void *nextRowGE(const void *seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra) { + ActivityTimer s(slaveTimerStats, timeActivities); try { return nextRowGENoCatch(seek, numFields, wasCompleteMatch, stepExtra); } CATCH_NEXTROWX_CATCH; } diff --git a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp index 69e0651c2bb..64f691ff5a6 100644 --- a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp +++ b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp @@ -1530,6 +1530,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, } virtual void start() override { + ActivityTimer t(slaveTimerStats, queryTimeActivities()); joined = 0; joinCounter = 0; candidateCounter = 0; diff --git a/thorlcr/activities/loop/thloopslave.cpp b/thorlcr/activities/loop/thloopslave.cpp index 722358f0e4d..6550343de82 100644 --- a/thorlcr/activities/loop/thloopslave.cpp +++ b/thorlcr/activities/loop/thloopslave.cpp @@ -866,7 +866,6 @@ class CConditionalActivity : public CSlaveActivity } virtual void start() override { - ActivityTimer s(slaveTimerStats, timeActivities); stopUnselectedInputs(); if (queryInput(branch)) { diff --git a/thorlcr/activities/nsplitter/thnsplitterslave.cpp b/thorlcr/activities/nsplitter/thnsplitterslave.cpp index d1d8f2e2e8e..cb95b41654f 100644 --- a/thorlcr/activities/nsplitter/thnsplitterslave.cpp +++ b/thorlcr/activities/nsplitter/thnsplitterslave.cpp @@ -207,6 +207,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf } void prepareInput() { + ActivityTimer t(slaveTimerStats, queryTimeActivities()); // NB: called by 1st output to start() CriticalBlock block(prepareInputLock); if (!inputPrepared) @@ -300,7 +301,10 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf inline const void *nextRow(unsigned outIdx, rowcount_t current) { if (1 == activeOutputCount) // will be true, if only 1 input connected, or only 1 input was active (others stopped) when it started reading + { + ActivityTimer t(slaveTimerStats, queryTimeActivities()); return inputStream->nextRow(); + } if (recsReady == current && writeAheadException.get()) throw LINK(writeAheadException); return sharedRowStream->queryOutput(outIdx)->nextRow(); // will block until available diff --git a/thorlcr/activities/project/thprojectslave.cpp b/thorlcr/activities/project/thprojectslave.cpp index 5c9ce6ee953..b455b16933b 100644 --- a/thorlcr/activities/project/thprojectslave.cpp +++ b/thorlcr/activities/project/thprojectslave.cpp @@ -162,6 +162,7 @@ class CPrefetchProjectSlaveActivity : public CSlaveActivity ~CPrefetcher() { stop(); } PrefetchInfo *pullRecord() { + LookAheadTimer t(parent.slaveTimerStats, parent.timeActivities); OwnedConstThorRow row = parent.inputStream->nextRow(); if (row) { diff --git a/thorlcr/activities/when/thwhenslave.cpp b/thorlcr/activities/when/thwhenslave.cpp index 679234da6bb..6f1a2540ecd 100644 --- a/thorlcr/activities/when/thwhenslave.cpp +++ b/thorlcr/activities/when/thwhenslave.cpp @@ -108,6 +108,12 @@ class CWhenSlaveActivity : public CDependencyExecutorSlaveActivity info.fastThrough = false; calcMetaInfoSize(info, queryInput(0)); } + // IThorDataLink + virtual void start() override + { + ActivityTimer t(slaveTimerStats, queryTimeActivities()); + PARENT::start(); + } }; //////////////////// diff --git a/thorlcr/graph/thgraphslave.cpp b/thorlcr/graph/thgraphslave.cpp index d0c87d8e7f0..b05bdd56eb0 100644 --- a/thorlcr/graph/thgraphslave.cpp +++ b/thorlcr/graph/thgraphslave.cpp @@ -588,12 +588,15 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const } unsigned __int64 processCycles = queryTotalCycles() + queryLookAheadCycles(); if (processCycles < inputCycles) // not sure how/if possible, but guard against + { + ActPrintLog("CSlaveActivity::queryLocalCycles - processCycles %" I64F "u < inputCycles %" I64F "u", processCycles, inputCycles); return 0; + } processCycles -= inputCycles; const unsigned __int64 blockedCycles = queryBlockedCycles(); if (processCycles < blockedCycles) { - IWARNLOG("CSlaveActivity::queryLocalCycles - processCycles %" I64F "u < blockedCycles %" I64F "u", processCycles, blockedCycles); + ActPrintLog("CSlaveActivity::queryLocalCycles - processCycles %" I64F "u < blockedCycles %" I64F "u", processCycles, blockedCycles); return 0; } return processCycles-blockedCycles; @@ -731,6 +734,7 @@ void CThorStrandedActivity::strandedStop() //For some reason gcc doesn't let you specify a function as pure virtual and define it at the same time. void CThorStrandedActivity::start() { + SimpleActivityTimer s(startTime, timeActivities); CSlaveActivity::start(); startJunction(splitter); onStartStrands(); @@ -854,7 +858,7 @@ IStrandJunction *CThorStrandedActivity::getOutputStreams(CActivityBase &ctx, uns unsigned __int64 CThorStrandedActivity::queryTotalCycles() const { - unsigned __int64 total = 0;; + cycle_t total = startTime; ForEachItemIn(i, strands) { CThorStrandProcessor &strand = strands.item(i); diff --git a/thorlcr/graph/thgraphslave.hpp b/thorlcr/graph/thgraphslave.hpp index 685784ddc86..9eeb7139fd0 100644 --- a/thorlcr/graph/thgraphslave.hpp +++ b/thorlcr/graph/thgraphslave.hpp @@ -196,7 +196,6 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres { mutable MemoryBuffer *data; mutable CriticalSection crit; - protected: CThorInputArray inputs; IPointerArrayOf outputs; @@ -475,6 +474,7 @@ class graphslave_decl CThorStrandedActivity : public CSlaveActivity { typedef CSlaveActivity PARENT; protected: + cycle_t startTime = 0; CThorStrandOptions strandOptions; IArrayOf strands; Owned branch;